[CORD-1634] Support enable/disable ports on STANDBY nodes.
Change-Id: I6e519acad1f0a425126961c6e88bd64c90574939
diff --git a/core/net/BUCK b/core/net/BUCK
index 383cf54..f89254e 100644
--- a/core/net/BUCK
+++ b/core/net/BUCK
@@ -5,6 +5,7 @@
'//utils/rest:onlab-rest',
'//incubator/net:onos-incubator-net',
'//incubator/store:onos-incubator-store',
+ '//core/store/serializers:onos-core-serializers',
]
TEST_DEPS = [
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index de76226..7cad15b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -26,6 +26,7 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
@@ -67,6 +68,10 @@
import org.onosproject.net.provider.AbstractProviderService;
import org.onosproject.net.provider.Provider;
import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import java.util.Collection;
@@ -80,13 +85,16 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Multimaps.newListMultimap;
import static com.google.common.collect.Multimaps.synchronizedListMultimap;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.MastershipRole.MASTER;
@@ -110,6 +118,7 @@
private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
private static final String PORT_DESC_LIST_NULL = "Port description list cannot be null";
+ private static final String EVENT_NON_MASTER = "Non-master node cannot handle this event";
private final Logger log = getLogger(getClass());
@@ -137,7 +146,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigService networkConfigService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService communicationService;
+ private ExecutorService portReqeustExecutor;
/**
* List of all registered PortConfigOperator.
*/
@@ -153,6 +165,16 @@
// not part of portOps. must be executed at the end
private PortAnnotationOperator portAnnotationOp;
+ private static final MessageSubject PORT_UPDOWN_SUBJECT =
+ new MessageSubject("port-updown-req");
+
+ private static final Serializer SERIALIZER = Serializer.using(
+ KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(InternalPortUpDownEvent.class)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .build("DeviceManager"));
+
/**
* Local storage for connectivity status of devices.
*/
@@ -190,6 +212,15 @@
log.error("Exception thrown during integrity check", e);
}
}, 1, 1, TimeUnit.MINUTES);
+
+ portReqeustExecutor = newSingleThreadExecutor();
+
+ communicationService.<InternalPortUpDownEvent>addSubscriber(
+ PORT_UPDOWN_SUBJECT,
+ SERIALIZER::decode,
+ this::handlePortRequest,
+ portReqeustExecutor);
+
log.info("Started");
}
@@ -200,6 +231,8 @@
store.unsetDelegate(delegate);
mastershipService.removeListener(mastershipListener);
eventDispatcher.removeSink(DeviceEvent.class);
+ communicationService.removeSubscriber(PORT_UPDOWN_SUBJECT);
+ portReqeustExecutor.shutdown();
log.info("Stopped");
}
@@ -328,14 +361,19 @@
}
}
- @Override
- public void changePortState(DeviceId deviceId, PortNumber portNumber,
- boolean enable) {
+ private void handlePortRequest(InternalPortUpDownEvent event) {
+ DeviceId deviceId = event.deviceId();
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(deviceId, PORT_NUMBER_NULL);
+ checkState(mastershipService.isLocalMaster(deviceId), EVENT_NON_MASTER);
+ changePortStateAtMaster(event.deviceId(), event.portNumber(), event.isEnable());
+ }
+
+ private void changePortStateAtMaster(DeviceId deviceId, PortNumber portNumber,
+ boolean enable) {
DeviceProvider provider = getProvider(deviceId);
if (provider != null) {
- log.warn("Port {} on device {} being administratively brought {}",
+ log.info("Port {} on device {} being administratively brought {}",
portNumber, deviceId,
(enable) ? "UP" : "DOWN");
provider.changePortState(deviceId, portNumber, enable);
@@ -345,6 +383,31 @@
}
@Override
+ public void changePortState(DeviceId deviceId, PortNumber portNumber,
+ boolean enable) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ checkNotNull(deviceId, PORT_NUMBER_NULL);
+ NodeId masterId = mastershipService.getMasterFor(deviceId);
+
+ if (!masterId.equals(localNodeId)) {
+ //Send the request to the master node for the device
+ log.info("Device {} is managed by {}, forwarding the request to the MASTER",
+ deviceId, masterId);
+ communicationService.unicast(
+ new InternalPortUpDownEvent(deviceId, portNumber, enable),
+ PORT_UPDOWN_SUBJECT,
+ SERIALIZER::encode,
+ masterId).whenComplete((r, error) -> {
+ if (error != null) {
+ log.warn("Failed to send packet-updown-req to {}", masterId, error);
+ }
+ });
+ } else {
+ changePortStateAtMaster(deviceId, portNumber, enable);
+ }
+ }
+
+ @Override
protected DeviceProviderService createProviderService(
DeviceProvider provider) {
return new InternalDeviceProviderService(provider);
@@ -1059,4 +1122,35 @@
return portAnnotationOp.combine(cpt, work);
}
+ /**
+ * Port Enable/Disable message sent to the device's MASTER node.
+ */
+ private class InternalPortUpDownEvent {
+ private final DeviceId deviceId;
+ private final PortNumber portNumber;
+ private final boolean enable;
+
+ protected InternalPortUpDownEvent(
+ DeviceId deviceId, PortNumber portNumber, boolean enable) {
+ this.deviceId = deviceId;
+ this.portNumber = portNumber;
+ this.enable = enable;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+ public PortNumber portNumber() {
+ return portNumber;
+ }
+ public boolean isEnable() {
+ return enable;
+ }
+
+ protected InternalPortUpDownEvent() {
+ this.deviceId = null;
+ this.portNumber = null;
+ this.enable = false;
+ }
+ }
}
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index b1bc079..975c90a 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -49,6 +49,7 @@
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.trivial.SimpleDeviceStore;
import java.util.ArrayList;
@@ -106,6 +107,7 @@
mgr.termService = mastershipManager;
mgr.clusterService = new TestClusterService();
mgr.networkConfigService = new TestNetworkConfigService();
+ mgr.communicationService = new TestClusterCommunicationService();
mgr.activate();
@@ -337,4 +339,7 @@
private class TestNetworkConfigService extends NetworkConfigServiceAdapter {
}
+
+ private class TestClusterCommunicationService extends ClusterCommunicationServiceAdapter {
+ }
}