Willkommen zu dieser ersten Ueberarbeitung meines Artikels “Ereignisgesteuerte Architekturen mit Kafka und Python”.

Im urspruenglichen Artikel war das gesendete Ereignis ein String. Waehrend dies fuer viele Anwendungsfaelle ausreicht, erfordern die meisten meiner Anwendungsfaelle das Senden eines Objekts eines benutzerdefinierten Typs.

Daher habe ich den Code und das Tutorial geaendert, um Objektserialisierung und -deserialisierung mit JSON zu verwenden.

Natuerlich gibt es andere Methoden zur Serialisierung und Deserialisierung; eine beliebte ist Avro.

Viele Wege fuehren nach Rom; wir beginnen mit einem und fahren mit dem Tutorial fort.

Einfuehrung

Ereignisgesteuerte Architekturen sind in den letzten Jahren prominent geworden, wobei Kafka der De-facto-Standard fuer Werkzeuge ist.

Dieser Beitrag bietet ein vollstaendiges Beispiel einer ereignisgesteuerten Architektur, implementiert mit zwei Python-Services, die ueber Kafka kommunizieren.

Das Hauptziel ist es, ein funktionierendes Beispiel bereitzustellen, ohne in Details einzutauchen, die davon ablenken wuerden, etwas schnell zum Laufen zu bringen.

Wir haben einige Bausteine, hauptsaechlich

  • Infrastruktur (Kafka, Zookeeper)
  • Producer (Python Service)
  • Consumer (Python Service)

Die einzige Aufgabe des Producers ist es, periodisch ein Ereignis an Kafka zu senden. Dieses Ereignis traegt einen Zeitstempel. Die Aufgabe des Consumers ist es, auf dieses Ereignis zu lauschen und den Zeitstempel auszugeben.

Kafka

Die Implementierung fuehrte zur folgenden Projektstruktur.

Projektstruktur

Der vollstaendige Code kann hier heruntergeladen werden.

Infrastruktur

Neben den Services werden nur zwei Komponenten benoetigt, um eine ereignisbasierte Architektur auszufuehren: Kafka und Zookeeper.

Siehe den Ressourcenabschnitt am Ende des Tutorials fuer Links zu beiden.

Kafka ist die Hauptkomponente fuer den Ereignisaustausch; Zookeeper wird fuer mehrere Unterstuetzungsfunktionen benoetigt. Von der Zookeeper-Website:

ZooKeeper ist ein zentralisierter Dienst zur Verwaltung von Konfigurationsinformationen, Benennung, verteilter Synchronisation und Bereitstellung von Gruppendiensten.

Unten ist die docker-compose.yml, um beide zum Laufen zu bringen:

version: '3'
 
services:
 
 kafka:
  image: wurstmeister/kafka
  container_name: kafka
  ports:
  - "9092:9092"
  environment:
  - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
  - KAFKA_ADVERTISED_PORT=9092
  - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
  depends_on:
  - zookeeper
 
 zookeeper:
  image: wurstmeister/zookeeper
  ports:
  - "2181:2181"
  environment:
  - KAFKA_ADVERTISED_HOST_NAME=zookeeper

Sobald vorhanden, werden nur die beiden Services benoetigt, die die “Geschaeftsdomaene” implementieren. Hier ein sehr einfaches Beispiel: Senden und Empfangen eines Zeitstempels.

Code-Setup

Es gibt zwei Python-Projekte; der gesamte Code fuer jedes befindet sich in seiner main.py.

Jedes hat nur eine Abhaengigkeit, die in seiner requirements.txt aufgefuehrt ist:

kafka-python==2.0.2

Installiere diese Abhaengigkeit fuer jedes Projekt mit folgendem Befehl:

python3 -m pip install -r requirements.txt

Producer

Wie oben erwaehnt, generiert der Producer Zeitstempel.

class TimestampEvent:
 def __init__(self, timestamp):
 self.timestamp = timestamp

Diese Zeitstempel werden an Kafka veroeffentlicht, damit interessierte Consumer sie empfangen koennen.

import json
from kafka import KafkaProducer
from datetime import datetime
from time import sleep
 
from TimestampEvent import TimestampEvent
 
producer = KafkaProducer(bootstrap_servers='localhost:9092',
       value_serializer=lambda x: json.dumps(x.__dict__).encode('utf-8'))
 
while True:
 timestampEvent = TimestampEvent(datetime.now().strftime("%H:%M:%S"))
 print("Sending: " + timestampEvent.timestamp)
 producer.send('timestamp', timestampEvent)
 sleep(5)

Beim Erstellen des Producers geben wir zwei Informationen an:

  • bootstrap_servers: wo Kafka zu finden ist. Dies haette weggelassen werden koennen, da localhost:9092 der Standard ist.
  • value_serializer: wie Nachrichten kodiert werden.

Nachrichten werden an das timestamp-Topic gesendet. Dies ist relevant, damit der Consumer nur Nachrichten von diesem Topic empfaengt.

Das ist alles fuer den Producer. Weiter zum Consumer…

Consumer

Die Datenstruktur fuer Zeitstempel ist identisch mit der des Producers.

class TimestampEvent:
 def __init__(self, timestamp):
  self.timestamp = timestamp

Jetzt empfangen wir die Zeitstempel, die der Producer gesendet hat.

from kafka import KafkaConsumer
import json

from TimestampEvent import TimestampEvent

consumer = KafkaConsumer('timestamp',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
 timestampEvent = TimestampEvent(**(message.value))
 print("Received: " + timestampEvent.timestamp)

Diesmal haben wir bootstrap_servers nicht angegeben wie beim Producer. Der Standardwert ist localhost:9092.

Die notwendigen angegebenen Parameter sind:

  • Das Topic, auf das der Consumer lauscht: timestamp
  • value_deserializer: wie Nachrichten nach dem Empfang dekodiert werden

Alles ist jetzt vorhanden. Bereit fuer die Aktion.

Beispielcode ausfuehren

Fuehre jetzt alles aus. Denke an die folgende Projektstruktur:

event-driven-architectures
- docker-compose.yml
- python-tutorial
-- producer
--- main.py
-- consumer
--- main.py

Im Verzeichnis event-driven-architectures werden Kafka und Zookeeper ueber docker-compose gestartet:

docker-compose up -d

Wechsle in das Verzeichnis producer und starte den Service mit:

$ source venv/bin/activate
(venv) $ python3 main.py

Oeffne schliesslich ein neues Terminalfenster, wechsle in das Verzeichnis consumer und starte den Service auf die gleiche Weise:

$ source venv/bin/activate
(venv) $ python3 main.py

Jetzt solltest du etwas wie dies sehen. Links ist die Log-Ausgabe des Producers; rechts ist die Log-Ausgabe des Consumers.

Herzlichen Glueckwunsch, wenn du es bis hierher geschafft hast. Es sollte einfach zu folgen gewesen sein.

Aufraeumen

Wenn du fertig bist, kannst du die Python-Umgebung verlassen, indem du eingibst:

deactivate
(venv) $ deactivate

Die Docker-Services laufen noch. Diese muessen ebenfalls gestoppt und aufgeraeumt werden.

Der folgende Befehl fuehrt Folgendes aus:

  • Stoppt alle laufenden Docker-Container
  • Entfernt gestoppte Docker-Container
  • Entfernt alle Volumes
$ docker-compose stop && docker-compose rm -f && docker volume prune -f
Stopping kafka      ... done
Stopping kafka-python-tutorial_zookeeper_1... done
Going to remove kafka, kafka-python-tutorial_zookeeper_1
Removing kafka      ... done
Removing kafka-python-tutorial_zookeeper_1... done
Deleted Volumes:
e4380413983bb36f914621dac4019565cd9ed130c04c5336c898874b648c2c92
120ab4ab7e227bdc5ee155d1cc61f29b1b0f8d7ed2fa9ee29deb05c90e33b8fe
0636bf46ec05cdda15deec280cdef672c68366a7d8d57ff424938069498e4063

Total reclaimed space: 67.13MB

Fazit

Dies schliesst das Tutorial zur Erstellung einer ereignisgesteuerten Architektur mit Kafka und Python ab.

Der vollstaendige Projektcode kann hier heruntergeladen werden.

Ressourcen