
Ereignisgesteuerte Architekturen sind in den letzten Jahren populaer geworden, und Kafka ist der De-facto-Standard fuer dieses Tooling.
Dieser Beitrag bietet ein vollstaendiges Beispiel einer ereignisgesteuerten Architektur, implementiert mit zwei Python-Services, die ueber Kafka kommunizieren.
Das Hauptziel dieses Tutorials ist es, ein funktionierendes Beispiel zu liefern, ohne in Details einzutauchen, die von der Hauptaufgabe ablenken wuerden, schnell etwas zum Laufen zu bringen.
Wir haben ein paar Bausteine:
- Infrastruktur (Kafka, Zookeeper)
- Producer (Python-Service)
- Consumer (Python-Service)
Die einzige Aufgabe des Producers ist es, periodisch ein Event an Kafka zu senden, das einen Zeitstempel enthaelt. Der Consumer hoert auf dieses Event und gibt den Zeitstempel aus.

Die Implementierung fuehrte zu folgender Projektstruktur.

Der vollstaendige Code kann hier heruntergeladen werden.
Infrastruktur
Nur zwei Komponenten (neben den Services) werden benoetigt, um eine ereignisbasierte Architektur zum Laufen zu bringen: Kafka und Zookeeper.
Siehe den Ressourcenabschnitt am Ende dieses Tutorials fuer Links zu beiden.
Waehrend Kafka die Hauptkomponente fuer den Austausch von Events ist, wird Zookeeper aus mehreren Gruenden benoetigt. Von der Zookeeper-Website:
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
Unten ist die docker-compose.yml, um beides zum Laufen zu bringen:
version: '3'
services:
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
depends_on:
- zookeeper
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
environment:
- KAFKA_ADVERTISED_HOST_NAME=zookeeperWenn das an Ort und Stelle ist, werden nur die beiden Services benoetigt, die die “Geschaeftsdomaene” implementieren. Ein sehr einfaches Beispiel: Senden und Empfangen eines Zeitstempels.
Code-Setup
Es gibt zwei Python-Projekte; der gesamte Code fuer jedes Projekt befindet sich in seiner main.py.
Jedes hat eine einzelne Abhaengigkeit, die in seiner requirements.txt aufgefuehrt ist:
kafka-python==2.0.2
Installiere diese Abhaengigkeit fuer jedes Projekt mit dem folgenden Befehl:
python3 -m pip install -r requirements.txt
Producer
Wie oben erwaehnt, generiert der Producer Zeitstempel und sendet sie ueber Kafka an interessierte Consumer.
from kafka import KafkaProducer
from datetime import datetime
from json import dumps
from time import sleep
producer = KafkaProducer(bootstrap_servers='localhost:9092',
value_serializer=lambda x: dumps(x).encode('utf-8'))
while True:
timestampStr = datetime.now().strftime("%H:%M:%S")
print("Sending: " + timestampStr)
producer.send('timestamp', timestampStr)
sleep(5)Beim Erstellen des Producers geben wir zwei Informationen an:
bootstrap_servers: wo Kafka zu finden ist. Dies haette weggelassen werden koennen, dalocalhost:9092der Standard ist.value_serializer: wie Nachrichten kodiert werden.
Nachrichten werden an das timestamp-Topic gesendet. Das ist wichtig, damit der Consumer nur Nachrichten von diesem Topic abhoeren kann.
Das ist alles fuer den Producer. Weiter zum Consumer.
Consumer
Jetzt empfangen wir die vom Producer gesendeten Zeitstempel.
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer('timestamp',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
print(message.value)
Dieses Mal haben wir bootstrap_servers nicht angegeben, wie wir es beim Consumer getan haben. Es wird standardmaessig auf localhost:9092 gesetzt.
Die notwendigen Parameter sind:
- Das Topic, auf das der Consumer hoert:
timestamp. value_deserializer: Wie Nachrichten dekodiert werden, nachdem sie empfangen wurden. Hinweis: Dies muss mit demvalue_serializerdes Producers kompatibel sein.
Alles ist jetzt an seinem Platz. Bereit fuer etwas Action.
Beispielcode ausfuehren
Es ist Zeit, alles auszufuehren. Erinnere dich an die folgende Projektstruktur:
code /
- docker-compose.yml
- producer
-- main.py
- consumer
-- main.py
Im code-Verzeichnis werden Kafka und ZooKeeper ueber docker-compose gestartet:
docker-compose up -d
Nach dem Wechsel in das producer-Verzeichnis starte den Service mit:
$ source venv/bin/activate
(venv) $ python3 main.py
Schliesslich wechsle in einem neuen Terminalfenster in das consumer-Verzeichnis und starte den Service auf die gleiche Weise:
$ source venv/bin/activate
(venv) $ python3 main.py
Jetzt solltest du etwas wie dies sehen. Links ist die Log-Ausgabe des Producers und rechts die Log-Ausgabe des Consumers.

Herzlichen Glueckwunsch, wenn du es bis hierher geschafft hast!
Aufraeumen
Wenn du fertig bist, kannst du die Python-Umgebung verlassen, indem du eingibst:
(venv) $ deactivate
Docker-Services laufen noch und muessen gestoppt und aufgeraeumt werden.
Der Befehl unten macht Folgendes:
- Stoppt alle laufenden Docker-Container
- Loescht gestoppte Docker-Container
- Entfernt alle Docker-Volumes
$ docker-compose stop && docker-compose rm -f && docker volume prune -f
Stopping kafka ... done
Stopping kafka-python-tutorial_zookeeper_1... done
Going to remove kafka, kafka-python-tutorial_zookeeper_1
Removing kafka ... done
Removing kafka-python-tutorial_zookeeper_1... done
Deleted Volumes:
e4380413983bb36f914621dac4019565cd9ed130c04c5336c898874b648c2c92
120ab4ab7e227bdc5ee155d1cc61f29b1b0f8d7ed2fa9ee29deb05c90e33b8fe
0636bf46ec05cdda15deec280cdef672c68366a7d8d57ff424938069498e4063
Total reclaimed space: 67.13MB
Fazit
Damit ist das Tutorial zur Erstellung einer ereignisgesteuerten Architektur mit Kafka und Python abgeschlossen.
Der vollstaendige Projektcode kann hier heruntergeladen werden.