Pure Danger Tech



Running rings around Erlang

01 Jan 2009

I wrote some Erlang code the other day to spin up a ring of processes (which are like super-lightweight green threads) and send messages around the ring. I’ve already done one full rewrite of this code but I suspect it’s still probably not very idiomatic Erlang code. I’d love to see someone else’s example or any comments from Erlang gurus out there.

There are three basic steps to this run:

  1. Start the processes in the ring and remember their Pids (process IDs)
  2. Connect the processes into a ring
  3. Send messages around the ring

I’ve kind of combined #1 and 2 into the same function to start up the ring. I suspect there’s probably some nifty way to use a map or foldl to do steps 1 and 2 at the same time but my feeble brain couldn’t figure out how to do the special case to connect the ring when it wraps around.

Anyhow, here’s the startRing/2 function which 1) spawns the nodes and 2) connects them: [source:erlang]

-module(ring).
-import(lists, [last/1]).

%% startTimer/0 is called from the shell; spawnNodes/3 is exported because
%% it is later invoked through timer:tc/3
-export([startRing/2, startTimer/0, spawnNodes/3]).

%% returns a pid in the ring of size N
startRing(N,TimerPid) ->
    Pids = spawnNodes(N, [], TimerPid),
    connectNodes([last(Pids) | Pids]).

spawnNodes(0, Pids, _TimerPid) -> Pids;
spawnNodes(Remaining, Pids, TimerPid) ->
    Pid = spawn(fun() -> unconnectedNode(TimerPid) end),
    spawnNodes(Remaining-1, [Pid | Pids], TimerPid).

%% Connect F and S until there are no nodes left to connect
connectNodes([F | [S | T]]) ->
    F ! {connect,S},
    if
        length(T) > 0 -> connectNodes([S | T]);
        true -> S
    end.

%% Run node unconnected until there is a connect message
unconnectedNode(TimerPid) ->
    receive
        {connect,SendTo} ->
            nodeloop(SendTo,TimerPid)
    end.

[/source]

Here’s a run down of the functions and what they’re doing in more detail:

  • startRing(N,TimerPid) – this is the main exported function. It spawns N processes, then connects them in a ring. One of the Pids in the ring is returned (the nodes are all the same so it’s not too important which one).
  • spawnNodes(0, Pids, _TimerPid) – spawnNodes is a recursive function and this is the base case. The first parameter is decremented as processes are created. The second parameter accumulates a list of Pids. The third parameter is a Pid for a timer process that will be used later. In this base case definition, it is unused and thus prefixed with an _.
  • spawnNodes(Remaining, Pids, TimerPid) – the main recursive spawnNodes first spawns a new process with the function unconnectedNode() and does the obvious recursive call – decrement the # of processes left to create, add Pid to the head of the list of Pids, and pass the TimerPid through.
  • connectNodes([F | [S | T]]) – this function takes a list of Pids and connects them recursively into a ring. As you may have noticed back in the startRing function, the last Pid was added to the head of the Pids before the initial call to connectNodes so that the ring closes on itself. The first expression in connectNodes sends a message to process F to connect it to process S. If there are still pids left in T, we recurse, dropping the initial F pid. If no pids are left in T, we return S, a pid in the ring, which becomes the return value of startRing.
  • unconnectedNode(TimerPid) – unconnectedNode is a sort of initial state for each node in the ring, where the node is only looking for a single connect message carrying the Pid it should forward messages to. When it receives that message, it drops into the main node function nodeloop, which we will look at next.
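On the question of handling the wrap-around without a special case: one possibility is to pair each pid with its successor by zipping the list against a rotated copy of itself. The following is only a sketch under that assumption. The module name ring_connect_sketch and the functions successors/1 and connect/1 are made up for illustration and are not part of the ring module above.

```erlang
-module(ring_connect_sketch).
-export([successors/1, connect/1]).

%% Pair every element with its successor, wrapping the last element
%% back around to the first. Works on any list, not just pids.
successors([]) -> [];
successors([First | Rest] = Items) ->
    lists:zip(Items, Rest ++ [First]).

%% Send each pid the same {connect, Next} message that connectNodes sends.
%% Returns the first pid in the list; assumes a non-empty list of pids.
connect(Pids) ->
    lists:foreach(fun({From, To}) -> From ! {connect, To} end,
                  successors(Pids)),
    hd(Pids).
```

Because the rotated copy already carries the first pid at its end, the last node is linked back to the first by the same zip that links every other pair.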

So, moving onward we look finally at the main nodeloop function from each node:

[source:erlang]

%% When connected, run loop waiting for stop, start, or a {Source,Val}
%% message where Source is the Pid of the node that originated the message.
nodeloop(SendTo,TimerPid) ->
    receive
        stop ->
            %% forward stop and return without looping, ending this process
            SendTo ! stop;
        start ->
            io:format("~p Starting message~n", [now()]),
            TimerPid ! start,
            SendTo ! {self(), 0},
            nodeloop(SendTo,TimerPid);
        {Source, Val} when Source == self() ->
            %% the message has completed one more trip around the ring
            logOnNth(Val+1, 10000),
            if
                Val+1 == 1000000 -> TimerPid ! stop, SendTo ! stop;
                true -> SendTo ! {Source, Val+1}
            end,
            nodeloop(SendTo,TimerPid);
        {Source,Val} ->
            SendTo ! {Source, Val},
            nodeloop(SendTo,TimerPid)
    end.

%% Log if Val is Nth value
logOnNth(Val, Nth) when Val rem Nth == 0 ->
    io:format("~p Around ring ~p times ~n", [now(), Val]);
logOnNth(_Val, _Nth) -> void.

[/source]

The nodeloop/2 function takes SendTo which is a Pid to forward messages on to and the TimerPid which is the timer process. The receive loop is expecting three kinds of messages: stop, start, and {Source,Val} (where Source is the originating Pid and Val is an int counter for the number of trips around the ring).

  • stop – just forwards the stop message on and most importantly does NOT call nodeloop, ending the recursion and allowing the function to end.
  • start – notify the TimerPid that the message send has started, send the initial {Source,Val} message with self() pid as the source, and finally recursively wait for the next message
  • {Source,Val} with guard – when a message is received, if this node was the source of the message, we log that every 10000 trips around the ring. If the message has traveled around the ring 1M times, we notify the Timer process to stop and switch to sending a stop message around the ring. Otherwise, we just forward the message and bump up the Val.
  • {Source,Val} – if this node was not the source, just forward the message and loop

So, this allows us to spin up a ring of size N, tell some process in the ring to start, and the message will go around the ring a million times.

I should also explain the timer process. This process has two states defined by functions timerOff() and timerOn(). The timer starts in the off position and waits for either a start or cancel message. On start, it switches to the timerOn() function, which listens for a stop (or cancel) message. When stop is received, we calculate and print the elapsed time, then switch the timer back off. Here’s the code: [source:erlang]

%% Start timer listener process, return Pid of timer listener
startTimer() ->
    spawn(fun() -> timerOff() end).

timerOff() ->
    receive
        cancel -> void;
        start -> timerOn(now())
    end.

timerOn(Start) ->
    receive
        cancel -> {timer,aborted};
        stop ->
            End=now(),
            io:format("Start=~p Stop=~p Elapsed=~p~n", [Start,End,diff(Start,End)]),
            timerOff()
    end.

%% Elapsed microseconds; ignores the MegaSeconds field of now()
diff(Start, End) ->
    {_,StartSeconds,StartMicros} = Start,
    {_,EndSeconds,EndMicros} = End,
    ((EndSeconds*1000000) + EndMicros) - ((StartSeconds*1000000) + StartMicros).

[/source]

The important thing to know about now() is that it returns a tuple {MegaSeconds, Seconds, Microseconds}. I’m assuming in the diff function above that MegaSeconds does not change between start and stop, which is not always true: it ticks over once every million seconds (about 11.6 days), and a run that straddles that boundary would report a wrong elapsed time. But I was lazy and cut that corner for this code.
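For completeness, here is a sketch of a diff that also takes MegaSeconds into account. The module name ring_diff_sketch is made up for illustration. The standard library's timer:now_diff/2 does the same subtraction, but takes the later timestamp as its first argument.

```erlang
-module(ring_diff_sketch).
-export([diff/2]).

%% Elapsed microseconds between two {MegaSecs, Secs, MicroSecs} tuples.
%% Unlike the diff/2 above, this includes the MegaSecs field, so it stays
%% correct when the run straddles a megasecond boundary.
diff({StartMega, StartSecs, StartMicros}, {EndMega, EndSecs, EndMicros}) ->
    ((EndMega - StartMega) * 1000000 + (EndSecs - StartSecs)) * 1000000
        + (EndMicros - StartMicros).
```

Calling ring_diff_sketch:diff(Start, End) should agree with timer:now_diff(End, Start) for any pair of now()-style tuples.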

I also wanted to know how long it took to actually spawn the processes in the ring, so I modified the startRing function a bit to look like this: [source:erlang]

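%% timer:tc/3 calls ring:spawnNodes/3 by name, so spawnNodes/3 must
%% appear in the module's -export list
startRing(N,TimerPid) ->
    {Micros,Pids} = timer:tc(ring,spawnNodes,[N, [], TimerPid]),
    io:format("spawned: ~p in ~p microseconds~n", [N,Micros]),
    connectNodes([last(Pids) | Pids]).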

[/source]

This is wrapping a timer:tc call around the initial spawnNodes call which wraps the whole recursive spawn that creates all the processes.
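On more recent Erlang/OTP releases, timer:tc/1 accepts a fun, so the function being timed does not have to be exported. The sketch below shows that variant. The module ring_tc_sketch and its idle placeholder processes are assumptions for illustration, standing in for unconnectedNode/1 so the example compiles on its own.

```erlang
-module(ring_tc_sketch).
-export([timed_spawn/1]).

%% Spawn N placeholder processes and report how long the spawning took.
%% Returns the list of spawned pids.
timed_spawn(N) ->
    {Micros, Pids} = timer:tc(fun() -> spawn_idle(N, []) end),
    io:format("spawned: ~p in ~p microseconds~n", [N, Micros]),
    Pids.

%% Hypothetical stand-in for spawnNodes/3: each process simply waits
%% for a stop message and then exits.
spawn_idle(0, Pids) -> Pids;
spawn_idle(Remaining, Pids) ->
    Pid = spawn(fun() -> receive stop -> ok end end),
    spawn_idle(Remaining - 1, [Pid | Pids]).
```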

Ah, so now let’s look at how it works:

[source:erlang]

1> c(ring).
{ok,ring}
2> T=ring:startTimer().
<0.38.0>
3> R=ring:startRing(100,T).
spawned: 100 in 241 microseconds
<0.40.0>
4> R ! start.
{1230,863064,116530} Starting message
start
{1230,863064,879862} Around ring 10000 times
{1230,863065,642097} Around ring 20000 times
{1230,863066,404689} Around ring 30000 times
…etc
{1230,863140,707023} Around ring 990000 times
{1230,863141,471193} Around ring 1000000 times
Start={1230,863064,116875} Stop={1230,863141,471380} Elapsed=77354505

[/source]

Here I started a ring of 100 processes and sent a message around the ring 1 million times. It took 241 microseconds (about 2.4 microseconds per process) to spin up the processes and about 77 seconds to send the messages around. Given the size of the ring, this is actually 100 million messages in 77 seconds, or about 1.3 million messages per second. I don’t have anything to judge against (yet) but that seems fast!
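The arithmetic behind those numbers can be written as a small helper. This is just a sketch, and the module name ring_stats_sketch and its functions are made up for illustration.

```erlang
-module(ring_stats_sketch).
-export([messages/2, per_second/3]).

%% Every lap around the ring costs one message send per node.
messages(RingSize, Laps) ->
    RingSize * Laps.

%% Message sends per second, given the elapsed time in microseconds.
per_second(RingSize, Laps, ElapsedMicros) ->
    messages(RingSize, Laps) / (ElapsedMicros / 1000000).
```

With the figures from the run above, ring_stats_sketch:per_second(100, 1000000, 77354505) comes out at roughly 1.29 million messages per second.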

Of course, making process spawn and message send really fast are the basis of how the Erlang model works, so this should not be surprising.

I also tried just spinning up a lot of processes in the ring to see how fast that was:

[source:erlang]

3> R = ring:startRing(20000,0).
spawned: 20000 in 118580 microseconds

[/source]

which actually created 20,000 processes in 0.12 seconds! So, as we saw earlier, process spawn is really fast.

Hope this was fun for someone. And I’m very open to constructive criticism on the Erlang code – I’m a total n00b.