RPC Part 4: Sequences and Streams


Intro

In the previous post we learned about capabilities and Capability Exchange. We examined how capabilities provide elegant, powerful, and highly efficient mechanisms for interacting with remote distributed objects and their state. Finally, we touched on some of the ways in which capabilities can be used to build secure systems. In this post we will look at streaming sequences, which provide a model for dealing with data whose length is very large (or even infinite).

Motivating Example

Consider a multiplayer game architecture from the highest level. Abstractly, it might consist of a game server and some number of game clients. The game server orchestrates the game. Each of the clients collects input from one of the players and passes it to the game server. The game server combines all of the inputs, updates the active state of the game, and communicates game state changes to the clients. Finally, each client updates its own display to reflect the current state of the game. This process continues in a loop indefinitely until the game server determines (according to the rules of the game) that the game is over.

Let’s try to define an eventual interface to manage the communication between the clients and the game server. A simple design might look like:

[Eventual]
interface IGame
{
  Promise<GameEvent> GetNext();
}

Assuming s is a GameProxy, the main game loop might look like:

while (true) {
  switch (await s.GetNext()) {
    // Handle different types of events.
    ...
    case EndGame:
      return;
  }
}

This design is very simple, but it works. Each client will receive a sequence of events by repeatedly calling GetNext. Each client will handle that event, and then call the server again for the next event. To maintain the proper ordering during event handling the client can easily control when the next event is received by controlling when it calls GetNext. There are three main drawbacks to this design:

  1. Unnecessary round-trips.
    Each call to GetNext requires a full network round-trip (1xRTT). The client must first send a message to the server, and then the server must respond. This increases the network latency between events and doesn’t allow for any pipelining from the server. This will result in the game being less responsive and less scalable.

  2. Additional allocations.
    Each call requires individual allocations for at least the response promise (and possibly for the GetNext message, depending on the conditions). Furthermore, there is no batching of events, even if they occur close together. Though all of these allocations are small, if the number of events is very large this can create additional GC pressure resulting in the game being less responsive and less scalable.

  3. The server-side ordering is implicit.
    The GetNext method doesn’t by itself indicate which event should be next. Different clients may process the events at different rates (due to both networking characteristics and hardware considerations). The server MUST keep track of which events have been sent to each client so that it can return the correct next message for the calling client.
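To make drawback (3) concrete, here is a minimal sketch of the bookkeeping a pull-based server ends up doing. It is only an illustration: PullServerSketch, Publish, and the string client ids are hypothetical names, events are plain strings, and GetNext returns null when the caller is caught up, whereas a real eventual method would return a promise that resolves later.

```csharp
using System;
using System.Collections.Generic;

var server = new PullServerSketch();
server.Publish("spawn");
server.Publish("move");

Console.WriteLine(server.GetNext("alice")); // spawn
Console.WriteLine(server.GetNext("alice")); // move
Console.WriteLine(server.GetNext("bob"));   // spawn (bob has his own position)

// Hypothetical pull-style server: one shared event log plus one cursor per client.
sealed class PullServerSketch
{
    private readonly List<string> m_events = new();
    private readonly Dictionary<string, int> m_cursors = new();

    public void Publish(string e) => m_events.Add(e);

    // Returns the caller's next event, or null when the caller has caught up.
    public string? GetNext(string clientId)
    {
        // A client we have never seen starts at position 0.
        m_cursors.TryGetValue(clientId, out int cursor);
        if (cursor >= m_events.Count)
        {
            return null;
        }
        m_cursors[clientId] = cursor + 1;
        return m_events[cursor];
    }
}
```

The per-client dictionary is exactly the implicit state the text describes: nothing in the GetNext signature reveals it, yet the server cannot answer correctly without it.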

Push vs Pull

What if we invert the call flow? What if we instead define the game server interface to take a capability from the client? Then the game server can call methods on that capability any time it wants. This is sometimes called Server Push: the server can push data to the client without the client having to make a request first.

[Eventual]
interface IGame
{
  Promise SetEventCallback(ClientProxy callback);
}
[Eventual]
interface IClient
{
  Promise HandleEvent(GameEvent e);
}

Now, instead of having a game loop, the client would set its callback capability and then just wait for the game to be over, relying on the implementation of the callback to handle all of the game events:

ClientHandler handler = new();
await s.SetEventCallback(new ClientProxy(handler));
await handler.Done;

...

sealed class ClientHandler : AClientServer
{
  Promise Done => new(m_done);  // Resolved when the EndGame event arrives.

  public async Promise HandleEvent(GameEvent e) {
    switch (e) {
      // Handle different types of events.
      ...
      case EndGame:
        m_done.Resolve();  // game is over.
        return;
    }
  }
}

This design addresses problems (1) and (3) above:

  • Half the network latency.
    The server can now push events, whenever they occur, directly to the client. The client will receive them after only half the previous network latency (i.e. the path from server-to-client, without first the client-to-server needed to send the GetNext message). That is, the average latency is now ½xRTT.

  • Explicit server-side ordering
    The server doesn’t need to wait for the client’s response before making another call to HandleEvent. Instead, the server can pipeline any number of calls immediately and rely on Instance Order to dispatch them at the client in the proper order. The order the server makes the calls explicitly defines the order the game events occurred in.

That seems like a definite improvement. However, we have now introduced four new problems:

  1. Loss of client-side ordering.
    The client will now dispatch the next event as soon as the previous event yields (e.g. awaits). Dispatch Order only guarantees strong ordering of the first turn of an activity. If handling of an event takes multiple turns (e.g. plays an input-blocking animation), then the next event will improperly dispatch before the previous event is finished. Just awaiting within the event handler is no longer sufficient to properly order event processing. Instead the client implementation would need to do something more sophisticated to defer processing of the next event from the server.

  2. No backpressure.
    The client has no control over how many events the server can send it at one time. If the server keeps pushing events at a rate faster than the client can handle, the events will start to queue up either in the dispatcher or in the event handler implementation. This will eventually lead the client to exhaust its own memory and crash - unless it can provide backpressure to the server to tell it to stop or slow down. With the current design the server has no way to know that the client is not keeping up.

  3. No way for the client to gracefully exit early.
    If a player leaves the game early (before the game is finished) there is no graceful way to tell the server to stop pushing messages. We’d need to add another method to IGame that would tell the server to stop.

  4. No way for the client to detect when the server has crashed.
    If the game server were to crash, or a network partition were to occur, there is no way for the client to detect it. The client is waiting for the server to make calls on its interface. If the session with the game server were to disconnect then the server would simply stop making calls, but the client wouldn’t know when to stop waiting. Since the server was disconnected abruptly, no EndGame event would ever be sent.
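As an illustration of the "something more sophisticated" that problem (1) calls for, the sketch below queues pushed events and runs the handler for one event to completion before starting the next. It uses .NET's System.Threading.Channels purely as a local stand-in; OrderedHandler and CompleteAsync are hypothetical names, events are plain strings, and none of this is Promise RPC API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var log = new List<string>();
var handler = new OrderedHandler(async e =>
{
    log.Add($"begin {e}");
    await Task.Yield(); // A multi-turn handler, e.g. an input-blocking animation.
    log.Add($"end {e}");
});

handler.HandleEvent("move");
handler.HandleEvent("score");
await handler.CompleteAsync();
Console.WriteLine(string.Join(" | ", log));
// begin move | end move | begin score | end score

// Hypothetical client-side wrapper that restores ordering for pushed events.
sealed class OrderedHandler
{
    private readonly Channel<string> m_pending = Channel.CreateUnbounded<string>();
    private readonly Task m_pump;

    public OrderedHandler(Func<string, Task> handle)
    {
        m_pump = PumpAsync(handle);
    }

    // Called when the server pushes an event: only enqueues, never handles inline.
    public void HandleEvent(string e) => m_pending.Writer.TryWrite(e);

    // Signals that no more events will arrive and waits for the queue to drain.
    public Task CompleteAsync()
    {
        m_pending.Writer.Complete();
        return m_pump;
    }

    private async Task PumpAsync(Func<string, Task> handle)
    {
        await foreach (string e in m_pending.Reader.ReadAllAsync())
        {
            await handle(e); // The next event waits until this one has fully finished.
        }
    }
}
```

Note that the queue in this sketch is unbounded, so it fixes ordering while leaving problem (2) untouched: a fast server can still grow the queue without limit.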

A better solution to all of these problems is a Streaming Sequence.

Streaming Sequence

Let’s first consider what kind of characteristics we’d want from an ideal streaming sequence of items. Then we can try to employ that ideal and see if it addresses any of the problems we have identified so far.

What is a streaming sequence? Obviously from the name, it is a sequence of items. So there must be some party who creates the items; we’ll call them the Producer. And then there is some party who makes use of the items; we’ll call them the Consumer. Streaming implies that the sequence doesn’t necessarily exist all at once, but instead that it is produced and/or delivered incrementally (otherwise it would just be an Array). The streaming nature of the sequence allows both the producer and the consumer to overcome both real and practical limitations on the size of available memory. Network and serialization protocols must use buffers that fit into available memory. A serialization of a very large object tree, for instance, can’t be bigger than the largest such buffer. A streaming sequence, however, can deliver a set of items that couldn’t fit into memory (if they were all materialized at once). A very large object tree could be delivered as a streaming traversal, requiring only buffers large enough to handle a single node at a time.

If our message passing system has Capability Exchange (as we discussed in the previous post), and we could define a capability for a streaming sequence, then we could use that feature to send streaming sequences as part of our messages. Since there are two parties that work together to define a streaming sequence, perhaps we need two capabilities, one for the consumer and one for the producer?

Let’s propose the following: a streaming sequence is actually defined by two capabilities that work together to create a single instance:

  • A Consumer Capability: ISequence<T>
    A capability that gives the holder the right to read a sequence of zero or more (and possibly infinite) items (of type T).

  • A Producer Capability: ISequenceWriter<T>
    A capability that gives the holder the right to write zero or more items (of type T) into a sequence and then to terminate that sequence.

The reader side of a streaming sequence provides the consumer with four actions:

interface ISequence<T>
{
  Promise<T> Read();
  void Cancel(Exception? error = null);
  int Capacity { get; set; }
  int BatchSize { get; set; }
}

The ability to read the next item in the sequence. The ability to gracefully terminate the sequence from the consumer side and promptly release all consumer-side resources (optionally with a diagnostic error for the producer’s use). The ability to set the parameters of the flow control model, including its overall in-flight window capacity as well as its limits on automatic batching.

Conversely, the writer side of a streaming sequence provides the producer with four actions of their own:

interface ISequenceWriter<T>
{
  void Write(T item);
  Promise Flush();
  Promise Close(Exception? error = null);
  void Abort();
}

The ability to write the next item into the sequence. The ability to observe the current state of backpressure (via Flush) and to optionally await until backpressure has eased. The ability to end the sequence gracefully or to terminate the sequence with a diagnostic error (for the consumer’s information). And finally, the ability to abruptly abort the sequence and promptly release all producer-side resources.

When a new streaming sequence is created, the two capabilities are returned as a pair. The creator can then distribute the read capability, the write capability, or both as they see fit.

Since our message passing system supports Capability Exchange (as we discussed in the previous post) we may exchange either the read capability or the write capability (or both) in the payload of messages (including response messages).
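The reader/writer pairing can be approximated in-process with a bounded channel from .NET's System.Threading.Channels. This is only an analogy under stated assumptions: the channel's capacity stands in for the consumer's flow control window, WriteAsync stands in for Write followed by Flush, and Complete stands in for Close. SequenceSketch and its methods are hypothetical names, not part of any RPC library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Creating the channel yields both halves at once: pair.Reader and pair.Writer.
Channel<int> pair = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait, // A full window suspends the producer.
});

List<int> received = await SequenceSketch.RunAsync(pair, itemCount: 5);
Console.WriteLine(string.Join(", ", received)); // 0, 1, 2, 3, 4

static class SequenceSketch
{
    // Hands the writer to a producer and the reader to a consumer, then waits for both.
    public static async Task<List<int>> RunAsync(Channel<int> pair, int itemCount)
    {
        Task producer = ProduceAsync(pair.Writer, itemCount);
        List<int> received = await ConsumeAsync(pair.Reader);
        await producer;
        return received;
    }

    private static async Task ProduceAsync(ChannelWriter<int> writer, int itemCount)
    {
        for (int i = 0; i < itemCount; i++)
        {
            await writer.WriteAsync(i); // Completes only once the window has room.
        }
        writer.Complete(); // Graceful end of the sequence.
    }

    private static async Task<List<int>> ConsumeAsync(ChannelReader<int> reader)
    {
        var received = new List<int>();
        await foreach (int item in reader.ReadAllAsync())
        {
            received.Add(item);
        }
        return received;
    }
}
```

The producer and consumer only ever see their own half, which is the property that lets each capability be handed to a different party.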

Sequence In Action

Let’s assume that our message passing system supports streaming sequences (as, of course, Promise RPC does or we wouldn’t be writing this). Borrowing the syntax from Promise RPC, we’ll assume for the sake of this discussion that a streaming sequence is implemented by two special, system implemented capabilities. The type Sequence<T> implements the consumer capability and the type Sequence<T>.Writer implements the producer capability. The message passing system is responsible for implementing buffering, batching, and flow control between these two capabilities (constrained by the parameters set by the consumer). The consumer type Sequence<T> also implements C#’s IAsyncEnumerator which executes a call to the ISequence<T>.Read method once per loop iteration.

With Sequence<T> we can redefine our game server interface:

[Eventual]
interface IGame
{
  Sequence<GameEvent> GetEvents();
}

Assuming s is a GameProxy, the main game loop might, once again, look like:

await foreach (GameEvent e in s.GetEvents()) {
  switch (e) {
    // Handle different types of events.
    ...
    case EndGame:
      return;
  }
}

This code is suspiciously similar to the code we started with. However, despite its apparent simplicity, this code has none of the problems we encountered earlier:

  1. Unnecessary round-trips.
    Though each iteration of the foreach loop calls the sequence Read method, this call is NOT a normal eventual RPC. The underlying message passing system is managing an asynchronous stream of items with buffering and flow control. If an item already exists in the consumer-side buffer, a call to Read is handled locally and promptly (i.e. it does NOT need to yield during the await). Additionally, the server writes items directly into the producer capability and the message passing system is responsible for handling buffering, nagling, and coalescing to produce efficient batching for asynchronous background transmission that overlaps with both the server and client side processing. As in the section Push vs Pull above, the average latency is ½xRTT.

  2. Additional allocations.
    There are significantly fewer underlying network RPCs in the sequence model. Since the message passing system is providing automatic buffering, nagling, and coalescing, multiple events that occur close together will be combined into a single batch for more efficient network transmission. Additionally, there are no allocations when reads are resolved promptly from the consumer-side buffer. No Read messages are actually sent, the message passing system employs push in the background to asynchronously deliver items to the consumer-side buffer.

  3. The server-side ordering is implicit.
    The server writes directly into the producer capability. The order of the writes explicitly defines the order of the items that will be read from the sequence by the consumer. The message passing system is responsible for keeping track of order as well as the consumer’s position within the sequence.

  4. Loss of client-side ordering.
    The client-side main loop doesn’t extract the next item from the sequence until it starts the next iteration of the foreach loop. If the client awaits in the middle of the loop then no items will be extracted or dispatched. So, awaiting is once again sufficient to maintain client-side ordering even when the event processing activity takes multiple turns to complete (e.g. an input-blocking animation).

  5. No backpressure.
    The client has full control over both the overall size of the flow control window and its internal batching. This gives the client direct control over how much memory the client is willing to commit to event processing. The message passing system’s implementation of flow control is responsible for guaranteeing that the server never exceeds these limits. Additionally, the server can observe client backpressure (communicated automatically through out-of-band messaging by the message passing system in the upstream direction consumer-to-producer). In response to observed backpressure, the server can pause production, coalesce items, or drop unnecessary or redundant events to reduce traffic until the client catches up.

  6. No way for the client to gracefully exit early.
    The client can gracefully end the sequence by calling Cancel (or by exiting the foreach loop with a break or an exception). The server can observe the client’s sequence cancellation during its next call to Write or Flush and then perform its own clean up.

  7. No way for the client to detect when the server has crashed.
    If the game server were to crash, the client will observe the sequence termination in the next call to Read. The client can then perform its own clean up or recovery.
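Point (5) mentions that a producer can coalesce or drop redundant events when it observes backpressure. A minimal sketch of that idea follows, again using a bounded channel from System.Threading.Channels as a local stand-in for the flow control window. CoalescingProducerSketch is a hypothetical name, and the integer "position updates" are invented for the illustration; a failed TryWrite plays the role of observing backpressure.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

List<int> delivered = CoalescingProducerSketch.Run(windowCapacity: 2, updateCount: 5);
Console.WriteLine(string.Join(", ", delivered)); // 1, 2, 5

static class CoalescingProducerSketch
{
    public static List<int> Run(int windowCapacity, int updateCount)
    {
        Channel<int> window = Channel.CreateBounded<int>(windowCapacity);
        int? coalesced = null;

        // Producer: emits updates 1..updateCount while the consumer is stalled.
        for (int update = 1; update <= updateCount; update++)
        {
            if (!window.Writer.TryWrite(update))
            {
                // Window is full: remember only the newest update, dropping older ones.
                coalesced = update;
            }
        }

        // Consumer catches up and drains everything that fit in the window.
        var delivered = new List<int>();
        while (window.Reader.TryRead(out int item))
        {
            delivered.Add(item);
        }

        // Backpressure has eased: send the single coalesced update.
        if (coalesced is int latest && window.Writer.TryWrite(latest))
        {
            while (window.Reader.TryRead(out int late))
            {
                delivered.Add(late);
            }
        }
        return delivered;
    }
}
```

Updates 3 and 4 never cross the wire because update 5 supersedes them, which is the kind of traffic reduction the text describes for a client that is falling behind.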

Streams

One of the most commonly encountered scenarios for streaming sequences is the need to stream a sequence of bytes. Examples include a game’s save-game file written to disk, compressed audio for real-time voice chat, and video streaming of cut-scenes or instant replay. Logically this could be accomplished with a Sequence<byte>, but in practice this isn’t very efficient. The consumer of the sequence rarely wants to read a byte at a time; it is preferable to read larger chunks for more efficient processing. Similarly, the producer rarely writes a single byte at a time; instead, larger chunks (e.g. a whole buffer’s worth) are written all at once.

Instead you could use a Sequence<Memory<byte>> where the producer writes whole chunks that the consumer then reads. This would certainly be more efficient, but it still has several drawbacks:

  1. Buffer Alignment.
    The boundaries of the chunks the consumer needs to look at may not align with the boundaries of the chunks the producer writes. Sending a sequence of memory buffers forces the consumer to read the data at the same boundaries as the producer.
  2. Memory Availability.
    The memory availability of the consumer may not be the same as the producer. If the consumer has more memory they may want to read much larger chunks for more efficient processing with less overhead. If the consumer has less memory they may have to read much smaller chunks to avoid memory overload.
  3. Flow Control Alignment.
    The consumer sets the parameters of the flow control window, but the producer determines the size of the produced chunk. If the chunk is too large then the flow control window will NEVER be large enough to allow forward progress and the sequence will stall.

To address all of these issues, and for convenience, a specialized implementation we’ll call Bytes is also made available in most message passing systems. Bytes is logically identical to a Sequence<byte>, but with the following differences:

  1. Batched Writes.
    The Write method of Bytes.Writer accepts a Memory<byte> for efficiency (like Sequence<Memory<byte>> did), but the Bytes is NOT required to honor or maintain chunk boundaries. Like a TCP socket, a Pipe, or a file handle, the internal buffering of a Bytes coalesces all bytes into a single continuous sequence with no internal boundaries. Flow control is thus defined in terms of bytes, not chunks, and is allowed to deliver a fraction of a producer-written buffer (if that is all the open window allows).
  2. Non-aligned Batched Reads.
    The Read method of Bytes returns a Memory<byte> for efficiency, but it also does not honor the boundaries of chunks written by the producer. It can deliver up to as many bytes as are available in the consumer buffer (or the maximum batching parameter defined by the consumer - whichever is smaller).
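The boundary-free behaviour of the two points above can be sketched with a tiny in-memory buffer. CoalescingBytes and its maxBytes parameter are hypothetical names chosen for this illustration; the sketch is synchronous and ignores flow control, so it shows only how writes of one size can be read back at another.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

var bytes = new CoalescingBytes();
bytes.Write(Encoding.ASCII.GetBytes("hello ")); // Producer chunk of 6 bytes.
bytes.Write(Encoding.ASCII.GetBytes("world"));  // Producer chunk of 5 bytes.

// The consumer reads at its own boundaries, not the producer's.
Console.WriteLine(Encoding.ASCII.GetString(bytes.Read(maxBytes: 4).Span));   // hell
Console.WriteLine(Encoding.ASCII.GetString(bytes.Read(maxBytes: 100).Span)); // o world

// Hypothetical byte buffer that forgets chunk boundaries as soon as data is written.
sealed class CoalescingBytes
{
    private readonly List<byte> m_buffer = new();

    // Appends the chunk to one continuous run of bytes.
    public void Write(ReadOnlyMemory<byte> chunk) => m_buffer.AddRange(chunk.ToArray());

    // Returns up to maxBytes, or fewer when less is buffered.
    public ReadOnlyMemory<byte> Read(int maxBytes)
    {
        int count = Math.Min(maxBytes, m_buffer.Count);
        byte[] result = m_buffer.GetRange(0, count).ToArray();
        m_buffer.RemoveRange(0, count);
        return result;
    }
}
```

The first read returns part of the first chunk and the second read spans the remainder of both chunks, mirroring how a TCP socket or file handle behaves.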

Promise RPC

We’ve already touched on most of the features Promise RPC delivers for streaming sequences. It implements first-class capabilities for both Sequence<T> and Bytes. It provides an implementation of streaming’s underlying flow control with buffering, nagling, and automatic batching. In this section, I’d like to highlight some additional special features that may be unique to Promise RPC’s approach to streaming sequences.

  1. It’s a Promise-type!
    We’ve already seen code examples where an RPC method directly returns a Sequence<T>. This demonstrates that Sequence<T> is an eventual type. But beyond the obvious, even though Sequence<T> is not derived from Proxy<T> it also exhibits other aspects of proxies. For instance, methods can be pipelined on it, such as setting the flow control parameters or posting a Read before the Sequence has been resolved remotely. And the writer has factory methods for creating async iterators.

  2. It’s Full Duplex!
    A Sequence<T> can appear as either a return value or as an argument to an RPC method:

[Eventual]
interface IFunWithSequences
{
    Sequence<Item> GetAndSend(Sequence<Item> forYou);
}

Many protocols support streaming, but few support arbitrary first-class streaming sequences. For example:

[Eventual]
interface IMoreFunWithSequences
{
    Sequence<A> SoManyStreams(Sequence<B> one, Sequence<C> two);
}

This method sends two streaming sequences at once, while receiving one in return. All of these sequences have flow control and a lifetime that is independent of each other and independent of the call to the method SoManyStreams.

  3. It’s Serializable!
    In addition to appearing directly as an argument or return value, a Sequence<T> (like a proxy) can appear as a member of any other serializable [DataContract] type.
[DataContract]
record struct StructWithStreams(int Id, Sequence<float> Values);

[Eventual]
interface IStillMoreFunWithSequences
{
    Promise<StructWithStreams> GetStruct(StructWithStreams asAnArg);

    Promise<(Sequence<A> TheGood, Sequence<B> TheBad)> 
      EvenMoreStreams(Sequence<C> AndAlso, Sequence<D> TheUgly);
}
  4. It’s Composable!
    A Sequence<T> can even contain other streaming sequences:
[Eventual]
interface IComposableSequences<T>
{
    Sequence<Sequence<object>> GetAllThatDontContainThemselves();
}

Conclusion

In this post we talked about streaming sequences including Sequence<T> and Bytes. We examined some of the issues that streaming solves including memory limitations, network latency, traffic minimization, flow control, and backpressure. Lastly, we looked at some unique characteristics of the Promise RPC library’s implementation of streaming sequences.

This is Part 4 of our look at the MSC RPC system. In the next and final part, we’ll look at channel lifetime, aborts and cancellation. Until next time, code on!
