Spring Kafka
authorGustavo Martin Morcuende <gu.martinm@gmail.com>
Wed, 25 Jan 2017 18:57:18 +0000 (19:57 +0100)
committerGustavo Martin Morcuende <gu.martinm@gmail.com>
Wed, 25 Jan 2017 18:57:18 +0000 (19:57 +0100)
13 files changed:
SpringJava/Kafka/SpringKafka/build.gradle [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/gradle.properties [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/settings.gradle [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/build.gradle [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/resources/application.yml [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/build.gradle [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/resources/application.yml [new file with mode: 0644]

diff --git a/SpringJava/Kafka/SpringKafka/build.gradle b/SpringJava/Kafka/SpringKafka/build.gradle
new file mode 100644 (file)
index 0000000..819aae3
--- /dev/null
@@ -0,0 +1,65 @@
+
+project.ext {
+    springKafkaVersion = '1.1.2.RELEASE'
+}
+
+
+buildscript {
+    ext {
+        springBootGradlePluginVersion = '1.4.3.RELEASE'
+    }
+    repositories {
+        mavenCentral()
+    }
+    dependencies {
+        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootGradlePluginVersion}")
+    }
+}
+
+
+allprojects {
+
+    group = theGroup
+    version = theVersion
+}
+
+
+subprojects {
+    apply plugin: 'java'
+    apply plugin: 'eclipse'
+    apply plugin: 'idea'
+    apply plugin: 'org.springframework.boot'
+    
+    repositories {
+        mavenCentral()
+    }
+    
+    
+    sourceCompatibility = 1.8
+    targetCompatibility = 1.8
+    
+    // *****************   MANIFEST FILE   *****************
+    jar {
+        manifest {
+            attributes('Implementation-Title': 'Spring Kafka example',
+                'Implementation-Version': theVersion,
+                'Build-Time': new Date().format("yyyy-MM-dd'T'HH:mm:ssZ"),
+                'Built-By': System.getProperty('user.name'),
+                'Built-JDK': System.getProperty('java.version')
+            )
+        }
+    }
+    
+    
+    dependencies {
+        compile('org.springframework.boot:spring-boot-starter')
+        compile('javax.inject:javax.inject:1')
+        compile("org.springframework.kafka:spring-kafka:${springKafkaVersion}")
+    
+        testCompile('org.springframework.boot:spring-boot-starter-test')
+        testCompile("org.springframework.kafka:spring-kafka-test:${springKafkaVersion}")
+    }
+
+}
+
diff --git a/SpringJava/Kafka/SpringKafka/gradle.properties b/SpringJava/Kafka/SpringKafka/gradle.properties
new file mode 100644 (file)
index 0000000..7d4fd97
--- /dev/null
@@ -0,0 +1,3 @@
+theGroup='de.example.spring.kafka'
+theName='spring-kafka-example'
+theVersion='1.0-SNAPSHOT'
diff --git a/SpringJava/Kafka/SpringKafka/settings.gradle b/SpringJava/Kafka/SpringKafka/settings.gradle
new file mode 100644 (file)
index 0000000..8836f03
--- /dev/null
@@ -0,0 +1,4 @@
+rootProject.name = theName
+
+include ':spring-kafka-producer-example'
+include ':spring-kafka-consumer-example'
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/build.gradle b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/build.gradle
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java
new file mode 100644 (file)
index 0000000..7448902
--- /dev/null
@@ -0,0 +1,31 @@
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+  public static void main(String[] args) {
+    SpringApplication.run(Application.class);
+  }
+
+  @Bean
+  CommandLineRunner lookup(Sender sender) {
+    return args -> {
+      String topic = "example.topic";
+      String message = "example message";
+
+
+      if (args.length > 0) {
+        topic = args[0];
+        message = args[1];
+      }
+
+      sender.sendMessage(topic, message);
+    };
+  }
+
+}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java
new file mode 100644 (file)
index 0000000..0ed6b5b
--- /dev/null
@@ -0,0 +1,46 @@
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+public class Sender {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
+
+  @Autowired
+  private KafkaTemplate<Integer, String> kafkaTemplate;
+
+  public void sendMessage(String topic, String message) {
+    // the KafkaTemplate provides asynchronous send methods returning a
+    // Future
+    ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
+        .send(topic, message);
+
+    // you can register a callback with the listener to receive the result
+    // of the send asynchronously
+    future.addCallback(
+        new ListenableFutureCallback<SendResult<Integer, String>>() {
+
+          @Override
+          public void onSuccess(
+              SendResult<Integer, String> result) {
+            LOGGER.info("sent message='{}' with offset={}",
+                message,
+                result.getRecordMetadata().offset());
+          }
+
+          @Override
+          public void onFailure(Throwable ex) {
+            LOGGER.error("unable to send message='{}'",
+                message, ex);
+          }
+        });
+
+    // alternatively, to block the sending thread, to await the result,
+    // invoke the future’s get() method
+  }
+}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java
new file mode 100644 (file)
index 0000000..79db5d0
--- /dev/null
@@ -0,0 +1,48 @@
+package de.example.spring.kafka;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SenderConfig {
+
+  @Value("${kafka.bootstrap.servers}")
+  private String bootstrapServers;
+
+  @Bean
+  public Map producerConfigs() {
+    Map<String, Object> props = new HashMap<>();
+    // list of host:port pairs used for establishing the initial connections
+    // to the Kakfa cluster
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    // value to block, after which it will throw a TimeoutException
+    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
+
+    return props;
+  }
+
+  @Bean
+  public ProducerFactory producerFactory() {
+    return new DefaultKafkaProducerFactory<>(producerConfigs());
+  }
+
+  @Bean
+  public KafkaTemplate kafkaTemplate() {
+    return new KafkaTemplate(producerFactory());
+  }
+
+  @Bean
+  public Sender sender() {
+    return new Sender();
+  }
+}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..a04f533
--- /dev/null
@@ -0,0 +1,15 @@
+logging:
+  level:
+    org:
+      springframework:
+        ws: debug
+    ROOT: debug
+
+spring:
+  main:
+    banner-mode: "off"
+
+kafka:
+  bootstrap:
+    servers: "kafka:9092,kafka:9093,kafka:9094"
+
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/build.gradle b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/build.gradle
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java
new file mode 100644 (file)
index 0000000..3c88f0a
--- /dev/null
@@ -0,0 +1,29 @@
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+  public static void main(String[] args) {
+    SpringApplication.run(Application.class);
+  }
+
+  @Bean
+  CommandLineRunner lookup(Receiver receiver) {
+    return args -> {
+      String sleepTime = "60000";
+
+
+      if (args.length > 0) {
+        sleepTime = args[0];
+      }
+
+      Thread.sleep(Long.valueOf(sleepTime));
+    };
+  }
+
+}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java
new file mode 100644 (file)
index 0000000..b606e95
--- /dev/null
@@ -0,0 +1,24 @@
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+
+import java.util.concurrent.CountDownLatch;
+
+public class Receiver {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
+
+  private CountDownLatch latch = new CountDownLatch(1);
+
+  @KafkaListener(topics = "example.topic")
+  public void receiveMessage(String message) {
+    LOGGER.info("received message='{}'", message);
+    latch.countDown();
+  }
+
+  public CountDownLatch getLatch() {
+    return latch;
+  }
+}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java
new file mode 100644 (file)
index 0000000..238766b
--- /dev/null
@@ -0,0 +1,56 @@
+package de.example.spring.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@EnableKafka
+public class ReceiverConfig {
+
+  @Value("${kafka.bootstrap.servers}")
+  private String bootstrapServers;
+
+  @Bean
+  public Map consumerConfigs() {
+    Map<String, Object> props = new HashMap<>();
+    // list of host:port pairs used for establishing the initial connections
+    // to the Kakfa cluster
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    // consumer groups allow a pool of processes to divide the work of
+    // consuming and processing records
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
+
+    return props;
+  }
+
+  @Bean
+  public ConsumerFactory consumerFactory() {
+    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+  }
+
+  @Bean
+  public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+    factory.setConsumerFactory(consumerFactory());
+
+    return factory;
+  }
+
+  @Bean
+  public Receiver receiver() {
+    return new Receiver();
+  }
+}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..a04f533
--- /dev/null
@@ -0,0 +1,15 @@
+logging:
+  level:
+    org:
+      springframework:
+        ws: debug
+    ROOT: debug
+
+spring:
+  main:
+    banner-mode: "off"
+
+kafka:
+  bootstrap:
+    servers: "kafka:9092,kafka:9093,kafka:9094"
+