- Kje,
Bonjour tout le monde,
une fois n'est pas coutume, je viens ici poser une question technique. En réalité ce n'est ni urgent ni critique car j'ai déjà une solution suffisament performante pour mes besoins court et moyen terme mais je m'interoge vraiment si on ne peut pas faire mieux.
Mon but est de calculer quelques valeurs dont voici la formulation mathématique (tiré d'un article) :
Sachant que
Bien donc on voit en réalité qu'il y a 3 dimensions dans cette histoire : N, K et D car ce qui m'interesse c'est de calculé ça pour tous les K.
En terme matriciel, avec des boucles, ça peut donner ça :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | import numpy as np def calc_1(gamma, x): # gamma.shape == (N,K) # x.shape == (N, D) N, K = gamma.shape _, D = x.shape S0 = np.zeros((K,)) S1 = np.zeros((K, D)) S2 = np.zeros((K, D)) for n in range(N): for k in range(K): gnk = gamma[n, k] S0[k] += gnk # gamma[n, k] for d in range(D): tsn = gnk * x[n, d] S1[k, d] += tsn # gamma[n, k] * x[n, d] S2[k, d] += tsn * x[n, d] # gamma[n, k] * x[n, d] * x[n, d] return S0, S1, S2 |
Cette version est évidement ultra lente. J'ai donc fais une version purrement vectoriel en utilisant le broadcasting pour éviter les boucles. Ça donne cela :
1 2 3 4 5 6 7 8 9 10 | def calc_2(gamma, x): # gamma.shape == (N,K) # x.shape == (N, D) S0 = np.sum(gamma, axis=0) ts = gamma[:, :, np.newaxis] * x[:, np.newaxis, :] S1 = np.sum(ts, axis=0) S2 = np.sum(ts * x[:, np.newaxis, :], axis=0) return S0, S1, S2 |
Là, en théorie, il y a pas grand chose qui coince… sauf que je vie dans le monde réel. Ce que je n'ai pas encore détaillé c'est les valeurs des dimensions :
Les plus observateurs ont peut être trouvé ce qui me dérange avec calc_2
: la variable intermédiaire ts
qui est de dimension float
sur 64 bits et mes valeurs typique cela nous fais un petit tableau de
Donc une solution intermédiaire et facile est de mixer les deux :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | def calc_3(gamma, x): # gamma.shape == (N,K) # x.shape == (N, D) N, K = gamma.shape _, D = x.shape S0 = np.sum(gamma, axis=0) S1 = np.zeros((K, D)) S2 = np.zeros((K, D)) for n in range(N): tsn = gamma[n, :, np.newaxis] * x[n, np.newaxis, :] S1 += tsn S2 += tsn * x[n, np.newaxis, :] return S0, S1, S2 |
On en arrive donc a ma question. Je me demande si il n'y aurait pas une fonction numpy qui ferait tout seul ce genre de calcul, simple, pour m'éviter de déplier une boucle sans exploser ma mémoire. Si vous avez une fonction magique, n'hésitez pas !
Pour vous donner une idée, voici des temps de traitements de ces fonctions (avec toutes les dimensions divisés par 4 pour garder le même ordre de grandeur) :
Fonction | temps moyen (sec) |
---|---|
calc_1 |
|
calc_2 |
|
calc_3 |
On se rend donc compte que la version sans boucle n'est pas la plus rapide pour autant. La création et l'accès mémoire doit y être pour beaucoup.
Et le code complet de test, avec les fonctions si vous voulez essayer chez vous :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 | import numpy as np def calc_1(gamma, x): # gamma.shape == (N,K) # x.shape == (N, D) N, K = gamma.shape _, D = x.shape S0 = np.zeros((K,)) S1 = np.zeros((K, D)) S2 = np.zeros((K, D)) for n in range(N): for k in range(K): gnk = gamma[n, k] S0[k] += gnk # gamma[n, k] for d in range(D): tsn = gnk * x[n, d] S1[k, d] += tsn # gamma[n, k] * x[n, d] S2[k, d] += tsn * x[n, d] # gamma[n, k] * x[n, d] * x[n, d] return S0, S1, S2 def calc_2(gamma, x): # gamma.shape == (N,K) # x.shape == (N, D) S0 = np.sum(gamma, axis=0) ts = gamma[:, :, np.newaxis] * x[:, np.newaxis, :] S1 = np.sum(ts, axis=0) S2 = np.sum(ts * x[:, np.newaxis, :], axis=0) return S0, S1, S2 def calc_3(gamma, x): # gamma.shape == (N,K) # x.shape == (N, D) N, K = gamma.shape _, D = x.shape S0 = np.sum(gamma, axis=0) S1 = np.zeros((K, D)) S2 = np.zeros((K, D)) for n in range(N): tsn = gamma[n, :, np.newaxis] * x[n, np.newaxis, :] S1 += tsn S2 += tsn * x[n, np.newaxis, :] return S0, S1, S2 if __name__ == "__main__": import timeit # calc_1 n'est calculé que sur 3 itération parce que beaucoup trop lente ! for i, n in enumerate((3, 100, 100), start=1): setup = ";".join(("from __main__ import calc_{}".format(i), "import numpy as np", "np.random.seed(42)", "K = 256", "D = 24", "N = 5000", "gamma = np.random.rand(N, K)", "x = np.random.rand(N, D)")) print(i, ": ", timeit.timeit("calc_{}(gamma, x)".format(i), setup=setup, number=n)) |