238766b4356193ef6aa50c87d261f5126fe5d64e
[JavaForFun] /
1 package de.example.spring.kafka;
2
3 import org.apache.kafka.clients.consumer.ConsumerConfig;
4 import org.apache.kafka.common.serialization.IntegerDeserializer;
5 import org.apache.kafka.common.serialization.StringDeserializer;
6 import org.springframework.beans.factory.annotation.Value;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.context.annotation.Configuration;
9 import org.springframework.kafka.annotation.EnableKafka;
10 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
11 import org.springframework.kafka.core.ConsumerFactory;
12 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
13
14 import java.util.HashMap;
15 import java.util.Map;
16
17 @Configuration
18 @EnableKafka
19 public class ReceiverConfig {
20
21   @Value("${kafka.bootstrap.servers}")
22   private String bootstrapServers;
23
24   @Bean
25   public Map consumerConfigs() {
26     Map<String, Object> props = new HashMap<>();
27     // list of host:port pairs used for establishing the initial connections
28     // to the Kakfa cluster
29     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
30     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
31     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
32     // consumer groups allow a pool of processes to divide the work of
33     // consuming and processing records
34     props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
35
36     return props;
37   }
38
39   @Bean
40   public ConsumerFactory consumerFactory() {
41     return new DefaultKafkaConsumerFactory<>(consumerConfigs());
42   }
43
44   @Bean
45   public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
46     ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
47     factory.setConsumerFactory(consumerFactory());
48
49     return factory;
50   }
51
52   @Bean
53   public Receiver receiver() {
54     return new Receiver();
55   }
56 }