Activities
Activities are long-running processes that can emit events, be cancelled, and maintain their own lifecycle alongside state machines.
Activity Interface
An activity implements the Activity<TInput, TOutput> interface:
interface Activity<TInput, TOutput> {
execute(input: TInput, sink: EventSink, signal: AbortSignal): Promise<TOutput>;
}
The three parameters provide:
input: Initial configuration datasink: Mechanism to emit events back to the machinesignal: Standard AbortSignal for cancellation
Creating Activities
Basic Activity
import { Activity, EventSink } from "@hex-di/flow";
class DataPollingActivity implements Activity<{ interval: number }, void> {
async execute(input: { interval: number }, sink: EventSink, signal: AbortSignal): Promise<void> {
while (!signal.aborted) {
try {
const data = await this.fetchData();
sink.emit({ type: "DATA_RECEIVED", payload: data });
await this.delay(input.interval, signal);
} catch (error) {
sink.emit({ type: "POLL_ERROR", payload: { error } });
break;
}
}
}
private async fetchData() {
// Fetch implementation
return { timestamp: Date.now(), value: Math.random() };
}
private delay(ms: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(resolve, ms);
signal.addEventListener("abort", () => {
clearTimeout(timeout);
reject(new Error("Aborted"));
});
});
}
}
Using the Activity Factory
Flow provides a convenient factory for creating activities:
import { activity } from "@hex-di/flow";
const dataPollingActivity = activity<{ interval: number }, void>(
"dataPoller",
async (input, sink, signal) => {
while (!signal.aborted) {
const data = await fetchData();
sink.emit({ type: "DATA_RECEIVED", payload: data });
await delay(input.interval, signal);
}
}
);
Activity Manager
The ActivityManager handles spawning, stopping, and tracking activities:
import { createActivityManager } from "@hex-di/flow";
const manager = createActivityManager({
maxConcurrent: 10,
defaultTimeout: 30000,
});
// Spawn an activity
const result = await manager.spawn("poller-1", dataPollingActivity, { interval: 5000 });
// Check status
const status = manager.getStatus("poller-1");
console.log(status); // 'running' | 'completed' | 'failed' | 'cancelled'
// Get result (blocks until complete)
const output = await manager.getResult("poller-1");
// Stop an activity
await manager.stop("poller-1");
// Get all activities
const activities = manager.getAll();
// Clean up
manager.dispose();
Activity Ports
For DI integration, use activity ports to define the contract:
import { activityPort } from "@hex-di/flow";
// Curried API for type inference
export const DataPollerPort = activityPort<{ interval: number; url: string }, { lastSync: Date }>()(
"DataPoller"
);
// The port can be used in effects
const effect = Effect.spawn("poller", { interval: 5000, url: "/api/data" });
Typed Events
Define typed events for better type safety:
import { defineEvents, TypedEventSink } from "@hex-di/flow";
// Define event schemas
const FileEvents = defineEvents({
FILE_UPLOADED: (data: { filename: string; size: number }) => ({
type: "FILE_UPLOADED" as const,
payload: data,
}),
UPLOAD_PROGRESS: (data: { percent: number }) => ({
type: "UPLOAD_PROGRESS" as const,
payload: data,
}),
UPLOAD_ERROR: (data: { error: string }) => ({
type: "UPLOAD_ERROR" as const,
payload: data,
}),
});
// Use in activity with typed sink
class FileUploadActivity implements Activity<File, string> {
async execute(
file: File,
sink: TypedEventSink<typeof FileEvents>,
signal: AbortSignal
): Promise<string> {
sink.emit(
FileEvents.FILE_UPLOADED({
filename: file.name,
size: file.size,
})
);
// Upload logic with progress
for (let percent = 0; percent <= 100; percent += 10) {
if (signal.aborted) throw new Error("Upload cancelled");
sink.emit(FileEvents.UPLOAD_PROGRESS({ percent }));
await this.delay(100);
}
return "upload-id-123";
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
Configured Activities
For advanced DI scenarios, use configured activities:
import { ConfiguredActivity, ActivityContext } from "@hex-di/flow";
interface AnalyticsConfig {
endpoint: string;
apiKey: string;
}
const analyticsActivity: ConfiguredActivity<AnalyticsConfig> = {
id: "analytics",
config: {
endpoint: "https://api.analytics.com",
apiKey: process.env.ANALYTICS_KEY!,
},
execute: async (context: ActivityContext<AnalyticsConfig>) => {
const { config, input, sink, signal } = context;
// Use config for initialization
const client = new AnalyticsClient(config.endpoint, config.apiKey);
// Activity logic
while (!signal.aborted) {
const events = await client.fetchEvents();
sink.emit({ type: "ANALYTICS_DATA", payload: events });
await delay(60000); // Poll every minute
}
},
};
Activity Instance Tracking
Track activity lifecycle and status:
interface ActivityInstance {
id: string;
status: ActivityStatus;
startTime: number;
endTime?: number;
error?: unknown;
result?: unknown;
}
type ActivityStatus = "running" | "completed" | "failed" | "cancelled";
// In machine runner
const runner = createMachineRunner(machine);
const status = runner.getActivityStatus("my-activity");
if (status?.status === "running") {
console.log(`Activity running for ${Date.now() - status.startTime}ms`);
}
Integration with State Machines
Activities integrate seamlessly with state machines through effects:
import { defineMachine, Effect } from "@hex-di/flow";
const machine = defineMachine({
id: "file-processor",
initial: "idle",
context: { files: [], currentFile: null },
states: {
idle: {
on: {
START: {
target: "processing",
effects: [Effect.spawn("fileWatcher", { directory: "/uploads" })],
},
},
},
processing: {
on: {
FILE_DETECTED: {
target: "uploading",
actions: [
(ctx, event) => ({
currentFile: event.payload.file,
}),
],
effects: [Effect.spawn("uploader", { file: event.payload.file })],
},
STOP: {
target: "idle",
effects: [Effect.stop("fileWatcher"), Effect.stop("uploader")],
},
},
},
uploading: {
on: {
UPLOAD_COMPLETE: {
target: "processing",
actions: [
(ctx, event) => ({
files: [...ctx.files, event.payload.fileId],
currentFile: null,
}),
],
},
UPLOAD_ERROR: {
target: "error",
effects: [Effect.stop("uploader")],
},
},
},
error: {
entry: [Effect.log("Processing error occurred")],
on: {
RETRY: { target: "processing" },
STOP: { target: "idle" },
},
},
},
});
Testing Activities
Flow provides testing utilities for activities:
import { createTestEventSink, createTestSignal, testActivity } from "@hex-di/flow";
describe("DataPollingActivity", () => {
it("should emit data events", async () => {
const sink = createTestEventSink();
const signal = createTestSignal();
const activity = new DataPollingActivity();
// Run activity in background
const promise = activity.execute({ interval: 100 }, sink, signal.signal);
// Wait for some events
await new Promise(resolve => setTimeout(resolve, 250));
// Stop the activity
signal.abort();
// Check emitted events
expect(sink.events).toHaveLength(2);
expect(sink.events[0].type).toBe("DATA_RECEIVED");
});
it("should handle cancellation", async () => {
const result = await testActivity(dataPollingActivity, {
input: { interval: 1000 },
timeout: 100, // Cancel after 100ms
});
expect(result.status).toBe("cancelled");
expect(result.events).toHaveLength(0);
});
});
Best Practices
- Handle cancellation gracefully: Always check the abort signal
- Emit events regularly: Keep the machine informed of activity progress
- Clean up resources: Use try/finally for cleanup on cancellation
- Type your events: Use defineEvents for type safety
- Keep activities focused: Each activity should have a single responsibility
- Test timeout scenarios: Activities may run indefinitely, test boundaries
- Use activity ports: Define clear contracts for DI integration