Made "ReactiveFowarding" work in the multiple ONOS instance.
Fixed "Intent" path installed event issue.

Change-Id: Ibc75bc7b0ab0a723d7b8a74d524c80b1fa19465f
diff --git a/src/main/java/net/onrc/onos/apps/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/apps/forwarding/Forwarding.java
index 9dc840b..a4126f6 100644
--- a/src/main/java/net/onrc/onos/apps/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/apps/forwarding/Forwarding.java
@@ -4,9 +4,9 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -18,26 +18,25 @@
 import net.onrc.onos.api.packet.IPacketListener;
 import net.onrc.onos.api.packet.IPacketService;
 import net.onrc.onos.apps.proxyarp.IProxyArpService;
-import net.onrc.onos.core.datagrid.IDatagridService;
-import net.onrc.onos.core.datagrid.IEventChannelListener;
 import net.onrc.onos.core.devicemanager.IOnosDeviceService;
 import net.onrc.onos.core.intent.Intent;
 import net.onrc.onos.core.intent.Intent.IntentState;
 import net.onrc.onos.core.intent.IntentMap;
+import net.onrc.onos.core.intent.IntentMap.ChangedEvent;
+import net.onrc.onos.core.intent.IntentMap.ChangedListener;
 import net.onrc.onos.core.intent.IntentOperation;
 import net.onrc.onos.core.intent.IntentOperationList;
 import net.onrc.onos.core.intent.PathIntent;
 import net.onrc.onos.core.intent.ShortestPathIntent;
 import net.onrc.onos.core.intent.runtime.IPathCalcRuntimeService;
-import net.onrc.onos.core.intent.runtime.IntentStateList;
 import net.onrc.onos.core.packet.Ethernet;
 import net.onrc.onos.core.registry.IControllerRegistryService;
 import net.onrc.onos.core.topology.Device;
 import net.onrc.onos.core.topology.ITopologyService;
 import net.onrc.onos.core.topology.LinkEvent;
-import net.onrc.onos.core.topology.Topology;
 import net.onrc.onos.core.topology.Port;
 import net.onrc.onos.core.topology.Switch;
+import net.onrc.onos.core.topology.Topology;
 import net.onrc.onos.core.util.Dpid;
 import net.onrc.onos.core.util.FlowPath;
 import net.onrc.onos.core.util.SwitchPort;
@@ -50,8 +49,7 @@
 import com.google.common.collect.ListMultimap;
 
 public class Forwarding implements /*IOFMessageListener,*/ IFloodlightModule,
-        IForwardingService, IEventChannelListener<Long, IntentStateList>,
-        IPacketListener {
+        IForwardingService, IPacketListener, ChangedListener {
     private static final Logger log = LoggerFactory.getLogger(Forwarding.class);
 
     private static final int SLEEP_TIME_FOR_DB_DEVICE_INSTALLED = 100; // milliseconds
@@ -61,7 +59,6 @@
 
     private final String callerId = "Forwarding";
 
-    private IDatagridService datagrid;
     private IPacketService packetService;
     private IControllerRegistryService controllerRegistryService;
 
@@ -153,7 +150,6 @@
                 new ArrayList<Class<? extends IFloodlightService>>();
         dependencies.add(IControllerRegistryService.class);
         dependencies.add(IOnosDeviceService.class);
-        dependencies.add(IDatagridService.class);
         dependencies.add(ITopologyService.class);
         dependencies.add(IPathCalcRuntimeService.class);
         // We don't use the IProxyArpService directly, but reactive forwarding
@@ -165,7 +161,6 @@
 
     @Override
     public void init(FloodlightModuleContext context) {
-        datagrid = context.getServiceImpl(IDatagridService.class);
         controllerRegistryService = context.getServiceImpl(IControllerRegistryService.class);
         topologyService = context.getServiceImpl(ITopologyService.class);
         pathRuntime = context.getServiceImpl(IPathCalcRuntimeService.class);
@@ -181,12 +176,14 @@
 
         topology = topologyService.getTopology();
         intentMap = pathRuntime.getPathIntents();
-        datagrid.addListener("onos.pathintent_state", this, Long.class, IntentStateList.class);
+        intentMap.addChangeListener(this);
     }
 
     @Override
     public void receive(Switch sw, Port inPort, Ethernet eth) {
-        log.debug("Receive PACKET_IN swId {}, portId {}", sw.getDpid(), inPort.getNumber());
+        if (log.isTraceEnabled()) {
+            log.trace("Receive PACKET_IN swId {}, portId {}", sw.getDpid(), inPort.getNumber());
+        }
 
         if (eth.getEtherType() != Ethernet.TYPE_IPV4) {
             // Only handle IPv4 packets right now
@@ -211,7 +208,9 @@
     }
 
     private void handlePacketIn(Switch sw, Port inPort, Ethernet eth) {
-        log.debug("Start handlePacketIn swId {}, portId {}", sw.getDpid(), inPort.getNumber());
+        if (log.isTraceEnabled()) {
+           log.trace("Start handlePacketIn swId {}, portId {}", sw.getDpid(), inPort.getNumber());
+        }
 
         String destinationMac =
                 HexString.toHexString(eth.getDestinationMACAddress());
@@ -313,7 +312,7 @@
                     if (intent instanceof PathIntent) {
                         pathIntent = (PathIntent) intent;
                     } else {
-                        log.debug("Intent {} is not PathIntent. Return.", intent.getId());
+                        log.debug("Intent ID {} is not PathIntent or null. return.", existingFlow.intentId);
                         return;
                     }
 
@@ -404,7 +403,7 @@
 
     private void flowInstalled(PathIntent installedPath) {
         if (log.isTraceEnabled()) {
-            log.trace("Path {} was installed", installedPath.getParentIntent().getId());
+            log.trace("Installed intent ID {}, path {}", installedPath.getParentIntent().getId(), installedPath.getPath());
         }
 
         ShortestPathIntent spfIntent = (ShortestPathIntent) installedPath.getParentIntent();
@@ -464,20 +463,10 @@
     }
 
     @Override
-    public void entryAdded(IntentStateList value) {
-        entryUpdated(value);
-    }
-
-    @Override
-    public void entryRemoved(IntentStateList value) {
-        //no-op
-    }
-
-    @Override
-    public void entryUpdated(IntentStateList value) {
-        for (Entry<String, IntentState> entry : value.entrySet()) {
-            log.debug("path intent key {}, value {}", entry.getKey(), entry.getValue());
-            PathIntent pathIntent = (PathIntent) intentMap.getIntent(entry.getKey());
+    public void intentsChange(LinkedList<ChangedEvent> events) {
+        for (ChangedEvent event : events) {
+            log.debug("path intent ID {}, eventType {}", event.intent.getId() , event.eventType);
+            PathIntent pathIntent = (PathIntent) intentMap.getIntent(event.intent.getId());
             if (pathIntent == null) {
                 continue;
             }
@@ -486,22 +475,31 @@
                 continue;
             }
 
-            IntentState state = entry.getValue();
-            switch (state) {
-                case INST_REQ:
+            switch(event.eventType) {
+                case ADDED:
                     break;
-                case INST_ACK:
-                    flowInstalled(pathIntent);
+                case REMOVED:
                     break;
-                case INST_NACK:
-                    break;
-                case DEL_REQ:
-                    break;
-                case DEL_ACK:
-                    flowRemoved(pathIntent);
-                    break;
-                case DEL_PENDING:
-                    break;
+                case STATE_CHANGED:
+                    IntentState state = pathIntent.getState();
+                    switch (state) {
+                        case INST_REQ:
+                            break;
+                        case INST_ACK:
+                            flowInstalled(pathIntent);
+                            break;
+                        case INST_NACK:
+                            break;
+                        case DEL_REQ:
+                            break;
+                        case DEL_ACK:
+                            flowRemoved(pathIntent);
+                            break;
+                        case DEL_PENDING:
+                            break;
+                        default:
+                            break;
+                    }
                 default:
                     break;
             }
diff --git a/src/main/java/net/onrc/onos/core/intent/runtime/IntentStateList.java b/src/main/java/net/onrc/onos/core/intent/runtime/IntentStateList.java
index e354811..2b4946f 100644
--- a/src/main/java/net/onrc/onos/core/intent/runtime/IntentStateList.java
+++ b/src/main/java/net/onrc/onos/core/intent/runtime/IntentStateList.java
@@ -1,12 +1,30 @@
 package net.onrc.onos.core.intent.runtime;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import net.onrc.onos.core.intent.Intent.IntentState;
+public class IntentStateList {
+    protected Map<String, IntentState> intentMap;
+    public Set<Long> domainSwitchDpids;
 
-/**
- * @author Toshio Koide (t-koide@onlab.us)
- */
-public class IntentStateList extends HashMap<String, IntentState> {
-    private static final long serialVersionUID = -3674903999581438936L;
+    public IntentStateList() {
+        intentMap = new HashMap<String, IntentState>();
+        domainSwitchDpids = new HashSet<Long>();
+    }
+
+    public IntentState put(String id, IntentState state) {
+        return intentMap.put(id, state);
+    }
+
+    public Set<Entry<String, IntentState>> entrySet() {
+        return intentMap.entrySet();
+    }
+
+    public void clear() {
+        intentMap.clear();
+    }
 }
diff --git a/src/main/java/net/onrc/onos/core/intent/runtime/PathCalcRuntimeModule.java b/src/main/java/net/onrc/onos/core/intent/runtime/PathCalcRuntimeModule.java
index 76fa263..48937f1 100644
--- a/src/main/java/net/onrc/onos/core/intent/runtime/PathCalcRuntimeModule.java
+++ b/src/main/java/net/onrc/onos/core/intent/runtime/PathCalcRuntimeModule.java
@@ -8,6 +8,8 @@
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -89,6 +91,7 @@
     private IEventChannel<Long, IntentOperationList> opEventChannel;
     private final ReentrantLock lock = new ReentrantLock();
     private HashSet<LinkEvent> unmatchedLinkEvents = new HashSet<>();
+    private Map<String, Set<Long>> intentInstalledMap = new ConcurrentHashMap<String, Set<Long>>();
     private static final String INTENT_OP_EVENT_CHANNEL_NAME = "onos.pathintent";
     private static final String INTENT_STATE_EVENT_CHANNEL_NAME = "onos.pathintent_state";
     private static final Logger log = LoggerFactory.getLogger(PathCalcRuntimeModule.class);
@@ -397,12 +400,23 @@
                 }
 
                 IntentState state = entry.getValue();
+                log.debug("put the state pathIntentStates ID {}, state {}", entry.getKey(), state);
+
                 switch (state) {
-                    //case INST_REQ:
                     case INST_ACK:
+                        Set<Long> installedDpids = calcInstalledDpids(pathIntent, value.domainSwitchDpids);
+                        if (!isFlowInstalled(pathIntent, installedDpids)) {
+                            break;
+                        }
+                        // FALLTHROUGH
                     case INST_NACK:
-                        //case DEL_REQ:
+                        // FALLTHROUGH
+                    // case INST_REQ:
+                        // FALLTHROUGH
+                    // case DEL_REQ:
+                        // FALLTHROUGH
                     case DEL_ACK:
+                        // FALLTHROUGH
                     case DEL_PENDING:
                         highLevelIntentStates.put(parentIntent.getId(), state);
                         pathIntentStates.put(entry.getKey(), entry.getValue());
@@ -419,4 +433,67 @@
             p.flushLog();
         }
     }
+
+    /***
+     * This function is to check whether the entire path's flow entries are installed or not.
+     * @param pathIntent : The pathIntent to be checked
+     * @param installedDpids : The dpids installed on one ONOS instance
+     * @return The result of whether a pathIntent has been installed or not.
+     */
+    private boolean isFlowInstalled(PathIntent pathIntent, Set<Long> installedDpids) {
+        String parentIntentId = pathIntent.getParentIntent().getId();
+        log.debug("parentIntentId {}", parentIntentId);
+
+        if (intentInstalledMap.containsKey(parentIntentId)) {
+            if (!installedDpids.isEmpty()) {
+                intentInstalledMap.get(parentIntentId).addAll(installedDpids);
+            }
+        } else {
+            // This is the creation of an entry.
+            intentInstalledMap.put(parentIntentId, installedDpids);
+        }
+
+        Set<Long> allSwitchesForPath = new HashSet<Long>();
+        ShortestPathIntent spfIntent = (ShortestPathIntent) pathIntent.getParentIntent();
+
+        for (LinkEvent linkEvent : pathIntent.getPath()) {
+            long sw = linkEvent.getSrc().getDpid();
+            allSwitchesForPath.add(sw);
+        }
+        allSwitchesForPath.add(spfIntent.getDstSwitchDpid());
+
+        if (log.isTraceEnabled()) {
+            log.trace("All switches {}, installed installedDpids {}", allSwitchesForPath, intentInstalledMap.get(parentIntentId));
+        }
+
+        if (allSwitchesForPath.equals(intentInstalledMap.get(parentIntentId))) {
+            intentInstalledMap.remove(parentIntentId);
+            return true;
+        }
+
+        return false;
+    }
+
+    private Set<Long> calcInstalledDpids(PathIntent pathIntent, Set<Long> domainSwitchDpids) {
+        Set<Long> allSwitchesForPath = new HashSet<Long>();
+        ShortestPathIntent spfIntent = (ShortestPathIntent) pathIntent.getParentIntent();
+
+        for (LinkEvent linkEvent : pathIntent.getPath()) {
+            long sw = linkEvent.getSrc().getDpid();
+
+            if (domainSwitchDpids.contains(sw)) {
+                allSwitchesForPath.add(sw);
+            }
+        }
+
+        if (domainSwitchDpids.contains(spfIntent.getDstSwitchDpid())) {
+            allSwitchesForPath.add(spfIntent.getDstSwitchDpid());
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("All switches {}, domain switch dpids {}", allSwitchesForPath, domainSwitchDpids);
+        }
+
+        return allSwitchesForPath;
+    }
 }
diff --git a/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallModule.java b/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallModule.java
index 15230ec..94e5d8f 100644
--- a/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallModule.java
+++ b/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallModule.java
@@ -22,7 +22,6 @@
 import net.onrc.onos.core.intent.IntentOperation;
 import net.onrc.onos.core.intent.IntentOperationList;
 import net.onrc.onos.core.topology.ITopologyService;
-//import net.onrc.onos.core.topology.Topology;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +37,6 @@
     private IEventChannel<Long, IntentStateList> intentStateChannel;
     private static final Logger log = LoggerFactory.getLogger(PlanInstallModule.class);
 
-
     private static final String PATH_INTENT_CHANNEL_NAME = "onos.pathintent";
     private static final String INTENT_STATE_EVENT_CHANNEL_NAME = "onos.pathintent_state";
 
@@ -88,14 +86,22 @@
             log("begin_installPlan");
             boolean success = planInstall.installPlan(plan);
             log("end_installPlan");
-
+            Set<Long> domainSwitchDpids = floodlightProvider.getSwitches().keySet();
             log("begin_sendInstallNotif");
-            sendNotifications(intents, true, success);
+            sendNotifications(intents, true, success, domainSwitchDpids);
             log("end_sendInstallNotif");
             log("finish");
         }
 
-        private void sendNotifications(IntentOperationList intents, boolean installed, boolean success) {
+        /***
+         * This function is for sending intent state notification to other ONOS instances.
+         * The argument of "domainSwitchDpids" is required for dispatching this ONOS's managed switches.
+         * @param intents
+         * @param installed
+         * @param success
+         * @param domainSwitchDpids
+         */
+        private void sendNotifications(IntentOperationList intents, boolean installed, boolean success, Set<Long> domainSwitchDpids) {
             IntentStateList states = new IntentStateList();
             for (IntentOperation i : intents) {
                 IntentState newState;
@@ -110,6 +116,9 @@
                     case ADD:
                     default:
                         if (installed) {
+                            if (domainSwitchDpids != null) {
+                                states.domainSwitchDpids.addAll(domainSwitchDpids);
+                            }
                             newState = success ? IntentState.INST_ACK : IntentState.INST_NACK;
                         } else {
                             newState = IntentState.INST_REQ;
@@ -118,7 +127,12 @@
                 }
                 states.put(i.intent.getId(), newState);
             }
-            intentStateChannel.addEntry(key, states);
+
+            if (log.isTraceEnabled()) {
+                log.trace("domainSwitchDpids {}", states.domainSwitchDpids);
+            }
+
+            intentStateChannel.addTransientEntry(key, states);
             // XXX: Send notifications using the same key every time
             // and receive them by entryAdded() and entryUpdated()
             // key += 1;
@@ -138,7 +152,7 @@
         public void entryUpdated(IntentOperationList value) {
             log("start_intentNotifRecv");
             log("begin_sendReceivedNotif");
-            sendNotifications(value, false, false);
+            sendNotifications(value, false, false, null);
             log("end_sendReceivedNotif");
             log("finish");
 
@@ -191,5 +205,4 @@
         // no services, for now
         return null;
     }
-
 }
diff --git a/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallRuntime.java b/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallRuntime.java
index a807bc8..ca7fcaa 100644
--- a/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallRuntime.java
+++ b/src/main/java/net/onrc/onos/core/intent/runtime/PlanInstallRuntime.java
@@ -64,6 +64,7 @@
             }
         }
 
+        @Override
         public String toString() {
             return "sw:" + sw.getStringId() + ": modify " + modFlows + " delete " + delFlows + " error " + errors;
         }
@@ -102,7 +103,6 @@
         Map<Long, IOFSwitch> switches = provider.getSwitches();
 
         log.debug("IOFSwitches: {}", switches);
-
         FlowModCount.startCount();
         for (Set<FlowEntry> phase : plan) {
             Set<Pair<IOFSwitch, net.onrc.onos.core.util.FlowEntry>> entries = new HashSet<>();
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index 3d7f4aa..c37f3ad 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -4,6 +4,7 @@
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 
 import net.floodlightcontroller.util.MACAddress;
@@ -210,6 +211,7 @@
         kryo.register(IntentOperation.Operator.class);
         kryo.register(IntentOperationList.class);
         kryo.register(IntentStateList.class);
+        kryo.register(HashMap.class);
 
         // Device-related classes
         kryo.register(HashSet.class);