From 7d5e8dde34321029566155cb04efe2eca96287a5 Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Sun, 29 Jan 2017 22:29:41 +0100 Subject: [PATCH] Many thoughts about MyCustomMessageConverter IMHO Spring Cloud Stream is a bit messy. Be careful when using it and trying to connect to systems not using Spring :( --- .../spring/kafka/MyCustomMessageConverter.java | 50 ++++++++++++++++++++++ .../de/example/spring/kafka/ReceiverConfig.java | 23 +++++----- .../main/java/de/example/spring/kafka/Sender.java | 2 +- .../java/de/example/spring/kafka/SenderConfig.java | 23 +++++----- .../src/main/resources/application.yml | 1 - 5 files changed, 71 insertions(+), 28 deletions(-) diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java index 419ae06..529bcb0 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java @@ -18,7 +18,57 @@ import org.springframework.util.MimeType; * to: * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"} * + * + * By means of my custom converter we end up having: + * + * contentType "application/json"{"name":"example message","description":"this is some description"} + * + * About "application/json" and character encoding: + * https://tools.ietf.org/html/rfc7158#section-8.1 "The default encoding is UTF-8" :) + * + * + * */ + + +// +// You should set breakpoints in org.springframework.cloud.stream.binder.AbstractBinder.deserializePayload +// org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary +// org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.SendingHandler.handleMessageInternal +// +// The code in this Spring project is a bit messy (IMHO) for example this.embedHeaders in +// SendingHandler.handleMessageInternal: When the heck this.embedHeaders must do something? Depending on +// the content type embedHeaders will be doing something or not. This is weird. Also deserializePayload is +// always using UTF-8. It does not matter what character set I am receiving :/ +// +// +// +// +// Be careful when using Spring Cloud Stream because there could be surprises when trying to connect to systems +// not using Spring (Spring is creating a mess with the headers...) +// +// + + + +// +// Kafka messages WITHOUT MyCustomMessageConverter: +// headers: +// contentType "text/plain" <--------------- perhaps this is important. +// originalContentType "application/json;charset=UTF-8" +// payload: +// {"name":"example message","description":"this is some description"} +// +// +// Kafka messages WITH MyCustomMessageConverter: +// headers: +// contentType "application/json" +// payload: +// {"name":"example message","description":"this is some description"} +// + + +// YOU'D RATHER BETTER NOT USE THIS CONVERTER BECAUSE I DO NOT KNOW IF contentType "text/plain" IS IMPORTANT OR NOT :( public class MyCustomMessageConverter extends MappingJackson2MessageConverter { public MyCustomMessageConverter() { diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java index 4d49cf8..e385776 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java @@ -2,9 +2,6 @@ package de.example.spring.kafka; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.converter.MessageConverter; - -import com.fasterxml.jackson.databind.ObjectMapper; @Configuration public class ReceiverConfig { @@ -15,16 +12,16 @@ public class ReceiverConfig { } - @Bean - public MessageConverter customMessageConverter(ObjectMapper objectMapper) { - MyCustomMessageConverter converter = new MyCustomMessageConverter(); - converter.setSerializedPayloadClass(String.class); - if (objectMapper != null) { - converter.setObjectMapper(objectMapper); - } - - return converter; - } +// @Bean +// public MessageConverter customMessageConverter(ObjectMapper objectMapper) { +// MyCustomMessageConverter converter = new MyCustomMessageConverter(); +// converter.setSerializedPayloadClass(String.class); +// if (objectMapper != null) { +// converter.setObjectMapper(objectMapper); +// } +// +// return converter; +// } // @Bean diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java index 68f0470..1a778ca 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java @@ -8,7 +8,7 @@ import org.springframework.messaging.support.MessageBuilder; @EnableBinding(Source.class) public class Sender { - // Aquí podrías haber usado tu custom interface: InputOutputChannels :) + // You could use here your custom interface. See: InputOutputChannels :) private final Source source; @Inject diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java index 7919aa2..e64a561 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java @@ -3,9 +3,6 @@ package de.example.spring.kafka; import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.converter.MessageConverter; - -import com.fasterxml.jackson.databind.ObjectMapper; @Configuration public class SenderConfig { @@ -15,16 +12,16 @@ public class SenderConfig { return new Sender(source); } - @Bean - public MessageConverter customMessageConverter(ObjectMapper objectMapper) { - MyCustomMessageConverter converter = new MyCustomMessageConverter(); - converter.setSerializedPayloadClass(String.class); - if (objectMapper != null) { - converter.setObjectMapper(objectMapper); - } - - return converter; - } +// @Bean +// public MessageConverter customMessageConverter(ObjectMapper objectMapper) { +// MyCustomMessageConverter converter = new MyCustomMessageConverter(); +// converter.setSerializedPayloadClass(String.class); +// if (objectMapper != null) { +// converter.setObjectMapper(objectMapper); +// } +// +// return converter; +// } // @Bean // public MessageConverter avroMessageConverter() throws IOException { diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml index eff109c..c2546c4 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml +++ b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml @@ -15,7 +15,6 @@ spring: output: binder: kafka destination: test - # Requires work around, see: MyCustomMessageConverter contentType: application/json # Consumer (input) and producer (output) are Spring Cloud Stream applications :) headerMode: embeddedHeaders -- 2.1.4