From 8d266c6ef9f3feddeea81ebfe9b4227968328098 Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Sun, 29 Jan 2017 09:11:33 +0100 Subject: [PATCH] Kafka: using spring cloud stream and binders. --- SpringJava/Kafka/SpringCloudStream/README | 8 ++ SpringJava/Kafka/SpringCloudStream/pom.xml | 26 ++++ .../pom.xml | 27 ++++ .../java/de/example/spring/kafka/Application.java | 23 ++++ .../spring/kafka/MyCustomMessageConverter.java | 26 ++++ .../java/de/example/spring/kafka/Receiver.java | 18 +++ .../de/example/spring/kafka/ReceiverConfig.java | 32 +++++ .../src/main/resources/application.yml | 47 +++++++ .../spring-cloud-stream-kafka-example-bom/pom.xml | 153 +++++++++++++++++++++ .../pom.xml | 35 +++++ .../java/de/example/spring/kafka/Application.java | 31 +++++ .../example/spring/kafka/InputOutputChannels.java | 25 ++++ .../main/java/de/example/spring/kafka/Sender.java | 24 ++++ .../java/de/example/spring/kafka/SenderConfig.java | 14 ++ .../src/main/resources/application.yml | 38 +++++ 15 files changed, 527 insertions(+) create mode 100644 SpringJava/Kafka/SpringCloudStream/README create mode 100644 SpringJava/Kafka/SpringCloudStream/pom.xml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java create mode 100644 SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml diff --git a/SpringJava/Kafka/SpringCloudStream/README b/SpringJava/Kafka/SpringCloudStream/README new file mode 100644 index 0000000..d3768c4 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/README @@ -0,0 +1,8 @@ +It does not work on gradle. Going back to the old and good maven configuration. + + +Documentation: http://docs.spring.io/spring-cloud-stream/docs/current/reference/html/ + https://github.com/mbogoevici/spring-cloud-stream + +Examples: https://github.com/spring-cloud/spring-cloud-stream-samples + diff --git a/SpringJava/Kafka/SpringCloudStream/pom.xml b/SpringJava/Kafka/SpringCloudStream/pom.xml new file mode 100644 index 0000000..c90a7e4 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/pom.xml @@ -0,0 +1,26 @@ + + 4.0.0 + de.example.spring.kafka + spring-cloud-stream-kafka-example + 1.0-SNAPSHOT + pom + spring-cloud-stream-kafka-example + https://gumartinm.name/ + Spring Cloud Stream Kafka + + gumartinm + https://gumartinm.name/ + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavForFun + + + + spring-cloud-stream-kafka-consumer-example + spring-cloud-stream-kafka-producer-example + + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml new file mode 100644 index 0000000..6d78cf9 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml @@ -0,0 +1,27 @@ + + + + 4.0.0 + + + de.example.spring.kafka + spring-cloud-stream-kafka-example-bom + 1.0-SNAPSHOT + + + spring-cloud-stream-kafka-consumer-example + spring-cloud-stream-kafka-consumer-example + https://gumartinm.name + Spring Cloud Stream Kafka Consumer + + gumartinm + https://gumartinm.name + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavaForFun + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java new file mode 100644 index 0000000..c367219 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Application.java @@ -0,0 +1,23 @@ +package de.example.spring.kafka; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + + + @Bean + CommandLineRunner lookup(Receiver receiver) { + return args -> { + + }; + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java new file mode 100644 index 0000000..51eae29 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java @@ -0,0 +1,26 @@ +package de.example.spring.kafka; + +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.util.MimeType; + +public class MyCustomMessageConverter extends AbstractMessageConverter { + + public MyCustomMessageConverter() { + super(new MimeType("application", "example")); + } + + @Override + protected boolean supports(Class clazz) { + return (String.class == clazz); + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { + Object payload = message.getPayload(); + + logger.info("convertFromInternal, payload: " + payload); + + return payload; + } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java new file mode 100644 index 0000000..83f3660 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java @@ -0,0 +1,18 @@ +package de.example.spring.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; + +@EnableBinding(Sink.class) +public class Receiver { + private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); + + @StreamListener(Sink.INPUT) + public void handle(String message) { + LOGGER.info("received message='{}'", message); + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java new file mode 100644 index 0000000..1979559 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java @@ -0,0 +1,32 @@ +package de.example.spring.kafka; + +import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.util.MimeType; + +import java.io.IOException; + +@Configuration +public class ReceiverConfig { + + @Bean + public Receiver receiver() { + return new Receiver(); + } + + + @Bean + public MessageConverter customMessageConverter() { + return new MyCustomMessageConverter(); + } + + @Bean + public MessageConverter avroMessageConverter() throws IOException { + AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); + converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); + return converter; + } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml new file mode 100644 index 0000000..7f56b28 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml @@ -0,0 +1,47 @@ +logging: + level: + org: + springframework: info + ROOT: info + +spring: + main: + banner-mode: "off" + +kafka: + topic: test + messageKey: keyTest + + + +# *************************** BINDERS CONFIGURATION *************************** +spring: + cloud: + stream: + bindings: + input: + binder: kafka + destination: test + group: helloworld + contentType: "" + consumer: + concurrency: 1 + partitioned: false + headerMode: raw + maxAttempts: 3 + backOffInitialInterval: + instanceCount: 1 + instanceIndex: 0 + kafka: + binder: + brokers: "kafka:9092,kafka:9093,kafka:9094" + autoCreateTopics: true + +# Health indicator for binders. +management: + health: + binders: + enabled: true + +# ME QUEDA 7.3 Partitioning +# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml new file mode 100644 index 0000000..fc6113b --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml @@ -0,0 +1,153 @@ + + + + 4.0.0 + + + org.springframework.cloud + spring-cloud-starter-parent + Camden.SR4 + + + de.example.spring.kafka + spring-cloud-stream-kafka-example-bom + 1.0-SNAPSHOT + pom + spring-cloud-stream-kafka-example-bom + + https://gumartinm.name + Spring Cloud Stream Kafka BOM example + + gumartinm + https://gumartinm.name + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavaForFun + + + false + false + UTF-8 + + + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + 1.1.1.RELEASE + + + + + org.springframework.cloud + spring-cloud-stream-schema + 1.1.1.RELEASE + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-resources-plugin + 2.7 + + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-jar-plugin + 2.6 + + + + ${project.description} + ${project.version} + ${project.organization.name} + ${project.description} + ${project.version} + ${project.organization.name} + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + + org.apache.maven.surefire + surefire-junit47 + 2.19.1 + + + + + ${skip.unit.tests} + + + **/*IT.java + **/*IntegrationTest.java + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 2.19.1 + + + + integration-test + verify + + + + ${skip.integration.tests} + + **/*IT.java + **/*IntegrationTest.java + + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 1.4.4.RELEASE + + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml new file mode 100644 index 0000000..1ea5ef4 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + + de.example.spring.kafka + spring-cloud-stream-kafka-example-bom + 1.0-SNAPSHOT + + + spring-cloud-stream-kafka-producer-example + spring-cloud-stream-kafka-producer-example + https://gumartinm.name + Spring Cloud Stream Kafka Producer + + gumartinm + https://gumartinm.name + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavaForFun + + + + + javax.inject + javax.inject + 1 + + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java new file mode 100644 index 0000000..b8306c5 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java @@ -0,0 +1,31 @@ +package de.example.spring.kafka; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class); + } + + @Bean + CommandLineRunner lookup(Sender sender) { + return args -> { + String topic = "test"; + String message = "example message"; + + + if (args.length > 0) { + topic = args[0]; + message = args[1]; + } + + sender.sendMessage(message); + }; + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java new file mode 100644 index 0000000..ab612ec --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/InputOutputChannels.java @@ -0,0 +1,25 @@ +package de.example.spring.kafka; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +/** + * En lugar de esta definicion "custom" usaremos las dos que + * vienen por defecto en: + * org.springframework.cloud.stream.messaging.Sink + * org.springframework.cloud.stream.messaging.Source + * + * Esta definicion custom se usaría igual que Sink y Source :) + * Donde veas Sink y Source podrías haber puesto esto en su lugar ;) + */ +public interface InputOutputChannels { + + @Input("inputChannel") + SubscribableChannel input(); + + @Output("outputChannel") + MessageChannel output(); + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java new file mode 100644 index 0000000..dc14480 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java @@ -0,0 +1,24 @@ +package de.example.spring.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.messaging.support.MessageBuilder; + +import javax.inject.Inject; + +public class Sender { + private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class); + + // Aquí podrías haber usado tu custom interface: InputOutputChannels :) + private final Source source; + + @Inject + public Sender(Source source) { + this.source = source; + } + + public void sendMessage(String message) { + source.output().send(MessageBuilder.withPayload(message).build()); + } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java new file mode 100644 index 0000000..f4fa20b --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java @@ -0,0 +1,14 @@ +package de.example.spring.kafka; + +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class SenderConfig { + + @Bean + public Sender sender(Source source) { + return new Sender(source); + } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml new file mode 100644 index 0000000..a072392 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml @@ -0,0 +1,38 @@ +logging: + level: + org: + springframework: info + ROOT: info + +spring: + main: + banner-mode: "off" + + + +# *************************** BINDERS CONFIGURATION *************************** +spring: + cloud: + stream: + bindings: + output: + binder: kafka + destination: test + contentType: "" + headerMode: raw + instanceCount: 1 + instanceIndex: 0 + kafka: + binder: + brokers: "kafka:9092,kafka:9093,kafka:9094" + autoCreateTopics: true + +# Health indicator for binders. +management: + health: + binders: + enabled: true + + +# ME QUEDA 7.3 Partitioning +# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options -- 2.1.4