From 15fa1e82639375af71c1a37e10082782819f0915 Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Wed, 25 Jan 2017 19:57:18 +0100 Subject: [PATCH] Spring Kafka --- SpringJava/Kafka/SpringKafka/build.gradle | 65 ++++++++++++++++++++++ SpringJava/Kafka/SpringKafka/gradle.properties | 3 + SpringJava/Kafka/SpringKafka/settings.gradle | 4 ++ .../spring-kafka-consumer-example/build.gradle | 0 .../java/de/example/spring/kafka/Application.java | 31 +++++++++++ .../main/java/de/example/spring/kafka/Sender.java | 46 +++++++++++++++ .../java/de/example/spring/kafka/SenderConfig.java | 48 ++++++++++++++++ .../src/main/resources/application.yml | 15 +++++ .../spring-kafka-producer-example/build.gradle | 0 .../java/de/example/spring/kafka/Application.java | 29 ++++++++++ .../java/de/example/spring/kafka/Receiver.java | 24 ++++++++ .../de/example/spring/kafka/ReceiverConfig.java | 56 +++++++++++++++++++ .../src/main/resources/application.yml | 15 +++++ 13 files changed, 336 insertions(+) create mode 100644 SpringJava/Kafka/SpringKafka/build.gradle create mode 100644 SpringJava/Kafka/SpringKafka/gradle.properties create mode 100644 SpringJava/Kafka/SpringKafka/settings.gradle create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/build.gradle create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/resources/application.yml create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/build.gradle create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/resources/application.yml diff --git a/SpringJava/Kafka/SpringKafka/build.gradle b/SpringJava/Kafka/SpringKafka/build.gradle new file mode 100644 index 0000000..819aae3 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/build.gradle @@ -0,0 +1,65 @@ + +project.ext { + springKafkaVersion = '1.1.2.RELEASE' +} + + +buildscript { + ext { + springBootGradlePluginVersion = '1.4.3.RELEASE' + } + repositories { + mavenCentral() + } + dependencies { + classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootGradlePluginVersion}") + } +} + + + +allprojects { + + group = theGroup + version = theVersion +} + + +subprojects { + apply plugin: 'java' + apply plugin: 'eclipse' + apply plugin: 'idea' + apply plugin: 'org.springframework.boot' + + repositories { + mavenCentral() + } + + + sourceCompatibility = 1.8 + targetCompatibility = 1.8 + + // ***************** MANIFEST FILE ***************** + jar { + manifest { + attributes('Implementation-Title': 'Spring Kafka example', + 'Implementation-Version': theVersion, + 'Build-Time': new Date().format("yyyy-MM-dd'T'HH:mm:ssZ"), + 'Built-By': System.getProperty('user.name'), + 'Built-JDK': System.getProperty('java.version') + ) + } + } + + + dependencies { + compile('org.springframework.boot:spring-boot-starter') + compile('javax.inject:javax.inject:1') + compile("org.springframework.kafka:spring-kafka:${springKafkaVersion}") + + testCompile('org.springframework.boot:spring-boot-starter-test') + testCompile("org.springframework.kafka:spring-kafka-test:${springKafkaVersion}") + } + +} + diff --git a/SpringJava/Kafka/SpringKafka/gradle.properties b/SpringJava/Kafka/SpringKafka/gradle.properties new file mode 100644 index 0000000..7d4fd97 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/gradle.properties @@ -0,0 +1,3 @@ +theGroup='de.example.spring.kafka' +theName='spring-kafka-example' +theVersion='1.0-SNAPSHOT' diff --git a/SpringJava/Kafka/SpringKafka/settings.gradle b/SpringJava/Kafka/SpringKafka/settings.gradle new file mode 100644 index 0000000..8836f03 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/settings.gradle @@ -0,0 +1,4 @@ +rootProject.name = theName + +include ':spring-kafka-producer-example' +include ':spring-kafka-consumer-example' diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/build.gradle b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/build.gradle new file mode 100644 index 0000000..e69de29 diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java new file mode 100644 index 0000000..7448902 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java @@ -0,0 +1,31 @@ +package de.example.spring.kafka; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + + @Bean + CommandLineRunner lookup(Sender sender) { + return args -> { + String topic = "example.topic"; + String message = "example message"; + + + if (args.length > 0) { + topic = args[0]; + message = args[1]; + } + + sender.sendMessage(topic, message); + }; + } + +} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java new file mode 100644 index 0000000..0ed6b5b --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java @@ -0,0 +1,46 @@ +package de.example.spring.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +public class Sender { + private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class); + + @Autowired + private KafkaTemplate kafkaTemplate; + + public void sendMessage(String topic, String message) { + // the KafkaTemplate provides asynchronous send methods returning a + // Future + ListenableFuture> future = kafkaTemplate + .send(topic, message); + + // you can register a callback with the listener to receive the result + // of the send asynchronously + future.addCallback( + new ListenableFutureCallback>() { + + @Override + public void onSuccess( + SendResult result) { + LOGGER.info("sent message='{}' with offset={}", + message, + result.getRecordMetadata().offset()); + } + + @Override + public void onFailure(Throwable ex) { + LOGGER.error("unable to send message='{}'", + message, ex); + } + }); + + // alternatively, to block the sending thread, to await the result, + // invoke the future’s get() method + } +} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java new file mode 100644 index 0000000..79db5d0 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java @@ -0,0 +1,48 @@ +package de.example.spring.kafka; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +public class SenderConfig { + + @Value("${kafka.bootstrap.servers}") + private String bootstrapServers; + + @Bean + public Map producerConfigs() { + Map props = new HashMap<>(); + // list of host:port pairs used for establishing the initial connections + // to the Kakfa cluster + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + // value to block, after which it will throw a TimeoutException + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000); + + return props; + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate(producerFactory()); + } + + @Bean + public Sender sender() { + return new Sender(); + } +} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/resources/application.yml new file mode 100644 index 0000000..a04f533 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/resources/application.yml @@ -0,0 +1,15 @@ +logging: + level: + org: + springframework: + ws: debug + ROOT: debug + +spring: + main: + banner-mode: "off" + +kafka: + bootstrap: + servers: "kafka:9092,kafka:9093,kafka:9094" + diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/build.gradle b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/build.gradle new file mode 100644 index 0000000..e69de29 diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java new file mode 100644 index 0000000..3c88f0a --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java @@ -0,0 +1,29 @@ +package de.example.spring.kafka; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + + @Bean + CommandLineRunner lookup(Receiver receiver) { + return args -> { + String sleepTime = "60000"; + + + if (args.length > 0) { + sleepTime = args[0]; + } + + Thread.sleep(Long.valueOf(sleepTime)); + }; + } + +} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java new file mode 100644 index 0000000..b606e95 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java @@ -0,0 +1,24 @@ +package de.example.spring.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.KafkaListener; + +import java.util.concurrent.CountDownLatch; + +public class Receiver { + + private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); + + private CountDownLatch latch = new CountDownLatch(1); + + @KafkaListener(topics = "example.topic") + public void receiveMessage(String message) { + LOGGER.info("received message='{}'", message); + latch.countDown(); + } + + public CountDownLatch getLatch() { + return latch; + } +} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java new file mode 100644 index 0000000..238766b --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java @@ -0,0 +1,56 @@ +package de.example.spring.kafka; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@EnableKafka +public class ReceiverConfig { + + @Value("${kafka.bootstrap.servers}") + private String bootstrapServers; + + @Bean + public Map consumerConfigs() { + Map props = new HashMap<>(); + // list of host:port pairs used for establishing the initial connections + // to the Kakfa cluster + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + // consumer groups allow a pool of processes to divide the work of + // consuming and processing records + props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld"); + + return props; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + + return factory; + } + + @Bean + public Receiver receiver() { + return new Receiver(); + } +} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/resources/application.yml new file mode 100644 index 0000000..a04f533 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/resources/application.yml @@ -0,0 +1,15 @@ +logging: + level: + org: + springframework: + ws: debug + ROOT: debug + +spring: + main: + banner-mode: "off" + +kafka: + bootstrap: + servers: "kafka:9092,kafka:9093,kafka:9094" + -- 2.1.4