Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
AttemptStatus,
RunAnnotations,
RunStatus,
SerializedError,
TaskRunError,
Expand Down Expand Up @@ -56,6 +57,7 @@ const commonRunSelect = {
},
},
runTags: true,
annotations: true,
} satisfies Prisma.TaskRunSelect;

type CommonRelatedRun = Prisma.Result<
Expand Down Expand Up @@ -466,6 +468,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
triggerFunction: resolveTriggerFunction(run),
batchId: run.batch?.friendlyId,
metadata,
annotations: run.annotations ? RunAnnotations.safeParse(run.annotations).data : undefined,
};
}

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
}

const service = new ReplayTaskRunService();
const newRun = await service.call(taskRun);
const newRun = await service.call(taskRun, { triggerSource: "api" });

if (!newRun) {
return json({ error: "Failed to create new run" }, { status: 400 });
Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const HeadersSchema = z.object({
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
"x-trigger-request-idempotency-key": z.string().nullish(),
"x-trigger-realtime-streams-version": z.string().nullish(),
"x-trigger-source": z.string().nullish(),
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});
Expand Down Expand Up @@ -67,6 +68,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-engine-version": engineVersion,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
} = headers;

const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
Expand Down Expand Up @@ -119,6 +121,10 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: body.options?.parentRunId
? "sdk"
: triggerSourceHeader ?? "api",
triggerAction: "trigger",
},
engineVersion ?? undefined
);
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-engine-version": engineVersion,
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -113,6 +114,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: triggerSourceHeader ?? undefined,
triggerAction: "trigger",
});

const $responseHeaders = await responseHeaders(
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/routes/api.v2.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const { action, loader } = createActionApiRoute(
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -127,6 +128,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: triggerSourceHeader ?? undefined,
triggerAction: "trigger",
});

const $responseHeaders = await responseHeaders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ export const action: ActionFunction = async ({ request, params }) => {
ttlSeconds: submission.value.ttlSeconds,
version: submission.value.version,
prioritySeconds: submission.value.prioritySeconds,
triggerSource: "dashboard",
});

if (!newRun) {
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/runEngine/services/batchTrigger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export type BatchTriggerTaskServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

/**
Expand Down Expand Up @@ -678,6 +680,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
batchId: batch.id,
batchIndex: currentIndex,
realtimeStreamsVersion: options?.realtimeStreamsVersion,
triggerSource: parentRunId ? "sdk" : options?.triggerSource ?? "api",
triggerAction: options?.triggerAction ?? "trigger",
Comment on lines +683 to +684
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Batch fallback runs still lose trigger annotations.

These fields are only attached on the normal TriggerTaskService.call() path. The pre-failed fallback at Lines 567-580 still calls TriggerFailedTaskService without triggerSource / triggerAction, so any batch item that fails validation, entitlement, or queue checks will create an unannotated run. Please thread the same attribution fields through that fallback path too.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/runEngine/services/batchTrigger.server.ts` around lines 683 -
684, The pre-failed fallback path calls TriggerFailedTaskService without the
trigger attribution fields, causing unannotated runs; update that fallback to
compute the same triggerSource and triggerAction values used in the normal
TriggerTaskService.call() path (e.g., triggerSource: parentRunId ? "sdk" :
options?.triggerSource ?? "api", triggerAction: options?.triggerAction ??
"trigger") and pass them into the TriggerFailedTaskService invocation so
fallback-created runs include the same attribution metadata.

},
"V2"
);
Expand Down
18 changes: 18 additions & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,23 @@ export class RunEngineTriggerTaskService {

const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);

// Build annotations for this run
const triggerSource = options.triggerSource ?? "api";
const triggerAction = options.triggerAction ?? "trigger";
const parentAnnotations = parentRun?.annotations as
| Record<string, unknown>
| null
| undefined;
const annotations = {
triggerSource,
triggerAction,
rootTriggerSource: parentAnnotations?.rootTriggerSource ?? triggerSource,
rootScheduleId:
(parentAnnotations?.rootScheduleId as string | undefined) ||
options.scheduleId ||
undefined,
};

try {
return await this.traceEventConcern.traceRun(
triggerRequest,
Expand Down Expand Up @@ -369,6 +386,7 @@ export class RunEngineTriggerTaskService {
planType,
realtimeStreamsVersion: options.realtimeStreamsVersion,
debounce: body.options?.debounce,
annotations,
// When debouncing with triggerAndWait, create a span for the debounced trigger
onDebounced:
body.options?.debounce && body.options?.resumeParentOnCompletion
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/runEngineHandlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ export function setupBatchQueueCallbacks() {
batchIndex: itemIndex,
realtimeStreamsVersion: meta.realtimeStreamsVersion,
planType: meta.planType,
triggerSource: meta.parentRunId ? "sdk" : "api",
triggerAction: "trigger",
},
"V2"
);
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/scheduleEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ function createScheduleEngine() {
scheduleInstanceId,
queueTimestamp: exactScheduleTime,
overrideCreatedAt: exactScheduleTime,
triggerSource: "schedule",
triggerAction: "trigger",
}
);

Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export type BatchTriggerTaskServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

type RunItemData = {
Expand Down Expand Up @@ -853,6 +855,10 @@ export class BatchTriggerV3Service extends BaseService {
skipChecks: true,
runFriendlyId: task.runId,
realtimeStreamsVersion: options?.realtimeStreamsVersion,
triggerSource: task.item.options?.parentRunId
? "sdk"
: options?.triggerSource ?? "api",
triggerAction: options?.triggerAction ?? "trigger",
}
);

Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ export class BulkActionService extends BaseService {
const [error, result] = await tryCatch(
replayService.call(run, {
bulkActionId: bulkActionId,
triggerSource: "dashboard",
})
);
if (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class PerformBulkActionService extends BaseService {
switch (item.group.type) {
case "REPLAY": {
const service = new ReplayTaskRunService(this._prisma);
const result = await service.call(item.sourceRun);
const result = await service.call(item.sourceRun, { triggerSource: "dashboard" });

await this._prisma.bulkActionItem.update({
where: { id: item.id },
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/services/replayTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type OverrideOptions = {
payload?: string;
metadata?: unknown;
bulkActionId?: string;
triggerSource?: string;
} & RunOptionsData;

export class ReplayTaskRunService extends BaseService {
Expand Down Expand Up @@ -123,6 +124,8 @@ export class ReplayTaskRunService extends BaseService {
realtimeStreamsVersion: determineRealtimeStreamsVersion(
existingTaskRun.realtimeStreamsVersion
),
triggerSource: overrideOptions.triggerSource ?? "api",
triggerAction: "replay",
}
);

Expand Down
51 changes: 29 additions & 22 deletions apps/webapp/app/v3/services/testTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,35 @@ export class TestTaskService extends BaseService {

switch (triggerSource) {
case "STANDARD": {
const result = await triggerTaskService.call(data.taskIdentifier, environment, {
payload: data.payload,
options: {
test: true,
metadata: data.metadata,
delay: data.delaySeconds ? new Date(Date.now() + data.delaySeconds * 1000) : undefined,
ttl: data.ttlSeconds,
idempotencyKey: data.idempotencyKey,
idempotencyKeyTTL: data.idempotencyKeyTTLSeconds
? `${data.idempotencyKeyTTLSeconds}s`
: undefined,
queue: data.queue ? { name: data.queue } : undefined,
concurrencyKey: data.concurrencyKey,
maxAttempts: data.maxAttempts,
maxDuration: data.maxDurationSeconds,
tags: data.tags,
machine: data.machine,
region: data.region,
lockToVersion: data.version === "latest" ? undefined : data.version,
priority: data.prioritySeconds,
const result = await triggerTaskService.call(
data.taskIdentifier,
environment,
{
payload: data.payload,
options: {
test: true,
metadata: data.metadata,
delay: data.delaySeconds
? new Date(Date.now() + data.delaySeconds * 1000)
: undefined,
ttl: data.ttlSeconds,
idempotencyKey: data.idempotencyKey,
idempotencyKeyTTL: data.idempotencyKeyTTLSeconds
? `${data.idempotencyKeyTTLSeconds}s`
: undefined,
queue: data.queue ? { name: data.queue } : undefined,
concurrencyKey: data.concurrencyKey,
maxAttempts: data.maxAttempts,
maxDuration: data.maxDurationSeconds,
tags: data.tags,
machine: data.machine,
region: data.region,
lockToVersion: data.version === "latest" ? undefined : data.version,
priority: data.prioritySeconds,
},
},
});
{ triggerSource: "dashboard", triggerAction: "test" }
);

return result?.run;
}
Expand Down Expand Up @@ -72,7 +79,7 @@ export class TestTaskService extends BaseService {
priority: data.prioritySeconds,
},
},
{ customIcon: "scheduled" }
{ customIcon: "scheduled", triggerSource: "dashboard", triggerAction: "test" }
);

return result?.run;
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export type TriggerTaskServiceOptions = {
replayedFromTaskRunFriendlyId?: string;
planType?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

export class OutOfEntitlementError extends Error {
Expand Down
3 changes: 3 additions & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,9 @@ model TaskRun {
metadataType String @default("application/json")
metadataVersion Int @default(1)

/// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId
annotations Json?

/// Run output
output String?
outputType String @default("application/json")
Expand Down
2 changes: 2 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ export class RunEngine {
planType,
realtimeStreamsVersion,
debounce,
annotations,
onDebounced,
}: TriggerParams,
tx?: PrismaClientOrTransaction
Expand Down Expand Up @@ -668,6 +669,7 @@ export class RunEngine {
createdAt: new Date(),
}
: undefined,
annotations,
executionSnapshots: {
create: {
engine: "V2",
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ export type TriggerParams = {
mode?: "leading" | "trailing";
maxDelay?: string;
};
annotations?: Record<string, unknown>;
/**
* Called when a run is debounced (existing delayed run found with triggerAndWait).
* Return spanIdToComplete to enable span closing when the run completes.
Expand Down
6 changes: 5 additions & 1 deletion packages/cli-v3/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,19 @@ import { VERSION } from "./version.js";

export class CliApiClient {
private engineURL: string;
private source: string;

constructor(
public readonly apiURL: string,
// TODO: consider making this required
public readonly accessToken?: string,
public readonly branch?: string
public readonly branch?: string,
options?: { source?: string }
) {
this.apiURL = apiURL.replace(/\/$/, "");
this.engineURL = this.apiURL;
this.branch = branch;
this.source = options?.source ?? "cli";
}

async createAuthorizationCode() {
Expand Down Expand Up @@ -819,6 +822,7 @@ export class CliApiClient {
const headers: Record<string, string> = {
Authorization: `Bearer ${this.accessToken}`,
"Content-Type": "application/json",
"x-trigger-source": this.source,
};

if (this.branch) {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli-v3/src/mcp/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class McpContext {
public async getCliApiClient(branch?: string) {
const auth = await this.getAuth();

return new CliApiClient(auth.auth.apiUrl, auth.auth.accessToken, branch);
return new CliApiClient(auth.auth.apiUrl, auth.auth.accessToken, branch, { source: "mcp" });
}

public async getApiClient(options: {
Expand Down
Loading
Loading