IPC From Scratch
by Hayden Gray
9 min read
Inter-process communication (or IPC) doesn't typically cause programmers to feel warm fuzzies. "Wait, I just want my programs to talk to each other, why is this hard???" is a sentiment that I very much understand. Recently, while working on Mixologist, I came to a point where I needed a CLI, a GUI, and a daemon to all run and communicate with one another, sending info back and forth in a wonderfully concurrent, unordered fashion. Those two words being used together usually send shivers down the spine of any experienced programmer, but with a little legwork the problem is solvable.
Humble Beginnings
Let's go back to the beginning: I started my work on Mixologist's IPC requiring only a daemon and a CLI. This let me dip my toes into the waters of IPC without going straight into the deep end. After doing some cursory research, I had a few options to pursue:
- Shared Memory
- Pipes
- Sockets
Initially, shared memory looked decently promising since it would give me full access to the shared state of each process from any of the others, but I ultimately decided against it because of the synchronization headache and the difficulty of creating custom clients to interact with my program. Pipes were also a no-go, unfortunately, because I expected to need bidirectional communication. That left me with sockets, probably one of the most common tools for IPC thanks to their flexibility and relative ease of use.
So how do you make a socket?
sock_fd, sock_err := linux.socket(.UNIX, .STREAM, {}, .HOPOPT)
Well that isn't too bad.
Unfortunately, there is some extra work to do depending on if you're the server or the client. If you're the server, the setup is as follows:
sock_fd, sock_err := linux.socket(.UNIX, .STREAM, {}, .HOPOPT)
sock_addr: linux.Sock_Addr_Un
sock_addr.sun_family = .UNIX
copy(sock_addr.sun_path[:], "/tmp/socket_name")
bind_err := linux.bind(sock_fd, &sock_addr)
listen_err := linux.listen(sock_fd, 1024)
So what did that do? Well, we start by setting up the socket and its address, just giving it a name on the filesystem to use. We then bind the socket, which is what actually sets up the socket on the system. Finally, we listen on the socket, which allows us to accept incoming connections.
The client side is simpler:
sock_fd, sock_err := linux.socket(.UNIX, .STREAM, {}, .HOPOPT)
sock_addr: linux.Sock_Addr_Un
sock_addr.sun_family = .UNIX
copy(sock_addr.sun_path[:], "/tmp/socket_name")
connect_err := linux.connect(sock_fd, &sock_addr)
The setup is the same as for the server, but instead of bind and listen, we just connect. This can of course error (the socket may not exist), but that is something we can handle on the caller side.
Some of you may have noticed that I'm using .UNIX and .STREAM for the socket type and are asking "what's that all about?" We are making use of Unix domain sockets here instead of INET sockets since they are targeted towards local use. We are also making use of stream sockets instead of datagram sockets because knowing about the connection itself is important.
Datagram sockets can be extremely useful when you rely on clearly defined packet boundaries and don't require ordering. They don't have the concept of a "connection", however, which can make certain things harder to write.
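For contrast, the only thing that would change for a datagram socket is the type argument. Mixologist doesn't use this, but a sketch would look like:
dgram_fd, dgram_err := linux.socket(.UNIX, .DGRAM, {}, .HOPOPT)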
Unidirectional Communication
Let's look at the interface for the CLI I built:
Flags:
-add-program:<string>, multiple | name of program to add to aux
-remove-program:<string>, multiple | name of program to remove from aux
-set-volume:<f32> | volume to assign nodes
-shift-volume:<f32> | volume to increment nodes
So, as it stands, the CLI only needs to send things to the daemon. That means we only ever need to call send on the client to get the data sent over to the server, which is relatively simple:
bytes_sent, send_err := linux.send(sock_fd, message, {})
On the server side, we do the following:
buf: [1024]u8
// event loop
for {
    client_fd, client_err := linux.accept(sock_fd, &sock_addr, {})
    bytes_read, recv_err := linux.recv(client_fd, buf[:], {})
    // handle message
    // other event loop stuff
}
This is pretty simple, but the minute we try to run the program, we'll notice that the event loop just... stalls. The reason is that the socket is blocking: whenever we call recv on the socket, program execution will halt until the socket receives some data. So how do we solve this? Fortunately, when you create a socket, you can add a single flag, changing the instantiation to this:
sock_fd, sock_err := linux.socket(.UNIX, .STREAM, {.NONBLOCK}, .HOPOPT)
Now when we call recv, we can check recv_err to see if it is either EWOULDBLOCK or EAGAIN and, if so, skip any subsequent code that would depend on the result of a finished transmission.
Checking both is something you should do for portability reasons. Unfortunately, you can't assume that these error codes are the same value.
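As a rough sketch, the check on the daemon side ends up looking something like this (using the same buf and client_fd as in the loop above):
bytes_read, recv_err := linux.recv(client_fd, buf[:], {})
if recv_err == .EWOULDBLOCK || recv_err == .EAGAIN {
    // nothing has arrived yet; skip this iteration and try again later
} else if recv_err == nil {
    // a finished transmission is sitting in buf[:bytes_read], handle it
}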
With that change in place, we can now run any of our commands (e.g. mixcli -set-volume:0) and hear (this is an audio program after all) the results in real time. Unfortunately, although we do have data going to the daemon, we don't have anything coming back. That's a bit of an issue if we want to add something like -get-volume, which could be good for scripts and other tools.
Bidirectional Communication
So how might we add that? Well, on the client we can do this:
bytes_sent, send_err := linux.send(sock_fd, message, {})
buf: [1024]u8
bytes_recv, recv_err := linux.recv(sock_fd, buf[:], {})
On something like the CLI, we actually don't mind the socket blocking. Instead of a non-blocking socket, we can use a normal blocking one and just let recv block until we get a response from the daemon.
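Concretely, that just means creating the CLI's socket without the .NONBLOCK flag, i.e. the same call we started with:
sock_fd, sock_err := linux.socket(.UNIX, .STREAM, {}, .HOPOPT) // blocking on purpose: recv waits for the daemon's reply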
On the daemon side, we can do some simple message processing and then craft a response to send back to the CLI:
buf: [1024]u8
for {
    client_fd, client_err := linux.accept(sock_fd, &sock_addr, {})
    bytes_read, recv_err := linux.recv(client_fd, buf[:], {})
    // process input
    // send response
    bytes_sent, send_err := linux.send(client_fd, response, {})
    // other event loop stuff
}
Now, when we run our program:
mixcli -get-volume
0.2
we get the expected response.
The N+1 Problem
With a CLI working, it's time to add a GUI. Let's start by roughing out the GUI with no actual backend plumbing:
So, what data is needed by the program? Well, the list of program rules that should populate the text box list, and the volume assigned to mixd itself. That, however, will be handled by inotify watching the config for the time being. We also want changes to the volume slider (bottom). These changes should of course be sent to mixd, but we also want changes made to the volume by other programs (e.g. mixcli or another GUI instance) to show up here. That means we probably need a way to "subscribe" to changes to the volume. For that, we will need:
- Managing of multiple simultaneous connections
- Persistent connections
- A way to handle disconnected clients
- Tracking of "subscribers"
In a previous post, I mentioned the basic message-passing format I was using to send data from the client to the daemon. We will make a couple of changes here though:
Message :: union {
    Volume,
    Program,
}

Volume :: struct {
    act: enum {
        Set,
        Shift,
        // these are new
        Get,
        Subscribe,
    },
    val: f32,
}

Program :: struct {
    act: enum {
        Add,
        Remove,
    },
    val: string,
}
Note: we use CBOR to encode the message as it is trivially serialized, has a relatively low size, and is quite resilient to format changes.
So, these Get and Subscribe messages will allow us as users to make a request to the server telling it that we want all future updates to the volume. On the client, this is relatively simple:
msg := common.Volume {
    .Subscribe,
    0,
}
cbor_msg, _ := cbor.marshal(msg)
defer delete(cbor_msg)
bytes_sent, send_err := linux.send(client_fd, cbor_msg, {})
On the server however, we have an issue now: our current implementation only allows for a single connected client. That means if we want to keep the connection open to send data back to the client, we'll need to have a way of managing multiple sockets at once.
Enter poll()
Fortunately, tools exist to deal with many sockets at once; there are two that I considered in this case:
- poll
- epoll
Although epoll was attractive because of its better asymptotic performance, I decided against it for two reasons:
- More complex to set up
- Low number of sockets
So, on the server, we can start building an IPC system. Here's the state for it:
IPC_Server_Context :: struct {
    server_fd: linux.Fd,
    server_addr: linux.Sock_Addr_Un,
    _clients: sa.Small_Array(MAX_CLIENTS, linux.Poll_Fd),
    _removed_clients: sa.Small_Array(MAX_CLIENTS, linux.Fd),
    _buf: [BUF_SIZE]u8,
}
We then initialize it like so:
IPC_Server_init :: proc(ctx: ^IPC_Server_Context) -> linux.Errno {
    // this allows us to handle disconnects that aren't graceful
    posix.signal(.SIGPIPE, IPC_Server__handle_sigpipe)

    sock_err: linux.Errno
    ctx.server_fd, sock_err = linux.socket(
        .UNIX,
        .STREAM,
        {.NONBLOCK},
        .HOPOPT,
    )
    if sock_err != nil {
        log.panicf("could not create socket with error %v", sock_err)
    }

    ctx.server_addr.sun_family = .UNIX
    copy(ctx.server_addr.sun_path[:], SERVER_SOCKET)

    // unlink the socket in the case of an unclean exit
    linux.unlink(SERVER_SOCKET)

    linux.bind(ctx.server_fd, &ctx.server_addr) or_return
    listen_err := linux.listen(ctx.server_fd, 1024)

    // set up a Poll_Fd to trigger when the server accepts connections
    sa.append(&ctx._clients, linux.Poll_Fd{fd = ctx.server_fd, events = {.IN}})
    return listen_err
}
core:container/small_array is really nice here since it gives us dynamic-array semantics on a fixed-size array, allowing us to stack-allocate it.
With our basic server set up, we can now run the following code every cycle of the event loop:
_, poll_err := linux.poll(sa.slice(&ctx._clients), 5)
if poll_err != nil && mixd_ctx.should_exit do return
else if poll_err != nil do log.panicf("poll error: %v", poll_err)

if sa.get(ctx._clients, 0).revents >= {.IN} {
    client_fd, client_err := linux.accept(
        ctx.server_fd,
        &ctx.server_addr,
        {.NONBLOCK},
    )
    if client_err != nil do log.panicf("accept error %v", client_err)
    log.debugf("client connected: socket %v", client_fd)
    sa.append(&ctx._clients, linux.Poll_Fd{fd = client_fd, events = {.IN}})
}
In this case, we poll() all active clients and also check if we are able to accept() on the server socket. If a new connection is active, we add it to the list of clients. We can then iterate over the list of clients and call read() on them. In practice, the result looks like this:
#reverse for &client, idx in sa.slice(&ctx._clients)[1:] {
    if client.revents >= {.IN} {
        bytes_read, read_err := linux.read(client.fd, ctx._buf[:])
        if read_err == .EWOULDBLOCK || read_err == .EAGAIN do continue
        // process message here
    }
}
#reverse is a surprise tool that will help us later
Subscriptions
Since we now have the option to handle multiple sockets concurrently, how do we handle subscriptions? Well, we can add the following field to our context struct:
_volume_subscribers: sa.Small_Array(MAX_CLIENTS, linux.Fd)
This is just a simple list that we can use to keep track of all of our volume subscribers. If we receive a subscribe message, we just add the client to the list of subscribers:
// other cases should also be handled for msg and act
switch msg in msg {
case common.Volume:
    switch msg.act {
    case .Subscribe:
        _, found := slice.linear_search(
            sa.slice(&ctx._volume_subscribers),
            client_fd,
        )
        if !found do sa.append(&ctx._volume_subscribers, client_fd)
    }
}
Now, when we update the volume, we can just send a message out to each subscriber:
msg := common.Volume{.Get, mixd_ctx.vol}
for client_fd in sa.slice(&ctx._volume_subscribers) {
    // wrapping this commonly used stuff into procs
    IPC_Server_send(ctx, client_fd, msg)
}
This is all well and good but, as it stands, we still don't know when a client has disconnected or how we should manage that. Well, fortunately with poll() we can just check if reading from the socket returns zero bytes and remove it from the connections list.
if bytes_read == 0 {
    sa.unordered_remove(&ctx._clients, idx + 1)
    sa.append(&ctx._removed_clients, client.fd)
}
The #reverse mentioned earlier allows us to call unordered_remove without invalidating the iterator.
After processing all the messages, we can also clean up the resources for each removed client:
for fd in sa.slice(&ctx._removed_clients) {
    IPC_Server_remove_volume_subscriber(ctx, fd)
    linux.close(fd)
}
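IPC_Server_remove_volume_subscriber isn't anything fancy either; a sketch (the repo's version may differ) is just a search plus an unordered remove:
IPC_Server_remove_volume_subscriber :: proc(ctx: ^IPC_Server_Context, fd: linux.Fd) {
    // if the fd was subscribed, drop it; order doesn't matter here
    idx, found := slice.linear_search(sa.slice(&ctx._volume_subscribers), fd)
    if found do sa.unordered_remove(&ctx._volume_subscribers, idx)
}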
Although we could call it a day here, we might also want to do some extra work, most notably around handling the case where the socket has not been closed gracefully. Fortunately, this is just another if:
if read_err != nil {
    sa.unordered_remove(&ctx._clients, idx + 1)
    sa.append(&ctx._removed_clients, client.fd)
}
Abstract Sockets
Before we finish up, there is a final concept that is extremely nice to have when using Unix domain sockets on Linux: abstract sockets. These are a non-portable, Linux-only extension, accessed by making the first byte of the socket path a NUL byte. This gives the socket no connection to filesystem pathnames, and it will automatically disappear when all references to it are closed (although there does seem to be a timer on this).
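Switching over is mostly a matter of where the name starts; a minimal sketch (the socket name here is made up) looks like this:
sock_addr: linux.Sock_Addr_Un
sock_addr.sun_family = .UNIX
// a leading NUL byte marks the name as abstract, so nothing appears on the filesystem
sock_addr.sun_path[0] = 0
copy(sock_addr.sun_path[1:], "mixologist-ipc")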
Putting it All Together
So what does this leave us with? Well, we've effectively ended up with a single-threaded server tailored to handling a custom format that we created. Being single-threaded lets us avoid synchronization primitives, while the limited number of potential clients keeps that from ever becoming an issue.
So to summarize, on the server:
- Make a server struct to track the following
- Active sockets
- Sockets to remove
- Subscriptions
- Create a nonblocking socket
- Add the socket to the list of active sockets
- Poll list of active sockets and process their events
- Clean up sockets that are no longer connected
on a client like a GUI:
- Open a socket
- Send a "Subscribe" message
- Receive all future events
- Send any local updates to the daemon
and on a client like a CLI:
- Open a socket
- Send message
- Listen for response if applicable
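Stitched together, the CLI side is only a handful of lines. Here's a rough sketch under the same assumptions as above (the real client also CBOR-encodes its messages and handles every error):
sock_fd, sock_err := linux.socket(.UNIX, .STREAM, {}, .HOPOPT)
sock_addr: linux.Sock_Addr_Un
sock_addr.sun_family = .UNIX
copy(sock_addr.sun_path[:], "/tmp/socket_name")

// connect, send the request, then block until the daemon replies
connect_err := linux.connect(sock_fd, &sock_addr)
bytes_sent, send_err := linux.send(sock_fd, message, {})

buf: [1024]u8
bytes_recv, recv_err := linux.recv(sock_fd, buf[:], {})
// the response is in buf[:bytes_recv]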
A more complete implementation can be found in the Mixologist repo if more examples are needed.
So, hopefully this helped you if you made it this far. Thanks for reading and have a great day!