functional stats service

Change-Id: I90de3aa5d7721db8ef6a154e122af8b446243f60
diff --git a/core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java b/core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java
new file mode 100644
index 0000000..1826dde
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/statistic/DefaultLoad.java
@@ -0,0 +1,56 @@
+package org.onlab.onos.net.statistic;
+
+import org.onlab.onos.net.flow.FlowRuleProvider;
+
+/**
+ * Implementation of a load.
+ */
+public class DefaultLoad implements Load {
+
+    private final boolean isValid;
+    private final long current;
+    private final long previous;
+    private final long time;
+
+    /**
+     * Creates an invalid load.
+     */
+    public DefaultLoad() {
+        this.isValid = false;
+        this.time = System.currentTimeMillis();
+        this.current = -1;
+        this.previous = -1;
+    }
+
+    /**
+     * Creates a load value from the parameters.
+     * @param current the current value
+     * @param previous the previous value
+     */
+    public DefaultLoad(long current, long previous) {
+        this.current = current;
+        this.previous = previous;
+        this.time = System.currentTimeMillis();
+        this.isValid = true;
+    }
+
+    @Override
+    public long rate() {
+        return (current - previous) / FlowRuleProvider.POLL_INTERVAL;
+    }
+
+    @Override
+    public long latest() {
+        return current;
+    }
+
+    @Override
+    public boolean isValid() {
+        return isValid;
+    }
+
+    @Override
+    public long time() {
+        return time;
+    }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java b/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java
index 534b10c..b609f2b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java
+++ b/core/api/src/main/java/org/onlab/onos/net/statistic/Load.java
@@ -6,15 +6,27 @@
 public interface Load {
 
     /**
-     * Obtain the current observed rate on a link.
+     * Obtain the current observed rate (in bytes/s) on a link.
      * @return long value
      */
     long rate();
 
     /**
-     * Obtain the latest counter viewed on that link.
+     * Obtain the latest bytes counter viewed on that link.
      * @return long value
      */
     long latest();
 
+    /**
+     * Indicates whether this load was built on valid values.
+     * @return boolean
+     */
+    boolean isValid();
+
+    /**
+     * Returns when this value was seen.
+     * @return epoch time
+     */
+    long time();
+
 }
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 01514d4..d8f89ae 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -108,6 +108,9 @@
             if (local) {
                 // TODO: aggregate all local rules and push down once?
                 applyFlowRulesToProviders(f);
+                eventDispatcher.post(
+                        new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, f));
+
             }
         }
     }
@@ -136,6 +139,8 @@
             if (local) {
                 // TODO: aggregate all local rules and push down once?
                 removeFlowRulesFromProviders(f);
+                eventDispatcher.post(
+                        new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, f));
             }
         }
     }
diff --git a/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java
index e682c49..49d21e0 100644
--- a/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/packet/impl/PacketManager.java
@@ -68,7 +68,9 @@
         checkNotNull(packet, "Packet cannot be null");
         final Device device = deviceService.getDevice(packet.sendThrough());
         final PacketProvider packetProvider = getProvider(device.providerId());
-        packetProvider.emit(packet);
+        if (packetProvider != null) {
+            packetProvider.emit(packet);
+        }
     }
 
     @Override
diff --git a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
index 9b1a2e0..90db729 100644
--- a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
@@ -10,15 +10,19 @@
 import org.onlab.onos.net.Link;
 import org.onlab.onos.net.Path;
 
+import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowRule;
 import org.onlab.onos.net.flow.FlowRuleEvent;
 import org.onlab.onos.net.flow.FlowRuleListener;
 import org.onlab.onos.net.flow.FlowRuleService;
+import org.onlab.onos.net.statistic.DefaultLoad;
 import org.onlab.onos.net.statistic.Load;
 import org.onlab.onos.net.statistic.StatisticService;
 import org.onlab.onos.net.statistic.StatisticStore;
 import org.slf4j.Logger;
 
+import java.util.Set;
+
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -54,12 +58,12 @@
 
     @Override
     public Load load(Link link) {
-        return null;
+       return load(link.src());
     }
 
     @Override
     public Load load(ConnectPoint connectPoint) {
-        return null;
+        return loadInternal(connectPoint);
     }
 
     @Override
@@ -77,6 +81,35 @@
         return null;
     }
 
+    private Load loadInternal(ConnectPoint connectPoint) {
+        Set<FlowEntry> current;
+        Set<FlowEntry> previous;
+        synchronized (statisticStore) {
+            current = statisticStore.getCurrentStatistic(connectPoint);
+            previous = statisticStore.getPreviousStatistic(connectPoint);
+        }
+        if (current == null || previous == null) {
+            return new DefaultLoad();
+        }
+        long currentAggregate = aggregate(current);
+        long previousAggregate = aggregate(previous);
+
+        return new DefaultLoad(currentAggregate, previousAggregate);
+    }
+
+    /**
+     * Aggregates a set of values.
+     * @param values the values to aggregate
+     * @return a long value
+     */
+    private long aggregate(Set<FlowEntry> values) {
+        long sum = 0;
+        for (FlowEntry f : values) {
+            sum += f.bytes();
+        }
+        return sum;
+    }
+
     /**
      * Internal flow rule event listener.
      */
@@ -84,22 +117,29 @@
 
         @Override
         public void event(FlowRuleEvent event) {
-//            FlowRule rule = event.subject();
-//            switch (event.type()) {
-//                case RULE_ADDED:
-//                case RULE_UPDATED:
-//                    if (rule instanceof FlowEntry) {
-//                        statisticStore.addOrUpdateStatistic((FlowEntry) rule);
-//                    }
-//                    break;
-//                case RULE_ADD_REQUESTED:
-//                    statisticStore.prepareForStatistics(rule);
-//                    break;
-//                case RULE_REMOVE_REQUESTED:
-//                case RULE_REMOVED:
-//                    statisticStore.removeFromStatistics(rule);
-//                    break;
-//            }
+            FlowRule rule = event.subject();
+            switch (event.type()) {
+                case RULE_ADDED:
+                case RULE_UPDATED:
+                    if (rule instanceof FlowEntry) {
+                        statisticStore.addOrUpdateStatistic((FlowEntry) rule);
+                    } else {
+                        log.warn("IT AIN'T A FLOWENTRY");
+                    }
+                    break;
+                case RULE_ADD_REQUESTED:
+                    log.info("Preparing for stats");
+                    statisticStore.prepareForStatistics(rule);
+                    break;
+                case RULE_REMOVE_REQUESTED:
+                    log.info("Removing stats");
+                    statisticStore.removeFromStatistics(rule);
+                    break;
+                case RULE_REMOVED:
+                    break;
+                default:
+                    log.warn("Unknown flow rule event {}", event);
+            }
         }
     }
 
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index ca7cc07..cb966dc 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -6,9 +6,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
+import static org.onlab.onos.net.flow.FlowRuleEvent.Type.*;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -164,7 +162,8 @@
         assertEquals("2 rules should exist", 2, flowCount());
 
         providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2));
-        validateEvents(RULE_ADDED, RULE_ADDED);
+        validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
+                       RULE_ADDED, RULE_ADDED);
 
         addFlowRule(1);
         assertEquals("should still be 2 rules", 2, flowCount());
@@ -217,11 +216,12 @@
         FlowEntry fe2 = new DefaultFlowEntry(f2);
         FlowEntry fe3 = new DefaultFlowEntry(f3);
         providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2, fe3));
-        validateEvents(RULE_ADDED, RULE_ADDED, RULE_ADDED);
+        validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
+                       RULE_ADDED, RULE_ADDED, RULE_ADDED);
 
         mgr.removeFlowRules(f1, f2);
         //removing from north, so no events generated
-        validateEvents();
+        validateEvents(RULE_REMOVE_REQUESTED, RULE_REMOVE_REQUESTED);
         assertEquals("3 rule should exist", 3, flowCount());
         assertTrue("Entries should be pending remove.",
                    validateState(ImmutableMap.of(
@@ -243,7 +243,8 @@
         service.removeFlowRules(f1);
         fe1.setState(FlowEntryState.REMOVED);
         providerService.flowRemoved(fe1);
-        validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
+        validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED,
+                       RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED);
 
         providerService.flowRemoved(fe1);
         validateEvents();
@@ -252,7 +253,7 @@
         FlowEntry fe3 = new DefaultFlowEntry(f3);
         service.applyFlowRules(f3);
         providerService.pushFlowMetrics(DID, Collections.singletonList(fe3));
-        validateEvents(RULE_ADDED);
+        validateEvents(RULE_ADD_REQUESTED, RULE_ADDED);
 
         providerService.flowRemoved(fe3);
         validateEvents();
@@ -281,7 +282,8 @@
                         f2, FlowEntryState.ADDED,
                         f3, FlowEntryState.PENDING_ADD)));
 
-        validateEvents(RULE_ADDED, RULE_ADDED);
+        validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
+                       RULE_ADDED, RULE_ADDED);
     }
 
     @Test
@@ -301,7 +303,7 @@
 
         providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2, fe3));
 
-        validateEvents(RULE_ADDED, RULE_ADDED);
+        validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED, RULE_ADDED);
 
     }
 
@@ -326,7 +328,8 @@
 
         providerService.pushFlowMetrics(DID, Lists.newArrayList(fe1, fe2));
 
-        validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
+        validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADD_REQUESTED,
+                       RULE_REMOVE_REQUESTED, RULE_ADDED, RULE_ADDED, RULE_REMOVED);
 
     }
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
new file mode 100644
index 0000000..d89ce25
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -0,0 +1,289 @@
+package org.onlab.onos.store.statistic.impl;
+
+import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.instructions.Instruction;
+import org.onlab.onos.net.flow.instructions.Instructions;
+import org.onlab.onos.net.statistic.StatisticStore;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
+import org.onlab.onos.store.flow.ReplicaInfo;
+import org.onlab.onos.store.flow.ReplicaInfoService;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Maintains statistics using RPC calls to collect stats from remote instances
+ * on demand.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedStatisticStore implements StatisticStore {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ReplicaInfoService replicaInfoManager;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    private Map<ConnectPoint, InternalStatisticRepresentation> representations =
+            new ConcurrentHashMap<>();
+
+    private Map<ConnectPoint, Set<FlowEntry>> previous =
+            new ConcurrentHashMap<>();
+
+    private Map<ConnectPoint, Set<FlowEntry>> current =
+            new ConcurrentHashMap<>();
+
+    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespaces.API.newBuilder()
+                    .build()
+                    .populate(1);
+        }
+    };;
+
+    private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
+
+    @Activate
+    public void activate() {
+        clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                ConnectPoint cp = SERIALIZER.decode(message.payload());
+                try {
+                    message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp)));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+            }
+        });
+
+        clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                ConnectPoint cp = SERIALIZER.decode(message.payload());
+                try {
+                    message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+            }
+        });
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public void prepareForStatistics(FlowRule rule) {
+        ConnectPoint cp = buildConnectPoint(rule);
+        if (cp == null) {
+            return;
+        }
+        InternalStatisticRepresentation rep;
+        synchronized (representations) {
+            rep = getOrCreateRepresentation(cp);
+        }
+        rep.prepare();
+    }
+
+    @Override
+    public void removeFromStatistics(FlowRule rule) {
+        ConnectPoint cp = buildConnectPoint(rule);
+        if (cp == null) {
+            return;
+        }
+        InternalStatisticRepresentation rep = representations.get(cp);
+        if (rep != null) {
+            rep.remove(rule);
+        }
+    }
+
+    @Override
+    public void addOrUpdateStatistic(FlowEntry rule) {
+        ConnectPoint cp = buildConnectPoint(rule);
+        if (cp == null) {
+            return;
+        }
+        InternalStatisticRepresentation rep = representations.get(cp);
+        if (rep != null && rep.submit(rule)) {
+            updatePublishedStats(cp, rep.get());
+        }
+    }
+
+    private synchronized void updatePublishedStats(ConnectPoint cp,
+                                                   Set<FlowEntry> flowEntries) {
+        Set<FlowEntry> curr = current.get(cp);
+        if (curr == null) {
+            curr = new HashSet<>();
+        }
+        previous.put(cp, curr);
+        current.put(cp, flowEntries);
+
+    }
+
+    @Override
+    public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            return getCurrentStatisticInternal(connectPoint);
+        } else {
+            ClusterMessage message = new ClusterMessage(
+                    clusterService.getLocalNode().id(),
+                    GET_CURRENT,
+                    SERIALIZER.encode(connectPoint));
+
+            try {
+                ClusterMessageResponse response =
+                        clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+                return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
+                                                      TimeUnit.MILLISECONDS));
+            } catch (IOException | TimeoutException e) {
+                // FIXME: throw a FlowStoreException
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
+        return current.get(connectPoint);
+    }
+
+    @Override
+    public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+            return getPreviousStatisticInternal(connectPoint);
+        } else {
+            ClusterMessage message = new ClusterMessage(
+                    clusterService.getLocalNode().id(),
+                    GET_CURRENT,
+                    SERIALIZER.encode(connectPoint));
+
+            try {
+                ClusterMessageResponse response =
+                        clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+                return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
+                                                      TimeUnit.MILLISECONDS));
+            } catch (IOException | TimeoutException e) {
+                // FIXME: throw a FlowStoreException
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
+        return previous.get(connectPoint);
+    }
+
+    private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
+
+        if (representations.containsKey(cp)) {
+            return representations.get(cp);
+        } else {
+            InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
+            representations.put(cp, rep);
+            return rep;
+        }
+
+    }
+
+    private ConnectPoint buildConnectPoint(FlowRule rule) {
+        PortNumber port = getOutput(rule);
+        if (port == null) {
+            log.warn("Rule {} has no output.", rule);
+            return null;
+        }
+        ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
+        return cp;
+    }
+
+    private PortNumber getOutput(FlowRule rule) {
+        for (Instruction i : rule.treatment().instructions()) {
+            if (i.type() == Instruction.Type.OUTPUT) {
+                Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
+                return out.port();
+            }
+            if (i.type() == Instruction.Type.DROP) {
+                return PortNumber.P0;
+            }
+        }
+        return null;
+    }
+
+    private class InternalStatisticRepresentation {
+
+        private final AtomicInteger counter = new AtomicInteger(0);
+        private final Set<FlowEntry> rules = new HashSet<>();
+
+        public void prepare() {
+            counter.incrementAndGet();
+        }
+
+        public synchronized void remove(FlowRule rule) {
+            rules.remove(rule);
+            counter.decrementAndGet();
+        }
+
+        public synchronized boolean submit(FlowEntry rule) {
+            if (rules.contains(rule)) {
+                rules.remove(rule);
+            }
+            rules.add(rule);
+            if (counter.get() == 0) {
+                return true;
+            } else {
+                return counter.decrementAndGet() == 0;
+            }
+        }
+
+        public synchronized Set<FlowEntry> get() {
+            counter.set(rules.size());
+            return ImmutableSet.copyOf(rules);
+        }
+
+
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java
new file mode 100644
index 0000000..a096a3d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/StatisticStoreMessageSubjects.java
@@ -0,0 +1,15 @@
+package org.onlab.onos.store.statistic.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+/**
+ * MessageSubjects used by DistributedStatisticStore peer-peer communication.
+ */
+public final class StatisticStoreMessageSubjects {
+    private StatisticStoreMessageSubjects() {}
+        public static final MessageSubject GET_CURRENT =
+                new MessageSubject("peer-return-current");
+        public static final MessageSubject GET_PREVIOUS =
+            new MessageSubject("peer-return-previous");
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java
new file mode 100644
index 0000000..122d2be
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of the statistic store.
+ */
+package org.onlab.onos.store.statistic.impl;
\ No newline at end of file
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index 9de6d8c..dc0eaa8 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -4,6 +4,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.DefaultControllerNode;
@@ -26,6 +27,7 @@
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.device.DefaultDeviceDescription;
 import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.DefaultFlowRule;
 import org.onlab.onos.net.flow.DefaultTrafficSelector;
 import org.onlab.onos.net.flow.DefaultTrafficTreatment;
@@ -75,6 +77,7 @@
                     ArrayList.class,
                     Arrays.asList().getClass(),
                     HashMap.class,
+                    HashSet.class,
                     //
                     //
                     ControllerNode.State.class,
@@ -94,6 +97,7 @@
                     HostDescription.class,
                     DefaultHostDescription.class,
                     DefaultFlowRule.class,
+                    DefaultFlowEntry.class,
                     FlowId.class,
                     DefaultTrafficSelector.class,
                     Criteria.PortCriterion.class,
diff --git a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
index 95cd619..bf4fee0 100644
--- a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
@@ -16,6 +16,7 @@
 package org.onlab.onos.provider.lldp.impl;
 
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.nio.ByteBuffer;
@@ -95,11 +96,13 @@
      */
     public LinkDiscovery(Device device, PacketService pktService,
                          MastershipService masterService, LinkProviderService providerService, Boolean... useBDDP) {
+
         this.device = device;
         this.probeRate = 3000;
         this.linkProvider = providerService;
         this.pktService = pktService;
-        this.mastershipService = masterService;
+
+        this.mastershipService = checkNotNull(masterService, "WTF!");
         this.slowPorts = Collections.synchronizedSet(new HashSet<Long>());
         this.fastPorts = Collections.synchronizedSet(new HashSet<Long>());
         this.portProbeCount = new HashMap<>();
@@ -344,6 +347,12 @@
     }
 
     private void sendProbes(Long portNumber) {
+       if (device == null) {
+           log.warn("CRAZY SHIT");
+       }
+       if (mastershipService == null) {
+           log.warn("INSANE");
+       }
        if (device.type() != Device.Type.ROADM &&
                mastershipService.getLocalRole(this.device.id()) ==
                MastershipRole.MASTER) {
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 54265ba..4a21add 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -103,6 +103,8 @@
     private final Map<Long, InstallationFuture> pendingFMs =
             new ConcurrentHashMap<Long, InstallationFuture>();
 
+    private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+
     /**
      * Creates an OpenFlow host provider.
      */
@@ -115,6 +117,14 @@
         providerService = providerRegistry.register(this);
         controller.addListener(listener);
         controller.addEventListener(listener);
+
+        for (OpenFlowSwitch sw : controller.getSwitches()) {
+            FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
+            fsc.start();
+            collectors.put(new Dpid(sw.getId()), fsc);
+        }
+
+
         log.info("Started");
     }
 
@@ -213,7 +223,7 @@
     private class InternalFlowProvider
     implements OpenFlowSwitchListener, OpenFlowEventListener {
 
-        private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+
         private final Multimap<DeviceId, FlowEntry> completeEntries =
                 ArrayListMultimap.create();