Designing Recursive Functions with Python Multiprocessing

Multiprocessing is a must for building highly scalable products. However, Python's multiprocessing module is mostly problematic when compared to message queue mechanisms. In this post, I will share my experiments using the Python multiprocessing module for recursive functions: the troubles I had, and the approaches I applied to handle them.

Multiprocessing with high level APIs

Here, you can find one of my previous posts mentioning multiprocessing. There, I focused on calling high-level APIs such as Keras. Those APIs apply a greedy approach and consume all the system resources they can take. That's why we need to limit the memory and CPU/GPU cores in sub-programs. In this post, I will mostly cover developing programs from scratch, requiring only low-level dependencies such as numpy or pandas.
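For instance, a minimal sketch of that kind of limiting with a TensorFlow 1.x / Keras backend might look like the following; the thread counts and memory fraction are illustrative values, not recommendations.

import tensorflow as tf
from keras import backend as K

#cap CPU threads and GPU memory for this sub-program (illustrative values)
config = tf.ConfigProto(
    intra_op_parallelism_threads=1,
    inter_op_parallelism_threads=1
)
config.gpu_options.per_process_gpu_memory_fraction = 0.25 #use 25% of GPU memory
K.set_session(tf.Session(config=config))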


Decision trees are recursive

As a use case, we will talk about decision trees. Decision trees are recursive algorithms. You start from the root node and find the feature which maximizes the information gain. Then, you expand the tree with the classes of the winning feature. For example, if the feature maximizing the information gain is Outlook, then you should create leaves for the classes of the Outlook feature: Sunny, Overcast and Rain. We then filter the data set for each leaf. I mean that the 1st leaf stores just Sunny outlook instances, the 2nd leaf stores just Overcast outlook instances, and finally the 3rd leaf stores just Rain outlook instances. We then need to find the feature maximizing the information gain for each leaf again. In other words, we will call the decision tree algorithm on each sub data set again and again. That is what makes the decision tree algorithm recursive.
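To make that recursion concrete, here is a minimal serial sketch of the flow, assuming a pandas data frame of instances; findWinningFeature and reachedDecision are hypothetical helpers standing in for the information gain calculation and the stopping condition.

def buildDecisionTree(df):
    if reachedDecision(df): #e.g. all instances share the same decision
        return
    winner = findWinningFeature(df) #feature maximizing the information gain
    for value in df[winner].unique(): #e.g. Sunny, Overcast, Rain for Outlook
        sub_df = df[df[winner] == value] #filter instances for this leaf
        buildDecisionTree(sub_df) #call the same algorithm on the sub data set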

They are convenient for parallelism

Notice that the leaves of a single decision tree are independent. They store totally different sub data sets, and we run the same algorithm on each of them. That's why we can run the leaf building procedure in parallel.

Besides, random forest requires separating the data set into several sub data sets and building a different decision tree for each one. So we can run random forest in parallel, and its decision tree building procedure could be run in parallel as well.

Architecture

I'm going to call the build decision tree function first. This function will find the winning feature. It will then call the create branch function for each class of the winning feature. Finally, the create branch function will call the build decision tree function again. This procedure is applied again and again until reaching a decision. Here, the input parameters are a list of tuples. That's why I called starmap instead of map.

import multiprocessing
from multiprocessing import Pool

def buildDecisionTree(df):
    #... find the winning feature and build one tuple of
    #arguments per class: input_params = [(sub_df_1,), (sub_df_2,), ...]
    num_cores = multiprocessing.cpu_count()
    pool = Pool(num_cores)
    pool.starmap(createBranch, input_params) #starmap because items are tuples
    pool.close()
    pool.join()

def createBranch(sub_df):
    #...
    buildDecisionTree(sub_df)

The length of input_params and num_cores should be the same. That's not a must, but I had hang troubles a lot when the length of the input params was greater than the number of parallel processes. I recommend you call map / starmap in a for loop and always set the length of the input params in an iteration to the number of cores. I covered this in depth in this post: Tips and Tricks for Multiprocessing.
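A minimal sketch of that batching idea (my own illustration, not code from that post): feed the pool at most num_cores tasks per iteration.

import multiprocessing
from multiprocessing import Pool

num_cores = multiprocessing.cpu_count()
pool = Pool(num_cores)

#process input_params in slices of num_cores items
for i in range(0, len(input_params), num_cores):
    batch = input_params[i : i + num_cores]
    pool.starmap(createBranch, batch)

pool.close()
pool.join()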

However, this approach causes trouble in my case: daemonic processes are not allowed to have children. That is because of the recursive function: pool workers are daemon processes, and daemon processes cannot spawn pools of their own.
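You can reproduce this error with a stripped-down nested pool, independent of the decision tree code:

from multiprocessing import Pool

def inner(x):
    return x * x

def outer(x):
    with Pool(2) as p: #a pool created inside a pool worker
        return sum(p.map(inner, range(x)))

if __name__ == "__main__":
    with Pool(2) as p:
        #raises AssertionError: daemonic processes are not allowed to have children
        print(p.map(outer, [3, 4]))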

Managing daemon processes

I found the following workaround to solve this: I created my own pool class and set the daemon flag manually.

import multiprocessing
import multiprocessing.pool

def buildDecisionTree():
    #...
    num_cores = multiprocessing.cpu_count()
    pool = MyPool(num_cores)
    results = pool.starmap(createBranch, input_params)
    pool.close()
    pool.join()

def createBranch():
    #...
    buildDecisionTree()

class NoDaemonProcess(multiprocessing.Process):
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess
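Notice that this property trick targets the multiprocessing internals of the Python version I was using. In recent Python 3 releases, a commonly shared equivalent (again a workaround, not an official API) overrides the daemon property and passes a non-daemonic context to the pool:

import multiprocessing
import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass #ignore attempts to mark the worker as daemonic

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs["context"] = NoDaemonContext()
        super().__init__(*args, **kwargs)
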
Bottleneck

Overcoming the daemon related error causes a bottleneck this time. Running the same program in parallel takes much longer than running it serially. That does not make any sense. I understood the reason when I monitored the task manager: it creates hundreds of processes even though I have 4 CPU cores.


I used to work with message queues (MQ). You put several messages into a queue, and your task is parallelized across the number of MQ listeners running. Increasing the queue depth would not cause a bottleneck. Here, the Python multiprocessing module creates hundreds of active processes, and they all try to run at the same time. That causes the bottleneck.

Limiting the active processes

To avoid the bottleneck, I store the process id of the main program once. Then, I pass it to all sub-programs. Notice that I create the parallel processes in the buildDecisionTree function. There, I check the number of child processes of the main program plus the number of processes that would be created. If this total is less than the number of my CPU cores, I run in parallel. Otherwise, I run in serial. In this way, I can limit the number of active processes.

import os
import psutil
import multiprocessing
import multiprocessing.pool

def main():
    #...
    process_id = os.getpid()
    buildDecisionTree(process_id)

def buildDecisionTree(main_process_id, ...):
    #...

    num_cores = multiprocessing.cpu_count()

    #find the winning feature
    #get the winning feature's classes
    input_params = []
    for i in classes:
        input_params.append((main_process_id, ...))

    main_process = psutil.Process(main_process_id)
    children = main_process.children(recursive=True)
    active_processes = len(children) + 1 #plus the parent itself
    new_processes = len(classes) #one new process per class

    #parallel
    if num_cores >= active_processes + new_processes:
        pool = MyPool(num_cores)
        results = pool.starmap(createBranch, input_params)
        pool.close()
        pool.join()
    else: #serial
        for input_param in input_params:
            createBranch(*input_param)

def createBranch(main_process_id, ...):
    #...
    buildDecisionTree(main_process_id, ...)

class NoDaemonProcess(multiprocessing.Process):
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

I applied this approach in the single decision tree building stage. Gradient boosting and random forest both build their individual decision trees with this same logic, so it is inherited by those algorithms as well. However, the random forest algorithm builds its several decision trees independently of each other. That's why I parallelized that stage too. Here, you can find the random forest implementation: I build several trees in parallel, and each tree creates its branches and leaves in parallel as well.
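A minimal sketch of that tree-level parallelism, assuming a hypothetical trainRandomForest driver, a pandas data frame and the non-daemonic MyPool class from above:

import multiprocessing

def trainRandomForest(df, num_trees):
    #draw one bootstrap sample (bagging) per tree
    subsets = [df.sample(frac=1.0, replace=True) for _ in range(num_trees)]

    num_cores = multiprocessing.cpu_count()
    pool = MyPool(num_cores) #non-daemonic pool so trees can spawn branch workers
    trees = pool.map(buildDecisionTree, subsets) #each tree is built independently
    pool.close()
    pool.join()
    return trees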

Code repository

So, we have covered the Python multiprocessing module for a recursive function: the troubles I had when I applied the regular approach, and the solutions I found to handle common issues.

I shared the code snippets for multiprocessing in recursive functions. You can find the whole code repo here. My main program is chefboost/Chefboost.py. I store the main process id there, then call the buildDecisionTree and createBranch functions under training.Training.py again and again.

