A Monad for concurrency

This code is an attempt to port the Haskell code from “A Poor Man’s Concurrency Monad” to OCaml.

The main idea is to use a monadic interface to compose functions written in continuation-passing style (CPS). The resulting program can then be run concurrently: its constituent steps are evaluated on demand and interleaved by a scheduler.

module Conc =
struct
  (* A thread is a stream of [action]s; each constructor tells the
     scheduler what to do next:
     - [Atom th]: one atomic step.  Forcing [th] performs the step's side
       effect and yields the thread's next action.
     - [Fork (a1, a2)]: split into two threads.
     - [Stop]: the thread has finished. *)
  type action = Atom of zaction
              | Fork of (zaction * zaction)
              | Stop
  and zaction = action lazy_t

  (* CPS monad:  type 'a t = ('a -> action) -> action
     A computation receives the continuation [c] that consumes its result
     and builds the rest of the thread's action stream. *)

  (* [bind f k]: run [f], pass its result [a] to [k], and continue with
     the current continuation [c]. *)
  let bind f k = fun c -> f (fun a -> k a c)

  (* [return x]: hand [x] straight to the continuation; takes no step. *)
  let return x = fun c -> c x

  (* [atom f]: a single atomic step.  The thunk [f] only runs when the
     scheduler forces the [Atom]; its result is passed to [c]. *)
  let atom f = fun c -> Atom (lazy (let b = f () in c b))

  (* [action f]: turn a computation into an action stream ending in [Stop]. *)
  let action f = f (fun () -> Stop)

  (* [fork f]: start [f] as a separate thread, and continue the current
     thread with [()]. *)
  let fork f = fun c -> Fork (lazy (action f), lazy (c ()))

  (* [par m1 m2]: run [m1] and [m2] as two threads.  Both are given the
     same continuation [c], so [c] runs once per thread that reaches it. *)
  let par m1 m2 = fun c -> Fork (lazy (m1 c), lazy (m2 c))

  (* [stop]: end the current thread, ignoring its continuation. *)
  let stop = fun _ -> Stop

  (* Round-robin scheduler.  Take the thread at the front of the run
     queue, perform one step of it, and put what remains at the back.
     A FIFO [Queue] makes each step O(1), where appending to a list with
     [@] costs O(number of live threads) per step. *)
  let round threads =
    let queue = Queue.create () in
    List.iter (fun t -> Queue.add t queue) threads;
    let rec step () =
      if not (Queue.is_empty queue) then begin
        (match Queue.take queue with
         | Atom th -> Queue.add (Lazy.force th) queue
         | Fork (a1, a2) ->
           (* Force and enqueue in a fixed order (a1 first), so the first
              component always gets its next turn before the second. *)
           let t1 = Lazy.force a1 in
           let t2 = Lazy.force a2 in
           Queue.add t1 queue;
           Queue.add t2 queue
         | Stop -> ());
        step ()
      end
    in
    step ()

  (* [run m]: execute [m] as the only initial thread until every thread
     (including forked ones) has stopped. *)
  let run m = round [action m]
end
 
open Conc
 
(* [loop f n] sequences the computation [f] with itself [n] times and then
   returns [()], so further steps can be chained after it with [bind]
   (ending in [stop] would silently drop them).  A non-positive [n]
   performs no steps, rather than recursing forever. *)
let rec loop f n =
  if n <= 0 then return ()
  else bind f (fun () -> loop f (n - 1))
 
(* Example 1: print "start>>>", then fork a thread that prints "cat" ten
   times while the main thread prints "fish" ten times.  Each word is a
   single atomic step, so the scheduler interleaves whole words. *)

(* [write1 s]: one atomic step that prints [s]. *)
let write1 s =
  let emit () = print_string s in
  atom emit

let example1 : (unit -> action) -> action =
  let cats = loop (write1 "cat") 10 in
  let fishes = loop (write1 "fish") 10 in
  bind (write1 "start>>>") (fun () ->
    bind (fork cats) (fun () -> fishes))
 
 
(* [explode s] returns the characters of [s], in order, as a list. *)
let explode s =
  let chars = ref [] in
  for i = String.length s - 1 downto 0 do
    chars := s.[i] :: !chars
  done;
  !chars
 
(* write2 : string -> (unit -> action) -> action
   Prints [s] one character at a time.  Each character is its own atomic
   step, followed by one final no-op step, so other threads may run
   between any two characters. *)
let write2 s =
  let rec chars_from = function
    | [] -> atom ignore
    | ch :: rest ->
      let put_char = atom (fun () -> print_char ch) in
      bind put_char (fun () -> chars_from rest)
  in
  chars_from (explode s)
 
(* Example 2: the same program shape as example 1, but built from
   [write2]: every character is a separate atomic step, so "cat" and
   "fish" are interleaved at character granularity. *)
let example2 : (unit -> action) -> action =
  let cats = loop (write2 "cat") 10 in
  let fishes = loop (write2 "fish") 10 in
  bind (write2 "start>>>") (fun () ->
    bind (fork cats) (fun () -> fishes))
 
(* Run both examples, each on its own line.  [let ()] (rather than
   [let _]) makes the compiler check that the sequence has type unit. *)
let () =
  run example1;
  print_newline ();
  run example2;
  print_newline ()

Dataflow variables in Erlang

Implementing dataflow variables in Erlang as a library is surprisingly easy.

Here is the whole code:

-module(prog).
 
%% set/1 and get/1 are exported so the macros below can call them as
%% prog:set/1 and prog:get/1; main/0 is the demo entry point.
-export([set/1, get/1, main/0]).
 
%% ?DF_SET(Exp) wraps Exp in a fun and creates a dataflow variable whose
%% value is computed by evaluating Exp in a separate process.
-define(DF_SET(Exp), prog:set(fun() -> Exp end)).
%% ?DF_GET(Var) reads a dataflow variable, blocking until it has a value.
-define(DF_GET(Var), prog:get(Var)).
 
%% Creates a new dataflow variable.  Spawns a process that evaluates Fun()
%% and then serves the result via resolved/1; that process never exits on
%% its own.  Returns the process pid, which is the handle to pass to get/1.
set(Fun) ->
    Compute = fun() -> resolved(Fun()) end,
    spawn(Compute).
 
%% Serves an already computed value.  Every {get, Ref, Pid} request is
%% answered by sending {value, Ref, Res} back to Pid.  Never returns.
%% Messages of any other shape are never received and stay in the mailbox.
resolved(Res) ->
    receive
        {get, Ref, Pid} ->
            Pid ! {value, Ref, Res},
            resolved(Res)
    end.
 
%% Reads the dataflow variable Var (the pid returned by set/1), blocking
%% until its value has been computed.
%%
%% The caller monitors Var while it waits.  If the process that holds the
%% value is not alive, or dies before replying (for example because
%% Fun() raised), this raises error({dataflow_var_down, Reason}) instead
%% of blocking forever.  The monitor reference doubles as the request tag,
%% so the reply is matched to this particular call.
%%
%% Call it qualified (prog:get/1, as ?DF_GET does).  An unqualified
%% get/1 call would clash with the auto-imported BIF erlang:get/1.
get(Var) ->
    Ref = erlang:monitor(process, Var),
    Var ! {get, Ref, self()},
    receive
        {value, Ref, Res} ->
            %% Drop the monitor and any 'DOWN' message already queued.
            erlang:demonitor(Ref, [flush]),
            Res;
        {'DOWN', Ref, process, _Pid, Reason} ->
            erlang:error({dataflow_var_down, Reason})
    end.
 
%% Deliberately naive, exponential-time Fibonacci.  It stands in for a
%% slow computation.  Only non-negative integers are accepted; anything
%% else fails with function_clause.
fib(N) when N =:= 0; N =:= 1 -> N;
fib(N) when N > 1 -> fib(N - 2) + fib(N - 1).
 
%% Demo: starts two dataflow computations (fib(30) and fib(20)), prints
%% while they run in their own processes, then blocks on both values to
%% print their sum.
main() ->
    Slow = ?DF_SET(fib(30)),
    Fast = ?DF_SET(fib(20)),
    io:format("Computation starts, we do something else in the meantime...~n"),
    io:format("Accessing the result will block until it's resolved~n"),
    Sum = ?DF_GET(Slow) + ?DF_GET(Fast),
    io:format("Result is: ~w ~n", [Sum]),
    io:format("Subsequent accesses will be instantaneous~n").

How does it work?

We can implement a dataflow variable with an Erlang process, because:

  1. Erlang processes are really cheap, so we can afford to spawn one for each variable.
  2. A process can block while waiting for a specific message. We use this to block the current process when it looks up a variable whose value is not yet available.

Breaking it down

To create a dataflow variable, you provide a computation:

X = prog:set(fun() -> fib(30) end)

This spawns a process p which starts executing fib(30) right away (see Res = Fun() in set(Fun)).

When the current process tries to access X’s value:

Result = prog:get(X)

it sends a lookup message to p:

 X ! {get, Ref, self()}

and blocks (because of receive ... end) until p responds.

If p is not done computing the result of fib(30), the message sits in its mailbox.

When p is done, it enters an infinite loop (the resolved function) and responds to pending and incoming lookups.

Syntax

We can improve the syntax slightly with the following macros:

-define(DF_SET(Exp), prog:set(fun() -> Exp end)).
-define(DF_GET(Var), prog:get(Var)).

We can now write

X = ?DF_SET(Exp)

instead of

X = prog:set(fun() -> Exp end)

and

?DF_GET(X)

instead of

prog:get(X)