最近、「データ志向プログラミング ソフトウェアがもつ複雑さの軽減に向けて 」を読んだ。
この本、翻訳技術書でたまに見かける「エキサイティングな体験」を感じた書籍であった。 プログラミングパラダイムの本となると、理念的な話が先行しているものを見かけることが多いと感じる。 本書は、現実の目の前にあるアプリケーションへどう適用するのかといった点までフォーカスされておりそれが良かった。
で、本書で、「アトム」が説明されるのだが、マルチスレッド環境と思ってjsで書くというものであった。 内容について納得はできるものの、ここはあえて触っておきたい。
参考
アトム アトム、アトムというが、アトムとは何か。 アトムとは、トランザクションにおける「アトミック性」を指す。 アトミック性とは、トランザクションが完全に成功するか、 完全に失敗するかのどちらかであり、中途半端な状態にならないことを意味する。
で、アトミックを略してアトムと呼ぶそうだ。 ちなみに、アトムはデッドロックを決して発生しないものと紹介される。
そのうえで、本書では全体に楽観的ロックを使用しており、「アトムを使う」とは、「楽観的ロックを使う」ことに同じとなる。
なので、以降楽観的ロックのことを述べていく。
楽観的ロック JSだけケース(仮想的でありマルチプロセス、マルチスレッドではない) マルチプロセス、マルチスレッドではないが、イベントべ――スで非同期な実行によりそれっぽいものは作れる。
atom_test.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import { delay } from "jsr:@std/async" ;const atom = ( ) => { let value = 0 ; return { get : () => value, swap : (currentValue: number , newValue: number ) => { if (value !== currentValue) { throw new Error ("Value has been changed by another process." ); } value = newValue; }, }; }; const counter = ( ) => { let value = 0 ; let stageValue = 0 ; return { increment : () => { stageValue = value + 1 ; }, commit : () => { value = stageValue; return value; }, }; }; const counterInstance = counter ();const atomInstance = atom ();const intervalPcoccess1 = async ( ) => { try { const currentValue = atomInstance.get (); counterInstance.increment (); await delay (Math .random () * 1000 ); atomInstance.swap (currentValue, currentValue + 1 ); counterInstance.commit (); console .log (`Interval1 Counter value: ${counterInstance.commit()} ` ); } catch (error : any ) { console .error (`Interval1 message: ${error.message} ` ); } setTimeout (intervalPcoccess1, Math .random () * 1000 ); }; const intervalPcoccess2 = async ( ) => { try { const currentValue = atomInstance.get (); counterInstance.increment (); await delay (Math .random () * 1000 ); atomInstance.swap (currentValue, currentValue + 1 ); counterInstance.commit (); console .log (`Interval2 Counter value: ${counterInstance.commit()} ` ); } catch (error : any ) { console .error (`Interval2 message: ${error.message} ` ); } setTimeout (intervalPcoccess2, Math .random () * 1000 ); }; setTimeout (intervalPcoccess1, 0 );setTimeout (intervalPcoccess2, 0 );
実行すると以下のようになる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 $ deno run atom_test.ts Interval1 Counter value: 1 Interval2 message: Value has been changed by another process. Interval1 Counter value: 2 Interval2 Counter value: 3 Interval1 message: Value has been changed by another process. Interval2 Counter value: 4 Interval1 message: Value has been changed by another process. Interval2 Counter value: 5 Interval1 message: Value has been changed by another process. Interval2 Counter value: 6 Interval2 Counter value: 7 Interval1 message: Value has been changed by another process. Interval2 Counter value: 8 Interval1 message: Value has been changed by another process. Interval2 Counter value: 9
競合が解決され、2つの処理時間がランダムな処理の中でカウントを飛ばしたりせずに処理で来ている。
ちなみに、atom を外すと、以下のようになる。
no_atom_test.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import { delay } from "jsr:@std/async" ;const counter = ( ) => { let value = 0 ; let stageValue = 0 ; return { increment : () => { stageValue = value + 1 ; }, commit : () => { value = stageValue; return value; }, }; }; const counterInstance = counter ();const intervalPcoccess1 = async ( ) => { try { counterInstance.increment (); await delay (Math .random () * 1000 ); counterInstance.commit (); console .log (`Interval1 Counter value: ${counterInstance.commit()} ` ); } catch (error : any ) { console .error (`Interval1 message: ${error.message} ` ); } setTimeout (intervalPcoccess1, Math .random () * 1000 ); }; const intervalPcoccess2 = async ( ) => { try { counterInstance.increment (); await delay (Math .random () * 1000 ); counterInstance.commit (); console .log (`Interval2 Counter value: ${counterInstance.commit()} ` ); } catch (error : any ) { console .error (`Interval2 message: ${error.message} ` ); } setTimeout (intervalPcoccess2, Math .random () * 1000 ); }; setTimeout (intervalPcoccess1, 0 );setTimeout (intervalPcoccess2, 0 );
実行すると以下のようになり、競合を解決できないことがわかる。
1 2 3 4 5 6 7 8 9 10 11 $ deno run no_atom_test.ts Interval1 Counter value: 1 Interval2 Counter value: 1 Interval2 Counter value: 2 Interval1 Counter value: 2 Interval1 Counter value: 3 Interval1 Counter value: 4 Interval2 Counter value: 5 Interval1 Counter value: 5 Interval1 Counter value: 6 Interval2 Counter value: 7
SQLで担保するケース SQLで楽観的ロックを実装する場合、通常はバージョン番号やタイムスタンプを使用して、データの変更が行われたかどうかを確認する。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 import { Database } from "jsr:@db/sqlite@0.11" ;import { delay } from "jsr:@std/async" ;const db = new Database ("test.db" );await delay (Math .random ()*1000 );db.exec (`create table if not exists atom ( id integer not null primary key autoincrement, name text not null unique, value integer not null default 0 )` );db.exec (` create table if not exists counter ( id integer primary key autoincrement, name text not null unique, value integer not null default 0 )` );const counterName = `counter` ;db.prepare (` insert into counter (name) values (?) ON CONFLICT(name) DO nothing ` ).run (counterName);const processName = Deno .args [0 ] || "default_process" ;const atomName = `counter_atom` ;const intervalProcess = async ( ) => { try { db.prepare ( "INSERT INTO atom (name) VALUES (?) ON CONFLICT(name) do nothing" , ).run (atomName); const atomValue = db.prepare ("SELECT value FROM atom WHERE name = ?" ).get ( atomName, ) as { value : number }; const nextAtomValue = atomValue.value + 1 ; const existingCounter = db.prepare ( "SELECT value FROM counter WHERE name = ?" , ).get (counterName) as { value : number }; const nextCounterValue = existingCounter.value + 1 ; await delay (Math .random () * 1000 ); db.transaction (() => { const result1 = db.prepare (` UPDATE atom SET value = ? WHERE name = ? and value = ? ` ).run (nextAtomValue, atomName, atomValue.value ); if ( result1 !== 1 ){ throw new Error (`Failed to update atom: ${atomName} ` ); } const result2 = db.prepare (` UPDATE counter SET value = ? WHERE name = ? ` ).run (nextCounterValue, counterName); if ( result2 !== 1 ){ throw new Error (`Failed to update counter: ${counterName} ` ); } }).deferred (); console .log (`${processName} Counter value: ${nextCounterValue} ` ); } catch (error : unknown ) { const errorMessage = error instanceof Error ? error.message : String (error); console .error ( `Process ${processName} encountered an error: ${errorMessage} ` , ); } setTimeout (intervalProcess, Math .random () * 1000 ); }; setTimeout (intervalProcess, 0 );
またマルチプロセス実行のため、タスクを記述しておく。
deno.json 1 2 3 4 5 6 7 8 { "tasks" : { "atom_test:1" : "deno run -RWE --allow-ffi --watch atom_test_2.ts process1" , "atom_test:2" : "deno run -RWE --allow-ffi --watch atom_test_2.ts process2" } , "imports" : { } }
deno task のワイルドカード実行により、並列起動して実行すると以下のようになる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 $ deno task atom_test:* Task atom_test:1 deno run -RWE --allow-ffi --watch atom_test_2.ts process1 Task atom_test:2 deno run -RWE --allow-ffi --watch atom_test_2.ts process2 Watcher Process started. Watcher Process started. process1 Counter value: 1 process2 Counter value: 2 process2 Counter value: 3 Process process1 encountered an error: Failed to update atom: counter_atom process2 Counter value: 4 process1 Counter value: 5 Process process2 encountered an error: Failed to update atom: counter_atom process1 Counter value: 6 Process process2 encountered an error: Failed to update atom: counter_atom process1 Counter value: 7 Process process2 encountered an error: Failed to update atom: counter_atom process1 Counter value: 8 process1 Counter value: 9 Process process2 encountered an error: Failed to update atom: counter_atom
データベースにsqliteを使用したため、たまにファイルハンドラがとれないためか、エラー発生することがある。 その点を除けば、共同で利用されるSQLを基準としたデータ競合を検知し、更新を破棄している 上記の記載はうまくいった例となる。
以下のようにatomを使わないケースでは、もちろん競合してしまう。
no_atom_test_2.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import { Database } from "jsr:@db/sqlite@0.11" ;import { delay } from "jsr:@std/async" ;const db = new Database ("test.db" );await delay (Math .random ()*1000 );db.exec (` create table if not exists counter ( id integer primary key autoincrement, name text not null unique, value integer not null default 0 )` );const counterName = `counter` ;db.prepare (` insert into counter (name) values (?) ON CONFLICT(name) DO nothing ` ).run (counterName);const processName = Deno .args [0 ] || "default_process" ;const atomName = `counter_atom` ;const intervalProcess = async ( ) => { try { const existingCounter = db.prepare ( "SELECT value FROM counter WHERE name = ?" , ).get (counterName) as { value : number }; const nextCounterValue = existingCounter.value + 1 ; await delay (Math .random () * 1000 ); db.transaction (() => { const result2 = db.prepare (` UPDATE counter SET value = ? WHERE name = ? ` ).run (nextCounterValue, counterName); if ( result2 !== 1 ){ throw new Error (`Failed to update counter: ${counterName} ` ); } }).deferred (); console .log (`${processName} Counter value: ${nextCounterValue} ` ); } catch (error : unknown ) { const errorMessage = error instanceof Error ? error.message : String (error); console .error ( `Process ${processName} encountered an error: ${errorMessage} ` , ); } setTimeout (intervalProcess, Math .random () * 1000 ); }; setTimeout (intervalProcess, 0 );
実行すると以下のようになり、競合解決できないことがわかる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 $ deno task no_atom_test:* Task no_atom_test:1 deno run -RWE --allow-ffi --watch no_atom_test_2.ts process1 Task no_atom_test:2 deno run -RWE --allow-ffi --watch no_atom_test_2.ts process2 Watcher Process started. Watcher Process started. process2 Counter value: 1 process1 Counter value: 1 process2 Counter value: 2 process2 Counter value: 3 process1 Counter value: 3 process1 Counter value: 4 process2 Counter value: 4 process2 Counter value: 5 process1 Counter value: 6
トランザクションがあるので、このあたりの整合性の担保は楽。
Deno KV Deno KV は、ドキュメントに以下の記述がある。
Keys are versioned on write by assigning the key an ever-increasing “versionstamp”. The versionstamp represents the version of a key-value pair in the database at some point in time, and can be used to perform transactional operations on the database without requiring any locking. This is enabled by atomic operations, which can have conditions that ensure that the operation only succeeds if the versionstamp of the key-value pair matches an expected versionstamp.
ということで、versionstampを使用して、ロックを使用せずトランザクション操作を実現する。 これがatomic操作でだけ実現できるとされます。
楽観的ロックのことは後から学んだので、KV のatomicの仕様のことを「楽観的ロック」のものだとあまり意識していなかった。仕様としての理解はあったけども。
KVの場合、versionstampが必ず設定されるのでcounterだけ実現はできるのだが、あえてatomとして使った実装は次のようになる。
atom_test_3.ts 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import { delay } from "jsr:@std/async" ;const kv = await Deno .openKv ("test_kv" );const processName = Deno .args [0 ] || "default_process" ;const atomName = `counter_atom` ;const counterName = `counter` ;await kv.set ([counterName], 0 );const intervalProcess = async ( ) => { try { const atomResult = await kv.get <number >([atomName]); const nextAtomValue = atomResult.value ! + 1 ; const counterResult = await kv.get <number >([counterName]); const nextCounterValue = counterResult.value ! + 1 ; await delay (Math .random () * 1000 ); const atomicOp = kv.atomic () .check ({ key : [atomName], versionstamp : atomResult.versionstamp }) .set ([atomName], nextAtomValue ) .set ([counterName], nextCounterValue ); const result = await atomicOp.commit (); if (!result.ok ) { throw new Error (`Failed to update counters atomically` ); } console .log (`${processName} Counter value: ${nextCounterValue} ` ); } catch (error : unknown ) { const errorMessage = error instanceof Error ? error.message : String (error); console .error ( `Process ${processName} encountered an error: ${errorMessage} ` , ); } setTimeout (intervalProcess, Math .random () * 1000 ); }; setTimeout (intervalProcess, 0 );
実行すると以下のようになる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 $ deno task --unstable-kv atom_test:* Task atom_test:1 deno run -RW --unstable-kv --watch atom_test_3.ts process1 Task atom_test:2 deno run -RW --unstable-kv --watch atom_test_3.ts process2 Watcher Process started. Watcher Process started. Process process1 encountered an error: database is locked process2 Counter value: 1 process2 Counter value: 2 Process process1 encountered an error: Failed to update counters atomically process1 Counter value: 3 Process process2 encountered an error: Failed to update counters atomically process1 Counter value: 4 Process process2 encountered an error: Failed to update counters atomically process1 Counter value: 5 process2 Counter value: 6 process1 Counter value: 7 Process process2 encountered an error: Failed to update counters atomically process1 Counter value: 8 Process process2 encountered an error: Failed to update counters atomically process2 Counter value: 9
競合した場合には破棄されて、最終的なカウント処理が正しいことを見て取れる。
アトムとして紹介されていた楽観的ロックをいくつか試した。
冒頭の通り「データ志向プログラミング ソフトウェアがもつ複雑さの軽減に向けて」はエキサイティングな書籍であった。
普段はActiveRecordやORM的な頭で考えているのでTS環境での見通しが悪くなっていたと感じる。 TSの環境でドライバーだけで扱うのであれば、必ずしもORMまで使う必要は無いのだと再認識できた。
では。