Cross-Workflow System Behavior (Implemented)
Dokumen ini menjelaskan behavior runtime apps/worker-workflow/src/cross-workflow sesuai implementasi saat ini.
1. Komponen Utama
-
CrossWorkflowConsumerPeran: decode/filter CDC event, map payload,ack/nak. Path:apps/worker-workflow/src/cross-workflow/services/cross-workflow.consumer.ts -
CrossWorkflowServicePeran: transaction boundary + routing relation type. Path:apps/worker-workflow/src/cross-workflow/services/cross-workflow.service.ts -
CrossWorkflowOneToManyServicePeran: behavior relasiONE_TO_MANY. Path:apps/worker-workflow/src/cross-workflow/services/cross-workflow-one-to-many.service.ts -
CrossWorkflowManyToOneServicePeran: behavior relasiMANY_TO_ONE+ readiness check. Path:apps/worker-workflow/src/cross-workflow/services/cross-workflow-many-to-one.service.ts -
CrossWorkflowRepositoryPeran: semua query DB untuk context cross-workflow. Path:apps/worker-workflow/src/cross-workflow/repositories/cross-workflow.repository.ts -
Kontrak dan util:
apps/worker-workflow/src/cross-workflow/types/cross-workflow.interface.tsapps/worker-workflow/src/cross-workflow/utils/relation-type.util.tsapps/worker-workflow/src/cross-workflow/utils/cross-workflow-log.util.tsapps/worker-workflow/src/cross-workflow/utils/identifier.util.ts
2. Source Event dan Kontrak Trigger
Source trigger:
- CDC subject
pgcdc.public.wxl_submission_log - Consumer memakai
@JetStreamConsumer(...)+@MessageHandler()
Payload internal (CrossWorkflowTriggerPayload):
submissionIdactionIdactorIdfromStatusIdtoStatusIdactorUserIdsourceLogIdsourceLsnchangedAtsourceTable = 'wxl_submission_log'
2.1 Tabel Input Trigger
| Field | Tipe | Keterangan |
|---|---|---|
submissionId | string | Submission source yang men-trigger cross-workflow. |
actionId | string | Action source pada wxl_submission_log. |
actorId | string | null | Actor workflow (opsional). |
fromStatusId | string | null | Status sebelum action source. |
toStatusId | string | null | Status sesudah action source. |
actorUserId | string | null | User executor hasil fallback mapping CDC. |
changedAt | string | Timestamp trigger event. |
3. Filter CDC di Consumer
Consumer hanya memproses event jika:
- JSON valid.
payload.source.table === 'wxl_submission_log'.payload.op === 'c'.after.submission_idtersedia.after.action_idtersedia.
Jika tidak valid/relevan:
- event di-
ack - service tidak dipanggil
Mapping penting:
actorUserIdfallback:actor_user_id -> user_updated -> user_createdchangedAtfallback:date_updated -> date_created -> ts_ms -> Date.now()
4. Flow Utama CrossWorkflowService.execute(...)
Semua eksekusi dalam transaksi QueryRunner.
Urutan flow:
- Start transaction.
- Skip jika bukan status change (
toStatusIdkosong atau sama denganfromStatusId). - Resolve
SubmissionServiceV2viaModuleRef+ request context actor. - Load source submission via
CrossWorkflowRepository.findSourceSubmission(...). - Load connection config via
findConnectionsBySourceAction(...). - Buat execution context (
executedTargets,executedCreateTargets). - Untuk setiap koneksi:
- build
dedupeKeydarisourceLogId/submissionId/actionId/connectionId/changedAt - insert append-only execution log via
tryStartExecutionLog(...) - jika dedupe conflict, koneksi di-skip
- route relation type:
MANY_TO_ONE->CrossWorkflowManyToOneService.handle(...)- selain itu ->
CrossWorkflowOneToManyService.handle(...)
- simpan relasi
executionLog -> targetSubmissionId[]viaappendTargetSubmissionLinks(...)
- Commit transaksi.
Jika error:
- rollback transaksi
- error dilempar ke consumer (consumer akan
nak)
5. Flowchart (Mermaid)
6. Behavior Relation: ONE_TO_MANY
Flow di CrossWorkflowOneToManyService:
- Resolve target entity id via lookup
targetEntity.targetField = sourceSubmission.entityId. - Jika mapping tidak menghasilkan target, fallback ke
sourceSubmission.entityIdhanya saatsourceSubmission.entity === targetEntity. - Resolve target submission existing berdasarkan hasil target entity.
- Create target submission hanya untuk target entity yang belum punya submission.
- Execute action ke target submission existing.
Decision penting:
- Jika konfigurasi mapping belum lengkap, service tidak memaksa mapping dan langsung pakai fallback.
- Jika
sourceEntitydi config tidak match dengan entity source submission, mapping di-skip. sourceFieldboleh tetap terisi (termasuk kolom virtual Directus), tetapi tidak dipakai sebagai sumber nilai join runtime.- Kolom fisik yang wajib hanya
targetField; jikatargetFieldbukan kolom fisik, mapping di-skip (warn log). - Jika
targetEntityIdskosong dan submission existing juga kosong, service tidak create blind target submission (skip_create_target_no_resolved_target). - Dedupe create target pakai
context.executedCreateTargets, dedupe execute target pakaicontext.executedTargets.
Catatan:
- Tidak ada readiness gate khusus di
ONE_TO_MANY. - Saat ini belum ada guard “target sudah lebih maju” di
ONE_TO_MANY; behavior itu baru diterapkan diMANY_TO_ONE.
7. Behavior Relation: MANY_TO_ONE
Flow di CrossWorkflowManyToOneService:
- Validasi
targetActionIdmasih valid di DB. - Resolve target entity id via mapping field.
- Mapping
MANY_TO_ONEselalu pakaitarget.id(configtargetFielddiabaikan). - Resolve target submission existing.
- Khusus jika
sourceFieldfisik bernilainull, service cek flag target actionis_cr_trgt_empty_conn:true/null-> create target submission baru (entity_id = null), lalu backfillsourceEntity.sourceFielddenganentity_idtarget yang baru dibuat.false-> create di-skip, dan logger menulis reason terstruktur (event=skip_create_target_empty_connection_disabled).
- Jika
sourceFieldsudah ada nilai tetapi row target tidak ditemukan, service skip create (hanya log warning). - Jika belum ada target submission existing, skip execute existing target.
- Jalankan
isManyToOneReady(...). - Hanya jika ready -> execute action ke target submission existing.
- Saat execute target existing, jika status target sudah lebih maju dari
action.to_status, trigger di-skip untuk mencegah regress status.
Readiness check ringkas:
- Validasi config minimum source.
- Ambil nilai grouping dari source field.
- Cari sibling entity source dengan nilai grouping sama.
- Cari sibling submission source.
- Ambil
current_status_id+wml_status.orderuntuk semua sibling submission. - Tentukan
source_status_orderdari submission pemicu. - Ready jika semua sibling punya
status_order >= source_status_order. - Jika ada sibling status lebih rendah / status order tidak ditemukan, parent menunggu (not ready).
Proteksi tambahan saat eksekusi target:
- Jika
targetActionIdtidak ditemukan, koneksiMANY_TO_ONEdi-skip. - Jika
target.current_status.order > targetAction.to_status.order, eksekusi target di-skip (parent tidak diturunkan). - Jika
targetAction.is_cr_trgt_empty_conn = false, auto-create target saat koneksi kosong dimatikan dan proses menulis note reason di log.
8. Target Resolution, Dedupe, dan Cross-WF Log
- Target submission dicari berdasarkan:
workflowId = targetWorkflowId- optional
entity = targetEntity entityIdhasil mapping/fallback
- Dedupe create missing target (scope event/transaksi):
- key:
${targetWorkflowId}:${targetActionId}:${targetEntityId ?? 'null'} - storage:
context.executedCreateTargets
- Dedupe execute target submission (scope event/transaksi):
- key:
${targetSubmissionId}:${targetActionId} - storage:
context.executedTargets
- Dedupe lintas redelivery/event:
- pakai insert append-only ke
wxl_cross_wf_logdengandedupe_keyunique. - jika insert conflict, koneksi tidak diproses ulang.
- Relasi hasil eksekusi target disimpan append-only di pivot:
wxl_cross_wf_log_target_submission(cross_wf_log_id, target_submission_id)- insert dilakukan via
appendTargetSubmissionLinks(...).
9. Repository Behavior dan Safety
CrossWorkflowRepository menangani query DB:
- load source submission
- load connection config
- create execution log append-only (
tryStartExecutionLog) - append relasi target submission ke pivot (
appendTargetSubmissionLinks) - resolve target submissions
- lookup kolom fisik (
information_schema.columns) - lookup nilai field source
- lookup entity id by field value
- lookup sibling submission id
- lookup sibling
current_status_id+status_order
Safety:
- validasi identifier tabel/kolom via
isSafeIdentifier - cache hasil cek kolom fisik (
hasPhysicalColumn) untuk mengurangi query berulang
10. Side Effects ke Workflow Target
Eksekusi target dilakukan via:
SubmissionServiceV2.setSubmission(...)
Payload yang dikirim relation service:
submission_identity_id(saat create missing)workflow_idaction_idactor_idactor_user_idoverride = truesource = 'CROSS_WORKFLOW'do_not_clear_cache = true
Side effects tambahan (cross-workflow log):
- Insert append-only
wxl_cross_wf_loguntuk dedupe event. - Insert append-only
wxl_cross_wf_log_target_submissionuntuk mapping execution log ke target submission.
10.1 Tabel Output Cross-Workflow Log
| Tabel | Operasi | Keterangan |
|---|---|---|
wxl_cross_wf_log | INSERT | Menyimpan jejak eksekusi per koneksi + dedupe_key unik. |
wxl_cross_wf_log_target_submission | INSERT | Menyimpan daftar target submission yang disentuh oleh satu execution log. |
11. Error Handling dan Retry
- Consumer:
- malformed/irrelevant event ->
ack - error execute service ->
nak(8000)
- Service:
- error query/eksekusi relation -> rollback + throw
12. Test-Covered Behaviors Saat Ini
- Consumer spec (
cross-workflow.consumer.spec.ts):
- filter payload
- mapping trigger
- ack/nak behavior
- Service spec (
cross-workflow.service.spec.ts):
- early return
- routing relation type
- actor fallback
- rollback on error
- Repository spec (
cross-workflow.repository.spec.ts):
hasPhysicalColumn(is_exist, cache, invalid identifier)tryStartExecutionLog(insert sukses vs dedupe conflict)appendTargetSubmissionLinks(insert pivot unik)findSubmissionStatusOrders(mappingcurrent_status_id+status_order)
- MANY_TO_ONE service spec (
cross-workflow-many-to-one.service.spec.ts):
- readiness true saat semua sibling status sama/lebih maju
- readiness false saat ada sibling tertinggal
- skip saat target action tidak ditemukan
- skip execute saat status target sudah lebih maju dari action target
13. Batasan Implementasi Saat Ini
- Consumer hanya memproses
op='c'padawxl_submission_log. - Dedupe lintas redelivery bergantung pada kestabilan pembentuk
dedupeKey(sourceLogId,submissionId,actionId,connectionId,changedAt). - Logic create/execute target masih dipisah antara service
ONE_TO_MANYdanMANY_TO_ONE(sengaja untuk readability relation-specific).