Lev Gorodinski
gorodinski.com
@eulerfx
Inventory tracking system
InventoryItem Aggregate
public class InventoryItem : AggregateRoot
{
bool _activated;
public void Deactivate()
{
if(!_activated) throw new InvalidOperationException("already deactivated");
ApplyChange(new InventoryItemDeactivated(_id));
}
void Apply(InventoryItemDeactivated e)
{
_activated = false;
}
}
source
public abstract class AggregateRoot
{
public abstract Guid Id { get; }
public int Version { get; internal set; }
readonly List<Event> _changes = new List<Event>();
protected void ApplyChange(Event e)
{
ApplyChange(e, true);
}
void ApplyChange(Event e, bool isNew)
{
(this as dynamic).Apply((dynamic)e);
if (isNew) _changes.Add(e);
}
}
type State = {
isActive : bool;
}
type Command =
| Create of name:string
| Deactivate
type Event =
| Created of name:string
| Deactivated
let apply state event =
match event with
| Deactivated -> { state with State.isActive = false; }
let apply state = function
| Deactivated -> { state with State.isActive = false; }
module Assert =
let inactive item = if item.isActive = true then failwith "The item is already deactivated."
let exec state =
let apply event =
let newItem = apply item event
event
function
| Deactivate -> item |> Assert.inactive; Deactivated |> apply
source
want to learn event sourcing? f(state, event) => state
— gregyoung (@gregyoung) March 17, 2013
let apply state = function
| Deactivated -> { state with State.isActive = false; }
Capture all changes to an application state as a sequence of events.Martin Fowler
Capture allchanges to an application statebehaviors as a sequence of events.
Replay all past events to bring aggregate into current state.
type Aggregate<'TState, 'TCommand, 'TEvent> = {
zero : 'TState;
apply : 'TState -> 'TEvent -> 'TState;
exec : 'TState -> 'TCommand -> 'TEvent;
}
let events = load (typeof<'TEvent>,id)
let state = events |> Seq.fold aggregate.apply aggregate.zero
source
let load (typ,id) =
let slice = conn.ReadStreamEventsForward(id,1,Int32.Max,false)
slice.Events
|> Seq.map (fun e -> decode(typ,e.Event.EventType,e.Event.Data))
source
let commit (id,expectedVersion) e =
let eventType,bytes = serialize e
let meta = [||] : byte array
let data = EventData(Guid.NewGuid(),eventType,true,bytes,meta)
if version = 0 then
conn.CreateStream(id, Guid.NewGuid(), true, metaData)
conn.AppendToStream(id,expectedVersion,data)
source
fromCategory('InventoryItem').foreachStream().when({
$init: function() {
return { name: null, active: false };
},
"Created": function (s, e) {
s.name = e.body.value;
s.active = true;
var streamId = "ReadModel-" +
e.streamId.replace("InventoryItem-", "");
var eventType = e.eventType + "_ReadModel";
emit(streamId, eventType, s);
},
"Deactivated": function (s, e) {
s.active = false;
var streamId = "ReadModel-" +
e.streamId.replace("InventoryItem-", "");
var eventType = e.eventType + "_ReadModel";
emit(streamId, eventType, s);
}
});
source