Update Kafka consumer configuration
authorGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 19 Feb 2017 19:25:01 +0000 (20:25 +0100)
committerGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 19 Feb 2017 19:25:01 +0000 (20:25 +0100)
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml

index 95a8520..8979947 100644 (file)
@@ -40,7 +40,7 @@ spring:
           
           consumer:
             # In my case I have: 1 thread per consumer -> 1 partition per consumer
-            # Concurrenty is the number of threads.
+            # Concurrenty is the number of threads. Usually one thread/process per partition.
             concurrency: 1
             
             # Whether the consumer receives data from a partitioned producer.
@@ -48,7 +48,11 @@ spring:
             # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
             headerMode: embeddedHeaders
             maxAttempts: 5
+            
+            # Reconect time to Kafka servers.
             backOffInitialInterval: 1000
+            backOffMaxInterval: 10000
+            backOffMultiplier: 2.0
 
             # When set to a negative value, it will default to spring.cloud.stream.instanceCount
             instanceCount: -1
@@ -66,6 +70,12 @@ spring:
           
           offsetUpdateTimeWindow: 3000
           
+          # This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that
+          # the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
+          # This is equivalent to the acks=-1 setting.
+          # I DO NOT THINK THIS CONFIGURATION APPLIES TO consumers... IT IS ONLY FOR PRODUCERS...
+          requiredAcks: -1
+          
           
           # It applies when using autoCreateTopics and autoAddPartitions.
           # minPartitionCount: 1
@@ -81,13 +91,14 @@ spring:
           # Partitions will be created by me on the Kafka cluster/servers
           autoAddPartitions: false
           
-          bindings:
-            input:
-              consumer:
-                resetOffsets: true
-                startOffset: latest
-                autoCommitOffset: true
-                autoCommitOnError: false
+        bindings:
+          input:
+            consumer:
+              resetOffsets: true
+              startOffset: latest
+              autoCommitOffset: true
+              # offsetUpdateTimeWindow could be a problem... It does not matter if there is error, every 3secs. offsets are saved :/
+              autoCommitOnError: false