--- /dev/null
+It does not work on gradle. Going back to the old and good maven configuration.
+
+
+Documentation: http://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
+ https://github.com/mbogoevici/spring-cloud-stream
+
+Examples: https://github.com/spring-cloud/spring-cloud-stream-samples
+
--- /dev/null
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-example</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>spring-cloud-stream-kafka-example</name>
+ <url>https://gumartinm.name/</url>
+ <description>Spring Cloud Stream Kafka</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name/</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavForFun</url>
+ </scm>
+
+ <modules>
+ <module>spring-cloud-stream-kafka-consumer-example</module>
+ <module>spring-cloud-stream-kafka-producer-example</module>
+ </modules>
+
+</project>
+
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>spring-cloud-stream-kafka-consumer-example</artifactId>
+ <name>spring-cloud-stream-kafka-consumer-example</name>
+ <url>https://gumartinm.name</url>
+ <description>Spring Cloud Stream Kafka Consumer</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavaForFun</url>
+ </scm>
+
+</project>
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class);
+ }
+
+
+ @Bean
+ CommandLineRunner lookup(Receiver receiver) {
+ return args -> {
+
+ };
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.messaging.Message;
+import org.springframework.messaging.converter.AbstractMessageConverter;
+import org.springframework.util.MimeType;
+
+public class MyCustomMessageConverter extends AbstractMessageConverter {
+
+ public MyCustomMessageConverter() {
+ super(new MimeType("application", "example"));
+ }
+
+ @Override
+ protected boolean supports(Class<?> clazz) {
+ return (String.class == clazz);
+ }
+
+ @Override
+ protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
+ Object payload = message.getPayload();
+
+ logger.info("convertFromInternal, payload: " + payload);
+
+ return payload;
+ }
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Sink;
+
+@EnableBinding(Sink.class)
+public class Receiver {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
+
+ @StreamListener(Sink.INPUT)
+ public void handle(String message) {
+ LOGGER.info("received message='{}'", message);
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.util.MimeType;
+
+import java.io.IOException;
+
+@Configuration
+public class ReceiverConfig {
+
+ @Bean
+ public Receiver receiver() {
+ return new Receiver();
+ }
+
+
+ @Bean
+ public MessageConverter customMessageConverter() {
+ return new MyCustomMessageConverter();
+ }
+
+ @Bean
+ public MessageConverter avroMessageConverter() throws IOException {
+ AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
+ converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
+ return converter;
+ }
+}
--- /dev/null
+logging:
+ level:
+ org:
+ springframework: info
+ ROOT: info
+
+spring:
+ main:
+ banner-mode: "off"
+
+kafka:
+ topic: test
+ messageKey: keyTest
+
+
+
+# *************************** BINDERS CONFIGURATION ***************************
+spring:
+ cloud:
+ stream:
+ bindings:
+ input:
+ binder: kafka
+ destination: test
+ group: helloworld
+ contentType: ""
+ consumer:
+ concurrency: 1
+ partitioned: false
+ headerMode: raw
+ maxAttempts: 3
+ backOffInitialInterval:
+ instanceCount: 1
+ instanceIndex: 0
+ kafka:
+ binder:
+ brokers: "kafka:9092,kafka:9093,kafka:9094"
+ autoCreateTopics: true
+
+# Health indicator for binders.
+management:
+ health:
+ binders:
+ enabled: true
+
+# ME QUEDA 7.3 Partitioning
+# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-starter-parent</artifactId>
+ <version>Camden.SR4</version>
+ </parent>
+
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>spring-cloud-stream-kafka-example-bom</name>
+
+ <url>https://gumartinm.name</url>
+ <description>Spring Cloud Stream Kafka BOM example</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavaForFun</url>
+ </scm>
+ <properties>
+ <skip.unit.tests>false</skip.unit.tests>
+ <skip.integration.tests>false</skip.integration.tests>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-starter-stream-kafka</artifactId>
+ <version>1.1.1.RELEASE</version>
+ </dependency>
+
+ <!-- Required for using Apache Avro -->
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-stream-schema</artifactId>
+ <version>1.1.1.RELEASE</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.7</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Specification-Title>${project.description}</Specification-Title>
+ <Specification-Version>${project.version}</Specification-Version>
+ <Specification-Vendor>${project.organization.name}</Specification-Vendor>
+ <Implementation-Title>${project.description}</Implementation-Title>
+ <Implementation-Version>${project.version}</Implementation-Version>
+ <Implementation-Vendor>${project.organization.name}</Implementation-Vendor>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+
+ <!-- Used for unit tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-junit47</artifactId>
+ <version>2.19.1</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <!-- Skips unit tests if the value of skip.unit.tests property is true -->
+ <skipTests>${skip.unit.tests}</skipTests>
+ <!-- Excludes integration tests when unit tests are run. -->
+ <excludes>
+ <exclude>**/*IT.java</exclude>
+ <exclude>**/*IntegrationTest.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
+ <!-- Used for integration tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.19.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <!-- Skips integration tests if the value of skip.integration.tests
+ property is true -->
+ <skipTests>${skip.integration.tests}</skipTests>
+ <includes>
+ <include>**/*IT.java</include>
+ <include>**/*IntegrationTest.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!--
+ It will create our running jar file.
+ The main class must be located in: src/main/java/de/example/spring/kafka
+ otherwise you need an explicit declaration using this property
+ <start-class>de.exmaple.spring.kafka.Application</start-class>
+
+ The start-class property is automatically populated by spring-boot-maven-plugin and it is
+ used in spring-boot-starter-parent.
+
+ See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params
+ -->
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>1.4.4.RELEASE</version>
+ </plugin>
+ </plugins>
+ </build>
+</project>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>spring-cloud-stream-kafka-producer-example</artifactId>
+ <name>spring-cloud-stream-kafka-producer-example</name>
+ <url>https://gumartinm.name</url>
+ <description>Spring Cloud Stream Kafka Producer</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavaForFun</url>
+ </scm>
+
+ <dependencies>
+ <dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ <version>1</version>
+ </dependency>
+ </dependencies>
+
+</project>
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class);
+ }
+
+ @Bean
+ CommandLineRunner lookup(Sender sender) {
+ return args -> {
+ String topic = "test";
+ String message = "example message";
+
+
+ if (args.length > 0) {
+ topic = args[0];
+ message = args[1];
+ }
+
+ sender.sendMessage(message);
+ };
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+
+/**
+ * En lugar de esta definicion "custom" usaremos las dos que
+ * vienen por defecto en:
+ * org.springframework.cloud.stream.messaging.Sink
+ * org.springframework.cloud.stream.messaging.Source
+ *
+ * Esta definicion custom se usaría igual que Sink y Source :)
+ * Donde veas Sink y Source podrías haber puesto esto en su lugar ;)
+ */
+public interface InputOutputChannels {
+
+ @Input("inputChannel")
+ SubscribableChannel input();
+
+ @Output("outputChannel")
+ MessageChannel output();
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.messaging.support.MessageBuilder;
+
+import javax.inject.Inject;
+
+public class Sender {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
+
+ // Aquí podrías haber usado tu custom interface: InputOutputChannels :)
+ private final Source source;
+
+ @Inject
+ public Sender(Source source) {
+ this.source = source;
+ }
+
+ public void sendMessage(String message) {
+ source.output().send(MessageBuilder.withPayload(message).build());
+ }
+}
--- /dev/null
+package de.example.spring.kafka;
+
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class SenderConfig {
+
+ @Bean
+ public Sender sender(Source source) {
+ return new Sender(source);
+ }
+}
--- /dev/null
+logging:
+ level:
+ org:
+ springframework: info
+ ROOT: info
+
+spring:
+ main:
+ banner-mode: "off"
+
+
+
+# *************************** BINDERS CONFIGURATION ***************************
+spring:
+ cloud:
+ stream:
+ bindings:
+ output:
+ binder: kafka
+ destination: test
+ contentType: ""
+ headerMode: raw
+ instanceCount: 1
+ instanceIndex: 0
+ kafka:
+ binder:
+ brokers: "kafka:9092,kafka:9093,kafka:9094"
+ autoCreateTopics: true
+
+# Health indicator for binders.
+management:
+ health:
+ binders:
+ enabled: true
+
+
+# ME QUEDA 7.3 Partitioning
+# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options