Deep Face Recognition with Hadoop and Spark

Large scale face recognition is always a challenging task. Nowadays, hadoop based approaches have become a de-facto standard for such solutions. They come with the power of map reduce, easy scalability and distributed computing on clusters, so that we can run sql queries on petabytes of data. In this post, we will build a face recognition pipeline with the hadoop and spark pair to be able to query the hadoop distributed file system.

The elephant is the iconic mascot of hadoop

Prerequisites

I have been using hadoop and hive for a long time. However, those environments are mostly ready to use in enterprise organizations. That's why I had to get a hadoop environment up on my Windows machine to write this blog post. In this section, I'm going to explain the required steps. If you already have a hadoop environment, you can skip the following section.


Firstly, my experiments were run on hadoop 3.2.2. You should download the latest version here and unzip the downloaded file to your local disk (e.g. under C:/hadoop, I'm going to call this HADOOP_HOME). Most apache products are ready to use out of the box: you just call the sh or bat files in the bin directory. However, hadoop requires some configuration first.

Visit HADOOP_HOME/etc/hadoop folder and change the content of the following files.

core-site.xml

You should define the default file system and your temporary directory here.

<configuration>
	<property>
		<name>fs.defaultFS</name>
		<value>hdfs://0.0.0.0:9000</value>
	</property>
	<property>
		<name>hadoop.tmp.dir</name>
		<value>C:/hadoop/temp</value>
	</property>
</configuration>

Defining the temporary directory is not a must: if you haven't defined it, hadoop will use the C:/tmp folder, but that folder requires administrator permissions to write. If you haven't run the command prompt as administrator, you might get privilege related exceptions such as: "Error while running command to get file permissions : ExitCodeException exitCode=-1073741515". Here you can either run start-yarn as administrator or define a temporary directory in core-site.xml to avoid this exception.

hdfs-site.xml

We will define the namenode and datanode directories here. Notice that I created a data folder in the HADOOP_HOME, and the namenode and datanode folders live inside it.

<configuration>
	<property>
		<name>dfs.replication</name>
		<value>1</value>
	</property>
	<property>
		<name>dfs.namenode.name.dir</name>
		<value>C:/hadoop/hadoop-3.2.2/data/namenode</value>
	</property>
	<property>
		<name>dfs.datanode.data.dir</name>
		<value>C:/hadoop/hadoop-3.2.2/data/datanode</value>
	</property>
</configuration>

If your hadoop was not stopped correctly, the storage might get corrupted and you will have trouble restarting. You should delete the datanode folder manually to handle this issue. Otherwise, you would get the following exception.

WARN common.Storage: Failed to add storage directory [DISK]file:/C:/hadoop/hadoop-3.2.2/data/datanode
java.io.IOException: Incompatible clusterIDs in C:\hadoop\hadoop-3.2.2\data\datanode: namenode clusterID = CID-9ef17be1-bf4d-4692-954d-ccc157015b2f; datanode clusterID = CID-523cca25-6591-4f08-976e-d6b62421a47d

ERROR datanode.DataNode: Initialization failed for Block pool (Datanode Uuid 9daa992a-f81f-4218-8994-ecdbb0d12b99) service to /0.0.0.0:9000. Exiting. java.io.IOException: All specified directories have failed to load.

mapred-site.xml

Here, we are going to configure map reduce jobs to be handled by yarn.

<configuration>
	<property>
		<name>mapreduce.framework.name</name>
		<value>yarn</value>
	</property>
</configuration>

yarn-site.xml

Here, we are going to define some yarn related configurations.

<configuration>
	<property>
		<name>yarn.nodemanager.aux-services</name>
		<value>mapreduce_shuffle</value>
	</property>
	<property>
		<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
		<value>org.apache.hadoop.mapred.ShuffleHandler</value>
	</property>
</configuration>

hadoop-env.cmd

You must explicitly set the JAVA_HOME location in this file.

set JAVA_HOME=C:\Java\jdk1.8.0_112

Adding environment variables

Once you have modified the files mentioned above, you should edit the environment variables for your account. Add the HADOOP_HOME/bin and HADOOP_HOME/sbin folders to the Path variable under user variables. I also added a HADOOP_HOME variable here and referenced the bin folder of hadoop.
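
For reference, the equivalent commands for the current command prompt session would look roughly like the following. This is just a sketch that assumes HADOOP_HOME points to the unzipped root folder, so adjust the paths to your own installation; permanent variables are better set through the system settings dialog.

set HADOOP_HOME=C:\hadoop\hadoop-3.2.2
set PATH=%PATH%;%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin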

Winutils

Hadoop needs some extra windows binaries (winutils) that are not shipped with the official distribution. You should copy the binaries matching your hadoop version here. Notice that I installed hadoop 3.2.2; that's why I copied the hadoop-3.0.0/bin content into HADOOP_HOME/bin.

Initialize the hadoop namenode

The hadoop configuration is now finished. We should initialize the hadoop namenode by calling the following command in the command prompt. Notice that hdfs.cmd is in the HADOOP_HOME/bin directory and we already added this folder to the Path.

hdfs namenode -format

You should see the following information log in the command prompt if everything is ok.

INFO common.Storage: Storage directory C:\hadoop\hadoop-3.2.2\data\namenode has been successfully formatted.

Starting hadoop

We have modified the configuration files, copied the windows binaries and finally initialized the namenode. Now, we should call the start-dfs and start-yarn commands respectively, as shown below. Notice that those scripts are in HADOOP_HOME/sbin and we already added that folder to the Path.
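
In a Windows command prompt, these calls resolve to the start-dfs.cmd and start-yarn.cmd scripts:

start-dfs
start-yarn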

That's all! Remember that we set the default file system in core-site.xml to hdfs://0.0.0.0:9000. We will access hdfs with this url. You should see the following information log when you call start-dfs, confirming the target url.

INFO namenode.NameNode: Clients should use 0.0.0.0:9000 to access this namenode/service

By the way, I got the error message – ERROR util.SysInfoWindows: ExitCodeException exitCode=-1073741515 – repeatedly when I called start-yarn. You can ignore this message because it will not block you and you are still able to write and read data from hdfs.

Face recognition pipeline

A modern face recognition pipeline consists of 4 common stages: detect, align, represent and verify. Herein, the deepface framework for python covers all of those stages.

Face recognition model

Deepface wraps several state-of-the-art face recognition models: VGG-Face, Google FaceNet, OpenFace, Facebook DeepFace, DeepID, Dlib and ArcFace. Those models reach and even pass human level accuracy. In this post, we will use the FaceNet model to represent facial images as vectors. This model expects 160×160 shaped inputs and represents facial images as 128 dimensional vectors.

#!pip install deepface
from deepface import DeepFace

FaceNet, VGG-Face and ArcFace outperform the others. Here, you can watch how to determine the best model.

Facial database

We will use the unit test items of deepface as our facial database. Let's get the file names in this folder first.

import os
facial_img_paths = []
#Available at: https://github.com/serengil/deepface/tree/master/tests/dataset
for root, directory, files in os.walk("deepface/tests/dataset"):
    for file in files:
        if '.jpg' in file:
            facial_img_paths.append(root+"/"+file)

Firstly, we will apply the pre-processing stages: detect and align. Secondly, we will represent the pre-processed facial images as vectors. The represent call below handles all of those steps internally.

import pandas as pd
from tqdm import tqdm
from deepface.commons import functions

instances = []

for i in tqdm(range(0, len(facial_img_paths))):
    facial_img_path = facial_img_paths[i]
    
    #represent
    embedding = DeepFace.represent(img_path = facial_img_path, model_name = "Facenet")[0]["embedding"]
    
    #store
    instance = []
    instance.append(facial_img_path)
    instance.append(embedding)
    instances.append(instance)

df = pd.DataFrame(instances, columns = ["img_name", "embedding"])

Deepface actually wraps several face detectors: opencv, ssd, dlib and mtcnn. Its default is opencv. Here, MTCNN is the most robust one whereas SSD is the fastest. You can see the detection performance of those detectors in the following video.

To get more information about face detectors, you can watch the following video.

Herein, retinaface is the cutting-edge technology for face detection. It can even detect faces in the crowd. Besides, it finds facial landmarks, including eye coordinates, so its alignment performance is high as well.

Spark

We will read data from hdfs with Spark. It has a high level Python API. Spark can also be up to 100 times faster than classical map reduce for in-memory workloads.

Notice that scala spark can be noticeably faster than pyspark, especially when python UDFs are involved. Luckily, the spark related functions are common in both scala spark and pyspark. In other words, you can easily adapt the code written in this post to scala spark even though it is written in pyspark.

Let’s create a spark session first.

from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("face-recognition").getOrCreate()

We stored the facial database in a pandas data frame in the previous section. Now, we should convert it to spark data frame.

spark_df = sparkSession.createDataFrame(df)
spark_df.printSchema()

Notice that spark data frames are evaluated lazily: unless you call an action such as show or collect, nothing is computed. Transformations only record the schema and the operations to perform, and the actual work happens when an action is triggered.
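
As a tiny illustration, the select below is just a hypothetical transformation on the data frame we built above; nothing runs until an action is called.

#a transformation only extends the execution plan and returns immediately
names_only = spark_df.select("img_name")

#an action such as the following would trigger the actual computation
#names_only.show(5)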

Writing to hdfs

Now, it is time to store the spark data frame in hdfs. Notice that we set the default file system to hdfs://0.0.0.0:9000; that will be the root address. I will store the facial database in the facial_db folder, in an embeddings file in parquet format.

# Write into HDFS
spark_df.write.parquet(path = "hdfs://0.0.0.0:9000/facial_db/embeddings.parquet"
    , mode='append'
    #, partitionBy = []
)

Notice that the mode is set to append. In this way, many workers can store facial embeddings in the embeddings file. The embeddings file is actually not a single file even though it looks like one: hdfs is an acronym for hadoop distributed file system, and the rows will be stored as part files distributed across the cluster.

Partitioning by a key can make the calculations faster. For example, if you had a gender feature for each row, you could create partitions based on this key. Then, searching for a woman's image would be faster because the engine stores the rows for men and women in different files in the background, as sketched below.
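
A minimal sketch, assuming the data frame had such a hypothetical gender column (it does not in this post, so this is illustrative only):

#hypothetical: partitioning by gender lets spark skip irrelevant files when you filter by it
spark_df.write.parquet(path = "hdfs://0.0.0.0:9000/facial_db/embeddings.parquet"
    , mode = 'append'
    , partitionBy = ['gender']
)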

You can list the stored files in hdfs with the hdfs dfs command.
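
For example, the -ls subcommand prints the contents of the facial_db folder and of the parquet directory inside it.

hdfs dfs -ls /facial_db
hdfs dfs -ls /facial_db/embeddings.parquet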

Listing files stored in hdfs

Target image

We stored the embeddings of many facial images in hdfs. Let's now search for a new facial image which does not appear in hdfs. This requires applying the same pre-processing and representation stages that we applied to the bulk images before.

target_img_path = "target.png"
target_img = DeepFace.extract_faces(target_img_path)[0]["face"]
target_embedding = DeepFace.represent(img_path = target_img_path, model_name = "Facenet")[0]["embedding"]

That's the target image I will search for in hdfs.

Target image

Reading data from hdfs

Suppose that the embeddings parquet file stores millions of facial representations. I will define where to read it into a spark data frame. Notice that this won't actually read the parquet file until I call the show or collect functions. You should avoid calling show because it is costly; however, you can call printSchema cheaply since it only needs the metadata.

sdf = sparkSession.read.parquet('hdfs://0.0.0.0:9000/facial_db/embeddings.parquet')
sdf.printSchema()

The spark data frame stores the image name and its representation. Besides, I found the vector representation of the target image in the previous stage. I will put the target embedding into a column for all rows of the spark data frame. In this way, I can find the distance between the two vectors easily.

import pyspark.sql.functions as F
sdf = sdf.withColumn('target', F.array([F.lit(i) for i in target_embedding]))

Now, each row stores a source and a target embedding. The euclidean distance requires subtracting the two vectors dimension by dimension and squaring the differences. I will define a user defined function to calculate this logic. Notice that it will not be executed on the client side; it is all handled on the distributed nodes.

@F.udf("array<double>")
def subtract_dims(xs, ys):
    return [(x - y)*(x - y) for x, y in zip(xs, ys)]

sdf = sdf.withColumn("dim_subtracts", subtract_dims("embedding", "target"))

Once we have the squared differences for each dimension, we need the square root of their sum. Finally, we will not need the source and target embeddings or the squared differences anymore, so we drop those columns.

sdf = sdf.select('*', (sum([F.col('dim_subtracts').getItem(i) for i in range(128)]).alias('distance')))
sdf = sdf.withColumn("distance", F.sqrt("distance"))
sdf = sdf.drop("embedding", "target", "dim_subtracts")

SQL

A spark data frame stores the data structure and the operations to perform. We can run sql queries on it as well.

We know that the distance between face pairs of the same person should be low whereas the distance should be higher for different persons. The threshold value for FaceNet with euclidean distance is 10. Here, you can find the threshold values for face recognition models and distance metrics. This threshold goes into the where clause of the query.

sdf.createOrReplaceTempView("facial_db")
query = sparkSession.sql("SELECT img_name, distance from facial_db where distance < 10 order by distance asc")

In this way, the distributed nodes compute the distances and discard the ones above the threshold. That's the regular usage of map reduce technology: nothing is handled on the client side.
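
If you prefer the data frame API over SQL, an equivalent filter would look roughly like this (assigned to a separate query_df variable here):

query_df = sdf.filter(F.col("distance") < 10) \
    .orderBy(F.col("distance").asc()) \
    .select("img_name", "distance")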

Collect

We have avoided calling the show command until now. Now, we can call the collect function to start the calculations on the distributed nodes.

results = query.collect()
print(results)

Results

Lightweight way

If your task does not require high scalability, a lightweight way exists!

Validate results

Deepface offers an out-of-the-box find function that does the same task. We can validate our results because it should return exactly the same matches as the hadoop and spark pair.

val_dfs = DeepFace.find(img_path = target_img_path, db_path = "deepface/tests/dataset"
    , model_name = 'Facenet', distance_metric = 'euclidean'
    , detector_backend = 'opencv')

Find function results

Here, you can watch the following videos to get more information about the verify and find functions.

Looking for an identity in a dataset is, in other words, applying face verification several times in the background.

Other no sql solutions

Hadoop might be the best solution for really large data sets. Herein, MongoDB, Cassandra or Redis come with the power of NoSQL databases as well. In particular, Cassandra and Redis are key-value stores, and they offer high performance if you need face verification (a one-to-one comparison) instead of face recognition (a one-to-many search).
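
As a minimal sketch of that idea with Redis, assume a redis server is running locally and that alice_embedding is the previously computed FaceNet embedding of an enrolled user; both are hypothetical here and not part of the hadoop pipeline.

#!pip install redis
import json
import numpy as np
import redis

r = redis.Redis(host = "localhost", port = 6379)

#enrolment: store the embedding of a known identity under its key (hypothetical data)
r.set("user:alice", json.dumps(alice_embedding))

#verification: a single key lookup instead of scanning the whole database
stored = np.array(json.loads(r.get("user:alice")))
claimed = np.array(target_embedding)
distance = np.sqrt(np.sum(np.square(stored - claimed)))
is_verified = distance < 10 #same FaceNet and euclidean threshold as above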

Approximate nearest neighbor

Notice that face recognition has O(n x d) time complexity. Here, n is the size of your database whereas d is the number of dimensions in the vector embeddings. In a large scale data set, n is expected to be much greater than d. That's why we can ignore d and state the complexity as O(n) in big O notation.

The hadoop and spark pair can find the same identities even among billions of vectors if there are enough nodes in the cluster. However, we can also do this in just seconds with limited hardware: approximate nearest neighbor libraries such as annoy, faiss, nmslib or elasticsearch reduce the time complexity dramatically, as sketched below.
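
A minimal sketch with annoy, reusing the pandas data frame df and the target_embedding computed earlier; this is illustrative only and not part of the hadoop pipeline.

#!pip install annoy
from annoy import AnnoyIndex

#FaceNet embeddings are 128-dimensional and we compared them with euclidean distance
ann_index = AnnoyIndex(128, "euclidean")

for idx, row in df.iterrows():
    ann_index.add_item(idx, row["embedding"])

ann_index.build(n_trees = 10)

#approximate neighbors of the target image and their distances
neighbor_ids, distances = ann_index.get_nns_by_vector(target_embedding, 10, include_distances = True)
print(df.iloc[neighbor_ids]["img_name"].tolist())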

Tech Stack Recommendations

Face recognition is mainly based on representing facial images as vectors. Herein, how you store the vector representations is a key factor for building robust facial recognition systems. I summarize my tech stack recommendations in the following video.

Conclusion

So, we have covered how to use the hadoop and spark pair for a face recognition task. This approach is as strong as the distributed nodes you have, no matter how large your database is. You might have petabytes of data, but you can still run sql-like queries with Spark in just seconds.

I pushed the source code of this study to GitHub. You can support this study by starring the repo.
