</scm>
<modules>
+ <module>spring-cloud-stream-kafka-example-bom</module>
+ <module>spring-cloud-stream-kafka-common-example</module>
<module>spring-cloud-stream-kafka-consumer-example</module>
<module>spring-cloud-stream-kafka-producer-example</module>
</modules>
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-example-bom</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+ <name>spring-cloud-stream-kafka-common-example</name>
+ <url>https://gumartinm.name</url>
+ <description>Spring Cloud Stream Kafka Commons</description>
+ <organization>
+ <name>gumartinm</name>
+ <url>https://gumartinm.name</url>
+ </organization>
+ <scm>
+ <developerConnection>scm:git:https://git.gumartinm.name/JavaForFun</developerConnection>
+ <url>https://git.gumartinm.name/JavaForFun</url>
+ </scm>
+
+</project>
--- /dev/null
+package de.example.spring.kafka;
+
+import java.nio.charset.Charset;
+
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.util.MimeType;
+
+/**
+ * Working around problem created by org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion.mimeTypeFromObject()
+ *
+ * This code:
+ * if (payload instanceof String) {
+ * return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON
+ * : MimeTypeUtils.TEXT_PLAIN;
+ * }
+ *
+ * Changes messages from:
+ * contentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
+ *
+ * to:
+ * contentType "text/plain" originalContentType "application/json;charset=UTF-8"{"name":"example message","description":"this is some description"}
+ *
+ */
+public class MyCustomMessageConverter extends MappingJackson2MessageConverter {
+
+ public MyCustomMessageConverter() {
+ super(new MimeType("application", "json"));
+ }
+
+}
--- /dev/null
+package de.example.spring.kafka;
+
+public class Product {
+ private String name;
+ private String description;
+
+ public Product(String name, String description) {
+ this.name = name;
+ this.description = description;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public String getDescription() {
+ return this.description;
+ }
+}
<url>https://git.gumartinm.name/JavaForFun</url>
</scm>
+
+ <dependencies>
+ <dependency>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax.inject</artifactId>
+ <version>1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
+++ /dev/null
-package de.example.spring.kafka;
-
-import org.springframework.messaging.Message;
-import org.springframework.messaging.converter.AbstractMessageConverter;
-import org.springframework.util.MimeType;
-
-public class MyCustomMessageConverter extends AbstractMessageConverter {
-
- public MyCustomMessageConverter() {
- super(new MimeType("application", "example"));
- }
-
- @Override
- protected boolean supports(Class<?> clazz) {
- return (String.class == clazz);
- }
-
- @Override
- protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
- Object payload = message.getPayload();
-
- logger.info("convertFromInternal, payload: " + payload);
-
- return payload instanceof String ? payload : new String((byte[]) payload);
- }
-}
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
@StreamListener(Sink.INPUT)
- public void handle(String message) {
- LOGGER.info("received message='{}'", message);
+ public void handle(Product product) {
+ LOGGER.info("product name='{}'", product.getName());
+ LOGGER.info("product description='{}'", product.getDescription());
}
}
package de.example.spring.kafka;
-import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-//import org.springframework.core.io.ClassPathResource;
import org.springframework.messaging.converter.MessageConverter;
-import org.springframework.util.MimeType;
-import java.io.IOException;
+import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
public class ReceiverConfig {
@Bean
- public MessageConverter customMessageConverter() {
- return new MyCustomMessageConverter();
+ public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
+ MyCustomMessageConverter converter = new MyCustomMessageConverter();
+ converter.setSerializedPayloadClass(String.class);
+ if (objectMapper != null) {
+ converter.setObjectMapper(objectMapper);
+ }
+
+ return converter;
}
- @Bean
- public MessageConverter avroMessageConverter() throws IOException {
- AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
- //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
- return converter;
- }
+
+// @Bean
+// public MessageConverter avroMessageConverter() throws IOException {
+// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
+// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
+// return converter;
+// }
}
binder: kafka
destination: test
group: helloworld
- contentType: ""
+ contentType: application/json
consumer:
concurrency: 1
partitioned: false
- headerMode: raw
+ # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
+ headerMode: embeddedHeaders
maxAttempts: 3
backOffInitialInterval: 1000
instanceCount: 1
<version>1.8.1</version>
</dependency>
</dependencies>
+
<build>
<plugins>
<plugin>
</execution>
</executions>
</plugin>
+ </plugins>
- <!--
- It will create our running jar file.
- The main class must be located in: src/main/java/de/example/spring/kafka
- otherwise you need an explicit declaration using this property
- <start-class>de.exmaple.spring.kafka.Application</start-class>
+ <pluginManagement>
+ <plugins>
+ <!--
+ It will create our running jar file.
+ The main class must be located in: src/main/java/de/example/spring/kafka
+ otherwise you need an explicit declaration using this property
+ <start-class>de.exmaple.spring.kafka.Application</start-class>
- The start-class property is automatically populated by spring-boot-maven-plugin and it is
- used in spring-boot-starter-parent.
-
- See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params
- -->
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>1.4.4.RELEASE</version>
- </plugin>
- </plugins>
+ The start-class property is automatically populated by spring-boot-maven-plugin and it is
+ used in spring-boot-starter-parent.
+
+ See: http://docs.spring.io/spring-boot/docs/1.0.1.RELEASE/reference/html/build-tool-plugins-maven-plugin.html#build-tool-plugins-maven-packaging-optional-params
+ -->
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>1.4.4.RELEASE</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
</project>
<url>https://git.gumartinm.name/JavaForFun</url>
</scm>
- <dependencies>
+ <dependencies>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
+
+ <dependency>
+ <groupId>de.example.spring.kafka</groupId>
+ <artifactId>spring-cloud-stream-kafka-common-example</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
if (args.length > 0) {
- message = args[1];
+ message = args[0];
}
sender.sendMessage(message);
}
public void sendMessage(String message) {
- source.output().send(MessageBuilder.withPayload(message).build());
+ Product product = new Product(message, "this is some description");
+ source.output().send(MessageBuilder.withPayload(product).build());
}
}
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.converter.MessageConverter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
@Configuration
public class SenderConfig {
public Sender sender(Source source) {
return new Sender(source);
}
+
+ @Bean
+ public MessageConverter customMessageConverter(ObjectMapper objectMapper) {
+ MyCustomMessageConverter converter = new MyCustomMessageConverter();
+ converter.setSerializedPayloadClass(String.class);
+ if (objectMapper != null) {
+ converter.setObjectMapper(objectMapper);
+ }
+
+ return converter;
+ }
+
+// @Bean
+// public MessageConverter avroMessageConverter() throws IOException {
+// AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
+// //converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
+// return converter;
+// }
}
spring:
main:
banner-mode: "off"
+ # We do not need Tomcat running (this application runs as a client)
+ web-environment: false
cloud:
stream:
bindings:
output:
binder: kafka
destination: test
- contentType: ""
- headerMode: raw
+ contentType: application/json
+ # headerMode value is used only when input bindings :(
+ # headerMode: raw
+ #contentType: "application/example"
instanceCount: 1
instanceIndex: 0
kafka: