xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>de.example.spring.kafka</groupId>
- <artifactId>spring-cloud-stream-kafka-example</artifactId>
+ <artifactId>spring-stream-kafka-example</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
- <name>spring-cloud-stream-kafka-example</name>
+ <name>spring-stream-kafka-example</name>
<url>https://gumartinm.name/</url>
<description>Spring Cloud Stream Kafka</description>
<organization>
</scm>
<modules>
- <module>spring-cloud-stream-kafka-example-bom</module>
- <module>spring-cloud-stream-kafka-common-example</module>
- <module>spring-cloud-stream-kafka-consumer-example</module>
- <module>spring-cloud-stream-kafka-producer-example</module>
+ <module>spring-stream-kafka-bom</module>
+ <module>spring-stream-kafka-common</module>
+ <module>spring-stream-kafka-consumer</module>
+ <module>spring-stream-kafka-producer</module>
</modules>
</project>
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>de.example.spring.kafka</groupId>
- <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
- <version>1.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
- <name>spring-cloud-stream-kafka-common-example</name>
- <url>https://gumartinm.name</url>
- <description>Spring Cloud Stream Kafka Commons</description>
- <organization>
- <name>gumartinm</name>
- <url>https://gumartinm.name</url>
- </organization>
- <scm>
- <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
- <url>https://git.gumartinm.name/JavaForFun</url>
- </scm>
-
-</project>
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.springframework.messaging.converter.MappingJackson2MessageConverter;
-import org.springframework.util.MimeType;
-
-/**
- * Working around problem created by org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion.mimeTypeFromObject()
- *
- * This code:
- * if (payload instanceof String) {
- * return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON
- * : MimeTypeUtils.TEXT_PLAIN;
- * }
- *
- * Changes messages from:
- * contentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
- *
- * to:
- * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
- *
- *
- * By means of my custom converter we end up having:
- *
- * contentType "application/json"{"name":"example message","description":"this is some description"}
- *
- * About "application/json" and character encoding:
- * https://tools.ietf.org/html/rfc7158#section-8.1 "The default encoding is UTF-8" :)
- *
- *
- *
- */
-
-
-//
-// You should set breakpoints in org.springframework.cloud.stream.binder.AbstractBinder.deserializePayload
-// org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary
-// org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.SendingHandler.handleMessageInternal
-//
-// The code in this Spring project is a bit messy (IMHO) for example this.embedHeaders in
-// SendingHandler.handleMessageInternal: When the heck this.embedHeaders must do something? Depending on
-// the content type embedHeaders will be doing something or not. This is weird. Also deserializePayload is
-// always using UTF-8. It does not matter what character set I am receiving :/
-//
-//
-//
-//
-// Be careful when using Spring Cloud Stream because there could be surprises when trying to connect to systems
-// not using Spring (Spring is creating a mess with the headers...)
-//
-//
-
-
-
-//
-// Kafka messages WITHOUT MyCustomMessageConverter:
-// headers:
-// contentType "text/plain" <--------------- perhaps this is important.
-// originalContentType "application/json;charset=UTF-8"
-// payload:
-// {"name":"example message","description":"this is some description"}
-//
-//
-// Kafka messages WITH MyCustomMessageConverter:
-// headers:
-// contentType "application/json"
-// payload:
-// {"name":"example message","description":"this is some description"}
-//
-
-
-// YOU'D RATHER BETTER NOT USE THIS CONVERTER BECAUSE I DO NOT KNOW IF contentType "text/plain" IS IMPORTANT OR NOT :(
-public class MyCustomMessageConverter extends MappingJackson2MessageConverter {
-
- public MyCustomMessageConverter() {
- super(new MimeType("application", "json"));
- }
-
-}
+++ /dev/null
-package de.example.spring.kafka;
-
-public class Product {
- private String name;
- private String description;
-
- /**
- * Required for deserialization from Jackson.
- */
- protected Product() {
-
- }
-
- public Product(String name, String description) {
- this.name = name;
- this.description = description;
- }
-
- public String getName() {
- return this.name;
- }
-
- public String getDescription() {
- return this.description;
- }
-}
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>de.example.spring.kafka</groupId>
- <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
- <version>1.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>spring-cloud-stream-kafka-consumer-example</artifactId>
- <name>spring-cloud-stream-kafka-consumer-example</name>
- <url>https://gumartinm.name</url>
- <description>Spring Cloud Stream Kafka Consumer</description>
- <organization>
- <name>gumartinm</name>
- <url>https://gumartinm.name</url>
- </organization>
- <scm>
- <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
- <url>https://git.gumartinm.name/JavaForFun</url>
- </scm>
-
-
- <dependencies>
- <dependency>
- <groupId>javax.inject</groupId>
- <artifactId>javax.inject</artifactId>
- <version>1</version>
- </dependency>
-
- <dependency>
- <groupId>de.example.spring.kafka</groupId>
- <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
-</project>
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
-
-@SpringBootApplication
-public class Application {
-
- public static void main(String[] args) {
- SpringApplication.run(Application.class);
- }
-
-
- @Bean
- CommandLineRunner lookup(Receiver receiver) {
- return args -> {
-
- };
- }
-
-}
+++ /dev/null
-package de.example.spring.kafka;
-
-import javax.inject.Named;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Named
-public class DummyService {
- private static final Logger LOGGER = LoggerFactory.getLogger(DummyService.class);
-
-
- public void iAmVeryDummy(String message) {
- LOGGER.info("I am a dummy service: '{}'", message);
- }
-}
+++ /dev/null
-package de.example.spring.kafka;
-
-import javax.inject.Inject;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.annotation.StreamListener;
-import org.springframework.cloud.stream.messaging.Sink;
-
-@EnableBinding(Sink.class)
-public class Receiver {
- private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
-
- private final DummyService dummyService;
-
- @Inject
- public Receiver(DummyService dummyService) {
- this.dummyService = dummyService;
- }
-
- @StreamListener(Sink.INPUT)
- public void handle(Product product) {
- LOGGER.info("product name='{}'", product.getName());
- LOGGER.info("product description='{}'", product.getDescription());
-
- dummyService.iAmVeryDummy(product.getName());
- }
-
-}
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class ReceiverConfig {
-
- @Bean
- public Receiver receiver(DummyService dummyService) {
- return new Receiver(dummyService);
- }
-
-
-// @Bean
-// public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
-// MyCustomMessageConverter converter = new MyCustomMessageConverter();
-// converter.setSerializedPayloadClass(String.class);
-// if (objectMapper != null) {
-// converter.setObjectMapper(objectMapper);
-// }
-//
-// return converter;
-// }
-
-
-// @Bean
-// public MessageConverter avroMessageConverter() throws IOException {
-// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
-// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
-// return converter;
-// }
-}
+++ /dev/null
-logging:
- level:
- org:
- springframework: info
- ROOT: info
-
-kafka:
- topic: test
- messageKey: keyTest
-
-spring:
- main:
- banner-mode: "off"
- # We do not need Tomcat running (this application runs from console)
- web-environment: false
- cloud:
- stream:
- bindings:
- input:
- binder: kafka
- destination: test
- group: helloworld
- contentType: application/json
- startOffset: latest
- consumer:
- concurrency: 1
- partitioned: false
- # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
- headerMode: embeddedHeaders
- maxAttempts: 3
- backOffInitialInterval: 1000
- instanceCount: 1
- instanceIndex: 0
- kafka:
- binder:
- zkNodes: "kafka:2181"
- brokers: "kafka:9092,kafka:9093,kafka:9094"
- autoCreateTopics: true
-
-# Health indicator for binders.
-management:
- health:
- binders:
- enabled: true
-
-# ME QUEDA 7.3 Partitioning
-# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
+++ /dev/null
-package de.example.spring.kafka;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doNothing;
-
-import javax.inject.Inject;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.cloud.stream.messaging.Sink;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.support.GenericMessage;
-import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-@RunWith(SpringJUnit4ClassRunner.class)
-@SpringBootTest(classes = { Receiver.class })
-@DirtiesContext
-public class ReceiverShould {
-
- @Inject
- Sink source;
-
- @MockBean
- DummyService dummyService;
-
- @Test
- public void callSomeDummy() {
- String productName = "product";
- String productDescription = "productDescription";
- Product product = new Product(productName, productDescription);
- ArgumentCaptor<String> dummyArgCaptor = ArgumentCaptor.forClass(String.class);
- doNothing().when(dummyService).iAmVeryDummy(dummyArgCaptor.capture());
-
- Message<Product> message = new GenericMessage<>(product);
- source.input().send(message);
-
- assertThat(dummyArgCaptor.getValue(), is(product.getName()));
- }
-
-}
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-parent</artifactId>
- <version>Camden.SR4</version>
- </parent>
-
- <groupId>de.example.spring.kafka</groupId>
- <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
- <version>1.0-SNAPSHOT</version>
- <packaging>pom</packaging>
- <name>spring-cloud-stream-kafka-example-bom</name>
-
- <url>https://gumartinm.name</url>
- <description>Spring Cloud Stream Kafka BOM example</description>
- <organization>
- <name>gumartinm</name>
- <url>https://gumartinm.name</url>
- </organization>
- <scm>
- <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
- <url>https://git.gumartinm.name/JavaForFun</url>
- </scm>
- <properties>
- <skip.unit.tests>false</skip.unit.tests>
- <skip.integration.tests>false</skip.integration.tests>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-kafka</artifactId>
- <version>1.1.1.RELEASE</version>
- </dependency>
-
- <!-- Required for using Kafka 0.10.1.1 -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>1.1.2.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.integration</groupId>
- <artifactId>spring-integration-kafka</artifactId>
- <version>2.1.0.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>0.10.1.1</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- Required for using Apache Avro -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream-schema</artifactId>
- <version>1.1.1.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>1.8.1</version>
- </dependency>
-
- <!-- Tests -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream-test-support</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <dependencyManagement>
-
- <!--
- Schema Registry Server. See: http://docs.spring.io/spring-cloud-stream/docs/current/reference/html/contenttypemanagement.html
- -->
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream-server</artifactId>
- </dependency>
- </dependencies>
-
- </dependencyManagement>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <encoding>${project.build.sourceEncoding}</encoding>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-resources-plugin</artifactId>
- <configuration>
- <encoding>${project.build.sourceEncoding}</encoding>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <archive>
- <manifestEntries>
- <Specification-Title>${project.description}</Specification-Title>
- <Specification-Version>${project.version}</Specification-Version>
- <Specification-Vendor>${project.organization.name}</Specification-Vendor>
- <Implementation-Title>${project.description}</Implementation-Title>
- <Implementation-Version>${project.version}</Implementation-Version>
- <Implementation-Vendor>${project.organization.name}</Implementation-Vendor>
- </manifestEntries>
- </archive>
- </configuration>
- </plugin>
-
- <!-- Used for unit tests -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.maven.surefire</groupId>
- <artifactId>surefire-junit47</artifactId>
- <version>2.18.1</version>
- </dependency>
- </dependencies>
- <configuration>
- <!-- Skips unit tests if the value of skip.unit.tests property is true -->
- <skipTests>${skip.unit.tests}</skipTests>
- <!-- Excludes integration tests when unit tests are run. -->
- <excludes>
- <exclude>**/*IT.java</exclude>
- <exclude>**/*IntegrationTest.java</exclude>
- </excludes>
- </configuration>
- </plugin>
-
- <!-- Used for integration tests -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- <configuration>
- <!-- Skips integration tests if the value of skip.integration.tests
- property is true -->
- <skipTests>${skip.integration.tests}</skipTests>
- <includes>
- <include>**/*IT.java</include>
- <include>**/*IntegrationTest.java</include>
- </includes>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- <pluginManagement>
- <plugins>
- <!--
- It will create our running jar file.
- The main class must be located in: src/main/java/de/example/spring/kafka
- otherwise you need an explicit declaration using this property
- <start-class>de.exmaple.spring.kafka.Application</start-class>
-
- The start-class property is automatically populated by spring-boot-maven-plugin and it is
- used in spring-boot-starter-parent.
-
- See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params
- -->
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>1.4.4.RELEASE</version>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
-</project>
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>de.example.spring.kafka</groupId>
- <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
- <version>1.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>spring-cloud-stream-kafka-producer-example</artifactId>
- <name>spring-cloud-stream-kafka-producer-example</name>
- <url>https://gumartinm.name</url>
- <description>Spring Cloud Stream Kafka Producer</description>
- <organization>
- <name>gumartinm</name>
- <url>https://gumartinm.name</url>
- </organization>
- <scm>
- <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
- <url>https://git.gumartinm.name/JavaForFun</url>
- </scm>
-
- <dependencies>
- <dependency>
- <groupId>javax.inject</groupId>
- <artifactId>javax.inject</artifactId>
- <version>1</version>
- </dependency>
-
- <dependency>
- <groupId>de.example.spring.kafka</groupId>
- <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
-</project>
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
-
-@SpringBootApplication
-public class Application {
-
- public static void main(String[] args) {
- SpringApplication.run(Application.class);
- }
-
- @Bean
- CommandLineRunner lookup(Sender sender) {
- return args -> {
- String message = "example message";
-
-
- if (args.length > 0) {
- message = args[0];
- }
-
- sender.sendMessage(message);
- };
- }
-
-}
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.springframework.cloud.stream.annotation.Input;
-import org.springframework.cloud.stream.annotation.Output;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.SubscribableChannel;
-
-/**
- * En lugar de esta definicion "custom" usaremos las dos que
- * vienen por defecto en:
- * org.springframework.cloud.stream.messaging.Sink
- * org.springframework.cloud.stream.messaging.Source
- *
- * Esta definicion custom se usaría igual que Sink y Source :)
- * Donde veas Sink y Source podrías haber puesto esto en su lugar ;)
- */
-public interface InputOutputChannels {
-
- @Input("inputChannel")
- SubscribableChannel input();
-
- @Output("outputChannel")
- MessageChannel output();
-
-}
+++ /dev/null
-package de.example.spring.kafka;
-
-import javax.inject.Inject;
-
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.messaging.Source;
-import org.springframework.messaging.support.MessageBuilder;
-
-@EnableBinding(Source.class)
-public class Sender {
- // You could use here your custom interface. See: InputOutputChannels :)
- private final Source source;
-
- @Inject
- public Sender(Source source) {
- this.source = source;
- }
-
- public void sendMessage(String message) {
- Product product = new Product(message, "this is some description");
- source.output().send(MessageBuilder.withPayload(product).build());
- }
-}
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.springframework.cloud.stream.messaging.Source;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class SenderConfig {
-
- @Bean
- public Sender sender(Source source) {
- return new Sender(source);
- }
-
-// @Bean
-// public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
-// MyCustomMessageConverter converter = new MyCustomMessageConverter();
-// converter.setSerializedPayloadClass(String.class);
-// if (objectMapper != null) {
-// converter.setObjectMapper(objectMapper);
-// }
-//
-// return converter;
-// }
-
-// @Bean
-// public MessageConverter avroMessageConverter() throws IOException {
-// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
-// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
-// return converter;
-// }
-}
+++ /dev/null
-logging:
- level:
- org:
- springframework: info
- ROOT: info
-
-spring:
- main:
- banner-mode: "off"
- # We do not need Tomcat running (this application runs from console)
- web-environment: false
- cloud:
- stream:
- bindings:
- output:
- binder: kafka
- destination: test
- contentType: application/json
- # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
- headerMode: embeddedHeaders
- instanceCount: 1
- instanceIndex: 0
- kafka:
- binder:
- zkNodes: "kafka:2181"
- brokers: "kafka:9092,kafka:9093,kafka:9094"
- autoCreateTopics: true
-
-# Health indicator for binders.
-management:
- health:
- binders:
- enabled: true
-
-
-# ME QUEDA 7.3 Partitioning
-# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
+++ /dev/null
-package de.example.spring.kafka;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-import javax.inject.Inject;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.messaging.Source;
-import org.springframework.cloud.stream.test.binder.MessageCollector;
-import org.springframework.messaging.Message;
-import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-@RunWith(SpringJUnit4ClassRunner.class)
-@SpringBootTest(classes = { Sender.class })
-@DirtiesContext
-public class SenderShould {
-
- @Inject
- Source source;
-
- @Inject
- Sender sender;
-
- @Inject
- private MessageCollector messageCollector;
-
- @Test
- public void sendSomeProduct() {
- Product product = new Product("hello", "this is some description");
-
- sender.sendMessage("hello");
-
- Message<Product> received = (Message<Product>) messageCollector.forChannel(source.output()).poll();
-
- assertThat(received.getPayload().getDescription(), is(product.getDescription()));
- }
-
-}
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>de.example.spring.kafka</groupId>
- <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
- <version>1.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>spring-cloud-stream-kafka-registry-example</artifactId>
- <name>spring-cloud-stream-kafka-registry-example</name>
- <url>https://gumartinm.name</url>
- <description>Spring Cloud Stream Kafka Registry</description>
- <organization>
- <name>gumartinm</name>
- <url>https://gumartinm.name</url>
- </organization>
- <scm>
- <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
- <url>https://git.gumartinm.name/JavaForFun</url>
- </scm>
-
-
- <dependencies>
- <dependency>
- <groupId>javax.inject</groupId>
- <artifactId>javax.inject</artifactId>
- <version>1</version>
- </dependency>
-
- <dependency>
- <groupId>de.example.spring.kafka</groupId>
- <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
-
-
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream-server</artifactId>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
-</project>
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
-import org.springframework.cloud.stream.schema.server.EnableSchemaRegistryServer;
-
-@SpringBootApplication
-@EnableSchemaRegistryServer
-public class Application {
-
- public static void main(String[] args) {
- SpringApplication.run(Application.class);
- }
-
-}
+++ /dev/null
-logging:
- level:
- org:
- springframework: info
- ROOT: info
-
-server:
- port: 8888
-spring:
- cloud:
- stream:
- schema:
- server:
- path: schema
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-starter-parent</artifactId>
+ <version>Camden.SR4</version>
+ </parent>
+
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-stream-kafka-bom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>spring-stream-kafka-bom</name>
+
+ <url>https://gumartinm.name</url>
+ <description>Spring Cloud Stream Kafka BOM example</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavaForFun</url>
+ </scm>
+ <properties>
+ <skip.unit.tests>false</skip.unit.tests>
+ <skip.integration.tests>false</skip.integration.tests>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-starter-stream-kafka</artifactId>
+ <version>1.1.1.RELEASE</version>
+ </dependency>
+
+ <!-- Required for using Kafka 0.10.1.1 -->
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ <version>1.1.2.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.integration</groupId>
+ <artifactId>spring-integration-kafka</artifactId>
+ <version>2.1.0.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.1.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Required for using Apache Avro -->
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-stream-schema</artifactId>
+ <version>1.1.1.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.8.1</version>
+ </dependency>
+
+ <!-- Tests -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-stream-test-support</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <dependencyManagement>
+
+ <!--
+ Schema Registry Server. See: http://docs.spring.io/spring-cloud-stream/docs/current/reference/html/contenttypemanagement.html
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-stream-server</artifactId>
+ </dependency>
+ </dependencies>
+ -->
+
+ </dependencyManagement>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Specification-Title>${project.description}</Specification-Title>
+ <Specification-Version>${project.version}</Specification-Version>
+ <Specification-Vendor>${project.organization.name}</Specification-Vendor>
+ <Implementation-Title>${project.description}</Implementation-Title>
+ <Implementation-Version>${project.version}</Implementation-Version>
+ <Implementation-Vendor>${project.organization.name}</Implementation-Vendor>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+
+ <!-- Used for unit tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-junit47</artifactId>
+ <version>2.18.1</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <!-- Skips unit tests if the value of skip.unit.tests property is true -->
+ <skipTests>${skip.unit.tests}</skipTests>
+ <!-- Excludes integration tests when unit tests are run. -->
+ <excludes>
+ <exclude>**/*IT.java</exclude>
+ <exclude>**/*IntegrationTest.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
+ <!-- Used for integration tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <!-- Skips integration tests if the value of skip.integration.tests
+ property is true -->
+ <skipTests>${skip.integration.tests}</skipTests>
+ <includes>
+ <include>**/*IT.java</include>
+ <include>**/*IntegrationTest.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--
+ It will create our running jar file.
+ The main class must be located in: src/main/java/de/example/spring/kafka
+ otherwise you need an explicit declaration using this property
+ <start-class>de.exmaple.spring.kafka.Application</start-class>
+
+ The start-class property is automatically populated by spring-boot-maven-plugin and it is
+ used in spring-boot-starter-parent.
+
+ See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params
+ -->
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>1.4.4.RELEASE</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-stream-kafka-bom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>spring-stream-kafka-common</artifactId>
+ <name>spring-stream-kafka-common</name>
+ <url>https://gumartinm.name</url>
+ <description>Spring Cloud Stream Kafka Commons</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavaForFun</url>
+ </scm>
+
+</project>
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.util.MimeType;
+
+/**
+ * Working around problem created by org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion.mimeTypeFromObject()
+ *
+ * This code:
+ * if (payload instanceof String) {
+ * return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON
+ * : MimeTypeUtils.TEXT_PLAIN;
+ * }
+ *
+ * Changes messages from:
+ * contentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
+ *
+ * to:
+ * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
+ *
+ *
+ * By means of my custom converter we end up having:
+ *
+ * contentType "application/json"{"name":"example message","description":"this is some description"}
+ *
+ * About "application/json" and character encoding:
+ * https://tools.ietf.org/html/rfc7158#section-8.1 "The default encoding is UTF-8" :)
+ *
+ *
+ *
+ */
+
+
+//
+// You should set breakpoints in org.springframework.cloud.stream.binder.AbstractBinder.deserializePayload
+// org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary
+// org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.SendingHandler.handleMessageInternal
+//
+// The code in this Spring project is a bit messy (IMHO) for example this.embedHeaders in
+// SendingHandler.handleMessageInternal: When the heck this.embedHeaders must do something? Depending on
+// the content type embedHeaders will be doing something or not. This is weird. Also deserializePayload is
+// always using UTF-8. It does not matter what character set I am receiving :/
+//
+//
+//
+//
+// Be careful when using Spring Cloud Stream because there could be surprises when trying to connect to systems
+// not using Spring (Spring is creating a mess with the headers...)
+//
+//
+
+
+
+//
+// Kafka messages WITHOUT MyCustomMessageConverter:
+// headers:
+// contentType "text/plain" <--------------- perhaps this is important.
+// originalContentType "application/json;charset=UTF-8"
+// payload:
+// {"name":"example message","description":"this is some description"}
+//
+//
+// Kafka messages WITH MyCustomMessageConverter:
+// headers:
+// contentType "application/json"
+// payload:
+// {"name":"example message","description":"this is some description"}
+//
+
+
+// YOU'D RATHER BETTER NOT USE THIS CONVERTER BECAUSE I DO NOT KNOW IF contentType "text/plain" IS IMPORTANT OR NOT :(
+public class MyCustomMessageConverter extends MappingJackson2MessageConverter {
+
+ public MyCustomMessageConverter() {
+ super(new MimeType("application", "json"));
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+public class Product {
+ private String name;
+ private String description;
+
+ /**
+ * Required for deserialization from Jackson.
+ */
+ protected Product() {
+
+ }
+
+ public Product(String name, String description) {
+ this.name = name;
+ this.description = description;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public String getDescription() {
+ return this.description;
+ }
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-stream-kafka-bom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>spring-stream-kafka-consumer</artifactId>
+ <name>spring-stream-kafka-consumer</name>
+ <url>https://gumartinm.name</url>
+ <description>Spring Cloud Stream Kafka Consumer</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavaForFun</url>
+ </scm>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ <version>1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class);
+ }
+
+
+ @Bean
+ CommandLineRunner lookup(Receiver receiver) {
+ return args -> {
+
+ };
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import javax.inject.Named;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Named
+public class DummyService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DummyService.class);
+
+
+ public void iAmVeryDummy(String message) {
+ LOGGER.info("I am a dummy service: '{}'", message);
+ }
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import javax.inject.Inject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Sink;
+
+@EnableBinding(Sink.class)
+public class Receiver {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
+
+ private final DummyService dummyService;
+
+ @Inject
+ public Receiver(DummyService dummyService) {
+ this.dummyService = dummyService;
+ }
+
+ @StreamListener(Sink.INPUT)
+ public void handle(Product product) {
+ LOGGER.info("product name='{}'", product.getName());
+ LOGGER.info("product description='{}'", product.getDescription());
+
+ dummyService.iAmVeryDummy(product.getName());
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ReceiverConfig {
+
+ @Bean
+ public Receiver receiver(DummyService dummyService) {
+ return new Receiver(dummyService);
+ }
+
+
+// @Bean
+// public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
+// MyCustomMessageConverter converter = new MyCustomMessageConverter();
+// converter.setSerializedPayloadClass(String.class);
+// if (objectMapper != null) {
+// converter.setObjectMapper(objectMapper);
+// }
+//
+// return converter;
+// }
+
+
+// @Bean
+// public MessageConverter avroMessageConverter() throws IOException {
+// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
+// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
+// return converter;
+// }
+}
--- /dev/null
+logging:
+ level:
+ org:
+ springframework: info
+ ROOT: info
+
+kafka:
+ topic: test
+ messageKey: keyTest
+
+spring:
+ main:
+ banner-mode: "off"
+ # We do not need Tomcat running (this application runs from console)
+ web-environment: false
+ cloud:
+ stream:
+ bindings:
+ input:
+ binder: kafka
+ destination: test
+ group: helloworld
+ contentType: application/json
+ startOffset: latest
+ consumer:
+ concurrency: 1
+ partitioned: false
+ # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
+ headerMode: embeddedHeaders
+ maxAttempts: 3
+ backOffInitialInterval: 1000
+ instanceCount: 1
+ instanceIndex: 0
+ kafka:
+ binder:
+ zkNodes: "kafka:2181"
+ brokers: "kafka:9092,kafka:9093,kafka:9094"
+ autoCreateTopics: true
+
+# Health indicator for binders.
+management:
+ health:
+ binders:
+ enabled: true
+
+# ME QUEDA 7.3 Partitioning
+# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
--- /dev/null
+package de.example.spring.kafka;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doNothing;
+
+import javax.inject.Inject;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.cloud.stream.messaging.Sink;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = { Receiver.class })
+@DirtiesContext
+public class ReceiverShould {
+
+ @Inject
+ Sink source;
+
+ @MockBean
+ DummyService dummyService;
+
+ @Test
+ public void callSomeDummy() {
+ String productName = "product";
+ String productDescription = "productDescription";
+ Product product = new Product(productName, productDescription);
+ ArgumentCaptor<String> dummyArgCaptor = ArgumentCaptor.forClass(String.class);
+ doNothing().when(dummyService).iAmVeryDummy(dummyArgCaptor.capture());
+
+ Message<Product> message = new GenericMessage<>(product);
+ source.input().send(message);
+
+ assertThat(dummyArgCaptor.getValue(), is(product.getName()));
+ }
+
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-stream-kafka-bom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>spring-stream-kafka-producer</artifactId>
+ <name>spring-stream-kafka-producer</name>
+ <url>https://gumartinm.name</url>
+ <description>Spring Cloud Stream Kafka Producer</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavaForFun</url>
+ </scm>
+
+ <dependencies>
+ <dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ <version>1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class);
+ }
+
+ @Bean
+ CommandLineRunner lookup(Sender sender) {
+ return args -> {
+ String message = "example message";
+
+
+ if (args.length > 0) {
+ message = args[0];
+ }
+
+ sender.sendMessage(message);
+ };
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+
+/**
+ * En lugar de esta definicion "custom" usaremos las dos que
+ * vienen por defecto en:
+ * org.springframework.cloud.stream.messaging.Sink
+ * org.springframework.cloud.stream.messaging.Source
+ *
+ * Esta definicion custom se usaría igual que Sink y Source :)
+ * Donde veas Sink y Source podrías haber puesto esto en su lugar ;)
+ */
+public interface InputOutputChannels {
+
+ @Input("inputChannel")
+ SubscribableChannel input();
+
+ @Output("outputChannel")
+ MessageChannel output();
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import javax.inject.Inject;
+
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.messaging.support.MessageBuilder;
+
+@EnableBinding(Source.class)
+public class Sender {
+ // You could use here your custom interface. See: InputOutputChannels :)
+ private final Source source;
+
+ @Inject
+ public Sender(Source source) {
+ this.source = source;
+ }
+
+ public void sendMessage(String message) {
+ Product product = new Product(message, "this is some description");
+ source.output().send(MessageBuilder.withPayload(product).build());
+ }
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class SenderConfig {
+
+ @Bean
+ public Sender sender(Source source) {
+ return new Sender(source);
+ }
+
+// @Bean
+// public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
+// MyCustomMessageConverter converter = new MyCustomMessageConverter();
+// converter.setSerializedPayloadClass(String.class);
+// if (objectMapper != null) {
+// converter.setObjectMapper(objectMapper);
+// }
+//
+// return converter;
+// }
+
+// @Bean
+// public MessageConverter avroMessageConverter() throws IOException {
+// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
+// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
+// return converter;
+// }
+}
--- /dev/null
+logging:
+ level:
+ org:
+ springframework: info
+ ROOT: info
+
+spring:
+ main:
+ banner-mode: "off"
+ # We do not need Tomcat running (this application runs from console)
+ web-environment: false
+ cloud:
+ stream:
+ bindings:
+ output:
+ binder: kafka
+ destination: test
+ contentType: application/json
+ # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
+ headerMode: embeddedHeaders
+ instanceCount: 1
+ instanceIndex: 0
+ kafka:
+ binder:
+ zkNodes: "kafka:2181"
+ brokers: "kafka:9092,kafka:9093,kafka:9094"
+ autoCreateTopics: true
+
+# Health indicator for binders.
+management:
+ health:
+ binders:
+ enabled: true
+
+
+# ME QUEDA 7.3 Partitioning
+# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
--- /dev/null
+package de.example.spring.kafka;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import javax.inject.Inject;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.messaging.Message;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = { Sender.class })
+@DirtiesContext
+public class SenderShould {
+
+ @Inject
+ Source source;
+
+ @Inject
+ Sender sender;
+
+ @Inject
+ private MessageCollector messageCollector;
+
+ @Test
+ public void sendSomeProduct() {
+ Product product = new Product("hello", "this is some description");
+
+ sender.sendMessage("hello");
+
+ Message<Product> received = (Message<Product>) messageCollector.forChannel(source.output()).poll();
+
+ assertThat(received.getPayload().getDescription(), is(product.getDescription()));
+ }
+
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-stream-kafka-bom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>spring-stream-kafka-registry</artifactId>
+ <name>spring-stream-kafka-registry</name>
+ <url>https://gumartinm.name</url>
+ <description>Spring Cloud Stream Kafka Registry</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavaForFun</url>
+ </scm>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ <version>1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-stream-server</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.cloud.stream.schema.server.EnableSchemaRegistryServer;
+
+@SpringBootApplication
+@EnableSchemaRegistryServer
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class);
+ }
+
+}
--- /dev/null
+logging:
+ level:
+ org:
+ springframework: info
+ ROOT: info
+
+server:
+ port: 8888
+spring:
+ cloud:
+ stream:
+ schema:
+ server:
+ path: schema