最近、サービス間の整合性とトランザクションそしてその中で一連の見せ方について、Sagaパターンに一定の理解を持つことができたので、メモを残しておく。
何をまた仰々しくと読む向きもあるだろうが、一定の結論を自分の中で持てるというのはそれ自体に価値がある。
最終的に Denoの提供するインフラ(ランタイムの機能)を利用し、Sagaパターンを下敷きにしたマイクロサービスが裏に控えるWebアプリを実装した。
参考
Saga パターン
Saga の設計パターンは、複数のサービス間でトランザクションを調整することで、分散システムのデータの一貫性を維持するのに役立ちます。 saga は、各サービスがその操作を実行し、イベントまたはメッセージを介して次のステップを開始するローカル トランザクションのシーケンスです。 シーケンス内のステップが失敗した場合、saga は補正トランザクションを実行して、完了した手順を元に戻します。
https://learn.microsoft.com/ja-jp/azure/architecture/patterns/saga より。
ということをSagaパターンの紹介として紹介されている。 一貫性という寒天では、結果整合性に視点を置いている。
Sagaパターンは以下の3つ(大きくは2つ)に分けられる。
Orchestration
Orchestration synchronous
Orchestration asynchronous
Choreography
Orchestrationは、全体の進行を管理する役割を持つものがいる設計。 Choerographyは振り付けという意味だそうで、それぞれのサービス間をメッセージが媒介する形で進行する設計。
まず前提として、3種類のSagaパターンについてサンプルを考えてみる。 (ただし、マイクロサービスの間という観点ではないので、Commandパターンではという状況。)
Orchestration synchronous 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 class Service { constructor ( private name: string , private task: () => boolean , private rollbackTask: () => void , ) {} execute ( ) { console .log (`Call task for Service: ${this .name} ...` ); const tmp = this .task (); console .log (`Finished task for Service: ${this .name} ` ); return tmp; } rollback ( ) { console .log (`Rolling back task for Service: ${this .name} ` ); this .rollbackTask (); } } class Orchestrator { private executedServices : Service [] = []; constructor (private services: Service[] ) {} execute ( ) { console .log ("Executing orchestrator..." ); let doRollback = false ; for (const service of this .services ) { const result = service.execute (); if (!result) { doRollback = true ; break ; } this .executedServices .push (service); } if (doRollback) { return this .rollback (); } } rollback ( ) { console .log ("Rollback all services" ); for (const service of this .executedServices .reverse ()) { service.rollback (); } } } const orchestrator = new Orchestrator ( [ new Service ( "Service 1" , () => { console .log ("Executing task for Service 1" ); return true ; }, () => { console .log ("Rolling back task for Service 1" ); }, ), new Service ( "Service 2" , () => { console .log ("Executing task for Service 2" ); return false ; }, () => { console .log ("Rollback task for Service 2" ); }, ), ], ); orchestrator.execute ();
実行すると以下の通り。
1 2 3 4 5 6 7 8 9 10 11 $ deno run orchestration-synchronous/main.ts Executing orchestrator... Call task for Service: Service 1... Executing task for Service 1 Finished task for Service: Service 1 Call task for Service: Service 2... Executing task for Service 2 Finished task for Service: Service 2 Rollback all services Rolling back task for Service: Service 1 Rolling back task for Service 1
Orchestration asynchronous orchestration-asynchronous\deno.json 1 2 3 4 5 6 7 8 9 10 11 { "tasks" : { "dev" : "deno run --watch main.ts" } , "imports" : { "@std/assert" : "jsr:@std/assert@1" , "@std/async" : "jsr:@std/async@^1.0.13" , "@std/cli" : "jsr:@std/cli@^1.0.17" } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 import { delay } from "@std/async/delay" ;import { Spinner } from "@std/cli/unstable-spinner" ;class Service { constructor ( private name: string , private task: () => Promise <boolean > | boolean , private rollbackTask: () => Promise <void > | void , ) {} async execute ( ) { console .log (`Call task for Service: ${this .name} ...` ); const tmp = await Promise .try (this .task ); console .log (`Finished task for Service: ${this .name} ` ); return tmp; } async rollback ( ) { console .log (`Rolling back task for Service: ${this .name} ` ); await this .rollbackTask (); } } class Orchestrator { private executedServices : Service [] = []; constructor (private services: Service[] ) {} async execute ( ) { console .log ("Executing orchestrator..." ); let doRollback = false ; for (const service of this .services ) { const result = await service.execute (); if (!result) { doRollback = true ; break ; } this .executedServices .push (service); } if (doRollback) { return this .rollback (); } } async rollback ( ) { console .log ("Rollback all services" ); for (const service of this .executedServices .reverse ()) { await service.rollback (); } } } const orchestrator = new Orchestrator ( [ new Service ( "Service 1" , async () => { console .log ("Executing task for Service 1" ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); return true ; }, async () => { console .log ("Rolling back task for Service 1" ); const spinner = new Spinner ({ message : "Rolling back ..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); }, ), new Service ( "Service 2" , async () => { console .log ("Executing task for Service 2" ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); return false ; }, () => { console .log ("Rollback task for Service 2" ); }, ), ], ); orchestrator.execute ();
実行すると以下の通り。
1 2 3 4 5 6 7 8 9 10 11 12 13 $ deno task dev Task dev deno run --watch main.ts Watcher Process started. Executing orchestrator... Call task for Service: Service 1... Executing task for Service 1 Finished task for Service: Service 1 Call task for Service: Service 2... Executing task for Service 2 Finished task for Service: Service 2 Rollback all services Rolling back task for Service: Service 1 Rolling back task for Service 1
実行時は各サービス実行とロールバックの間でトータル15秒かかる。
Choreography choreography\deno.json 1 2 3 4 5 6 7 8 9 10 { "tasks" : { "dev" : "deno run --watch main.ts" } , "imports" : { "@std/assert" : "jsr:@std/assert@1" , "@std/async" : "jsr:@std/async@^1.0.13" , "@std/cli" : "jsr:@std/cli@^1.0.17" } }
choreography/main.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 import { delay } from "@std/async/delay" ;import { Spinner } from "@std/cli/unstable-spinner" ;class MessageBroker { private subscribers : { [key : string ]: (() => Promise <void > | void )[] } = {}; subscribe (event: string , callback: () => Promise <void > | void ) { if (!this .subscribers [event]) { this .subscribers [event] = []; } this .subscribers [event].push (callback); } publish (event: string ) { if (this .subscribers [event]) { this .subscribers [event].forEach ((callback ) => callback ()); } } } class Service { constructor ( private messageBroker: MessageBroker, private task: (messageBroker: MessageBroker) => Promise <void > | void , private rollbackTask: (messageBroker: MessageBroker) => Promise <void > | void ) { task (messageBroker); rollbackTask (messageBroker); } } const messageBroker = new MessageBroker ();const service1 = new Service ( messageBroker, async (messageBroker) => { messageBroker.subscribe ("service1:taskStart" , async () => { console .log ("Executing task for Service 1" ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); messageBroker.publish ("service1:taskCompleted" ); }); }, async (messageBroker) => { messageBroker.subscribe ("service2:taskfailed" , async () => { console .log ("Rolling back task for Service 1" ); const spinner = new Spinner ({ message : "Rolling back..." , color : "red" }); spinner.start (); await delay (2000 ); spinner.stop (); }); } ); const service2 = new Service ( messageBroker, async (messageBroker) => { messageBroker.subscribe ("service1:taskCompleted" , async () => { console .log ("Executing task for Service 2" ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); messageBroker.publish ("service2:taskCompleted" ); }); }, async (messageBroker) => { messageBroker.subscribe ("service2:taskCompleted" , async () => { console .log ("Rolling back task for Service 2" ); const spinner = new Spinner ({ message : "Rolling back..." , color : "red" }); spinner.start (); await delay (2000 ); spinner.stop (); }); } ); messageBroker.publish ("service1:taskStart" );
実行すると以下の通り。
1 2 3 4 $ deno task dev Executing task for Service 1 Executing task for Service 2 Rolling back task for Service 1
choreographyは、非中央集権な設計ではあるものの相互の通信を介在する必要があるので、そこをメッセージブローカーとして実装。(ここが単一障害点なのではと考えてしまった瞬間はある。) この場合では、キャンセル処理を順繰りに戻していますが、次のように全体にまとめてロールバックできる。
choreography/main2.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 import { delay } from "@std/async/delay" ;import { Spinner } from "@std/cli/unstable-spinner" ;class MessageBroker { private subscribers : { [key : string ]: (() => Promise <void > | void )[] } = {}; subscribe (event: string , callback: () => Promise <void > | void ) { if (!this .subscribers [event]) { this .subscribers [event] = []; } this .subscribers [event].push (callback); } publish (event: string ) { if (this .subscribers [event]) { this .subscribers [event].forEach ((callback ) => callback ()); } } } class Service { constructor ( private messageBroker: MessageBroker, private task: (messageBroker: MessageBroker) => Promise <void > | void , private rollbackTask: ( messageBroker: MessageBroker, ) => Promise <void > | void , ) { task (messageBroker); rollbackTask (messageBroker); } } const messageBroker = new MessageBroker ();const service1 = new Service ( messageBroker, async (messageBroker) => { messageBroker.subscribe ("service1:taskStart" , async () => { console .log ("Executing task for Service 1" ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); messageBroker.publish ("service1:taskCompleted" ); }); }, async (messageBroker) => { messageBroker.subscribe ("services:rollback" , async () => { console .log ("Rolling back task for Service 1" ); const spinner = new Spinner ({ message : "Rolling back..." , color : "red" }); spinner.start (); await delay (2000 ); spinner.stop (); }); }, ); const service2 = new Service ( messageBroker, async (messageBroker) => { messageBroker.subscribe ("service1:taskCompleted" , async () => { console .log ("Executing task for Service 2" ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); messageBroker.publish ("services:rollback" ); }); }, async (messageBroker) => { messageBroker.subscribe ("services:rollback" , async () => { console .log ("Rolling back task for Service 2" ); const spinner = new Spinner ({ message : "Rolling back..." , color : "red" }); spinner.start (); await delay (2000 ); spinner.stop (); }); }, ); messageBroker.publish ("service1:taskStart" );
実行すると以下の通り。
1 2 3 4 5 $ deno task dev2 Executing task for Service 1 Executing task for Service 2 Rolling back task for Service 1 Rolling back task for Service 2
Orchestration asynchronous 並行と待機 先の実装を試しながら、これは複数を並行に実行して、複数を待つということができそうなので試してみた。
orchestration-asynchronous-parallel/main.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 import { delay } from "@std/async/delay" ;import { Spinner } from "@std/cli/unstable-spinner" ;class MessageBroker { private subscribers : { [key : string ]: (() => Promise <void > | void )[] } = {}; subscribe (event: string , callback: () => Promise <void > | void ) { if (!this .subscribers [event]) { this .subscribers [event] = []; } this .subscribers [event].push (callback); } publish (event: string ) { if (this .subscribers [event]) { this .subscribers [event].forEach ((callback ) => callback ()); } } } class Service { constructor ( private messageBroker: MessageBroker, private task: (messageBroker: MessageBroker) => Promise <void > | void , private rollbackTask: ( messageBroker: MessageBroker, ) => Promise <void > | void , ) { task (messageBroker); rollbackTask (messageBroker); } } const messageBroker = new MessageBroker ();const service1 = new Service ( messageBroker, async (messageBroker) => { messageBroker.subscribe ("service:taskStart" , async () => { console .log ("Executing task for Service 1" ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); messageBroker.publish ("service1:taskCompleted" ); }); }, async (messageBroker) => { messageBroker.subscribe ("service:taskFailed" , async () => { console .log ("Rolling back task for Service 1" ); const spinner = new Spinner ({ message : "Rolling back..." , color : "red" }); spinner.start (); await delay (2000 ); spinner.stop (); }); }, ); const service2 = new Service ( messageBroker, async (messageBroker) => { messageBroker.subscribe ("service:taskStart" , async () => { console .log ("Executing task for Service 2" ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); messageBroker.publish ("service2:taskCompleted" ); }); }, async (messageBroker) => { messageBroker.subscribe ("service:taskFailed" , async () => { console .log ("Rolling back task for Service 2" ); const spinner = new Spinner ({ message : "Rolling back..." , color : "red" }); spinner.start (); await delay (2000 ); spinner.stop (); }); }, ); const service3 = new Service ( messageBroker, async (messageBroker) => { const waitingEvents = { "service1:taskCompleted" : false , "service2:taskCompleted" : false }; const checkAllTasksCompleted = async ( ) => { console .log ("Executing task for Service 3" ); console .log (` ${waitingEvents["service1:taskCompleted" ] ? "completed" : "Waiting" } for Service 1` ); console .log (` ${waitingEvents["service2:taskCompleted" ] ? "completed" : "Waiting" } for Service 2` ); if (! (waitingEvents["service1:taskCompleted" ] && waitingEvents["service2:taskCompleted" ])) { console .log (" Waiting for all tasks to complete..." ); return ; } const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); console .log ("Service 3 task completed" ); messageBroker.publish ("service3:taskCompleted" ); }; messageBroker.subscribe ("service1:taskCompleted" , () => { console .log ("Received Service 1 task completed" ); waitingEvents["service1:taskCompleted" ] = true ; checkAllTasksCompleted (); }); messageBroker.subscribe ("service2:taskCompleted" , () => { console .log ("Received Service 2 task completed" ); waitingEvents["service2:taskCompleted" ] = true ; checkAllTasksCompleted (); }); }, async (_messageBroker) => { }, ); messageBroker.publish ("service:taskStart" );
実行すると以下の通り。
1 2 3 4 5 6 7 8 9 10 11 12 Executing task for Service 1 Executing task for Service 2 Received Service 1 task completed Executing task for Service 3 completed for Service 1 Waiting for Service 2 Waiting for all tasks to complete... Received Service 2 task completed Executing task for Service 3 completed for Service 1 completed for Service 2 Service 3 task completed
service1,service2の完了を条件として、service3の実行を待つという形で実装している。 service1,2を待つだけのサービスを作成して、service4の実行をキックするという形も考えうる。
Choreography を Deno が提供するインフラで Deno は、Deno Queue というメッセージキューを提供している。 このメッセージキューを利用して、choreography を実装できる。 Deno Deploy 上にサービスを置くとき、考えうる実装になると考えられる。
choreography-deno-queue\deno.json 1 2 3 4 5 6 7 8 9 10 11 { "tasks" : { "dev" : "deno run --watch -RW main.ts" } , "imports" : { "@std/assert" : "jsr:@std/assert@1" , "@std/async" : "jsr:@std/async@^1.0.13" , "@std/cli" : "jsr:@std/cli@^1.0.17" } , "unstable" : [ "kv" ] }
choreography-deno-queue/kv.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 export type ResultGetKV = Promise <{ publish : (msg: string , id: string ) => Promise <void >; subscribe : (msg: string , callback: (msg: { id: string }) => void ) => void ; }>; export type GetKvType = () => ResultGetKV ;const kv = await Deno .openKv ("choreography-deno-queue" );const callbacks = {} as { [key : string ]: {key : string , callbacks : ((msg: { id: string } ) => void )[]}};export async function getKv ( ): ResultGetKV { const publish = async (key: string , id: string ) => { console .log (`publishing message key: ${key} , id: ${id} ` ); await kv.enqueue ({ key, id }); }; const subscribe = (key: string , callback: (msg: { id: string }) => void ) => { console .log (`subscribing to message: ${key} ` ); if (!callbacks[key]) { callbacks[key] = {key, callbacks : []}; } callbacks[key].callbacks .push (callback); kv.listenQueue ((msg ) => { console .log (`message received: ${JSON .stringify(msg)} ` ); callbacks[msg.key ].callbacks .forEach (cb => { cb (msg); }); }); }; return { publish, subscribe, }; }
choreography-deno-queue/main.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 import { delay } from "@std/async/delay" ;import { Spinner } from "@std/cli/unstable-spinner" ;import { getKv, GetKvType } from "./kv.ts" ;class Service { constructor ( getKv: GetKvType, task: (getKv: GetKvType) => Promise <void > | void , rollbackTask: (getKv: GetKvType) => Promise <void > | void , ) { task (getKv); rollbackTask (getKv); } } const service1 = new Service ( getKv, async (getKv) => { const kv = await getKv (); kv.subscribe ("service1:taskStart" , async ({id}) => { console .log ("Executing task for Service 1" ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); kv.publish ("service1:taskCompleted" , id); }); }, async (getKv) => { const kv = await getKv (); kv.subscribe ("service2:taskFailed" , async ({id}) => { console .log (`Rolling back task for Service 1 ${id} ` ); const spinner = new Spinner ({ message : "Rolling back..." , color : "red" }); spinner.start (); await delay (2000 ); spinner.stop (); }); }, ); const service2 = new Service ( getKv, async (getKv) => { const kv = await getKv (); kv.subscribe ("service1:taskCompleted" , async ({ id }) => { console .log (`Executing task for Service 2 ${id} ` ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" }); spinner.start (); await delay (5000 ); spinner.stop (); kv.publish ("service2:taskFailed" , id); }); }, async (_getKv) => { }, ); const kv = await getKv ();kv.publish ("service1:taskStart" , crypto.randomUUID ());
実行すると以下の通り。
1 2 3 4 5 6 7 8 9 10 11 12 13 $ deno task dev subscribing to message: service1:taskStart subscribing to message: service2:taskFailed subscribing to message: service1:taskCompleted publishing message key: service1:taskStart, id : c79e5a83-2fa0-455a-9264-e10110f5b72d message received: {"key" :"service1:taskStart" ,"id" :"c79e5a83-2fa0-455a-9264-e10110f5b72d" } Executing task for Service 1 publishing message key: service1:taskCompleted, id : c79e5a83-2fa0-455a-9264-e10110f5b72d message received: {"key" :"service1:taskCompleted" ,"id" :"c79e5a83-2fa0-455a-9264-e10110f5b72d" } Executing task for Service 2 c79e5a83-2fa0-455a-9264-e10110f5b72d publishing message key: service2:taskFailed, id : c79e5a83-2fa0-455a-9264-e10110f5b72d message received: {"key" :"service2:taskFailed" ,"id" :"c79e5a83-2fa0-455a-9264-e10110f5b72d" } Rolling back task for Service 1 c79e5a83-2fa0-455a-9264-e10110f5b72d
この状態は、未だ同じファイルで書かれており、同じプロセスで実行されている。 まだマイクロサービスを考慮された形としては受け入れが難しいとも考えられる。
Choreography を Deno が提供するインフラで 2 DENO Queue を利用したが、複数プロセスでQueueを参照することはちょっと難しい。 それは、特定のキーだけをだけ読み込むということができないため。 先の実装では、どのキーで受け取ったかにより呼び出すものを変えることで対処したが、これはマルチプロセスでは難しい。
ここでは Deno kv を愚直に使いサービスを別ファイルに切り分け、別プロセスで起動することを前提として実装する。
choreography-deno-kv/deno.jsonc 1 2 3 4 5 6 7 8 9 10 11 12 13 14 { "tasks" : { "parallel:dev1" : "deno run -RW --watch entry_point.ts" , "parallel:dev2" : "deno run -RW --watch service1.ts" , "parallel:dev3" : "deno run -RW --watch service2.ts" } , "imports" : { "@std/assert" : "jsr:@std/assert@1" , "@std/async" : "jsr:@std/async@^1.0.13" , "@std/cli" : "jsr:@std/cli@^1.0.17" , "@std/uuid" : "jsr:@std/uuid@^1.0.7" } , "unstable" : [ "kv" ] }
choreography-deno-kv/kv.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 import { delay } from "@std/async/delay" ;export type ResultGetKV = { publish : (key: string , id: string ) => Promise <void >; subscribe : ( key: string , callback: (msg: { id: string }) => Promise <void > | void , ) => void ; row : Deno .Kv ; }; export type GetKvType = () => Promise <ResultGetKV >;const kv = await Deno .openKv ("choreography-deno-kv" );export function getKv ( ): Promise <ResultGetKV > { const clientId = crypto.randomUUID (); const publish = async (key: string , id: string ) => { console .log (`publishing message key: ${key} , id: ${id} ` ); const subscribers = await kv.list <string >({ prefix : ["choreography-deno-kv" , key, "subscribers" ], }); for await (const subscriber of subscribers) { const atomic = kv.atomic (); atomic.set ([ "choreography-deno-kv" , key, "update" , subscriber.value , ], (new Date ()).getTime ()); atomic.set ([ "choreography-deno-kv" , key, "messages" , subscriber.value , id, ], id); await atomic.commit (); } }; const subscribe = ( key: string , callback: (msg: { id: string }) => Promise <void > | void , ) => { (async () => { console .log (`subscribing to message: ${key} ` ); let lastUpdate = 0 ; const atomic = kv.atomic (); atomic.set ( ["choreography-deno-kv" , key, "subscribers" , clientId], clientId, ); atomic.set ( ["choreography-deno-kv" , key, "update" , clientId], (new Date ()).getTime (), ); await atomic.commit (); while (true ) { await delay (10 ); const update = await kv.get <number >([ "choreography-deno-kv" , key, "update" , clientId, ]); if (!update.value || update.value <= lastUpdate) { continue ; } lastUpdate = update.value ; const messages = kv.list <string >({ prefix : [ "choreography-deno-kv" , key, "messages" , clientId, ], }); for await (const message of messages) { await callback ({ id : message.value }); await kv.delete (message.key ); } } })(); }; return { publish, subscribe, row : kv, }; }
choreography-deno-kv/service.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import { GetKvType , ResultGetKV } from "./kv.ts" ;export class Service { constructor ( private getKv: GetKvType, private task: (getKv: ResultGetKV) => Promise <void > | void , private rollbackTask: (getKv: ResultGetKV) => Promise <void > | void , ) { } async start ( ) { const kv = await this .getKv (); this .task (kv); this .rollbackTask (kv); } }
choreography-deno-kv/service1.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import { delay } from "@std/async/delay" ;import { Spinner } from "@std/cli/unstable-spinner" ;import { getKv } from "./kv.ts" ;import { Service } from "./service.ts" ;const service1 = new Service ( getKv, async ({ subscribe, publish }) => { subscribe ("service1:taskStart" , async (msg : { id : string }) => { console .log (`Executing task for Service 1 ${msg.id} ` ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" , }); spinner.start (); await delay (5000 ); spinner.stop (); publish ("service1:taskCompleted" , msg.id ); }); }, async ({ subscribe }) => { subscribe ("service2:taskfailed" , async (msg : { id : string }) => { console .log (`Rolling back task for Service 1 ${msg.id} ` ); const spinner = new Spinner ({ message : "Rolling back..." , color : "red" , }); spinner.start (); await delay (2000 ); spinner.stop (); }); }, ); service1.start (); console .log ("Service 1 started" );
choreography-deno-kv/service2.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import { delay } from "@std/async/delay" ;import { Spinner } from "@std/cli/unstable-spinner" ;import { getKv } from "./kv.ts" ;import { Service } from "./service.ts" ;const service2 = new Service ( getKv, async ({ subscribe, publish }) => { subscribe ("service1:taskCompleted" , async (msg) => { console .log (`Executing task for Service 2 ${msg.id} ` ); const spinner = new Spinner ({ message : "Executing..." , color : "yellow" , }); spinner.start (); await delay (5000 ); spinner.stop (); publish ("service2:taskfailed" , msg.id ); console .log (`Service 2 task failed ${msg.id} ` ); }); }, async (_getKv) => { }, ); service2.start (); console .log ("Service 2 started" );
choreography-deno-kv/entry_point.ts 1 2 3 4 5 6 7 8 9 import { getKv } from "./kv.ts" ;import { delay } from "@std/async/delay" ;const kv = await getKv ();await delay (1000 );await kv.publish ("service1:taskStart" , crypto.randomUUID ());
実行すると以下の通り。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 $ deno task parallel:* Task parallel:dev1 deno run -RW --watch entry_point.ts Task parallel:dev2 deno run -RW --watch service1.ts Task parallel:dev3 deno run -RW --watch service2.ts Service 2 started Service 1 started subscribing to message: service1:taskCompleted subscribing to message: service1:taskStart subscribing to message: service2:taskfailed publishing message key: service1:taskStart, id : 19d48ceb-c421-4bdb-b20b-9368fafd18f1 Executing task for Service 1 19d48ceb-c421-4bdb-b20b-9368fafd18f1 publishing message key: service1:taskCompleted, id : 19d48ceb-c421-4bdb-b20b-9368fafd18f1 Executing task for Service 2 19d48ceb-c421-4bdb-b20b-9368fafd18f1 publishing message key: service2:taskfailed, id : 19d48ceb-c421-4bdb-b20b-9368fafd18f1 Service 2 task failed 19d48ceb-c421-4bdb-b20b-9368fafd18f1 Rolling back task for Service 1 19d48ceb-c421-4bdb-b20b-9368fafd18f1
複数のプロセスで起動する形をとり、それぞれがメッセージを発行し、やり取りする流れが作れました。 中心にあるのは、Deno KV です。
とここまでで、Deno の提供する機能を利用して、Saga パターンを実装できました。
アプリから使うを考える ここまでCLIで動くことを前提とした確認について紹介をしました。 ここからは見せ方について考えたいです。
たまに見るWebアプリで、何か設定すると進行状況がメッセージで出てくるものがあります。 ランプがテキストの横で順次点灯したりするものもあります。
それの実装を試みます。
ここでは、Deno KV を利用したChoreographyを利用し、全体の進捗を監視できるように改修し、Fresh に載せます。
マルチプロセスで、計算をさせてみることにします。
全体を乗せるのは量があるため、詳細はこちらのリポジトリ を参照。
メッセージブローカーは、Deno KV を利用して実装します。
services/messageBroker.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 import { delay } from "@std/async/delay" ;const MESSAGE_BROKER_PREFIX = "choreography" as const ;const MESSAGE_BROKER_UPDATE_KEY = "update" as const ;const MESSAGE_BROKER_MESSAGES_KEY = "messages" as const ;const MESSAGE_BROKER_SUBSCRIBERS_KEY = "subscribers" as const ;function getUpdateKey (key: string , clientId: string ): string [] { return [MESSAGE_BROKER_PREFIX , key, MESSAGE_BROKER_UPDATE_KEY , clientId]; } function getMessagesKey (key: string , clientId: string ): string [] { return [ MESSAGE_BROKER_PREFIX , key, MESSAGE_BROKER_MESSAGES_KEY , clientId, ]; } function getMessageKey (key: string , clientId: string , id: string ): string [] { return [ MESSAGE_BROKER_PREFIX , key, MESSAGE_BROKER_MESSAGES_KEY , clientId, id, ]; } function getSubscribersKey (key: string ): string [] { return [MESSAGE_BROKER_PREFIX , key, MESSAGE_BROKER_SUBSCRIBERS_KEY ]; } function getSubscriberKey (key: string , clientId: string ): string [] { return [MESSAGE_BROKER_PREFIX , key, MESSAGE_BROKER_SUBSCRIBERS_KEY , clientId]; } export type ResultMessageBroker = { publish : ( key: string , taskId: string , payload: { [key: string ]: unknown }, ) => Promise <void >; subscribe : ( key: string , callback: ( taskId: string , payload: { [key: string ]: unknown }, ) => Promise <void > | void , ) => void ;}; type KV_MESSAGE_BLOCKER_RAW_TYPE = { taskId : string ; payload : { [key : string ]: unknown }; }; export async function getMessageBroker ( ): Promise <ResultMessageBroker > { const kv = await Deno .openKv ("./tmp/choreography" ); const clientId = crypto.randomUUID (); const publish = async ( key: string , taskId: string , payload: { [key: string ]: unknown }, ) => { console .log ( `publishing message key: ${key} , taskId: ${taskId} , payload: ${ JSON .stringify(payload) } ` , ); const subscribers = await kv.list <string >({ prefix : getSubscribersKey (key), }); for await (const subscriber of subscribers) { const atomic = kv.atomic (); atomic.set (getUpdateKey (key, subscriber.value ), (new Date ()).getTime ()); atomic.set (getMessageKey (key, subscriber.value , taskId), { taskId, payload, }); await atomic.commit (); } }; const subscribe = ( key: string , callback: ( taskId: string , payload: { [key: string ]: unknown }, ) => Promise <void > | void , ) => { (async () => { console .info (`subscribing to message: ${key} ` ); let lastUpdate = 0 ; const atomic = kv.atomic (); atomic.set ( getSubscriberKey (key, clientId), clientId, ); atomic.set ( getUpdateKey (key, clientId), 0 , ); await atomic.commit (); while (true ) { await delay (1000 ); const update = await kv.get <number >( getUpdateKey (key, clientId), ); if (update.value === null || update.value <= lastUpdate) { continue ; } lastUpdate = update.value ; const messages = kv.list <KV_MESSAGE_BLOCKER_RAW_TYPE >({ prefix : getMessagesKey (key, clientId), }); for await (const message of messages) { await callback (message.value .taskId , message.value .payload ); await kv.delete (message.key ); } } })(); }; return { publish, subscribe, }; }
各サービスの基盤のServiceクラスは以下のように実装。
services/service.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import { ResultMessageBroker } from "./messageBroker.ts" ;export class Service { constructor ( private getMessageBroker: () => Promise <ResultMessageBroker>, private task: ( { publish, subscribe }: ResultMessageBroker, ) => Promise <void > | void , private rollbackTask: ( { publish, subscribe }: ResultMessageBroker, ) => Promise <void > | void , ) { } async start ( ) { const kv = await this .getMessageBroker (); this .task (kv); this .rollbackTask (kv); } }
具体的な各サービスは以下のように3ファイル実装。
services/service1.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import { Service } from "./service.ts" ;import { getMessageBroker } from "./messageBroker.ts" ;import { work1, work1Rollback } from "./work.ts" ;const service1 = new Service ( getMessageBroker, work1, work1Rollback, ); service1.start (); console .log ("Service 1 started" );
deno.jsonc(tasks のみ抜粋) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 { "tasks" : { "check" : "deno fmt --check && deno lint && deno check **/*.ts && deno check **/*.tsx" , "cli" : "echo \"import '\\$fresh/src/dev/cli.ts'\" | deno run --unstable -A -" , "manifest" : "deno task cli manifest $(pwd)" , "start:app" : "deno run -A --watch=static/,routes/ dev.ts" , "start:service1" : "deno run -WR --watch services/task1service.ts" , "start:service2" : "deno run -WR --watch services/task2service.ts" , "start:service3" : "deno run -WR --watch services/task3service.ts" , "build" : "deno run -A dev.ts build" , "preview" : "deno run -A main.ts" , "update" : "deno run -A -r https://fresh.deno.dev/update ." } }
以下のコマンドで実行。
動かすと以下のようになります。 3つのタスクメッセージブローカーを介して、各サービスがメッセージをやり取りしながら進行します。
成功する場合、3段階でconpliteとなります。
失敗する場合、3段階目まで順次completeになっていき、最終的にすべてrollbackになります。
処理自体は、Sagaパターンに則り、計算状態や、ログはポーリングで取得しています。
例に挙げたような、処理の状況がオンラインで順次見えものができました。
今回作ったもの、マルチプロセスでやるには、Deno KV の機能の有効活用というより while ぶん回しみたいな形であった。 もっとスマートにできないものかと記事外でも試したところですが、今回はここまで。 中に組み込んでしまいましたが、Deno Deploy のKVに接続ができるので、Deno Deploy での処理の難しいモノは、EC2なりに任せてしまうことも可能。
もし全部Deno Deployでやるなら、KVの機能でほとんど綺麗に収まるだろうと考えています。 今回最後の実装だと、subscribe側が先に立っていないと通信が上手くいかないので、ここもまた考慮事項。
わざと1個づつ逆順でロールバックをしている箇所、確認した資料次第では、各サービスへ並行に投げているケースも見受けられた。
順番が本当に大事なのだとすれば、サービスを分けないか、Orchestrationが推奨なのだろうと感じる。
今回の実装から、本格的にCopilot Agentも使ってみた。 あまり生成させ過ぎるのは、学習を目的とするところでは不適切なので、意図的に使わない部分もある。 Sagaパターンの実装という意図が汲まれているのか、サジェスト自体もかなり賢い。 Deno KV周りは、学習がないのかサジェストも少ないように感じられた。
一番凄まじいと感じたのは、閉じタグ不足を目で判断できないtsxのエラー。 コンソールのエラーも大した記述ないところ、「直して」だけ書いて直したこと。 こいつとは、仲良くしよう。
では。