
Cross-Workflow System Behavior (Implemented)

This document describes the runtime behavior of apps/worker-workflow/src/cross-workflow as currently implemented.

1. Main Components

  1. CrossWorkflowConsumer. Role: decodes and filters CDC events, maps the payload, and acks/naks messages. Path: apps/worker-workflow/src/cross-workflow/services/cross-workflow.consumer.ts

  2. CrossWorkflowService. Role: owns the transaction boundary and routes by relation type. Path: apps/worker-workflow/src/cross-workflow/services/cross-workflow.service.ts

  3. CrossWorkflowOneToManyService. Role: implements the ONE_TO_MANY relation behavior. Path: apps/worker-workflow/src/cross-workflow/services/cross-workflow-one-to-many.service.ts

  4. CrossWorkflowManyToOneService. Role: implements the MANY_TO_ONE relation behavior, including the readiness check. Path: apps/worker-workflow/src/cross-workflow/services/cross-workflow-many-to-one.service.ts

  5. CrossWorkflowRepository. Role: runs all DB queries for the cross-workflow context. Path: apps/worker-workflow/src/cross-workflow/repositories/cross-workflow.repository.ts

  6. Contracts and utilities:

  • apps/worker-workflow/src/cross-workflow/types/cross-workflow.interface.ts
  • apps/worker-workflow/src/cross-workflow/utils/relation-type.util.ts
  • apps/worker-workflow/src/cross-workflow/utils/cross-workflow-log.util.ts
  • apps/worker-workflow/src/cross-workflow/utils/identifier.util.ts

2. Source Event and Trigger Contract

Trigger source:

  • CDC subject pgcdc.public.wxl_submission_log
  • The consumer uses @JetStreamConsumer(...) + @MessageHandler()

Internal payload (CrossWorkflowTriggerPayload):

  • submissionId
  • actionId
  • actorId
  • fromStatusId
  • toStatusId
  • actorUserId
  • sourceLogId
  • sourceLsn
  • changedAt
  • sourceTable = 'wxl_submission_log'

2.1 Trigger Input Table

Field          Type            Description
submissionId   string          Source submission that triggers the cross-workflow.
actionId       string          Source action in wxl_submission_log.
actorId        string | null   Workflow actor (optional).
fromStatusId   string | null   Status before the source action.
toStatusId     string | null   Status after the source action.
actorUserId    string | null   Executing user, resolved by the CDC fallback mapping.
changedAt      string          Timestamp of the trigger event.

3. CDC Filtering in the Consumer

The consumer processes an event only if:

  1. The JSON is valid.
  2. payload.source.table === 'wxl_submission_log'.
  3. payload.op === 'c'.
  4. after.submission_id is present.
  5. after.action_id is present.

If the event is invalid or irrelevant:

  • the event is acked
  • the service is not called

Key mappings:

  • actorUserId fallback: actor_user_id -> user_updated -> user_created
  • changedAt fallback: date_updated -> date_created -> ts_ms -> Date.now()
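
The filter rules and fallback chains above can be sketched as one pure function. This is a minimal illustration, not the consumer's actual code: CdcEnvelope, TriggerSketch, firstString and toTrigger are hypothetical names, only the fields named in this document are modeled, and the conversion of ts_ms / Date.now() to an ISO string is an assumption.

```typescript
// Hypothetical envelope shape: only the fields named in this document are modeled.
interface CdcEnvelope {
  op?: string;
  ts_ms?: number;
  source?: { table?: string };
  after?: Record<string, unknown> | null;
}

interface TriggerSketch {
  submissionId: string;
  actionId: string;
  actorUserId: string | null;
  changedAt: string;
}

// Returns the first non-empty string among the candidates, or null.
function firstString(...values: unknown[]): string | null {
  for (const value of values) {
    if (typeof value === "string" && value.length > 0) return value;
  }
  return null;
}

// Returns null for events that would be acked without calling the service.
function toTrigger(raw: string, now: () => number = Date.now): TriggerSketch | null {
  let payload: CdcEnvelope | null;
  try {
    payload = JSON.parse(raw) as CdcEnvelope | null;
  } catch {
    return null; // rule 1: invalid JSON
  }
  if (payload === null || typeof payload !== "object") return null;
  if (payload.source?.table !== "wxl_submission_log") return null; // rule 2
  if (payload.op !== "c") return null; // rule 3

  const after = payload.after ?? {};
  const submissionId = firstString(after["submission_id"]); // rule 4
  const actionId = firstString(after["action_id"]); // rule 5
  if (submissionId === null || actionId === null) return null;

  // actorUserId fallback: actor_user_id -> user_updated -> user_created
  const actorUserId = firstString(
    after["actor_user_id"],
    after["user_updated"],
    after["user_created"],
  );
  // changedAt fallback: date_updated -> date_created -> ts_ms -> Date.now()
  // Assumption: numeric timestamps are normalized to ISO strings.
  const changedAt =
    firstString(after["date_updated"], after["date_created"]) ??
    new Date(payload.ts_ms ?? now()).toISOString();

  return { submissionId, actionId, actorUserId, changedAt };
}
```

Keeping the filter free of side effects means the ack decision reduces to a null check on the result.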

4. Main Flow of CrossWorkflowService.execute(...)

All execution happens inside a QueryRunner transaction.

Flow order:

  1. Start the transaction.
  2. Skip if the event is not a status change (toStatusId is empty or equal to fromStatusId).
  3. Resolve SubmissionServiceV2 via ModuleRef + the actor's request context.
  4. Load the source submission via CrossWorkflowRepository.findSourceSubmission(...).
  5. Load the connection config via findConnectionsBySourceAction(...).
  6. Create the execution context (executedTargets, executedCreateTargets).
  7. For each connection:
  • build the dedupeKey from sourceLogId/submissionId/actionId/connectionId/changedAt
  • insert an append-only execution log via tryStartExecutionLog(...)
  • if the dedupe key conflicts, skip the connection
  • route by relation type:
    • MANY_TO_ONE -> CrossWorkflowManyToOneService.handle(...)
    • anything else -> CrossWorkflowOneToManyService.handle(...)
  • store the executionLog -> targetSubmissionId[] relation via appendTargetSubmissionLinks(...)
  8. Commit the transaction.

On error:

  • the transaction is rolled back
  • the error is rethrown to the consumer (which then naks)
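
The control flow above can be sketched with an in-memory dependency surface. Everything here is illustrative: Deps, executeSketch, isStatusChange and buildDedupeKey are hypothetical names, the ':'-joined key format is an assumption (the real format is not documented here), and steps 3, 4 and 6 are omitted for brevity. Committing the empty transaction on a skipped event is also an assumption.

```typescript
interface Trigger {
  sourceLogId: string | null;
  submissionId: string;
  actionId: string;
  fromStatusId: string | null;
  toStatusId: string | null;
  changedAt: string;
}

interface Connection {
  id: string;
  relationType: string;
}

// Hypothetical stand-in for QueryRunner, the repository and the relation services.
interface Deps {
  start(): Promise<void>;
  commit(): Promise<void>;
  rollback(): Promise<void>;
  findConnections(trigger: Trigger): Promise<Connection[]>;
  // Assumption: resolves to the new log id, or null when the dedupe key already exists.
  tryStartExecutionLog(dedupeKey: string, connection: Connection): Promise<string | null>;
  handleManyToOne(connection: Connection, trigger: Trigger): Promise<string[]>;
  handleOneToMany(connection: Connection, trigger: Trigger): Promise<string[]>;
  appendTargetSubmissionLinks(logId: string, targetSubmissionIds: string[]): Promise<void>;
}

// Step 2: an event counts only if toStatusId is set and differs from fromStatusId.
function isStatusChange(trigger: Trigger): boolean {
  return !!trigger.toStatusId && trigger.toStatusId !== trigger.fromStatusId;
}

// Assumption: illustrative key format built from the documented components.
function buildDedupeKey(trigger: Trigger, connectionId: string): string {
  return [
    trigger.sourceLogId ?? "",
    trigger.submissionId,
    trigger.actionId,
    connectionId,
    trigger.changedAt,
  ].join(":");
}

// Returns the number of connections that were actually processed.
async function executeSketch(deps: Deps, trigger: Trigger): Promise<number> {
  await deps.start();
  try {
    let processed = 0;
    if (isStatusChange(trigger)) {
      for (const connection of await deps.findConnections(trigger)) {
        const key = buildDedupeKey(trigger, connection.id);
        const logId = await deps.tryStartExecutionLog(key, connection);
        if (logId === null) continue; // dedupe conflict: skip this connection
        const targets =
          connection.relationType === "MANY_TO_ONE"
            ? await deps.handleManyToOne(connection, trigger)
            : await deps.handleOneToMany(connection, trigger);
        await deps.appendTargetSubmissionLinks(logId, targets);
        processed += 1;
      }
    }
    await deps.commit();
    return processed;
  } catch (error) {
    await deps.rollback();
    throw error; // rethrown so the consumer can nak
  }
}
```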

5. Flowchart (Mermaid)

6. Relation Behavior: ONE_TO_MANY

Flow in CrossWorkflowOneToManyService:

  1. Resolve the target entity ids via the lookup targetEntity.targetField = sourceSubmission.entityId.
  2. If the mapping yields no target, fall back to sourceSubmission.entityId, but only when sourceSubmission.entity === targetEntity.
  3. Resolve existing target submissions from the resolved target entities.
  4. Create a target submission only for target entities that do not have one yet.
  5. Execute the action on the existing target submissions.

Key decisions:

  1. If the mapping configuration is incomplete, the service does not force the mapping and goes straight to the fallback.
  2. If the configured sourceEntity does not match the source submission's entity, the mapping is skipped.
  3. sourceField may remain populated (including Directus virtual columns), but it is not used as the source of the join value at runtime.
  4. The only required physical column is targetField; if targetField is not a physical column, the mapping is skipped (warn log).
  5. If targetEntityIds is empty and there are no existing submissions either, the service does not create a blind target submission (skip_create_target_no_resolved_target).
  6. Target creation is deduped with context.executedCreateTargets; target execution is deduped with context.executedTargets.

Notes:

  • ONE_TO_MANY has no dedicated readiness gate.
  • ONE_TO_MANY currently has no "target is already further ahead" guard; that behavior is implemented only in MANY_TO_ONE.
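
The target resolution rules for ONE_TO_MANY (mapping first, same-entity fallback second) can be sketched as follows. The names SourceSubmission, OneToManyConfig, Lookups and resolveTargetEntityIds are hypothetical, the lookups are synchronous stand-ins for what are DB queries in practice, and returning an empty list when the source has no entityId is an assumption.

```typescript
interface SourceSubmission {
  entity: string;
  entityId: string | null;
}

interface OneToManyConfig {
  sourceEntity: string | null;
  targetEntity: string;
  targetField: string | null;
}

// Hypothetical synchronous stand-ins for the repository lookups.
interface Lookups {
  hasPhysicalColumn(table: string, column: string): boolean;
  findEntityIdsByField(table: string, column: string, value: string): string[];
}

function resolveTargetEntityIds(
  source: SourceSubmission,
  config: OneToManyConfig,
  lookups: Lookups,
): string[] {
  const { entityId } = source;
  const { targetField } = config;
  if (entityId === null) return []; // assumption: nothing to join on

  let ids: string[] = [];
  // Mapping runs only with a complete config, a matching source entity,
  // and a targetField that is a physical column.
  if (
    targetField !== null &&
    config.sourceEntity === source.entity &&
    lookups.hasPhysicalColumn(config.targetEntity, targetField)
  ) {
    ids = lookups.findEntityIdsByField(config.targetEntity, targetField, entityId);
  }
  // Fallback: reuse the source entity id only when both sides are the same entity.
  if (ids.length === 0 && source.entity === config.targetEntity) {
    ids = [entityId];
  }
  return ids;
}
```

An empty result corresponds to the case where no blind target submission is created.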

7. Relation Behavior: MANY_TO_ONE

Flow in CrossWorkflowManyToOneService:

  1. Validate that targetActionId still exists in the DB.
  2. Resolve the target entity id via field mapping.
  3. MANY_TO_ONE mapping always uses target.id (the configured targetField is ignored).
  4. Resolve the existing target submission.
  5. Only when the physical sourceField is null, the service checks the target action's is_cr_trgt_empty_conn flag:
    • true/null -> create a new target submission (entity_id = null), then backfill sourceEntity.sourceField with the entity_id of the newly created target.
    • false -> creation is skipped and the logger writes a structured reason (event=skip_create_target_empty_connection_disabled).
  6. If sourceField already has a value but the target row is not found, the service skips creation (warning log only).
  7. If there is no existing target submission, executing against an existing target is skipped.
  8. Run isManyToOneReady(...).
  9. Only if ready -> execute the action on the existing target submission.
  10. When executing against an existing target, if the target's status is already ahead of action.to_status, the trigger is skipped to prevent a status regression.

Readiness check in brief:

  1. Validate the minimum source config.
  2. Read the grouping value from the source field.
  3. Find the sibling source entities with the same grouping value.
  4. Find the sibling source submissions.
  5. Fetch current_status_id + wml_status.order for all sibling submissions.
  6. Determine source_status_order from the triggering submission.
  7. Ready if every sibling has status_order >= source_status_order.
  8. If any sibling has a lower status, or its status order cannot be found, the parent waits (not ready).
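
Steps 6 to 8 of the readiness check reduce to a comparison over the sibling status orders. The sketch below assumes the sibling list has already been loaded and includes the triggering submission; SiblingStatus and isReadySketch are hypothetical names, not the signature of isManyToOneReady(...). Treating a missing trigger row as not ready is also an assumption.

```typescript
interface SiblingStatus {
  submissionId: string;
  // null models "status order not found".
  statusOrder: number | null;
}

function isReadySketch(triggerSubmissionId: string, siblings: SiblingStatus[]): boolean {
  const trigger = siblings.find((s) => s.submissionId === triggerSubmissionId);
  // Assumption: without a known source_status_order the parent keeps waiting.
  if (trigger === undefined || trigger.statusOrder === null) return false;
  const sourceStatusOrder = trigger.statusOrder;

  // Ready only if every sibling is at or beyond the trigger's status order.
  return siblings.every(
    (s) => s.statusOrder !== null && s.statusOrder >= sourceStatusOrder,
  );
}
```

For example, with siblings at orders 2, 2 and 3 and a trigger at order 2 the parent is ready; a single sibling at order 1 keeps it waiting.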

Additional protections when executing the target:

  1. If targetActionId is not found, the MANY_TO_ONE connection is skipped.
  2. If target.current_status.order > targetAction.to_status.order, target execution is skipped (the parent is not moved backwards).
  3. If targetAction.is_cr_trgt_empty_conn = false, auto-creating a target for an empty connection is disabled and the process writes a reason note to the log.
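
Two of these protections are plain predicates and can be sketched in isolation. decideEmptyConnection and wouldRegress are hypothetical helper names; how the service behaves when a status order is unknown is not documented here, so the sketch assumes it does not skip in that case.

```typescript
type EmptyConnectionDecision = "create_and_backfill" | "skip_create";

// is_cr_trgt_empty_conn: true or null allows auto-create, only an explicit false disables it.
function decideEmptyConnection(isCrTrgtEmptyConn: boolean | null): EmptyConnectionDecision {
  return isCrTrgtEmptyConn === false ? "skip_create" : "create_and_backfill";
}

// True when executing the action would move the target back to an earlier status.
function wouldRegress(
  targetCurrentOrder: number | null,
  actionToStatusOrder: number | null,
): boolean {
  // Assumption: with an unknown order on either side, the guard does not trigger.
  if (targetCurrentOrder === null || actionToStatusOrder === null) return false;
  return targetCurrentOrder > actionToStatusOrder;
}
```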

8. Target Resolution, Dedupe, and Cross-WF Log

  1. The target submission is looked up by:
  • workflowId = targetWorkflowId
  • optionally, entity = targetEntity
  • the entityId produced by mapping/fallback
  2. Dedupe for creating a missing target (event/transaction scope):
  • key: ${targetWorkflowId}:${targetActionId}:${targetEntityId ?? 'null'}
  • storage: context.executedCreateTargets
  3. Dedupe for executing a target submission (event/transaction scope):
  • key: ${targetSubmissionId}:${targetActionId}
  • storage: context.executedTargets
  4. Dedupe across redeliveries/events:
  • uses an append-only insert into wxl_cross_wf_log with a unique dedupe_key.
  • if the insert conflicts, the connection is not processed again.
  5. The relations produced by target execution are stored append-only in the pivot table:
  • wxl_cross_wf_log_target_submission(cross_wf_log_id, target_submission_id)
  • the insert goes through appendTargetSubmissionLinks(...).
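
The two in-transaction dedupe scopes can be sketched with the key formats listed above. Modeling executedTargets and executedCreateTargets as Set<string> is an assumption, and claimOnce, claimCreate and claimExecute are hypothetical helper names.

```typescript
// Assumption: both context stores behave like string sets.
interface ExecutionContext {
  executedTargets: Set<string>;
  executedCreateTargets: Set<string>;
}

// Returns true the first time a key is seen, false on every repeat.
function claimOnce(seen: Set<string>, key: string): boolean {
  if (seen.has(key)) return false;
  seen.add(key);
  return true;
}

function claimCreate(
  context: ExecutionContext,
  targetWorkflowId: string,
  targetActionId: string,
  targetEntityId: string | null,
): boolean {
  const key = `${targetWorkflowId}:${targetActionId}:${targetEntityId ?? "null"}`;
  return claimOnce(context.executedCreateTargets, key);
}

function claimExecute(
  context: ExecutionContext,
  targetSubmissionId: string,
  targetActionId: string,
): boolean {
  const key = `${targetSubmissionId}:${targetActionId}`;
  return claimOnce(context.executedTargets, key);
}
```

Because the context lives only for one event, these sets guard against duplicate work within a transaction; redelivered events are caught by the unique dedupe_key instead.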

9. Repository Behavior and Safety

CrossWorkflowRepository handles the DB queries:

  • load the source submission
  • load the connection config
  • create the append-only execution log (tryStartExecutionLog)
  • append target submission relations to the pivot (appendTargetSubmissionLinks)
  • resolve target submissions
  • look up physical columns (information_schema.columns)
  • look up the source field value
  • look up entity ids by field value
  • look up sibling submission ids
  • look up sibling current_status_id + status_order

Safety:

  • table/column identifiers are validated via isSafeIdentifier
  • results of the physical-column check (hasPhysicalColumn) are cached to reduce repeated queries
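
A minimal sketch of the two safety measures, under stated assumptions: the actual rule used by isSafeIdentifier is not documented here, so the pattern below (letters, digits and underscores, at most 63 characters, matching PostgreSQL's default identifier limit) is only one plausible choice. isSafeIdentifierSketch and PhysicalColumnCache are hypothetical names, and the column query is a synchronous stand-in for an information_schema.columns lookup.

```typescript
const IDENTIFIER_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/;

// Assumption: a conservative allow-list, not the real isSafeIdentifier rule.
function isSafeIdentifierSketch(name: string): boolean {
  return name.length <= 63 && IDENTIFIER_PATTERN.test(name);
}

type ColumnQuery = (table: string, column: string) => boolean;

class PhysicalColumnCache {
  private readonly cache = new Map<string, boolean>();
  private readonly query: ColumnQuery;

  constructor(query: ColumnQuery) {
    this.query = query;
  }

  has(table: string, column: string): boolean {
    // Unsafe identifiers never reach the query.
    if (!isSafeIdentifierSketch(table) || !isSafeIdentifierSketch(column)) return false;

    const key = `${table}.${column}`;
    const cached = this.cache.get(key);
    if (cached !== undefined) return cached;

    const exists = this.query(table, column);
    this.cache.set(key, exists);
    return exists;
  }
}
```

Validating before the cache lookup keeps unsafe names out of both the query and the cache keys.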

10. Side Effects on the Target Workflow

Target execution goes through:

  • SubmissionServiceV2.setSubmission(...)

Payload sent by the relation service:

  • submission_id
  • entity_id (when creating a missing target)
  • workflow_id
  • action_id
  • actor_id
  • actor_user_id
  • override = true
  • source = 'CROSS_WORKFLOW'
  • do_not_clear_cache = true
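
The payload fields above can be assembled as shown below. This is a sketch only: the field types, the TargetRef and ActorRef shapes, and the convention that a null submission_id means "create a missing target" are assumptions, and the signature of SubmissionServiceV2.setSubmission(...) is not modeled.

```typescript
// Assumed field types; only the field names come from the list above.
interface CrossWorkflowSubmissionPayload {
  submission_id: string | null;
  entity_id?: string | null;
  workflow_id: string;
  action_id: string;
  actor_id: string | null;
  actor_user_id: string | null;
  override: true;
  source: "CROSS_WORKFLOW";
  do_not_clear_cache: true;
}

// Hypothetical inputs describing the target and the acting user.
interface TargetRef {
  workflowId: string;
  actionId: string;
  submissionId: string | null;
  entityId: string | null;
}

interface ActorRef {
  actorId: string | null;
  actorUserId: string | null;
}

function buildPayload(target: TargetRef, actor: ActorRef): CrossWorkflowSubmissionPayload {
  const payload: CrossWorkflowSubmissionPayload = {
    submission_id: target.submissionId,
    workflow_id: target.workflowId,
    action_id: target.actionId,
    actor_id: actor.actorId,
    actor_user_id: actor.actorUserId,
    override: true,
    source: "CROSS_WORKFLOW",
    do_not_clear_cache: true,
  };
  // entity_id is sent only when a missing target is being created.
  if (target.submissionId === null) {
    payload.entity_id = target.entityId;
  }
  return payload;
}
```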

Additional side effects (cross-workflow log):

  1. Append-only insert into wxl_cross_wf_log for event dedupe.
  2. Append-only insert into wxl_cross_wf_log_target_submission to map the execution log to its target submissions.

10.1 Cross-Workflow Log Output Table

Table                                Operation   Description
wxl_cross_wf_log                     INSERT      Stores the execution trail per connection, plus the unique dedupe_key.
wxl_cross_wf_log_target_submission   INSERT      Stores the list of target submissions touched by a single execution log.

11. Error Handling and Retry

  1. Consumer:
  • malformed/irrelevant event -> ack
  • service execution error -> nak(8000)
  2. Service:
  • relation query/execution error -> rollback + throw
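
The consumer side of this contract can be sketched against a minimal message surface. AckableMessage and handleMessage are hypothetical names (the real JetStream message type has more members), reading the 8000 in nak(8000) as a delay in milliseconds is an assumption, and so is acking after a successful execution.

```typescript
// Hypothetical minimal message surface.
interface AckableMessage {
  data: string;
  ack(): void;
  nak(delayMs?: number): void;
}

// Assumption: the nak argument is a redelivery delay in milliseconds.
const NAK_DELAY_MS = 8000;

async function handleMessage<T>(
  msg: AckableMessage,
  parse: (raw: string) => T | null,
  execute: (trigger: T) => Promise<void>,
): Promise<"acked" | "nacked"> {
  const trigger = parse(msg.data);
  if (trigger === null) {
    msg.ack(); // malformed or irrelevant: ack without calling the service
    return "acked";
  }
  try {
    await execute(trigger);
    msg.ack();
    return "acked";
  } catch {
    msg.nak(NAK_DELAY_MS); // the service already rolled back and rethrew
    return "nacked";
  }
}
```

Acking irrelevant events keeps them from being redelivered, while a delayed nak gives transient failures time to clear before the retry.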

12. Currently Test-Covered Behaviors

  1. Consumer spec (cross-workflow.consumer.spec.ts):
  • payload filtering
  • trigger mapping
  • ack/nak behavior
  2. Service spec (cross-workflow.service.spec.ts):
  • early return
  • relation type routing
  • actor fallback
  • rollback on error
  3. Repository spec (cross-workflow.repository.spec.ts):
  • hasPhysicalColumn (is_exist, cache, invalid identifier)
  • tryStartExecutionLog (successful insert vs dedupe conflict)
  • appendTargetSubmissionLinks (unique pivot insert)
  • findSubmissionStatusOrders (maps current_status_id + status_order)
  4. MANY_TO_ONE service spec (cross-workflow-many-to-one.service.spec.ts):
  • readiness is true when all sibling statuses are equal or further ahead
  • readiness is false when any sibling lags behind
  • skip when the target action is not found
  • skip execution when the target status is already ahead of the target action

13. Current Implementation Limitations

  1. The consumer only processes op='c' on wxl_submission_log.
  2. Dedupe across redeliveries depends on the stability of the dedupeKey components (sourceLogId, submissionId, actionId, connectionId, changedAt).
  3. The create/execute target logic is still split between the ONE_TO_MANY and MANY_TO_ONE services (intentionally, for relation-specific readability).