Dataflow variables in Erlang

Implementing dataflow variables in Erlang as a library is oddly easy.

Here is the whole code:

-module(prog).
-export([set/1, get/1, main/0]).

-define(DF_SET(Exp), prog:set(fun() -> Exp end)).
-define(DF_GET(Var), prog:get(Var)).

%% Creates a new dataflow variable: spawns a process that evaluates Fun
set(Fun) ->
    spawn(fun() -> Res = Fun(), resolved(Res) end).

%% Loop to keep the result accessible: answers every lookup with Res
resolved(Res) ->
    receive
        {get, Ref, Pid} -> Pid ! {value, Ref, Res}
    end,
    resolved(Res).

%% Blocks until the variable has been evaluated
get(Var) ->
    Ref = make_ref(),
    Var ! {get, Ref, self()},
    receive
        {value, Ref, Res} -> Res
    end.

%% Time-consuming friend
fib(0) -> 0;
fib(1) -> 1;
fib(N) when N > 1 -> fib(N-1) + fib(N-2).

main() ->
    X = ?DF_SET(fib(30)),
    Y = ?DF_SET(fib(20)),
    io:format("Computation starts, we do something else in the meantime...~n"),
    io:format("Accessing the result will block until it's resolved~n"),
    io:format("Result is: ~w ~n", [?DF_GET(X) + ?DF_GET(Y)]),
    io:format("Subsequent accesses will be instantaneous~n").

How does it work?

We can implement a dataflow variable with an Erlang process, because:

  1. Erlang processes are really cheap, so we can afford to spawn one for each variable.
  2. A process can block while waiting for a specific message. We use this to block the current process when we ‘look up’ a variable whose value is not yet available.
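
The second point can be illustrated in isolation. The sketch below is only an illustration: the module name selective_demo and the function wait_for/1 are made up for this example and are not part of the library. It shows a receive that waits for one specific tagged message while leaving an unrelated message untouched in the mailbox.

```erlang
-module(selective_demo).
-export([run/0]).

%% Blocks until a message tagged with Ref arrives.
%% Messages that do not match the pattern stay in the mailbox.
wait_for(Ref) ->
    receive
        {value, Ref, Res} -> Res
    end.

run() ->
    Ref = make_ref(),
    Self = self(),
    %% An unrelated message arrives first; the selective receive skips it.
    Self ! unrelated,
    %% The message we are waiting for arrives about 100 ms later.
    spawn(fun() -> timer:sleep(100), Self ! {value, Ref, 42} end),
    Res = wait_for(Ref),
    %% The unrelated message is still there and can be consumed now.
    receive unrelated -> ok end,
    Res.
```

Calling selective_demo:run() blocks for roughly 100 ms and then returns 42.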

Breaking it down

To create a dataflow variable, you provide a computation:

X = prog:set(fun() -> fib(30) end)

This spawns a process p which starts executing fib(30) right away (see Res = Fun() in set(Fun)).

When the current process tries to access X’s value:

Result = prog:get(X)

it sends a lookup message to p:

 X ! {get, Ref, self()}

and blocks (because of receive ... end) until p responds.

If p is not done computing the result of fib(30), the message sits in its mailbox.

When p is done, it enters an infinite loop (the resolved function) and responds to pending and incoming lookups.
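
To see both cases at once, here is a small sketch in which several readers ask for a value before it exists and one more asks afterwards. It assumes the same message protocol as above, repeated under the made-up names new/1, serve/1 and read/1 (in a hypothetical module df_readers) so that it compiles on its own.

```erlang
-module(df_readers).
-export([run/0]).

%% Same protocol as prog:set/1, repeated so the sketch is self-contained.
new(Fun) ->
    spawn(fun() -> serve(Fun()) end).

%% Same loop as resolved/1: answer every lookup with Res, forever.
serve(Res) ->
    receive
        {get, Ref, Pid} -> Pid ! {value, Ref, Res}
    end,
    serve(Res).

%% Same protocol as prog:get/1.
read(Var) ->
    Ref = make_ref(),
    Var ! {get, Ref, self()},
    receive
        {value, Ref, Res} -> Res
    end.

run() ->
    Parent = self(),
    %% The variable takes about 200 ms to resolve.
    X = new(fun() -> timer:sleep(200), 42 end),
    %% Three readers ask right away; their requests wait in the
    %% mailbox of X's process until the computation is done.
    lists:foreach(
      fun(N) -> spawn(fun() -> Parent ! {reader, N, read(X)} end) end,
      [1, 2, 3]),
    Early = [receive {reader, N, V} -> V end || N <- [1, 2, 3]],
    %% By now X is resolved, so this lookup is answered by the loop.
    Late = read(X),
    {Early, Late}.
```

Calling df_readers:run() returns {[42,42,42],42}. Note that, as in the code above, the variable's process is never stopped; it keeps serving lookups for as long as the node runs.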


We can improve the syntax slightly with the help of the following macros:

-define(DF_SET(Exp), prog:set(fun() -> Exp end)).
-define(DF_GET(Var), prog:get(Var)).

We can now write

X = ?DF_SET(Exp)

instead of

X = prog:set(fun() -> Exp end)



and

Result = ?DF_GET(X)

instead of

Result = prog:get(X)


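
With macros like these, one variable can be defined in terms of others. The following is a minimal sketch, assuming the same protocol as above; the module df_chain, the macros SET and GET, and the functions new/1, serve/1 and read/1 are stand-ins invented for this example so that it compiles without the prog module.

```erlang
-module(df_chain).
-export([run/0]).

%% Hypothetical local counterparts of DF_SET / DF_GET.
-define(SET(Exp), new(fun() -> Exp end)).
-define(GET(Var), read(Var)).

new(Fun) ->
    spawn(fun() -> serve(Fun()) end).

serve(Res) ->
    receive
        {get, Ref, Pid} -> Pid ! {value, Ref, Res}
    end,
    serve(Res).

read(Var) ->
    Ref = make_ref(),
    Var ! {get, Ref, self()},
    receive
        {value, Ref, Res} -> Res
    end.

run() ->
    X = ?SET(6 * 7),
    Y = ?SET(10),
    %% Z depends on X and Y. The lookups happen inside Z's own
    %% process, so run/0 does not block until the final ?GET.
    Z = ?SET(?GET(X) + ?GET(Y)),
    ?GET(Z).
```

Calling df_chain:run() returns 52. The order in which X and Y finish does not matter, since Z's process simply waits for whichever value is still missing.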