Steps and Workflow Authoring

Core rules

  • Keep orchestration logic in workflows.
  • Keep side effects in steps.
  • Keep steps small and retry-safe.
  • Check cancel/command points frequently in long loops.

Decorators

from exchange.workflows.bootstrap import workflow, step
from exchange.workflows.engine.instance import getWorkflows

bootstrap re-exports the public decorators (workflow, step) and the runtime accessor; the examples on this page import getWorkflows from engine.instance directly.

Minimal workflow

from exchange.workflows.bootstrap import workflow
from exchange.workflows.engine.instance import getWorkflows
from exchange.workflows.steps.sleep import sleep_chunked

@workflow(name="demo.small")
def demo_small(duration_s=10):
    rt = getWorkflows()
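    # Each iteration sleeps 500 ms in total, so two iterations cover one second.
    loops = int(duration_s * 2)

    for i in range(loops):
        # Cancel point, checked once per iteration (see Core rules).
        rt.checkCancelled()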
        rt.progress("loop %s/%s" % (i + 1, loops))
        sleep_chunked(total_ms=500, chunk_ms=250)

    return {"ok": True}

Built-in step library

Tags

  • tags.read(path)
  • tags.write(path, value)
  • tags.write_confirm(path, value, confirm_path=None, timeout_ms=10000, poll_ms=250)
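The write_confirm signature suggests a write followed by polling until the value reads back or the timeout expires. The following is a minimal sketch of that pattern, not the library's implementation. The name write_confirm_sketch, the injected read/write/clock/sleep callables, the boolean return value, and the rule that confirm_path=None reads back the written path are all assumptions made for illustration.

```python
import time


def write_confirm_sketch(read, write, path, value, confirm_path=None,
                         timeout_ms=10000, poll_ms=250,
                         clock=time.time, sleep=time.sleep):
    """Write value, then poll until the read-back matches or the timeout expires.

    Hypothetical stand-in: read and write are callables supplied by the
    caller, not the real tags.read / tags.write steps.
    """
    # Assumption: with no confirm_path, the written path itself is read back.
    target = confirm_path if confirm_path is not None else path
    write(path, value)
    deadline = clock() + timeout_ms / 1000.0
    while True:
        if read(target) == value:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_ms / 1000.0)


# Usage with an in-memory dict standing in for the tag provider.
store = {}
confirmed = write_confirm_sketch(store.get, store.__setitem__,
                                 "demo/setpoint", 42,
                                 timeout_ms=1000, poll_ms=50)
```

Passing the clock and sleep functions as parameters keeps the sketch testable without real delays.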

HTTP

  • http.request(method, url, headers=None, params=None, data=None, json_body=None, timeout_ms=10000)

DB (application DB, not engine internals)

  • db.query(dbName, sql, args=None)
  • db.update(dbName, sql, args=None)

Wait/sleep

  • wait.until_tag_equals(path, expected, timeout_ms=10000, poll_ms=250)
  • sleep.sleep_chunked(total_ms, chunk_ms=250)
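Sleeping in chunks keeps a long wait responsive, because control returns to the caller between slices. The sketch below shows one way such a helper could work; it is not the code behind sleep.sleep_chunked. The function name, the between_chunks hook, and the returned list of slice lengths are hypothetical.

```python
import time


def sleep_chunked_sketch(total_ms, chunk_ms=250, between_chunks=None,
                         sleep=time.sleep):
    """Sleep total_ms in slices of at most chunk_ms.

    Returns the slice lengths in milliseconds so the behavior can be inspected.
    """
    if chunk_ms <= 0:
        raise ValueError("chunk_ms must be positive")
    slices = []
    remaining = total_ms
    while remaining > 0:
        current = min(chunk_ms, remaining)
        sleep(current / 1000.0)
        slices.append(current)
        remaining -= current
        if between_chunks is not None:
            # Hypothetical hook, for example a cancel check.
            between_chunks()
    return slices


# Usage: a no-op sleep shows the slicing without actually waiting.
demo_slices = sleep_chunked_sketch(600, chunk_ms=250, sleep=lambda s: None)
```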

Step retry options (DBOS-style)

@step(
    name="my.step",
    options={
        "retries_allowed": True,
        "max_attempts": 3,
        "interval_seconds": 1.0,
        "backoff_rate": 2.0,
    },
)
def my_step(x):
    return x
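To make the four options concrete, here is a sketch of the wait schedule they might produce. It assumes that max_attempts counts the first call, that interval_seconds is the wait before the first retry, and that each later wait is multiplied by backoff_rate. The engine's actual semantics may differ. retry_delays and run_with_retries are hypothetical helpers, not part of the step API.

```python
def retry_delays(options):
    """Return the waits (in seconds) between attempts under the assumed rule."""
    if not options["retries_allowed"]:
        return []
    interval = options["interval_seconds"]
    rate = options["backoff_rate"]
    # Assumption: max_attempts includes the first call, so retries = max_attempts - 1.
    return [interval * rate ** n for n in range(options["max_attempts"] - 1)]


def run_with_retries(fn, options, sleep):
    """Call fn, sleeping between failed attempts; re-raise once attempts run out."""
    delays = retry_delays(options)
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= len(delays):
                raise
            sleep(delays[attempt])
            attempt += 1


options = {
    "retries_allowed": True,
    "max_attempts": 3,
    "interval_seconds": 1.0,
    "backoff_rate": 2.0,
}
schedule = retry_delays(options)  # two waits: 1.0 s, then 2.0 s
```

Under these assumptions a step that can fail three times in a row must be safe to run three times, which is why the core rules ask for retry-safe steps.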

Mailbox pattern

Use mailbox commands for operator control.

cmd = rt.recvCommand(currentState="RUNNING", topic="cmd", timeoutSeconds=0.2, maxMessages=8)
if cmd and cmd.get("cmd") == "HOLD":
    ...
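The snippet above can be extended into a small state transition function. This sketch runs against a fake runtime so it works outside the engine. FakeRuntime, next_state, the "HELD" state, and the "RESUME" command are hypothetical; only "HOLD", "RUNNING", and the recvCommand keyword arguments come from the snippet above. It also assumes recvCommand returns a single command dict, or None when nothing is waiting.

```python
class FakeRuntime(object):
    """Hypothetical stand-in for the object returned by getWorkflows()."""

    def __init__(self, commands):
        self._commands = list(commands)

    def recvCommand(self, currentState, topic, timeoutSeconds, maxMessages):
        # Assumption: one command dict per call, or None if the mailbox is empty.
        return self._commands.pop(0) if self._commands else None


def next_state(rt, state):
    """Poll the mailbox once and return the state to continue in."""
    cmd = rt.recvCommand(currentState=state, topic="cmd",
                         timeoutSeconds=0.2, maxMessages=8)
    name = cmd.get("cmd") if cmd else None
    if state == "RUNNING" and name == "HOLD":
        return "HELD"
    if state == "HELD" and name == "RESUME":  # RESUME is a hypothetical command
        return "RUNNING"
    return state


# Usage: feed two operator commands, then poll an empty mailbox.
rt = FakeRuntime([{"cmd": "HOLD"}, {"cmd": "RESUME"}])
state = "RUNNING"
seen = []
for _ in range(3):
    state = next_state(rt, state)
    seen.append(state)
```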

Registration reminder

Decorators register at import time, so a module that is never imported registers nothing. Every workflow module must therefore be imported by exchange.workflows.apps.load().

Current project registration happens in:

  • exchange.workflows.apps.__init__.load

If you add new workflow modules, import them there.