go-workflows
go-workflows is an embedded engine for orchestrating long running processes or "workflows" written in Go.
It borrows heavily from Temporal (and since it's a fork also Cadence) as well as Azure's Durable Task Framework (DTFx). Workflows are written in plain Go.
go-workflows supports pluggable backends with official implementations for SQLite, MySQL, and Redis.
See also the following blog posts:
- https://cschleiden.dev/blog/2022-02-13-go-workflows-part1/
- https://cschleiden.dev/blog/2022-05-02-go-workflows-part2/
Quickstart
A short walkthrough of the most important concepts:
Workflow
func Workflow1(ctx workflow.Context, input string) error {
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
if err != nil {
panic("error getting activity 1 result")
}
log.Println("A1 result:", r1)
r2, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
if err != nil {
panic("error getting activity 2 result")
}
log.Println("A2 result:", r2)
return nil
}
Let's first write a simple workflow. Our workflow executes two activities in sequence, waiting for each result. Both workflows and activities are written in plain Go. Workflows can be long-running and have to be deterministic so that they can be interrupted and resumed. Activities are functions that can have side effects and don't have to be deterministic.
Both workflows and activities support arbitrary inputs and outputs as long as those are serializable.
Workflows have to take a workflow.Context as their first argument.
Activities
func Activity1(ctx context.Context, a, b int) (int, error) {
return a + b, nil
}
func Activity2(ctx context.Context) (int, error) {
return 12, nil
}
Activities receive a plain context.Context as their first argument. Activities are automatically retried by default, so it's good practice to make them idempotent.
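To make the idempotency advice concrete, here is a minimal plain-Go sketch. It does not use go-workflows; `chargeStore`, `charge`, and the idempotency key are hypothetical names chosen for illustration. The idea is that a retried call carrying the same key has no additional effect.

```go
package main

import (
	"fmt"
	"sync"
)

// chargeStore is a hypothetical stand-in for an external system that
// remembers which requests it has already processed.
type chargeStore struct {
	mu      sync.Mutex
	applied map[string]int // idempotency key -> amount charged
	total   int
}

// charge applies the amount at most once per key, so a retried call with
// the same key does not charge twice. It returns the running total.
func (s *chargeStore) charge(key string, amount int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, done := s.applied[key]; !done {
		s.applied[key] = amount
		s.total += amount
	}
	return s.total
}

func main() {
	s := &chargeStore{applied: map[string]int{}}
	s.charge("order-1", 10)
	s.charge("order-1", 10) // simulated retry of the same activity call
	fmt.Println(s.total)
}
```

In a real activity the key would typically be derived from the activity inputs, so that every retry of the same call presents the same key.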
Worker
func runWorker(ctx context.Context, mb backend.Backend) {
w := worker.New(mb, nil)
w.RegisterWorkflow(Workflow1)
w.RegisterActivity(Activity1)
w.RegisterActivity(Activity2)
if err := w.Start(ctx); err != nil {
panic("could not start worker")
}
}
Next, we'll have to start a worker. Workers are responsible for executing workflows and activities and therefore we need to register both with the worker.
Backends support multiple worker processes, so you can scale out horizontally.
Backend
b := sqlite.NewSqliteBackend("simple.sqlite")
The backend is responsible for persisting the workflow events. Currently there is an in-memory backend implementation for testing, one using SQLite, one using MySQL, and one using Redis. See Backends for more information.
Putting it all together
func main() {
ctx := context.Background()
b := sqlite.NewSqliteBackend("simple.sqlite")
go runWorker(ctx, b)
c := client.New(b)
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
InstanceID: uuid.NewString(),
}, Workflow1, "input-for-workflow")
if err != nil {
panic("could not start workflow")
}
c2 := make(chan os.Signal, 1)
signal.Notify(c2, os.Interrupt)
<-c2
}
To finish the example, we create the backend and start a worker in a separate goroutine. We then create a Client instance, which we use to create a new workflow instance. A workflow instance is just one running instance of a previously registered workflow.
With the exception of the in-memory backend, we do not have to start the workflow from the same process the worker runs in. We could create the client in another process and create, wait for, or cancel workflow instances from there.
Guide
Writing workflows
func Workflow1(ctx workflow.Context) error {
	return nil
}
Workflows must accept workflow.Context as their first parameter. They can also receive any number of input parameters afterwards. Parameters need to be serializable (e.g., no chans etc.) by the default or any custom Converter.
Workflows must return an error and optionally one additional result, which again needs to be serializable by the used Converter.
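As a rough illustration of what "serializable" means in practice, the sketch below round-trips a value through `encoding/json`. This assumes a JSON-style converter; the `Order` type and the helpers `roundTrip` and `canSerialize` are hypothetical and not part of go-workflows.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order is a hypothetical workflow input made of plain, exported fields.
type Order struct {
	ID    string
	Items []string
}

// roundTrip encodes a value to JSON and decodes it into a fresh value,
// mimicking what a JSON-based converter would do with inputs and results.
func roundTrip(in Order) (Order, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return Order{}, err
	}
	var out Order
	err = json.Unmarshal(data, &out)
	return out, err
}

// canSerialize reports whether encoding/json accepts the value.
func canSerialize(v any) bool {
	_, err := json.Marshal(v)
	return err == nil
}

func main() {
	out, err := roundTrip(Order{ID: "o-1", Items: []string{"a", "b"}})
	fmt.Println(out, err)
	fmt.Println(canSerialize(make(chan int))) // channels cannot be encoded
}
```

Unexported struct fields are silently dropped by `encoding/json`, so inputs and results should expose the data they need to carry through exported fields.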
Registering workflows
var b backend.Backend
w := worker.New(b, nil)
w.RegisterWorkflow(Workflow1)
Workflows need to be registered with the worker before they can be started. The name is automatically inferred from the function name.
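One way a name can be inferred from a function value in Go is via `reflect` and `runtime`. The sketch below is an assumption about the general technique, not a description of how go-workflows implements it; `funcName` and the `registry` map are hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// Workflow1 is a placeholder; only its name matters here.
func Workflow1() error { return nil }

// funcName returns the unqualified name of a named function by looking up
// its entry point and stripping the package qualifier.
func funcName(fn any) string {
	full := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() // e.g. "main.Workflow1"
	return full[strings.LastIndex(full, ".")+1:]
}

func main() {
	// A registry keyed by the inferred name, as a worker might keep one.
	registry := map[string]any{}
	registry[funcName(Workflow1)] = Workflow1
	fmt.Println(funcName(Workflow1))
}
```

A consequence of name-based registration is that renaming a workflow function changes the name under which it is looked up.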
Writing activities
func Activity1(ctx context.Context, a, b int) (int, error) {
return a + b, nil
}
func Activity2(ctx context.Context) error {
	return nil
}
Activities must accept a context.Context as their first parameter. They can also receive any number of input parameters afterwards. Parameters need to be serializable (e.g., no chans etc.) by the default or any custom Converter. Activities must return an error and optionally one additional result, which again needs to be serializable by the used Converter.
Registering activities
Register activities as functions:
func Activity1(ctx context.Context, a, b int) (int, error) {
return a + b, nil
}
var w worker.Worker
w.RegisterActivity(Activity1)
Register activities using a struct. SharedState will be available to all activities executed by this worker:
type act struct {
	SharedState int
}
// The receiver is named s so it does not clash with the parameters a and b
func (s *act) Activity1(ctx context.Context, a, b int) (int, error) {
	return a + b + s.SharedState, nil
}
func (s *act) Activity2(ctx context.Context, a int) (int, error) {
	return a * s.SharedState, nil
}
var w worker.Worker
a := &act{
SharedState: 12,
}
w.RegisterActivity(a)
Similar to workflows, activities need to be registered with the worker before they can be started.
Activities can be registered as plain funcs or as methods on a struct. The latter is useful if you want to provide some shared state to activities, for example, a database connection.
To execute activities registered as methods on a struct, pass the method to workflow.ExecuteActivity.
Calling activities registered on a struct
// ...
var a *act
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, a.Activity1, 35, 12).Get(ctx)
if err != nil {
// handle error
}
// Output r1 = 47 + 12 (from the worker registration) = 59
Starting workflows
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
InstanceID: uuid.NewString(),
}, Workflow1, "input-for-workflow")
if err != nil {
// ...
}
CreateWorkflowInstance
on a client instance will start a new workflow instance. Pass options, a workflow to run, and any inputs.
Canceling workflows
var c *client.Client
err = c.CancelWorkflowInstance(context.Background(), workflowInstance)
if err != nil {
panic("could not cancel workflow")
}
Create a Client instance, then call CancelWorkflowInstance to cancel a workflow. When a workflow is canceled, its workflow context is canceled. Any subsequent calls to schedule activities or sub-workflows will immediately return an error, skipping their execution. Any activities already running when a workflow is canceled will still run to completion and their result will be available.
Sub-workflows will be canceled if their parent workflow is canceled.
Performing cleanup in canceled workflows
func Workflow2(ctx workflow.Context, msg string) (string, error) {
	defer func() {
		if errors.Is(ctx.Err(), workflow.Canceled) {
			// Workflow was canceled. Get a new context to perform any cleanup activities
			ctx := workflow.NewDisconnectedContext(ctx)
			// Execute the cleanup activity. A deferred func cannot return a value, so log any error
			if _, err := workflow.ExecuteActivity[any](ctx, workflow.DefaultActivityOptions, ActivityCleanup).Get(ctx); err != nil {
				workflow.Logger(ctx).Error("could not perform cleanup", "err", err)
			}
		}
	}()
	r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, ActivityCancel, 1, 2).Get(ctx)
	if err != nil { // <---- Workflow is canceled while this activity is running
		return "", errors.Wrap(err, "could not get ActivityCancel result")
	}
	// r1 will contain the result of ActivityCancel
	// ⬇ ActivitySkip will be skipped immediately
	r2, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, ActivitySkip, 1, 2).Get(ctx)
	if err != nil {
		return "", errors.Wrap(err, "could not get ActivitySkip result")
	}
	return "some result", nil
}
If you need to run any activities or make calls using workflow.Context, you need to create a new context with workflow.NewDisconnectedContext, since the original context is canceled at this point.
Workers
defaultWorker := worker.New(b, &worker.Options{})
workflowWorker := worker.NewWorkflowWorker(b, &worker.WorkflowWorkerOptions{})
activityWorker := worker.NewActivityWorker(b, &worker.ActivityWorkerOptions{})
There are three different types of workers:
- the default worker is a combined worker that listens to both workflow and activity queues
- a workflow worker that only listens to workflow queues
- an activity worker that only listens to activity queues
defaultWorker.RegisterWorkflow(Workflow1)
defaultWorker.RegisterActivity(Activity1)
ctx, cancel := context.WithCancel(context.Background())
defaultWorker.Start(ctx)
cancel()
defaultWorker.WaitForCompletion()
All workers have the same simple interface. You can register workflows and activities, start the worker, and when shutting down wait for all pending tasks to be finished.
Queues
Workers can pull workflow and activity tasks from different queues. By default workers listen to two queues:
- default
- _system_ for system workflows and activities
For now, every worker will always pull from _system_, but you can configure other queues you want to listen to. All worker options structs take a Queues option.
When starting workflows, creating sub-workflow instances, or scheduling activities you can pass a queue you want the task to be scheduled on.
The default behavior if no explicit queue is given:
- Starting a workflow: the default queue is default.
- Creating a sub-workflow instance: the default behavior is to inherit the queue from the parent workflow instance.
- Scheduling an activity: the default behavior is to inherit the queue from the parent workflow instance.
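The defaulting rules above can be summarized as a small resolution function. This is a sketch of the rules as described here, not library code; `resolveQueue` and `defaultQueue` are hypothetical names.

```go
package main

import "fmt"

const defaultQueue = "default"

// resolveQueue picks the queue for a new task. explicit is the queue passed
// by the caller ("" if none). parent is the queue of the parent workflow
// instance ("" when starting a top-level workflow).
func resolveQueue(explicit, parent string) string {
	switch {
	case explicit != "":
		return explicit // an explicitly given queue always wins
	case parent != "":
		return parent // sub-workflows and activities inherit from the parent
	default:
		return defaultQueue // top-level workflow without an explicit queue
	}
}

func main() {
	fmt.Println(resolveQueue("", ""))              // starting a workflow
	fmt.Println(resolveQueue("", "my-queue"))      // activity or sub-workflow
	fmt.Println(resolveQueue("other", "my-queue")) // explicit override
}
```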
Executing activities
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
if err != nil {
panic("error getting activity 1 result")
}
log.Println(r1)
From a workflow, call workflow.ExecuteActivity to execute an activity. The call returns a Future[T] you can await to get the result or any error it might return.
Executing activities on a specific queue
r1, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
Queue: "my-queue",
}, Activity1, 35, 12).Get(ctx)
if err != nil {
panic("error getting activity 1 result")
}
log.Println(r1)
Canceling activities
Canceling activities is not supported at this time.
Timers
t := workflow.ScheduleTimer(ctx, 2*time.Second)
_, err := t.Get(ctx)
You can schedule timers to fire at any point in the future by calling workflow.ScheduleTimer. It returns a Future you can await to wait for the timer to fire.
t := workflow.ScheduleTimer(ctx, 2*time.Second, workflow.WithTimerName("my-timer"))
You can optionally name timers for tracing and debugging purposes.
Canceling timers
tctx, cancel := workflow.WithCancel(ctx)
t := workflow.ScheduleTimer(tctx, 2*time.Second)
// Cancel the timer
cancel()
There is no explicit API to cancel timers. You can cancel a timer by creating a cancelable context, and canceling that.
Signals
// From outside the workflow:
c.SignalWorkflow(ctx, "<instance-id>", "signal-name", "value")
func Workflow(ctx workflow.Context) error {
// ...
signalCh := workflow.NewSignalChannel[string](ctx, "signal-name")
// Pause workflow until signal is received
workflow.Select(ctx,
workflow.Receive(signalCh, func(ctx workflow.Context, r string, ok bool) {
logger.Debug("Received signal:", r)
}),
)
// Continue execution
}
Signals are a way to send a message to a running workflow instance. You can send a signal to a workflow by calling SignalWorkflow on a client, and listen for signals in the workflow by creating a SignalChannel via workflow.NewSignalChannel.
Signaling other workflows from within a workflow
func Workflow(ctx workflow.Context) error {
if _, err := workflow.SignalWorkflow(ctx, "sub-instance-id", "signal-name", "value").Get(ctx); err != nil {
// Handle error
}
}
You can also signal a workflow from within another workflow. This is useful if you want to signal a sub-workflow from its parent or vice versa.
Executing side effects
id, _ := workflow.SideEffect[string](ctx, func(ctx workflow.Context) string {
	return uuid.NewString()
}).Get(ctx)
Sometimes scheduling an activity is too much overhead for a simple side effect. For those scenarios you can use workflow.SideEffect. You can pass a func which will be executed only once, inline, with its result being recorded in the history. Subsequent executions of the workflow will return the previously recorded result.
Executing sub-workflows
func Workflow1(ctx workflow.Context, msg string) error {
	result, err := workflow.CreateSubWorkflowInstance[int](
		ctx, workflow.SubWorkflowInstanceOptions{}, SubWorkflow, "some input").Get(ctx)
	if err != nil {
		return errors.Wrap(err, "could not get sub workflow result")
	}
	logger.Debug("Sub workflow result:", "result", result)
	return nil
}
func SubWorkflow(ctx workflow.Context, msg string) (int, error) {
	r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
	if err != nil {
		// The zero value for the int result accompanies the error
		return 0, errors.Wrap(err, "could not get activity result")
	}
	logger.Debug("A1 result:", "r1", r1)
	return r1, nil
}
Call workflow.CreateSubWorkflowInstance to start a sub-workflow. The returned Future will resolve once the sub-workflow has finished.
Executing sub-workflows on a specific queue
result, err := workflow.CreateSubWorkflowInstance[int](
	ctx, workflow.SubWorkflowInstanceOptions{
		Queue: "my-queue",
	}, SubWorkflow, "some input").Get(ctx)
if err != nil {
return errors.Wrap(err, "could not get sub workflow result")
}
Canceling sub-workflows
Similar to timer cancellation, you can pass a cancelable context to CreateSubWorkflowInstance and cancel the sub-workflow that way. Reacting to the cancellation is the same as canceling a workflow via the Client. See Canceling workflows for more details.
Error handling
Custom errors
func handleError(ctx workflow.Context, logger log.Logger, err error) {
var werr *workflow.Error
if errors.As(err, &werr) {
switch werr.Type {
case "CustomError": // This was a `type CustomError struct...` returned by an activity/subworkflow
logger.Error("Custom error", "err", werr)
return
}
logger.Error("Generic workflow error", "err", werr, "stack", werr.Stack())
return
}
var perr *workflow.PanicError
if errors.As(err, &perr) {
// Activity/subworkflow ran into a panic
logger.Error("Panic", "err", perr, "stack", perr.Stack())
return
}
logger.Error("Generic error", "err", err)
}
Errors returned from activities and subworkflows need to be marshalled/unmarshalled by the library so they are wrapped in a workflow.Error. You can access the original type via the err.Type field. If a stacktrace was captured, you can access it via err.Stack(). See also samples/errors.
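To see why only the type name survives, consider the following plain-Go sketch of an error crossing a serialization boundary. It is a simplified model, not the library's wire format; `wireError` and `transport` are hypothetical, and JSON is assumed as the encoding.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

// CustomError is an application error returned by a hypothetical activity.
type CustomError struct{ Msg string }

func (e *CustomError) Error() string { return e.Msg }

// wireError is a hypothetical serializable form of an error. The concrete
// Go type cannot cross a process boundary, but its name can.
type wireError struct {
	Type    string
	Message string
}

func (w *wireError) Error() string { return w.Message }

// transport simulates sending an error from a worker to a workflow: it
// captures the type name and message, encodes them, and decodes them again.
func transport(err error) (*wireError, error) {
	t := reflect.TypeOf(err)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	data, mErr := json.Marshal(wireError{Type: t.Name(), Message: err.Error()})
	if mErr != nil {
		return nil, mErr
	}
	out := new(wireError)
	if uErr := json.Unmarshal(data, out); uErr != nil {
		return nil, uErr
	}
	return out, nil
}

func main() {
	received, err := transport(&CustomError{Msg: "boom"})
	if err != nil {
		panic(err)
	}
	var asErr error = received
	var werr *wireError
	if errors.As(asErr, &werr) && werr.Type == "CustomError" {
		fmt.Println("custom error:", werr.Message)
	}
}
```

On the receiving side `errors.As` can only match the wrapper type, which is why the handler switches on the recorded type name instead of the original Go type.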
Panics
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, "test").Get(ctx)
if err != nil {
	// Check for a captured panic before treating err as a regular error
	var perr *workflow.PanicError
	if errors.As(err, &perr) {
		logger.Error("Panic", "err", perr, "stack", perr.Stack())
		return
	}
	panic("error getting activity 1 result")
}
A panic in an activity will be captured by the library and made available as a workflow.PanicError in the calling workflow.
Retries
Workflow:
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, "test").Get(ctx)
if err != nil {
panic("error getting activity 1 result")
}
log.Println(r1)
Activity:
func Activity1(ctx context.Context, name string) (int, error) {
if name == "test" {
// No need to retry in this case, the activity will always fail with the given inputs
return 0, workflow.NewPermanentError(errors.New("test is not a valid name"))
}
return http.Do("POST", "https://example.com", name)
}
With the default DefaultActivityOptions, activities are retried up to three times when they return an error. If you want to keep automatic retries, but want to avoid them when hitting certain error types, you can wrap an error with workflow.NewPermanentError.
func Activity1(ctx context.Context, name string) (int, error) {
// Current retry attempt
attempt := activity.Attempt(ctx)
return http.Do("POST", "https://example.com", name)
}
activity.Attempt returns the current retry attempt.
ContinueAsNew
wf := func(ctx workflow.Context, run int) (int, error) {
	run = run + 1
	if run < 3 {
		// Restart the workflow with the updated input until the third run
		return run, workflow.ContinueAsNew(ctx, run)
	}
	return run, nil
}
ContinueAsNew allows you to restart workflow execution with different inputs. The purpose is to keep the history size small enough to avoid hitting size limits, running out of memory, and impacting performance. It works by returning a special error from your workflow that contains the new inputs.
Here the workflow is going to be restarted when workflow.ContinueAsNew is returned. Internally the new execution starts with a fresh history. It uses the same InstanceID but a different ExecutionID.
If a sub-workflow is restarted, the caller doesn't notice. Only once the sub-workflow ends without being restarted will the caller get the result and have control passed back.
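The "special error" mechanism can be modeled with a small driver loop in plain Go. This is a sketch under simplifying assumptions, not the engine's implementation; `continueAsNew`, `counter`, and `runToCompletion` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// continueAsNew is a hypothetical special error carrying the next input.
type continueAsNew struct{ input int }

func (c *continueAsNew) Error() string { return "continue as new" }

// counter restarts itself with an incremented input until it reaches 3.
func counter(run int) (int, error) {
	run++
	if run < 3 {
		return 0, &continueAsNew{input: run}
	}
	return run, nil
}

// runToCompletion keeps restarting the workflow while it asks to continue
// as new. Each restart counts as a new execution; a real engine would also
// start it with an empty history.
func runToCompletion(wf func(int) (int, error), input int) (result, executions int, err error) {
	for {
		executions++
		result, err = wf(input)
		var can *continueAsNew
		if !errors.As(err, &can) {
			return result, executions, err // finished, or failed for real
		}
		input = can.input // restart with the new input
	}
}

func main() {
	result, executions, err := runToCompletion(counter, 0)
	fmt.Println(result, executions, err)
}
```

The caller of `runToCompletion` only sees the final result, which mirrors how a parent workflow does not notice restarts of a sub-workflow.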
select
var f1 workflow.Future[int]
var c workflow.Channel[int]
value := 42
workflow.Select(
	ctx,
	workflow.Await(f1, func (ctx workflow.Context, f workflow.Future[int]) {
		r, err := f.Get(ctx)
		// ...
	}),
	workflow.Receive(c, func (ctx workflow.Context, v int, ok bool) {
		// use v
	}),
	workflow.Default(ctx, func (ctx workflow.Context) {
		// ...
	}),
)
Due to its non-deterministic behavior you must not use a select statement in workflows. Instead you can use the provided workflow.Select function. It blocks until one of the provided cases is ready. Cases are evaluated in the order passed to Select.
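Go's built-in `select` picks randomly among ready cases, which is what makes it unsuitable for replay. An ordered selection, by contrast, can be sketched as below. This is a simplified illustration, not the library's implementation; `selectCase` and `selectFirstReady` are hypothetical.

```go
package main

import "fmt"

// selectCase is a hypothetical case: ready reports whether it can proceed,
// handle runs when the case is chosen.
type selectCase struct {
	ready  func() bool
	handle func()
}

// selectFirstReady checks cases in the order given and runs the first one
// that is ready. It returns the chosen index, or -1 if none was ready
// (a real implementation would block instead of returning).
func selectFirstReady(cases ...selectCase) int {
	for i, c := range cases {
		if c.ready() {
			c.handle()
			return i
		}
	}
	return -1
}

func main() {
	always := func() bool { return true }
	never := func() bool { return false }
	chosen := ""
	idx := selectFirstReady(
		selectCase{ready: never, handle: func() { chosen = "first" }},
		selectCase{ready: always, handle: func() { chosen = "second" }},
		selectCase{ready: always, handle: func() { chosen = "third" }},
	)
	fmt.Println(idx, chosen)
}
```

Because the second and third cases are both ready, a random choice could pick either one. Evaluating in order always picks the second, so a replay takes the same branch as the original execution.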
Waiting for a Future
var f1, f2 workflow.Future[int]
workflow.Select(
	ctx,
	workflow.Await(f1, func (ctx workflow.Context, f workflow.Future[int]) {
		r, err := f.Get(ctx)
		// ...
	}),
	workflow.Await(f2, func (ctx workflow.Context, f workflow.Future[int]) {
		r, err := f.Get(ctx)
		// ...
	}),
)
Await adds a case to wait for a Future to have a value.
Waiting to receive from a Channel
var c workflow.Channel[int]
workflow.Select(
ctx,
workflow.Receive(c, func (ctx workflow.Context, v int, ok bool) {
// ...
}),
)
Receive adds a case to receive from a given channel.
Default/Non-blocking
var f1 workflow.Future[int]
workflow.Select(
	ctx,
	workflow.Await(f1, func (ctx workflow.Context, f workflow.Future[int]) {
		r, err := f.Get(ctx)
		// ...
	}),
	workflow.Default(ctx, func (ctx workflow.Context) {
		// ...
	}),
)
A Default case is executed if no previous case is ready to be selected.
Testing Workflows
func TestWorkflow(t *testing.T) {
tester := tester.NewWorkflowTester[int](Workflow1)
// Mock two activities
tester.OnActivity(Activity1, mock.Anything, 35, 12).Return(47, nil)
tester.OnActivity(Activity2, mock.Anything, mock.Anything, mock.Anything).Return(12, nil)
// Run workflow with inputs
tester.Execute("Hello world")
// Workflows always run to completion, or time-out
require.True(t, tester.WorkflowFinished())
wr, werr := tester.WorkflowResult()
require.Equal(t, 59, wr)
require.Empty(t, werr)
// Ensure any expectations set for activity or sub-workflow mocks were met
tester.AssertExpectations(t)
}
go-workflows includes support for testing workflows. The example above is a simple test using mocked activities.
- Timers are automatically fired by advancing a mock workflow clock that is used for testing workflows
- You can register callbacks to fire at specific times (in mock-clock time). Callbacks can send signals, cancel workflows etc.
Testing Activities
func Activity(ctx context.Context, a int, b int) (int, error) {
activity.Logger(ctx).Debug("Activity is called", "a", a)
return a + b, nil
}
func TestActivity(t *testing.T) {
ctx := activitytester.WithActivityTestState(context.Background(), "activityID", "instanceID", nil)
r, err := Activity(ctx, 35, 12)
require.Equal(t, 47, r)
require.NoError(t, err)
}
Activities can be tested like any other function. If you make use of the activity context, for example, to retrieve a logger, you can use activitytester.WithActivityTestState to provide a test activity context. If you don't specify a logger, the default logger implementation will be used.
Removing workflow instances
err = c.RemoveWorkflowInstance(ctx, workflowInstance)
if err != nil {
// ...
}
RemoveWorkflowInstance on a client instance will remove that workflow instance including all history data from the backend. A workflow instance needs to be in the finished state before calling this, otherwise an error will be returned.
err = c.RemoveWorkflowInstances(ctx, backend.RemoveFinishedBefore(time.Now().Add(-time.Hour * 24)))
if err != nil {
// ...
}
RemoveWorkflowInstances on a client instance will remove all finished workflow instances that match the given condition(s). Currently only RemoveFinishedBefore is supported.
Automatically expiring finished workflow instances
This differs for different backend implementations.
SQLite & MySQL
client.StartAutoExpiration(ctx context.Context, delay time.Duration)
When you call this on a client, a workflow will be started in the _system_ queue that will periodically run and remove finished workflow instances that are older than the specified delay.
Redis
b, err := redis.NewRedisBackend(redisClient, redis.WithAutoExpiration(time.Hour * 48))
// ...
When an AutoExpiration is passed to the backend, finished workflow instances will be automatically removed after the specified duration. This works by setting a TTL on the Redis keys for finished workflow instances. If AutoExpiration is set to 0 (the default), no TTL will be set.
Logging
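// Build a debug-level slog logger and pass it as a generic backend option
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
b := sqlite.NewInMemoryBackend(sqlite.WithBackendOptions(backend.WithLogger(logger)))
go-workflows supports structured logging using Go's slog package. slog.Default is used by default. To use a custom logger, pass it with backend.WithLogger when creating a backend instance.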
Workflows
logger := workflow.Logger(ctx)
For logging in workflows, you can get a logger using workflow.Logger. The returned logger instance already has the workflow instance set as a default field.
Activities
logger := activity.Logger(ctx)
For logging in activities, you can get a logger using activity.Logger. The returned logger already has the ID of the activity and the workflow instance set as default fields.
Tracing
The library supports tracing via OpenTelemetry. When you pass a TracerProvider when creating a backend instance, workflow execution will be traced. You can also add additional spans for both activities and workflows.
Activities
func Activity1(ctx context.Context, a, b int) (int, error) {
ctx, span := otel.Tracer("activity1").Start(ctx, "Custom Activity1 span")
defer span.End()
// Do something
}
The context.Context passed into activities is set up with the correct current span. If you create additional spans, they'll show up under the ActivityTaskExecution.
Workflows
func Workflow(ctx workflow.Context) error {
	ctx, span := workflow.Tracer(ctx).Start(ctx, "Workflow1 span", trace.WithAttributes(
		// Add additional attributes
		attribute.String("msg", "hello world"),
	))
	// Do something
	span.End()
	return nil
}
For workflows the usage is a bit different: the tracer needs to be aware of whether the workflow is being replayed or not. You can get a replay-aware tracer with workflow.Tracer(ctx).
Context Propagation
type ContextPropagator interface {
Inject(context.Context, *Metadata) error
Extract(context.Context, *Metadata) (context.Context, error)
InjectFromWorkflow(Context, *Metadata) error
ExtractToWorkflow(Context, *Metadata) (Context, error)
}
In Go programs it is common to use context.Context to pass around request-scoped data. This library supports context propagation between activities and workflows. When you create a workflow, you can pass a ContextPropagator to the backend to propagate context values.
The context-propagation sample shows an example of how to use this.
Tools
Analyzer
/analyzer contains a simple golangci-lint based analyzer to spot common issues in workflow code.
Diagnostics Web UI
m := http.NewServeMux()
m.Handle("/diag/", http.StripPrefix("/diag", diag.NewServeMux(b)))
go http.ListenAndServe(":3000", m)
For investigating workflows, the package includes a simple diagnostic web UI. You can serve it via diag.NewServeMux.
It provides a simple paginated list of workflow instances and a way to inspect the history of a workflow instance.
Samples
Activity Registration
Shows how to register activities, both as plain functions and on structs to share state.
Cancellation
Shows how to cancel a workflow and how to react to cancellation in workflow implementations.
Complex Parameters
Demonstrates how to pass complex parameters to workflows and activities.
Concurrent
Demonstrates how to run multiple activities concurrently.
Context Propagation
Shows how to propagate context into workflows and from there to activities.
Continue-As-New
Shows how to use ContinueAsNew to restart a workflow for "infinite" workflows.
Converter
Shows how to use a custom converter to convert between different types.
Errors
Shows how to handle errors in workflows and activities.
Retries
Demonstrates sub-workflow and activity retries.
Queues
Shows how to queue workflows and activities to different queues and how to create workers that only listen for specific queues or specific tasks (workflows or activities).
Scale
Simple sample with a split worker and "starter" process.
Signal
Shows how to send signals to and receive signals in a workflow.
Signal to a Sub-Workflow
Shows how to send signals to a sub-workflow.
Simple
Simple end-to-end sample with a single activity.
Simple (Split Worker)
Same as the simple example, but with a split worker and "starter" process.
Sub-Workflow
Shows how to implement and call a sub-workflow from a workflow.
Timer
Shows how to use timers in workflows.
Tracing
Shows how to use OpenTelemetry to trace workflows and activities.
Web / Diagnostic UI
Shows how to use the Web UI to monitor workflows and activities.
Backends
There are three backend implementations maintained in this repository. Some backend implementations have custom options and all of them accept:
- WithStickyTimeout(timeout time.Duration) - Set the timeout for sticky tasks. Defaults to 30 seconds
- WithLogger(logger *slog.Logger) - Set the logger implementation
- WithMetrics(client metrics.Client) - Set the metrics client
- WithTracerProvider(tp trace.TracerProvider) - Set the OpenTelemetry tracer provider
- WithConverter(converter converter.Converter) - Provide a custom Converter implementation
- WithContextPropagator(prop workflow.ContextPropagator) - Adds a custom context propagator
SQLite
func NewSqliteBackend(path string, opts ...option)
Create a new SQLite backend instance with NewSqliteBackend.
Options
- WithApplyMigrations(applyMigrations bool) - Set whether migrations should be applied on startup. Defaults to true
- WithBackendOptions(opts ...backend.BackendOption) - Apply generic backend options
Schema
See migrations/sqlite for the schema and migrations. Main tables:
- instances - Tracks workflow instances. Functions as instance queue joined with pending_events
- pending_events - Pending events for workflow instances
- history - History for workflow instances
- activities - Queue of pending activities
- attributes - Payloads of events
MySQL
func NewMysqlBackend(host string, port int, user, password, database string, opts ...option)
Create a new MySQL backend instance with NewMysqlBackend.
Options
- WithMySQLOptions(f func(db *sql.DB)) - Apply custom options to the MySQL database connection
- WithApplyMigrations(applyMigrations bool) - Set whether migrations should be applied on startup. Defaults to true
- WithBackendOptions(opts ...backend.BackendOption) - Apply generic backend options
Schema
See migrations/mysql for the schema and migrations. Main tables:
- instances - Tracks workflow instances. Functions as instance queue joined with pending_events
- pending_events - Pending events for workflow instances
- history - History for workflow instances
- activities - Queue of pending activities
- attributes - Payloads of events
Redis
func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption)
Create a new Redis backend instance with NewRedisBackend.
Options
- WithKeyPrefix - Set the key prefix for all keys. Defaults to ""
- WithBlockTimeout(timeout time.Duration) - Set the timeout for blocking operations. Defaults to 5s
- WithAutoExpiration(expireFinishedRunsAfter time.Duration) - Set the expiration time for finished runs. Defaults to 0, which never expires runs
- WithAutoExpirationContinueAsNew(expireContinuedAsNewRunsAfter time.Duration) - Set the expiration time for continued as new runs. Defaults to 0, which uses the same value as WithAutoExpiration
- WithBackendOptions(opts ...backend.BackendOption) - Apply generic backend options
Schema/Keys
Shared keys:
- instances-by-creation - ZSET - Instances sorted by creation time
- instances-active - SET - Active instances
- instances-expiring - SET - Instances about to expire
- task-queue:workflows - STREAM - Task queue for workflows
- task-queue:activities - STREAM - Task queue for activities
Instance specific keys:
- active-instance-execution:{instanceID} - Latest execution for a workflow instance
- instance:{instanceID}:{executionID} - State of the workflow instance
- pending-events:{instanceID}:{executionID} - STREAM - Pending events for a workflow instance
- history:{instanceID}:{executionID} - STREAM - History for a workflow instance
- payload:{instanceID}:{executionID} - HASH - Payloads of events for given workflow instance
- future-events - ZSET - Events not yet visible like timer events
Custom implementation
To provide a custom backend, implement the following interface:
type Backend interface {
// CreateWorkflowInstance creates a new workflow instance
CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error
// CancelWorkflowInstance cancels a running workflow instance
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error
// RemoveWorkflowInstance removes a workflow instance
RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error
// GetWorkflowInstanceState returns the state of the given workflow instance
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error)
// GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID
// is given, only events after that event are returned. Otherwise the full history is returned.
GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error)
// SignalWorkflow signals a running workflow instance
//
// If the given instance does not exist, it will return an error
SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
// GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
GetWorkflowTask(ctx context.Context) (*WorkflowTask, error)
// ExtendWorkflowTask extends the lock of a workflow task
ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error
// CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask
//
// This checkpoints the execution. events are new events from the last workflow execution
// which will be added to the workflow instance history. workflowEvents are new events for the
// completed or other workflow instances.
CompleteWorkflowTask(
ctx context.Context, task *WorkflowTask, instance *workflow.Instance, state core.WorkflowInstanceState,
executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error
// GetActivityTask returns a pending activity task or nil if there are no pending activities
GetActivityTask(ctx context.Context) (*ActivityTask, error)
// CompleteActivityTask completes an activity task retrieved using GetActivityTask
CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error
// ExtendActivityTask extends the lock of an activity task
ExtendActivityTask(ctx context.Context, activityID string) error
// GetStats returns stats about the backend
GetStats(ctx context.Context) (*Stats, error)
// Logger returns the configured logger for the backend
Logger() *slog.Logger
// Tracer returns the configured trace provider for the backend
Tracer() trace.Tracer
// Metrics returns the configured metrics client for the backend
Metrics() metrics.Client
// Converter returns the configured converter for the backend
Converter() converter.Converter
// ContextPropagators returns the configured context propagators for the backend
ContextPropagators() []workflow.ContextPropagator
// Close closes any underlying resources
Close() error
}
FAQ
How are releases versioned?
For now this library is in a pre-release state. There are no guarantees given regarding breaking changes between (pre)-releases.
Workflow versioning
For now, I've intentionally left out versioning. Cadence, Temporal, and DTFx all support the concept of versions for workflows as well as activities. This is mostly required when you make changes to workflows and need to keep backwards compatibility with workflows that are being executed at the time of the upgrade.
Example: when you change a workflow from:
Workflow version 1
func Workflow1(ctx workflow.Context) {
r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
log.Println("A1 result:", r1)
r2, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
log.Println("A2 result:", r2)
}
to
Workflow version 2
func Workflow1(ctx workflow.Context) {
r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
log.Println("A1 result:", r1)
r3, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity3).Get(ctx)
log.Println("A3 result:", r3)
r2, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
log.Println("A2 result:", r2)
}
and you replay a workflow history that contains:
1. ActivitySchedule - Activity1
2. ActivityCompleted - Activity1
3. ActivitySchedule - Activity2
4. ActivityCompleted - Activity2
the workflow will encounter an attempt to execute Activity3 in between events 2 and 3, for which there is no matching event. This is a non-recoverable error. The usual approach to solving this is to version the workflows: every time you change a workflow, you check the version in the workflow logic. For this example this could look like:
func Workflow1(ctx workflow.Context) {
r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
log.Println("A1 result:", r1)
if workflow.Version(ctx) >= 2 {
r3, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity3).Get(ctx)
log.Println("A3 result:", r3)
}
r2, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
log.Println("A2 result:", r2)
}
and only if a workflow instance was created with a version of >= 2 will Activity3 be executed. Older workflows are persisted with a version < 2 and will not execute Activity3.
This kind of check is understandable for simple changes, but it becomes hard and a source of bugs for more complicated workflows. Therefore for now versioning is not supported and the guidance is to rely on side-by-side deployments. See also Azure's Durable Functions documentation for the same topic.
In addition to side-by-side deployments, you can use Queues to route workflows to different workers based on their version.
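To make the replay failure described in this section concrete, the following plain-Go sketch compares what a workflow schedules during replay against what its history recorded. It is a simplified model, not the engine's actual check; `checkReplay` is a hypothetical helper and only activity names are compared.

```go
package main

import "fmt"

// checkReplay compares the activities a workflow schedules during replay
// with the activities recorded in its history, position by position, and
// returns an error at the first divergence.
func checkReplay(recorded, scheduled []string) error {
	for i, name := range scheduled {
		if i >= len(recorded) {
			return nil // past the end of the history: new work, not a mismatch
		}
		if recorded[i] != name {
			return fmt.Errorf("non-deterministic replay at step %d: history has %s, workflow scheduled %s",
				i+1, recorded[i], name)
		}
	}
	return nil
}

func main() {
	recorded := []string{"Activity1", "Activity2"}        // history written by version 1
	v1 := []string{"Activity1", "Activity2"}              // what version 1 schedules
	v2 := []string{"Activity1", "Activity3", "Activity2"} // what version 2 schedules

	fmt.Println(checkReplay(recorded, v1))
	fmt.Println(checkReplay(recorded, v2))
}
```

Replaying the version 1 code against its own history succeeds, while the version 2 code diverges at the second scheduled activity. This is why instances started on old code should keep running on old code, for example through side-by-side deployments.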
How to safely upgrade?
All backend implementations have limited support for migrations which by default are automatically executed when a backend is started. This generally assumes only a single running worker. If you use multiple workers, you need to synchronize migration execution yourself.
In general, the guidance is to rely on side-by-side deployments. See also the previous section about workflow versioning.