Skip to main content

Cross-Workflow System Behavior (Implemented)

Dokumen ini menjelaskan behavior runtime apps/worker-workflow/src/cross-workflow sesuai implementasi saat ini.

Komponen utama:

  • apps/worker-workflow/src/cross-workflow/services/cross-workflow.consumer.ts
  • apps/worker-workflow/src/cross-workflow/services/cross-workflow.service.ts
  • apps/worker-workflow/src/cross-workflow/contracts/cross-workflow.contract.ts
  • apps/worker-workflow/src/cross-workflow/utils/relation-type.util.ts

1. Source Event dan Kontrak Trigger

Source trigger cross-workflow:

  • CDC pgcdc.public.wxl_submission_log
  • Consumer memakai @JetStreamConsumer(...) + @MessageHandler()
  • Bootstrap/subscribe dijalankan JetStreamConsumerRegistryService (bukan di consumer)

Payload internal yang dikirim consumer ke service (CrossWorkflowTriggerPayload):

  • submissionId
  • actionId
  • fromStatusId
  • toStatusId
  • actorUserId
  • sourceLogId
  • sourceLsn
  • changedAt
  • sourceTable = 'wxl_submission_log'

2. Filter CDC di Consumer (Kapan Event Diproses)

Consumer hanya memproses event yang memenuhi semua syarat ini:

  1. JSON valid
  2. payload.source.table === 'wxl_submission_log'
  3. payload.op === 'c' (insert log saja)
  4. after.submission_id ada
  5. after.action_id ada

Jika salah satu tidak terpenuhi:

  • event di-ack
  • service tidak dipanggil

3. Mapping CDC -> Trigger Payload

Behavior mapping di consumer:

  • actorUserId fallback: actor_user_id -> user_updated -> user_created
  • changedAt fallback:
    1. after.date_updated (jika valid)
    2. after.date_created (jika valid)
    3. payload.ts_ms
    4. Date.now()
  • sourceLsn hanya diisi jika numeric valid

Semua field string dinormalisasi (trim) dan kosong dianggap null.

4. Flow Utama CrossWorkflowService.execute(...)

Semua eksekusi berjalan dalam transaksi DB (QueryRunner).

Urutan flow:

  1. Start transaction.
  2. Cek apakah trigger benar-benar perubahan status (toStatusId ada dan berbeda dari fromStatusId).
  3. Resolve SubmissionServiceV2 via ModuleRef + ContextIdFactory dengan actor request context.
  4. Lock source submission (pessimistic_read).
  5. Ambil konfigurasi WmlCrossWfConn berdasarkan sourceActionId.
  6. Iterasi tiap connection dan tentukan relation type (ONE_TO_MANY / MANY_TO_ONE / UNKNOWN).
  7. Untuk MANY_TO_ONE, jalankan readiness check dulu.
  8. Resolve target submissions.
  9. Dedupe per targetSubmissionId + targetActionId.
  10. Trigger SubmissionServiceV2.setSubmission(...) pada target.
  11. Commit transaction.

Jika error:

  • rollback transaction
  • error dilempar ulang (consumer akan nak)

5. Guard / Early Return (Skip Dengan Aman)

Service akan commit + return tanpa trigger target jika:

  • transition tidak mengubah status (toStatusId null atau fromStatusId === toStatusId)
  • source submission tidak ditemukan
  • tidak ada koneksi WmlCrossWfConn untuk sourceActionId

Consumer akan ack (bukan retry) jika:

  • payload malformed
  • event tidak relevan untuk cross-workflow

6. Relation Type Behavior

6.1 ONE_TO_MANY

  • Tidak perlu readiness check.
  • Langsung resolve target submissions lalu trigger target action.

6.2 MANY_TO_ONE

  • Wajib lulus isManyToOneReady(...) sebelum trigger target.
  • Jika belum ready:
    • skip connection (bukan error)
    • transaksi tetap commit normal

6.3 UNKNOWN

  • Tidak ada branch khusus.
  • Flow tetap lanjut seperti relation biasa (tanpa readiness gate MANY_TO_ONE).

7. Target Resolution Behavior (resolveTargetSubmissions)

Dasar filter target:

  • workflowId = conn.targetWorkflowId
  • optional entity = conn.targetEntity

Penentuan entityId target:

  1. Coba mapping field (sourceEntity/sourceField -> targetEntity/targetField)
  2. Jika mapping menghasilkan banyak id, gunakan In([...])
  3. Jika mapping kosong, fallback ke sourceSubmission.entityId (backward compatibility)

Query target submission memakai lock:

  • pessimistic_write

8. Field Mapping Behavior dan Safety

Fitur field mapping dipakai untuk relasi lintas entity (mis. parent-child by field value).

Behavior:

  • Ambil nilai field dari entity source (loadEntityFieldValue)
  • Cari row entity target dengan nilai field yang sama
  • Ambil id entity target, lalu cari submission target berdasarkan entityId

Safety guard:

  • nama tabel/field divalidasi regex identifier aman (isSafeIdentifier)
  • jika identifier tidak valid:
    • skip mapping
    • log warning

9. MANY_TO_ONE Readiness Check (Milestone Aggregation)

Tujuan:

  • memastikan semua sibling source yang terkait sudah melewati action milestone sebelum trigger target.

Ringkas flow isManyToOneReady(...):

  1. Validasi config minimum (sourceEntity, sourceField, sourceWorkflowId, sourceActionId)
  2. Ambil nilai grouping dari source entity (mis. parent_id)
  3. Cari semua entity source sibling dengan nilai grouping yang sama
  4. Cari semua submission source untuk sibling tersebut (workflowId + entity + entityId IN (...))
  5. Query wxl_submission_log untuk mendeteksi submission yang sudah menjalankan sourceActionId
    • to_status_id IS NOT NULL
    • from_status_id IS DISTINCT FROM to_status_id
  6. Ready jika semua submission source sibling sudah ada di set completed

Jika config tidak valid / identifier tidak aman / data tidak lengkap:

  • return false (skip trigger)

10. Dedupe dan Idempotency Scope

Dedupe yang aktif saat ini:

  • Set in-memory per transaksi eksekusi
  • key: ${targetSubmissionId}:${targetActionId}

Tujuan:

  • mencegah duplicate trigger dalam satu event ketika beberapa connection menghasilkan target yang sama

Catatan:

  • Ini bukan dedupe lintas event global
  • Redelivery CDC tetap bisa memicu eksekusi ulang (bergantung downstream behavior SubmissionServiceV2)

11. Side Effect ke Target Workflow

Eksekusi target dilakukan via:

  • SubmissionServiceV2.setSubmission(...)

Payload target yang dikirim:

  • submission_id
  • workflow_id
  • action_id
  • actor_id
  • actor_user_id
  • override = true
  • source = 'CROSS_WORKFLOW'
  • do_not_clear_cache = true

12. Error Handling dan Retry

12.1 Consumer

  • Malformed / irrelevant event -> ack
  • Error saat service.execute(...) -> nak(8000)

12.2 Service

  • Error query/trigger target -> rollback transaction lalu throw

13. Test-Covered Behaviors (Saat Ini)

Consumer spec sudah cover:

  • malformed JSON -> ack
  • table non-wxl_submission_log -> ack
  • non-create op -> ack
  • missing submission_id/action_id -> ack
  • valid CDC -> service.execute(...) + ack
  • actor fallback mapping
  • changedAt fallback ke ts_ms
  • service error -> nak(8000)

Service spec sudah cover:

  • early return non-status-change
  • source submission not found
  • no connection config
  • ONE_TO_MANY trigger
  • dedupe target
  • MANY_TO_ONE readiness false/true
  • actor default system
  • rollback on downstream error
  • skip invalid target config
  • MANY_TO_ONE readiness sibling milestone check

14. Batasan Implementasi Saat Ini

  • Consumer hanya listen op='c' (wxl_submission_log insert); update log tidak diproses.
  • Dedupe masih per-event/per-transaction, bukan lintas redelivery global.
  • UNKNOWN relation type tidak punya guard khusus (hanya lewat flow umum).
  • Behavior observability/logging masih plain logger per service (belum structured tracing).