Fluo Tour: Weak Notification Code

Tour page 24 of 28

  // Column used for the weak notification: loaders set a weak notification on it and the
  // summing observer is registered on it.
  static final Column NC = new Column("ntyf", "sum");
  // Column on the counter row that stores the running total maintained by the observer.
  static final Column TOTAL_COL = new Column("sum", "total");
  // Column on each "<counter>/<id>" row that stores an update count written by the loaders.
  static final Column UPDATE_COL = new Column("sum", "update");

  /**
   * Registers one observer, weakly notified on {@code NC}, that folds the values stored under
   * the notified row's {@code "<row>/"} prefix into that row's {@code TOTAL_COL}.
   */
  public static class TourObserverProvider implements ObserverProvider {
    @Override
    public void provide(Registry obsRegistry, Context ctx) {

      // The observer is small enough to be expressed as a lambda.
      StringObserver rollUp = (tx, row, col) -> {
        // Begin with the total currently stored on the notified row, or 0 when none is stored.
        int total = Integer.parseInt(tx.gets(row, TOTAL_COL, "0"));

        // Visit every cell whose row starts with "<row>/": add its value to the total and
        // delete the cell within the same transaction.
        String childPrefix = row + "/";
        CellScanner pending = tx.scanner().overPrefix(childPrefix).build();
        for (RowColumnValue cell : pending) {
          total = total + Integer.parseInt(cell.getsValue());
          tx.delete(cell.getRow(), cell.getColumn());
        }

        System.out.println("sum : " + total);

        // Write the new total back to the notified row.
        tx.set(row, TOTAL_COL, Integer.toString(total));
      };

      obsRegistry.forColumn(NC, NotificationType.WEAK).useObserver(rollUp);
    }
  }

  /**
   * Sets {@code TourObserverProvider} as the observer provider on the given configuration.
   *
   * @param fluoConfig the Fluo configuration to update
   */
  private static void preInit(FluoConfiguration fluoConfig) {
    Class<TourObserverProvider> providerClass = TourObserverProvider.class;
    fluoConfig.setObserverProvider(providerClass);
  }

  /**
   * Submits 5000 loaders that each bump a count on a randomly chosen row under
   * {@code "counter001/"} and set a weak notification on {@code "counter001"}, then waits for
   * observers and prints the total stored on {@code "counter001"}.
   *
   * @param mini the MiniFluo instance used to wait for observers to finish
   * @param client the client used to create the loader executor and the final snapshot
   */
  private static void exercise(MiniFluo mini, FluoClient client) {
    try (LoaderExecutor loaders = client.newLoaderExecutor()) {
      // Fixed seed; one generator is shared by all submitted loaders.
      Random rng = new Random(42);
      int submitted = 0;
      while (submitted < 5000) {
        // A Loader has a single method, so each one is supplied as a lambda.
        loaders.execute((tx, ctx) -> {
          String suffix = String.format("%07d", rng.nextInt(10_000_000));
          String updateRow = "counter001/" + suffix;
          // Read the count already on the chosen row (0 when absent) and store it plus one.
          int pending = Integer.parseInt(tx.gets(updateRow, UPDATE_COL, "0"));
          tx.set(updateRow, UPDATE_COL, Integer.toString(pending + 1));
          // Weakly notify the counter row so that the observer registered on NC runs for it.
          tx.setWeakNotification("counter001", NC);
        });
        submitted++;
      }
    }

    // The loader executor has been closed by the try-with-resources above at this point.
    mini.waitForObservers();

    try (Snapshot snapshot = client.newSnapshot()) {
      String finalTotal = snapshot.gets("counter001", TOTAL_COL);
      System.out.println("final sum : " + finalTotal);
    }
  }

The code above will print something like the following. Every run will print something slightly different because it depends on when the observer runs.

$ mvn -q clean compile exec:java
Starting MiniFluo ... started.
sum : 526
sum : 1465
sum : 2414
sum : 3260
sum : 4736
sum : 5000
final sum : 5000

< 24 / 28 >