Datenbanken sind heutzutage ueberall, und mit Change Data Capture kannst du sie mit wenig Aufwand in eine Quelle fuer ereignisgesteuerte Systeme verwandeln.

Tools wie Debezium und Kafka Connect ueberbruecken die Luecke zwischen diesen beiden Welten und ermoeglichen es dir, eine traditionelle datengesteuerte Anwendung in eine ereignisgesteuerte umzuwandeln.

Dieser Artikel bietet ein Schritt-fuer-Schritt-Tutorial fuer die Einrichtung von Change Data Capture mit Debezium auf einer MySQL-Datenbank und Serverless Kafka auf Upstash.

Mit einem ereignisgesteuerten System wie dem unten dargestellten kannst du auf jede Aenderung in der Datenbank reagieren - ob Schema-Aenderungen oder Datenmodifikationen - und diese Events an andere Systeme weiterleiten.

In den “alten Tagen” haettest du periodisch Batch-Jobs ausfuehren muessen, um die in einer Datenbank gespeicherten Daten zu verarbeiten. Dieser Ansatz hatte viele Nachteile: Operationen waren teuer und fuehrten zu erheblichen Verzoegerungen.

Jetzt koennen wir kleinere, gezielte Prozesse genau dann ausfuehren, wenn Datenaenderungen auftreten.

Die naechsten Abschnitte stellen die Teile vor, die zur Implementierung eines solchen Systems benoetigt werden:

  • Datenbank (MySQL)
  • Debezium
  • Kafka (Upstash)
  • Kafka Connect

Lass uns jeden Teil etwas besser kennenlernen. Haftungsausschluss: Ich werde nicht erklaeren, was eine Datenbank ist, aber ich werde Hinweise fuer die anderen Komponenten geben.

Was ist Debezium?

Debezium ist eine Plattform, die Datenbanken ueberwacht, sodass du sofort auf Aenderungen reagieren kannst. Es erkennt Einfuegungen, Aktualisierungen, Loeschungen und sogar Schema-Aenderungen.

Es verwendet das Change Data Capture-Muster, um den Datenbankzustand kontinuierlich zu ueberwachen und Aenderungsereignisse an Kafka zu senden.

Was ist Kafka?

Kafka ist eine verteilte Event-Streaming-Plattform. Fuer dieses Tutorial verwende ich Serverless Kafka von Upstash.

Upstash ist eine On-Demand-Loesung mit nutzungsbasierter Abrechnung - keine Kreditkarte fuer kleine Projekte oder Experimente erforderlich. Du zahlst mehr nur, wenn du deinen Kafka-Cluster skalierst und das Nachrichtenvolumen erhoehst.

Siehe diesen Artikel, um mit Upstash zu beginnen

Erste Schritte mit Upstash fuer Kafka

Was ist Kafka Connect?

Kafka Connect ist ein Framework, das Streaming zwischen Kafka und anderen Systemen ermoeglicht - stell es dir als Bruecke zwischen Kafka und, in diesem Tutorial, einer Datenbank vor.

Jetzt, da wir alle Teile haben, lass sie uns zusammenfuegen.

Die Teile zusammenfuegen

Hier ist, was wir in diesem Tutorial verwenden werden.

Auf dem lokalen Rechner fuehren wir eine MySQL-Datenbank und Kafka Connect mit Docker Compose aus. Unser Serverless Kafka laeuft auf Upstash. Wir schuetzen die Kommunikation zwischen unserem lokalen Kafka Connect und Upstash mit SSL.

Dies sind die Schritte, denen wir folgen werden:

  • Lokale Datenbank einrichten
  • Kafka-Cluster und Topics auf Upstash konfigurieren
  • Kafka Connect und Debezium konfigurieren
  • Datenbank erstellen, Daten hinzufuegen und aendern
  • Alle Kafka-Events im Zusammenhang mit diesen Datenbankoperationen ueberwachen

Einrichten der lokalen Datenbank

Lass uns mit der Einrichtung der Datenbank beginnen.

Dies ist die MySQL-Service-Beschreibung fuer die docker-compose.yml:

mysql:
 image: mysql:latest
 container_name: mysql
 command:
  --server-id=12345
  --log-bin=mysql-bin
  --binlog-format=ROW
  --binlog-row-image=full
 networks:
  - tutorial
 environment:
  MYSQL_USER: foo
  MYSQL_PASSWORD: bar
  MYSQL_ROOT_PASSWORD: foobar
 volumes:
  - db_data:/var/lib/mysql

Starte es mit:

docker-compose up mysql

Verbinde dich mit der Datenbankkonsole:

docker-compose exec mysql bash -c 'mysql -uroot -pfoobar'

Erstelle die Datenbank von der MySQL-Konsole:

create database testdb;
use testdb;

Dies ist die Datenbank, die wir in diesem Tutorial aendern werden. Debezium wird die Aenderungen erfassen und Events automatisch an Kafka senden.

Erteile dem Benutzer Berechtigungen, um diese Aenderungen zu ueberwachen:

GRANT REPLICATION CLIENT, REPLICATION SLAVE, LOCK TABLES ON *.* TO 'foo'@'%';
FLUSH PRIVILEGES;

Die Datenbank ist bereit. Als Naechstes konfiguriere den Kafka-Cluster.

Upstash-Konto und Konfiguration von Topics

Ich gehe davon aus, dass du einen laufenden Kafka-Cluster hast.

Wenn dein Cluster Topics nicht automatisch erstellt, muessen einige Topics manuell erstellt werden.

Auf Upstash sollte das Setup so aussehen:

Unabhaengig davon, wie deine Datenbank heisst, werden die folgenden drei Topics immer benoetigt:

  • connect-offsets
  • connect-configs
  • connect-statuses

Stelle sicher, dass sie eine Cleanup-Policy von “compact” haben.

Dann gibt es zwei datenbankspezifische Topics:

  • schema-changes.testdb
  • theservername

Und ein Topic fuer die Tabelle, die wir ueberwachen werden (die du moeglicherweise noch nicht erstellt hast):

  • theservername.testdb.planes

Diese Namen werden an verschiedenen Stellen konfiguriert. Die ersten drei Topics werden von Kafka Connect in unserer docker-compose.yml referenziert.

Konfigurieren von Kafka Connect mit dem Debezium Connector

Jetzt konfiguriere Kafka Connect. Hier ist der Kafka Connect-Service in Docker Compose:

connect:
 image: quay.io/debezium/connect:1.9
 container_name: connect
 ports:
  - "8083:8083"
 networks:
  - tutorial
 environment:
  BOOTSTRAP_SERVERS:...
 
  CONNECT_SECURITY_PROTOCOL: SASL_SSL
  CONNECT_SASL_MECHANISM: SCRAM-SHA-256
  CONNECT_SASL_JAAS_CONFIG:...
 
  CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
  CONNECT_PRODUCER_SASL_MECHANISM: SCRAM-SHA-256
  CONNECT_PRODUCER_SASL_JAAS_CONFIG:...
 
  CONFIG_STORAGE_TOPIC: connect-configs
  OFFSET_STORAGE_TOPIC: connect-offsets
  STATUS_STORAGE_TOPIC: connect-statuses

Ersetze die Punkte durch deinen Servernamen und deine Anmeldedaten.

Starte es:

docker-compose up -d connect

Registriere den Debezium-Connector mit diesem curl-Befehl:

curl --location --request POST '127.0.0.1:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
 "name": "tutorial-connector",
 "config": {
 "connector.class": "io.debezium.connector.mysql.MySqlConnector",
 "tasks.max": "1",
 "database.hostname": "mysql",
 "database.port": "3306",
 "database.user": "foo",
 "database.password": "bar",
 "database.server.id": "12345",
 "database.allowPublicKeyRetrieval":"true",
 "database.server.name": "theservername",
 "database.include.list": "testdb",
 "table.whitelist": "testdb.planes",
 "table.include.list": "testdb.planes",
 "database.history.kafka.bootstrap.servers": "...",
 "database.history.kafka.topic": "schema-changes.testdb",
 "database.history.consumer.security.protocol": "SASL_SSL",
 "database.history.consumer.ssl.endpoint.identification.algorithm": "https",
 "database.history.consumer.sasl.mechanism": "SCRAM-SHA-256",
 "database.history.consumer.sasl.jaas.config": "..."
 "database.history.producer.security.protocol": "SASL_SSL",
 "database.history.producer.ssl.endpoint.identification.algorithm": "https",
 "database.history.producer.sasl.mechanism": "SCRAM-SHA-256",
 "database.history.producer.sasl.jaas.config": "..."
 }
}'

Ersetze erneut den Servernamen und die Anmeldedaten.

Pruefe den Connector:

curl --location --request GET '127.0.0.1:8083/connectors/tutorial-connector'

Jetzt sind die Komponenten an ihrem Platz:

  • Kafka-Cluster
  • Kafka Connect und Debezium
  • MySQL-Datenbank

Als Naechstes werden wir:

  • Eine Tabelle erstellen
  • Daten hinzufuegen
  • Daten aendern
  • Die zugehoerigen Kafka-Topics ueberwachen

Datenbankoperationen in Kafka ueberwachen

Mit allem an Ort und Stelle, erstelle die Tabelle, fuege Daten hinzu, aendere sie und beobachte die Events in Kafka.

Wir werden kafka-console-consumer verwenden. Erstelle eine Konfigurationsdatei (ich habe sie upstash.config genannt):

sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
sasl.jaas.config=...

Du kannst diese Werte von Upstash kopieren

Ueberwache diese beiden Topics:

  • schema-changes.testdb
  • theservername.testdb.planes

Ueberwache das erste Topic:

kafka-console-consumer --bootstrap-server... --consumer.config upstash.config --from-beginning --topic schema-changes.testdb

Erstelle in der MySQL-Konsole eine Tabelle:

CREATE TABLE planes (color INT);

Dies sollte ein Kafka-Event aehnlich diesem erzeugen:

{
 "source": {
 "server": "theservername"
 },
 "position": {
 "transaction_id": null,
 "ts_sec": 1652358237,
 "file": "mysql-bin.000003",
 "pos": 1624,
 "server_id": 12345
 },
 "databaseName": "testdb",
 "ddl": "CREATE TABLE planes (color INT)",
 "tableChanges": [ {
 "type": "CREATE",
 "id": "\"testdb\".\"planes\"",
 "table": {
  "defaultCharsetName": "utf8mb4",
  "primaryKeyColumnNames": [ ],
  "columns": [ {
  "name": "color",
  "jdbcType": 4,
  "typeName": "INT",
  "typeExpression": "INT",
  "charsetName": null,
  "position": 1,
  "optional": true,
  "autoIncremented": false,
  "generated": false,
  "comment": null,
  "hasDefaultValue": true,
  "enumValues": [ ]
  } ]
 },
 "comment": null
 } ]
}

Beginne mit der Ueberwachung des Tabellen-Topics:

kafka-console-consumer --bootstrap-server... --consumer.config upstash.config --from-beginning --topic theservername.testdb.planes

Fuege Daten ein:

INSERT INTO planes (color) VALUES (1);

Du solltest ein Event beobachten - pruefe die “before”- und “after”-Felder im Payload.

Aktualisiere die Daten:

UPDATE planes SET color=2 WHERE color=1;

Pruefe erneut die “before”- und “after”-Felder im Payload.

Einige weitere Befehle zum Ausprobieren:

ALTER TABLE PLANES ADD NAME varchar(100);
DROP TABLE PLANES;

Aufraeumen

Fertig? Bereinige lokale Container und entferne sie:

docker-compose rm -fsv

Du kannst auch ungenutzte Volumes entfernen (Vorsicht - dies koennte noch verwendete Volumes entfernen):

docker system prune -f --volumes

Gehe schliesslich auf Upstash zu “Details” und scrolle zur Option, den Cluster zu loeschen.

Fazit

Dieser Artikel bietet eine praktische Einfuehrung in Change Data Capture mit Debezium.

Wir haben die erforderlichen Komponenten umrissen und gezeigt, wie man Kafka Connect und Debezium verwendet, um eine Datenbank in die Quelle eines ereignisgesteuerten Systems zu verwandeln.

Danke fuers Lesen!

Ressourcen