Real-Time Data Pipeline: From Wikimedia to Kafka to OpenSearch Using Python

In today’s data-driven world, building real-time pipelines is a key skill for engineers and data scientists alike. In this tutorial, we will create a simple yet powerful real-time data pipeline that consumes live edit events from Wikimedia, pushes them into Apache Kafka, and then reads them from Kafka to store in OpenSearch.

Whether you’re building analytics dashboards, monitoring systems, or enriching search indexes, this setup is a great foundation. Let’s dive in!

Project Overview

Here’s what we’ll build:

  • A Python producer that:
    • Connects to the Wikimedia recent-changes event stream
    • Publishes each event to a Kafka topic
  • A Python consumer that:
    • Reads messages from the Kafka topic
    • Parses and sends them into an OpenSearch index

Source code:

https://github.com/mjmichael73/python-wikimedia-kafka-opensearch

Prerequisites

We’ll assume you already have:

✅ A running Kafka cluster – local via Docker Compose or remote
✅ A running OpenSearch cluster – local via Docker Compose or remote
✅ Python 3.8+ installed
✅ Docker and Docker Compose installed
✅ A Kafka topic named wikimedia.recentchange created

Project Structure:
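
By the end of the steps below, the project will look like this:

python-wikimedia-kafka-opensearch/
├── docker-compose.yml
├── producer/
│   ├── Dockerfile
│   ├── requirements.txt
│   └── main.py
└── consumer/
    ├── Dockerfile
    ├── requirements.txt
    └── main.py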

Step 1:

Create a project folder wherever you like on your local machine. We’ll name it “python-wikimedia-kafka-opensearch”.

Step 2:

Create a folder named “producer” and another named “consumer” inside the project root.

Step 3:

Create a Docker Compose file named “docker-compose.yml” in the project root with this content:

This file defines the OpenSearch and Kafka broker containers together with our “producer-app” and “consumer-app” services, plus OpenSearch Dashboards, which we’ll use later to inspect the index.
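
A minimal sketch of such a compose file could look like the following. The image versions, single-node KRaft Kafka setup, and security-disabled OpenSearch settings are illustrative assumptions, not the repo’s exact file:

services:
  kafka:
    image: bitnami/kafka:3.6
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # Single-node KRaft setup (no ZooKeeper needed).
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # Lets the producer create wikimedia.recentchange on first write.
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true

  opensearch:
    image: opensearchproject/opensearch:2.11.0
    container_name: opensearch
    environment:
      - discovery.type=single-node
      - DISABLE_SECURITY_PLUGIN=true
      - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m
    ports:
      - "9200:9200"

  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:2.11.0
    container_name: opensearch-dashboards
    environment:
      - OPENSEARCH_HOSTS=["http://opensearch:9200"]
      - DISABLE_SECURITY_DASHBOARDS_PLUGIN=true
    ports:
      - "5601:5601"
    depends_on:
      - opensearch

  producer-app:
    build: ./producer
    container_name: producer-app
    depends_on:
      - kafka
    restart: on-failure

  consumer-app:
    build: ./consumer
    container_name: consumer-app
    depends_on:
      - kafka
      - opensearch
    restart: on-failure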

Step 4:

Create a file named “Dockerfile” inside the “producer” folder:

This is required for Docker Compose to build the producer image, since we are dockerizing the whole project.
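
A minimal Dockerfile along these lines is enough (the Python base image tag is an assumption; any recent 3.x slim tag works):

FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so Docker can cache this layer.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY main.py .

# -u disables output buffering so logs appear immediately in docker compose logs.
CMD ["python", "-u", "main.py"]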

Step 5:

Create a requirements.txt file inside the “producer” folder with this content:

These are the libraries we need to talk to Kafka and to consume the stream of data from Wikimedia.
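
For the producer that means something like the following, left unpinned here for brevity (kafka-python for the broker, sseclient for the Wikimedia server-sent-events stream):

kafka-python
sseclient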

Step 6:

Now create a file named “main.py” inside the “producer” folder with this content:

After importing the necessary libraries, we define two functions: “create_producer” and “main”. The first one repeatedly tries to create a Kafka producer; when running under Docker it takes some time for the Kafka broker to come up, so we handle that with a retry loop.

The “main” function creates a producer object using “create_producer”, then opens the Wikimedia event stream with SSEClient and sends each event to Kafka.
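
Putting that together, here is a sketch of what producer/main.py does. The stream URL is the public Wikimedia EventStreams endpoint; the retry delay and constant names are illustrative:

import json
import time

from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable
from sseclient import SSEClient

WIKIMEDIA_STREAM_URL = "https://stream.wikimedia.org/v2/stream/recentchange"
KAFKA_TOPIC = "wikimedia.recentchange"
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"


def create_producer():
    # Keep retrying: the Kafka container may not be ready when this one starts.
    while True:
        try:
            return KafkaProducer(
                bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            )
        except NoBrokersAvailable:
            print("Kafka broker not available yet, retrying in 5 seconds...")
            time.sleep(5)


def main():
    producer = create_producer()
    # Subscribe to the Wikimedia server-sent-events stream and forward
    # every recent-change event into the Kafka topic.
    for event in SSEClient(WIKIMEDIA_STREAM_URL):
        if event.event == "message" and event.data:
            try:
                change = json.loads(event.data)
            except ValueError:
                continue  # skip occasional partial or non-JSON payloads
            producer.send(KAFKA_TOPIC, change)


if __name__ == "__main__":
    main()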

Now we are done with the “producer” folder.

Let’s move on to the implementation of the “consumer” folder.

Step 7:

Create a Dockerfile inside the “consumer” folder with this content:
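
It can be essentially identical to the producer’s Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY main.py .

CMD ["python", "-u", "main.py"]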

Step 8:

Create a “requirements.txt” file inside the “consumer” folder with this content:

We use the kafka-python library to connect to the Kafka container and the opensearch-py library to connect to the OpenSearch container.
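
So the file contains just these two dependencies (again unpinned here):

kafka-python
opensearch-py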

Step 9:

Create a main.py file inside the “consumer” folder with this content:

We define three functions: “connect_consumer”, “connect_opensearch”, and “main”.

The first one creates a Kafka consumer object with the same retry mechanism we used when creating the producer.

The second one creates an OpenSearch client object, again with retries, since the containers take some time to come up.

The “main” function first creates the two clients: “consumer”, our Kafka consumer object, and the OpenSearch client.

Then it loops over the incoming messages and indexes each one into OpenSearch.
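
Here is a sketch of what consumer/main.py does. The index name “wikimedia-changes” matches what we query later; the host names come from the compose file, and the retry delay is illustrative:

import json
import time

from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
from opensearchpy import OpenSearch

KAFKA_TOPIC = "wikimedia.recentchange"
KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
OPENSEARCH_HOST = "opensearch"
INDEX_NAME = "wikimedia-changes"


def connect_consumer():
    # Keep retrying until the Kafka container is reachable.
    while True:
        try:
            return KafkaConsumer(
                KAFKA_TOPIC,
                bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                value_deserializer=lambda v: json.loads(v.decode("utf-8")),
                auto_offset_reset="earliest",
            )
        except NoBrokersAvailable:
            print("Kafka broker not available yet, retrying in 5 seconds...")
            time.sleep(5)


def connect_opensearch():
    # Keep retrying until the OpenSearch container answers a ping.
    client = OpenSearch(
        hosts=[{"host": OPENSEARCH_HOST, "port": 9200}],
        use_ssl=False,
    )
    while True:
        try:
            if client.ping():
                return client
        except Exception:
            pass
        print("OpenSearch not available yet, retrying in 5 seconds...")
        time.sleep(5)


def main():
    consumer = connect_consumer()
    opensearch = connect_opensearch()
    # Create the target index once, then index every Kafka message into it.
    if not opensearch.indices.exists(index=INDEX_NAME):
        opensearch.indices.create(index=INDEX_NAME)
    for message in consumer:
        opensearch.index(index=INDEX_NAME, body=message.value)


if __name__ == "__main__":
    main()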

Now the project is complete. Let’s test it!

Step 10:

To test the project, just run this command:

docker compose up --build -d

After this, all the containers are built and started.

Now let’s run this command:

docker compose ps

You should see that all the containers are up and running.

Now let’s check the logs of the “producer-app” container by running this command:

docker compose logs -f producer-app

In the logs you can see the producer receiving the stream from Wikimedia and sending each change as an event to Apache Kafka.

Now let’s look at the “consumer-app” logs by running this command:

docker compose logs -f consumer-app

As you can see, it is continuously receiving events from the broker and sending them on to the OpenSearch container for indexing.

Now let’s run a query against OpenSearch to verify that the data has actually been indexed.

To do this we’ll use the OpenSearch Dashboards container defined in our docker-compose.yml. To access it, go to this address:

http://localhost:5601

After opening this URL, open the left-side menu and go to the Management section.

Click on Dev Tools to open the console.

In the left panel, enter a query like this:

GET /wikimedia-changes
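
This returns the index metadata (its settings and mappings), which confirms the index exists. To see the indexed documents themselves, you can run a search instead:

GET /wikimedia-changes/_search
{
  "size": 5
}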

If you get results back, everything is working fine.

Hooray!

The full source code of the project is available at:

https://github.com/mjmichael73/python-wikimedia-kafka-opensearch
