From d8417948c0b21f443e6cc7306a50ee33f523562d Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Wed, 25 Jan 2017 23:04:46 +0100 Subject: [PATCH] Spring Kafka, swapping directories --- .../java/de/example/spring/kafka/Application.java | 11 +---- .../java/de/example/spring/kafka/Receiver.java | 16 +++++++ .../de/example/spring/kafka/ReceiverConfig.java | 56 ++++++++++++++++++++++ .../main/java/de/example/spring/kafka/Sender.java | 46 ------------------ .../java/de/example/spring/kafka/SenderConfig.java | 50 ------------------- .../java/de/example/spring/kafka/Application.java | 11 ++++- .../java/de/example/spring/kafka/Receiver.java | 16 ------- .../de/example/spring/kafka/ReceiverConfig.java | 56 ---------------------- .../main/java/de/example/spring/kafka/Sender.java | 46 ++++++++++++++++++ .../java/de/example/spring/kafka/SenderConfig.java | 50 +++++++++++++++++++ 10 files changed, 179 insertions(+), 179 deletions(-) create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java delete mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java delete mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java delete mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java delete mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java create mode 100644 SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java index 34409bf..8c533c5 100644 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java @@ -13,18 +13,9 @@ public class Application { } @Bean - CommandLineRunner lookup(Sender sender) { + CommandLineRunner lookup(Receiver receiver) { return args -> { - String topic = "test"; - String message = "example message"; - - if (args.length > 0) { - topic = args[0]; - message = args[1]; - } - - sender.sendMessage(topic, message); }; } diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java new file mode 100644 index 0000000..c969cb2 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java @@ -0,0 +1,16 @@ +package de.example.spring.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.KafkaListener; + +public class Receiver { + + private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); + + @KafkaListener(topics = "test") + public void receiveMessage(String message) { + LOGGER.info("received message='{}'", message); + } + +} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java new file mode 100644 index 0000000..238766b --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java @@ -0,0 +1,56 @@ +package de.example.spring.kafka; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@EnableKafka +public class ReceiverConfig { + + @Value("${kafka.bootstrap.servers}") + private String bootstrapServers; + + @Bean + public Map consumerConfigs() { + Map props = new HashMap<>(); + // list of host:port pairs used for establishing the initial connections + // to the Kakfa cluster + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + // consumer groups allow a pool of processes to divide the work of + // consuming and processing records + props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld"); + + return props; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + + return factory; + } + + @Bean + public Receiver receiver() { + return new Receiver(); + } +} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java deleted file mode 100644 index 0ed6b5b..0000000 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/Sender.java +++ /dev/null @@ -1,46 +0,0 @@ -package de.example.spring.kafka; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; - -public class Sender { - private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class); - - @Autowired - private KafkaTemplate kafkaTemplate; - - public void sendMessage(String topic, String message) { - // the KafkaTemplate provides asynchronous send methods returning a - // Future - ListenableFuture> future = kafkaTemplate - .send(topic, message); - - // you can register a callback with the listener to receive the result - // of the send asynchronously - future.addCallback( - new ListenableFutureCallback>() { - - @Override - public void onSuccess( - SendResult result) { - LOGGER.info("sent message='{}' with offset={}", - message, - result.getRecordMetadata().offset()); - } - - @Override - public void onFailure(Throwable ex) { - LOGGER.error("unable to send message='{}'", - message, ex); - } - }); - - // alternatively, to block the sending thread, to await the result, - // invoke the future’s get() method - } -} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java deleted file mode 100644 index c82bac7..0000000 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-consumer-example/src/main/java/de/example/spring/kafka/SenderConfig.java +++ /dev/null @@ -1,50 +0,0 @@ -package de.example.spring.kafka; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; - -import java.util.HashMap; -import java.util.Map; - -@Configuration -public class SenderConfig { - - @Value("${kafka.bootstrap.servers}") - private String bootstrapServers; - - @Bean - public Map producerConfigs() { - Map props = new HashMap<>(); - // list of host:port pairs used for establishing the initial connections - // to the Kakfa cluster - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - // value to block, after which it will throw a TimeoutException - props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000); - - return props; - } - - @Bean - public ProducerFactory producerFactory() { - return new DefaultKafkaProducerFactory<>(producerConfigs()); - } - - @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate(producerFactory()); - } - - @Bean - public Sender sender() { - return new Sender(); - } -} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java index 8c533c5..34409bf 100644 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java @@ -13,9 +13,18 @@ public class Application { } @Bean - CommandLineRunner lookup(Receiver receiver) { + CommandLineRunner lookup(Sender sender) { return args -> { + String topic = "test"; + String message = "example message"; + + if (args.length > 0) { + topic = args[0]; + message = args[1]; + } + + sender.sendMessage(topic, message); }; } diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java deleted file mode 100644 index c969cb2..0000000 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Receiver.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.example.spring.kafka; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.kafka.annotation.KafkaListener; - -public class Receiver { - - private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); - - @KafkaListener(topics = "test") - public void receiveMessage(String message) { - LOGGER.info("received message='{}'", message); - } - -} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java deleted file mode 100644 index 238766b..0000000 --- a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java +++ /dev/null @@ -1,56 +0,0 @@ -package de.example.spring.kafka; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; - -import java.util.HashMap; -import java.util.Map; - -@Configuration -@EnableKafka -public class ReceiverConfig { - - @Value("${kafka.bootstrap.servers}") - private String bootstrapServers; - - @Bean - public Map consumerConfigs() { - Map props = new HashMap<>(); - // list of host:port pairs used for establishing the initial connections - // to the Kakfa cluster - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - // consumer groups allow a pool of processes to divide the work of - // consuming and processing records - props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld"); - - return props; - } - - @Bean - public ConsumerFactory consumerFactory() { - return new DefaultKafkaConsumerFactory<>(consumerConfigs()); - } - - @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); - - return factory; - } - - @Bean - public Receiver receiver() { - return new Receiver(); - } -} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java new file mode 100644 index 0000000..0ed6b5b --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java @@ -0,0 +1,46 @@ +package de.example.spring.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +public class Sender { + private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class); + + @Autowired + private KafkaTemplate kafkaTemplate; + + public void sendMessage(String topic, String message) { + // the KafkaTemplate provides asynchronous send methods returning a + // Future + ListenableFuture> future = kafkaTemplate + .send(topic, message); + + // you can register a callback with the listener to receive the result + // of the send asynchronously + future.addCallback( + new ListenableFutureCallback>() { + + @Override + public void onSuccess( + SendResult result) { + LOGGER.info("sent message='{}' with offset={}", + message, + result.getRecordMetadata().offset()); + } + + @Override + public void onFailure(Throwable ex) { + LOGGER.error("unable to send message='{}'", + message, ex); + } + }); + + // alternatively, to block the sending thread, to await the result, + // invoke the future’s get() method + } +} diff --git a/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java new file mode 100644 index 0000000..c82bac7 --- /dev/null +++ b/SpringJava/Kafka/SpringKafka/spring-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java @@ -0,0 +1,50 @@ +package de.example.spring.kafka; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class SenderConfig { + + @Value("${kafka.bootstrap.servers}") + private String bootstrapServers; + + @Bean + public Map producerConfigs() { + Map props = new HashMap<>(); + // list of host:port pairs used for establishing the initial connections + // to the Kakfa cluster + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + // value to block, after which it will throw a TimeoutException + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000); + + return props; + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate(producerFactory()); + } + + @Bean + public Sender sender() { + return new Sender(); + } +} -- 2.1.4