1 package de.example.spring.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;
17 public class SenderConfig {
19 @Value("${kafka.bootstrap.servers}")
20 private String bootstrapServers;
23 public Map producerConfigs() {
24 Map<String, Object> props = new HashMap<>();
25 // list of host:port pairs used for establishing the initial connections
26 // to the Kakfa cluster
27 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
28 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
29 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
30 // value to block, after which it will throw a TimeoutException
31 props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
37 public ProducerFactory producerFactory() {
38 return new DefaultKafkaProducerFactory<>(producerConfigs());
42 public KafkaTemplate kafkaTemplate() {
43 return new KafkaTemplate(producerFactory());
47 public Sender sender() {