Handling Hang in Python Multiprocessing

Parallelism is a must for a production-driven application. Even though Python is an elegant programming language, its multiprocessing module can be problematic. I have often had hang and deadlock problems when using it. This is arguably the worst case for a production application: if your program raises an error or an exception, you can simply resubmit the job, but if it hangs in a deadlock, it will wait forever. It appears to be working, yet there is no CPU consumption in the terminal. In this post, I am going to share how to handle this hang and deadlock problem.


Approach causing deadlock

I was using the multiprocessing.Pool module and calling its starmap method. starmap lets you pass multiple arguments to the target function, whereas the regular map does not. We have a generic function, my_function, which expects two integers as inputs, and we pass successive numbers to it in a loop. In my case, this usage sometimes causes the program to get stuck. I observed that the hang problem appears when you pass large-scale inputs to the function.


from multiprocessing import Pool

def my_function(a, b):
  return a + b

POOL_SIZE = 10

with Pool(POOL_SIZE) as pool:
  # distribute the (i, i+1) pairs over the pool workers
  results = pool.starmap(my_function, [(i, i+1) for i in range(0, 100)])
  pool.close()
  pool.join()

I mentioned this approach in this post already: Tips and Tricks for GPU and Multiprocessing in TensorFlow

Workaround

I used the apply_async method of the multiprocessing.pool module to handle the deadlock problem. I stored the response of each apply_async call in a list. The results are not collected until the get method is called, and luckily get comes with a timeout argument. If a child process hangs, the get call in the parent process raises a timeout exception instead of waiting forever, and the pool can then be terminated.

import multiprocessing
import multiprocessing.pool
import time
from contextlib import closing

def my_function(a, b):
  #time.sleep(150)  # uncomment to simulate a hanging child process
  return a + b

POOL_SIZE = 10

with closing(multiprocessing.Pool(POOL_SIZE)) as pool:
  funclist = []

  for i in range(0, 100):
    # submit the task and store the returned AsyncResult object
    f = pool.apply_async(my_function, [i, i+1])
    funclist.append(f)

  #-----------------------
  results = []
  for f in funclist:
    # wait at most 120 seconds for each result
    result = f.get(timeout = 120)
    results.append(result)

  #-----------------------

  pool.close()
  pool.terminate()

You can enable time.sleep in my_function to see that the timeout works. The parent process expects each result within 120 seconds. If the timeout is reached, you will get a multiprocessing.TimeoutError exception. That's a good thing, because I can re-run the program when I know it finished with an error.
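
If you prefer to skip a hung task instead of failing the whole run, you can catch that exception around the get call. Below is a minimal sketch assuming the funclist and the 120-second timeout from the snippet above; appending None for a timed-out task is just an illustrative choice.

results = []
for f in funclist:
  try:
    # wait at most 120 seconds for this task
    results.append(f.get(timeout = 120))
  except multiprocessing.TimeoutError:
    # this task hung or is too slow; record a placeholder so the other results are kept
    results.append(None)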

Parallelizing the child processes

This approach does not let you parallelize your child processes if your parent process is already parallelized, because daemonic pool workers are not allowed to spawn their own children. In this case, you should create the non-daemonic process classes shown below and call MyPool instead of multiprocessing.Pool. This post might interest you if you have a similar task: Designing Recursive Functions with Python Multiprocessing

class NoDaemonProcess(multiprocessing.Process):
	# make 'daemon' attribute always return False
	def _get_daemon(self):
		return False
	def _set_daemon(self, value):
		pass
	daemon = property(_get_daemon, _set_daemon)

class NoDaemonContext(type(multiprocessing.get_context())):
	# a context whose workers are the non-daemonic processes above
	Process = NoDaemonProcess

class MyPool(multiprocessing.pool.Pool):
	# a pool built on the non-daemonic context, so its workers can spawn children
	def __init__(self, *args, **kwargs):
		kwargs['context'] = NoDaemonContext()
		super(MyPool, self).__init__(*args, **kwargs)

#----------------------------

with closing(MyPool(POOL_SIZE)) as pool:
  funclist = []

  for i in range(0, 100):
    f = pool.apply_async(my_function, [i, i+1])
    funclist.append(f)

  #-----------------------
  results = []
  for f in funclist:
    result = f.get(timeout = 120)
    results.append(result)

  #-----------------------

  pool.close()
  pool.terminate()

Conclusion

So, we have covered how to handle the hang and freeze problem in Python multiprocessing. It seems apply_async works well where map or starmap get stuck. This approach saved my life and I hope it works for your case as well! Leave a comment if this post helps you.

BTW, you can follow the chefboost decision tree framework to see how this approach is applied in a large-scale framework: Training module, Random Forest module.

