Posts

Showing posts from April, 2020

Cant Able to start Kafka MirrorMaker

Hey guys, I am trying to move data from one cluster to another.

             Source   Destination
Zookeeper    2181     2182
Kafka        9092     9091

Consumer properties:
bootstrap.servers=localhost:9092
group.id =test-consumer-group
auto.offset.rest=earliest

Producer properties:
bootstrap.servers=localhost:9091
compression.type=none

I have a topic named actor on 9092, which comes from the actor table of the MySQL Sakila schema. On 9091 I don't have any topics, so I am trying to migrate the data from 9092 to 9091. It shows this:

D:\kafka>.\bin\windows\kafka-mirror-maker.bat --consumer.config .\config\consumer.properties --producer.config .\config\producer.properties --whitelist actor
WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of t...
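One detail worth checking in the consumer properties above: the offset setting is spelled `auto.offset.rest`, while the Kafka consumer setting is named `auto.offset.reset`. Whether that is related to the startup problem cannot be told from the truncated excerpt. The following is a minimal Python sketch, not a Kafka tool, that flags property names missing from a small hand-written list. `KNOWN_CONSUMER_KEYS` is a deliberately short subset assumed for illustration, and the helper names are hypothetical.

```python
import difflib

# Assumption: a small subset of consumer setting names, enough for this check.
KNOWN_CONSUMER_KEYS = {
    "bootstrap.servers",
    "group.id",
    "auto.offset.reset",
    "enable.auto.commit",
}


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def unknown_keys(props, known=KNOWN_CONSUMER_KEYS):
    """Return the property names that are not in the known list."""
    return sorted(key for key in props if key not in known)


# The consumer properties exactly as posted.
consumer_properties = """\
bootstrap.servers=localhost:9092
group.id =test-consumer-group
auto.offset.rest=earliest
"""

props = parse_properties(consumer_properties)
suspicious = unknown_keys(props)

# Suggest the closest known name for each unrecognised key, if any.
suggestions = {
    key: (difflib.get_close_matches(key, sorted(KNOWN_CONSUMER_KEYS), n=1) or [None])[0]
    for key in suspicious
}

for key, guess in suggestions.items():
    print(f"unrecognised key {key!r}; did you mean {guess!r}?")
```

A check like this only catches misspelled names; it says nothing about whether the values or the broker addresses are right.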

Re: Kafka: Messages disappearing from topics, largestTime=0

Thank you very much for the help anyway.

Best regards

On Fri, May 1, 2020, 00:54 Liam Clarke-Hutchinson <liam.clarke@adscale.co.nz> wrote:

> So the logs show a healthy shutdown, so we can eliminate that as an issue.
> I would look next at the volume management during a rollout based on the
> other error messages you had earlier about permission denied etc. It's
> possible there's some journalled but not flushed changes in those time
> indexes, but at this point we're getting into filesystem internals which
> aren't my forte. But if you can temporarily disable the volume switching
> and do a test roll out, see if you get the same problems or not, would help
> eliminate it or confirm it.
>
> Sorry I can't help further on that.
>
> On Fri, May 1, 2020 at 5:34 AM JP MB <jose.brandao1994@gmail.com> wrote:
>
> > I took a bit because I needed logs of the server shutting down when this
> >...

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Thanks, I will try with GlobalKTable.

As a side question, I didn't really understand the significance of the global state store, which works in the reverse way to a local state store: a local state store is updated first and then saved to its changelog topic, whereas for a global state store the topic is updated first and then synced to the store. Do the two updates, to the topic and to the global state store, happen in sync? Say one stream thread updates the topic for the global store and starts processing the next event, in which the processor tries to read the global store: could the store not yet have been synced with the topic?

On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mjsax@apache.org> wrote:

> Yes.
>
> A `GlobalKTable` uses a global store internally.
>
> You can also use `StreamsBuilder.addGlobalStore()` or
> `Topology.addGlobalStore()` to add a global store "manually".
>
>
> -Matthias
>
>
> On 4/30/20 7:42 A...

Re: Kafka: Messages disappearing from topics, largestTime=0

So the logs show a healthy shutdown, so we can eliminate that as an issue. I would look next at the volume management during a rollout based on the other error messages you had earlier about permission denied etc. It's possible there's some journalled but not flushed changes in those time indexes, but at this point we're getting into filesystem internals which aren't my forte. But if you can temporarily disable the volume switching and do a test roll out, see if you get the same problems or not, would help eliminate it or confirm it.

Sorry I can't help further on that.

On Fri, May 1, 2020 at 5:34 AM JP MB <jose.brandao1994@gmail.com> wrote:

> I took a bit because I needed logs of the server shutting down when this
> occurs. Here they are, I can see some errors:
> https://gist.github.com/josebrandao13/e8b82469d3e9ad91fbf38cf139b5a726
>
> Regarding systemd, the closest I could find to TimeoutStopSec was
> DefaultTimeoutS...

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Yes. A `GlobalKTable` uses a global store internally. You can also use `StreamsBuilder.addGlobalStore()` or `Topology.addGlobalStore()` to add a...

Re: are kafka state stores global or local?

That is correct. For global stores, in fact you cannot put "arbitrary" data into the store. The `Processor` should _only_ take the input ...

Re: Metrics in Kafka Connect

This is what I did inside MM2's Connectors and Tasks. It works pretty well, but I'd certainly prefer if Connect exposed its own Metrics API.

Ryanne

On Thu, Apr 30, 2020 at 2:57 PM Gérald Quintana <gerald.quintana@gmail.com> wrote:

> Hello,
>
> We developed a custom Kafka Connect implementation for a specific need, and
> we would like to monitor its internals (request latency and rate, pool
> usage....).
>
> Is it possible to publish custom metrics using the Kafka client metric
> framework (org.apache.kafka.common.metrics.*) . We would like to reuse the
> metric reporter defined at worker level and used for other connectors.
>
> Thanks,
> Gerald
>

Metrics in Kafka Connect

Hello,

We developed a custom Kafka Connect implementation for a specific need, and we would like to monitor its internals (request latency and rate, pool usage, ...).

Is it possible to publish custom metrics using the Kafka client metric framework (org.apache.kafka.common.metrics.*)? We would like to reuse the metric reporter defined at worker level and used for other connectors.

Thanks,
Gerald

Re: Kafka: Messages disappearing from topics, largestTime=0

I took a bit because I needed logs of the server shutting down when this occurs. Here they are, I can see some errors:
https://gist.github.com/josebrandao13/e8b82469d3e9ad91fbf38cf139b5a726

Regarding systemd, the closest I could find to TimeoutStopSec was DefaultTimeoutStopUSec=1min 30s, which looks to be 90 seconds. I could not find any KillSignal or RestartKillSignal. You can see the output of systemctl show --all here:
https://gist.github.com/josebrandao13/f2dd646fab19b19f127981fce92d78c4

Once again, thanks for the help.

On Thu, Apr 30, 2020 at 15:04, Liam Clarke-Hutchinson <liam.clarke@adscale.co.nz> wrote:

> I'd also suggest eyeballing your systemd conf to verify that someone hasn't
> set a very low TimeoutStopSec, or that KillSignal/RestartKillSignal haven't
> been configured to SIGKILL (confusingly named, imo, as the default for
> KillSignal is SIGTERM).
>
> Also, the Kafka broker logs at shutdown look ve...
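A note on the systemd check discussed above: `systemctl show` without a unit name reports manager-wide defaults such as DefaultTimeoutStopUSec, which may be why no KillSignal line turned up. The per-unit values would come from something like `systemctl show kafka -p TimeoutStopUSec -p KillSignal`, assuming the unit is called kafka as in the `systemctl stop kafka` step. The Python sketch below parses such output. The `sample_output` text is hypothetical, not taken from the linked gists, and the sketch assumes the signal is printed as a number.

```python
import re
import signal

# Seconds per unit for the few systemd time-span suffixes handled here.
UNIT_SECONDS = {"us": 1e-6, "ms": 1e-3, "s": 1, "min": 60, "h": 3600}


def parse_time_span(span):
    """Convert a span such as '1min 30s' into seconds."""
    total = 0.0
    for amount, unit in re.findall(r"(\d+)\s*([a-z]+)", span):
        total += int(amount) * UNIT_SECONDS[unit]
    return total


def parse_show_output(text):
    """Turn 'Key=value' lines into a dict, ignoring lines without '='."""
    props = {}
    for line in text.splitlines():
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Hypothetical output for illustration only.
sample_output = """\
TimeoutStopUSec=1min 30s
KillSignal=15
"""

props = parse_show_output(sample_output)
stop_timeout_seconds = parse_time_span(props["TimeoutStopUSec"])
kill_signal_name = signal.Signals(int(props["KillSignal"])).name

print(f"stop timeout: {stop_timeout_seconds:.0f}s, kill signal: {kill_signal_name}")
```

With these sample values the stop timeout is the 90 seconds mentioned in the message and the signal is SIGTERM, the KillSignal default that Liam refers to.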

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

Thanks Matthias. Can you elaborate on the replicated caching layer part? When you say global stores, do you mean a GlobalKTable created from a topic, e.g. using the StreamsBuilder.globalTable(String topic) method?

On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mjsax@apache.org> wrote:

> It's not possible to modify state store from "outside".
>
> If you want to build a "replicated caching layer", you could use global
> stores and write into the corresponding topics to update all stores. Of
> course, those updates would be async.
>
>
> -Matthias
>
> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> > Hi All,
> >
> > I am wondering if this is possible: i have been asked to use state stores
> > as a general replicated cache among multiple instances of service instances
> > however the state store is created through streambuilder but is not
> > actually modified through str...
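The "write to the topic, stores update asynchronously" behaviour described in the quoted reply can be pictured with a toy model. This is plain Python, not the Kafka Streams API: `Topic`, `GlobalStoreReplica` and `catch_up` are hypothetical names invented for this sketch, and the real mechanics differ. The only point illustrated is that a read issued right after a write to the topic may not see the new value yet.

```python
class Topic:
    """Toy append-only log standing in for the topic behind a global store."""

    def __init__(self):
        self.records = []

    def append(self, key, value):
        self.records.append((key, value))


class GlobalStoreReplica:
    """Toy per-instance copy that applies topic records in order."""

    def __init__(self, topic):
        self.topic = topic
        self.position = 0  # index of the next record to apply
        self.data = {}

    def catch_up(self):
        """Apply every record appended since the last call."""
        while self.position < len(self.topic.records):
            key, value = self.topic.records[self.position]
            self.data[key] = value
            self.position += 1

    def get(self, key):
        return self.data.get(key)


topic = Topic()
instance_a = GlobalStoreReplica(topic)
instance_b = GlobalStoreReplica(topic)

# The application updates the "cache" by writing to the topic, not to a store.
topic.append("user-1", "online")

# Neither replica has applied the record yet, so reads are stale.
stale_read = instance_a.get("user-1")

# Once each replica catches up, both see the same value.
instance_a.catch_up()
instance_b.catch_up()
fresh_reads = (instance_a.get("user-1"), instance_b.get("user-1"))

print(stale_read, fresh_reads)
```

In the model, every instance converges on the same contents because they all replay the same log, which is what makes the pattern usable as a replicated cache as long as slightly stale reads are acceptable.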

Re: Kafka: Messages disappearing from topics, largestTime=0

I'd also suggest eyeballing your systemd conf to verify that someone hasn't set a very low TimeoutStopSec, or that KillSignal/RestartKillSignal haven't been configured to SIGKILL (confusingly named, imo, as the default for KillSignal is SIGTERM).

Also, the Kafka broker logs at shutdown look very different if it shut down cleanly vs if it didn't. Could you perhaps put them in a Gist and email the link?

Just trying to make sure basic assumptions are holding :)

On Fri, 1 May 2020, 1:21 am JP MB <jose.brandao1994@gmail.com> wrote:

> Hi,
> It's quite a complex script generated with ansible where we use a/b
> deployment and honestly, I don't have full knowledge on it I can share the
> general guidelines of what is done:
>
> > - Any old volumes (from previous releases are removed) (named with suffix
> > '-old')
> > - Detach the volumes attached to the old host
> > - Stop the service in t...

Re: Kafka: Messages disappearing from topics, largestTime=0

Hi,
It's quite a complex script generated with Ansible where we use a/b deployment, and honestly I don't have full knowledge of it. I can share the general guidelines of what is done:

> - Any old volumes (from previous releases) are removed (named with suffix '-old')
> - Detach the volumes attached to the old host
> - Stop the service in the old host - uses systemctl stop kafka
> - Attempt to create a CNAME volume: this is a volume with the same name that will be attached to the new box. Except for the very first run, this task is used to get the information about the existing volume. (no suffix)
> - A new volume is created as a copy of the CNAME volume (named with suffix '-new')
> - The new volume is attached to the host/vm (named with suffix '-new')
> - The new volume is formatted (except for the very first run, it's already formatted) (named with suffix '-new')
> - The new volume is mounted (n...

Re: How to handle RebalanceInProgressException?

On 29.04.20 09:18, Benoit Delbosc wrote:
> Hi
>
> On 28.04.20 21:05, Guozhang Wang wrote:
>> Thanks for the explanation Ben. They are very helpful.
>>
>> Just to clarify on the context here:
>>
>> 1) Before Kafka 2.0 the poll(long) call make sure that the rebalance would
>> be completed when the call returns, no matter how long it takes. Note this
>> also includes the time for refreshing the metadata for the newly assigned
>> partitions potentially. There are many user feedbacks that this long
>> polling do not obey the passed in "long" timeout at all since it has to
>> block until the rebalance + metadata refresh is completed.
>>
>> 2) So in Kafka 2.0 we introduced poll(Duration) which would practically be
>> more strict in respecting the passed in timeout. It means, it could return
>> while we are still in the middle of a rebalance. At the same time we
>> dep...

Re: Kafka: Messages disappearing from topics, largestTime=0

Hi,
It does look like index corruption... Can you post the script that stops Kafka?

On Wednesday, April 29, 2020, 06:38:18 PM GMT+2, JP MB <jose.brandao1994@gmail.com> wrote:

> > Can you try using the console consumer to display messages/keys and
> > timestamps ?
> > --property print.key=true --property print.timestamp=true
>
> There are a lot of messages so I'm picking an example without and with a timeindex entry. All of them have a null key:
>
> Offset 57 CreateTime:1588074808027 Key:null - no time index
> Offset 144 CreateTime:1588157145655 Key:null - has time index
>
> > Hmm, how are you doing your rolling deploys?
>
> It's a rollout deployment: we take one node down and spin up a new one. One at a time.
>
> > I'm wondering if the time indexes are being corrupted by unclean
> > shutdowns. I've been reading code and the only path I could find that led
> > to a largest timestamp of 0 was, as you've discovered, where t...
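The CreateTime values quoted above are epoch timestamps in milliseconds, which are hard to compare by eye with deployment times. As a small Python sketch, they can be converted to UTC like this; the two sample lines are copied from the message, and the function name is hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def create_time_to_utc(line):
    """Return the UTC datetime for the CreateTime field found in a line."""
    match = re.search(r"CreateTime:(\d+)", line)
    if match is None:
        raise ValueError(f"no CreateTime field in: {line!r}")
    # Adding integer milliseconds to the epoch avoids float rounding.
    return EPOCH + timedelta(milliseconds=int(match.group(1)))


lines = [
    "Offset 57 CreateTime:1588074808027 Key:null",
    "Offset 144 CreateTime:1588157145655 Key:null",
]

converted = [
    create_time_to_utc(line).isoformat(timespec="milliseconds") for line in lines
]

for line, stamp in zip(lines, converted):
    print(stamp, "<-", line)
```

The first record dates from 2020-04-28 11:53:28 UTC and the second from 2020-04-29 10:45:45 UTC, so both carry a non-zero CreateTime, including the one at offset 57 that has no time index entry.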

Re: Kafka Mirror Maker 2

Hi Himanshu,

Can you please tell me how to use MM2? I am using Apache Kafka, in which only the normal MirrorMaker is available. Most people say to use MM2, but I could not find out where to get it. Is it part of Apache Kafka, or does it come from some other distributor? Can you please explain how to install that version?

On Thu, Apr 30, 2020, 01:35 Himanshu Tyagi <tyagi.himanshu90@gmail.com> wrote:

> Hey Team,
> I've a few doubts regarding how the producers work after failover in Mirror
> Maker 2
>
> Let's say that we have two clusters K1 and K2 and configured MM2
> replication for TOPIC1 (originally created in just K1).
>
> We configured the active-active replication:
>
> K1->K2.enabled = true
> K2->K1.enabled = true
> K1->K2.topics = .*
> K2->K1.topics = .*
>
> On starting mirror maker 2, I see that topics are replicated from cluster
> K1 to K2 in the naming format K1.topic_...
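The "K1." prefix mentioned in the quoted message matches how MM2 names replicated topics under its default replication policy: source cluster alias, a separator, then the original topic name. The Python sketch below only mimics that naming for the two flows in the quoted configuration. It is an illustration, not MM2 code; `remote_topic` is a hypothetical helper and the `.` separator is assumed to be the default in use.

```python
def remote_topic(source_alias, topic, separator="."):
    """Name a topic would get on the target cluster under prefix-style naming."""
    return f"{source_alias}{separator}{topic}"


# The two replication flows enabled in the quoted configuration.
flows = [("K1", "K2"), ("K2", "K1")]

replicated = {
    f"{source}->{target}": remote_topic(source, "TOPIC1")
    for source, target in flows
}

for flow, name in replicated.items():
    print(f"{flow}: TOPIC1 appears on the target as {name}")
```

Under this naming, a consumer on K2 that wants records originally produced on either cluster would need to read both TOPIC1 and K1.TOPIC1.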

Re: are kafka state stores global or local?

Thanks Matthias! My question ("So, does it mean that the state store modified locally by each application is replicated to all other applications?") was mainly about the global state store. As I understand it from your previous response, the state of a global state store would be replicated to all instances; please correct me if I'm wrong.

On Thu, Apr 30, 2020 at 12:42 PM Matthias J. Sax <mjsax@apache.org> wrote:

> > Thanks for the information. So, does it mean that the state store modified
> > locally by each application is replicated to all other applications?
>
> No. As long as the application runs "normally" (ie, without failure),
> state is only maintained by one instance. In general, only in case of
> failure, state would be migrated.
>
> You can configure "standby tasks" to pro-actively maintain state on
> other instances, but it would not be on _all_ instances (depending on
> your ...

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

It's not possible to modify state store from "outside". If you want to build a "replicated caching layer", you could use glo...