
Python: How to reuse a Pool in multiprocessing and add new processes to a running Pool?

In the documentation for multiprocessing you can see an example with Pool similar to this:

    from multiprocessing import Pool

    def function(x):
        return x*x

    if __name__ == '__main__':

        with Pool(5) as pool: 
            print(pool.map(function, [1, 2, 3]))
            print(pool.map(function, [3, 4, 5]))
        print(pool.apply(function, [9,])) # args has to be a list/tuple even for a single argument

but this runs the pool only in the current place/function: it has to wait for the results, and leaving the with block destroys the pool, so you can't reuse it later - you would have to run all the code inside the with Pool(...) as pool: block.

You can create the pool in a different way:

    import multiprocessing

    def function(x):
        return x*x

    if __name__ == '__main__':

        pool = multiprocessing.Pool(5)

        print(pool.map(function, [1, 2, 3]))
        print(pool.map(function, [3, 4, 5]))
        print(pool.apply(function, [9,])) # args has to be a list/tuple even for a single argument

        pool.close()
        pool.join()

and then you can reuse the same pool in different functions:

    import multiprocessing

    def function(x):
        return x*x

    def use1(pool):
        print(pool.map(function, [1, 2, 3]))

    def use2(pool):
        print(pool.map(function, [3, 4, 5]))

    def use3(pool):
        print(pool.apply(function, [9,])) # args has to be a list/tuple even for a single argument

    if __name__ == '__main__':

        pool = multiprocessing.Pool(5)

        # ... code ...

        use1(pool)

        # ... code ...

        use2(pool)

        # ... code ...

        use3(pool)

        # ... code ...

        pool.close()
        pool.join()

But map and apply wait for the results, so you have to wait before you can add new tasks to the pool.

map_async and apply_async don't wait for the results, so you can add new tasks while other tasks are still using the pool. The pool manages them all together, with a single limit on the number of active processes.

Because it doesn't wait for the results, it gives you a special object (AsyncResult) with access to the results when they are ready. You can use .get() to wait and fetch the result, or .wait() to only wait.

    import multiprocessing

    def function(x):
        return x*x

    def use1(pool):
        result_access = pool.map_async(function, [1, 2, 3])
        return result_access

    def use2(pool):
        result_access = pool.map_async(function, [3, 4, 5])
        return result_access 

    def use3(pool):
        result_access = pool.apply_async(function, [9,]) # args has to be a list/tuple even for a single argument
        return result_access 

    if __name__ == '__main__':

        pool = multiprocessing.Pool(5)

        # ... code ...

        result_access1 = use1(pool)

        # ... code ...

        result_access2 = use2(pool)

        # ... code ...

        result_access3 = use3(pool)

        # ... code ...

        # later get results

        print(result_access1.get())
        print(result_access2.get())
        print(result_access3.get())

        pool.close()
        pool.join()

Instead of waiting for the result, you can use callback= to assign a function to be executed when the results are ready. That function gets the result as an argument and can (for example) display it.

    import multiprocessing

    def function(x):
        return x*x

    def use1(pool):
        pool.map_async(function, [1, 2, 3], callback=display)

    def use2(pool):
        pool.map_async(function, [3, 4, 5], callback=display)

    def use3(pool):
        pool.apply_async(function, [9,], callback=display) # args has to be a list/tuple even for a single argument

    def display(result):
        print(result)

    if __name__ == '__main__':

        pool = multiprocessing.Pool(5)

        # ... code ...

        use1(pool)

        # ... code ...

        use2(pool)

        # ... code ...

        use3(pool)

        # ... code ...

        pool.close()
        pool.join()

With many apply_async calls you may get the results in a different order. With a single map_async and a list of arguments you get all the results in the same order as the arguments on the list.
