Search on blog:

Python: Jaj użyć wielokrotnie Pool w multiprocessing i dodawać nowe processy do działającego już Pool?

W dokumentacji dla multiprocessing można zobaczyć przykład z Pool podobny do tego

    from multiprocessing import Pool

    def function(x):
        return x*x

    if __name__ == '__main__':

        with Pool(5) as pool: 
            print(pool.map(function, [1, 2, 3]))
            print(pool.map(function, [3, 4, 5]))
            print(pool.apply(function, [9,])) # it has to be list/tuple even for single argument

ale ten przykład może używać pool tylko w danym miejscu (w danej funkcji), musi on czekać na wyniki i niszczy on pool więc nie można użyć go ponownie - chyba że cały kod będzie umiesczony w with Pool as pool.

Ale można utworzyć pool w inny sposób

    import multiprocessing

    def function(x):
        return x*x

    if __name__ == '__main__':

        pool = multiprocessing.Pool(5)

        print(pool.map(function, [1, 2, 3]))
        print(pool.map(function, [3, 4, 5]))
        print(pool.apply(function, [9,])) # it has to be list/tuple even for single argument

        pool.close()
        pool.join()

i wtedy można użyć ponownie pool w innych funkcjach

    import multiprocessing

    def function(x):
        return x*x

    def use1(pool):
        print(pool.map(function, [1, 2, 3]))

    def use2(pool):
        print(pool.map(function, [3, 4, 5]))

    def use3(pool):
        print(pool.apply(function, [9,])) # it has to be list/tuple even for single argument

    if __name__ == '__main__':

        pool = multiprocessing.Pool(5)

        # ... code ...

        use1(pool)

        # ... code ...

        use2(pool)

        # ... code ...

        use3(pool)

        # ... code ...

        pool.close()
        pool.join()

Ale map i apply czekają na wyniki z funkcji więc trzeba czekać zanim się doda nowe procesy.

Uzywając map_async i apply_async program nie czeka na wyniki z funkcji więc można dodawać nowe processy gdy wcześniejsze procesy wciąż używają pool. I pool będzie zarządzał wszystkimi procesami razem z jednym limitem ilości aktywnych procesów.

Ponieważ ta wersja nie czeka na wyniki więc funkcje dają specjalny obiekt z dostępem do wyników gdy będą one gotowe. Można użyć .get() aby zaczekać na wyniki i je pobrać, lub .wait() aby tylko na nie poczekać.

    import multiprocessing

    def function(x):
        return x*x

    def use1(pool):
        result_access = pool.map_async(function, [1, 2, 3])
        return result_access

    def use2(pool):
        result_access = pool.map_async(function, [3, 4, 5])
        return result_access 

    def use3(pool):
        result_access = pool.apply_async(function, [9,]) # it has to be list/tuple even for single argument
        return result_access 

    if __name__ == '__main__':

        pool = multiprocessing.Pool(5)

        # ... code ...

        result_access1 = use1(pool)

        # ... code ...

        result_access2 = use2(pool)

        # ... code ...

        result_access3 = use3(pool)

        # ... code ...

        # later get results

        print(result_access1.get())
        print(result_access2.get())
        print(result_access3.get())

        pool.close()
        pool.join()

Zamiast czekać na wyniki można użyć opcji callback= do przypisania funkcji, która zostanie wykonana gdy wyniki już będą gotowe. Funkcja ta dostanie wyniki jako argument i może ona (dla przykładu) wyświetlić wyniki.

    import multiprocessing

    def function(x):
        return x*x

    def use1(pool):
        pool.map_async(function, [1, 2, 3], callback=display)

    def use2(pool):
        pool.map_async(function, [3, 4, 5], callback=display)

    def use3(pool):
        pool.apply_async(function, [9,], callback=display) # it has to be list/tuple even for single argument

    def display(result):
        print(result)

    if __name__ == '__main__':

        pool = multiprocessing.Pool(5)

        # ... code ...

        use1(pool)

        # ... code ...

        use2(pool)

        # ... code ...

        use3(pool)

        # ... code ...

        pool.close()
        pool.join()

Używając wiele apply_sync można otrzymać wyniki w odmiennej kolejności. Uzywając pojedynczy map_async z listą argumentów otrzymuje się wyniki w takiej kolejności jka były argumenty na liście.

If you like it
Buy a Coffee