Apache Fluo is a distributed processing system that lets users make incremental updates to large data sets. With Apache Fluo, users can set up workflows that execute cross node transactions when data changes. These workflows enable users to continuously join new data into large existing data sets without reprocessing all data. Apache Fluo is built on Apache Accumulo.

Below are resources for this release:

Apache Fluo follows semver for its API . The API consists of everything under the org.apache.fluo.api package. Code outside of this package can change at any time. If your project is using Fluo code that falls outside of the API, then consider initiating a discussion about adding it to the API.

Notable changes

The major changes in 1.2.0 are highlighted here, for the complete list of changes, see the 1.2.0 Milestone on Github.

Multiple ways of running applications

Before 1.2.0, Fluo applications could only be run in YARN or as local processes. For 1.2.0, the Fluo tarball was refactored to support other ways of running Fluo applications such as Docker, Kubernetes & Marathon. This refactoring has the following notable changes:

  • The Fluo tarball is now designed for only initializing Fluo and running local processes.
  • The fluo.properties configuration file was deprecated and was replaced by fluo-conn.properties and fluo-app.properties.
    • fluo-conn.properties contains connection properties and is shared by Fluo applications
    • fluo-app.properites contains application-specific properties and is only used during initialization of Fluo applications.
  • Code for launching Fluo applications in YARN was moved to its own project that has its own release tarball. The Fluo tarball can still be used to launch Fluo applications in YARN if a fluo.properties is created from fluo.properties.deprecated.
  • Fluo application jars are now stored in HDFS and application configuration is now stored in Zookeeper. This information used to be stored in a local directory. This change made running Fluo different ways (like YARN or Docker) possible. The fluo get-jars command (added in db0bdad) can be used to retrieve application jars.
  • Fluo configuration options can now be specified on the command line with -o option. This enables passing Fluo options on the Docker command line.
  • Fluo scripts now support passing Java options. In Docker, its useful to have control over the amount of memory Java uses. This can be done by setting the FLUO_JAVA_OPTS env variable on the Docker command line. This was added in commit 4207df4.

Read the quickstart documentation to learn how to run Fluo applications using these new methods.

Read locks.

The Percolator paper stated that read locks were expensive and usually not needed. Therefore in Percolator reads did not acquire a read lock. This assessment is correct, not every read should acquire a read lock. However, offering the ability to optionally obtain a read lock makes writing certain applications much simpler. So in this release of Fluo, optional read locks were added. Below is an example of how to acquire read locks.

  void addEdge(FluoClient client, String node1, String node2) {
    try(Transaction tx = client.newTransaction()) {

      // These reads acquire a read lock.  Any concurrent changes will cause this
      // transaction to fail.
      String a1 = tx.withReadLock().gets(node1, new Column("node","alias"));
      String a2 = tx.withReadLock().gets(node2, new Column("node","alias"));

      tx.set("e:"+a1+":"+a2, new Column("edge", "exists"), "Y");
    }
  }

  void setAlias(FluoClient client, String node, String newAlias) {
    try(Transaction tx = client.newTransaction()) {
      String oldAlias = tx.gets(node, new Column("node","alias"));
      tx.set(node, new Column("node","alias"), newAlias);

      updateExistingEdges(oldAlias, newAlias);
    }
  }

Concurrent calls to addEdge(client,"n1","n2") and addEdge(client,"n1","n3") can run without issue. However, concurrent calls to addEdge(client,"n1","n2") and setAlias(client, "n1","a5") will result in a collision. If addEdge did not obtain a read lock, then it would not collide with setAlias. If addEdge obtained a write lock, then concurrent calls to addEdge could needlessly collide.

See the withReadLock javadoc for more information.

Simplified Scanner API

Range scanning API was simplified in c737df6. Before, scanning over a row was :

CellScanner scanner = snapshot.scanner().over(Span.exact("row")).build();

After this change, it can be written as :

CellScanner scanner = snapshot.scanner().over("row").build();

Streams

In 2f11089 support for Java 8 streams was added. The following computes unique columns in rows with the prefix abc.

Set<Column> cols = snapshot.scanner().overPrefix("abc").build()
    .stream().map(RowColumnValue::getColumn).collect(Collectors.toSet());

Remove command.

Can now remove applications data in Zookeeper and Accumulo. Work on this issue was done in #999. Before data had to be manually removed from Zookeeper and Accumulo.

FluoConfiguration Javadoc

Most of the methods in FluoConfiguration get or a set a specific property. Before it was not easy to know which property. In cbe44b1 the javadoc was updated to mention the properties.

Shaded libthrift

Fluo uses Apache Thrift for remote procedure calls. Projects using Thrift use its compiler to generate code. This generated Java code make calls to libthrift which is an artifact release by the Thrift project. The code generated by a specific version of Thrift is only guaranteed to work with the same version of libthrift. For example, code generated by the Thrift 0.9.1 compiler is only guaranteed to work with libthrift 0.9.1.

Accumulo also uses Thrift for its RPCs. When Accumulo and Fluo use different versions of thrift it can cause serious problems. To avoid these problems, in 082ee8e libthrift was shaded and relocated into the fluo-core jar eliminating Fluo’s external dependency on libthrift. This means that no matter which version Accumulo uses, it will not conflict with Fluo’s version.

Asynchronous commit refactored

Fluo’s commit code is asynchronous in order to support high throughput. Before this release the high level commit logic was spread far and wide in the code. For this release the commit code was transitioned from Guava’s ListenableFuture to Java 8’s CompletableFuture in ca63aaf. This transition laid the ground work for 6bf604f which centralized the commit logic. Now the high level logic for the commit code is all in one place, making it much easier to understand.

Other notable changes

  • 7c16599 Added a method for efficiently comparing Byte and byte[]
  • 151c565 Added fluo status command.
  • #960 Fluo started using some Accumulo APIs introduced in 1.7.0. Therefore 1.7.0 is now the minimum version of Accumulo.
  • A few of Fluo’s internal caches were made configurable in 650f044, 9e87f47, and 955c86f.
  • Optimized Fluo’s server side code that runs in Accumulo tablet servers in 202fe08 and 51dc912
  • Added method to merge Fluo configuration objects in a349a9d
  • Initialization from Java API and command line now behave the same. Before 1.2, only command line initialization copied Fluo’s server side code to HDFS. Now the Java API also does this and it controlled by configuration.

Testing

The Fluo stress test was run twice as documented here and here.