Initial implementation of Meter Service (needs testing)
Change-Id: Ie07bd3e2bd7c67a6499c965d8926eb361ad16462
store impl started
Change-Id: Ib8b474f40dcecff335a421c12ad149fe9830c427
full implementation
Change-Id: Ie59fd61d02972bd04d887bdcca9745793b880681
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
new file mode 100644
index 0000000..b8f2080
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.store.meter.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.net.meter.DefaultBand;
+import org.onosproject.incubator.net.meter.DefaultMeter;
+import org.onosproject.incubator.net.meter.Meter;
+import org.onosproject.incubator.net.meter.MeterEvent;
+import org.onosproject.incubator.net.meter.MeterFailReason;
+import org.onosproject.incubator.net.meter.MeterId;
+import org.onosproject.incubator.net.meter.MeterOperation;
+import org.onosproject.incubator.net.meter.MeterStore;
+import org.onosproject.incubator.net.meter.MeterStoreDelegate;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A distributed meter store implementation. Meters are stored consistently
+ * across the cluster.
+ */
+public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
+ implements MeterStore {
+
+ private Logger log = getLogger(getClass());
+
+ private static final String METERSTORE = "onos-meter-store";
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+
+ private static final MessageSubject UPDATE_METER = new MessageSubject("peer-mod-meter");
+
+
+ @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ label = "Number of threads in the message handler pool")
+ private int msgPoolSize;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterCommunicationService clusterCommunicationService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
+ private ConsistentMap<MeterId, Meter> meters;
+ private NodeId local;
+ private KryoNamespace kryoNameSpace;
+
+ private Serializer serializer;
+
+ @Activate
+ public void activate() {
+
+ local = clusterService.getLocalNode().id();
+
+ kryoNameSpace =
+ KryoNamespace.newBuilder()
+ .register(DefaultMeter.class)
+ .register(DefaultBand.class)
+ .build();
+
+ serializer = Serializer.using(kryoNameSpace);
+
+ meters = storageService.<MeterId, Meter>consistentMapBuilder()
+ .withName(METERSTORE)
+ .withSerializer(serializer)
+ .build();
+
+ ExecutorService executors = Executors.newFixedThreadPool(
+ msgPoolSize, Tools.groupedThreads("onos/store/meter", "message-handlers"));
+ registerMessageHandlers(executors);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+
+
+ log.info("Stopped");
+ }
+
+ private void registerMessageHandlers(ExecutorService executor) {
+ clusterCommunicationService.<MeterEvent>addSubscriber(UPDATE_METER, kryoNameSpace::deserialize,
+ this::notifyDelegate, executor);
+
+ }
+
+
+ @Override
+ public void storeMeter(Meter meter) {
+ NodeId master = mastershipService.getMasterFor(meter.deviceId());
+
+ meters.put(meter.id(), meter);
+
+ MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
+ new MeterOperation(meter, MeterOperation.Type.ADD));
+ if (Objects.equals(local, master)) {
+ notifyDelegate(event);
+ } else {
+ clusterCommunicationService.unicast(
+ event,
+ UPDATE_METER,
+ serializer::encode,
+ master
+ ).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to install meter {} because {} on {}",
+ meter, error, master);
+
+ // notify app of failure
+ meter.context().ifPresent(c -> c.onError(
+ event.subject(), MeterFailReason.UNKNOWN));
+ }
+ });
+ }
+
+ }
+
+ @Override
+ public void deleteMeter(Meter meter) {
+
+ NodeId master = mastershipService.getMasterFor(meter.deviceId());
+
+ // update the state of the meter. It will be pruned by observing
+ // that it has been removed from the dataplane.
+ meters.put(meter.id(), meter);
+
+ MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
+ new MeterOperation(meter, MeterOperation.Type.REMOVE));
+ if (Objects.equals(local, master)) {
+ notifyDelegate(event);
+ } else {
+ clusterCommunicationService.unicast(
+ event,
+ UPDATE_METER,
+ serializer::encode,
+ master
+ ).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to delete meter {} because {} on {}",
+ meter, error, master);
+
+ // notify app of failure
+ meter.context().ifPresent(c -> c.onError(
+ event.subject(), MeterFailReason.UNKNOWN));
+ }
+ });
+ }
+
+ }
+
+ @Override
+ public void updateMeter(Meter meter) {
+
+ NodeId master = mastershipService.getMasterFor(meter.deviceId());
+
+ meters.put(meter.id(), meter);
+
+ MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
+ new MeterOperation(meter, MeterOperation.Type.MODIFY));
+ if (Objects.equals(local, master)) {
+ notifyDelegate(event);
+ } else {
+ clusterCommunicationService.unicast(
+ event,
+ UPDATE_METER,
+ serializer::encode,
+ master
+ ).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to update meter {} because {} on {}",
+ meter, error, master);
+
+ // notify app of failure
+ meter.context().ifPresent(c -> c.onError(
+ event.subject(), MeterFailReason.UNKNOWN));
+ }
+ });
+ }
+
+ }
+
+ @Override
+ public void updateMeterState(Meter meter) {
+ meters.compute(meter.id(), (id, v) -> {
+ DefaultMeter m = (DefaultMeter) v;
+ m.setState(meter.state());
+ m.setProcessedPackets(meter.packetsSeen());
+ m.setProcessedBytes(meter.bytesSeen());
+ m.setLife(meter.life());
+ m.setReferenceCount(meter.referenceCount());
+ return m;
+ });
+ }
+
+ @Override
+ public Meter getMeter(MeterId meterId) {
+ return meters.get(meterId).value();
+ }
+
+ @Override
+ public Collection<Meter> getAllMeters() {
+ return meters.values().stream()
+ .map(v -> v.value()).collect(Collectors.toSet());
+ }
+
+ @Override
+ public void failedMeter(MeterOperation op, MeterFailReason reason) {
+ NodeId master = mastershipService.getMasterFor(op.meter().deviceId());
+ meters.remove(op.meter().id());
+
+ MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_FAILED, op, reason);
+ if (Objects.equals(local, master)) {
+ notifyDelegate(event);
+ } else {
+ clusterCommunicationService.unicast(
+ event,
+ UPDATE_METER,
+ serializer::encode,
+ master
+ ).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to delete failed meter {} because {} on {}",
+ op.meter(), error, master);
+
+ // Can't do any more...
+ }
+ });
+ }
+
+ }
+
+}