pipelines in Erlang
I've been working a lot on Project Euler problems. Maybe I'm just a geek, but I have fun with that sort of thing. If you're not familiar, it's a list of maths problems that are generally made easier with some code and a little bit of CPU-time. After solving a problem, you get access to the forum for it, and your "Genius Per Cent" increases. You can see examples at their site, and I have an example below.
Many of these problems are solvable with just a pencil and a piece of paper, but become much easier with a little bit of (usually very simple) code. There is a rule that the problems given should not require more than about 60 seconds of CPU-time (this isn't enforced, it's just a guideline that if your code takes longer, consider a new algorithm), but for some of the problems a brute-force approach takes less wall-time (Thinking+Coding+Execution).
Since "brute-force" means lots of CPU-time, I've come up with a few
ways to make the best use of the CPU-time available, which means that
I want to use both cores in my laptop, and maybe the CPU resources on
neighbouring machines. One thing that I've used a lot in Erlang is
Stephen Marsh's
plists, which is
just the best thing since concurrently sliced bread. It implements
most of the API from Erlang's stdlib
lists module, such as map, fold,
and sort, but allows you to specify a malt, which is a description
of how you want the problem broken down into concurrent pieces. For
example, "Break the list into 10 parts and process them at the same
time", or "Spread this work across these ten machines" (!). It even
includes a basic
mapreduce
implementation. How easy this makes concurrency of even simple
algorithms just rocks my socks off.
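To make the idea concrete, here is a minimal sketch of a concurrent map. It is not plists and does not use its API: the module name pmap_sketch and the helper spawn_worker/3 are made up for illustration, and it naively starts one process per element rather than splitting the list according to a malt.

```erlang
-module(pmap_sketch).
-export([pmap/2]).

%% Apply Fun to every element of List, one process per element,
%% and return the results in the original order.
%% (Hypothetical helper; plists does this far more cleverly.)
pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
    Parent = self(),
    %% Start one worker per element; each reply is tagged with a
    %% unique reference so it can be matched back to its position.
    Refs = [spawn_worker(Parent, Fun, X) || X <- List],
    %% Collect the replies in the order the workers were started.
    [receive {Ref, Result} -> Result end || Ref <- Refs].

%% Spawn a process that computes Fun(X) and sends {Ref, Result}
%% back to Parent. Returns the reference used as the tag.
spawn_worker(Parent, Fun, X) ->
    Ref = make_ref(),
    spawn_link(fun() -> Parent ! {Ref, Fun(X)} end),
    Ref.
```

Calling pmap_sketch:pmap(fun(X) -> X*X end, [1,2,3]) should behave like lists:map/2 with the same arguments, only with the work spread over several processes.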
pipelines
In that vein, I've found that a lot of problems in processing large amounts of data can be simplified by forming data-flows or pipelines. This is something you do very frequently in Unix:
cat big_file | sort | uniq | grep 'FAILED' | \
awk '{print $1 $2}' | fix_failures
Ruby just implemented something similar using
generators. The
best part about the shell-variety is that those tools are all running
concurrently. Sure, some parts might stall the pipeline (like sort,
which doesn't continue until it gets all of the data), but even in
that case the cat and the sort run concurrently (and the other
items do the work of starting, if nothing else). You could stick
pv (a small program for
viewing data-flows through pipes and providing progress bars for
programs that don't have their own) anywhere in there and see the
data-flow. Here's a contrived example:
$ dd if=/dev/random | pv -cN dd | bzip2 | pv -cN bzip2 | gzip | \
pv -cN gzip | gunzip | pv -cN gunzip | bunzip2 | pv -cN bunzip2 | \
md5
dd: 18.1MB 0:00:11 [1.71MB/s]
bzip2: 18.1MB 0:00:11 [1.78MB/s]
gzip: 18.1MB 0:00:11 [1.82MB/s]
gunzip: 18MB 0:00:11 [1.83MB/s]
bunzip2: 16.4MB 0:00:10 [1.63MB/s]
So at any given time, every one of those programs is producing or
receiving. top(1) shows this:
PID COMMAND %CPU TIME #TH #PRTS #MREGS RPRVT RSHRD RSIZE VSIZE
2027 bzip2 54.3% 0:17.79 1 13 27 6604K 240K 6900K 25M
2033 bunzip2 28.0% 0:09.29 1 13 24 3712K 240K 3988K 21M
2025 dd 26.8% 0:08.83 1 13 17 84K 184K 304K 17M
2029 gzip 16.8% 0:05.45 1 13 17 336K 184K 580K 10M
2129 md5 0.5% 0:00.11 1 13 18 96K 184K 312K 18M
Lots of programs are easily expressed conceptually as pipelines, and can be written so that their parts execute concurrently. Here's an example problem from Project Euler:
39: If p is the perimeter of a right angle triangle with integral length sides, {a,b,c}, there are exactly three solutions for p = 120.
{20,48,52}, {24,45,51}, {30,40,50}
For which value of p < 1000, is the number of solutions maximised?
The program can be expressed as a pipeline (note that, like most Project Euler problems, this can be greatly simplified algebraically; this is not the most efficient solution):
- generate all possible legs of a right triangle with length<1000
- if those legs have a valid hypotenuse, calculate it, otherwise discard it
- remove any whose perimeter is 1,000 or more
- collect those up in a map of Perimeter->Count
- go through that map, finding the largest value of Count
- return it
I can launch all of those steps as concurrent processes, each of which alternates between receiving a datum, processing it, and passing it on to the next entry in the pipeline. The representation of these steps as processes is very similar to the Actor Model, and most Erlang programs are already structured this way, just not as directly.
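As a rough illustration of that receive-process-forward loop, here is a hand-rolled two-stage pipeline using nothing but spawn_link and message passing. The module name mini_pipe and the stages doubler/1 and sink/2 are hypothetical, chosen only for this sketch.

```erlang
-module(mini_pipe).
-export([run/1]).

%% A stage: doubles each number it receives and forwards it to Next.
%% On eof it passes eof along and stops.
doubler(Next) ->
    receive
        eof ->
            Next ! eof;
        N ->
            Next ! N * 2,
            doubler(Next)
    end.

%% The last stage: accumulates everything until eof, then sends the
%% whole list (in arrival order) back to Parent.
sink(Parent, Acc) ->
    receive
        eof ->
            Parent ! {result, lists:reverse(Acc)};
        X ->
            sink(Parent, [X | Acc])
    end.

%% Build the pipeline back to front, feed List into it, wait for
%% the result.
run(List) when is_list(List) ->
    Parent = self(),
    Sink = spawn_link(fun() -> sink(Parent, []) end),
    Doubler = spawn_link(fun() -> doubler(Sink) end),
    lists:foreach(fun(X) -> Doubler ! X end, List),
    Doubler ! eof,
    receive {result, Result} -> Result end.
```

Because Erlang preserves message order between any pair of processes, mini_pipe:run([1,2,3]) gives [2,4,6]. Every stage has to repeat the same eof bookkeeping, which is the boilerplate worth factoring out.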
I wrote up a quick pipeline.erl that should make this pretty
easy. The basic usage is:
pipeline:run([pipeline:generator(BigList),
              {filter,fun some_filter/1},
              {map,fun some_map/1},
              {generic,fun some_complex_function/2},
              fun some_more_complicated_function/1,
              fun pipeline:collect/1]).
To apply that to our problem:
projecteuler39.erl
-module(projecteuler39).
-export([main/0]).
main() ->
UpTo=1000,
pipeline:run([%% find all possible legs
pipeline:generator([ {X,Y}
|| X <- lists:seq(1,UpTo),
Y <- lists:seq(1,X) ]),
%% if the two legs form a right-triangle, calculate
%% its side and send it
{generic,fun({X,Y},Pid) ->
Z=math:sqrt(X*X+Y*Y),
RZ=erlang:round(Z),
case RZ==Z of
true ->
Pid ! {X,Y,RZ};
false -> ok
end
end},
%% we only want ones for which P<1000
{filter,fun({X,Y,Z}) -> X+Y+Z<1000 end},
fun perim_collector/1,
fun find_max_perimcount/1,
fun pipeline:collect/1]).
find_max_perimcount(Pid) ->
find_max_perimcount(Pid,{0,0}).
find_max_perimcount(Pid,MaxSoFar={_MaxPerim,MaxCount}) ->
receive
eof ->
Pid ! MaxSoFar,
Pid ! eof;
ThisPerimCount={_Perim,Count} ->
case Count<MaxCount of
true ->
find_max_perimcount(Pid,MaxSoFar);
_ ->
find_max_perimcount(Pid,ThisPerimCount)
end
end.
perim_collector(Pid) ->
perim_collector(Pid,dict:new()).
perim_collector(Pid,Dict) ->
receive
eof ->
dict:fold(fun(Perim,Count,_AccIn) ->
Pid ! {Perim,Count}
end,0,Dict),
Pid ! eof;
{X,Y,Z} ->
P=X+Y+Z,
perim_collector(Pid,
dict:update_counter(P,1,Dict))
end.
It runs in about half the time of the equivalent program written linearly (6 seconds, as opposed to 12, on my dual-core laptop).
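For comparison, a linear version might look something like the sketch below. This is not the program that was timed above; the module name euler39_linear and its functions are invented here, and it simply performs the same steps in a single process.

```erlang
-module(euler39_linear).
-export([solve/1, count_for/2]).

%% Perimeters of all right triangles with integer sides, legs
%% Y =< X =< UpTo, and perimeter below UpTo (one entry per triangle).
perimeters(UpTo) ->
    [X + Y + RZ
     || X <- lists:seq(1, UpTo),
        Y <- lists:seq(1, X),
        Z <- [math:sqrt(X*X + Y*Y)],
        RZ <- [erlang:round(Z)],
        RZ == Z,                 % hypotenuse must be integral
        X + Y + RZ < UpTo].

%% A dict mapping Perimeter -> number of triangles with that perimeter.
counts(UpTo) ->
    lists:foldl(fun(P, Dict) -> dict:update_counter(P, 1, Dict) end,
                dict:new(), perimeters(UpTo)).

%% Returns {Perimeter, Count} for the perimeter with the most solutions.
solve(UpTo) ->
    dict:fold(fun(P, Count, {_, MaxCount}) when Count > MaxCount ->
                      {P, Count};
                 (_P, _Count, Max) ->
                      Max
              end, {0, 0}, counts(UpTo)).

%% Number of solutions for one particular perimeter P.
count_for(P, UpTo) ->
    case dict:find(P, counts(UpTo)) of
        {ok, Count} -> Count;
        error -> 0
    end.
```

euler39_linear:count_for(120, 1000) gives 3, matching the three triangles listed in the problem statement.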
pipeline.erl
You can see most of it above, but pipeline.erl has a pretty simple
API.
- run/1 takes a list of FunSpecs and executes them, returning the result of the last entry (usually pipeline:collect/1)
- create/2 takes a list of FunSpecs and a PID to send the results of the last entry. It's mostly used by run/1, but can be used to create some more advanced pipelines
- generator/1 will usually be the first entry in the pipeline. It takes a list and injects it into the pipeline
- collect/1 will usually be the last entry in the pipeline. It collects all of the results into a list and returns that. (Without a collector function, run/1 only returns the first message sent by the last entry)
- tee/1 can be placed in a pipeline if you want another process to receive a copy of the messages going through it. It can be used as pipeline:tee(spawn_link(fun pipeline:printer/0)) to send those messages to the screen
A FunSpec is a description of a task. It can be one of:
- {filter,Fun/1} Propagates messages if Fun(Msg) is true
- {map,Fun/1} Propagates the result of Fun(Msg) for each message
- {generic,Fun/2} Executed as Fun(Msg,Pid) for each message received, and is expected to send messages to Pid as appropriate
- Otherwise, it's a fun/1 that will be launched in its own process, and its PID passed to the previous entry in the pipeline. It will receive messages sent by the previous entry, and 'eof' when the pipeline behind it has emptied. It must propagate eof to Pid (its argument, the PID of the next entry) or the pipeline will stall indefinitely.
The test/0 function tries to use every piece of the API, so it's a
good reference. It would be pretty trivial to extend pipeline.erl to
launch multiple workers per entry, or to form more complex pipelines
with multiple paths, but that would probably require more knowledge
about your application, and this covers the most basic cases. It could
be extended to be a little smarter about its message-passing (a stray
message will confuse it at the moment), and the version I'm using
locally is (but that makes it harder to read, so it's not included
here).
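One common way to guard against stray messages, sketched here as an assumption rather than as what my local version does, is to tag the result with a unique reference and receive selectively on that tag. The module name tagged_result is hypothetical, and the "pipeline" is reduced to a single worker that sums a list.

```erlang
-module(tagged_result).
-export([run/1]).

%% Start a worker and wait for *its* answer only. Any unrelated
%% message already sitting in the caller's mailbox is left alone,
%% because the receive pattern only matches the fresh reference.
run(List) when is_list(List) ->
    Parent = self(),
    Ref = make_ref(),
    spawn_link(fun() -> Parent ! {Ref, lists:sum(List)} end),
    receive
        {Ref, Result} -> Result
    end.
```

Compare this with run/1 in the listing that follows, which returns whatever message arrives first.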
-module(pipeline).
-export([create/2,
generator/1,
tee/1,
collect/1,
run/1,
printer/0,
generic_worker/1]).
-export([test/0]).
%% an example of a full pipeline function process
square_server(Pid) ->
receive
eof ->
Pid ! eof,
ok;
X ->
Pid ! X*X,
square_server(Pid)
end.
test() ->
pipeline:run([pipeline:generator(lists:seq(1,10)),
{filter,fun(X) -> X rem 3 =:= 0 end},
{map, fun(X) -> X + 1 end},
fun square_server/1,
pipeline:tee(
%% spawn a printer in the middle of the pipeline
%% so we can watch messages go by
spawn_link(fun pipeline:printer/0)),
{generic,fun(Msg,Pid) ->
%% simple example generic_worker that
%% duplicates entries
Pid ! Msg,
Pid ! Msg
end},
fun pipeline:collect/1]).
%% take a pipeline and return the result of running it
run(Operators) when is_list(Operators) ->
create(Operators,self()),
receive X -> X end.
%% creates a pipeline that sends results to EndPoint, and returns the
%% first PID in it
create(Operators,EndPoint) when is_list(Operators),
is_pid(EndPoint) ->
lists:foldr(fun(Worker,SendResultsTo) when is_pid(SendResultsTo) ->
make_worker(Worker,SendResultsTo)
end, EndPoint, Operators).
%% returns a pipeline function that injects the items in List into the
%% pipeline
generator(List) when is_list(List) ->
fun(Pid) ->
generator(List,Pid)
end.
%% sends the contents of List, one item at a time, to Pid
generator(List,Pid) when is_list(List),
is_pid(Pid) ->
lists:foreach(fun(Elem) ->
Pid ! Elem
end, List),
Pid ! eof.
%% a pipeline function that gathers messages until it receives 'eof',
%% and then sends them all as one list. Typically the last item in a
%% pipeline
collect(Pid) when is_pid(Pid) ->
collect(Pid,[]).
collect(Pid,Acc) when is_list(Acc),
is_pid(Pid) ->
receive
eof ->
Pid ! lists:reverse(Acc);
X ->
collect(Pid,[ X | Acc ])
end.
%% returns a pipeline function that duplicates messages it receives,
%% passing them unmodified to both the next item in the pipeline, and
%% to some other PID (stopping when it receives eof, but passing that
%% as well).
%% An example as an item in a pipeline:
%%
%% pipeline:run([pipeline:generator(lists:seq(1,10)),
%% tee(spawn_link(fun printer/0)),
%% fun pipeline:collect/1]).
%%
%% That will print the messages as they go over the pipeline without
%% interrupting them.
tee(Pid) when is_pid(Pid) ->
fun(X) when is_pid(X) ->
tee(X,Pid)
end.
%% can't be a generic_worker because it has to propagate 'eof', which
%% generic_workers don't receive
tee(Pid1,Pid2) when is_pid(Pid1),
is_pid(Pid2) ->
receive
X ->
Pid1 ! X,
Pid2 ! X,
case X of
eof -> ok;
X ->
tee(Pid1,Pid2)
end
end.
%% prints (using io:format/2) all messages that the process receives,
%% dying when it receives eof. Typically used with tee/1.
printer() ->
receive
X ->
io:format("~p:~p got ~p~n",[?MODULE,?LINE,X]),
case X of
eof -> ok;
X -> printer()
end
end.
%% -------------------
%%% Internal functions
%% -------------------
%% returns the PID of the pipeline entry described by the FunSpec
make_worker(FunSpec,Pid) when is_pid(Pid) ->
spawn_link(workerspec_to_fun(FunSpec,Pid)).
%% returns a generic_worker that sends the results of Fun(X) to the
%% next in the pipeline
workerspec_to_fun({map,Fun},Pid) when is_function(Fun,1),
is_pid(Pid) ->
workerspec_to_fun({generic,fun(Msg,SendToPid) ->
SendToPid ! Fun(Msg)
end}, Pid);
%% returns a generic_worker that sends a given message if Fun(X) is
%% true
workerspec_to_fun({filter,Fun},Pid) when is_function(Fun,1),
is_pid(Pid) ->
workerspec_to_fun({generic,fun(Msg,SendToPid) ->
case Fun(Msg) of
true -> SendToPid ! Msg;
_ -> ok
end
end}, Pid);
%% returns a worker wrapped in the receive loop (so it just receives
%% the messages passed and the PID to send results to, that dies on
%% eof and loops on its own)
workerspec_to_fun({generic,Fun},Pid) when is_function(Fun,2),
is_pid(Pid) ->
fun() -> generic_worker(Fun, Pid) end;
%% the user has written a receive loop and whatnot on their own
workerspec_to_fun(Fun, Pid) when is_function(Fun,1),
is_pid(Pid) ->
fun() -> Fun(Pid) end.
%% takes a given Fun/2 and returns a pipeline function that does the
%% work of looping and terminating at eof
generic_worker(Fun) when is_function(Fun,2) ->
fun(Pid) ->
generic_worker(Fun,Pid)
end.
generic_worker(Fun,Pid) when is_function(Fun,2),
is_pid(Pid) ->
receive
eof -> Pid ! eof;
X ->
Fun(X,Pid),
generic_worker(Fun,Pid)
end.