Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/apps/mobility/pom.xml b/apps/mobility/pom.xml
new file mode 100644
index 0000000..a919ff2
--- /dev/null
+++ b/apps/mobility/pom.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.onlab.onos</groupId>
+        <artifactId>onos-apps</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>onos-app-mobility</artifactId>
+    <packaging>bundle</packaging>
+
+    <description>ONOS simple Mobility app</description>
+
+</project>
diff --git a/apps/mobility/src/main/java/org/onlab/onos/mobility/HostMobility.java b/apps/mobility/src/main/java/org/onlab/onos/mobility/HostMobility.java
new file mode 100644
index 0000000..7958f99
--- /dev/null
+++ b/apps/mobility/src/main/java/org/onlab/onos/mobility/HostMobility.java
@@ -0,0 +1,121 @@
+package org.onlab.onos.mobility;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.ApplicationId;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Host;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleService;
+import org.onlab.onos.net.flow.criteria.Criteria.EthCriterion;
+import org.onlab.onos.net.flow.criteria.Criterion;
+import org.onlab.onos.net.flow.criteria.Criterion.Type;
+import org.onlab.onos.net.host.HostEvent;
+import org.onlab.onos.net.host.HostListener;
+import org.onlab.onos.net.host.HostService;
+import org.onlab.packet.MacAddress;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Sample mobility application. Cleans up flowmods when a host moves.
+ */
+@Component(immediate = true)
+public class HostMobility {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected HostService hostService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowRuleService flowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    private ApplicationId appId;
+
+    @Activate
+    public void activate() {
+        appId = ApplicationId.getAppId();
+        hostService.addListener(new InternalHostListener());
+        log.info("Started with Application ID {}", appId.id());
+    }
+
+    @Deactivate
+    public void deactivate() {
+        flowRuleService.removeFlowRulesById(appId);
+        log.info("Stopped");
+    }
+
+    public class InternalHostListener
+    implements HostListener {
+
+        @Override
+        public void event(HostEvent event) {
+            switch (event.type()) {
+                case HOST_ADDED:
+                case HOST_REMOVED:
+                case HOST_UPDATED:
+                    // don't care if a host has been added, removed.
+                    break;
+                case HOST_MOVED:
+                    log.info("Host {} has moved; cleaning up.", event.subject());
+                    cleanup(event.subject());
+                    break;
+
+                default:
+                    break;
+
+            }
+
+        }
+
+        /**
+         * For a given host, remove any flow rule which references it's addresses.
+         * @param host the host to clean up for
+         */
+        private void cleanup(Host host) {
+            Iterable<Device> devices = deviceService.getDevices();
+            List<FlowRule> flowRules = Lists.newLinkedList();
+            for (Device device : devices) {
+                   flowRules.addAll(cleanupDevice(device, host));
+            }
+            FlowRule[] flows = new FlowRule[flowRules.size()];
+            flows = flowRules.toArray(flows);
+            flowRuleService.removeFlowRules(flows);
+        }
+
+        private Collection<? extends FlowRule> cleanupDevice(Device device, Host host) {
+            List<FlowRule> flowRules = Lists.newLinkedList();
+            MacAddress mac = host.mac();
+            for (FlowRule rule : flowRuleService.getFlowEntries(device.id())) {
+                for (Criterion c : rule.selector().criteria()) {
+                    if (c.type() == Type.ETH_DST || c.type() == Type.ETH_SRC) {
+                        EthCriterion eth = (EthCriterion) c;
+                        if (eth.mac().equals(mac)) {
+                            flowRules.add(rule);
+                        }
+                    }
+                }
+            }
+            //TODO: handle ip cleanup
+            return flowRules;
+        }
+
+    }
+
+}
+
+
diff --git a/apps/mobility/src/main/java/org/onlab/onos/mobility/package-info.java b/apps/mobility/src/main/java/org/onlab/onos/mobility/package-info.java
new file mode 100644
index 0000000..ea5bdf0
--- /dev/null
+++ b/apps/mobility/src/main/java/org/onlab/onos/mobility/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Trivial application that provides simple form of reactive forwarding.
+ */
+package org.onlab.onos.mobility;
diff --git a/apps/pom.xml b/apps/pom.xml
index 995a8d6..010c531 100644
--- a/apps/pom.xml
+++ b/apps/pom.xml
@@ -20,7 +20,8 @@
         <module>tvue</module>
         <module>fwd</module>
         <module>foo</module>
-	<module>config</module>
+        <module>mobility</module>
+	    <module>config</module>
     </modules>
 
     <properties>
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 4babb50..89c3399 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -161,7 +161,7 @@
             switch (stored.state()) {
             case ADDED:
             case PENDING_ADD:
-                frp.applyFlowRule(flowRule);
+                frp.applyFlowRule(stored);
                 break;
             case PENDING_REMOVE:
             case REMOVED:
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationAdminService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationAdminService.java
new file mode 100644
index 0000000..22ed9de
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationAdminService.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.onos.cluster.DefaultControllerNode;
+
+/**
+ * Service for administering communications manager.
+ */
+public interface ClusterCommunicationAdminService {
+
+    /**
+     * Adds the node to the list of monitored nodes.
+     *
+     * @param node node to be added
+     */
+    void addNode(DefaultControllerNode node);
+
+    /**
+     * Removes the node from the list of monitored nodes.
+     *
+     * @param node node to be removed
+     */
+    void removeNode(DefaultControllerNode node);
+
+    /**
+     * Starts-up the communications engine.
+     *
+     * @param localNode local controller node
+     * @param delegate nodes delegate
+     */
+    void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate);
+
+    /**
+     * Clears all nodes and streams as part of leaving the cluster.
+     */
+    void clearAllNodesAndStreams();
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
new file mode 100644
index 0000000..2e2887c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
@@ -0,0 +1,354 @@
+package org.onlab.onos.store.cluster.impl;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.HelloMessage;
+import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
+import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+import org.onlab.packet.IpPrefix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static java.net.InetAddress.getByAddress;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Implements the cluster communication services to use by other stores.
+ */
+@Component(immediate = true)
+@Service
+public class ClusterCommunicationManager
+        implements ClusterCommunicationService, ClusterCommunicationAdminService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
+    private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
+
+    private static final long START_TIMEOUT = 1000;
+    private static final int WORKERS = 3;
+
+    private ClusterConnectionListener connectionListener;
+    private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
+
+    private DefaultControllerNode localNode;
+    private ClusterNodesDelegate nodesDelegate;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected SerializationService serializationService;
+
+    // Nodes to be monitored to make sure they have a connection.
+    private final Set<DefaultControllerNode> nodes = new HashSet<>();
+
+    // Means to track message streams to other nodes.
+    private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
+
+    // TODO: use something different that won't require synchronization
+    private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
+
+    // Executor pools for listening and managing connections to other nodes.
+    private final ExecutorService listenExecutor =
+            Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
+    private final ExecutorService commExecutors =
+            Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
+    private final ExecutorService heartbeatExecutor =
+            Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
+
+    private final Timer timer = new Timer("onos-comm-initiator");
+    private final TimerTask connectionCustodian = new ConnectionCustodian();
+    private MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
+
+    @Activate
+    public void activate() {
+        addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
+        addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
+        log.info("Activated but waiting for delegate");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
+        removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
+
+        connectionCustodian.cancel();
+        if (connectionListener != null) {
+            connectionListener.shutdown();
+            for (ClusterIOWorker worker : workers) {
+                worker.shutdown();
+            }
+        }
+        log.info("Stopped");
+    }
+
+    @Override
+    public boolean send(ClusterMessage message) {
+        boolean ok = true;
+        for (DefaultControllerNode node : nodes) {
+            if (!node.equals(localNode)) {
+                ok = send(message, node.id()) && ok;
+            }
+        }
+        return ok;
+    }
+
+    @Override
+    public boolean send(ClusterMessage message, NodeId toNodeId) {
+        ClusterMessageStream stream = streams.get(toNodeId);
+        if (stream != null && !toNodeId.equals(localNode.id())) {
+            try {
+                stream.write(message);
+                return true;
+            } catch (IOException e) {
+                log.warn("Unable to send message {} to node {}",
+                         message.subject(), toNodeId);
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public synchronized void addSubscriber(MessageSubject subject,
+                                           MessageSubscriber subscriber) {
+        subscribers.put(subject, subscriber);
+    }
+
+    @Override
+    public synchronized void removeSubscriber(MessageSubject subject,
+                                              MessageSubscriber subscriber) {
+        subscribers.remove(subject, subscriber);
+    }
+
+    @Override
+    public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
+        return ImmutableSet.copyOf(subscribers.get(subject));
+    }
+
+    @Override
+    public void addNode(DefaultControllerNode node) {
+        nodes.add(node);
+    }
+
+    @Override
+    public void removeNode(DefaultControllerNode node) {
+        send(new LeavingMemberMessage(node.id()));
+        nodes.remove(node);
+        ClusterMessageStream stream = streams.remove(node.id());
+        if (stream != null) {
+            stream.close();
+        }
+    }
+
+    @Override
+    public void startUp(DefaultControllerNode localNode,
+                        ClusterNodesDelegate delegate) {
+        this.localNode = localNode;
+        this.nodesDelegate = delegate;
+
+        startCommunications();
+        startListening();
+        startInitiatingConnections();
+        log.info("Started");
+    }
+
+    @Override
+    public void clearAllNodesAndStreams() {
+        nodes.clear();
+        send(new LeavingMemberMessage(localNode.id()));
+        for (ClusterMessageStream stream : streams.values()) {
+            stream.close();
+        }
+        streams.clear();
+    }
+
+    /**
+     * Dispatches the specified message to all subscribers to its subject.
+     *
+     * @param message    message to dispatch
+     * @param fromNodeId node from which the message was received
+     */
+    void dispatch(ClusterMessage message, NodeId fromNodeId) {
+        Set<MessageSubscriber> set = getSubscribers(message.subject());
+        if (set != null) {
+            for (MessageSubscriber subscriber : set) {
+                subscriber.receive(message, fromNodeId);
+            }
+        }
+    }
+
+    /**
+     * Adds the stream associated with the specified node.
+     *
+     * @param nodeId  newly detected cluster node id
+     * @param ip      node IP listen address
+     * @param tcpPort node TCP listen port
+     * @return controller node bound to the stream
+     */
+    DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
+                                        ClusterMessageStream stream) {
+        DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
+        stream.setNode(node);
+        streams.put(node.id(), stream);
+        send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
+        return node;
+    }
+
+    /**
+     * Removes the stream associated with the specified node.
+     *
+     * @param node node whose stream to remove
+     */
+    void removeNodeStream(DefaultControllerNode node) {
+        nodesDelegate.nodeVanished(node.id());
+        streams.remove(node.id());
+    }
+
+    /**
+     * Finds the least utilized IO worker.
+     *
+     * @return IO worker
+     */
+    ClusterIOWorker findWorker() {
+        ClusterIOWorker leastUtilized = null;
+        int minCount = Integer.MAX_VALUE;
+        for (ClusterIOWorker worker : workers) {
+            int count = worker.streamCount();
+            if (count == 0) {
+                return worker;
+            }
+
+            if (count < minCount) {
+                leastUtilized = worker;
+                minCount = count;
+            }
+        }
+        return leastUtilized;
+    }
+
+    /**
+     * Kicks off the IO loops and waits for them to startup.
+     */
+    private void startCommunications() {
+        HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
+                                              localNode.tcpPort());
+        for (int i = 0; i < WORKERS; i++) {
+            try {
+                ClusterIOWorker worker =
+                        new ClusterIOWorker(this, serializationService, hello);
+                workers.add(worker);
+                commExecutors.execute(worker);
+            } catch (IOException e) {
+                log.warn("Unable to start communication worker", e);
+            }
+        }
+
+        // Wait for the IO loops to start
+        for (ClusterIOWorker loop : workers) {
+            if (!loop.awaitStart(START_TIMEOUT)) {
+                log.warn("Comm loop did not start on-time; moving on...");
+            }
+        }
+    }
+
+    /**
+     * Starts listening for connections from peer cluster members.
+     */
+    private void startListening() {
+        try {
+            connectionListener =
+                    new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
+            listenExecutor.execute(connectionListener);
+            if (!connectionListener.awaitStart(START_TIMEOUT)) {
+                log.warn("Listener did not start on-time; moving on...");
+            }
+        } catch (IOException e) {
+            log.error("Unable to listen for cluster connections", e);
+        }
+    }
+
+    /**
+     * Attempts to connect to any nodes that do not have an associated connection.
+     */
+    private void startInitiatingConnections() {
+        timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
+                       CONNECTION_CUSTODIAN_FREQUENCY);
+    }
+
+    /**
+     * Initiates open connection request and registers the pending socket
+     * channel with the given IO worker.
+     *
+     * @param worker loop with which the channel should be registered
+     * @throws java.io.IOException if the socket could not be open or connected
+     */
+    private void initiateConnection(DefaultControllerNode node,
+                                    ClusterIOWorker worker) throws IOException {
+        SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
+        SocketChannel ch = SocketChannel.open();
+        ch.configureBlocking(false);
+        ch.connect(sa);
+        worker.connectStream(ch);
+    }
+
+    // Sweeps through all controller nodes and attempts to open connection to
+    // those that presently do not have one.
+    private class ConnectionCustodian extends TimerTask {
+        @Override
+        public void run() {
+            for (DefaultControllerNode node : nodes) {
+                if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
+                    try {
+                        initiateConnection(node, findWorker());
+                    } catch (IOException e) {
+                        log.debug("Unable to connect", e);
+                    }
+                }
+            }
+        }
+    }
+
+    private class MembershipSubscriber implements MessageSubscriber {
+        @Override
+        public void receive(ClusterMessage message, NodeId fromNodeId) {
+            MessageSubject subject = message.subject();
+            ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
+            if (message.subject() == MessageSubject.NEW_MEMBER) {
+                log.info("Node {} arrived", cmm.nodeId());
+                nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
+
+            } else if (subject == MessageSubject.LEAVING_MEMBER) {
+                log.info("Node {} is leaving", cmm.nodeId());
+                nodesDelegate.nodeRemoved(cmm.nodeId());
+            }
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java
index ae4a76f..36d5ab4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java
@@ -23,12 +23,12 @@
     private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
     private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
 
-    private final WorkerFinder workerFinder;
+    private final ClusterCommunicationManager manager;
 
-    ClusterConnectionListener(IpPrefix ip, int tcpPort,
-                              WorkerFinder workerFinder) throws IOException {
+    ClusterConnectionListener(ClusterCommunicationManager manager,
+                              IpPrefix ip, int tcpPort) throws IOException {
         super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
-        this.workerFinder = workerFinder;
+        this.manager = manager;
     }
 
     @Override
@@ -41,7 +41,7 @@
         so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
         so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
 
-        workerFinder.findWorker().acceptStream(sc);
+        manager.findWorker().acceptStream(sc);
     }
 
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
index 0e93985..d442cc8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
@@ -3,8 +3,9 @@
 import org.onlab.nio.IOLoop;
 import org.onlab.nio.MessageStream;
 import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
+import org.onlab.onos.store.cluster.messaging.HelloMessage;
 import org.onlab.onos.store.cluster.messaging.SerializationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,27 +30,23 @@
 
     private static final long SELECT_TIMEOUT = 50;
 
-    private final ConnectionManager connectionManager;
-    private final CommunicationsDelegate commsDelegate;
+    private final ClusterCommunicationManager manager;
     private final SerializationService serializationService;
     private final ClusterMessage helloMessage;
 
     /**
      * Creates a new cluster IO worker.
      *
-     * @param connectionManager    parent connection manager
-     * @param commsDelegate        communications delegate for dispatching
+     * @param manager              parent comms manager
      * @param serializationService serialization service for encode/decode
      * @param helloMessage         hello message for greeting peers
      * @throws IOException if errors occur during IO loop ignition
      */
-    ClusterIOWorker(ConnectionManager connectionManager,
-                    CommunicationsDelegate commsDelegate,
+    ClusterIOWorker(ClusterCommunicationManager manager,
                     SerializationService serializationService,
                     ClusterMessage helloMessage) throws IOException {
         super(SELECT_TIMEOUT);
-        this.connectionManager = connectionManager;
-        this.commsDelegate = commsDelegate;
+        this.manager = manager;
         this.serializationService = serializationService;
         this.helloMessage = helloMessage;
     }
@@ -61,11 +58,27 @@
 
     @Override
     protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
+        NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
         for (ClusterMessage message : messages) {
-            commsDelegate.dispatch(message);
+            manager.dispatch(message, nodeId);
         }
     }
 
+    // Retrieves the node from the stream. If one is not bound, it attempts
+    // to bind it using the knowledge that the first message must be a hello.
+    private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
+        DefaultControllerNode node = stream.node();
+        if (node == null && !messages.isEmpty()) {
+            ClusterMessage firstMessage = messages.get(0);
+            if (firstMessage instanceof HelloMessage) {
+                HelloMessage hello = (HelloMessage) firstMessage;
+                node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
+                                             hello.tcpPort(), stream);
+            }
+        }
+        return node != null ? node.id() : null;
+    }
+
     @Override
     public ClusterMessageStream acceptStream(SocketChannel channel) {
         ClusterMessageStream stream = super.acceptStream(channel);
@@ -99,7 +112,7 @@
         DefaultControllerNode node = ((ClusterMessageStream) stream).node();
         if (node != null) {
             log.info("Closed connection to node {}", node.id());
-            connectionManager.removeNodeStream(node);
+            manager.removeNodeStream(node);
         }
         super.removeStream(stream);
     }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
similarity index 91%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
index 0970726..d182aa1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
@@ -1,8 +1,10 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.cluster.impl;
 
 import org.onlab.nio.IOLoop;
 import org.onlab.nio.MessageStream;
 import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
 
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
index b822304..b82a835 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
@@ -1,6 +1,8 @@
 package org.onlab.onos.store.cluster.impl;
 
 import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
 
 /**
  * Simple back interface through which connection manager can interact with
@@ -9,17 +11,27 @@
 public interface ClusterNodesDelegate {
 
     /**
-     * Notifies about a new cluster node being detected.
+     * Notifies about cluster node coming online.
      *
-     * @param node newly detected cluster node
+     * @param nodeId  newly detected cluster node id
+     * @param ip      node IP listen address
+     * @param tcpPort node TCP listen port
+     * @return the controller node
      */
-    void nodeDetected(DefaultControllerNode node);
+    DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort);
 
     /**
      * Notifies about cluster node going offline.
      *
-     * @param node cluster node that vanished
+     * @param nodeId identifier of the cluster node that vanished
      */
-    void nodeVanished(DefaultControllerNode node);
+    void nodeVanished(NodeId nodeId);
+
+    /**
+     * Notifies about remote request to remove node from cluster.
+     *
+     * @param nodeId identifier of the cluster node that was removed
+     */
+    void nodeRemoved(NodeId nodeId);
 
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/CommunicationsDelegate.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/CommunicationsDelegate.java
deleted file mode 100644
index e74d14b..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/CommunicationsDelegate.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-
-/**
- * Simple back interface for interacting with the communications service.
- */
-public interface CommunicationsDelegate {
-
-    /**
-     * Dispatches the specified message to all registered subscribers.
-     *
-     * @param message message to be dispatched
-     */
-    void dispatch(ClusterMessage message);
-
-    /**
-     * Sets the sender.
-     *
-     * @param messageSender message sender
-     */
-    void setSender(MessageSender messageSender);
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ConnectionManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ConnectionManager.java
deleted file mode 100644
index fac3c21..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ConnectionManager.java
+++ /dev/null
@@ -1,255 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
-import org.onlab.onos.store.cluster.messaging.HelloMessage;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static java.net.InetAddress.getByAddress;
-import static org.onlab.util.Tools.namedThreads;
-
-/**
- * Manages connections to other controller cluster nodes.
- */
-public class ConnectionManager implements MessageSender {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
-    private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
-
-    private static final long START_TIMEOUT = 1000;
-    private static final int WORKERS = 3;
-
-    private ClusterConnectionListener connectionListener;
-    private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
-
-    private final DefaultControllerNode localNode;
-    private final ClusterNodesDelegate nodesDelegate;
-    private final CommunicationsDelegate commsDelegate;
-    private final SerializationService serializationService;
-
-    // Nodes to be monitored to make sure they have a connection.
-    private final Set<DefaultControllerNode> nodes = new HashSet<>();
-
-    // Means to track message streams to other nodes.
-    private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
-
-    // Executor pools for listening and managing connections to other nodes.
-    private final ExecutorService listenExecutor =
-            Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
-    private final ExecutorService commExecutors =
-            Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
-    private final ExecutorService heartbeatExecutor =
-            Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
-
-    private final Timer timer = new Timer("onos-comm-initiator");
-    private final TimerTask connectionCustodian = new ConnectionCustodian();
-
-    private final WorkerFinder workerFinder = new LeastUtilitiedWorkerFinder();
-
-
-    /**
-     * Creates a new connection manager.
-     */
-    ConnectionManager(DefaultControllerNode localNode,
-                      ClusterNodesDelegate nodesDelegate,
-                      CommunicationsDelegate commsDelegate,
-                      SerializationService serializationService) {
-        this.localNode = localNode;
-        this.nodesDelegate = nodesDelegate;
-        this.commsDelegate = commsDelegate;
-        this.serializationService = serializationService;
-
-        commsDelegate.setSender(this);
-        startCommunications();
-        startListening();
-        startInitiating();
-        log.info("Started");
-    }
-
-    /**
-     * Shuts down the connection manager.
-     */
-    void shutdown() {
-        connectionListener.shutdown();
-        for (ClusterIOWorker worker : workers) {
-            worker.shutdown();
-        }
-        log.info("Stopped");
-    }
-
-    /**
-     * Adds the node to the list of monitored nodes.
-     *
-     * @param node node to be added
-     */
-    void addNode(DefaultControllerNode node) {
-        nodes.add(node);
-    }
-
-    /**
-     * Removes the node from the list of monitored nodes.
-     *
-     * @param node node to be removed
-     */
-    void removeNode(DefaultControllerNode node) {
-        nodes.remove(node);
-        ClusterMessageStream stream = streams.remove(node.id());
-        if (stream != null) {
-            stream.close();
-        }
-    }
-
-    /**
-     * Removes the stream associated with the specified node.
-     *
-     * @param node node whose stream to remove
-     */
-    void removeNodeStream(DefaultControllerNode node) {
-        nodesDelegate.nodeVanished(node);
-        streams.remove(node.id());
-    }
-
-    @Override
-    public boolean send(NodeId nodeId, ClusterMessage message) {
-        ClusterMessageStream stream = streams.get(nodeId);
-        if (stream != null) {
-            try {
-                stream.write(message);
-                return true;
-            } catch (IOException e) {
-                log.warn("Unable to send a message about {} to node {}",
-                         message.subject(), nodeId);
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Kicks off the IO loops and waits for them to startup.
-     */
-    private void startCommunications() {
-        HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
-                                              localNode.tcpPort());
-        for (int i = 0; i < WORKERS; i++) {
-            try {
-                ClusterIOWorker worker =
-                        new ClusterIOWorker(this, commsDelegate,
-                                            serializationService, hello);
-                workers.add(worker);
-                commExecutors.execute(worker);
-            } catch (IOException e) {
-                log.warn("Unable to start communication worker", e);
-            }
-        }
-
-        // Wait for the IO loops to start
-        for (ClusterIOWorker loop : workers) {
-            if (!loop.awaitStart(START_TIMEOUT)) {
-                log.warn("Comm loop did not start on-time; moving on...");
-            }
-        }
-    }
-
-    /**
-     * Starts listening for connections from peer cluster members.
-     */
-    private void startListening() {
-        try {
-            connectionListener =
-                    new ClusterConnectionListener(localNode.ip(), localNode.tcpPort(),
-                                                  workerFinder);
-            listenExecutor.execute(connectionListener);
-            if (!connectionListener.awaitStart(START_TIMEOUT)) {
-                log.warn("Listener did not start on-time; moving on...");
-            }
-        } catch (IOException e) {
-            log.error("Unable to listen for cluster connections", e);
-        }
-    }
-
-    /**
-     * Initiates open connection request and registers the pending socket
-     * channel with the given IO loop.
-     *
-     * @param loop loop with which the channel should be registered
-     * @throws java.io.IOException if the socket could not be open or connected
-     */
-    private void initiateConnection(DefaultControllerNode node,
-                                    ClusterIOWorker loop) throws IOException {
-        SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
-        SocketChannel ch = SocketChannel.open();
-        ch.configureBlocking(false);
-        ch.connect(sa);
-        loop.connectStream(ch);
-    }
-
-
-    /**
-     * Attempts to connect to any nodes that do not have an associated connection.
-     */
-    private void startInitiating() {
-        timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
-                       CONNECTION_CUSTODIAN_FREQUENCY);
-    }
-
-    // Sweeps through all controller nodes and attempts to open connection to
-    // those that presently do not have one.
-    private class ConnectionCustodian extends TimerTask {
-        @Override
-        public void run() {
-            for (DefaultControllerNode node : nodes) {
-                if (node != localNode && !streams.containsKey(node.id())) {
-                    try {
-                        initiateConnection(node, workerFinder.findWorker());
-                    } catch (IOException e) {
-                        log.debug("Unable to connect", e);
-                    }
-                }
-            }
-        }
-    }
-
-    // Finds the least utilitied IO loop.
-    private class LeastUtilitiedWorkerFinder implements WorkerFinder {
-
-        @Override
-        public ClusterIOWorker findWorker() {
-            ClusterIOWorker leastUtilized = null;
-            int minCount = Integer.MAX_VALUE;
-            for (ClusterIOWorker worker : workers) {
-                int count = worker.streamCount();
-                if (count == 0) {
-                    return worker;
-                }
-
-                if (count < minCount) {
-                    leastUtilized = worker;
-                    minCount = count;
-                }
-            }
-            return leastUtilized;
-        }
-    }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index ae04226..d4b7289 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -14,7 +14,6 @@
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
 import org.onlab.packet.IpPrefix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,20 +42,20 @@
     private final Map<NodeId, State> states = new ConcurrentHashMap<>();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private CommunicationsDelegate commsDelegate;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private SerializationService serializationService;
+    private ClusterCommunicationAdminService communicationAdminService;
 
     private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
-    private ConnectionManager connectionManager;
 
     @Activate
     public void activate() {
         loadClusterDefinition();
         establishSelfIdentity();
-        connectionManager = new ConnectionManager(localNode, nodesDelegate,
-                                                  commsDelegate, serializationService);
+
+        // Start-up the comm service and prime it with the loaded nodes.
+        communicationAdminService.startUp(localNode, nodesDelegate);
+        for (DefaultControllerNode node : nodes.values()) {
+            communicationAdminService.addNode(node);
+        }
         log.info("Started");
     }
 
@@ -92,8 +91,8 @@
         if (localNode == null) {
             localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
             nodes.put(localNode.id(), localNode);
-            states.put(localNode.id(), State.ACTIVE);
         }
+        states.put(localNode.id(), State.ACTIVE);
     }
 
     @Override
@@ -122,29 +121,48 @@
     public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
         DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
         nodes.put(nodeId, node);
-        connectionManager.addNode(node);
+        communicationAdminService.addNode(node);
         return node;
     }
 
     @Override
     public void removeNode(NodeId nodeId) {
-        DefaultControllerNode node = nodes.remove(nodeId);
-        if (node != null) {
-            connectionManager.removeNode(node);
+        if (nodeId.equals(localNode.id())) {
+            // We are being ejected from the cluster, so remove all other nodes.
+            communicationAdminService.clearAllNodesAndStreams();
+            nodes.clear();
+            nodes.put(localNode.id(), localNode);
+
+        } else {
+            // Remove the other node.
+            DefaultControllerNode node = nodes.remove(nodeId);
+            if (node != null) {
+                communicationAdminService.removeNode(node);
+            }
         }
     }
 
     // Entity to handle back calls from the connection manager.
     private class InnerNodesDelegate implements ClusterNodesDelegate {
         @Override
-        public void nodeDetected(DefaultControllerNode node) {
-            nodes.put(node.id(), node);
-            states.put(node.id(), State.ACTIVE);
+        public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
+            DefaultControllerNode node = nodes.get(nodeId);
+            if (node == null) {
+                node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
+            }
+            states.put(nodeId, State.ACTIVE);
+            return node;
         }
 
         @Override
-        public void nodeVanished(DefaultControllerNode node) {
-            states.put(node.id(), State.INACTIVE);
+        public void nodeVanished(NodeId nodeId) {
+            states.put(nodeId, State.INACTIVE);
+        }
+
+        @Override
+        public void nodeRemoved(NodeId nodeId) {
+            removeNode(nodeId);
         }
     }
+
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSender.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSender.java
deleted file mode 100644
index 55f868c..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSender.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-
-/**
- * Created by tom on 9/29/14.
- */
-public interface MessageSender {
-
-    /**
-     * Sends the specified message to the given cluster node.
-     *
-     * @param nodeId  node identifier
-     * @param message mesage to send
-     * @return true if the message was sent sucessfully; false if there is
-     * no stream or if there was an error
-     */
-    boolean send(NodeId nodeId, ClusterMessage message);
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
new file mode 100644
index 0000000..c6ebca9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
@@ -0,0 +1,166 @@
+package org.onlab.onos.store.cluster.impl;
+
+import de.javakaffee.kryoserializers.URISerializer;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultLink;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Element;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.EchoMessage;
+import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
+import org.onlab.onos.store.cluster.messaging.HelloMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+import org.onlab.onos.store.serializers.ConnectPointSerializer;
+import org.onlab.onos.store.serializers.DefaultLinkSerializer;
+import org.onlab.onos.store.serializers.DefaultPortSerializer;
+import org.onlab.onos.store.serializers.DeviceIdSerializer;
+import org.onlab.onos.store.serializers.IpPrefixSerializer;
+import org.onlab.onos.store.serializers.LinkKeySerializer;
+import org.onlab.onos.store.serializers.NodeIdSerializer;
+import org.onlab.onos.store.serializers.PortNumberSerializer;
+import org.onlab.onos.store.serializers.ProviderIdSerializer;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Factory for parsing messages sent between cluster members.
+ */
+@Component(immediate = true)
+@Service
+public class MessageSerializer implements SerializationService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final int METADATA_LENGTH = 12; // 8 + 4
+    private static final int LENGTH_OFFSET = 8;
+
+    private static final long MARKER = 0xfeedcafebeaddeadL;
+
+    private KryoPool serializerPool;
+
+    @Activate
+    public void activate() {
+        setupKryoPool();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    /**
+     * Sets up the common serialzers pool.
+     */
+    protected void setupKryoPool() {
+        // FIXME Slice out types used in common to separate pool/namespace.
+        serializerPool = KryoPool.newBuilder()
+                .register(ArrayList.class,
+                          HashMap.class,
+
+                          ControllerNode.State.class,
+                          Device.Type.class,
+
+                          DefaultControllerNode.class,
+                          DefaultDevice.class,
+                          MastershipRole.class,
+                          Port.class,
+                          Element.class,
+
+                          Link.Type.class,
+
+                          MessageSubject.class,
+                          HelloMessage.class,
+                          NewMemberMessage.class,
+                          LeavingMemberMessage.class,
+                          EchoMessage.class
+                )
+                .register(IpPrefix.class, new IpPrefixSerializer())
+                .register(URI.class, new URISerializer())
+                .register(NodeId.class, new NodeIdSerializer())
+                .register(ProviderId.class, new ProviderIdSerializer())
+                .register(DeviceId.class, new DeviceIdSerializer())
+                .register(PortNumber.class, new PortNumberSerializer())
+                .register(DefaultPort.class, new DefaultPortSerializer())
+                .register(LinkKey.class, new LinkKeySerializer())
+                .register(ConnectPoint.class, new ConnectPointSerializer())
+                .register(DefaultLink.class, new DefaultLinkSerializer())
+                .build()
+                .populate(1);
+    }
+
+
+    @Override
+    public ClusterMessage decode(ByteBuffer buffer) {
+        try {
+            // Do we have enough bytes to read the header? If not, bail.
+            if (buffer.remaining() < METADATA_LENGTH) {
+                return null;
+            }
+
+            // Peek at the length and if we have enough to read the entire message
+            // go ahead, otherwise bail.
+            int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
+            if (buffer.remaining() < length) {
+                return null;
+            }
+
+            // At this point, we have enough data to read a complete message.
+            long marker = buffer.getLong();
+            checkState(marker == MARKER, "Incorrect message marker");
+            length = buffer.getInt();
+
+            // TODO: sanity checking for length
+            byte[] data = new byte[length - METADATA_LENGTH];
+            buffer.get(data);
+            return (ClusterMessage) serializerPool.deserialize(data);
+
+        } catch (Exception e) {
+            // TODO: recover from exceptions by forwarding stream to next marker
+            log.warn("Unable to decode message due to: " + e);
+        }
+        return null;
+    }
+
+    @Override
+    public void encode(ClusterMessage message, ByteBuffer buffer) {
+        try {
+            byte[] data = serializerPool.serialize(message);
+            buffer.putLong(MARKER);
+            buffer.putInt(data.length + METADATA_LENGTH);
+            buffer.put(data);
+
+        } catch (Exception e) {
+            // TODO: recover from exceptions by forwarding stream to next marker
+            log.warn("Unable to encode message due to: " + e);
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/WorkerFinder.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/WorkerFinder.java
deleted file mode 100644
index 06f4f8a..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/WorkerFinder.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-/**
- * Provides means to find a worker IO loop.
- */
-public interface WorkerFinder {
-
-    /**
-     * Finds a suitable worker.
-     *
-     * @return available worker
-     */
-    ClusterIOWorker findWorker();
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
new file mode 100644
index 0000000..f4b9710
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Distributed cluster store and messaging subsystem implementation.
+ */
+package org.onlab.onos.store.cluster.impl;
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
index 87ed221..fe7fcd3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -10,6 +10,15 @@
 public interface ClusterCommunicationService {
 
     /**
+     * Sends a message to all controller nodes.
+     *
+     * @param message  message to send
+     * @return true if the message was sent sucessfully to all nodes; false
+     * if there is no stream or if there was an error for some node
+     */
+    boolean send(ClusterMessage message);
+
+    /**
      * Sends a message to the specified controller node.
      *
      * @param message  message to send
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMembershipMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMembershipMessage.java
new file mode 100644
index 0000000..ea00185
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMembershipMessage.java
@@ -0,0 +1,66 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+/**
+ * Base for cluster membership messages.
+ */
+public abstract class ClusterMembershipMessage extends ClusterMessage {
+
+    private NodeId nodeId;
+    private IpPrefix ipAddress;
+    private int tcpPort;
+
+    // For serialization
+    protected ClusterMembershipMessage() {
+        super(MessageSubject.HELLO);
+        nodeId = null;
+        ipAddress = null;
+        tcpPort = 0;
+    }
+
+    /**
+     * Creates a new membership message for the specified end-point data.
+     *
+     * @param subject   message subject
+     * @param nodeId    sending node identification
+     * @param ipAddress sending node IP address
+     * @param tcpPort   sending node TCP port
+     */
+    protected ClusterMembershipMessage(MessageSubject subject, NodeId nodeId,
+                                       IpPrefix ipAddress, int tcpPort) {
+        super(subject);
+        this.nodeId = nodeId;
+        this.ipAddress = ipAddress;
+        this.tcpPort = tcpPort;
+    }
+
+    /**
+     * Returns the sending node identifer.
+     *
+     * @return node identifier
+     */
+    public NodeId nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Returns the sending node IP address.
+     *
+     * @return node IP address
+     */
+    public IpPrefix ipAddress() {
+        return ipAddress;
+    }
+
+    /**
+     * Returns the sending node TCP listen port.
+     *
+     * @return TCP listen port
+     */
+    public int tcpPort() {
+        return tcpPort;
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
index ddc79d3..d692e4e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
@@ -6,18 +6,10 @@
 /**
  * Hello message that nodes use to greet each other.
  */
-public class HelloMessage extends ClusterMessage {
-
-    private NodeId nodeId;
-    private IpPrefix ipAddress;
-    private int tcpPort;
+public class HelloMessage extends ClusterMembershipMessage {
 
     // For serialization
     private HelloMessage() {
-        super(MessageSubject.HELLO);
-        nodeId = null;
-        ipAddress = null;
-        tcpPort = 0;
     }
 
     /**
@@ -28,36 +20,7 @@
      * @param tcpPort   sending node TCP port
      */
     public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
-        super(MessageSubject.HELLO);
-        nodeId = nodeId;
-        ipAddress = ipAddress;
-        tcpPort = tcpPort;
+        super(MessageSubject.HELLO, nodeId, ipAddress, tcpPort);
     }
 
-    /**
-     * Returns the sending node identifer.
-     *
-     * @return node identifier
-     */
-    public NodeId nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * Returns the sending node IP address.
-     *
-     * @return node IP address
-     */
-    public IpPrefix ipAddress() {
-        return ipAddress;
-    }
-
-    /**
-     * Returns the sending node TCP listen port.
-     *
-     * @return TCP listen port
-     */
-    public int tcpPort() {
-        return tcpPort;
-    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/LeavingMemberMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/LeavingMemberMessage.java
new file mode 100644
index 0000000..59686b8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/LeavingMemberMessage.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+
+/**
+ * Announcement message that nodes use to gossip about team departures.
+ */
+public class LeavingMemberMessage extends ClusterMembershipMessage {
+
+    // For serialization
+    private LeavingMemberMessage() {
+        super();
+    }
+
+    /**
+     * Creates a new goodbye message.
+     *
+     * @param nodeId sending node identification
+     */
+    public LeavingMemberMessage(NodeId nodeId) {
+        super(MessageSubject.LEAVING_MEMBER, nodeId, null, 0);
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index 3b888b3..c7badf2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -8,6 +8,12 @@
     /** Represents a first greeting message. */
     HELLO,
 
+    /** Signifies announcement about new member. */
+    NEW_MEMBER,
+
+    /** Signifies announcement about leaving member. */
+    LEAVING_MEMBER,
+
     /** Signifies a heart-beat message. */
     ECHO
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
index 6b78fec..68cd83c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
@@ -1,5 +1,7 @@
 package org.onlab.onos.store.cluster.messaging;
 
+import org.onlab.onos.cluster.NodeId;
+
 /**
  * Represents a message consumer.
  */
@@ -8,8 +10,9 @@
     /**
      * Receives the specified cluster message.
      *
-     * @param message message to be received
+     * @param message    message to be received
+     * @param fromNodeId node from which the message was received
      */
-    void receive(ClusterMessage message);
+    void receive(ClusterMessage message, NodeId fromNodeId);
 
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/NewMemberMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/NewMemberMessage.java
new file mode 100644
index 0000000..53bc282
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/NewMemberMessage.java
@@ -0,0 +1,26 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+/**
+ * Announcement message that nodes use to gossip about new arrivals.
+ */
+public class NewMemberMessage extends ClusterMembershipMessage {
+
+    // For serialization
+    private NewMemberMessage() {
+    }
+
+    /**
+     * Creates a new gossip message for the specified end-point data.
+     *
+     * @param nodeId    sending node identification
+     * @param ipAddress sending node IP address
+     * @param tcpPort   sending node TCP port
+     */
+    public NewMemberMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
+        super(MessageSubject.NEW_MEMBER, nodeId, ipAddress, tcpPort);
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
index 79e054b..7521630 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
@@ -3,12 +3,12 @@
 import java.nio.ByteBuffer;
 
 /**
- * Service for serializing/deserializing intra-cluster messages.
+ * Service for encoding &amp; decoding intra-cluster messages.
  */
 public interface SerializationService {
 
     /**
-     * Decodes the specified byte buffer to obtain a message within.
+     * Decodes the specified byte buffer to obtain the message within.
      *
      * @param buffer byte buffer with message(s)
      * @return parsed message
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
deleted file mode 100644
index bafb2c3..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
-import org.onlab.onos.store.cluster.impl.MessageSender;
-import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
-
-import java.util.Set;
-
-/**
- * Implements the cluster communication services to use by other stores.
- */
-@Component(immediate = true)
-@Service
-public class ClusterCommunicationManager
-        implements ClusterCommunicationService, CommunicationsDelegate {
-
-    // TODO: use something different that won't require synchronization
-    private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
-    private MessageSender messageSender;
-
-    @Override
-    public boolean send(ClusterMessage message, NodeId toNodeId) {
-        return messageSender.send(toNodeId, message);
-    }
-
-    @Override
-    public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
-        subscribers.put(subject, subscriber);
-    }
-
-    @Override
-    public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
-        subscribers.remove(subject, subscriber);
-    }
-
-    @Override
-    public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
-        return ImmutableSet.copyOf(subscribers.get(subject));
-    }
-
-    @Override
-    public void dispatch(ClusterMessage message) {
-        Set<MessageSubscriber> set = getSubscribers(message.subject());
-        if (set != null) {
-            for (MessageSubscriber subscriber : set) {
-                subscriber.receive(message);
-            }
-        }
-    }
-
-    @Override
-    public void setSender(MessageSender messageSender) {
-        this.messageSender = messageSender;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
deleted file mode 100644
index 93c8310..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-
-import java.nio.ByteBuffer;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Factory for parsing messages sent between cluster members.
- */
-public class MessageSerializer implements SerializationService {
-
-    private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
-    private static final int LENGTH_OFFSET = 12;
-
-    private static final long MARKER = 0xfeedcafebeaddeadL;
-
-    @Override
-    public ClusterMessage decode(ByteBuffer buffer) {
-        try {
-            // Do we have enough bytes to read the header? If not, bail.
-            if (buffer.remaining() < METADATA_LENGTH) {
-                return null;
-            }
-
-            // Peek at the length and if we have enough to read the entire message
-            // go ahead, otherwise bail.
-            int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
-            if (buffer.remaining() < length) {
-                return null;
-            }
-
-            // At this point, we have enough data to read a complete message.
-            long marker = buffer.getLong();
-            checkState(marker == MARKER, "Incorrect message marker");
-
-            int subjectOrdinal = buffer.getInt();
-            MessageSubject subject = MessageSubject.values()[subjectOrdinal];
-            length = buffer.getInt();
-
-            // TODO: sanity checking for length
-            byte[] data = new byte[length - METADATA_LENGTH];
-            buffer.get(data);
-
-            // TODO: add deserialization hook here; for now this hack
-            return null; // actually deserialize
-
-        } catch (Exception e) {
-            // TODO: recover from exceptions by forwarding stream to next marker
-            e.printStackTrace();
-        }
-        return null;
-    }
-
-    @Override
-    public void encode(ClusterMessage message, ByteBuffer buffer) {
-        try {
-            int i = 0;
-            // Type based lookup for proper encoder
-        } catch (Exception e) {
-            // TODO: recover from exceptions by forwarding stream to next marker
-            e.printStackTrace();
-        }
-    }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java
new file mode 100644
index 0000000..5276b0b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Cluster messaging APIs for the use by the various distributed stores.
+ */
+package org.onlab.onos.store.cluster.messaging;
\ No newline at end of file
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
new file mode 100644
index 0000000..6ae334b
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
@@ -0,0 +1,124 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests of the cluster communication manager.
+ */
+public class ClusterCommunicationManagerTest {
+
+    private static final NodeId N1 = new NodeId("n1");
+    private static final NodeId N2 = new NodeId("n2");
+
+    private static final int P1 = 9881;
+    private static final int P2 = 9882;
+
+    private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
+
+    private ClusterCommunicationManager ccm1;
+    private ClusterCommunicationManager ccm2;
+
+    private TestDelegate cnd1 = new TestDelegate();
+    private TestDelegate cnd2 = new TestDelegate();
+
+    private DefaultControllerNode node1 = new DefaultControllerNode(N1, IP, P1);
+    private DefaultControllerNode node2 = new DefaultControllerNode(N2, IP, P2);
+
+    @Before
+    public void setUp() {
+        MessageSerializer messageSerializer = new MessageSerializer();
+        messageSerializer.activate();
+
+        ccm1 = new ClusterCommunicationManager();
+        ccm1.serializationService = messageSerializer;
+        ccm1.activate();
+
+        ccm2 = new ClusterCommunicationManager();
+        ccm2.serializationService = messageSerializer;
+        ccm2.activate();
+
+        ccm1.startUp(node1, cnd1);
+        ccm2.startUp(node2, cnd2);
+    }
+
+    @After
+    public void tearDown() {
+        ccm1.deactivate();
+        ccm2.deactivate();
+    }
+
+    @Test
+    public void connect() throws Exception {
+        cnd1.latch = new CountDownLatch(1);
+        cnd2.latch = new CountDownLatch(1);
+
+        ccm1.addNode(node2);
+        validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
+        validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
+    }
+
+    @Test
+    public void disconnect() throws Exception {
+        cnd1.latch = new CountDownLatch(1);
+        cnd2.latch = new CountDownLatch(1);
+
+        ccm1.addNode(node2);
+        validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
+        validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
+
+        cnd1.latch = new CountDownLatch(1);
+        cnd2.latch = new CountDownLatch(1);
+        ccm1.deactivate();
+//
+//        validateDelegateEvent(cnd2, Op.VANISHED, node1.id());
+    }
+
+    private void validateDelegateEvent(TestDelegate delegate, Op op, NodeId nodeId)
+            throws InterruptedException {
+        assertTrue("did not connect in time", delegate.latch.await(2500, TimeUnit.MILLISECONDS));
+        assertEquals("incorrect event", op, delegate.op);
+        assertEquals("incorrect event node", nodeId, delegate.nodeId);
+    }
+
+    enum Op { DETECTED, VANISHED, REMOVED };
+
+    private class TestDelegate implements ClusterNodesDelegate {
+
+        Op op;
+        CountDownLatch latch;
+        NodeId nodeId;
+
+        @Override
+        public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
+            latch(nodeId, Op.DETECTED);
+            return new DefaultControllerNode(nodeId, ip, tcpPort);
+        }
+
+        @Override
+        public void nodeVanished(NodeId nodeId) {
+            latch(nodeId, Op.VANISHED);
+        }
+
+        @Override
+        public void nodeRemoved(NodeId nodeId) {
+            latch(nodeId, Op.REMOVED);
+        }
+
+        private void latch(NodeId nodeId, Op op) {
+            this.op = op;
+            this.nodeId = nodeId;
+            latch.countDown();
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleFlowRuleStore.java
index 38e94aa..6b6c157 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleFlowRuleStore.java
@@ -105,11 +105,9 @@
          */
 
         if (flowEntries.containsEntry(did, f)) {
-            //synchronized (flowEntries) {
             flowEntries.remove(did, f);
             flowEntries.put(did, f);
             flowEntriesById.remove(rule.appId(), rule);
-            //}
         }
     }
 
diff --git a/features/features.xml b/features/features.xml
index c255bf2..d2d9567 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -119,6 +119,14 @@
         <bundle>mvn:org.onlab.onos/onos-app-fwd/1.0.0-SNAPSHOT</bundle>
     </feature>
 
+    <feature name="onos-app-mobility" version="1.0.0"
+             description="ONOS sample forwarding application">
+        <feature>onos-api</feature>
+        <bundle>mvn:org.onlab.onos/onos-app-mobility/1.0.0-SNAPSHOT</bundle>
+    </feature>
+
+
+
     <feature name="onos-app-foo" version="1.0.0"
              description="ONOS sample playground application">
         <feature>onos-api</feature>
diff --git a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/AbstractOpenFlowSwitch.java b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 69ddc71..4334395 100644
--- a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -243,6 +243,8 @@
                 if (role == RoleState.SLAVE || role == RoleState.EQUAL) {
                     this.role = role;
                 }
+            } else {
+                this.role = role;
             }
         } catch (IOException e) {
             log.error("Unable to write to switch {}.", this.dpid);
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
index 75c139d..3f2984b 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
@@ -651,7 +651,7 @@
          * @param error The error message
          */
         protected void logError(OFChannelHandler h, OFErrorMsg error) {
-            log.error("{} from switch {} in state {}",
+            log.debug("{} from switch {} in state {}",
                     new Object[] {
                     error,
                     h.getSwitchInfoString(),
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index 2822f02..51e4a98 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -77,6 +77,7 @@
                 .setCookie(U64.of(cookie.value()))
                 .setBufferId(OFBufferId.NO_BUFFER)
                 .setActions(actions)
+                .setIdleTimeout(10)
                 .setMatch(match)
                 .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
                 .setPriority(priority)
@@ -104,6 +105,9 @@
 
     private List<OFAction> buildActions() {
         List<OFAction> acts = new LinkedList<>();
+        if (treatment == null) {
+            return acts;
+        }
         for (Instruction i : treatment.instructions()) {
             switch (i.type()) {
             case DROP:
diff --git a/providers/openflow/host/src/main/java/org/onlab/onos/provider/of/host/impl/OpenFlowHostProvider.java b/providers/openflow/host/src/main/java/org/onlab/onos/provider/of/host/impl/OpenFlowHostProvider.java
index d124f2a..4f5bb81 100644
--- a/providers/openflow/host/src/main/java/org/onlab/onos/provider/of/host/impl/OpenFlowHostProvider.java
+++ b/providers/openflow/host/src/main/java/org/onlab/onos/provider/of/host/impl/OpenFlowHostProvider.java
@@ -31,6 +31,7 @@
 import org.onlab.onos.openflow.controller.PacketListener;
 import org.onlab.packet.ARP;
 import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.VlanId;
 import org.slf4j.Logger;
@@ -92,29 +93,37 @@
         public void handlePacket(OpenFlowPacketContext pktCtx) {
             Ethernet eth = pktCtx.parsed();
 
+            VlanId vlan = VlanId.vlanId(eth.getVlanID());
+            ConnectPoint heardOn = new ConnectPoint(deviceId(Dpid.uri(pktCtx.dpid())),
+                    portNumber(pktCtx.inPort()));
+
+         // If this is not an edge port, bail out.
+            Topology topology = topologyService.currentTopology();
+            if (topologyService.isInfrastructure(topology, heardOn)) {
+                return;
+            }
+
+            HostLocation hloc = new HostLocation(deviceId(Dpid.uri(pktCtx.dpid())),
+                    portNumber(pktCtx.inPort()),
+                    System.currentTimeMillis());
+            HostId hid = HostId.hostId(eth.getSourceMAC(), vlan);
             // Potentially a new or moved host
             if (eth.getEtherType() == Ethernet.TYPE_ARP) {
-                VlanId vlan = VlanId.vlanId(eth.getVlanID());
-                ConnectPoint heardOn = new ConnectPoint(deviceId(Dpid.uri(pktCtx.dpid())),
-                        portNumber(pktCtx.inPort()));
 
-                // If this is not an edge port, bail out.
-                Topology topology = topologyService.currentTopology();
-                if (topologyService.isInfrastructure(topology, heardOn)) {
-                    return;
-                }
 
-                HostLocation hloc = new HostLocation(deviceId(Dpid.uri(pktCtx.dpid())),
-                        portNumber(pktCtx.inPort()),
-                        System.currentTimeMillis());
-
-                HostId hid = HostId.hostId(eth.getSourceMAC(), vlan);
                 ARP arp = (ARP) eth.getPayload();
                 Set<IpPrefix> ips = newHashSet(IpPrefix.valueOf(arp.getSenderProtocolAddress()));
                 HostDescription hdescr =
                         new DefaultHostDescription(eth.getSourceMAC(), vlan, hloc, ips);
                 providerService.hostDetected(hid, hdescr);
 
+            } else if (eth.getEtherType() == Ethernet.TYPE_IPV4) {
+                IPv4 ip = (IPv4) eth.getPayload();
+                Set<IpPrefix> ips = newHashSet(IpPrefix.valueOf(ip.getSourceAddress()));
+                HostDescription hdescr =
+                        new DefaultHostDescription(eth.getSourceMAC(), vlan, hloc, ips);
+                providerService.hostDetected(hid, hdescr);
+
             }
 
             // TODO: Use DHCP packets as well later...
diff --git a/providers/openflow/packet/src/main/java/org/onlab/onos/provider/of/packet/impl/OpenFlowPacketProvider.java b/providers/openflow/packet/src/main/java/org/onlab/onos/provider/of/packet/impl/OpenFlowPacketProvider.java
index cb7f1f9..41cb586 100644
--- a/providers/openflow/packet/src/main/java/org/onlab/onos/provider/of/packet/impl/OpenFlowPacketProvider.java
+++ b/providers/openflow/packet/src/main/java/org/onlab/onos/provider/of/packet/impl/OpenFlowPacketProvider.java
@@ -106,10 +106,6 @@
         for (Instruction inst : packet.treatment().instructions()) {
             if (inst.type().equals(Instruction.Type.OUTPUT)) {
                 p = portDesc(((OutputInstruction) inst).port());
-                /*if (!sw.getPorts().contains(p)) {
-                    log.warn("Tried to write out non-existent port {}", p.getPortNo());
-                    continue;
-                }*/
                 OFPacketOut po = packetOut(sw, eth, p.getPortNo());
                 sw.sendMsg(po);
             }
diff --git a/tools/test/bin/onos b/tools/test/bin/onos
index 74bf1cf..76b5c15 100755
--- a/tools/test/bin/onos
+++ b/tools/test/bin/onos
@@ -3,5 +3,7 @@
 # ONOS remote command-line client.
 #-------------------------------------------------------------------------------
 
+[ "$1" = "-w" ] && shift && onos-wait-for-start $1
+
 [ -n "$1" ] && OCI=$1 && shift
-client -h $OCI "$@"
+client -h $OCI "$@" 2>/dev/null
diff --git a/tools/test/bin/onos-kill b/tools/test/bin/onos-kill
new file mode 100755
index 0000000..6b849d8
--- /dev/null
+++ b/tools/test/bin/onos-kill
@@ -0,0 +1,9 @@
+#!/bin/bash
+#-------------------------------------------------------------------------------
+# Remotely kills the ONOS service on the specified node.
+#-------------------------------------------------------------------------------
+
+[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
+. $ONOS_ROOT/tools/build/envDefaults
+
+ssh $ONOS_USER@${1:-$OCI} "kill -9 \$(ps -ef | grep karaf.jar | grep -v grep | cut -c10-15)"
\ No newline at end of file
diff --git a/tools/test/cells/office b/tools/test/cells/office
new file mode 100644
index 0000000..0edf04d
--- /dev/null
+++ b/tools/test/cells/office
@@ -0,0 +1,10 @@
+# ProxMox-based cell of ONOS instances 1,2 & ONOS mininet box
+. $ONOS_ROOT/tools/test/cells/.reset
+
+export ONOS_NIC="10.128.4.*"
+
+export OC1="10.128.4.60"
+#export OC2="192.168.97.131"
+
+#export OCN="192.168.97.130"
+
diff --git a/tools/test/cells/tom b/tools/test/cells/tom
new file mode 100644
index 0000000..2eb0523
--- /dev/null
+++ b/tools/test/cells/tom
@@ -0,0 +1,10 @@
+# Default virtual box ONOS instances 1,2 & ONOS mininet box
+
+export ONOS_NIC=192.168.56.*
+
+export OC1="192.168.56.11"
+export OC2="192.168.56.12"
+
+export OCN="192.168.56.7"
+
+