Thank you John for the explanation!
I confirm that by introducing a full integration test I have reproduced the problem.
We have reworked our pipeline using your suggestion (a ValueTransformer and
a state store) and now it seems to work correctly!
If anyone is interested, here is the improved version of the same pipeline:
https://github.com/davideicardi/es4kafka/blob/ca6f27a9db5e38ac029493ae4e1ddd47ade8266e/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala
Regards,
Davide
On Sun, Jan 31, 2021 at 9:23 PM John Roesler <vvcephei@apache.org> wrote:
> Hi Davide,
>
> Thank you for the question.
>
> If I can confirm my understanding: the "operations" topic is
> the only external input to the topology. The topology joins
> the "operations" stream with the "account" table to generate
> a "movements" stream, and it reads (and aggregates) the
> "movements" stream to create the "account" table.
>
> I think your concern is dead-on. The processing of incoming
> records from either the "operations" topic or the
> "movements" topic is synchronous, BUT the production of
> messages to the "movements" topic and subsequent consumption
> of those messages in the join is asynchronous. In other
> words, you can indeed get "insufficient funds" in your
> application.
>
> Following your scenario, this can happen:
>
> 1. consume Operation(10)
> 1. join (balance is $0)
> 1. produce Movement(10)
>
> 2. consume Operation(10)
> 2. join (balance is $0)
> 2. produce Movement(10)
>
> 3. consume Movement(10) // the first one
> 3. update table to value $10
>
> 4. consume Operation(-20)
> 4. join (balance is $10) // only seen the first one
> 4. produce Movement(0, "insufficient funds")
>
> 5. consume Movement(10) // the second one (too late)
> 5. update table to value $20
>
> 6. consume Movement(0, "insufficient funds")
> ...
>
> Other interleavings are also possible.
>
> To get synchronous processing, what you want is a single
> subtopology where all the data flows are internal (i.e.,
> that re-entrant topic is the source of the race condition).
>
> If you don't know about it already, you can print out your
> topology and visualize it with
> https://zz85.github.io/kafka-streams-viz/ .
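>
> For example, with the builder from your code:
>
> val topology = streamBuilder.build()
> println(topology.describe()) // paste this output into the visualizer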
>
> Technically, you can actually model this as a single
> aggregation (note this sketch assumes Account also carries an
> error field, e.g. Account(balance: Int, error: String = "")):
>
> operationStream
>   .groupByKey
>   .aggregate(Account(0)){ (_, operation, account) =>
>     if (account.balance >= -operation.amount) {
>       account.copy(balance = account.balance + operation.amount)
>     } else {
>       account.copy(error = "Insufficient funds")
>     }
>   }
>   .toStream.to(topicAccounts)
>
> But if you want to decouple the "insufficient funds" error
> handling from the account maintenance, you might look at a
> ValueTransformer, in which you maintain the Accounts in a
> key/value store and then forward an Either[Account, Error]
> result, which you can then direct however you please.
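>
> For illustration, here is a minimal sketch of that approach.
> The store name "account-store" is a placeholder, I'm using the
> ValueTransformerWithKey variant so that the account id is
> available, and (following Scala convention) Left carries the
> error:
>
> import org.apache.kafka.streams.kstream.ValueTransformerWithKey
> import org.apache.kafka.streams.processor.ProcessorContext
> import org.apache.kafka.streams.state.KeyValueStore
>
> class AccountTransformer
>     extends ValueTransformerWithKey[String, Operation, Either[String, Account]] {
>   private var store: KeyValueStore[String, Account] = _
>
>   override def init(context: ProcessorContext): Unit = {
>     store = context.getStateStore("account-store")
>       .asInstanceOf[KeyValueStore[String, Account]]
>   }
>
>   // The balance check and the table update happen in one
>   // synchronous step, so no other record for this key can
>   // interleave between them.
>   override def transform(key: String, operation: Operation): Either[String, Account] = {
>     val account = Option(store.get(key)).getOrElse(Account(0))
>     if (account.balance >= -operation.amount) {
>       val updated = account.copy(balance = account.balance + operation.amount)
>       store.put(key, updated)
>       Right(updated)
>     } else {
>       Left("insufficient funds")
>     }
>   }
>
>   override def close(): Unit = ()
> }
>
> You would register a key/value store under that name on the
> builder (e.g. via Stores.keyValueStoreBuilder and
> streamBuilder.addStateStore) and attach it with
> transformValues(..., "account-store").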
>
> Either way, maintaining the table and the balance checking
> logic in a single operation guarantees you won't have race
> conditions.
>
> A final note: the reason your tests don't show the race
> condition is that they are using TopologyTestDriver, which
> synchronously propagates each individual input record all
> the way through the topology. If you also set up a full
> integration test, I suspect that you'll quickly see the race
> condition surface.
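>
> To make the TopologyTestDriver behaviour concrete, here is a
> minimal sketch (the props setup and operationSerde are
> placeholders, not taken from your project):
>
> import java.util.Properties
> import org.apache.kafka.common.serialization.Serdes
> import org.apache.kafka.streams.{StreamsConfig, TopologyTestDriver}
>
> val props = new Properties()
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "race-test")
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
>
> val driver = new TopologyTestDriver(streamBuilder.build(), props)
> val inOperations = driver.createInputTopic(
>   Config.topicOperations,
>   Serdes.String().serializer(),
>   operationSerde.serializer())
>
> // Each pipeInput call drains the whole topology (including the
> // re-entrant "movements" topic) before returning, so records are
> // processed strictly one at a time and the race cannot occur here.
> inOperations.pipeInput("alice", Operation(10))
>
> driver.close()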
>
> I hope this helps,
> -John
>
>
> On Sun, 2021-01-31 at 11:37 +0100, Davide Icardi wrote:
> > I'm working on a project where I want to use Kafka Streams for Event
> > Sourcing.
> >
> > The general idea is that I have a "commands" topic/KStream, an
> > "events" topic/KStream, and a "snapshots" topic/KTable.
> > Snapshots contain the current state of the entities. Commands are
> > validated against the "snapshots" and transformed into "events".
> >
> > Group the EVENTS stream by key and aggregate it into a SNAPSHOTS
> > table. Left-join the COMMANDS stream with the SNAPSHOTS table and
> > output new EVENTS.
> >
> > For example, to apply this pattern to a simple bank-account
> > scenario I can have:
> > - an operations stream as the "commands" (requests to deposit or
> >   withdraw an amount of money, e.g. "deposit $10" => Operation(+10) )
> > - a movements stream as the "events" (actual deposit or withdraw
> >   events, e.g. "$10 deposited" => Movement(+10) )
> > - an account table as the "snapshots" (the account balance, e.g.
> >   "$20 in account balance" => Account(20) )
> > - the account id is used as the key for all topics and tables
> >
> > The topology can be written like:
> >
> > case class Operation(amount: Int)
> > case class Movement(amount: Int, error: String = "")
> > case class Account(balance: Int)
> >
> > // events
> > val movementsStream =
> >   streamBuilder.stream[String, Movement](Config.topicMovements)
> > // snapshots
> > val accountTable = movementsStream
> >   .groupByKey
> >   .aggregate(Account(0)){ (_, movement, account) =>
> >     account.copy(balance = account.balance + movement.amount)
> >   }
> > accountTable.toStream.to(Config.topicAccounts)
> > // commands
> > val operationsStream =
> >   streamBuilder.stream[String, Operation](Config.topicOperations)
> > operationsStream
> >   .leftJoin(accountTable) { (operation, accountOrNull) =>
> >     val account = Option(accountOrNull).getOrElse(Account(0))
> >     if (account.balance >= -operation.amount) {
> >       Movement(operation.amount)
> >     } else {
> >       Movement(0, error = "insufficient funds")
> >     }
> >   }
> >   .to(Config.topicMovements)
> >
> > (see the full code here:
> > https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala
> > )
> >
> > Now let's imagine a scenario where I deposit $10, then deposit
> > another $10, and then withdraw $20:
> >
> > inOperations.pipeInput("alice", Operation(10))
> > inOperations.pipeInput("alice", Operation(10))
> > inOperations.pipeInput("alice", Operation(-20))
> >
> > Can I assume that when processing the third message (withdraw $20)
> > the account table has already processed the previous two movements
> > (10+10)?
> > In other words, can I assume that:
> > - if the operations are valid, I never receive an "insufficient
> >   funds" error event?
> > - in the above topology, the account KTable is always updated
> >   before the next operation from the KStream is processed?
> >
> > From my tests it seems to work, but I would like some advice on
> > whether this is a safe assumption.
> > (see the test here:
> > https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/test/scala/bank/StreamingPipelineSpec.scala
> > )
> >
> > thanks
> > Davide Icardi
>
>
>