From aeaab6ed5663a5b6aefaefa7c03131c4f7425024 Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Thu, 26 Jan 2017 01:26:59 +0100 Subject: [PATCH] Fixing errors. --- .../java/de/example/spring/kafka/Application.java | 35 ++++++++++----- .../de/example/spring/kafka/ReceiverConfig.java | 14 +++--- .../main/java/de/example/spring/kafka/Sender.java | 51 ++++++++++++++++------ .../java/de/example/spring/kafka/SenderConfig.java | 21 +++++---- 4 files changed, 83 insertions(+), 38 deletions(-) diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java index 8c533c5..c0222a0 100644 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java @@ -3,20 +3,35 @@ package de.example.spring.kafka; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.support.KafkaNull; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; @SpringBootApplication public class Application { - public static void main(String[] args) { - SpringApplication.run(Application.class); + public static void main(String[] args) throws Exception { + ConfigurableApplicationContext context + = new SpringApplicationBuilder(Application.class) + .web(false) 
+ .run(args); + MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class); + for (int i = 0; i < 10; i++) { + toKafka.send(new GenericMessage<>("foo" + i)); + } + toKafka.send(new GenericMessage<>(KafkaNull.INSTANCE)); + PollableChannel fromKafka = context.getBean("received", PollableChannel.class); + Message received = fromKafka.receive(10000); + while (received != null) { + System.out.println(received); + received = fromKafka.receive(10000); + } + context.close(); + System.exit(0); } - - @Bean - CommandLineRunner lookup(Receiver receiver) { - return args -> { - - }; - } - } diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java index 238766b..7d1a3e0 100644 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java @@ -1,7 +1,6 @@ package de.example.spring.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -22,28 +21,31 @@ public class ReceiverConfig { private String bootstrapServers; @Bean - public Map consumerConfigs() { + public Map consumerConfigs() { Map props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections // to the Kakfa cluster props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 
consumer groups allow a pool of processes to divide the work of // consuming and processing records props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000); return props; } @Bean - public ConsumerFactory consumerFactory() { + public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java index 0ed6b5b..3208931 100644 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java @@ -1,42 +1,65 @@ package de.example.spring.kafka; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import 
org.springframework.util.concurrent.ListenableFutureCallback; +import javax.inject.Inject; + public class Sender { private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class); - @Autowired - private KafkaTemplate kafkaTemplate; + private final KafkaTemplate kafkaTemplate; + + @Inject + public Sender(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } public void sendMessage(String topic, String message) { // the KafkaTemplate provides asynchronous send methods returning a // Future - ListenableFuture> future = kafkaTemplate - .send(topic, message); + ListenableFuture> future = kafkaTemplate.send(topic, message); + + // The same with MessageBuilder + // ListenableFuture> future = + // kafkaTemplate.send( + // MessageBuilder.withPayload(message) + // .setHeader(KafkaHeaders.TOPIC, topic) + // .build()); // you can register a callback with the listener to receive the result // of the send asynchronously - future.addCallback( - new ListenableFutureCallback>() { + future.addCallback(new ListenableFutureCallback>() { @Override - public void onSuccess( - SendResult result) { - LOGGER.info("sent message='{}' with offset={}", - message, - result.getRecordMetadata().offset()); + public void onSuccess(SendResult result) { + RecordMetadata recordMetadata = result.getRecordMetadata(); + ProducerRecord producerRecord = result.getProducerRecord(); + + LOGGER.info("sent message='{}'", message); + LOGGER.info("RecordMetadata"); + LOGGER.info("with offset={}", recordMetadata.offset()); + LOGGER.info("with partition={}", recordMetadata.partition()); + LOGGER.info("with checksum={}", recordMetadata.checksum()); + LOGGER.info("with timestamp={}", recordMetadata.timestamp()); + LOGGER.info("with topic={}", recordMetadata.topic()); + + LOGGER.info("ProducerRecord"); + LOGGER.info("with partition={}", producerRecord.partition()); + LOGGER.info("with value={}", producerRecord.value()); + LOGGER.info("with timestamp={}", producerRecord.timestamp()); + 
LOGGER.info("with topic={}", producerRecord.topic()); } @Override public void onFailure(Throwable ex) { - LOGGER.error("unable to send message='{}'", - message, ex); + LOGGER.error("unable to send message='{}'", message, ex); } }); diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java index c82bac7..61badbb 100644 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java @@ -1,7 +1,6 @@ package de.example.spring.kafka; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -20,13 +19,19 @@ public class SenderConfig { private String bootstrapServers; @Bean - public Map producerConfigs() { + public Map producerConfigs() { Map props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections // to the Kakfa cluster props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 2097152); + props.put(ProducerConfig.RETRIES_CONFIG, 0); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + props.put(ProducerConfig.LINGER_MS_CONFIG, 1); + // value to block, after which it will throw a TimeoutException 
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000); @@ -34,17 +39,17 @@ public class SenderConfig { } @Bean - public ProducerFactory producerFactory() { + public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate(producerFactory()); + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); } @Bean - public Sender sender() { - return new Sender(); + public Sender sender(KafkaTemplate kafkaTemplate) { + return new Sender(kafkaTemplate); } } -- 2.1.4