In modern software architectures, event-driven systems are becoming increasingly popular due to their ability to handle high volumes of asynchronous data. Kafka is a key tool in this approach, allowing different parts of an application to communicate by producing and consuming messages from topics. However, consuming messages from Kafka typically involves long-running processes, which are challenging to test, scale, and manage. In this blog post, we will explore how to convert long-running Kafka consumers into Flask web services. Flask is a lightweight Python web framework that makes it easy to build scalable and testable APIs. By turning Kafka consumers into Flask web services, we can take advantage of features like parallel processing, better scalability, and easier testing. Whether you are building a new system or looking to improve an existing one, this approach can help you create more efficient and manageable event-driven applications.
Traditional Way
To get started with Kafka in Python, the first step is to install the kafka-python package, which provides a simple interface to communicate with Kafka.
You can install the package using pip:

$ pip install kafka-python
Once the package is installed, we can set up a Kafka consumer that listens to messages from a Kafka topic. Here’s an example of how you can do that:
import json
from datetime import datetime

from kafka import KafkaConsumer

# Connect to the Kafka cluster and deserialize message values from JSON
consumer = KafkaConsumer(
    bootstrap_servers=["localhost:9092"],
    group_id="my-group-id",
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)
consumer.subscribe(pattern="MY-KAFKA-TOPIC")

# Poll indefinitely; each iteration yields one consumed record
for event in consumer:
    topic = event.topic
    event_time = datetime.fromtimestamp(event.timestamp / 1000)
    key = event.key.decode("utf-8")
    value = event.value
    print(f"a new event created in {topic=} at {event_time=} with {key=} and {value=}")
In this example, the KafkaConsumer subscribes to the MY-KAFKA-TOPIC Kafka topic. The bootstrap_servers parameter tells it how to connect to the Kafka cluster, and the group_id places it in a consumer group, so multiple consumers can coordinate and each message is processed by only one member of the group.
The for loop runs indefinitely, continuously checking for new messages in the Kafka topic. Once a message is published to the topic, the consumer will pick it up and print the message details, including the topic name, event timestamp, key, and value.
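To try this out end to end, something has to publish events to the topic. Here is a minimal producer sketch using the same kafka-python package; the topic matches the consumer above, while the key and payload are made-up values for illustration:

import json

from kafka import KafkaProducer

# Serialize keys as UTF-8 strings and values as JSON
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Hypothetical key and payload, just to trigger the consumer's print statement
producer.send("MY-KAFKA-TOPIC", key="order-123", value={"status": "created"})
producer.flush()

With the consumer running in one terminal, executing this snippet in another should print the event details to the consumer's console.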
This approach works well for event-driven systems, but it has some limitations, especially when it comes to scaling and managing long-running processes. In the next section, we will look at how we can convert this traditional Kafka consumer into a Flask web service for better scalability and flexibility.
Web Service
To convert our Kafka consumer into a web service, we will need the Flask framework along with the flask-kafka package, which simplifies integrating Kafka with Flask.
First, let’s install the necessary packages:
$ pip install Flask flask-kafka flask-cors
We will set up a bus to communicate with Kafka. This bus will be responsible for consuming messages from Kafka and integrating with our Flask application. The following code snippet, placed in src/receiver/bus.py, shows how to set up the Kafka bus:
import os
import signal
from threading import Event

from flask_kafka import FlaskKafka

# Credentials are read from the environment rather than hard-coded
SASL_USERNAME = os.environ.get("SASL_USERNAME")
SASL_PASSWORD = os.environ.get("SASL_PASSWORD")

# Event to handle server interruptions gracefully
INTERRUPT_EVENT = Event()

# Setting up the Kafka bus
bus = FlaskKafka(
    INTERRUPT_EVENT,
    bootstrap_servers=["localhost:9092"],
    group_id="my-group-id",
    security_protocol="SASL_SSL",
    ssl_check_hostname=False,
    ssl_cafile="/home/sefik/certs/kafka.pem",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username=SASL_USERNAME,
    sasl_plain_password=SASL_PASSWORD,
    api_version=(0, 10, 2),
)

# Function to handle server interruptions and cleanly shut down the Kafka consumer
def listen_kill_server(bus_obj):
    signal.signal(signal.SIGTERM, bus_obj.interrupted_process)
    signal.signal(signal.SIGINT, bus_obj.interrupted_process)
    signal.signal(signal.SIGQUIT, bus_obj.interrupted_process)
    signal.signal(signal.SIGHUP, bus_obj.interrupted_process)
Next, we will create the Flask routes in src/receiver/routes.py to handle incoming HTTP requests and Kafka messages. The following snippet shows how to set up a route for receiving messages from Kafka and a POST endpoint for testing:
# built-in dependencies
import json

# 3rd party dependencies
from flask import Blueprint, request as http_request

# project dependencies
from receiver.bus import bus

blueprint = Blueprint("blueprint_receiver", __name__)

# Kafka consumer event handler, also exposed as an HTTP endpoint for testing
@bus.handle("MY-KAFKA-TOPIC")
@blueprint.route("/receive/message", methods=["POST"])
def receive_message(message=None):
    # When triggered by Kafka, message is the consumed record;
    # when triggered over HTTP, fall back to the request body
    payload = message.value if message is not None else http_request.data
    event = json.loads(payload)
    print(f"Received event: {event}")
    return "ok"
In this example, when a message arrives on the MY-KAFKA-TOPIC Kafka topic, or when the /receive/message endpoint is called via HTTP POST (for testing purposes), the receive_message function is triggered. In the Kafka case it reads the consumed record's value; in the HTTP case it falls back to the request body. Either way, it prints the received event to the console.
We will create the main Flask application in src/app.py, where we configure the application, register the routes, and integrate the Kafka consumer to process messages.
# 3rd party dependencies
from flask import Flask
from flask_cors import CORS

# project dependencies
from receiver.routes import blueprint
from receiver.bus import bus, listen_kill_server

def create_app():
    app = Flask(__name__)
    CORS(app)  # Enable cross-origin resource sharing
    app.register_blueprint(blueprint)

    # Start the Kafka consumer alongside the web server
    bus.run()

    # Shut the consumer down cleanly when the server receives a kill signal
    listen_kill_server(bus)

    print("Welcome to my web service!")
    return app
With this setup, when you run the Flask application, the Kafka consumer runs in the background alongside the web server, processing messages as they arrive, and the registered signal handlers let the application shut down gracefully.
For testing purposes, we can create an src/api.py file to run the application locally:
import app

if __name__ == "__main__":
    my_app = app.create_app()
    my_app.run(host="0.0.0.0", port=5000)
Running python src/api.py will start the Flask application on localhost:5000, but note that this is only for testing. It will run with a single worker and is not recommended for production environments.
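Once the development server is up, you can exercise the HTTP testing path without involving Kafka at all. Here is a minimal sketch using the requests package; the payload is arbitrary, anything json.loads can parse will do:

import requests

# Post a hypothetical JSON event to the testing endpoint
response = requests.post(
    "http://localhost:5000/receive/message",
    data='{"status": "hello"}',
    headers={"Content-Type": "application/json"},
)
print(response.status_code)

If everything is wired up correctly, the service logs the received event and the request returns with status 200.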
For production, we need to run the application with Gunicorn, a production-ready WSGI server that can handle multiple worker processes. To do this, we can install Gunicorn:
$ pip install gunicorn
Then, we can run the following command to start the Flask application with 8 worker processes:
$ gunicorn --chdir src --workers=8 --timeout=3600 --bind=0.0.0.0:5000 "app:create_app()"
This runs the Flask web service with 8 worker processes. Because each worker starts its own Kafka consumer in the same consumer group, messages are balanced across the workers, allowing the service to process events in parallel and scale efficiently under heavy loads.
Conclusion
In this post, we demonstrated how to convert a long-running Kafka consumer into a scalable Flask web service. By integrating Flask with Kafka, we were able to process messages asynchronously, making the system more flexible and easier to maintain. Because Kafka tracks each consumer group's committed offsets, even if the application temporarily goes down, it can resume processing from where it left off when it restarts, preventing message loss.
A key advantage of using Flask with Kafka is the ability to scale the application. By adjusting the number of workers in the Gunicorn configuration, you increase the number of parallel processes consuming messages from Kafka. For example, setting the number of workers to 8 lets up to 8 consumer processes pull messages concurrently, improving performance under heavy loads. Since this is a one-line configuration change, you can fine-tune your application's capacity based on traffic and resource availability.
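One detail to keep in mind: because all workers join the same consumer group, Kafka can distribute messages across at most as many consumers as the topic has partitions, so 8 workers only pay off if the topic has at least 8 partitions. If you control the topic, here is a minimal sketch for creating one with kafka-python's admin client; the replication factor of 1 assumes a single-broker development cluster:

from kafka.admin import KafkaAdminClient, NewTopic

# Create the topic with enough partitions to keep all 8 workers busy
admin = KafkaAdminClient(bootstrap_servers=["localhost:9092"])
admin.create_topics([
    NewTopic(name="MY-KAFKA-TOPIC", num_partitions=8, replication_factor=1),
])
admin.close()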
In conclusion, integrating Flask with Kafka allows you to build robust, scalable, and fault-tolerant event-driven systems that can grow with your application’s needs.
Support this blog if you like it!