Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
Conflicts:
core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
Change-Id: I23a7b1bff399c95499a39f7a00563650f58b7210
diff --git a/core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java b/core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java
new file mode 100644
index 0000000..1826dde
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java
@@ -0,0 +1,56 @@
+package org.onlab.onos.net.statistic;
+
+import org.onlab.onos.net.flow.FlowRuleProvider;
+
+/**
+ * Implementation of a load.
+ */
+public class DefaultLoad implements Load {
+
+ private final boolean isValid;
+ private final long current;
+ private final long previous;
+ private final long time;
+
+ /**
+ * Creates an invalid load.
+ */
+ public DefaultLoad() {
+ this.isValid = false;
+ this.time = System.currentTimeMillis();
+ this.current = -1;
+ this.previous = -1;
+ }
+
+ /**
+ * Creates a load value from the parameters.
+ * @param current the current value
+ * @param previous the previous value
+ */
+ public DefaultLoad(long current, long previous) {
+ this.current = current;
+ this.previous = previous;
+ this.time = System.currentTimeMillis();
+ this.isValid = true;
+ }
+
+ @Override
+ public long rate() {
+ return (current - previous) / FlowRuleProvider.POLL_INTERVAL;
+ }
+
+ @Override
+ public long latest() {
+ return current;
+ }
+
+ @Override
+ public boolean isValid() {
+ return isValid;
+ }
+
+ @Override
+ public long time() {
+ return time;
+ }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java b/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java
index 534b10c..b609f2b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java
+++ b/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java
@@ -6,15 +6,27 @@
public interface Load {
/**
- * Obtain the current observed rate on a link.
+ * Obtain the current observed rate (in bytes/s) on a link.
* @return long value
*/
long rate();
/**
- * Obtain the latest counter viewed on that link.
+ * Obtain the latest bytes counter viewed on that link.
* @return long value
*/
long latest();
+ /**
+ * Indicates whether this load was built on valid values.
+ * @return boolean
+ */
+ boolean isValid();
+
+ /**
+ * Returns when this value was seen.
+ * @return epoch time
+ */
+ long time();
+
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 01514d4..d8f89ae 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -108,6 +108,9 @@
if (local) {
// TODO: aggregate all local rules and push down once?
applyFlowRulesToProviders(f);
+ eventDispatcher.post(
+ new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, f));
+
}
}
}
@@ -136,6 +139,8 @@
if (local) {
// TODO: aggregate all local rules and push down once?
removeFlowRulesFromProviders(f);
+ eventDispatcher.post(
+ new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, f));
}
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java
index e682c49..49d21e0 100644
--- a/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java
@@ -68,7 +68,9 @@
checkNotNull(packet, "Packet cannot be null");
final Device device = deviceService.getDevice(packet.sendThrough());
final PacketProvider packetProvider = getProvider(device.providerId());
- packetProvider.emit(packet);
+ if (packetProvider != null) {
+ packetProvider.emit(packet);
+ }
}
@Override
diff --git a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
index 9b1a2e0..90db729 100644
--- a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
@@ -10,15 +10,19 @@
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Path;
+import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleService;
+import org.onlab.onos.net.statistic.DefaultLoad;
import org.onlab.onos.net.statistic.Load;
import org.onlab.onos.net.statistic.StatisticService;
import org.onlab.onos.net.statistic.StatisticStore;
import org.slf4j.Logger;
+import java.util.Set;
+
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -54,12 +58,12 @@
@Override
public Load load(Link link) {
- return null;
+ return load(link.src());
}
@Override
public Load load(ConnectPoint connectPoint) {
- return null;
+ return loadInternal(connectPoint);
}
@Override
@@ -77,6 +81,35 @@
return null;
}
+ private Load loadInternal(ConnectPoint connectPoint) {
+ Set<FlowEntry> current;
+ Set<FlowEntry> previous;
+ synchronized (statisticStore) {
+ current = statisticStore.getCurrentStatistic(connectPoint);
+ previous = statisticStore.getPreviousStatistic(connectPoint);
+ }
+ if (current == null || previous == null) {
+ return new DefaultLoad();
+ }
+ long currentAggregate = aggregate(current);
+ long previousAggregate = aggregate(previous);
+
+ return new DefaultLoad(currentAggregate, previousAggregate);
+ }
+
+ /**
+ * Aggregates a set of values.
+ * @param values the values to aggregate
+ * @return a long value
+ */
+ private long aggregate(Set<FlowEntry> values) {
+ long sum = 0;
+ for (FlowEntry f : values) {
+ sum += f.bytes();
+ }
+ return sum;
+ }
+
/**
* Internal flow rule event listener.
*/
@@ -84,22 +117,29 @@
@Override
public void event(FlowRuleEvent event) {
-// FlowRule rule = event.subject();
-// switch (event.type()) {
-// case RULE_ADDED:
-// case RULE_UPDATED:
-// if (rule instanceof FlowEntry) {
-// statisticStore.addOrUpdateStatistic((FlowEntry) rule);
-// }
-// break;
-// case RULE_ADD_REQUESTED:
-// statisticStore.prepareForStatistics(rule);
-// break;
-// case RULE_REMOVE_REQUESTED:
-// case RULE_REMOVED:
-// statisticStore.removeFromStatistics(rule);
-// break;
-// }
+ FlowRule rule = event.subject();
+ switch (event.type()) {
+ case RULE_ADDED:
+ case RULE_UPDATED:
+ if (rule instanceof FlowEntry) {
+ statisticStore.addOrUpdateStatistic((FlowEntry) rule);
+ } else {
+ log.warn("IT AIN'T A FLOWENTRY");
+ }
+ break;
+ case RULE_ADD_REQUESTED:
+ log.info("Preparing for stats");
+ statisticStore.prepareForStatistics(rule);
+ break;
+ case RULE_REMOVE_REQUESTED:
+ log.info("Removing stats");
+ statisticStore.removeFromStatistics(rule);
+ break;
+ case RULE_REMOVED:
+ break;
+ default:
+ log.warn("Unknown flow rule event {}", event);
+ }
}
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 5777285..a2fbc9a 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -1,5 +1,10 @@
package org.onlab.onos.net.flow.impl;
+
+
+import static org.onlab.onos.net.flow.FlowRuleEvent.Type.*;
+
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -164,7 +169,8 @@
assertEquals("2 rules should exist", 2, flowCount());
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2));
- validateEvents(RULE_ADDED, RULE_ADDED);
+ validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
+ RULE_ADDED, RULE_ADDED);
addFlowRule(1);
assertEquals("should still be 2 rules", 2, flowCount());
@@ -218,11 +224,12 @@
FlowEntry fe2 = new DefaultFlowEntry(f2);
FlowEntry fe3 = new DefaultFlowEntry(f3);
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2, fe3));
- validateEvents(RULE_ADDED, RULE_ADDED, RULE_ADDED);
+ validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
+ RULE_ADDED, RULE_ADDED, RULE_ADDED);
mgr.removeFlowRules(f1, f2);
//removing from north, so no events generated
- validateEvents();
+ validateEvents(RULE_REMOVE_REQUESTED, RULE_REMOVE_REQUESTED);
assertEquals("3 rule should exist", 3, flowCount());
assertTrue("Entries should be pending remove.",
validateState(ImmutableMap.of(
@@ -244,7 +251,8 @@
service.removeFlowRules(f1);
fe1.setState(FlowEntryState.REMOVED);
providerService.flowRemoved(fe1);
- validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
+ validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED,
+ RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED);
providerService.flowRemoved(fe1);
validateEvents();
@@ -253,7 +261,7 @@
FlowEntry fe3 = new DefaultFlowEntry(f3);
service.applyFlowRules(f3);
providerService.pushFlowMetrics(DID, Collections.singletonList(fe3));
- validateEvents(RULE_ADDED);
+ validateEvents(RULE_ADD_REQUESTED, RULE_ADDED);
providerService.flowRemoved(fe3);
validateEvents();
@@ -282,7 +290,8 @@
f2, FlowEntryState.ADDED,
f3, FlowEntryState.PENDING_ADD)));
- validateEvents(RULE_ADDED, RULE_ADDED);
+ validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
+ RULE_ADDED, RULE_ADDED);
}
@Test
@@ -302,7 +311,7 @@
providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2, fe3));
- validateEvents(RULE_ADDED, RULE_ADDED);
+ validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED, RULE_ADDED);
}
@@ -327,7 +336,8 @@
providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2));
- validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
+ validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
+ RULE_REMOVE_REQUESTED, RULE_ADDED, RULE_ADDED, RULE_REMOVED);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
new file mode 100644
index 0000000..d89ce25
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -0,0 +1,289 @@
+package org.onlab.onos.store.statistic.impl;
+
+import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.instructions.Instruction;
+import org.onlab.onos.net.flow.instructions.Instructions;
+import org.onlab.onos.net.statistic.StatisticStore;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
+import org.onlab.onos.store.flow.ReplicaInfo;
+import org.onlab.onos.store.flow.ReplicaInfoService;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Maintains statistics using RPC calls to collect stats from remote instances
+ * on demand.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedStatisticStore implements StatisticStore {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ReplicaInfoService replicaInfoManager;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
+ private Map<ConnectPoint, InternalStatisticRepresentation> representations =
+ new ConcurrentHashMap<>();
+
+ private Map<ConnectPoint, Set<FlowEntry>> previous =
+ new ConcurrentHashMap<>();
+
+ private Map<ConnectPoint, Set<FlowEntry>> current =
+ new ConcurrentHashMap<>();
+
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespaces.API.newBuilder()
+ .build()
+ .populate(1);
+ }
+ };;
+
+ private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
+
+ @Activate
+ public void activate() {
+ clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ ConnectPoint cp = SERIALIZER.decode(message.payload());
+ try {
+ message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ });
+
+ clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ ConnectPoint cp = SERIALIZER.decode(message.payload());
+ try {
+ message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ });
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public void prepareForStatistics(FlowRule rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+ InternalStatisticRepresentation rep;
+ synchronized (representations) {
+ rep = getOrCreateRepresentation(cp);
+ }
+ rep.prepare();
+ }
+
+ @Override
+ public void removeFromStatistics(FlowRule rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+ InternalStatisticRepresentation rep = representations.get(cp);
+ if (rep != null) {
+ rep.remove(rule);
+ }
+ }
+
+ @Override
+ public void addOrUpdateStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+ InternalStatisticRepresentation rep = representations.get(cp);
+ if (rep != null && rep.submit(rule)) {
+ updatePublishedStats(cp, rep.get());
+ }
+ }
+
+ private synchronized void updatePublishedStats(ConnectPoint cp,
+ Set<FlowEntry> flowEntries) {
+ Set<FlowEntry> curr = current.get(cp);
+ if (curr == null) {
+ curr = new HashSet<>();
+ }
+ previous.put(cp, curr);
+ current.put(cp, flowEntries);
+
+ }
+
+ @Override
+ public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ return getCurrentStatisticInternal(connectPoint);
+ } else {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GET_CURRENT,
+ SERIALIZER.encode(connectPoint));
+
+ try {
+ ClusterMessageResponse response =
+ clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
+ return current.get(connectPoint);
+ }
+
+ @Override
+ public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ return getPreviousStatisticInternal(connectPoint);
+ } else {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GET_CURRENT,
+ SERIALIZER.encode(connectPoint));
+
+ try {
+ ClusterMessageResponse response =
+ clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
+ return previous.get(connectPoint);
+ }
+
+ private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
+
+ if (representations.containsKey(cp)) {
+ return representations.get(cp);
+ } else {
+ InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
+ representations.put(cp, rep);
+ return rep;
+ }
+
+ }
+
+ private ConnectPoint buildConnectPoint(FlowRule rule) {
+ PortNumber port = getOutput(rule);
+ if (port == null) {
+ log.warn("Rule {} has no output.", rule);
+ return null;
+ }
+ ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
+ return cp;
+ }
+
+ private PortNumber getOutput(FlowRule rule) {
+ for (Instruction i : rule.treatment().instructions()) {
+ if (i.type() == Instruction.Type.OUTPUT) {
+ Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
+ return out.port();
+ }
+ if (i.type() == Instruction.Type.DROP) {
+ return PortNumber.P0;
+ }
+ }
+ return null;
+ }
+
+ private class InternalStatisticRepresentation {
+
+ private final AtomicInteger counter = new AtomicInteger(0);
+ private final Set<FlowEntry> rules = new HashSet<>();
+
+ public void prepare() {
+ counter.incrementAndGet();
+ }
+
+ public synchronized void remove(FlowRule rule) {
+ rules.remove(rule);
+ counter.decrementAndGet();
+ }
+
+ public synchronized boolean submit(FlowEntry rule) {
+ if (rules.contains(rule)) {
+ rules.remove(rule);
+ }
+ rules.add(rule);
+ if (counter.get() == 0) {
+ return true;
+ } else {
+ return counter.decrementAndGet() == 0;
+ }
+ }
+
+ public synchronized Set<FlowEntry> get() {
+ counter.set(rules.size());
+ return ImmutableSet.copyOf(rules);
+ }
+
+
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java
new file mode 100644
index 0000000..a096a3d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java
@@ -0,0 +1,15 @@
+package org.onlab.onos.store.statistic.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+/**
+ * MessageSubjects used by DistributedStatisticStore peer-peer communication.
+ */
+public final class StatisticStoreMessageSubjects {
+ private StatisticStoreMessageSubjects() {}
+ public static final MessageSubject GET_CURRENT =
+ new MessageSubject("peer-return-current");
+ public static final MessageSubject GET_PREVIOUS =
+ new MessageSubject("peer-return-previous");
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java
new file mode 100644
index 0000000..122d2be
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of the statistic store.
+ */
+package org.onlab.onos.store.statistic.impl;
\ No newline at end of file
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index 9de6d8c..dc0eaa8 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -4,6 +4,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
@@ -26,6 +27,7 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
@@ -75,6 +77,7 @@
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
+ HashSet.class,
//
//
ControllerNode.State.class,
@@ -94,6 +97,7 @@
HostDescription.class,
DefaultHostDescription.class,
DefaultFlowRule.class,
+ DefaultFlowEntry.class,
FlowId.class,
DefaultTrafficSelector.class,
Criteria.PortCriterion.class,
diff --git a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
index 95cd619..bf4fee0 100644
--- a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
@@ -16,6 +16,7 @@
package org.onlab.onos.provider.lldp.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.ByteBuffer;
@@ -95,11 +96,13 @@
*/
public LinkDiscovery(Device device, PacketService pktService,
MastershipService masterService, LinkProviderService providerService, Boolean... useBDDP) {
+
this.device = device;
this.probeRate = 3000;
this.linkProvider = providerService;
this.pktService = pktService;
- this.mastershipService = masterService;
+
+ this.mastershipService = checkNotNull(masterService, "WTF!");
this.slowPorts = Collections.synchronizedSet(new HashSet<Long>());
this.fastPorts = Collections.synchronizedSet(new HashSet<Long>());
this.portProbeCount = new HashMap<>();
@@ -344,6 +347,12 @@
}
private void sendProbes(Long portNumber) {
+ if (device == null) {
+ log.warn("CRAZY SHIT");
+ }
+ if (mastershipService == null) {
+ log.warn("INSANE");
+ }
if (device.type() != Device.Type.ROADM &&
mastershipService.getLocalRole(this.device.id()) ==
MastershipRole.MASTER) {
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 03b9c7d..6fb54e8 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -103,6 +103,8 @@
private final Map<Long, InstallationFuture> pendingFMs =
new ConcurrentHashMap<Long, InstallationFuture>();
+ private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+
/**
* Creates an OpenFlow host provider.
*/
@@ -115,6 +117,14 @@
providerService = providerRegistry.register(this);
controller.addListener(listener);
controller.addEventListener(listener);
+
+ for (OpenFlowSwitch sw : controller.getSwitches()) {
+ FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
+ fsc.start();
+ collectors.put(new Dpid(sw.getId()), fsc);
+ }
+
+
log.info("Started");
}
@@ -213,7 +223,7 @@
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
- private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+
private final Multimap<DeviceId, FlowEntry> completeEntries =
ArrayListMultimap.create();