c0222a0151ae2d1f2ecd84f3c11a7a975dc60eed
[JavaForFun] /
1 package de.example.spring.kafka;
2
3 import org.springframework.boot.CommandLineRunner;
4 import org.springframework.boot.SpringApplication;
5 import org.springframework.boot.autoconfigure.SpringBootApplication;
6 import org.springframework.boot.builder.SpringApplicationBuilder;
7 import org.springframework.context.ConfigurableApplicationContext;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.kafka.support.KafkaNull;
10 import org.springframework.messaging.Message;
11 import org.springframework.messaging.MessageChannel;
12 import org.springframework.messaging.PollableChannel;
13 import org.springframework.messaging.support.GenericMessage;
14
15 @SpringBootApplication
16 public class Application {
17
18   public static void main(String[] args) throws Exception {
19     ConfigurableApplicationContext context
20         = new SpringApplicationBuilder(Application.class)
21         .web(false)
22         .run(args);
23     MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class);
24     for (int i = 0; i < 10; i++) {
25       toKafka.send(new GenericMessage<>("foo" + i));
26     }
27     toKafka.send(new GenericMessage<>(KafkaNull.INSTANCE));
28     PollableChannel fromKafka = context.getBean("received", PollableChannel.class);
29     Message<?> received = fromKafka.receive(10000);
30     while (received != null) {
31       System.out.println(received);
32       received = fromKafka.receive(10000);
33     }
34     context.close();
35     System.exit(0);
36   }
37 }