From 07328d85fc5ce9fb29fd7eb499559ac8b8bef912 Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Tue, 31 Jan 2017 01:52:29 +0100 Subject: [PATCH] Renaming modules in SpringCloudStream --- SpringJava/Kafka/SpringCloudStream/pom.xml | 12 +- .../pom.xml | 27 --- .../spring/kafka/MyCustomMessageConverter.java | 78 -------- .../main/java/de/example/spring/kafka/Product.java | 26 --- .../pom.xml | 51 ----- .../java/de/example/spring/kafka/Application.java | 23 --- .../java/de/example/spring/kafka/DummyService.java | 16 -- .../java/de/example/spring/kafka/Receiver.java | 30 --- .../de/example/spring/kafka/ReceiverConfig.java | 33 ---- .../src/main/resources/application.yml | 47 ----- .../de/example/spring/kafka/ReceiverShould.java | 45 ----- .../spring-cloud-stream-kafka-example-bom/pom.xml | 207 --------------------- .../pom.xml | 50 ----- .../java/de/example/spring/kafka/Application.java | 29 --- .../example/spring/kafka/InputOutputChannels.java | 25 --- .../main/java/de/example/spring/kafka/Sender.java | 23 --- .../java/de/example/spring/kafka/SenderConfig.java | 32 ---- .../src/main/resources/application.yml | 37 ---- .../java/de/example/spring/kafka/SenderShould.java | 42 ----- .../pom.xml | 58 ------ .../java/de/example/spring/kafka/Application.java | 17 -- .../src/main/resources/application.yml | 14 -- .../spring-stream-kafka-bom/pom.xml | 207 +++++++++++++++++++++ .../spring-stream-kafka-common/pom.xml | 27 +++ .../spring/kafka/MyCustomMessageConverter.java | 78 ++++++++ .../main/java/de/example/spring/kafka/Product.java | 26 +++ .../spring-stream-kafka-consumer/pom.xml | 51 +++++ .../java/de/example/spring/kafka/Application.java | 23 +++ .../java/de/example/spring/kafka/DummyService.java | 16 ++ .../java/de/example/spring/kafka/Receiver.java | 30 +++ .../de/example/spring/kafka/ReceiverConfig.java | 33 ++++ .../src/main/resources/application.yml | 47 +++++ .../de/example/spring/kafka/ReceiverShould.java | 45 +++++ .../spring-stream-kafka-producer/pom.xml | 50 +++++ .../java/de/example/spring/kafka/Application.java | 29 +++ .../example/spring/kafka/InputOutputChannels.java | 25 +++ .../main/java/de/example/spring/kafka/Sender.java | 23 +++ .../java/de/example/spring/kafka/SenderConfig.java | 32 ++++ .../src/main/resources/application.yml | 37 ++++ .../java/de/example/spring/kafka/SenderShould.java | 42 +++++ .../spring-stream-kafka-registry/pom.xml | 58 ++++++ .../java/de/example/spring/kafka/Application.java | 17 ++ .../src/main/resources/application.yml | 14 ++ 43 files changed, 916 insertions(+), 916 deletions(-) delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/DummyService.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/test/java/de/example/spring/kafka/ReceiverShould.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/test/java/de/example/spring/kafka/SenderShould.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/pom.xml delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/java/de/example/spring/kafka/Application.java delete mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/resources/application.yml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-bom/pom.xml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/pom.xml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/Product.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/pom.xml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Application.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/DummyService.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Receiver.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/ReceiverConfig.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/test/java/de/example/spring/kafka/ReceiverShould.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/pom.xml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Application.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Sender.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/SenderConfig.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderShould.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/pom.xml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/java/de/example/spring/kafka/Application.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/resources/application.yml diff --git a/SpringJava/Kafka/SpringCloudStream/pom.xml b/SpringJava/Kafka/SpringCloudStream/pom.xml index 712b080..2b4a46a 100644 --- a/SpringJava/Kafka/SpringCloudStream/pom.xml +++ b/SpringJava/Kafka/SpringCloudStream/pom.xml @@ -2,10 +2,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 de.example.spring.kafka - spring-cloud-stream-kafka-example + spring-stream-kafka-example 1.0-SNAPSHOT pom - spring-cloud-stream-kafka-example + spring-stream-kafka-example https://gumartinm.name/ Spring Cloud Stream Kafka @@ -18,10 +18,10 @@ - spring-cloud-stream-kafka-example-bom - spring-cloud-stream-kafka-common-example - spring-cloud-stream-kafka-consumer-example - spring-cloud-stream-kafka-producer-example + spring-stream-kafka-bom + spring-stream-kafka-common + spring-stream-kafka-consumer + spring-stream-kafka-producer diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml deleted file mode 100644 index c7d4f82..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml +++ /dev/null @@ -1,27 +0,0 @@ - - - - 4.0.0 - - - de.example.spring.kafka - spring-cloud-stream-kafka-example-bom - 1.0-SNAPSHOT - - - spring-cloud-stream-kafka-common-example - spring-cloud-stream-kafka-common-example - https://gumartinm.name - Spring Cloud Stream Kafka Commons - - gumartinm - https://gumartinm.name - - - scm:git:https://git.gumartinm.name/JavaForFun - https://git.gumartinm.name/JavaForFun - - - diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java deleted file mode 100644 index 529bcb0..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java +++ /dev/null @@ -1,78 +0,0 @@ -package de.example.spring.kafka; - -import org.springframework.messaging.converter.MappingJackson2MessageConverter; -import org.springframework.util.MimeType; - -/** - * Working around problem created by org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion.mimeTypeFromObject() - * - * This code: - * if (payload instanceof String) { - * return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON - * : MimeTypeUtils.TEXT_PLAIN; - * } - * - * Changes messages from: - * contentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"} - * - * to: - * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"} - * - * - * By means of my custom converter we end up having: - * - * contentType "application/json"{"name":"example message","description":"this is some description"} - * - * About "application/json" and character encoding: - * https://tools.ietf.org/html/rfc7158#section-8.1 "The default encoding is UTF-8" :) - * - * - * - */ - - -// -// You should set breakpoints in org.springframework.cloud.stream.binder.AbstractBinder.deserializePayload -// org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary -// org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.SendingHandler.handleMessageInternal -// -// The code in this Spring project is a bit messy (IMHO) for example this.embedHeaders in -// SendingHandler.handleMessageInternal: When the heck this.embedHeaders must do something? Depending on -// the content type embedHeaders will be doing something or not. This is weird. Also deserializePayload is -// always using UTF-8. It does not matter what character set I am receiving :/ -// -// -// -// -// Be careful when using Spring Cloud Stream because there could be surprises when trying to connect to systems -// not using Spring (Spring is creating a mess with the headers...) -// -// - - - -// -// Kafka messages WITHOUT MyCustomMessageConverter: -// headers: -// contentType "text/plain" <--------------- perhaps this is important. -// originalContentType "application/json;charset=UTF-8" -// payload: -// {"name":"example message","description":"this is some description"} -// -// -// Kafka messages WITH MyCustomMessageConverter: -// headers: -// contentType "application/json" -// payload: -// {"name":"example message","description":"this is some description"} -// - - -// YOU'D RATHER BETTER NOT USE THIS CONVERTER BECAUSE I DO NOT KNOW IF contentType "text/plain" IS IMPORTANT OR NOT :( -public class MyCustomMessageConverter extends MappingJackson2MessageConverter { - - public MyCustomMessageConverter() { - super(new MimeType("application", "json")); - } - -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java deleted file mode 100644 index 6053200..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java +++ /dev/null @@ -1,26 +0,0 @@ -package de.example.spring.kafka; - -public class Product { - private String name; - private String description; - - /** - * Required for deserialization from Jackson. - */ - protected Product() { - - } - - public Product(String name, String description) { - this.name = name; - this.description = description; - } - - public String getName() { - return this.name; - } - - public String getDescription() { - return this.description; - } -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml deleted file mode 100644 index e8cfa41..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml +++ /dev/null @@ -1,51 +0,0 @@ - - - - 4.0.0 - - - de.example.spring.kafka - spring-cloud-stream-kafka-example-bom - 1.0-SNAPSHOT - - - spring-cloud-stream-kafka-consumer-example - spring-cloud-stream-kafka-consumer-example - https://gumartinm.name - Spring Cloud Stream Kafka Consumer - - gumartinm - https://gumartinm.name - - - scm:git:https://git.gumartinm.name/JavaForFun - https://git.gumartinm.name/JavaForFun - - - - - - javax.inject - javax.inject - 1 - - - - de.example.spring.kafka - spring-cloud-stream-kafka-common-example - 1.0-SNAPSHOT - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - - diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java deleted file mode 100644 index c367219..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java +++ /dev/null @@ -1,23 +0,0 @@ -package de.example.spring.kafka; - -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; - -@SpringBootApplication -public class Application { - - public static void main(String[] args) { - SpringApplication.run(Application.class); - } - - - @Bean - CommandLineRunner lookup(Receiver receiver) { - return args -> { - - }; - } - -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/DummyService.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/DummyService.java deleted file mode 100644 index 70f7a62..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/DummyService.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.example.spring.kafka; - -import javax.inject.Named; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Named -public class DummyService { - private static final Logger LOGGER = LoggerFactory.getLogger(DummyService.class); - - - public void iAmVeryDummy(String message) { - LOGGER.info("I am a dummy service: '{}'", message); - } -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java deleted file mode 100644 index 7681ab5..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java +++ /dev/null @@ -1,30 +0,0 @@ -package de.example.spring.kafka; - -import javax.inject.Inject; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Sink; - -@EnableBinding(Sink.class) -public class Receiver { - private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); - - private final DummyService dummyService; - - @Inject - public Receiver(DummyService dummyService) { - this.dummyService = dummyService; - } - - @StreamListener(Sink.INPUT) - public void handle(Product product) { - LOGGER.info("product name='{}'", product.getName()); - LOGGER.info("product description='{}'", product.getDescription()); - - dummyService.iAmVeryDummy(product.getName()); - } - -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java deleted file mode 100644 index f635538..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java +++ /dev/null @@ -1,33 +0,0 @@ -package de.example.spring.kafka; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class ReceiverConfig { - - @Bean - public Receiver receiver(DummyService dummyService) { - return new Receiver(dummyService); - } - - -// @Bean -// public MessageConverter customMessageConverter(ObjectMapper objectMapper) { -// MyCustomMessageConverter converter = new MyCustomMessageConverter(); -// converter.setSerializedPayloadClass(String.class); -// if (objectMapper != null) { -// converter.setObjectMapper(objectMapper); -// } -// -// return converter; -// } - - -// @Bean -// public MessageConverter avroMessageConverter() throws IOException { -// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); -// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); -// return converter; -// } -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml deleted file mode 100644 index f4f8991..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml +++ /dev/null @@ -1,47 +0,0 @@ -logging: - level: - org: - springframework: info - ROOT: info - -kafka: - topic: test - messageKey: keyTest - -spring: - main: - banner-mode: "off" - # We do not need Tomcat running (this application runs from console) - web-environment: false - cloud: - stream: - bindings: - input: - binder: kafka - destination: test - group: helloworld - contentType: application/json - startOffset: latest - consumer: - concurrency: 1 - partitioned: false - # Consumer (input) and producer (output) are Spring Cloud Stream applications :) - headerMode: embeddedHeaders - maxAttempts: 3 - backOffInitialInterval: 1000 - instanceCount: 1 - instanceIndex: 0 - kafka: - binder: - zkNodes: "kafka:2181" - brokers: "kafka:9092,kafka:9093,kafka:9094" - autoCreateTopics: true - -# Health indicator for binders. -management: - health: - binders: - enabled: true - -# ME QUEDA 7.3 Partitioning -# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/test/java/de/example/spring/kafka/ReceiverShould.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/test/java/de/example/spring/kafka/ReceiverShould.java deleted file mode 100644 index 6291ea4..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/test/java/de/example/spring/kafka/ReceiverShould.java +++ /dev/null @@ -1,45 +0,0 @@ -package de.example.spring.kafka; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.doNothing; - -import javax.inject.Inject; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.GenericMessage; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = { Receiver.class }) -@DirtiesContext -public class ReceiverShould { - - @Inject - Sink source; - - @MockBean - DummyService dummyService; - - @Test - public void callSomeDummy() { - String productName = "product"; - String productDescription = "productDescription"; - Product product = new Product(productName, productDescription); - ArgumentCaptor dummyArgCaptor = ArgumentCaptor.forClass(String.class); - doNothing().when(dummyService).iAmVeryDummy(dummyArgCaptor.capture()); - - Message message = new GenericMessage<>(product); - source.input().send(message); - - assertThat(dummyArgCaptor.getValue(), is(product.getName())); - } - -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml deleted file mode 100644 index 3d5df09..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml +++ /dev/null @@ -1,207 +0,0 @@ - - - - 4.0.0 - - - org.springframework.cloud - spring-cloud-starter-parent - Camden.SR4 - - - de.example.spring.kafka - spring-cloud-stream-kafka-example-bom - 1.0-SNAPSHOT - pom - spring-cloud-stream-kafka-example-bom - - https://gumartinm.name - Spring Cloud Stream Kafka BOM example - - gumartinm - https://gumartinm.name - - - scm:git:https://git.gumartinm.name/JavaForFun - https://git.gumartinm.name/JavaForFun - - - false - false - UTF-8 - - - - - org.springframework.cloud - spring-cloud-starter-stream-kafka - 1.1.1.RELEASE - - - - - org.springframework.kafka - spring-kafka - 1.1.2.RELEASE - - - org.springframework.integration - spring-integration-kafka - 2.1.0.RELEASE - - - org.apache.kafka - kafka_2.11 - 0.10.1.1 - - - org.slf4j - slf4j-log4j12 - - - - - - - org.springframework.cloud - spring-cloud-stream-schema - 1.1.1.RELEASE - - - org.apache.avro - avro - 1.8.1 - - - - - org.springframework.boot - spring-boot-starter-test - test - - - org.springframework.cloud - spring-cloud-stream-test-support - test - - - - - - - - - org.springframework.cloud - spring-cloud-stream-server - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.8 - 1.8 - ${project.build.sourceEncoding} - - - - org.apache.maven.plugins - maven-resources-plugin - - ${project.build.sourceEncoding} - - - - org.apache.maven.plugins - maven-jar-plugin - - - - ${project.description} - ${project.version} - ${project.organization.name} - ${project.description} - ${project.version} - ${project.organization.name} - - - - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - org.apache.maven.surefire - surefire-junit47 - 2.18.1 - - - - - ${skip.unit.tests} - - - **/*IT.java - **/*IntegrationTest.java - - - - - - - org.apache.maven.plugins - maven-failsafe-plugin - - - - integration-test - verify - - - - ${skip.integration.tests} - - **/*IT.java - **/*IntegrationTest.java - - - - - - - - - - - - org.springframework.boot - spring-boot-maven-plugin - 1.4.4.RELEASE - - - - - diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml deleted file mode 100644 index 3335647..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml +++ /dev/null @@ -1,50 +0,0 @@ - - - - 4.0.0 - - - de.example.spring.kafka - spring-cloud-stream-kafka-example-bom - 1.0-SNAPSHOT - - - spring-cloud-stream-kafka-producer-example - spring-cloud-stream-kafka-producer-example - https://gumartinm.name - Spring Cloud Stream Kafka Producer - - gumartinm - https://gumartinm.name - - - scm:git:https://git.gumartinm.name/JavaForFun - https://git.gumartinm.name/JavaForFun - - - - - javax.inject - javax.inject - 1 - - - - de.example.spring.kafka - spring-cloud-stream-kafka-common-example - 1.0-SNAPSHOT - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - - diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java deleted file mode 100644 index 1034a98..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java +++ /dev/null @@ -1,29 +0,0 @@ -package de.example.spring.kafka; - -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; - -@SpringBootApplication -public class Application { - - public static void main(String[] args) { - SpringApplication.run(Application.class); - } - - @Bean - CommandLineRunner lookup(Sender sender) { - return args -> { - String message = "example message"; - - - if (args.length > 0) { - message = args[0]; - } - - sender.sendMessage(message); - }; - } - -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java deleted file mode 100644 index ab612ec..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java +++ /dev/null @@ -1,25 +0,0 @@ -package de.example.spring.kafka; - -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.Output; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.SubscribableChannel; - -/** - * En lugar de esta definicion "custom" usaremos las dos que - * vienen por defecto en: - * org.springframework.cloud.stream.messaging.Sink - * org.springframework.cloud.stream.messaging.Source - * - * Esta definicion custom se usaría igual que Sink y Source :) - * Donde veas Sink y Source podrías haber puesto esto en su lugar ;) - */ -public interface InputOutputChannels { - - @Input("inputChannel") - SubscribableChannel input(); - - @Output("outputChannel") - MessageChannel output(); - -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java deleted file mode 100644 index 1a778ca..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java +++ /dev/null @@ -1,23 +0,0 @@ -package de.example.spring.kafka; - -import javax.inject.Inject; - -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.messaging.Source; -import org.springframework.messaging.support.MessageBuilder; - -@EnableBinding(Source.class) -public class Sender { - // You could use here your custom interface. See: InputOutputChannels :) - private final Source source; - - @Inject - public Sender(Source source) { - this.source = source; - } - - public void sendMessage(String message) { - Product product = new Product(message, "this is some description"); - source.output().send(MessageBuilder.withPayload(product).build()); - } -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java deleted file mode 100644 index e64a561..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java +++ /dev/null @@ -1,32 +0,0 @@ -package de.example.spring.kafka; - -import org.springframework.cloud.stream.messaging.Source; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class SenderConfig { - - @Bean - public Sender sender(Source source) { - return new Sender(source); - } - -// @Bean -// public MessageConverter customMessageConverter(ObjectMapper objectMapper) { -// MyCustomMessageConverter converter = new MyCustomMessageConverter(); -// converter.setSerializedPayloadClass(String.class); -// if (objectMapper != null) { -// converter.setObjectMapper(objectMapper); -// } -// -// return converter; -// } - -// @Bean -// public MessageConverter avroMessageConverter() throws IOException { -// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); -// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); -// return converter; -// } -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml deleted file mode 100644 index c2546c4..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml +++ /dev/null @@ -1,37 +0,0 @@ -logging: - level: - org: - springframework: info - ROOT: info - -spring: - main: - banner-mode: "off" - # We do not need Tomcat running (this application runs from console) - web-environment: false - cloud: - stream: - bindings: - output: - binder: kafka - destination: test - contentType: application/json - # Consumer (input) and producer (output) are Spring Cloud Stream applications :) - headerMode: embeddedHeaders - instanceCount: 1 - instanceIndex: 0 - kafka: - binder: - zkNodes: "kafka:2181" - brokers: "kafka:9092,kafka:9093,kafka:9094" - autoCreateTopics: true - -# Health indicator for binders. -management: - health: - binders: - enabled: true - - -# ME QUEDA 7.3 Partitioning -# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/test/java/de/example/spring/kafka/SenderShould.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/test/java/de/example/spring/kafka/SenderShould.java deleted file mode 100644 index 71fbfec..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/test/java/de/example/spring/kafka/SenderShould.java +++ /dev/null @@ -1,42 +0,0 @@ -package de.example.spring.kafka; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - -import javax.inject.Inject; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.stream.messaging.Source; -import org.springframework.cloud.stream.test.binder.MessageCollector; -import org.springframework.messaging.Message; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = { Sender.class }) -@DirtiesContext -public class SenderShould { - - @Inject - Source source; - - @Inject - Sender sender; - - @Inject - private MessageCollector messageCollector; - - @Test - public void sendSomeProduct() { - Product product = new Product("hello", "this is some description"); - - sender.sendMessage("hello"); - - Message received = (Message) messageCollector.forChannel(source.output()).poll(); - - assertThat(received.getPayload().getDescription(), is(product.getDescription())); - } - -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/pom.xml deleted file mode 100644 index e530f21..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/pom.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - 4.0.0 - - - de.example.spring.kafka - spring-cloud-stream-kafka-example-bom - 1.0-SNAPSHOT - - - spring-cloud-stream-kafka-registry-example - spring-cloud-stream-kafka-registry-example - https://gumartinm.name - Spring Cloud Stream Kafka Registry - - gumartinm - https://gumartinm.name - - - scm:git:https://git.gumartinm.name/JavaForFun - https://git.gumartinm.name/JavaForFun - - - - - - javax.inject - javax.inject - 1 - - - - de.example.spring.kafka - spring-cloud-stream-kafka-common-example - 1.0-SNAPSHOT - - - - - org.springframework.cloud - spring-cloud-stream-server - - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - - diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/java/de/example/spring/kafka/Application.java deleted file mode 100644 index 9817e62..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/java/de/example/spring/kafka/Application.java +++ /dev/null @@ -1,17 +0,0 @@ -package de.example.spring.kafka; - -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; -import org.springframework.cloud.stream.schema.server.EnableSchemaRegistryServer; - -@SpringBootApplication -@EnableSchemaRegistryServer -public class Application { - - public static void main(String[] args) { - SpringApplication.run(Application.class); - } - -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/resources/application.yml deleted file mode 100644 index bebb6c8..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-registry-example/src/main/resources/application.yml +++ /dev/null @@ -1,14 +0,0 @@ -logging: - level: - org: - springframework: info - ROOT: info - -server: - port: 8888 -spring: - cloud: - stream: - schema: - server: - path: schema diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-bom/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-bom/pom.xml new file mode 100644 index 0000000..319e2ad --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-bom/pom.xml @@ -0,0 +1,207 @@ + + + + 4.0.0 + + + org.springframework.cloud + spring-cloud-starter-parent + Camden.SR4 + + + de.example.spring.kafka + spring-stream-kafka-bom + 1.0-SNAPSHOT + pom + spring-stream-kafka-bom + + https://gumartinm.name + Spring Cloud Stream Kafka BOM example + + gumartinm + https://gumartinm.name + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavaForFun + + + false + false + UTF-8 + + + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + 1.1.1.RELEASE + + + + + org.springframework.kafka + spring-kafka + 1.1.2.RELEASE + + + org.springframework.integration + spring-integration-kafka + 2.1.0.RELEASE + + + org.apache.kafka + kafka_2.11 + 0.10.1.1 + + + org.slf4j + slf4j-log4j12 + + + + + + + org.springframework.cloud + spring-cloud-stream-schema + 1.1.1.RELEASE + + + org.apache.avro + avro + 1.8.1 + + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-stream-test-support + test + + + + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-resources-plugin + + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-jar-plugin + + + + ${project.description} + ${project.version} + ${project.organization.name} + ${project.description} + ${project.version} + ${project.organization.name} + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + org.apache.maven.surefire + surefire-junit47 + 2.18.1 + + + + + ${skip.unit.tests} + + + **/*IT.java + **/*IntegrationTest.java + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + + integration-test + verify + + + + ${skip.integration.tests} + + **/*IT.java + **/*IntegrationTest.java + + + + + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 1.4.4.RELEASE + + + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/pom.xml new file mode 100644 index 0000000..068ada0 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/pom.xml @@ -0,0 +1,27 @@ + + + + 4.0.0 + + + de.example.spring.kafka + spring-stream-kafka-bom + 1.0-SNAPSHOT + + + spring-stream-kafka-common + spring-stream-kafka-common + https://gumartinm.name + Spring Cloud Stream Kafka Commons + + gumartinm + https://gumartinm.name + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavaForFun + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java new file mode 100644 index 0000000..529bcb0 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java @@ -0,0 +1,78 @@ +package de.example.spring.kafka; + +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.util.MimeType; + +/** + * Working around problem created by org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion.mimeTypeFromObject() + * + * This code: + * if (payload instanceof String) { + * return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON + * : MimeTypeUtils.TEXT_PLAIN; + * } + * + * Changes messages from: + * contentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"} + * + * to: + * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"} + * + * + * By means of my custom converter we end up having: + * + * contentType "application/json"{"name":"example message","description":"this is some description"} + * + * About "application/json" and character encoding: + * https://tools.ietf.org/html/rfc7158#section-8.1 "The default encoding is UTF-8" :) + * + * + * + */ + + +// +// You should set breakpoints in org.springframework.cloud.stream.binder.AbstractBinder.deserializePayload +// org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary +// org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.SendingHandler.handleMessageInternal +// +// The code in this Spring project is a bit messy (IMHO) for example this.embedHeaders in +// SendingHandler.handleMessageInternal: When the heck this.embedHeaders must do something? Depending on +// the content type embedHeaders will be doing something or not. This is weird. Also deserializePayload is +// always using UTF-8. It does not matter what character set I am receiving :/ +// +// +// +// +// Be careful when using Spring Cloud Stream because there could be surprises when trying to connect to systems +// not using Spring (Spring is creating a mess with the headers...) +// +// + + + +// +// Kafka messages WITHOUT MyCustomMessageConverter: +// headers: +// contentType "text/plain" <--------------- perhaps this is important. +// originalContentType "application/json;charset=UTF-8" +// payload: +// {"name":"example message","description":"this is some description"} +// +// +// Kafka messages WITH MyCustomMessageConverter: +// headers: +// contentType "application/json" +// payload: +// {"name":"example message","description":"this is some description"} +// + + +// YOU'D RATHER BETTER NOT USE THIS CONVERTER BECAUSE I DO NOT KNOW IF contentType "text/plain" IS IMPORTANT OR NOT :( +public class MyCustomMessageConverter extends MappingJackson2MessageConverter { + + public MyCustomMessageConverter() { + super(new MimeType("application", "json")); + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/Product.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/Product.java new file mode 100644 index 0000000..6053200 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-common/src/main/java/de/example/spring/kafka/Product.java @@ -0,0 +1,26 @@ +package de.example.spring.kafka; + +public class Product { + private String name; + private String description; + + /** + * Required for deserialization from Jackson. + */ + protected Product() { + + } + + public Product(String name, String description) { + this.name = name; + this.description = description; + } + + public String getName() { + return this.name; + } + + public String getDescription() { + return this.description; + } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/pom.xml new file mode 100644 index 0000000..dd8f856 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/pom.xml @@ -0,0 +1,51 @@ + + + + 4.0.0 + + + de.example.spring.kafka + spring-stream-kafka-bom + 1.0-SNAPSHOT + + + spring-stream-kafka-consumer + spring-stream-kafka-consumer + https://gumartinm.name + Spring Cloud Stream Kafka Consumer + + gumartinm + https://gumartinm.name + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavaForFun + + + + + + javax.inject + javax.inject + 1 + + + + de.example.spring.kafka + spring-cloud-stream-kafka-common-example + 1.0-SNAPSHOT + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Application.java new file mode 100644 index 0000000..c367219 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Application.java @@ -0,0 +1,23 @@ +package de.example.spring.kafka; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + + + @Bean + CommandLineRunner lookup(Receiver receiver) { + return args -> { + + }; + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/DummyService.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/DummyService.java new file mode 100644 index 0000000..70f7a62 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/DummyService.java @@ -0,0 +1,16 @@ +package de.example.spring.kafka; + +import javax.inject.Named; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Named +public class DummyService { + private static final Logger LOGGER = LoggerFactory.getLogger(DummyService.class); + + + public void iAmVeryDummy(String message) { + LOGGER.info("I am a dummy service: '{}'", message); + } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Receiver.java new file mode 100644 index 0000000..7681ab5 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/Receiver.java @@ -0,0 +1,30 @@ +package de.example.spring.kafka; + +import javax.inject.Inject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; + +@EnableBinding(Sink.class) +public class Receiver { + private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); + + private final DummyService dummyService; + + @Inject + public Receiver(DummyService dummyService) { + this.dummyService = dummyService; + } + + @StreamListener(Sink.INPUT) + public void handle(Product product) { + LOGGER.info("product name='{}'", product.getName()); + LOGGER.info("product description='{}'", product.getDescription()); + + dummyService.iAmVeryDummy(product.getName()); + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/ReceiverConfig.java new file mode 100644 index 0000000..f635538 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/java/de/example/spring/kafka/ReceiverConfig.java @@ -0,0 +1,33 @@ +package de.example.spring.kafka; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ReceiverConfig { + + @Bean + public Receiver receiver(DummyService dummyService) { + return new Receiver(dummyService); + } + + +// @Bean +// public MessageConverter customMessageConverter(ObjectMapper objectMapper) { +// MyCustomMessageConverter converter = new MyCustomMessageConverter(); +// converter.setSerializedPayloadClass(String.class); +// if (objectMapper != null) { +// converter.setObjectMapper(objectMapper); +// } +// +// return converter; +// } + + +// @Bean +// public MessageConverter avroMessageConverter() throws IOException { +// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); +// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); +// return converter; +// } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml new file mode 100644 index 0000000..f4f8991 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml @@ -0,0 +1,47 @@ +logging: + level: + org: + springframework: info + ROOT: info + +kafka: + topic: test + messageKey: keyTest + +spring: + main: + banner-mode: "off" + # We do not need Tomcat running (this application runs from console) + web-environment: false + cloud: + stream: + bindings: + input: + binder: kafka + destination: test + group: helloworld + contentType: application/json + startOffset: latest + consumer: + concurrency: 1 + partitioned: false + # Consumer (input) and producer (output) are Spring Cloud Stream applications :) + headerMode: embeddedHeaders + maxAttempts: 3 + backOffInitialInterval: 1000 + instanceCount: 1 + instanceIndex: 0 + kafka: + binder: + zkNodes: "kafka:2181" + brokers: "kafka:9092,kafka:9093,kafka:9094" + autoCreateTopics: true + +# Health indicator for binders. +management: + health: + binders: + enabled: true + +# ME QUEDA 7.3 Partitioning +# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/test/java/de/example/spring/kafka/ReceiverShould.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/test/java/de/example/spring/kafka/ReceiverShould.java new file mode 100644 index 0000000..6291ea4 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/test/java/de/example/spring/kafka/ReceiverShould.java @@ -0,0 +1,45 @@ +package de.example.spring.kafka; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.doNothing; + +import javax.inject.Inject; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest(classes = { Receiver.class }) +@DirtiesContext +public class ReceiverShould { + + @Inject + Sink source; + + @MockBean + DummyService dummyService; + + @Test + public void callSomeDummy() { + String productName = "product"; + String productDescription = "productDescription"; + Product product = new Product(productName, productDescription); + ArgumentCaptor dummyArgCaptor = ArgumentCaptor.forClass(String.class); + doNothing().when(dummyService).iAmVeryDummy(dummyArgCaptor.capture()); + + Message message = new GenericMessage<>(product); + source.input().send(message); + + assertThat(dummyArgCaptor.getValue(), is(product.getName())); + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/pom.xml new file mode 100644 index 0000000..da3f81e --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/pom.xml @@ -0,0 +1,50 @@ + + + + 4.0.0 + + + de.example.spring.kafka + spring-stream-kafka-bom + 1.0-SNAPSHOT + + + spring-stream-kafka-producer + spring-stream-kafka-producer + https://gumartinm.name + Spring Cloud Stream Kafka Producer + + gumartinm + https://gumartinm.name + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavaForFun + + + + + javax.inject + javax.inject + 1 + + + + de.example.spring.kafka + spring-cloud-stream-kafka-common-example + 1.0-SNAPSHOT + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Application.java new file mode 100644 index 0000000..1034a98 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Application.java @@ -0,0 +1,29 @@ +package de.example.spring.kafka; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + + @Bean + CommandLineRunner lookup(Sender sender) { + return args -> { + String message = "example message"; + + + if (args.length > 0) { + message = args[0]; + } + + sender.sendMessage(message); + }; + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java new file mode 100644 index 0000000..ab612ec --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java @@ -0,0 +1,25 @@ +package de.example.spring.kafka; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +/** + * En lugar de esta definicion "custom" usaremos las dos que + * vienen por defecto en: + * org.springframework.cloud.stream.messaging.Sink + * org.springframework.cloud.stream.messaging.Source + * + * Esta definicion custom se usaría igual que Sink y Source :) + * Donde veas Sink y Source podrías haber puesto esto en su lugar ;) + */ +public interface InputOutputChannels { + + @Input("inputChannel") + SubscribableChannel input(); + + @Output("outputChannel") + MessageChannel output(); + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Sender.java new file mode 100644 index 0000000..1a778ca --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/Sender.java @@ -0,0 +1,23 @@ +package de.example.spring.kafka; + +import javax.inject.Inject; + +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.messaging.support.MessageBuilder; + +@EnableBinding(Source.class) +public class Sender { + // You could use here your custom interface. See: InputOutputChannels :) + private final Source source; + + @Inject + public Sender(Source source) { + this.source = source; + } + + public void sendMessage(String message) { + Product product = new Product(message, "this is some description"); + source.output().send(MessageBuilder.withPayload(product).build()); + } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/SenderConfig.java new file mode 100644 index 0000000..e64a561 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/SenderConfig.java @@ -0,0 +1,32 @@ +package de.example.spring.kafka; + +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class SenderConfig { + + @Bean + public Sender sender(Source source) { + return new Sender(source); + } + +// @Bean +// public MessageConverter customMessageConverter(ObjectMapper objectMapper) { +// MyCustomMessageConverter converter = new MyCustomMessageConverter(); +// converter.setSerializedPayloadClass(String.class); +// if (objectMapper != null) { +// converter.setObjectMapper(objectMapper); +// } +// +// return converter; +// } + +// @Bean +// public MessageConverter avroMessageConverter() throws IOException { +// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); +// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); +// return converter; +// } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml new file mode 100644 index 0000000..c2546c4 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml @@ -0,0 +1,37 @@ +logging: + level: + org: + springframework: info + ROOT: info + +spring: + main: + banner-mode: "off" + # We do not need Tomcat running (this application runs from console) + web-environment: false + cloud: + stream: + bindings: + output: + binder: kafka + destination: test + contentType: application/json + # Consumer (input) and producer (output) are Spring Cloud Stream applications :) + headerMode: embeddedHeaders + instanceCount: 1 + instanceIndex: 0 + kafka: + binder: + zkNodes: "kafka:2181" + brokers: "kafka:9092,kafka:9093,kafka:9094" + autoCreateTopics: true + +# Health indicator for binders. +management: + health: + binders: + enabled: true + + +# ME QUEDA 7.3 Partitioning +# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderShould.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderShould.java new file mode 100644 index 0000000..71fbfec --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderShould.java @@ -0,0 +1,42 @@ +package de.example.spring.kafka; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import javax.inject.Inject; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest(classes = { Sender.class }) +@DirtiesContext +public class SenderShould { + + @Inject + Source source; + + @Inject + Sender sender; + + @Inject + private MessageCollector messageCollector; + + @Test + public void sendSomeProduct() { + Product product = new Product("hello", "this is some description"); + + sender.sendMessage("hello"); + + Message received = (Message) messageCollector.forChannel(source.output()).poll(); + + assertThat(received.getPayload().getDescription(), is(product.getDescription())); + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/pom.xml new file mode 100644 index 0000000..3239586 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/pom.xml @@ -0,0 +1,58 @@ + + + + 4.0.0 + + + de.example.spring.kafka + spring-stream-kafka-bom + 1.0-SNAPSHOT + + + spring-stream-kafka-registry + spring-stream-kafka-registry + https://gumartinm.name + Spring Cloud Stream Kafka Registry + + gumartinm + https://gumartinm.name + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavaForFun + + + + + + javax.inject + javax.inject + 1 + + + + de.example.spring.kafka + spring-cloud-stream-kafka-common-example + 1.0-SNAPSHOT + + + + + org.springframework.cloud + spring-cloud-stream-server + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/java/de/example/spring/kafka/Application.java new file mode 100644 index 0000000..9817e62 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/java/de/example/spring/kafka/Application.java @@ -0,0 +1,17 @@ +package de.example.spring.kafka; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.cloud.stream.schema.server.EnableSchemaRegistryServer; + +@SpringBootApplication +@EnableSchemaRegistryServer +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/resources/application.yml new file mode 100644 index 0000000..bebb6c8 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-registry/src/main/resources/application.yml @@ -0,0 +1,14 @@ +logging: + level: + org: + springframework: info + ROOT: info + +server: + port: 8888 +spring: + cloud: + stream: + schema: + server: + path: schema -- 2.1.4