Improving consumer configuration
authorGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 19 Feb 2017 18:03:49 +0000 (19:03 +0100)
committerGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 19 Feb 2017 18:03:49 +0000 (19:03 +0100)
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-consumer/src/main/resources/application.yml

index f4f8991..95a8520 100644 (file)
@@ -4,9 +4,6 @@ logging:
       springframework: info
     ROOT: info
 
-kafka:
-  topic: test
-  messageKey: keyTest
 
 spring:
   main:
@@ -15,33 +12,87 @@ spring:
     web-environment: false
   cloud:
     stream:
+        
+      # We will have 3 brokers and 1 partition. The partition will be replicated/copied in every broker. Just one broker will be the leader.
+      # Zookeeper is in charge of choosing the leader.
+      # If you want to consume data in order you need one consumer per partition (consumer could be a process or a thread in some Java process,
+      # so one Java process could be implementing multiple consumers by means of multiple threads)
+      # Consuming data in order: 1 partition + 1 consumer
+      # 
+      # See: https://kafka.apache.org/documentation/#intro_consumers
+      # However, if you require a total order over records this can be achieved with a topic that has only one partition, though
+      # this will mean only one consumer process per consumer group.
+      instanceCount: 1
+      # This is the binder with index 0 (only one consumer/Java process)
+      instanceIndex: 0
+      
       bindings:
         input:
           binder: kafka
-          destination: test
-          group: helloworld
+
+          # Topic          
+          destination: mytopic
+          group: mygroup
+          
+          # Using Apache Avro with Schema Registry
+          # contentType: application/*+avro
           contentType: application/json
-          startOffset: latest
+          
           consumer:
+            # In my case I have: 1 thread per consumer -> 1 partition per consumer
+            # Concurrenty is the number of threads.
             concurrency: 1
-            partitioned: false
+            
+            # Whether the consumer receives data from a partitioned producer.
+            partitioned: true
             # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
             headerMode: embeddedHeaders
-            maxAttempts: 3
+            maxAttempts: 5
             backOffInitialInterval: 1000
-      instanceCount: 1
-      instanceIndex: 0
+
+            # When set to a negative value, it will default to spring.cloud.stream.instanceCount
+            instanceCount: -1
+            # When set to a negative value, it will default to spring.cloud.stream.instanceIndex
+            instanceIndex: -1
+            
+
+      
       kafka:
         binder:
-          zkNodes: "kafka:2181"
+          # Kafka cluster with 3 brokers
           brokers: "kafka:9092,kafka:9093,kafka:9094"
-          autoCreateTopics: true
+          # Zookeeper (I think, it is not required in Kafka v0.10.1.1
+          zkNodes: "kafka:2181"
+          
+          offsetUpdateTimeWindow: 3000
+          
+          
+          # It applies when using autoCreateTopics and autoAddPartitions.
+          # minPartitionCount: 1
+          
+          # I have 3 brokers and 1 partition. This partition must be replicated/copied in the 3 brokers.
+          # This option is only useful when using autoCreateTopics (Kafka will create our topics based on this
+          # configuration) Because I do not like creating stuff in run time I will be using autoCreateTopics: false
+          # and this option does not apply :)
+          # replicationFactor: 3
+          
+          # Topics will be created by me on the Kafka cluster/servers
+          autoCreateTopics: false
+          # Partitions will be created by me on the Kafka cluster/servers
+          autoAddPartitions: false
+          
+          bindings:
+            input:
+              consumer:
+                resetOffsets: true
+                startOffset: latest
+                autoCommitOffset: true
+                autoCommitOnError: false
+              
+         
 
 # Health indicator for binders.
 management:
   health:
     binders:
-      enabled: true
-
-# ME QUEDA 7.3 Partitioning
-# Y OPCIONES A MOGOLLĂ“N EN 12.3 Configuration Options
+      enabled: true
\ No newline at end of file