Willkommen zu einer weiteren Episode meiner Serie ueber ereignisgesteuerte Architekturen. Dies ist der dritte Teil der Serie.

Bisher haben wir:

Dieser Artikel zeigt, wie man eine ereignisgesteuerte Architektur mit Kotlin realisiert, mit einigen Unterschieden zu den vorherigen Beitraegen:

  • Wir verwenden Kotlin statt Java.
  • Wir verwenden Gradle statt Maven fuer das Abhaengigkeitsmanagement und das Bauen.
  • Das Projekt wurde mit IntelliJs integriertem Spring Initializr erstellt, nicht dem webbasierten.
  • Ein Projekt/Service enthaelt sowohl den Producer als auch den Consumer. Das aendert das Tutorial nicht und ist einfacher zu handhaben.
  • Anstelle einer lokalen Kafka-Instanz via docker-compose stellt ein externer Service namens Upstash die Kafka-Infrastruktur bereit.

Upstash ist ein echtes Serverless-Angebot fuer Kafka und Redis, das eine On-Demand-Loesung mit nutzungsbasierter Abrechnung bietet, ohne Hardware, VMs oder Docker-Container verwalten zu muessen.

Ich habe zuvor einen einfuehrenden Beitrag zu Upstash geschrieben, genannt “Erste Schritte mit Upstash fuer Kafka.”

Einfuehrung

Ereignisgesteuerte Architekturen sind in den letzten Jahren populaer geworden, wobei Kafka als De-facto-Standard fuer das Tooling gilt.

Dieser Beitrag bietet ein vollstaendiges Beispiel einer ereignisgesteuerten Architektur, implementiert mit einem Kotlin Spring Boot-Service, der ueber einen Kafka-Cluster auf Upstash kommuniziert.

Das Hauptziel dieses Tutorials ist es, ein funktionierendes Beispiel zu liefern, ohne sich in Details zu verlieren, die von der Hauptaufgabe ablenken, schnell etwas zum Laufen zu bringen.

Wir haben ein paar Bausteine:

  • Infrastruktur (Kafka auf Upstash)
  • Producer (Kotlin, Spring Boot)
  • Consumer (Kotlin, Spring Boot)

Der Producer sendet periodisch ein Event an Kafka. Das Event traegt einen Zeitstempel. Der Consumer hoert auf dieses Event und gibt den Zeitstempel aus.

Die Implementierung fuehrte zu dieser Projektstruktur:

Der vollstaendige Code kann hier heruntergeladen werden. https://github.com/twissmueller/event-driven-architectures

Dies kann direkt von der Kommandozeile gebaut oder in eine IDE wie IntelliJ importiert werden.

Code-Setup

Benutzer der IntelliJ Ultimate Edition koennen den Spring Initializr aus dem “New Project”-Dialog der IDE verwenden. Wenn er nicht verfuegbar ist, kannst du das Projekt unter https://start.spring.io erstellen, das ZIP herunterladen und entpacken und es in IntelliJ oder einen anderen Editor importieren.

Zuerst habe ich ein neues Projekt erstellt, das Kotlin als Sprache und Gradle fuer das Abhaengigkeits- und Build-Management verwendet.

Im naechsten Schritt habe ich eine Spring-Abhaengigkeit hinzugefuegt; der Rest wurde spaeter manuell hinzugefuegt.

Nach dem Hinzufuegen der Dateien fuer Producer und Consumer sah das Projekt so aus:

Was in diese Dateien kommt, wird in den naechsten Abschnitten erklaert.

Producer

Der Producer sendet Events, die einen Zeitstempel enthalten, an Kafka.

Events in Kafka sind Key-Value-Paare. Wir muessen einen Serialisierer fuer die Produktion und einen Deserialisierer fuer den Konsum definieren. Ich habe dies in application.properties konfiguriert:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Der Key ist ein einfacher String; der Value ist JSON.

Der Producer hat eine Methode, die das Event alle 5 Sekunden sendet. Die @Scheduled-Annotation macht dies einfach.

Zuerst wird ein Zeitstempel erstellt und dann an das tutorial-topic gesendet:

@Component
class KafkaProducer(private val kafkaTemplate: KafkaTemplate<String, TimestampEvent>) {
 
 @Scheduled(fixedRate = 5000)
 fun send() {
  val event = TimestampEvent(timestamp = ZonedDateTime.now())
  kafkaTemplate.send("tutorial-topic", event)
  println("Sent: ${event.timestamp.toString()}")
 }
 
}

Das ist alles fuer den Producer. Weiter zum Consumer…

Consumer

Jetzt empfangen wir die vom Producer gesendeten Zeitstempel-Events.

Wir beginnen mit der Konfiguration. Aehnlich wie beim Producer spezifizieren wir die Datentypen fuer Key und Value. Der Consumer gibt auch seine Consumer-Gruppe an.

Wir teilen Kafka auch mit, was zu tun ist, wenn es keinen initialen Offset gibt oder der aktuelle Offset nicht mehr existiert. Mit auto-offset-reset=earliest weisen wir Kafka an, den fruehesten verfuegbaren Offset zu verwenden.

Wir muessen auch eine kommagetrennte Liste von Paketen angeben, die fuer die Deserialisierung erlaubt sind, mit spring.kafka.consumer.properties.spring.json.trusted.packages.

Hier ist die vollstaendige Konfiguration in application.properties:

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=tutorial-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.kotlintutorial

Schliesslich spezifiziert der Consumer in KafkaConsumer.kt einen Listener auf einem Topic mit @KafkaListener und behandelt das eingehende Event, indem er den Zeitstempel protokolliert:

@Component
class KafkaConsumer {
 
 @KafkaListener(topics = ["tutorial-topic"])
 fun processMessage(event: TimestampEvent) {
  println("Received: ${event.timestamp.toString()}")
 }
 
}

Jetzt, da der Code vorhanden ist, brauchen wir nur noch die Infrastruktur, die unseren Kafka-Cluster bereitstellt.

Infrastruktur

Wie erwaehnt, stellt Upstash den Kafka-Cluster bereit.

Wenn du noch kein Konto mit einem laufenden Cluster hast, siehe diesen Artikel fuer erste Schritte:

2022-02-01 First Steps with Upstash for Kafka

Im Vergleich zum Java-Tutorial habe ich die gesamte Konfiguration in Properties gepackt.

Wenn dein Cluster bereit ist, aendere zwei Properties in application.properties.

Setze zuerst die Cluster-Adresse in spring.kafka.bootstrap-servers:

spring.kafka.bootstrap-servers=CHANGE_ME

Setze dann einige sicherheitsrelevante Properties. Die, die du aendern musst, ist spring.kafka.properties.sasl.jaas.config:

spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=CHANGE_ME

Die Werte fuer bootstrap.servers und sasl.jaas.config werden fuer dich in der Upstash-Konsole generiert. Kopiere nicht die Werte unten - sie gehoerten zu meinem Cluster, der nach diesem Tutorial geloescht wurde.

Beispielcode ausfuehren

Jetzt fuehre alles aus. Stelle zuerst sicher, dass dein Cluster auf Upstash laeuft.

Fuehre das Projekt von der IDE aus oder vom Terminal mit:

./gradlew bootRun

Du solltest eine Ausgabe aehnlich dieser im Terminal sehen:

Die IDE-Konsolenausgabe sieht so aus:

Fazit

Damit ist dieses einfuehrende Tutorial zur Erstellung einer ereignisgesteuerten Architektur mit Kafka und Kotlin auf Upstash abgeschlossen.

Du weisst jetzt, wie man:

  • einen Cluster erstellt
  • Events mit einem Producer sendet
  • Events mit einem Consumer empfaengt
  • all das in Kotlin macht

Danke fuers Lesen!

Ressourcen