[CORD-1634] Support enable/disable ports on STANDBY nodes.

Change-Id: I6e519acad1f0a425126961c6e88bd64c90574939
diff --git a/core/net/BUCK b/core/net/BUCK
index 383cf54..f89254e 100644
--- a/core/net/BUCK
+++ b/core/net/BUCK
@@ -5,6 +5,7 @@
     '//utils/rest:onlab-rest',
     '//incubator/net:onos-incubator-net',
     '//incubator/store:onos-incubator-store',
+    '//core/store/serializers:onos-core-serializers',
 ]
 
 TEST_DEPS = [
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index de76226..7cad15b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -26,6 +26,7 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.joda.time.DateTime;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
@@ -67,6 +68,10 @@
 import org.onosproject.net.provider.AbstractProviderService;
 import org.onosproject.net.provider.Provider;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
 import org.slf4j.Logger;
 
 import java.util.Collection;
@@ -80,13 +85,16 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Multimaps.newListMultimap;
 import static com.google.common.collect.Multimaps.synchronizedListMultimap;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.MastershipRole.MASTER;
@@ -110,6 +118,7 @@
     private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
     private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
     private static final String PORT_DESC_LIST_NULL = "Port description list cannot be null";
+    private static final String EVENT_NON_MASTER = "Non-master node cannot handle this event";
 
     private final Logger log = getLogger(getClass());
 
@@ -137,7 +146,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigService networkConfigService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService communicationService;
 
+    private ExecutorService portReqeustExecutor;
     /**
      * List of all registered PortConfigOperator.
      */
@@ -153,6 +165,16 @@
     // not part of portOps. must be executed at the end
     private PortAnnotationOperator portAnnotationOp;
 
+    private static final MessageSubject PORT_UPDOWN_SUBJECT =
+            new MessageSubject("port-updown-req");
+
+    private static final Serializer SERIALIZER = Serializer.using(
+            KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .register(InternalPortUpDownEvent.class)
+                    .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+                    .build("DeviceManager"));
+
     /**
      * Local storage for connectivity status of devices.
      */
@@ -190,6 +212,15 @@
                 log.error("Exception thrown during integrity check", e);
             }
         }, 1, 1, TimeUnit.MINUTES);
+
+        portReqeustExecutor = newSingleThreadExecutor();
+
+        communicationService.<InternalPortUpDownEvent>addSubscriber(
+                PORT_UPDOWN_SUBJECT,
+                SERIALIZER::decode,
+                this::handlePortRequest,
+                portReqeustExecutor);
+
         log.info("Started");
     }
 
@@ -200,6 +231,8 @@
         store.unsetDelegate(delegate);
         mastershipService.removeListener(mastershipListener);
         eventDispatcher.removeSink(DeviceEvent.class);
+        communicationService.removeSubscriber(PORT_UPDOWN_SUBJECT);
+        portReqeustExecutor.shutdown();
         log.info("Stopped");
     }
 
@@ -328,14 +361,19 @@
         }
     }
 
-    @Override
-    public void changePortState(DeviceId deviceId, PortNumber portNumber,
-                                boolean enable) {
+    private void handlePortRequest(InternalPortUpDownEvent event) {
+        DeviceId deviceId = event.deviceId();
         checkNotNull(deviceId, DEVICE_ID_NULL);
         checkNotNull(deviceId, PORT_NUMBER_NULL);
+        checkState(mastershipService.isLocalMaster(deviceId), EVENT_NON_MASTER);
+        changePortStateAtMaster(event.deviceId(), event.portNumber(), event.isEnable());
+    }
+
+    private void changePortStateAtMaster(DeviceId deviceId, PortNumber portNumber,
+                                       boolean enable) {
         DeviceProvider provider = getProvider(deviceId);
         if (provider != null) {
-            log.warn("Port {} on device {} being administratively brought {}",
+            log.info("Port {} on device {} being administratively brought {}",
                      portNumber, deviceId,
                      (enable) ? "UP" : "DOWN");
             provider.changePortState(deviceId, portNumber, enable);
@@ -345,6 +383,31 @@
     }
 
     @Override
+    public void changePortState(DeviceId deviceId, PortNumber portNumber,
+                                boolean enable) {
+        checkNotNull(deviceId, DEVICE_ID_NULL);
+        checkNotNull(deviceId, PORT_NUMBER_NULL);
+        NodeId masterId = mastershipService.getMasterFor(deviceId);
+
+        if (!masterId.equals(localNodeId)) {
+            //Send the request to the master node for the device
+            log.info("Device {} is managed by {}, forwarding the request to the MASTER",
+                     deviceId, masterId);
+            communicationService.unicast(
+                    new InternalPortUpDownEvent(deviceId, portNumber, enable),
+                    PORT_UPDOWN_SUBJECT,
+                    SERIALIZER::encode,
+                    masterId).whenComplete((r, error) -> {
+                if (error != null) {
+                    log.warn("Failed to send packet-updown-req to {}", masterId, error);
+                }
+            });
+        } else {
+            changePortStateAtMaster(deviceId, portNumber, enable);
+        }
+    }
+
+    @Override
     protected DeviceProviderService createProviderService(
             DeviceProvider provider) {
         return new InternalDeviceProviderService(provider);
@@ -1059,4 +1122,35 @@
         return portAnnotationOp.combine(cpt, work);
     }
 
+    /**
+     * Port Enable/Disable message sent to the device's MASTER node.
+     */
+    private class InternalPortUpDownEvent {
+        private final DeviceId deviceId;
+        private final PortNumber portNumber;
+        private final boolean enable;
+
+        protected InternalPortUpDownEvent(
+                DeviceId deviceId, PortNumber portNumber, boolean enable) {
+            this.deviceId = deviceId;
+            this.portNumber = portNumber;
+            this.enable = enable;
+        }
+
+        public DeviceId deviceId() {
+            return deviceId;
+        }
+        public PortNumber portNumber() {
+            return portNumber;
+        }
+        public boolean isEnable() {
+            return enable;
+        }
+
+        protected InternalPortUpDownEvent() {
+            this.deviceId = null;
+            this.portNumber = null;
+            this.enable = false;
+        }
+    }
 }
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index b1bc079..975c90a 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -49,6 +49,7 @@
 import org.onosproject.net.device.PortDescription;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.trivial.SimpleDeviceStore;
 
 import java.util.ArrayList;
@@ -106,6 +107,7 @@
         mgr.termService = mastershipManager;
         mgr.clusterService = new TestClusterService();
         mgr.networkConfigService = new TestNetworkConfigService();
+        mgr.communicationService = new TestClusterCommunicationService();
         mgr.activate();
 
 
@@ -337,4 +339,7 @@
 
     private class TestNetworkConfigService extends NetworkConfigServiceAdapter {
     }
+
+    private class TestClusterCommunicationService extends ClusterCommunicationServiceAdapter {
+    }
 }