c82bac772bc20156e16a1ff592804b1c3a07cb4a
[JavaForFun] /
1 package de.example.spring.kafka;
2
3 import org.apache.kafka.clients.producer.ProducerConfig;
4 import org.apache.kafka.common.serialization.IntegerSerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.springframework.beans.factory.annotation.Value;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.context.annotation.Configuration;
9 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
10 import org.springframework.kafka.core.KafkaTemplate;
11 import org.springframework.kafka.core.ProducerFactory;
12
13 import java.util.HashMap;
14 import java.util.Map;
15
16 @Configuration
17 public class SenderConfig {
18
19   @Value("${kafka.bootstrap.servers}")
20   private String bootstrapServers;
21
22   @Bean
23   public Map producerConfigs() {
24     Map<String, Object> props = new HashMap<>();
25     // list of host:port pairs used for establishing the initial connections
26     // to the Kakfa cluster
27     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
28     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
29     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
30     // value to block, after which it will throw a TimeoutException
31     props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
32
33     return props;
34   }
35
36   @Bean
37   public ProducerFactory producerFactory() {
38     return new DefaultKafkaProducerFactory<>(producerConfigs());
39   }
40
41   @Bean
42   public KafkaTemplate kafkaTemplate() {
43     return new KafkaTemplate(producerFactory());
44   }
45
46   @Bean
47   public Sender sender() {
48     return new Sender();
49   }
50 }