NAV

go-workflows

go-workflows is an embedded engine for orchestrating long running processes or "workflows" written in Go.

It borrows heavily from Temporal (and since it's a fork also Cadence) as well as Azure's Durable Task Framework (DTFx). Workflows are written in plain Go.

go-workflows support pluggable backends with official implementations for Sqlite, MySql, and Redis.

See also the following blog posts:

Quickstart

A short walkthrough of the most important concepts:

Workflow

func Workflow1(ctx workflow.Context, input string) error {
    r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
    if err != nil {
        panic("error getting activity 1 result")
    }

    log.Println("A1 result:", r1)

    r2, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
    if err != nil {
        panic("error getting activity 1 result")
    }

    log.Println("A2 result:", r2)

    return nil
}

Let's first write a simple workflows. Our workflow executes two activities in sequence waiting for each result. Both workflows and activities are written in plain Go. Workflows can be long-running and have to be deterministic so that they can be interrupted and resumed. Activities are functions that can have side-effects and don't have to be deterministic.

Both workflows and activities support arbitrary inputs and outputs as long as those are serializable.

Workflows have to take a workflow.Context as their first argument.

Activities

func Activity1(ctx context.Context, a, b int) (int, error) {
    return a + b, nil
}

func Activity2(ctx context.Context) (int, error) {
    return 12, nil
}

Activities receive a plain context.Context as their first argument. Activities are automatically retried by default, so it's good practice to make them idempotent.

Worker

func runWorker(ctx context.Context, mb backend.Backend) {
    w := worker.New(mb, nil)

    w.RegisterWorkflow(Workflow1)

    w.RegisterActivity(Activity1)
    w.RegisterActivity(Activity2)

    if err := w.Start(ctx); err != nil {
        panic("could not start worker")
    }
}

Next, we'll have to start a worker. Workers are responsible for executing workflows and activities and therefore we need to register both with the worker.

Backends support multiple worker processes, so you can scale out horizontially.

Backend

b := sqlite.NewSqliteBackend("simple.sqlite")

The backend is responsible for persisting the workflow events. Currently there is an in-memory backend implementation for testing, one using SQLite, one using MySql, and one using Redis. See backends for more information.

Putting it all together

func main() {
    ctx := context.Background()

    b := sqlite.NewSqliteBackend("simple.sqlite")

    go runWorker(ctx, b)

    c := client.New(b)

    wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
        InstanceID: uuid.NewString(),
    }, Workflow1, "input-for-workflow")
    if err != nil {
        panic("could not start workflow")
    }

    c2 := make(chan os.Signal, 1)
    signal.Notify(c2, os.Interrupt)
    <-c2
}

To finish the example, we create the backend, start a worker in a separate go-routine. We then create a Client instance which we then use to create a new workflow instance. A workflow instance is just one running instance of a previously registered workflow.

With the exception of the in-memory backend, we do not have to start the workflow from the same process the worker runs in, we could create the client from another process and create/wait for/cancel/... workflow instances from there.

Guide

Writing workflows

func Workflow1(ctx workflow.Context) error {
}

Workflows must accept workflow.Context as their first parameter. They can also receive any number of inputs parameters afterwards. Parameters need to be serializable (e.g., no chans etc.) by the default or any custom Converter.

Workflows must return an error and optionally one additional result, which again needs to be serializable by the used Converter.

Registering workflows

var b backend.Backend
w := worker.New(b)

w.RegisterWorkflow(Workflow1)

Workflows needs to be registered with the worker before they can be started. The name is automatically inferred from the function name.

Writing activities

func Activity1(ctx context.Context, a, b int) (int, error) {
    return a + b, nil
}

func Activity2(ctx context.Context) error {
    return a + b, nil
}

Activities must accept a context.Context as their first parameter. They can also receive any number of inputs parameters afterwards. Parameters need to be serializable (e.g., no chans etc.) by the default or any custom Converter. Activities must return an error and optionally one additional result, which again needs to be serializable by the used Converter.

Registering activities

Register activities as functions:

func Activity1(ctx context.Context, a, b int) (int, error) {
    return a + b, nil
}

var w worker.Worker
w.RegisterActivity(Activity1)

Register activities using a struct. SharedState will be available to all activities executed by this worker:

type act struct {
    SharedState int
}

func (a *act) Activity1(ctx context.Context, a, b int) (int, error) {
    return a + b + act.SharedState, nil
}

func (a *act) Activity2(ctx context.Context, a int) (int, error) {
    return a * act.SharedState, nil
}

var w worker.Worker

a := &act{
  SharedState: 12,
}

w.RegisterActivity(a)

Similar to workflows, activities need to be registered with the worker before they can be started.

Activites can be registered as plain funcs or as methods on a struct. The latter is useful if you want to provide some shared state to activities, for example, a database connection.

To execute activities registered as methods on a struct, pass the method to workflow.ExecuteActivity.

Calling activities registered on a struct

// ...
var a *act

r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, a.Activity1, 35, 12).Get(ctx)
if err != nil {
    // handle error
}

// Output r1 = 47 + 12 (from the worker registration) = 59

Starting workflows

wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
    InstanceID: uuid.NewString(),
}, Workflow1, "input-for-workflow")
if err != nil {
    // ...
}

CreateWorkflowInstance on a client instance will start a new workflow instance. Pass options, a workflow to run, and any inputs.

Canceling workflows

var c *client.Client
err = c.CancelWorkflowInstance(context.Background(), workflowInstance)
if err != nil {
    panic("could not cancel workflow")
}

Create a Client instance then then call CancelWorkflow to cancel a workflow. When a workflow is canceled, its workflow context is canceled. Any subsequent calls to schedule activities or sub-workflows will immediately return an error, skipping their execution. Any activities already running when a workflow is canceled will still run to completion and their result will be available.

Sub-workflows will be canceled if their parent workflow is canceled.

Perform any cleanup in cancelled workflow

func Workflow2(ctx workflow.Context, msg string) (string, error) {
    defer func() {
        if errors.Is(ctx.Err(), workflow.Canceled) {
            // Workflow was canceled. Get new context to perform any cleanup activities
            ctx := workflow.NewDisconnectedContext(ctx)

            // Execute the cleanup activity
            if err := workflow.ExecuteActivity(ctx, ActivityCleanup).Get(ctx, nil); err != nil {
                return errors.Wrap(err, "could not perform cleanup")
            }
        }
    }()

    r1, err := workflow.ExecuteActivity[int](ctx, ActivityCancel, 1, 2).Get(ctx)
    if err != nil {  // <---- Workflow is canceled while this activity is running
        return errors.Wrap(err, "could not get ActivityCancel result")
    }

    // r1 will contain the result of ActivityCancel
    // ⬇ ActivitySkip will be skipped immediately
    r2, err := workflow.ExecuteActivity(ctx, ActivitySkip, 1, 2).Get(ctx)
    if err != nil {
        return errors.Wrap(err, "could not get ActivitySkip result")
    }

    return "some result", nil
}

If you need to run any activities or make calls using workflow.Context you need to create a new context with workflow.NewDisconnectedContext, since the original context is canceled at this point.

Workers

defaultWorker := worker.New(mb, &worker.Options{})

workflowWorker := worker.NewWorkflowWorker(b, &worker.WorkflowWorkerOptions{})

activityWorker := worker.NewActivityWorker(b, &worker.ActivityWorkerOptions{})

There are three different types of workers:

defaultWorker.RegisterWorkflow(Workflow1)
defaultWorker.RegisterActivity(Activity1)

ctx, cancel := context.WithCancel(context.Background())
defaultWorker.Start(ctx)

cancel()
defaultWorker.WaitForCompletion()

All workers have the same simple interface. You can register workflows and activities, start the worker, and when shutting down wait for all pending tasks to be finished.

Queues

Workers can pull workflow and activity tasks from different queues. By default workers listen to two queues:

For now, every worker will always pull from _system_, but you can configure other queues you want to listen to. All worker options structs take a Queues option.

When starting workflows, creating sub-workflow instances, or scheduling activities you can pass a queue you want the task to be scheduled on.

The default behavior if no explicit queue is given:

Executing activities

r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12, nil, "test").Get(ctx)
if err != nil {
    panic("error getting activity 1 result")
}

log.Println(r1)

From a workflow, call workflow.ExecuteActivity to execute an activity. The call returns a Future[T] you can await to get the result or any error it might return.

Executing activities on a specific queue

r1, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
    Queue: "my-queue",
}, Activity1, 35, 12, nil, "test").Get(ctx)
if err != nil {
    panic("error getting activity 1 result")
}

log.Println(r1)

Canceling activities

Canceling activities is not supported at this time.

Timers

t := workflow.ScheduleTimer(ctx, 2*time.Second)
err := t.Get(ctx, nil)

You can schedule timers to fire at any point in the future by calling workflow.ScheduleTimer. It returns a Future you can await to wait for the timer to fire.

t := workflow.ScheduleTimer(ctx, 2*time.Second, workflow.WithTimerName("my-timer"))

You can optionally name timers for tracing and debugging purposes.

Canceling timers

tctx, cancel := workflow.WithCancel(ctx)
t := workflow.ScheduleTimer(tctx, 2*time.Second)

// Cancel the timer
cancel()

There is no explicit API to cancel timers. You can cancel a timer by creating a cancelable context, and canceling that.

Signals

// From outside the workflow:
c.SignalWorkflow(ctx, "<instance-id>", "signal-name", "value")

func Workflow(ctx workflow.Context) error {
    // ...

    signalCh := workflow.NewSignalChannel[string](ctx, "signal-name")

    // Pause workflow until signal is received
    workflow.Select(ctx,
        workflow.Receive(signalCh, func(ctx workflow.Context, r string, ok bool) {
            logger.Debug("Received signal:", r)
        }),
    )

    // Continue execution
}

Signals are a way to send a message to a running workflow instance. You can send a signal to a workflow by calling workflow.Signal and listen to them by creating a SignalChannel via NewSignalChannel.

Signaling other workflows from within a workflow

func Workflow(ctx workflow.Context) error {
    if _, err := workflow.SignalWorkflow(ctx, "sub-instance-id", "signal-name", "value").Get(ctx); err != nil {
        // Handle error
    }
}

You can also signal a workflow from within another workflow. This is useful if you want to signal a sub-workflow from its parent or vice versa.

Executing side effects

id, _ := workflow.SideEffect[string](ctx, func(ctx workflow.Context) string) {
    return uuid.NewString()
}).Get(ctx)

Sometimes scheduling an activity is too much overhead for a simple side effect. For those scenarios you can use workflow.SideEffect. You can pass a func which will be executed only once inline with its result being recorded in the history. Subsequent executions of the workflow will return the previously recorded result.

Executing sub-workflows

func Workflow1(ctx workflow.Context, msg string) error {
    result, err := workflow.CreateSubWorkflowInstance[int]
        ctx, workflow.SubWorkflowInstanceOptions{}, SubWorkflow, "some input").Get(ctx)
    if err != nil {
        return errors.Wrap(err, "could not get sub workflow result")
    }

    logger.Debug("Sub workflow result:", "result", result)

    return nil
}

func SubWorkflow(ctx workflow.Context, msg string) (int, error) {
    r1, err := workflow.ExecuteActivity[int](ctx, Activity1, 35, 12).Get(ctx)
    if err != nil {
        return "", errors.Wrap(err, "could not get activity result")
    }

    logger.Debug("A1 result:", "r1", r1)
    return r1, nil
}

Call workflow.CreateSubWorkflowInstance to start a sub-workflow. The returned Future will resolve once the sub-workflow has finished.

Executing sub-workflows on a specific queue

result, err := workflow.CreateSubWorkflowInstance[int]
    ctx, workflow.SubWorkflowInstanceOptions{
        Queue: "my-queue",
    }, SubWorkflow, "some input").Get(ctx)
if err != nil {
    return errors.Wrap(err, "could not get sub workflow result")
}

Canceling sub-workflows

Similar to timer cancellation, you can pass a cancelable context to CreateSubWorkflowInstance and cancel the sub-workflow that way. Reacting to the cancellation is the same as canceling a workflow via the Client. See Canceling workflows for more details.

Error handling

Custom errors

func handleError(ctx workflow.Context, logger log.Logger, err error) {
  var werr *workflow.Error
    if errors.As(err, &werr) {
    switch werr.Type {
      case "CustomError": // This was a `type CustomError struct...` returned by an activity/subworkflow
            logger.Error("Custom error", "err", werr)
            return
        }

        logger.Error("Generic workflow error", "err", werr, "stack", werr.Stack())
        return
    }

    var perr *workflow.PanicError
    if errors.As(err, &perr) {
    // Activity/subworkflow ran into a panic
        logger.Error("Panic", "err", perr, "stack", perr.Stack())
        return
    }

    logger.Error("Generic error", "err", err)
}

Errors returned from activities and subworkflows need to be marshalled/unmarshalled by the library so they are wrapped in a workflow.Error. You can access the original type via the err.Type field. If a stacktrace was captured, you can access it via err.Stack(). Example (see also samples/errors).

Panics

r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, "test").Get(ctx)
if err != nil {
    panic("error getting activity 1 result")
}

var perr *workflow.PanicError
if errors.As(err, &perr) {
    logger.Error("Panic", "err", perr, "stack", perr.Stack())
    return
}

A panic in an activity will be captured by the library and made available as a workflow.PanicError in the calling workflow.

Retries

Workflow:

r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, "test").Get(ctx)
if err != nil {
    panic("error getting activity 1 result")
}

log.Println(r1)

Activity:

func Activity1(ctx context.Context, name string) (int, error) {
    if name == "test" {
        // No need to retry in this case, the activity will aways fail with the given inputs
        return 0, workflow.NewPermanentError(errors.New("test is not a valid name"))
    }

    return http.Do("POST", "https://example.com", name)
}

With the default DefaultActivityOptions, Activities are retried up to three times when they return an error. If you want to keep automatic retries, but want to avoid them when hitting certain error types, you can wrap an error with workflow.NewPermanentError.

func Activity1(ctx context.Context, name string) (int, error) {
    // Current retry attempt
    attempt := activity.Attempt(ctx)

    return http.Do("POST", "https://example.com", name)
}

activity.Attempt returns the current attempt retry.

ContinueAsNew

wf := func(ctx workflow.Context, run int) (int, error) {
    run = run + 1
    if run > 3 {
        return run, workflow.ContinueAsNew(ctx, run)
    }

    return run, nil
}

ContinueAsNew allows you to restart workflow execution with different inputs. The purpose is to keep the history size small enough to avoid hitting size limits, running out of memory and impacting performance. It works by returning a special error from your workflow that contains the new inputs.

Here the workflow is going to be restarted when workflow.ContinueAsNew is returned. Internally the new execution starts with a fresh history. It uses the same InstanceID but a different ExecutionID.

If a sub-workflow is restarted, the caller doesn't notice this, only once it ends without being restarted the caller will get the result and control will be passed back.

select

var f1 workflow.Future[int]
var c workflow.Channel[int]

value := 42

workflow.Select(
    ctx,
    workflow.Await(f1, func (ctx workflow.Context, f Future[int]) {
        r, err := f.Get(ctx)
        // ...
    }),
    workflow.Receive(c, func (ctx workflow.Context, v int, ok bool) {
        // use v
    }),
    workflow.Default(ctx, func (ctx workflow.Context) {
        // ...
    })
)

Due its non-deterministic behavior you must not use a select statement in workflows. Instead you can use the provided workflow.Select function. It blocks until one of the provided cases is ready. Cases are evaluated in the order passed to `Select.

Waiting for a Future

var f1, f2 workflow.Future[int]

workflow.Select(
    ctx,
    workflow.Await(f1, func (ctx workflow.Context, f Future[int]) {
        r, err := f.Get(ctx)
        // ...
    }),
    workflow.Await(f2, func (ctx workflow.Context, f Future[int]) {
        r, err := f.Get(ctx)
        // ...
    }),
)

Await adds a case to wait for a Future to have a value.

Waiting to receive from a Channel

var c workflow.Channel[int]

workflow.Select(
    ctx,
    workflow.Receive(c, func (ctx workflow.Context, v int, ok bool) {
        // ...
    }),
)

Receive adds a case to receive from a given channel.

Default/Non-blocking

var f1 workflow.Future[int]

workflow.Select(
    ctx,
    workflow.Await(f1, func (ctx workflow.Context, f Future[int]) {
        r, err := f.Get(ctx, &r)
        // ...
    }),
    workflow.Default(ctx, func (ctx workflow.Context) {
        // ...
    })
)

A Default case is executed if no previous case is ready to be selected.

Testing Workflows

func TestWorkflow(t *testing.T) {
    tester := tester.NewWorkflowTester[int](Workflow1)

    // Mock two activities
    tester.OnActivity(Activity1, mock.Anything, 35, 12).Return(47, nil)
    tester.OnActivity(Activity2, mock.Anything, mock.Anything, mock.Anything).Return(12, nil)

    // Run workflow with inputs
    tester.Execute("Hello world")

    // Workflows always run to completion, or time-out
    require.True(t, tester.WorkflowFinished())

    wr, werr := tester.WorkflowResult()
    require.Equal(t, 59, wr)
    require.Empty(t, werr)

    // Ensure any expectations set for activity or sub-workflow mocks were met
    tester.AssertExpectations(t)
}

go-workflows includes support for testing workflows, a simple example using mocked activities.

Testing Activities

func Activity(ctx context.Context, a int, b int) (int, error) {
    activity.Logger(ctx).Debug("Activity is called", "a", a)

    return a + b, nil
}

func TestActivity(t *testing.T) {
    ctx := activitytester.WithActivityTestState(context.Background(), "activityID", "instanceID", nil)

    r, err := Activity(ctx, 35, 12)
    require.Equal(t, 47, r)
    require.NoError(t, err)
}

Activities can be tested like any other function. If you make use of the activity context, for example, to retrieve a logger, you can use activitytester.WithActivityTestState to provide a test activity context. If you don't specify a logger, the default logger implementation will be used.

Removing workflow instances

err = c.RemoveWorkflowInstance(ctx, workflowInstance)
if err != nil {
    // ...
}

RemoveWorkflowInstance on a client instance will remove that workflow instance including all history data from the backend. A workflow instance needs to be in the finished state before calling this, otherwise an error will be returned.

err = c.RemoveWorkflowInstances(ctx, backend.RemoveFinishedBefore(time.Now().Add(-time.Hour * 24))
if err != nil {
    // ...
}

RemoveWorkflowInstances on a client instance will remove all finished workflow instances that match the given condition(s). Currently only RemoveFinishedBefore is supported.

Automatically expiring finished workflow instances

This differs for different backend implementations.

SQLite & MySQL

client.StartAutoExpiration(ctx context.Context, delay time.Duration)

When you call this on a client, a workflow will be started in the system queue that will periodically run and remove finished workflow instances that are older than the specified delay.

Redis

b, err := redis.NewRedisBackend(redisClient, redis.WithAutoExpiration(time.Hour * 48))
// ...

When an AutoExpiration is passed to the backend, finished workflow instances will be automatically removed after the specified duration. This works by setting a TTL on the Redis keys for finished workflow instances. If AutoExpiration is set to 0 (the default), no TTL will be set.

Logging

b := sqlite.NewInMemoryBackend(backend.WithLogger(slog.New(slog.Config{Level: slog.LevelDebug}))

go-workflows supports structured logging using Go's slog package. slog.Default is used by default, pass a custom logger with backend.WithLogger when creating a backend instance.

Workflows

logger := workflow.Logger(ctx)

For logging in workflows, you can get a logger using workflow.Logger. The returned logger instance already has the workflow instance set as a default field.

Activities

logger := activity.Logger(ctx)

For logging in activities, you can get a logger using activity.Logger. The returned logger already has the id of the activity, and the workflow instance set as default field.

Tracing

The library supports tracing via OpenTelemetry. When you pass a TracerProvider when creating a backend instance, workflow execution will be traced. You can also add additional spans for both activities and workflows.

Activities

func Activity1(ctx context.Context, a, b int) (int, error) {
    ctx, span := otel.Tracer("activity1").Start(ctx, "Custom Activity1 span")
    defer span.End()

    // Do something
}

The context.Context passed into activities is set up with the correct current span. If you create additional spans, they'll show up under the ActivityTaskExecution.

Workflows

func Workflow(ctx workflow.Context) error {
    ctx, span := workflow.Tracer(ctx).Start(ctx, "Workflow1 span", trace.WithAttributes(
        // Add additional
        attribute.String("msg", "hello world"),
    ))

    // Do something

    span.End()

For workflows the usage is a bit different, the tracer needs to be aware of whether the workflow is being replayed or not. You can get a replay-aware racer with workflow.Tracer(ctx).

Context Propagation

type ContextPropagator interface {
    Inject(context.Context, *Metadata) error
    Extract(context.Context, *Metadata) (context.Context, error)

    InjectFromWorkflow(Context, *Metadata) error
    ExtractToWorkflow(Context, *Metadata) (Context, error)
}

In Go programs it is common to use context.Context to pass around request-scoped data. This library supports context propagation between activities and workflows. When you create a workflow, you can pass a ContextPropagator to the backend to propagate context values.

The context-propagation sample shows an example of how to use this.

Tools

Analyzer

/analyzer contains a simple golangci-lint based analyzer to spot common issues in workflow code.

Diagnostics Web UI

m := http.NewServeMux()
m.Handle("/diag/", http.StripPrefix("/diag", diag.NewServeMux(b)))
go http.ListenAndServe(":3000", m)

For investigating workflows, the package includes a simple diagnostic web UI. You can serve it via diag.NewServeMux.

It provides a simple paginated list of workflow instances:

And a way to inspect the history of a workflow instance:

Samples

Activity Registration

Shows to register activities, both as plain functions and on structs to share state.

Cancellation

Shows how to cancel a workflow and now to react to cancellation in workflow implementations.

Complex Parameters

Demonstrates how to pass complex parameters to workflows and activities.

Concurrent

Demonstrates how to run multiple activities concurrently.

Context Propagation

Shows how to propagate context into workflows and from there to activities.

Continue-As-New

Shows how to use ContinueAsNew to restart a workflow for "infinite" workflows.

Converter

Shows how to use a custom converter to convert between different types.

Errors

Shows how to handle errors in workflows and activities.

Retries

Demonstrates sub-workflow and activity retries.

Queues

Shows how to queue workflows and activities to different queues and how to create workers that only listen for specific queues or specific tasks (workflows or activities).

Scale

Simple sample with a split worker and "starter" process.

Signal

Shows how to send and receive signals to workflow.

Signal to a Sub-Workflow

Shows how to send signals to a sub-workflows.

Simple

Simple end-to-end sample with a single activity.

Simple (Split Worker)

Same as the simple example, but with a split worker and "starter" process.

Sub-Workflow

Shows how to implement and call a sub-workflow from a workflow.

Timer

Shows how to use timers in workflows.

Tracing

Shows how to use OpenTelemetry to trace workflows and activities.

Web / Diagnostic UI

Shows how to use the Web UI to monitor workflows and activities.

Backends

There are three backend implementations maintained in this repository. Some backend implementations have custom options and all of them accept:

SQLite

func NewSqliteBackend(path string, opts ...option)

Create a new SQLite backend instance with NewSqliteBackend.

Options

Schema

See migrations/sqlite for the schema and migrations. Main tables:

MySQL

func NewMysqlBackend(host string, port int, user, password, database string, opts ...option)

Create a new MySQL backend instance with NewMysqlBackend.

Options

Schema

See migrations/mysql for the schema and migrations. Main tables:

Redis

func NewRedisBackend(client redis.UniversalClient, opts ...RedisBackendOption)

Create a new Redis backend instance with NewRedisBackend.

Options

Schema/Keys

Shared keys:

Instance specific keys:

Custom implementation

To provide a custom backend, implement the following interface:

type Backend interface {
    // CreateWorkflowInstance creates a new workflow instance
    CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error

    // CancelWorkflowInstance cancels a running workflow instance
    CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error

    // RemoveWorkflowInstance removes a workflow instance
    RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error

    // GetWorkflowInstanceState returns the state of the given workflow instance
    GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error)

    // GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID
    // is given, only events after that event are returned. Otherwise the full history is returned.
    GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error)

    // SignalWorkflow signals a running workflow instance
    //
    // If the given instance does not exist, it will return an error
    SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error

    // GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
    GetWorkflowTask(ctx context.Context) (*WorkflowTask, error)

    // ExtendWorkflowTask extends the lock of a workflow task
    ExtendWorkflowTask(ctx context.Context, taskID string, instance *core.WorkflowInstance) error

    // CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask
    //
    // This checkpoints the execution. events are new events from the last workflow execution
    // which will be added to the workflow instance history. workflowEvents are new events for the
    // completed or other workflow instances.
    CompleteWorkflowTask(
        ctx context.Context, task *WorkflowTask, instance *workflow.Instance, state core.WorkflowInstanceState,
        executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error

    // GetActivityTask returns a pending activity task or nil if there are no pending activities
    GetActivityTask(ctx context.Context) (*ActivityTask, error)

    // CompleteActivityTask completes an activity task retrieved using GetActivityTask
    CompleteActivityTask(ctx context.Context, instance *workflow.Instance, activityID string, event *history.Event) error

    // ExtendActivityTask extends the lock of an activity task
    ExtendActivityTask(ctx context.Context, activityID string) error

    // GetStats returns stats about the backend
    GetStats(ctx context.Context) (*Stats, error)

    // Logger returns the configured logger for the backend
    Logger() *slog.Logger

    // Tracer returns the configured trace provider for the backend
    Tracer() trace.Tracer

    // Metrics returns the configured metrics client for the backend
    Metrics() metrics.Client

    // Converter returns the configured converter for the backend
    Converter() converter.Converter

    // ContextPropagators returns the configured context propagators for the backend
    ContextPropagators() []workflow.ContextPropagator

    // Close closes any underlying resources
    Close() error
}

FAQ

How are releases versioned?

For now this library is in a pre-release state. There are no guarantees given regarding breaking changes between (pre)-releases.

Workflow versioning

For now, I've intentionally left out versioning. Cadence, Temporal, and DTFx all support the concept of versions for workflows as well as activities. This is mostly required when you make changes to workflows and need to keep backwards compatibility with workflows that are being executed at the time of the upgrade.

Example: when you change a workflow from:

Workflow version 1

func Workflow1(ctx workflow.Context) {
    r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
    log.Println("A1 result:", r1)

    r2, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
    log.Println("A2 result:", r2)
}

to

Workflow version 2

func Workflow1(ctx workflow.Context) {
    var r1 int
    r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
    log.Println("A1 result:", r1)

    r3, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity3).Get(ctx)
    log.Println("A3 result:", r3)

    r2, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
    log.Println("A2 result:", r2)
}

and you replay a workflow history that contains:

  1. ActivitySchedule - Activity1
  2. ActivityCompleted - Activity1
  3. ActivitySchedule - Activity2
  4. ActivityCompleted - Activity2

the workflow will encounter an attempt to execute Activity3 in-between event 2 and 3, for which there is no matching event. This is a non-recoverable error. The usual approach to solve is to version the workflows and every time you make a change to a workflow, you have to check that logic. For this example this could look like:

func Workflow1(ctx workflow.Context) {
    r1, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
    log.Println("A1 result:", r1)

    if workflow.Version(ctx) >= 2 {
        r3, _ := workflow.ExecuteActivity(ctx, workflow.DefaultActivityOptions, Activity3).Get(ctx)
        log.Println("A3 result:", r3)
    }

    r2, _ := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
    log.Println("A2 result:", r2)
}

and only if a workflow instance was created with a version of >= 2 will Activity3 be executed. Older workflows are persisted with a version < 2 and will not execute Activity3.

This kind of check is understandable for simple changes, but it becomes hard and a source of bugs for more complicated workflows. Therefore for now versioning is not supported and the guidance is to rely on side-by-side deployments. See also Azure's Durable Functions documentation for the same topic.

In addition to side-by-side deployments, you can use Queues to route workflows to different workers based on their version.

How to safely upgrade?

All backend implementations have limited support for migrations which by default are automatically executed when a backend is started. This generally assumes only a single running worker. If you use multiple workers, you need to synchronize migration execution yourself.

In general, the guidance is to rely on side-by-side deployments. See also the previous section about workflow versioning.