Saga パターンを具体的に考える

最近、サービス間の整合性とトランザクションそしてその中で一連の見せ方について、Sagaパターンに一定の理解を持つことができたので、メモを残しておく。

何をまた仰々しくと読む向きもあるだろうが、一定の結論を自分の中で持てるというのはそれ自体に価値がある。

最終的に Denoの提供するインフラ(ランタイムの機能)を利用し、Sagaパターンを下敷きにしたマイクロサービスが裏に控えるWebアプリを実装した。

参考

Saga パターン

Saga の設計パターンは、複数のサービス間でトランザクションを調整することで、分散システムのデータの一貫性を維持するのに役立ちます。
saga は、各サービスがその操作を実行し、イベントまたはメッセージを介して次のステップを開始するローカル トランザクションのシーケンスです。
シーケンス内のステップが失敗した場合、saga は補正トランザクションを実行して、完了した手順を元に戻します。

https://learn.microsoft.com/ja-jp/azure/architecture/patterns/saga より。

ということをSagaパターンの紹介として紹介されている。
一貫性という寒天では、結果整合性に視点を置いている。

Sagaパターンは以下の3つ(大きくは2つ)に分けられる。

  • Orchestration
    • Orchestration synchronous
    • Orchestration asynchronous
  • Choreography

Orchestrationは、全体の進行を管理する役割を持つものがいる設計。
Choerographyは振り付けという意味だそうで、それぞれのサービス間をメッセージが媒介する形で進行する設計。

まず前提として、3種類のSagaパターンについてサンプルを考えてみる。
(ただし、マイクロサービスの間という観点ではないので、Commandパターンではという状況。)

Orchestration synchronous

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class Service {
constructor(
private name: string,
private task: () => boolean,
private rollbackTask: () => void,
) {}

execute() {
console.log(`Call task for Service: ${this.name}...`);
const tmp = this.task();
console.log(`Finished task for Service: ${this.name}`);
return tmp;
}

rollback() {
console.log(`Rolling back task for Service: ${this.name}`);
this.rollbackTask();
}
}

class Orchestrator {
private executedServices: Service[] = [];
constructor(private services: Service[]) {}

execute() {
console.log("Executing orchestrator...");
let doRollback = false;
for (const service of this.services) {
const result = service.execute();
if (!result) {
doRollback = true;
break;
}
this.executedServices.push(service);
}

if (doRollback) {
return this.rollback();
}
}
rollback() {
console.log("Rollback all services");
for (const service of this.executedServices.reverse()) {
service.rollback();
}
}
}

const orchestrator = new Orchestrator(
[
new Service(
"Service 1",
() => {
console.log("Executing task for Service 1");
return true;
},
() => {
console.log("Rolling back task for Service 1");
},
),
new Service(
"Service 2",
() => {
console.log("Executing task for Service 2");
return false; // わざと失敗させる
},
() => {
console.log("Rollback task for Service 2");
},
),
],
);

orchestrator.execute();

実行すると以下の通り。

1
2
3
4
5
6
7
8
9
10
11
$ deno run orchestration-synchronous/main.ts
Executing orchestrator...
Call task for Service: Service 1...
Executing task for Service 1
Finished task for Service: Service 1
Call task for Service: Service 2...
Executing task for Service 2
Finished task for Service: Service 2
Rollback all services
Rolling back task for Service: Service 1
Rolling back task for Service 1

Orchestration asynchronous

orchestration-asynchronous\deno.json
1
2
3
4
5
6
7
8
9
10
11
{
"tasks": {
"dev": "deno run --watch main.ts"
},
"imports": {
"@std/assert": "jsr:@std/assert@1",
"@std/async": "jsr:@std/async@^1.0.13",
"@std/cli": "jsr:@std/cli@^1.0.17"
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import { delay  } from "@std/async/delay";
import { Spinner } from "@std/cli/unstable-spinner";

class Service {
constructor(
private name: string,
private task: () => Promise<boolean> | boolean,
private rollbackTask: () => Promise<void> | void,
) {}

async execute() {
console.log(`Call task for Service: ${this.name}...`);
const tmp = await Promise.try(this.task);
console.log(`Finished task for Service: ${this.name}`);
return tmp;
}

async rollback() {
console.log(`Rolling back task for Service: ${this.name}`);
await this.rollbackTask();
}
}

class Orchestrator {
private executedServices: Service[] = [];
constructor(private services: Service[]) {}

async execute() {
console.log("Executing orchestrator...");
let doRollback = false;
for (const service of this.services) {
const result = await service.execute();
if (!result) {
doRollback = true;
break;
}
this.executedServices.push(service);
}

if (doRollback) {
return this.rollback();
}
}
async rollback() {
console.log("Rollback all services");
for (const service of this.executedServices.reverse()) {
await service.rollback();
}
}
}

const orchestrator = new Orchestrator(
[
new Service(
"Service 1",
async () => {
console.log("Executing task for Service 1");
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();
return true;
},
async () => {
console.log("Rolling back task for Service 1");
const spinner = new Spinner({ message: "Rolling back ...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();

},
),
new Service(
"Service 2",
async () => {
console.log("Executing task for Service 2");
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();
return false; // わざと失敗させる
},
() => {
console.log("Rollback task for Service 2");
},
),
],
);

orchestrator.execute();

実行すると以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
$ deno task dev
Task dev deno run --watch main.ts
Watcher Process started.
Executing orchestrator...
Call task for Service: Service 1...
Executing task for Service 1
Finished task for Service: Service 1
Call task for Service: Service 2...
Executing task for Service 2
Finished task for Service: Service 2
Rollback all services
Rolling back task for Service: Service 1
Rolling back task for Service 1

実行時は各サービス実行とロールバックの間でトータル15秒かかる。

Choreography

choreography\deno.json
1
2
3
4
5
6
7
8
9
10
{
"tasks": {
"dev": "deno run --watch main.ts"
},
"imports": {
"@std/assert": "jsr:@std/assert@1",
"@std/async": "jsr:@std/async@^1.0.13",
"@std/cli": "jsr:@std/cli@^1.0.17"
}
}
choreography/main.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import { delay } from "@std/async/delay";
import { Spinner } from "@std/cli/unstable-spinner";

class MessageBroker {
private subscribers: { [key: string]: (() => Promise<void> | void)[] } = {};

subscribe(event: string, callback: () => Promise<void> | void) {
if (!this.subscribers[event]) {
this.subscribers[event] = [];
}
this.subscribers[event].push(callback);
}

publish(event: string) {
if (this.subscribers[event]) {
this.subscribers[event].forEach((callback) => callback());
}
}
}

class Service {
constructor(
private messageBroker: MessageBroker,
private task: (messageBroker: MessageBroker) => Promise<void> | void,
private rollbackTask: (messageBroker: MessageBroker) => Promise<void> | void
) {
task(messageBroker);
rollbackTask(messageBroker);
}
}

const messageBroker = new MessageBroker();
const service1 = new Service(
messageBroker,
async (messageBroker) => {
messageBroker.subscribe("service1:taskStart", async () => {
console.log("Executing task for Service 1");
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();

messageBroker.publish("service1:taskCompleted");
});
},
async (messageBroker) => {
messageBroker.subscribe("service2:taskfailed", async () => {
console.log("Rolling back task for Service 1");
const spinner = new Spinner({ message: "Rolling back...", color: "red" });
spinner.start();
await delay(2000);
spinner.stop();
});
}
);
const service2 = new Service(
messageBroker,
async (messageBroker) => {
messageBroker.subscribe("service1:taskCompleted", async () => {
console.log("Executing task for Service 2");
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();
// Simulate a failure
messageBroker.publish("service2:taskCompleted");
});
},
async (messageBroker) => {
messageBroker.subscribe("service2:taskCompleted", async () => {
console.log("Rolling back task for Service 2");
const spinner = new Spinner({ message: "Rolling back...", color: "red" });
spinner.start();
await delay(2000);
spinner.stop();
});
}
);

messageBroker.publish("service1:taskStart");

実行すると以下の通り。

1
2
3
4
$ deno task dev
Executing task for Service 1
Executing task for Service 2
Rolling back task for Service 1

choreographyは、非中央集権な設計ではあるものの相互の通信を介在する必要があるので、そこをメッセージブローカーとして実装。(ここが単一障害点なのではと考えてしまった瞬間はある。)
この場合では、キャンセル処理を順繰りに戻していますが、次のように全体にまとめてロールバックできる。

choreography/main2.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import { delay } from "@std/async/delay";
import { Spinner } from "@std/cli/unstable-spinner";

class MessageBroker {
private subscribers: { [key: string]: (() => Promise<void> | void)[] } = {};

subscribe(event: string, callback: () => Promise<void> | void) {
if (!this.subscribers[event]) {
this.subscribers[event] = [];
}
this.subscribers[event].push(callback);
}

publish(event: string) {
if (this.subscribers[event]) {
this.subscribers[event].forEach((callback) => callback());
}
}
}

class Service {
constructor(
private messageBroker: MessageBroker,
private task: (messageBroker: MessageBroker) => Promise<void> | void,
private rollbackTask: (
messageBroker: MessageBroker,
) => Promise<void> | void,
) {
task(messageBroker);
rollbackTask(messageBroker);
}
}

const messageBroker = new MessageBroker();
const service1 = new Service(
messageBroker,
async (messageBroker) => {
messageBroker.subscribe("service1:taskStart", async () => {
console.log("Executing task for Service 1");
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();

messageBroker.publish("service1:taskCompleted");
});
},
async (messageBroker) => {
messageBroker.subscribe("services:rollback", async () => { // 失敗時に、各サービスは同じイベントで実行を発火する
console.log("Rolling back task for Service 1");
const spinner = new Spinner({ message: "Rolling back...", color: "red" });
spinner.start();
await delay(2000);
spinner.stop();
});
},
);
const service2 = new Service(
messageBroker,
async (messageBroker) => {
messageBroker.subscribe("service1:taskCompleted", async () => {
console.log("Executing task for Service 2");
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();

messageBroker.publish("services:rollback"); // わざと失敗させる
});
},
async (messageBroker) => {
messageBroker.subscribe("services:rollback", async () => { // 失敗時に、各サービスは同じイベントで実行を発火する
console.log("Rolling back task for Service 2");
const spinner = new Spinner({ message: "Rolling back...", color: "red" });
spinner.start();
await delay(2000);
spinner.stop();
});
},
);

messageBroker.publish("service1:taskStart");

実行すると以下の通り。

1
2
3
4
5
$ deno task dev2
Executing task for Service 1
Executing task for Service 2
Rolling back task for Service 1
Rolling back task for Service 2

Orchestration asynchronous 並行と待機

先の実装を試しながら、これは複数を並行に実行して、複数を待つということができそうなので試してみた。

orchestration-asynchronous-parallel/main.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import { delay } from "@std/async/delay";
import { Spinner } from "@std/cli/unstable-spinner";

class MessageBroker {
private subscribers: { [key: string]: (() => Promise<void> | void)[] } = {};

subscribe(event: string, callback: () => Promise<void> | void) {
if (!this.subscribers[event]) {
this.subscribers[event] = [];
}
this.subscribers[event].push(callback);
}

publish(event: string) {
if (this.subscribers[event]) {
this.subscribers[event].forEach((callback) => callback());
}
}
}

class Service {
constructor(
private messageBroker: MessageBroker,
private task: (messageBroker: MessageBroker) => Promise<void> | void,
private rollbackTask: (
messageBroker: MessageBroker,
) => Promise<void> | void,
) {
task(messageBroker);
rollbackTask(messageBroker);
}
}

const messageBroker = new MessageBroker();
const service1 = new Service(
messageBroker,
async (messageBroker) => {
messageBroker.subscribe("service:taskStart", async () => {
console.log("Executing task for Service 1");
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();

messageBroker.publish("service1:taskCompleted");
});
},
async (messageBroker) => {
messageBroker.subscribe("service:taskFailed", async () => {
console.log("Rolling back task for Service 1");
const spinner = new Spinner({ message: "Rolling back...", color: "red" });
spinner.start();
await delay(2000);
spinner.stop();
});
},
);
const service2 = new Service(
messageBroker,
async (messageBroker) => {
messageBroker.subscribe("service:taskStart", async () => {
console.log("Executing task for Service 2");
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();

messageBroker.publish("service2:taskCompleted");
});
},
async (messageBroker) => {
messageBroker.subscribe("service:taskFailed", async () => {
console.log("Rolling back task for Service 2");
const spinner = new Spinner({ message: "Rolling back...", color: "red" });
spinner.start();
await delay(2000);
spinner.stop();
});
},
);

const service3 = new Service(
messageBroker,
async (messageBroker) => {
const waitingEvents = { "service1:taskCompleted": false, "service2:taskCompleted": false };

const checkAllTasksCompleted = async () => {
console.log("Executing task for Service 3");

console.log(` ${waitingEvents["service1:taskCompleted"] ? "completed" : "Waiting"} for Service 1`);
console.log(` ${waitingEvents["service2:taskCompleted"] ? "completed" : "Waiting"} for Service 2`);

if (! (waitingEvents["service1:taskCompleted"] && waitingEvents["service2:taskCompleted"])) {
console.log(" Waiting for all tasks to complete...");
return;
}

const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();

console.log("Service 3 task completed");
messageBroker.publish("service3:taskCompleted");
};

messageBroker.subscribe("service1:taskCompleted", () => {
console.log("Received Service 1 task completed");
waitingEvents["service1:taskCompleted"] = true;
checkAllTasksCompleted();
});

messageBroker.subscribe("service2:taskCompleted", () => {
console.log("Received Service 2 task completed");
waitingEvents["service2:taskCompleted"] = true;
checkAllTasksCompleted();
});
},
async (_messageBroker) => {
},
);


messageBroker.publish("service:taskStart");

実行すると以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
Executing task for Service 1
Executing task for Service 2
Received Service 1 task completed
Executing task for Service 3
completed for Service 1
Waiting for Service 2
Waiting for all tasks to complete...
Received Service 2 task completed
Executing task for Service 3
completed for Service 1
completed for Service 2
Service 3 task completed

service1,service2の完了を条件として、service3の実行を待つという形で実装している。
service1,2を待つだけのサービスを作成して、service4の実行をキックするという形も考えうる。

Choreography を Deno が提供するインフラで

Deno は、Deno Queue というメッセージキューを提供している。
このメッセージキューを利用して、choreography を実装できる。
Deno Deploy 上にサービスを置くとき、考えうる実装になると考えられる。

choreography-deno-queue\deno.json
1
2
3
4
5
6
7
8
9
10
11
{
"tasks": {
"dev": "deno run --watch -RW main.ts"
},
"imports": {
"@std/assert": "jsr:@std/assert@1",
"@std/async": "jsr:@std/async@^1.0.13",
"@std/cli": "jsr:@std/cli@^1.0.17"
},
"unstable": ["kv"]
}
choreography-deno-queue/kv.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
export type ResultGetKV = Promise<{
publish: (msg: string, id: string) => Promise<void>;
subscribe: (msg: string, callback: (msg: { id: string }) => void) => void;
}>;
export type GetKvType = () => ResultGetKV;

const kv = await Deno.openKv("choreography-deno-queue");
const callbacks = {} as { [key: string]: {key: string, callbacks: ((msg: { id: string }) => void)[]}};

export async function getKv(): ResultGetKV {

const publish = async (key: string, id: string) => {
console.log(`publishing message key: ${key}, id: ${id}`);
await kv.enqueue({ key, id });
};

const subscribe = (key: string, callback: (msg: { id: string }) => void) => {
console.log(`subscribing to message: ${key}`);

if (!callbacks[key]) {
callbacks[key] = {key, callbacks: []};
}
callbacks[key].callbacks.push(callback);

kv.listenQueue((msg) => {
console.log(`message received: ${JSON.stringify(msg)}`);
callbacks[msg.key].callbacks.forEach(cb => {
cb(msg);
});
});
};

return {
publish,
subscribe,
};
}
choreography-deno-queue/main.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import { delay } from "@std/async/delay";
import { Spinner } from "@std/cli/unstable-spinner";
import { getKv, GetKvType } from "./kv.ts";

class Service {
constructor(
getKv: GetKvType,
task: (getKv: GetKvType) => Promise<void> | void,
rollbackTask: (getKv: GetKvType) => Promise<void> | void,
) {
task(getKv);
rollbackTask(getKv);
}
}

const service1 = new Service(
getKv,
async (getKv) => {
const kv = await getKv();
kv.subscribe("service1:taskStart", async ({id}) => {
console.log("Executing task for Service 1");
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();

kv.publish("service1:taskCompleted", id);
});
},
async (getKv) => {
const kv = await getKv();
kv.subscribe("service2:taskFailed", async ({id}) => {
console.log(`Rolling back task for Service 1 ${id}`);
const spinner = new Spinner({ message: "Rolling back...", color: "red" });
spinner.start();
await delay(2000);
spinner.stop();
});
},
);
const service2 = new Service(
getKv,
async (getKv) => {
const kv = await getKv();
kv.subscribe("service1:taskCompleted", async ({ id }) => {
console.log(`Executing task for Service 2 ${id}`);
const spinner = new Spinner({ message: "Executing...", color: "yellow" });
spinner.start();
await delay(5000);
spinner.stop();

kv.publish("service2:taskFailed", id); // わざと失敗させる
});
},
async (_getKv) => {
},
);

const kv = await getKv();
kv.publish("service1:taskStart", crypto.randomUUID());

実行すると以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
$ deno task dev
subscribing to message: service1:taskStart
subscribing to message: service2:taskFailed
subscribing to message: service1:taskCompleted
publishing message key: service1:taskStart, id: c79e5a83-2fa0-455a-9264-e10110f5b72d
message received: {"key":"service1:taskStart","id":"c79e5a83-2fa0-455a-9264-e10110f5b72d"}
Executing task for Service 1
publishing message key: service1:taskCompleted, id: c79e5a83-2fa0-455a-9264-e10110f5b72d
message received: {"key":"service1:taskCompleted","id":"c79e5a83-2fa0-455a-9264-e10110f5b72d"}
Executing task for Service 2 c79e5a83-2fa0-455a-9264-e10110f5b72d
publishing message key: service2:taskFailed, id: c79e5a83-2fa0-455a-9264-e10110f5b72d
message received: {"key":"service2:taskFailed","id":"c79e5a83-2fa0-455a-9264-e10110f5b72d"}
Rolling back task for Service 1 c79e5a83-2fa0-455a-9264-e10110f5b72d

この状態は、未だ同じファイルで書かれており、同じプロセスで実行されている。
まだマイクロサービスを考慮された形としては受け入れが難しいとも考えられる。

Choreography を Deno が提供するインフラで 2

DENO Queue を利用したが、複数プロセスでQueueを参照することはちょっと難しい。
それは、特定のキーだけをだけ読み込むということができないため。
先の実装では、どのキーで受け取ったかにより呼び出すものを変えることで対処したが、これはマルチプロセスでは難しい。

ここでは Deno kv を愚直に使いサービスを別ファイルに切り分け、別プロセスで起動することを前提として実装する。

choreography-deno-kv/deno.jsonc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"tasks": {
"parallel:dev1": "deno run -RW --watch entry_point.ts",
"parallel:dev2": "deno run -RW --watch service1.ts",
"parallel:dev3": "deno run -RW --watch service2.ts"
},
"imports": {
"@std/assert": "jsr:@std/assert@1",
"@std/async": "jsr:@std/async@^1.0.13",
"@std/cli": "jsr:@std/cli@^1.0.17",
"@std/uuid": "jsr:@std/uuid@^1.0.7"
},
"unstable": ["kv"]
}
choreography-deno-kv/kv.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import { delay } from "@std/async/delay";
export type ResultGetKV = {
publish: (key: string, id: string) => Promise<void>;
subscribe: (
key: string,
callback: (msg: { id: string }) => Promise<void> | void,
) => void;
row: Deno.Kv;
};
export type GetKvType = () => Promise<ResultGetKV>;

const kv = await Deno.openKv("choreography-deno-kv");

export function getKv(): Promise<ResultGetKV> {
const clientId = crypto.randomUUID();

const publish = async (key: string, id: string) => {
console.log(`publishing message key: ${key}, id: ${id}`);
const subscribers = await kv.list<string>({
prefix: ["choreography-deno-kv", key, "subscribers"],
});

for await (const subscriber of subscribers) {
const atomic = kv.atomic();
atomic.set([
"choreography-deno-kv",
key,
"update",
subscriber.value,
], (new Date()).getTime());
atomic.set([
"choreography-deno-kv",
key,
"messages",
subscriber.value,
id,
], id);
await atomic.commit();
}
};

const subscribe = (
key: string,
callback: (msg: { id: string }) => Promise<void> | void,
) => {
(async () => {
console.log(`subscribing to message: ${key}`);
let lastUpdate = 0;

const atomic = kv.atomic();
atomic.set(
["choreography-deno-kv", key, "subscribers", clientId],
clientId,
);
atomic.set(
["choreography-deno-kv", key, "update", clientId],
(new Date()).getTime(),
);
await atomic.commit();

while (true) {
await delay(10);
const update = await kv.get<number>([
"choreography-deno-kv",
key,
"update",
clientId,
]);

if (!update.value || update.value <= lastUpdate) {
continue;
}
lastUpdate = update.value;

const messages = kv.list<string>({
prefix: [
"choreography-deno-kv",
key,
"messages",
clientId,
],
});

for await (const message of messages) {
await callback({ id: message.value });
await kv.delete(message.key);
}
}
})();
};

return {
publish,
subscribe,
row: kv,
};
}
choreography-deno-kv/service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { GetKvType, ResultGetKV } from "./kv.ts";

export class Service {
constructor(
private getKv: GetKvType,
private task: (getKv: ResultGetKV) => Promise<void> | void,
private rollbackTask: (getKv: ResultGetKV) => Promise<void> | void,
) {
}
async start() {
const kv = await this.getKv();
this.task(kv);
this.rollbackTask(kv);
}
}
choreography-deno-kv/service1.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import { delay } from "@std/async/delay";
import { Spinner } from "@std/cli/unstable-spinner";
import { getKv } from "./kv.ts";
import { Service } from "./service.ts";

const service1 = new Service(
getKv,
async ({ subscribe, publish }) => {
subscribe("service1:taskStart", async (msg: { id: string }) => {
console.log(`Executing task for Service 1 ${msg.id}`);
const spinner = new Spinner({
message: "Executing...",
color: "yellow",
});
spinner.start();
await delay(5000);
spinner.stop();

publish("service1:taskCompleted", msg.id);
});
},
async ({ subscribe }) => {
subscribe("service2:taskfailed", async (msg: { id: string }) => {
console.log(`Rolling back task for Service 1 ${msg.id}`);
const spinner = new Spinner({
message: "Rolling back...",
color: "red",
});
spinner.start();
await delay(2000);
spinner.stop();
});
},
);

service1.start();

console.log("Service 1 started");
choreography-deno-kv/service2.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import { delay } from "@std/async/delay";
import { Spinner } from "@std/cli/unstable-spinner";
import { getKv } from "./kv.ts";
import { Service } from "./service.ts";

const service2 = new Service(
getKv,
async ({ subscribe, publish }) => {
subscribe("service1:taskCompleted", async (msg) => {
console.log(`Executing task for Service 2 ${msg.id}`);
const spinner = new Spinner({
message: "Executing...",
color: "yellow",
});
spinner.start();
await delay(5000);
spinner.stop();

publish("service2:taskfailed", msg.id); // わざと失敗させる

console.log(`Service 2 task failed ${msg.id}`);
});
},
async (_getKv) => {
},
);

service2.start();

console.log("Service 2 started");
choreography-deno-kv/entry_point.ts
1
2
3
4
5
6
7
8
9
import { getKv } from "./kv.ts";
import { delay } from "@std/async/delay";

const kv = await getKv();

// 各サービスの起動が済んでいないとpublish先登録できないので少し待機
await delay(1000);

await kv.publish("service1:taskStart", crypto.randomUUID());

実行すると以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ deno task parallel:*
Task parallel:dev1 deno run -RW --watch entry_point.ts
Task parallel:dev2 deno run -RW --watch service1.ts
Task parallel:dev3 deno run -RW --watch service2.ts
Service 2 started
Service 1 started
subscribing to message: service1:taskCompleted
subscribing to message: service1:taskStart
subscribing to message: service2:taskfailed
publishing message key: service1:taskStart, id: 19d48ceb-c421-4bdb-b20b-9368fafd18f1
Executing task for Service 1 19d48ceb-c421-4bdb-b20b-9368fafd18f1
publishing message key: service1:taskCompleted, id: 19d48ceb-c421-4bdb-b20b-9368fafd18f1
Executing task for Service 2 19d48ceb-c421-4bdb-b20b-9368fafd18f1
publishing message key: service2:taskfailed, id: 19d48ceb-c421-4bdb-b20b-9368fafd18f1
Service 2 task failed 19d48ceb-c421-4bdb-b20b-9368fafd18f1
Rolling back task for Service 1 19d48ceb-c421-4bdb-b20b-9368fafd18f1

複数のプロセスで起動する形をとり、それぞれがメッセージを発行し、やり取りする流れが作れました。
中心にあるのは、Deno KV です。

とここまでで、Deno の提供する機能を利用して、Saga パターンを実装できました。

アプリから使うを考える

ここまでCLIで動くことを前提とした確認について紹介をしました。
ここからは見せ方について考えたいです。

たまに見るWebアプリで、何か設定すると進行状況がメッセージで出てくるものがあります。
ランプがテキストの横で順次点灯したりするものもあります。

それの実装を試みます。

ここでは、Deno KV を利用したChoreographyを利用し、全体の進捗を監視できるように改修し、Freshに載せます。

マルチプロセスで、計算をさせてみることにします。

全体を乗せるのは量があるため、詳細はこちらのリポジトリを参照。

メッセージブローカーは、Deno KV を利用して実装します。

services/messageBroker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/// <reference lib="deno.unstable" />

import { delay } from "@std/async/delay";

const MESSAGE_BROKER_PREFIX = "choreography" as const;
const MESSAGE_BROKER_UPDATE_KEY = "update" as const;
const MESSAGE_BROKER_MESSAGES_KEY = "messages" as const;
const MESSAGE_BROKER_SUBSCRIBERS_KEY = "subscribers" as const;

function getUpdateKey(key: string, clientId: string): string[] {
return [MESSAGE_BROKER_PREFIX, key, MESSAGE_BROKER_UPDATE_KEY, clientId];
}
function getMessagesKey(key: string, clientId: string): string[] {
return [
MESSAGE_BROKER_PREFIX,
key,
MESSAGE_BROKER_MESSAGES_KEY,
clientId,
];
}

function getMessageKey(key: string, clientId: string, id: string): string[] {
return [
MESSAGE_BROKER_PREFIX,
key,
MESSAGE_BROKER_MESSAGES_KEY,
clientId,
id,
];
}
function getSubscribersKey(key: string): string[] {
return [MESSAGE_BROKER_PREFIX, key, MESSAGE_BROKER_SUBSCRIBERS_KEY];
}

function getSubscriberKey(key: string, clientId: string): string[] {
return [MESSAGE_BROKER_PREFIX, key, MESSAGE_BROKER_SUBSCRIBERS_KEY, clientId];
}

export type ResultMessageBroker = {
publish: (
key: string,
taskId: string,
payload: { [key: string]: unknown },
) => Promise<void>;
subscribe: (
key: string,
callback: (
taskId: string,
payload: { [key: string]: unknown },
) => Promise<void> | void,
) => void;
};

type KV_MESSAGE_BLOCKER_RAW_TYPE = {
taskId: string;
payload: { [key: string]: unknown };
};

export async function getMessageBroker(): Promise<ResultMessageBroker> {
const kv = await Deno.openKv("./tmp/choreography");
const clientId = crypto.randomUUID();

const publish = async (
key: string,
taskId: string,
payload: { [key: string]: unknown },
) => {
console.log(
`publishing message key: ${key}, taskId: ${taskId}, payload: ${
JSON.stringify(payload)
}`,
);
const subscribers = await kv.list<string>({
prefix: getSubscribersKey(key),
});

for await (const subscriber of subscribers) {
const atomic = kv.atomic();
atomic.set(getUpdateKey(key, subscriber.value), (new Date()).getTime());
atomic.set(getMessageKey(key, subscriber.value, taskId), {
taskId,
payload,
});
await atomic.commit();
}
};

const subscribe = (
key: string,
callback: (
taskId: string,
payload: { [key: string]: unknown },
) => Promise<void> | void,
) => {
(async () => {
console.info(`subscribing to message: ${key}`);
let lastUpdate = 0;

const atomic = kv.atomic();
atomic.set(
getSubscriberKey(key, clientId),
clientId,
);
atomic.set(
getUpdateKey(key, clientId),
0,
);
await atomic.commit();

while (true) {
await delay(1000);
const update = await kv.get<number>(
getUpdateKey(key, clientId),
);

if (update.value === null || update.value <= lastUpdate) {
continue;
}
lastUpdate = update.value;

const messages = kv.list<KV_MESSAGE_BLOCKER_RAW_TYPE>({
prefix: getMessagesKey(key, clientId),
});

for await (const message of messages) {
await callback(message.value.taskId, message.value.payload);
await kv.delete(message.key);
}
}
})();
};
return {
publish,
subscribe,
};
}

各サービスの基盤のServiceクラスは以下のように実装。

services/service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { ResultMessageBroker } from "./messageBroker.ts";

export class Service {
constructor(
private getMessageBroker: () => Promise<ResultMessageBroker>,
private task: (
{ publish, subscribe }: ResultMessageBroker,
) => Promise<void> | void,
private rollbackTask: (
{ publish, subscribe }: ResultMessageBroker,
) => Promise<void> | void,
) {
}
async start() {
const kv = await this.getMessageBroker();
this.task(kv);
this.rollbackTask(kv);
}
}

具体的な各サービスは以下のように3ファイル実装。

services/service1.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import { Service } from "./service.ts";

import { getMessageBroker } from "./messageBroker.ts";
import { work1, work1Rollback } from "./work.ts";

const service1 = new Service(
getMessageBroker,
work1,
work1Rollback,
);

service1.start();

console.log("Service 1 started");
deno.jsonc(tasks のみ抜粋)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"tasks": {
"check": "deno fmt --check && deno lint && deno check **/*.ts && deno check **/*.tsx",
"cli": "echo \"import '\\$fresh/src/dev/cli.ts'\" | deno run --unstable -A -",
"manifest": "deno task cli manifest $(pwd)",
"start:app": "deno run -A --watch=static/,routes/ dev.ts",
// "start:service0": "deno run -WR --watch services/mod.ts",
"start:service1": "deno run -WR --watch services/task1service.ts",
"start:service2": "deno run -WR --watch services/task2service.ts",
"start:service3": "deno run -WR --watch services/task3service.ts",
"build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts",
"update": "deno run -A -r https://fresh.deno.dev/update ."
}
}

以下のコマンドで実行。

1
$ deno task start:*

動かすと以下のようになります。
3つのタスクメッセージブローカーを介して、各サービスがメッセージをやり取りしながら進行します。

成功する場合、3段階でconpliteとなります。

失敗する場合、3段階目まで順次completeになっていき、最終的にすべてrollbackになります。

処理自体は、Sagaパターンに則り、計算状態や、ログはポーリングで取得しています。

例に挙げたような、処理の状況がオンラインで順次見えものができました。


今回作ったもの、マルチプロセスでやるには、Deno KV の機能の有効活用というより while ぶん回しみたいな形であった。
もっとスマートにできないものかと記事外でも試したところですが、今回はここまで。
中に組み込んでしまいましたが、Deno Deploy のKVに接続ができるので、Deno Deploy での処理の難しいモノは、EC2なりに任せてしまうことも可能。

もし全部Deno Deployでやるなら、KVの機能でほとんど綺麗に収まるだろうと考えています。
今回最後の実装だと、subscribe側が先に立っていないと通信が上手くいかないので、ここもまた考慮事項。

わざと1個づつ逆順でロールバックをしている箇所、確認した資料次第では、各サービスへ並行に投げているケースも見受けられた。

順番が本当に大事なのだとすれば、サービスを分けないか、Orchestrationが推奨なのだろうと感じる。


今回の実装から、本格的にCopilot Agentも使ってみた。
あまり生成させ過ぎるのは、学習を目的とするところでは不適切なので、意図的に使わない部分もある。
Sagaパターンの実装という意図が汲まれているのか、サジェスト自体もかなり賢い。
Deno KV周りは、学習がないのかサジェストも少ないように感じられた。

一番凄まじいと感じたのは、閉じタグ不足を目で判断できないtsxのエラー。
コンソールのエラーも大した記述ないところ、「直して」だけ書いて直したこと。
こいつとは、仲良くしよう。

では。