From 85b4c744f803618bf6443b0d5d45e3b6b739b611 Mon Sep 17 00:00:00 2001
From: Gustavo Martin Morcuende
Date: Sun, 19 Feb 2017 23:03:30 +0100
Subject: [PATCH] Configuration and Javadoc improvements

---
 .../example/spring/kafka/InputOutputChannels.java |  8 +-
 .../src/main/resources/application.yml            | 86 +++++++++++++++++++---
 .../spring/kafka/SenderIntegrationTest.java       |  6 +-
 3 files changed, 80 insertions(+), 20 deletions(-)

diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java
index ab612ec..a803c2a 100644
--- a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java
+++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java
@@ -6,13 +6,13 @@ import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.SubscribableChannel;
 
 /**
- * En lugar de esta definicion "custom" usaremos las dos que
- * vienen por defecto en:
+ * Instead of this "custom" definition we will use the ones
+ * already implemented by Spring:
  * org.springframework.cloud.stream.messaging.Sink
  * org.springframework.cloud.stream.messaging.Source
  *
- * Esta definicion custom se usaría igual que Sink y Source :)
- * Donde veas Sink y Source podrías haber puesto esto en su lugar ;)
+ * This "custom" definition would be used in the same way as Sink and Source :)
+ * Wherever you see Sink and Source you could replace them with this interface ;)
  */
 public interface InputOutputChannels {
 
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml
index a397d64..d6a7e8b 100644
--- a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml
+++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml
@@ -4,6 +4,7 @@ logging:
     springframework: info
     ROOT: info
 
+
 spring:
   main:
     banner-mode: "off"
@@ -11,30 +12,91 @@ spring:
     web-environment: false
   cloud:
     stream:
+
+      # We will have 3 brokers and 1 partition. The partition will be replicated/copied to every broker. Just one broker will be the leader.
+      # Zookeeper is in charge of choosing the leader.
+      # If you want to consume data in order you need one consumer per partition (a consumer could be a process or a thread inside some
+      # Java process, so one Java process could implement multiple consumers by means of multiple threads).
+      # Consuming data in order: 1 partition + 1 consumer
+      #
+      # See: https://kafka.apache.org/documentation/#intro_consumers
+      # "However, if you require a total order over records this can be achieved with a topic that has only one partition, though
+      # this will mean only one consumer process per consumer group."
+      instanceCount: 1
+      # This is the binder with index 0 (only one consumer/Java process)
+      instanceIndex: 0
+
       bindings:
         output:
           binder: kafka
-          destination: test
+
+          # Topic
+          destination: mytopic
+
           # Using Apache Avro with Schema Registry
           # contentType: application/*+avro
           contentType: application/json
-          # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
-          headerMode: embeddedHeaders
-          instanceCount: 1
-          instanceIndex: 0
+
+
+          producer:
+            # The payload will be translated to Product.class (I hope)
+            # For example, if I had 3 partitions, Product objects with the same key would go to the same partition.
+            # If I had 3 partitions and no key, objects would be spread over the partitions in a random way.
+            partitionKeyExpression: payload.name
+
+            # We have 3 brokers and 1 partition. This one partition will be replicated/copied to every broker.
+            partitionCount: 1
+
+            # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
+            headerMode: embeddedHeaders
+
+
       kafka:
         binder:
-          zkNodes: "kafka:2181"
+          # Kafka cluster with 3 brokers
           brokers: "kafka:9092,kafka:9093,kafka:9094"
-          autoCreateTopics: true
+          # Zookeeper
+          zkNodes: "kafka:2181"
+
+          # I THINK THIS CONFIGURATION IS WORTHLESS... IT IS NEVER USED...
+          # See: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/8362cc35691ff64ce5ad401f43ca141910255b76
+          # I AM GOING TO SKIP IT, JUST IN CASE THERE COULD BE SOME CONFLICT WITH autoCommitOnError :/
+          # offsetUpdateTimeWindow: 3000
+
+          # This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that
+          # the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
+          # This is equivalent to the acks=-1 setting.
+          requiredAcks: -1
+
+
+          # It only applies when using autoCreateTopics and autoAddPartitions.
+          # minPartitionCount: 1
+
+          # I have 3 brokers and 1 partition. This partition must be replicated/copied to the 3 brokers.
+          # This option is only useful when using autoCreateTopics (Kafka would create our topics based on this
+          # configuration). Because I do not like creating stuff at run time I will be using autoCreateTopics: false,
+          # so this option does not apply :)
+          # replicationFactor: 3
+
+          # Topics will be created by me on the Kafka cluster/servers
+          autoCreateTopics: false
+          # Partitions will be created by me on the Kafka cluster/servers
+          autoAddPartitions: false
+
+        bindings:
+          output:
+            producer:
+              sync: true
+              # No batch mode.
+              batchTimeout: 0
+
+
+      # Apache AVRO schema-registry
       schemaRegistryClient: "http://localhost:8888/schema-registry/"
 
 # Health indicator for binders.
 management:
   health:
     binders:
-      enabled: true
-
-
-# ME QUEDA 7.3 Partitioning
-# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
+      enabled: true
\ No newline at end of file
diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderIntegrationTest.java b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderIntegrationTest.java
index b864a28..3f8ed6b 100644
--- a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderIntegrationTest.java
+++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderIntegrationTest.java
@@ -4,8 +4,6 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
-import java.io.StringWriter;
-import java.io.Writer;
 
 import javax.inject.Inject;
 
@@ -17,13 +15,13 @@ import org.springframework.cloud.stream.test.binder.MessageCollector;
 import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
 import org.springframework.messaging.Message;
 import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.junit4.SpringRunner;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-@RunWith(SpringJUnit4ClassRunner.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(classes = { Sender.class, TestSupportBinderAutoConfiguration.class })
 @DirtiesContext
 public class SenderIntegrationTest {
-- 
2.1.4
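
Note on the partitioning settings in application.yml: partitionKeyExpression: payload.name
is a SpEL expression evaluated against every outgoing payload, so the message object needs
a readable "name" property. A minimal sketch of the producing side against the default
Source channel the new Javadoc recommends; the ProductSender class and the Product DTO
shape here are assumptions for illustration, not code from this repository.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

// Binds the default Source interface instead of the custom InputOutputChannels.
@EnableBinding(Source.class)
public class ProductSender {

  private final Source source;

  @Autowired
  public ProductSender(Source source) {
    this.source = source;
  }

  public void send(Product product) {
    // partitionKeyExpression: payload.name is evaluated against this payload,
    // so products with the same name always land on the same partition. With
    // partitionCount: 1 there is a single partition anyway, which is what
    // gives the total ordering described in the application.yml comments.
    source.output().send(MessageBuilder.withPayload(product).build());
  }
}

// Hypothetical payload type: getName() is what payload.name resolves to;
// the no-arg constructor and setter keep it Jackson-friendly for
// contentType: application/json.
class Product {
  private String name;

  Product() {
  }

  Product(String name) {
    this.name = name;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }
}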
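With autoCreateTopics: false and autoAddPartitions: false, the topic has to exist before
the producer starts. One way to create it by hand is sketched below with the Kafka
AdminClient; note that this API only exists from kafka-clients 0.11 onwards, so on older
clusters the kafka-topics.sh CLI is the equivalent. The topic name, partition count and
replication factor mirror the values discussed in application.yml.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateMyTopic {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Same broker list as spring.cloud.stream.kafka.binder.brokers
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092,kafka:9093,kafka:9094");

    try (AdminClient admin = AdminClient.create(props)) {
      // 1 partition (total ordering for consumers), replication factor 3:
      // one copy of the partition on each of the 3 brokers.
      NewTopic topic = new NewTopic("mytopic", 1, (short) 3);
      admin.createTopics(Collections.singleton(topic)).all().get();
    }
  }
}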
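On the SenderIntegrationTest change: SpringRunner is simply the shorter alias for
SpringJUnit4ClassRunner (same package, same behaviour), so the test semantics are
unchanged. For reference, a test against the TestSupportBinder typically drains the
output channel through MessageCollector along the following lines; the sendMessage
method and the Product constructor are guesses for illustration, not the repository's
actual test body. The imports involved are the ones already visible in the diff above.

  @Inject
  private Sender sender;

  @Inject
  private Source source;

  @Inject
  private MessageCollector messageCollector;

  @Test
  public void shouldPlaceProductOnOutputChannel() throws IOException {
    Product product = new Product("chocolate");
    sender.sendMessage(product);

    // The test-support binder records messages instead of talking to a real broker.
    Message<?> received = messageCollector.forChannel(source.output()).poll();

    // contentType: application/json means the payload arrives as a JSON string.
    Product roundTripped = new ObjectMapper().readValue((String) received.getPayload(), Product.class);
    assertThat(roundTripped.getName(), is(product.getName()));
  }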