Fluo 1.0.0-alpha-1 is the first official release of Fluo. It contains 83 initial tickets marked for this first milestone. The roadmap for this release included a working initial implementation, completing an initial pass at the client API (which will be finalized in 1.0.0), and writing a stress test application. This release has not been tested much at scale and is not ready for production use. More scale and performance testing will be done for the beta release.

Below are resources for this release:

Fluo is an implementation of Google’s percolator paper, which adds large-scale incremental processing of data using distributed transactions and notifications. It runs on YARN using Twill and stores its data in Accumulo.

Significant features

This alpha release contained a lot of initial features to help establish the basic implementation and client API. All of these features will be expected to become more robust over time.

MiniFluo

MiniAccumulo and MiniFluo make it easy to start a local Fluo instance for integration testing. MiniFluo also makes it easy for developers to experiment with Fluo locally without installing it on an Accumulo cluster.

Garbage Collection

Without garbage collection, the columns in a Fluo table would grow without bound as updates are applied over time. Because of this, it’s important to clean up old fully-processed (and no longer being read) transaction data. The initial implementation of garbage collection kept N versions of columns. Fluo-8 added the ability to garbage collect all column versions that are no longer being read by transactions anywhere else in the cluster.

MapReduce InputFormat

The FluoInputFormat allows a specific snapshot to be read into a mapreduce job. It was established in Fluo-7.

FluoFileOutputFormat and FluoOutputFormat

The FluoFileOutputFormat enables the bulk ingest of a Fluo table using mapreduce by creating the Accumulo r-files in HDFS. The FluoOutputFormat pushes keys directly into the Accumulo tablet servers through the client API. Fluo-35 added this feature.

Fluo Workers and Oracle running in Yarn

Fluo can use Yarn to manage fault tolerance and guarantee multiple worker processes and some number of Oracle processes are kept running across the cluster. Apache Twill was used for the Yarn integration. Fluo-11 added an application that will deploy Fluo to a Yarn cluster.

Fluo API

Fluo provides a fluent-builder API for creating transactions to read and mutate data of various different data types in the Fluo table. This API was finalized in Fluo-23 and Fluo-98 when it was moved into its own module. It includes a type layer (finished in Fluo-153) that makes it easier to work with different types, default values, null return types, and row/column maps.

Stress test

A stress test was created that uses a tree to force collisions on transactions trying to mutate multiple columns at the same time. This was established in Fluo-24.

Other important improvements and features:

  • Fluo-1 - Create scripts for starting/stopping services
  • Fluo-3 - use Curator for leader election and leader discovery
  • Fluo-13 - Register clients that perform transactions in Zookeeper
  • Fluo-15 - Leader election for the Oracle Server
  • Fluo-16 - Allow observer notification to be configured in AbstractObserver class
  • Fluo-89 - OracleApp should allow configurable number of instances
  • Fluo-98 - Create a Range class for Fluo
  • Fluo-123 - Hide implementation of Bytes in the API
  • Fluo-130 - Expose MiniFluo in public API
  • Fluo-131 - Make LoaderExecutor implement closeable
  • Fluo-147 - Create fluo-accumulo module for Accumulo iterators and formatter
  • Fluo-175 - Add newTransaction() method to FluoClient
  • Fluo-186 - Add Exceptions to API

Testing

All unit and integration tests passed. Two stress tests were run successfully on a 20-node cluster with one test generating 2000 integers and another generating 5000 integers. The 5000 integer test took about 10 minutes to run after the MapReduce load of integers was complete.

Current Status of Percolator Implementation

The following tables identify the features necessary for a full implementation of Percolator and the current state of Fluo with regards to the implementation. For alpha, a test to exercise some of these features at scale was created, however, it was not run at scale for long periods. One of the goals for beta is to have at least a 24hr run of the scale test, on at least 20 nodes, succeed.

Necessary features:

Percolator Feature Details Testing
ACID-compliant cross-row transactions Provides distributed transactions with ACID guarantees that work across Accumulo rows. Rows are distributed across tablet servers. IT,CST,#2401
Observers Perform transactions by sending read/write RPCs to Accumulo tablet servers, which in turn send read/write RPCs to HDFS. IT,CST
Worker Process Scans Accumulo for changed columns and invokes necessary observers as function calls on the worker. IT,CST
Timestamp Oracle Provides strictly increasing timestamps; a property required for correct operation of the snapshot isolation protocol. IT,CST
Oracle Failover When an active Oracle process dies, timestamps should be served from a different Oracle. They should never go backwards. IT,#37,#2411
Lightweight Lock Service Makes searches for dirty notifications more efficient. Zookeeper is used in this case. Should be low-latency, replicated, balanced, and persist to a backing store. IT,CST
Multithreaded Transactions Allows highly parallel execution of transactions and synchronization only during get() and commit() upon which transactions actually collide. IT,CST
Snapshot Isolation Non-serializable isolation mostly implemented through MVCC provided through Accumulo’s timestamp portion of the key. IT,CST,#2401
Roll-back / Roll-forward semantics Failures during the first phase of commit cause transactions to be rolled back while failures during the second phase can cause them to be rolled forward. IT,#2401

1 This feature has been at least partially tested.

Performance enhancements:

Enhancement Details Testing
Locality Group for NOTIFY column Add an extra index in the underlying r-files for the notify column where it can be scanned faster by workers. IT,CST
Worker scan conflict resolution Workers register random tablets they choose to scan. When two workers generate the same tablet, one of them re-generates a new random tablet. #5
Worker advertisement Workers advertise through the lock service so that the system can know if they failed and any transactions upon which they are working can be rolled back or rolled forward IT
Oracle batching Timestamps get allocated in batch to workers. Each worker should share one connection to Oracle to limit RPCs across threads IT,CST
Weak Notifications Allows notification of a cell without the possibility of transactional conflict through a special type of column that can be notified but not written to IT
Conditional Mutations Allows READ-MODIFY-WRITE of a mutation in a single RPC. IT,CST
Conditional Mutation batching Delay sending of conditional mutations destined for same tablet server so they can be batched. Lowers number of RPCs, adds latency but provides better parallelism. IT,CST
Read batching Delaying of read operations on tablet server for batching #138
Prefetching & read-ahead caching Prediction made each time a column is read, which other columns will be read in the same transaction based on previous behavior. #12
  • IT = Integration tests, CST = Cluster stress tests
  • Testing columns providing a ticket number have not yet been implemented.

Quickstart

If you are new to Fluo, the best way to get started is to follow the quickstart which starts a local Fluo instance (called MiniFluo). By using MiniFluo, you can avoid following the steps below to run your own Fluo instance.

Running a Fluo instance

In order to run a Fluo instance, you will need Accumulo (version 1.6+), Hadoop (version 2.2+), and Zookeeper installed and running on your local machine or cluster.

Fluo distributions are built for specific releases of Hadoop and Accumulo. If you are using Accumulo 1.6.1 and Hadoop 2.3.0, you can download the 1.0.0-alpha-1 tarball. Otherwise, you will need to clone the Fluo repo and build a distribution tarball of 1.0.0-alpha-1 for your environment:

git clone https://github.com/fluo-io/fluo.git
cd fluo/
git checkout 1.0.0-alpha-1
mvn package -Daccumulo.version=1.6.1-SNAPSHOT -Dhadoop.version=2.4.0

The distribution tarball will be created in modules/distribution/target

When you have a Fluo distribution, install it to a directory:

tar xzf fluo-distribution-1.0.0-alpha-1-bin.tar.gz

Next, follow the instructions to configure and run Fluo in the README.

Running Fluo applications

Once you have Fluo installed and running on your cluster, you can now run Fluo applications. Fluo applications consist of clients and observers. If you are new to Fluo, consider first building and running the phrasecount application on your Fluo instance. Otherwise, you will need to include the following in your Maven pom and follow the steps below to create your own Fluo client or observer:

<dependency>
  <groupId>io.fluo</groupId>
  <artifactId>fluo-api</artifactId>
  <version>1.0.0-alpha-1</version>
</dependency>
<dependency>
  <groupId>io.fluo</groupId>
  <artifactId>fluo-core</artifactId>
  <version>1.0.0-alpha-1</version>
  <scope>runtime</scope>
</dependency>

Creating a Fluo client

To create a FluoClient, you will need all properties with the io.fluo.client.* prefix set in the configuration that is passed to the newClient() method of FluoFactory. See fluo.properties for details on these properties.

Creating a Fluo observer

To create an observer, follow these steps:

  1. Create a class that extends AbstractObserver.
  2. Build a jar containing this class and include this jar in lib/observers of your Fluo installation.
  3. Configure your Fluo instance to use this observer by modifying the Observer section of fluo.properties.
  4. Restart your Fluo instance so that your Fluo workers load the new observer.