Fluo 1.0.0-alpha-1 is the first official release of Fluo. It contains 83 initial tickets marked for this first milestone. The roadmap for this release included a working initial implementation, completing an initial pass at the client API (which will be finalized in 1.0.0), and writing a stress test application. This release has not been tested much at scale and is not ready for production use. More scale and performance testing will be done for the beta release.
Below are resources for this release:
- Download the Fluo binary tarball for 1.0.0-alpha-1 from GitHub.
- View the documentation for this release.
- Read the Javadocs.
- The Fluo jars have been deployed to Maven Central.
Fluo is an implementation of Google’s Percolator paper, which describes large-scale incremental processing of data using distributed transactions and notifications. It runs on YARN using Twill and stores its data in Accumulo.
Significant features
This alpha release contains many initial features that help establish the basic implementation and client API. All of these features are expected to become more robust over time.
MiniFluo
MiniAccumulo and MiniFluo make it easy to start a local Fluo instance for integration testing. MiniFluo also makes it easy for developers to experiment with Fluo locally without installing it on an Accumulo cluster.
Garbage Collection
Without garbage collection, the columns in a Fluo table would grow without bound as updates are applied over time. It is therefore important to clean up old transaction data that has been fully processed and is no longer being read. The initial implementation of garbage collection kept N versions of columns. Fluo-8 added the ability to garbage collect all column versions that are no longer being read by transactions anywhere else in the cluster.
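The retention rule described above can be illustrated with a minimal, self-contained sketch. It does not use Fluo's or Accumulo's classes; the `VersionGc` class, its `collect` method, and the in-memory `TreeMap` of versions are assumptions made purely for illustration. For a single column, it keeps the newest version that the oldest active transaction can still see, plus everything newer, and treats older versions as collectable.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration only; this is not Fluo's garbage collection code.
final class VersionGc {
    /**
     * Returns the versions of one column that must be retained.
     * versions maps commit timestamp -> value; oldestActiveTs is the
     * smallest start timestamp of any transaction that is still running.
     */
    static NavigableMap<Long, String> collect(NavigableMap<Long, String> versions,
                                              long oldestActiveTs) {
        // Newest version at or below the oldest reader's timestamp.
        Long visible = versions.floorKey(oldestActiveTs);
        if (visible == null) {
            // Every version is newer than the oldest reader, so keep them all.
            return new TreeMap<>(versions);
        }
        // Keep that version and everything newer; older ones can no longer be read.
        return new TreeMap<>(versions.tailMap(visible, true));
    }
}
```

With versions committed at timestamps 10, 20, and 30 and an oldest active transaction at timestamp 25, only the version at 10 would be dropped in this model.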
MapReduce InputFormat
The FluoInputFormat allows a specific snapshot to be read into a MapReduce job. It was added in Fluo-7.
FluoFileOutputFormat and FluoOutputFormat
The FluoFileOutputFormat enables the bulk ingest of a Fluo table using MapReduce by creating the Accumulo r-files in HDFS. The FluoOutputFormat pushes keys directly into the Accumulo tablet servers through the client API. Fluo-35 added this feature.
Fluo Workers and Oracle running in YARN
Fluo can use YARN to manage fault tolerance and guarantee that multiple worker processes and some number of Oracle processes are kept running across the cluster. Apache Twill was used for the YARN integration. Fluo-11 added an application that will deploy Fluo to a YARN cluster.
Fluo API
Fluo provides a fluent-builder API for creating transactions to read and mutate data of various types in the Fluo table. This API was finalized in Fluo-23 and Fluo-98 when it was moved into its own module. It includes a type layer (finished in Fluo-153) that makes it easier to work with different types, default values, null return types, and row/column maps.
Stress test
A stress test was created that uses a tree to force collisions on transactions trying to mutate multiple columns at the same time. This was established in Fluo-24.
Other important improvements and features:
- Fluo-1 - Create scripts for starting/stopping services
- Fluo-3 - Use Curator for leader election and leader discovery
- Fluo-13 - Register clients that perform transactions in Zookeeper
- Fluo-15 - Leader election for the Oracle Server
- Fluo-16 - Allow observer notification to be configured in AbstractObserver class
- Fluo-89 - OracleApp should allow configurable number of instances
- Fluo-98 - Create a Range class for Fluo
- Fluo-123 - Hide implementation of Bytes in the API
- Fluo-130 - Expose MiniFluo in public API
- Fluo-131 - Make LoaderExecutor implement closeable
- Fluo-147 - Create fluo-accumulo module for Accumulo iterators and formatter
- Fluo-175 - Add newTransaction() method to FluoClient
- Fluo-186 - Add Exceptions to API
Testing
All unit and integration tests passed. Two stress tests were run successfully on a 20-node cluster with one test generating 2000 integers and another generating 5000 integers. The 5000 integer test took about 10 minutes to run after the MapReduce load of integers was complete.
Current Status of Percolator Implementation
The following tables identify the features necessary for a full implementation of Percolator and the current state of Fluo with regard to each. For alpha, a test to exercise some of these features at scale was created, but it was not run at scale for long periods. One of the goals for beta is a successful run of the scale test lasting at least 24 hours on at least 20 nodes.
Necessary features:
Percolator Feature | Details | Testing |
---|---|---|
ACID-compliant cross-row transactions | Provides distributed transactions with ACID guarantees that work across Accumulo rows. Rows are distributed across tablet servers. | IT,CST,#240¹ |
Observers | Perform transactions by sending read/write RPCs to Accumulo tablet servers, which in turn send read/write RPCs to HDFS. | IT,CST |
Worker Process | Scans Accumulo for changed columns and invokes necessary observers as function calls on the worker. | IT,CST |
Timestamp Oracle | Provides strictly increasing timestamps; a property required for correct operation of the snapshot isolation protocol. | IT,CST |
Oracle Failover | When an active Oracle process dies, timestamps should be served from a different Oracle. They should never go backwards. | IT,#37,#241¹ |
Lightweight Lock Service | Makes searches for dirty notifications more efficient. Zookeeper is used in this case. Should be low-latency, replicated, balanced, and persist to a backing store. | IT,CST |
Multithreaded Transactions | Allows highly parallel execution of transactions and synchronization only during get() and commit() upon which transactions actually collide. | IT,CST |
Snapshot Isolation | Non-serializable isolation mostly implemented through MVCC provided through Accumulo’s timestamp portion of the key. | IT,CST,#240¹ |
Roll-back / Roll-forward semantics | Failures during the first phase of commit cause transactions to be rolled back while failures during the second phase can cause them to be rolled forward. | IT,#240¹ |
¹ This feature has been at least partially tested.
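The snapshot isolation row in the table above can be made more concrete with a small single-process model. This is a sketch under stated assumptions, not Fluo's commit protocol: the `SnapshotStore` class, its in-memory maps, and the local counter standing in for the timestamp Oracle are all hypothetical, and the two-phase locking that Percolator performs across tablet servers is collapsed into one synchronized method. It shows only the two MVCC rules: a read sees the newest version committed before the transaction's start timestamp, and a commit is rejected when another transaction has written the same cell since that start timestamp.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process model of snapshot isolation; not Fluo code.
final class SnapshotStore {
    // cell name -> (commit timestamp -> value)
    private final Map<String, TreeMap<Long, String>> cells = new HashMap<>();
    private long clock = 0; // stands in for the timestamp Oracle

    synchronized long nextTimestamp() {
        return ++clock;
    }

    /** Snapshot read: newest version committed before startTs, or null. */
    synchronized String get(String cell, long startTs) {
        TreeMap<Long, String> versions = cells.get(cell);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> entry = versions.lowerEntry(startTs);
        return entry == null ? null : entry.getValue();
    }

    /** Returns false (roll back) if any written cell changed after startTs. */
    synchronized boolean commit(long startTs, Map<String, String> writes) {
        for (String cell : writes.keySet()) {
            TreeMap<Long, String> versions = cells.get(cell);
            if (versions != null && versions.higherKey(startTs) != null) {
                return false; // write-write conflict with a newer commit
            }
        }
        long commitTs = nextTimestamp();
        for (Map.Entry<String, String> w : writes.entrySet()) {
            cells.computeIfAbsent(w.getKey(), k -> new TreeMap<>())
                 .put(commitTs, w.getValue());
        }
        return true;
    }
}
```

In this model, two transactions that start before either commits and then write the same cell cannot both succeed: the second commit sees a version newer than its start timestamp and is rejected.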
Performance enhancements:
Enhancement | Details | Testing |
---|---|---|
Locality Group for NOTIFY column | Add an extra index in the underlying r-files for the notify column where it can be scanned faster by workers. | IT,CST |
Worker scan conflict resolution | Workers register random tablets they choose to scan. When two workers choose the same tablet, one of them picks a new random tablet. | #5 |
Worker advertisement | Workers advertise through the lock service so that the system can know if they failed and any transactions upon which they are working can be rolled back or rolled forward. | IT |
Oracle batching | Timestamps get allocated in batch to workers. Each worker should share one connection to Oracle to limit RPCs across threads. | IT,CST |
Weak Notifications | Allows notification of a cell without the possibility of transactional conflict through a special type of column that can be notified but not written to. | IT |
Conditional Mutations | Allows READ-MODIFY-WRITE of a mutation in a single RPC. | IT,CST |
Conditional Mutation batching | Delay sending of conditional mutations destined for same tablet server so they can be batched. Lowers number of RPCs, adds latency but provides better parallelism. | IT,CST |
Read batching | Delaying of read operations on tablet server for batching | #138 |
Prefetching & read-ahead caching | Each time a column is read, predict which other columns will be read in the same transaction based on previous behavior. | #12 |
- IT = Integration tests, CST = Cluster stress tests
- Features whose Testing column lists only a ticket number have not yet been implemented.
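The Oracle batching row above describes sharing one connection per worker so that many threads cost few RPCs. A minimal sketch of that idea follows. It is not Fluo's Oracle client: the `OracleBatcher` class, the nested `Oracle` stand-in, and the explicit `flush()` call are assumptions for illustration, and a real client would flush from a background thread rather than on demand. Threads queue their requests, and one call to the stand-in Oracle allocates a contiguous range that serves every queued request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of batching timestamp requests; not Fluo code.
final class OracleBatcher {
    /** Stand-in for the remote Oracle; each allocate() call models one RPC. */
    static final class Oracle {
        private long next = 1;
        private int rpcCount = 0;

        synchronized long allocate(int count) {
            rpcCount++;
            long first = next;
            next += count; // timestamps never repeat or go backwards
            return first;
        }

        synchronized int rpcCount() {
            return rpcCount;
        }
    }

    private final Oracle oracle;
    private final List<CompletableFuture<Long>> pending = new ArrayList<>();

    OracleBatcher(Oracle oracle) {
        this.oracle = oracle;
    }

    /** Called by transaction threads; completed on the next flush. */
    synchronized CompletableFuture<Long> requestTimestamp() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    /** Serves all queued requests with a single Oracle call. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        long ts = oracle.allocate(pending.size());
        for (CompletableFuture<Long> future : pending) {
            future.complete(ts++);
        }
        pending.clear();
    }
}
```

Requests are batched only while they wait for the next flush, so a timestamp is never handed out before it was asked for. That matters for snapshot isolation, which relies on timestamps reflecting the order in which they were requested.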
Quickstart
If you are new to Fluo, the best way to get started is to follow the quickstart which starts a local Fluo instance (called MiniFluo). By using MiniFluo, you can avoid following the steps below to run your own Fluo instance.
Running a Fluo instance
In order to run a Fluo instance, you will need Accumulo (version 1.6+), Hadoop (version 2.2+), and Zookeeper installed and running on your local machine or cluster.
Fluo distributions are built for specific releases of Hadoop and Accumulo. If you are using Accumulo 1.6.1 and Hadoop 2.3.0, you can download the 1.0.0-alpha-1 tarball. Otherwise, you will need to clone the Fluo repo and build a distribution tarball of 1.0.0-alpha-1 for your environment:
The distribution tarball will be created in modules/distribution/target.
When you have a Fluo distribution, install it to a directory:
Next, follow the instructions to configure and run Fluo in the README.
Running Fluo applications
Once you have Fluo installed and running on your cluster, you can now run Fluo applications. Fluo applications consist of clients and observers. If you are new to Fluo, consider first building and running the phrasecount application on your Fluo instance. Otherwise, you will need to include the following in your Maven pom and follow the steps below to create your own Fluo client or observer:
Creating a Fluo client
To create a FluoClient, you will need all properties with the io.fluo.client.* prefix set in the configuration that is passed to the newClient() method of FluoFactory. See fluo.properties for details on these properties.
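Before building a client, it can help to check which entries of a properties file actually fall under the client prefix. The sketch below uses only `java.util.Properties` and does not call any Fluo API; the `ClientProps` helper and its `clientOnly` method are hypothetical, and only the `io.fluo.client.` prefix itself comes from the text above.

```java
import java.util.Properties;

// Hypothetical helper; it does not call any Fluo API.
final class ClientProps {
    static final String CLIENT_PREFIX = "io.fluo.client.";

    /** Copies only the entries whose key starts with the client prefix. */
    static Properties clientOnly(Properties all) {
        Properties client = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(CLIENT_PREFIX)) {
                client.setProperty(key, all.getProperty(key));
            }
        }
        return client;
    }
}
```

The resulting `Properties` object could then be inspected for missing entries before it is handed to whatever configuration type your Fluo version expects.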
Creating a Fluo observer
To create an observer, follow these steps:
- Create a class that extends AbstractObserver.
- Build a jar containing this class and include this jar in lib/observers of your Fluo installation.
- Configure your Fluo instance to use this observer by modifying the Observer section of fluo.properties.
- Restart your Fluo instance so that your Fluo workers load the new observer.
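To show what the workers do with a loaded observer, here is a simplified model of notification dispatch. It is a sketch, not Fluo's API: the nested `Observer` interface is a hypothetical stand-in for AbstractObserver with invented method names, and rows and columns are plain strings. The sketch registers observers by the column they watch and invokes the matching one as an ordinary function call when that column changes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of observer dispatch; not Fluo's worker code.
final class ObserverSketch {
    /** Stand-in for an observer; method names are assumptions, not Fluo's. */
    interface Observer {
        String observedColumn();

        void process(String row, String column);
    }

    private final Map<String, Observer> byColumn = new HashMap<>();

    /** Registers an observer for the single column it watches. */
    void register(Observer observer) {
        byColumn.put(observer.observedColumn(), observer);
    }

    /** Models a worker handling one notification; true if an observer ran. */
    boolean notifyChanged(String row, String column) {
        Observer observer = byColumn.get(column);
        if (observer == null) {
            return false; // nobody watches this column
        }
        observer.process(row, column);
        return true;
    }
}
```

In a real deployment the observer body would run inside a transaction supplied by the worker, which this model leaves out.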