Spring Kafka, swapping directories
author Gustavo Martin Morcuende <gu.martinm@gmail.com>
Wed, 25 Jan 2017 22:04:46 +0000 (23:04 +0100)
committer Gustavo Martin Morcuende <gu.martinm@gmail.com>
Wed, 25 Jan 2017 22:04:46 +0000 (23:04 +0100)
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java [deleted file]
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java [deleted file]
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java [deleted file]
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java [deleted file]
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java [new file with mode: 0644]
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java [new file with mode: 0644]

diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java
index 34409bf..8c533c5 100644
@@ -13,18 +13,9 @@ public class Application {
   }
 
   @Bean
-  CommandLineRunner lookup(Sender sender) {
+  CommandLineRunner lookup(Receiver receiver) {
     return args -> {
-      String topic = "test";
-      String message = "example message";
 
-
-      if (args.length > 0) {
-        topic = args[0];
-        message = args[1];
-      }
-
-      sender.sendMessage(topic, message);
     };
   }
 
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java
new file mode 100644
index 0000000..c969cb2
--- /dev/null
@@ -0,0 +1,16 @@
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+
+public class Receiver {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
+
+  @KafkaListener(topics = "test")
+  public void receiveMessage(String message) {
+    LOGGER.info("received message='{}'", message);
+  }
+
+}
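
For reference, the listener above takes only the String payload; @KafkaListener methods can also accept the full ConsumerRecord when the key, partition or offset matters. A minimal, hypothetical variant (not part of this commit, shown only as a sketch) could sit in the same class:

  // hypothetical listener variant; requires
  // import org.apache.kafka.clients.consumer.ConsumerRecord;
  @KafkaListener(topics = "test")
  public void receiveRecord(ConsumerRecord<Integer, String> record) {
    // log the record metadata along with the payload
    LOGGER.info("received key={} value='{}' partition={} offset={}",
        record.key(), record.value(), record.partition(), record.offset());
  }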
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java
new file mode 100644
index 0000000..238766b
--- /dev/null
@@ -0,0 +1,56 @@
+package de.example.spring.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@EnableKafka
+public class ReceiverConfig {
+
+  @Value("${kafka.bootstrap.servers}")
+  private String bootstrapServers;
+
+  @Bean
+  public Map<String, Object> consumerConfigs() {
+    Map<String, Object> props = new HashMap<>();
+    // list of host:port pairs used for establishing the initial connections
+    // to the Kafka cluster
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    // consumer groups allow a pool of processes to divide the work of
+    // consuming and processing records
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
+
+    return props;
+  }
+
+  @Bean
+  public ConsumerFactory<Integer, String> consumerFactory() {
+    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+  }
+
+  @Bean
+  public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
+    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+    factory.setConsumerFactory(consumerFactory());
+
+    return factory;
+  }
+
+  @Bean
+  public Receiver receiver() {
+    return new Receiver();
+  }
+}
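
Both ReceiverConfig and the producer-side SenderConfig below resolve ${kafka.bootstrap.servers} from the Spring environment. The commit does not touch that property; a typical local setup would define something like kafka.bootstrap.servers=localhost:9092 in each module's application.properties (the host and port here are an assumption, not part of this change).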
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java
deleted file mode 100644
index 0ed6b5b..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-package de.example.spring.kafka;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-
-public class Sender {
-  private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
-
-  @Autowired
-  private KafkaTemplate<Integer, String> kafkaTemplate;
-
-  public void sendMessage(String topic, String message) {
-    // the KafkaTemplate provides asynchronous send methods returning a
-    // Future
-    ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
-        .send(topic, message);
-
-    // you can register a callback with the listener to receive the result
-    // of the send asynchronously
-    future.addCallback(
-        new ListenableFutureCallback<SendResult<Integer, String>>() {
-
-          @Override
-          public void onSuccess(
-              SendResult<Integer, String> result) {
-            LOGGER.info("sent message='{}' with offset={}",
-                message,
-                result.getRecordMetadata().offset());
-          }
-
-          @Override
-          public void onFailure(Throwable ex) {
-            LOGGER.error("unable to send message='{}'",
-                message, ex);
-          }
-        });
-
-    // alternatively, to block the sending thread, to await the result,
-    // invoke the future’s get() method
-  }
-}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java
deleted file mode 100644
index c82bac7..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-package de.example.spring.kafka;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Configuration
-public class SenderConfig {
-
-  @Value("${kafka.bootstrap.servers}")
-  private String bootstrapServers;
-
-  @Bean
-  public Map producerConfigs() {
-    Map<String, Object> props = new HashMap<>();
-    // list of host:port pairs used for establishing the initial connections
-    // to the Kakfa cluster
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    // value to block, after which it will throw a TimeoutException
-    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
-
-    return props;
-  }
-
-  @Bean
-  public ProducerFactory producerFactory() {
-    return new DefaultKafkaProducerFactory<>(producerConfigs());
-  }
-
-  @Bean
-  public KafkaTemplate kafkaTemplate() {
-    return new KafkaTemplate(producerFactory());
-  }
-
-  @Bean
-  public Sender sender() {
-    return new Sender();
-  }
-}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java
index 8c533c5..34409bf 100644
@@ -13,9 +13,18 @@ public class Application {
   }
 
   @Bean
-  CommandLineRunner lookup(Receiver receiver) {
+  CommandLineRunner lookup(Sender sender) {
     return args -> {
+      String topic = "test";
+      String message = "example message";
 
+
+      if (args.length > 0) {
+        topic = args[0];
+        message = args[1];
+      }
+
+      sender.sendMessage(topic, message);
     };
   }
 
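Note that the restored runner reads args[1] whenever at least one argument is present, so invoking the producer with a single argument would fail with an ArrayIndexOutOfBoundsException. A hypothetical, more defensive version of that block (not part of this commit) would be:

  // only override the defaults when both a topic and a message are supplied
  if (args.length >= 2) {
    topic = args[0];
    message = args[1];
  }
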
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java
deleted file mode 100644
index c969cb2..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.example.spring.kafka;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.kafka.annotation.KafkaListener;
-
-public class Receiver {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
-
-  @KafkaListener(topics = "test")
-  public void receiveMessage(String message) {
-    LOGGER.info("received message='{}'", message);
-  }
-
-}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java
deleted file mode 100644
index 238766b..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-package de.example.spring.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Configuration
-@EnableKafka
-public class ReceiverConfig {
-
-  @Value("${kafka.bootstrap.servers}")
-  private String bootstrapServers;
-
-  @Bean
-  public Map consumerConfigs() {
-    Map<String, Object> props = new HashMap<>();
-    // list of host:port pairs used for establishing the initial connections
-    // to the Kakfa cluster
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    // consumer groups allow a pool of processes to divide the work of
-    // consuming and processing records
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
-
-    return props;
-  }
-
-  @Bean
-  public ConsumerFactory consumerFactory() {
-    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
-  }
-
-  @Bean
-  public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
-    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
-    factory.setConsumerFactory(consumerFactory());
-
-    return factory;
-  }
-
-  @Bean
-  public Receiver receiver() {
-    return new Receiver();
-  }
-}
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java
new file mode 100644
index 0000000..0ed6b5b
--- /dev/null
@@ -0,0 +1,46 @@
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+public class Sender {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
+
+  @Autowired
+  private KafkaTemplate<Integer, String> kafkaTemplate;
+
+  public void sendMessage(String topic, String message) {
+    // the KafkaTemplate provides asynchronous send methods returning a
+    // Future
+    ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
+        .send(topic, message);
+
+    // you can register a callback with the listener to receive the result
+    // of the send asynchronously
+    future.addCallback(
+        new ListenableFutureCallback<SendResult<Integer, String>>() {
+
+          @Override
+          public void onSuccess(
+              SendResult<Integer, String> result) {
+            LOGGER.info("sent message='{}' with offset={}",
+                message,
+                result.getRecordMetadata().offset());
+          }
+
+          @Override
+          public void onFailure(Throwable ex) {
+            LOGGER.error("unable to send message='{}'",
+                message, ex);
+          }
+        });
+
+    // alternatively, to block the sending thread and await the result,
+    // invoke the future’s get() method
+  }
+}
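
The closing comment above mentions blocking on the future instead of registering a callback. A hedged sketch of that alternative inside Sender (the 5-second timeout is an assumption; it needs java.util.concurrent.TimeUnit and propagates InterruptedException/ExecutionException/TimeoutException via throws):

  // hypothetical blocking variant: wait for the broker acknowledgement
  public void sendMessageBlocking(String topic, String message) throws Exception {
    SendResult<Integer, String> result =
        kafkaTemplate.send(topic, message).get(5, TimeUnit.SECONDS);
    LOGGER.info("sent message='{}' with offset={}",
        message, result.getRecordMetadata().offset());
  }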
diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java
new file mode 100644
index 0000000..c82bac7
--- /dev/null
@@ -0,0 +1,50 @@
+package de.example.spring.kafka;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class SenderConfig {
+
+  @Value("${kafka.bootstrap.servers}")
+  private String bootstrapServers;
+
+  @Bean
+  public Map<String, Object> producerConfigs() {
+    Map<String, Object> props = new HashMap<>();
+    // list of host:port pairs used for establishing the initial connections
+    // to the Kafka cluster
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    // maximum time in ms that send() may block (e.g. waiting for metadata); after that a TimeoutException is thrown
+    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
+
+    return props;
+  }
+
+  @Bean
+  public ProducerFactory<Integer, String> producerFactory() {
+    return new DefaultKafkaProducerFactory<>(producerConfigs());
+  }
+
+  @Bean
+  public KafkaTemplate<Integer, String> kafkaTemplate() {
+    return new KafkaTemplate<>(producerFactory());
+  }
+
+  @Bean
+  public Sender sender() {
+    return new Sender();
+  }
+}
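
SenderConfig wires IntegerSerializer for record keys, but Sender only publishes value-only records. The same KafkaTemplate<Integer, String> also supports keyed sends; a hypothetical example (topic, key and payload values are made up):

  // hypothetical keyed send using the kafkaTemplate bean defined above
  ListenableFuture<SendResult<Integer, String>> future =
      kafkaTemplate.send("test", 42, "keyed example message");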