Parallelism is a must for a production application. Python is an elegant language, but its multiprocessing module can be problematic: I have often run into hangs and deadlocks when using it. This is close to the worst case for a production application, because an error or exception at least lets you resubmit your job, whereas a deadlocked program waits forever. It pretends to work, but there is no CPU consumption in the terminal. In this post, I’m going to share how to handle this hang and deadlock problem.
Approach causing deadlock
I was using the multiprocessing.Pool class and calling its starmap method. starmap lets you pass multiple arguments to the worker function, whereas regular map passes only one. We have a generic function, my_function, which expects two integers as inputs, and we pass it pairs of successive numbers in a loop. This usage sometimes got my program stuck; I observed that the hang appears when you pass large inputs to the function.
from multiprocessing import Pool

def my_function(a, b):
    return a + b

POOL_SIZE = 10

with Pool(POOL_SIZE) as pool:
    results = pool.starmap(my_function, [(i, i+1) for i in range(0, 100)])
    pool.close()
    pool.join()
I mentioned this approach in this post already: Tips and Tricks for GPU and Multiprocessing in TensorFlow
Workaround
I used the apply_async method of multiprocessing.Pool to work around the deadlock problem. I stored the AsyncResult objects returned by apply_async in a list. apply_async returns immediately; you retrieve each result later by calling its get method. Luckily, get comes with a timeout argument: if a child process hangs, get raises a TimeoutError instead of waiting forever, and the parent process can then terminate the pool.
import multiprocessing
import multiprocessing.pool
import time
from contextlib import closing

def my_function(a, b):
    #time.sleep(150)
    return a + b

POOL_SIZE = 10

with closing(multiprocessing.Pool(POOL_SIZE)) as pool:
    funclist = []
    for i in range(0, 100):
        f = pool.apply_async(my_function, [i, i+1])
        funclist.append(f)
    #-----------------------
    results = []
    for f in funclist:
        result = f.get(timeout = 120)
        results.append(result)
    #-----------------------
    pool.close()
    pool.terminate()
You can uncomment the time.sleep call in my_function to see the timeout working. The parent process expects each result within 120 seconds; if the timeout is reached, a multiprocessing.TimeoutError is raised. That’s a good thing, because I can re-run the program when I know it finished with an error.
Parallelizing the child processes
The approach above does not let you parallelize your child processes if the parent process is already parallelized: pool workers are daemonic, and daemonic processes are not allowed to have children. In this case, you should create the non-daemonic process classes shown below and call MyPool instead of multiprocessing.Pool. This post might interest you if you have a similar task: Designing Recursive Functions with Python Multiprocessing
class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)

#----------------------------

with closing(MyPool(POOL_SIZE)) as pool:
    funclist = []
    for i in range(0, 100):
        f = pool.apply_async(my_function, [i, i+1])
        funclist.append(f)
    #-----------------------
    results = []
    for f in funclist:
        result = f.get(timeout = 120)
        results.append(result)
    #-----------------------
    pool.close()
    pool.terminate()
Conclusion
So, we have covered how to handle the hang and freeze problem in Python multiprocessing. apply_async works well where map or starmap got stuck. This approach saved my life and I hope it works for your case as well! Leave a comment on this post if it helps you.
BTW, you can follow the chefboost decision tree framework to see how this approach is applied in a large-scale framework: Training module, Random Forest module.
Support this blog if you like it!