layout: docs.hbs
title: Persistence
Persistence
Akka.Persistence plugin enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a CLR crash or by a supervisor, or migrated in a cluster. The key concept behind Akka persistence is that only changes to an actor's internal state are persisted but never its current state directly (except for optional snapshots). These changes are only ever appended to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful actors are recovered by replaying stored changes to these actors from which they can rebuild internal state. This can be either the full history of changes or starting from a snapshot which can dramatically reduce recovery times. Akka persistence also provides point-to-point communication with at-least-once message delivery semantics.
Architecture
Akka.Persistence features are available through new set of actor base classes:
ReceivePersistentActor
is a persistent, stateful actor. It is able to persist events to a journal and can react to them in a thread-safe manner. It can be used to implement both command as well as event sourced actors. When a persistent actor is started or restarted, journaled messages are replayed to that actor so that it can recover internal state from these messages.UntypedPersistentActor
- untyped version of ReceivePersistentActor.PersistentView
is a persistent, stateful actor that receives journaled messages that have been written by another persistent actor. A view itself does not journal new messages, instead, it updates internal state only from a persistent actor's replicated message stream. Note: PersistentView is deprecated.AtLeastOnceDeliveryReceiveActor
is an actor which sends messages with at-least-once delivery semantics to destinations, also in case of sender and receiver CLR crashes.AtLeastOnceDeliveryActor
- untyped version of AtLeastOnceDeliveryReceiveActor.AsyncWriteJournal
stores the sequence of messages sent to a persistent actor. An application can control which messages are journaled and which are received by the persistent actor without being journaled. Journal maintains highestSequenceNr that is increased on each message. The storage backend of a journal is pluggable. By default it uses an in-memory message stream and is NOT a persistent storage.SnapshotStore
is used to persist snapshots of either persistent actor's or view's internal state. They can be used to reduce recovery times in case when a lot of events needs to be replayed for specific persistent actor. Storage backend of the snapshot store is pluggable. By default it uses local file system.
Persistent actors
Unlike the default ActorBase
class, PersistentActor
and its derivatives requires the setup of a few more additional members:
PersistenceId
is a persistent actor's identifier that doesn't change across different actor incarnations. It's used to retrieve an event stream required by the persistent actor to recover its internal state.ReceiveRecover
is a method invoked during an actor's recovery cycle. Incoming objects may be user-defined events as well as system messages, for exampleSnapshotOffer
which is used to deliver latest actor state saved in the snapshot store.ReceiveCommand
is an equivalent of the basicReceive
method of default Akka.NET actors.
Persistent actors also offer a set of specialized members:
Persist
andPersistAsync
methods can be used to send events to the event journal in order to store them inside. The second argument is a callback invoked when the journal confirms that events have been stored successfully.Defer
andDeferAsync
are used to perform various operations after events will be persisted and their callback handlers will be invoked. Unlike the persist methods, defer won't store an event in persistent storage. Defer methods may NOT be invoked in case when the actor is restarted even though the journal will successfully persist events sent.DeleteMessages
will order attached journal to remove part of its events. It can be either logical deletion - messages are marked as deleted, but are not removed physically from the backend storage - or a physical one, when the messages are removed physically from the journal.LoadSnapshot
will send a request to the snapshot store to resend the current actor's snapshot.SaveSnapshot
will send the current actor's internal state as a snapshot to be saved by the configured snapshot store.DeleteSnapshot
andDeleteSnapshots
methods may be used to specify snapshots to be removed from the snapshot store in cases where they are no longer needed.OnReplaySuccess
is a virtual method which will be called when the recovery cycle ends successfully.OnReplayFailure
is a virtual method which will be called when the recovery cycle fails unexpectedly from some reason.IsRecovering
property determines if the current actor is performing a recovery cycle at the moment.SnapshotSequenceNr
property may be used to determine the sequence number used for marking persisted events. This value changes in a monotonically increasing manner.
In case a manual recovery cycle initialization is necessary, it may be invoked by sending a Recover
message to a persistent actor.
A persistent actor receives a (non-persistent) command which is first validated if it can be applied to the current state. Here validation can mean anything from simple inspection of a command message's fields up to a conversation with several external services, for example. If validation succeeds, events are generated from the command, representing the effect of the command. These events are then persisted and, after successful persistence, used to change the actor's state. When the persistent actor needs to be recovered, only the persisted events are replayed of which we know that they can be successfully applied. In other words, events cannot fail when being replayed to a persistent actor, in contrast to commands. Event sourced actors may of course also process commands that do not change application state such as query commands for example.
Akka persistence supports event sourcing with the ReceivePersistentActor
abstract class. An actor that extends this class uses the persist method to persist and handle events. The behavior of an ReceivePersistentActor
is defined by implementing Recover
and Receive
methods. This is demonstrated in the following example.
public class Cmd
{
public Cmd(string data)
{
Data = data;
}
public string Data { get; }
}
public class Evt
{
public Evt(string data)
{
Data = data;
}
public string Data { get; }
}
public class ExampleState
{
private readonly List _events;
public ExampleState(List events)
{
_events = events;
}
public ExampleState() : this(new List())
{
}
public void Update(Evt evt)
{
_events.Add(evt.Data);
}
public ExampleState Copy()
{
return new ExampleState(_events);
}
public int Size => _events.Count;
}
public class ExamplePersistentActor : ReceivePersistentActor
{
private ExampleState _state = new ExampleState();
public ExamplePersistentActor()
{
Recover(evt =>
{
_state.Update(evt);
});
Recover(snapshot =>
{
_state = (ExampleState)snapshot.Snapshot;
});
Command(message =>
{
string data = message.Data;
Evt evt1 = new Evt($"{data}-{_state.Size}");
Evt evt2 = new Evt($"{data}-{_state.Size + 1}");
var events = new List { evt1, evt2 };
PersistAll(events, evt =>
{
_state.Update(evt);
if (evt == evt2)
{
Context.System.EventStream.Publish(evt);
}
});
});
Command(msg => msg == "snap", message =>
{
SaveSnapshot(_state.Copy());
});
Command(msg => msg == "print", message =>
{
Console.WriteLine(_state);
});
}
public override string PersistenceId { get; } = "sample-id-1";
}
The example defines two data types, Cmd and Evt to represent commands and events, respectively. The state of the ExamplePersistentActor
is a list of persisted event data contained in ExampleState
.
The persistent actor's OnReceiveRecover
method defines how state is updated during recovery by handling Evt
and SnapshotOffer
messages. The persistent actor's OnReceiveCommand
method is a command handler. In this example, a command is handled by generating two events which are then persisted and handled. Events are persisted by calling Persist
with an event (or a sequence of events) as first argument and an event handler as second argument.
The persist method persists events asynchronously and the event handler is executed for successfully persisted events. Successfully persisted events are internally sent back to the persistent actor as individual messages that trigger event handler executions. An event handler may close over persistent actor state and mutate it. The sender of a persisted event is the sender of the corresponding command. This allows event handlers to reply to the sender of a command (not shown).
The main responsibility of an event handler is changing persistent actor state using event data and notifying others about successful state changes by publishing events.
When persisting events with persist it is guaranteed that the persistent actor will not receive further commands between the persist call and the execution(s) of the associated event handler. This also holds for multiple persist calls in context of a single command. Incoming messages are stashed until the persist is completed.
If persistence of an event fails, OnPersistFailure
will be invoked (logging the error by default), and the actor will unconditionally be stopped. If persistence of an event is rejected before it is stored, e.g. due to serialization error, OnPersistRejected
will be invoked (logging a warning by default), and the actor continues with the next message.
NOTE: It's also possible to switch between different command handlers during normal processing and recovery with
Context.Become
andContext.Unbecome
. To get the actor into the same state after recovery you need to take special care to perform the same state transitions with become and unbecome in theReceiveRecover
method as you would have done in the command handler. Note that when using become fromReceiveRecover
it will still only use theReceiveRecover
behavior when replaying the events. When replay is completed it will use the new behavior.
Identifiers
A persistent actor must have an identifier that doesn't change across different actor incarnations. The identifier must be defined with the PersistenceId
method.
public override string PersistenceId { get; } = "my-stable-persistence-id";
Recovery
By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages. New messages sent to a persistent actor during recovery do not interfere with replayed messages. They are cached and received by a persistent actor after recovery phase completes.
NOTE: Accessing the
Sender
for replayed messages will always result in a deadLetters reference, as the original sender is presumed to be long gone. If you indeed have to notify an actor during recovery in the future, store itsActorPath
explicitly in your persisted events.
Recovery customization
Applications may also customise how recovery is performed by returning a customised Recovery
object in the recovery method of a ReceivePersistentActor
, for example setting an upper bound to the replay which allows the actor to be replayed to a certain point "in the past" instead to its most up to date state:
public override Recovery Recovery
{
get { return new Recovery(new SnapshotSelectionCriteria(457)); }
}
Recovery can be disabled by returning SnapshotSelectionCriteria.None
in the recovery property of a PersistentActor:
public override Recovery Recovery
{
get { return new Recovery(SnapshotSelectionCriteria.None); }
}
Recovery status
A persistent actor can query its own recovery status via the methods
public bool IsRecovering { get; }
public bool IsRecoveryFinished { get; }
Sometimes there is a need for performing additional initialization when the recovery has completed before processing any other message sent to the persistent actor. The persistent actor will receive a special RecoveryCompleted
message right after recovery and before any other received messages.
Recover(message =>
{
// perform init after recovery, before any other messages
});
Command(message =>
{
});
If there is a problem with recovering the state of the actor from the journal, OnRecoveryFailure
is called (logging the error by default) and the actor will be stopped.
Internal stash
The persistent actor has a private stash for internally caching incoming messages during Recovery
or the Persist
\ PersistAll
method persisting events. However You can use inherited stash or create one or more stashes if needed. The internal stash doesn't interfere with these stashes apart from user inherited UnstashAll
method, which prepends all messages in the inherited stash to the internal stash instead of mailbox. Hence, If the message in the inherited stash need to be handled after the messages in the internal stash, you should call inherited unstash method.
You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the number of stashed messages will grow. It can be wise to protect against OutOfMemoryException
by defining a maximum stash capacity in the mailbox configuration:
akka.actor.default-mailbox.stash-capacity = 10000
Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding, you may need to define a small stash capacity to ensure that the total number of stashed messages in the system don't consume too much memory. Additionally, The persistent actor defines three strategies to handle failure when the internal stash capacity is exceeded. The default overflow strategy is the ThrowOverflowExceptionStrategy
, which discards the current received message and throws a StashOverflowException
, causing actor restart if default supervision strategy is used. you can override the InternalStashOverflowStrategy
property to return DiscardToDeadLetterStrategy
or ReplyToStrategy
for any "individual" persistent actor, or define the "default" for all persistent actors by providing FQCN, which must be a subclass of StashOverflowStrategyConfigurator
, in the persistence configuration:
akka.persistence.internal-stash-overflow-strategy = "akka.persistence.ThrowExceptionConfigurator"
The DiscardToDeadLetterStrategy
strategy also has a pre-packaged companion configurator DiscardConfigurator
.
You can also query default strategy via the Akka persistence extension singleton:
Context.System.DefaultInternalStashOverflowStrategy
NOTE: Note The bounded mailbox should be avoid in the persistent actor, because it may be discarding the messages come from Storage backends. You can use bounded stash instead of bounded mailbox.
Relaxed local consistency requirements and high throughput use-cases
If faced with relaxed local consistency requirements and high throughput demands sometimes PersistentActor
and its persist may not be enough in terms of consuming incoming Commands at a high rate, because it has to wait until all Events related to a given Command are processed in order to start processing the next Command. While this abstraction is very useful for most cases, sometimes you may be faced with relaxed requirements about consistency – for example you may want to process commands as fast as you can, assuming that the Event will eventually be persisted and handled properly in the background, retroactively reacting to persistence failures if needed.
The PersistAsync
method provides a tool for implementing high-throughput persistent actors. It will not stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks.
In the below example, the event callbacks may be called "at any time", even after the next Command has been processed. The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.).
public class DocumentNestedPersistentActor : ReceivePersistentActor
{
public override string PersistenceId => "HardCoded";
public DocumentNestedPersistentActor()
{
Action replyToSender = message =>
{
Sender.Tell(message, Self);
};
Recover(message =>
{
// handle recovery here
});
Command(message =>
{
Sender.Tell(message, Self);
PersistAsync($"evt-{message}-1", replyToSender);
PersistAsync($"evt-{message}-2", replyToSender);
});
}
}
NOTE: In order to implement the pattern known as "command sourcing" simply
PersistAsync
all incoming messages right away and handle them in the callback.WARNING: The callback will not be invoked if the actor is restarted (or stopped) in between the call to
PersistAsync
and the journal has confirmed the write.
Deferring actions until preceding persist handlers have executed
Sometimes when working with PersistAsync
you may find that it would be nice to define some actions in terms of happens-after the previous PersistAsync
handlers have been invoked. PersistentActor
provides an utility method called DeferAsync
, which works similarly to PersistAsync
yet does not persist the passed in event. It is recommended to use it for read operations, and actions which do not have corresponding events in your domain model.
Using this method is very similar to the persist family of methods, yet it does not persist the passed in event. It will be kept in memory and used when invoking the handler.
public class DocumentNestedPersistentActor : ReceivePersistentActor
{
public override string PersistenceId => "HardCoded";
public DocumentNestedPersistentActor()
{
Action replyToSender = message =>
{
Sender.Tell(message, Self);
};
Recover(message =>
{
// handle recovery here
});
Command(message =>
{
PersistAsync($"evt-{message}-1", replyToSender);
PersistAsync($"evt-{message}-2", replyToSender);
DeferAsync($"evt-{message}-3", replyToSender);
});
}
}
Notice that the Sender
is safe to access in the handler callback, and will be pointing to the original sender of the command for which this DeferAsync
handler was called.
persistentActor.tell("a");
persistentActor.tell("b");
// order of received messages:
// a
// b
// evt-a-1
// evt-a-2
// evt-a-3
// evt-b-1
// evt-b-2
// evt-b-3
WARNING: The callback will not be invoked if the actor is restarted (or stopped) in between the call to
DeferAsync
and the journal has processed and confirmed all preceding writes..
Nested persist calls
It is possible to call Persist
and PersistAsync
inside their respective callback blocks and they will properly retain both the thread safety (including the right value of Sender
) as well as stashing guarantees.
In general it is encouraged to create command handlers which do not need to resort to nested event persisting, however there are situations where it may be useful. It is important to understand the ordering of callback execution in those situations, as well as their implication on the stashing behaviour (that persist enforces). In the following example two persist calls are issued, and each of them issues another persist inside its callback:
public class DocumentNestedPersistentActor : ReceivePersistentActor
{
public override string PersistenceId => "HardCoded";
public DocumentNestedPersistentActor()
{
Action replyToSender = (message) =>
{
Sender.Tell(message, Self);
};
Command(message =>
{
Persist($"{message}-outer-1", innerMessage =>
{
Sender.Tell(innerMessage, Self);
Persist($"{innerMessage}-inner-1", replyToSender);
});
Persist($"{message}-outer-2", innerMessage =>
{
Sender.Tell(innerMessage, Self);
Persist($"{innerMessage}-inner-2", replyToSender);
});
});
}
}
When sending two commands to this PersistentActor
, the persist handlers will be executed in the following order:
persistentActor.tell("a");
persistentActor.tell("b");
// order of received messages:
// a
// a-outer-1
// a-outer-2
// a-inner-1
// a-inner-2
// and only then process "b"
// b
// b-outer-1
// b-outer-2
// b-inner-1
// b-inner-2
First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed, the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor. In other words, the stashing of incoming commands that is guaranteed by initially calling Persist
on the outer layer is extended until all nested persist callbacks have been handled.
It is also possible to nest PersistAsync
calls, using the same pattern:
public class DocumentNestedPersistentActor : ReceivePersistentActor
{
public override string PersistenceId => "HardCoded";
public DocumentNestedPersistentActor()
{
Action replyToSender = (message) =>
{
Sender.Tell(message, Self);
};
Command(message =>
{
PersistAsync($"{message}-outer-1", innerMessage =>
{
Sender.Tell(innerMessage, Self);
PersistAsync($"{innerMessage}-inner-1", replyToSender);
});
PersistAsync($"{message}-outer-2", innerMessage =>
{
Sender.Tell(innerMessage, Self);
PersistAsync($"{innerMessage}-inner-2", replyToSender);
});
});
}
}
In this case no stashing is happening, yet events are still persisted and callbacks are executed in the expected order:
persistentActor.tell("a");
persistentActor.tell("b");
// order of received messages:
// a
// b
// a-outer-1
// a-outer-2
// b-outer-1
// b-outer-2
// a-inner-1
// a-inner-2
// b-inner-1
// b-inner-2
// which can be seen as the following causal relationship:
// a -> a-outer-1 -> a-outer-2 -> a-inner-1 -> a-inner-2
// b -> b-outer-1 -> b-outer-2 -> b-inner-1 -> b-inner-2
While it is possible to nest mixed persist and PersistAsync
with keeping their respective semantics it is not a recommended practice, as it may lead to overly complex nesting.
Failures
If persistence of an event fails, OnPersistFailure
will be invoked (logging the error by default), and the actor will unconditionally be stopped.
The reason that it cannot resume when persist fails is that it is unknown if the event was actually persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures will most likely fail anyway since the journal is probably unavailable. It is better to stop the actor and after a back-off timeout start it again. The BackoffSupervisor
actor is provided to support such restarts.
protected override void PreStart()
{
var childProps = Props.Create();
var actor = new BackoffSupervisor(
childProps,
"myActor",
TimeSpan.FromSeconds(3),
TimeSpan.FromSeconds(30),
0.2);
base.PreStart();
}
If persistence of an event is rejected before it is stored, e.g. due to serialization error, OnPersistRejected
will be invoked (logging a warning by default), and the actor continues with next message.
If there is a problem with recovering the state of the actor from the journal when the actor is started, OnRecoveryFailure
is called (logging the error by default), and the actor will be stopped.
Atomic writes
Each event is of course stored atomically, but it is also possible to store several events atomically by using the PersistAll
or PersistAllAsync
method. That means that all events passed to that method are stored or none of them are stored if there is an error.
The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by PersistAll
.
Some journals may not support atomic writes of several events and they will then reject the PersistAll
command, i.e. OnPersistRejected
is called with an exception (typically NotSupportedException
).
Batch writes
In order to optimize throughput when using PersistAsync
, a persistent actor internally batches events to be stored under high load before writing them to the journal (as a single batch). The batch size is dynamically determined by how many events are emitted during the time of a journal round-trip: after sending a batch to the journal no further batch can be sent before confirmation has been received that the previous batch has been written. Batch writes are never timer-based which keeps latencies at a minimum.
Message deletion
It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number; Persistent actors may call the DeleteMessages
method to this end.
Deleting messages in event sourcing based applications is typically either not used at all, or used in conjunction with snapshotting, i.e. after a snapshot has been successfully stored, a DeleteMessages
(ToSequenceNr
) up until the sequence number of the data held by that snapshot can be issued to safely delete the previous events while still having access to the accumulated state during replays - by loading the snapshot.
The result of the DeleteMessages
request is signaled to the persistent actor with a DeleteMessagesSuccess
message if the delete was successful or a DeleteMessagesFailure
message if it failed.
Message deletion doesn't affect the highest sequence number of the journal, even if all messages were deleted from it after DeleteMessages
invocation.
Persistence status handling
Method | Success | Failure / Rejection | After failure handler invoked |
---|---|---|---|
Persist / PersistAsync | persist handler invoked | OnPersistFailure | Actor is stopped. |
OnPersistRejected | No automatic actions. | ||
Recovery | RecoverySuccess | OnRecoveryFailure | Actor is stopped. |
DeleteMessages | DeleteMessagesSuccess | DeleteMessagesFailure | No automatic actions. |
The most important operations (Persist and Recovery) have failure handlers modelled as explicit callbacks which the user can override in the PersistentActor
. The default implementations of these handlers emit a log message (error for persist/recovery failures, and warning for others), logging the failure cause and information about which message caused the failure.
For critical failures such as recovery or persisting events failing the persistent actor will be stopped after the failure handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most likely not help the journal recover – as it would likely cause a Thundering herd problem, as many persistent actors would restart and try to persist their events again. Instead, using a BackoffSupervisor
(as described in Failures) which implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between restarts of the persistent actor.
NOTE: Journal implementations may choose to implement a retry mechanism, e.g. such that only after a write fails N number of times a persistence failure is signalled back to the user. In other words, once a journal returns a failure, it is considered fatal by Akka Persistence, and the persistent actor which caused the failure will be stopped. Check the documentation of the journal implementation you are using for details if/how it is using this technique.
Safely shutting down persistent actors
Special care should be given when shutting down persistent actors from the outside. With normal Actors it is often acceptable to use the special PoisonPill
message to signal to an Actor that it should stop itself once it receives this message – in fact this message is handled automatically by Akka, leaving the target actor no way to refuse stopping itself when given a poison pill.
This can be dangerous when used with PersistentActor due to the fact that incoming commands are stashed while the persistent actor is awaiting confirmation from the Journal that events have been written when Persist
was used. Since the incoming commands will be drained from the Actor's mailbox and put into its internal stash while awaiting the confirmation (thus, before calling the persist handlers) the Actor may receive and (auto)handle the PoisonPill
before it processes the other messages which have been put into its stash, causing a pre-mature shutdown of the Actor.
WARNING: Consider using explicit shut-down messages instead of
PoisonPill
when working with persistent actors.
The example below highlights how messages arrive in the Actor's mailbox and how they interact with its internal stashing mechanism when Persist()
is used. Notice the early stop behaviour that occurs when PoisonPill
is used:
public class Shutdown
{
}
public class ShutdownPersistentActor : ReceivePersistentActor
{
public ShutdownPersistentActor()
{
Recover(rec =>
{
// handle recovery...
});
Command(msg =>
{
Persist(msg, param =>
{
Console.WriteLine(param);
});
});
Command(msg =>
{
Context.Stop(Self);
});
}
public override string PersistenceId
{
get
{
return "some-persistence-id";
}
}
}
// UN-SAFE, due to PersistentActor's command stashing:
persistentActor.Tell("a");
persistentActor.Tell("b");
persistentActor.Tell(PoisonPill.Instance);
// order of received messages:
// a
// # b arrives at mailbox, stashing; internal-stash = [b]
// # PoisonPill arrives at mailbox, stashing; internal-stash = [b, Shutdown]
// PoisonPill is an AutoReceivedMessage, is handled automatically
// !! stop !!
// Actor is stopped without handling `b` nor the `a` handler!
// SAFE:
persistentActor.Tell("a");
persistentActor.Tell("b");
persistentActor.Tell(new Shutdown());
// order of received messages:
// a
// # b arrives at mailbox, stashing; internal-stash = [b]
// # Shutdown arrives at mailbox, stashing; internal-stash = [b, Shutdown]
// handle-a
// # unstashing; internal-stash = [Shutdown]
// b
// handle-b
// # unstashing; internal-stash = []
// Shutdown
// -- stop --
Persistent views
WARNING:
PersistentView
is deprecated. UsePersistenceQuery
when it will be ported.
While a persistent actor may be used to produce and persist events, views are used only to read internal state based on them. Like the persistent actor, a view has a PersistenceId
to specify a collection of events to be resent to current view. This value should however be correlated with the PersistentId
of an actor who is the producer of the events.
Other members:
ViewId
property is a view unique identifier that doesn't change across different actor incarnations. It's useful in cases where there are multiple different views associated with a single persistent actor, but showing its state from a different perspectives.IsAutoUpdate
property determines if the view will try to automatically update its state in specified time intervals. Without it, the view won't update its state until it receives an explicitUpdate
message. This value can be set through configuration with akka.persistence.view.auto-update set to either on (by default) or off.AutoUpdateInterval
specifies a time interval in which the view will be updating itself - only in cases where the IsAutoUpdate flag is on. This value can be set through configuration with akka.persistence.view.auto-update-interval key (5 seconds by default).AutoUpdateReplayMax
property determines the maximum number of events to be replayed during a single Update cycle. This value can be set through configuration with akka.persistence.view.auto-update-replay-max key (by default it's -1 - no limit).LoadSnapshot
will send a request to the snapshot store to resend a current view's snapshot.SaveSnapshot
will send the current view's internal state as a snapshot to be saved by the configured snapshot store.DeleteSnapshot
andDeleteSnapshots
methods may be used to specify snapshots to be removed from the snapshot store in cases where they are no longer needed.
The PersistenceId
identifies the persistent actor from which the view receives journaled messages. It is not necessary that the referenced persistent actor is actually running. Views read messages from a persistent actor's journal directly. When a persistent actor is started later and begins to write new messages, by default the corresponding view is updated automatically.
It is possible to determine if a message was sent from the Journal or from another actor in user-land by calling the IsPersistent
property. Having that said, very often you don't need this information at all and can simply apply the same logic to both cases (skip the if IsPersistent
check).
Updates
The default update interval of all persistent views of an actor system is configurable:
akka.persistence.view.auto-update-interval = 5s
PersistentView
implementation classes may also override the AutoUpdateInterval
method to return a custom update interval for a specific view class or view instance. Applications may also trigger additional updates at any time by sending a view an Update message.
IActorRef view = system.ActorOf();
view.Tell(new Update(true));
If the await parameter is set to true, messages that follow the Update
request are processed when the incremental message replay, triggered by that update request, completed. If set to false (default), messages following the update request may interleave with the replayed message stream.
Automated updates of all persistent views of an actor system can be turned off by configuration:
akka.persistence.view.auto-update = off
Implementation classes may override the configured default value by overriding the autoUpdate method. To limit the number of replayed messages per update request, applications can configure a custom akka.persistence.view.auto-update-replay-max value or override the AutoUpdateReplayMax
property. The number of replayed messages for manual updates can be limited with the replayMax parameter of the Update message.
Recovery
Initial recovery of persistent views works the very same way as for persistent actors (i.e. by sending a Recover
message to self). The maximum number of replayed messages during initial recovery is determined by AutoUpdateReplayMax
. Further possibilities to customize initial recovery are explained in section Recovery.
Identifiers
A persistent view must have an identifier that doesn't change across different actor incarnations. The identifier must be defined with the ViewId
method.
The ViewId
must differ from the referenced PersistenceId
, unless Snapshots of a view and its persistent actor should be shared (which is what applications usually do not want).
Snapshots
Snapshots can dramatically reduce recovery times of persistent actors and views. The following discusses snapshots in context of persistent actors but this is also applicable to persistent views.
Persistent actors can save snapshots of internal state by calling the SaveSnapshot
method. If saving of a snapshot succeeds, the persistent actor receives a SaveSnapshotSuccess
message, otherwise a SaveSnapshotFailure
message.
public class DocumentPersistentSnapshotActor : ReceivePersistentActor
{
private List _messages = new List();
private int _pagesSinceLastSnapshot = 0;
public DocumentPersistentSnapshotActor()
{
//...
Command(message =>
{
Persist(message, page =>
{
_messages.Add(page);
if (++_pagesSinceLastSnapshot % 5 == 0)
{
SaveSnapshot(_messages);
}
});
});
Command(success => {
// handle snapshot save success...
DeleteMessages(success.Metadata.SequenceNr);
});
Command(failure => {
// handle snapshot save failure...
});
}
public override string PersistenceId { get; } = "HardCoded";
}
During recovery, the persistent actor is offered a previously saved snapshot via a SnapshotOffer
message from which it can initialize internal state.
Recover(page =>
{
_messages.Add(page);
});
Recover(offer => {
var msgs = offer.Snapshot as List;
if (msgs != null)
_messages = _messages.Concat(msgs).ToList();
});
The replayed messages that follow the SnapshotOffer
message, if any, are younger than the offered snapshot. They finally recover the persistent actor to its current (i.e. latest) state.
In general, a persistent actor is only offered a snapshot if that persistent actor has previously saved one or more snapshots and at least one of these snapshots matches the SnapshotSelectionCriteria
that can be specified for recovery.
public override Recovery Recovery
{
get
{
return new Recovery(new SnapshotSelectionCriteria(150, DateTime.UtcNow));
}
}
If not specified, they default to SnapshotSelectionCriteria.Latest
which selects the latest (= youngest) snapshot. To disable snapshot-based recovery, applications should use SnapshotSelectionCriteria.None
. A recovery where no saved snapshot matches the specified SnapshotSelectionCriteria
will replay all journaled messages.
At-Least-Once Delivery
At-Least-Once Delivery actors are specializations of persistent actors and may be used to provide at-least-once delivery semantics, even in cases where one of the communication endpoints crashes. Because it's possible that the same message will be send twice, actor's receive behavior must work in the idempotent manner.
Members:
Deliver
method is used to send a message to another actor in at-least-once delivery semantics. A message sent this way must be confirmed by the other endpoint with theConfirmDelivery
method. Otherwise it will be resent again and again until the redelivery limit is reached.GetDeliverySnapshot
andSetDeliverySnapshot
methods are used as part of a delivery snapshotting strategy. They return/reset state of the current guaranteed delivery actor's unconfirmed messages. In order to save custom deliverer state inside a snapshot, a returned delivery snapshot should be included in that snapshot and reset in ReceiveRecovery method, whenSnapshotOffer
arrives.RedeliveryBurstLimit
is a virtual property which determines the maximum number of unconfirmed messages to be send in each redelivery attempt. It may be useful in preventing message overflow scenarios. It may be overridden or configured inside HOCON configuration under akka.persistence.at-least-once-delivery.redelivery-burst-limit path (10 000 by default).UnconfirmedDeliveryAttemptsToWarn
is a virtual property which determines how many unconfirmed deliveries may be sent before guaranteed delivery actor will send anUnconfirmedWarning
message to itself. The count is reset after the actor's restart. It may be overridden or configured inside HOCON configuration under akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts path (5 by default).MaxUnconfirmedMessages
is a virtual property which determines the maximum number of unconfirmed deliveries to hold in memory. After this threshold is exceeded, anyDeliver
method will raiseMaxUnconfirmedMessagesExceededException
. It may be overridden or configured inside HOCON configuration under akka.persistence.at-least-once-delivery.max-unconfirmed-messages path (100 000 by default).UnconfirmedCount
property shows the number of unconfirmed messages.
Relationship between Deliver and ConfirmDelivery
To send messages to the destination path, use the Deliver
method after you have persisted the intent to send the message.
The destination actor must send back a confirmation message. When the sending actor receives this confirmation message you should persist the fact that the message was delivered successfully and then call the ConfirmDelivery
method.
If the persistent actor is not currently recovering, the deliver method will send the message to the destination actor. When recovering, messages will be buffered until they have been confirmed using ConfirmDelivery
. Once recovery has completed, if there are outstanding messages that have not been confirmed (during the message replay), the persistent actor will resend these before sending any other messages.
Deliver requires a deliveryMessageMapper
function to pass the provided deliveryId
into the message so that the correlation between Deliver
and ConfirmDelivery
is possible. The deliveryId
must do the round trip. Upon receipt of the message, the destination actor will send the same deliveryId
wrapped in a confirmation message back to the sender. The sender will then use it to call the ConfirmDelivery
method to complete the delivery routine.
public class Msg
{
public Msg(long deliveryId, string message)
{
DeliveryId = deliveryId;
Message = message;
}
public long DeliveryId { get; }
public string Message { get; }
}
public class Confirm
{
public Confirm(long deliveryId)
{
DeliveryId = deliveryId;
}
public long DeliveryId { get; }
}
public class MsgSent
{
public MsgSent(string message)
{
Message = message;
}
public string Message { get; }
}
public class MsgConfirmed
{
public MsgConfirmed(long deliveryId)
{
DeliveryId = deliveryId;
}
public long DeliveryId { get; }
}
public class MyAtLeastOneDeliveryReceiveActor : AtLeastOnceDeliveryReceiveActor
{
private readonly IActorRef _destionationActor = Context.ActorOf();
public MyAtLeastOneDeliveryReceiveActor()
{
Recover(msgSent => Handler(msgSent));
Recover(msgConfirmed => Handler(msgConfirmed));
Command(str =>
{
Persist(new MsgSent(str), Handler);
});
Command(confirm =>
{
Persist(new MsgConfirmed(confirm.DeliveryId), Handler);
});
}
private void Handler(MsgSent msgSent)
{
Deliver(_destionationActor.Path, l => new Msg(l, msgSent.Message));
}
private void Handler(MsgConfirmed msgConfirmed)
{
ConfirmDelivery(msgConfirmed.DeliveryId);
}
public override string PersistenceId { get; } = "HardCoded";
}
public class MyDestinationActor : ReceiveActor
{
public MyDestinationActor()
{
Receive(msg =>
{
Sender.Tell(new Confirm(msg.DeliveryId), Self);
});
}
}
The deliveryId
generated by the persistence module is a strictly monotonically increasing sequence number without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see gaps in the sequence. It is not possible to use custom deliveryId
. However, you can send a custom correlation identifier in the message to the destination. You must then retain a mapping between the internal deliveryId
(passed into the deliveryMessageMapper
function) and your custom correlation id (passed into the message). You can do this by storing such mapping in a Map(CorrelationId -> DeliveryId) from which you can retrieve the deliveryId
to be passed into the ConfirmDelivery
method once the receiver of your message has replied with your custom correlation id.
The AtLeastOnceDeliveryReceiveActor
class has a state consisting of unconfirmed messages and a sequence number. It does not store this state itself. You must persist events corresponding to the Deliver
and ConfirmDelivery
invocations from your PersistentActor so that the state can be restored by calling the same methods during the recovery phase of the PersistentActor. Sometimes these events can be derived from other business level events, and sometimes you must create separate events. During recovery, calls to deliver will not send out messages, those will be sent later if no matching ConfirmDelivery
will have been performed.
Support for snapshots is provided by GetDeliverySnapshot
and SetDeliverySnapshot
. The AtLeastOnceDeliverySnapshot
contains the full delivery state, including unconfirmed messages. If you need a custom snapshot for other parts of the actor state you must also include the AtLeastOnceDeliverySnapshot
. It is serialized using protobuf with the ordinary Akka serialization mechanism. It is easiest to include the bytes of the AtLeastOnceDeliverySnapshot
as a blob in your custom snapshot.
The interval between redelivery attempts is defined by the RedeliverInterval
method. The default value can be configured with the akka.persistence.at-least-once-delivery.redeliver-interval configuration key. The method can be overridden by implementation classes to return non-default values.
The maximum number of messages that will be sent at each redelivery burst is defined by the RedeliveryBurstLimit
method (burst frequency is half of the redelivery interval). If there's a lot of unconfirmed messages (e.g. if the destination is not available for a long time), this helps to prevent an overwhelming amount of messages to be sent at once. The default value can be configured with the akka.persistence.at-least-once-delivery.redelivery-burst-limit configuration key. The method can be overridden by implementation classes to return non-default values.
After a number of delivery attempts a UnconfirmedWarning
message will be sent to self. The re-sending will still continue, but you can choose to call ConfirmDelivery
to cancel the re-sending. The number of delivery attempts before emitting the warning is defined by the WarnAfterNumberOfUnconfirmedAttempts
property. The default value can be configured with the akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts configuration key. The method can be overridden by implementation classes to return non-default values.
The AtLeastOnceDeliveryReceiveActor
class holds messages in memory until their successful delivery has been confirmed. The maximum number of unconfirmed messages that the actor is allowed to hold in memory is defined by the MaxUnconfirmedMessages
method. If this limit is exceed the deliver method will not accept more messages and it will throw MaxUnconfirmedMessagesExceededException
. The default value can be configured with the akka.persistence.at-least-once-delivery.max-unconfirmed-messages configuration key. The method can be overridden by implementation classes to return non-default values.
Journals
Journal is a specialized type of actor which exposes an API to handle incoming events and store them in backend storage. By default Akka.Persitence uses a MemoryJournal
which stores all events in memory and therefore it's not persistent storage. A custom journal configuration path may be specified inside akka.persistence.journal.plugin path and by default it requires two keys set: class and plugin-dispatcher. Example configuration:
akka {
persistence {
journal {
# Path to the journal plugin to be used
plugin = "akka.persistence.journal.inmem"
# In-memory journal plugin.
inmem {
# Class name of the plugin.
class = "Akka.Persistence.Journal.MemoryJournal, Akka.Persistence"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
}
}
}
}
Snapshot store
Snapshot store is a specialized type of actor which exposes an API to handle incoming snapshot-related requests and is able to save snapshots in some backend storage. By default Akka.Persistence uses a LocalSnapshotStore
, which uses a local file system as storage. A custom snapshot store configuration path may be specified inside akka.persistence.snapshot-store.plugin path and by default it requires two keys set: class and plugin-dispatcher. Example configuration:
akka {
persistence {
snapshot-store {
# Path to the snapshot store plugin to be used
plugin = "akka.persistence.snapshot-store.local"
# Local filesystem snapshot store plugin.
local {
# Class name of the plugin.
class = "Akka.Persistence.Snapshot.LocalSnapshotStore, Akka.Persistence"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for streaming snapshot IO.
stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher"
# Storage location of snapshot files.
dir = "snapshots"
}
}
}
}
Event adapters
Event adapters are an intermediate layer on top of your journal, that allows to produce different data model depending on stored/recovered event type. It's especially useful in situations like:
- Event versioning - since events may change their structure over the course of time, you may specify custom event adapter that will deal with mapping obsolete data types accordingly to current business logic.
- Separation of domain model from stored data in cases when such separation is necessary.
- Utilization of persistent backend specific data types as they allow transition between data understood by actors and specialized format allowed by datastores. Examples of such may be: BSON in MongoDb or JSON data type in PostgreSQL.
For custom event adapter simply create class implementing IEventAdapter
interface. It's required, that it should either expose parameterless constructor or the one that has ExtendedActorSystem
as its only argument. Then in order to use it, you'll need to register it and bind to a particular type of events using HOCON configuration - type assignability rules applies here and the most specific types have precedence over the more general ones:
akka.persistence.journal {
<journal_identifier> {
event-adapters {
tagging = "<fully qualified event adapter type name with assembly>"
v1 = "<fully qualified event adapter type name with assembly>"
v2 = "<fully qualified event adapter type name with assembly>"
}
event-adapter-bindings {
"<fully qualified event type name with assembly>" = v1
"<fully qualified event type name with assembly>" = [v2, tagging]
}
}
}
Multiple event adapters may be applied to a single type (for recovery). If that is the case, their order will match order of the definition in event-adapter-bindings config section. For write side, each adapter may decide to return none, one or many adapted event for each single event provided as an input. In case of multiple adapters attached, each one of them may decide to return its own set of adapted events. They all will be stored in the same order corresponding to adapters order.
Contributing
Akka persistence plugin gives a custom journal and snapshot store creator a built-in set of tests, which can be used to verify correctness of the implemented backend storage plugins. It's available through Akka.Persistence.TestKit
package and uses xUnit as the default test framework.