Merge remote-tracking branch 'origin/master'
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/RoleInfo.java b/core/api/src/main/java/org/onlab/onos/cluster/RoleInfo.java
index d2bee8b..45b96ab 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/RoleInfo.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/RoleInfo.java
@@ -1,9 +1,10 @@
 package org.onlab.onos.cluster;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  * A container for detailed role information for a device,
  * within the current cluster. Role attributes include current
@@ -15,7 +16,7 @@
 
     public RoleInfo(NodeId master, List<NodeId> backups) {
         this.master = master;
-        this.backups = Collections.unmodifiableList(backups);
+        this.backups = ImmutableList.copyOf(backups);
     }
 
     public NodeId master() {
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleEvent.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleEvent.java
index b17449d..97efa5a 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleEvent.java
@@ -24,7 +24,18 @@
         /**
          * Signifies that a rule has been updated.
          */
-        RULE_UPDATED
+        RULE_UPDATED,
+
+        // internal event between Manager <-> Store
+
+        /*
+         * Signifies that a request to add flow rule has been added to the store.
+         */
+        RULE_ADD_REQUESTED,
+        /*
+         * Signifies that a request to remove flow rule has been added to the store.
+         */
+        RULE_REMOVE_REQUESTED,
     }
 
     /**
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
index 5ce7eb1..abb9a10 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
@@ -44,16 +44,18 @@
      * Stores a new flow rule without generating events.
      *
      * @param rule the flow rule to add
+     * @return true if the rule should be handled locally
      */
-    void storeFlowRule(FlowRule rule);
+    boolean storeFlowRule(FlowRule rule);
 
     /**
      * Marks a flow rule for deletion. Actual deletion will occur
      * when the provider indicates that the flow has been removed.
      *
      * @param rule the flow rule to delete
+     * @return true if the rule should be handled locally
      */
-    void deleteFlowRule(FlowRule rule);
+    boolean deleteFlowRule(FlowRule rule);
 
     /**
      * Stores a new flow rule, or updates an existing entry.
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 9ea99c3..525946e 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -104,24 +104,52 @@
     public void applyFlowRules(FlowRule... flowRules) {
         for (int i = 0; i < flowRules.length; i++) {
             FlowRule f = flowRules[i];
-            final Device device = deviceService.getDevice(f.deviceId());
-            final FlowRuleProvider frp = getProvider(device.providerId());
-            store.storeFlowRule(f);
-            frp.applyFlowRule(f);
+            boolean local = store.storeFlowRule(f);
+            if (local) {
+                // TODO: aggregate all local rules and push down once?
+                applyFlowRulesToProviders(f);
+            }
+        }
+    }
+
+    private void applyFlowRulesToProviders(FlowRule... flowRules) {
+        DeviceId did = null;
+        FlowRuleProvider frp = null;
+        for (FlowRule f : flowRules) {
+            if (!f.deviceId().equals(did)) {
+                did = f.deviceId();
+                final Device device = deviceService.getDevice(did);
+                frp = getProvider(device.providerId());
+            }
+            if (frp != null) {
+                frp.applyFlowRule(f);
+            }
         }
     }
 
     @Override
     public void removeFlowRules(FlowRule... flowRules) {
         FlowRule f;
-        FlowRuleProvider frp;
-        Device device;
         for (int i = 0; i < flowRules.length; i++) {
             f = flowRules[i];
-            device = deviceService.getDevice(f.deviceId());
-            store.deleteFlowRule(f);
-            if (device != null) {
+            boolean local = store.deleteFlowRule(f);
+            if (local) {
+                // TODO: aggregate all local rules and push down once?
+                removeFlowRulesFromProviders(f);
+            }
+        }
+    }
+
+    private void removeFlowRulesFromProviders(FlowRule... flowRules) {
+        DeviceId did = null;
+        FlowRuleProvider frp = null;
+        for (FlowRule f : flowRules) {
+            if (!f.deviceId().equals(did)) {
+                did = f.deviceId();
+                final Device device = deviceService.getDevice(did);
                 frp = getProvider(device.providerId());
+            }
+            if (frp != null) {
                 frp.removeFlowRule(f);
             }
         }
@@ -135,8 +163,11 @@
 
         for (FlowRule f : rules) {
             store.deleteFlowRule(f);
+            // FIXME: only accept request and push to provider on internal event
             device = deviceService.getDevice(f.deviceId());
             frp = getProvider(device.providerId());
+            // FIXME: flows removed from store and flows removed from might diverge
+            //        get rid of #removeRulesById?
             frp.removeRulesById(id, f);
         }
     }
@@ -352,7 +383,23 @@
     private class InternalStoreDelegate implements FlowRuleStoreDelegate {
         @Override
         public void notify(FlowRuleEvent event) {
-            eventDispatcher.post(event);
+            switch (event.type()) {
+            case RULE_ADD_REQUESTED:
+                applyFlowRulesToProviders(event.subject());
+                break;
+            case RULE_REMOVE_REQUESTED:
+                removeFlowRulesFromProviders(event.subject());
+                break;
+
+            case RULE_ADDED:
+            case RULE_REMOVED:
+            case RULE_UPDATED:
+                // only dispatch events related to switch
+                eventDispatcher.post(event);
+                break;
+            default:
+                break;
+            }
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index e5b2ed6..3da5d25 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -2,6 +2,7 @@
 
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -30,6 +31,7 @@
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
 import org.onlab.onos.store.flow.ReplicaInfo;
 import org.onlab.onos.store.flow.ReplicaInfoService;
@@ -80,10 +82,44 @@
     };
 
     // TODO: make this configurable
-    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
+    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
 
     @Activate
     public void activate() {
+        clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRule rule = SERIALIZER.decode(message.payload());
+                log.info("received add request for {}", rule);
+                storeFlowEntryInternal(rule);
+                // FIXME what to respond.
+                try {
+                    // FIXME: #respond() not working. responded message is
+                    // handled by this sender node and never goes back.
+                    message.respond(SERIALIZER.encode("ACK"));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+            }
+        });
+
+        clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRule rule = SERIALIZER.decode(message.payload());
+                log.info("received delete request for {}", rule);
+                deleteFlowRuleInternal(rule);
+                // FIXME what to respond.
+                try {
+                    message.respond(SERIALIZER.encode("ACK"));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+
+            }
+        });
         log.info("Started");
     }
 
@@ -131,13 +167,14 @@
     }
 
     @Override
-    public void storeFlowRule(FlowRule rule) {
+    public boolean storeFlowRule(FlowRule rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            storeFlowEntryInternal(rule);
-            return;
+            return storeFlowEntryInternal(rule);
         }
 
+        log.warn("Not my flow forwarding to {}", replicaInfo.master().orNull());
+
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
                 FlowStoreMessageSubjects.STORE_FLOW_RULE,
@@ -150,26 +187,29 @@
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
+        return false;
     }
 
-    private synchronized void storeFlowEntryInternal(FlowRule flowRule) {
+    private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
         StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
         DeviceId deviceId = flowRule.deviceId();
         // write to local copy.
         if (!flowEntries.containsEntry(deviceId, flowEntry)) {
             flowEntries.put(deviceId, flowEntry);
             flowEntriesById.put(flowRule.appId(), flowEntry);
+            notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
+            return true;
         }
         // write to backup.
         // TODO: write to a hazelcast map.
+        return false;
     }
 
     @Override
-    public synchronized void deleteFlowRule(FlowRule rule) {
+    public synchronized boolean deleteFlowRule(FlowRule rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            deleteFlowRuleInternal(rule);
-            return;
+            return deleteFlowRuleInternal(rule);
         }
 
         ClusterMessage message = new ClusterMessage(
@@ -184,15 +224,21 @@
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
+        return false;
     }
 
-    private synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
+    private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
         StoredFlowEntry entry = getFlowEntryInternal(flowRule);
         if (entry == null) {
-            return;
+            return false;
         }
         entry.setState(FlowEntryState.PENDING_REMOVE);
+
         // TODO: also update backup.
+
+        notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
+
+        return true;
     }
 
     @Override
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index 1c16417..9de6d8c 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -8,6 +8,7 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
 import org.onlab.onos.mastership.MastershipTerm;
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.DefaultAnnotations;
@@ -27,7 +28,11 @@
 import org.onlab.onos.net.device.DefaultPortDescription;
 import org.onlab.onos.net.flow.DefaultFlowRule;
 import org.onlab.onos.net.flow.DefaultTrafficSelector;
+import org.onlab.onos.net.flow.DefaultTrafficTreatment;
 import org.onlab.onos.net.flow.FlowId;
+import org.onlab.onos.net.flow.criteria.Criteria;
+import org.onlab.onos.net.flow.criteria.Criterion;
+import org.onlab.onos.net.flow.instructions.Instructions;
 import org.onlab.onos.net.host.DefaultHostDescription;
 import org.onlab.onos.net.host.HostDescription;
 import org.onlab.onos.net.link.DefaultLinkDescription;
@@ -90,7 +95,21 @@
                     DefaultHostDescription.class,
                     DefaultFlowRule.class,
                     FlowId.class,
-                    DefaultTrafficSelector.class
+                    DefaultTrafficSelector.class,
+                    Criteria.PortCriterion.class,
+                    Criteria.EthCriterion.class,
+                    Criteria.EthTypeCriterion.class,
+                    Criteria.IPCriterion.class,
+                    Criteria.IPProtocolCriterion.class,
+                    Criteria.VlanIdCriterion.class,
+                    Criteria.VlanPcpCriterion.class,
+                    Criteria.TcpPortCriterion.class,
+                    Criterion.class,
+                    Criterion.Type.class,
+                    DefaultTrafficTreatment.class,
+                    Instructions.DropInstruction.class,
+                    Instructions.OutputInstruction.class,
+                    RoleInfo.class
                     )
             .register(URI.class, new URISerializer())
             .register(NodeId.class, new NodeIdSerializer())
diff --git a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
index f76513d..1bd2097 100644
--- a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
+++ b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
@@ -3,6 +3,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.onlab.onos.net.DeviceId.deviceId;
 import static org.onlab.onos.net.PortNumber.portNumber;
+import static java.util.Arrays.asList;
 
 import java.nio.ByteBuffer;
 
@@ -11,6 +12,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
 import org.onlab.onos.mastership.MastershipTerm;
 import org.onlab.onos.net.Annotations;
 import org.onlab.onos.net.ConnectPoint;
@@ -198,6 +200,12 @@
     }
 
     @Test
+    public void testRoleInfo() {
+        testSerialized(new RoleInfo(new NodeId("master"),
+                            asList(new NodeId("stby1"), new NodeId("stby2"))));
+    }
+
+    @Test
     public void testAnnotations() {
         // Annotations does not have equals defined, manually test equality
         final byte[] a1Bytes = serializer.encode(A1);
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 33ba95b..a96cacb 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -148,8 +148,9 @@
     }
 
     @Override
-    public void storeFlowRule(FlowRule rule) {
+    public boolean storeFlowRule(FlowRule rule) {
         final boolean added = storeFlowRuleInternal(rule);
+        return added;
     }
 
     private boolean storeFlowRuleInternal(FlowRule rule) {
@@ -166,13 +167,14 @@
             }
             // new flow rule added
             existing.add(f);
-            // TODO: notify through delegate about remote event?
+            // TODO: Should we notify only if it's "remote" event?
+            //notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, rule));
             return true;
         }
     }
 
     @Override
-    public void deleteFlowRule(FlowRule rule) {
+    public boolean deleteFlowRule(FlowRule rule) {
 
         List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id());
         synchronized (entries) {
@@ -180,12 +182,15 @@
                 if (entry.equals(rule)) {
                     synchronized (entry) {
                         entry.setState(FlowEntryState.PENDING_REMOVE);
-                        return;
+                        // TODO: Should we notify only if it's "remote" event?
+                        //notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, rule));
+                        return true;
                     }
                 }
             }
         }
         //log.warn("Cannot find rule {}", rule);
+        return false;
     }
 
     @Override