
Ereignisgesteuerte Architekturen sind in den letzten Jahren weit verbreitet geworden, wobei Kafka der De-facto-Standard fuer Tools ist.
Dieser Beitrag bietet ein vollstaendiges Beispiel einer ereignisgesteuerten Architektur, implementiert mit zwei Java Spring Boot-Services, die ueber Kafka kommunizieren.
Das Hauptziel dieses Tutorials ist es, ein funktionierendes Beispiel zu liefern, ohne zu tief in Details einzutauchen, die davon ablenken koennten, schnell etwas zum Laufen zu bringen.
Das Projekt besteht aus einigen Bausteinen:
- Infrastruktur (Kafka, Zookeeper)
- Producer (Java Spring Boot-Service)
- Consumer (Java Spring Boot-Service)
Die einzige Aufgabe des Producers ist es, periodisch ein Event an Kafka zu senden. Das Event traegt einfach einen Zeitstempel. Der Consumer hoert auf dieses Event und gibt den Zeitstempel aus.

Die Implementierung fuehrte zu folgender Projektstruktur.

Der vollstaendige Projektcode kann hier heruntergeladen werden.
Du kannst das von der Kommandozeile aus bauen, wie unten erklaert, oder es in eine IDE wie IntelliJ importieren.

Infrastruktur
Nur zwei Komponenten, neben den Services, werden benoetigt, um eine ereignisbasierte Architektur zum Laufen zu bringen: Kafka und Zookeeper.
Siehe den Ressourcenabschnitt am Ende des Tutorials fuer Links zu beiden.
Kafka ist die Hauptkomponente fuer die Verarbeitung von Events; Zookeeper wird fuer mehrere unterstuetzende Funktionen benoetigt. Von der Zookeeper-Website:
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
Unten ist die docker-compose.yml, um beides zum Laufen zu bringen:
version: '3'
services:
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
depends_on:
- zookeeper
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
environment:
- KAFKA_ADVERTISED_HOST_NAME=zookeeperWenn das an Ort und Stelle ist, werden nur die beiden Java-Services benoetigt, die die “Geschaeftsdomaene” implementieren - eine sehr einfache: Senden und Empfangen eines Zeitstempels.
Code-Setup
Es gibt eine praktische Website, auf der du ein Spring-Projekt mit allen erforderlichen Abhaengigkeiten erstellen und initialisieren kannst: Spring Initializr.
Zuerst habe ich die Producer-Anwendung erstellt:

Als Naechstes die Consumer-Anwendung:

Beachte, dass ich bereits die Abhaengigkeit fuer “Spring for Apache Kafka” hinzugefuegt habe.
Nach dem Herunterladen und Entpacken der Projektdateien war es Zeit, mit der Implementierung zu beginnen.
Fuer sowohl den Producer als auch den Consumer werden vier Dateien benoetigt:
- Die Application
- Die Configuration
- Der Producer (oder der Consumer)
- Die Properties-Datei
Was in diese Dateien kommt, wird in den naechsten beiden Kapiteln erklaert. Ich gehe hier nicht auf die Details ein, da dieses Tutorial nicht als ausfuehrlicher Kafka-Leitfaden gedacht ist.
Producer
Wie oben erwaehnt, “produziert” der Producer Zeitstempel und sendet sie ueber Kafka an interessierte Consumer.
Alles beginnt mit der Application-Klasse in ProducerApplication.java, die groesstenteils unveraendert geblieben ist. Nur die @EnableScheduling-Annotation wurde hinzugefuegt, die vom Producer benoetigt wird.
package net.wissmueller.kafkatutorial.producer;
// imports...
@SpringBootApplication
@EnableScheduling
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}Etwas Konfiguration wird benoetigt, die ich in ProducerConfiguration.java platziert habe.
Wir muessen dem Producer mitteilen, wo er Kafka findet und welche Serialisierer fuer die Events verwendet werden sollen. Dies geschieht in producerConfigs().
Ein Event hat einen Key und einen Value. Fuer beide verwenden wir die String-Klasse. Dies wird in kafkaTemplate() angegeben.
Wir benoetigen auch das Topic, auf dem die Events gesendet werden sollen. Daher haben wir timestampTopic(), das ein NewTopic zurueckgibt.
package net.wissmueller.kafkatutorial.producer;
// imports...
public class ProducerConfiguration {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public NewTopic timestampTopic() {
return TopicBuilder.name("timestamp")
.build();
}
}Diese Klasse erfordert einige Properties an der ueblichen Stelle: application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=tutorialGroup
Der Producer befindet sich in KafkaProducer.java:
package net.wissmueller.kafkatutorial.producer;
// imports...
@Component
public class KafkaProducer {
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
private KafkaTemplate<String, String> kafkaTemplate;
KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
void sendMessage(String message, String topicName) {
kafkaTemplate.send(topicName, message);
}
@Scheduled(fixedRate = 5000)
public void reportCurrentTime() {
String timestamp = dateFormat.format(new Date());
sendMessage(timestamp, "timestamp");
log.info("Sent: {}", timestamp);
}
}Diese Klasse wird mit dem KafkaTemplate initialisiert.
In reportCurrentTime() wird der Zeitstempel alle 5 Sekunden an Kafka gesendet, implementiert ueber die @Scheduled-Annotation. Dies funktioniert nur, wenn @EnableScheduling in der Application-Klasse gesetzt ist.
Das ist alles fuer den Producer. Weiter zum Consumer…
Consumer
Jetzt empfangen wir die vom Producer gesendeten Zeitstempel.
Wie beim Producer ist der Einstiegspunkt die Application-Klasse in ConsumerApplication.java. Diese Datei ist unveraendert gegenueber der vom Spring Initializr generierten Version.
package net.wissmueller.kafkatutorial.consumer;
// imports...
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}Die Konfiguration befindet sich in ConsumerConfiguration.java, die analog zum Producer Folgendes spezifiziert:
- wie man sich mit Kafka verbindet
- welche Serialisierer verwendet werden
- das Format der Kafka-Events
package net.wissmueller.kafkatutorial.consumer;
// imports...
public class ConsumerConfiguration {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public NewTopic timestampTopic() {
return TopicBuilder.name("timestamp")
.build();
}
}Hier muessen wir die folgenden Properties in application.properties setzen:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=tutorialGroup
Zu guter Letzt haben wir den Consumer in KafkaConsumer.java. Wir muessen nur einen Listener auf einem Topic mit der @KafkaListener-Annotation spezifizieren und die Aktion definieren. In diesem Fall wird der Zeitstempel protokolliert.
package net.wissmueller.kafkatutorial.consumer;
// imports...
@Component
public class KafkaConsumer {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "timestamp")
void listener(String timestamp) {
log.info("Received: {}", timestamp);
}
}Beispielcode ausfuehren
Es ist Zeit, alles auszufuehren. Erinnere dich an die folgende Projektstruktur:
code/
- docker-compose.yml
- producer/
-- pom.xml
--...
- consumer/
-- pom.xml
--...
Im code-Verzeichnis werden Kafka und Zookeeper ueber docker-compose gestartet:
docker-compose up -d
Im producer-Verzeichnis starte den Service mit:
mvn spring-boot:run
Schliesslich wechsle in einem neuen Terminalfenster in das consumer-Verzeichnis und starte den Service auf die gleiche Weise:
mvn spring-boot:run
Du solltest eine Ausgabe aehnlich der folgenden sehen. Links ist die Log-Ausgabe des Producers und rechts die Log-Ausgabe des Consumers.

Damit ist das einfuehrende Tutorial zur Erstellung einer ereignisgesteuerten Architektur mit Kafka und Java Spring Boot abgeschlossen.
Der vollstaendige Projektcode kann hier heruntergeladen werden.