Renaming modules in SpringCloudStream
authorGustavo Martin Morcuende <gu.martinm@gmail.com>
Tue, 31 Jan 2017 00:52:29 +0000 (01:52 +0100)
committerGustavo Martin Morcuende <gu.martinm@gmail.com>
Tue, 31 Jan 2017 00:52:29 +0000 (01:52 +0100)
43 files changed:
SpringJava/Kafka/SpringCloudStream/pom.xml
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/DummyService.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/test/java/de/example/spring/kafka/ReceiverShould.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/test/java/de/example/spring/kafka/SenderShould.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/pom.xml [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/java/de/example/spring/kafka/Application.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/resources/application.yml [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-bom/pom.xml [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/pom.xml [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/Product.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/pom.xml [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Application.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/DummyService.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Receiver.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/ReceiverConfig.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/test/java/de/example/spring/kafka/ReceiverShould.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/pom.xml [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Application.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Sender.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/SenderConfig.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderShould.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/pom.xml [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/java/de/example/spring/kafka/Application.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/resources/application.yml [new file with mode: 0644]

index 712b080..2b4a46a 100644 (file)
@@ -2,10 +2,10 @@
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>de.example.spring.kafka</groupId>
-    <artifactId>spring-cloud-stream-kafka-example</artifactId>
+    <artifactId>spring-stream-kafka-example</artifactId>
     <version>1.0-SNAPSHOT</version>
     <packaging>pom</packaging>
-    <name>spring-cloud-stream-kafka-example</name>
+    <name>spring-stream-kafka-example</name>
     <url>https://gumartinm.name/</url>
     <description>Spring Cloud Stream Kafka</description>
     <organization>
     </scm>
 
     <modules>
-        <module>spring-cloud-stream-kafka-example-bom</module>
-        <module>spring-cloud-stream-kafka-common-example</module>
-        <module>spring-cloud-stream-kafka-consumer-example</module>
-        <module>spring-cloud-stream-kafka-producer-example</module>
+        <module>spring-stream-kafka-bom</module>
+        <module>spring-stream-kafka-common</module>
+        <module>spring-stream-kafka-consumer</module>
+        <module>spring-stream-kafka-producer</module>
     </modules>
 
 </project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml
deleted file mode 100644 (file)
index c7d4f82..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>de.example.spring.kafka</groupId>
-        <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-
-       <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
-       <name>spring-cloud-stream-kafka-common-example</name>
-       <url>https://gumartinm.name</url>
-       <description>Spring Cloud Stream Kafka Commons</description>
-       <organization>
-           <name>gumartinm</name>
-           <url>https://gumartinm.name</url>
-       </organization>
-    <scm>
-        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
-        <url>https://git.gumartinm.name/JavaForFun</url>
-    </scm>
-
-</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java
deleted file mode 100644 (file)
index 529bcb0..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-package de.example.spring.kafka;
-
-import org.springframework.messaging.converter.MappingJackson2MessageConverter;
-import org.springframework.util.MimeType;
-
-/**
- * Working around problem created by org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion.mimeTypeFromObject()
- *
- * This code:
- *                     if (payload instanceof String) {
- *                             return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON
- *                                             : MimeTypeUtils.TEXT_PLAIN;
- *                     }
- *
- * Changes messages from: 
- * contentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
- * 
- * to:
- * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
- * 
- * 
- * By means of my custom converter we end up having:
- * 
- * contentType "application/json"{"name":"example message","description":"this is some description"}
- * 
- * About "application/json" and character encoding:
- * https://tools.ietf.org/html/rfc7158#section-8.1   "The default encoding is UTF-8"  :)
- * 
- * 
- * 
- */
-
-
-// 
-// You should set breakpoints in org.springframework.cloud.stream.binder.AbstractBinder.deserializePayload
-//                                                          org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary
-//                               org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.SendingHandler.handleMessageInternal                              
-// 
-// The code in this Spring project is a bit messy (IMHO) for example this.embedHeaders in
-// SendingHandler.handleMessageInternal: When the heck this.embedHeaders must do something? Depending on
-// the content type embedHeaders will be doing something or not. This is weird. Also deserializePayload is
-// always using UTF-8. It does not matter what character set I am receiving :/
-// 
-// 
-// 
-// 
-// Be careful when using Spring Cloud Stream because there could be surprises when trying to connect to systems
-// not using Spring (Spring is creating a mess with the headers...)
-// 
-//
-
-
-
-// 
-//  Kafka messages WITHOUT MyCustomMessageConverter:
-//  headers:
-//     contentType "text/plain"   <--------------- perhaps this is important.
-//     originalContentType "application/json;charset=UTF-8"
-//  payload:
-//     {"name":"example message","description":"this is some description"}
-//
-//
-//  Kafka messages WITH MyCustomMessageConverter:
-//  headers:
-//     contentType "application/json"
-//  payload:
-//     {"name":"example message","description":"this is some description"}
-// 
-
-
-// YOU'D RATHER BETTER NOT USE THIS CONVERTER BECAUSE I DO NOT KNOW IF contentType "text/plain" IS IMPORTANT OR NOT :(
-public class MyCustomMessageConverter extends MappingJackson2MessageConverter {
-
-  public MyCustomMessageConverter() {
-               super(new MimeType("application", "json"));
-  }
-
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java
deleted file mode 100644 (file)
index 6053200..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-package de.example.spring.kafka;
-
-public class Product {
-       private String name;
-       private String description;
-       
-       /**
-        * Required for deserialization from Jackson.
-        */
-       protected Product() {
-
-       }
-
-       public Product(String name, String description) {
-               this.name = name;
-               this.description = description;
-       }
-
-       public String getName() {
-               return this.name;
-       }
-       
-       public String getDescription() {
-               return this.description;
-       }
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml
deleted file mode 100644 (file)
index e8cfa41..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>de.example.spring.kafka</groupId>
-        <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-
-       <artifactId>spring-cloud-stream-kafka-consumer-example</artifactId>
-       <name>spring-cloud-stream-kafka-consumer-example</name>
-       <url>https://gumartinm.name</url>
-       <description>Spring Cloud Stream Kafka Consumer</description>
-       <organization>
-           <name>gumartinm</name>
-           <url>https://gumartinm.name</url>
-       </organization>
-    <scm>
-        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
-        <url>https://git.gumartinm.name/JavaForFun</url>
-    </scm>
-
-
-    <dependencies>
-        <dependency>
-            <groupId>javax.inject</groupId>
-            <artifactId>javax.inject</artifactId>
-            <version>1</version>
-        </dependency>
-
-        <dependency>
-            <groupId>de.example.spring.kafka</groupId>
-            <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-maven-plugin</artifactId>
-             </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java
deleted file mode 100644 (file)
index c367219..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-package de.example.spring.kafka;
-
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
-
-@SpringBootApplication
-public class Application {
-
-  public static void main(String[] args) {
-    SpringApplication.run(Application.class);
-  }
-
-
-  @Bean
-  CommandLineRunner lookup(Receiver receiver) {
-    return args -> {
-
-    };
-  }
-
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/DummyService.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/DummyService.java
deleted file mode 100644 (file)
index 70f7a62..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.example.spring.kafka;
-
-import javax.inject.Named;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Named
-public class DummyService {
-       private static final Logger LOGGER = LoggerFactory.getLogger(DummyService.class);
-
-
-       public void iAmVeryDummy(String message) {
-               LOGGER.info("I am a dummy service: '{}'", message);
-       }
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java
deleted file mode 100644 (file)
index 7681ab5..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-package de.example.spring.kafka;
-
-import javax.inject.Inject;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.annotation.StreamListener;
-import org.springframework.cloud.stream.messaging.Sink;
-
-@EnableBinding(Sink.class)
-public class Receiver {
-  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
-  
-  private final DummyService dummyService;
-
-  @Inject
-  public Receiver(DummyService dummyService) {
-       this.dummyService = dummyService;
-  }
-
-  @StreamListener(Sink.INPUT)
-  public void handle(Product product) {
-    LOGGER.info("product name='{}'", product.getName());
-    LOGGER.info("product description='{}'", product.getDescription());
-    
-    dummyService.iAmVeryDummy(product.getName());
-  }
-
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java
deleted file mode 100644 (file)
index f635538..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-package de.example.spring.kafka;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class ReceiverConfig {
-
-  @Bean
-  public Receiver receiver(DummyService dummyService) {
-    return new Receiver(dummyService);
-  }
-
-
-//  @Bean
-//  public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
-//       MyCustomMessageConverter converter = new MyCustomMessageConverter();
-//       converter.setSerializedPayloadClass(String.class);
-//       if (objectMapper != null) {
-//               converter.setObjectMapper(objectMapper);
-//       }
-//       
-//       return converter;
-//  }
-
-
-//  @Bean
-//  public MessageConverter avroMessageConverter() throws IOException {
-//      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
-//      //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
-//      return converter;
-//  }
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml
deleted file mode 100644 (file)
index f4f8991..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-logging:
-  level:
-    org:
-      springframework: info
-    ROOT: info
-
-kafka:
-  topic: test
-  messageKey: keyTest
-
-spring:
-  main:
-    banner-mode: "off"
-    # We do not need Tomcat running (this application runs from console)
-    web-environment: false
-  cloud:
-    stream:
-      bindings:
-        input:
-          binder: kafka
-          destination: test
-          group: helloworld
-          contentType: application/json
-          startOffset: latest
-          consumer:
-            concurrency: 1
-            partitioned: false
-            # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
-            headerMode: embeddedHeaders
-            maxAttempts: 3
-            backOffInitialInterval: 1000
-      instanceCount: 1
-      instanceIndex: 0
-      kafka:
-        binder:
-          zkNodes: "kafka:2181"
-          brokers: "kafka:9092,kafka:9093,kafka:9094"
-          autoCreateTopics: true
-
-# Health indicator for binders.
-management:
-  health:
-    binders:
-      enabled: true
-
-# ME QUEDA 7.3 Partitioning
-# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/test/java/de/example/spring/kafka/ReceiverShould.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/test/java/de/example/spring/kafka/ReceiverShould.java
deleted file mode 100644 (file)
index 6291ea4..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-package de.example.spring.kafka;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doNothing;
-
-import javax.inject.Inject;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.cloud.stream.messaging.Sink;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.support.GenericMessage;
-import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-@RunWith(SpringJUnit4ClassRunner.class)
-@SpringBootTest(classes = { Receiver.class })
-@DirtiesContext
-public class ReceiverShould {
-
-       @Inject
-       Sink source;
-       
-       @MockBean
-       DummyService dummyService;
-       
-       @Test
-       public void callSomeDummy() {
-               String productName = "product";
-               String productDescription = "productDescription";
-               Product product = new Product(productName, productDescription);
-           ArgumentCaptor<String> dummyArgCaptor = ArgumentCaptor.forClass(String.class);
-               doNothing().when(dummyService).iAmVeryDummy(dummyArgCaptor.capture());
-               
-           Message<Product> message = new GenericMessage<>(product);
-           source.input().send(message);
-
-           assertThat(dummyArgCaptor.getValue(), is(product.getName()));
-       }
-
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml
deleted file mode 100644 (file)
index 3d5df09..0000000
+++ /dev/null
@@ -1,207 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.springframework.cloud</groupId>
-        <artifactId>spring-cloud-starter-parent</artifactId>
-        <version>Camden.SR4</version>
-    </parent>
-
-    <groupId>de.example.spring.kafka</groupId>
-       <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
-       <version>1.0-SNAPSHOT</version>
-    <packaging>pom</packaging>
-    <name>spring-cloud-stream-kafka-example-bom</name>
-       
-       <url>https://gumartinm.name</url>
-       <description>Spring Cloud Stream Kafka BOM example</description>
-       <organization>
-           <name>gumartinm</name>
-           <url>https://gumartinm.name</url>
-       </organization>
-    <scm>
-        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
-        <url>https://git.gumartinm.name/JavaForFun</url>
-    </scm>
-       <properties>
-        <skip.unit.tests>false</skip.unit.tests>
-        <skip.integration.tests>false</skip.integration.tests>
-               <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-       </properties>
-
-       <dependencies>
-        <dependency>
-            <groupId>org.springframework.cloud</groupId>
-            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
-            <version>1.1.1.RELEASE</version>
-        </dependency>
-
-        <!-- Required for using Kafka 0.10.1.1 -->
-        <dependency>
-            <groupId>org.springframework.kafka</groupId>
-            <artifactId>spring-kafka</artifactId>
-            <version>1.1.2.RELEASE</version>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework.integration</groupId>
-            <artifactId>spring-integration-kafka</artifactId>
-            <version>2.1.0.RELEASE</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
-            <version>0.10.1.1</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <!-- Required for using Apache Avro -->
-        <dependency>
-            <groupId>org.springframework.cloud</groupId>
-            <artifactId>spring-cloud-stream-schema</artifactId>
-            <version>1.1.1.RELEASE</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-            <version>1.8.1</version>
-        </dependency>
-        
-        <!-- Tests -->
-         <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-test</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework.cloud</groupId>
-            <artifactId>spring-cloud-stream-test-support</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <dependencyManagement>
-
-        <!--
-            Schema Registry Server. See: http://docs.spring.io/spring-cloud-stream/docs/current/reference/html/contenttypemanagement.html
-        -->
-        <dependencies>
-            <dependency>
-                <groupId>org.springframework.cloud</groupId>
-                <artifactId>spring-cloud-stream-server</artifactId>
-            </dependency>
-        </dependencies>
-
-    </dependencyManagement>
-    
-       <build>
-               <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                    <encoding>${project.build.sourceEncoding}</encoding>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <configuration>
-                    <encoding>${project.build.sourceEncoding}</encoding>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <configuration>
-                    <archive>
-                        <manifestEntries>
-                            <Specification-Title>${project.description}</Specification-Title>
-                            <Specification-Version>${project.version}</Specification-Version>
-                            <Specification-Vendor>${project.organization.name}</Specification-Vendor>
-                            <Implementation-Title>${project.description}</Implementation-Title>
-                            <Implementation-Version>${project.version}</Implementation-Version>
-                            <Implementation-Vendor>${project.organization.name}</Implementation-Vendor>
-                        </manifestEntries>
-                    </archive>
-                </configuration>
-            </plugin>
-
-            <!-- Used for unit tests -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <dependencies>
-                    <dependency>
-                        <groupId>org.apache.maven.surefire</groupId>
-                        <artifactId>surefire-junit47</artifactId>
-                        <version>2.18.1</version>
-                    </dependency>
-                </dependencies>
-                <configuration>
-                    <!-- Skips unit tests if the value of skip.unit.tests property is true -->
-                    <skipTests>${skip.unit.tests}</skipTests>
-                    <!-- Excludes integration tests when unit tests are run. -->
-                    <excludes>
-                        <exclude>**/*IT.java</exclude>
-                        <exclude>**/*IntegrationTest.java</exclude>
-                    </excludes>
-                </configuration>
-            </plugin>
-
-            <!-- Used for integration tests -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-failsafe-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                        <configuration>
-                            <!-- Skips integration tests if the value of skip.integration.tests 
-                                property is true -->
-                            <skipTests>${skip.integration.tests}</skipTests>
-                            <includes>
-                                <include>**/*IT.java</include>
-                                <include>**/*IntegrationTest.java</include>
-                            </includes>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-       </plugins>
-
-         <pluginManagement>
-            <plugins>
-                <!--
-                      It will create our running jar file.
-                      The main class must be located in: src/main/java/de/example/spring/kafka
-                      otherwise you need an explicit declaration using this property
-                      <start-class>de.exmaple.spring.kafka.Application</start-class>
-
-                      The start-class property is automatically populated by spring-boot-maven-plugin and it is
-                      used in spring-boot-starter-parent.
-                    
-                      See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params
-                 -->
-                 <plugin>
-                     <groupId>org.springframework.boot</groupId>
-                     <artifactId>spring-boot-maven-plugin</artifactId>
-                     <version>1.4.4.RELEASE</version>
-                 </plugin>
-            </plugins>
-         </pluginManagement>
-       </build>
-</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml
deleted file mode 100644 (file)
index 3335647..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>de.example.spring.kafka</groupId>
-        <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-
-       <artifactId>spring-cloud-stream-kafka-producer-example</artifactId>
-       <name>spring-cloud-stream-kafka-producer-example</name>
-       <url>https://gumartinm.name</url>
-       <description>Spring Cloud Stream Kafka Producer</description>
-       <organization>
-           <name>gumartinm</name>
-           <url>https://gumartinm.name</url>
-       </organization>
-    <scm>
-        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
-        <url>https://git.gumartinm.name/JavaForFun</url>
-    </scm>
-
-       <dependencies>
-        <dependency>
-            <groupId>javax.inject</groupId>
-            <artifactId>javax.inject</artifactId>
-            <version>1</version>
-        </dependency>
-
-        <dependency>
-            <groupId>de.example.spring.kafka</groupId>
-            <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-               <plugin>
-               <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-maven-plugin</artifactId>
-             </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java
deleted file mode 100644 (file)
index 1034a98..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-package de.example.spring.kafka;
-
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
-
-@SpringBootApplication
-public class Application {
-
-  public static void main(String[] args) {
-    SpringApplication.run(Application.class);
-  }
-
-  @Bean
-  CommandLineRunner lookup(Sender sender) {
-    return args -> {
-      String message = "example message";
-
-
-      if (args.length > 0) {
-        message = args[0];
-      }
-
-      sender.sendMessage(message);
-    };
-  }
-
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java
deleted file mode 100644 (file)
index ab612ec..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-package de.example.spring.kafka;
-
-import org.springframework.cloud.stream.annotation.Input;
-import org.springframework.cloud.stream.annotation.Output;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.SubscribableChannel;
-
-/**
- * En lugar de esta definicion "custom" usaremos las dos que
- * vienen por defecto en:
- * org.springframework.cloud.stream.messaging.Sink
- * org.springframework.cloud.stream.messaging.Source
- *
- * Esta definicion custom se usaría igual que Sink y Source :)
- * Donde veas Sink y Source podrías haber puesto esto en su lugar ;)
- */
-public interface InputOutputChannels {
-
-  @Input("inputChannel")
-  SubscribableChannel input();
-
-  @Output("outputChannel")
-  MessageChannel output();
-
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java
deleted file mode 100644 (file)
index 1a778ca..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-package de.example.spring.kafka;
-
-import javax.inject.Inject;
-
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.messaging.Source;
-import org.springframework.messaging.support.MessageBuilder;
-
-@EnableBinding(Source.class)
-public class Sender {
-  // You could use here your custom interface. See: InputOutputChannels :)
-  private final Source source;
-
-  @Inject
-  public Sender(Source source) {
-    this.source = source;
-  }
-
-  public void sendMessage(String message) {
-         Product product = new Product(message, "this is some description");
-         source.output().send(MessageBuilder.withPayload(product).build());
-  }
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java
deleted file mode 100644 (file)
index e64a561..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-package de.example.spring.kafka;
-
-import org.springframework.cloud.stream.messaging.Source;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class SenderConfig {
-
-  @Bean
-  public Sender sender(Source source) {
-    return new Sender(source);
-  }
-  
-//  @Bean
-//  public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
-//       MyCustomMessageConverter converter = new MyCustomMessageConverter();
-//       converter.setSerializedPayloadClass(String.class);
-//       if (objectMapper != null) {
-//               converter.setObjectMapper(objectMapper);
-//       }
-//       
-//       return converter;
-//  }
-
-//  @Bean
-//  public MessageConverter avroMessageConverter() throws IOException {
-//      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
-//      //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
-//      return converter;
-//  }
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml
deleted file mode 100644 (file)
index c2546c4..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-logging:
-  level:
-    org:
-      springframework: info
-    ROOT: info
-
-spring:
-  main:
-    banner-mode: "off"
-    # We do not need Tomcat running (this application runs from console)
-    web-environment: false
-  cloud:
-    stream:
-      bindings:
-        output:
-          binder: kafka
-          destination: test
-          contentType: application/json
-          # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
-          headerMode: embeddedHeaders
-      instanceCount: 1
-      instanceIndex: 0
-      kafka:
-        binder:
-          zkNodes: "kafka:2181"
-          brokers: "kafka:9092,kafka:9093,kafka:9094"
-          autoCreateTopics: true
-
-# Health indicator for binders.
-management:
-  health:
-    binders:
-      enabled: true
-
-
-# ME QUEDA 7.3 Partitioning
-# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/test/java/de/example/spring/kafka/SenderShould.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/test/java/de/example/spring/kafka/SenderShould.java
deleted file mode 100644 (file)
index 71fbfec..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-package de.example.spring.kafka;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-import javax.inject.Inject;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.messaging.Source;
-import org.springframework.cloud.stream.test.binder.MessageCollector;
-import org.springframework.messaging.Message;
-import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-@RunWith(SpringJUnit4ClassRunner.class)
-@SpringBootTest(classes = { Sender.class })
-@DirtiesContext
-public class SenderShould {
-
-       @Inject
-       Source source;
-       
-       @Inject
-       Sender sender;
-       
-       @Inject
-       private MessageCollector messageCollector;
-       
-       @Test
-       public void sendSomeProduct() {
-               Product product = new Product("hello", "this is some description");
-               
-               sender.sendMessage("hello");
-               
-               Message<Product> received = (Message<Product>) messageCollector.forChannel(source.output()).poll();
-               
-           assertThat(received.getPayload().getDescription(), is(product.getDescription()));
-       }
-
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/pom.xml
deleted file mode 100644 (file)
index e530f21..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>de.example.spring.kafka</groupId>
-        <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-
-       <artifactId>spring-cloud-stream-kafka-registry-example</artifactId>
-       <name>spring-cloud-stream-kafka-registry-example</name>
-       <url>https://gumartinm.name</url>
-       <description>Spring Cloud Stream Kafka Registry</description>
-       <organization>
-           <name>gumartinm</name>
-           <url>https://gumartinm.name</url>
-       </organization>
-    <scm>
-        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
-        <url>https://git.gumartinm.name/JavaForFun</url>
-    </scm>
-
-
-    <dependencies>
-        <dependency>
-            <groupId>javax.inject</groupId>
-            <artifactId>javax.inject</artifactId>
-            <version>1</version>
-        </dependency>
-
-        <dependency>
-            <groupId>de.example.spring.kafka</groupId>
-            <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-
-        
-        <dependency>
-            <groupId>org.springframework.cloud</groupId>
-            <artifactId>spring-cloud-stream-server</artifactId>
-        </dependency>
-       
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-maven-plugin</artifactId>
-             </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/java/de/example/spring/kafka/Application.java
deleted file mode 100644 (file)
index 9817e62..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-package de.example.spring.kafka;
-
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
-import org.springframework.cloud.stream.schema.server.EnableSchemaRegistryServer;
-
-@SpringBootApplication
-@EnableSchemaRegistryServer
-public class Application {
-
-  public static void main(String[] args) {
-    SpringApplication.run(Application.class);
-  }
-
-}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/resources/application.yml
deleted file mode 100644 (file)
index bebb6c8..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-logging:
-  level:
-    org:
-      springframework: info
-    ROOT: info
-
-server:
-  port: 8888
-spring:
-  cloud:
-    stream:
-      schema:
-        server:
-          path: schema
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-bom/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-bom/pom.xml
new file mode 100644 (file)
index 0000000..319e2ad
--- /dev/null
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.springframework.cloud</groupId>
+        <artifactId>spring-cloud-starter-parent</artifactId>
+        <version>Camden.SR4</version>
+    </parent>
+
+    <groupId>de.example.spring.kafka</groupId>
+       <artifactId>spring-stream-kafka-bom</artifactId>
+       <version>1.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+    <name>spring-stream-kafka-bom</name>
+       
+       <url>https://gumartinm.name</url>
+       <description>Spring Cloud Stream Kafka BOM example</description>
+       <organization>
+           <name>gumartinm</name>
+           <url>https://gumartinm.name</url>
+       </organization>
+    <scm>
+        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+        <url>https://git.gumartinm.name/JavaForFun</url>
+    </scm>
+       <properties>
+        <skip.unit.tests>false</skip.unit.tests>
+        <skip.integration.tests>false</skip.integration.tests>
+               <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+       </properties>
+
+       <dependencies>
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
+            <version>1.1.1.RELEASE</version>
+        </dependency>
+
+        <!-- Required for using Kafka 0.10.1.1 -->
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <version>1.1.2.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-kafka</artifactId>
+            <version>2.1.0.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>0.10.1.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Required for using Apache Avro -->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-stream-schema</artifactId>
+            <version>1.1.1.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.8.1</version>
+        </dependency>
+        
+        <!-- Tests -->
+         <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-stream-test-support</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <dependencyManagement>
+
+        <!--
+            Schema Registry Server. See: http://docs.spring.io/spring-cloud-stream/docs/current/reference/html/contenttypemanagement.html
+        <dependencies>
+            <dependency>
+                <groupId>org.springframework.cloud</groupId>
+                <artifactId>spring-cloud-stream-server</artifactId>
+            </dependency>
+        </dependencies>
+        -->
+
+    </dependencyManagement>
+    
+       <build>
+               <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-resources-plugin</artifactId>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifestEntries>
+                            <Specification-Title>${project.description}</Specification-Title>
+                            <Specification-Version>${project.version}</Specification-Version>
+                            <Specification-Vendor>${project.organization.name}</Specification-Vendor>
+                            <Implementation-Title>${project.description}</Implementation-Title>
+                            <Implementation-Version>${project.version}</Implementation-Version>
+                            <Implementation-Vendor>${project.organization.name}</Implementation-Vendor>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+            </plugin>
+
+            <!-- Used for unit tests -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.maven.surefire</groupId>
+                        <artifactId>surefire-junit47</artifactId>
+                        <version>2.18.1</version>
+                    </dependency>
+                </dependencies>
+                <configuration>
+                    <!-- Skips unit tests if the value of skip.unit.tests property is true -->
+                    <skipTests>${skip.unit.tests}</skipTests>
+                    <!-- Excludes integration tests when unit tests are run. -->
+                    <excludes>
+                        <exclude>**/*IT.java</exclude>
+                        <exclude>**/*IntegrationTest.java</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+
+            <!-- Used for integration tests -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Skips integration tests if the value of skip.integration.tests 
+                                property is true -->
+                            <skipTests>${skip.integration.tests}</skipTests>
+                            <includes>
+                                <include>**/*IT.java</include>
+                                <include>**/*IntegrationTest.java</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+       </plugins>
+
+         <pluginManagement>
+            <plugins>
+                <!--
+                      It will create our running jar file.
+                      The main class must be located in: src/main/java/de/example/spring/kafka
+                      otherwise you need an explicit declaration using this property
+                      <start-class>de.exmaple.spring.kafka.Application</start-class>
+
+                      The start-class property is automatically populated by spring-boot-maven-plugin and it is
+                      used in spring-boot-starter-parent.
+                    
+                      See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params
+                 -->
+                 <plugin>
+                     <groupId>org.springframework.boot</groupId>
+                     <artifactId>spring-boot-maven-plugin</artifactId>
+                     <version>1.4.4.RELEASE</version>
+                 </plugin>
+            </plugins>
+         </pluginManagement>
+       </build>
+</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/pom.xml
new file mode 100644 (file)
index 0000000..068ada0
--- /dev/null
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>de.example.spring.kafka</groupId>
+        <artifactId>spring-stream-kafka-bom</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+       <artifactId>spring-stream-kafka-common</artifactId>
+       <name>spring-stream-kafka-common</name>
+       <url>https://gumartinm.name</url>
+       <description>Spring Cloud Stream Kafka Commons</description>
+       <organization>
+           <name>gumartinm</name>
+           <url>https://gumartinm.name</url>
+       </organization>
+    <scm>
+        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+        <url>https://git.gumartinm.name/JavaForFun</url>
+    </scm>
+
+</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java
new file mode 100644 (file)
index 0000000..529bcb0
--- /dev/null
@@ -0,0 +1,78 @@
+package de.example.spring.kafka;
+
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.util.MimeType;
+
+/**
+ * Working around problem created by org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion.mimeTypeFromObject()
+ *
+ * This code:
+ *                     if (payload instanceof String) {
+ *                             return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON
+ *                                             : MimeTypeUtils.TEXT_PLAIN;
+ *                     }
+ *
+ * Changes messages from: 
+ * contentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
+ * 
+ * to:
+ * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
+ * 
+ * 
+ * By means of my custom converter we end up having:
+ * 
+ * contentType "application/json"{"name":"example message","description":"this is some description"}
+ * 
+ * About "application/json" and character encoding:
+ * https://tools.ietf.org/html/rfc7158#section-8.1   "The default encoding is UTF-8"  :)
+ * 
+ * 
+ * 
+ */
+
+
+// 
+// You should set breakpoints in org.springframework.cloud.stream.binder.AbstractBinder.deserializePayload
+//                                                          org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary
+//                               org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.SendingHandler.handleMessageInternal                              
+// 
+// The code in this Spring project is a bit messy (IMHO) for example this.embedHeaders in
+// SendingHandler.handleMessageInternal: When the heck this.embedHeaders must do something? Depending on
+// the content type embedHeaders will be doing something or not. This is weird. Also deserializePayload is
+// always using UTF-8. It does not matter what character set I am receiving :/
+// 
+// 
+// 
+// 
+// Be careful when using Spring Cloud Stream because there could be surprises when trying to connect to systems
+// not using Spring (Spring is creating a mess with the headers...)
+// 
+//
+
+
+
+// 
+//  Kafka messages WITHOUT MyCustomMessageConverter:
+//  headers:
+//     contentType "text/plain"   <--------------- perhaps this is important.
+//     originalContentType "application/json;charset=UTF-8"
+//  payload:
+//     {"name":"example message","description":"this is some description"}
+//
+//
+//  Kafka messages WITH MyCustomMessageConverter:
+//  headers:
+//     contentType "application/json"
+//  payload:
+//     {"name":"example message","description":"this is some description"}
+// 
+
+
+// YOU'D RATHER BETTER NOT USE THIS CONVERTER BECAUSE I DO NOT KNOW IF contentType "text/plain" IS IMPORTANT OR NOT :(
+public class MyCustomMessageConverter extends MappingJackson2MessageConverter {
+
+  public MyCustomMessageConverter() {
+               super(new MimeType("application", "json"));
+  }
+
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/Product.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/Product.java
new file mode 100644 (file)
index 0000000..6053200
--- /dev/null
@@ -0,0 +1,26 @@
+package de.example.spring.kafka;
+
+public class Product {
+       private String name;
+       private String description;
+       
+       /**
+        * Required for deserialization from Jackson.
+        */
+       protected Product() {
+
+       }
+
+       public Product(String name, String description) {
+               this.name = name;
+               this.description = description;
+       }
+
+       public String getName() {
+               return this.name;
+       }
+       
+       public String getDescription() {
+               return this.description;
+       }
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/pom.xml
new file mode 100644 (file)
index 0000000..dd8f856
--- /dev/null
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>de.example.spring.kafka</groupId>
+        <artifactId>spring-stream-kafka-bom</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+       <artifactId>spring-stream-kafka-consumer</artifactId>
+       <name>spring-stream-kafka-consumer</name>
+       <url>https://gumartinm.name</url>
+       <description>Spring Cloud Stream Kafka Consumer</description>
+       <organization>
+           <name>gumartinm</name>
+           <url>https://gumartinm.name</url>
+       </organization>
+    <scm>
+        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+        <url>https://git.gumartinm.name/JavaForFun</url>
+    </scm>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>javax.inject</groupId>
+            <artifactId>javax.inject</artifactId>
+            <version>1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.example.spring.kafka</groupId>
+            <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+             </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Application.java
new file mode 100644 (file)
index 0000000..c367219
--- /dev/null
@@ -0,0 +1,23 @@
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+  public static void main(String[] args) {
+    SpringApplication.run(Application.class);
+  }
+
+
+  @Bean
+  CommandLineRunner lookup(Receiver receiver) {
+    return args -> {
+
+    };
+  }
+
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/DummyService.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/DummyService.java
new file mode 100644 (file)
index 0000000..70f7a62
--- /dev/null
@@ -0,0 +1,16 @@
+package de.example.spring.kafka;
+
+import javax.inject.Named;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Named
+public class DummyService {
+       private static final Logger LOGGER = LoggerFactory.getLogger(DummyService.class);
+
+
+       public void iAmVeryDummy(String message) {
+               LOGGER.info("I am a dummy service: '{}'", message);
+       }
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Receiver.java
new file mode 100644 (file)
index 0000000..7681ab5
--- /dev/null
@@ -0,0 +1,30 @@
+package de.example.spring.kafka;
+
+import javax.inject.Inject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.messaging.Sink;
+
+@EnableBinding(Sink.class)
+public class Receiver {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
+  
+  private final DummyService dummyService;
+
+  @Inject
+  public Receiver(DummyService dummyService) {
+       this.dummyService = dummyService;
+  }
+
+  @StreamListener(Sink.INPUT)
+  public void handle(Product product) {
+    LOGGER.info("product name='{}'", product.getName());
+    LOGGER.info("product description='{}'", product.getDescription());
+    
+    dummyService.iAmVeryDummy(product.getName());
+  }
+
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/ReceiverConfig.java
new file mode 100644 (file)
index 0000000..f635538
--- /dev/null
@@ -0,0 +1,33 @@
+package de.example.spring.kafka;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ReceiverConfig {
+
+  @Bean
+  public Receiver receiver(DummyService dummyService) {
+    return new Receiver(dummyService);
+  }
+
+
+//  @Bean
+//  public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
+//       MyCustomMessageConverter converter = new MyCustomMessageConverter();
+//       converter.setSerializedPayloadClass(String.class);
+//       if (objectMapper != null) {
+//               converter.setObjectMapper(objectMapper);
+//       }
+//       
+//       return converter;
+//  }
+
+
+//  @Bean
+//  public MessageConverter avroMessageConverter() throws IOException {
+//      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
+//      //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
+//      return converter;
+//  }
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..f4f8991
--- /dev/null
@@ -0,0 +1,47 @@
+logging:
+  level:
+    org:
+      springframework: info
+    ROOT: info
+
+kafka:
+  topic: test
+  messageKey: keyTest
+
+spring:
+  main:
+    banner-mode: "off"
+    # We do not need Tomcat running (this application runs from console)
+    web-environment: false
+  cloud:
+    stream:
+      bindings:
+        input:
+          binder: kafka
+          destination: test
+          group: helloworld
+          contentType: application/json
+          startOffset: latest
+          consumer:
+            concurrency: 1
+            partitioned: false
+            # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
+            headerMode: embeddedHeaders
+            maxAttempts: 3
+            backOffInitialInterval: 1000
+      instanceCount: 1
+      instanceIndex: 0
+      kafka:
+        binder:
+          zkNodes: "kafka:2181"
+          brokers: "kafka:9092,kafka:9093,kafka:9094"
+          autoCreateTopics: true
+
+# Health indicator for binders.
+management:
+  health:
+    binders:
+      enabled: true
+
+# ME QUEDA 7.3 Partitioning
+# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/test/java/de/example/spring/kafka/ReceiverShould.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/test/java/de/example/spring/kafka/ReceiverShould.java
new file mode 100644 (file)
index 0000000..6291ea4
--- /dev/null
@@ -0,0 +1,45 @@
+package de.example.spring.kafka;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doNothing;
+
+import javax.inject.Inject;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.cloud.stream.messaging.Sink;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = { Receiver.class })
+@DirtiesContext
+public class ReceiverShould {
+
+       @Inject
+       Sink source;
+       
+       @MockBean
+       DummyService dummyService;
+       
+       @Test
+       public void callSomeDummy() {
+               String productName = "product";
+               String productDescription = "productDescription";
+               Product product = new Product(productName, productDescription);
+           ArgumentCaptor<String> dummyArgCaptor = ArgumentCaptor.forClass(String.class);
+               doNothing().when(dummyService).iAmVeryDummy(dummyArgCaptor.capture());
+               
+           Message<Product> message = new GenericMessage<>(product);
+           source.input().send(message);
+
+           assertThat(dummyArgCaptor.getValue(), is(product.getName()));
+       }
+
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/pom.xml
new file mode 100644 (file)
index 0000000..da3f81e
--- /dev/null
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>de.example.spring.kafka</groupId>
+        <artifactId>spring-stream-kafka-bom</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+       <artifactId>spring-stream-kafka-producer</artifactId>
+       <name>spring-stream-kafka-producer</name>
+       <url>https://gumartinm.name</url>
+       <description>Spring Cloud Stream Kafka Producer</description>
+       <organization>
+           <name>gumartinm</name>
+           <url>https://gumartinm.name</url>
+       </organization>
+    <scm>
+        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+        <url>https://git.gumartinm.name/JavaForFun</url>
+    </scm>
+
+       <dependencies>
+        <dependency>
+            <groupId>javax.inject</groupId>
+            <artifactId>javax.inject</artifactId>
+            <version>1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.example.spring.kafka</groupId>
+            <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+               <plugin>
+               <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+             </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Application.java
new file mode 100644 (file)
index 0000000..1034a98
--- /dev/null
@@ -0,0 +1,29 @@
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+  public static void main(String[] args) {
+    SpringApplication.run(Application.class);
+  }
+
+  @Bean
+  CommandLineRunner lookup(Sender sender) {
+    return args -> {
+      String message = "example message";
+
+
+      if (args.length > 0) {
+        message = args[0];
+      }
+
+      sender.sendMessage(message);
+    };
+  }
+
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java
new file mode 100644 (file)
index 0000000..ab612ec
--- /dev/null
@@ -0,0 +1,25 @@
+package de.example.spring.kafka;
+
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+
+/**
+ * En lugar de esta definicion "custom" usaremos las dos que
+ * vienen por defecto en:
+ * org.springframework.cloud.stream.messaging.Sink
+ * org.springframework.cloud.stream.messaging.Source
+ *
+ * Esta definicion custom se usaría igual que Sink y Source :)
+ * Donde veas Sink y Source podrías haber puesto esto en su lugar ;)
+ */
+public interface InputOutputChannels {
+
+  @Input("inputChannel")
+  SubscribableChannel input();
+
+  @Output("outputChannel")
+  MessageChannel output();
+
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Sender.java
new file mode 100644 (file)
index 0000000..1a778ca
--- /dev/null
@@ -0,0 +1,23 @@
+package de.example.spring.kafka;
+
+import javax.inject.Inject;
+
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.messaging.support.MessageBuilder;
+
+@EnableBinding(Source.class)
+public class Sender {
+  // You could use here your custom interface. See: InputOutputChannels :)
+  private final Source source;
+
+  @Inject
+  public Sender(Source source) {
+    this.source = source;
+  }
+
+  public void sendMessage(String message) {
+         Product product = new Product(message, "this is some description");
+         source.output().send(MessageBuilder.withPayload(product).build());
+  }
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/SenderConfig.java
new file mode 100644 (file)
index 0000000..e64a561
--- /dev/null
@@ -0,0 +1,32 @@
+package de.example.spring.kafka;
+
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class SenderConfig {
+
+  @Bean
+  public Sender sender(Source source) {
+    return new Sender(source);
+  }
+  
+//  @Bean
+//  public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
+//       MyCustomMessageConverter converter = new MyCustomMessageConverter();
+//       converter.setSerializedPayloadClass(String.class);
+//       if (objectMapper != null) {
+//               converter.setObjectMapper(objectMapper);
+//       }
+//       
+//       return converter;
+//  }
+
+//  @Bean
+//  public MessageConverter avroMessageConverter() throws IOException {
+//      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
+//      //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
+//      return converter;
+//  }
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..c2546c4
--- /dev/null
@@ -0,0 +1,37 @@
+logging:
+  level:
+    org:
+      springframework: info
+    ROOT: info
+
+spring:
+  main:
+    banner-mode: "off"
+    # We do not need Tomcat running (this application runs from console)
+    web-environment: false
+  cloud:
+    stream:
+      bindings:
+        output:
+          binder: kafka
+          destination: test
+          contentType: application/json
+          # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
+          headerMode: embeddedHeaders
+      instanceCount: 1
+      instanceIndex: 0
+      kafka:
+        binder:
+          zkNodes: "kafka:2181"
+          brokers: "kafka:9092,kafka:9093,kafka:9094"
+          autoCreateTopics: true
+
+# Health indicator for binders.
+management:
+  health:
+    binders:
+      enabled: true
+
+
+# ME QUEDA 7.3 Partitioning
+# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderShould.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderShould.java
new file mode 100644 (file)
index 0000000..71fbfec
--- /dev/null
@@ -0,0 +1,42 @@
+package de.example.spring.kafka;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import javax.inject.Inject;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.messaging.Message;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = { Sender.class })
+@DirtiesContext
+public class SenderShould {
+
+       @Inject
+       Source source;
+       
+       @Inject
+       Sender sender;
+       
+       @Inject
+       private MessageCollector messageCollector;
+       
+       @Test
+       public void sendSomeProduct() {
+               Product product = new Product("hello", "this is some description");
+               
+               sender.sendMessage("hello");
+               
+               Message<Product> received = (Message<Product>) messageCollector.forChannel(source.output()).poll();
+               
+           assertThat(received.getPayload().getDescription(), is(product.getDescription()));
+       }
+
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/pom.xml
new file mode 100644 (file)
index 0000000..3239586
--- /dev/null
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>de.example.spring.kafka</groupId>
+        <artifactId>spring-stream-kafka-bom</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+       <artifactId>spring-stream-kafka-registry</artifactId>
+       <name>spring-stream-kafka-registry</name>
+       <url>https://gumartinm.name</url>
+       <description>Spring Cloud Stream Kafka Registry</description>
+       <organization>
+           <name>gumartinm</name>
+           <url>https://gumartinm.name</url>
+       </organization>
+    <scm>
+        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+        <url>https://git.gumartinm.name/JavaForFun</url>
+    </scm>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>javax.inject</groupId>
+            <artifactId>javax.inject</artifactId>
+            <version>1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.example.spring.kafka</groupId>
+            <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+
+        
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-stream-server</artifactId>
+        </dependency>
+       
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+             </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/java/de/example/spring/kafka/Application.java
new file mode 100644 (file)
index 0000000..9817e62
--- /dev/null
@@ -0,0 +1,17 @@
+package de.example.spring.kafka;
+
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.cloud.stream.schema.server.EnableSchemaRegistryServer;
+
+@SpringBootApplication
+@EnableSchemaRegistryServer
+public class Application {
+
+  public static void main(String[] args) {
+    SpringApplication.run(Application.class);
+  }
+
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..bebb6c8
--- /dev/null
@@ -0,0 +1,14 @@
+logging:
+  level:
+    org:
+      springframework: info
+    ROOT: info
+
+server:
+  port: 8888
+spring:
+  cloud:
+    stream:
+      schema:
+        server:
+          path: schema