1 package de.example.spring.kafka;
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.kafka.core.KafkaTemplate;
7 import org.springframework.kafka.support.SendResult;
8 import org.springframework.util.concurrent.ListenableFuture;
9 import org.springframework.util.concurrent.ListenableFutureCallback;
12 private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
15 private KafkaTemplate<Integer, String> kafkaTemplate;
17 public void sendMessage(String topic, String message) {
18 // the KafkaTemplate provides asynchronous send methods returning a
20 ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
21 .send(topic, message);
23 // you can register a callback with the listener to receive the result
24 // of the send asynchronously
26 new ListenableFutureCallback<SendResult<Integer, String>>() {
29 public void onSuccess(
30 SendResult<Integer, String> result) {
31 LOGGER.info("sent message='{}' with offset={}",
33 result.getRecordMetadata().offset());
37 public void onFailure(Throwable ex) {
38 LOGGER.error("unable to send message='{}'",
43 // alternatively, to block the sending thread, to await the result,
44 // invoke the future’s get() method