Overview: Concurrency with Actors
Actor systems model concurrency with actors. In such a system, actors are independent objects that communicate asynchronously with each other via message passing.
An actor can:
Receive and send messages asynchronously
Change private state
Create new actors
Each actor has an address under which it can receive messages. Addresses are typically opaque, but often actor systems also provide a global registry where actors can be registered under a well-known name.
Processing of messages within an actor happens sequentially. Incoming messages are queued in an inbox, and the actor works on each message in turn. State within an actor system should be encapsulated in actors and be exposed only via message passing.
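To make the inbox idea a bit more concrete, here is a minimal sketch using only the standard library. It is not how Thespian or Erlang implement actors; ToyActor and its methods are names I made up for illustration. It only shows that sending is asynchronous while processing happens one message at a time.

```python
import queue
import threading


class ToyActor:
    """Hypothetical minimal actor: one inbox, one worker thread."""

    def __init__(self):
        self._inbox = queue.Queue()
        self._state = []  # private state, only touched by the worker thread
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def send(self, message):
        # Asynchronous: enqueue the message and return immediately
        self._inbox.put(message)

    def _run(self):
        # Messages are handled strictly one after another, in arrival order
        while True:
            message = self._inbox.get()
            if message is None:  # sentinel asking the actor to stop
                break
            self._state.append(message)

    def stop(self):
        # Ask the worker to finish, wait for it, and hand back a copy of the state
        self.send(None)
        self._worker.join()
        return list(self._state)


actor = ToyActor()
for n in range(5):
    actor.send(n)
processed = actor.stop()
print(processed)
```

Because only the worker thread ever touches the private state, no locking is needed inside the actor.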
One of the better known actor systems is the Erlang/OTP programming language and libraries. Erlang has been designed from the ground up as an actor system: addresses are their own datatype, sending messages is an operator, and a complete VM abstracts away the underlying hardware in an actor-friendly way. On the other hand, there are also add-on packages for regular programming languages that provide an actor system. One such system is Thespian for Python.
Thespian Actor System
The Thespian actor system targets the Python programming language; it's a package installable via pip. Targeting Python has the advantage that you can make use of the massive Python ecosystem. On the other hand, there are some drawbacks in scalability and safety when compared with a system like Erlang/OTP. Most importantly, care must be taken that actors don't mutate shared state, as this could lead to unpredictable behaviour. Thespian can't prevent this the way Erlang, for example, does.
In the vein of Erlang, Thespian is also able to distribute actors and messages across system boundaries using UDP or TCP. Distribution is transparent but can be controlled via Thespian's capabilities system, which allows one to associate tags with systems.
Some Examples
The basis of Thespian are the .createActor() method and the Actor.send() and .receiveMessage() methods. Actor classes are derived from the thespian.actors.Actor base class.
Below are two examples which are adapted from the docs on the Thespian website.
Hello World
A commented Hello World example, with a single actor which echoes back a greeting.
from thespian.actors import *

# Our example actor
class Hello(Actor):
    def receiveMessage(self, message, sender):
        # The messages will be fed in here. Messages can be of any type as
        # long as they're picklable. We will also get the sender address
        # which can be used to answer back
        self.send(sender, 'Hello, World!')

if __name__ == "__main__":
    # Create our actor instance. ActorSystems are singletons representing
    # the local actor environment. Both actors and the ActorSystem have
    # a .createActor() method
    hello = ActorSystem().createActor(Hello)
    # Send "hi" to our actor, and wait 1s for an answer
    # Actors aren't required to answer, but our Hello
    # actor does
    print(ActorSystem().ask(hello, 'hi', 1))
    # Telling our actor to clean up for exit
    ActorSystem().tell(hello, ActorExitRequest())
Running this yields a familiar message. For the example I've created a venv and installed Thespian via pip (pip install thespian):
(venv) peter@pirx ~/src/scratch$ python3 hello.py
Hello, World!
Passing Messages Around
In the next example, the actor system sends a message to the first actor, which creates two other actors and passes the message on. The first of those sends a message to the second, which finally answers back to the original sender, the actor system.
Note how:
Messages can be anything that's pickleable – that includes actor addresses
Actors can be created dynamically
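If you're unsure whether something qualifies as a message, one quick check is to try pickling it. The helper below is a hypothetical convenience of mine, not part of Thespian, and uses only the standard library. The strings in send_to are placeholders standing in for actor addresses.

```python
import pickle
import threading


def is_picklable(obj):
    """Hypothetical helper: True if obj survives pickle.dumps()."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


# Plain data is fine as a message
plain = is_picklable({"text": "Hello", "send_to": ["addr-1", "addr-2"]})
# OS-level resources such as locks cannot be pickled
lock = is_picklable(threading.Lock())
print(plain, lock)
```

The first check should report True and the second False, which is a decent rule of thumb: data travels well, handles to local resources don't.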
Also, the actors below subclass the convenience class ActorTypeDispatcher instead of the plain Actor class. The ActorTypeDispatcher class will dispatch messages based on message type to methods named receiveMsg_<type>() if they exist, with <type> being the type name. Creating classes for each type of message is a common pattern with Thespian, so dispatching messages based on type makes a lot of sense.
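The naming convention is easier to see in a stripped-down imitation. The sketch below is not Thespian's code: ToyTypeDispatcher and LoudGreeting are hypothetical names, and the fallback to a message's base classes reflects my understanding of how ActorTypeDispatcher behaves, so treat that part as an assumption.

```python
class ToyTypeDispatcher:
    """Hypothetical stand-in that mimics name-based dispatch."""

    def receiveMessage(self, message, sender):
        # Walk the message's class hierarchy, most specific type first
        for klass in type(message).__mro__:
            handler = getattr(self, "receiveMsg_" + klass.__name__, None)
            if handler is not None:
                return handler(message, sender)
        return None  # no matching handler: the message is ignored


class Greeting:
    pass


class LoudGreeting(Greeting):
    pass


class Demo(ToyTypeDispatcher):
    def receiveMsg_str(self, message, sender):
        return "str handler"

    def receiveMsg_Greeting(self, message, sender):
        return "Greeting handler"


demo = Demo()
results = [
    demo.receiveMessage("hi", None),
    demo.receiveMessage(LoudGreeting(), None),  # no receiveMsg_LoudGreeting, falls back to Greeting
    demo.receiveMessage(42, None),              # no handler for int or object
]
print(results)
```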
from thespian.actors import ActorSystem, ActorTypeDispatcher, ActorExitRequest

class Greeting:
    # Just a class to hold a greeting and some addresses
    def __init__(self, msg):
        self.message = msg
        self.send_to = []

    def __str__(self):
        return self.message

class Hello(ActorTypeDispatcher):
    def receiveMsg_str(self, message, sender):
        # This method will handle all messages of type str()
        if message == 'hi':
            # Create two actors of type World and Punctuate
            world = self.createActor(World)
            punct = self.createActor(Punctuate)
            # Create a greeting and set the punctuate and orig. sender addresses
            greeting = Greeting('Hello')
            greeting.send_to = [punct, sender]
            # Send the greeting to the World actor
            self.send(world, greeting)

class World(ActorTypeDispatcher):
    def receiveMsg_Greeting(self, message, sender):
        # This will receive all messages of type Greeting
        # Update the message and pass it to the first address
        message.message = message.message + ", World"
        next_to = message.send_to.pop(0)
        self.send(next_to, message)

class Punctuate(ActorTypeDispatcher):
    def receiveMsg_Greeting(self, message, sender):
        # This will receive all messages of type Greeting
        # Update message again and send back to orig. sender
        message.message = message.message + "!!!"
        next_to = message.send_to.pop(0)
        self.send(next_to, message)

if __name__ == "__main__":
    hello = ActorSystem().createActor(Hello)
    # Send message to first actor, then send an exit request
    print(ActorSystem().ask(hello, 'hi', 0.2))
    ActorSystem().tell(hello, ActorExitRequest())
Running it again gets us a familiar message:
(venv) peter@pirx ~/src/scratch$ python3 hello2.py
Hello, World!!!
Distributed Messaging
Actor system implementations
When instantiating the singleton ActorSystem, there are different flavors to choose from. By default the "simpleSystemBase" will be instantiated, which runs actors synchronously in one process. This lends itself well to experimentation, as we've done above, and also to testing and debugging.
A very useful implementation is the "multiprocTCPBase" ActorSystem. It runs each actor in its own process, plus an additional management process, and uses TCP sockets to communicate between actors; this allows actors to reside on different hosts altogether.
Note: in contrast to the "simpleSystemBase" implementation, the lifetime of the multiprocTCPBase ActorSystem exceeds that of the process that started it. It forks itself into the background and will be reused until it is explicitly shut down or killed.
Capabilities
In principle actors are location independent, and (when using the multiprocTCPBase implementation) could be instantiated across system boundaries.
Capabilities are descriptive tags for an actor system which can be used to mark specific systems. Actors can specify requirements, so that they get instantiated on an actor system that has the right capabilities for them.
For instance, an actor system could be marked as HasDBaccess: True, and actors which specify @requireCapability("HasDBaccess") as a requirement would then be instantiated on that system.
Example actor which uses the requireCapability decorator:
from thespian.actors import *

@requireCapability('HasDBaccess')
class DBActor(Actor):
    def receiveMessage(self, message, sender):
        # do some stuff, presumably requiring db access
        pass
And here is how you would specify the capabilities of an actor system (with the multiprocTCPBase implementation):
from thespian.actors import *

ActorSystem('multiprocTCPBase', capabilities={'HasDBaccess': True})
Besides the True/False logic of the requireCapability decorator, there are also more selective ways to specify requirements for actors (via the actorSystemCapabilityCheck() static method).
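As a rough sketch of what such a check could look like: the static method receives the capabilities of a candidate actor system and returns True if the actor may be created there. The actor name ReportingActor and the "MemoryGB" capability are made up for this example, and the import fallback only exists so the snippet can be tried without Thespian installed.

```python
try:
    from thespian.actors import Actor
except ImportError:
    # Fallback so the sketch can be read and run without Thespian installed
    Actor = object


class ReportingActor(Actor):
    """Hypothetical actor that wants a DB-capable system with enough memory."""

    @staticmethod
    def actorSystemCapabilityCheck(capabilities, requirements):
        # capabilities: the dict describing a candidate actor system
        # "MemoryGB" is a made-up capability name for this sketch
        has_db = bool(capabilities.get("HasDBaccess", False))
        enough_memory = capabilities.get("MemoryGB", 0) >= 8
        return has_db and enough_memory

    def receiveMessage(self, message, sender):
        pass


big = ReportingActor.actorSystemCapabilityCheck({"HasDBaccess": True, "MemoryGB": 16}, {})
small = ReportingActor.actorSystemCapabilityCheck({"HasDBaccess": True, "MemoryGB": 4}, {})
print(big, small)
```

Compared with the decorator, this lets a requirement depend on capability values rather than just on a flag being set.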
Conventions
Capabilities are also used to link multiple actor systems together. This mechanism is called Conventions, and is implemented for the multiprocTCPBase and multiprocUDPBase actor systems. It works by picking one of the cooperating systems as the leader and specifying its IP address as a capability.
from thespian.actors import *

# Specify ipaddr 10.5.1.1, port 1900 as the endpoint for the convention leader
ActorSystem('multiprocTCPBase', capabilities={'Convention Address.IPv4': ('10.5.1.1', 1900)})
This will make the system at host 10.5.1.1 the convention leader. Other systems which specify 10.5.1.1 as the Convention address will connect to the leader system. Now, if an actor is created that has a specific capability requirement, for instance @requireCapability("HasDBaccess"), the local actor system is checked for that capability first. If the local actor system doesn't have it, other actor systems that are linked into the Convention are queried until one is found that has the HasDBaccess capability. The actor will then be instantiated on that actor system. This happens in a completely transparent manner: the method that created the actor doesn't need to know or care on which actor system the actor got created. All it needs is the actor address.
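The lookup order described above (local system first, then the other members of the Convention) can be sketched in a few lines. This is only an illustration of the order; Thespian's real selection logic is more involved, and pick_system plus the system names are hypothetical.

```python
def pick_system(required, local, remotes):
    """Hypothetical helper: return the name of the first system whose
    capabilities dict has a truthy value for `required`, local first."""
    for name, capabilities in [local] + remotes:
        if capabilities.get(required):
            return name
    return None  # no system in the convention qualifies


local = ("client", {"Client": True})
remotes = [
    ("web", {"Server": True}),
    ("db", {"Server": True, "HasDBaccess": True}),
]

placements = [
    pick_system("HasDBaccess", local, remotes),
    pick_system("Client", local, remotes),
    pick_system("HasGPU", local, remotes),
]
print(placements)
```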
An important constraint is that all cooperating actor systems must have the code that implements the actors; the Convention protocol will pass on type information and actor data, but not the actual code. The most straightforward way is to install the codebase on all cooperating systems via standard deployment mechanisms. There is also a Thespian facility to distribute code within a convention, but this is beyond the scope of this post.
Distributed Example
Let's look at an example of a convention, linking together two actor systems using the multiprocTCPBase implementation. The example implements an echo client and server with 3 modules.
Server module: starts an actor system and specifies two capabilities:
It sets a flag which I imaginatively name "Server". Note this is a made-up name; it only serves to distinguish it from the client
And it sets the Convention address to its own IP address. It'll therefore be considered the Convention leader
Client module: starts an actor system and specifies two capabilities as well:
A made-up tag "Client", to distinguish it from the "Server" actor system above
A convention address, which is expected to be passed in from the command line, so it knows which Convention leader to connect to
Echo module: This defines two actors, an EchoRequestor which will run on the client actor system, and an EchoServer, which will run on the server actor system.
Server module
First, the server is fairly simple. We just start an actor system and make it the convention leader. It'll get the "Server" capability and set its own address as the Convention address, implicitly making it the leader of the Convention.
import logging.handlers
import socket

from thespian.actors import ActorSystem

def get_my_ip():
    """Return the ipaddress of the local host"""
    return socket.gethostbyname(socket.gethostname())

if __name__ == "__main__":
    # Setting up some logging
    log = logging.getLogger("Echologger")
    log.setLevel(logging.DEBUG)
    handler = logging.handlers.SysLogHandler(address="/dev/log")
    log.addHandler(handler)
    # Setup this system as the convention leader, and give it a capability "Server"
    # Note by default actor systems use port 1900, so we'll set this here too
    capabilities = {"Convention Address.IPv4": (get_my_ip(), 1900), "Server": True}
    ActorSystem("multiprocTCPBase", capabilities)
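One caveat with get_my_ip(): on some hosts the hostname resolves to a loopback address (for instance 127.0.1.1 via /etc/hosts), which other machines can't reach. If you run into that, a common workaround is to ask the OS which local address it would use for outbound traffic. The sketch below is an assumption-laden alternative, not part of the original example; get_routable_ip and the probe address are my own choices.

```python
import socket


def get_routable_ip(probe_host="10.255.255.255", probe_port=1):
    """Hypothetical alternative to get_my_ip(): ask the OS which local
    address it would use to reach probe_host. No packet is sent, since
    connect() on a UDP socket only selects a route."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.connect((probe_host, probe_port))
        return sock.getsockname()[0]
    except OSError:
        # No usable route (e.g. an offline machine): fall back to loopback
        return "127.0.0.1"
    finally:
        sock.close()


my_ip = get_routable_ip()
print(my_ip)
```

On a multi-homed host this picks whichever interface routes towards the probe address, so passing the address explicitly may still be the safer option.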
Once started, the starting process will fork the actor system in the background and exit. Note: actor systems are considered singletons. Should a second process on the same host run the server module, it'll only connect to the existing one. Warning: this is a bit of a pitfall when developing. The old server process will still have the old code. If you're trying out your code, be sure to explicitly kill any existing actor system so your new code gets used.
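Rather than hunting for the background processes by hand, you can also connect to the running actor system and ask it to shut down. A minimal sketch, assuming the system was started with the multiprocTCPBase implementation on the default port; the function name stop_actor_system is mine.

```python
def stop_actor_system(system_base="multiprocTCPBase"):
    """Connect to the actor system already running on this host and stop it."""
    # Imported lazily so this module can be loaded without Thespian installed
    from thespian.actors import ActorSystem

    ActorSystem(system_base).shutdown()


# Usage, e.g. from a small stop.py script on the server host:
# stop_actor_system()
```

Running this before restarting the server module makes sure the next start picks up your current code.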
Client module
Below is the example client module. The client module also specifies the convention leader address. We expect to get it from the command line, as well as the number of pings to perform. The client will create an echo actor, echo_app. We tell it how many echo requests it should perform, and give it an echo payload. Finally we poll the echo actor for a response.
import sys
from datetime import timedelta

from thespian.actors import ActorSystem

if __name__ == "__main__":
    # We take the convention leaders address from the command line
    # Also, we tag this system with "Client"
    capabilities = {"Convention Address.IPv4": (sys.argv[1], 1900), "Client": True}
    actor_system = ActorSystem("multiprocTCPBase", capabilities)
    # We create an actor from the echo library with class EchoRequestor
    echo_app = actor_system.createActor("echo.EchoRequestor")
    # Send the echo actor a message: the number of echo requests it should perform
    actor_system.tell(echo_app, int(sys.argv[2]))
    # Now, send the echo payload, and wait max. 10s for an answer
    resp = actor_system.ask(echo_app, "hello world", timedelta(seconds=10))
    while resp:
        # If we get "echo_done" as an answer we break out
        if resp == "echo_done":
            break
        # Otherwise we'll retry to get a response
        print("unexpected message {}".format(resp))
        resp = actor_system.listen(timedelta(seconds=10))
As with the server module, if you're making changes be sure to kill old actor system processes, as they will keep running in the background!
Echo module
Now, the echo module, used by both client and server – the meat of this actor application.
It has these classes:
Simple data-carrying classes named Ping and Pong
Actor class EchoRequestor (for issuing pings)
Actor class EchoServer (for sending back pongs)
import datetime
import logging
import logging.handlers

from thespian.actors import ActorTypeDispatcher, requireCapability

# Set up some logging to see what is going on
log = logging.getLogger("Echologger")
log.setLevel(logging.DEBUG)
handler = logging.handlers.SysLogHandler(address="/dev/log")
log.addHandler(handler)


class Ping:
    """A simple object that just carries a payload"""

    def __init__(self, payload):
        self.payload = payload


class Pong(Ping):
    """Same as the ping class.

    We subclass it so we can distinguish it by type, but it's really the same thing
    """
    pass


@requireCapability("Server")
class EchoServer(ActorTypeDispatcher):
    """The echo server actor

    It will receive ping messages, log them, and reply back to the sender with
    a pong message

    Specifies a system tagged with the "Server" capability as a requirement.
    This will cause the linked actor systems to instantiate it on the server
    actor system
    """

    def receiveMsg_Ping(self, ping_request, sender):
        log.debug("Got {}, ponging back at {}".format(ping_request, sender))
        self.send(sender, Pong(ping_request.payload))


@requireCapability("Client")
class EchoRequestor(ActorTypeDispatcher):
    """The echo client actor

    It specifies an actor system tagged with the "Client" capability. The
    client module is tagged with Client: True, so this actor will
    get started on the client actor system
    """

    echo_server = None  # hold an echo server instance

    def __init__(self):
        # Initialise counters and timer, and call the superclass constructor
        self.pings_to_send = 0
        self.pongs_to_receive = 0
        self.time = None
        super().__init__()

    def receiveMsg_int(self, count, _client):
        """Add integer as a count of pings to send

        If this actor receives an integer, it'll interpret it as
        a count of pings, and add it to the pings to send counter
        """
        self.pings_to_send += count

    def receiveMsg_str(self, payload, client):
        """Receive a payload and start pinging

        If this actor receives a str message, it'll interpret it as a payload
        to ping with, and start pinging the number of times
        """
        # First we save the client, we will need it later to notify once we're done
        self.client = client
        # Then, instantiate an echo server. As the EchoServer class has a requirement
        # "Server" it'll get started on the actor system tagged with the "Server" capability
        self.echo_server = self.createActor(EchoServer)
        # Then start to send out ping messages, and save the start time
        ping = Ping(payload)
        log.debug("Sending, srv: {}; message: {}; count: {}".format(
            self.echo_server, ping, self.pings_to_send
            )
        )
        self.time = datetime.datetime.now()
        for _ in range(self.pings_to_send):
            # Fire out pings_to_send pings to the server
            self.send(self.echo_server, ping)
        # Update counters
        self.pongs_to_receive += self.pings_to_send
        self.pings_to_send = 0

    def receiveMsg_Pong(self, _pong, _server):
        # Receive answers back from the echo server actor
        # We decrease the counter until it's zero
        self.pongs_to_receive -= 1
        if self.pongs_to_receive <= 0:
            log.info("Got all messages, timedelta: {}".format(
                datetime.datetime.now() - self.time
                )
            )
            # We're done, send a message to the client saying so
            log.info("Sending end request to {}".format(self.client))
            self.send(self.client, "echo_done")
Running the example
Run this example on two separate hosts. I'm running Ubuntu and have created an LXD container for the server actor system, while the client runs on the underlying host (my notebook). Other Linuxes as well as macOS and Windows should work too, as long as they can talk TCP on port 1900. Note however that the example source must be available on both hosts, and of course Thespian must be installed as well (I've created a venv for this and installed via pip install thespian).
On the server host, get the host IP address and then start the server. Note this will return immediately, as the actor system is forked into the background:
(venv) ubuntu@devcontainer-1PtYuGO:~/src/thespianecho$ hostname -i
10.0.8.145
(venv) ubuntu@devcontainer-1PtYuGO:~/src/thespianecho$ python3 ./server.py
Kick off the example on the client host. Pass in the ip address of the server host, and a count of pings to send, and it should answer when it's done, and how long it took:
(venv) peter@pirx ~/src/thespianecho$ python3 client.py 10.0.8.145 1000
DEBUG:Echologger:Sending, srv: ActorAddr-LocalAddr.0; message: <echo.Ping object at 0x7fe427bcaf10>; count: 1000
INFO:Echologger:Got all messages, timedelta: 0:00:03.716326
INFO:Echologger:Sending end request to ActorAddr-(T|:33033)
Note the server host will log received pings:
DEBUG:Echologger:Got <echo.Ping object at 0x7f24ecd81710>, ponging back at ActorAddr-(T|10.0.8.1:42453)
DEBUG:Echologger:Got <echo.Ping object at 0x7f24ece2cda0>, ponging back at ActorAddr-(T|10.0.8.1:42453)
DEBUG:Echologger:Got <echo.Ping object at 0x7f24ecdf85c0>, ponging back at ActorAddr-(T|10.0.8.1:42453)
...
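As you can see, the log lines only show the default object representation of the Ping messages. If you'd like the payload to show up instead, one option is to define the message types with the standard library's dataclass decorator, which generates a readable repr. This is a variation I'm suggesting here, not what the example repository does.

```python
from dataclasses import dataclass


@dataclass
class Ping:
    """A message that just carries a payload."""
    payload: str


@dataclass
class Pong(Ping):
    """Same fields as Ping, but a distinct type for dispatching."""


ping_line = "Got {}".format(Ping("hello world"))
pong_repr = repr(Pong("hello world"))
print(ping_line)
print(pong_repr)
```

The class names stay the same, so handlers named receiveMsg_Ping and receiveMsg_Pong would still match.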
Coda
This concludes the blog post. You can find the distributed example on GitHub: https://github.com/sabaini/thespianecho
If you have questions, you can ping me on Twitter. Also, the Thespian user group is excellent and very helpful.