Hi!
We run a Kafka 3.4.1 cluster, which we access through the Scala library "zio-Kafka".
The library itself uses the Java client, version 3.6.1.
We have to authenticate with the broker through Kerberos/SAML.
It appears that the broker regularly queries LDAP to check whether the token is still valid.
Sometimes this call takes too long, and our client then logs this message:
[Producer clientId=producer-1] Connection to node 1546333927 (xxx.acme.corp/10.123.45.181:6668) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
The broker logs this:
Potential performance problem: getGroups(user=yyacme_abc_krb) took 36492 milliseconds.
I'm tasked with making our code robust against this kind of failure (including writing a test),
and thus have to find out whether I need to adjust our code or the zio-Kafka lib.
To create a test that reproduces the issue, I need to create a KafkaConsumer with a customized ConsumerNetworkClient and KafkaClient/NetworkClient.
In the 3.6.1 codebase, the problem is the use of statically imported methods, which prevents me from overriding how the clients are created, e.g.
https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L714
It looks as if one could override how the consumerNetworkClient is created, but that is not possible, because "createConsumerNetworkClient" is not a method of KafkaConsumer but a statically imported method from ConsumerUtils.
Also, all fields in KafkaConsumer are marked private, which prevents me from using them to simply create a custom KafkaConsumer via the testing constructor:
https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L786
So KafkaConsumer#L714 calls a static method in the helper class ConsumerUtils, which in turn calls a static method in the ClientUtils class. This makes it very hard, seemingly impossible, to swap out the ConsumerNetworkClient for a mock/stub.
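To illustrate the kind of seam I am missing, here is a minimal sketch. All class names in it (Transport, TransportUtils, StaticallyWiredClient, OverridableClient, StubbedClient) are hypothetical stand-ins that I made up for this example; none of them exist in Kafka. The first client wires its collaborator through a static helper, which a subclass cannot intercept. The second one goes through an instance method, which a test subclass can override.

```java
// Hypothetical stand-ins for illustration only; not Kafka classes.
interface Transport {
    String name();
}

final class TransportUtils {
    private TransportUtils() {}

    // Static helper, comparable in spirit to the static helpers described above.
    static Transport createTransport() {
        return () -> "real";
    }
}

// The collaborator is created by a static call inside the constructor,
// so a subclass has no hook to substitute a stub.
class StaticallyWiredClient {
    private final Transport transport;

    StaticallyWiredClient() {
        this.transport = TransportUtils.createTransport();
    }

    String transportName() {
        return transport.name();
    }
}

// The collaborator is created by an overridable instance method.
// Caveat: calling an overridable method from a constructor is only safe
// when the override does not depend on subclass state, as is the case here.
class OverridableClient {
    private final Transport transport;

    OverridableClient() {
        this.transport = createTransport();
    }

    protected Transport createTransport() {
        return TransportUtils.createTransport();
    }

    String transportName() {
        return transport.name();
    }
}

// A test subclass can now swap in a stub without touching private fields.
class StubbedClient extends OverridableClient {
    @Override
    protected Transport createTransport() {
        return () -> "stub";
    }
}

class StaticSeamDemo {
    public static void main(String[] args) {
        System.out.println(new StaticallyWiredClient().transportName()); // prints "real"
        System.out.println(new StubbedClient().transportName());         // prints "stub"
    }
}
```

With the second shape, a test could return a transport that fails on demand; with the first shape, the only remaining options are reflection or bytecode-level mocking of the static method.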
I've seen that the general structure of KafkaConsumer has changed significantly in trunk,
which now has a ConsumerDelegate as well as a ConsumerDelegateCreator.
However, it looks like the static helper methods were just moved into these classes, which makes it even harder to customize a KafkaConsumer.
Any pointers on how I can create a KafkaConsumer with a custom NetworkClient (where I can
control/simulate the above-mentioned auth issue after the initial authentication has worked) would be highly appreciated!
My next approach would be to use reflection to get access to the fields, but I would like to avoid that if at all possible.
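For reference, the reflection fallback would look roughly like the sketch below. It is written against hypothetical stand-in classes (Connection, Holder, FieldSwapper) and a made-up field name, "connection"; the real field names in KafkaConsumer would have to be looked up for the exact client version and could change between releases, which is the main reason I would like to avoid this.

```java
import java.lang.reflect.Field;

// Hypothetical stand-ins for illustration only; not Kafka classes.
interface Connection {
    String send(String request);
}

class Holder {
    // Private and final, like the fields described above.
    private final Connection connection;

    Holder() {
        this.connection = request -> "ok:" + request;
    }

    String call(String request) {
        return connection.send(request);
    }
}

final class FieldSwapper {
    private FieldSwapper() {}

    // Replaces a private instance field declared directly on the target's class.
    // Note: newer JDKs may warn about or restrict writes to final fields.
    static void replaceField(Object target, String fieldName, Object replacement) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(target, replacement);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not replace field " + fieldName, e);
        }
    }
}

class ReflectionSwapDemo {
    public static void main(String[] args) {
        Holder holder = new Holder();
        System.out.println(holder.call("ping")); // prints "ok:ping"

        // Swap in a stub that simulates a failure after the initial call worked.
        Connection failing = request -> {
            throw new IllegalStateException("simulated authentication failure");
        };
        FieldSwapper.replaceField(holder, "connection", failing);

        try {
            holder.call("ping");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "simulated authentication failure"
        }
    }
}
```

This works on the stand-in, but it ties the test to private implementation details, so every client upgrade could break it silently.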
Thanks a lot!
Dominik
Dominik Dorn
https://dominikdorn.com/ | https://twitter.com/domdorn
XING: https://www.xing.com/profile/Dominik_Dorn
LINKEDIN: https://www.linkedin.com/in/dominik-dorn/