Hi Chris,
Thank you for the clarification. Now I see what you mean. If your
topology works correctly, I would not file it as a bug but as a
possible improvement.
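To make the distinction in the thread concrete, here is a minimal Scala model of the two consumer configs being discussed. It is not the actual StreamsConfig code: the object and method names are hypothetical, and configs are plain string maps. It only mirrors the behaviour described in the thread. The stream thread (main) consumer gets group.id = application.id. The global consumer has group.id removed.

```scala
// Hypothetical model of how Kafka Streams derives per-consumer configs.
// NOT the real StreamsConfig implementation; names are illustrative only.
object ConsumerConfigSketch {

  // Stream thread (main) consumer: group.id is set to application.id,
  // so the KafkaConsumer creates a ConsumerCoordinator and joins a group.
  def mainConsumerConfig(streamsProps: Map[String, String]): Map[String, String] =
    streamsProps + ("group.id" -> streamsProps("application.id"))

  // Global consumer: any group.id is removed, so the consumer is assigned
  // its partitions manually and never joins a consumer group.
  def globalConsumerConfig(streamsProps: Map[String, String]): Map[String, String] =
    streamsProps - "group.id"

  // In the 2.x clients discussed here, a KafkaConsumer without a group.id
  // does not create a ConsumerCoordinator, so no group coordination occurs.
  def needsGroupCoordination(consumerConfig: Map[String, String]): Boolean =
    consumerConfig.contains("group.id")

  def main(args: Array[String]): Unit = {
    val props = Map(
      "application.id"    -> "gtable-cache-app", // hypothetical application id
      "bootstrap.servers" -> "localhost:9092"    // placeholder broker address
    )
    println(s"stream thread consumer coordinates: ${needsGroupCoordination(mainConsumerConfig(props))}")
    println(s"global consumer coordinates: ${needsGroupCoordination(globalConsumerConfig(props))}")
  }
}
```

Running it prints `true` for the stream thread consumer and `false` for the global consumer. This is why a GlobalKTable-only app still sees join/rebalance activity whenever a stream thread is started.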
Best,
Bruno
On Wed, Oct 30, 2019 at 1:20 AM Chris Toomey <ctoomey@gmail.com> wrote:
>
> Bruno,
>
> I'm using a fork based off the 2.4 branch. It's not the global consumer but
> the stream thread consumer that has the group id, since it's built with the
> main consumer config:
> https://github.com/apache/kafka/blob/065411aa2273fd393e02f0af46f015edfc9f9b55/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1051
>
> It shouldn't be creating a regular consumer for the topic since my topology
> only has a single element, the GlobalKTable, which is populated by the
> global consumer. My scala code:
>
> val builder: StreamsBuilder = new StreamsBuilder()
> val gTable = builder.globalTable[K, V](...)
> val stream = new KafkaStreams(builder.build(), props)
> stream.start()
>
>
> I can disable the stream thread consumer by configuring num.stream.threads
> = 0, but why does it create this stream thread consumer in the first place
> if it's not been requested in the topology?
>
> thx,
> Chris
>
> On Tue, Oct 29, 2019 at 2:08 PM Bruno Cadonna <bruno@confluent.io> wrote:
>
> > Hi Chris,
> >
> > What version of Streams are you referring to?
> >
> > On the current trunk the group.id property is removed from the config
> > for the global consumer that populates the GlobalKTable.
> >
> > See the following code line
> >
> > https://github.com/apache/kafka/blob/065411aa2273fd393e02f0af46f015edfc9f9b55/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1149
> >
> > Best,
> > Bruno
> >
> > On Tue, Oct 29, 2019 at 8:12 PM Chris Toomey <ctoomey@gmail.com> wrote:
> > >
> > > We have some simple Kafka Streams apps that populate GlobalKTables to use
> > > as caches for topic contents. When running them with info-level logging
> > > enabled, I noticed group coordination activity (joining, rebalancing,
> > > leaving, rejoining) that I didn't expect, given that they need to consume
> > > from all topic partitions rather than use the group load-balancing
> > > feature.
> > >
> > > I tracked this down to the way the consumer config is generated for
> > > a GlobalKTable consumer: the groupId is set to the Kafka Streams
> > > application id instead of null, so the consumer needlessly creates a
> > > ConsumerCoordinator and thus initiates all the associated messaging and
> > > overhead.
> > >
> > > I was going to file a bug for this, but per the contributing page I am
> > > bringing it up here first. Is there a reason why GlobalKTable consumers
> > > should bear this group coordination overhead, or should I go ahead and
> > > file a ticket to remove it?
> > >
> > > thanks,
> > > Chris
> >
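The workaround Chris mentions, setting num.stream.threads to 0 so that no stream thread (and therefore no group-managed main consumer) is started, might be configured as in this sketch. It assumes that 0 is accepted for a GlobalKTable-only topology, as Chris reports for his 2.4-based fork. The helper name, application id and broker address are hypothetical placeholders. Only the standard config keys application.id, bootstrap.servers and num.stream.threads are taken as given.

```scala
import java.util.Properties

// Hypothetical helper: Streams properties for a topology that contains
// only a GlobalKTable (assumes num.stream.threads = 0 is accepted there).
object GlobalOnlyStreamsProps {

  def globalOnlyProps(appId: String, bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put("application.id", appId)
    props.put("bootstrap.servers", bootstrapServers)
    // No stream threads: only the global thread, whose consumer has no
    // group.id, is needed to populate the GlobalKTable.
    props.put("num.stream.threads", "0")
    props
  }

  def main(args: Array[String]): Unit = {
    val props = globalOnlyProps("gtable-cache-app", "localhost:9092")
    // In the real app these props would be passed to
    // new KafkaStreams(builder.build(), props), as in Chris's snippet.
    println(props.getProperty("num.stream.threads"))
  }
}
```

Whether Streams should skip creating stream threads automatically for a global-only topology is exactly the possible improvement discussed in the thread. The explicit setting above only sidesteps it.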