Converting Long-Running Kafka Consumers to Web Services in Python

In modern software architectures, event-driven systems are increasingly popular because they handle high volumes of asynchronous data well. Kafka is a key tool in this approach, letting different parts of an application communicate by producing and consuming messages from topics. However, consuming messages from Kafka typically means running a long-lived process, which is hard to test, scale, and manage. In this blog post, we will explore how to convert long-running Kafka consumers into Flask web services. Flask is a lightweight Python web framework that makes it easy to build scalable and testable APIs. By turning Kafka consumers into Flask web services, we can take advantage of parallel processing, better scalability, and easier testing. Whether you’re building a new system or improving an existing one, this approach can help you create more efficient and manageable event-driven applications.

Traditional Way

To get started with Kafka in Python, the first step is to install the kafka-python package, which provides a simple interface to communicate with Kafka.


You can install the package using pip:

$ pip install kafka-python

Once the package is installed, we can set up a Kafka consumer that listens to messages from a Kafka topic. Here’s an example of how you can do that:

import json
from kafka import KafkaConsumer
from datetime import datetime

# connect to the Kafka cluster and deserialize message values from JSON
consumer = KafkaConsumer(
    bootstrap_servers=["localhost:9092"],
    group_id='my-group-id',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
consumer.subscribe(topics=["MY-KAFKA-TOPIC"])

# iterate forever; each iteration blocks until a new record is consumed
for event in consumer:
    topic = event.topic
    event_time = datetime.fromtimestamp(event.timestamp / 1000)
    key = event.key.decode("utf-8") if event.key else None
    value = event.value
    print(f"a new event created in {topic=} at {event_time=} with {key=} and {value=}")

In this example, the KafkaConsumer subscribes to the Kafka topic MY-KAFKA-TOPIC. The bootstrap_servers parameter tells it how to connect to the Kafka cluster, and the group_id places it in a consumer group, so that when several consumers share the same group, each message is processed by only one of them.

The for loop runs indefinitely, continuously checking for new messages in the Kafka topic. Once a message is published to the topic, the consumer will pick it up and print the message details, including the topic name, event timestamp, key, and value.
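
To exercise this loop, you can publish a test event from a separate process. Below is a minimal producer sketch using the same kafka-python package; the key and value shown are just illustrative:

import json
from kafka import KafkaProducer

# serialize keys as UTF-8 strings and values as JSON, mirroring the consumer above
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send("MY-KAFKA-TOPIC", key="user-123", value={"action": "signup"})
producer.flush()  # block until the message is actually delivered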

This approach works well for event-driven systems, but it has some limitations, especially when it comes to scaling and managing long-running processes. In the next section, we will look at how we can convert this traditional Kafka consumer into a Flask web service for better scalability and flexibility.

Web Service

To convert our Kafka consumer into a web service, we will need the Flask framework along with the flask-kafka package, which simplifies integrating Kafka with Flask.

First, let’s install the necessary packages:

$ pip install Flask flask-kafka

We will set up a bus to communicate with Kafka. This bus will be responsible for consuming messages from Kafka and integrating with our Flask application. The following code snippet, placed in src/receiver/bus.py, shows how to set up the Kafka bus:

import os
import signal
from threading import Event
from flask_kafka import FlaskKafka

# Event to handle server interruptions gracefully
INTERRUPT_EVENT = Event()

# SASL credentials are read from environment variables
SASL_USERNAME = os.environ["SASL_USERNAME"]
SASL_PASSWORD = os.environ["SASL_PASSWORD"]

# Setting up the Kafka bus
bus = FlaskKafka(
    INTERRUPT_EVENT,
    bootstrap_servers=["localhost:9092"],
    group_id="my-group-id",
    security_protocol="SASL_SSL",
    ssl_check_hostname=False,
    ssl_cafile="/home/sefik/certs/kafka.pem",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username=SASL_USERNAME,
    sasl_plain_password=SASL_PASSWORD,
    api_version=(0, 10, 2),
)

# Function to handle server interruptions and cleanly shut down the Kafka consumer
def listen_kill_server(bus_obj):
    signal.signal(signal.SIGTERM, bus_obj.interrupted_process)
    signal.signal(signal.SIGINT, bus_obj.interrupted_process)
    signal.signal(signal.SIGQUIT, bus_obj.interrupted_process)
    signal.signal(signal.SIGHUP, bus_obj.interrupted_process)

Next, we will create the Flask routes in src/receiver/routes.py to handle incoming HTTP requests and Kafka messages. The following snippet registers a Kafka handler for the topic and a POST endpoint that exercises the same processing logic for testing:

# built-in dependencies
import json

# 3rd party dependencies
from flask import Blueprint, request

# project dependencies
from receiver.bus import bus

blueprint = Blueprint("blueprint_receiver", __name__)

# shared processing logic for both the Kafka and HTTP paths
def process_event(event):
    print(f"Received event: {event}")

# Kafka consumer event handler
@bus.handle("MY-KAFKA-TOPIC")
def receive_kafka_message(msg):
    event = json.loads(msg.value)
    process_event(event)

# HTTP endpoint that exercises the same logic for testing
@blueprint.route("/receive/message", methods=["POST"])
def receive_http_message():
    process_event(request.get_json())
    return {"status": "ok"}

In this example, when a message is published to the MY-KAFKA-TOPIC Kafka topic, the receive_kafka_message handler is triggered and prints the decoded event to the console. The /receive/message endpoint feeds an HTTP POST body through the same process_event function, which makes the processing logic easy to test without a Kafka cluster.
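
For a quick smoke test, assuming the service is already running on localhost:5000 as shown later in this post, you could call the endpoint with curl; the JSON payload is just an illustrative example:

$ curl -X POST http://localhost:5000/receive/message \
    -H "Content-Type: application/json" \
    -d '{"action": "signup", "user": "user-123"}'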

We will create the main Flask application in src/app.py, where we configure the application, register the routes, and integrate the Kafka consumer to process messages.

# 3rd party dependencies
from flask import Flask
from flask_cors import CORS

# project dependencies
from receiver.routes import blueprint
from receiver.bus import bus, listen_kill_server

def create_app():
    app = Flask(__name__)
    CORS(app)  # Enable cross-origin resource sharing

    app.register_blueprint(blueprint)

    # start consuming Kafka messages in a background thread
    bus.run()
    # register signal handlers for a graceful shutdown
    listen_kill_server(bus)

    print("Welcome to my web service!")
    return app

With this setup, when you run the Flask application, the Kafka consumer runs in a background thread alongside the web server, and the registered signal handlers shut it down cleanly when the server stops.

For testing purposes, we can create an src/api.py file to run the application locally:

import app

if __name__ == "__main__":
    my_app = app.create_app()
    my_app.run(host="0.0.0.0", port=5000)

Running python src/api.py will start the Flask application on localhost:5000, but note that this is only for testing. It will run with a single worker and is not recommended for production environments.

For production, we need to run the application with Gunicorn, a production-ready WSGI server that can handle multiple worker processes. To do this, we can install Gunicorn:

$ pip install gunicorn

Then, we can run the following command to start the Flask application with 8 worker processes:

$ gunicorn --workers=8 --timeout=3600 --bind=0.0.0.0:5000 "app:create_app()"

This will run the Flask web service with 8 workers, allowing it to handle multiple Kafka message events concurrently and scale efficiently under heavy loads.

Conclusion

In this post, we demonstrated how to convert a long-running Kafka consumer into a scalable Flask web service. By integrating Flask with Kafka, we were able to process messages asynchronously, making the system more flexible and easier to maintain. Because Kafka tracks committed offsets per consumer group, the application can also go down temporarily and resume from where it left off when it restarts, preventing message loss.
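
That resume-on-restart behavior comes from Kafka's consumer-group offset tracking rather than from Flask itself. As an illustrative sketch, these are the kafka-python parameters that govern it, shown here with typical values:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=["localhost:9092"],
    group_id="my-group-id",         # offsets are committed per consumer group
    enable_auto_commit=True,        # periodically commit the consumed offsets
    auto_commit_interval_ms=5000,   # how often offsets are committed
    auto_offset_reset="earliest",   # where to start when no committed offset exists
)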

A key advantage of using Flask with Kafka is the ability to scale the application. By adjusting the number of workers in the Gunicorn configuration, you change how many parallel processes consume messages from Kafka. For example, with 8 workers, 8 consumer processes join the same consumer group, and Kafka distributes the topic's partitions among them so that up to 8 messages can be processed concurrently. This is a one-line configuration change, which lets you tune the application's capacity to traffic and resource availability.
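
One caveat: a Kafka consumer group assigns at most one active consumer to each partition, so 8 workers only consume in parallel if the topic has at least 8 partitions. If you manage the topic yourself, you could create it with a matching partition count using Kafka's own CLI; the replication factor and broker address below depend on your cluster:

$ kafka-topics.sh --create --topic MY-KAFKA-TOPIC \
    --partitions 8 --replication-factor 1 \
    --bootstrap-server localhost:9092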

In conclusion, integrating Flask with Kafka allows you to build robust, scalable, and fault-tolerant event-driven systems that can grow with your application’s needs.


Support this blog if you like it!

Buy me a coffee