
Re: Many dups received by consumer (kafka_2.13)

Hello community! Has nobody run into a similar problem? So far no one has
been able to advise me on what is happening with our Kafka cluster. :(

On Mon, 9 Nov 2020 at 10:57, Dev Op <dsd7150@gmail.com> wrote:

> Hello all!
>
> Please help me understand why my consumer has started receiving
> duplicates. I suspect it is caused by problems on my kafka1 node.
>
> The cluster consists of three nodes: kafka1 (192.168.137.19, id=1),
> kafka2 (192.168.137.20, id=2), kafka3 (192.168.137.21, id=3)
> Version of Kafka: kafka_2.13-2.4.1
> Configs:
> - Broker config (server.properties from kafka1):
> https://pastebin.com/MR20rZdQ
> - Zookeeper config (zookeeper.properties from kafka1):
> https://pastebin.com/vCpFU0gp
>
> /opt/kafka_2.13-2.4.1/bin/kafka-topics.sh --describe --topic in_raw
> --zookeeper localhost:2181
> Topic: in_raw PartitionCount: 1 ReplicationFactor: 3 Configs:
> Topic: in_raw Partition: 0 Leader: 1 Replicas: 1,3,2
> Isr: 1,2,3
>
> The producer put one message into the 'in_raw' topic; after that, our consumer
> started receiving many duplicates from that topic every 10 minutes.
>
> The first duplicate occurrence was at 20:01:
> $ xzcat parsers.log-20201105.xz | perl -MData::Dumper -lne 'if
> (/(unitId=\d+, unitDate=\d+, msgNumber=\d+)/) { ++$a->{$1}; die "$_\n" if
> $a->{$1} > 1; }'
> 2020-11-04 20:01:47.173
> [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
> - PARSE: Event{id='aeb33e8d-f7e5-4734-bc41-3c93da3cf94f', unitId=1073,
> unitDate=1604519428552, msgNumber=6948}
> ...
>
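
For anyone who would rather not run the Perl one-liner, here is a rough Python
sketch of the same check. It uses the same key (unitId, unitDate, msgNumber)
and assumes, as the one-liner does, that each PARSE record sits on a single
line in the real log file. The function name find_duplicates is made up for
this example.

```python
import re
from collections import Counter

# Same key the Perl one-liner matches: unitId + unitDate + msgNumber.
KEY_RE = re.compile(r"unitId=\d+, unitDate=\d+, msgNumber=\d+")


def find_duplicates(lines):
    """Return {key: count} for every event key that occurs more than once."""
    counts = Counter()
    for line in lines:
        match = KEY_RE.search(line)
        if match:
            counts[match.group(0)] += 1
    return {key: n for key, n in counts.items() if n > 1}


# Example usage on an xz-compressed log (the file name is taken from the
# command above):
#
#   import lzma
#   with lzma.open("parsers.log-20201105.xz", "rt", errors="replace") as fh:
#       for key, n in find_duplicates(fh).items():
#           print(n, key)
```

Unlike the one-liner, which stops at the first repeat, this reports every
repeated key together with how many times it was seen.
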
> A couple of records from the log file:
> 2020-11-04 19:54:52.740
> [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
> - PARSE: Event{id='86cc792b-fb5e-4ebb-be49-7a51f3a1c954', unitId=1073,
> unitDate=1604519428552, msgNumber=6948}
> 2020-11-04 20:01:47.173
> [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
> - PARSE: Event{id='aeb33e8d-f7e5-4734-bc41-3c93da3cf94f', unitId=1073,
> unitDate=1604519428552, msgNumber=6948}
> 2020-11-04 20:11:47.217
> [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
> - PARSE: Event{id='05a059e0-8002-48d0-b2da-269e42b879a0', unitId=1073,
> unitDate=1604519428552, msgNumber=6948}
> 2020-11-04 20:21:47.185
> [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1] ParserService
> - PARSE: Event{id='5b590bde-9e86-4660-8916-db4a590ba12e', unitId=1073,
> unitDate=1604519428552, msgNumber=6948}
> ... and so on.
>
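
To put a number on the "every 10 minutes" observation, the spacing between
the four records quoted above can be computed directly. This is only a small
sketch over the timestamps copied from this message; the helper name
gaps_in_seconds is made up for the example.

```python
from datetime import datetime

# Timestamps of the four PARSE records quoted above for msgNumber=6948.
STAMPS = [
    "2020-11-04 19:54:52.740",
    "2020-11-04 20:01:47.173",
    "2020-11-04 20:11:47.217",
    "2020-11-04 20:21:47.185",
]


def gaps_in_seconds(stamps):
    """Return the time between consecutive timestamps, in seconds."""
    times = [datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f") for s in stamps]
    return [(later - earlier).total_seconds()
            for earlier, later in zip(times, times[1:])]


for gap in gaps_in_seconds(STAMPS):
    print(f"{gap:.3f} s")
```

The first repeat arrives about 6 min 54 s after the original record; the later
ones are spaced almost exactly 600 s apart.
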
> Something went wrong earlier at 19:50.
>
> Log from kafka1 broker:
>
> [2020-11-04 19:04:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
> expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2020-11-04 19:14:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
> expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2020-11-04 19:24:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
> expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2020-11-04 19:34:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
> expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2020-11-04 19:44:02,195] INFO [GroupMetadataManager brokerId=1] Removed 0
> expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2020-11-04 19:50:19,506] WARN Client session timed out, have not heard
> from server in 7997ms for sessionid 0x1000060d4310001
> (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:19,526] INFO Client session timed out, have not heard
> from server in 7997ms for sessionid 0x1000060d4310001, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:20,774] INFO Opening socket connection to server
> kafka2.8m.local/192.168.137.20:2181. Will not attempt to authenticate
> using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:20,775] INFO Socket connection established, initiating
> session, client: /192.168.137.19:57606, server: kafka2.8m.local/
> 192.168.137.20:2181 (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:22,776] WARN Client session timed out, have not heard
> from server in 2002ms for sessionid 0x1000060d4310001
> (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:22,776] INFO Client session timed out, have not heard
> from server in 2002ms for sessionid 0x1000060d4310001, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,360] INFO Opening socket connection to server
> kafka3.8m.local/192.168.137.21:2181. Will not attempt to authenticate
> using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,361] INFO Socket connection established, initiating
> session, client: /192.168.137.19:54702, server: kafka3.8m.local/
> 192.168.137.21:2181 (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,373] WARN Unable to reconnect to ZooKeeper service,
> session 0x1000060d4310001 has expired (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,373] INFO Unable to reconnect to ZooKeeper service,
> session 0x1000060d4310001 has expired, closing socket connection
> (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,378] INFO EventThread shut down for session:
> 0x1000060d4310001 (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,389] INFO [ZooKeeperClient Kafka server] Session
> expired. (kafka.zookeeper.ZooKeeperClient)
> [2020-11-04 19:50:23,434] INFO [ZooKeeperClient Kafka server] Initializing
> a new session to 192.168.137.19:2181,192.168.137.20:2181,
> 192.168.137.21:2181. (kafka.zookeeper.ZooKeeperClient)
> [2020-11-04 19:50:23,436] INFO Initiating client connection, connectString=
> 192.168.137.19:2181,192.168.137.20:2181,192.168.137.21:2181
> sessionTimeout=6000
> watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@1cbbffcd
> (org.apache.zookeeper.ZooKeeper)
> [2020-11-04 19:50:23,458] INFO jute.maxbuffer value is 4194304 Bytes
> (org.apache.zookeeper.ClientCnxnSocket)
> [2020-11-04 19:50:23,470] INFO zookeeper.request.timeout value is 0.
> feature enabled= (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,490] INFO Opening socket connection to server
> kafka1.8m.local/192.168.137.19:2181. Will not attempt to authenticate
> using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,491] INFO Socket connection established, initiating
> session, client: /192.168.137.19:46826, server: kafka1.8m.local/
> 192.168.137.19:2181 (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,641] INFO Creating /brokers/ids/1 (is it secure?
> false) (kafka.zk.KafkaZkClient)
> [2020-11-04 19:50:23,666] INFO Unable to read additional data from server
> sessionid 0x0, likely server has closed socket, closing socket connection
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,768] INFO [ZooKeeperClient Kafka server] Waiting
> until connected. (kafka.zookeeper.ZooKeeperClient)
> [2020-11-04 19:50:23,768] INFO [ZooKeeperClient Kafka server] Waiting
> until connected. (kafka.zookeeper.ZooKeeperClient)
> [2020-11-04 19:50:23,784] INFO Opening socket connection to server
> kafka2.8m.local/192.168.137.20:2181. Will not attempt to authenticate
> using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,785] INFO Socket connection established, initiating
> session, client: /192.168.137.19:57614, server: kafka2.8m.local/
> 192.168.137.20:2181 (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,998] INFO Session establishment complete on server
> kafka2.8m.local/192.168.137.20:2181, sessionid = 0x2000034d85c0003,
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2020-11-04 19:50:23,998] INFO [ZooKeeperClient Kafka server] Connected.
> (kafka.zookeeper.ZooKeeperClient)
> [2020-11-04 19:50:23,998] INFO [ZooKeeperClient Kafka server] Connected.
> (kafka.zookeeper.ZooKeeperClient)
> [2020-11-04 19:50:24,230] ERROR [KafkaApi-1] Error when handling request:
> clientId=3, correlationId=0, api=UPDATE_METADATA, version=6,
> body={controller_id=3,controller_epoch=3,broker_epoch=17179870048,topic_states=[<snipped>],rack=null,_tagged_fields={}},{id=1,endpoints=[{port=9092,host=192.168.137.19,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}}
> (kafka.server.KafkaApis)
> java.lang.IllegalStateException: Epoch 17179870048 larger than current
> broker epoch 17179869253
> at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2917)
> at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:269)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:133)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> at java.lang.Thread.run(Thread.java:748)
> [2020-11-04 19:50:24,268] ERROR [KafkaApi-1] Error when handling request:
> clientId=3, correlationId=1, api=LEADER_AND_ISR, version=4,
> body={controller_id=3,controller_epoch=3,broker_epoch=17179870048,topic_states=[<snipped>],_tagged_fields={}}],live_leaders=[{broker_id=3,host_name=192.168.137.21,port=9092,_tagged_fields={}},{broker_id=2,host_name=192.168.137.20,port=9092,_tagged_fields={}}],_tagged_fields={}}
> (kafka.server.KafkaApis)
> java.lang.IllegalStateException: Epoch 17179870048 larger than current
> broker epoch 17179869253
> at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2917)
> at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:211)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:131)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> at java.lang.Thread.run(Thread.java:748)
> [2020-11-04 19:50:24,277] ERROR [KafkaApi-1] Error when handling request:
> clientId=3, correlationId=2, api=UPDATE_METADATA, version=6,
> body={controller_id=3,controller_epoch=3,broker_epoch=17179870048,topic_states=[<snipped>],_tagged_fields={}}],live_brokers=[{id=3,endpoints=[{port=9092,host=192.168.137.21,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}},{id=2,endpoints=[{port=9092,host=192.168.137.20,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}},{id=1,endpoints=[{port=9092,host=192.168.137.19,listener=PLAINTEXT,security_protocol=0,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}}
> (kafka.server.KafkaApis)
> java.lang.IllegalStateException: Epoch 17179870048 larger than current
> broker epoch 17179869253
> at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2917)
> at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:269)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:133)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> at java.lang.Thread.run(Thread.java:748)
> [2020-11-04 19:50:24,786] INFO Stat of the created znode at /brokers/ids/1
> is:
> 17179870048,17179870048,1604519424151,1604519424151,1,0,0,144115415044063235,200,0,17179870048
> (kafka.zk.KafkaZkClient)
> [2020-11-04 19:50:24,791] INFO Registered broker 1 at path /brokers/ids/1
> with addresses:
> ArraySeq(EndPoint(192.168.137.19,9092,ListenerName(PLAINTEXT),PLAINTEXT)),
> czxid (broker epoch): 17179870048 (kafka.zk.KafkaZkClient)
>
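
A note on the two epoch numbers in the IllegalStateException. The broker log
labels the czxid of /brokers/ids/1 as the broker epoch, and a ZooKeeper zxid
is a 64-bit value whose high 32 bits are the ZooKeeper leader epoch and whose
low 32 bits are a counter. The sketch below only decodes the two values from
the log; the names split_zxid, REQUEST_BROKER_EPOCH and LOCAL_BROKER_EPOCH are
made up for the example.

```python
def split_zxid(zxid):
    """Split a 64-bit ZooKeeper zxid into (epoch, counter)."""
    return zxid >> 32, zxid & 0xFFFFFFFF


# Both values are copied from the IllegalStateException in the kafka1 log.
REQUEST_BROKER_EPOCH = 17179870048  # broker_epoch in the controller's request
LOCAL_BROKER_EPOCH = 17179869253    # "current broker epoch" on kafka1

for label, value in [("request", REQUEST_BROKER_EPOCH),
                     ("local", LOCAL_BROKER_EPOCH)]:
    epoch, counter = split_zxid(value)
    print(f"{label}: {value} = {value:#x} "
          f"(ZK epoch {epoch}, counter {counter:#x})")
```

Decoded, the request carries 0x400000360 and the broker still held
0x400000045. The first value directly follows the zxid 0x40000035f that
appears in the ZooKeeper log on kafka1, and it equals the czxid reported when
broker 1 was re-registered at 19:50:24,791. That would be consistent with the
controller seeing the new registration before kafka1 had recorded its new
epoch locally, but this reading is an assumption, not something the logs state.
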
> Controller log from the kafka1 broker:
> [2020-11-04 19:50:23,442] DEBUG [Controller id=1] Resigning
> (kafka.controller.KafkaController)
> [2020-11-04 19:50:23,496] DEBUG [Controller id=1] Unregister
> BrokerModifications handler for Set() (kafka.controller.KafkaController)
> [2020-11-04 19:50:23,519] INFO [PartitionStateMachine controllerId=1]
> Stopped partition state machine (kafka.controller.ZkPartitionStateMachine)
> [2020-11-04 19:50:23,526] INFO [ReplicaStateMachine controllerId=1]
> Stopped replica state machine (kafka.controller.ZkReplicaStateMachine)
> [2020-11-04 19:50:23,560] INFO [Controller id=1] Resigned
> (kafka.controller.KafkaController)
> [2020-11-04 19:50:24,797] DEBUG [Controller id=1] Broker 3 has been
> elected as the controller, so stopping the election process.
> (kafka.controller.KafkaController)
>
> ZooKeeper log on kafka1:
> [2020-11-04 19:50:20,215] WARN Unable to read additional data from client
> sessionid 0x1000060d4310000, likely client has closed socket
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2020-11-04 19:50:20,215] WARN Unable to read additional data from client
> sessionid 0x1000060d4310001, likely client has closed socket
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2020-11-04 19:50:21,495] WARN Exception when following the leader
> (org.apache.zookeeper.server.quorum.Learner)
> java.io.EOFException
> at java.io.DataInputStream.readInt(DataInputStream.java:392)
> at org.apache.jute.BinaryInputArchive.readInt(BinaryInputArchive.java:84)
> at
> org.apache.zookeeper.server.quorum.QuorumPacket.deserialize(QuorumPacket.java:85)
> at
> org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:118)
> at org.apache.zookeeper.server.quorum.Learner.readPacket(Learner.java:158)
> at
> org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:92)
> at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:1253)
> [2020-11-04 19:50:22,000] INFO shutdown called
> (org.apache.zookeeper.server.quorum.Learner)
> java.lang.Exception: shutdown Follower
> at org.apache.zookeeper.server.quorum.Follower.shutdown(Follower.java:201)
> at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:1257)
> [2020-11-04 19:50:22,043] INFO Shutting down
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2020-11-04 19:50:22,043] INFO shutting down
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2020-11-04 19:50:22,059] INFO Shutting down
> (org.apache.zookeeper.server.quorum.FollowerRequestProcessor)
> [2020-11-04 19:50:22,060] INFO Shutting down
> (org.apache.zookeeper.server.quorum.CommitProcessor)
> [2020-11-04 19:50:22,139] INFO FollowerRequestProcessor exited loop!
> (org.apache.zookeeper.server.quorum.FollowerRequestProcessor)
> [2020-11-04 19:50:22,141] INFO CommitProcessor exited loop!
> (org.apache.zookeeper.server.quorum.CommitProcessor)
> [2020-11-04 19:50:22,218] INFO shutdown of request processor complete
> (org.apache.zookeeper.server.FinalRequestProcessor)
> [2020-11-04 19:50:22,637] INFO Shutting down
> (org.apache.zookeeper.server.SyncRequestProcessor)
> [2020-11-04 19:50:22,655] INFO SyncRequestProcessor exited!
> (org.apache.zookeeper.server.SyncRequestProcessor)
> [2020-11-04 19:50:22,658] WARN PeerState set to LOOKING
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2020-11-04 19:50:22,694] INFO LOOKING
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2020-11-04 19:50:23,044] INFO New election. My id = 1, proposed
> zxid=0x400000281 (org.apache.zookeeper.server.quorum.FastLeaderElection)
> [2020-11-04 19:50:23,170] INFO Notification: 2 (message format version), 1
> (n.leader), 0x400000281 (n.zxid), 0x5 (n.round), LOOKING (n.state), 1
> (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
> (org.apache.zookeeper.server.quorum.FastLeaderElection)
> [2020-11-04 19:50:23,293] INFO Notification: 2 (message format version), 3
> (n.leader), 0x300000020 (n.zxid), 0x4 (n.round), LEADING (n.state), 3
> (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
> (org.apache.zookeeper.server.quorum.FastLeaderElection)
> [2020-11-04 19:50:23,500] INFO Notification time out: 400
> (org.apache.zookeeper.server.quorum.FastLeaderElection)
> [2020-11-04 19:50:23,501] INFO Notification: 2 (message format version), 1
> (n.leader), 0x400000281 (n.zxid), 0x5 (n.round), LOOKING (n.state), 1
> (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
> (org.apache.zookeeper.server.quorum.FastLeaderElection)
> [2020-11-04 19:50:23,503] INFO Notification: 2 (message format version), 3
> (n.leader), 0x300000020 (n.zxid), 0x4 (n.round), LEADING (n.state), 3
> (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
> (org.apache.zookeeper.server.quorum.FastLeaderElection)
> [2020-11-04 19:50:23,665] WARN Exception causing close of session 0x0:
> ZooKeeperServer not running (org.apache.zookeeper.server.NIOServerCnxn)
> [2020-11-04 19:50:23,680] INFO Notification: 2 (message format version), 3
> (n.leader), 0x300000020 (n.zxid), 0x4 (n.round), FOLLOWING (n.state), 2
> (n.sid), 0x4 (n.peerEPoch), LOOKING (my state)0 (n.config version)
> (org.apache.zookeeper.server.quorum.FastLeaderElection)
> [2020-11-04 19:50:23,682] INFO FOLLOWING
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2020-11-04 19:50:23,707] INFO minSessionTimeout set to 4000
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2020-11-04 19:50:23,708] INFO maxSessionTimeout set to 40000
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2020-11-04 19:50:23,708] INFO Created server with tickTime 2000
> minSessionTimeout 4000 maxSessionTimeout 40000 datadir
> /data/zookeeper/version-2 snapdir /data/zookeeper/version-2
> (org.apache.zookeeper.server.ZooKeeperServer)
> [2020-11-04 19:50:23,719] INFO FOLLOWING - LEADER ELECTION TOOK - 30 MS
> (org.apache.zookeeper.server.quorum.Learner)
> [2020-11-04 19:50:23,786] INFO Getting a diff from the leader 0x40000035e
> (org.apache.zookeeper.server.quorum.Learner)
> [2020-11-04 19:50:23,798] WARN Got zxid 0x400000282 expected 0x1
> (org.apache.zookeeper.server.quorum.Learner)
> [2020-11-04 19:50:23,809] INFO Learner received NEWLEADER message
> (org.apache.zookeeper.server.quorum.Learner)
> [2020-11-04 19:50:23,818] INFO Learner received UPTODATE message
> (org.apache.zookeeper.server.quorum.Learner)
> [2020-11-04 19:50:23,831] INFO Configuring CommitProcessor with 4 worker
> threads. (org.apache.zookeeper.server.quorum.CommitProcessor)
> [2020-11-04 19:50:23,863] WARN Got zxid 0x40000035f expected 0x1
> (org.apache.zookeeper.server.quorum.Learner)
>
> I would appreciate any help.
>
> --
> Regards,
> Denis
>


--
Regards,
Denis

*This message and any documents attached to it contain confidential
information. Please note that using, copying, or distributing the
information contained in this message is prohibited.*
