import org.springframework.messaging.SubscribableChannel;
/**
- * En lugar de esta definicion "custom" usaremos las dos que
- * vienen por defecto en:
+ * Instead of this "custom" definition we will use the ones
+ * already implemented by Spring:
* org.springframework.cloud.stream.messaging.Sink
* org.springframework.cloud.stream.messaging.Source
*
- * Esta definicion custom se usaría igual que Sink y Source :)
- * Donde veas Sink y Source podrías haber puesto esto en su lugar ;)
+ * This "custom" definition would be used in the same way as Sink and Source :)
+ * Wherever you see Sink and Source you could replace them by this interface ;)
*/
public interface InputOutputChannels {
springframework: info
ROOT: info
+
spring:
main:
banner-mode: "off"
web-environment: false
cloud:
stream:
+
+ # We will have 3 brokers and 1 partition. The partition will be replicated/copied in every broker. Just one broker will be the leader.
+ # Zookeeper is in charge of choosing the leader.
+ # If you want to consume data in order you need one consumer per partition (consumer could be a process or a thread in some Java process,
+ # so one Java process could be implementing multiple consumers by means of multiple threads)
+ # Consuming data in order: 1 partition + 1 consumer
+ #
+ # See: https://kafka.apache.org/documentation/#intro_consumers
+ # However, if you require a total order over records this can be achieved with a topic that has only one partition, though
+ # this will mean only one consumer process per consumer group.
+ instanceCount: 1
+ # This is the binder with index 0 (only one consumer/Java process)
+ instanceIndex: 0
+
bindings:
output:
binder: kafka
- destination: test
+
+ # Topic
+ destination: mytopic
+
# Using Apache Avro with Schema Registry
# contentType: application/*+avro
contentType: application/json
- # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
- headerMode: embeddedHeaders
- instanceCount: 1
- instanceIndex: 0
+
+
+ producer:
+ # payload will be translated to Product.class (I hope)
+ # For example, if I had 3 partitions. Product objects with the same key will go to the same partition.
+ # Otherwise if I had 3 partitions and no key, objects will be using different partitions in a random way.
+ partitionKeyExpression: payload.name
+
+ # We have 3 brokers and 1 partition. This one partition will replicated/copied in every broker.
+ partitionCount: 1
+
+ # Consumer (input) and producer (output) are Spring Cloud Stream applications :)
+ headerMode: embeddedHeaders
+
+
+
kafka:
binder:
- zkNodes: "kafka:2181"
+ # Kafka cluster with 3 brokers
brokers: "kafka:9092,kafka:9093,kafka:9094"
- autoCreateTopics: true
+ # Zookeeper
+ zkNodes: "kafka:2181"
+
+ # I THINK THIS CONFIGURATION IS WORTHLESS... IT IS NEVER USED...
+ # see: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/8362cc35691ff64ce5ad401f43ca141910255b76
+ # JUST IN CASE I AM GOING TO SKIP IT, JUST IN CASE THERE COULD BE SOME CONFLICT WITH autoCommitOnError :/
+ offsetUpdateTimeWindow: 3000
+
+ # This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that
+ # the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
+ # This is equivalent to the acks=-1 setting.
+ requiredAcks: -1
+
+
+ # It applies when using autoCreateTopics and autoAddPartitions.
+ # minPartitionCount: 1
+
+ # I have 3 brokers and 1 partition. This partition must be replicated/copied in the 3 brokers.
+ # This option is only useful when using autoCreateTopics (Kafka will create our topics based on this
+ # configuration) Because I do not like creating stuff in run time I will be using autoCreateTopics: false
+ # and this option does not apply :)
+ # replicationFactor: 3
+
+ # Topics will be created by me on the Kafka cluster/servers
+ autoCreateTopics: false
+ # Partitions will be created by me on the Kafka cluster/servers
+ autoAddPartitions: false
+
+ bindings:
+ output:
+ producer:
+ sync: true
+ # No batch mode.
+ batchTimeout: 0
+
+
+ # Apache AVRO schema-registry
schemaRegistryClient: "http://localhost:8888/schema-registry/"
# Health indicator for binders.
management:
health:
binders:
- enabled: true
-
-
-# ME QUEDA 7.3 Partitioning
-# Y OPCIONES A MOGOLLÓN EN 12.3 Configuration Options
+ enabled: true
\ No newline at end of file
import static org.junit.Assert.assertThat;
import java.io.IOException;
-import java.io.StringWriter;
-import java.io.Writer;
import javax.inject.Inject;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import org.springframework.test.context.junit4.SpringRunner;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-@RunWith(SpringJUnit4ClassRunner.class)
+@RunWith(SpringRunner.class)
@SpringBootTest(classes = { Sender.class, TestSupportBinderAutoConfiguration.class })
@DirtiesContext
public class SenderIntegrationTest {