}
@Bean
- CommandLineRunner lookup(Sender sender) {
+ CommandLineRunner lookup(Receiver receiver) {
return args -> {
- String topic = "test";
- String message = "example message";
-
- if (args.length > 0) {
- topic = args[0];
- message = args[1];
- }
-
- sender.sendMessage(topic, message);
};
}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.kafka.annotation.KafkaListener;
+
+public class Receiver {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
+
+ @KafkaListener(topics = "test")
+ public void receiveMessage(String message) {
+ LOGGER.info("received message='{}'", message);
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@EnableKafka
+public class ReceiverConfig {
+
+ @Value("${kafka.bootstrap.servers}")
+ private String bootstrapServers;
+
+ @Bean
+ public Map consumerConfigs() {
+ Map<String, Object> props = new HashMap<>();
+ // list of host:port pairs used for establishing the initial connections
+		// to the Kafka cluster
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ // consumer groups allow a pool of processes to divide the work of
+ // consuming and processing records
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
+
+ return props;
+ }
+
+ @Bean
+ public ConsumerFactory consumerFactory() {
+ return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+
+ return factory;
+ }
+
+ @Bean
+ public Receiver receiver() {
+ return new Receiver();
+ }
+}
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-
-public class Sender {
- private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
-
- @Autowired
- private KafkaTemplate<Integer, String> kafkaTemplate;
-
- public void sendMessage(String topic, String message) {
- // the KafkaTemplate provides asynchronous send methods returning a
- // Future
- ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
- .send(topic, message);
-
- // you can register a callback with the listener to receive the result
- // of the send asynchronously
- future.addCallback(
- new ListenableFutureCallback<SendResult<Integer, String>>() {
-
- @Override
- public void onSuccess(
- SendResult<Integer, String> result) {
- LOGGER.info("sent message='{}' with offset={}",
- message,
- result.getRecordMetadata().offset());
- }
-
- @Override
- public void onFailure(Throwable ex) {
- LOGGER.error("unable to send message='{}'",
- message, ex);
- }
- });
-
- // alternatively, to block the sending thread, to await the result,
- // invoke the future’s get() method
- }
-}
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Configuration
-public class SenderConfig {
-
- @Value("${kafka.bootstrap.servers}")
- private String bootstrapServers;
-
- @Bean
- public Map producerConfigs() {
- Map<String, Object> props = new HashMap<>();
- // list of host:port pairs used for establishing the initial connections
- // to the Kakfa cluster
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- // value to block, after which it will throw a TimeoutException
- props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
-
- return props;
- }
-
- @Bean
- public ProducerFactory producerFactory() {
- return new DefaultKafkaProducerFactory<>(producerConfigs());
- }
-
- @Bean
- public KafkaTemplate kafkaTemplate() {
- return new KafkaTemplate(producerFactory());
- }
-
- @Bean
- public Sender sender() {
- return new Sender();
- }
-}
}
@Bean
- CommandLineRunner lookup(Receiver receiver) {
+ CommandLineRunner lookup(Sender sender) {
return args -> {
+ String topic = "test";
+ String message = "example message";
+
+ if (args.length > 0) {
+ topic = args[0];
+ message = args[1];
+ }
+
+ sender.sendMessage(topic, message);
};
}
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.kafka.annotation.KafkaListener;
-
-public class Receiver {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
-
- @KafkaListener(topics = "test")
- public void receiveMessage(String message) {
- LOGGER.info("received message='{}'", message);
- }
-
-}
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Configuration
-@EnableKafka
-public class ReceiverConfig {
-
- @Value("${kafka.bootstrap.servers}")
- private String bootstrapServers;
-
- @Bean
- public Map consumerConfigs() {
- Map<String, Object> props = new HashMap<>();
- // list of host:port pairs used for establishing the initial connections
- // to the Kakfa cluster
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- // consumer groups allow a pool of processes to divide the work of
- // consuming and processing records
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
-
- return props;
- }
-
- @Bean
- public ConsumerFactory consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- @Bean
- public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
-
- return factory;
- }
-
- @Bean
- public Receiver receiver() {
- return new Receiver();
- }
-}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+public class Sender {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
+
+ @Autowired
+ private KafkaTemplate<Integer, String> kafkaTemplate;
+
+ public void sendMessage(String topic, String message) {
+ // the KafkaTemplate provides asynchronous send methods returning a
+ // Future
+ ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
+ .send(topic, message);
+
+ // you can register a callback with the listener to receive the result
+ // of the send asynchronously
+ future.addCallback(
+ new ListenableFutureCallback<SendResult<Integer, String>>() {
+
+ @Override
+ public void onSuccess(
+ SendResult<Integer, String> result) {
+ LOGGER.info("sent message='{}' with offset={}",
+ message,
+ result.getRecordMetadata().offset());
+ }
+
+ @Override
+ public void onFailure(Throwable ex) {
+ LOGGER.error("unable to send message='{}'",
+ message, ex);
+ }
+ });
+
+ // alternatively, to block the sending thread, to await the result,
+ // invoke the future’s get() method
+ }
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class SenderConfig {
+
+ @Value("${kafka.bootstrap.servers}")
+ private String bootstrapServers;
+
+ @Bean
+ public Map producerConfigs() {
+ Map<String, Object> props = new HashMap<>();
+ // list of host:port pairs used for establishing the initial connections
+		// to the Kafka cluster
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ // value to block, after which it will throw a TimeoutException
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
+
+ return props;
+ }
+
+ @Bean
+ public ProducerFactory producerFactory() {
+ return new DefaultKafkaProducerFactory<>(producerConfigs());
+ }
+
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ return new KafkaTemplate(producerFactory());
+ }
+
+ @Bean
+ public Sender sender() {
+ return new Sender();
+ }
+}