From 85151fb40782137600dcab1a011ebcc4da4660bf Mon Sep 17 00:00:00 2001 From: Gustavo Martin Morcuende Date: Sun, 19 Feb 2017 20:25:01 +0100 Subject: [PATCH] Update Kafka consumer configuration --- .../src/main/resources/application.yml | 27 +++++++++++++++------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml index 95a8520..8979947 100644 --- a/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml +++ b/SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml @@ -40,7 +40,7 @@ spring: consumer: # In my case I have: 1 thread per consumer -> 1 partition per consumer - # Concurrenty is the number of threads. + # Concurrency is the number of threads. Usually one thread/process per partition. concurrency: 1 # Whether the consumer receives data from a partitioned producer. @@ -48,7 +48,11 @@ spring: # Consumer (input) and producer (output) are Spring Cloud Stream applications :) headerMode: embeddedHeaders maxAttempts: 5 + + # Reconnect time to Kafka servers. backOffInitialInterval: 1000 + backOffMaxInterval: 10000 + backOffMultiplier: 2.0 # When set to a negative value, it will default to spring.cloud.stream.instanceCount instanceCount: -1 @@ -66,6 +70,12 @@ spring: offsetUpdateTimeWindow: 3000 + # This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that + # the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. + # This is equivalent to the acks=-1 setting. + # I DO NOT THINK THIS CONFIGURATION APPLIES TO consumers... IT IS ONLY FOR PRODUCERS... + requiredAcks: -1 + # It applies when using autoCreateTopics and autoAddPartitions. 
# minPartitionCount: 1 @@ -81,13 +91,14 @@ spring: # Partitions will be created by me on the Kafka cluster/servers autoAddPartitions: false - bindings: - input: - consumer: - resetOffsets: true - startOffset: latest - autoCommitOffset: true - autoCommitOnError: false + bindings: + input: + consumer: + resetOffsets: true + startOffset: latest + autoCommitOffset: true + # offsetUpdateTimeWindow could be a problem... It does not matter if there is an error, every 3 secs. offsets are saved :/ + autoCommitOnError: false -- 2.1.4