erlang reddit!

10:29PM Mar 07, 2008 in category Erlang by David King

I launched a reddit for Erlang news, and populated it with recent news in the Erlang world. It's not just an Erlang filter for regular programming.reddit news; it should be able to serve as an Erlang news source even for those who aren't usually interested in the social features of reddit. For those who are interested in the social features, reddit offers a fantastic platform for comments and other socialising.

Hopefully it takes off and can complement places like trapexit and erlang-questions in helping Erlang programmers get together, promoting the language, and publicising libraries and projects.

iPods are slow

02:24PM Jan 10, 2008 in category TodayILearnt by David King

Today I learnt that the hard drive in my laptop is fast enough that writing out a lot of data uncompressed is faster than compressing it on the way out, but the hard drive in my iPod is slow enough that it is significantly faster to compress on the way in (so that less data is written out). So the point where the additional CPU-time of compression equals the saved I/O time is somewhere between a 4800 RPM 60GB iPod drive and a SATA-2 5400 RPM 80GB MacBook drive.

Start of the TodayILearnt series

02:21PM Jan 10, 2008 in category TodayILearnt by David King

This is the first entry in a series I'm going to do called TodayILearnt. If you're subscribed to my RSS feed for my Erlang articles, you'll want to change your subscription to point to my Erlang feed unless you're also interested in my inane musings. I won't be cross-posting these to wrotit until I implement series there, since it would just be a jumble of long and short posts.

Today I learnt that there are more people subscribed to my RSS feeds than I thought :)

wrotit status update

09:40PM Jan 08, 2008 in category General by David King

(as always, cross-posted to wrotit)

It looks like Wrotit will devolve into a more generic blogging engine. It won't lose most of its social features (although I'll have to re-decide what those are going to look like; I'm open to suggestions), but I didn't clean up the popularity daemon into a permanent state on time, and I won't be able to do further work on it, so it will lose its current feel.

The popularity daemon had some pretty critical bugs (for instance, there are some circumstances in which voting down an already unpopular article can slightly increase its popularity) that I was in the process of fixing, but that I just won't fix in time to declare the code "finished". So the popularity ranking (a la reddit/Digg) won't exist, and it won't have a recommendations system (which is also not in a usable state). It looks like I'll be able to preserve the more technical parts without any trouble, but you probably don't care about those.

While I won't be open-sourcing the whole thing in one swoop, I'm happy to entertain questions about how something works.

pipelines in Erlang

09:39PM Jan 08, 2008 in category Erlang by David King

(as always, cross-posted to wrotit)

I've been working a lot on Project Euler problems. Maybe I'm just a geek, but I have fun with that sort of thing. If you're not familiar, it's a list of maths problems that are generally made easier with some code and a little bit of CPU-time. After solving a problem, you get access to the forum for it, and your "Genius Per Cent" increases. You can see examples at their site, and I have an example below.

Many of these problems are solvable with just a pencil and a piece of paper, but become much easier with a little bit of (usually very simple) code. There is a rule that the problems given should not require more than about 60 seconds of CPU-time (this isn't enforced, it's just a guideline that if your code takes longer, consider a new algorithm), but for some of the problems a brute-force approach takes less wall-time (Thinking+Coding+Execution).

Since "brute-force" means lots of CPU-time, I've come up with a few ways to make the best use of the CPU-time available, which means that I want to use both cores in my laptop, and maybe the CPU resources on neighbouring machines. One thing that I've used a lot in Erlang is Stephen Marsh's plists, which is just the best thing since concurrently sliced bread. It implements most of the API from Erlang's stdlib lists like map, fold, and sort, but allows you to specify a malt, which is a description of how you want the problem broken down into concurrent pieces. For example, "Break the list into 10 parts and process them at the same time", or "Spread this work across these ten machines" (!). It even includes a basic mapreduce implementation. How easy this makes concurrency of even simple algorithms just rocks my socks off.
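To make the "break the list into N parts and process them at the same time" idea concrete, here is a minimal stdlib-only sketch. It is not plists' implementation or API; the module name chunked_pmap and the function pmap/3 are made up for illustration, and it only covers the single-machine case.

```erlang
-module(chunked_pmap).
-export([pmap/3]).

%% Apply Fun to every element of List using up to Parts concurrent
%% processes, returning the results in the original order.
%% (Hypothetical helper for illustration; not part of plists.)
pmap(Fun, List, Parts) when is_function(Fun,1),
                            is_list(List),
                            is_integer(Parts), Parts > 0 ->
  Parent=self(),
  %% round up so that we never create more than Parts chunks
  ChunkSize=erlang:max(1,(length(List)+Parts-1) div Parts),
  %% one worker per chunk; each sends back its mapped chunk tagged
  %% with a unique reference
  Refs=[ begin
           Ref=make_ref(),
           spawn_link(fun() ->
                          Parent ! {Ref,lists:map(Fun,Chunk)}
                      end),
           Ref
         end
         || Chunk <- chunks(List,ChunkSize) ],
  %% collect in the order the chunks were handed out, so the result
  %% order matches the input order
  lists:append([ receive {Ref,Result} -> Result end
                 || Ref <- Refs ]).

%% split List into sublists of at most Size elements
chunks([],_Size) -> [];
chunks(List,Size) when length(List) =< Size -> [List];
chunks(List,Size) ->
  {Chunk,Rest}=lists:split(Size,List),
  [ Chunk | chunks(Rest,Size) ].
```

Calling chunked_pmap:pmap(fun(X) -> X*X end, lists:seq(1,10), 3) squares the list using up to three worker processes. Because the workers are linked, a crash in any of them takes the caller down too; a real library would want to handle that more gracefully.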

pipelines

In that vein, I've found that a lot of problems in processing large amounts of data can be simplified by forming data-flows or pipelines. This is something you do very frequently in Unix:

cat big_file | sort | uniq | grep 'FAILED' | \
    awk '{print $1 $2}' | fix_failures

Ruby just implemented something similar using generators. The best part about the shell-variety is that those tools are all running concurrently. Sure, some parts might stall the pipeline (like sort, which doesn't continue until it gets all of the data), but even in that case the cat and the sort run concurrently (and the other items do the work of starting, if nothing else). You could stick pv (a small program for viewing data-flows through pipes and providing progress bars for programs that don't have their own) anywhere in there and see the data-flow. Here's a contrived example:

$ dd if=/dev/random | pv -cN dd | bzip2 | pv -cN bzip2 | gzip |     \
  pv -cN gzip | gunzip | pv -cN gunzip | bunzip2 | pv -cN bunzip2 | \
  md5

     dd: 18.1MB 0:00:11 [1.71MB/s]
  bzip2: 18.1MB 0:00:11 [1.78MB/s]
   gzip: 18.1MB 0:00:11 [1.82MB/s]
 gunzip:   18MB 0:00:11 [1.83MB/s]
bunzip2: 16.4MB 0:00:10 [1.63MB/s]

So at any given time, every one of those programs is producing or receiving. top(1) shows this:

PID COMMAND   %CPU   TIME    #TH #PRTS #MREGS RPRVT  RSHRD  RSIZE  VSIZE
2027 bzip2    54.3%  0:17.79   1    13     27 6604K   240K  6900K    25M
2033 bunzip2  28.0%  0:09.29   1    13     24 3712K   240K  3988K    21M
2025 dd       26.8%  0:08.83   1    13     17   84K   184K   304K    17M
2029 gzip     16.8%  0:05.45   1    13     17  336K   184K   580K    10M
2129 md5       0.5%  0:00.11   1    13     18   96K   184K   312K    18M

Lots of programs are easily expressed conceptually as pipelines, in such a way that their parts can be executed concurrently. Here's an example problem from Project Euler:

39: If p is the perimeter of a right angle triangle with integral length sides, {a,b,c}, there are exactly three solutions for p = 120.

{20,48,52}, {24,45,51}, {30,40,50}

For which value of p < 1000, is the number of solutions maximised?

The program can be expressed as a pipeline (note that, like most Project Euler problems, this can be greatly simplified algebraically; this is not the most efficient solution):

  • generate all possible legs of a right triangle with length<1000
  • if those legs have a valid hypotenuse, calculate it, otherwise discard it
  • remove any whose perimeter exceeds 1,000
  • collect those up in a map of Perimeter->Count
  • go through that map, finding the largest value of Count
  • return it

I can launch each of those steps as a process, all running concurrently, each alternating between receiving a datum, processing it, and passing it on to the next entry in the pipeline. The representation of these steps as processes is very similar to the Actor Model, and most Erlang programs are already structured this way, just not as directly.

I wrote up a quick pipeline.erl that should make this pretty easy. The basic usage is:

pipeline:run([pipeline:generator(BigList),
              {filter,fun some_filter/1},
              {map,fun some_map/1},
              {generic,fun some_complex_function/2},
              fun some_more_complicated_function/1,
              fun pipeline:collect/1]).

To apply that to our problem:

projecteuler39.erl

main() ->
  UpTo=1000,
  pipeline:run([%% find all possible legs
                pipeline:generator([ {X,Y}
                                     || X <- lists:seq(1,UpTo),
                                        Y <- lists:seq(1,X) ]),
                %% if the two legs form a right-triangle, calculate
                %% its side and send it
                {generic,fun({X,Y},Pid) ->
                             Z=math:sqrt(X*X+Y*Y),
                             RZ=erlang:round(Z),
                             case RZ==Z of
                               true -> 
                                 Pid ! {X,Y,RZ};
                               false -> ok
                             end
                         end},
                %% we only want ones for which P<1000
                {filter,fun({X,Y,Z}) -> X+Y+Z<1000 end},
                fun perim_collector/1,
                fun find_max_perimcount/1,
                fun pipeline:collect/1]).

find_max_perimcount(Pid) ->
  find_max_perimcount(Pid,{0,0}).
find_max_perimcount(Pid,MaxSoFar={_MaxPerim,MaxCount}) ->
  receive
    eof ->
      Pid ! MaxSoFar,
      Pid ! eof;
    ThisPerimCount={_Perim,Count}  ->
      case Count<MaxCount of
        true ->
          find_max_perimcount(Pid,MaxSoFar);
        _ ->
          find_max_perimcount(Pid,ThisPerimCount)
      end
  end.

perim_collector(Pid) ->
  perim_collector(Pid,dict:new()).
perim_collector(Pid,Dict) ->
  receive
    eof ->
      dict:fold(fun(Perim,Count,_AccIn) ->
                    Pid ! {Perim,Count}
                end,0,Dict),
      Pid ! eof;
    {X,Y,Z} ->
      P=X+Y+Z,
      perim_collector(Pid,
                      dict:update_counter(P,1,Dict))
  end.

It runs in about half the time of the equivalent program written linearly (6 seconds, as opposed to 12, on my dual-core laptop).
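For comparison, a linear version might look something like the sketch below. This is not the program that was timed above, just an assumption about what "written linearly" could mean; the module name euler39_linear and its function names are invented for illustration. It uses a single Limit for both the leg range and the perimeter bound, and checks the hypotenuse with integer arithmetic rather than comparing floats.

```erlang
-module(euler39_linear).
-export([main/0, count_perimeters/1, best/1]).

%% returns a dict of Perimeter->Count for all right triangles with
%% integer sides and perimeter below Limit
count_perimeters(Limit) when is_integer(Limit) ->
  Perims=[ X+Y+Z
           || X <- lists:seq(1,Limit),
              Y <- lists:seq(1,X),
              %% candidate hypotenuse, rounded to an integer
              Z <- [erlang:round(math:sqrt(X*X+Y*Y))],
              %% keep it only if it is exact
              Z*Z =:= X*X+Y*Y,
              X+Y+Z < Limit ],
  lists:foldl(fun(P,Dict) ->
                  dict:update_counter(P,1,Dict)
              end, dict:new(), Perims).

%% returns {Perimeter,Count} for the perimeter with the most solutions
best(Counts) ->
  dict:fold(fun(Perim,Count,{_,MaxCount}) when Count > MaxCount ->
                {Perim,Count};
               (_Perim,_Count,Max) ->
                Max
            end, {0,0}, Counts).

main() ->
  best(count_perimeters(1000)).
```

Everything here happens in one process, one step after another, which is exactly what the pipeline version avoids.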

pipeline.erl

You can see most of it above, but pipeline.erl has a pretty simple API.

  • run/1 takes a list of FunSpecs and executes them, returning the result of the last entry (usually pipeline:collect/1)
  • create/2 takes a list of FunSpecs and a PID to send the results of the last entry to. It's mostly used by run/1, but can be used to create some more advanced pipelines
  • generator/1 will usually be the first entry in the pipeline. It takes a list and injects it into the pipeline
  • collect/1 will usually be the last entry in the pipeline. It collects all of the results into a list and returns that. (Without a collector function, run/1 only returns the first message sent by the last entry)
  • tee/1 can be placed in a pipeline if you want another process to receive a copy of the messages going through it. It can be used as pipeline:tee(spawn_link(fun pipeline:printer/0)) to send those messages to the screen

A FunSpec is a description of a task. It can be one of:

  • {filter,Fun/1} Propagates messages if Fun(Msg) is true
  • {map,Fun/1} Propagates the result of Fun(Msg) for each message
  • {generic,Fun/2} Executed as Fun(Msg,Pid) for each message received, and is expected to send messages to Pid as appropriate
  • Otherwise, it's a fun/1 that will be launched in its own process, and its PID passed to the previous entry in the pipeline. It will receive messages sent by the previous entry, and 'eof' when the pipeline behind it has emptied. It must propagate eof to Pid or the pipeline will stall indefinitely.
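As a sketch of that last kind of FunSpec, here is a hand-written stage that keeps state across messages (a running total) and propagates eof when it is done. The module name sum_stage is made up, and demo/0 wires the stage up by hand with spawn_link rather than going through pipeline:run/1, so that the example stands on its own.

```erlang
-module(sum_stage).
-export([sum/1, demo/0]).

%% a full pipeline stage: adds up the numbers it receives, and on eof
%% sends the total downstream followed by eof
sum(Pid) when is_pid(Pid) ->
  sum(Pid,0).
sum(Pid,Acc) ->
  receive
    eof ->
      Pid ! Acc,
      %% forgetting this line is what stalls a pipeline
      Pid ! eof;
    N when is_number(N) ->
      sum(Pid,Acc+N)
  end.

%% feed the stage by hand and wait for its output
demo() ->
  Self=self(),
  Stage=spawn_link(fun() -> sum(Self) end),
  lists:foreach(fun(N) -> Stage ! N end, [1,2,3,4]),
  Stage ! eof,
  receive
    Total when is_number(Total) ->
      receive eof -> Total end
  end.
```

In a pipeline, fun sum_stage:sum/1 would sit between a generator and fun pipeline:collect/1 just like square_server/1 does in test/0.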

The test/0 function tries to use every piece of the API, so it's a good reference. It would be pretty trivial to extend pipeline.erl to launch multiple workers per entry, or to form more complex pipelines with multiple paths, but that would probably require more knowledge about your application, and this covers the most basic cases. It could be extended to be a little smarter about its message-passing (a stray message will confuse it at the moment), and the version I'm using locally is (but that makes it harder to read, so it's not included here).

-module(pipeline).

-export([create/2,
         generator/1,
         tee/1,
         collect/1,
         run/1,
         printer/0,
         generic_worker/1]).

-export([test/0]).

%% an example of a full pipeline function process
square_server(Pid) ->
  receive
    eof ->
      Pid ! eof,
      ok;
    X ->
      Pid ! X*X,
      square_server(Pid)
  end.

test() ->
  pipeline:run([pipeline:generator(lists:seq(1,10)),
                {filter,fun(X) -> X rem 3 =:= 0 end},
                {map,   fun(X) -> X + 1 end},
                fun square_server/1,
                pipeline:tee(
                  %% spawn a printer in the middle of the pipeline
                  %% so we can watch messages go by
                  spawn_link(fun pipeline:printer/0)),
                {generic,fun(Msg,Pid) ->
                             %% simple example generic_worker that
                             %% duplicates entries
                             Pid ! Msg,
                             Pid ! Msg
                         end},
                fun pipeline:collect/1]).

%% take a pipeline and return the result of running it
run(Operators) when is_list(Operators) ->
  create(Operators,self()),
  receive X -> X end.

%% creates a pipeline that sends results to EndPoint, and returns the
%% first PID in it
create(Operators,EndPoint) when is_list(Operators),
                                is_pid(EndPoint) ->
  lists:foldr(fun(Worker,SendResultsTo) when is_pid(SendResultsTo) ->
                  make_worker(Worker,SendResultsTo)
              end, EndPoint, Operators).

%% returns a pipeline function that injects the items in List into the
%% pipeline
generator(List) when is_list(List) ->
  fun(Pid) ->
      generator(List,Pid)
  end.

%% sends the contents of List, one item at a time, to Pid
generator(List,Pid) when is_list(List),
                         is_pid(Pid) ->
  lists:foreach(fun(Elem) ->
                    Pid ! Elem
                end, List),
  Pid ! eof.

%% a pipeline function that gathers messages until it receives 'eof',
%% and then sends them all as one list. Typically the last item in a
%% pipeline
collect(Pid) when is_pid(Pid) ->
  collect(Pid,[]).
collect(Pid,Acc) when is_list(Acc),
                      is_pid(Pid) ->
  receive
    eof ->
      Pid ! lists:reverse(Acc);
    X ->
      collect(Pid,[ X | Acc ])
  end.


%% returns a pipeline function that duplicates messages it receives,
%% passing them unmodified to both the next item in the pipeline, and
%% to some other PID (stopping when it receives eof, but passing that
%% as well).
%% An example as an item in a pipeline:
%%
%% pipeline:run([pipeline:generator(lists:seq(1,10)),
%%               pipeline:tee(spawn_link(fun pipeline:printer/0)),
%%               fun pipeline:collect/1]).
%%
%% That will print the messages as they go over the pipeline without
%% interrupting them.
tee(Pid) when is_pid(Pid) ->
  fun(X) when is_pid(X) ->
      tee(X,Pid)
  end.
%% can't be a generic_worker because it has to propagate 'eof', which
%% generic_workers don't receive
tee(Pid1,Pid2) when is_pid(Pid1),
                    is_pid(Pid2) ->
  receive
    X ->
      Pid1 ! X,
      Pid2 ! X,
      case X of
        eof -> ok;
        X ->
          tee(Pid1,Pid2)
      end
  end.

%% prints (using io:format/2) all messages that the process receives,
%% dying when it receives eof. Typically used with tee/1.
printer() ->
  receive
    X ->
      io:format("~p:~p got ~p~n",[?MODULE,?LINE,X]),
      case X of
        eof -> ok;
        X   -> printer()
      end
  end.

%% -------------------
%%% Internal functions
%% -------------------

%% returns the PID of the pipeline entry described by the FunSpec
make_worker(FunSpec,Pid) when is_pid(Pid) ->
  spawn_link(workerspec_to_fun(FunSpec,Pid)).

%% returns a generic_worker that sends the results of Fun(X) to the
%% next in the pipeline
workerspec_to_fun({map,Fun},Pid) when is_function(Fun,1),
                                      is_pid(Pid) ->
  workerspec_to_fun({generic,fun(Msg,SendToPid) ->
                                 SendToPid ! Fun(Msg)
                             end}, Pid);

%% returns a generic_worker that sends a given message if Fun(X) is
%% true
workerspec_to_fun({filter,Fun},Pid) when is_function(Fun,1),
                                         is_pid(Pid) ->
  workerspec_to_fun({generic,fun(Msg,SendToPid) ->
                                 case Fun(Msg) of
                                   true -> SendToPid ! Msg;
                                   _ -> ok
                                 end
                             end}, Pid);

%% returns a worker wrapped in the receive loop (so it just receives
%% the messages passed and the PID to send results to, that dies on
%% eof and loops on its own)
workerspec_to_fun({generic,Fun},Pid) when is_function(Fun,2),
                                          is_pid(Pid) ->
  fun() -> generic_worker(Fun, Pid) end;

%% the user has written a receive loop and whatnot on their own
workerspec_to_fun(Fun, Pid) when is_function(Fun,1),
                                 is_pid(Pid) ->
  fun() -> Fun(Pid) end.


%% takes a given Fun/2 (called as Fun(Msg,Pid)) and returns a pipeline
%% function that does the work of looping and terminating at eof
generic_worker(Fun) when is_function(Fun,2) ->
  fun(Pid) ->
      generic_worker(Fun,Pid)
  end.
generic_worker(Fun,Pid) when is_function(Fun,2),
                             is_pid(Pid) ->
  receive
    eof -> Pid ! eof;
    X ->
      Fun(X,Pid),
      generic_worker(Fun,Pid)
  end.

cron.erl

06:42PM Jan 01, 2008 in category Erlang by David King

(as always, cross-posted to wrotit)

My employer may soon interfere with my free-time development of wrotit, so I'm going to start open-sourcing the more generic bits of it so that somebody can make use of it, and so that I can continue to. I understand and respect their concerns about the leaking of trade-secrets into free-time projects, so I'm open-sourcing the bits that could conflict with that and ceasing development on those parts before that could start to happen.

Hopefully I can at least get my mapreduce-type work-distribution system and the popularity algorithm cleaned up and open-sourced. I probably won't have time to do the recommendations algorithm, so I'll just have to drop that bit from wrotit altogether (it's not currently finished). Those are probably the only bits that they care about, so if I open-source them (thereby documenting their state before any conflict could happen) and don't do any more development on them, using them only as-is, I'll probably be fine. If not, then wrotit will devolve into a more generic blogging engine without ranking/voting and may have to do away with some of the other fancy bits.

wrotit is written in Erlang, so if you're interested in the code, that's what it's going to be in.

cron.erl

In the meantime, here is cron, my Erlang module for periodically running tasks.

The problem

In wrotit, something that I do often is write a task that needs to run occasionally at some interval:

  • every N minutes, the popularity calculation is done
  • every hour, expired sessions are cleaned up
  • every hour, the compiled Markdown for the most popular/viewed posts is rendered and cached, and obsolete compiled caches are deleted
  • every five minutes, the cluster work-distribution system checks to see which hosts are available to accept jobs and checks their comparative load
  • etc

Erlang doesn't have a built-in task-scheduling facility (other than timer, which isn't as flexible), and since I wanted these tasks to all be brought up and scheduled with the system, be run every so often, and never overlap, I wrote a simple server to manage these periodic tasks.

The solution

Its basic API is pretty simple:

  • start_link()
  • stop()
  • add(JobIdAsAtom, TaskFun, PeriocityInSeconds)
  • remove(JobIdAsAtom)

For example:

(node@host)2> TaskFun=fun() ->
                          io:format("~p~n",
                                    [calendar:universal_time()])
                      end.
#Fun<erl_eval.20.117942162>
(node@host)3> cron:start_link().
{ok,<0.197.0>}
(node@host)4> cron:add(date_printer,TaskFun,10).
ok
{{2008,1,2},{1,46,58}}
{{2008,1,2},{1,47,8}}
{{2008,1,2},{1,47,18}}
{{2008,1,2},{1,47,28}}
{{2008,1,2},{1,47,38}}
{{2008,1,2},{1,47,48}}           
(node@host)5> cron:stop().

There's some slightly more fancy stuff in there (for instance, TaskFun is typically of arity 0, but if it's of arity 1, its return value will be passed into it on its next run, so that it can maintain some state between runs), but that's the meat of it. I use it with a wrapper that asks the work-distribution system for the least loaded node to run it on and runs it there, but other than that, this is how it looks.
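The state-passing trick relies on the task's process exiting with its return value, which the parent picks up from the 'EXIT' message. Here is a stripped-down sketch of just that mechanism, without the gen_server or any scheduling; the module name exit_state and the functions run_once/2 and run_n/3 are invented for illustration and are not part of cron.

```erlang
-module(exit_state).
-export([run_once/2, run_n/3]).

%% run Task(State) in a linked process and return whatever it
%% returned, by trapping the exit signal that carries it
run_once(Task,State) when is_function(Task,1) ->
  OldFlag=process_flag(trap_exit,true),
  Pid=spawn_link(fun() -> exit(Task(State)) end),
  Result=receive
           {'EXIT',Pid,Reason} -> Reason
         end,
  %% restore whatever the caller had set before
  process_flag(trap_exit,OldFlag),
  Result.

%% run Task N times in a row, feeding each return value into the
%% next invocation
run_n(_Task,State,0) ->
  State;
run_n(Task,State,N) when is_integer(N), N > 0 ->
  run_n(Task,run_once(Task,State),N-1).
```

For example, exit_state:run_n(fun(undefined) -> 1; (N) -> N+1 end, undefined, 3) threads a counter through three runs, starting from the same undefined initial state that add/3 uses. As with cron itself, a crash is indistinguishable from a return here: the crash reason simply becomes the next state.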

the code

(some day I'll move to a Markdown implementation that does syntax highlighting)

%%% Control the periodic running of a given set of tasks
-module(cron).

-behaviour(gen_server).

%% API
-export([start_link/0,start_link/1,start_link/2,
         stop/0,
         add/3, add/4, add/1,
         remove/1,
         info/0,
         wakeup/0,
         run_early/1,
         test_jobs/0]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

-define(log_info(Msg,Args),
        error_logger:info_msg("~p:~p "++Msg,
                              [?MODULE,?LINE | Args ])).
-define(log_warning(Msg,Args),
        error_logger:warning_msg("~p:~p "++Msg,
                              [?MODULE,?LINE | Args ])).
-define(log_error(Msg,Args),
        error_logger:error_msg("~p:~p "++Msg,
                              [?MODULE,?LINE | Args ])).
-define(log_debug(Msg,Args), ok). % ?log_info(Msg,Args)).

-record(state,      {jobs=[],          % all of the job records
                     running=[],       % a list of running_job records
                     wakeup_timer=none % the next-wakeup timer
                    }).
-record(job,        {task,             % fun/0 or fun/1
                     id,               % an atom identifier
                     last_run=0,       % seconds from epoch of last run
                     periocity,        % how often to sleep between
                                       % invocations, in seconds
                     last_state        % the last return value of the
                                       % task, which, if task is of
                                       % arity 1, will be passed into
                                       % task/1
                    }).
-record(running_job,{id,               % the job_id
                     start_time,       % when it started (Now time)
                     pid               % its PID when it started
                    }).

%%--------------------------------------------------------------------
%%% API
%%--------------------------------------------------------------------
start_link() ->
  start_link([],[]).
start_link(InitialJobs) ->
  start_link(InitialJobs, []).
start_link(InitialJobs, Options) ->
  gen_server:start_link({local, ?MODULE},
                        ?MODULE,
                        _InitArgs=InitialJobs,
                        Options).

stop() ->
  gen_server:call(?MODULE,stop,infinity).

add(JobId,Task,Periocity) ->
  add(JobId,Task,Periocity,undefined).

add(JobId,Task,Periocity,InitialState) ->
  add(#job{id=JobId,
           task=Task,
           periocity=Periocity,
           last_state=InitialState}).

add(Job=#job{id=JobId,
             task=Task,
             periocity=Periocity})
  when is_atom(JobId),
       (is_function(Task,0)
        or is_function(Task,1)),
       is_number(Periocity) ->
  gen_server:cast(?MODULE,{add,Job});
%% friendly to init/1
add(JobTuple) when is_tuple(JobTuple) ->
  apply(?MODULE,add,tuple_to_list(JobTuple)).

remove(JobId) when is_atom(JobId) ->
  gen_server:call(?MODULE,{remove,JobId}).

info() ->
  gen_server:call(?MODULE,info,infinity).

wakeup() ->
  gen_server:cast(?MODULE,wakeup).

run_early(JobId) when is_atom(JobId) ->
  gen_server:cast(?MODULE,{run_early,JobId}).

%%--------------------------------------------------------------------
%%% gen_server callbacks
%%--------------------------------------------------------------------

init(InitialJobs) ->
  %% we need to be notified when our tasks complete
  erlang:process_flag(trap_exit,true),

  %% if we have any initial jobs, add them all
  lists:foreach(fun add/1, InitialJobs),

  {ok, #state{jobs=[],
              running=[]}}.

%% handle calls, casts, and infos. The choice of when to do which is
%% pretty arbitrary, based on my assumptions of when a programmer
%% expects a response

%% check for jobs that are ready and run them. Don't allow multiple
%% simultaneous runs of the same job. Running this too many times
%% isn't that bad, it just wastes a little CPU. It will only run jobs
%% when they are ready
handle_cast(wakeup,
            State=#state{jobs=Jobs,
                         running=Running,
                         wakeup_timer=Timer}) ->
  Now=nnow(),

  clear_wakeup_queue(),

  %% remove the ones that are already running from those that can be
  %% run
  NotRunningJobs=
    [ Job
      || Job <- Jobs,
         not lists:any(fun(#running_job{id=RunningId}) ->
                           Job#job.id =:= RunningId
                       end, Running) ],

  %% split those that we can run from those that we can't
  {EligableToRun,NotEligableToRun}=
    lists:partition(fun(#job{last_run=LastRun,
                             periocity=Periocity}) ->
                        %% it's eligible to run if its next run-time
                        %% is in the past and it's not already running
                        LastRun+Periocity =< Now
                    end, NotRunningJobs),

  case length(EligableToRun) of
    0 ->
      ok;
    Length ->
      ?log_debug("Running ~p jobs (~p)",[Length,[ Job#job.id
                                                  || Job <- EligableToRun]])
  end,

  %% run those we can, and add them to the list of running jobs.
  NowRunning=
    lists:foldl(fun(#job{task=Task,
                         id=Id,
                         last_state=LastState},
                    AccIn) ->
                    TaskWrapper=fun() ->
                                    case Task of
                                      Task
                                      when is_function(Task,0) ->
                                        Task(),
                                        %% avoid sending messages that
                                        %% can be expensive to copy if
                                        %% the function isn't
                                        %% interested in receiving
                                        %% them
                                        exit(ok);
                                      Task
                                      when is_function(Task,1) ->
                                        exit(Task(LastState))
                                    end
                                end,
                    Pid=spawn_link(TaskWrapper),
                    [ #running_job{pid=Pid,
                                   id=Id,
                                   start_time=Now}
                      | AccIn ]
                end,
                Running,
                EligableToRun),

  %% find the amount of time that we have to wait for the next job,
  %% based on the list of jobs that weren't eligible to run (since the
  %% running ones will re-schedule themselves as they complete), and
  %% spawn an alarm to wake us up at that time
  NewTimer=schedule_wakeup(NotEligableToRun,Timer),

  {noreply, State#state{running=NowRunning,
                        wakeup_timer=NewTimer}};

handle_cast({add,Job=#job{id=JobId,
                          task=JobFunction,
                          periocity=Periocity}},
            State=#state{jobs=AllJobs})
  when is_atom(JobId),
       (is_function(JobFunction,0)
        or is_function(JobFunction,1)),
       is_number(Periocity) ->

  NewState=case lists:any(fun(#job{id=Id}) -> % check for duplicates
                              Id =:= JobId
                          end, AllJobs) of
             false ->
               %% when we're done adding it, we need to run this job
               wakeup(),
               State#state{jobs=[ Job | AllJobs ]};
             true ->
               ?log_error("Attempted to add duplicate job ~p",[JobId]),
               State
           end,
  {noreply,NewState};

handle_cast({run_early,JobId},State=#state{jobs=Jobs}) ->
  %% extract the job from the list of all jobs
  {[Job],RestJobs}=lists:partition(fun(#job{id=ElemId}) ->
                                       ElemId =:= JobId
                                   end, Jobs),
  %% make it think that it last ran a long time ago
  NewJob=Job#job{last_run=0},
  wakeup(),
  {noreply,State#state{jobs=[ NewJob
                              | RestJobs ]}}.

handle_call(info,_From,State) ->
  %% just dump out our entire state; since we may change the format of
  %% our state, this should only be used by humans, not attempted to
  %% be parsed
  {reply,
   _Reply=State,
   State};

handle_call(stop,From,State) ->
  {stop,{requested,From},ok,State};

handle_call({remove,Job},From,State)
  when is_record(Job,job) ->

  handle_call({remove,Job#job.id},From,State);
handle_call({remove,JobId},_From,State=#state{jobs=Jobs})
  when is_atom(JobId) ->

  {FoundJobs,RestJobs}=lists:partition(fun(#job{id=ElemId}) ->
                                           JobId =:= ElemId
                                       end, Jobs),
  RemovedJob=
    case FoundJobs of
      [] ->
        ?log_warning("Could not remove job ~p; not found",[JobId]),
        {not_found,JobId};
      [Job] ->
        {ok,Job}
    end,
  {reply,RemovedJob,State#state{jobs=RestJobs}}.

%% we receive 'EXIT' messages when a job dies. Note that we can't tell
%% a crashed job from a completed one, and happily pass the
%% crash-reason to the next invocation as the last_state. This could
%% be changed if it bothers you.
handle_info({'EXIT', FromPid, LastState},
            State=#state{running=RunningJobs,
                         jobs=Jobs})
  when is_pid(FromPid) ->

  %% remove it from the list of running jobs
  {[WasRunning],NewRunningJobs}=
    lists:partition(fun(#running_job{pid=Pid}) ->
                        Pid =:= FromPid
                    end,
                    RunningJobs),

  %% extract the completed job from the list of all jobs and update
  %% its timestamp, then add it back to the list
  NewJobs=case lists:partition(fun(#job{id=ElemId}) ->
                                   ElemId =:= WasRunning#running_job.id
                               end, Jobs) of
            {[Job],RestJobs} ->
              %% update the timestamp and last_state and put it back
              %% on the list
              [ Job#job{last_run=nnow(),
                        last_state=LastState}
                | RestJobs ];
            {[], RestJobs} ->
              %% the job must have been deleted from the list while it
              %% was running. Just move on with the old job-list
              RestJobs
            end,

  %% this leaves a message in the queue, which will be processed
  %% when we're done here
  wakeup(),

  {noreply,State#state{running=NewRunningJobs,
                       jobs=NewJobs}};

handle_info(Other, State) ->
  ?log_info("Unknown message ~p",[Other]),
  {noreply,State}.

code_change(_OldVsn, State, _Extra) -> {ok, State}.

terminate(Reason, #state{running=[]}) ->
  ?log_info("stopped (~p)",[Reason]),
  ok;
terminate(Reason,
          #state{running=[ #running_job{id=Id,
                                        pid=Pid}
                           | Rest ] }) ->
  ?log_info("Terminating (reason: ~p); waiting on ~p; ~p jobs remaining",
            [Reason,Id,length(Rest)+1]),
  receive
    {'EXIT',Pid,_PidDeathReason} ->
      %% keep calling terminate until we're ready to die. (we discard
      %% the rest of the state here)
      terminate(Reason,#state{running=Rest})
  end.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

nnow() -> calendar:datetime_to_gregorian_seconds(calendar:universal_time()).

%% takes a list of jobs and the current wakeup-timer (if any), and
%% sets a timer for when to run next
schedule_wakeup(NotEligableToRun,CurrentTimer)
  when is_list(NotEligableToRun) ->

  %% cancel any running timer we have
  case CurrentTimer of
    none -> ok;
    CurrentTimer ->
      timer:cancel(CurrentTimer)
  end,

  case NotEligableToRun of
    %% if there are no tasks to run, there's no reason to wake
    %% ourselves up
    [] -> none;
    NotEligableToRun ->
      NextRunTimes=lists:map(fun(#job{last_run=LastRun,
                                      periocity=Periocity}) ->
                                 LastRun+Periocity
                             end, NotEligableToRun),
      NextRun=lists:min(NextRunTimes),
      {ok,TRef}=timer:apply_after((NextRun-nnow())*1000+1,
                                  ?MODULE,wakeup,[]),
      TRef
  end.

%% tries to clear all of the 'wakeup' messages out of the erlang
%% message queue. This might not work in future versions of Erlang or
%% gen_server, but it's just a performance hack anyway
clear_wakeup_queue() ->
  receive
    {'$gen_cast',wakeup} ->
      clear_wakeup_queue()
  after 0 ->
      ok
  end.

test_jobs() ->
  [#job{id=ioer5,
        task=fun() ->
                 ?log_debug("ioer~p~n",[5])
             end,
        periocity=5},
   #job{id=ioer10,
        task=fun() ->
                 ?log_debug("ioer~p~n",[10])
             end,
        periocity=10},
   #job{id=ioer15,
        task=fun() ->
                 ?log_debug("ioer~p~n",[15])
             end,
        periocity=15},
   #job{id=ioer20, % job that uses state
        task=fun(LastState) ->
                 ?log_debug("ioer~p uses state (~p)~n",[20,LastState]),
                 LastState+1
             end,
        periocity=20,
        last_state=0},
   #job{id=ioer30, % job that takes a while to complete
        task=fun() ->
                 ?log_debug("ioer~p (sleeps begin)~n",[30]),
                 timer:sleep(30*1000),
                 ?log_debug("ioer~p (done sleeping)~n",[30])
             end,
        periocity=30},
   #job{id=ioer45, % job that crashes
        task=fun(State) ->
                 ?log_debug("ioer~p crashes (~p)~n",[45,State]),
                 throw(badarg)
             end,
        periocity=45},
   #job{id=ioer60, % job that removes itself after 3 invocations
        task=fun(State) ->
                 ?log_debug("ioer~p removes at state==0~p~n",[60,State]),
                 case State of
                   0 ->
                     %% kill self
                     ?log_debug("ioer~p Oh, what a world!~n",[60]),
                     remove(ioer60);
                   _ -> State-1
                 end
             end,
        periocity=60,
        last_state=3},
   #job{id=ioer60, % a job with an ID that already exists
        task=fun erlang:now/0,
        periocity=60},
   #job{id=ioer61, % a job that will try to remove a non-existent job
        task=fun() ->
                 remove(job_that_doesnt_exist)
             end,
        periocity=61}

   | [ % and a bunch of no-ops
       #job{id=list_to_atom("noop_job_" ++ integer_to_list(X)),
            task=fun erlang:now/0,
            periocity=X}
       || X <- lists:seq(10,10000) ]].

erlyweb and phased_vars IRC tutorial transcript

05:23PM Dec 24, 2007 in category Erlang by David King

(Cross-posted to wrotit)

While I've been promising to publish an example erlyweb app, it's taking me a while to acquire sufficient round tuits to do so. In the meantime, I gave a quick tutorial on IRC last night on how to use phased_vars. It's really informal, full of bad spelling, and probably not 100% accurate. But since there's a total dearth of erlyweb documentation, here it is:

[00:42] ketralnis_: So a request comes into yaws. Yaws takes that request, and looks at the appmods in yaws.conf to see what app gets that request
[00:43] ketralnis_: Yaws sees that, for this request, erlyweb is responsible. Erlyweb asks the "opaque" parameter (also in yaws.conf) to see what erlyweb app is responsible
[00:43] ketralnis_: Let's call it wrotit
[00:43] ketralnis_: So erlyweb does some initial (very minimal) processing, mostly wrapping up the request coming in from yaws in the yaws_arg
[00:44] ketralnis_: Then erlyweb passes control to wrotit_app_controller:hook/1, which it expects to return an {ewc}-type tuple
[00:45] ketralnis_: So in wrotit_app_controller, I ask erlyweb to determine what the best controller/view is for this request, by saying Ewc = erlyweb:get_initial_ewc({ewc, A1}).
[00:45] ketralnis_: (where A1 is the yaws_arg passed into wrotit_app_controller:hook/1)
[00:46] maddiin: looking at this files now, so far I can follow you.
[00:46] ketralnis_: So an {ewc}-tuple is the type of a set of tuples that are all basically shortcuts for {ewc, controller_module_name, view_controller_name, [Set,Of,Args]}
[00:47] ketralnis_: So erlyweb:get_initial_ewc looks at a request like /post/show/23, and turns it into {ewc, post_controller, post_view, [A, 23]} (where A is again the yaws_arg, which is passed to most controller functions for convenience)
[00:48] ketralnis_: Now, we don't want to just render that page, right? We want to render that page along with a sidebar and some other crap. So instead of returning just that {ewc}, we're going to return a special {ewc}-type tuples, which is the {phased}-type
[00:49] ketralnis_: The {phased} {ewc}-type tuple is a special tuple that is returned to erlyweb, which renders the result, and passes the entire rendered IOlist into a fun/3, which is the third item in the {phased,{ewc,whatever},fun/3} tuple
[00:49] ketralnis_: Is that making sense so far
[00:49] ketralnis_: ?
[00:49] maddiin: yes, so far I´m ok
[00:49] ketralnis_: The whole {phased} thing takes a sec to think about, or at least it did me. It's hard to see it unless you see the development process that ends in it
[00:50] ketralnis_: Okay. So the idea is that we return a tuple like {phased, {ewc, post, show, [A, "23"]}, fun/3}
[00:50] ketralnis_: Erlyweb renders that {ewc}, and gets back something like "<h2>My name is bob</h2><p>...</p>" in one big iolist
[00:52] ketralnis_: Then erlyweb passes that whole iolist into the fun, which looks like fun(Ewc, RenderedPage, PhasedVars) -> whatever() end.
[00:52] ketralnis_: Ewc is just the {ewc, post, show ...} from before, and RenderedPage is that big iolist. I'll get to PhasedVars in a moment
[00:53] ketralnis_: So now let's peek into post_controller:show/2.
[00:54] ketralnis_: post_controller is the module that is called by anything that starts with /post/. get_initial_ewc decides that (you could override, in your hook/1)
[00:54] ketralnis_: Controller functions (those called by URL, like in /post/show, "show" is the controller function) are expected to return {ewc}-type tuples
[00:54] ketralnis_: Or (potentially deep) lists of them
[00:55] ketralnis_: So it may return, for instance, [ {data,post:title(Post)}, {ewc, post, comments, [A, Post]}, {ewc, post, blah, [A, Post]}]
[00:56] ketralnis_: Erlyweb takes that returned list, does any further processing, and passes it to post_view, as one list of arguments
[00:56] ketralnis_: As in the music-example that I gave on-list, if you saw that. That's the essence of the component system
[00:57] maddiin: yes, that was very helpful
[00:57] ketralnis_: so post_view:show/1 may look like show([Title, Comments, Blah]) and so on
[00:59] ketralnis_: However, sometimes we want to return information that we don't want passed into the view. Some {ewc}-type tuples do this already, like {ewr}, which is used as a redirect
[01:02] ketralnis_: So when {ewr} is "passed up" to a {phased} ewc, it's not rendered, the redirect is just sent. That might not make sense until you've seen it
[01:03] ketralnis_: But basically, if your hook/1 returns {phased, {ewc, post, show, [A, "23"]}, fun/3}, and post_controller:show/2 returns an {ewr, login, index}, the fun/3 will never be called, erlyweb just never runs it and sends the redirect
[01:04] maddiin: ok, that makes sense
[01:05] ketralnis_: So let's ignore phased_vars for now, and say that now you have your rendered /post/show/23 page, and you're passing it into that fun/3
[01:05] ketralnis_: For me, it looks like:
[01:06] ketralnis_: {phased, Ewc, fun(_Ewc, Data,_PhasedVars) -> {ewc, html_container, index, [A1,{data, Data}]} end}
[01:06] ketralnis_: That's the whole return from hook/1 for me
[01:06] maddiin: mine is the same, I followed your example from the list
[01:07] ketralnis_: So that fun/3 returns *another* {ewc} tuple for the html_container_controller:index/2 function, which is then rendered, passed the rendered "business logic" page (in this case, showing a post)
[01:07] ketralnis_: And since you know the component system, you know how that maps into a sidebar or whatever else
[01:08] maddiin: yes
[01:08] ketralnis_: So another {ewc}-type tuple (beyond {ewr} and {phased} ones that I just described) is the {response}-type. This one allows you to attach headers, phased_vars, etc
[01:09] ketralnis_: It *must* be the first-level controller function (that is, the one referred to by the URL, not a sub-component)
[01:09] ketralnis_: And I'm looking for its definition, just one sec
[01:11] ketralnis_: The second element in the {response, Elems} is a list of values that ErlyWeb should return to Yaws verbatim, with the exception of the (optional) {body, Body} for HTML or {body, MimeType, Body} tuple and any of the ewr tuples listed above (ErlyWeb translates the latter into their redirect_local equivalents). If, in addition to returning standard Yaws tuples, you want ErlyWeb to render the response's body using the component's view, you can include the {body, Body} or {body, MimeType, Body} tuple in Elems. Body may be any single ewc or data tuple, or a list thereof. ErlyWeb renders the elements of Body, sends the result to the view function, and embeds the resulting iolist in an {html, Iolist} tuple prior to returning it to Yaws
[01:11] ketralnis_: To quote the documentation
[01:12] ketralnis_: That's old though, let me find a newer copy that has the phased_var stuff
[01:12] ketralnis_: But it gives you a general idea while I hunt down (or write) an example
[01:14] ketralnis_: Okay, looks like I'll have to write one real quick
[01:17] ketralnis_: http://www.mail-archive.com/erlyweb@googlegroups.com/msg00132.html
[01:17] ketralnis_: See that
[01:18] ketralnis_: So the idea is that from the top-level controller function, you can return something like:
[01:18] maddiin: yes, I remember that from the title and other non component data topic
[01:18] ketralnis_: {response, [{phased_vars, [{title, "The title!"}]}, {body, [{ewc, my_component1},{ewc,my_component2}]}]}
[01:19] ketralnis_: So then your controller-function is rendered, and everyone is happy. Meanwhile, based in {phased}-land, the fun/3 is waiting around....
[01:20] ketralnis_: So the fun/3 gets passed (1) the {ewc} itself from the request {ewc, post, show, [A, "23"]} (2) The entire rendered IOlist "<h2>My post!</h2>..." and (3) a proplist containing everything that was passed in the phased_vars bit of the {response}-tuple returned from the top-level controller function (the one referred by the URL)
[01:21] ketralnis_: That proplist looks like [{title,"The title!"}] in the example above, or [{title,"foo"}, {meta,"bar"}] in the one on the link above
[01:22] ketralnis_: The erlang in-built "lists" and "proplists" modules both contain useful functions for manipulating proplists
[01:23] maddiin: have to look at the proplist, I only know about the lists, but its getting clearer now
[01:23] ketralnis_: So you could do something like:
[01:24] ketralnis_: fun(_Ewc,RenderedPage,PropList) -> {ewc, html_container, index, [A, RenderedPage, _Title=proplists:get_value(title,PropList)]} end
[01:25] ketralnis_: Then html_container_controller:index/3 would have a head like index(A,RenderedPage,Title) -> [{data,Title},{data,RenderedPage}].
[01:25] ketralnis_: And a view like
[01:26] ketralnis_: <%@ index([Title, RenderedPage]) %><html><head><title><% Title %></title></head><body><% RenderedPage %></body></html>
[01:26] ketralnis_: Except with whitespace, sidebars, etc
[01:27] maddiin: let my try this
[01:27] ketralnis_: And of course SANITISED. I left that part out, but it's really important. Literal data included on a page that you got from a user (like the title of their blog entry) should *always* be sanitised, escaped, etc
[01:28] maddiin: yes, I know of that, <script>alert('doh-nuts')</script> :=)
[01:28] ketralnis_: The component system makes finding *where* to do that much easier. I have a module called "q" with things like q:h/1 and q:date/1 that escape those, with short module-name and function names so that i remember to type them
[01:28] ketralnis_: RIght
[01:29] ketralnis_: q:h/1 escapes HTML, q:date/2 formats dates, q:link/2 makes links properly sanitised, etc
[01:29] ketralnis_: There's a copy of it in the tictactoe server that I'll release cleaned up realsoonnow
[01:29] maddiin: great
[01:31] ketralnis_: Whew. That was a lot of typing. But I hope it clarified things. I fear that understanding any of this requires listening to the arguments with Yariv to understand *why* it's there, which doesn't reek of good design
[01:31] maddiin: woohoo
[01:32] maddiin: have my title set
[01:32] ketralnis_: Yaaay
[01:33] maddiin: thank you so much, I have to read what you wrote twice, but this was another breakthrough in understanding after the component system
[01:34] ketralnis_: I hope I went into enough detail. I only understand anything after reading the code that actually does it, which is why I started way back at how yaws gets requests to erlyweb and whatnot
[01:34] ketralnis_: I guess i could have just started with the example and worked backwards far enough and skipped most of that text
[01:35] maddiin: yes, you gave me a good insight, example was not needed, I went through a lot experimenting
[01:37] maddiin: I have to read the code as well and learn more of erlang... but now I have a lot more motivation again
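
To pull the last few steps of the transcript together, here is a minimal sketch of the body of that fun/3 as a standalone function. The module name phased_sketch, the function name wrap/3 and the "Untitled" default are assumptions made up for illustration; only the shape of the {ewc} tuple and the use of proplists:get_value come from the chat above.

```erlang
-module(phased_sketch).
-export([wrap/3]).

%% Builds the {ewc} tuple for the container page from the rendered page and
%% the phased_vars proplist. A is the yaws_arg. The "Untitled" fallback is an
%% assumption for pages whose controller set no title.
wrap(A, RenderedPage, PhasedVars) ->
  Title = proplists:get_value(title, PhasedVars, "Untitled"),
  {ewc, html_container, index, [A, RenderedPage, Title]}.
```

From hook/1 it would be plugged in as {phased, Ewc, fun(_Ewc, Rendered, Vars) -> phased_sketch:wrap(A1, Rendered, Vars) end}, following the tuple shown in the transcript.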


wrotit.com first-stage launch

12:35PM Nov 03, 2007 in category Erlang by David King

I've done the first bit of a launch for wrotit.com, a social site for writers inspired by reddit. (I've cross-posted this entry there, too)

Writers can get together, write articles, vote on them, get recommended reading based on past likes/dislikes, and have threaded conversations. I have the basics of the application written, and now I'm focussed on ironing out the bugs and getting a workable CSS design (any volunteers?) before I start accepting general signups (although I can give you an account if you personally email me and ask; I'd like some users for a limited beta to give suggestions and find bugs).

Simplicity is a major goal. The expected feature-list isn't that long, but eventually I'll want simple things like RSS and tags, and to have articles belong to a series. Articles are written in Markdown, although I'm still trying to make a decision as to whether to allow raw HTML or other markup languages. I hope to have a good recommendations algorithm as a way to find new content that fits a user's tastes.

Since it's a free-time project, I expect that it will be a month or two before I have a version with which to go truly "live", but I'm well on my way.

The application is written in Erlang running under Yaws. I started out using Erlyweb, but I find myself re-writing most of the framework in large chunks as I find that it just doesn't meet my needs. ErlTL isn't bad, but is missing some pretty important functionality in my book.

Erlang has proven to be a very powerful language in writing it. It's allowed me to split the application between two not-so-powerful machines (one dual-1.0GHz Via, one single-core 1.25GHz PowerPC; I'm currently hosting it out of my home-office) and still get a rather performant application. In my tests, mnesia has scaled very well for my seed data of tens of millions of entries, and I'm overall impressed by OTP and Yaws.

So this isn't a launch announcement, but it's a short preview into where I'm going with it :)


Supervision Trees and Ports in Erlang

08:31PM Sep 03, 2007 in category Erlang by David King

Erlang provides a whole lot of infrastructure for doing tasks commonly associated with building giant fault-tolerant systems. But what about when you have to talk to an external non-Erlang program? Can we make our communication to that program fault-tolerant using only native Erlang/OTP components?

The problem

I'm writing a blogging engine using Erlyweb and Mnesia, and one of my tasks is to pass the user's blog entry through a formatter. I use two: one for Markdown, and one to sanitise for XSS attacks. Since there's no Markdown or HTML-sanitiser implementation for Erlang, we need to communicate with the Perl implementations (markdown, HTML-TagFilter).

We'll want some fault-tolerance (in case one of our external programs crashes, we want to re-launch it), and we don't want to introduce a bottleneck by serialising access to our external programs, so we'll launch several copies of each to spread the load and allow some concurrency. In addition, we'll want to leave room to move all of the processing to its own node in the event that our blog gets very popular.

We'll communicate using an Erlang Port. A Port is a connection to an external program, implemented using Unix pipes. So our external program just has to know how to read from stdin and write to stdout. We're going to keep this all pretty generic to the actual external program. You should be able to do the same for any other program that can take data in length-encoded packets (or can be wrapped in one that does, like our Perl wrapper) for which you want to launch multiple workers.

To launch our workers, we'll use Erlang's gen_server interface, which is a way to write a generic server without having to write the implementation details of the server, keeping state, message-handling and responding, etc. Instead, you write a gen_server with a set of callbacks, and all you write is the code to actually handle the messages that you want to respond to. Basically, gen_server is a module that can take another module with appropriate callbacks and handle all of the tasks of making it a server that looks more or less like (pseudo-code):

    loop(State) ->
      Message=receive Some_Message
      if Message is an internal message, like shutdown or startup or code-replacement
        let gen_server handle it
        loop(State)
      else it's a message that the programmer wants to respond to
        pass it to the programmer, let them handle it, get new state
        loop(NewState)
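
For comparison, here is roughly what that loop looks like when written by hand in plain Erlang. This is only a sketch of the idea, not how gen_server is actually implemented: the module name loop_sketch and the counter it keeps as its state are assumptions chosen to keep the example tiny.

```erlang
-module(loop_sketch).
-export([start/0, increment/1, value/1, stop/1]).

%% Spawn the server with an initial state of 0.
start() -> spawn(fun() -> loop(0) end).

%% Asynchronous request: fire and forget.
increment(Pid) -> Pid ! increment, ok.

%% Synchronous request: send our Pid and a unique reference, wait for the reply.
value(Pid) ->
  Ref = make_ref(),
  Pid ! {value, self(), Ref},
  receive
    {Ref, Value} -> Value
  after 1000 ->
    timeout
  end.

stop(Pid) -> Pid ! stop, ok.

%% The server loop: handle one message, then recurse with the new state.
loop(State) ->
  receive
    stop -> ok;                       % "internal" message: shut down
    increment -> loop(State + 1);     % user message that changes the state
    {value, From, Ref} ->             % user message that wants a reply
      From ! {Ref, State},
      loop(State)
  end.
```

Everything except the three message clauses is the bookkeeping that gen_server takes off our hands.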
  
Our gen_server will have to keep track of its connection to our external program, and on request it will have to send data to the external program and retrieve the result. It will also create its process group (pg2) if necessary and add itself to it.

Erlang does something similar for supervisors. A supervisor is a process whose entire job is to watch another process or set of processes. In our case, we just want it to make sure that our external processes are up and running, and to re-launch them in case they crash, and to bring up all of our workers at once so that we don't have to launch them individually. We'll have one supervisor for Markdown which will watch all of our Markdown gen_servers, and another for all of our sanitiser gen_servers. To make managing them easier, we'll use another supervisor that will watch those two supervisors, re-launching them if they crash, and enabling us to bring both up, and all of the associated gen_servers, at once. So the plan is to have a formatter supervisor that will launch a markdown supervisor and a sanitiser supervisor, each of which will launch N workers.

I'll show you working code, and try to explain the fuzzy bits, but I encourage you to research the functions that compose it in Erlang's own docs so that you really know what is going on.

external_program.pl

Since all of this relies on an actual external program, here's a shell to use for a Perl program:

    #!/usr/bin/env perl
    use strict;
    use warnings;

    # turn on auto-flushing so that replies reach Erlang immediately
    my $cfh = select (STDOUT);
    $| = 1;
    select ($cfh);

    # the port carries raw bytes, so use no encoding layers
    binmode STDIN;
    binmode STDOUT;

    # stub: returns its input unchanged; replace with the real formatter
    sub process {
      return $_[0];
    }

    until(eof(STDIN)) {
      my ($length_bytes,$length,$data,$return);

      read STDIN,$length_bytes,4; # four-byte length-header
      $length=unpack("N",$length_bytes); # "N" is 32-bit network byte-order

      read STDIN,$data,$length;

      $return=process($data);

      print STDOUT pack("N",length($return));
      print STDOUT $return;
    }

It communicates with Erlang via stdin/stdout, and handles packets with 4-byte length headers (with network byte-order, since that's how Erlang sends it). That is just a stub program that returns the data coming in to it, but you can replace process with whatever you want to process/format the data. I have two versions of this, one that applies markdown, and one that sanitises input with HTML-TagFilter.
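
To make the wire format concrete, here is a small sketch of the same framing done by hand in Erlang. We won't need it in the real code, because the {packet,4} option does this for us; the module name packet4_sketch and its two functions are made up purely for illustration.

```erlang
-module(packet4_sketch).
-export([encode/1, decode/1]).

%% Prefix Data with its length as a 32-bit big-endian (network byte-order)
%% integer, which is the framing the Perl program expects on stdin.
encode(Data) when is_binary(Data) ->
  <<(byte_size(Data)):32/big-unsigned, Data/binary>>.

%% Try to split one complete packet off the front of a buffer.
%% Returns {ok, Payload, Rest} or {more, Buffer} if the packet is incomplete.
decode(<<Len:32/big-unsigned, Payload:Len/binary, Rest/binary>>) ->
  {ok, Payload, Rest};
decode(Buffer) when is_binary(Buffer) ->
  {more, Buffer}.
```

So the five bytes of "hello" travel as nine bytes: 0,0,0,5 followed by the text itself.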

external_program.erl

Now for the Erlang bits. First, we'll construct the gen_server to actually communicate with the Perl program. We'll communicate using a Port (which uses a unix pipe). We want to send data in atomic packets so that the Perl program knows when we're done sending data and so that we know when the Perl program is done responding. To do this, we'll send and receive data in packets of up to 2^32 bytes. That is, we'll send our Perl program a four-byte header (in network byte-order) indicating how much data is to be sent, then we'll send the data, and Perl will send back a four-byte header indicating how much data it is about to send, and then it will send that data. Erlang will handle the packets and headers on its side with the {packet,4} option to open_port/2, but we'll have to handle the packetising ourselves from the Perl side (already done in the above Perl program). This one is pretty generic, so you could use it to communicate with any outside program that can send data in packets, not just a formatter. Here's the code:

 1 -module(external_program).
 2 -behaviour(gen_server).
 3 
 4 % API
 5 -export([start_link/3,
 6          filter/2]).
 7 
 8 % gen_server callbacks
 9 -export([init/1,
10          handle_call/3,
11          handle_cast/2,
12          handle_info/2,
13          code_change/3,
14          terminate/2]).
15 
16 -record(state,{port}).
17 
18 start_link(Type,Id,ExtProg) ->
19   gen_server:start_link({local,Id},?MODULE,{Type,ExtProg},_Options=[]).
20 
21 handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
22   {stop, {port_terminated, Reason}, State}.
23 terminate({port_terminated, _Reason}, _State) ->
24   ok;
25 terminate(_Reason, #state{port = Port} = _State) ->
26   port_close(Port).
27 handle_cast(_Msg, State) ->
28   {noreply, State}.
29 code_change(_OldVsn, State, _Extra) ->
30   {ok, State}.
31 
32 init({Type,ExtProg}) ->
33   process_flag(trap_exit, true),
34   ok=pg2:create(Type), % idempotent
35   ok=pg2:join(Type,self()),
36   Port = open_port({spawn, ExtProg}, [binary,{packet,4}]),
37   {ok, #state{port = Port}}.
38 
39 handle_call(_Command={filter,Data},_From,#state{port=Port}=State) ->
40   port_command(Port,Data),
41   receive {Port,{data,Data2}} ->
42       {reply,Data2,State}
43     end.
44 
45 filter(Type,Data) ->
46   gen_server:call(pg2:get_closest_pid(Type),{filter,Data}).

And here's the blow-by-blow. Skip ahead to externalprogram_supervisor.erl if you understand it. First, we start our module with

     2 -behaviour(gen_server)
This tells the compiler to warn us if we forget any of the call-back functions that gen_server is expecting to find. I won't introduce the exports here; I'll introduce the functions as we come to them.

We use a record to encode our state, even though the only item in our state is the connection to the external program

    16 -record(state,{port}).
We do this in case we later decide to add anything to the state, it will save us from modifying all of the function headers that refer to the state. Remember that our entire state between calls must go into this record, so if we want to later add information like the time the program was launched (for statistical tracking), or the system user that launched it, this is where it goes.

start_link/3 is how we'll start our gen_server. It's the function that we call from another module (our supervisor), not one called as a gen_server callback, so it can look however we want, but our supervisor is expecting it to return {ok,Pid}.

    18 start_link(Type,Id,ExtProg) ->
    19   gen_server:start_link({local,Id},?MODULE,{Type,ExtProg},_Options=[]).
It calls gen_server:start_link/4, which takes:
  • {local,Id}: This is how gen_server will register the PID of the server. local means that it will only be registered on the local node, and Id is an atom passed to us by our supervisor that will appear in process listings. We don't use this anywhere, it's just nice to have for viewing in appmon and other tools, and it's optional (gen_server:start_link/3 takes only the other three arguments)
  • the call-back module (i.e. this module)
  • the arguments to the gen_server's init/1 function (which we'll get to)
  • any special options to gen_server (we don't use any)
It returns {ok,Pid}, where Pid is the Pid of the launched process. While we used gen_server:start_link/4, which takes a registered name for the gen_server, we don't use that name to address the gen_server. Instead, we're going to use the pg2 group name that we set in init/1 below. This is because we're going to launch several copies of this gen_server, so communicating directly with any one of them isn't practical.

handle_info/2 is a callback function that is called whenever our server receives a message that gen_server doesn't understand.

    21 handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    22   {stop, {port_terminated, Reason}, State}.
The only message that we handle is the Port indicating that it has shut down (for instance, if it crashes). In this case, there's nothing that we can do anymore (since our whole reason for being is to communicate with this Port), so we signal to our caller that we intend to shut down, and gen_server will shut us down, which includes calling our terminate/2 function and doing other shutdown tasks.

terminate/2 is a callback function that gen_server calls whenever it is shutting us down for any reason so that we can free up any resources that we have (like our Port). It passes us the Reason for shutdown and the last State of the server

    23 terminate({port_terminated, _Reason}, _State) ->
    24   ok;
    25 terminate(_Reason, #state{port = Port} = _State) ->
    26   port_close(Port).
We have two clauses to handle a shutdown. If we are shutting down because our port terminated (because handle_info said so), then we just let gen_server finish shutting us down. If we shut down for any other reason, then we close the Port before we die so that it's not left open forever. terminate's return value is ignored.

handle_cast/2 handles asynchronous messages to our server

    27 handle_cast(_Msg, State) ->
    28   {noreply, State}.
gen_servers can handle two types of in-band messages: synchronous messages generated by calls to gen_server:call/2, and asynchronous messages generated by gen_server:cast/2. (gen_server handles the difference.) Since we only handle synchronous messages, we simply ignore any asynchronous message that arrives and return {noreply, State}. A cast has no reply channel of its own, so if you really wanted to answer one, you would have to send the response back to the sender yourself as a separate message.

code_change/3 is called when our code is being dynamically replaced

    29 code_change(_OldVsn, State, _Extra) ->
    30   {ok, State}.
Erlang supports replacing code while the server is running, with no downtime. In the event that the code is replaced, a gen_server temporarily stops responding to messages (enqueueing them as they come in), waits for the code to be replaced, calls the call-back module's new code_change function, picks up the new state, and starts running again. This could be used if we were to upgrade our code to one that required a new member of the state record, for instance, to upgrade the running state to the state that the new version uses. (If the function returns anything but {ok,NewState}, the gen_server crashes, which would be fine in our case since our supervisor would just restart it under the new code anyway.) Unless we plan to convert the state of the server, which for now we don't, we just return {ok,State}
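
As an illustration of the state-conversion case, here is a sketch of what code_change/3 could look like if a later version of the module added a field to the state record. The started_at field and the module name code_change_sketch are hypothetical; our real module has only port in its state.

```erlang
-module(code_change_sketch).
-export([code_change/3]).

%% The NEW version of the record, with a hypothetical extra field.
-record(state, {port, started_at}).

%% A state left over from the old code is the bare tuple {state, Port}.
%% Convert it to the new record; we can't know the real start time, so the
%% new field is left undefined.
code_change(_OldVsn, {state, Port}, _Extra) ->
  {ok, #state{port = Port, started_at = undefined}};
%% A state that already has the new shape is passed through untouched.
code_change(_OldVsn, #state{} = State, _Extra) ->
  {ok, State}.
```

This works because a record is just a tagged tuple at runtime, so the old two-element tuple and the new three-element record can be told apart by pattern matching.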

init/1 is called by gen_server just before the server-loop is run.

    32 init({Type,ExtProg}) ->
    33   process_flag(trap_exit, true),
    34   ok=pg2:create(Type), % idempotent
    35   ok=pg2:join(Type,self()),
    36   Port = open_port({spawn, ExtProg}, [binary,{packet,4}]),
    37   {ok, #state{port = Port}}.
Its argument comes from gen_server:start_link, which we called in our own start_link/3 function, and is our Type (which becomes our pg2 group name) and the path to the external program that we want to launch (the Perl programs mentioned above). We first indicate that we want to trap exits, so that we can receive 'EXIT' messages from our Port in case it dies. These messages are not recognised by gen_server, so they get passed to our handle_info/2 callback function. Then we create our pg2 group (this operation is idempotent, which means that doing it several times has the same effect as doing it once), and join this process to the group. Then we open up the external program itself using open_port. It takes a tuple of {Type,Path}, which we use to indicate that we are launching an external program (the spawn type) and where to find it, and a list of options, which we use to tell it that we want to communicate using binaries instead of lists, and that we want it to handle communication to our Port with packets with a length-header of four bytes. Then we tell gen_server that all is well and to start the server with a state record containing a reference to our Port.

handle_call/3 is called whenever we receive a synchronous request from an outside caller. This is the actual guts of our server.

    39 handle_call(_Command={filter,Data},_From,#state{port=Port}=State) ->
    40   port_command(Port,Data),
    41   receive {Port,{data,Data2}} ->
    42       {reply,Data2,State}
    43     end.
We just receive the command (which is a {filter,Data} tuple; it doesn't have to be, but that's how we're going to call it later), send it to the Port, get the reply from the Port, and tell gen_server to reply that back to the caller. We didn't have to write any of the server code, just the implementation of our code.
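
One thing to be aware of is that the receive above waits forever if the external program never answers. A possible hardening, sketched here as a standalone helper rather than as a change to the module, is to also match the Port's 'EXIT' message and to give up after a timeout. The module name port_call_sketch, the function call_port/3 and its return values are assumptions made for this example.

```erlang
-module(port_call_sketch).
-export([call_port/3]).

%% Sends Data to Port and waits for one reply packet.
%% Returns {ok, Reply}, {error, {port_terminated, Reason}} or {error, timeout}.
%% The 'EXIT' clause can only match if the calling process traps exits, as
%% our init/1 does. After a timeout a late reply may still arrive in the
%% caller's mailbox, so the caller should stop or flush it.
call_port(Port, Data, TimeoutMs) ->
  port_command(Port, Data),
  receive
    {Port, {data, Reply}} ->
      {ok, Reply};
    {'EXIT', Port, Reason} ->
      {error, {port_terminated, Reason}}
  after TimeoutMs ->
    {error, timeout}
  end.
```

Inside handle_call/3 the three results would map naturally onto {reply,Reply,State} for the first and a {stop,...} tuple for the other two, letting the supervisor start a fresh worker.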

Finally, here's how we actually communicate with our server after it's launched:

    45 filter(Type,Data) ->
    46   gen_server:call(pg2:get_closest_pid(Type),{filter,Data}).
We make a synchronous call to the "closest" Pid that is a member of the pg2 group that our programs join when launching. ("Closest" just means that pg2 will first look on the local node before looking on other nodes.) pg2:get_closest_pid randomises which worker it returns within a group, so this should spread the load if we get many simultaneous requests. This could be expanded to try again in the event of a timeout (assuming that the timed-out server will crash and be re-launched, or that pg2:get_closest_pid will return a different Pid next time)
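
The retry idea could be sketched as a small generic helper like the one below. The module name retry_sketch and the function retry/2 are made up for this example; it relies on the fact that gen_server:call/2 exits with a {timeout, _} reason when no reply arrives in time, and it deliberately lets every other kind of failure crash the caller.

```erlang
-module(retry_sketch).
-export([retry/2]).

%% Runs Fun up to Attempts times, retrying only when it exits with a
%% timeout. Returns {ok, Result} or {error, timeout} once we run out
%% of attempts.
retry(_Fun, 0) ->
  {error, timeout};
retry(Fun, Attempts) when is_function(Fun, 0), Attempts > 0 ->
  try Fun() of
    Result -> {ok, Result}
  catch
    exit:{timeout, _} -> retry(Fun, Attempts - 1)
  end.
```

It would be used as retry_sketch:retry(fun() -> external_program:filter(markdown, Data) end, 3), where markdown is whatever Type the workers were started with.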

At this point you should be able to create and call a gen_server.

externalprogram_supervisor.erl

Now for the supervisor. Again, we'll keep this generic. It handles multiple external_program workers, launching NumWorkers (passed to start_link/3). We'll use the (aptly named) supervisor behaviour. supervisor launches a group of processes, watches them, and re-launches them if they die. It also provides a way to bring up or down a group of processes all at once, just by launching or shutting down the supervisor. I'm going to explain the functions out of order for clarity.

    -module(externalprogram_supervisor).
    -behavior(supervisor).

    % API
    -export([start_link/3]).

    % supervisor callbacks
    -export([init/1]).

    start_link(Type,NumWorkers,ExtProg) ->
      supervisor:start_link({local,
                              list_to_atom(atom_to_list(Type) ++ "_supervisor")},
                            ?MODULE,
                            {Type,NumWorkers,ExtProg}).

    init({Type, NumWorkers,ExtProg}) ->
      {ok,
       {{one_for_one,
         3,
         10},
        [begin
             Id=list_to_atom(atom_to_list(Type) ++ integer_to_list(Which)),
             {Id,
              {external_program,
               start_link,
               [Type,Id,ExtProg]},
              permanent,
              _Timeout=10*1000,
              worker,
              [external_program]}
           end
         || Which <- lists:seq(1,NumWorkers)]}}.

Again, -behavior(supervisor). (Erlang accepts both the behavior and behaviour spellings) tells the compiler to warn us if we forget any callback functions required by supervisor.

supervisor only has one callback, init/1:

    init({Type, NumWorkers,ExtProg}) ->
      {ok,
       {{one_for_one,
         3,
         10},
        [begin
             Id=list_to_atom(atom_to_list(Type) ++ integer_to_list(Which)),
             {Id,
              {external_program,
               start_link,
               [Type,Id,ExtProg]},
              permanent,
              _Timeout=10*1000,
              worker,
              [external_program]}
           end
         || Which <- lists:seq(1,NumWorkers)]}}.
It's only called once when the supervisor is created (or re-started after crashing). It gets its arguments from supervisor:start_link, which we call from our local start_link (that we'll explain in a moment). init is expected to return a restart strategy and a list of Childspecs, where a Childspec looks like (straight from the Erlang supervisor documentation):
    {Id,StartFunc,Restart,Shutdown,Type,Modules},
      Id = term()
      StartFunc = {M,F,A}
         M = F = atom()
         A = [term()]
      Restart = permanent | transient | temporary
      Shutdown = brutal_kill | int()>=0 | infinity
      Type = worker | supervisor
      Modules = [Module] | dynamic
        Module = atom()
You can read up on what all of that means in the supervisor documentation, but we return a list of them consisting of NumWorkers items (see the list-comprehension) such that each is launched by calling external_program:start_link/3 (specified by the StartFunc in the Childspec), passing each worker its Type, Id and ExtProg. The Id is just an atom that we generate from the Type and the worker-number (which is just determined by launch order). You'll recall that it is passed to gen_server to register the worker's PID, so that in debug listings we can see markdown3 instead of <0.15.3>.
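
To see what that list-comprehension produces without starting anything, the Childspec generation can be pulled out into plain functions, as in the sketch below. The module name childspec_demo and the functions worker_ids/2 and child_specs/3 are hypothetical names of mine; the tuples they build are meant to mirror the ones in init/1 above. supervisor:check_childspecs/1 only checks that the specs are well-formed, so the external_program module doesn't need to be loaded for this to work.

```erlang
-module(childspec_demo).
-export([worker_ids/2, child_specs/3]).

%% Build the per-worker Ids the same way init/1 does: the Type atom with the
%% worker number appended, e.g. markdown1, markdown2, ...
worker_ids(Type, NumWorkers) ->
    [list_to_atom(atom_to_list(Type) ++ integer_to_list(Which))
     || Which <- lists:seq(1, NumWorkers)].

%% Build one Childspec tuple per worker, in the
%% {Id,StartFunc,Restart,Shutdown,Type,Modules} form quoted above.
child_specs(Type, NumWorkers, ExtProg) ->
    [{Id,
      {external_program, start_link, [Type, Id, ExtProg]},
      permanent,
      10 * 1000,
      worker,
      [external_program]}
     || Id <- worker_ids(Type, NumWorkers)].
```

For example, childspec_demo:worker_ids(markdown, 3) returns [markdown1,markdown2,markdown3], and supervisor:check_childspecs(childspec_demo:child_specs(markdown, 3, "/bin/cat")) returns ok.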

The supervisor itself is created by calling our local start_link/3, which calls supervisor:start_link/3 (which actually creates the supervisor, calls our init, etc). This supervisor will be called from formatter_supervisor, defined below, but you could call it done at this point and have a working implementation of an N-worker external program.

    1> externalprogram_supervisor:start_link(my_server,5,"/usr/local/bin/external_program.pl").
    {ok,<0.1976.0>}
    2> pg2:get_members(my_server).
    [<0.1981.0>,<0.1980.0>,<0.1979.0>,<0.1978.0>,<0.1977.0>]
    3> gen_server:call(pg2:get_closest_pid(my_server),{filter,<<"This is some random text!">>}).
    <<"This is some random text!">>
  

formatter_supervisor.erl

Our original goal was to have N workers running for two external programs. So far all of the code we've written has been generic to the external program, but now we're going to get into (very slightly) more specific stuff. The formatter_supervisor will be a supervisor that will launch and watch two supervisors, one for Markdown and one for our sanitiser. They'll both run the same externalprogram_supervisor code; they'll just be launched with different arguments.

    -module(formatter_supervisor).
    -behavior(supervisor).

    % External exports
    -export([start_link/0]).

    % supervisor callbacks
    -export([init/1]).

    start_link() ->
      supervisor:start_link(?MODULE, '_').

    % formatters are assumed to create pg2s registered under their
    % type-names (at the moment, markdown and sanitiser)
    init(_Args) ->
      {ok,
       {{one_for_one,
         3,
         10},
        [{markdown_supervisor,
          {externalprogram_supervisor,
           start_link,
           [markdown,
            myapp_config:markdown_workers(),
            myapp_config:markdown_program()]},
          permanent,
          _Timeout1=infinity,
          supervisor,
          [externalprogram_supervisor]},
         {sanitiser_supervisor,
          {externalprogram_supervisor,
           start_link,
           [sanitiser,
            myapp_config:sanitiser_workers(),
            myapp_config:sanitiser_program()]},
          permanent,
          _Timeout2=infinity,
          supervisor,
          [externalprogram_supervisor]}]}}.

That looks almost the same as our externalprogram_supervisor, except that the ChildSpecs returned by init are different. We don't need a local wrapper function to start the children; the ChildSpecs just call externalprogram_supervisor:start_link directly (since there's nothing we need to do with a child supervisor after launching it). The ChildSpec list looks like this:

        [{markdown_supervisor,
          {externalprogram_supervisor,
           start_link,
           [markdown,
            myapp_config:markdown_workers(),
            myapp_config:markdown_program()]},
          permanent,
          _Timeout1=infinity,
          supervisor,
          [externalprogram_supervisor]},
         {sanitiser_supervisor,
          {externalprogram_supervisor,
           start_link,
           [sanitiser,
            myapp_config:sanitiser_workers(),
            myapp_config:sanitiser_program()]},
          permanent,
          _Timeout2=infinity,
          supervisor,
          [externalprogram_supervisor]}]}}.
Nothing too special there, except that we pass in the Type atom to externalprogram_supervisor:start_link (markdown or sanitiser), which becomes the name of the pg2 group. The shutdown timeout is set to infinity for supervisors of supervisors (as recommended by the Erlang docs) to give the indirect children ample time to shut down, and we've indicated that the child type is supervisor, so that Erlang knows that we're building a supervision tree. Of course, the arguments to externalprogram_supervisor:start_link assume that myapp_config:markdown_workers/0 and friends have been defined, but you could just replace those with literal values, too. myapp_config:markdown_workers/0 is assumed to return an integer (in my case, five), which is how many copies of the program you want to launch, and myapp_config:markdown_program/0 returns the full path to the program to launch (like "/usr/local/myapp/lib/markdown.pl"). formatter_supervisor is the supervisor that will be started from my application's main supervisor.

format.erl

The actual communication with the programs should be pretty easy now: we just ask external_program to do its magic. Here it is:

    -module(format).

    % formatters are defined and launched by the formatter_supervisor

    -export([markdown/1,
             sanitise/1]).

    markdown(Data) ->
      external_program:filter(markdown,Data).

    sanitise(Data) ->
      external_program:filter(sanitiser,Data).

It should work with lists or binaries. Add a use Text::Markdown; to the top of our Perl module (at the beginning of this entry) and replace its process function with:

    sub process {
      return Text::Markdown::markdown($_[0]);
    }

Launch our formatter_supervisor with formatter_supervisor:start_link() and now we have a working Markdown implementation:

  (name@node)106> format:markdown("I *really* like cookies").
  <<"<p>I <em>really</em> like cookies</p>\n">>

Edit 2007-09-06 18:45: I received some advice from Paul Mineiro on a better way to abstract the pg2 groups, and updated the article to better reflect that.

Too much stuff

02:46PM Aug 11, 2007 in category Personal by David King

I just got rid of nearly half of my junk in the most efficient and positive way possible.

The problem

About eighteen months ago, I moved into my current house. We were slated to start moving in on the first of February, so I took a week off work, which would have given us nine days to move and unpack. But the previous occupants of the house didn't actually leave the house until the fourth, which gave us only five days. Since we had to combine two households (this was my girlfriend and me moving in together), there was a lot of stuff to move and unpack, so most of the stuff got moved into the garage, but not unpacked. We figured that we'd unpack as we got around to it. Of course, we never actually did that. So we had half a garage full of crap that sat around not doing much.

Over a year and a half, we accumulated more stuff in the garage. Toys that the boy outgrew or stopped playing with, computer hardware that I replaced, clothes that wore out or were outgrown (I've gained some weight since I moved, and the boy has grown from three years old to five). In the meantime, we acquired things around the house, like a giant bowl-type chair in the living room, appliances that we used once or twice, books that we finished, and other miscellany. While it's nice to have things like that, we didn't have the room for all of it, and it made it clumsy to get around the house and impossible to get around the garage. And it's a self-feeding problem: as I forget about something useless in the garage, but think I need it, I buy another one, and then soon I have two of these useless things in the garage. It's not just me, of course. Nearly everybody that I speak to has the same problem, but doesn't regard it as a problem; they just think that they have a lot of stuff, and having a lot of stuff means that you're well-off, right?

The solution

About a week ago, the stuff in the garage spilt over and blocked my access to our mailbox. That was the last straw. I was just overwhelmed by all of my stuff. I hadn't even seen most of it in eighteen months, so clearly I didn't need it that badly. So I posted an ad on Craigslist, indicating that the next Saturday at noon (today, as I write this) there would be a whole lot of stuff on my lawn, and that "if you can haul it, you can have it." There were books, lots of computer hardware (most of it relatively up-to-date), furniture, clothes, kids' toys, dishes, appliances, other miscellaneous household stuff, and basically everything in my garage that I hadn't seen in a month or more.

At first, the post was flagged as a scam by a Craigslist moderator, but I re-posted it with "yes, I'm really giving away my stuff, I'll be at the house, and you can call me at 555-..." and it was accepted. Within hours, we had people calling us, asking if we had specific items. We told them that we didn't know, and that the reason we were giving it away was that we didn't want to go through it all.

We wanted to start today at noon, so we started unloading the garage and things around the house at 9:00. By 10:30, we already had people showing up to "get in on the good stuff." As time went on, people that noticed that we were leaving the house with more stuff would start to ask which rooms we were working on, and which rooms were left. "I'll hang around until you finish with your office; I want some more monitors". People showed up from all across the Bay Area, some driving more than an hour to get here. Some called ahead, saying that they would be late, but wanted us to set aside an item (like a bed-frame for a guest room) for them. Some wanted specific items, some just wanted to see what we had, and at least one just came to see the spectacle. One family that came (and filled the back of their SUV to the brim) sounded like they were going to try to sell some of it at a flea-market in a few hours (and judging by the clothes their many kids were wearing, they needed the money more than I did). Some teenagers came by and took my BDUs from the Army for paintballing, and a Coleman stove for camping. Even the ice cream man stopped when going by, and picked up a device I had out there for recharging the Freon in air-conditioners.

By 11:50 (ten minutes before we thought anyone would even start showing up), everything was gone but some clothes (which we donated to a local charity later that day). The front lawn, which was previously that day fully covered in piles of unwanted things, was empty.

Everybody wins

We got rid of a bunch of stuff that we didn't need or want, and people from all over the Bay Area ended up with things that they needed or wanted. If we'd held a garage sale, we'd probably have made $50, maximum, and have ended up with most of the stuff still here (who travels an hour to buy someone else's crappy stuff?). Selling the stuff on eBay may not even have worked, and would have taken months to get rid of it all. If we'd donated it to Goodwill, we'd have had to make several trips (our Goodwill doesn't do pick-ups) that would have taken all weekend. The dump would have cost us $27/load.

All in all, this was a great idea, and my house feels so much more liveable now. I'd highly recommend it to anyone that's feeling cluttered or has a garage full of things they haven't seen in a year. It probably only works if you actually have useful stuff that you just can't use (as opposed to a bunch of trash), but if that's you, consider giving it all away en masse. I don't miss it.