import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.support.KafkaNull;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.PollableChannel;
+import org.springframework.messaging.support.GenericMessage;
@SpringBootApplication
public class Application {
- public static void main(String[] args) {
- SpringApplication.run(Application.class);
+ public static void main(String[] args) throws Exception {
+ ConfigurableApplicationContext context
+ = new SpringApplicationBuilder(Application.class)
+ .web(false)
+ .run(args);
+ MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class);
+ for (int i = 0; i < 10; i++) {
+ toKafka.send(new GenericMessage<>("foo" + i));
+ }
+ toKafka.send(new GenericMessage<>(KafkaNull.INSTANCE));
+ PollableChannel fromKafka = context.getBean("received", PollableChannel.class);
+ Message<?> received = fromKafka.receive(10000);
+ while (received != null) {
+ System.out.println(received);
+ received = fromKafka.receive(10000);
+ }
+ context.close();
+ System.exit(0);
}
-
- @Bean
- CommandLineRunner lookup(Receiver receiver) {
- return args -> {
-
- };
- }
-
}
package de.example.spring.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
private String bootstrapServers;
@Bean
- public Map consumerConfigs() {
+ public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections
// to the Kakfa cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// consumer groups allow a pool of processes to divide the work of
// consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
return props;
}
@Bean
- public ConsumerFactory consumerFactory() {
+ public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
package de.example.spring.kafka;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
+import javax.inject.Inject;
+
public class Sender {
private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
- @Autowired
- private KafkaTemplate<Integer, String> kafkaTemplate;
+ private final KafkaTemplate<String, String> kafkaTemplate;
+
+ @Inject
+ public Sender(KafkaTemplate<String, String> kafkaTemplate) {
+ this.kafkaTemplate = kafkaTemplate;
+ }
public void sendMessage(String topic, String message) {
// the KafkaTemplate provides asynchronous send methods returning a
// Future
- ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
- .send(topic, message);
+ ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
+
+ // The same with MessageBuilder
+ // ListenableFuture<SendResult<String, String>> future =
+ // kafkaTemplate.send(
+ // MessageBuilder.withPayload(message)
+ // .setHeader(KafkaHeaders.TOPIC, topic)
+ // .build());
// you can register a callback with the listener to receive the result
// of the send asynchronously
- future.addCallback(
- new ListenableFutureCallback<SendResult<Integer, String>>() {
+ future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
- public void onSuccess(
- SendResult<Integer, String> result) {
- LOGGER.info("sent message='{}' with offset={}",
- message,
- result.getRecordMetadata().offset());
+ public void onSuccess(SendResult<String, String> result) {
+ RecordMetadata recordMetadata = result.getRecordMetadata();
+ ProducerRecord producerRecord = result.getProducerRecord();
+
+ LOGGER.info("sent message='{}'", message);
+ LOGGER.info("RecordMetadata");
+ LOGGER.info("with offset={}", recordMetadata.offset());
+ LOGGER.info("with partition={}", recordMetadata.partition());
+ LOGGER.info("with checksum={}", recordMetadata.checksum());
+ LOGGER.info("with timestamp={}", recordMetadata.timestamp());
+ LOGGER.info("with timestamp={}", recordMetadata.topic());
+
+ LOGGER.info("ProducerRecord");
+ LOGGER.info("with partition={}", producerRecord.partition());
+ LOGGER.info("with value={}", producerRecord.value());
+ LOGGER.info("with timestamp={}", producerRecord.timestamp());
+ LOGGER.info("with topic={}", producerRecord.topic());
}
@Override
public void onFailure(Throwable ex) {
- LOGGER.error("unable to send message='{}'",
- message, ex);
+ LOGGER.error("unable to send message='{}'", message, ex);
}
});
package de.example.spring.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
private String bootstrapServers;
@Bean
- public Map producerConfigs() {
+ public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections
// to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 2097152);
+ props.put(ProducerConfig.RETRIES_CONFIG, 0);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+
// value to block, after which it will throw a TimeoutException
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
}
@Bean
- public ProducerFactory producerFactory() {
+ public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
- public KafkaTemplate kafkaTemplate() {
- return new KafkaTemplate(producerFactory());
+ public KafkaTemplate<String, String> kafkaTemplate() {
+ return new KafkaTemplate<>(producerFactory());
}
@Bean
- public Sender sender() {
- return new Sender();
+ public Sender sender(KafkaTemplate<String, String> kafkaTemplate) {
+ return new Sender(kafkaTemplate);
}
}