Tour page 24 of 28
// Column on which the loaders set a weak notification and for which the summing observer is
// registered. Declared final because it is a constant and is never reassigned.
static final Column NC = new Column("ntyf", "sum");
// Column in which the observer stores the running total for a counter row.
static final Column TOTAL_COL = new Column("sum", "total");
// Column in which each loader transaction stores the pending update count for a sub-row.
static final Column UPDATE_COL = new Column("sum", "update");
/**
 * Registers the observer that folds pending per-row updates into a single total.
 */
public static class TourObserverProvider implements ObserverProvider {

  /**
   * Registers a weakly-notified observer on {@code NC}.
   *
   * <p>When it runs for a row, the observer reads the row's current {@code TOTAL_COL} value
   * (treating a missing value as "0"), adds every value found under the prefix
   * {@code row + "/"}, deletes each of those cells, prints the new sum and writes it back to
   * {@code TOTAL_COL}.
   *
   * @param obsRegistry registry the observer is added to
   * @param ctx context supplied by the caller; not used here
   */
  @Override
  public void provide(Registry obsRegistry, Context ctx) {
    // Kept as an explicitly typed StringObserver so the lambda receives the row as a String.
    StringObserver totalUpdater = (tx, row, col) -> {
      // Begin with whatever total was stored previously.
      int runningTotal = Integer.parseInt(tx.gets(row, TOTAL_COL, "0"));

      // Consume every pending update beneath this row, removing each one once it is counted.
      for (RowColumnValue pending : tx.scanner().overPrefix(row + "/").build()) {
        runningTotal += Integer.parseInt(pending.getsValue());
        tx.delete(pending.getRow(), pending.getColumn());
      }

      System.out.println("sum : " + runningTotal);
      tx.set(row, TOTAL_COL, String.valueOf(runningTotal));
    };

    obsRegistry.forColumn(NC, NotificationType.WEAK).useObserver(totalUpdater);
  }
}
/**
 * Configures Fluo to use {@link TourObserverProvider} as its observer provider.
 *
 * @param fluoConfig configuration object that is updated in place
 */
private static void preInit(FluoConfiguration fluoConfig) {
  Class<TourObserverProvider> providerClass = TourObserverProvider.class;
  fluoConfig.setObserverProvider(providerClass);
}
/**
 * Submits 5000 load transactions, each incrementing a pseudo-randomly chosen sub-row of
 * "counter001" and setting a weak notification on "counter001", then prints the final total.
 *
 * @param mini MiniFluo instance; {@code waitForObservers()} is called on it after loading
 * @param client client used to create the loader executor and the final snapshot
 */
private static void exercise(MiniFluo mini, FluoClient client) {
  try (LoaderExecutor loaders = client.newLoaderExecutor()) {
    // Fixed seed, so the sequence of generated row suffixes is reproducible.
    Random rng = new Random(42);

    for (int remaining = 5000; remaining > 0; remaining--) {
      // Loader declares a single method, so a lambda can stand in for it.
      loaders.execute((tx, ctx) -> {
        String suffix = String.format("%07d", rng.nextInt(10_000_000));
        String row = "counter001/" + suffix;

        // Increment the pending update count stored for this sub-row.
        int previous = Integer.parseInt(tx.gets(row, UPDATE_COL, "0"));
        tx.set(row, UPDATE_COL, Integer.toString(previous + 1));

        // Signal that "counter001" has updates waiting to be summed.
        tx.setWeakNotification("counter001", NC);
      });
    }
  }

  mini.waitForObservers();

  try (Snapshot snap = client.newSnapshot()) {
    String finalTotal = snap.gets("counter001", TOTAL_COL);
    System.out.println("final sum : " + finalTotal);
  }
}
The code above will print something like the following. The intermediate sums differ from run to run because they depend on when the observer happens to run, but the final sum is always 5000, since each of the 5000 load transactions adds exactly one.
$ mvn -q clean compile exec:java
Starting MiniFluo ... started.
sum : 526
sum : 1465
sum : 2414
sum : 3260
sum : 4736
sum : 5000
final sum : 5000