From: Gustavo Martin Morcuende Date: Sun, 29 Jan 2017 17:38:06 +0000 (+0100) Subject: Kafka working with application/json X-Git-Url: https://git.gumartinm.name/?a=commitdiff_plain;h=0f5f940912511d7b3bc9da78d6d7c80dac3907cf;p=JavaForFun Kafka working with application/json Working around JavaClassMimeTypeConversion.mimeTypeFromObject --- diff --git a/SpringJava/Kafka/SpringCloudStream/pom.xml b/SpringJava/Kafka/SpringCloudStream/pom.xml index c90a7e4..712b080 100644 --- a/SpringJava/Kafka/SpringCloudStream/pom.xml +++ b/SpringJava/Kafka/SpringCloudStream/pom.xml @@ -18,6 +18,8 @@ + spring-cloud-stream-kafka-example-bom + spring-cloud-stream-kafka-common-example spring-cloud-stream-kafka-consumer-example spring-cloud-stream-kafka-producer-example diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml new file mode 100644 index 0000000..c7d4f82 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml @@ -0,0 +1,27 @@ + + + + 4.0.0 + + + de.example.spring.kafka + spring-cloud-stream-kafka-example-bom + 1.0-SNAPSHOT + + + spring-cloud-stream-kafka-common-example + spring-cloud-stream-kafka-common-example + https://gumartinm.name + Spring Cloud Stream Kafka Commons + + gumartinm + https://gumartinm.name + + + scm:git:https://git.gumartinm.name/JavaForFun + https://git.gumartinm.name/JavaForFun + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java new file mode 100644 index 0000000..f489302 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java @@ -0,0 +1,30 @@ +package de.example.spring.kafka; + +import java.nio.charset.Charset; + +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.util.MimeType; + +/** + * Working around problem created by org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion.mimeTypeFromObject() + * + * This code: + * if (payload instanceof String) { + * return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON + * : MimeTypeUtils.TEXT_PLAIN; + * } + * + * Changes messages from: + * contentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"} + * + * to: + * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"} + * + */ +public class MyCustomMessageConverter extends MappingJackson2MessageConverter { + + public MyCustomMessageConverter() { + super(new MimeType("application", "json")); + } + +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java new file mode 100644 index 0000000..98564a5 --- /dev/null +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java @@ -0,0 +1,19 @@ +package de.example.spring.kafka; + +public class Product { + private String name; + private String description; + + public Product(String name, String description) { + this.name = name; + this.description = description; + } + + public String getName() { + return this.name; + } + + public String getDescription() { + return this.description; + } +} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml index 6d78cf9..e8cfa41 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml @@ -24,4 +24,28 @@ https://git.gumartinm.name/JavaForFun + + + + javax.inject + javax.inject + 1 + + + + de.example.spring.kafka + spring-cloud-stream-kafka-common-example + 1.0-SNAPSHOT + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java deleted file mode 100644 index cc56794..0000000 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java +++ /dev/null @@ -1,26 +0,0 @@ -package de.example.spring.kafka; - -import org.springframework.messaging.Message; -import org.springframework.messaging.converter.AbstractMessageConverter; -import org.springframework.util.MimeType; - -public class MyCustomMessageConverter extends AbstractMessageConverter { - - public MyCustomMessageConverter() { - super(new MimeType("application", "example")); - } - - @Override - protected boolean supports(Class clazz) { - return (String.class == clazz); - } - - @Override - protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { - Object payload = message.getPayload(); - - logger.info("convertFromInternal, payload: " + payload); - - return payload instanceof String ? payload : new String((byte[]) payload); - } -} diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java index 83f3660..7c88cd5 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java @@ -11,8 +11,9 @@ public class Receiver { private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); @StreamListener(Sink.INPUT) - public void handle(String message) { - LOGGER.info("received message='{}'", message); + public void handle(Product product) { + LOGGER.info("product name='{}'", product.getName()); + LOGGER.info("product description='{}'", product.getDescription()); } } diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java index 3788a7b..4d49cf8 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java @@ -1,13 +1,10 @@ package de.example.spring.kafka; -import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -//import org.springframework.core.io.ClassPathResource; import org.springframework.messaging.converter.MessageConverter; -import org.springframework.util.MimeType; -import java.io.IOException; +import com.fasterxml.jackson.databind.ObjectMapper; @Configuration public class ReceiverConfig { @@ -19,14 +16,21 @@ public class ReceiverConfig { @Bean - public MessageConverter customMessageConverter() { - return new MyCustomMessageConverter(); + public MessageConverter customMessageConverter(ObjectMapper objectMapper) { + MyCustomMessageConverter converter = new MyCustomMessageConverter(); + converter.setSerializedPayloadClass(String.class); + if (objectMapper != null) { + converter.setObjectMapper(objectMapper); + } + + return converter; } - @Bean - public MessageConverter avroMessageConverter() throws IOException { - AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); - //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); - return converter; - } + +// @Bean +// public MessageConverter avroMessageConverter() throws IOException { +// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); +// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); +// return converter; +// } } diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml index ce733c8..72eafba 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml @@ -18,11 +18,12 @@ spring: binder: kafka destination: test group: helloworld - contentType: "" + contentType: application/json consumer: concurrency: 1 partitioned: false - headerMode: raw + # Consumer (input) and producer (output) are Spring Cloud Stream applications :) + headerMode: embeddedHeaders maxAttempts: 3 backOffInitialInterval: 1000 instanceCount: 1 diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml index 769c420..4b96825 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml @@ -75,6 +75,7 @@ 1.8.1 + @@ -159,23 +160,27 @@ + - - - org.springframework.boot - spring-boot-maven-plugin - 1.4.4.RELEASE - - + The start-class property is automatically populated by spring-boot-maven-plugin and it is + used in spring-boot-starter-parent. + + See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params + --> + + org.springframework.boot + spring-boot-maven-plugin + 1.4.4.RELEASE + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml index 1ea5ef4..3335647 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml @@ -24,12 +24,27 @@ https://git.gumartinm.name/JavaForFun - + javax.inject javax.inject 1 + + + de.example.spring.kafka + spring-cloud-stream-kafka-common-example + 1.0-SNAPSHOT + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java index c05ba2f..1034a98 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java @@ -19,7 +19,7 @@ public class Application { if (args.length > 0) { - message = args[1]; + message = args[0]; } sender.sendMessage(message); diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java index 9157a7e..68f0470 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java @@ -17,6 +17,7 @@ public class Sender { } public void sendMessage(String message) { - source.output().send(MessageBuilder.withPayload(message).build()); + Product product = new Product(message, "this is some description"); + source.output().send(MessageBuilder.withPayload(product).build()); } } diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java index f4fa20b..7919aa2 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java @@ -3,6 +3,9 @@ package de.example.spring.kafka; import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.converter.MessageConverter; + +import com.fasterxml.jackson.databind.ObjectMapper; @Configuration public class SenderConfig { @@ -11,4 +14,22 @@ public class SenderConfig { public Sender sender(Source source) { return new Sender(source); } + + @Bean + public MessageConverter customMessageConverter(ObjectMapper objectMapper) { + MyCustomMessageConverter converter = new MyCustomMessageConverter(); + converter.setSerializedPayloadClass(String.class); + if (objectMapper != null) { + converter.setObjectMapper(objectMapper); + } + + return converter; + } + +// @Bean +// public MessageConverter avroMessageConverter() throws IOException { +// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes")); +// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro")); +// return converter; +// } } diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml index 4d37c02..07b1ebf 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml @@ -7,14 +7,18 @@ logging: spring: main: banner-mode: "off" + # We do not need Tomcat running (this application runs as a client) + web-environment: false cloud: stream: bindings: output: binder: kafka destination: test - contentType: "" - headerMode: raw + contentType: application/json + # headerMode value is used only when input bindings :( + # headerMode: raw + #contentType: "application/example" instanceCount: 1 instanceIndex: 0 kafka: