Kafka working with application/json
authorGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 29 Jan 2017 17:38:06 +0000 (18:38 +0100)
committerGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 29 Jan 2017 17:38:06 +0000 (18:38 +0100)
Working around JavaClassMimeTypeConversion.mimeTypeFromObject

15 files changed:
SpringJava/Kafka/SpringCloudStream/pom.xml
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java [new file with mode: 0644]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/pom.xml
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java [deleted file]
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/Receiver.java
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/ReceiverConfig.java
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/resources/application.yml
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-example-bom/pom.xml
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/pom.xml
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Application.java
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/Sender.java
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/java/de/example/spring/kafka/SenderConfig.java
SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-producer-example/src/main/resources/application.yml

index c90a7e4..712b080 100644 (file)
@@ -18,6 +18,8 @@
     </scm>
 
     <modules>
+        <module>spring-cloud-stream-kafka-example-bom</module>
+        <module>spring-cloud-stream-kafka-common-example</module>
         <module>spring-cloud-stream-kafka-consumer-example</module>
         <module>spring-cloud-stream-kafka-producer-example</module>
     </modules>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/pom.xml
new file mode 100644 (file)
index 0000000..c7d4f82
--- /dev/null
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>de.example.spring.kafka</groupId>
+        <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+       <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+       <name>spring-cloud-stream-kafka-common-example</name>
+       <url>https://gumartinm.name</url>
+       <description>Spring Cloud Stream Kafka Commons</description>
+       <organization>
+           <name>gumartinm</name>
+           <url>https://gumartinm.name</url>
+       </organization>
+    <scm>
+        <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+        <url>https://git.gumartinm.name/JavaForFun</url>
+    </scm>
+
+</project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java
new file mode 100644 (file)
index 0000000..f489302
--- /dev/null
@@ -0,0 +1,30 @@
+package de.example.spring.kafka;
+
+import java.nio.charset.Charset;
+
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.util.MimeType;
+
+/**
+ * Working around problem created by org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion.mimeTypeFromObject()
+ *
+ * This code:
+ *                     if (payload instanceof String) {
+ *                             return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON
+ *                                             : MimeTypeUtils.TEXT_PLAIN;
+ *                     }
+ *
+ * Changes messages from: 
+ * contentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
+ * 
+ * to:
+ * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
+ * 
+ */
+public class MyCustomMessageConverter extends MappingJackson2MessageConverter {
+
+  public MyCustomMessageConverter() {
+               super(new MimeType("application", "json"));
+  }
+
+}
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-common-example/src/main/java/de/example/spring/kafka/Product.java
new file mode 100644 (file)
index 0000000..98564a5
--- /dev/null
@@ -0,0 +1,19 @@
+package de.example.spring.kafka;
+
+public class Product {
+       private String name;
+       private String description;
+       
+       public Product(String name, String description) {
+               this.name = name;
+               this.description = description;
+       }
+
+       public String getName() {
+               return this.name;
+       }
+       
+       public String getDescription() {
+               return this.description;
+       }
+}
index 6d78cf9..e8cfa41 100644 (file)
         <url>https://git.gumartinm.name/JavaForFun</url>
     </scm>
 
+
+    <dependencies>
+        <dependency>
+            <groupId>javax.inject</groupId>
+            <artifactId>javax.inject</artifactId>
+            <version>1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>de.example.spring.kafka</groupId>
+            <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+             </plugin>
+        </plugins>
+    </build>
+
 </project>
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java b/SpringJava/Kafka/SpringCloudStream/spring-cloud-stream-kafka-consumer-example/src/main/java/de/example/spring/kafka/MyCustomMessageConverter.java
deleted file mode 100644 (file)
index cc56794..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-package de.example.spring.kafka;
-
-import org.springframework.messaging.Message;
-import org.springframework.messaging.converter.AbstractMessageConverter;
-import org.springframework.util.MimeType;
-
-public class MyCustomMessageConverter extends AbstractMessageConverter {
-
-  public MyCustomMessageConverter() {
-    super(new MimeType("application", "example"));
-  }
-
-  @Override
-  protected boolean supports(Class<?> clazz) {
-    return (String.class == clazz);
-  }
-
-  @Override
-  protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
-    Object payload = message.getPayload();
-
-    logger.info("convertFromInternal, payload: " + payload);
-
-       return payload instanceof String ? payload : new String((byte[]) payload);
-  }
-}
index 83f3660..7c88cd5 100644 (file)
@@ -11,8 +11,9 @@ public class Receiver {
   private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
 
   @StreamListener(Sink.INPUT)
-  public void handle(String message) {
-    LOGGER.info("received message='{}'", message);
+  public void handle(Product product) {
+    LOGGER.info("product name='{}'", product.getName());
+    LOGGER.info("product description='{}'", product.getDescription());
   }
 
 }
index 3788a7b..4d49cf8 100644 (file)
@@ -1,13 +1,10 @@
 package de.example.spring.kafka;
 
-import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-//import org.springframework.core.io.ClassPathResource;
 import org.springframework.messaging.converter.MessageConverter;
-import org.springframework.util.MimeType;
 
-import java.io.IOException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 @Configuration
 public class ReceiverConfig {
@@ -19,14 +16,21 @@ public class ReceiverConfig {
 
 
   @Bean
-  public MessageConverter customMessageConverter() {
-    return new MyCustomMessageConverter();
+  public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
+         MyCustomMessageConverter converter = new MyCustomMessageConverter();
+         converter.setSerializedPayloadClass(String.class);
+         if (objectMapper != null) {
+                 converter.setObjectMapper(objectMapper);
+         }
+         
+         return converter;
   }
 
-  @Bean
-  public MessageConverter avroMessageConverter() throws IOException {
-      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
-      //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
-      return converter;
-  }
+
+//  @Bean
+//  public MessageConverter avroMessageConverter() throws IOException {
+//      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
+//      //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
+//      return converter;
+//  }
 }
index ce733c8..72eafba 100644 (file)
@@ -18,11 +18,12 @@ spring:
           binder: kafka
           destination: test
           group: helloworld
-          contentType: ""
+          contentType: application/json
           consumer:
             concurrency: 1
             partitioned: false
-            headerMode: raw
+            # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
+            headerMode: embeddedHeaders
             maxAttempts: 3
             backOffInitialInterval: 1000
       instanceCount: 1
index 769c420..4b96825 100644 (file)
@@ -75,6 +75,7 @@
             <version>1.8.1</version>
         </dependency>
     </dependencies>
+    
        <build>
                <plugins>
             <plugin>
                     </execution>
                 </executions>
             </plugin>
+       </plugins>
 
-            <!--
-                 It will create our running jar file.
-                 The main class must be located in: src/main/java/de/example/spring/kafka
-                 otherwise you need an explicit declaration using this property
-                 <start-class>de.exmaple.spring.kafka.Application</start-class>
+         <pluginManagement>
+            <plugins>
+                <!--
+                      It will create our running jar file.
+                      The main class must be located in: src/main/java/de/example/spring/kafka
+                      otherwise you need an explicit declaration using this property
+                      <start-class>de.exmaple.spring.kafka.Application</start-class>
 
-                 The start-class property is automatically populated by spring-boot-maven-plugin and it is
-                 used in spring-boot-starter-parent.
-               
-                 See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params
-            -->
-            <plugin>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-maven-plugin</artifactId>
-                <version>1.4.4.RELEASE</version>
-            </plugin>
-       </plugins>
+                      The start-class property is automatically populated by spring-boot-maven-plugin and it is
+                      used in spring-boot-starter-parent.
+                    
+                      See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params
+                 -->
+                 <plugin>
+                     <groupId>org.springframework.boot</groupId>
+                     <artifactId>spring-boot-maven-plugin</artifactId>
+                     <version>1.4.4.RELEASE</version>
+                 </plugin>
+            </plugins>
+         </pluginManagement>
        </build>
 </project>
index 1ea5ef4..3335647 100644 (file)
         <url>https://git.gumartinm.name/JavaForFun</url>
     </scm>
 
-    <dependencies>
+       <dependencies>
         <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
             <version>1</version>
         </dependency>
+
+        <dependency>
+            <groupId>de.example.spring.kafka</groupId>
+            <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+               <plugin>
+               <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+             </plugin>
+        </plugins>
+    </build>
+
 </project>
index 9157a7e..68f0470 100644 (file)
@@ -17,6 +17,7 @@ public class Sender {
   }
 
   public void sendMessage(String message) {
-    source.output().send(MessageBuilder.withPayload(message).build());
+         Product product = new Product(message, "this is some description");
+         source.output().send(MessageBuilder.withPayload(product).build());
   }
 }
index f4fa20b..7919aa2 100644 (file)
@@ -3,6 +3,9 @@ package de.example.spring.kafka;
 import org.springframework.cloud.stream.messaging.Source;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.converter.MessageConverter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 @Configuration
 public class SenderConfig {
@@ -11,4 +14,22 @@ public class SenderConfig {
   public Sender sender(Source source) {
     return new Sender(source);
   }
+  
+  @Bean
+  public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
+         MyCustomMessageConverter converter = new MyCustomMessageConverter();
+         converter.setSerializedPayloadClass(String.class);
+         if (objectMapper != null) {
+                 converter.setObjectMapper(objectMapper);
+         }
+         
+         return converter;
+  }
+
+//  @Bean
+//  public MessageConverter avroMessageConverter() throws IOException {
+//      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
+//      //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
+//      return converter;
+//  }
 }
index 4d37c02..07b1ebf 100644 (file)
@@ -7,14 +7,18 @@ logging:
 spring:
   main:
     banner-mode: "off"
+    # We do not need Tomcat running (this application runs as a client)
+    web-environment: false
   cloud:
     stream:
       bindings:
         output:
           binder: kafka
           destination: test
-          contentType: ""
-          headerMode: raw
+          contentType: application/json
+          # headerMode value is used only when input bindings :(
+          # headerMode: raw
+          #contentType: "application/example"
       instanceCount: 1
       instanceIndex: 0
       kafka: