Efficiently querying a log of everything that ever happened

Kragen Javier Sitaker, 2015-09-03 (7 minutes)

I’ve been designing a new chat/email UI called Desbarrerarme, and one of the design approaches I want to try out is computing the UI state as mostly a pure function of the input history of the system.

One of the things I’ve been thinking about for Desbarrerarme is querying. By far the nicest existing way I’ve found to do normal queries is Prolog/Datalog; it’s dramatically better than SQL or Tutorial D. (Binate may be better, but it’s unfinished.). However, it’s not totally clear to me how to do aggregate queries, in particular “top N” queries.

I’ve been struggling with this for a long time; see e.g. http://stackoverflow.com/questions/1467898/what-language-could-i-use-for-fast-execution-of-this-database-summarization-task. (“Top 5 scores and their dates for each player”; one response: “Top-N is a well-known database killer. As shown by the post above, there is no way to efficiently express it in common SQL.”)

Here are some example queries I’ve been thinking about for Desbarrerarme:

  1. What are the N most recently updated threads (i.e. threads with the most recent messages)? Because those are the ones we want to show on the screen.
  2. What are the participants and the most recent message in each of the N most recently updated threads? Because that’s what we want to display on the left side.
  3. What are the N most recent messages in thread X? Because those are the ones we want on the screen.
  4. What are the N most recently updated threads I haven’t tagged as to-archive?
  5. What are the N next scheduled reminders after the beginning of today that I haven’t marked as taken-care-of?
  6. What are the N best threads that contain the words “apache” and “spark”, case-insensitively, where “best” is some kind of combination of recency and TF/IDF?

If we have a single relation messages with the columns when, thread, from, body, and some similar thing for reminders, it’s apparent how to compute each of these inefficiently. We can calculate #2, for example, like this:

participants = {}
last_message = {}
top_n = []
for when, thread, from_, body in sorted(messages):
    if thread in top_n:
        top_n.pop(top_n.index(thread))
    else:
        if len(top_n) >= n:
            top_n.pop(0)
    top_n.append(thread)

    if thread not in participants:
        participants[thread] = set([from_])
    else:
        participants[thread].add(from_)

    last_message[thread] = (when, body)

for thread in top_n:
    yield participants[thread], last_message[thread]

This has three big disadvantages:

  1. It’s a lot of code. The English version of the question was 18 words; the Python version above is 18 lines.
  2. That code is poorly abstracted, so it isn’t reusable. For example, it’s very likely that this isn’t the only place we’d like to know the participants of a chat.
  3. It’s inefficient. It inherently needs to traverse the entire messages relation, which might be many gigabytes, and it accumulates a potentially large participants relation in memory because it doesn’t know at the outset which threads are going to be included in the final result.

Problems #2 and #3 are interrelated. For example, if we could freeze the whole execution state of the loop partway through, then we could incrementally recompute its final state if new messages were added with a later date.

Better still, in this case, you could break the computations down into a monoid tree — participants can be computed from results over subsets of the data set simply using the union operation, while top_n could be computed from results over subsets of the data set by concatenating those results (ordered by timestamps) and truncating to the last N. Abstracting the operations to this level would allow them to be not only recomputed incrementally after updates, but also parallelized and made fault-tolerant. Also, the top_n result is probably somewhat lazy — it’s probably possible to compute the most recently updated 10 or 20 chats without looking further back than 1–30 days.

Ideally, we could find a set of language primitives to express queries like these that would make the query simple to express. Something like maybe

top = max(when) by thread | sortby when desc | limit n
participants = distinct(project(thread, from))
latest = max(when, body) by thread
(participants naturaljoin latest) if thread ∈ top.thread

Or, I don’t know, something still more concise. And ideally that set of language primitives would expose enough structure to a runtime to allow the orthogonal specification or even inference of optimizations like the ones suggested above; and ideally it would also, like the above, make it straightforward to reuse the definitions for other things.

Also, if it’s not too much to ask, generalize the materialized views that are used to speed up incremental recomputation so that they can be used for a larger range of queries than just the current ones. That is, maybe I only asked for the latest 12 chats, but maybe it would be prudent to calculate the latest 24 or 64 if that isn’t harder; and maybe latest only contains when and body here, but maybe we should also keep from just in case.

Additionally, I’d like to get the results from these queries incrementally, so that the UI isn’t frozen while the query is being evaluated.

This seems like it might be related to Spark’s concept of “resilient distributed datasets”. I need to read more.

Faster brute force

An alternative approach to caching and materializing a bunch of shit is trying to make brute-force query evaluation sufficiently fast. Considering this messages thing again, suppose I had 10 messages per second for the last 20 years. (Busy IRC channels.) That’s 6.3 billion messages. But maybe there are less than 65536 participants, so participants can be identified by 16-bit numbers; so the participants column there is only 12 gigabytes, and it’s probably highly enough compressible with LZ4 to fit into RAM easily. Similarly, suppose we’re using 64-bit microseconds for when; now we have 50 gigabytes of timestamps, but again, they probably fit into RAM easily with LZ4. The message bodies won’t, but they’ll fit on a small SSD, and will probably compress enough to hit RAM-like speeds for scanning.

And the above query, if the query optimizer is lucky enough to pick a reasonable evaluation order, should only need to traverse the tail end of the when and thread columns before hitting n, and then the entire from column. Still, this is probably not going to get us to subsecond response times.

Topics