Scalable Termination Detection for Distributed Actor Systems

Transcript

Hi, my name’s Dan and I’m a grad student in Illinois. My advisor is Professor Gul Agha. Today I’ll be talking about a garbage collection algorithm for actor systems, based on something called termination detection. It's designed to scale to large clusters without significantly affecting latency.

Actor systems are built using the actor model of concurrency, where lightweight stateful processes called actors communicate with each other by sending asynchronous messages. The model is pretty popular for building low-latency distributed systems like databases and chat applications. These systems are usually run on a cluster of servers, where each server can host millions of actors.

Right now, the most popular frameworks for distributed actor systems are probably Erlang, Elixir, Akka, and Orleans. They all have automatic garbage collection for passive data, like records and objects. But, interestingly, none of them provide GC for the actors themselves. So programmers have to garbage collect actors manually, which as you can imagine is a significant source of bugs.

The issue is that ordinary GC techniques for a language like Java don’t work for actors. Although plenty of actor GC algorithms have been proposed, I would argue that none of them scale well enough on clusters to make them a viable choice. I’ll talk more about that a bit later, but first we should review how actors work and introduce some terminology.

As you can see in the legend there, we denote actors as circles and messages as dashed arrows. Here we have an actor with an undelivered message.

Initially an actor is idle, meaning it’s waiting for a message. Once an idle actor receives a message, it becomes busy (denoted by the orange square). A busy actor can then do a number of actions, for example spawning new actors. Notice that this gives the parent a reference to the child, which allows it to send asynchronous messages. These messages can contain ordinary data like this “hello” text, but they can also contain references. Here the parent is sending B a reference to A, and B receives it.

Note that a reference is just an address, not a channel. So in general, actors don’t know who has their address; to do that would require some synchronization.

Busy actors can also update their state. This can mean, for example, removing references from their local state. Notice that the parent can forget a reference before the message is even delivered!

Finally, most popular actor languages allow actors to perform some kind of effect, like writing to a file. Once a busy actor has done everything it wants to do, it can become idle again, ready to receive another message.

One last piece of terminology I want to introduce is the concept of an unblocked actor. That just means that either the actor is busy now or it has an undelivered message, meaning it will be busy at some point in the future. In this configuration, the unblocked actors are A and B, highlighted in blue.
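To make the terminology concrete, here is a minimal Python sketch of the idle, busy and unblocked states. The `Actor` class and its field names are assumptions made up for illustration; they don't come from any real actor framework.

```python
from dataclasses import dataclass, field


@dataclass
class Actor:
    """Toy model of an actor (hypothetical names, illustration only)."""
    name: str
    busy: bool = False                           # True while processing a message
    mailbox: list = field(default_factory=list)  # undelivered messages

    def is_unblocked(self) -> bool:
        # Unblocked: busy right now, or guaranteed to become busy later
        # because a message is still waiting to be delivered.
        return self.busy or len(self.mailbox) > 0

    def is_blocked(self) -> bool:
        return not self.is_unblocked()


a = Actor("A", busy=True)          # busy, so unblocked
b = Actor("B", mailbox=["hello"])  # idle, but has an undelivered message
c = Actor("C")                     # idle with an empty mailbox, so blocked
```

Note that being blocked is only a statement about the present: C could still become unblocked later if somebody sends it a message.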

So what does it mean for an actor to be garbage? The standard definition is that garbage actors can be destroyed without affecting the system's observable behavior. But garbage collectors usually don’t have access to an actor’s internal code, so we are forced to be a bit conservative:

If there’s any possibility for an actor to become unblocked, then it’s not safe to garbage collect it. This is because when the actor becomes busy, it might perform effects like writing to a log file.

Instead, we want actors that we can prove will remain blocked forever. That is, they're idle and will never receive a message. For historical reasons, we call these actors "terminated" but honestly at this point I wish we'd picked a different name for them. Anyway, clearly these actors will never be used, so it's safe to garbage collect them.

That’s all well and good, but how do you know when an actor is terminated? Let's look at a few examples.

Suppose that this is the state of the entire actor system. Is the blue actor X terminated here? Notice that it's idle and has no undelivered messages, so it's blocked. Also no actor has a reference to it. Since actors can't get references to each other out of thin air, it means no actor could ever get a reference to X. So yes: X is terminated because it is blocked and no actor could ever send it a message. Note that this is a type of terminated actor that you could easily detect using a technique like distributed reference counting.

How about this case? Now there is an actor C with a reference to X. But notice that it's blocked and so are all the other actors connected to it. So X is still terminated because every actor that could possibly send it a message is also blocked.

In this last case, we see that there is a path from a busy actor A to X. Now we can't tell if X is terminated because A could send a message to B, which could send a message to C, which could then send a message to X.

So maybe X is terminated when all the actors that can reach it are blocked? Unfortunately, life is not that easy for us. Consider the following example.

Here we see that all the actors that can reach X are blocked, except now there's a message to another actor E containing a reference to X. Here X might not be terminated because, when E gets the message, it will be unblocked and it will have a reference to X. The moral of the story is that it's not enough to look at each actor's local state: we also need to consider messages in flight. Let's formalize this.

We say that A is potentially acquainted with B if either A has a reference to B, or there is an undelivered message to A that contains a reference to B. Both of these situations can lead to a future configuration in which A has a reference to B. In either of these cases, we say that A is one of B’s potential inverse acquaintances.

Now we define potential reachability as the reflexive transitive closure of the potential acquaintance relation. Basically, when A can potentially reach D it means that there will be a path from A to D in the future, once all these messages have been delivered.

You can see now that the problem with that last case was that, although X is only reachable from blocked actors, it is potentially reachable from an unblocked actor - namely E. In general, an actor might not be terminated if it is potentially reachable by an unblocked actor. So what if an actor is not potentially reachable by an unblocked actor?

Well then that actor is certainly terminated, because at no point in the future could it possibly receive a message. So after all that work, we’ve finally defined our problem: We're looking for actors that are only potentially reachable by blocked actors.
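As a sanity check on that definition, here is a small Python sketch that computes the terminated actors from a complete picture of the system. It assumes we can see every actor's references and every undelivered message at once, which is exactly what a real cluster can't do cheaply; the function name and the argument layout are made up for illustration.

```python
from collections import deque


def terminated_actors(actors, refs, in_flight, busy):
    """Return the actors that are potentially reachable only by blocked actors.

    actors:    set of actor names
    refs:      dict mapping an actor to the set of actors it holds references to
    in_flight: list of (recipient, refs_in_message) for undelivered messages
    busy:      set of actors that are currently busy
    """
    # Potential acquaintance edges: A -> B if A holds a reference to B, or an
    # undelivered message to A carries a reference to B.
    edges = {a: set(refs.get(a, ())) for a in actors}
    for recipient, carried in in_flight:
        edges.setdefault(recipient, set()).update(carried)

    # Unblocked: busy now, or has at least one undelivered message.
    unblocked = set(busy) | {recipient for recipient, _ in in_flight}

    # Anything potentially reachable from an unblocked actor might still get a
    # message. Reachability is reflexive, so the search starts from the
    # unblocked actors themselves.
    live = set(unblocked)
    queue = deque(unblocked)
    while queue:
        a = queue.popleft()
        for b in edges.get(a, ()):
            if b not in live:
                live.add(b)
                queue.append(b)
    return set(actors) - live
```

With the chain A, B, C, X and nobody busy, every actor comes back as terminated. Marking A as busy makes the result empty. And if the only unblocked actor is E, with a message in flight that carries a reference to X, then X is not in the result even though every actor holding a reference to X is blocked.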

The question now is, how do you find them?

One way you could find terminated actors is to compute a global snapshot, using an algorithm like Chandy-Lamport. That’s actually not as easy as it sounds, because that algorithm doesn’t support dynamic topologies like actor systems. But more fundamentally, that approach doesn’t scale because one slow node prevents all the rest from being able to find any garbage at all.

A clever workaround is the SALSA actor GC, which uses approximate global snapshots; these are cheaper to compute and allow nodes to collect some local garbage. But their approach still has high overhead, so in practice each GC phase still significantly affects application performance.

Our approach is based on Pony’s GC, which is a fast message-based algorithm for multicore systems. Their main limitation is relying on causal message delivery, which is expensive to implement in clusters. We found a way to remove that requirement, making their approach scale. So how do these algorithms work if they don’t take a global snapshot?

First, we need to make sure that the actors communicate according to a special low-overhead communication protocol we’ve devised; Pony does the same thing but using a weighted reference counting protocol. Either way, the protocol enriches each actor’s state with some additional GC information that we call its knowledge set. Then, whenever an actor suspects that it has become garbage, it takes a snapshot of its local state and sends it to a special actor called a snapshot aggregator. (In Pony, actors have to do this whenever their message queue becomes empty. In our approach, actors can take snapshots whenever they want; if you like, you could give each actor a different heuristic.) The snapshot aggregator collects these local snapshots and periodically scans through them to look for actors that “appear” to be garbage. The magic of our approach is that, even though these actors are taking snapshots with no coordination at all, we can still identify which of those snapshots are consistent with one another. When an aggregator discovers some garbage actors, it can send each of them a self-destruct message to garbage collect them.

In Pony, this aggregator is a centralized entity, making it a bottleneck. But in our approach you could have a snapshot aggregator at each node, and then have the aggregators disseminate their snapshots amongst themselves.

How can the snapshot aggregator tell whether an actor is terminated? Let’s break it down.

First, the aggregator needs the ability to find any actor’s potential inverse acquaintances. Then it can find the potential inverse acquaintances of those potential inverse acquaintances, and so on, in order to find all the actors that can potentially reach a given actor.

Secondly, we need to know whether all of those actors that can potentially reach it are blocked. Basically that reduces to finding out whether an actor has any undelivered messages.

And most intimidatingly, it needs to know which snapshots are consistent with one another despite being taken at different times.

We address the first part with a kind of distributed contact tracing. Initially, we make sure each actor knows the actor that spawned it. Then we also make sure actors keep track of which references they send to one another. This allows you to trace a path from an actor to all of its potential inverse acquaintances.

This allows us to solve the second problem pretty easily, simply by having actors keep count of how many messages they send and receive for each reference.

All of this information is stored in the actor’s local state and gets sent over to the snapshot aggregator whenever the actor takes a snapshot.

But the burning question is how we can tell if two snapshots are mutually consistent if they were taken at different times. The answer is: magic! It turns out that the combination of contact tracing and message counts guarantees for us that any set of snapshots that “appears to be terminated” will also be consistent.

So how do we do this contact tracing? First, we need to make a little modification. Instead of using ordinary references (which are just addresses, remember) we ask the programmer to use these special reference objects that we call refobs. A refob is just a triple that we denote “x A lollipop B”. Here x is a globally unique token and A and B are actor addresses. This refob can only be used by A, called its owner, to send messages to B, which is called its target. When A is done with the refob, the programmer needs to deactivate it, which triggers a special protocol between A and B.

So if refobs can only be used by their owner, how do you get new ones? There’s two ways. First, the parent actor gets one when it spawns the child. Secondly, if A has refobs to B and C, then it can create a new one pointing from C to B.
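Here is a rough Python sketch of a refob as a triple, together with the two ways of obtaining one. The class and function names are hypothetical, and the local counter is only a stand-in for whatever scheme a real system would use to generate globally unique tokens.

```python
import itertools
from dataclasses import dataclass

# Assumption: a process-local counter stands in for globally unique tokens.
_tokens = itertools.count()


@dataclass(frozen=True)
class Refob:
    token: int   # the globally unique token x
    owner: str   # address of A, the only actor allowed to use this refob
    target: str  # address of B, the actor that messages are sent to


def spawn_refob(parent: str, child: str) -> Refob:
    """The refob a parent gets when it spawns a child."""
    return Refob(next(_tokens), owner=parent, target=child)


def create_refob(to_target: Refob, to_new_owner: Refob) -> Refob:
    """Given refobs A -> B and A -> C, create a fresh refob C -> B."""
    if to_target.owner != to_new_owner.owner:
        raise ValueError("both refobs must be owned by the creating actor")
    return Refob(next(_tokens), owner=to_new_owner.target, target=to_target.target)


x = spawn_refob("A", "B")  # A spawns B
y = spawn_refob("A", "C")  # A spawns C
z = create_refob(x, y)     # A creates a refob for C that points to B
```

The ownership check in `create_refob` reflects the rule that a refob can only be used by its owner: A can only hand out a reference to B if A itself owns refobs to both B and C.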

Let’s see an example of how this works.

Here we have busy actor A. The speech bubble shows its knowledge set, which remember holds some special GC-related information. For simplicity we represent this knowledge set as a set of facts, like in Datalog; in practice you would use a more optimized data structure.

When A spawns an actor B, it gets a refob pointing to it. This is represented with an “active” fact shown here. Similarly, when B is spawned its knowledge set contains a fact saying that it knows a refob x has been created. To save space, I’ll omit this additional information and just write “x” to refer to the entire refob.

Next A spawns another actor C, which creates a refob y. Now, say that A wants to give C a reference to B. To do that, it creates a new refob z, sends it in the message, and it also adds this fact to say that “A created a new refob z using x”.

Notice at this point that C is a potential inverse acquaintance of B, because of this message. But notice now that these knowledge sets create a path: B knows about x, which points to A. And A knows about z, which points to C.

This still holds when C receives the message, causing it to add a fact that z is “active.”
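The path through the knowledge sets can be traced mechanically. Below is a small Python sketch of that tracing for this example, with facts written as tuples. The `refobs` lookup table and the function name are simplifications I'm assuming for illustration; in the talk's notation the owner and target are part of the refob itself.

```python
# Refobs written as token -> (owner, target); all names are illustrative.
refobs = {"x": ("A", "B"), "y": ("A", "C"), "z": ("C", "B")}

# Knowledge sets after A spawns B and C, shares B with C, and C receives z.
knowledge = {
    "A": {("Active", "x"), ("Active", "y"), ("CreatedUsing", "x", "z")},
    "B": {("Created", "x")},
    "C": {("Created", "y"), ("Active", "z")},
}


def potential_inverse_acquaintances(target, knowledge, refobs):
    """Owners of every refob to `target` that the knowledge sets reveal."""
    # Start from the refobs the target itself knows were created.
    found = {fact[1] for fact in knowledge[target] if fact[0] == "Created"}
    frontier = list(found)
    while frontier:
        token = frontier.pop()
        owner, _ = refobs[token]
        # The owner may have created more refobs to the target using this one.
        for fact in knowledge.get(owner, ()):
            if fact[0] == "CreatedUsing" and fact[1] == token and fact[2] not in found:
                found.add(fact[2])
                frontier.append(fact[2])
    return {refobs[token][0] for token in found}
```

Calling `potential_inverse_acquaintances("B", knowledge, refobs)` follows Created(x) to A, then A's CreatedUsing fact to z, whose owner is C.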

Now, if A creates a lot of these refobs then its knowledge set will keep growing without bound. So we give it a mechanism to forward the information to B inside of an “info” message.

Analogously, when C decides it doesn’t need z anymore, it deactivates it. That means it removes the fact about z from its knowledge set and sends a “release” message to B.

Now there are two ways this can go down, depending on which message gets received first.

In case 1, the info message arrives first. This causes B to add a fact to say, “there’s a refob z pointing to me and I think it might still be in use”. Notice that now there’s a path directly from B to C, instead of a path from B to A to C. When B finally gets the release message, it can delete this fact.

In case 2, the release message arrives first. This makes B add a fact saying “I got a release message for z, but I’m still waiting on that info message”. When it finally gets the message, it can delete that fact.

The takeaway from this is that by looking at all these actors' states, we can figure out all of B’s potential inverse acquaintances, unless there is an info message or a release message in transit. More on that later.
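The two delivery orders can be sketched as a pair of handlers on B's knowledge set. This is a simplified Python illustration, not the exact protocol: I'm assuming the "might still be in use" fact is the same Created fact used elsewhere in the talk, and the fact name `Released` is invented here for the "still waiting on the info message" case.

```python
def on_info(knowledge: set, token: str) -> None:
    """The target receives an info message saying refob `token` was created."""
    if ("Released", token) in knowledge:
        # Case 2: the release arrived first, so the two facts cancel out.
        knowledge.remove(("Released", token))
    else:
        # Case 1: remember that `token` points here and may still be in use.
        knowledge.add(("Created", token))


def on_release(knowledge: set, token: str) -> None:
    """The target receives a release message for refob `token`."""
    if ("Created", token) in knowledge:
        # Case 1: the info message arrived first; the refob is now gone.
        knowledge.remove(("Created", token))
    else:
        # Case 2: still waiting on the info message for this refob.
        knowledge.add(("Released", token))


# Case 1: info, then release.
b_case1 = {("Created", "x")}
on_info(b_case1, "z")
on_release(b_case1, "z")

# Case 2: release, then info.
b_case2 = {("Created", "x")}
on_release(b_case2, "z")
on_info(b_case2, "z")
```

Either way B ends up with no trace of z once both messages have arrived; only the intermediate state differs.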

Next we look at message counts. These are pretty simple: each actor keeps track of how many messages it sent and received using each refob. Let’s see an example.

Here we have A getting a new refob that points to B. When A uses its refob for the first time, it initializes a send count for that refob to 1.

We also need to attach the token to the message so that, when it’s delivered, the recipient knows which receive count to increment. In this case, it’s receiving a message from this refob for the first time, so it initializes its receive count to 1.

Info messages are counted too: the owner increments its send count when it sends an info message, and the target increments its receive count when that message is delivered.

When an actor deactivates its refob, it also deletes the send count. When the target receives the release message, it deletes the receive count.
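Here is a toy Python model of that bookkeeping. It is a sketch under simplifying assumptions: the class and method names are hypothetical, the mailbox is a plain list, and the release message only carries the token, as in the description above.

```python
class CountingActor:
    """Toy actor that tracks per-refob message counts (hypothetical names)."""

    def __init__(self, name):
        self.name = name
        self.sent = {}      # token -> messages sent using a refob this actor owns
        self.received = {}  # token -> messages received through a refob to this actor
        self.mailbox = []   # undelivered (kind, token) messages

    def send(self, token, target):
        # Using the refob for the first time initializes its send count to 1.
        self.sent[token] = self.sent.get(token, 0) + 1
        target.mailbox.append(("app", token))  # the token travels with the message

    def deactivate(self, token, target):
        # Deactivating a refob deletes its send count and notifies the target.
        self.sent.pop(token, None)
        target.mailbox.append(("release", token))

    def deliver_next(self):
        kind, token = self.mailbox.pop(0)
        if kind == "release":
            self.received.pop(token, None)
        else:
            self.received[token] = self.received.get(token, 0) + 1


a, b = CountingActor("A"), CountingActor("B")
a.send("x", b)
a.send("x", b)
b.deliver_next()
# One of A's two messages is still undelivered, so the counts disagree.
in_flight = a.sent["x"] - b.received["x"]
```

The gap between a send count and the matching receive count is the number of messages still in transit on that refob, which is what lets an outside observer tell whether the target has undelivered messages.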

Putting these pieces together, how can we tell whether an actor is terminated just by looking at the snapshots?

First, remember that an actor is terminated if it is blocked, and all of its potential inverse acquaintances are blocked, and so on.

So let’s say we have a set of snapshots S and that actor B is the first one to take a snapshot.

In the paper we prove that if B’s knowledge set doesn’t have any facts of the form Created(x), then actually it has no potential inverse acquaintances and no undelivered messages. These actors are trivially terminated and can actually garbage collect themselves.

What if it does have a Created fact? That means A might be a potential inverse acquaintance of B. So to find out if B is terminated we need a snapshot from A.

Looking at A’s snapshot, there are a few possibilities.

First: What if A’s snapshot doesn’t contain Active(x)?

One possibility is that A has not yet received x from whoever created it. But that means A is unblocked, which means B is not terminated.

Another possibility is that A sent a Release message to B. But then B is unblocked because it has this undelivered message, so B is still not terminated.

So the aggregator knows that B is only terminated when A’s snapshot contains Active(x).

Second: What if their message counts are not equal? Remember we assumed that B’s snapshot came first, so A’s snapshot came after. That means A’s send count can only be greater than B’s receive count, which means that at some point after B took a snapshot, B became unblocked because A sent it a message.

So the aggregator knows that B is only terminated if the message counts are equal. Moreover, we also know that no messages were sent along x between the time of A and B's snapshots.

Now here’s the interesting bit. Suppose that A created a refob y pointing to B before A took a snapshot. Remember that this causes it to add a CreatedUsing fact to its knowledge set. And remember that the only way for A to forget this fact is by sending an info message to B. Could this have happened? No! Because A would need to increment its Sent count to send that message, but we assumed the counts were equal. So any refobs that A created using x are guaranteed to be stored in A’s knowledge set.

Now the snapshot aggregator can see there’s another actor potentially acquainted with B, so it needs a snapshot from that actor as well. Just like for A, if that snapshot contains Active(y) and its message count agrees with B's, then it will also have information about refobs that it created for B. In this way we can find every actor that is potentially acquainted with B and we also learn that they haven’t sent it any messages. So B is blocked - in fact it's been blocked ever since it took a snapshot.

What about the next actor to take a snapshot? The proof that it’s blocked too is pretty much the same, but now we use the assumption that the first actor B is already blocked. Proceeding this way we can conclude that every actor in S is blocked, so they’re all terminated.
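To tie the checks together, here is a simplified Python sketch of what an aggregator scan might look like. It is my own reading of the argument above, not the algorithm from the paper: I'm assuming snapshots are only taken by idle actors, that a `refobs` table maps each token to its owner and target, and all the names are made up. It repeatedly throws out any actor whose snapshot fails a check until nothing changes.

```python
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    facts: set                                    # e.g. ("Active", "x"), ("Created", "x")
    sent: dict = field(default_factory=dict)      # token -> send count
    received: dict = field(default_factory=dict)  # token -> receive count


def _passes_checks(actor, candidates, snapshots, refobs):
    snap = snapshots[actor]
    pending = [fact[1] for fact in snap.facts if fact[0] == "Created"]
    seen = set(pending)
    while pending:
        token = pending.pop()
        owner = refobs[token][0]
        if owner not in candidates:
            return False  # no usable snapshot from a potential inverse acquaintance
        owner_snap = snapshots[owner]
        if ("Active", token) not in owner_snap.facts:
            return False  # refob not yet received, or already released
        if owner_snap.sent.get(token, 0) != snap.received.get(token, 0):
            return False  # a message may still be in transit
        # Equal counts mean the owner still remembers every refob it created
        # using this one, so keep tracing.
        for fact in owner_snap.facts:
            if fact[0] == "CreatedUsing" and fact[1] == token and fact[2] not in seen:
                seen.add(fact[2])
                pending.append(fact[2])
    return True


def appears_terminated(snapshots, refobs):
    """Largest set of actors whose snapshots all pass the checks together."""
    candidates = set(snapshots)
    changed = True
    while changed:
        changed = False
        for actor in list(candidates):
            if not _passes_checks(actor, candidates, snapshots, refobs):
                candidates.discard(actor)
                changed = True
    return candidates


refobs = {"x": ("A", "B"), "y": ("A", "C"), "z": ("C", "B")}
snapshots = {
    "A": Snapshot({("Active", "x"), ("Active", "y"), ("CreatedUsing", "x", "z")},
                  sent={"x": 1, "y": 1}),
    "B": Snapshot({("Created", "x")}, received={"x": 1, "z": 2}),
    "C": Snapshot({("Created", "y"), ("Active", "z")},
                  sent={"z": 2}, received={"y": 1}),
}
garbage = appears_terminated(snapshots, refobs)
```

In this example all three snapshots agree, so A, B and C all appear terminated. If C's send count for z were 3 instead of 2, B would be dropped, because a message from C might still be on its way. If A's snapshot were missing, neither B nor C could be collected.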

Thanks for watching! Please feel free to email me if you want to talk or have any questions.