Initial implementation of Meter Service (needs testing)
Change-Id: Ie07bd3e2bd7c67a6499c965d8926eb361ad16462
store impl started
Change-Id: Ib8b474f40dcecff335a421c12ad149fe9830c427
full implementation
Change-Id: Ie59fd61d02972bd04d887bdcca9745793b880681
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Band.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Band.java
index ecd6504..1342a22 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Band.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Band.java
@@ -69,6 +69,20 @@
*/
Type type();
+ /**
+ * Returns the packets seen by this band.
+ *
+ * @return a long value
+ */
+ long packets();
+
+ /**
+ * Return the bytes seen by this band.
+ *
+ * @return a byte counter
+ */
+ long bytes();
+
interface Builder {
/**
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/BandEntry.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/BandEntry.java
new file mode 100644
index 0000000..369e1cc
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/BandEntry.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.net.meter;
+
+/**
+ * Represents a stored band.
+ */
+public interface BandEntry extends Band {
+
+ /**
+ * Sets the number of packets seen by this band.
+ *
+ * @param packets a packet count
+ */
+ void setPackets(long packets);
+
+ /**
+ * Sets the number of bytes seen by this band.
+ *
+ * @param bytes a byte counter
+ */
+ void setBytes(long bytes);
+
+}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultBand.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultBand.java
index bff757b..669dc0d 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultBand.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultBand.java
@@ -20,12 +20,14 @@
/**
* A default implementation for a Band.
*/
-public final class DefaultBand implements Band {
+public final class DefaultBand implements Band, BandEntry {
private final Type type;
private final long rate;
private final long burstSize;
private final short prec;
+ private long packets;
+ private long bytes;
public DefaultBand(Type type, long rate,
long burstSize, short prec) {
@@ -55,6 +57,26 @@
return type;
}
+ @Override
+ public long packets() {
+ return packets;
+ }
+
+ @Override
+ public long bytes() {
+ return bytes;
+ }
+
+ @Override
+ public void setPackets(long packets) {
+ this.packets = packets;
+ }
+
+ @Override
+ public void setBytes(long bytes) {
+ this.bytes = bytes;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -91,7 +113,7 @@
}
@Override
- public Band build() {
+ public DefaultBand build() {
checkArgument(prec != null && type == Type.REMARK,
"Only REMARK bands can have a precendence.");
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultMeter.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultMeter.java
index 953feb2..138bb18 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultMeter.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultMeter.java
@@ -28,7 +28,7 @@
/**
* A default implementation of a meter.
*/
-public final class DefaultMeter implements Meter {
+public final class DefaultMeter implements Meter, MeterEntry {
private final MeterId id;
@@ -39,6 +39,12 @@
private final DeviceId deviceId;
private final Optional<MeterContext> context;
+ private MeterState state;
+ private long life;
+ private long refCount;
+ private long packets;
+ private long bytes;
+
private DefaultMeter(DeviceId deviceId, MeterId id, ApplicationId appId,
Unit unit, boolean burst,
Collection<Band> bands, Optional<MeterContext> context) {
@@ -86,10 +92,60 @@
return null;
}
+ @Override
+ public MeterState state() {
+ return state;
+ }
+
+ @Override
+ public long life() {
+ return life;
+ }
+
+ @Override
+ public long referenceCount() {
+ return refCount;
+ }
+
+ @Override
+ public long packetsSeen() {
+ return packets;
+ }
+
+ @Override
+ public long bytesSeen() {
+ return bytes;
+ }
+
public static Builder builder() {
return new Builder();
}
+ @Override
+ public void setState(MeterState state) {
+ this.state = state;
+ }
+
+ @Override
+ public void setLife(long life) {
+ this.life = life;
+ }
+
+ @Override
+ public void setReferenceCount(long count) {
+ this.refCount = count;
+ }
+
+ @Override
+ public void setProcessedPackets(long packets) {
+ this.packets = packets;
+ }
+
+ @Override
+ public void setProcessedBytes(long bytes) {
+ this.bytes = bytes;
+ }
+
public static final class Builder implements Meter.Builder {
private MeterId id;
@@ -108,7 +164,7 @@
}
@Override
- public Meter.Builder withId(int id) {
+ public Meter.Builder withId(long id) {
this.id = MeterId.meterId(id);
return this;
}
@@ -144,7 +200,7 @@
}
@Override
- public Meter build() {
+ public DefaultMeter build() {
checkNotNull(deviceId, "Must specify a device");
checkNotNull(bands, "Must have bands.");
checkArgument(bands.size() > 0, "Must have at least one band.");
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Meter.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Meter.java
index e70dd88..14593df 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Meter.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Meter.java
@@ -89,6 +89,41 @@
Optional<MeterContext> context();
/**
+ * Fetches the state of this meter.
+ *
+ * @return a meter state
+ */
+ MeterState state();
+
+ /**
+ * The lifetime in seconds of this meter.
+ *
+ * @return number of seconds
+ */
+ long life();
+
+ /**
+ * The number of flows pointing to this meter.
+ *
+ * @return a reference count
+ */
+ long referenceCount();
+
+ /**
+ * Number of packets processed by this meter.
+ *
+ * @return a packet count
+ */
+ long packetsSeen();
+
+ /**
+ * Number of bytes processed by this meter.
+ *
+ * @return a byte count
+ */
+ long bytesSeen();
+
+ /**
* A meter builder.
*/
interface Builder {
@@ -104,10 +139,10 @@
/**
* Assigns the id to this meter.
*
- * @param id an integer
+ * @param id a long
* @return this
*/
- Builder withId(int id);
+ Builder withId(long id);
/**
* Assigns the application that built this meter.
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterContext.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterContext.java
index 4c814f8..919cf63 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterContext.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterContext.java
@@ -34,6 +34,5 @@
* @param op a meter operation
* @param reason the reason why it failed
*/
- default void onError(MeterOperation op, MeterFailReason reason) {
- }
+ default void onError(MeterOperation op, MeterFailReason reason) {}
}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEntry.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEntry.java
new file mode 100644
index 0000000..6157578
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEntry.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.net.meter;
+
+/**
+ * Represents a stored meter.
+ */
+public interface MeterEntry extends Meter {
+
+ /**
+ * Updates the state of this meter.
+ *
+ * @param state a meter state
+ */
+ void setState(MeterState state);
+
+ /**
+ * Set the amount of time the meter has existed in seconds.
+ *
+ * @param life number of seconds
+ */
+ void setLife(long life);
+
+ /**
+ * Sets the number of flows which are using this meter.
+ *
+ * @param count a reference count.
+ */
+ void setReferenceCount(long count);
+
+ /**
+ * Updates the number of packets seen by this meter.
+ *
+ * @param packets a packet count.
+ */
+ void setProcessedPackets(long packets);
+
+ /**
+ * Updates the number of bytes seen by the meter.
+ *
+ * @param bytes a byte counter.
+ */
+ void setProcessedBytes(long bytes);
+}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEvent.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEvent.java
index a952a14..9439145 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEvent.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEvent.java
@@ -20,20 +20,12 @@
/**
* Entity that represents Meter events.
*/
-public class MeterEvent extends AbstractEvent<MeterEvent.Type, Meter> {
+public class MeterEvent extends AbstractEvent<MeterEvent.Type, MeterOperation> {
- enum Type {
+ private final MeterFailReason reason;
- /**
- * Signals that a new meter has been added.
- */
- METER_ADDED,
-
- /**
- * Signals that a meter has been removed.
- */
- METER_REMOVED,
+ public enum Type {
/**
* Signals that a meter has been added.
@@ -41,19 +33,15 @@
METER_UPDATED,
/**
- * Signals that a meter addition failed.
- */
- METER_ADD_FAILED,
-
- /**
- * Signals that a meter removal failed.
- */
- METER_REMOVE_FAILED,
-
- /**
* Signals that a meter update failed.
*/
- METER_UPDATE_FAILED
+ METER_OP_FAILED,
+
+ /**
+ * A meter operation was requested.
+ */
+ METER_OP_REQ,
+
}
@@ -62,21 +50,32 @@
* current time.
*
* @param type meter event type
- * @param meter event subject
+ * @param op event subject
*/
- public MeterEvent(Type type, Meter meter) {
- super(type, meter);
+ public MeterEvent(Type type, MeterOperation op) {
+ super(type, op);
+ this.reason = null;
}
/**
* Creates an event of a given type and for the specified meter and time.
*
* @param type meter event type
- * @param meter event subject
+ * @param op event subject
* @param time occurrence time
*/
- public MeterEvent(Type type, Meter meter, long time) {
- super(type, meter, time);
+ public MeterEvent(Type type, MeterOperation op, long time) {
+ super(type, op, time);
+ this.reason = null;
+ }
+
+ public MeterEvent(Type type, MeterOperation op, MeterFailReason reason) {
+ super(type, op);
+ this.reason = reason;
+ }
+
+ public MeterFailReason reason() {
+ return reason;
}
}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterId.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterId.java
index ea7cf36..8619e14 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterId.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterId.java
@@ -19,19 +19,19 @@
/**
* A representation of a meter id.
- * Uniquely identifies a meter for a given device.
+ * Uniquely identifies a meter system wide.
*/
public final class MeterId {
static final long MAX = 0xFFFF0000;
- private final int id;
+ private final long id;
public static final MeterId SLOWPATH = new MeterId(0xFFFFFFFD);
public static final MeterId CONTROLLER = new MeterId(0xFFFFFFFE);
public static final MeterId ALL = new MeterId(0xFFFFFFFF);
- private MeterId(int id) {
+ private MeterId(long id) {
checkArgument(id <= MAX, "id cannot be larger than 0xFFFF0000");
this.id = id;
}
@@ -39,9 +39,9 @@
/**
* The integer representation of the meter id.
*
- * @return an integer
+ * @return a long
*/
- public int id() {
+ public long id() {
return id;
}
@@ -62,10 +62,10 @@
@Override
public int hashCode() {
- return id;
+ return Long.hashCode(id);
}
- public static MeterId meterId(int id) {
+ public static MeterId meterId(long id) {
return new MeterId(id);
}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterState.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterState.java
new file mode 100644
index 0000000..a910d6d
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterState.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.net.meter;
+
+/**
+ * Represents the state of the meter as seen by the store.
+ */
+public enum MeterState {
+
+ /**
+ * The meter is in the process of being added.
+ */
+ PENDING_ADD,
+
+ /**
+ * THe meter has been added.
+ */
+ ADDED,
+
+ /**
+ * The meter is in the process of being removed.
+ */
+ PENDING_REMOVE,
+
+ /**
+ * The meter has been removed.
+ */
+ REMOVED,
+
+}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterStore.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterStore.java
new file mode 100644
index 0000000..232eedd
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterStore.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.net.meter;
+
+import org.onosproject.store.Store;
+
+import java.util.Collection;
+
+/**
+ * Entity that stores and distributed meter objects.
+ */
+public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> {
+
+ /**
+ * Adds a meter to the store.
+ *
+ * @param meter a meter
+ */
+ void storeMeter(Meter meter);
+
+ /**
+ * Deletes a meter from the store.
+ *
+ * @param meter a meter
+ */
+ void deleteMeter(Meter meter);
+
+ /**
+ * Updates a meter whose meter id is the same as the passed meter.
+ *
+ * @param meter a new meter
+ */
+ void updateMeter(Meter meter);
+
+ /**
+ * Updates a given meter's state with the provided state.
+ * @param meter a meter
+ */
+ void updateMeterState(Meter meter);
+
+ /**
+ * Obtains a meter matching the given meter id.
+ *
+ * @param meterId a meter id
+ * @return a meter
+ */
+ Meter getMeter(MeterId meterId);
+
+ /**
+ * Returns all meters stored in the store.
+ *
+ * @return a collection of meters
+ */
+ Collection<Meter> getAllMeters();
+
+ /**
+ * Update the store by deleting the failed meter.
+ * Notifies the delegate that the meter failed to allow it
+ * to nofity the app.
+ *
+ * @param op a failed meter operation
+ * @param reason a failure reason
+ */
+ void failedMeter(MeterOperation op, MeterFailReason reason);
+}
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
index 6039247..b0726d6 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
@@ -21,6 +21,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onosproject.incubator.net.meter.DefaultMeter;
import org.onosproject.incubator.net.meter.Meter;
import org.onosproject.incubator.net.meter.MeterEvent;
import org.onosproject.incubator.net.meter.MeterFailReason;
@@ -31,6 +32,8 @@
import org.onosproject.incubator.net.meter.MeterProviderRegistry;
import org.onosproject.incubator.net.meter.MeterProviderService;
import org.onosproject.incubator.net.meter.MeterService;
+import org.onosproject.incubator.net.meter.MeterState;
+import org.onosproject.incubator.net.meter.MeterStore;
import org.onosproject.incubator.net.meter.MeterStoreDelegate;
import org.onosproject.net.DeviceId;
import org.onosproject.net.provider.AbstractListenerProviderRegistry;
@@ -41,6 +44,7 @@
import java.util.Collection;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
@@ -60,6 +64,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ MeterStore store;
+
private AtomicCounter meterIdCounter;
@Activate
@@ -82,27 +89,35 @@
@Override
public void addMeter(Meter meter) {
-
+ DefaultMeter m = (DefaultMeter) meter;
+ m.setState(MeterState.PENDING_ADD);
+ store.storeMeter(m);
}
@Override
public void updateMeter(Meter meter) {
-
+ DefaultMeter m = (DefaultMeter) meter;
+ m.setState(MeterState.PENDING_ADD);
+ store.updateMeter(m);
}
@Override
public void removeMeter(Meter meter) {
-
+ DefaultMeter m = (DefaultMeter) meter;
+ m.setState(MeterState.PENDING_REMOVE);
+ store.deleteMeter(m);
}
@Override
public void removeMeter(MeterId id) {
-
+ DefaultMeter meter = (DefaultMeter) store.getMeter(id);
+ checkNotNull(meter, "No such meter {}", id);
+ removeMeter(meter);
}
@Override
public Meter getMeter(MeterId id) {
- return null;
+ return store.getMeter(id);
}
@Override
@@ -125,13 +140,14 @@
}
@Override
- public void meterOperationFailed(MeterOperation operation, MeterFailReason reason) {
-
+ public void meterOperationFailed(MeterOperation operation,
+ MeterFailReason reason) {
+ store.failedMeter(operation, reason);
}
@Override
public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
-
+ meterEntries.forEach(m -> store.updateMeterState(m));
}
}
@@ -139,6 +155,21 @@
@Override
public void notify(MeterEvent event) {
+ DeviceId deviceId = event.subject().meter().deviceId();
+ MeterProvider p = getProvider(event.subject().meter().deviceId());
+ switch (event.type()) {
+ case METER_UPDATED:
+ break;
+ case METER_OP_FAILED:
+ event.subject().meter().context().ifPresent(c ->
+ c.onError(event.subject(), event.reason()));
+ break;
+ case METER_OP_REQ:
+ p.performMeterOperation(deviceId, event.subject());
+ break;
+ default:
+ log.warn("Unknown meter event {}", event.type());
+ }
}
}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
new file mode 100644
index 0000000..b8f2080
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.store.meter.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.net.meter.DefaultBand;
+import org.onosproject.incubator.net.meter.DefaultMeter;
+import org.onosproject.incubator.net.meter.Meter;
+import org.onosproject.incubator.net.meter.MeterEvent;
+import org.onosproject.incubator.net.meter.MeterFailReason;
+import org.onosproject.incubator.net.meter.MeterId;
+import org.onosproject.incubator.net.meter.MeterOperation;
+import org.onosproject.incubator.net.meter.MeterStore;
+import org.onosproject.incubator.net.meter.MeterStoreDelegate;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A distributed meter store implementation. Meters are stored consistently
+ * across the cluster.
+ */
+public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
+ implements MeterStore {
+
+ private Logger log = getLogger(getClass());
+
+ private static final String METERSTORE = "onos-meter-store";
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+
+ private static final MessageSubject UPDATE_METER = new MessageSubject("peer-mod-meter");
+
+
+ @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ label = "Number of threads in the message handler pool")
+ private int msgPoolSize;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterCommunicationService clusterCommunicationService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
+ private ConsistentMap<MeterId, Meter> meters;
+ private NodeId local;
+ private KryoNamespace kryoNameSpace;
+
+ private Serializer serializer;
+
+ @Activate
+ public void activate() {
+
+ local = clusterService.getLocalNode().id();
+
+ kryoNameSpace =
+ KryoNamespace.newBuilder()
+ .register(DefaultMeter.class)
+ .register(DefaultBand.class)
+ .build();
+
+ serializer = Serializer.using(kryoNameSpace);
+
+ meters = storageService.<MeterId, Meter>consistentMapBuilder()
+ .withName(METERSTORE)
+ .withSerializer(serializer)
+ .build();
+
+ ExecutorService executors = Executors.newFixedThreadPool(
+ msgPoolSize, Tools.groupedThreads("onos/store/meter", "message-handlers"));
+ registerMessageHandlers(executors);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+
+
+ log.info("Stopped");
+ }
+
+ private void registerMessageHandlers(ExecutorService executor) {
+ clusterCommunicationService.<MeterEvent>addSubscriber(UPDATE_METER, kryoNameSpace::deserialize,
+ this::notifyDelegate, executor);
+
+ }
+
+
+ @Override
+ public void storeMeter(Meter meter) {
+ NodeId master = mastershipService.getMasterFor(meter.deviceId());
+
+ meters.put(meter.id(), meter);
+
+ MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
+ new MeterOperation(meter, MeterOperation.Type.ADD));
+ if (Objects.equals(local, master)) {
+ notifyDelegate(event);
+ } else {
+ clusterCommunicationService.unicast(
+ event,
+ UPDATE_METER,
+ serializer::encode,
+ master
+ ).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to install meter {} because {} on {}",
+ meter, error, master);
+
+ // notify app of failure
+ meter.context().ifPresent(c -> c.onError(
+ event.subject(), MeterFailReason.UNKNOWN));
+ }
+ });
+ }
+
+ }
+
+ @Override
+ public void deleteMeter(Meter meter) {
+
+ NodeId master = mastershipService.getMasterFor(meter.deviceId());
+
+ // update the state of the meter. It will be pruned by observing
+ // that it has been removed from the dataplane.
+ meters.put(meter.id(), meter);
+
+ MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
+ new MeterOperation(meter, MeterOperation.Type.REMOVE));
+ if (Objects.equals(local, master)) {
+ notifyDelegate(event);
+ } else {
+ clusterCommunicationService.unicast(
+ event,
+ UPDATE_METER,
+ serializer::encode,
+ master
+ ).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to delete meter {} because {} on {}",
+ meter, error, master);
+
+ // notify app of failure
+ meter.context().ifPresent(c -> c.onError(
+ event.subject(), MeterFailReason.UNKNOWN));
+ }
+ });
+ }
+
+ }
+
+ @Override
+ public void updateMeter(Meter meter) {
+
+ NodeId master = mastershipService.getMasterFor(meter.deviceId());
+
+ meters.put(meter.id(), meter);
+
+ MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
+ new MeterOperation(meter, MeterOperation.Type.MODIFY));
+ if (Objects.equals(local, master)) {
+ notifyDelegate(event);
+ } else {
+ clusterCommunicationService.unicast(
+ event,
+ UPDATE_METER,
+ serializer::encode,
+ master
+ ).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to update meter {} because {} on {}",
+ meter, error, master);
+
+ // notify app of failure
+ meter.context().ifPresent(c -> c.onError(
+ event.subject(), MeterFailReason.UNKNOWN));
+ }
+ });
+ }
+
+ }
+
+ @Override
+ public void updateMeterState(Meter meter) {
+ meters.compute(meter.id(), (id, v) -> {
+ DefaultMeter m = (DefaultMeter) v;
+ m.setState(meter.state());
+ m.setProcessedPackets(meter.packetsSeen());
+ m.setProcessedBytes(meter.bytesSeen());
+ m.setLife(meter.life());
+ m.setReferenceCount(meter.referenceCount());
+ return m;
+ });
+ }
+
+ @Override
+ public Meter getMeter(MeterId meterId) {
+ return meters.get(meterId).value();
+ }
+
+ @Override
+ public Collection<Meter> getAllMeters() {
+ return meters.values().stream()
+ .map(v -> v.value()).collect(Collectors.toSet());
+ }
+
+ @Override
+ public void failedMeter(MeterOperation op, MeterFailReason reason) {
+ NodeId master = mastershipService.getMasterFor(op.meter().deviceId());
+ meters.remove(op.meter().id());
+
+ MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_FAILED, op, reason);
+ if (Objects.equals(local, master)) {
+ notifyDelegate(event);
+ } else {
+ clusterCommunicationService.unicast(
+ event,
+ UPDATE_METER,
+ serializer::encode,
+ master
+ ).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to delete failed meter {} because {} on {}",
+ op.meter(), error, master);
+
+ // Can't do any more...
+ }
+ });
+ }
+
+ }
+
+}
diff --git a/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/MeterModBuilder.java b/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/MeterModBuilder.java
index a87250e..00ad8ef 100644
--- a/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/MeterModBuilder.java
+++ b/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/MeterModBuilder.java
@@ -46,7 +46,7 @@
private final OFFactory factory;
private Meter.Unit unit = Meter.Unit.KB_PER_SEC;
private boolean burst = false;
- private Integer id;
+ private Long id;
private Collection<Band> bands;
public MeterModBuilder(long xid, OFFactory factory) {
diff --git a/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/OpenFlowMeterProvider.java b/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/OpenFlowMeterProvider.java
index dabeda0..e56a54c 100644
--- a/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/OpenFlowMeterProvider.java
+++ b/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/OpenFlowMeterProvider.java
@@ -20,7 +20,6 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
-
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
@@ -28,7 +27,10 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.net.DeviceId;
+import org.onosproject.core.CoreService;
+import org.onosproject.incubator.net.meter.Band;
+import org.onosproject.incubator.net.meter.DefaultBand;
+import org.onosproject.incubator.net.meter.DefaultMeter;
import org.onosproject.incubator.net.meter.Meter;
import org.onosproject.incubator.net.meter.MeterFailReason;
import org.onosproject.incubator.net.meter.MeterOperation;
@@ -36,6 +38,8 @@
import org.onosproject.incubator.net.meter.MeterProvider;
import org.onosproject.incubator.net.meter.MeterProviderRegistry;
import org.onosproject.incubator.net.meter.MeterProviderService;
+import org.onosproject.incubator.net.meter.MeterState;
+import org.onosproject.net.DeviceId;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.openflow.controller.Dpid;
@@ -47,15 +51,23 @@
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFErrorType;
import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFMeterBandStats;
+import org.projectfloodlight.openflow.protocol.OFMeterConfigStatsReply;
+import org.projectfloodlight.openflow.protocol.OFMeterStats;
+import org.projectfloodlight.openflow.protocol.OFMeterStatsReply;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.projectfloodlight.openflow.protocol.OFStatsReply;
+import org.projectfloodlight.openflow.protocol.OFStatsType;
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.protocol.errormsg.OFMeterModFailedErrorMsg;
import org.slf4j.Logger;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
@@ -74,6 +86,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MeterProviderRegistry providerRegistry;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
private MeterProviderService providerService;
private static final AtomicLong XID_COUNTER = new AtomicLong(1);
@@ -81,8 +96,7 @@
static final int POLL_INTERVAL = 10;
static final long TIMEOUT = 30;
- private Cache<Integer, MeterOperation> pendingOperations;
- private Cache<Long, MeterOperation> pendingXid;
+ private Cache<Long, MeterOperation> pendingOperations;
private InternalMeterListener listener = new InternalMeterListener();
@@ -101,7 +115,7 @@
pendingOperations = CacheBuilder.newBuilder()
.expireAfterWrite(TIMEOUT, TimeUnit.SECONDS)
- .removalListener((RemovalNotification<Integer, MeterOperation> notification) -> {
+ .removalListener((RemovalNotification<Long, MeterOperation> notification) -> {
if (notification.getCause() == RemovalCause.EXPIRED) {
providerService.meterOperationFailed(notification.getValue(),
MeterFailReason.TIMEOUT);
@@ -149,6 +163,8 @@
return;
}
+ performOperation(sw, meterOp);
+
}
private void performOperation(OpenFlowSwitch sw, MeterOperation op) {
@@ -203,7 +219,57 @@
}
private void pushMeterStats(Dpid dpid, OFStatsReply msg) {
+ DeviceId deviceId = DeviceId.deviceId(Dpid.uri(dpid));
+ if (msg.getStatsType() == OFStatsType.METER) {
+ OFMeterStatsReply reply = (OFMeterStatsReply) msg;
+ Collection<Meter> meters = buildMeters(deviceId, reply.getEntries());
+ //TODO do meter accounting here.
+ providerService.pushMeterMetrics(deviceId, meters);
+ } else if (msg.getStatsType() == OFStatsType.METER_CONFIG) {
+ OFMeterConfigStatsReply reply = (OFMeterConfigStatsReply) msg;
+ // FIXME: Map<Long, Meter> meters = collectMeters(deviceId, reply);
+ }
+
+ }
+
+ private Map<Long, Meter> collectMeters(DeviceId deviceId,
+ OFMeterConfigStatsReply reply) {
+ return Maps.newHashMap();
+ //TODO: Needs a fix to be applied to loxi MeterConfig stat is incorrect
+ }
+
+ private Collection<Meter> buildMeters(DeviceId deviceId,
+ List<OFMeterStats> entries) {
+ return entries.stream().map(stat -> {
+ DefaultMeter.Builder builder = DefaultMeter.builder();
+ Collection<Band> bands = buildBands(stat.getBandStats());
+ builder.forDevice(deviceId)
+ .withId(stat.getMeterId())
+ //FIXME: need to encode appId in meter id, but that makes
+ // things a little annoying for debugging
+ .fromApp(coreService.getAppId("org.onosproject.core"))
+ .withBands(bands);
+ DefaultMeter meter = builder.build();
+ meter.setState(MeterState.ADDED);
+ meter.setLife(stat.getDurationSec());
+ meter.setProcessedBytes(stat.getByteInCount().getValue());
+ meter.setProcessedPackets(stat.getPacketInCount().getValue());
+ meter.setReferenceCount(stat.getFlowCount());
+
+ // marks the meter as seen on the dataplane
+ pendingOperations.invalidate(stat.getMeterId());
+ return meter;
+ }).collect(Collectors.toSet());
+ }
+
+ private Collection<Band> buildBands(List<OFMeterBandStats> bandStats) {
+ return bandStats.stream().map(stat -> {
+ DefaultBand band = DefaultBand.builder().build();
+ band.setBytes(stat.getByteBandCount().getValue());
+ band.setPackets(stat.getPacketBandCount().getValue());
+ return band;
+ }).collect(Collectors.toSet());
}
private void signalMeterError(OFMeterModFailedErrorMsg meterError,
diff --git a/tools/test/bin/onos-form-cluster b/tools/test/bin/onos-form-cluster
index b6823b1..daca34c 100755
--- a/tools/test/bin/onos-form-cluster
+++ b/tools/test/bin/onos-form-cluster
@@ -29,4 +29,4 @@
set -x
-ssh $ONOS_USER@$node /tmp/$ONOS_BITS/bin/onos-form-cluster -u $user -p $password $nodes
\ No newline at end of file
+ssh $ONOS_USER@$node $ONOS_INSTALL_DIR/bin/onos-form-cluster -u $user -p $password $nodes