--- /dev/null
+
+// Dependency versions shared by every subproject's dependency block below.
+project.ext {
+ springKafkaVersion = '1.1.2.RELEASE'
+}
+
+
+// Build-script classpath: provides the Spring Boot Gradle plugin applied in subprojects.
+buildscript {
+ ext {
+ springBootGradlePluginVersion = '1.4.3.RELEASE'
+ }
+ repositories {
+ mavenCentral()
+ }
+ dependencies {
+ classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootGradlePluginVersion}")
+ }
+}
+
+
+
+// theGroup/theVersion are declared in gradle.properties.
+allprojects {
+
+ group = theGroup
+ version = theVersion
+}
+
+
+subprojects {
+ apply plugin: 'java'
+ apply plugin: 'eclipse'
+ apply plugin: 'idea'
+ apply plugin: 'org.springframework.boot'
+
+ repositories {
+ mavenCentral()
+ }
+
+
+ sourceCompatibility = 1.8
+ targetCompatibility = 1.8
+
+ // ***************** MANIFEST FILE *****************
+ // Stamps build metadata into each jar's MANIFEST.MF.
+ jar {
+ manifest {
+ attributes('Implementation-Title': 'Spring Kafka example',
+ 'Implementation-Version': theVersion,
+ 'Build-Time': new Date().format("yyyy-MM-dd'T'HH:mm:ssZ"),
+ 'Built-By': System.getProperty('user.name'),
+ 'Built-JDK': System.getProperty('java.version')
+ )
+ }
+ }
+
+
+ // NOTE(review): 'compile'/'testCompile' match this Gradle/Boot 1.4 generation;
+ // migrate to implementation/testImplementation when upgrading Gradle.
+ dependencies {
+ compile('org.springframework.boot:spring-boot-starter')
+ compile('javax.inject:javax.inject:1')
+ compile("org.springframework.kafka:spring-kafka:${springKafkaVersion}")
+
+ testCompile('org.springframework.boot:spring-boot-starter-test')
+ testCompile("org.springframework.kafka:spring-kafka-test:${springKafkaVersion}")
+ }
+
+}
+
--- /dev/null
+# Project coordinates consumed by build.gradle (group/version) and settings.gradle (name).
+# NOTE(review): quotes in .properties values are literal characters, so theVersion
+# becomes '1.0-SNAPSHOT' including the quotes — confirm this is intended.
+theGroup='de.example.spring.kafka'
+theName='spring-kafka-example'
+theVersion='1.0-SNAPSHOT'
--- /dev/null
+// Root project name comes from gradle.properties; two modules are included.
+rootProject.name = theName
+
+include ':spring-kafka-producer-example'
+include ':spring-kafka-consumer-example'
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+    public static void main(String[] args) {
+        // Forward command-line args so the CommandLineRunner receives them;
+        // the original omitted them, making the args[] handling below dead code.
+        SpringApplication.run(Application.class, args);
+    }
+
+    /**
+     * Sends one message on startup. Optional args: [0] topic, [1] message;
+     * defaults are used for any argument not supplied.
+     */
+    @Bean
+    CommandLineRunner lookup(Sender sender) {
+        return args -> {
+            String topic = "example.topic";
+            String message = "example message";
+
+            // Guard each index separately: the original read args[1] whenever
+            // args.length > 0, throwing ArrayIndexOutOfBoundsException when
+            // exactly one argument was passed.
+            if (args.length > 0) {
+                topic = args[0];
+            }
+            if (args.length > 1) {
+                message = args[1];
+            }
+
+            sender.sendMessage(topic, message);
+        };
+    }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+public class Sender {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
+
+ @Autowired
+ private KafkaTemplate<Integer, String> kafkaTemplate;
+
+ public void sendMessage(String topic, String message) {
+ // the KafkaTemplate provides asynchronous send methods returning a
+ // Future
+ ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
+ .send(topic, message);
+
+ // you can register a callback with the listener to receive the result
+ // of the send asynchronously
+ future.addCallback(
+ new ListenableFutureCallback<SendResult<Integer, String>>() {
+
+ @Override
+ public void onSuccess(
+ SendResult<Integer, String> result) {
+ LOGGER.info("sent message='{}' with offset={}",
+ message,
+ result.getRecordMetadata().offset());
+ }
+
+ @Override
+ public void onFailure(Throwable ex) {
+ LOGGER.error("unable to send message='{}'",
+ message, ex);
+ }
+ });
+
+ // alternatively, to block the sending thread, to await the result,
+ // invoke the future’s get() method
+ }
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Producer-side Kafka configuration: Integer keys, String values.
+ */
+@Configuration // was missing — without it these @Bean methods are never registered
+public class SenderConfig {
+
+    // Comma-separated host:port pairs, injected from application.yml
+    // (kafka.bootstrap.servers).
+    @Value("${kafka.bootstrap.servers}")
+    private String bootstrapServers;
+
+    /** Producer properties map; typed instead of the raw Map used before. */
+    @Bean
+    public Map<String, Object> producerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        // list of host:port pairs used for establishing the initial connections
+        // to the Kafka cluster
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        // how long send() may block before throwing a TimeoutException
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
+
+        return props;
+    }
+
+    @Bean
+    public ProducerFactory<Integer, String> producerFactory() {
+        return new DefaultKafkaProducerFactory<>(producerConfigs());
+    }
+
+    @Bean
+    public KafkaTemplate<Integer, String> kafkaTemplate() {
+        return new KafkaTemplate<>(producerFactory());
+    }
+
+    @Bean
+    public Sender sender() {
+        return new Sender();
+    }
+}
--- /dev/null
+logging:
+ level:
+ org:
+ springframework:
+ # NOTE(review): 'ws' (Spring Web Services) looks like a copy-paste leftover
+ # in a Kafka sample — confirm whether a kafka logger was intended.
+ ws: debug
+ ROOT: debug
+
+spring:
+ main:
+ # suppress the Spring Boot startup banner
+ banner-mode: "off"
+
+# consumed by @Value("${kafka.bootstrap.servers}") in the config classes
+kafka:
+ bootstrap:
+ servers: "kafka:9092,kafka:9093,kafka:9094"
+
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class);
+ }
+
+ @Bean
+ CommandLineRunner lookup(Receiver receiver) {
+ return args -> {
+ String sleepTime = "60000";
+
+
+ if (args.length > 0) {
+ sleepTime = args[0];
+ }
+
+ Thread.sleep(Long.valueOf(sleepTime));
+ };
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Kafka message listener that logs each record and releases a latch on the
+ * first message, so callers (e.g. tests) can await delivery.
+ */
+public class Receiver {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
+
+    // final: the latch is shared between the listener container thread and
+    // callers of getLatch(), and is never reassigned.
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    /** Invoked by Spring Kafka for each record on the topic. */
+    @KafkaListener(topics = "example.topic")
+    public void receiveMessage(String message) {
+        LOGGER.info("received message='{}'", message);
+        latch.countDown();
+    }
+
+    /** Latch that reaches zero once at least one message has been received. */
+    public CountDownLatch getLatch() {
+        return latch;
+    }
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Consumer-side Kafka configuration: Integer keys, String values, consumer
+ * group "helloworld".
+ */
+@Configuration
+@EnableKafka
+public class ReceiverConfig {
+
+    // Comma-separated host:port pairs, injected from application.yml
+    // (kafka.bootstrap.servers).
+    @Value("${kafka.bootstrap.servers}")
+    private String bootstrapServers;
+
+    /** Consumer properties map; typed instead of the raw Map used before. */
+    @Bean
+    public Map<String, Object> consumerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        // list of host:port pairs used for establishing the initial connections
+        // to the Kafka cluster
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        // consumer groups allow a pool of processes to divide the work of
+        // consuming and processing records
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
+
+        return props;
+    }
+
+    @Bean
+    public ConsumerFactory<Integer, String> consumerFactory() {
+        // generics match the Integer/String deserializers configured above
+        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+
+        return factory;
+    }
+
+    @Bean
+    public Receiver receiver() {
+        return new Receiver();
+    }
+}
--- /dev/null
+logging:
+ level:
+ org:
+ springframework:
+ # NOTE(review): 'ws' (Spring Web Services) looks like a copy-paste leftover
+ # in a Kafka sample — confirm whether a kafka logger was intended.
+ ws: debug
+ ROOT: debug
+
+spring:
+ main:
+ # suppress the Spring Boot startup banner
+ banner-mode: "off"
+
+# consumed by @Value("${kafka.bootstrap.servers}") in the config classes
+kafka:
+ bootstrap:
+ servers: "kafka:9092,kafka:9093,kafka:9094"
+