この記事では、Haskell における 並行処理の実装方法を紹介します。

Haskellに関する記事を最後に公開してから、ほぼ3週間が経過したことにお気づきかもしれません。この期間が空いたのは、 Haskellへの興味を失ったからではありません。むしろ、Haskellにおける並行処理と並列処理の実装に集中するために、 まずC言語での基本的な並行処理と並列処理の概念をカバーしたかったからです。したがって、概念的な理解を深めたい場合は、 Cプログラマーへの道 #13 - マルチスレディングをご覧になることをお勧めします。
スレッド
並行処理や並列処理を実現するためには、マルチスレディングを実装する必要があります。Haskellで新しいスレッドを作成するには、 次のコードを使用します。
import Control.Concurrent
-- 新しいスレッドの作成
-- forkIO :: IO() -> IO ThreadId
f :: Int -> Int -> IO ()
f a b = do
let x = a + b
putStrLn $! show x
main :: IO ()
main = do
...
forkIO $ f 1 2
...
Control.Concurrent
モジュールのforkIO
関数を使用することで、新しいスレッドを作成できます。forkIO
関数は、
IOアクションを入力として受け取り、ThreadId
を返すIOアクションを生成します。関数f
では、Haskellのデフォルトで
ある遅延評価によりスレッドがサンクを返す可能性があるため、$!
を使用して厳格評価を強制しています。
また、Haskellはデフォルトで単一プロセッサのみを使用します。マルチスレディングを有効にするには、ghc -threaded main.hs
のように
-threaded
フラグを使用してコードをコンパイルし、./main +RTS -N
のように-N
フラグを使って実行ファイルを実行する必要があります。
これにより、マシン上の複数のプロセッサにアクセスできます。
ミューテックス変数
別のスレッド内の関数の結果にアクセスするには、MVar
を使用します。これは、C言語のミューテックスロックと同様に機能しますが、
値を格納できます。ミューテックス変数には、一度に1つの値のみが格納され、取り出されます。
import Control.Concurrent
f :: Int -> Int -> MVar Int -> IO ()
f a b mVar = do
putMVar mVar $! (a + b) -- MVarに値を格納
main :: IO ()
main = do
mVar <- newEmptyMVar -- MVarの初期化
forkIO $ f 1 2 mVar
result <- takeMVar mVar -- MVarから値を取り出し
putStrLn $ show result
takeMVar
関数は、putMVar
関数でmVar
に値が格納された後にのみ実行されます。
チャンネル
チャンネル(Chan
)は、別のスレッド内の関数の結果を共有するもう一つの方法です。MVar
とは異なり、スレッドは
チャンネルに複数の値を格納できますが、読み取るためにはチャンネルに既に値が格納されている必要があります。
格納された値は、FIFO(先入れ先出し)キューのように、格納された順序で取り出されます。
import Control.Concurrent
f :: Int -> Int -> Chan Int -> IO ()
f a b chan = do
writeChan chan $! (a + b) -- Chanに値を書き込む
main :: IO ()
main = do
chan <- newChan -- Chanの初期化
forkIO $ f 1 2 chan
forkIO $ f 3 4 chan
result1 <- readChan chan -- Chanから値を読み取る
result2 <- readChan chan -- Chanから値を読み取る
putStrLn $ show result1 -- => (3 or 7)
putStrLn $ show result2 -- => (3 or 7)
読み取りの順序が重要でない場合は、並列実行を容易にするためにチャンネルを使用する方が効果的です。
readChan
関数は、writeChan
を使用してチャンネルに既に値が格納されている場合にのみ読み取ります。
MVarとChanの使用例
それでは、MVarとChanを実際に使用してみましょう。以下のコードは、10個の新しいスレッドを作成し、 それぞれのスレッドIDを出力しようとしています。
getGreeting :: IO String
getGreeting = do
tid <- myThreadId -- スレッドIDを取得
let greeting = "Hello from" ++ show tid
return $! greeting
hello :: IO ()
hello = do
greeting <- getGreeting
putStrLn greeting
main :: IO()
main = do
let n = 10
mapM_ (const $ forkIO $ hello) [1..n]
return ()
しかし、このコードには致命的な欠陥があります。それは、すべてのIDを出力しない場合があルコとです。これは、
return ()
がすべてのスレッドの実行が完了するのを待たずに、プログラムが終了してしまうためです。
すべてのスレッドが実行を完了するまで待つために、空のタプルを格納するチャンネルを作成し、
return ()
を実行する前にチャンネルを10回読み取ることができます。
getGreeting :: IO String
getGreeting = do
tid <- myThreadId
let greeting = "Hello from" ++ show tid
return $! greeting
hello :: Chan () -> IO ()
hello endFlags = do
greeting <- getGreeting
putStrLn greeting
writeChan endFlags ()
main :: IO()
main = do
endFlags <- newChan
let n = 10
mapM_ (const $ forkIO $ hello endFlags) [1..n]
mapM_ (const $ readChan endFlags) [1..n]
上記のコードでは、readChan
がendFlags
チャンネルから10個の空のタプルを読み取るまでプログラムが
終了しないようになっています。そのため、このコードを実行するたびにすべてのスレッドが実行されているの
が確認できます。ただし、このコードは、Haskellがデフォルトで標準出力をバッファリングして保護しているため、
正常に動作しているに過ぎません。
バッファリングを無効にするには、System.IO
をインポートし、メインのIOアクション内で
hSetBuffering stdout NoBuffering
を追加します。この状態で実行すると、
複数のスレッドで並列してputStrLn
が実行されるため、出力が次のようになります。
HeHHHHHHHHHleeeeeeeeellllllllllolllllllll ooooooooof rffffffffforrrrrrrrrmooooooooommTmmmmmmmTThTTTTTTThhrhhhhhhhrrerrrrrrreeaeeeeeeeaadaaaaaaaddIdddddddIIdIIIIIIIdd ddddddd 1 1115111122296
478310
この問題を修正するには、MVar
を使用し、putStrLn
操作をミューテックスロックのようにロックします。
getGreeting :: IO String
getGreeting = do
tid <- myThreadId
let greeting = "Hello from" ++ show tid
return $! greeting
hello :: MVar () -> Chan () -> IO ()
hello mutexLock endFlags = do
greeting <- getGreeting
takeMVar mutexLock
putStrLn greeting
putMVar mutexLock ()
writeChan endFlags ()
main :: IO()
main = do
hSetBuffering stdout NoBuffering
mutexLock <- newEmptyMVar
endFlags <- newChan
let n = 10
mapM_ (const $ forkIO $ hello mutexLock endFlags) [1..n]
putMVar mutexLock ()
mapM_ (const $ readChan endFlags) [1..n]
このコードを実行すると、以下のように綺麗な出力が得られるはずです。
Hello fromThreadId 14
Hello fromThreadId 13
Hello fromThreadId 16
Hello fromThreadId 12
Hello fromThreadId 15
Hello fromThreadId 18
Hello fromThreadId 19
Hello fromThreadId 21
Hello fromThreadId 20
Hello fromThreadId 17
ロックを作成する場所には注意が必要です。そうでないと、並列処理の利点を十分に活用できない可能性があります。
クイズ
この記事では、学習した内容を確認するためのクイズを設けます。記事のメイン部分を読んだ後に、ぜひ自分で問題を解いてみることを強くお勧めします。各問題をクリックすると答えが表示されます。
リソース
- Philipp, Hagenlocher. 2020. Haskell for Imperative Programmers #28 - Concurrency & Threads. YouTube.