0ed6b5b2e6055b9b985946c754edc9a52dbd34b9
[JavaForFun] /
1 package de.example.spring.kafka;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.kafka.core.KafkaTemplate;
7 import org.springframework.kafka.support.SendResult;
8 import org.springframework.util.concurrent.ListenableFuture;
9 import org.springframework.util.concurrent.ListenableFutureCallback;
10
11 public class Sender {
12   private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
13
14   @Autowired
15   private KafkaTemplate<Integer, String> kafkaTemplate;
16
17   public void sendMessage(String topic, String message) {
18     // the KafkaTemplate provides asynchronous send methods returning a
19     // Future
20     ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate
21         .send(topic, message);
22
23     // you can register a callback with the listener to receive the result
24     // of the send asynchronously
25     future.addCallback(
26         new ListenableFutureCallback<SendResult<Integer, String>>() {
27
28           @Override
29           public void onSuccess(
30               SendResult<Integer, String> result) {
31             LOGGER.info("sent message='{}' with offset={}",
32                 message,
33                 result.getRecordMetadata().offset());
34           }
35
36           @Override
37           public void onFailure(Throwable ex) {
38             LOGGER.error("unable to send message='{}'",
39                 message, ex);
40           }
41         });
42
43     // alternatively, to block the sending thread, to await the result,
44     // invoke the future’s get() method
45   }
46 }