Fluo Recipes 1.2.0 documentation: Accumulo Export
The Export Queue Recipe provides a generic foundation for building an export mechanism to any
external data store. AccumuloExporter, located in the fluo-recipes-accumulo module, provides an
Exporter for writing to Accumulo.
Exporting to Accumulo is easy. Follow the steps below:
First, implement AccumuloTranslator. Your implementation translates exported
objects to Accumulo Mutations. For example, the SimpleTranslator class below translates String
key/value pairs into mutations for Accumulo. This step is optional: a lambda could
be used in step 3 instead of creating a class.
public class SimpleTranslator implements AccumuloTranslator<String,String> {

  @Override
  public void translate(SequencedExport<String, String> export, Consumer<Mutation> consumer) {
    // The export key becomes the row; the export sequence number becomes the timestamp.
    Mutation m = new Mutation(export.getKey());
    m.put("cf", "cq", export.getSequence(), export.getValue());
    consumer.accept(m);
  }
}
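To make the lambda alternative concrete, and to show why passing the export sequence number as the timestamp can be useful, here is a minimal sketch. It deliberately uses simplified stand-in types (Export, Write, Translator and the toy table in apply are hypothetical and are not the Fluo or Accumulo classes), and it assumes the target table keeps only the entry with the highest timestamp per cell, as a default Accumulo table configuration would. Under that assumption, an older export processed after a newer one does not overwrite it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Stand-in types for illustration only; these are NOT the Fluo or Accumulo classes.
class TranslatorSketch {

  // Hypothetical stand-in for SequencedExport: a key, a value, and a sequence number.
  static final class Export {
    final String key;
    final String value;
    final long sequence;

    Export(String key, String value, long sequence) {
      this.key = key;
      this.value = value;
      this.sequence = sequence;
    }
  }

  // Hypothetical stand-in for a Mutation that writes one timestamped value to a row.
  static final class Write {
    final String row;
    final long timestamp;
    final String value;

    Write(String row, long timestamp, String value) {
      this.row = row;
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Hypothetical stand-in for AccumuloTranslator: one abstract method, so a lambda fits.
  @FunctionalInterface
  interface Translator {
    void translate(Export export, Consumer<Write> consumer);
  }

  // Toy table that retains only the write with the highest timestamp for each row.
  static Map<String, Write> apply(List<Export> exports, Translator translator) {
    Map<String, Write> table = new TreeMap<>();
    for (Export e : exports) {
      translator.translate(e, w -> table.merge(w.row, w,
          (oldW, newW) -> newW.timestamp >= oldW.timestamp ? newW : oldW));
    }
    return table;
  }

  public static void main(String[] args) {
    // Same logic as SimpleTranslator, written as a lambda instead of a class.
    Translator t = (export, consumer) ->
        consumer.accept(new Write(export.key, export.sequence, export.value));

    // Exports arrive out of order: sequence 7 is processed before sequence 3.
    List<Export> exports = Arrays.asList(
        new Export("k1", "newer", 7),
        new Export("k1", "older", 3));

    System.out.println(apply(exports, t).get("k1").value); // prints "newer"
  }
}
```

With the real API, the equivalent lambda would take the place of new SimpleTranslator() where the AccumuloExporter is constructed in step 3.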
Second, configure an ExportQueue and the export table prior to initializing Fluo.
FluoConfiguration fluoConfig = ...;

String instance = ...;     // Name of the Accumulo instance to export to
String zookeepers = ...;   // Zookeepers used by the Accumulo instance to export to
String user = ...;         // Accumulo username; this user must be able to write to exportTable
String password = ...;     // Accumulo user password
String exportTable = ...;  // Name of the table to export to

// Set properties for the table to export to in the Fluo app configuration.
AccumuloExporter.configure(EXPORT_QID)
    .instance(instance, zookeepers)
    .credentials(user, password)
    .table(exportTable)
    .save(fluoConfig);

// Set properties for the export queue in the Fluo app configuration.
ExportQueue.configure(EXPORT_QID)
    .keyType(String.class)
    .valueType(String.class)
    .buckets(119)
    .save(fluoConfig);

// Initialize Fluo using fluoConfig
Third, in the application's ObserverProvider, register an observer that will process exports and
write them to Accumulo using AccumuloExporter. Also, register observers that add to the export
queue.
public class MyObserverProvider implements ObserverProvider {

  @Override
  public void provide(Registry obsRegistry, Context ctx) {
    SimpleConfiguration appCfg = ctx.getAppConfiguration();

    ExportQueue<String, String> expQ = ExportQueue.getInstance(EXPORT_QID, appCfg);

    // Register an observer that will process entries on the export queue and write them to the
    // Accumulo table configured earlier. SimpleTranslator from step 1 is passed here; a lambda
    // could have been used instead.
    expQ.registerObserver(obsRegistry,
        new AccumuloExporter<>(EXPORT_QID, appCfg, new SimpleTranslator()));

    // An example observer, created using a lambda, that adds to the export queue.
    obsRegistry.forColumn(OBS_COL, WEAK).useObserver((tx,row,col) -> {
      // Read some data and do some work

      // Add results to export queue
      String key = ...;    // key that identifies export
      String value = ...;  // object to export
      expQ.add(tx, key, value);
    });
  }
}
The getTranslator() method in AccumuloReplicator creates a specialized AccumuloTranslator for replicating a Fluo table to Accumulo.