Moving meter store implementation to use map events
Change-Id: I338473b7286d7b9e5cdfb938f16c7b6155d4cbb5
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
index b8f2080..6477e68 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -15,38 +15,40 @@
*/
package org.onosproject.incubator.store.meter.impl;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
-import org.onosproject.incubator.net.meter.DefaultBand;
import org.onosproject.incubator.net.meter.DefaultMeter;
import org.onosproject.incubator.net.meter.Meter;
import org.onosproject.incubator.net.meter.MeterEvent;
import org.onosproject.incubator.net.meter.MeterFailReason;
import org.onosproject.incubator.net.meter.MeterId;
import org.onosproject.incubator.net.meter.MeterOperation;
+import org.onosproject.incubator.net.meter.MeterState;
import org.onosproject.incubator.net.meter.MeterStore;
import org.onosproject.incubator.net.meter.MeterStoreDelegate;
+import org.onosproject.incubator.net.meter.MeterStoreResult;
import org.onosproject.mastership.MastershipService;
import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import static org.slf4j.LoggerFactory.getLogger;
@@ -60,54 +62,37 @@
private Logger log = getLogger(getClass());
private static final String METERSTORE = "onos-meter-store";
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
-
- private static final MessageSubject UPDATE_METER = new MessageSubject("peer-mod-meter");
-
-
- @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
- label = "Number of threads in the message handler pool")
- private int msgPoolSize;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterCommunicationService clusterCommunicationService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
- private ConsistentMap<MeterId, Meter> meters;
+ // Meter id -> the meter bundled with an optional failure reason and the node that issued the request.
+ private ConsistentMap<MeterId, MeterData> meters;
private NodeId local;
- private KryoNamespace kryoNameSpace;
- private Serializer serializer;
+ // Registered on the meter map in activate() and removed again in deactivate().
+ // Declared with its type arguments (not as a raw type) so the registration is type-checked.
+ private final MapEventListener<MeterId, MeterData> mapListener = new InternalMapEventListener();
+
+ // Futures handed out by storeMeter/deleteMeter/updateMeter, keyed by meter id and
+ // completed by the map listener once the outcome of the request is visible in the map.
+ private final Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
+ Maps.newConcurrentMap();
@Activate
public void activate() {
local = clusterService.getLocalNode().id();
- kryoNameSpace =
- KryoNamespace.newBuilder()
- .register(DefaultMeter.class)
- .register(DefaultBand.class)
- .build();
- serializer = Serializer.using(kryoNameSpace);
-
- meters = storageService.<MeterId, Meter>consistentMapBuilder()
+ meters = storageService.<MeterId, MeterData>consistentMapBuilder() // values are MeterData wrappers rather than bare Meter objects
.withName(METERSTORE)
- .withSerializer(serializer)
+ .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), // serializer built from the shared API namespace ...
+ MeterData.class)) // ... plus MeterData, the value type stored in this map
.build();
- ExecutorService executors = Executors.newFixedThreadPool(
- msgPoolSize, Tools.groupedThreads("onos/store/meter", "message-handlers"));
- registerMessageHandlers(executors);
+ meters.addListener(mapListener); // map events replace the removed cluster-message handler; undone in deactivate()
log.info("Started");
}
@@ -115,159 +100,133 @@
@Deactivate
public void deactivate() {
-
+ meters.removeListener(mapListener); // detach the listener that activate() registered on the meter map
log.info("Stopped");
}
- private void registerMessageHandlers(ExecutorService executor) {
- clusterCommunicationService.<MeterEvent>addSubscriber(UPDATE_METER, kryoNameSpace::deserialize,
- this::notifyDelegate, executor);
-
- }
-
@Override
- public void storeMeter(Meter meter) {
- NodeId master = mastershipService.getMasterFor(meter.deviceId());
+ public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) { // the returned future is completed by InternalMapEventListener, or exceptionally below
+ CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+ futures.put(meter.id(), future); // tracked before the write; replaces any future already tracked for this meter id
+ MeterData data = new MeterData(meter, null, local); // no failure reason yet; the local node is recorded as the origin
- meters.put(meter.id(), meter);
-
- MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
- new MeterOperation(meter, MeterOperation.Type.ADD));
- if (Objects.equals(local, master)) {
- notifyDelegate(event);
- } else {
- clusterCommunicationService.unicast(
- event,
- UPDATE_METER,
- serializer::encode,
- master
- ).whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to install meter {} because {} on {}",
- meter, error, master);
-
- // notify app of failure
- meter.context().ifPresent(c -> c.onError(
- event.subject(), MeterFailReason.UNKNOWN));
- }
- });
+ try {
+ meters.put(meter.id(), data); // writing the entry is the only action taken here; reacting to it is left to the map listener
+ } catch (StorageException e) {
+ future.completeExceptionally(e); // surface the storage failure to the caller; the entry stays in futures
}
+ return future;
+
}
@Override
- public void deleteMeter(Meter meter) {
+ public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) { // the returned future is completed by InternalMapEventListener, or exceptionally below
+ CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+ futures.put(meter.id(), future); // tracked before the write; replaces any future already tracked for this meter id
- NodeId master = mastershipService.getMasterFor(meter.deviceId());
+ MeterData data = new MeterData(meter, null, local); // same wrapper as storeMeter; the listener tells add from remove by the meter's state
// update the state of the meter. It will be pruned by observing
// that it has been removed from the dataplane.
- meters.put(meter.id(), meter);
-
- MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
- new MeterOperation(meter, MeterOperation.Type.REMOVE));
- if (Objects.equals(local, master)) {
- notifyDelegate(event);
- } else {
- clusterCommunicationService.unicast(
- event,
- UPDATE_METER,
- serializer::encode,
- master
- ).whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to delete meter {} because {} on {}",
- meter, error, master);
-
- // notify app of failure
- meter.context().ifPresent(c -> c.onError(
- event.subject(), MeterFailReason.UNKNOWN));
- }
- });
+ try {
+ meters.put(meter.id(), data); // the entry is overwritten, not removed, by this call
+ } catch (StorageException e) {
+ future.completeExceptionally(e); // surface the storage failure to the caller; the entry stays in futures
}
+
+ return future;
}
@Override
- public void updateMeter(Meter meter) {
+ public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) { // the returned future is completed by InternalMapEventListener, or exceptionally below
+ CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+ futures.put(meter.id(), future); // tracked before the write; replaces any future already tracked for this meter id
- NodeId master = mastershipService.getMasterFor(meter.deviceId());
-
- meters.put(meter.id(), meter);
-
- MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
- new MeterOperation(meter, MeterOperation.Type.MODIFY));
- if (Objects.equals(local, master)) {
- notifyDelegate(event);
- } else {
- clusterCommunicationService.unicast(
- event,
- UPDATE_METER,
- serializer::encode,
- master
- ).whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to update meter {} because {} on {}",
- meter, error, master);
-
- // notify app of failure
- meter.context().ifPresent(c -> c.onError(
- event.subject(), MeterFailReason.UNKNOWN));
- }
- });
+ MeterData data = new MeterData(meter, null, local); // no failure reason yet; the local node is recorded as the origin
+ try {
+ meters.put(meter.id(), data); // NOTE(review): the listener only acts on PENDING_ADD/PENDING_REMOVE/ADDED/REMOVED states - confirm the caller sets one of them
+ } catch (StorageException e) {
+ future.completeExceptionally(e); // surface the storage failure to the caller; the entry stays in futures
}
-
+ return future;
}
@Override
public void updateMeterState(Meter meter) {
- meters.compute(meter.id(), (id, v) -> {
- DefaultMeter m = (DefaultMeter) v;
+ meters.computeIfPresent(meter.id(), (id, v) -> { // only touches an existing entry; an unknown meter id is left alone
+ DefaultMeter m = (DefaultMeter) v.meter(); // NOTE(review): assumes every stored meter is a DefaultMeter - the cast throws otherwise
m.setState(meter.state());
m.setProcessedPackets(meter.packetsSeen());
m.setProcessedBytes(meter.bytesSeen());
m.setLife(meter.life());
+ // TODO: Prune if drops to zero.
m.setReferenceCount(meter.referenceCount());
- return m;
+ return new MeterData(m, null, v.origin()); // keeps the original origin; any previously recorded failure reason is reset to null
});
}
@Override
public Meter getMeter(MeterId meterId) {
- return meters.get(meterId).value();
+ MeterData data = Versioned.valueOrElse(meters.get(meterId), null); // falls back to null instead of dereferencing the lookup result directly
+ return data == null ? null : data.meter(); // callers receive null for a meter id that has no entry
}
@Override
public Collection<Meter> getAllMeters() {
- return meters.values().stream()
- .map(v -> v.value()).collect(Collectors.toSet());
+ return Collections2.transform(meters.asJavaMap().values(), // NOTE(review): Guava documents transform() as a lazily evaluated view, not a copied Set - confirm callers do not need a snapshot
+ MeterData::meter); // unwraps each MeterData to the Meter it carries
}
@Override
public void failedMeter(MeterOperation op, MeterFailReason reason) {
- NodeId master = mastershipService.getMasterFor(op.meter().deviceId());
- meters.remove(op.meter().id());
-
- MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_FAILED, op, reason);
- if (Objects.equals(local, master)) {
- notifyDelegate(event);
- } else {
- clusterCommunicationService.unicast(
- event,
- UPDATE_METER,
- serializer::encode,
- master
- ).whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to delete failed meter {} because {} on {}",
- op.meter(), error, master);
-
- // Can't do any more...
- }
- });
- }
-
+ meters.computeIfPresent(op.meter().id(), (k, v) -> // the entry is kept and annotated rather than removed; a missing entry is ignored
+ new MeterData(v.meter(), reason, v.origin())); // stored meter and origin are preserved; the listener fails the origin's future when it sees a reason on a pending meter
}
+ /**
+  * Translates changes of the meter map into actions on the local node.
+  * <p>
+  * A pending meter without a failure reason is forwarded to the delegate on
+  * the node that is master for the meter's device. A failure reason, or a
+  * meter reaching ADDED/REMOVED, completes the future handed out by the node
+  * recorded as the origin of the request.
+  */
+ private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
+     @Override
+     public void event(MapEvent<MeterId, MeterData> event) {
+         switch (event.type()) {
+             case INSERT:
+             case UPDATE:
+                 // The value is only dereferenced for the event types that use it.
+                 processMeterData(event.value().value());
+                 break;
+             case REMOVE:
+                 //Only happens at origin so we do not need to care.
+                 break;
+             default:
+                 log.warn("Unknown Map event type {}", event.type());
+         }
+     }
+
+     /**
+      * Acts on the meter state carried by an inserted or updated map entry.
+      *
+      * @param data the new value of the map entry
+      */
+     private void processMeterData(MeterData data) {
+         switch (data.meter().state()) {
+             case PENDING_ADD:
+             case PENDING_REMOVE:
+                 if (!data.reason().isPresent()) {
+                     // Mastership is only looked up when a request may have to be forwarded.
+                     NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
+                     if (local.equals(master)) {
+                         notifyDelegate(
+                                 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
+                                         MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
+                                         data.meter()));
+                     }
+                 } else if (local.equals(data.origin())) {
+                     completeFuture(data.meter().id(), MeterStoreResult.fail(data.reason().get()));
+                 }
+                 break;
+             case ADDED:
+             case REMOVED:
+                 if (local.equals(data.origin())) {
+                     completeFuture(data.meter().id(), MeterStoreResult.success());
+                 }
+                 break;
+             default:
+                 log.warn("Unknown meter state type {}", data.meter().state());
+         }
+     }
+
+     /**
+      * Completes and stops tracking the future registered for a meter, if any.
+      * <p>
+      * The future is removed so completed requests do not accumulate in the
+      * map, and the null check covers events for which this node no longer
+      * holds a future (for example repeated state updates of the same meter).
+      *
+      * @param meterId id of the meter whose request finished
+      * @param result  outcome to hand to the caller
+      */
+     private void completeFuture(MeterId meterId, MeterStoreResult result) {
+         CompletableFuture<MeterStoreResult> future = futures.remove(meterId);
+         if (future != null) {
+             future.complete(result);
+         }
+     }
+ }
+
+
}