1 package de.example.spring.kafka;
3 import org.springframework.boot.CommandLineRunner;
4 import org.springframework.boot.SpringApplication;
5 import org.springframework.boot.autoconfigure.SpringBootApplication;
6 import org.springframework.boot.builder.SpringApplicationBuilder;
7 import org.springframework.context.ConfigurableApplicationContext;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.kafka.support.KafkaNull;
10 import org.springframework.messaging.Message;
11 import org.springframework.messaging.MessageChannel;
12 import org.springframework.messaging.PollableChannel;
13 import org.springframework.messaging.support.GenericMessage;
15 @SpringBootApplication
16 public class Application {
18 public static void main(String[] args) throws Exception {
19 ConfigurableApplicationContext context
20 = new SpringApplicationBuilder(Application.class)
23 MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class);
24 for (int i = 0; i < 10; i++) {
25 toKafka.send(new GenericMessage<>("foo" + i));
27 toKafka.send(new GenericMessage<>(KafkaNull.INSTANCE));
28 PollableChannel fromKafka = context.getBean("received", PollableChannel.class);
29 Message<?> received = fromKafka.receive(10000);
30 while (received != null) {
31 System.out.println(received);
32 received = fromKafka.receive(10000);