--- /dev/null
+package de.example.spring.kafka;
+
+import javax.inject.Named;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Named
+public class DummyService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DummyService.class);
+
+
+ public void iAmVeryDummy(String message) {
+ LOGGER.info("I am a dummy service: '{}'", message);
+ }
+}
package de.example.spring.kafka;
+import javax.inject.Inject;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
@EnableBinding(Sink.class)
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
+
+ private final DummyService dummyService;
+
+ @Inject
+ public Receiver(DummyService dummyService) {
+ this.dummyService = dummyService;
+ }
@StreamListener(Sink.INPUT)
public void handle(Product product) {
LOGGER.info("product name='{}'", product.getName());
LOGGER.info("product description='{}'", product.getDescription());
+
+ dummyService.iAmVeryDummy(product.getName());
}
}
public class ReceiverConfig {
@Bean
- public Receiver receiver() {
- return new Receiver();
+ public Receiver receiver(DummyService dummyService) {
+ return new Receiver(dummyService);
}
--- /dev/null
+package de.example.spring.kafka;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doNothing;
+
+import javax.inject.Inject;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.cloud.stream.messaging.Sink;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = { Receiver.class })
+@DirtiesContext
+public class ReceiverShould {
+
+ @Inject
+ Sink source;
+
+ @MockBean
+ DummyService dummyService;
+
+ @Test
+ public void callSomeDummy() {
+ String productName = "product";
+ String productDescription = "productDescription";
+ Product product = new Product(productName, productDescription);
+ ArgumentCaptor<String> dummyArgCaptor = ArgumentCaptor.forClass(String.class);
+ doNothing().when(dummyService).iAmVeryDummy(dummyArgCaptor.capture());
+
+ Message<Product> message = new GenericMessage<>(product);
+ source.input().send(message);
+
+ assertThat(dummyArgCaptor.getValue(), is(product.getName()));
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import javax.inject.Inject;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.messaging.Message;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = { Sender.class })
+@DirtiesContext
+public class SenderShould {
+
+ @Inject
+ Source source;
+
+ @Inject
+ Sender sender;
+
+ @Inject
+ private MessageCollector messageCollector;
+
+ @Test
+ public void sendSomeProduct() {
+ Product product = new Product("hello", "this is some description");
+
+ sender.sendMessage("hello");
+
+ Message<Product> received = (Message<Product>) messageCollector.forChannel(source.output()).poll();
+
+ assertThat(received.getPayload().getDescription(), is(product.getDescription()));
+ }
+
+}