1 package de.example.spring.kafka;
3 import org.apache.kafka.clients.producer.ProducerConfig;
4 import org.apache.kafka.common.serialization.IntegerSerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.springframework.beans.factory.annotation.Value;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
9 import org.springframework.kafka.core.KafkaTemplate;
10 import org.springframework.kafka.core.ProducerFactory;
12 import java.util.HashMap;
15 public class SenderConfig {
// Kafka bootstrap server list, injected from the "kafka.bootstrap.servers" property;
// it is passed to the producer as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.
17 @Value("${kafka.bootstrap.servers}")
18 private String bootstrapServers;
// Assembles the Kafka producer properties: bootstrap servers, key/value
// serializer classes and the maximum blocking time.
// NOTE(review): the method is declared with the raw Map type although the local
// map is a Map<String, Object> -- consider parameterizing the return type.
21 public Map producerConfigs() {
22 Map<String, Object> props = new HashMap<>();
23 // list of host:port pairs used for establishing the initial connections
24 // to the Kafka cluster
25 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// record keys are serialized with IntegerSerializer, record values with StringSerializer
26 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
27 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
28 // maximum time in milliseconds the producer may block; after that a TimeoutException is thrown
29 props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
// Creates a DefaultKafkaProducerFactory from the properties returned by producerConfigs().
// NOTE(review): the return type is the raw ProducerFactory; the Integer/String
// serializers configured in producerConfigs() suggest <Integer, String> -- confirm
// against callers before parameterizing.
35 public ProducerFactory producerFactory() {
36 return new DefaultKafkaProducerFactory<>(producerConfigs());
// Creates a KafkaTemplate backed by the factory returned by producerFactory().
// NOTE(review): both the return type and the constructor call use the raw
// KafkaTemplate type, so key/value types are unchecked at compile time.
40 public KafkaTemplate kafkaTemplate() {
41 return new KafkaTemplate(producerFactory());
45 public Sender sender() {