I'm working on a project where I want to use Kafka Streams for Event
Sourcing.
The general idea is that I have a "commands" topic/KStream, an "events"
topic/KStream and a "snapshots" topic/KTable.
Snapshots contain the current state of the entities; commands are
validated against the snapshots and transformed into events.
- Group the EVENTS stream by key and aggregate it into the SNAPSHOTS table.
- Left-join the COMMANDS stream with the SNAPSHOTS table and output new
EVENTS.
For example, to apply this pattern to a simple bank-account scenario I can
have:
- operations stream as "commands" (requests to deposit or withdraw an
amount of money, e.g. "deposit $10" => Operation(+10) )
- movements stream as "events" (actual deposit or withdrawal events, e.g. "$10
deposited" => Movement(+10) )
- account table as the "snapshots" (account balance, e.g. "$20 in account
balance" => Account(20) )
- account id is used as the key for all topics and tables
The topology can be written like this:

case class Operation(amount: Int)
case class Movement(amount: Int, error: String = "")
case class Account(balance: Int)

// events
val movementsStream = streamBuilder.stream[String, Movement](Config.topicMovements)

// snapshots
val accountTable = movementsStream
  .groupByKey
  .aggregate(Account(0)) { (_, movement, account) =>
    account.copy(balance = account.balance + movement.amount)
  }
accountTable.toStream.to(Config.topicAccounts)

// commands
val operationsStream = streamBuilder.stream[String, Operation](Config.topicOperations)
operationsStream
  .leftJoin(accountTable) { (operation, accountOrNull) =>
    val account = Option(accountOrNull).getOrElse(Account(0))
    if (account.balance >= -operation.amount) {
      Movement(operation.amount)
    } else {
      Movement(0, error = "insufficient funds")
    }
  }
  .to(Config.topicMovements)
(See the full code here:
https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/main/scala/bank/StreamingPipeline.scala)
Now let's imagine a scenario where I deposit $10, then deposit another
$10, and then withdraw $20:
inOperations.pipeInput("alice", Operation(10))
inOperations.pipeInput("alice", Operation(10))
inOperations.pipeInput("alice", Operation(-20))
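The outcome I expect, assuming each command sees a snapshot that already includes all previous events, can be replayed with plain Scala. Here decide mirrors the leftJoin and evolve mirrors the aggregate (both are hypothetical names, not part of the topology above):

// Pure-Scala replay of the expected strictly-sequential behaviour
case class Operation(amount: Int)
case class Movement(amount: Int, error: String = "")
case class Account(balance: Int)

// Validate a command against the current snapshot (mirrors the leftJoin)
def decide(op: Operation, account: Account): Movement =
  if (account.balance >= -op.amount) Movement(op.amount)
  else Movement(0, error = "insufficient funds")

// Fold an event into the snapshot (mirrors the aggregate)
def evolve(account: Account, movement: Movement): Account =
  account.copy(balance = account.balance + movement.amount)

val ops = List(Operation(10), Operation(10), Operation(-20))
val (finalAccount, movements) =
  ops.foldLeft((Account(0), Vector.empty[Movement])) {
    case ((acc, evs), op) =>
      val ev = decide(op, acc)
      (evolve(acc, ev), evs :+ ev)
  }
// finalAccount == Account(0); no "insufficient funds" errors emitted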
Can I assume that when processing the third message (withdraw $20) the
account table has already processed the previous two movements (10 + 10)?
In other words, can I assume that:
- if the operations are valid, I never receive an "insufficient funds" error
event?
- in the above topology, the account KTable is always updated before the
next operation from the KStream is processed?
From my tests it seems to work, but I would like some advice on whether
this is a safe assumption.
(See the test here:
https://github.com/davideicardi/es4kafka/blob/master/examples/bank-account/src/test/scala/bank/StreamingPipelineSpec.scala)
Thanks,
Davide Icardi