
Willkommen zu dieser ersten Ueberarbeitung meines Artikels “Ereignisgesteuerte Architekturen mit Kafka und Python”.
Im urspruenglichen Artikel war das gesendete Ereignis ein String. Waehrend dies fuer viele Anwendungsfaelle ausreicht, erfordern die meisten meiner Anwendungsfaelle das Senden eines Objekts eines benutzerdefinierten Typs.
Daher habe ich den Code und das Tutorial geaendert, um Objektserialisierung und -deserialisierung mit JSON zu verwenden.
Natuerlich gibt es andere Methoden zur Serialisierung und Deserialisierung; eine beliebte ist Avro.
Viele Wege fuehren nach Rom; wir beginnen mit einem und fahren mit dem Tutorial fort.
Einfuehrung
Ereignisgesteuerte Architekturen sind in den letzten Jahren prominent geworden, wobei Kafka der De-facto-Standard fuer Werkzeuge ist.
Dieser Beitrag bietet ein vollstaendiges Beispiel einer ereignisgesteuerten Architektur, implementiert mit zwei Python-Services, die ueber Kafka kommunizieren.
Das Hauptziel ist es, ein funktionierendes Beispiel bereitzustellen, ohne in Details einzutauchen, die davon ablenken wuerden, etwas schnell zum Laufen zu bringen.
Wir haben einige Bausteine, hauptsaechlich
- Infrastruktur (Kafka, Zookeeper)
- Producer (Python Service)
- Consumer (Python Service)
Die einzige Aufgabe des Producers ist es, periodisch ein Ereignis an Kafka zu senden. Dieses Ereignis traegt einen Zeitstempel. Die Aufgabe des Consumers ist es, auf dieses Ereignis zu lauschen und den Zeitstempel auszugeben.

Die Implementierung fuehrte zur folgenden Projektstruktur.

Der vollstaendige Code kann hier heruntergeladen werden.
Infrastruktur
Neben den Services werden nur zwei Komponenten benoetigt, um eine ereignisbasierte Architektur auszufuehren: Kafka und Zookeeper.
Siehe den Ressourcenabschnitt am Ende des Tutorials fuer Links zu beiden.
Kafka ist die Hauptkomponente fuer den Ereignisaustausch; Zookeeper wird fuer mehrere Unterstuetzungsfunktionen benoetigt. Von der Zookeeper-Website:
ZooKeeper ist ein zentralisierter Dienst zur Verwaltung von Konfigurationsinformationen, Benennung, verteilter Synchronisation und Bereitstellung von Gruppendiensten.
Unten ist die docker-compose.yml, um beide zum Laufen zu bringen:
version: '3'
services:
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
depends_on:
- zookeeper
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
environment:
- KAFKA_ADVERTISED_HOST_NAME=zookeeperSobald vorhanden, werden nur die beiden Services benoetigt, die die “Geschaeftsdomaene” implementieren. Hier ein sehr einfaches Beispiel: Senden und Empfangen eines Zeitstempels.
Code-Setup
Es gibt zwei Python-Projekte; der gesamte Code fuer jedes befindet sich in seiner main.py.
Jedes hat nur eine Abhaengigkeit, die in seiner requirements.txt aufgefuehrt ist:
kafka-python==2.0.2
Installiere diese Abhaengigkeit fuer jedes Projekt mit folgendem Befehl:
python3 -m pip install -r requirements.txt
Producer
Wie oben erwaehnt, generiert der Producer Zeitstempel.
class TimestampEvent:
def __init__(self, timestamp):
self.timestamp = timestamp
Diese Zeitstempel werden an Kafka veroeffentlicht, damit interessierte Consumer sie empfangen koennen.
import json
from kafka import KafkaProducer
from datetime import datetime
from time import sleep
from TimestampEvent import TimestampEvent
producer = KafkaProducer(bootstrap_servers='localhost:9092',
value_serializer=lambda x: json.dumps(x.__dict__).encode('utf-8'))
while True:
timestampEvent = TimestampEvent(datetime.now().strftime("%H:%M:%S"))
print("Sending: " + timestampEvent.timestamp)
producer.send('timestamp', timestampEvent)
sleep(5)Beim Erstellen des Producers geben wir zwei Informationen an:
bootstrap_servers: wo Kafka zu finden ist. Dies haette weggelassen werden koennen, dalocalhost:9092der Standard ist.value_serializer: wie Nachrichten kodiert werden.
Nachrichten werden an das timestamp-Topic gesendet. Dies ist relevant, damit der Consumer nur Nachrichten von diesem Topic empfaengt.
Das ist alles fuer den Producer. Weiter zum Consumer…
Consumer
Die Datenstruktur fuer Zeitstempel ist identisch mit der des Producers.
class TimestampEvent:
def __init__(self, timestamp):
self.timestamp = timestamp
Jetzt empfangen wir die Zeitstempel, die der Producer gesendet hat.
from kafka import KafkaConsumer
import json
from TimestampEvent import TimestampEvent
consumer = KafkaConsumer('timestamp',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
timestampEvent = TimestampEvent(**(message.value))
print("Received: " + timestampEvent.timestamp)
Diesmal haben wir bootstrap_servers nicht angegeben wie beim Producer. Der Standardwert ist localhost:9092.
Die notwendigen angegebenen Parameter sind:
- Das Topic, auf das der Consumer lauscht:
timestamp value_deserializer: wie Nachrichten nach dem Empfang dekodiert werden
Alles ist jetzt vorhanden. Bereit fuer die Aktion.
Beispielcode ausfuehren
Fuehre jetzt alles aus. Denke an die folgende Projektstruktur:
event-driven-architectures
- docker-compose.yml
- python-tutorial
-- producer
--- main.py
-- consumer
--- main.py
Im Verzeichnis event-driven-architectures werden Kafka und Zookeeper ueber docker-compose gestartet:
docker-compose up -d
Wechsle in das Verzeichnis producer und starte den Service mit:
$ source venv/bin/activate
(venv) $ python3 main.py
Oeffne schliesslich ein neues Terminalfenster, wechsle in das Verzeichnis consumer und starte den Service auf die gleiche Weise:
$ source venv/bin/activate
(venv) $ python3 main.py
Jetzt solltest du etwas wie dies sehen. Links ist die Log-Ausgabe des Producers; rechts ist die Log-Ausgabe des Consumers.

Herzlichen Glueckwunsch, wenn du es bis hierher geschafft hast. Es sollte einfach zu folgen gewesen sein.
Aufraeumen
Wenn du fertig bist, kannst du die Python-Umgebung verlassen, indem du eingibst:
deactivate
(venv) $ deactivate
Die Docker-Services laufen noch. Diese muessen ebenfalls gestoppt und aufgeraeumt werden.
Der folgende Befehl fuehrt Folgendes aus:
- Stoppt alle laufenden Docker-Container
- Entfernt gestoppte Docker-Container
- Entfernt alle Volumes
$ docker-compose stop && docker-compose rm -f && docker volume prune -f
Stopping kafka ... done
Stopping kafka-python-tutorial_zookeeper_1... done
Going to remove kafka, kafka-python-tutorial_zookeeper_1
Removing kafka ... done
Removing kafka-python-tutorial_zookeeper_1... done
Deleted Volumes:
e4380413983bb36f914621dac4019565cd9ed130c04c5336c898874b648c2c92
120ab4ab7e227bdc5ee155d1cc61f29b1b0f8d7ed2fa9ee29deb05c90e33b8fe
0636bf46ec05cdda15deec280cdef672c68366a7d8d57ff424938069498e4063
Total reclaimed space: 67.13MB
Fazit
Dies schliesst das Tutorial zur Erstellung einer ereignisgesteuerten Architektur mit Kafka und Python ab.
Der vollstaendige Projektcode kann hier heruntergeladen werden.