Configuration and Javadoc improvements
authorGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 19 Feb 2017 22:03:30 +0000 (23:03 +0100)
committerGustavo Martin Morcuende <gu.martinm@gmail.com>
Sun, 19 Feb 2017 22:03:30 +0000 (23:03 +0100)
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/java/de/example/spring/kafka/InputOutputChannels.java
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/main/resources/application.yml
SpringJava/Kafka/SpringCloudStream/spring-stream-kafka-producer/src/test/java/de/example/spring/kafka/SenderIntegrationTest.java

index ab612ec..a803c2a 100644 (file)
@@ -6,13 +6,13 @@ import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.SubscribableChannel;
 
 /**
- * En lugar de esta definicion "custom" usaremos las dos que
- * vienen por defecto en:
+ * Instead of this "custom" definition we will use the ones
+ * already implemented by Spring:
  * org.springframework.cloud.stream.messaging.Sink
  * org.springframework.cloud.stream.messaging.Source
  *
- * Esta definicion custom se usaría igual que Sink y Source :)
- * Donde veas Sink y Source podrías haber puesto esto en su lugar ;)
+ * This "custom" definition would be used in the same way as Sink and Source :)
+ * Wherever you see Sink and Source you could replace them by this interface ;)
  */
 public interface InputOutputChannels {
 
index a397d64..d6a7e8b 100644 (file)
@@ -4,6 +4,7 @@ logging:
       springframework: info
     ROOT: info
 
+
 spring:
   main:
     banner-mode: "off"
@@ -11,30 +12,91 @@ spring:
     web-environment: false
   cloud:
     stream:
+    
+      # We will have 3 brokers and 1 partition. The partition will be replicated/copied in every broker. Just one broker will be the leader.
+      # Zookeeper is in charge of choosing the leader.
+      # If you want to consume data in order you need one consumer per partition (consumer could be a process or a thread in some Java process,
+      # so one Java process could be implementing multiple consumers by means of multiple threads)
+      # Consuming data in order: 1 partition + 1 consumer
+      # 
+      # See: https://kafka.apache.org/documentation/#intro_consumers
+      # However, if you require a total order over records this can be achieved with a topic that has only one partition, though
+      # this will mean only one consumer process per consumer group.
+      instanceCount: 1
+      # This is the binder with index 0 (only one consumer/Java process)
+      instanceIndex: 0
+      
       bindings:
         output:
           binder: kafka
-          destination: test
+          
+          # Topic
+          destination: mytopic
+          
           # Using Apache Avro with Schema Registry
           # contentType: application/*+avro
           contentType: application/json
-          # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
-          headerMode: embeddedHeaders
-      instanceCount: 1
-      instanceIndex: 0
+
+          
+          producer:
+            # payload will be translated to Product.class (I hope)
+            # For example, if I had 3 partitions. Product objects with the same key will go to the same partition.
+            # Otherwise if I had 3 partitions and no key, objects will be using different partitions in a random way.
+            partitionKeyExpression: payload.name
+            
+            # We have 3 brokers and 1 partition. This one partition will replicated/copied in every broker.
+            partitionCount: 1
+            
+            # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
+            headerMode: embeddedHeaders
+            
+
+      
       kafka:
         binder:
-          zkNodes: "kafka:2181"
+          # Kafka cluster with 3 brokers
           brokers: "kafka:9092,kafka:9093,kafka:9094"
-          autoCreateTopics: true
+          # Zookeeper
+          zkNodes: "kafka:2181"
+          
+          # I THINK THIS CONFIGURATION IS WORTHLESS... IT IS NEVER USED...
+          # see: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/8362cc35691ff64ce5ad401f43ca141910255b76
+          # JUST IN CASE I AM GOING TO SKIP IT, JUST IN CASE THERE COULD BE SOME CONFLICT WITH autoCommitOnError :/
+          offsetUpdateTimeWindow: 3000
+          
+          # This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that
+          # the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
+          # This is equivalent to the acks=-1 setting.
+          requiredAcks: -1
+          
+          
+          # It applies when using autoCreateTopics and autoAddPartitions.
+          # minPartitionCount: 1
+          
+          # I have 3 brokers and 1 partition. This partition must be replicated/copied in the 3 brokers.
+          # This option is only useful when using autoCreateTopics (Kafka will create our topics based on this
+          # configuration) Because I do not like creating stuff in run time I will be using autoCreateTopics: false
+          # and this option does not apply :)
+          # replicationFactor: 3
+          
+          # Topics will be created by me on the Kafka cluster/servers
+          autoCreateTopics: false
+          # Partitions will be created by me on the Kafka cluster/servers
+          autoAddPartitions: false
+
+        bindings:
+          output:
+            producer:
+              sync: true
+              # No batch mode.
+              batchTimeout: 0
+          
+          
+      # Apache AVRO schema-registry    
       schemaRegistryClient: "http://localhost:8888/schema-registry/"
 
 # Health indicator for binders.
 management:
   health:
     binders:
-      enabled: true
-
-
-# ME QUEDA 7.3 Partitioning
-# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
+      enabled: true
\ No newline at end of file
index b864a28..3f8ed6b 100644 (file)
@@ -4,8 +4,6 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
-import java.io.StringWriter;
-import java.io.Writer;
 
 import javax.inject.Inject;
 
@@ -17,13 +15,13 @@ import org.springframework.cloud.stream.test.binder.MessageCollector;
 import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
 import org.springframework.messaging.Message;
 import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.junit4.SpringRunner;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-@RunWith(SpringJUnit4ClassRunner.class)
+@RunWith(SpringRunner.class)
 @SpringBootTest(classes = { Sender.class, TestSupportBinderAutoConfiguration.class })
 @DirtiesContext
 public class SenderIntegrationTest {