Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java b/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java
index 318aebd..081efed 100644
--- a/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java
+++ b/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java
@@ -3,8 +3,6 @@
 import java.util.List;
 
 import org.codehaus.jackson.annotate.JsonProperty;
-import org.onlab.packet.IpPrefix;
-import org.onlab.packet.MacAddress;
 
 /**
  * Represents a set of addresses bound to a port.
@@ -12,8 +10,8 @@
 public class AddressEntry {
     private String dpid;
     private short portNumber;
-    private List<IpPrefix> ipAddresses;
-    private MacAddress macAddress;
+    private List<String> ipAddresses;
+    private String macAddress;
 
     public String getDpid() {
         return dpid;
@@ -33,21 +31,21 @@
         this.portNumber = portNumber;
     }
 
-    public List<IpPrefix> getIpAddresses() {
+    public List<String> getIpAddresses() {
         return ipAddresses;
     }
 
     @JsonProperty("ips")
-    public void setIpAddresses(List<IpPrefix> ipAddresses) {
-        this.ipAddresses = ipAddresses;
+    public void setIpAddresses(List<String> strIps) {
+        this.ipAddresses = strIps;
     }
 
-    public MacAddress getMacAddress() {
+    public String getMacAddress() {
         return macAddress;
     }
 
     @JsonProperty("mac")
-    public void setMacAddress(MacAddress macAddress) {
+    public void setMacAddress(String macAddress) {
         this.macAddress = macAddress;
     }
 }
diff --git a/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java b/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java
index 985c4a2..4f1a48a 100644
--- a/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java
+++ b/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java
@@ -5,6 +5,8 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -17,10 +19,10 @@
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.host.HostAdminService;
 import org.onlab.onos.net.host.PortAddresses;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
 import org.slf4j.Logger;
 
-import com.google.common.collect.Sets;
-
 /**
  * Simple configuration module to read in supplementary network configuration
  * from a file.
@@ -51,9 +53,29 @@
                         DeviceId.deviceId(dpidToUri(entry.getDpid())),
                         PortNumber.portNumber(entry.getPortNumber()));
 
+                Set<IpPrefix> ipAddresses = new HashSet<IpPrefix>();
+
+                for (String strIp : entry.getIpAddresses()) {
+                    try {
+                        IpPrefix address = IpPrefix.valueOf(strIp);
+                        ipAddresses.add(address);
+                    } catch (IllegalArgumentException e) {
+                        log.warn("Bad format for IP address in config: {}", strIp);
+                    }
+                }
+
+                MacAddress macAddress = null;
+                if (entry.getMacAddress() != null) {
+                    try {
+                        macAddress = MacAddress.valueOf(entry.getMacAddress());
+                    } catch (IllegalArgumentException e) {
+                        log.warn("Bad format for MAC address in config: {}",
+                                entry.getMacAddress());
+                    }
+                }
+
                 PortAddresses addresses = new PortAddresses(cp,
-                        Sets.newHashSet(entry.getIpAddresses()),
-                        entry.getMacAddress());
+                        ipAddresses, macAddress);
 
                 hostAdminService.bindAddressesToPort(addresses);
             }
diff --git a/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java b/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java
index f7fbdbb..aaf5350 100644
--- a/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java
+++ b/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java
@@ -38,6 +38,8 @@
 @Component(immediate = true)
 public class ReactiveForwarding {
 
+    private static final int TIMEOUT = 10;
+
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -184,15 +186,15 @@
             Ethernet inPkt = context.inPacket().parsed();
             TrafficSelector.Builder builder = new DefaultTrafficSelector.Builder();
             builder.matchEthType(inPkt.getEtherType())
-            .matchEthSrc(inPkt.getSourceMAC())
-            .matchEthDst(inPkt.getDestinationMAC())
-            .matchInport(context.inPacket().receivedFrom().port());
+                .matchEthSrc(inPkt.getSourceMAC())
+                .matchEthDst(inPkt.getDestinationMAC())
+                .matchInport(context.inPacket().receivedFrom().port());
 
             TrafficTreatment.Builder treat = new DefaultTrafficTreatment.Builder();
             treat.setOutput(portNumber);
 
             FlowRule f = new DefaultFlowRule(context.inPacket().receivedFrom().deviceId(),
-                    builder.build(), treat.build(), 0, appId);
+                    builder.build(), treat.build(), 0, appId, TIMEOUT);
 
             flowRuleService.applyFlowRules(f);
         }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
index f705a94..bb4805b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
@@ -27,11 +27,12 @@
 
     private final ApplicationId appId;
 
-    private boolean expired;
+    private final int timeout;
 
     public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector,
             TrafficTreatment treatment, int priority, FlowRuleState state,
-            long life, long packets, long bytes, long flowId, boolean expired) {
+            long life, long packets, long bytes, long flowId, boolean expired,
+            int timeout) {
         this.deviceId = deviceId;
         this.priority = priority;
         this.selector = selector;
@@ -39,26 +40,30 @@
         this.state = state;
         this.appId = ApplicationId.valueOf((int) (flowId >> 32));
         this.id = FlowId.valueOf(flowId);
-        this.expired = expired;
         this.life = life;
         this.packets = packets;
         this.bytes = bytes;
         this.created = System.currentTimeMillis();
+        this.timeout = timeout;
     }
 
     public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector,
-            TrafficTreatment treatement, int priority, ApplicationId appId) {
-        this(deviceId, selector, treatement, priority, FlowRuleState.CREATED, appId);
+            TrafficTreatment treatement, int priority, ApplicationId appId,
+            int timeout) {
+        this(deviceId, selector, treatement, priority,
+                FlowRuleState.CREATED, appId, timeout);
     }
 
     public DefaultFlowRule(FlowRule rule, FlowRuleState state) {
         this(rule.deviceId(), rule.selector(), rule.treatment(),
-                rule.priority(), state, rule.id(), rule.appId());
+                rule.priority(), state, rule.id(), rule.appId(),
+                rule.timeout());
     }
 
     private DefaultFlowRule(DeviceId deviceId,
             TrafficSelector selector, TrafficTreatment treatment,
-            int priority, FlowRuleState state, ApplicationId appId) {
+            int priority, FlowRuleState state, ApplicationId appId,
+            int timeout) {
         this.deviceId = deviceId;
         this.priority = priority;
         this.selector = selector;
@@ -69,13 +74,16 @@
         this.bytes = 0;
         this.appId = appId;
 
+        this.timeout = timeout;
+
         this.id = FlowId.valueOf((((long) appId().id()) << 32) | (this.hash() & 0xffffffffL));
         this.created = System.currentTimeMillis();
     }
 
     private DefaultFlowRule(DeviceId deviceId,
             TrafficSelector selector, TrafficTreatment treatment,
-            int priority, FlowRuleState state, FlowId flowId, ApplicationId appId) {
+            int priority, FlowRuleState state, FlowId flowId, ApplicationId appId,
+            int timeout) {
         this.deviceId = deviceId;
         this.priority = priority;
         this.selector = selector;
@@ -86,6 +94,7 @@
         this.bytes = 0;
         this.appId = appId;
         this.id = flowId;
+        this.timeout = timeout;
         this.created = System.currentTimeMillis();
     }
 
@@ -149,7 +158,7 @@
      * @see java.lang.Object#equals(java.lang.Object)
      */
     public int hashCode() {
-        return Objects.hash(deviceId, id);
+        return Objects.hash(deviceId, selector, priority);
     }
 
     public int hash() {
@@ -170,7 +179,10 @@
         if (obj instanceof DefaultFlowRule) {
             DefaultFlowRule that = (DefaultFlowRule) obj;
             return Objects.equals(deviceId, that.deviceId) &&
-                    Objects.equals(id, that.id);
+                    //Objects.equals(id, that.id) &&
+                    Objects.equals(priority, that.priority) &&
+                    Objects.equals(selector, that.selector);
+
         }
         return false;
     }
@@ -181,16 +193,16 @@
                 .add("id", id)
                 .add("deviceId", deviceId)
                 .add("priority", priority)
-                .add("selector", selector)
-                .add("treatment", treatment)
+                .add("selector", selector.criteria())
+                .add("treatment", treatment == null ? "N/A" : treatment.instructions())
                 .add("created", created)
                 .add("state", state)
                 .toString();
     }
 
     @Override
-    public boolean expired() {
-        return expired;
+    public int timeout() {
+        return timeout > MAX_TIMEOUT ? MAX_TIMEOUT : this.timeout;
     }
 
 }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
index 8f68ea5..d792c7e 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
@@ -3,8 +3,9 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
 
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.flow.criteria.Criteria;
@@ -16,22 +17,42 @@
 
 public final class DefaultTrafficSelector implements TrafficSelector {
 
-    private final List<Criterion> selector;
+    private final Set<Criterion> selector;
 
-    private DefaultTrafficSelector(List<Criterion> selector) {
-        this.selector = Collections.unmodifiableList(selector);
+    private DefaultTrafficSelector(Set<Criterion> selector) {
+        this.selector = Collections.unmodifiableSet(selector);
     }
 
     @Override
-    public List<Criterion> criteria() {
+    public Set<Criterion> criteria() {
         return selector;
     }
 
+    @Override
+    public int hashCode() {
+        return Objects.hash(selector);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj instanceof DefaultTrafficSelector) {
+            DefaultTrafficSelector that = (DefaultTrafficSelector) obj;
+            return Objects.equals(selector, that.selector);
+
+        }
+        return false;
+    }
+
+
+
     public static class Builder implements TrafficSelector.Builder {
 
         private final Logger log = getLogger(getClass());
 
-        private final List<Criterion> selector = new LinkedList<>();
+        private final Set<Criterion> selector = new HashSet<>();
 
         @Override
         public Builder add(Criterion criterion) {
@@ -39,38 +60,47 @@
             return this;
         }
 
+        @Override
         public Builder matchInport(PortNumber port) {
             return add(Criteria.matchInPort(port));
         }
 
+        @Override
         public Builder matchEthSrc(MacAddress addr) {
             return add(Criteria.matchEthSrc(addr));
         }
 
+        @Override
         public Builder matchEthDst(MacAddress addr) {
             return add(Criteria.matchEthDst(addr));
         }
 
+        @Override
         public Builder matchEthType(short ethType) {
             return add(Criteria.matchEthType(ethType));
         }
 
+        @Override
         public Builder matchVlanId(VlanId vlanId) {
             return add(Criteria.matchVlanId(vlanId));
         }
 
+        @Override
         public Builder matchVlanPcp(Byte vlanPcp) {
             return add(Criteria.matchVlanPcp(vlanPcp));
         }
 
+        @Override
         public Builder matchIPProtocol(Byte proto) {
             return add(Criteria.matchIPProtocol(proto));
         }
 
+        @Override
         public Builder matchIPSrc(IpPrefix ip) {
             return add(Criteria.matchIPSrc(ip));
         }
 
+        @Override
         public Builder matchIPDst(IpPrefix ip) {
             return add(Criteria.matchIPDst(ip));
         }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
index 2728e21..4d1b3cf 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
@@ -9,6 +9,7 @@
  */
 public interface FlowRule {
 
+    static final int MAX_TIMEOUT = 60;
 
     public enum FlowRuleState {
         /**
@@ -112,10 +113,9 @@
     long bytes();
 
     /**
-     * Indicates that this flow has expired at the device.
-     *
-     * @return true if it has expired, false otherwise
+     * Returns the timeout for this flow requested by an application.
+     * @return integer value of the timeout
      */
-    boolean expired();
+    int timeout();
 
 }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index b2c3d30..c4e2f92 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -8,6 +8,8 @@
  */
 public interface FlowRuleProvider extends Provider {
 
+    static final int POLL_INTERVAL = 5;
+
     /**
      * Instructs the provider to apply the specified flow rules to their
      * respective devices.
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java
index 01e4372..2076103 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java
@@ -17,27 +17,6 @@
     void flowRemoved(FlowRule flowRule);
 
     /**
-     * Signals that a flow rule is missing for some network traffic.
-     *
-     * @param flowRule information about traffic in need of flow rule(s)
-     */
-    void flowMissing(FlowRule flowRule);
-
-    /**
-     * Signals that a flow rule is on the switch but not in the store.
-     *
-     * @param flowRule the extra flow rule
-     */
-    void extraneousFlow(FlowRule flowRule);
-
-    /**
-     * Signals that a flow rule was indeed added.
-     *
-     * @param flowRule the added flow rule
-     */
-    void flowAdded(FlowRule flowRule);
-
-    /**
      * Pushes the collection of flow entries currently applied on the given
      * device.
      *
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
index 249d1f9..c704c8f 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
@@ -1,6 +1,6 @@
 package org.onlab.onos.net.flow;
 
-import java.util.List;
+import java.util.Set;
 
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.flow.criteria.Criterion;
@@ -18,7 +18,7 @@
      *
      * @return list of criteria
      */
-    List<Criterion> criteria();
+    Set<Criterion> criteria();
 
     /**
      * Builder of traffic selector entities.
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
index 758c51c..a819bd3 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
@@ -2,6 +2,8 @@
 
 import static com.google.common.base.MoreObjects.toStringHelper;
 
+import java.util.Objects;
+
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.flow.criteria.Criterion.Type;
 import org.onlab.packet.IpPrefix;
@@ -137,6 +139,25 @@
             return toStringHelper(type().toString())
                     .add("port", port).toString();
         }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(port);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj instanceof PortCriterion) {
+                PortCriterion that = (PortCriterion) obj;
+                return Objects.equals(port, that.port);
+
+            }
+            return false;
+        }
+
     }
 
 
@@ -164,6 +185,27 @@
                     .add("mac", mac).toString();
         }
 
+        @Override
+        public int hashCode() {
+            return Objects.hash(mac, type);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj instanceof EthCriterion) {
+                EthCriterion that = (EthCriterion) obj;
+                return Objects.equals(mac, that.mac) &&
+                        Objects.equals(type, that.type);
+
+
+            }
+            return false;
+        }
+
+
     }
 
     public static final class EthTypeCriterion implements Criterion {
@@ -189,6 +231,25 @@
                     .add("ethType", Long.toHexString(ethType)).toString();
         }
 
+        @Override
+        public int hashCode() {
+            return Objects.hash(ethType);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj instanceof EthTypeCriterion) {
+                EthTypeCriterion that = (EthTypeCriterion) obj;
+                return Objects.equals(ethType, that.ethType);
+
+
+            }
+            return false;
+        }
+
     }
 
 
@@ -217,6 +278,26 @@
                     .add("ip", ip).toString();
         }
 
+        @Override
+        public int hashCode() {
+            return Objects.hash(ip, type);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj instanceof IPCriterion) {
+                IPCriterion that = (IPCriterion) obj;
+                return Objects.equals(ip, that.ip) &&
+                        Objects.equals(type, that.type);
+
+
+            }
+            return false;
+        }
+
     }
 
 
@@ -243,6 +324,25 @@
                     .add("protocol", Long.toHexString(proto)).toString();
         }
 
+        @Override
+        public int hashCode() {
+            return Objects.hash(proto);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj instanceof IPProtocolCriterion) {
+                IPProtocolCriterion that = (IPProtocolCriterion) obj;
+                return Objects.equals(proto, that.proto);
+
+
+            }
+            return false;
+        }
+
     }
 
 
@@ -269,6 +369,25 @@
                     .add("pcp", Long.toHexString(vlanPcp)).toString();
         }
 
+        @Override
+        public int hashCode() {
+            return Objects.hash(vlanPcp);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj instanceof VlanPcpCriterion) {
+                VlanPcpCriterion that = (VlanPcpCriterion) obj;
+                return Objects.equals(vlanPcp, that.vlanPcp);
+
+
+            }
+            return false;
+        }
+
     }
 
 
@@ -296,6 +415,25 @@
                     .add("id", vlanId).toString();
         }
 
+        @Override
+        public int hashCode() {
+            return Objects.hash(vlanId);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj instanceof VlanIdCriterion) {
+                VlanIdCriterion that = (VlanIdCriterion) obj;
+                return Objects.equals(vlanId, that.vlanId);
+
+
+            }
+            return false;
+        }
+
     }
 
 
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index a6f5ebb..00619b3 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,6 +5,9 @@
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -59,6 +62,8 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    private final Map<FlowRule, AtomicInteger> deadRounds = new ConcurrentHashMap<>();
+
     @Activate
     public void activate() {
         store.setDelegate(delegate);
@@ -84,6 +89,7 @@
             FlowRule f = flowRules[i];
             final Device device = deviceService.getDevice(f.deviceId());
             final FlowRuleProvider frp = getProvider(device.providerId());
+            deadRounds.put(f, new AtomicInteger(0));
             store.storeFlowRule(f);
             frp.applyFlowRule(f);
         }
@@ -98,6 +104,7 @@
             f = flowRules[i];
             device = deviceService.getDevice(f.deviceId());
             frp = getProvider(device.providerId());
+            deadRounds.remove(f);
             store.deleteFlowRule(f);
             frp.removeFlowRule(f);
         }
@@ -161,11 +168,7 @@
             switch (stored.state()) {
             case ADDED:
             case PENDING_ADD:
-                if (flowRule.expired()) {
-                    event = store.removeFlowRule(flowRule);
-                } else {
                     frp.applyFlowRule(stored);
-                }
                 break;
             case PENDING_REMOVE:
             case REMOVED:
@@ -181,8 +184,8 @@
             }
         }
 
-        @Override
-        public void flowMissing(FlowRule flowRule) {
+
+        private void flowMissing(FlowRule flowRule) {
             checkNotNull(flowRule, FLOW_RULE_NULL);
             checkValidity();
             Device device = deviceService.getDevice(flowRule.deviceId());
@@ -209,29 +212,47 @@
 
         }
 
-        @Override
-        public void extraneousFlow(FlowRule flowRule) {
+
+        private void extraneousFlow(FlowRule flowRule) {
             checkNotNull(flowRule, FLOW_RULE_NULL);
             checkValidity();
             removeFlowRules(flowRule);
             log.debug("Flow {} is on switch but not in store.", flowRule);
         }
 
-        @Override
-        public void flowAdded(FlowRule flowRule) {
+
+        private void flowAdded(FlowRule flowRule) {
             checkNotNull(flowRule, FLOW_RULE_NULL);
             checkValidity();
 
-            FlowRuleEvent event = store.addOrUpdateFlowRule(flowRule);
-            if (event == null) {
-                log.debug("No flow store event generated.");
+            if (deadRounds.containsKey(flowRule) &&
+                    checkRuleLiveness(flowRule, store.getFlowRule(flowRule))) {
+
+                FlowRuleEvent event = store.addOrUpdateFlowRule(flowRule);
+                if (event == null) {
+                    log.debug("No flow store event generated.");
+                } else {
+                    log.debug("Flow {} {}", flowRule, event.type());
+                    post(event);
+                }
             } else {
-                log.debug("Flow {} {}", flowRule, event.type());
-                post(event);
+                removeFlowRules(flowRule);
             }
 
         }
 
+        private boolean checkRuleLiveness(FlowRule swRule, FlowRule storedRule) {
+            int timeout = storedRule.timeout();
+            if (storedRule.packets() != swRule.packets()) {
+                deadRounds.get(swRule).set(0);
+                return true;
+            }
+
+            return (deadRounds.get(swRule).getAndIncrement() *
+                    FlowRuleProvider.POLL_INTERVAL) <= timeout;
+
+        }
+
         // Posts the specified event to the local event dispatcher.
         private void post(FlowRuleEvent event) {
             if (event != null) {
diff --git a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java
index e3f53fe..88b6923 100644
--- a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java
@@ -1,5 +1,10 @@
 package org.onlab.onos.net.host.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Set;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -12,6 +17,7 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.Host;
 import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.device.DeviceService;
 import org.onlab.onos.net.host.HostAdminService;
 import org.onlab.onos.net.host.HostDescription;
 import org.onlab.onos.net.host.HostEvent;
@@ -23,6 +29,7 @@
 import org.onlab.onos.net.host.HostStore;
 import org.onlab.onos.net.host.HostStoreDelegate;
 import org.onlab.onos.net.host.PortAddresses;
+import org.onlab.onos.net.packet.PacketService;
 import org.onlab.onos.net.provider.AbstractProviderRegistry;
 import org.onlab.onos.net.provider.AbstractProviderService;
 import org.onlab.packet.IpAddress;
@@ -31,11 +38,6 @@
 import org.onlab.packet.VlanId;
 import org.slf4j.Logger;
 
-import java.util.Set;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
 /**
  * Provides basic implementation of the host SB &amp; NB APIs.
  */
@@ -59,12 +61,22 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected EventDeliveryService eventDispatcher;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PacketService packetService;
+
+    private HostMonitor monitor;
 
     @Activate
     public void activate() {
+        log.info("Started");
         store.setDelegate(delegate);
         eventDispatcher.addSink(HostEvent.class, listenerRegistry);
-        log.info("Started");
+
+        monitor = new HostMonitor(deviceService,  packetService, this);
+
     }
 
     @Deactivate
@@ -76,6 +88,8 @@
 
     @Override
     protected HostProviderService createProviderService(HostProvider provider) {
+        monitor.registerHostProvider(provider);
+
         return new InternalHostProviderService(provider);
     }
 
@@ -126,12 +140,12 @@
 
     @Override
     public void startMonitoringIp(IpAddress ip) {
-        // TODO pass through to HostMonitor
+        monitor.addMonitoringFor(ip);
     }
 
     @Override
     public void stopMonitoringIp(IpAddress ip) {
-        // TODO pass through to HostMonitor
+        monitor.stopMonitoring(ip);
     }
 
     @Override
diff --git a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java
index a5aa13e..9f8dd48 100644
--- a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java
+++ b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java
@@ -2,10 +2,11 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.jboss.netty.util.Timeout;
@@ -21,19 +22,19 @@
 import org.onlab.onos.net.flow.instructions.Instruction;
 import org.onlab.onos.net.flow.instructions.Instructions;
 import org.onlab.onos.net.host.HostProvider;
-import org.onlab.onos.net.host.HostService;
-import org.onlab.onos.net.host.HostStore;
 import org.onlab.onos.net.host.PortAddresses;
 import org.onlab.onos.net.packet.DefaultOutboundPacket;
 import org.onlab.onos.net.packet.OutboundPacket;
 import org.onlab.onos.net.packet.PacketService;
-import org.onlab.onos.net.topology.TopologyService;
+import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.packet.ARP;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Monitors hosts on the dataplane to detect changes in host data.
@@ -43,9 +44,7 @@
  * probe for hosts that have not yet been detected (specified by IP address).
  */
 public class HostMonitor implements TimerTask {
-
-    private static final byte[] DEFAULT_MAC_ADDRESS =
-            MacAddress.valueOf("00:00:00:00:00:01").getAddress();
+    private static final Logger log = LoggerFactory.getLogger(HostMonitor.class);
 
     private static final byte[] ZERO_MAC_ADDRESS =
             MacAddress.valueOf("00:00:00:00:00:00").getAddress();
@@ -54,59 +53,77 @@
     private static final byte[] BROADCAST_MAC =
             MacAddress.valueOf("ff:ff:ff:ff:ff:ff").getAddress();
 
-    private final HostService hostService;
-    private final TopologyService topologyService;
-    private final DeviceService deviceService;
-    private final HostProvider hostProvider;
-    private final PacketService packetService;
-    private final HostStore hostStore;
+    private DeviceService deviceService;
+    private PacketService packetService;
+    private HostManager hostManager;
 
     private final Set<IpAddress> monitoredAddresses;
 
+    private final Map<ProviderId, HostProvider> hostProviders;
+
     private final long probeRate;
 
     private final Timeout timeout;
 
-    public HostMonitor(HostService hostService, TopologyService topologyService,
+    public HostMonitor(
             DeviceService deviceService,
-            HostProvider hostProvider, PacketService packetService,
-            HostStore hostStore) {
-        this.hostService = hostService;
-        this.topologyService = topologyService;
+            PacketService packetService,
+            HostManager hostService) {
+
         this.deviceService = deviceService;
-        this.hostProvider = hostProvider;
         this.packetService = packetService;
-        this.hostStore = hostStore;
+        this.hostManager = hostService;
 
         monitoredAddresses = new HashSet<>();
+        hostProviders = new ConcurrentHashMap<>();
 
         probeRate = 30000; // milliseconds
 
         timeout = Timer.getTimer().newTimeout(this, 0, TimeUnit.MILLISECONDS);
+
+        addDefaultAddresses();
     }
 
-    public void addMonitoringFor(IpAddress ip) {
+    private void addDefaultAddresses() {
+        //monitoredAddresses.add(IpAddress.valueOf("10.0.0.1"));
+    }
+
+    void addMonitoringFor(IpAddress ip) {
         monitoredAddresses.add(ip);
     }
 
-    public void stopMonitoring(IpAddress ip) {
+    void stopMonitoring(IpAddress ip) {
         monitoredAddresses.remove(ip);
     }
 
-    public void shutdown() {
+    void shutdown() {
         timeout.cancel();
     }
 
+    void registerHostProvider(HostProvider provider) {
+        hostProviders.put(provider.id(), provider);
+    }
+
+    void unregisterHostProvider(HostProvider provider) {
+        // TODO find out how to call this
+    }
+
     @Override
     public void run(Timeout timeout) throws Exception {
         for (IpAddress ip : monitoredAddresses) {
-            Set<Host> hosts = Collections.emptySet(); //TODO hostService.getHostsByIp(ip);
+            // TODO have to convert right now because the HostService API uses IpPrefix
+            IpPrefix prefix = IpPrefix.valueOf(ip.toOctets());
+
+            Set<Host> hosts = hostManager.getHostsByIp(prefix);
 
             if (hosts.isEmpty()) {
                 sendArpRequest(ip);
             } else {
                 for (Host host : hosts) {
-                    hostProvider.triggerProbe(host);
+                    HostProvider provider = hostProviders.get(host.providerId());
+                    if (provider != null) {
+                        provider.triggerProbe(host);
+                    }
                 }
             }
         }
@@ -120,29 +137,26 @@
      * @param targetIp IP address to ARP for
      */
     private void sendArpRequest(IpAddress targetIp) {
-
         // Find ports with an IP address in the target's subnet and sent ARP
         // probes out those ports.
         for (Device device : deviceService.getDevices()) {
             for (Port port : deviceService.getPorts(device.id())) {
                 ConnectPoint cp = new ConnectPoint(device.id(), port.number());
-                PortAddresses addresses = hostStore.getAddressBindingsForPort(cp);
+                PortAddresses addresses = hostManager.getAddressBindingsForPort(cp);
 
-                /*for (IpPrefix prefix : addresses.ips()) {
+                for (IpPrefix prefix : addresses.ips()) {
                     if (prefix.contains(targetIp)) {
-                        sendProbe(device.id(), port, addresses, targetIp);
+                        sendProbe(device.id(), port, targetIp,
+                                prefix.toIpAddress(), addresses.mac());
                     }
-                }*/
+                }
             }
         }
-
-        // TODO case where no address was found.
-        // Broadcast out internal edge ports?
     }
 
-    private void sendProbe(DeviceId deviceId, Port port, PortAddresses portAddresses,
-            IpAddress targetIp) {
-        Ethernet arpPacket = createArpFor(targetIp, portAddresses);
+    private void sendProbe(DeviceId deviceId, Port port, IpAddress targetIp,
+            IpAddress sourceIp, MacAddress sourceMac) {
+        Ethernet arpPacket = buildArpRequest(targetIp, sourceIp, sourceMac);
 
         List<Instruction> instructions = new ArrayList<>();
         instructions.add(Instructions.createOutput(port.number()));
@@ -158,31 +172,26 @@
         packetService.emit(outboundPacket);
     }
 
-    private Ethernet createArpFor(IpAddress targetIp, PortAddresses portAddresses) {
+    private Ethernet buildArpRequest(IpAddress targetIp, IpAddress sourceIp,
+            MacAddress sourceMac) {
 
         ARP arp = new ARP();
         arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
-        .setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH)
-        .setProtocolType(ARP.PROTO_TYPE_IP)
-        .setProtocolAddressLength((byte) IpPrefix.INET_LEN);
+           .setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH)
+           .setProtocolType(ARP.PROTO_TYPE_IP)
+           .setProtocolAddressLength((byte) IpPrefix.INET_LEN)
+           .setOpCode(ARP.OP_REQUEST);
 
-        byte[] sourceMacAddress;
-        if (portAddresses.mac() == null) {
-            sourceMacAddress = DEFAULT_MAC_ADDRESS;
-        } else {
-            sourceMacAddress = portAddresses.mac().getAddress();
-        }
-
-        arp.setSenderHardwareAddress(sourceMacAddress)
-        //TODO .setSenderProtocolAddress(portAddresses.ips().toOctets())
-        .setTargetHardwareAddress(ZERO_MAC_ADDRESS)
-        .setTargetProtocolAddress(targetIp.toOctets());
+        arp.setSenderHardwareAddress(sourceMac.getAddress())
+           .setSenderProtocolAddress(sourceIp.toOctets())
+           .setTargetHardwareAddress(ZERO_MAC_ADDRESS)
+           .setTargetProtocolAddress(targetIp.toOctets());
 
         Ethernet ethernet = new Ethernet();
         ethernet.setEtherType(Ethernet.TYPE_ARP)
-        .setDestinationMACAddress(BROADCAST_MAC)
-        .setSourceMACAddress(sourceMacAddress)
-        .setPayload(arp);
+                .setDestinationMACAddress(BROADCAST_MAC)
+                .setSourceMACAddress(sourceMac.getAddress())
+                .setPayload(arp);
 
         return ethernet;
     }
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 5ff72a2..0b451c0 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -9,7 +9,9 @@
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import org.junit.After;
 import org.junit.Before;
@@ -42,6 +44,7 @@
 import org.onlab.onos.net.provider.ProviderId;
 import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -52,6 +55,7 @@
 
     private static final ProviderId PID = new ProviderId("of", "foo");
     private static final DeviceId DID = DeviceId.deviceId("of:001");
+    private static final int TIMEOUT = 10;
     private static final Device DEV = new DefaultDevice(
             PID, DID, Type.SWITCH, "", "", "", "");
 
@@ -96,7 +100,7 @@
     private FlowRule flowRule(int tsval, int trval) {
         TestSelector ts = new TestSelector(tsval);
         TestTreatment tr = new TestTreatment(trval);
-        return new DefaultFlowRule(DID, ts, tr, 0, appId);
+        return new DefaultFlowRule(DID, ts, tr, 0, appId, TIMEOUT);
     }
 
     private FlowRule flowRule(FlowRule rule, FlowRuleState state) {
@@ -105,7 +109,8 @@
 
     private FlowRule addFlowRule(int hval) {
         FlowRule rule = flowRule(hval, hval);
-        providerService.flowAdded(rule);
+        service.applyFlowRules(rule);
+
         assertNotNull("rule should be found", service.getFlowEntries(DID));
         return rule;
     }
@@ -135,13 +140,18 @@
     public void getFlowEntries() {
         assertTrue("store should be empty",
                 Sets.newHashSet(service.getFlowEntries(DID)).isEmpty());
-        addFlowRule(1);
-        addFlowRule(2);
+        FlowRule f1 = addFlowRule(1);
+        FlowRule f2 = addFlowRule(2);
+
         assertEquals("2 rules should exist", 2, flowCount());
+
+        providerService.pushFlowMetrics(DID, ImmutableList.of(f1, f2));
         validateEvents(RULE_ADDED, RULE_ADDED);
 
         addFlowRule(1);
         assertEquals("should still be 2 rules", 2, flowCount());
+
+        providerService.pushFlowMetrics(DID, ImmutableList.of(f1));
         validateEvents(RULE_UPDATED);
     }
 
@@ -179,8 +189,10 @@
     public void removeFlowRules() {
         FlowRule f1 = addFlowRule(1);
         FlowRule f2 = addFlowRule(2);
-        addFlowRule(3);
+        FlowRule f3 = addFlowRule(3);
         assertEquals("3 rules should exist", 3, flowCount());
+
+        providerService.pushFlowMetrics(DID, ImmutableList.of(f1, f2, f3));
         validateEvents(RULE_ADDED, RULE_ADDED, RULE_ADDED);
 
         FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED);
@@ -200,8 +212,9 @@
     @Test
     public void flowRemoved() {
         FlowRule f1 = addFlowRule(1);
+        FlowRule f2 = addFlowRule(2);
+        providerService.pushFlowMetrics(f1.deviceId(), ImmutableList.of(f1, f2));
         service.removeFlowRules(f1);
-        addFlowRule(2);
         FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED);
         providerService.flowRemoved(rem1);
         validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
@@ -209,9 +222,11 @@
         providerService.flowRemoved(rem1);
         validateEvents();
 
-        FlowRule f3 = flowRule(flowRule(3, 3), FlowRuleState.ADDED);
-        providerService.flowAdded(f3);
+        FlowRule f3 = flowRule(3, 3);
+        service.applyFlowRules(f3);
+        providerService.pushFlowMetrics(f3.deviceId(), Collections.singletonList(f3));
         validateEvents(RULE_ADDED);
+
         providerService.flowRemoved(f3);
         validateEvents();
     }
@@ -223,9 +238,10 @@
         FlowRule f3 = flowRule(3, 3);
 
 
+
+        mgr.applyFlowRules(f1, f2, f3);
         FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
         FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
-        mgr.applyFlowRules(f1, f2, f3);
 
         providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2));
 
@@ -233,7 +249,7 @@
                 validateState(FlowRuleState.PENDING_ADD, FlowRuleState.ADDED,
                         FlowRuleState.ADDED));
 
-        validateEvents(RULE_UPDATED, RULE_UPDATED);
+        validateEvents(RULE_ADDED, RULE_ADDED);
     }
 
     @Test
@@ -241,15 +257,15 @@
         FlowRule f1 = flowRule(1, 1);
         FlowRule f2 = flowRule(2, 2);
         FlowRule f3 = flowRule(3, 3);
+        mgr.applyFlowRules(f1, f2);
 
         FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
         FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
         FlowRule updatedF3 = flowRule(f3, FlowRuleState.ADDED);
-        mgr.applyFlowRules(f1, f2);
 
         providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2, updatedF3));
 
-        validateEvents(RULE_UPDATED, RULE_UPDATED);
+        validateEvents(RULE_ADDED, RULE_ADDED);
 
     }
 
@@ -271,7 +287,7 @@
 
         providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2));
 
-        validateEvents(RULE_UPDATED, RULE_UPDATED, RULE_REMOVED);
+        validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
 
     }
 
@@ -386,7 +402,7 @@
         }
 
         @Override
-        public List<Criterion> criteria() {
+        public Set<Criterion> criteria() {
             return null;
         }
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index e25c964..9408cc9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -20,7 +20,7 @@
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
-import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
+import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
 import org.onlab.packet.IpPrefix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +50,7 @@
     private final Map<NodeId, State> states = new ConcurrentHashMap<>();
     private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
             .maximumSize(1000)
-            .expireAfterWrite(OnosClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
+            .expireAfterWrite(ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
             .removalListener(new LivenessCacheRemovalListener()).build();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
index 15e756d..7ec27ec 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
@@ -1,5 +1,13 @@
 package org.onlab.onos.store.cluster.messaging;
 
+/**
+ * Interface for handling cluster messages.
+ */
 public interface ClusterMessageHandler {
+
+    /**
+     * Handles/Processes the cluster message.
+     * @param message cluster message.
+     */
     public void handle(ClusterMessage message);
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index 4c9eefa..ee8d9c1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -2,6 +2,8 @@
 
 /**
  * Representation of a message subject.
+ * Cluster messages have associated subjects that dictate how they get handled
+ * on the receiving side.
  */
 public class MessageSubject {
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
deleted file mode 100644
index 666ac6d..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.onlab.onos.store.cluster.messaging;
-
-import org.onlab.onos.cluster.NodeId;
-
-/**
- * Represents a message consumer.
- */
-public interface MessageSubscriber {
-
-    /**
-     * Receives the specified cluster message.
-     *
-     * @param message    message to be received
-     * @param fromNodeId node from which the message was received
-     */
-    void receive(Object messagePayload, NodeId fromNodeId);
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/OnosClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
similarity index 99%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/OnosClusterCommunicationManager.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index e6e4a4d..d4fd9c0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/OnosClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -32,7 +32,7 @@
 
 @Component(immediate = true)
 @Service
-public class OnosClusterCommunicationManager
+public class ClusterCommunicationManager
         implements ClusterCommunicationService, ClusterCommunicationAdminService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
index 44e5421..bba12f2 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
@@ -6,7 +6,7 @@
 import org.junit.Test;
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
+import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
 import org.onlab.netty.NettyMessagingService;
 import org.onlab.packet.IpPrefix;
 
@@ -29,8 +29,8 @@
 
     private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
 
-    private OnosClusterCommunicationManager ccm1;
-    private OnosClusterCommunicationManager ccm2;
+    private ClusterCommunicationManager ccm1;
+    private ClusterCommunicationManager ccm2;
 
     private TestDelegate cnd1 = new TestDelegate();
     private TestDelegate cnd2 = new TestDelegate();
@@ -46,11 +46,11 @@
         NettyMessagingService messagingService = new NettyMessagingService();
         messagingService.activate();
 
-        ccm1 = new OnosClusterCommunicationManager();
+        ccm1 = new ClusterCommunicationManager();
 //        ccm1.serializationService = messageSerializer;
         ccm1.activate();
 
-        ccm2 = new OnosClusterCommunicationManager();
+        ccm2 = new ClusterCommunicationManager();
 //        ccm2.serializationService = messageSerializer;
         ccm2.activate();
 
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 2f43211..d12d00e 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -1,6 +1,5 @@
 package org.onlab.onos.store.trivial.impl;
 
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -116,18 +115,21 @@
         DeviceId did = rule.deviceId();
 
         // check if this new rule is an update to an existing entry
-        if (flowEntries.containsEntry(did, rule)) {
-            //synchronized (flowEntries) {
+        FlowRule stored = getFlowRule(rule);
+        if (stored != null) {
             // Multimaps support duplicates so we have to remove our rule
             // and replace it with the current version.
             flowEntries.remove(did, rule);
             flowEntries.put(did, rule);
-            //}
+
+            if (stored.state() == FlowRuleState.PENDING_ADD) {
+                return new FlowRuleEvent(Type.RULE_ADDED, rule);
+            }
             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
         }
 
         flowEntries.put(did, rule);
-        return new FlowRuleEvent(RULE_ADDED, rule);
+        return null;
     }
 
     @Override
@@ -140,11 +142,4 @@
         }
         //}
     }
-
-
-
-
-
-
-
 }
diff --git a/features/features.xml b/features/features.xml
index f008c14..68fa8c3 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -11,7 +11,7 @@
         <bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
 
         <bundle>mvn:com.hazelcast/hazelcast/3.3</bundle>
-        <bundle>mvn:com.codahale.metrics/metrics-core/3.0.2</bundle>
+        <bundle>mvn:io.dropwizard.metrics/metrics-core/3.1.0</bundle>
         <bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
 
         <bundle>mvn:com.esotericsoftware.kryo/kryo/2.24.0</bundle>
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
index eb12286..e8ebcd1 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -169,7 +169,12 @@
 
     @Override
     public void setRole(Dpid dpid, RoleState role) {
-        getSwitch(dpid).setRole(role);
+        final OpenFlowSwitch sw = getSwitch(dpid);
+        if (sw == null) {
+            log.debug("Switch not connected. Ignoring setRole({}, {})", dpid, role);
+            return;
+        }
+        sw.setRole(role);
     }
 
     /**
diff --git a/pom.xml b/pom.xml
index 56bbd74..cb00f32 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,6 +248,11 @@
                 <classifier>tests</classifier>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+              <groupId>commons-pool</groupId>
+              <artifactId>commons-pool</artifactId>
+              <version>1.6</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index 86ab701..ade651e 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -77,7 +77,6 @@
                 .setCookie(U64.of(cookie.value()))
                 .setBufferId(OFBufferId.NO_BUFFER)
                 .setActions(actions)
-                .setIdleTimeout(10)
                 .setMatch(match)
                 .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
                 .setPriority(priority)
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java
index ac00f05..eba2282 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java
@@ -71,7 +71,7 @@
                     buildSelector(), buildTreatment(), stat.getPriority(),
                     FlowRuleState.ADDED, stat.getDurationNsec() / 1000000,
                     stat.getPacketCount().getValue(), stat.getByteCount().getValue(),
-                    stat.getCookie().getValue(), false);
+                    stat.getCookie().getValue(), false, stat.getIdleTimeout());
         } else {
             // TODO: revisit potentially.
             return new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
@@ -79,7 +79,8 @@
                     FlowRuleState.REMOVED, removed.getDurationNsec() / 1000000,
                     removed.getPacketCount().getValue(), removed.getByteCount().getValue(),
                     removed.getCookie().getValue(),
-                    removed.getReason() == OFFlowRemovedReason.IDLE_TIMEOUT.ordinal());
+                    removed.getReason() == OFFlowRemovedReason.IDLE_TIMEOUT.ordinal(),
+                    stat.getIdleTimeout());
         }
     }
 
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index bf29ae4..24a7ea8 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -127,7 +127,7 @@
 
         @Override
         public void switchAdded(Dpid dpid) {
-            FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), 5);
+            FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL);
             fsc.start();
             collectors.put(dpid, fsc);
         }
diff --git a/tools/build/conf/src/main/resources/onos/checkstyle.xml b/tools/build/conf/src/main/resources/onos/checkstyle.xml
index 06413aa..dad602d 100644
--- a/tools/build/conf/src/main/resources/onos/checkstyle.xml
+++ b/tools/build/conf/src/main/resources/onos/checkstyle.xml
@@ -176,7 +176,7 @@
         </module>
 
         <module name="ParameterNumber">
-            <property name="max" value="10"/>
+            <property name="max" value="15"/>
             <property name="tokens" value="CTOR_DEF"/>
         </module>
         <!-- Checks for whitespace                               -->
diff --git a/utils/misc/pom.xml b/utils/misc/pom.xml
index bb25635..bd3cc08 100644
--- a/utils/misc/pom.xml
+++ b/utils/misc/pom.xml
@@ -56,9 +56,13 @@
             <artifactId>objenesis</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.codahale.metrics</groupId>
+            <groupId>io.dropwizard.metrics</groupId>
             <artifactId>metrics-core</artifactId>
-            <version>3.0.2</version>
+            <version>3.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
         </dependency>
     </dependencies>
 
diff --git a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
index 2b13efb..e07d3f9 100644
--- a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
+++ b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
@@ -1,10 +1,18 @@
 package org.onlab.metrics;
 
+import java.io.File;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
 
 import com.codahale.metrics.Counter;
+import com.codahale.metrics.CsvReporter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
@@ -45,24 +53,44 @@
  *   </code>
  * </pre>
  */
+@Component(immediate = true)
 public final class MetricsManager implements MetricsService {
 
     /**
      * Registry to hold the Components defined in the system.
      */
-    private ConcurrentMap<String, MetricsComponent> componentsRegistry =
-            new ConcurrentHashMap<>();
+    private ConcurrentMap<String, MetricsComponent> componentsRegistry;
 
     /**
      * Registry for the Metrics objects created in the system.
      */
-    private final MetricRegistry metricsRegistry = new MetricRegistry();
+    private final MetricRegistry metricsRegistry;
 
     /**
-     * Hide constructor.  The only way to get the registry is through the
-     * singleton getter.
+     * Default Reporter for this metrics manager.
      */
-    private MetricsManager() {}
+    private final CsvReporter reporter;
+
+    public MetricsManager() {
+        this.componentsRegistry = new ConcurrentHashMap<>();
+        this.metricsRegistry = new MetricRegistry();
+
+        this.reporter = CsvReporter.forRegistry(metricsRegistry)
+                .formatFor(Locale.US)
+                .convertRatesTo(TimeUnit.SECONDS)
+                .convertDurationsTo(TimeUnit.MICROSECONDS)
+                .build(new File("/tmp/"));
+
+        reporter.start(10, TimeUnit.SECONDS);
+    }
+
+    @Activate
+    public void activate() {
+    }
+
+    @Deactivate
+    public void deactivate() {
+    }
 
     /**
      * Registers a component.
diff --git a/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java b/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java
index b205f90..84acb82 100644
--- a/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java
+++ b/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java
@@ -250,6 +250,17 @@
         return new IpPrefix(version, host, netmask);
     }
 
+    /**
+     * Returns an IpAddress of the bytes contained in this prefix.
+     * FIXME this is a hack for now and only works because IpPrefix doesn't
+     * mask the input bytes on creation.
+     *
+     * @return the IpAddress
+     */
+    public IpAddress toIpAddress() {
+        return IpAddress.valueOf(octets);
+    }
+
     public boolean isMasked() {
         return mask() != 0;
     }
@@ -278,6 +289,17 @@
         return false;
     }
 
+    public boolean contains(IpAddress address) {
+        // Need to get the network address because prefixes aren't automatically
+        // masked on creation
+        IpPrefix meMasked = network();
+
+        IpPrefix otherMasked =
+                IpPrefix.valueOf(address.octets, netmask).network();
+
+        return Arrays.equals(meMasked.octets, otherMasked.octets);
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -303,6 +325,7 @@
         if (netmask != other.netmask) {
             return false;
         }
+        // TODO not quite right until we mask the input
         if (!Arrays.equals(octets, other.octets)) {
             return false;
         }
diff --git a/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java b/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java
index f6bf6f1..297a0f3 100644
--- a/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java
+++ b/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java
@@ -76,7 +76,7 @@
     }
 
     @Test
-    public void testContains() {
+    public void testContainsIpPrefix() {
         IpPrefix slash31 = IpPrefix.valueOf(BYTES1, 31);
         IpPrefix slash32 = IpPrefix.valueOf(BYTES1, 32);
         IpPrefix differentSlash32 = IpPrefix.valueOf(BYTES2, 32);
@@ -96,4 +96,17 @@
         assertTrue(slash8.contains(slash31));
         assertFalse(slash31.contains(slash8));
     }
+
+    @Test
+    public void testContainsIpAddress() {
+        IpPrefix slash31 = IpPrefix.valueOf(BYTES1, 31);
+        IpAddress slash32 = IpAddress.valueOf(BYTES1, 32);
+
+        assertTrue(slash31.contains(slash32));
+
+        IpPrefix intf = IpPrefix.valueOf("192.168.10.101/24");
+        IpAddress addr = IpAddress.valueOf("192.168.10.1");
+
+        assertTrue(intf.contains(addr));
+    }
 }
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
index d335117..a980d1d 100644
--- a/utils/netty/pom.xml
+++ b/utils/netty/pom.xml
@@ -42,7 +42,6 @@
         <dependency>
             <groupId>commons-pool</groupId>
             <artifactId>commons-pool</artifactId>
-            <version>1.6</version>
         </dependency>
     </dependencies>
 
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
new file mode 100644
index 0000000..b2b490e
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
@@ -0,0 +1,68 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An asynchronous response.
+ * This class provides a base implementation of Response, with methods to retrieve the
+ * result and query to see if the result is ready. The result can only be retrieved when
+ * it is ready and the get methods will block if the result is not ready yet.
+ * @param <T> type of response.
+ */
+public class AsyncResponse<T> implements Response<T> {
+
+    private T value;
+    private boolean done = false;
+    private final long start = System.nanoTime();
+
+    @Override
+    public T get(long timeout, TimeUnit tu) throws TimeoutException {
+        timeout = tu.toNanos(timeout);
+        boolean interrupted = false;
+        try {
+            synchronized (this) {
+                while (!done) {
+                    try {
+                        long timeRemaining = timeout - (System.nanoTime() - start);
+                        if (timeRemaining <= 0) {
+                            throw new TimeoutException("Operation timed out.");
+                        }
+                        TimeUnit.NANOSECONDS.timedWait(this, timeRemaining);
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+                    }
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        return value;
+    }
+
+    @Override
+    public T get() throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isReady() {
+        return done;
+    }
+
+    /**
+     * Sets response value and unblocks any thread blocking on the response to become
+     * available.
+     * @param data response data.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized void setResponse(Object data) {
+        if (!done) {
+            done = true;
+            value = (T) data;
+            this.notifyAll();
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
new file mode 100644
index 0000000..313a448
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
@@ -0,0 +1,15 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Message handler that echos the message back to the sender.
+ */
+public class EchoHandler implements MessageHandler {
+
+    @Override
+    public void handle(Message message) throws IOException {
+        System.out.println("Received: " + message.payload() + ". Echoing it back to the sender.");
+        message.respond(message.payload());
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
new file mode 100644
index 0000000..8681093
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -0,0 +1,62 @@
+package org.onlab.netty;
+
+/**
+ * Representation of a TCP/UDP communication end point.
+ */
+public class Endpoint {
+
+    private final int port;
+    private final String host;
+
+    public Endpoint(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    public String host() {
+        return host;
+    }
+
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public String toString() {
+        return "Endpoint [port=" + port + ", host=" + host + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((host == null) ? 0 : host.hashCode());
+        result = prime * result + port;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        Endpoint other = (Endpoint) obj;
+        if (host == null) {
+            if (other.host != null) {
+                return false;
+            }
+        } else if (!host.equals(other.host)) {
+            return false;
+        }
+        if (port != other.port) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
new file mode 100644
index 0000000..bcf6f52
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -0,0 +1,85 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Internal message representation with additional attributes
+ * for supporting, synchronous request/reply behavior.
+ */
+public final class InternalMessage implements Message {
+
+    private long id;
+    private Endpoint sender;
+    private String type;
+    private Object payload;
+    private transient NettyMessagingService messagingService;
+    public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
+
+    // Must be created using the Builder.
+    private InternalMessage() {}
+
+    public long id() {
+        return id;
+    }
+
+    public String type() {
+        return type;
+    }
+
+    public Endpoint sender() {
+        return sender;
+    }
+
+    @Override
+    public Object payload() {
+        return payload;
+    }
+
+    @Override
+    public void respond(Object data) throws IOException {
+        Builder builder = new Builder(messagingService);
+        InternalMessage message = builder.withId(this.id)
+             // FIXME: Sender should be messagingService.localEp.
+            .withSender(this.sender)
+            .withPayload(data)
+            .withType(REPLY_MESSAGE_TYPE)
+            .build();
+        messagingService.sendAsync(sender, message);
+    }
+
+
+    /**
+     * Builder for InternalMessages.
+     */
+    public static class Builder {
+        private InternalMessage message;
+
+        public Builder(NettyMessagingService messagingService) {
+            message = new InternalMessage();
+            message.messagingService = messagingService;
+        }
+
+        public Builder withId(long id) {
+            message.id = id;
+            return this;
+        }
+
+        public Builder withType(String type) {
+            message.type = type;
+            return this;
+        }
+
+        public Builder withSender(Endpoint sender) {
+            message.sender = sender;
+            return this;
+        }
+        public Builder withPayload(Object payload) {
+            message.payload = payload;
+            return this;
+        }
+
+        public InternalMessage build() {
+            return message;
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
new file mode 100644
index 0000000..73c01a0
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -0,0 +1,47 @@
+package org.onlab.netty;
+
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Kryo Serializer.
+ */
+public class KryoSerializer implements Serializer {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private KryoPool serializerPool;
+
+    public KryoSerializer() {
+        setupKryoPool();
+    }
+
+    /**
+     * Sets up the common serialzers pool.
+     */
+    protected void setupKryoPool() {
+        // FIXME Slice out types used in common to separate pool/namespace.
+        serializerPool = KryoPool.newBuilder()
+                .register(ArrayList.class,
+                          HashMap.class,
+                          ArrayList.class
+                )
+                .build()
+                .populate(1);
+    }
+
+
+    @Override
+    public Object decode(byte[] data) {
+        return serializerPool.deserialize(data);
+    }
+
+    @Override
+    public byte[] encode(Object payload) {
+        return serializerPool.serialize(payload);
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
new file mode 100644
index 0000000..ed6cdb4
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
@@ -0,0 +1,12 @@
+package org.onlab.netty;
+
+/**
+ * A MessageHandler that simply logs the information.
+ */
+public class LoggingHandler implements MessageHandler {
+
+    @Override
+    public void handle(Message message) {
+        System.out.println("Received: " + message.payload());
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java
new file mode 100644
index 0000000..54b9526
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Message.java
@@ -0,0 +1,23 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * A unit of communication.
+ * Has a payload. Also supports a feature to respond back to the sender.
+ */
+public interface Message {
+
+    /**
+     * Returns the payload of this message.
+     * @return message payload.
+     */
+    public Object payload();
+
+    /**
+     * Sends a reply back to the sender of this messge.
+     * @param data payload of the response.
+     * @throws IOException if there is a communication error.
+     */
+    public void respond(Object data) throws IOException;
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
new file mode 100644
index 0000000..ecf2d62
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -0,0 +1,58 @@
+package org.onlab.netty;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+/**
+ * Decode bytes into a InternalMessage.
+ */
+public class MessageDecoder extends ByteToMessageDecoder {
+
+    private final NettyMessagingService messagingService;
+    private final Serializer serializer;
+
+    public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
+        this.messagingService = messagingService;
+        this.serializer = serializer;
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext context, ByteBuf in,
+            List<Object> messages) throws Exception {
+
+        byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array();
+        checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+
+        // read message Id.
+        long id = in.readLong();
+
+        // read message type; first read size and then bytes.
+        String type = new String(in.readBytes(in.readInt()).array());
+
+        // read sender host name; first read size and then bytes.
+        String host = new String(in.readBytes(in.readInt()).array());
+
+        // read sender port.
+        int port = in.readInt();
+
+        Endpoint sender = new Endpoint(host, port);
+
+        // read message payload; first read size and then bytes.
+        Object payload = serializer.decode(in.readBytes(in.readInt()).array());
+
+        InternalMessage message = new InternalMessage.Builder(messagingService)
+                .withId(id)
+                .withSender(sender)
+                .withType(type)
+                .withPayload(payload)
+                .build();
+
+        messages.add(message);
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
new file mode 100644
index 0000000..1b52a0f
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -0,0 +1,60 @@
+package org.onlab.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+/**
+ * Encode InternalMessage out into a byte buffer.
+ */
+public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
+
+    // onosiscool in ascii
+    public static final byte[] PREAMBLE = "onosiscool".getBytes();
+
+    private final Serializer serializer;
+
+    public MessageEncoder(Serializer serializer) {
+        this.serializer = serializer;
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext context, InternalMessage message,
+            ByteBuf out) throws Exception {
+
+        // write preamble
+        out.writeBytes(PREAMBLE);
+
+        // write id
+        out.writeLong(message.id());
+
+        // write type length
+        out.writeInt(message.type().length());
+
+        // write type
+        out.writeBytes(message.type().getBytes());
+
+        // write sender host name size
+        out.writeInt(message.sender().host().length());
+
+        // write sender host name.
+        out.writeBytes(message.sender().host().getBytes());
+
+        // write port
+        out.writeInt(message.sender().port());
+
+        try {
+            serializer.encode(message.payload());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        byte[] payload = serializer.encode(message.payload());
+
+        // write payload length.
+        out.writeInt(payload.length);
+
+        // write payload bytes
+        out.writeBytes(payload);
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
new file mode 100644
index 0000000..7bd5a7f
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
@@ -0,0 +1,16 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Handler for a message.
+ */
+public interface MessageHandler {
+
+    /**
+     * Handles the message.
+     * @param message message.
+     * @throws IOException.
+     */
+    public void handle(Message message) throws IOException;
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
new file mode 100644
index 0000000..ebad442
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -0,0 +1,41 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Interface for low level messaging primitives.
+ */
+public interface MessagingService {
+    /**
+     * Sends a message asynchronously to the specified communication end point.
+     * The message is specified using the type and payload.
+     * @param ep end point to send the message to.
+     * @param type type of message.
+     * @param payload message payload.
+     * @throws IOException
+     */
+    public void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
+
+    /**
+     * Sends a message synchronously and waits for a response.
+     * @param ep end point to send the message to.
+     * @param type type of message.
+     * @param payload message payload.
+     * @return a response future
+     * @throws IOException
+     */
+    public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
+
+    /**
+     * Registers a new message handler for message type.
+     * @param type message type.
+     * @param handler message handler
+     */
+    public void registerHandler(String type, MessageHandler handler);
+
+    /**
+     * Unregister current handler, if one exists for message type.
+     * @param type message type
+     */
+    public void unregisterHandler(String type);
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
new file mode 100644
index 0000000..54da8cc
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -0,0 +1,244 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.pool.KeyedObjectPool;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * A Netty based implementation of MessagingService.
+ */
+public class NettyMessagingService implements MessagingService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private KeyedObjectPool<Endpoint, Channel> channels =
+            new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+    private final int port;
+    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+    private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+    private Cache<Long, AsyncResponse<?>> responseFutures;
+    private final Endpoint localEp;
+
+    protected Serializer serializer;
+
+    public NettyMessagingService() {
+        // TODO: Default port should be configurable.
+        this(8080);
+    }
+
+    // FIXME: Constructor should not throw exceptions.
+    public NettyMessagingService(int port) {
+        this.port = port;
+        try {
+            localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
+        } catch (UnknownHostException e) {
+            // bailing out.
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void activate() throws Exception {
+        responseFutures = CacheBuilder.newBuilder()
+                .maximumSize(100000)
+                .weakValues()
+                // TODO: Once the entry expires, notify blocking threads (if any).
+                .expireAfterWrite(10, TimeUnit.MINUTES)
+                .build();
+        startAcceptingConnections();
+    }
+
+    public void deactivate() throws Exception {
+        channels.close();
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+    }
+
+    @Override
+    public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
+        InternalMessage message = new InternalMessage.Builder(this)
+            .withId(RandomUtils.nextLong())
+            .withSender(localEp)
+            .withType(type)
+            .withPayload(payload)
+            .build();
+        sendAsync(ep, message);
+    }
+
+    protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
+        Channel channel = null;
+        try {
+            channel = channels.borrowObject(ep);
+            channel.eventLoop().execute(new WriteTask(channel, message));
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            try {
+                channels.returnObject(ep, channel);
+            } catch (Exception e) {
+                log.warn("Error returning object back to the pool", e);
+                // ignored.
+            }
+        }
+    }
+
+    @Override
+    public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
+            throws IOException {
+        AsyncResponse<T> futureResponse = new AsyncResponse<T>();
+        Long messageId = RandomUtils.nextLong();
+        responseFutures.put(messageId, futureResponse);
+        InternalMessage message = new InternalMessage.Builder(this)
+            .withId(messageId)
+            .withSender(localEp)
+            .withType(type)
+            .withPayload(payload)
+            .build();
+        sendAsync(ep, message);
+        return futureResponse;
+    }
+
+    @Override
+    public void registerHandler(String type, MessageHandler handler) {
+        // TODO: Is this the right semantics for handler registration?
+        handlers.putIfAbsent(type, handler);
+    }
+
+    public void unregisterHandler(String type) {
+        handlers.remove(type);
+    }
+
+    private MessageHandler getMessageHandler(String type) {
+        return handlers.get(type);
+    }
+
+    private void startAcceptingConnections() throws InterruptedException {
+        ServerBootstrap b = new ServerBootstrap();
+        b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        b.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new OnosCommunicationChannelInitializer())
+            .option(ChannelOption.SO_BACKLOG, 128)
+            .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        // Bind and start to accept incoming connections.
+        b.bind(port).sync();
+    }
+
+    private class OnosCommunicationChannelFactory
+        implements KeyedPoolableObjectFactory<Endpoint, Channel> {
+
+        @Override
+        public void activateObject(Endpoint endpoint, Channel channel)
+                throws Exception {
+        }
+
+        @Override
+        public void destroyObject(Endpoint ep, Channel channel) throws Exception {
+            channel.close();
+        }
+
+        @Override
+        public Channel makeObject(Endpoint ep) throws Exception {
+            Bootstrap b = new Bootstrap();
+            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            b.group(workerGroup);
+            // TODO: Make this faster:
+            // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE, true);
+            b.handler(new OnosCommunicationChannelInitializer());
+
+            // Start the client.
+            ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
+            return f.channel();
+        }
+
+        @Override
+        public void passivateObject(Endpoint ep, Channel channel)
+                throws Exception {
+        }
+
+        @Override
+        public boolean validateObject(Endpoint ep, Channel channel) {
+            return channel.isOpen();
+        }
+    }
+
+    private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+        @Override
+        protected void initChannel(SocketChannel channel) throws Exception {
+            channel.pipeline()
+                .addLast(new MessageEncoder(serializer))
+                .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
+                .addLast(new NettyMessagingService.InboundMessageDispatcher());
+        }
+    }
+
+    private class WriteTask implements Runnable {
+
+        private final Object message;
+        private final Channel channel;
+
+        public WriteTask(Channel channel, Object message) {
+            this.message = message;
+            this.channel = channel;
+        }
+
+        @Override
+        public void run() {
+            channel.writeAndFlush(message);
+        }
+    }
+
+    private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
+            String type = message.type();
+            if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
+                try {
+                    AsyncResponse<?> futureResponse =
+                        NettyMessagingService.this.responseFutures.getIfPresent(message.id());
+                    if (futureResponse != null) {
+                        futureResponse.setResponse(message.payload());
+                    }
+                    log.warn("Received a reply. But was unable to locate the request handle");
+                } finally {
+                    NettyMessagingService.this.responseFutures.invalidate(message.id());
+                }
+                return;
+            }
+            MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
+            handler.handle(message);
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java
new file mode 100644
index 0000000..04675ce
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Response.java
@@ -0,0 +1,36 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Response object returned when making synchronous requests.
+ * Can you used to check is a response is ready and/or wait for a response
+ * to become available.
+ *
+ * @param <T> type of response.
+ */
+public interface Response<T> {
+
+    /**
+     * Gets the response waiting for a designated timeout period.
+     * @param timeout timeout period (since request was sent out)
+     * @param tu unit of time.
+     * @return response
+     * @throws TimeoutException if the timeout expires before the response arrives.
+     */
+    public T get(long timeout, TimeUnit tu) throws TimeoutException;
+
+    /**
+     * Gets the response waiting for indefinite timeout period.
+     * @return response
+     * @throws InterruptedException if the thread is interrupted before the response arrives.
+     */
+    public T get() throws InterruptedException;
+
+    /**
+     * Checks if the response is ready without blocking.
+     * @return true if response is ready, false otherwise.
+     */
+    public boolean isReady();
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
new file mode 100644
index 0000000..ac55f5a
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
@@ -0,0 +1,24 @@
+package org.onlab.netty;
+
+/**
+ * Interface for encoding/decoding message payloads.
+ */
+public interface Serializer {
+
+    /**
+     * Decodes the specified byte array to a POJO.
+     *
+     * @param data byte array.
+     * @return POJO
+     */
+    Object decode(byte[] data);
+
+    /**
+     * Encodes the specified POJO into a byte array.
+     *
+     * @param data POJO to be encoded
+     * @return byte array.
+     */
+    byte[] encode(Object message);
+
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
new file mode 100644
index 0000000..1573780
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
@@ -0,0 +1,24 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+
+public final class SimpleClient {
+    private SimpleClient() {}
+
+    public static void main(String... args) throws Exception {
+        NettyMessagingService messaging = new TestNettyMessagingService(9081);
+        messaging.activate();
+
+        messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World");
+        Response<String> response = messaging.sendAndReceive(new Endpoint("localhost", 8080), "echo", "Hello World");
+        System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
+    }
+
+    public static class TestNettyMessagingService extends NettyMessagingService {
+        public TestNettyMessagingService(int port) throws Exception {
+            super(port);
+            Serializer serializer = new KryoSerializer();
+            this.serializer = serializer;
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
new file mode 100644
index 0000000..12fa025
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -0,0 +1,19 @@
+package org.onlab.netty;
+
+public final class SimpleServer {
+    private SimpleServer() {}
+
+    public static void main(String... args) throws Exception {
+        NettyMessagingService server = new TestNettyMessagingService();
+        server.activate();
+        server.registerHandler("simple", new LoggingHandler());
+        server.registerHandler("echo", new EchoHandler());
+    }
+
+    public static class TestNettyMessagingService extends NettyMessagingService {
+        protected TestNettyMessagingService() {
+            Serializer serializer = new KryoSerializer();
+            this.serializer = serializer;
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/package-info.java b/utils/netty/src/main/java/org/onlab/netty/package-info.java
new file mode 100644
index 0000000..b1b90a3
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Asynchronous messaging APIs implemented using the Netty framework.
+ */
+package org.onlab.netty;
\ No newline at end of file