Apache Fluo follows semantic versioning (semver) for its API. The API consists of everything under the org.apache.fluo.api package. Code outside of this package can change at any time. If your project uses Fluo code that falls outside of the API, consider starting a discussion about adding it to the API.

Significant changes

The major changes in 1.1.0 are highlighted here. For the complete list of changes, see the 1.1.0 milestone on GitHub.

New API for configuring observers

The 1.0.0 API for providing Observers required configuring an Observer class for each observed column. This API was cumbersome to use and made it impossible to use lambdas. For #816, a better API was introduced. The new API only requires configuring a single class that provides all Observers, and this class can register lambdas to observe a column. The new API makes writing Fluo applications faster and easier. Below is an example of using it to register two observers that compute the number of URLs that reference a document.

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.observer.ObserverProvider;
import org.apache.fluo.api.observer.StringObserver;

import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
import static org.apache.fluo.api.observer.Observer.NotificationType.WEAK;

public class AppObserverProvider implements ObserverProvider {

  private static final Column DOC_CURR_COL = new Column("doc", "curr");
  private static final Column DOC_NEW_COL = new Column("doc", "new");
  private static final Column DOC_URL_CHANGE = new Column("doc", "urlChange");
  private static final Column DOC_REF_COUNT_COL = new Column("doc", "refCount");

  // Each Fluo worker will call this method to create the observers it needs.
  @Override
  public void provide(Registry registry, Context ctx) {
    // This could be used to pass application-specific configuration to observers. It's not
    // used in this example.
    SimpleConfiguration appConfig = ctx.getAppConfiguration();

    // Register an observer that processes new content for a document.
    registry.forColumn(DOC_NEW_COL, STRONG).useObserver(new ContentObserver());

    // Register a lambda that processes notifications for the column DOC_URL_CHANGE.
    registry.forColumn(DOC_URL_CHANGE, WEAK).useStrObserver((tx, myUrl, col) -> {

      // Compute the number of URLs that refer to this URL.
      CellScanner refScanner = tx.scanner().over(Span.exact(myUrl, new Column("ref"))).build();
      int numRefs = Iterables.size(refScanner);

      // Do something with the count. Simply storing it is not interesting, but keeps the
      // example short.
      tx.set(myUrl, DOC_REF_COUNT_COL, numRefs + "");
    });
  }

  /**
   * Compute the change in a document's URLs and propagate those changes to the referenced
   * documents.
   */
  private static class ContentObserver implements StringObserver {
    @Override
    public void process(TransactionBase tx, String myUrl, Column col) throws Exception {

      // Retrieve the new and current document content.
      Map<Column, String> colVals = tx.gets(myUrl, DOC_CURR_COL, DOC_NEW_COL);

      String newContent = colVals.getOrDefault(DOC_NEW_COL, "");
      String currContent = colVals.getOrDefault(DOC_CURR_COL, "");

      // Extract the URLs in the new and current document content.
      Set<String> newUrls = extractUrls(newContent);
      Set<String> currUrls = extractUrls(currContent);

      // For URLs that only exist in new content, update the document they reference.
      Sets.difference(newUrls, currUrls).forEach(url -> {
        tx.set(url, new Column("ref", myUrl), "");
        tx.setWeakNotification(url, DOC_URL_CHANGE);
      });

      // For URLs that are no longer present, update the document they reference.
      Sets.difference(currUrls, newUrls).forEach(url -> {
        tx.delete(url, new Column("ref", myUrl));
        tx.setWeakNotification(url, DOC_URL_CHANGE);
      });

      // Update the current document content.
      tx.set(myUrl, DOC_CURR_COL, newContent);
      tx.delete(myUrl, DOC_NEW_COL);
    }

    private Set<String> extractUrls(String content) {
      // TODO implement extracting URLs from content. An empty set is returned so the
      // example does not throw a NullPointerException in Sets.difference above.
      return Collections.emptySet();
    }
  }
}

Before initializing a Fluo application, the ObserverProvider above would be added to the configuration as follows.

FluoConfiguration fluoConf = ...
fluoConf.setObserverProvider(AppObserverProvider.class);

// initialize Fluo app using fluoConf
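
For example, initialization might look like the following sketch, assuming fluoConf already contains the application name and connection settings (FluoAdmin and FluoFactory are part of the public API):

try (FluoAdmin admin = FluoFactory.newAdmin(fluoConf)) {
  // Stores the application configuration, including the ObserverProvider
  // registered above, so workers can load it later.
  admin.initialize(new FluoAdmin.InitializationOptions());
}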

The old API is still present but has been deprecated and may be removed in the future.

Improved Fluo scalability

In the previous release, each worker scanned the entire table looking for notifications that hashed to it. This strategy for finding notifications is O(N*W), where N is the number of notifications and W is the number of workers.

For #500, a different strategy for finding notifications was introduced. Workers divide themselves into groups, and each group scans a subset of the table for notifications. Every worker in a group scans the group's entire subset of the table looking for notifications that hash to it. The new strategy results in O(N*G) work, where N is the number of notifications and G is the group size. By default the group size is 7, but this is configurable. The group size may need to be increased if a popular portion of the table is assigned to one group that cannot process it quickly enough.

To compare the old and new strategies, assume there are 10^9 notifications and 100 workers. The old strategy would scan 10^11 entries to find all notifications. Assuming a group size of 7, the new strategy scans 7 * 10^9 entries to find all notifications. A nice feature of the new strategy is that the amount of scanning is independent of the number of workers. Under the old strategy, if the number of workers increases by a factor of 10, the amount of scanning also increases by a factor of 10; with 1,000 workers it would scan 10^12 entries to find all notifications. The new strategy still scans only 7 * 10^9 entries with 1,000 workers.
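
The difference is easy to see with concrete numbers. Below is a minimal sketch restating the two cost formulas above; the variable names are illustrative, not part of any Fluo API:

long notifications = 1_000_000_000L; // N = 10^9
long workers = 100;                  // W
long groupSize = 7;                  // G

long oldWork = notifications * workers;   // O(N*W): 10^11 entries scanned
long newWork = notifications * groupSize; // O(N*G): 7 * 10^9 entries scanned

// Increasing workers to 1,000 raises oldWork to 10^12; newWork is unchanged.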

Improved Bytes

Fluo’s API includes Bytes, an immutable wrapper for a byte array. Multiple improvements were made to this wrapper; a short sketch of the new methods follows the list below.

  • startsWith(Bytes) and endsWith(Bytes) methods were added in #823
  • A copyTo(byte[]) method was added for #827
  • Internal performance improvements were made in #826 and #799
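
A minimal sketch of the new methods, assuming the signatures listed above (Bytes.of and length() are existing API):

Bytes row = Bytes.of("com.example/page1");

// Prefix and suffix tests added in #823.
boolean inDomain = row.startsWith(Bytes.of("com.example"));
boolean isPage = row.endsWith(Bytes.of("page1"));

// Copy the wrapped bytes into an existing array, using the copyTo(byte[])
// method added for #827.
byte[] dest = new byte[row.length()];
row.copyTo(dest);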

Improved Spark integration

For #813, improvements were made that allow FluoConfiguration objects to be easily passed to remote Spark processes.
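
A minimal sketch of what this enables, assuming a FluoConfiguration can now be captured in a Spark closure and shipped to executors (sparkContext and urls are hypothetical; the Fluo calls are public API):

FluoConfiguration fluoConf = new FluoConfiguration(new File("fluo.properties"));

sparkContext.parallelize(urls).foreachPartition(iter -> {
  // fluoConf travels to the remote executor along with the closure.
  try (FluoClient client = FluoFactory.newClient(fluoConf);
       Transaction tx = client.newTransaction()) {
    iter.forEachRemaining(url -> tx.set(url, new Column("doc", "new"), ""));
    tx.commit();
  }
});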

Testing

Long runs of Stresso and Webindex were successfully completed on EC2 using multiple nodes.