Add support to CordMcast for sending multicast sink ports to remote cluster.

Change-Id: Ib915c68218033e1dcfa6f738a629c2d1d8442261
diff --git a/apps/cordconfig/src/main/java/org/onosproject/cordconfig/access/AccessAgentConfig.java b/apps/cordconfig/src/main/java/org/onosproject/cordconfig/access/AccessAgentConfig.java
index 704c6a1..6dc5c9d 100644
--- a/apps/cordconfig/src/main/java/org/onosproject/cordconfig/access/AccessAgentConfig.java
+++ b/apps/cordconfig/src/main/java/org/onosproject/cordconfig/access/AccessAgentConfig.java
@@ -20,11 +20,9 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang.StringUtils;
 import org.onlab.packet.MacAddress;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.PortNumber;
 import org.onosproject.net.config.Config;
 
 import java.util.Map;
@@ -49,7 +47,7 @@
         return hasOnlyFields(OLTS, AGENT_MAC, VTN_LOCATION) &&
                 isMacAddress(AGENT_MAC, MANDATORY) &&
                 isConnectPoint(VTN_LOCATION, OPTIONAL) &&
-                isValidOlts();
+                areOltsValid();
     }
 
     /**
@@ -61,7 +59,7 @@
         JsonNode olts = node.get(OLTS);
         Map<ConnectPoint, MacAddress> oltMacInfo = Maps.newHashMap();
         olts.fields().forEachRemaining(item -> oltMacInfo.put(
-                new ConnectPoint(subject(), PortNumber.fromString(item.getKey())),
+                ConnectPoint.deviceConnectPoint(item.getKey()),
                 MacAddress.valueOf(item.getValue().asText())));
 
         MacAddress agentMac = MacAddress.valueOf(node.path(AGENT_MAC).asText());
@@ -77,12 +75,13 @@
         return new AccessAgentData(subject(), oltMacInfo, agentMac, vtnLocation);
     }
 
-    private boolean isValidOlts() {
+    private boolean areOltsValid() {
         JsonNode olts = node.get(OLTS);
         if (!olts.isObject()) {
             return false;
         }
-        return !Iterators.any(olts.fields(), item -> !StringUtils.isNumeric(item.getKey()) ||
-                        !isMacAddress((ObjectNode) olts, item.getKey(), MANDATORY));
+        return Iterators.all(olts.fields(),
+                item -> ConnectPoint.deviceConnectPoint(item.getKey()) != null &&
+                        isMacAddress((ObjectNode) olts, item.getKey(), MANDATORY));
     }
 }
diff --git a/apps/cordconfig/src/main/java/org/onosproject/cordconfig/access/AccessAgentData.java b/apps/cordconfig/src/main/java/org/onosproject/cordconfig/access/AccessAgentData.java
index 8d14dde..de7a342 100644
--- a/apps/cordconfig/src/main/java/org/onosproject/cordconfig/access/AccessAgentData.java
+++ b/apps/cordconfig/src/main/java/org/onosproject/cordconfig/access/AccessAgentData.java
@@ -17,12 +17,15 @@
 package org.onosproject.cordconfig.access;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.packet.MacAddress;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -30,32 +33,41 @@
  * Information about an access agent.
  */
 public class AccessAgentData {
+
     private static final String DEVICE_ID_MISSING = "Device ID cannot be null";
     private static final String OLT_INFO_MISSING = "OLT information cannot be null";
     private static final String AGENT_MAC_MISSING = "Agent mac cannot be null";
     private static final String VTN_MISSING = "VTN location cannot be null";
 
+    private static final int CHIP_PORT_RANGE_SIZE = 130;
 
     private final Map<ConnectPoint, MacAddress> oltMacInfo;
     private final MacAddress agentMac;
     private final Optional<ConnectPoint> vtnLocation;
     private final DeviceId deviceId;
 
+    // OLT chip information sorted by ascending MAC address
+    private final List<Pair<ConnectPoint, MacAddress>> sortedOltChips;
 
     /**
-     * Constucts an agent configuration for a given device.
+     * Constructs an agent configuration for a given device.
      *
-     * @param deviceId    access device id
+     * @param deviceId    access device ID
      * @param oltMacInfo  a map of olt chips and their mac address
-     * @param agentMac    the mac address of the agent
+     * @param agentMac    the MAC address of the agent
      * @param vtnLocation the location of the agent
      */
     public AccessAgentData(DeviceId deviceId, Map<ConnectPoint, MacAddress> oltMacInfo,
                            MacAddress agentMac, Optional<ConnectPoint> vtnLocation) {
         this.deviceId = checkNotNull(deviceId, DEVICE_ID_MISSING);
-        this.oltMacInfo = checkNotNull(oltMacInfo, OLT_INFO_MISSING);
+        this.oltMacInfo = ImmutableMap.copyOf(checkNotNull(oltMacInfo, OLT_INFO_MISSING));
         this.agentMac = checkNotNull(agentMac, AGENT_MAC_MISSING);
         this.vtnLocation = checkNotNull(vtnLocation, VTN_MISSING);
+
+        this.sortedOltChips = oltMacInfo.entrySet().stream()
+                .sorted((e1, e2) -> Long.compare(e1.getValue().toLong(), e2.getValue().toLong()))
+                .map(e -> Pair.of(e.getKey(), e.getValue()))
+                .collect(Collectors.toList());
     }
 
     /**
@@ -68,17 +80,17 @@
     }
 
     /**
-     * Returns the mapping of olt chips to mac addresses. Each chip is
+     * Returns the mapping of OLT chips to MAC addresses. Each chip is
      * symbolized by a connect point.
      *
-     * @return a mapping of chips (as connect points) to mac addresses
+     * @return a mapping of chips (as connect points) to MAC addresses
      */
     public Map<ConnectPoint, MacAddress> getOltMacInfo() {
-        return ImmutableMap.copyOf(oltMacInfo);
+        return oltMacInfo;
     }
 
     /**
-     * Reuturns the agents mac address.
+     * Returns the agent's MAC address.
      *
      * @return a mac address
      */
@@ -94,4 +106,21 @@
     public Optional<ConnectPoint> getVtnLocation() {
         return vtnLocation;
     }
+
+    /**
+     * Returns the point where the OLT is connected to the fabric given a
+     * connect point on the agent device.
+     *
+     * @param agentConnectPoint connect point on the agent device
+     * @return point were OLT is connected to fabric
+     */
+    public Optional<ConnectPoint> getOltConnectPoint(ConnectPoint agentConnectPoint) {
+        int index = ((int) agentConnectPoint.port().toLong()) / CHIP_PORT_RANGE_SIZE;
+
+        if (index >= sortedOltChips.size()) {
+            return Optional.empty();
+        }
+
+        return Optional.of(sortedOltChips.get(index).getKey());
+    }
 }
diff --git a/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java b/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java
index 19b4e2d..13500dd 100644
--- a/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java
+++ b/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java
@@ -37,17 +37,12 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.codec.CodecService;
 import org.onosproject.codec.JsonCodec;
-import org.onosproject.cordconfig.access.AccessDeviceConfig;
+import org.onosproject.cordconfig.access.AccessAgentData;
 import org.onosproject.cordconfig.access.AccessDeviceData;
+import org.onosproject.cordconfig.access.CordConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigEvent;
-import org.onosproject.net.config.NetworkConfigListener;
-import org.onosproject.net.config.NetworkConfigRegistry;
-import org.onosproject.net.config.basics.SubjectFactories;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficSelector;
@@ -64,7 +59,6 @@
 import org.onosproject.net.mcast.McastRoute;
 import org.onosproject.net.mcast.McastRouteInfo;
 import org.onosproject.net.mcast.MulticastRouteService;
-
 import org.onosproject.rest.AbstractWebResource;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
@@ -80,8 +74,8 @@
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -98,8 +92,9 @@
 @Component(immediate = true)
 public class CordMcast {
 
+    private final Logger log = getLogger(getClass());
 
-    private static final int DEFAULT_REST_TIMEOUT_MS = 2000;
+    private static final int DEFAULT_REST_TIMEOUT_MS = 1000;
     private static final int DEFAULT_PRIORITY = 500;
     private static final short DEFAULT_MCAST_VLAN = 4000;
     private static final String DEFAULT_SYNC_HOST = "10.90.0.8:8181";
@@ -107,8 +102,6 @@
     private static final String DEFAULT_PASSWORD = "karaf";
     private static final boolean DEFAULT_VLAN_ENABLED = true;
 
-    private final Logger log = getLogger(getClass());
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MulticastRouteService mcastService;
 
@@ -125,11 +118,9 @@
     protected ComponentConfigService componentConfigService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected NetworkConfigRegistry networkConfig;
+    protected CordConfigService cordConfigService;
 
     protected McastListener listener = new InternalMulticastListener();
-    private InternalNetworkConfigListener configListener =
-            new InternalNetworkConfigListener();
 
     //TODO: move this to a ec map
     private Map<IpAddress, Integer> groups = Maps.newConcurrentMap();
@@ -162,20 +153,6 @@
 
     private String fabricOnosUrl;
 
-    private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
-
-    private static final Class<AccessDeviceConfig> CONFIG_CLASS =
-            AccessDeviceConfig.class;
-
-    private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory =
-            new ConfigFactory<DeviceId, AccessDeviceConfig>(
-                    SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
-                @Override
-                public AccessDeviceConfig createConfig() {
-                    return new AccessDeviceConfig();
-                }
-            };
-
     @Activate
     public void activate(ComponentContext context) {
         componentConfigService.registerProperties(getClass());
@@ -183,23 +160,8 @@
 
         appId = coreService.registerApplication("org.onosproject.cordmcast");
 
-
         clearRemoteRoutes();
 
-        networkConfig.registerConfigFactory(configFactory);
-        networkConfig.addListener(configListener);
-
-        networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
-                subject -> {
-                    AccessDeviceConfig config = networkConfig.getConfig(subject, AccessDeviceConfig.class);
-                    if (config != null) {
-                        AccessDeviceData data = config.getOlt();
-                        oltData.put(data.deviceId(), data);
-                    }
-                }
-        );
-
-
         mcastService.addListener(listener);
 
         mcastService.getRoutes().stream()
@@ -215,8 +177,6 @@
     public void deactivate() {
         componentConfigService.unregisterProperties(getClass(), false);
         mcastService.removeListener(listener);
-        networkConfig.unregisterConfigFactory(configFactory);
-        networkConfig.removeListener(configListener);
         log.info("Stopped");
     }
 
@@ -323,9 +283,9 @@
         checkNotNull(route, "Route cannot be null");
         checkNotNull(sink, "Sink cannot be null");
 
-        AccessDeviceData oltInfo = oltData.get(sink.deviceId());
+        Optional<AccessDeviceData> oltInfo = cordConfigService.getAccessDevice(sink.deviceId());
 
-        if (oltInfo == null) {
+        if (!oltInfo.isPresent()) {
             log.warn("Unknown OLT device : {}", sink.deviceId());
             return;
         }
@@ -359,7 +319,7 @@
             flowObjectiveService.next(sink.deviceId(), next);
 
             TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
-                    .matchInPort(oltInfo.uplink())
+                    .matchInPort(oltInfo.get().uplink())
                     .matchEthType(Ethernet.TYPE_IPV4)
                     .matchIPDst(g.toIpPrefix());
 
@@ -420,17 +380,29 @@
             flowObjectiveService.next(sink.deviceId(), next);
         }
 
-
-        addRemoteRoute(route);
+        addRemoteRoute(route, sink);
     }
 
-    private void addRemoteRoute(McastRoute route) {
+    private void addRemoteRoute(McastRoute route, ConnectPoint inPort) {
         checkNotNull(route);
         if (syncHost == null) {
             log.warn("No host configured for synchronization; route will be dropped");
             return;
         }
 
+        Optional<AccessAgentData> accessAgent = cordConfigService.getAccessAgent(inPort.deviceId());
+        if (!accessAgent.isPresent()) {
+            log.warn("No accessAgent config found for in port {}", inPort);
+            return;
+        }
+
+        if (!accessAgent.get().getOltConnectPoint(inPort).isPresent()) {
+            log.warn("No OLT configured for in port {}", inPort);
+            return;
+        }
+
+        ConnectPoint oltConnectPoint = accessAgent.get().getOltConnectPoint(inPort).get();
+
         log.debug("Sending route {} to other ONOS {}", route, fabricOnosUrl);
 
         Invocation.Builder builder = getClientBuilder(fabricOnosUrl);
@@ -440,6 +412,13 @@
 
         try {
             builder.post(Entity.json(json.toString()));
+
+            builder = getClientBuilder(fabricOnosUrl + "/sinks/" + route.group() + "/" + route.source());
+            ObjectMapper mapper = new ObjectMapper();
+            ObjectNode obj = mapper.createObjectNode();
+            obj.putArray("sinks").add(oltConnectPoint.deviceId() + "/" + oltConnectPoint.port());
+
+            builder.post(Entity.json(obj.toString()));
         } catch (ProcessingException e) {
             log.warn("Unable to send route to remote controller: {}", e.getMessage());
         }
@@ -489,7 +468,7 @@
             list.forEach(n -> mcastRoutes.add(
                     routeCodec.decode((ObjectNode) n, new AbstractWebResource())));
 
-        } catch (IOException e) {
+        } catch (IOException | ProcessingException e) {
             log.warn("Error clearing remote routes", e);
         }
 
@@ -508,34 +487,4 @@
         return wt.request(JSON_UTF_8.toString());
     }
 
-    private class InternalNetworkConfigListener implements NetworkConfigListener {
-        @Override
-        public void event(NetworkConfigEvent event) {
-            switch (event.type()) {
-
-                case CONFIG_ADDED:
-                case CONFIG_UPDATED:
-                    AccessDeviceConfig config =
-                            networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
-                    if (config != null) {
-                        oltData.put(config.getOlt().deviceId(), config.getOlt());
-                    }
-
-                    break;
-                case CONFIG_REGISTERED:
-                case CONFIG_UNREGISTERED:
-                    break;
-                case CONFIG_REMOVED:
-                    oltData.remove(event.subject());
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        @Override
-        public boolean isRelevant(NetworkConfigEvent event) {
-            return event.configClass().equals(CONFIG_CLASS);
-        }
-    }
 }