Willkommen zu dieser ersten Ueberarbeitung meines Artikels “Ereignisgesteuerte Architekturen mit Kafka und Java Spring-Boot”.

Im urspruenglichen Artikel war das gesendete Ereignis ein String. Waehrend dies fuer viele Anwendungsfaelle ausreicht, erfordern die meisten meiner Anwendungsfaelle das Senden eines Objekts eines benutzerdefinierten Typs.

Daher habe ich den Code und das Tutorial geaendert, um Objektserialisierung und -deserialisierung mit JSON zu verwenden.

Natuerlich gibt es andere Methoden zur Serialisierung und Deserialisierung von Daten. Eine sehr beliebte ist zum Beispiel Avro.

Viele Wege fuehren nach Rom, lass uns mit einem beginnen und mit dem Tutorial fortfahren.


Einfuehrung

Ereignisgesteuerte Architekturen sind in den letzten Jahren ueblich geworden, wobei Kafka der De-facto-Standard ist, wenn es um Werkzeuge geht.

Dieser Beitrag bietet ein vollstaendiges Beispiel fuer eine ereignisgesteuerte Architektur, implementiert mit zwei Java Spring-Boot-Services, die ueber Kafka kommunizieren.

Das Hauptziel dieses Tutorials war es, ein funktionierendes Beispiel bereitzustellen, ohne zu sehr ins Detail zu gehen, was meiner Meinung nach unnoetig von der Hauptaufgabe ablenkt, “etwas” so schnell wie moeglich zum Laufen zu bringen.

Wir haben einige Bausteine, hauptsaechlich

  • Infrastruktur (Kafka, Zookeeper)
  • Producer (Java Spring-Boot Service)
  • Consumer (Java Spring-Boot Service)

Die einzige Aufgabe des Producers ist es, periodisch ein Ereignis an Kafka zu senden. Dieses Ereignis traegt nur einen Zeitstempel. Die Aufgabe des Consumers ist es, auf dieses Ereignis zu lauschen und den Zeitstempel auszugeben.

Kafka

Die gesamte Implementierung fuehrte zur folgenden Projektstruktur.

Projektstruktur

Der vollstaendige Projektcode kann hier heruntergeladen werden.

Dies kann auf der Kommandozeile wie unten erklaert erstellt oder in eine IDE wie IntelliJ importiert werden.

IntelliJ-Projekt


Infrastruktur

Neben den Services werden nur zwei Komponenten benoetigt, um eine ereignisbasierte Architektur zum Laufen zu bringen: Kafka und Zookeeper.

Sehen Sie sich den Ressourcenabschnitt am Ende des Tutorials fuer Links zu beiden an.

Waehrend Kafka der Hauptteil der Ereignisverarbeitung ist, wird Zookeeper aus mehreren Gruenden benoetigt. Von der Zookeeper-Website:

ZooKeeper ist ein zentralisierter Dienst zur Verwaltung von Konfigurationsinformationen, Benennung, verteilter Synchronisation und Bereitstellung von Gruppendiensten.

Unten ist die docker-compose.yml, um beide zum Laufen zu bringen:

version: '3'
 
services:
 
 kafka:
  image: wurstmeister/kafka
  container_name: kafka
  ports:
  - "9092:9092"
  environment:
  - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
  - KAFKA_ADVERTISED_PORT=9092
  - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
  depends_on:
  - zookeeper
 
 zookeeper:
  image: wurstmeister/zookeeper
  ports:
  - "2181:2181"
  environment:
  - KAFKA_ADVERTISED_HOST_NAME=zookeeper

Wenn dies vorhanden ist, werden nur die beiden Java-Services benoetigt, die die “Geschaeftsdomaene” implementieren. Nun, eine sehr einfache: Senden und Empfangen eines Zeitstempels.


Code-Setup

Es gibt eine sehr praktische Website, auf der man ein Spring-Projekt mit allen erforderlichen Abhaengigkeiten erstellen und initialisieren kann: Spring Initializr.

Zuerst habe ich die Producer-Anwendung erstellt:

Producer-Initialisierung

Als naechstes kam die Consumer-Anwendung:

Consumer-Initialisierung

Beachten Sie, dass ich bereits die Abhaengigkeit fuer “Spring for Apache Kafka” hinzugefuegt habe.

Nach dem Herunterladen und Entpacken der Projektdateien war es Zeit, mit der Implementierung zu beginnen.

Fuer den Producer und den Consumer werden jeweils vier Dateien benoetigt:

  • Die Application
  • Die Configuration
  • Der Producer, bzw. der Consumer
  • Die Properties-Datei

Was in diese Dateien kommt, wird in den naechsten beiden Kapiteln erklaert. Ich gehe hier nicht ins Detail, da dieses Tutorial kein ausfuehrliches Kafka-Tutorial sein soll.


Producer

Wie oben erwaehnt, “produziert” der Producer Ereignisse, die einen Zeitstempel enthalten, und sendet sie ueber Kafka an alle, die daran interessiert sind, sie zu empfangen.

package net.wissmueller.kafkatutorial.producer;
 
import java.time.ZonedDateTime;
 
public class TimestampEvent {
 private ZonedDateTime timestamp;
 
 public TimestampEvent(ZonedDateTime timestamp) {
 this.timestamp = timestamp;
 }
 
 public ZonedDateTime getTimestamp() {
 return timestamp;
 }
 
 public void setTimestamp(ZonedDateTime timestamp) {
 this.timestamp = timestamp;
 }
}

Alles beginnt mit der Application-Klasse in ProducerApplication.java, die mehr oder weniger unveraendert gelassen wurde. Nur die @EnableScheduling-Annotation wurde hinzugefuegt, die im Producer selbst benoetigt wird.

package net.wissmueller.kafkatutorial.producer;
 
// imports...
 
@SpringBootApplication
@EnableScheduling
public class ProducerApplication {
 
 public static void main(String[] args) {
 SpringApplication.run(ProducerApplication.class, args);
 }
 
}

Etwas Konfiguration ist erforderlich, die ich in ProducerConfiguration.java eingefuegt habe.

In producerFactory() spezifizieren wir:

  • Wo der Server zu finden ist: BOOTSTRAP_SERVERS_CONFIG
  • Den Serializer fuer den Event-Schluessel: KEY_SERIALIZER_CLASS_CONFIG
  • Den Serializer fuer den Event-Wert: VALUE_SERIALIZER_CLASS_CONFIG
  • Die ID fuer die Kafka-Gruppe: GROUP_ID_CONFIG

Wir benoetigen auch das Topic, auf dem die Ereignisse gesendet werden. Daher haben wir timestampTopic(), das NewTopic zurueckgibt.

package net.wissmueller.kafkatutorial.producer;
 
// imports...
 
public class ProducerConfiguration {
 
 @Bean
 public ProducerFactory<String, TimestampEvent> producerFactory() {
 var props = new HashMap<String, Object>();
 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
 props.put(ConsumerConfig.GROUP_ID_CONFIG, "tutorialGroup");
 return new DefaultKafkaProducerFactory<>(props);
 }
 
 @Bean
 public KafkaTemplate<String, TimestampEvent> kafkaTemplate() {
 return new KafkaTemplate<>(producerFactory());
 }
 
 @Bean
 public NewTopic timestampTopic() {
 return TopicBuilder.name("timestamp")
     .build();
 }
}

Der Producer selbst befindet sich in KafkaProducer.java:

package net.wissmueller.kafkatutorial.producer;
 
// imports...
 
@Component
public class KafkaProducer {
 private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
 
 @Autowired
 private KafkaTemplate<String, TimestampEvent> kafkaTemplate;
 
 @Scheduled(fixedRate = 5000)
 public void reportCurrentTime() {
 var event = new TimestampEvent(ZonedDateTime.now());
 kafkaTemplate.send("timestamp", event);
 log.info("Sent: {}", event.getTimestamp().toString());
 }
}

Diese Klasse wird mit dem KafkaTemplate initialisiert.

In reportCurrentTime() wird der Zeitstempel alle 5 Sekunden an Kafka gesendet, was ueber die @Scheduled-Annotation implementiert wird. Dies funktioniert nur, wenn die @EnableScheduling-Annotation in der Application-Klasse gesetzt wurde.

Das ist alles fuer den Producer. Weiter zum Consumer…


Consumer

Es ist Zeit, die Ereignisse mit den Zeitstempeln zu empfangen, die vom Producer gesendet wurden.

package net.wissmueller.kafkatutorial.consumer;
 
import java.time.ZonedDateTime;
 
public class TimestampEvent {
 private ZonedDateTime timestamp;
 
 public TimestampEvent() {}
 
 public ZonedDateTime getTimestamp() {
 return timestamp;
 }
 
 public void setTimestamp(ZonedDateTime timestamp) {
 this.timestamp = timestamp;
 }
}

Wie beim Producer ist der Einstiegspunkt die Application-Klasse in ConsumerApplication.java. Diesmal komplett unveraendert, genau wie sie von Spring Initializr generiert wurde.

package net.wissmueller.kafkatutorial.consumer;
 
// imports...
 
@SpringBootApplication
public class ConsumerApplication {
 
 public static void main(String[] args) {
 SpringApplication.run(ConsumerApplication.class, args);
 }
 
}

Die Konfiguration befindet sich in ConsumerConfiguration.java, wo wir analog zum Producer

in consumerFactory() spezifizieren:

  • Wo der Server zu finden ist: BOOTSTRAP_SERVERS_CONFIG
  • Den Deserializer fuer den Event-Schluessel: KEY_DESERIALIZER_CLASS_CONFIG
  • Den Deserializer fuer den Event-Wert: VALUE_DESERIALIZER_CLASS_CONFIG
  • Die ID fuer die Kafka-Gruppe: GROUP_ID_CONFIG
package net.wissmueller.kafkatutorial.consumer;
 
// imports...
 
public class ConsumerConfiguration {
 
 @Bean
 public ConsumerFactory<String, TimestampEvent> consumerFactory() {
 var timestampEventDeserializer = new JsonDeserializer<TimestampEvent>(TimestampEvent.class);
 timestampEventDeserializer.setRemoveTypeHeaders(false);
 timestampEventDeserializer.addTrustedPackages("*");
 timestampEventDeserializer.setUseTypeMapperForKey(true);
 
 var props = new HashMap<String, Object>();
 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
 props.put(ConsumerConfig.GROUP_ID_CONFIG, "tutorialGroup");
 
 return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), timestampEventDeserializer);
 }
 
 @Bean
 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TimestampEvent>> kafkaListenerContainerFactory() {
 var factory = new ConcurrentKafkaListenerContainerFactory<String, TimestampEvent>();
 factory.setConsumerFactory(consumerFactory());
 return factory;
 }
 
 @Bean
 public NewTopic timestampTopic() {
 return TopicBuilder.name("timestamp")
     .build();
 }
}

Zu guter Letzt haben wir den Consumer in KafkaConsumer.java. Wir muessen nur einen Listener auf einem Topic mit der @KafkaListener-Annotation spezifizieren und die Aktion definieren. In diesem Fall wird der Zeitstempel nur geloggt.

package net.wissmueller.kafkatutorial.consumer;
 
// imports...
 
@Component
public class KafkaConsumer {
 private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
 
 @KafkaListener(topics = "timestamp", containerFactory = "kafkaListenerContainerFactory")
 void listener(TimestampEvent event) {
 log.info("Received: {}", event.getTimestamp()
        .toString());
 }
}

Beispielcode ausfuehren

Es ist Zeit, alles auszufuehren. Denk an die folgende Projektstruktur:

code/
- docker-compose.yml
- producer/
-- pom.xml
--...
- consumer/
-- pom.xml
--...

Im Verzeichnis code starten Sie Kafka und Zookeeper ueber docker-compose:

docker-compose up -d

Wechseln Sie in das Verzeichnis producer und starten Sie den Service mit:

mvn spring-boot:run

Oeffnen Sie schliesslich ein neues Terminalfenster, wechseln Sie in das Verzeichnis consumer und starten Sie den Service auf die gleiche Weise:

mvn spring-boot:run

Jetzt sollten Sie etwas Aehnliches wie dies sehen. Links ist die Log-Ausgabe des Producers und rechts die Log-Ausgabe des Consumers.

Dies schliesst dieses einfuehrende Tutorial zur Erstellung einer ereignisgesteuerten Architektur mit Kafka und Java Spring-Boot ab.

Der vollstaendige Projektcode kann hier heruntergeladen werden.


Ressourcen