Running rings around Erlang


I wrote some Erlang code the other day to spin up a ring of processes (which are like super-lightweight green threads) and send messages around the ring. I’ve already done one full rewrite of this code but I suspect it’s still probably not very idiomatic Erlang code. I’d love to see someone else’s example or any comments from Erlang gurus out there.

There are three basic steps to this run:

  1. Start the processes in the ring and remember their Pids (process IDs)
  2. Connect the processes into a ring
  3. Send messages around the ring

I’ve kind of combined #1 and 2 into the same function to start up the ring. I suspect there’s probably some nifty way to use a map or foldl to do steps 1 and 2 at the same time but my feeble brain couldn’t figure out how to do the special case to connect the ring when it wraps around.

Anyhow, here’s the startRing/2 function which 1) spawns the nodes and 2) connects them:

-module(ring).
-import(lists, [last/1]).
%% startTimer/0 and spawnNodes/3 are defined later in this post; they are
%% exported because they are called through the module name (ring:...).
-export([startRing/2, startTimer/0, spawnNodes/3]).

%% returns first pid in the ring of size N
startRing(N,TimerPid) ->
    Pids = spawnNodes(N, [], TimerPid),
    connectNodes([last(Pids)|Pids]).

spawnNodes(0, Pids, _TimerPid) -> Pids;
spawnNodes(Remaining, Pids, TimerPid) ->
    Pid = spawn(fun() -> unconnectedNode(TimerPid) end),
    spawnNodes(Remaining-1, [Pid|Pids], TimerPid).

%% Connect F and S until there are no nodes left to connect
connectNodes([F|[S|T]]) ->
    F ! {connect,S},
    if
        length(T) > 0 -> connectNodes([S|T]);
        true -> S
    end.

%% Run node unconnected until there is a connect message
unconnectedNode(TimerPid) ->
    receive
        {connect,SendTo} ->
            nodeloop(SendTo,TimerPid)
    end.

Here’s a run down of the functions and what they’re doing in more detail:

  • startRing(N,TimerPid) – this is the main exported function. It spawns N processes, then connects them in a ring. One of the Pids in the ring is returned (the nodes are all the same so it’s not too important which one).
  • spawnNodes(0, Pids, _TimerPid) – spawnNodes is a recursive function and this is the base case. The first parameter is decremented as processes are created. The second parameter accumulates a list of Pids. The third parameter is a Pid for a timer process that will be used later. In this base case definition, it is unused and thus prefixed with an _.
  • spawnNodes(Remaining, Pids, TimerPid) – the main recursive spawnNodes first spawns a new process with the function unconnectedNode() and does the obvious recursive call – decrement the # of processes left to create, add Pid to the head of the list of Pids, and pass the TimerPid through.
  • connectNodes([F|[S|T]]) – this function takes a list of Pids and connects each one to the next, recursively. As you may have noticed back in startRing, the last Pid is added to the head of the list before the initial call to connectNodes, so the last node gets connected to the first and the ring is closed. The first expression in connectNodes sends a message to process F to connect it to process S. If there are still pids left in T, we recurse, dropping the initial F pid. If no pids are left in T, we return S, a pid in the ring, which becomes the return value of startRing.
  • unconnectedNode(TimerPid) – unconnectedNode is the initial state of each node in the ring: the node waits for a single connect message carrying the Pid it should forward messages to. When that message arrives, the node moves into the main node function, nodeloop, which we will look at next.
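As an aside, the spawn and connect steps can be folded into one pass with lists:foldl, as I suspected earlier. The following is only a rough sketch of that idea, not the code used in this post: the module name ring_fold, the lap/2 helper, and the token and lap_done messages are all made up for illustration. The trick is that only the first node needs to wait for a connect message; every later node is spawned already knowing the node created just before it.

```erlang
-module(ring_fold).
-export([start/1, lap/2]).

%% Build a ring of N >= 1 processes and return the pid of the first node.
start(N) when is_integer(N), N >= 1 ->
    %% The first node cannot know its successor yet, so it waits to be told.
    First = spawn(fun() -> receive {connect, To} -> loop(To) end end),
    %% Every later node is spawned pointing at the previously created node.
    Last = lists:foldl(fun(_, Next) -> spawn(fun() -> loop(Next) end) end,
                       First,
                       lists:seq(2, N)),
    %% Close the ring: the first node forwards to the last node created.
    First ! {connect, Last},
    First.

%% Forward a token until its hop count reaches 0, then notify the caller.
loop(Next) ->
    receive
        {token, ReplyTo, 0} ->
            ReplyTo ! lap_done,
            loop(Next);
        {token, ReplyTo, Hops} ->
            Next ! {token, ReplyTo, Hops - 1},
            loop(Next);
        stop ->
            Next ! stop
    end.

%% Send a token that makes exactly one lap of a ring with N nodes.
lap(Entry, N) ->
    Entry ! {token, self(), N},
    receive
        lap_done -> ok
    after 5000 ->
        timeout
    end.
```

Calling ring_fold:start(5) and then ring_fold:lap(Pid, 5) on the returned pid should come back with ok once the token has visited every node; sending stop to any node shuts the ring down the same way as in the main code.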

So, moving onward we look finally at the main nodeloop function from each node:

%% When connected, run loop waiting for either a stop or {Source,Val}
%% message where Source is the Pid of the node that originated the message.
nodeloop(SendTo,TimerPid) ->
    receive
        stop ->
            SendTo ! stop;
        start ->
            io:format("~p Starting message~n", [now()]),
            TimerPid ! start,
            SendTo ! {self(), 0},
            nodeloop(SendTo,TimerPid);
        {Source, Val} when Source == self() ->
            logOnNth(Val+1, 10000),
            if
                Val+1 == 1000000 -> TimerPid ! stop, SendTo ! stop;
                true -> SendTo ! {Source, Val+1}
            end,
            nodeloop(SendTo,TimerPid);
        {Source,Val} ->
            SendTo ! {Source, Val},
            nodeloop(SendTo,TimerPid)
    end.

%% Log if Val is Nth value
logOnNth(Val, Nth) when Val rem Nth == 0 ->
    io:format("~p Around ring ~p times ~n", [now(), Val]);
logOnNth(_Val, _Nth) -> void.

The nodeloop/2 function takes SendTo which is a Pid to forward messages on to and the TimerPid which is the timer process. The receive loop is expecting three kinds of messages: stop, start, and {Source,Val} (where Source is the originating Pid and Val is an int counter for the number of trips around the ring).

  • stop – just forwards the stop message on and most importantly does NOT call nodeloop, ending the recursion and allowing the function to end.
  • start – notify the TimerPid that the message send has started, send the initial {Source,Val} message with self() pid as the source, and finally recursively wait for the next message
  • {Source,Val} with guard – when a message is received, if this node was the source of the message, we log that every 10000 trips around the ring. If the message has traveled around the ring 1M times, we notify the Timer process to stop and switch to sending a stop message around the ring. Otherwise, we just forward the message and bump up the Val.
  • {Source,Val} – if this node was not the source, just forward the message and loop

So, this allows us to spin up a ring of size N, tell some process in the ring to start, and the message will go around the ring a million times.

I should also explain the timer process. This process has two states, defined by the functions timerOff() and timerOn(). The timer starts in the off state and waits for either a start or a cancel message. On start, it switches to the timerOn() function, which listens for a stop (or cancel) message. When stop is received, we calculate and print the elapsed time, then switch the timer back off. Here’s the code:

%% Start timer listener process, return Pid of timer listener
startTimer() ->
    spawn(fun() -> timerOff() end).

timerOff() ->
    receive
        cancel -> void;
        start -> timerOn(now())
    end.

timerOn(Start) ->
    receive
        cancel -> {timer,aborted};
        stop ->
            End=now(),
            io:format("Start=~p Stop=~p Elapsed=~p~n", [Start,End,diff(Start,End)]),
            timerOff()
    end.

diff(Start, End) ->
    {_,StartSeconds,StartMicros} = Start,
    {_,EndSeconds,EndMicros} = End,
    ((EndSeconds*1000000) + EndMicros) - ((StartSeconds*1000000) + StartMicros).

The important thing to know about now() is that it returns a tuple {MegaSeconds, Seconds, Microseconds}. The diff function above assumes that MegaSeconds does not change between Start and End, which is not always true. But I was lazy and cut that corner for this code.
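For reference, a version of diff that does not rely on MegaSeconds staying constant could look like the sketch below. The module name now_diff_sketch and the helper to_micros/1 are invented for illustration; the idea is simply to convert each {MegaSeconds, Seconds, Microseconds} tuple to a single microsecond count before subtracting.

```erlang
-module(now_diff_sketch).
-export([to_micros/1, diff/2]).

%% Convert a {MegaSeconds, Seconds, Microseconds} tuple to microseconds.
to_micros({Mega, Secs, Micros}) ->
    (Mega * 1000000 + Secs) * 1000000 + Micros.

%% Elapsed microseconds between two such tuples, MegaSeconds included.
diff(Start, End) ->
    to_micros(End) - to_micros(Start).
```

For the timestamps printed in the run further down, diff({1230,863064,116875}, {1230,863141,471380}) gives 77354505, the same as the simpler version, and it stays correct when the MegaSeconds field rolls over between the two readings.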

I also wanted to know how long it took to actually spawn the processes in the ring, so I modified the startRing function a bit to look like this:

startRing(N,TimerPid) ->
    {Micros,Pids} = timer:tc(ring,spawnNodes,[N, [], TimerPid]),
    io:format("spawned: ~p in ~p microseconds~n", [N,Micros]),
    connectNodes([last(Pids)|Pids]).

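This wraps a timer:tc call around the initial spawnNodes call, so the measurement covers the whole recursive spawn that creates all the processes. Note that timer:tc(Module, Function, Args) calls the function through its module name, so spawnNodes/3 has to be in the module’s export list for this to work.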
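To see the timer:tc pattern in isolation, here is a small self-contained sketch. The module tc_sketch and its functions are hypothetical and not part of the ring module; the spawned processes just sit and wait for a stop message. timer:tc/3 returns a {Microseconds, Result} tuple, where Result is whatever the timed function returned.

```erlang
-module(tc_sketch).
-export([spawn_idle/1, timed_spawn/1]).

%% Spawn N processes that do nothing but wait for a stop message.
spawn_idle(N) ->
    [spawn(fun() -> receive stop -> ok end end) || _ <- lists:seq(1, N)].

%% Time spawn_idle/1 with timer:tc/3, then shut the processes down.
%% spawn_idle/1 must be exported because timer:tc calls it by module name.
timed_spawn(N) ->
    {Micros, Pids} = timer:tc(?MODULE, spawn_idle, [N]),
    [Pid ! stop || Pid <- Pids],
    {Micros, length(Pids)}.
```

tc_sketch:timed_spawn(100) returns the elapsed microseconds together with the number of processes that were spawned.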

Ah, so now let’s look at how it works:

1> c(ring).
{ok,ring}
2> T=ring:startTimer().
<0.38.0>
3> R=ring:startRing(100,T).
spawned: 100 in 241 microseconds
<0.40.0>
4> R ! start.
{1230,863064,116530} Starting message
start
{1230,863064,879862} Around ring 10000 times
{1230,863065,642097} Around ring 20000 times
{1230,863066,404689} Around ring 30000 times
…etc
{1230,863140,707023} Around ring 990000 times
{1230,863141,471193} Around ring 1000000 times
Start={1230,863064,116875} Stop={1230,863141,471380} Elapsed=77354505

Here I started a ring of 100 processes and sent a message around the ring 1 million times. It took 241 microseconds (about 2.4 microseconds per process) to spin up the processes and about 77 seconds to send the messages around. Given the size of the ring, this is actually 100 million messages in 77 seconds, or about 1.3 million messages per second. I don’t have anything to judge against (yet) but that seems fast!
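The arithmetic behind that throughput number can be written down as a tiny helper. This is just a sketch; the module name ring_stats and the function messages_per_second/3 are made up for this illustration. Each trip around the ring costs one message per node, so the total message count is nodes times trips.

```erlang
-module(ring_stats).
-export([messages_per_second/3]).

%% Nodes * Trips messages were sent in ElapsedMicros microseconds.
messages_per_second(Nodes, Trips, ElapsedMicros) ->
    TotalMessages = Nodes * Trips,
    ElapsedSeconds = ElapsedMicros / 1000000,
    TotalMessages / ElapsedSeconds.
```

Plugging in the run above, ring_stats:messages_per_second(100, 1000000, 77354505) comes out to roughly 1.29 million messages per second.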

Of course, making process spawn and message send really fast are the basis of how the Erlang model works, so this should not be surprising.

I also tried just spinning up a lot of processes in the ring to see how fast that was:

3> R = ring:startRing(20000,0).
spawned: 20000 in 118580 microseconds

which actually created 20,000 processes in 0.12 seconds! So, as we saw earlier, process spawn is really fast.

Hope this was fun for someone. And I’m very open to constructive criticism on the Erlang code – I’m a total n00b.

Comments

8 Responses to “Running rings around Erlang”
  1. joew says:

    Good to see that you’re enjoying some Erlang. One thing that might help make your benchmarking a bit cleaner is to use the built-in timer:now_diff(T2,T1) function (http://erlang.org/doc/man/timer.html), which takes the difference between two times in Erlang’s now() format.

    #####
    1> T1 = erlang:now().
    {1230,871175,142795}
    2> T2 = erlang:now().
    {1230,871188,17012}
    3> Diff = timer:now_diff(T2,T1).
    #####

    Hope this helps!

  2. aphexddb says:

    Very cool Erlang exploration, saw this via Twitter. I was fooling around with Erlang recently (trying to talk to a JMS bus) and I have a feeling that I need about 2 years of practical Erlang experience to start writing elegant code. Goes to show that people should stick with LISP, etc. and not bother with all these non-functional programming languages ;)

  3. Alex says:

    Thanks Joe – I should have found that earlier! Switched in my own code and it works great.

  4. Jonas says:

    Great post. Fun example.
    Good to see you ramping up on Erlang. I really like it, only wish I could get paid to hack it :)
    Looking forward to more posts like this.

  5. codist says:

    A year ago I did a similar thing in Java on a 4 processor PC. Best I could get was 18,000 messages per second.

  6. pat says:

    See also the Erlang thread-ring implementation at the Computer Language Benchmark Game (Shootout). It uses lists:foldl() to spawn and connect the processes in one go, with self() also used as one process in the ring.

  7. Alex says:

    @codist – I’m running on a MacBook Pro with a 2.4 GHz Intel Core 2 Duo and 4 GB of RAM just for the record.

    @pat – many thanks for the reference! I knew there was a more clever way to do it.

    After sleeping on this blog, it also occurred to me that rather than send a stop message around the ring I could use linked processes and just have the ring die off in reverse based on trapping exit signals. I might give that a try later.

  8. Michael says:

    I also tried this problem last night, here is my solution.

    http://sudothinker.com/2009/2/27/erlang-exercises-interaction-between-processes-concurrency-ring-code

    I took it one step further and made sure each node also knew the node before it, so it knew where to accept the message from. That way we can be sure that the message is following the correct path around the ring. Might be overkill for this type of problem, but in a bigger system it would make sense, as the processes could get messages from anyone and we might not want to send them around the ring.

    Thanks for sharing!