79db5d044a0e790a976c2f5f34396b2d8229d2cf
[JavaForFun] /
1 package de.example.spring.kafka;
2
3 import org.apache.kafka.clients.producer.ProducerConfig;
4 import org.apache.kafka.common.serialization.IntegerSerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.springframework.beans.factory.annotation.Value;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
9 import org.springframework.kafka.core.KafkaTemplate;
10 import org.springframework.kafka.core.ProducerFactory;
11
12 import java.util.HashMap;
13 import java.util.Map;
14
15 public class SenderConfig {
16
17   @Value("${kafka.bootstrap.servers}")
18   private String bootstrapServers;
19
20   @Bean
21   public Map producerConfigs() {
22     Map<String, Object> props = new HashMap<>();
23     // list of host:port pairs used for establishing the initial connections
24     // to the Kakfa cluster
25     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
26     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
27     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
28     // value to block, after which it will throw a TimeoutException
29     props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
30
31     return props;
32   }
33
34   @Bean
35   public ProducerFactory producerFactory() {
36     return new DefaultKafkaProducerFactory<>(producerConfigs());
37   }
38
39   @Bean
40   public KafkaTemplate kafkaTemplate() {
41     return new KafkaTemplate(producerFactory());
42   }
43
44   @Bean
45   public Sender sender() {
46     return new Sender();
47   }
48 }