Fluo Recipes 1.2.0 documentation >> Recipes >> Accumulo Export
The Export Queue Recipe provides a generic foundation for building export mechanism to any
external data store. The AccumuloExporter provides an Exporter for writing to
Accumulo. AccumuloExporter is located in the fluo-recipes-accumulo
module and provides the
following functionality:
Exporting to Accumulo is easy. Follow the steps below:
First, implement AccumuloTranslator. Your implementation translates exported
objects to Accumulo Mutations. For example, the SimpleTranslator
class below translates String
key/values and into mutations for Accumulo. This step is optional, a lambda could
be used in step 3 instead of creating a class.
public class SimpleTranslator implements AccumuloTranslator<String,String> {
@Override
public void translate(SequencedExport<String, String> export, Consumer<Mutation> consumer) {
Mutation m = new Mutation(export.getKey());
m.put("cf", "cq", export.getSequence(), export.getValue());
consumer.accept(m);
}
}
Configure an ExportQueue
and the export table prior to initializing Fluo.
FluoConfiguration fluoConfig = ...;
String instance = // Name of accumulo instance exporting to
String zookeepers = // Zookeepers used by Accumulo instance exporting to
String user = // Accumulo username, user that can write to exportTable
String password = // Accumulo user password
String exportTable = // Name of table to export to
// Set properties for table to export to in Fluo app configuration.
AccumuloExporter.configure(EXPORT_QID).instance(instance, zookeepers)
.credentials(user, password).table(exportTable).save(fluoConfig);
// Set properties for export queue in Fluo app configuration
ExportQueue.configure(EXPORT_QID).keyType(String.class).valueType(String.class)
.buckets(119).save(fluoConfig);
// Initialize Fluo using fluoConfig
In the applications ObserverProvider
, register an observer that will process exports and write
them to Accumulo using AccumuloExporter. Also, register observers that add to the export
queue.
public class MyObserverProvider implements ObserverProvider {
@Override
public void provide(Registry obsRegistry, Context ctx) {
SimpleConfiguration appCfg = ctx.getAppConfiguration();
ExportQueue<String, String> expQ = ExportQueue.getInstance(EXPORT_QID, appCfg);
// Register observer that will processes entries on export queue and write them to the Accumulo
// table configured earlier. SimpleTranslator from step 1 is passed here, could have used a
// lambda instead.
expQ.registerObserver(obsRegistry,
new AccumuloExporter<>(EXPORT_QID, appCfg, new SimpleTranslator()));
// An example observer created using a lambda that adds to the export queue.
obsRegistry.forColumn(OBS_COL, WEAK).useObserver((tx,row,col) -> {
// Read some data and do some work
// Add results to export queue
String key = // key that identifies export
String value = // object to export
expQ.add(tx, key, value);
});
}
}
The getTranslator()
method in AccumuloReplicator creates a specialized AccumuloTranslator for replicating a Fluo table to Accumulo.