Fix type-safety and configuration errors in the Spring Kafka producer/consumer examples.
Author: Gustavo Martin Morcuende <gu.martinm@gmail.com>
Thu, 26 Jan 2017 00:26:59 +0000 (01:26 +0100)
Committer: Gustavo Martin Morcuende <gu.martinm@gmail.com>
Thu, 26 Jan 2017 08:03:06 +0000 (09:03 +0100)
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java
SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java
SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java

index 8c533c5..c0222a0 100644 (file)
@@ -3,20 +3,35 @@ package de.example.spring.kafka;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.support.KafkaNull;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.PollableChannel;
+import org.springframework.messaging.support.GenericMessage;
 
 @SpringBootApplication
 public class Application {
 
-  public static void main(String[] args) {
-    SpringApplication.run(Application.class);
+  public static void main(String[] args) throws Exception {
+    ConfigurableApplicationContext context
+        = new SpringApplicationBuilder(Application.class)
+        .web(false)
+        .run(args);
+    MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class);
+    for (int i = 0; i < 10; i++) {
+      toKafka.send(new GenericMessage<>("foo" + i));
+    }
+    toKafka.send(new GenericMessage<>(KafkaNull.INSTANCE));
+    PollableChannel fromKafka = context.getBean("received", PollableChannel.class);
+    Message<?> received = fromKafka.receive(10000);
+    while (received != null) {
+      System.out.println(received);
+      received = fromKafka.receive(10000);
+    }
+    context.close();
+    System.exit(0);
   }
-
-  @Bean
-  CommandLineRunner lookup(Receiver receiver) {
-    return args -> {
-
-    };
-  }
-
 }
index 238766b..7d1a3e0 100644 (file)
@@ -1,7 +1,6 @@
 package de.example.spring.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
@@ -22,28 +21,31 @@ public class ReceiverConfig {
   private String bootstrapServers;
 
   @Bean
-  public Map consumerConfigs() {
+  public Map<String, Object> consumerConfigs() {
     Map<String, Object> props = new HashMap<>();
     // list of host:port pairs used for establishing the initial connections
     // to the Kafka cluster
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     // consumer groups allow a pool of processes to divide the work of
     // consuming and processing records
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
 
     return props;
   }
 
   @Bean
-  public ConsumerFactory consumerFactory() {
+  public ConsumerFactory<String, String> consumerFactory() {
     return new DefaultKafkaConsumerFactory<>(consumerConfigs());
   }
 
   @Bean
   public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
-    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(consumerFactory());
 
     return factory;
index 0ed6b5b..3208931 100644 (file)
@@ -1,42 +1,65 @@
 package de.example.spring.kafka;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.util.concurrent.ListenableFutureCallback;
 
+import javax.inject.Inject;
+
 public class Sender {
   private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
 
-  @Autowired
-  private KafkaTemplate<Integer, String> kafkaTemplate;
+  private final KafkaTemplate<String, String> kafkaTemplate;
+
+  @Inject
+  public Sender(KafkaTemplate<String, String> kafkaTemplate) {
+    this.kafkaTemplate = kafkaTemplate;
+  }
 
   public void sendMessage(String topic, String message) {
     // the KafkaTemplate provides asynchronous send methods returning a
     // Future
-    ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
-        .send(topic, message);
+    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
+
+    // The same with MessageBuilder
+    // ListenableFuture<SendResult<String, String>> future =
+    //    kafkaTemplate.send(
+    //        MessageBuilder.withPayload(message)
+    //            .setHeader(KafkaHeaders.TOPIC, topic)
+    //            .build());
 
     // you can register a callback with the listener to receive the result
     // of the send asynchronously
-    future.addCallback(
-        new ListenableFutureCallback<SendResult<Integer, String>>() {
+    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
 
           @Override
-          public void onSuccess(
-              SendResult<Integer, String> result) {
-            LOGGER.info("sent message='{}' with offset={}",
-                message,
-                result.getRecordMetadata().offset());
+          public void onSuccess(SendResult<String, String> result) {
+            RecordMetadata recordMetadata = result.getRecordMetadata();
+            ProducerRecord producerRecord = result.getProducerRecord();
+
+            LOGGER.info("sent message='{}'", message);
+            LOGGER.info("RecordMetadata");
+            LOGGER.info("with offset={}", recordMetadata.offset());
+            LOGGER.info("with partition={}", recordMetadata.partition());
+            LOGGER.info("with checksum={}", recordMetadata.checksum());
+            LOGGER.info("with timestamp={}", recordMetadata.timestamp());
+            LOGGER.info("with topic={}", recordMetadata.topic());
+
+            LOGGER.info("ProducerRecord");
+            LOGGER.info("with partition={}", producerRecord.partition());
+            LOGGER.info("with value={}", producerRecord.value());
+            LOGGER.info("with timestamp={}", producerRecord.timestamp());
+            LOGGER.info("with topic={}", producerRecord.topic());
           }
 
           @Override
           public void onFailure(Throwable ex) {
-            LOGGER.error("unable to send message='{}'",
-                message, ex);
+            LOGGER.error("unable to send message='{}'", message, ex);
           }
         });
 
index c82bac7..61badbb 100644 (file)
@@ -1,7 +1,6 @@
 package de.example.spring.kafka;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
@@ -20,13 +19,19 @@ public class SenderConfig {
   private String bootstrapServers;
 
   @Bean
-  public Map producerConfigs() {
+  public Map<String, Object> producerConfigs() {
     Map<String, Object> props = new HashMap<>();
     // list of host:port pairs used for establishing the initial connections
     // to the Kafka cluster
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 2097152);
+    props.put(ProducerConfig.RETRIES_CONFIG, 0);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+
     // value to block, after which it will throw a TimeoutException
     props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
 
@@ -34,17 +39,17 @@ public class SenderConfig {
   }
 
   @Bean
-  public ProducerFactory producerFactory() {
+  public ProducerFactory<String, String> producerFactory() {
     return new DefaultKafkaProducerFactory<>(producerConfigs());
   }
 
   @Bean
-  public KafkaTemplate kafkaTemplate() {
-    return new KafkaTemplate(producerFactory());
+  public KafkaTemplate<String, String> kafkaTemplate() {
+    return new KafkaTemplate<>(producerFactory());
   }
 
   @Bean
-  public Sender sender() {
-    return new Sender();
+  public Sender sender(KafkaTemplate<String, String> kafkaTemplate) {
+    return new Sender(kafkaTemplate);
   }
 }