Python: Jaj użyć wielokrotnie Pool w multiprocessing i dodawać nowe processy do działającego już Pool?
W dokumentacji dla multiprocessing można zobaczyć przykład z Pool
podobny do tego
from multiprocessing import Pool
def function(x):
return x*x
if __name__ == '__main__':
with Pool(5) as pool:
print(pool.map(function, [1, 2, 3]))
print(pool.map(function, [3, 4, 5]))
print(pool.apply(function, [9,])) # it has to be list/tuple even for single argument
ale ten przykład może używać pool tylko w danym miejscu (w danej funkcji), musi on czekać na wyniki i niszczy on pool więc nie można użyć go ponownie - chyba że cały kod będzie umiesczony w with Pool as pool
.
Ale można utworzyć pool
w inny sposób
import multiprocessing
def function(x):
return x*x
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
print(pool.map(function, [1, 2, 3]))
print(pool.map(function, [3, 4, 5]))
print(pool.apply(function, [9,])) # it has to be list/tuple even for single argument
pool.close()
pool.join()
i wtedy można użyć ponownie pool
w innych funkcjach
import multiprocessing
def function(x):
return x*x
def use1(pool):
print(pool.map(function, [1, 2, 3]))
def use2(pool):
print(pool.map(function, [3, 4, 5]))
def use3(pool):
print(pool.apply(function, [9,])) # it has to be list/tuple even for single argument
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
# ... code ...
use1(pool)
# ... code ...
use2(pool)
# ... code ...
use3(pool)
# ... code ...
pool.close()
pool.join()
Ale map
i apply
czekają na wyniki z funkcji więc trzeba czekać zanim się doda nowe procesy.
Uzywając map_async
i apply_async
program nie czeka na wyniki z funkcji więc można dodawać nowe processy gdy wcześniejsze procesy wciąż używają pool.
I pool będzie zarządzał wszystkimi procesami razem z jednym limitem ilości aktywnych procesów.
Ponieważ ta wersja nie czeka na wyniki więc funkcje dają specjalny obiekt z dostępem do wyników gdy będą one gotowe.
Można użyć .get()
aby zaczekać na wyniki i je pobrać, lub .wait()
aby tylko na nie poczekać.
import multiprocessing
def function(x):
return x*x
def use1(pool):
result_access = pool.map_async(function, [1, 2, 3])
return result_access
def use2(pool):
result_access = pool.map_async(function, [3, 4, 5])
return result_access
def use3(pool):
result_access = pool.apply_async(function, [9,]) # it has to be list/tuple even for single argument
return result_access
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
# ... code ...
result_access1 = use1(pool)
# ... code ...
result_access2 = use2(pool)
# ... code ...
result_access3 = use3(pool)
# ... code ...
# later get results
print(result_access1.get())
print(result_access2.get())
print(result_access3.get())
pool.close()
pool.join()
Zamiast czekać na wyniki można użyć opcji callback=
do przypisania funkcji, która zostanie wykonana gdy wyniki już będą gotowe.
Funkcja ta dostanie wyniki jako argument i może ona (dla przykładu) wyświetlić wyniki.
import multiprocessing
def function(x):
return x*x
def use1(pool):
pool.map_async(function, [1, 2, 3], callback=display)
def use2(pool):
pool.map_async(function, [3, 4, 5], callback=display)
def use3(pool):
pool.apply_async(function, [9,], callback=display) # it has to be list/tuple even for single argument
def display(result):
print(result)
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
# ... code ...
use1(pool)
# ... code ...
use2(pool)
# ... code ...
use3(pool)
# ... code ...
pool.close()
pool.join()
Używając wiele apply_sync
można otrzymać wyniki w odmiennej kolejności.
Uzywając pojedynczy map_async
z listą argumentów otrzymuje się wyniki w takiej kolejności jka były argumenty na liście.
