Service-oriented email

Kragen Javier Sitaker, 2017-06-20 (updated 2017-06-21) (15 minutes)

What would a REST or REST-plus-invalidations system for my mail look like?

You could plausibly argue that this is the wrong question to ask, because really the right architectural style is something else — something more like Kafka, which gives you a way to safely resend operations that may have side effects, and to incrementally replicate large databases. But let’s leave that aside for the moment.

At the bottom of the stack, we have three kinds of resources: an mbox file (which we probably need invalidation notifications, quick ETags, and byte-range fetches for), some kind of metadata store for tags and marks, and an index listing all the mboxes. Everything else is built on top of that.

The mbox-file resource can be provided by a service that keeps open an ssh connection and sends commands over it, or several ssh connections, or with rsync, or local file access, or whatever. Probably the most sensible way to do it would be to tunnel REST requests over an ssh connection to a remote REST service accessing a local file.

The metadata store is probably local and probably can be a simple key/value store.

A layer up, we convert blocks of the mboxes into resources of their own. The interface here is something like

GET /size?url=foo/bar/baz
GET /block?url=foo/bar/baz
          &start=180052992
          &bytes=1048576

/size and /block are cacheable stateless microservices that merely transform ordinary GET requests into HEAD and GET-with-byte-range requests, respectively.
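A minimal sketch of that translation in Python, assuming an upstream HTTP resource that supports HEAD and Range requests; the function names are just illustrative of what the two microservices would do internally:

    import urllib.request

    def size(url):
        # GET /size?url=... -> total length of the resource, via a HEAD request.
        req = urllib.request.Request(url, method='HEAD')
        with urllib.request.urlopen(req) as resp:
            return int(resp.headers['Content-Length'])

    def block(url, start, nbytes):
        # GET /block?url=...&start=...&bytes=... -> that byte range of the resource.
        req = urllib.request.Request(url)
        req.add_header('Range', 'bytes=%d-%d' % (start, start + nbytes - 1))
        with urllib.request.urlopen(req) as resp:   # expect 206 Partial Content
            return resp.read()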

Ideally the mbox blocks would be large compared to the bandwidth-delay product, but small enough that fetching one amounts to a tolerable delay. This is infeasible when my latency is 200+ ms, so I probably have to hide the latency as much as possible with parallelism and prefetching. Specifically, my ping time to canonical.org is currently 175–177 ms, my bandwidth at the office is about 1.5 megabytes per second, and a tolerable delay is 100 ms, which means that we want to ensure that 95%+ of our requests can be served from a local cache, which implies very aggressive cache prefetching. Or sucking it up, I guess.

The bandwidth-delay product is about 256 kilobytes, coincidentally almost exactly the bandwidth-delay product of a spinning-rust disk (though about 64 times that of an SSD being read). So, with chunks of 256 kilobytes, we could maintain full bandwidth usage with a parallelism factor of 2; with 128 kilobytes, a parallelism factor of 4; or with 64 kilobytes, a parallelism factor of 8. Probably the best overall compromise is a parallelism factor of 8 and 128-kilobyte chunks. Then, on the rare occasion that we have to suck up a cache miss, the size of the 128-kilobyte chunk will add 85 ms of latency on top of the 175 ms already inherent in the link — not insignificant, but not the main bottleneck.
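For concreteness, the arithmetic those numbers come from, using the figures above:

    bandwidth = 1.5e6                 # bytes/second, measured above
    rtt = 0.176                       # seconds, measured above
    bdp = bandwidth * rtt             # ~264000 bytes, roughly 256 kilobytes
    chunk = 128 * 1024
    miss_penalty = rtt + chunk / bandwidth
    # ~0.26 s per uncached chunk: the round trip plus ~85 ms of transfer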

My current mailbox is 4.3 gigabytes, which will take about 45 minutes to initially download at 1.5 megabytes per second.

Given the blocks, we need to find the messages within, and the message boundaries ("\nFrom " or "\AFrom ") might cross block boundaries. For this we use a stateless boundary-parsing service:

GET /bloxparse?url=/block?url=foo/bar/baz%26start=180052992%26bytes=1048576

This returns three items: an array of (potentially many) byte offsets of definite message boundaries within the block; an array of byte offsets before the beginning of the block which could potentially be message boundaries (zero or one item, for the case where the block begins with something like "rom "); and an array of byte offsets at or just before the end of the block which could potentially be message boundaries (similarly zero or one item, for the case where the block ends with something like "\nF").
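A sketch of the scan /bloxparse might do over a single block; the function name and result encoding are just illustrative, and offsets are absolute within the mbox:

    def bloxparse(block, block_start):
        # Definite boundaries: every "\nFrom " wholly inside the block, plus
        # "From " at the very start of the mbox ("\AFrom ").
        definite = []
        if block_start == 0 and block.startswith(b'From '):
            definite.append(0)
        i = block.find(b'\nFrom ')
        while i != -1:
            definite.append(block_start + i + 1)        # offset of the 'F'
            i = block.find(b'\nFrom ', i + 1)
        # Zero or one candidate before the block: the block begins with a
        # proper suffix of "From ", e.g. "rom ", so the boundary, if real,
        # lies in the previous block.
        before = []
        for k in range(1, 5):
            if block.startswith(b'From '[k:]):
                before.append(block_start - k)
                break
        # Zero or one candidate at the end: the block ends with a proper
        # prefix of "\nFrom ", e.g. "\nF", so the match may continue into
        # the next block.
        after = []
        for k in range(1, 6):
            if block.endswith(b'\nFrom '[:k]):
                after.append(block_start + len(block) - k + 1)
                break
        return definite, before, after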

If I run this service locally, its results should be cached pretty aggressively, because I have 181490 message boundaries in those 4.3 gigabytes, so a single 11-byte decimal number or 8-byte binary number stands in for about 23 kilobytes of data. This very minimal summary, which can be generated in about 10 CPU seconds (for example with the from(1) command from bsdmainutils), occupies about 2 megabytes and can avoid transferring 4.3 gigabytes.

This /bloxparse service is invoked by an /mboxparse service which manages some amount of concurrency and reconciles the possible offsets near the ends of blocks.

(In a more generalized sense, they aren’t so much possible offsets as sets of candidate finite state machine states: if the finite state machine is in state 3 or 5 at the beginning of this block, then it has found a message boundary; at the end of the block it is in state 2. More generally it is a mapping from input states to output states. But maybe that level of complexity isn’t needed here, since the message-boundary-parsing service is very simple.)

The /mboxparse service takes the same parameters and provides the same result format as /bloxparse, but also takes optional blocksize and nprocs parameters.
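A sketch of that reconciliation, in terms of the bloxparse sketch above, simplified to return just the confirmed boundaries rather than the full /bloxparse result format:

    def mboxparse(per_block_results):
        # per_block_results: one (definite, before, after) triple per block,
        # in mbox order.  A candidate at the end of block N is confirmed
        # exactly when block N+1 reports the same offset as a candidate
        # before its own beginning.
        boundaries = []
        for n, (definite, before, after) in enumerate(per_block_results):
            boundaries.extend(definite)
            if n + 1 < len(per_block_results):
                next_before = per_block_results[n + 1][1]
                boundaries.extend(set(after) & set(next_before))
        return sorted(boundaries)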

However, it isn’t necessary to parse the whole mailbox in order to start parsing message headers and contents from it and processing queries. We can chain two calls to the /block service to fetch a message: the inner one (invoked second) fetches a 128k-aligned block, and is thus more neatly cacheable, while the outer one extracts a message from it according to /mboxparse.

Somewhere nearby we should have a service that reformats /mboxparse results into lists of links to messages, and perhaps another that maps simpler links to messages into redirects to the double-chained /block resource mentioned above:

/msg?mbox=foo/bar/baz
    &start=180053082
redirect to
    /block?url=/block?url=foo/bar/baz
        %26start=180052992
        %26bytes=1048576
    &start=180053082
    &bytes=20330
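A sketch of the alignment arithmetic behind such a redirect; the block size, the parameter names, and the convention that the outer start and bytes are relative to the inner block (rather than absolute mbox offsets) are all assumptions of the sketch, not taken from the example above:

    from urllib.parse import urlencode, quote

    BLOCKSIZE = 128 * 1024

    def msg_redirect(mbox_url, msg_start, msg_bytes):
        block_start = msg_start - msg_start % BLOCKSIZE      # align down
        # round the block length up so the whole message is covered
        block_bytes = -(-(msg_start + msg_bytes - block_start) // BLOCKSIZE) * BLOCKSIZE
        inner = '/block?' + urlencode({'url': mbox_url,
                                       'start': block_start,
                                       'bytes': block_bytes},
                                      safe='/', quote_via=quote)
        # nested '&'s become %26, roughly as in the example above
        return '/block?' + urlencode({'url': inner,
                                      'start': msg_start - block_start,
                                      'bytes': msg_bytes},
                                     safe='/', quote_via=quote)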

It would be nice to be able to somehow incrementally list the messages that have been parsed so far, with some kind of re-readable message queue like Kafka. Lacking such a thing, we can prefetch stuff in the background, I guess.

Once we have some message URLs, we can start parsing those messages so we can index them, both with full-text indexing and with traditional database-column indexing. Perhaps we maintain cached index segments corresponding to segments of the mailbox, and cache merge results for those index segments, re-merging index segments on demand.

A parse request might look like

GET /parse822?url=foo/bar/baz

where foo/bar/baz is something like the /msg? URL mentioned above. It returns some kind of parsed representation of the message headers, perhaps encoded using FlatBuffers; this parsing can be cached. (Maybe /hdr is better?)
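A sketch of what /parse822 might do, using the standard-library email parser; JSON stands in for FlatBuffers here just to keep the sketch self-contained:

    import email, email.policy, json, urllib.request

    def parse822(msg_url):
        # Fetch the /msg resource and return its headers as a JSON object.
        # (Duplicate headers such as Received collapse to the last one in
        # this toy version.)
        with urllib.request.urlopen(msg_url) as resp:
            msg = email.message_from_bytes(resp.read(),
                                           policy=email.policy.default)
        return json.dumps({name: str(value) for name, value in msg.items()})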

A query process might fetch index segments (computed on the fly if necessary) for ever-larger sections of the mailbox, starting from the end, running sort-merge joins on them, perhaps ceasing once it has enough results to fill the screen (or a bit more), and perhaps maintaining a couple of different query frontiers open at once — full-text index segments and header index segments, for example.

The desire to be able to move various kinds of processing to the storage server suggests that it would be best for the service identifiers like /parse822 to be mapped not to processes on machines but to pieces of code, perhaps bundled up with some kind of handle to a persistent state for the services that aren’t purely stateless. Then a sort of query optimizer can try running the service locally, and if that’s taking a long time (e.g. >10ms), try running it remotely as well and use whichever one gets faster results. Some kind of standardized bytecode would be one way to handle such mobile-code services. (Also you could hide this behind a process-on-machine resource.)

All of this somewhat abstracts away the question of bandwidth usage, treating it as an implementation detail of the services, when it kind of isn’t really — if I have a bunch of useless local jobs sucking up all my uplink bandwidth, it’s going to make anything else I try to do slow. You could conceivably pass in a handle to an accounting resource to the service when you invoke it for it to charge its bandwidth and even CPU usage to, which would have the ability to cut it off and cause it to fail fast if it went over its resource limits or if you just no longer cared about the results.

All of this is about reading mail. Sending mail is a different matter; we want to ensure that mail gets sent exactly once. If the client has a private namespace on the server (/outmail/myhostname), it can peremptorily allocate message-IDs there, assuming it hasn’t suffered an attack of amnesia due to some kind of virtual machine checkpoint and restart (which could cause it to reuse the same message ID), and PUT messages there. As long as PUT is atomic, it’s all good; retries are safe. In HTTP, PUT also has an If-None-Match: * option which can be used to ensure that you never overwrite an existing message, but what do you do in that case? (I guess you can GET the message to see if you previously PUT the same message and forgot about it, or whether it’s a collision.) If there’s some kind of collection indexing, then a mail-sending process on the server can watch the collection index for changes, passing the mail to Postfix or whatever when it sees a new message.
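A sketch of the idempotent send from the client side, assuming a server that honours If-None-Match: * on PUT; the URL scheme and status handling are illustrative:

    import urllib.request, urllib.error

    def send_once(base_url, message_id, message_bytes):
        url = '%s/outmail/myhostname/%s' % (base_url, message_id)
        req = urllib.request.Request(url, data=message_bytes, method='PUT',
                                     headers={'If-None-Match': '*',
                                              'Content-Type': 'message/rfc822'})
        try:
            urllib.request.urlopen(req)
            return 'queued'               # safe to retry on timeouts, etc.
        except urllib.error.HTTPError as e:
            if e.code == 412:             # Precondition Failed: something already there
                return 'already-queued-or-collision'   # GET it to find out which
            raise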

That is, Kafka-like topics aren’t necessary for reliable exactly-once mail sending. HTTP semantics are more than adequate.

Other useful stateless services:

/mime?url=foo/bar/baz

Gives you an index of the MIME structure of the RFC-822 message; the links add more parameters and contain enough data to do the parsing efficiently.

/col?parser=/parse822
    &col=subject
    &items=/msglist?foo

Passes the message URLs in /msglist?foo through /parse822?url=$url and extracts URL-subject pairs (the “subject” “column”) from them.

/invert?from=bar
    &to=baz
    &data=/col?foo

Invokes /col?foo and inverts it, converting values to keys and keys (such as message URLs) to values, and sorting by the new key. Excludes rows whose keys are outside the range from bar to baz. (Maybe the exclusion should be elsewhere?)

/union?data=/a
    &data=/b
    &data=/c

Merges some sorted sequences of key-value pairs. Note that applying /invert with a range to /union gives you a single-field query over a merged index.

/join?data=/a
    &data=/b
    &data=/c

Given some sequences of key-value pairs, this returns one row for each key that occurs in all the datasets, with the values for each dataset concatenated.

By applying /join to some /inverts applied to /inverts with ranges applied to /unions of the appropriate /cols, you can get a multi-field query. If you leave out the /inverts, you get a message index.
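Below are toy in-memory versions of /invert, /union, and /join, just to pin down the semantics; presumably the outer layer of /inverts in the recipe above undoes the inner, range-filtered one, restoring message URLs as keys so that /join joins on message URL. The Python names here are made up:

    import heapq

    def invert(pairs, lo, hi):
        # Swap keys and values, keep only rows whose new key is in [lo, hi],
        # and sort by the new key.
        return sorted((v, k) for k, v in pairs if lo <= v <= hi)

    def union(*sorted_seqs):
        # Merge several sorted sequences of (key, value) pairs.
        return list(heapq.merge(*sorted_seqs))

    def join(*sorted_seqs):
        # One row per key that occurs in every sequence, values concatenated.
        common = set.intersection(*(set(k for k, _ in seq) for seq in sorted_seqs))
        rows = {}
        for seq in sorted_seqs:
            for k, v in seq:
                if k in common:
                    rows.setdefault(k, []).append(v)
        return sorted(rows.items())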

Ideally every service should be self-describing, in that if you invoke it without all the mandatory parameters, it gives you IDL for how to call it, such as an HTML form or equivalent. Also, we need some way of assigning media types to URLs and URL input fields to make it easy to plug things together, plus a better invocation syntax with less noisy parameters and support for URL nesting. Something like Clojure keyword syntax.

Also, both requests and responses should be able to contain capabilities, in order to avoid confused-deputy attacks.

An interactive prompt should promiscuously prefetch stuff as you are composing your request so you can see what you’re doing, since GET is safe. If you do some interactive exploration, you should then be able to go back and factor out some parameters from your script, then turn it into a new microservice.

Minimal implementation

Of course I don’t need all this shit to be able to try it out, and I do need my mail pretty soon. A mini HTTP server with a small number of simple scripts (parse message starts, provide mailbox size and mailbox blocks) that I can tunnel over ssh should be pretty doable. I’d need some special-purpose caching logic but nothing really special. Also mapping some URLs to ssh-tunneled URLs and others to local scripts seems like it should be pretty doable.

In terms of data formats, I can probably get by with space-separated URL-encoded fields with newline record terminators for now. Or Excel CSV, which is probably just as easy, but doesn’t sort properly.
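A sketch of that record format; unlike quoted CSV, records here are always a single line and fields never contain embedded spaces or newlines:

    from urllib.parse import quote, unquote

    def encode_record(fields):
        return ' '.join(quote(field, safe='') for field in fields) + '\n'

    def decode_record(line):
        return [unquote(field) for field in line.rstrip('\n').split(' ')]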

Sorting and caching is probably pretty easily done with LevelDB or maybe Redis. Or both. LevelDB on my laptop can handle about 300k record insertions per second, so about half a second to sort the subjects of all my messages. /bin/sort sorts my 100k-line ~/netbook-misc-devel/bible-pg10.txt in 770 ms, or 420 ms with LANG=C, which is the same speed (even with -S 1G). It seems like it should be possible to go faster since that file is only 4.4 megabytes; LANG=C wc takes 100ms.

As another sample computation, time grep -a '^Subject: ' adjuvant-mbox.1g | wc finds 43893 subject lines in 1073741824 bytes (1 GiB) containing 41630 message starts in 11.5 seconds. Oh wait, it takes 740ms if it's already in memory, then another 740ms if I pipe it through sort. The total time for grep | sort | wc drops to 1000ms with LANG=C, which mostly affects sort.

HTTP/2 supports out-of-order responses over a single connection, so using HTTP/2 would avoid the need to use multiple HTTP connections. The quasi-required encryption would probably hurt performance, but it isn’t really required in the standard.

I should check out Hyper the Terminal and see if it has anything interesting (no, not for this). And maybe Spark and Samza.
