
Posts

Showing posts from April, 2021

Windowing and Aggregation across windows

Does aggregation happen per window? What if I want to use the aggregation result of a previous window in the next window? Is that possible? I have a one-second window with a grace period of 5 minutes. When my first window expires, it will have some aggregate value, which I store in another topic. Now, when my second window expires, I want to add the first window's aggregate to the second window's aggregate and store the result in another topic. Similarly, when my third window expires, I want to add its own aggregate to the overall aggregate so far (here, first window + second window) and store that in another topic. Is this possible in Kafka Streams? Also, will window expiration occur in order, i.e. the first window expires first, then the second, then the third, and so on? Thanks and Regards Saket
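
No reply is archived here, but one way this is commonly approached in the Kafka Streams DSL is to suppress each window until it closes and then feed the closed-window results into a second, un-windowed aggregation that keeps the running total across windows. A minimal sketch in Java, assuming Long-valued records and illustrative topic names ("events", "window-totals", "running-totals"); this is an illustration of the pattern, not a verified answer from the thread:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.*;

    public class RunningWindowTotals {
        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();

            // Per-key aggregate for 1-second windows with a 5-minute grace period.
            KTable<Windowed<String>, Long> perWindow = builder
                .stream("events", Consumed.with(Serdes.String(), Serdes.Long()))
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(1)).grace(Duration.ofMinutes(5)))
                .reduce(Long::sum);

            // Emit each window's result only once, after it has closed (window size + grace elapsed).
            KStream<String, Long> closedWindows = perWindow
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .map((windowedKey, total) -> KeyValue.pair(windowedKey.key(), total));

            closedWindows.to("window-totals", Produced.with(Serdes.String(), Serdes.Long()));

            // A second, un-windowed aggregation over the closed-window results keeps the
            // running total across windows (window 1, then 1+2, then 1+2+3, ...).
            closedWindows
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .reduce(Long::sum)
                .toStream()
                .to("running-totals", Produced.with(Serdes.String(), Serdes.Long()));

            return builder.build();
        }
    }

Because suppress() only forwards a window after its grace period has passed, the downstream running total is only ever updated with finalized window results.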

Identify clients that leak fetch sessions in a Kafka cluster

In one of our Kafka clusters we noticed that fetch sessions are being evicted and lots of clients print `FETCH_SESSION_ID_NOT_FOUND` log messages. We tried to increase max.incremental.fetch.session.cache.slots from 1k to 10k on the brokers, but the slots were immediately used up and evicted again. We assume it's caused by one or more clients that leak sessions, but we have no clue which one it might be. Is there a good way to figure it out, ideally something like listing the fetch sessions by client.id, SASL username and/or IP address? Optionally, can we limit the number of fetch sessions either per client.id, IP address or SASL user? It's alarming that a single client can cause these issues (even though they are not critical) for the whole cluster. Best, Martin.
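
Not an answer from the thread, but one possible starting point: the brokers expose the KIP-227 fetch-session cache metrics over JMX, which show how many sessions are cached and how fast they are being evicted, although they do not identify the offending client. A minimal sketch in Java, assuming JMX is enabled on the broker at localhost:9999 and that the MBean and attribute names below are the current ones:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class FetchSessionCacheProbe {
        public static void main(String[] args) throws Exception {
            // Assumption: the broker was started with JMX enabled on port 9999.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                Object sessions = conn.getAttribute(
                    new ObjectName("kafka.server:type=FetchSessionCache,name=NumIncrementalFetchSessions"), "Value");
                Object partitions = conn.getAttribute(
                    new ObjectName("kafka.server:type=FetchSessionCache,name=NumIncrementalFetchPartitionsCached"), "Value");
                Object evictions = conn.getAttribute(
                    new ObjectName("kafka.server:type=FetchSessionCache,name=IncrementalFetchSessionEvictionsPerSec"), "OneMinuteRate");
                System.out.println("cached sessions            = " + sessions);
                System.out.println("cached fetch partitions    = " + partitions);
                System.out.println("evictions/sec (1 min rate) = " + evictions);
            }
        }
    }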
Re: Kafka S3 Connector: Sort by a field within a partition

Yes, of course, if 'sort by within a partition' is not available in the Connector we will start without it, but I was wondering if it is, or if someone has a better idea. On Thu, Apr 29, 2021 at 12:32 PM Mich Talebzadeh < mich.talebzadeh@gmail.com > wrote: > Well, you will need to set-up a pilot, test these scenarios and come up > with a minimal Viable Product (MVP) > > .It will be difficult to give accurate answers but would have thought that > you could have written your processed (enriched dataframe) to a database > table partitioned by hourly rate and then go from there. > > HTH > > > view my Linkedin profile > < https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/ > > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is...

Re: Kafka S3 Connector: Sort by a field within a partition

Well, you will need to set up a pilot, test these scenarios and come up with a Minimum Viable Product (MVP). It will be difficult to give accurate answers, but I would have thought that you could write your processed (enriched) dataframe to a database table partitioned by hour and then go from there. HTH. On Thu, 29 Apr 2021 at 20:17, Eric Beabes < mailinglists19@gmail.com > wrote: > Correct. Question you've asked seems to be the one we're looking an answer > for. > > If we set the process...

Re: Kafka S3 Connector: Sort by a field within a partition

Correct. The question you've asked seems to be the one we're looking for an answer to. If we set the processingTime to 60 minutes, that will require tons of memory, right? What happens if the batch fails? Reprocess the same hour? Not sure if this is the right approach. That's why we're thinking we will keep the processing time small but use the Kafka Connector to copy messages into hourly partitions. Other ideas are welcome! On Thu, Apr 29, 2021 at 11:30 AM Mich Talebzadeh < mich.talebzadeh@gmail.com > wrote: > Let say that you have your readStream created in SSS with socket format. > > Now you want to process these files/messages > > result = streamingDataFrame.select( \ > ..... > writeStream. \ > outputMode('append'). \ > option("truncate", "false"). \ > foreachBatch(send...

Re: Kafka S3 Connector: Sort by a field within a partition

Let's say that you have your readStream created in SSS with socket format. Now you want to process these files/messages:

    result = streamingDataFrame.select( \
                 ..... \
                 writeStream. \
                 outputMode('append'). \
                 option("truncate", "false"). \
                 foreachBatch(sendToSink). \
                 trigger(processingTime='x seconds'). \
                 queryName('process name'). \
                 start()

The crucial options are foreachBatch and trigger. The sendToSink() function will have two important parameters:

    def sendToSink(df, batchId):

df is the DataFrame carrying your messages, collected within that triggering interval (x seconds). batchId is a monotonically increasing number giving the running batch ID. Within that sendToSink(), once you have the DataFrame you can do all sorts of slicing and dicing and write to...

Re: Kafka S3 Connector: Sort by a field within a partition

Source (devices) are sending messages to AWS SQS (Not Kafka). Each message contains the path of the file on S3. (We have no control on the source. They won't change the way it's being done.) SSS will be listening to the SQS queue. We are thinking SSS will read each SQS message, get the file location, read the file, clean up as needed & build a message that will be written to Kafka. Let's just say millions of messages will come in from SQS per minute into SSS. As for SSS, we are thinking batch window size could be 5 seconds (configurable). On Thu, Apr 29, 2021 at 10:43 AM Mich Talebzadeh < mich.talebzadeh@gmail.com > wrote: > Ok thanks for the info. > > One question I forgot to ask is what is the streaming interval that the > source is sending messages to Kafka to be processed inside SSS? For example > are these market data etc? > > HTH > > > > view my Linkedin profile > < https://www.linkedi...

Re: Kafka S3 Connector: Sort by a field within a partition

Ok, thanks for the info. One question I forgot to ask: what is the streaming interval at which the source is sending messages to Kafka to be processed inside SSS? For example, is this market data, etc.? HTH. On Thu, 29 Apr 2021 at 18:35, Eric Beabes < mailinglists19@gmail.com > wrote: > We're thinking Kafka will allow us to scale to billions of messages in a > day. That's the promise of Kafka, right? No other reason really. Main goal > is to "batch" the messages per hour, create file(s) on S3 whic...

Re: Kafka S3 Connector: Sort by a field within a partition

We're thinking Kafka will allow us to scale to billions of messages in a day. That's the promise of Kafka, right? No other reason really. Main goal is to "batch" the messages per hour, create file(s) on S3 which are sorted by device_id so that we can do more aggregations which can later be sliced & diced using UI. Feel free to suggest alternatives. Thanks. On Thu, Apr 29, 2021 at 10:22 AM Mich Talebzadeh < mich.talebzadeh@gmail.com > wrote: > Hi Eric, > > On your second point "Is there a better way to do this" > > You are going to use Spark Structured Streaming (SSS) to clean and enrich > the data and then push the messages to Kafka. > > I assume you will be using foreachBatch in this case. What purpose is there > for Kafka to receive the enriched data from SSS? Any other reason except > hourly partition of your data? > > HTH > > > > view my Linkedin profile > ...

Re: Kafka S3 Connector: Sort by a field within a partition

Hi Eric, On your second point, "Is there a better way to do this": you are going to use Spark Structured Streaming (SSS) to clean and enrich the data and then push the messages to Kafka. I assume you will be using foreachBatch in this case. What purpose is there for Kafka to receive the enriched data from SSS? Any reason other than the hourly partitioning of your data? HTH. On Thu, 29 Apr 2021 at 18:07, Eric Beabes < mailinglists19@gmail.com > wrote: > We've a use case where lots of messages will come in ...

Kafka S3 Connector: Sort by a field within a partition

We've a use case where lots of messages will come in via AWS SQS from various devices. We're thinking of reading these messages using Spark Structured Streaming, cleaning them up as needed & saving each message on Kafka. Later we're thinking of using Kafka S3 Connector to push them to S3 on an hourly basis; meaning there will be a different directory for each hour. Challenge is that, within this hourly "partition" the messages need to be "sorted by" a certain field (let's say device_id). Reason being, we're planning to create an EXTERNAL table on it with BUCKETS on device_id. This will speed up the subsequent Aggregation jobs. Questions: 1) Does Kafka S3 Connector allow messages to be sorted by a particular field within a partition – or – do we need to extend it? 2) Is there a better way to do this?

kafka upgrade 2.0 - 2.7

Hi All, I am planning to do a rolling upgrade of our Kafka cluster from 2.0 to 2.7, and I wanted to make sure that my assumptions about client compatibility are correct. After reading some documentation, I understood that from broker 0.10.1 onwards any Java client should be supported, as well as all clients that have KIP-35 support. Since I am on Kafka 2.0, am I correct to say that all my clients should already match one of those combinations and should have no problems connecting to a Kafka 2.7 broker? We are not using Kafka Streams. Thanks very much for your help. David
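
Not from the thread, but for reference: the official upgrade notes also describe pinning the inter-broker protocol version during the rolling restart. A hedged sketch of the broker settings, assuming the cluster currently runs 2.0 (check the 2.7 upgrade notes for the authoritative steps):

    # Step 1: before swapping in the 2.7 binaries, pin the current version on every broker
    inter.broker.protocol.version=2.0
    # only needed if older message formats / very old clients are still in use
    log.message.format.version=2.0

    # Step 2: rolling restart with the 2.7 binaries, keeping the settings above

    # Step 3: once all brokers run 2.7, bump the protocol and do one more rolling restart
    inter.broker.protocol.version=2.7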

Re: What's so special about 2,8,9,15,56,72 error codes?

Hi Nikita, The errors may be caused by the clients, but they are detected and announced by the server (broker). It is not always a server issue; the error codes sometimes indicate issues caused by the clients. However, it is the server that detects the problem and notifies the clients when expectations are not met. Think of it as an ATM or bank-teller scenario, where a customer goes to the bank or ATM to withdraw or deposit some money, the transaction has problems, and the ATM or bank representative informs the customer of the issue(s). If the customer enters an incorrect PIN, attempts to deposit counterfeit currency, or performs any other invalid transaction, the notification comes from the bank, not from the customer (client). Sometimes the bank may be unable to perform the transaction because there is not enough cash in the ATM, in which case it is not the customer's fault but that of the bank (server). It is important to note here...

Re: Mirror Maker 2: Incoming messages on source and target kafka cluster mismatch after mirroring

We migrated data from a 7-node source cluster to a 9-node target cluster. The MM source connector producer does not use any compression; as I mentioned earlier, it uses the default producer. We use ByteArrayConverter for both the producer key and value, with idempotency=true. Is there a chance the JMX metrics can be wrong? On Thu, Apr 29, 2021 at 12:09 AM fighter < kumarg.pavan@gmail.com > wrote: > We have did the kafka cluster migration from source kafka cluster to > target kafka cluster using MirrorMaker 2.5.1 in distributed mode using > kafka connect cluster. We see noticeable difference incoming messages rate > per sec on source and target. We also analyze that on kafka connect > producer has less output rate compared to consuming rate of > consumer.Details are given below. Do you suspect the message loss from > kafka connect worker ?. We have seen the offsetsync topic the snapshots and > data rate is quite well. Any help will be appreciated. > > K...

Re: Changing Replication Factor

Hi Marcus, 1. If you change REPLICATION_FACTOR_CONFIG without resetting the application (or deleting the changelog and repartition topics) and redeploy the Streams application, the replication factor of the internal topics will not change. The replication factor will only change for new deployments, i.e., Streams applications with a new application ID or Streams applications that were reset. In both cases the internal topics will be newly created. 2. Changing the replication factor of a topic directly on the brokers should be fine. Kafka Streams should not re-create the internal topics and should not throw any exceptions. 3. Unfortunately, I do not know the answer to this question. Hopefully somebody else can answer it. I answered your questions to the best of my knowledge. The only way to confirm my answers is to test (preferably in a test environment). Best, Bruno On 28.04.21 17:34, Marcus Horsley-Rai wrote: > Hi All, > > I'm in a s...
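
For question 2, a sketch of what the reassignment input can look like when raising the replication factor of an internal topic; the topic name, partition count and broker ids are purely illustrative, and newer brokers also accept --bootstrap-server instead of --zookeeper:

    $ cat increase-rf.json
    {
      "version": 1,
      "partitions": [
        { "topic": "my-app-mystore-changelog", "partition": 0, "replicas": [1, 2, 3] },
        { "topic": "my-app-mystore-changelog", "partition": 1, "replicas": [2, 3, 1] }
      ]
    }

    $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
        --reassignment-json-file increase-rf.json --execute
    $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
        --reassignment-json-file increase-rf.json --verify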

Re: Spark Streams vs Kafka Streams

Spark Structured Streaming has some significant limitations compared to Kafka Streams. This one has always proved hard to overcome: "Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets." On Thu, 29 Apr. 2021, 8:13 am Parthasarathy, Mohan, < mparthas@hpe.com > wrote: > Matthias, > > I will create a KIP or ticket for tracking this issue. > > -thanks > Mohan > > > On 4/28/21, 1:01 PM, "Matthias J. Sax" < mjsax@apache.org > wrote: > > Feel free to do a KIP and contribute to Kafka! > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals > > Or create a ticket for tracking. > > > -Matthias > > On 4/28/21 12:49 PM, Parthasarathy, Mohan wrote: > > Andrew, > > > > I am not sure I understand. We have built seve...

Re: Spark Streams vs Kafka Streams

Matthias, I will create a KIP or ticket for tracking this issue. -thanks Mohan On 4/28/21, 1:01 PM, "Matthias J. Sax" < mjsax@apache.org > wrote: Feel free to do a KIP and contribute to Kafka! https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Or create a ticket for tracking. -Matthias On 4/28/21 12:49 PM, Parthasarathy, Mohan wrote: > Andrew, > > I am not sure I understand. We have built several analytics applications. We typically use custom aggregations as they are not available directly in the library. > > -mohan > > > On 4/28/21, 12:12 PM, "Andrew Otto" < otto@wikimedia.org > wrote: > > I'd assume this is because Kafka Streams is positioned for building > streaming applications, rather than doing analytics, whereas Spark is more > often...

Re: Spark Streams vs Kafka Streams

Hi, "I'd assume this is because Kafka Streams is positioned for building streaming applications, rather than doing analytics, whereas Spark is more often used for analytics purposes." Well not necessarily the full picture. Spark can do both analytics and streaming, especially with Spark Structured Streaming. Spark Structured Streaming is the Apache Spark API that lets you express computation on streaming data *in the same way you express a batch computation on static data.* That is the strength of Spark. Spark supports Java, Scala and Python among others. Python or more specifically Pyspark is particularly popular with Data Science plus the conventional analytics. Structured Streaming Programming Guide - Spark 3.1.1 Documentation ( apache.org ) < https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html > There are two scenarios with Spark Structured Streaming. There are called *foreach* and *foreachBatch* operations a...

Re: Spark Streams vs Kafka Streams

> I am not sure I understand. We have built several analytics applications. We typically use custom aggregations as they are not available directly in the library. Oh for sure! I was answering this question: > . Is there any reason why it is not provided as part of the library ? And assuming that the reason was mainly that the developers building Kafka Streams aren't typically targeting analytics use cases in the same way that Spark is. Not that there is any reason those aggregations should not be in Kafka Streams, I'm sure that would be great! :) On Wed, Apr 28, 2021 at 3:50 PM Parthasarathy, Mohan < mparthas@hpe.com > wrote: > Andrew, > > I am not sure I understand. We have built several analytics applications. > We typically use custom aggregations as they are not available directly in > the library. > > -mohan > > > On 4/28/21, 12:12 PM, "Andrew Otto" < otto@wikimedia.org > wrote: ...

Re: Spark Streams vs Kafka Streams

Feel free to do a KIP and contribute to Kafka! https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Or create a ticket for tracking. -Matthias On 4/28/21 12:49 PM, Parthasarathy, Mohan wrote: > Andrew, > > I am not sure I understand. We have built several analytics applications. We typically use custom aggregations as they are not available directly in the library. > > -mohan > > > On 4/28/21, 12:12 PM, "Andrew Otto" < otto@wikimedia.org > wrote: > > I'd assume this is because Kafka Streams is positioned for building > streaming applications, rather than doing analytics, whereas Spark is more > often used for analytics purposes. > >

Re: Spark Streams vs Kafka Streams

Andrew, I am not sure I understand. We have built several analytics applications. We typically use custom aggregations as they are not available directly in the library. -mohan On 4/28/21, 12:12 PM, "Andrew Otto" < otto@wikimedia.org > wrote: I'd assume this is because Kafka Streams is positioned for building streaming applications, rather than doing analytics, whereas Spark is more often used for analytics purposes.

Re: Spark Streams vs Kafka Streams

Matthias, Once a Spark dataframe is created by reading the data from Kafka ( https://sparkbyexamples.com/spark/spark-streaming-with-kafka/ ), you can use Spark SQL, and all the aggregations shown on that page are valid. I feel that having this built into the Kafka Streams library would make it very easy. Thanks Mohan On 4/28/21, 12:00 PM, "Matthias J. Sax" < mjsax@apache.org > wrote: I am not familiar with all the details about Spark, however, the link you shared is for Spark SQL. I thought Spark SQL is for batch processing only? Personally, I would be open to add more built-in aggregations next to count(). It did not come up in the community so far, so there was no investment yet. -Matthias On 4/28/21 10:30 AM, Parthasarathy, Mohan wrote: > Hi, > > Whenever the discussion about what streaming framework to use for near-realtime analytics, there is normally a disc...

Re: Spark Streams vs Kafka Streams

I am not familiar with all the details about Spark, however, the link you shared is for Spark SQL. I thought Spark SQL is for batch processing only? Personally, I would be open to add more built-in aggregations next to count(). It did not come up in the community so far, so there was no investment yet. -Matthias On 4/28/21 10:30 AM, Parthasarathy, Mohan wrote: > Hi, > > Whenever the discussion about what streaming framework to use for near-realtime analytics, there is normally a discussion about Spark vs Kafka streaming. One of the points in favor of Spark streaming is the simple aggregations that are built-in. See here: https://sparkbyexamples.com/spark/spark-sql-aggregate-functions/ . When it comes to Kafka streams, there is boilerplate code for some of them. Is there any reason why it is not provided as part of the library ? I am unable to find any discussion on this topic. Are there any plans to provide such features in the Kafka streaming library ? ...

Mirror Maker 2: Incoming messages on source and target kafka cluster mismatch after mirroring

We did a Kafka cluster migration from a source Kafka cluster to a target Kafka cluster using MirrorMaker 2.5.1 in distributed mode on a Kafka Connect cluster. We see a noticeable difference in the incoming message rate per second between source and target. We also see that the Kafka Connect producer has a lower output rate than the consumer's consumption rate. Details are given below. Do you suspect message loss in the Kafka Connect worker? We have looked at the offset-sync topic snapshots and the data rate looks fine. Any help will be appreciated. Kafka Connect consumer: < https://i.stack.imgur.com/YQGga.png > Kafka Connect producer: < https://i.stack.imgur.com/asp4t.png > Source Kafka: < https://i.stack.imgur.com/JrKmK.png > Target Kafka: < https://i.stack.imgur.com/Umtx7.png > We noticed the MirrorMaker tasks and workers have no issues. We ...

Re: What's so special about 2,8,9,15,56,72 error codes?

Thank you all for the answers! Israel Ekpo, your clarification is really helpful for me. After studying the protocol documentation closely, I can indeed agree with you about the server-side nature of the errors with codes 8, 9, 15, 56 and 72. But do you think error code 2, ```CORRUPT_MESSAGE```, is a server-side problem indicator? On 4/28/21 6:12 PM, Israel Ekpo wrote: > https://kafka.apache.org/protocol.html#protocol_error_codes > > According to the documentation, those numeric codes are special because > they are used within the Kafka protocol to indicate problems that are > observed at the server. > > These special numeric codes can be translated by the client into exceptions > or whatever the appropriate error handling mechanism in the client language. > > Unless you are implementing or maintaining a client library that interacts > with the server using the Kafka protocol, you do not have to be concerned > with those codes. > >...

Spark Streams vs Kafka Streams

Hi, Whenever there is a discussion about which streaming framework to use for near-real-time analytics, it usually turns into Spark vs. Kafka Streams. One of the points in favor of Spark streaming is the simple aggregations that are built in. See here: https://sparkbyexamples.com/spark/spark-sql-aggregate-functions/ . When it comes to Kafka Streams, there is boilerplate code for some of them. Is there any reason why this is not provided as part of the library? I am unable to find any discussion on this topic. Are there any plans to provide such features in the Kafka Streams library? Thanks Mohan
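
For context on the boilerplate point, a per-key sum in the Kafka Streams DSL currently looks roughly like the sketch below (the topic names, serdes and Long value type are illustrative assumptions), compared to a one-line sum() in Spark SQL:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    // Inside a topology-building method:
    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, Long> totals = builder
        .stream("purchases", Consumed.with(Serdes.String(), Serdes.Long()))
        .groupByKey()
        .aggregate(
            () -> 0L,                                             // initializer
            (key, amount, runningTotal) -> runningTotal + amount, // adder
            Materialized.with(Serdes.String(), Serdes.Long()));
    totals.toStream().to("purchase-totals", Produced.with(Serdes.String(), Serdes.Long()));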

Changing Replication Factor

Hi All, I'm in a sub-optimal situation whereby I have some Kafka Streams apps deployed to production, but the default replication factor set on the brokers was 1 when they were first deployed. As such, any state store changelog topics, and re-partition topics therefore have RF 1 also. I'm familiar with the bin/kafka-reassign-partitions.sh tool and how to use that. I've since also found the streams replication.factor (StreamsConfig.REPLICATION_FACTOR_CONFIG) setting that can be supplied in the streams apps config. My questions are: 1. Will simply changing the value of REPLICATION_FACTOR_CONFIG and re-deploying have any effect on already-created internal topics? 2. Conversely, should I just change the RF of the internal topics using the re-assign-partitions tool? Is that safe to do whilst the apps are still running? 3. (not Streams-specific) If a broker that was the leader of partition(s) for a topic with RF 1 died (i.e. no repl...

Re: What's so special about 2,8,9,15,56,72 error codes?

https://kafka.apache.org/protocol.html#protocol_error_codes According to the documentation, those numeric codes are special because they are used within the Kafka protocol to indicate problems that are observed at the server. These special numeric codes can be translated by the client into exceptions or whatever the appropriate error handling mechanism in the client language. Unless you are implementing or maintaining a client library that interacts with the server using the Kafka protocol, you do not have to be concerned with those codes. Your client library should understand the protocol and throw the appropriate exception/error condition while in use. See the following examples of how these errors are handled at the clients: https://kafka.apache.org/28/javadoc/org/apache/kafka/common/errors/package-summary.html https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/ErrorCode.cs https://github.com/edenhill/librdkafka/blob/mas...
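
To make the mapping concrete, a small sketch of how a broker-reported error code surfaces in the Java client, e.g. error code 2 (CORRUPT_MESSAGE) arriving as a CorruptRecordException in the producer callback; the topic name and bootstrap address are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.CorruptRecordException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ErrorCodeDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                    (metadata, exception) -> {
                        if (exception instanceof CorruptRecordException) {
                            // the broker answered this produce request with error code 2
                            System.err.println("Broker reported CORRUPT_MESSAGE: " + exception);
                        } else if (exception != null) {
                            System.err.println("Send failed: " + exception);
                        }
                    });
            }
        }
    }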

Re: Kafka Streams - Out of Order Handling

Thanks very much for taking the time to answer, Matthias! Very much appreciated. All the best, Marcus On Wed, Apr 7, 2021 at 10:22 PM Matthias J. Sax < mjsax@apache.org > wrote: > Sorry for late reply... > > > > I only see issues of out of order data in my re-partitioned topic as a > result of a rebalance happening. > > If you re-partition, you may actually see out-of-order data even if > there is no rebalance. In the end, during repartitioning you have > multiple upstream writers for the repartition topic and thus interleaved > writes per partition. > > Maybe it's not an issue during regular processing for your use case, as > your throughput seems to be tiny. > > > > I believe all stream threads across all app instances will pause > consuming whilst the rebalance is worked through. > > Not really. (1) If a thread dies, it takes some time to detect that the > thread died and thus a...