Ereignisgesteuerte Architekturen sind in den letzten Jahren weit verbreitet geworden, wobei Kafka der De-facto-Standard fuer Tools ist.

Dieser Beitrag bietet ein vollstaendiges Beispiel einer ereignisgesteuerten Architektur, implementiert mit zwei Java Spring Boot-Services, die ueber Kafka kommunizieren.

Das Hauptziel dieses Tutorials ist es, ein funktionierendes Beispiel zu liefern, ohne zu tief in Details einzutauchen, die davon ablenken koennten, schnell etwas zum Laufen zu bringen.

Das Projekt besteht aus einigen Bausteinen:

  • Infrastruktur (Kafka, Zookeeper)
  • Producer (Java Spring Boot-Service)
  • Consumer (Java Spring Boot-Service)

Die einzige Aufgabe des Producers ist es, periodisch ein Event an Kafka zu senden. Das Event traegt einfach einen Zeitstempel. Der Consumer hoert auf dieses Event und gibt den Zeitstempel aus.

Kafka

Die Implementierung fuehrte zu folgender Projektstruktur.

Projektstruktur

Der vollstaendige Projektcode kann hier heruntergeladen werden.

Du kannst das von der Kommandozeile aus bauen, wie unten erklaert, oder es in eine IDE wie IntelliJ importieren.

IntelliJ-Projekt

Infrastruktur

Nur zwei Komponenten, neben den Services, werden benoetigt, um eine ereignisbasierte Architektur zum Laufen zu bringen: Kafka und Zookeeper.

Siehe den Ressourcenabschnitt am Ende des Tutorials fuer Links zu beiden.

Kafka ist die Hauptkomponente fuer die Verarbeitung von Events; Zookeeper wird fuer mehrere unterstuetzende Funktionen benoetigt. Von der Zookeeper-Website:

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

Unten ist die docker-compose.yml, um beides zum Laufen zu bringen:

version: '3'
 
services:
 
  kafka:
   image: wurstmeister/kafka
   container_name: kafka
   ports:
    - "9092:9092"
   environment:
    - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
    - KAFKA_ADVERTISED_PORT=9092
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
   depends_on:
    - zookeeper
 
  zookeeper:
   image: wurstmeister/zookeeper
   ports:
    - "2181:2181"
   environment:
    - KAFKA_ADVERTISED_HOST_NAME=zookeeper

Wenn das an Ort und Stelle ist, werden nur die beiden Java-Services benoetigt, die die “Geschaeftsdomaene” implementieren - eine sehr einfache: Senden und Empfangen eines Zeitstempels.

Code-Setup

Es gibt eine praktische Website, auf der du ein Spring-Projekt mit allen erforderlichen Abhaengigkeiten erstellen und initialisieren kannst: Spring Initializr.

Zuerst habe ich die Producer-Anwendung erstellt:

Producer-Initialisierung

Als Naechstes die Consumer-Anwendung:

Consumer-Initialisierung

Beachte, dass ich bereits die Abhaengigkeit fuer “Spring for Apache Kafka” hinzugefuegt habe.

Nach dem Herunterladen und Entpacken der Projektdateien war es Zeit, mit der Implementierung zu beginnen.

Fuer sowohl den Producer als auch den Consumer werden vier Dateien benoetigt:

  • Die Application
  • Die Configuration
  • Der Producer (oder der Consumer)
  • Die Properties-Datei

Was in diese Dateien kommt, wird in den naechsten beiden Kapiteln erklaert. Ich gehe hier nicht auf die Details ein, da dieses Tutorial nicht als ausfuehrlicher Kafka-Leitfaden gedacht ist.

Producer

Wie oben erwaehnt, “produziert” der Producer Zeitstempel und sendet sie ueber Kafka an interessierte Consumer.

Alles beginnt mit der Application-Klasse in ProducerApplication.java, die groesstenteils unveraendert geblieben ist. Nur die @EnableScheduling-Annotation wurde hinzugefuegt, die vom Producer benoetigt wird.

package net.wissmueller.kafkatutorial.producer;
 
// imports...
 
@SpringBootApplication
@EnableScheduling
public class ProducerApplication {
 
 public static void main(String[] args) {
  SpringApplication.run(ProducerApplication.class, args);
 }
 
}

Etwas Konfiguration wird benoetigt, die ich in ProducerConfiguration.java platziert habe.

Wir muessen dem Producer mitteilen, wo er Kafka findet und welche Serialisierer fuer die Events verwendet werden sollen. Dies geschieht in producerConfigs().

Ein Event hat einen Key und einen Value. Fuer beide verwenden wir die String-Klasse. Dies wird in kafkaTemplate() angegeben.

Wir benoetigen auch das Topic, auf dem die Events gesendet werden sollen. Daher haben wir timestampTopic(), das ein NewTopic zurueckgibt.

package net.wissmueller.kafkatutorial.producer;
 
// imports...
 
public class ProducerConfiguration {
 
 @Value("${kafka.bootstrap-servers}")
 private String bootstrapServers;
 
 @Bean
 public Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  return props;
 }
 
 @Bean
 public ProducerFactory<String, String> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
 }
 
 @Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
 }
 
 @Bean
 public NewTopic timestampTopic() {
  return TopicBuilder.name("timestamp")
           .build();
 }
 
}

Diese Klasse erfordert einige Properties an der ueblichen Stelle: application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=tutorialGroup

Der Producer befindet sich in KafkaProducer.java:

package net.wissmueller.kafkatutorial.producer;
 
// imports...
 
@Component
public class KafkaProducer {
 private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
 
 private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
 
 private KafkaTemplate<String, String> kafkaTemplate;
 
 KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
  this.kafkaTemplate = kafkaTemplate;
 }
 
 void sendMessage(String message, String topicName) {
  kafkaTemplate.send(topicName, message);
 }
 
 @Scheduled(fixedRate = 5000)
 public void reportCurrentTime() {
  String timestamp = dateFormat.format(new Date());
  sendMessage(timestamp, "timestamp");
  log.info("Sent: {}", timestamp);
 }
}

Diese Klasse wird mit dem KafkaTemplate initialisiert.

In reportCurrentTime() wird der Zeitstempel alle 5 Sekunden an Kafka gesendet, implementiert ueber die @Scheduled-Annotation. Dies funktioniert nur, wenn @EnableScheduling in der Application-Klasse gesetzt ist.

Das ist alles fuer den Producer. Weiter zum Consumer…

Consumer

Jetzt empfangen wir die vom Producer gesendeten Zeitstempel.

Wie beim Producer ist der Einstiegspunkt die Application-Klasse in ConsumerApplication.java. Diese Datei ist unveraendert gegenueber der vom Spring Initializr generierten Version.

package net.wissmueller.kafkatutorial.consumer;
 
// imports...
 
@SpringBootApplication
public class ConsumerApplication {
 
 public static void main(String[] args) {
  SpringApplication.run(ConsumerApplication.class, args);
 }
 
}

Die Konfiguration befindet sich in ConsumerConfiguration.java, die analog zum Producer Folgendes spezifiziert:

  • wie man sich mit Kafka verbindet
  • welche Serialisierer verwendet werden
  • das Format der Kafka-Events
package net.wissmueller.kafkatutorial.consumer;
 
// imports...
 
public class ConsumerConfiguration {
 
 @Value("${kafka.bootstrap-servers}")
 private String bootstrapServers;
 
 @Bean
 public Map<String, Object> consumerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  return props;
 }
 
 @Bean
 public ConsumerFactory<String, String> consumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }
 
 @Bean
 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  return factory;
 }
 
 @Bean
 public NewTopic timestampTopic() {
  return TopicBuilder.name("timestamp")
           .build();
 }
 
}

Hier muessen wir die folgenden Properties in application.properties setzen:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=tutorialGroup

Zu guter Letzt haben wir den Consumer in KafkaConsumer.java. Wir muessen nur einen Listener auf einem Topic mit der @KafkaListener-Annotation spezifizieren und die Aktion definieren. In diesem Fall wird der Zeitstempel protokolliert.

package net.wissmueller.kafkatutorial.consumer;
 
// imports...
 
@Component
public class KafkaConsumer {
 private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
 
 @KafkaListener(topics = "timestamp")
 void listener(String timestamp) {
  log.info("Received: {}", timestamp);
 }
}

Beispielcode ausfuehren

Es ist Zeit, alles auszufuehren. Erinnere dich an die folgende Projektstruktur:

code/
- docker-compose.yml
- producer/
-- pom.xml
--...
- consumer/
-- pom.xml
--...

Im code-Verzeichnis werden Kafka und Zookeeper ueber docker-compose gestartet:

docker-compose up -d

Im producer-Verzeichnis starte den Service mit:

mvn spring-boot:run

Schliesslich wechsle in einem neuen Terminalfenster in das consumer-Verzeichnis und starte den Service auf die gleiche Weise:

mvn spring-boot:run

Du solltest eine Ausgabe aehnlich der folgenden sehen. Links ist die Log-Ausgabe des Producers und rechts die Log-Ausgabe des Consumers.

Terminal-Ausgabe

Damit ist das einfuehrende Tutorial zur Erstellung einer ereignisgesteuerten Architektur mit Kafka und Java Spring Boot abgeschlossen.

Der vollstaendige Projektcode kann hier heruntergeladen werden.

Ressourcen