I've found that Kafka Connect never respects the "target.cluster.bootstrap.servers" setting in the MirrorMaker2 connector config. Instead, it always writes to the cluster that Kafka Connect itself is connected to (the worker's bootstrap.servers). Running Kafka Connect against the source cluster with an IdentityReplicationPolicy causes an infinite loop: messages are read from the source cluster, then written back to the same topic on the source cluster. Running Kafka Connect against a third cluster causes the messages to be written to the Kafka Connect cluster, not the configured target cluster. Below are the scenarios I tested, along with examples of the Kafka Connect worker and connector settings used. The only scenario that produced the correct result was running Kafka Connect against the target cluster.
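Why the identity policy turns misdirected writes into a loop can be sketched in a few lines of Python. This assumes the documented naming behavior of MM2's built-in policies: DefaultReplicationPolicy prefixes the remote topic with the source cluster alias and the default "." separator, while IdentityReplicationPolicy keeps the topic name unchanged. The helper functions are hypothetical illustrations, not Kafka APIs, and the custom policy in the config below is assumed to behave like the identity policy.

```python
# Hypothetical helpers that mimic how MM2's built-in replication policies
# name the remote (target) topic. They are illustrations, not Kafka APIs.

def default_policy_remote_topic(source_alias, topic, separator="."):
    """DefaultReplicationPolicy style: prefix the topic with the source alias."""
    return f"{source_alias}{separator}{topic}"


def identity_policy_remote_topic(source_alias, topic):
    """IdentityReplicationPolicy style: keep the topic name unchanged."""
    return topic


def loops_back(policy, source_alias, topic, write_cluster, source_cluster):
    """True if a replicated record lands in the very topic it was read from."""
    remote = policy(source_alias, topic)
    return write_cluster == source_cluster and remote == topic


if __name__ == "__main__":
    alias, topic = "msksource", "example-topic"
    print(default_policy_remote_topic(alias, topic))
    print(identity_policy_remote_topic(alias, topic))
    # Records produced by a worker bootstrapped against the source cluster:
    print(loops_back(identity_policy_remote_topic, alias, topic, "source", "source"))
    print(loops_back(default_policy_remote_topic, alias, topic, "source", "source"))
```

With the identity naming, a record read from "example-topic" and written back to the source cluster lands in "example-topic" again and is picked up by the next poll; the prefixed name would at least send it to a different topic.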
Is this a hard requirement? Am I misunderstanding how the MM2 configs are used in Kafka Connect? We generally recommend that MirrorMaker2 users run Kafka Connect against the "target" Kafka cluster to minimize network latency for the producers. However, in some scenarios it makes sense to run Kafka Connect against the "source" Kafka cluster, or even a third, unrelated Kafka cluster. This is because we don't always have control over topic creation in the source/target clusters, and we want MirrorMaker2 to replicate data and offsets only to and from existing topics.
connect-distributed.properties:
bootstrap.servers=source.broker.address:9092
group.id=demo-loop
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets-demo-loop
offset.storage.replication.factor=3
config.storage.topic=connect-configs-demo-loop
config.storage.replication.factor=3
status.storage.topic=connect-status-demo-loop
status.storage.replication.factor=3
offset.flush.interval.ms=10000
connector.client.config.override.policy=All
Kafka Connect MM2 connector config:
{
"name": "mm2-msc",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"replication.policy.class":"com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
"clusters": "msksource,mskdest",
"source.cluster.alias": "msksource",
"target.cluster.alias": "mskdest",
"target.cluster.bootstrap.servers": "target.broker.address:9092",
"source.cluster.bootstrap.servers": "source.broker.address:9092",
"topics": "example-topic",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"replication.factor": "3",
"offset-syncs.topic.replication.factor": "3",
"sync.topic.acls.interval.seconds": "600",
"sync.topic.configs.interval.seconds": "600",
"refresh.topics.interval.seconds": "300",
"refresh.groups.interval.seconds": "20",
"producer.enable.idempotence":"true",
"consumer.group.id": "mm2-msc",
"source.cluster.max.poll.records" : "50000",
"source.cluster.receive.buffer.bytes" : "33554432",
"source.cluster.send.buffer.bytes" : "33554432",
"source.cluster.max.partition.fetch.bytes" : "33554432",
"source.cluster.message.max.bytes" : "37755000",
"source.cluster.compression.type" : "gzip",
"source.cluster.max.request.size" : "26214400",
"source.cluster.buffer.memory" : "524288000",
"source.cluster.batch.size" : "524288",
"target.cluster.max.poll.records" : "20000",
"target.cluster.receive.buffer.bytes" : "33554432",
"target.cluster.send.buffer.bytes" : "33554432",
"target.cluster.max.partition.fetch.bytes" : "33554432",
"target.cluster.message.max.bytes" : "37755000",
"target.cluster.compression.type" : "gzip",
"target.cluster.max.request.size" : "26214400",
"target.cluster.buffer.memory" : "524288000",
"target.cluster.batch.size" : "52428"
}
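Because these tests indicate that records land on whichever cluster the worker's bootstrap.servers points at, a small pre-flight check can flag risky combinations before the connector is submitted. The Python sketch below is hypothetical (parse_properties, strip_prefix and check are not part of Kafka) and uses only a subset of the worker and connector settings above. It groups the source.cluster.* and target.cluster.* keys with the prefix stripped, which is assumed here to approximate how MM2 reads its per-cluster client settings.

```python
import json

# Hypothetical pre-flight check; none of these helpers are Kafka APIs.
# Only a subset of the worker and connector configs from this post is used.
WORKER_PROPERTIES = """\
bootstrap.servers=source.broker.address:9092
group.id=demo-loop
connector.client.config.override.policy=All
"""

CONNECTOR_CONFIG = json.loads("""{
  "source.cluster.alias": "msksource",
  "target.cluster.alias": "mskdest",
  "target.cluster.bootstrap.servers": "target.broker.address:9092",
  "source.cluster.bootstrap.servers": "source.broker.address:9092",
  "target.cluster.batch.size": "52428"
}""")


def parse_properties(text):
    """Minimal key=value parser: skips blanks and '#' comments, no escape handling."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def strip_prefix(config, prefix):
    """Return the keys that start with prefix, with the prefix removed."""
    return {k[len(prefix):]: v for k, v in config.items() if k.startswith(prefix)}


def check(worker_props, connector_config):
    """Warn when the worker's own cluster is not the configured target cluster."""
    warnings = []
    worker = worker_props.get("bootstrap.servers")
    target = strip_prefix(connector_config, "target.cluster.").get("bootstrap.servers")
    source = strip_prefix(connector_config, "source.cluster.").get("bootstrap.servers")
    if worker != target:
        warnings.append(f"worker bootstrap.servers ({worker}) differs from target ({target})")
    if worker == source:
        warnings.append("worker is bootstrapped against the source cluster")
    return warnings


if __name__ == "__main__":
    for warning in check(parse_properties(WORKER_PROPERTIES), CONNECTOR_CONFIG):
        print("WARNING:", warning)
```

With the settings in this post, both warnings fire, matching the "Source" rows in the test table. With the worker pointed at target.broker.address:9092 (as in test 9), the check returns an empty list.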
| Test | Kafka Connect Server | Kafka Connect / MM2 Version | Offset Sync Location | Source Cluster Version | Target Cluster Version | Result |
|---|---|---|---|---|---|---|
| Control (BNSF config) | Source | 3.8.1 | Source | 3.5.1 | 2.7.0 | Infinite loop |
| 1 | Source | 3.8.1 | Target | 3.5.1 | 2.7.0 | Infinite loop |
| 2 | Source | 3.8.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 3 | Source | 3.9.0 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 4 | Source | 3.7.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 5 | Source | 3.6.0 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 6 | Source | 2.7.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 7 | Source | 3.8.1 | Source (also tested with various other config changes, e.g. varying how the target.cluster.bootstrap.servers setting is provided) | 3.5.1 | 3.5.1 | Infinite loop |
| 8 | A third MSK cluster | 3.8.1 | Source | 3.5.1 | 3.5.1 | Data replicated to the third cluster, not the target OR source |
| 9 | Target | 3.8.1 | Source | 3.5.1 | 3.5.1 | Correct behavior (source replicated to target) |
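The pattern in the test results can be made explicit by grouping outcomes by where Kafka Connect runs. The Python sketch below transcribes the rows of the test table (result text abbreviated); the tuple layout and function names are illustrative choices, not part of any tool used in the tests.

```python
from collections import defaultdict

# Rows transcribed from the test table in this post (result text abbreviated):
# (test, connect_cluster, connect_version, offset_sync, source_version, target_version, result)
RESULTS = [
    ("Control (BNSF config)", "Source", "3.8.1", "Source", "3.5.1", "2.7.0", "Infinite loop"),
    ("1", "Source", "3.8.1", "Target", "3.5.1", "2.7.0", "Infinite loop"),
    ("2", "Source", "3.8.1", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    ("3", "Source", "3.9.0", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    ("4", "Source", "3.7.1", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    ("5", "Source", "3.6.0", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    ("6", "Source", "2.7.1", "Source", "3.5.1", "3.5.1", "Infinite loop"),
    ("7", "Source", "3.8.1", "Source (+ other config changes)", "3.5.1", "3.5.1", "Infinite loop"),
    ("8", "Third MSK cluster", "3.8.1", "Source", "3.5.1", "3.5.1", "Written to third cluster"),
    ("9", "Target", "3.8.1", "Source", "3.5.1", "3.5.1", "Correct (source to target)"),
]


def results_by_connect_cluster(rows):
    """Map each Kafka Connect placement to the set of outcomes observed."""
    grouped = defaultdict(set)
    for _test, connect_cluster, *_middle, result in rows:
        grouped[connect_cluster].add(result)
    return dict(grouped)


def versions_by_connect_cluster(rows):
    """Map each placement to the Connect/MM2 versions tried (plain string sort)."""
    grouped = defaultdict(set)
    for _test, connect_cluster, version, *_rest in rows:
        grouped[connect_cluster].add(version)
    return {k: sorted(v) for k, v in grouped.items()}


if __name__ == "__main__":
    versions = versions_by_connect_cluster(RESULTS)
    for placement, outcomes in results_by_connect_cluster(RESULTS).items():
        print(f"{placement}: {sorted(outcomes)} (versions {versions[placement]})")
```

Every run with Kafka Connect against the source cluster looped, regardless of the Connect/MM2 version (2.7.1 through 3.9.0) or the offset-sync location; only the target placement replicated correctly.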