package de.example.spring.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class ReceiverConfig {

  @Value("${kafka.bootstrap.servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections
    // to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // consumer groups allow a pool of processes to divide the work of
    // consuming and processing records
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");

    return props;
  }

  @Bean
  public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
  }

  @Bean
  public Receiver receiver() {
    return new Receiver();
  }
}
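
The Receiver bean itself is not part of this listing; below is a minimal sketch of what it could look like, assuming a single @KafkaListener method and a hypothetical topic name "helloworld.t" (both are illustrative, not taken from the configuration above). With @EnableKafka on the configuration class, Spring Kafka builds a listener container for this method from the kafkaListenerContainerFactory bean defined above; the kafka.bootstrap.servers property is expected in the application configuration, e.g. localhost:9092.

package de.example.spring.kafka;

import org.springframework.kafka.annotation.KafkaListener;

public class Receiver {

  // the topic name is a placeholder for illustration; the payload arrives as a
  // String because the consumer factory is configured with StringDeserializer
  @KafkaListener(topics = "helloworld.t")
  public void receive(String payload) {
    System.out.println("received payload='" + payload + "'");
  }
}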