Fresh にWebSocketのFirstClassサポートが追加されたので試す

Fresh 2.3 からWebSocketのFirstClassサポートが追加 されたので、試します。

参考

実装

注意事項

ドキュメントにも記載の通り、FirstClassサポートは入ったが、開発においてこの機能が使えるのは、Fresh 2.4以降かつDeno2.8以降になる。
そのため、一旦ビルドを要する。
このためHMRは効かないなど、一定程度現状では不便。

シンプル

サンプルとして出されているようなシンプルな実装は以下のようになる。

main.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { App, staticFiles } from "fresh";
import { type State } from "./utils.ts";

export const app = new App<State>();

app.use(staticFiles());

app.ws("/ws", {
open(_socket) {
console.log("Client connected");
},
message(socket, event) {
socket.send(`Echo: ${event.data} `);
},
close(_socket, code, reason) {
console.log("Client disconnected", code, reason);
},
});

app.fsRoutes();

これを受け取る側は次のように書くことができる。
Freshで書くので、WebSocket通信する側は、Islandコンポーネントで書くこ都とする。

islands/WsResult.tsx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import { useEffect, useRef, useState } from "preact/hooks";

function buildWsUrl(path: string): string {
const protocol = globalThis.location.protocol === "https:" ? "wss:" : "ws:";
console.log(
"WebSocket URL:",
`${protocol}//${globalThis.location.host}${path}`,
);
return `${protocol}//${globalThis.location.host}${path}`;
}

export default function WsResult() {
const [status, setStatus] = useState("idle");
const [input, setInput] = useState("hello from island");
const [messages, setMessages] = useState<string[]>([]);
const socketRef = useRef<WebSocket | null>(null);

useEffect(() => {
setStatus("connecting");
const wsUrl = buildWsUrl("/ws");
const socket = new WebSocket(wsUrl);
socketRef.current = socket;

socket.addEventListener("open", () => {
setStatus("connected");
socket.send("connected");
});

socket.addEventListener("message", (event) => {
setMessages((prev) => [...prev, String(event.data)]);
});

socket.addEventListener("close", () => {
setStatus("closed");
});

socket.addEventListener("error", () => {
setStatus("error");
});

return () => {
socketRef.current = null;
socket.close();
};
}, []);

const sendMessage = () => {
if (socketRef.current?.readyState !== WebSocket.OPEN) {
setStatus("not-connected");
return;
}
socketRef.current.send(input);
};

return (
<section class="w-full rounded-lg border border-lime-400/40 bg-zinc-900/60 p-4 text-left">
<h2 class="text-xl font-semibold text-lime-300">/ws result viewer</h2>
<p class="mt-2 text-sm text-zinc-300">status: {status}</p>

<div class="mt-4 flex flex-col gap-2 sm:flex-row">
<input
class="w-full rounded-md border border-zinc-500 bg-zinc-950 px-3 py-2 text-sm text-zinc-100"
value={input}
onInput={(e) => setInput((e.target as HTMLInputElement).value)}
/>
<button
type="button"
class="rounded-md bg-lime-400 px-4 py-2 text-sm font-semibold text-zinc-900"
onClick={sendMessage}
>
send
</button>
</div>

<div class="mt-4 rounded-md bg-black/40 p-3 text-sm text-zinc-200">
<p class="mb-2 font-medium text-zinc-100">received messages</p>
{messages.length === 0
? <p class="text-zinc-400">No messages yet.</p>
: (
<ul class="space-y-1">
{messages.map((message, index) => <li key={index}>{message}</li>)}
</ul>
)}
</div>
</section>
);
}

これを動かすと次のようになる。

非常にシンプルなものになる。

サーバーからデータを流し続ける

先ほどのものは、サンプルをベースに送ったらメッセージを返すだけのものとした。
続けて、サーバーから定期的にデータを流し続けるようなものも試してみる。

main.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { App, staticFiles } from "fresh";
import { type State } from "./utils.ts";

export const app = new App<State>();

app.use(staticFiles());

app.ws("/ws", {
open(_socket) {
console.log("Client connected");
},
message(socket, event) {
socket.send(`Echo: ${event.data} --`);
setInterval(() => { // 2秒ごとにサーバーからクライアントに現在時刻を送る
socket.send(`polling Now: ${ new Date().toLocaleTimeString()}`);
}, 2000);
},
close(_socket, code, reason) {
console.log("Client disconnected", code, reason);
},
});

app.fsRoutes();

メッセージに混ぜて、サーバーから定期的に現在時刻を送るようにした。

送れている。

これはローカル環境だからだっただろうか。Deno Deployでは、1リクエスト当たりの処理時間制約があったはずだった。
懸念したので一応動かしてみておいたが、Deno Deployでも動作する。
(どうも、依然と制限の記載が変わっているようにも見えたが特に証拠はない。)
放置しておいたら本当にずっと動いていた。

部屋を立てる

現在の実装は、クライアントとサーバー間が1対1のものになっている。
これを、複数のクライアントが同じ部屋に入って、同じメッセージを受け取れるようなものにしてみる。

面倒なので、ここではチャットルームは1部屋とする。

main.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import { App, staticFiles } from "fresh";
import { type State } from "./utils.ts";

const clients = new Set<WebSocket>();
const clientIdMap = new Map<WebSocket, string>();
const kv = await Deno.openKv();
let workerTimer: ReturnType<typeof setInterval> | null = null;

export const app = new App<State>();

app.use(staticFiles());

app.ws("/ws", {
open(socket){
clients.add(socket);
clientIdMap.set(socket, crypto.randomUUID());

if (workerTimer) {
return;
}

const broadcastFromQueue = async () => {
const staleClients: WebSocket[] = [];

for await (const entry of kv.list<{ msg: string; id: string }>({
prefix: ["messages"],
})) {
console.log("Checking for new messages in KV...");
const value = entry.value;

if (value.id === clientIdMap.get(socket)) {
continue;
}

const message = value.msg;

for (const client of clients) {
if (client.readyState !== WebSocket.OPEN) {
staleClients.push(client);
continue;
}

try {
client.send(message);
} catch (_error) {
staleClients.push(client);
}
}

await kv.delete(entry.key);
}

for (const staleClient of staleClients) {
clients.delete(staleClient);
}
};

workerTimer = setInterval(() => {
void broadcastFromQueue();
}, 1000);
},

message(socket, event) {
const value = { msg: String(event.data), id: clientIdMap.get(socket) };

void kv.set(["messages", crypto.randomUUID()], value);
},

close(socket, code, reason) {
clients.delete(socket);
clientIdMap.delete(socket);
if (clients.size === 0 && workerTimer) {
clearInterval(workerTimer);
workerTimer = null;
}
console.log("Client disconnected", code, reason);
}

});

app.fsRoutes();

ローカルで動かすと問題はないのだが、マルチプロセスで動かした際には、必ずしも同じプロセス上で2つの接続を持てない。
なので、メッセージはKVで持つ形を取った。
部屋の定義もKVで持てばあまり手間なく、参加者情報含めマルチプロセス対応できる。

ディレクトリベースルーティング

routes/ws.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/// <reference lib="deno.unstable" />

import { define } from "@/utils.ts";

const clients = new Set<WebSocket>();
const kv = await Deno.openKv();
let workerTimer: ReturnType<typeof setInterval> | null = null;

export const handler = define.handlers({
GET(ctx) {
const { socket, response } = ctx.upgrade();

const randomId = crypto.randomUUID();

socket.onopen = () => {
clients.add(socket);

if (workerTimer) {
return;
}

const broadcastFromQueue = async () => {
const staleClients: WebSocket[] = [];

for await (const entry of kv.list<{ msg: string; id: string }>({
prefix: ["messages"],
})) {
const value = entry.value;

if (value.id === randomId) {
continue;
}

const message = value.msg;

for (const client of clients) {
if (client.readyState !== WebSocket.OPEN) {
staleClients.push(client);
continue;
}

try {
client.send(message);
} catch (_error) {
staleClients.push(client);
}
}

await kv.delete(entry.key);
}

for (const staleClient of staleClients) {
clients.delete(staleClient);
}
};

workerTimer = setInterval(() => {
void broadcastFromQueue();
}, 1000);
};

socket.onmessage = (event: MessageEvent) => {
const value = { msg: String(event.data), id: randomId };

void kv.set(["messages", crypto.randomUUID()], value);
};

socket.onclose = () => {
clients.delete(socket);

if (clients.size === 0 && workerTimer) {
clearInterval(workerTimer);
workerTimer = null;
}
};

return response;
},
});

ディレクトリベースルーティングでも動く。
ざっくり作ったので、バグありの可能性あり。


以上、Fresh 2.3で追加されたWebSocketのFirstClassサポートをさわってきました。

Deno 2.4 で開発サーバーでも使えるようになってからが本番とも言えるでしょう。

では。