pipelines in Erlang

09:39PM Jan 08, 2008 in category Erlang by David King

(as always, cross-posted to wrotit)

I've been working a lot on Project Euler problems. Maybe I'm just a geek, but I have fun with that sort of thing. If you're not familiar, it's a list of maths problems that are generally made easier with some code and a little bit of CPU-time. After solving a problem, you get access to the forum for it, and your "Genius Per Cent" increases. You can see examples at their site, and I have an example below.

Many of these problems are solvable with just a pencil and a piece of paper, but become much easier with a little bit of (usually very simple) code. There is a rule that the problems given should not require more than about 60 seconds of CPU-time (this isn't enforced, it's just a guideline that if your code takes longer, consider a new algorithm), but for some of the problems a brute-force approach takes less wall-time (Thinking+Coding+Execution).

Since "brute-force" means lots of CPU-time, I've come up with a few ways to make the best use of the CPU-time available, which means that I want to use both cores in my laptop, and maybe the CPU resources on neighbouring machines. One thing that I've used a lot in Erlang is Stephen Marsh's plists, which is just the best thing since concurrently sliced bread. It implements most of the API of the lists module from Erlang's stdlib (map, fold, sort, and so on), but allows you to specify a malt, which is a description of how you want the problem broken down into concurrent pieces. For example, "Break the list into 10 parts and process them at the same time", or "Spread this work across these ten machines" (!). It even includes a basic mapreduce implementation. How easy this makes concurrency of even simple algorithms just rocks my socks off.
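
To give a feel for the "break the list into N parts and process them at the same time" idea, here is a rough sketch using nothing but spawn_link and message passing. It is not plists and does not reproduce its API or its malt syntax; the module name chunked_pmap and the functions pmap/3, split/2 and chunk/2 are made up for illustration.

```erlang
-module(chunked_pmap).
-export([pmap/3]).

%% Apply Fun to every element of List using at most Parts concurrent
%% processes, returning the results in the original order.
pmap(Fun, List, Parts) when is_function(Fun,1), is_list(List),
                            is_integer(Parts), Parts > 0 ->
  Parent = self(),
  %% one process per chunk; each sends its mapped chunk back, tagged
  %% with a unique reference
  Refs = [ begin
             Ref = make_ref(),
             spawn_link(fun() ->
                            Parent ! {Ref, lists:map(Fun, Chunk)}
                        end),
             Ref
           end || Chunk <- split(List, Parts) ],
  %% collect in the order the chunks were created, so order is kept
  lists:append([ receive {Ref, Result} -> Result end || Ref <- Refs ]).

%% split List into at most Parts chunks of roughly equal size
split([], _Parts) -> [];
split(List, Parts) ->
  Size = (length(List) + Parts - 1) div Parts,  % ceiling division
  chunk(List, Size).

chunk([], _Size) -> [];
chunk(List, Size) when length(List) =< Size -> [List];
chunk(List, Size) ->
  {Head, Tail} = lists:split(Size, List),
  [Head | chunk(Tail, Size)].
```

Calling chunked_pmap:pmap(fun(X) -> X*X end, lists:seq(1,10), 3) would square the numbers in three processes. A real library has to deal with failures, remote nodes and uneven chunks, which this sketch ignores.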

pipelines

In that vein, I've found that a lot of problems in processing large amounts of data can be simplified by forming data-flows or pipelines. This is something you do very frequently in Unix:

cat big_file | sort | uniq | grep 'FAILED' | \
    awk '{print $1 $2}' | fix_failures

Ruby just implemented something similar using generators. The best part about the shell-variety is that those tools are all running concurrently. Sure, some parts might stall the pipeline (like sort, which doesn't continue until it gets all of the data), but even in that case the cat and the sort run concurrently (and the other items do the work of starting, if nothing else). You could stick pv (a small program for viewing data-flows through pipes and providing progress bars for programs that don't have their own) anywhere in there and see the data-flow. Here's a contrived example:

$ dd if=/dev/random | pv -cN dd | bzip2 | pv -cN bzip2 | gzip |     \
  pv -cN gzip | gunzip | pv -cN gunzip | bunzip2 | pv -cN bunzip2 | \
  md5

     dd: 18.1MB 0:00:11 [1.71MB/s]
  bzip2: 18.1MB 0:00:11 [1.78MB/s]
   gzip: 18.1MB 0:00:11 [1.82MB/s]
 gunzip:   18MB 0:00:11 [1.83MB/s]
bunzip2: 16.4MB 0:00:10 [1.63MB/s]

So at any given time, every one of those programs is producing or receiving. top(1) shows this:

PID COMMAND   %CPU   TIME    #TH #PRTS #MREGS RPRVT  RSHRD  RSIZE  VSIZE
2027 bzip2    54.3%  0:17.79   1    13     27 6604K   240K  6900K    25M
2033 bunzip2  28.0%  0:09.29   1    13     24 3712K   240K  3988K    21M
2025 dd       26.8%  0:08.83   1    13     17   84K   184K   304K    17M
2029 gzip     16.8%  0:05.45   1    13     17  336K   184K   580K    10M
2129 md5       0.5%  0:00.11   1    13     18   96K   184K   312K    18M

Lots of programs are naturally expressed as pipelines, and can be written so that their parts execute concurrently. Here's an example problem from Project Euler:

39: If p is the perimeter of a right angle triangle with integral length sides, {a,b,c}, there are exactly three solutions for p = 120.

{20,48,52}, {24,45,51}, {30,40,50}

For which value of p < 1000, is the number of solutions maximised?

The program can be expressed as a pipeline (note that, like most Project Euler problems, this can be greatly simplified algebraically; this is not the most efficient solution):

  • generate all possible pairs of legs of a right triangle, each of length up to 1,000
  • if a pair of legs has an integral hypotenuse, calculate it, otherwise discard the pair
  • remove any whose perimeter is 1,000 or more
  • collect those up in a map of Perimeter->Count
  • go through that map, finding the entry with the largest value of Count
  • return it

I can launch each of those steps as its own process. The processes run concurrently, each one alternating between receiving a datum, processing it, and passing the result on to the next entry in the pipeline. The representation of these steps as processes is very similar to the Actor Model, and most Erlang programs are already structured this way, just not as directly.
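
As a minimal sketch of what "steps as processes" means, here is a two-stage pipeline wired up by hand, without any helper library. The module name tiny_pipeline and the functions doubler/1, collector/2 and demo/0 are invented for this illustration; the only convention borrowed from the rest of this post is the 'eof' atom that marks the end of the stream.

```erlang
-module(tiny_pipeline).
-export([demo/0]).

%% stage: doubles every number it receives and forwards it; on 'eof'
%% it forwards 'eof' and exits
doubler(Next) ->
  receive
    eof -> Next ! eof;
    N   -> Next ! 2*N,
           doubler(Next)
  end.

%% stage: gathers everything until 'eof', then sends one tagged list
%% onwards
collector(Next, Acc) ->
  receive
    eof -> Next ! {result, lists:reverse(Acc)};
    X   -> collector(Next, [X | Acc])
  end.

demo() ->
  Self = self(),
  %% wire the stages back to front, so that each one knows the PID
  %% of its successor
  Collector = spawn_link(fun() -> collector(Self, []) end),
  Doubler   = spawn_link(fun() -> doubler(Collector) end),
  lists:foreach(fun(N) -> Doubler ! N end, [1,2,3]),
  Doubler ! eof,
  receive {result, List} -> List end.
```

Erlang preserves the order of messages between any one pair of processes, which is why the list comes out in the order it went in and why 'eof' cannot overtake the data ahead of it.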

I wrote up a quick pipeline.erl that should make this pretty easy. The basic usage is:

pipeline:run([pipeline:generator(BigList),
              {filter,fun some_filter/1},
              {map,fun some_map/1},
              {generic,fun some_complex_function/2},
              fun some_more_complicated_function/1,
              fun pipeline:collect/1]).

To apply that to our problem:

projecteuler39.erl

main() ->
  UpTo=1000,
  pipeline:run([%% find all possible legs
                pipeline:generator([ {X,Y}
                                     || X <- lists:seq(1,UpTo),
                                        Y <- lists:seq(1,X) ]),
                %% if the two legs form a right-triangle, calculate
                %% its side and send it
                {generic,fun({X,Y},Pid) ->
                             Z=math:sqrt(X*X+Y*Y),
                             RZ=erlang:round(Z),
                             case RZ==Z of
                               true -> 
                                 Pid ! {X,Y,RZ};
                               false -> ok
                             end
                         end},
                %% we only want ones for which P<1000
                {filter,fun({X,Y,Z}) -> X+Y+Z<1000 end},
                fun perim_collector/1,
                fun find_max_perimcount/1,
                fun pipeline:collect/1]).

find_max_perimcount(Pid) ->
  find_max_perimcount(Pid,{0,0}).
find_max_perimcount(Pid,MaxSoFar={_MaxPerim,MaxCount}) ->
  receive
    eof ->
      Pid ! MaxSoFar,
      Pid ! eof;
    ThisPerimCount={_Perim,Count}  ->
      case Count<MaxCount of
        true ->
          find_max_perimcount(Pid,MaxSoFar);
        _ ->
          find_max_perimcount(Pid,ThisPerimCount)
      end
  end.

perim_collector(Pid) ->
  perim_collector(Pid,dict:new()).
perim_collector(Pid,Dict) ->
  receive
    eof ->
      dict:fold(fun(Perim,Count,_AccIn) ->
                    Pid ! {Perim,Count}
                end,0,Dict),
      Pid ! eof;
    {X,Y,Z} ->
      P=X+Y+Z,
      perim_collector(Pid,
                      dict:update_counter(P,1,Dict))
  end.

It runs in about half the time of the equivalent program written linearly (6 seconds, as opposed to 12, on my dual-core laptop).
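
For comparison, a linear version could look something like the sketch below. This is not the program that was timed above, just one plausible single-process equivalent; the module name euler39_seq and the functions perimeter_counts/1 and best/1 are made up for illustration. It checks the hypotenuse with integer arithmetic (Z*Z =:= X*X+Y*Y) rather than comparing a float against its rounded value.

```erlang
-module(euler39_seq).
-export([perimeter_counts/1, best/1]).

%% dict of Perimeter -> number of integer right triangles with that
%% perimeter, keeping only perimeters below UpTo
perimeter_counts(UpTo) ->
  Triples = [ {X, Y, Z}
              || X <- lists:seq(1, UpTo),
                 Y <- lists:seq(1, X),
                 Z <- [round(math:sqrt(X*X + Y*Y))],
                 Z*Z =:= X*X + Y*Y,
                 X + Y + Z < UpTo ],
  lists:foldl(fun({X, Y, Z}, Dict) ->
                  dict:update_counter(X + Y + Z, 1, Dict)
              end, dict:new(), Triples).

%% {Perimeter, Count} for the perimeter with the most solutions
%% (which entry wins a tie is unspecified)
best(UpTo) ->
  dict:fold(fun(P, C, {_, MaxC}) when C > MaxC -> {P, C};
               (_, _, Best) -> Best
            end, {0, 0}, perimeter_counts(UpTo)).
```

dict:fetch(120, euler39_seq:perimeter_counts(1000)) gives the three solutions quoted in the problem statement, which makes a handy sanity check before trusting best(1000).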

pipeline.erl

You can see most of it above, but pipeline.erl has a pretty simple API.

  • run/1 takes a list of FunSpecs and executes them, returning the result of the last entry (usually pipeline:collect/1)
  • create/2 takes a list of FunSpecs and a PID to which the results of the last entry are sent, and returns the PID of the first entry. It's mostly used by run/1, but can be used to create some more advanced pipelines
  • generator/1 will usually be the first entry in the pipeline. It takes a list and injects it into the pipeline
  • collect/1 will usually be the last entry in the pipeline. It collects all of the results into a list and returns that. (Without a collector function, run/1 only returns the first message sent by the last entry)
  • tee/1 can be placed in a pipeline if you want another process to receive a copy of the messages going through it. It can be used as pipeline:tee(spawn_link(fun pipeline:printer/0)) to send those messages to the screen

A FunSpec is a description of a task. It can be one of:

  • {filter,Fun/1} Propagates messages if Fun(Msg) is true
  • {map,Fun/1} Propagates the result of Fun(Msg) for each message
  • {generic,Fun/2} Executed as Fun(Msg,Pid) for each message received, and is expected to send messages to Pid as appropriate
  • Otherwise, it's a fun/1 that will be launched in its own process, and its PID passed to the previous entry in the pipeline. It will receive messages sent by the previous entry, and 'eof' when the pipeline behind it has emptied. It must propagate eof to Pid or the pipeline will stall indefinitely.
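
The last form is the one to reach for when a stage needs to remember something between messages, which filter, map and generic cannot do. As a sketch, here is a stage in the spirit of uniq(1) that drops consecutive duplicates. The module name uniq_stage and the functions uniq/1, demo/0 and drain/1 are hypothetical; demo/0 drives the stage by hand so that the example does not depend on pipeline.erl.

```erlang
-module(uniq_stage).
-export([uniq/1, demo/0]).

%% a fun/1 stage: forwards each message unless it equals the one
%% immediately before it, and propagates 'eof' when it arrives
uniq(Pid) ->
  uniq(Pid, none).
uniq(Pid, Last) ->
  receive
    eof ->
      Pid ! eof;
    X when {seen, X} =:= Last ->
      uniq(Pid, Last);              % same as previous message: drop it
    X ->
      Pid ! X,
      uniq(Pid, {seen, X})
  end.

%% feed the stage by hand and gather what comes out the other end
demo() ->
  Self = self(),
  Stage = spawn_link(fun() -> uniq(Self) end),
  lists:foreach(fun(M) -> Stage ! M end, [a,a,b,b,b,a,eof]),
  drain([]).

drain(Acc) ->
  receive
    eof -> lists:reverse(Acc);
    X   -> drain([X | Acc])
  end.
```

Because uniq/1 takes the downstream PID as its only argument, it has the shape the list above describes, so it should be usable as fun uniq_stage:uniq/1 in the list given to pipeline:run/1.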

The test/0 function tries to use every piece of the API, so it's a good reference. It would be pretty trivial to extend pipeline.erl to launch multiple workers per entry, or to form more complex pipelines with multiple paths, but that would probably require more knowledge about your application, and this covers the most basic cases. It could also be a little smarter about its message-passing (a stray message will confuse it at the moment). The version I'm using locally is, but that makes it harder to read, so it's not included here.

-module(pipeline).

-export([create/2,
         generator/1,
         tee/1,
         collect/1,
         run/1,
         printer/0,
         generic_worker/1]).

-export([test/0]).

%% an example of a full pipeline function process
square_server(Pid) ->
  receive
    eof ->
      Pid ! eof,
      ok;
    X ->
      Pid ! X*X,
      square_server(Pid)
  end.

test() ->
  pipeline:run([pipeline:generator(lists:seq(1,10)),
                {filter,fun(X) -> X rem 3 =:= 0 end},
                {map,   fun(X) -> X + 1 end},
                fun square_server/1,
                pipeline:tee(
                  %% spawn a printer in the middle of the pipeline
                  %% so we can watch messages go by
                  spawn_link(fun pipeline:printer/0)),
                {generic,fun(Msg,Pid) ->
                             %% simple example generic_worker that
                             %% duplicates entries
                             Pid ! Msg,
                             Pid ! Msg
                         end},
                fun pipeline:collect/1]).

%% take a pipeline and return the result of running it
run(Operators) when is_list(Operators) ->
  create(Operators,self()),
  receive X -> X end.

%% creates a pipeline that sends results to EndPoint, and returns the
%% first PID in it
create(Operators,EndPoint) when is_list(Operators),
                                is_pid(EndPoint) ->
  lists:foldr(fun(Worker,SendResultsTo) when is_pid(SendResultsTo) ->
                  make_worker(Worker,SendResultsTo)
              end, EndPoint, Operators).

%% returns a pipeline function that injects the items in List into the
%% pipeline
generator(List) when is_list(List) ->
  fun(Pid) ->
      generator(List,Pid)
  end.

%% sends the contents of List, one item at a time, to Pid
generator(List,Pid) when is_list(List),
                         is_pid(Pid) ->
  lists:foreach(fun(Elem) ->
                    Pid ! Elem
                end, List),
  Pid ! eof.

%% a pipeline function that gathers messages until it receives 'eof',
%% and then sends them all as one list. Typically the last item in a
%% pipeline
collect(Pid) when is_pid(Pid) ->
  collect(Pid,[]).
collect(Pid,Acc) when is_list(Acc),
                      is_pid(Pid) ->
  receive
    eof ->
      Pid ! lists:reverse(Acc);
    X ->
      collect(Pid,[ X | Acc ])
  end.


%% returns a pipeline function that duplicates messages it receives,
%% passing them unmodified to both the next item in the pipeline, and
%% to some other PID (stopping when it receives eof, but passing that
%% as well).
%% An example as an item in a pipeline:
%%
%% pipeline:run([pipeline:generator(lists:seq(1,10)),
%%               tee(spawn_link(fun printer/0)),
%%               fun pipeline:collect/1]).
%%
%% That will print the messages as they go over the pipeline without
%% interrupting them.
tee(Pid) when is_pid(Pid) ->
  fun(X) when is_pid(X) ->
      tee(X,Pid)
  end.
%% can't be a generic_worker because it has to propagate 'eof', which
%% generic_workers don't receive
tee(Pid1,Pid2) when is_pid(Pid1),
                    is_pid(Pid2) ->
  receive
    X ->
      Pid1 ! X,
      Pid2 ! X,
      case X of
        eof -> ok;
        X ->
          tee(Pid1,Pid2)
      end
  end.

%% prints (using io:format/2) all messages that the process receives,
%% dying when it receives eof. Typically used with tee/1.
printer() ->
  receive
    X ->
      io:format("~p:~p got ~p~n",[?MODULE,?LINE,X]),
      case X of
        eof -> ok;
        X   -> printer()
      end
  end.

%% -------------------
%%% Internal functions
%% -------------------

%% returns the PID of the pipeline entry described by the FunSpec
make_worker(FunSpec,Pid) when is_pid(Pid) ->
  spawn_link(workerspec_to_fun(FunSpec,Pid)).

%% returns a generic_worker that sends the results of Fun(X) to the
%% next in the pipeline
workerspec_to_fun({map,Fun},Pid) when is_function(Fun,1),
                                      is_pid(Pid) ->
  workerspec_to_fun({generic,fun(Msg,SendToPid) ->
                                 SendToPid ! Fun(Msg)
                             end}, Pid);

%% returns a generic_worker that sends a given message if Fun(X) is
%% true
workerspec_to_fun({filter,Fun},Pid) when is_function(Fun,1),
                                         is_pid(Pid) ->
  workerspec_to_fun({generic,fun(Msg,SendToPid) ->
                                 case Fun(Msg) of
                                   true -> SendToPid ! Msg;
                                   _ -> ok
                                 end
                             end}, Pid);

%% returns a worker wrapped in the receive loop (so it just receives
%% the messages passed and the PID to send results to, that dies on
%% eof and loops on its own)
workerspec_to_fun({generic,Fun},Pid) when is_function(Fun,2),
                                          is_pid(Pid) ->
  fun() -> generic_worker(Fun, Pid) end;

%% the user has written a receive loop and whatnot on their own
workerspec_to_fun(Fun, Pid) when is_function(Fun,1),
                                 is_pid(Pid) ->
  fun() -> Fun(Pid) end.


%% takes a given Fun/2 (called as Fun(Msg,Pid)) and returns a pipeline
%% function that does the work of looping and terminating at eof
generic_worker(Fun) when is_function(Fun,2) ->
  fun(Pid) ->
      generic_worker(Fun,Pid)
  end.
generic_worker(Fun,Pid) when is_function(Fun,2),
                             is_pid(Pid) ->
  receive
    eof -> Pid ! eof;
    X ->
      Fun(X,Pid),
      generic_worker(Fun,Pid)
  end.

Comments:

Pretty sure that's Stephen Marsh's plist library, no?

Posted by Bryan on January 09, 2008 at 06:29 AM PST #

Oops! You're right, I fixed that

Posted by David King on January 09, 2008 at 08:36 AM PST #
