From be81da78fbfa6d5f7a46ec910737badd76abc16c Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Sun, 19 Feb 2017 19:03:49 +0100 Subject: [PATCH] Improving consumer configuration --- .../src/main/resources/application.yml | 83 +++++++++++++++++----- 1 file changed, 67 insertions(+), 16 deletions(-) diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml index f4f8991..95a8520 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml @@ -4,9 +4,6 @@ logging: springframework: info ROOT: info -kafka: - topic: test - messageKey: keyTest spring: main: @@ -15,33 +12,87 @@ spring: web-environment: false cloud: stream: + + # We will have 3 brokers and 1 partition. The partition will be replicated/copied in every broker. Just one broker will be the leader. + # Zookeeper is in charge of choosing the leader. + # If you want to consume data in order you need one consumer per partition (consumer could be a process or a thread in some Java process, + # so one Java process could be implementing multiple consumers by means of multiple threads) + # Consuming data in order: 1 partition + 1 consumer + # + # See: https://kafka.apache.org/documentation/#intro_consumers + # However, if you require a total order over records this can be achieved with a topic that has only one partition, though + # this will mean only one consumer process per consumer group. + instanceCount: 1 + # This is the binder with index 0 (only one consumer/Java process) + instanceIndex: 0 + bindings: input: binder: kafka - destination: test - group: helloworld + + # Topic + destination: mytopic + group: mygroup + + # Using Apache Avro with Schema Registry + # contentType: application/*+avro contentType: application/json - startOffset: latest + consumer: + # In my case I have: 1 thread per consumer -> 1 partition per consumer + # Concurrenty is the number of threads. concurrency: 1 - partitioned: false + + # Whether the consumer receives data from a partitioned producer. + partitioned: true # Consumer (input) and producer (output) are Spring Cloud Stream applications :) headerMode: embeddedHeaders - maxAttempts: 3 + maxAttempts: 5 backOffInitialInterval: 1000 - instanceCount: 1 - instanceIndex: 0 + + # When set to a negative value, it will default to spring.cloud.stream.instanceCount + instanceCount: -1 + # When set to a negative value, it will default to spring.cloud.stream.instanceIndex + instanceIndex: -1 + + + kafka: binder: - zkNodes: "kafka:2181" + # Kafka cluster with 3 brokers brokers: "kafka:9092,kafka:9093,kafka:9094" - autoCreateTopics: true + # Zookeeper (I think, it is not required in Kafka v0.10.1.1 + zkNodes: "kafka:2181" + + offsetUpdateTimeWindow: 3000 + + + # It applies when using autoCreateTopics and autoAddPartitions. + # minPartitionCount: 1 + + # I have 3 brokers and 1 partition. This partition must be replicated/copied in the 3 brokers. + # This option is only useful when using autoCreateTopics (Kafka will create our topics based on this + # configuration) Because I do not like creating stuff in run time I will be using autoCreateTopics: false + # and this option does not apply :) + # replicationFactor: 3 + + # Topics will be created by me on the Kafka cluster/servers + autoCreateTopics: false + # Partitions will be created by me on the Kafka cluster/servers + autoAddPartitions: false + + bindings: + input: + consumer: + resetOffsets: true + startOffset: latest + autoCommitOffset: true + autoCommitOnError: false + + # Health indicator for binders. management: health: binders: - enabled: true - -# ME QUEDA 7.3 Partitioning -# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options + enabled: true \ No newline at end of file -- 2.1.4