Move GossipDeviceStore away from deprecated ClusterCommunicationService API
Change-Id: Ib0ca7125e17013156aac27f8437ca717a96a56f0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index c81d7ab..03a8673 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -15,26 +15,10 @@
*/
package org.onosproject.store.device.impl;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -81,8 +65,6 @@
import org.onosproject.store.AbstractStore;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoNamespaces;
@@ -96,10 +78,26 @@
import org.onosproject.store.service.WallClockTimestamp;
import org.slf4j.Logger;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull;
@@ -117,8 +115,13 @@
import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVED;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_UPDATE;
import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_UPDATE;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.slf4j.LoggerFactory.getLogger;
@@ -208,29 +211,15 @@
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log)));
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
- new InternalDeviceOfflineEventListener(),
- executor);
- clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
- new InternalRemoveRequestListener(),
- executor);
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
- new InternalDeviceAdvertisementListener(),
- backgroundExecutor);
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
- clusterCommunicator.addSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
+ addSubscriber(DEVICE_UPDATE, this::handleDeviceEvent);
+ addSubscriber(DEVICE_OFFLINE, this::handleDeviceOfflineEvent);
+ addSubscriber(DEVICE_REMOVE_REQ, this::handleRemoveRequest);
+ addSubscriber(DEVICE_REMOVED, this::handleDeviceRemovedEvent);
+ addSubscriber(PORT_UPDATE, this::handlePortEvent);
+ addSubscriber(PORT_STATUS_UPDATE, this::handlePortStatusEvent);
+ addSubscriber(DEVICE_ADVERTISE, this::handleDeviceAdvertisement);
+ addSubscriber(DEVICE_INJECTED, this::handleDeviceInjectedEvent);
+ addSubscriber(PORT_INJECTED, this::handlePortInjectedEvent);
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
@@ -261,6 +250,10 @@
log.info("Started");
}
+ private <M> void addSubscriber(MessageSubject subject, Consumer<M> handler) {
+ clusterCommunicator.addSubscriber(subject, SERIALIZER::decode, handler, executor);
+ }
+
@Deactivate
public void deactivate() {
devicePortStats.removeListener(portStatsListener);
@@ -281,24 +274,15 @@
devices.clear();
devicePorts.clear();
availableDevices.clear();
- clusterCommunicator.removeSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_UPDATE);
- clusterCommunicator.removeSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE);
- clusterCommunicator.removeSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ);
- clusterCommunicator.removeSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_REMOVED);
- clusterCommunicator.removeSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_UPDATE);
- clusterCommunicator.removeSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE);
- clusterCommunicator.removeSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE);
- clusterCommunicator.removeSubscriber(
- GossipDeviceStoreMessageSubjects.DEVICE_INJECTED);
- clusterCommunicator.removeSubscriber(
- GossipDeviceStoreMessageSubjects.PORT_INJECTED);
+ clusterCommunicator.removeSubscriber(DEVICE_UPDATE);
+ clusterCommunicator.removeSubscriber(DEVICE_OFFLINE);
+ clusterCommunicator.removeSubscriber(DEVICE_REMOVE_REQ);
+ clusterCommunicator.removeSubscriber(DEVICE_REMOVED);
+ clusterCommunicator.removeSubscriber(PORT_UPDATE);
+ clusterCommunicator.removeSubscriber(PORT_STATUS_UPDATE);
+ clusterCommunicator.removeSubscriber(DEVICE_ADVERTISE);
+ clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
+ clusterCommunicator.removeSubscriber(PORT_INJECTED);
log.info("Stopped");
}
@@ -1336,7 +1320,7 @@
}
private void notifyPeers(InternalDeviceEvent event) {
- broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+ broadcastMessage(DEVICE_UPDATE, event);
}
private void notifyPeers(InternalDeviceOfflineEvent event) {
@@ -1357,7 +1341,7 @@
private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
try {
- unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+ unicastMessage(recipient, DEVICE_UPDATE, event);
} catch (IOException e) {
log.error("Failed to send" + event + " to " + recipient, e);
}
@@ -1631,188 +1615,124 @@
}
}
- private final class InternalDeviceEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.debug("Received device update event from peer: {}", message.sender());
- InternalDeviceEvent event = SERIALIZER.decode(message.payload());
+ private void handleDeviceEvent(InternalDeviceEvent event) {
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
- ProviderId providerId = event.providerId();
- DeviceId deviceId = event.deviceId();
- Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
-
- try {
- notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId,
- deviceDescription));
- } catch (Exception e) {
- log.warn("Exception thrown handling device update", e);
- }
+ try {
+ notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId,
+ deviceDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device update", e);
}
}
- private final class InternalDeviceOfflineEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.debug("Received device offline event from peer: {}", message.sender());
- InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
+ private void handleDeviceOfflineEvent(InternalDeviceOfflineEvent event) {
+ DeviceId deviceId = event.deviceId();
+ Timestamp timestamp = event.timestamp();
- DeviceId deviceId = event.deviceId();
- Timestamp timestamp = event.timestamp();
-
- try {
- notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling device offline", e);
- }
+ try {
+ notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device offline", e);
}
}
- private final class InternalRemoveRequestListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.debug("Received device remove request from peer: {}", message.sender());
- DeviceId did = SERIALIZER.decode(message.payload());
-
- try {
- removeDevice(did);
- } catch (Exception e) {
- log.warn("Exception thrown handling device remove", e);
- }
+ private void handleRemoveRequest(DeviceId did) {
+ try {
+ removeDevice(did);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device remove", e);
}
}
- private final class InternalDeviceRemovedEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.debug("Received device removed event from peer: {}", message.sender());
- InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
+ private void handleDeviceRemovedEvent(InternalDeviceRemovedEvent event) {
+ DeviceId deviceId = event.deviceId();
+ Timestamp timestamp = event.timestamp();
- DeviceId deviceId = event.deviceId();
- Timestamp timestamp = event.timestamp();
-
- try {
- notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling device removed", e);
- }
+ try {
+ notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device removed", e);
}
}
- private final class InternalPortEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
+ private void handlePortEvent(InternalPortEvent event) {
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
- log.debug("Received port update event from peer: {}", message.sender());
- InternalPortEvent event = SERIALIZER.decode(message.payload());
+ if (getDevice(deviceId) == null) {
+ log.debug("{} not found on this node yet, ignoring.", deviceId);
+ // Note: dropped information will be recovered by anti-entropy
+ return;
+ }
- ProviderId providerId = event.providerId();
- DeviceId deviceId = event.deviceId();
- Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
-
- if (getDevice(deviceId) == null) {
- log.debug("{} not found on this node yet, ignoring.", deviceId);
- // Note: dropped information will be recovered by anti-entropy
- return;
- }
-
- try {
- notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
- } catch (Exception e) {
- log.warn("Exception thrown handling port update", e);
- }
+ try {
+ notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port update", e);
}
}
- private final class InternalPortStatusEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
+ private void handlePortStatusEvent(InternalPortStatusEvent event) {
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<PortDescription> portDescription = event.portDescription();
- log.debug("Received port status update event from peer: {}", message.sender());
- InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
+ if (getDevice(deviceId) == null) {
+ log.debug("{} not found on this node yet, ignoring.", deviceId);
+ // Note: dropped information will be recovered by anti-entropy
+ return;
+ }
- ProviderId providerId = event.providerId();
- DeviceId deviceId = event.deviceId();
- Timestamped<PortDescription> portDescription = event.portDescription();
-
- if (getDevice(deviceId) == null) {
- log.debug("{} not found on this node yet, ignoring.", deviceId);
- // Note: dropped information will be recovered by anti-entropy
- return;
- }
-
- try {
- notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
- } catch (Exception e) {
- log.warn("Exception thrown handling port update", e);
- }
+ try {
+ notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port update", e);
}
}
- private final class InternalDeviceAdvertisementListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
- DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- try {
- handleAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown handling Device advertisements.", e);
- }
+ private void handleDeviceAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
+ try {
+ handleAdvertisement(advertisement);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling Device advertisements.", e);
}
}
- private final class DeviceInjectedEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.debug("Received injected device event from peer: {}", message.sender());
- DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
+ private void handleDeviceInjectedEvent(DeviceInjectedEvent event) {
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ DeviceDescription deviceDescription = event.deviceDescription();
+ if (!deviceClockService.isTimestampAvailable(deviceId)) {
+ // workaround for ONOS-1208
+ log.warn("Not ready to accept update. Dropping {}", deviceDescription);
+ return;
+ }
- ProviderId providerId = event.providerId();
- DeviceId deviceId = event.deviceId();
- DeviceDescription deviceDescription = event.deviceDescription();
- if (!deviceClockService.isTimestampAvailable(deviceId)) {
- // workaround for ONOS-1208
- log.warn("Not ready to accept update. Dropping {}", deviceDescription);
- return;
- }
-
- try {
- createOrUpdateDevice(providerId, deviceId, deviceDescription);
- } catch (Exception e) {
- log.warn("Exception thrown handling device injected event.", e);
- }
+ try {
+ createOrUpdateDevice(providerId, deviceId, deviceDescription);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling device injected event.", e);
}
}
- private final class PortInjectedEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- log.debug("Received injected port event from peer: {}", message.sender());
- PortInjectedEvent event = SERIALIZER.decode(message.payload());
+ private void handlePortInjectedEvent(PortInjectedEvent event) {
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ List<PortDescription> portDescriptions = event.portDescriptions();
+ if (!deviceClockService.isTimestampAvailable(deviceId)) {
+ // workaround for ONOS-1208
+ log.warn("Not ready to accept update. Dropping {}", portDescriptions);
+ return;
+ }
- ProviderId providerId = event.providerId();
- DeviceId deviceId = event.deviceId();
- List<PortDescription> portDescriptions = event.portDescriptions();
- if (!deviceClockService.isTimestampAvailable(deviceId)) {
- // workaround for ONOS-1208
- log.warn("Not ready to accept update. Dropping {}", portDescriptions);
- return;
- }
-
- try {
- updatePorts(providerId, deviceId, portDescriptions);
- } catch (Exception e) {
- log.warn("Exception thrown handling port injected event.", e);
- }
+ try {
+ updatePorts(providerId, deviceId, portDescriptions);
+ } catch (Exception e) {
+ log.warn("Exception thrown handling port injected event.", e);
}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index 9c9ef2c..4df97a1 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -17,7 +17,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-
import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
@@ -55,12 +54,12 @@
import org.onosproject.store.cluster.StaticClusterService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.StorageService;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -70,20 +69,37 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import static java.util.Arrays.asList;
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.*;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
import static org.onosproject.net.DefaultAnnotations.union;
import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.DeviceId.deviceId;
-import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
+import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
+import static org.onosproject.net.device.DeviceEvent.Type.PORT_REMOVED;
+import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
// TODO add tests for remote replication
@@ -157,9 +173,6 @@
@Before
public void setUp() throws Exception {
clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
- clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
- anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
- expectLastCall().anyTimes();
replay(clusterCommunicator);
ClusterService clusterService = new TestClusterService();