A reactive crawler using Amygdala

Kragen Javier Sitaker, 2014-09-02 (updated 2014-09-19) (4 minutes)

This is a design for a reactive programming system optimized for web crawling, to be implemented in Java. We want it to be as simple as possible to write a web-crawling application that transmits updates when it has them — say, to an IRC channel, or a webhook.

Suppose we want to send the Bitstamp Bitcoin price, translated into Euros, to an IRC channel, but only when it changes by more than 0.5%, and without hitting any of the web servers more often than once every 20 minutes. We'd like to be able to write code like this:

import org.canonical.amygdala.UserAgent;
import org.canonical.amygdala.Page;
import org.canonical.amygdala.DBVar;
import org.canonical.amygdala.DB;

// ...

final DBVar dbVar = DB.var("Ⓑ in €");
start("compute Ⓑ in €", new Runnable() {
    public void run() {
        String ratesUrl =
            "http://www.x-rates.com/calculator/?from=EUR&to=USD&amount=1",
            bitstamp = "https://www.bitstamp.net/";
        // Incapsula lets Googlebot access pages without having to 
        // execute JS.  Bitstamp uses Incapsula.
        UserAgent ua = UserAgent.create("Mozilla/5.0 (compatible; "
            + "Googlebot/2.1; +http://www.google.com/bot.html)");
        Page exchangeRates = ua.get(ratesUrl).maxAge(minutes(20)),
             bitstamp = ua.get(bitstamp).maxAge(minutes(20));
        // Extract floating-point numbers with regular expressions.
        Float bitcoinPrice =
                bitstamp.extFloat("class=\"last\">$([\\d.]+)<"),
            euroPrice = 
                exchangeRates.extFloat("class=\"ccOutputRslt\">([\\d.]+)<"),
            bitcoinInEuros = bitcoinPrice / euroPrice;
        dbVar.setFloat(bitcoinInEuros);
    }
});

start("irc announcement", new Runnable() {
    public void run() {
        DBVar last = DB.var("last Ⓑ in €");
        float newVar = dbVar.get().float(),
        Float oldValue = last.get().floatOption();

        if (oldValue == null
          || Math.abs(oldValue - newValue) / newValue > 0.005) {
            ircChan.say("Ⓑ = €" + newValue);
            last.set(newValue);
        }
    }
});

and have it "just work", which in this case involves fetching the requested pages, reinvoking the runnable once it has them, and reinvoking it again every time it has new versions of those pages.

Amygdala needs to run the first runnable once simply in order to find out what URLs it will try to fetch — but since it hasn't fetched any pages yet, the first attempt to use regexps to extract floats from the not-yet-fetched pages will raise an exception. But the page fetches will have been initiated, and the runnable will be reinvoked once one or the other is available.

Invoking get() on a DBVar notes that DBVar as a read-dependency of the current task; invoking float() on the result will return the float stored there. There are three possibilities:

So the first time the "irc announcement" runnable is invoked, it will fail the transaction because "Ⓑ in €" has no value yet; if and when a value is stored there, it will be rerun, and then it will also read, and write, "last Ⓑ in €".

I don't quite know what the right handling of the ircChan.say call is — it could either check to verify that speaking on the channel is possible (and abort the transaction if not), waiting to actually transmit until the transaction as a whole commits; or it could simply transmit from the middle of the transaction, with the unfortunate result that retrying the transaction could result in duplicate messages on the channel.

The maxAge ensures that we never fetch the pages more often than once every 20 minutes. But as long as the task is running, the fetcher will continue fetching those pages whenever they become stale.

This means Amygdala supports both "push" tasks, like those above, and "pull" tasks, which only run as long as something is listening to them. (Actually, I think that's not quite true; perhaps everything here can be "pull" from the IRC channel?)

Topics