Moving meter store implementation to use map events

Change-Id: I338473b7286d7b9e5cdfb938f16c7b6155d4cbb5
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
index b8f2080..6477e68 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -15,38 +15,40 @@
  */
 package org.onosproject.incubator.store.meter.impl;
 
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.incubator.net.meter.DefaultBand;
 import org.onosproject.incubator.net.meter.DefaultMeter;
 import org.onosproject.incubator.net.meter.Meter;
 import org.onosproject.incubator.net.meter.MeterEvent;
 import org.onosproject.incubator.net.meter.MeterFailReason;
 import org.onosproject.incubator.net.meter.MeterId;
 import org.onosproject.incubator.net.meter.MeterOperation;
+import org.onosproject.incubator.net.meter.MeterState;
 import org.onosproject.incubator.net.meter.MeterStore;
 import org.onosproject.incubator.net.meter.MeterStoreDelegate;
+import org.onosproject.incubator.net.meter.MeterStoreResult;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -60,54 +62,37 @@
     private Logger log = getLogger(getClass());
 
     private static final String METERSTORE = "onos-meter-store";
-    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
-
-    private static final MessageSubject UPDATE_METER = new MessageSubject("peer-mod-meter");
-
-
-    @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
-            label = "Number of threads in the message handler pool")
-    private int msgPoolSize;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private StorageService storageService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterCommunicationService clusterCommunicationService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterService clusterService;
 
-    private ConsistentMap<MeterId, Meter> meters;
+    private ConsistentMap<MeterId, MeterData> meters;
     private NodeId local;
-    private KryoNamespace kryoNameSpace;
 
-    private Serializer serializer;
+    private MapEventListener mapListener = new InternalMapEventListener();
+
+    private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
+            Maps.newConcurrentMap();
 
     @Activate
     public void activate() {
 
         local = clusterService.getLocalNode().id();
 
-        kryoNameSpace =
-                KryoNamespace.newBuilder()
-                                .register(DefaultMeter.class)
-                                .register(DefaultBand.class)
-                                .build();
 
-        serializer = Serializer.using(kryoNameSpace);
-
-        meters = storageService.<MeterId, Meter>consistentMapBuilder()
+        meters = storageService.<MeterId, MeterData>consistentMapBuilder()
                     .withName(METERSTORE)
-                    .withSerializer(serializer)
+                    .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+                                                     MeterData.class))
                     .build();
 
-        ExecutorService executors = Executors.newFixedThreadPool(
-                msgPoolSize, Tools.groupedThreads("onos/store/meter", "message-handlers"));
-        registerMessageHandlers(executors);
+        meters.addListener(mapListener);
 
         log.info("Started");
     }
@@ -115,159 +100,133 @@
     @Deactivate
     public void deactivate() {
 
-
+        meters.removeListener(mapListener);
         log.info("Stopped");
     }
 
-    private void registerMessageHandlers(ExecutorService executor) {
-        clusterCommunicationService.<MeterEvent>addSubscriber(UPDATE_METER, kryoNameSpace::deserialize,
-                                                              this::notifyDelegate, executor);
-
-    }
-
 
     @Override
-    public void storeMeter(Meter meter) {
-        NodeId master = mastershipService.getMasterFor(meter.deviceId());
+    public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
+        CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+        futures.put(meter.id(), future);
+        MeterData data = new MeterData(meter, null, local);
 
-        meters.put(meter.id(), meter);
-
-        MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
-                                          new MeterOperation(meter, MeterOperation.Type.ADD));
-        if (Objects.equals(local, master)) {
-            notifyDelegate(event);
-        } else {
-            clusterCommunicationService.unicast(
-                    event,
-                    UPDATE_METER,
-                    serializer::encode,
-                    master
-            ).whenComplete((result, error) -> {
-                if (error != null) {
-                    log.warn("Failed to install meter {} because {} on {}",
-                             meter, error, master);
-
-                    // notify app of failure
-                    meter.context().ifPresent(c -> c.onError(
-                            event.subject(), MeterFailReason.UNKNOWN));
-                }
-            });
+        try {
+            meters.put(meter.id(), data);
+        } catch (StorageException e) {
+            future.completeExceptionally(e);
         }
 
+        return future;
+
     }
 
     @Override
-    public void deleteMeter(Meter meter) {
+    public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
+        CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+        futures.put(meter.id(), future);
 
-        NodeId master = mastershipService.getMasterFor(meter.deviceId());
+        MeterData data = new MeterData(meter, null, local);
 
         // update the state of the meter. It will be pruned by observing
         // that it has been removed from the dataplane.
-        meters.put(meter.id(), meter);
-
-        MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
-                                          new MeterOperation(meter, MeterOperation.Type.REMOVE));
-        if (Objects.equals(local, master)) {
-            notifyDelegate(event);
-        } else {
-            clusterCommunicationService.unicast(
-                    event,
-                    UPDATE_METER,
-                    serializer::encode,
-                    master
-            ).whenComplete((result, error) -> {
-                if (error != null) {
-                    log.warn("Failed to delete meter {} because {} on {}",
-                             meter, error, master);
-
-                    // notify app of failure
-                    meter.context().ifPresent(c -> c.onError(
-                            event.subject(), MeterFailReason.UNKNOWN));
-                }
-            });
+        try {
+            meters.put(meter.id(), data);
+        } catch (StorageException e) {
+            future.completeExceptionally(e);
         }
 
+
+        return future;
     }
 
     @Override
-    public void updateMeter(Meter meter) {
+    public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
+        CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+        futures.put(meter.id(), future);
 
-        NodeId master = mastershipService.getMasterFor(meter.deviceId());
-
-        meters.put(meter.id(), meter);
-
-        MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
-                                          new MeterOperation(meter, MeterOperation.Type.MODIFY));
-        if (Objects.equals(local, master)) {
-            notifyDelegate(event);
-        } else {
-            clusterCommunicationService.unicast(
-                    event,
-                    UPDATE_METER,
-                    serializer::encode,
-                    master
-            ).whenComplete((result, error) -> {
-                if (error != null) {
-                    log.warn("Failed to update meter {} because {} on {}",
-                             meter, error, master);
-
-                    // notify app of failure
-                    meter.context().ifPresent(c -> c.onError(
-                            event.subject(), MeterFailReason.UNKNOWN));
-                }
-            });
+        MeterData data = new MeterData(meter, null, local);
+        try {
+            meters.put(meter.id(), data);
+        } catch (StorageException e) {
+            future.completeExceptionally(e);
         }
-
+        return future;
     }
 
     @Override
     public void updateMeterState(Meter meter) {
-        meters.compute(meter.id(), (id, v) -> {
-            DefaultMeter m = (DefaultMeter) v;
+        meters.computeIfPresent(meter.id(), (id, v) -> {
+            DefaultMeter m = (DefaultMeter) v.meter();
             m.setState(meter.state());
             m.setProcessedPackets(meter.packetsSeen());
             m.setProcessedBytes(meter.bytesSeen());
             m.setLife(meter.life());
+            // TODO: Prune if drops to zero.
             m.setReferenceCount(meter.referenceCount());
-            return m;
+            return new MeterData(m, null, v.origin());
         });
     }
 
     @Override
     public Meter getMeter(MeterId meterId) {
-        return meters.get(meterId).value();
+        MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
+        return data == null ? null : data.meter();
     }
 
     @Override
     public Collection<Meter> getAllMeters() {
-        return meters.values().stream()
-                .map(v -> v.value()).collect(Collectors.toSet());
+        return Collections2.transform(meters.asJavaMap().values(),
+                                      MeterData::meter);
     }
 
     @Override
     public void failedMeter(MeterOperation op, MeterFailReason reason) {
-        NodeId master = mastershipService.getMasterFor(op.meter().deviceId());
-        meters.remove(op.meter().id());
-
-        MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_FAILED, op, reason);
-        if (Objects.equals(local, master)) {
-            notifyDelegate(event);
-        } else {
-            clusterCommunicationService.unicast(
-                    event,
-                    UPDATE_METER,
-                    serializer::encode,
-                    master
-            ).whenComplete((result, error) -> {
-                if (error != null) {
-                    log.warn("Failed to delete failed meter {} because {} on {}",
-                             op.meter(), error, master);
-
-                    // Can't do any more...
-                }
-            });
-        }
-
+        meters.computeIfPresent(op.meter().id(), (k, v) ->
+                new MeterData(v.meter(), reason, v.origin()));
     }
 
+    private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
+        @Override
+        public void event(MapEvent<MeterId, MeterData> event) {
+            MeterData data = event.value().value();
+            NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
+            switch (event.type()) {
+                case INSERT:
+                case UPDATE:
+                        switch (data.meter().state()) {
+                            case PENDING_ADD:
+                            case PENDING_REMOVE:
+                                if (!data.reason().isPresent() && local.equals(master)) {
+                                    notifyDelegate(
+                                            new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
+                                                    MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
+                                                                  data.meter()));
+                                } else if (data.reason().isPresent() && local.equals(data.origin())) {
+                                    MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
+                                    //TODO: No future -> no friend
+                                    futures.get(data.meter().id()).complete(msr);
+                                }
+                                break;
+                            case ADDED:
+                            case REMOVED:
+                                if (local.equals(data.origin())) {
+                                    futures.get(data.meter().id()).complete(MeterStoreResult.success());
+                                }
+                                break;
+                            default:
+                                log.warn("Unknown meter state type {}", data.meter().state());
+                        }
+                    break;
+                case REMOVE:
+                    //Only happens at origin so we do not need to care.
+                    break;
+                default:
+                    log.warn("Unknown Map event type {}", event.type());
+            }
+
+        }
+    }
+
+
 }