Haskellerへの道 #25 - STM

Last Edited: 8/28/2024

この記事では、Haskell におけるSTMを使った並行処理の実装方法を紹介します。

Haskell & STM

前回の記事では、ミューテックス変数、チャンネル、セマフォにおけるデッドロックの発生とその問題点について説明しました。 したがって、今回は、並行処理の一貫性を確保しデッドロックを防ぐための代替メカニズムである **ソフトウェアトランザクショナルメモリ(Software Transactional Memory, STM)**についてお話しします。

ソフトウェアトランザクショナルメモリ

データベースのトランザクションに精通している方には、ソフトウェアトランザクショナルメモリ(STM)は非常に直感的に感じ られるでしょう。ロックベースの同期化(ミューテックス変数、チャンネル、セマフォ)は共有リソースの取得を一つのスレッドに 制限するため、デッドロックを引き起こす可能性があるのに対し、トランザクションベースの方法(STM)ではすべてのスレッドが 共有リソースに制限なくアクセスすることができます。その代わり、共有リソースが矛盾なく更新されるようにし、矛盾が発生した 場合はスレッドを再実行します。こうすることで、一貫性を維持しながらデッドロックを防ぐことができるわけです。 STMはSTMモナドとして実装され、STMアクションとしてトランザクションを実行することができます。

import Control.Concurrent.STM
 
-- STMアクションをIOアクションに変換
atomically :: STM a -> IO a
 
-- トランザクションを明示的に再試行
retry :: STM a
 
-- 条件に応じてSTMアクションを選択(例外処理によく使用される)
orElse :: STM a -> STM a -> STM a
 
-- STMアクション内で例外をスロー
throwSTM :: Exception e => e -> STM a
 
-- トランザクショナル変数(共有メモリ)
data TVar a
 
-- 新たにTVarを作成
newTVar :: a -> TVar a
 
-- TVarの読み取りと書き込み用のSTMアクション
readTVar :: TVar a -> STM ()
writeTVar :: TVar a -> a -> STM ()

STMモジュールには他にも多くの関数やデータ型がありますが、ここでは主要なものに焦点を当てます。定義の詳細には 触れていないので、ドキュメンテーションを確認することを強くお勧めします。

STMアクションには、STMアクションと関数のみを含めることができ、IOアクションを含めることはできない点に注意が必要です。 これは、トランザクションのロールバックができない不要な副作用が発生しないようにするためです。したがって、STMアクション から結果を抽出する唯一の方法は、atomicallyを使用することです。そして、トランザクションのロールバックのタイミング を制御するためのretryorElse、STMアクション内で例外をスローするthrowSTMがあります。STMで共有メモリを 使用する方法の一つとしてTVarがあり、これはトランザクション内で読み書きできます。

使用例

STMの動作を理解するために、実際にSTMを使用した簡単なマルチスレッドプログラムを見てみましょう。以下は、STMを使用して配列内 の要素の合計を計算するプログラムです。

import System.IO
import Control.Concurrent
import Control.Concurrent.STM
 
-- sumとcounterを保持するTVar
type Result = TVar (Int, Int)
 
addToResult :: Result -> Int -> STM ()
addToResult result x = do
  (sum, endCtr) <- readTVar result
  writeTVar result (sum+x, endCtr+1)
 
waitForCounter :: Result -> Int -> STM Int
waitForCounter result limit = do
  (sum, endCtr) <- readTVar result
  if endCtr < limit then retry else return sum
 
main :: IO ()
main = do
  let n = 100
  result <- newTVarIO (0, 0)
  mapM_ (forkIO . atomically . addToResult result) [1..n]
  sum <- atomically $ waitForCounter result n
  putStrLn $ "The sum is " ++ show sum

addToResult関数は、readTVarwriteTVarという2つのSTMアクションを組み合わせてSTMアクションを作成し、 操作をコミットし、失敗した場合には自動的に再試行します。一方、waitForCounter関数は、if文を使用してトランザクション の再試行タイミングを指定します。newTVIO関数はatomically . newTVarと同等であり、TVarを作成し、 それをatomicallyでIOに変換します。

シナジー: HaskellとSTM

前述のように、STMを実装する際には、トランザクションに副作用(IOアクションや可変引数など)が含まれないようにすることが 重要です。これにより、トランザクションを簡単にロールバックできるようになります。Haskellの型システムと 純粋な関数型という性質は、IOアクションを分離し、変数をデフォルトで不変にするため、STMにとって理想的な特性です。 これにより、トランザクションをロールバックする際に関数の入力に加えられた変更を監視して変更を取り消す必要がなくなり、 トランザクションを単に再試行することができます。

クイズ

この記事では、学習した内容を確認するためのクイズを設けます。記事のメイン部分を読んだ後に、ぜひ自分で問題を解いてみることを強くお勧めします。各問題をクリックすると答えが表示されます。

リソース