Move GossipDeviceStore away from deprecated ClusterCommunicationService API

Change-Id: Ib0ca7125e17013156aac27f8437ca717a96a56f0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index c81d7ab..03a8673 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -15,26 +15,10 @@
  */
 package org.onosproject.store.device.impl;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -81,8 +65,6 @@
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.Timestamped;
 import org.onosproject.store.serializers.KryoNamespaces;
@@ -96,10 +78,26 @@
 import org.onosproject.store.service.WallClockTimestamp;
 import org.slf4j.Logger;
 
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Predicates.notNull;
@@ -117,8 +115,13 @@
 import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVED;
 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_UPDATE;
 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE;
+import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_UPDATE;
 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -208,29 +211,15 @@
         backgroundExecutor =
                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log)));
 
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
-                new InternalDeviceOfflineEventListener(),
-                executor);
-        clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
-                                          new InternalRemoveRequestListener(),
-                                          executor);
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
-                new InternalDeviceAdvertisementListener(),
-                backgroundExecutor);
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
-        clusterCommunicator.addSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
+        addSubscriber(DEVICE_UPDATE, this::handleDeviceEvent);
+        addSubscriber(DEVICE_OFFLINE, this::handleDeviceOfflineEvent);
+        addSubscriber(DEVICE_REMOVE_REQ, this::handleRemoveRequest);
+        addSubscriber(DEVICE_REMOVED, this::handleDeviceRemovedEvent);
+        addSubscriber(PORT_UPDATE, this::handlePortEvent);
+        addSubscriber(PORT_STATUS_UPDATE, this::handlePortStatusEvent);
+        addSubscriber(DEVICE_ADVERTISE, this::handleDeviceAdvertisement);
+        addSubscriber(DEVICE_INJECTED, this::handleDeviceInjectedEvent);
+        addSubscriber(PORT_INJECTED, this::handlePortInjectedEvent);
 
         // start anti-entropy thread
         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
@@ -261,6 +250,10 @@
         log.info("Started");
     }
 
+    private <M> void addSubscriber(MessageSubject subject, Consumer<M> handler) {
+        clusterCommunicator.addSubscriber(subject, SERIALIZER::decode, handler, executor);
+    }
+
     @Deactivate
     public void deactivate() {
         devicePortStats.removeListener(portStatsListener);
@@ -281,24 +274,15 @@
         devices.clear();
         devicePorts.clear();
         availableDevices.clear();
-        clusterCommunicator.removeSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_UPDATE);
-        clusterCommunicator.removeSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE);
-        clusterCommunicator.removeSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ);
-        clusterCommunicator.removeSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_REMOVED);
-        clusterCommunicator.removeSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_UPDATE);
-        clusterCommunicator.removeSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE);
-        clusterCommunicator.removeSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE);
-        clusterCommunicator.removeSubscriber(
-                GossipDeviceStoreMessageSubjects.DEVICE_INJECTED);
-        clusterCommunicator.removeSubscriber(
-                GossipDeviceStoreMessageSubjects.PORT_INJECTED);
+        clusterCommunicator.removeSubscriber(DEVICE_UPDATE);
+        clusterCommunicator.removeSubscriber(DEVICE_OFFLINE);
+        clusterCommunicator.removeSubscriber(DEVICE_REMOVE_REQ);
+        clusterCommunicator.removeSubscriber(DEVICE_REMOVED);
+        clusterCommunicator.removeSubscriber(PORT_UPDATE);
+        clusterCommunicator.removeSubscriber(PORT_STATUS_UPDATE);
+        clusterCommunicator.removeSubscriber(DEVICE_ADVERTISE);
+        clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
+        clusterCommunicator.removeSubscriber(PORT_INJECTED);
         log.info("Stopped");
     }
 
@@ -1336,7 +1320,7 @@
     }
 
     private void notifyPeers(InternalDeviceEvent event) {
-        broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+        broadcastMessage(DEVICE_UPDATE, event);
     }
 
     private void notifyPeers(InternalDeviceOfflineEvent event) {
@@ -1357,7 +1341,7 @@
 
     private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
         try {
-            unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
+            unicastMessage(recipient, DEVICE_UPDATE, event);
         } catch (IOException e) {
             log.error("Failed to send" + event + " to " + recipient, e);
         }
@@ -1631,188 +1615,124 @@
         }
     }
 
-    private final class InternalDeviceEventListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.debug("Received device update event from peer: {}", message.sender());
-            InternalDeviceEvent event = SERIALIZER.decode(message.payload());
+    private void handleDeviceEvent(InternalDeviceEvent event) {
+        ProviderId providerId = event.providerId();
+        DeviceId deviceId = event.deviceId();
+        Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
 
-            ProviderId providerId = event.providerId();
-            DeviceId deviceId = event.deviceId();
-            Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
-
-            try {
-                notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId,
-                                                                     deviceDescription));
-            } catch (Exception e) {
-                log.warn("Exception thrown handling device update", e);
-            }
+        try {
+            notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId,
+                    deviceDescription));
+        } catch (Exception e) {
+            log.warn("Exception thrown handling device update", e);
         }
     }
 
-    private final class InternalDeviceOfflineEventListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.debug("Received device offline event from peer: {}", message.sender());
-            InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
+    private void handleDeviceOfflineEvent(InternalDeviceOfflineEvent event) {
+        DeviceId deviceId = event.deviceId();
+        Timestamp timestamp = event.timestamp();
 
-            DeviceId deviceId = event.deviceId();
-            Timestamp timestamp = event.timestamp();
-
-            try {
-                notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
-            } catch (Exception e) {
-                log.warn("Exception thrown handling device offline", e);
-            }
+        try {
+            notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
+        } catch (Exception e) {
+            log.warn("Exception thrown handling device offline", e);
         }
     }
 
-    private final class InternalRemoveRequestListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.debug("Received device remove request from peer: {}", message.sender());
-            DeviceId did = SERIALIZER.decode(message.payload());
-
-            try {
-                removeDevice(did);
-            } catch (Exception e) {
-                log.warn("Exception thrown handling device remove", e);
-            }
+    private void handleRemoveRequest(DeviceId did) {
+        try {
+            removeDevice(did);
+        } catch (Exception e) {
+            log.warn("Exception thrown handling device remove", e);
         }
     }
 
-    private final class InternalDeviceRemovedEventListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.debug("Received device removed event from peer: {}", message.sender());
-            InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
+    private void handleDeviceRemovedEvent(InternalDeviceRemovedEvent event) {
+        DeviceId deviceId = event.deviceId();
+        Timestamp timestamp = event.timestamp();
 
-            DeviceId deviceId = event.deviceId();
-            Timestamp timestamp = event.timestamp();
-
-            try {
-                notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
-            } catch (Exception e) {
-                log.warn("Exception thrown handling device removed", e);
-            }
+        try {
+            notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
+        } catch (Exception e) {
+            log.warn("Exception thrown handling device removed", e);
         }
     }
 
-    private final class InternalPortEventListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
+    private void handlePortEvent(InternalPortEvent event) {
+        ProviderId providerId = event.providerId();
+        DeviceId deviceId = event.deviceId();
+        Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
 
-            log.debug("Received port update event from peer: {}", message.sender());
-            InternalPortEvent event = SERIALIZER.decode(message.payload());
+        if (getDevice(deviceId) == null) {
+            log.debug("{} not found on this node yet, ignoring.", deviceId);
+            // Note: dropped information will be recovered by anti-entropy
+            return;
+        }
 
-            ProviderId providerId = event.providerId();
-            DeviceId deviceId = event.deviceId();
-            Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
-
-            if (getDevice(deviceId) == null) {
-                log.debug("{} not found on this node yet, ignoring.", deviceId);
-                // Note: dropped information will be recovered by anti-entropy
-                return;
-            }
-
-            try {
-                notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
-            } catch (Exception e) {
-                log.warn("Exception thrown handling port update", e);
-            }
+        try {
+            notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
+        } catch (Exception e) {
+            log.warn("Exception thrown handling port update", e);
         }
     }
 
-    private final class InternalPortStatusEventListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
+    private void handlePortStatusEvent(InternalPortStatusEvent event) {
+        ProviderId providerId = event.providerId();
+        DeviceId deviceId = event.deviceId();
+        Timestamped<PortDescription> portDescription = event.portDescription();
 
-            log.debug("Received port status update event from peer: {}", message.sender());
-            InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
+        if (getDevice(deviceId) == null) {
+            log.debug("{} not found on this node yet, ignoring.", deviceId);
+            // Note: dropped information will be recovered by anti-entropy
+            return;
+        }
 
-            ProviderId providerId = event.providerId();
-            DeviceId deviceId = event.deviceId();
-            Timestamped<PortDescription> portDescription = event.portDescription();
-
-            if (getDevice(deviceId) == null) {
-                log.debug("{} not found on this node yet, ignoring.", deviceId);
-                // Note: dropped information will be recovered by anti-entropy
-                return;
-            }
-
-            try {
-                notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
-            } catch (Exception e) {
-                log.warn("Exception thrown handling port update", e);
-            }
+        try {
+            notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
+        } catch (Exception e) {
+            log.warn("Exception thrown handling port update", e);
         }
     }
 
-    private final class InternalDeviceAdvertisementListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
-            DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
-            try {
-                handleAdvertisement(advertisement);
-            } catch (Exception e) {
-                log.warn("Exception thrown handling Device advertisements.", e);
-            }
+    private void handleDeviceAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
+        try {
+            handleAdvertisement(advertisement);
+        } catch (Exception e) {
+            log.warn("Exception thrown handling Device advertisements.", e);
         }
     }
 
-    private final class DeviceInjectedEventListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.debug("Received injected device event from peer: {}", message.sender());
-            DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
+    private void handleDeviceInjectedEvent(DeviceInjectedEvent event) {
+        ProviderId providerId = event.providerId();
+        DeviceId deviceId = event.deviceId();
+        DeviceDescription deviceDescription = event.deviceDescription();
+        if (!deviceClockService.isTimestampAvailable(deviceId)) {
+            // workaround for ONOS-1208
+            log.warn("Not ready to accept update. Dropping {}", deviceDescription);
+            return;
+        }
 
-            ProviderId providerId = event.providerId();
-            DeviceId deviceId = event.deviceId();
-            DeviceDescription deviceDescription = event.deviceDescription();
-            if (!deviceClockService.isTimestampAvailable(deviceId)) {
-                // workaround for ONOS-1208
-                log.warn("Not ready to accept update. Dropping {}", deviceDescription);
-                return;
-            }
-
-            try {
-                createOrUpdateDevice(providerId, deviceId, deviceDescription);
-            } catch (Exception e) {
-                log.warn("Exception thrown handling device injected event.", e);
-            }
+        try {
+            createOrUpdateDevice(providerId, deviceId, deviceDescription);
+        } catch (Exception e) {
+            log.warn("Exception thrown handling device injected event.", e);
         }
     }
 
-    private final class PortInjectedEventListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.debug("Received injected port event from peer: {}", message.sender());
-            PortInjectedEvent event = SERIALIZER.decode(message.payload());
+    private void handlePortInjectedEvent(PortInjectedEvent event) {
+        ProviderId providerId = event.providerId();
+        DeviceId deviceId = event.deviceId();
+        List<PortDescription> portDescriptions = event.portDescriptions();
+        if (!deviceClockService.isTimestampAvailable(deviceId)) {
+            // workaround for ONOS-1208
+            log.warn("Not ready to accept update. Dropping {}", portDescriptions);
+            return;
+        }
 
-            ProviderId providerId = event.providerId();
-            DeviceId deviceId = event.deviceId();
-            List<PortDescription> portDescriptions = event.portDescriptions();
-            if (!deviceClockService.isTimestampAvailable(deviceId)) {
-                // workaround for ONOS-1208
-                log.warn("Not ready to accept update. Dropping {}", portDescriptions);
-                return;
-            }
-
-            try {
-                updatePorts(providerId, deviceId, portDescriptions);
-            } catch (Exception e) {
-                log.warn("Exception thrown handling port injected event.", e);
-            }
+        try {
+            updatePorts(providerId, deviceId, portDescriptions);
+        } catch (Exception e) {
+            log.warn("Exception thrown handling port injected event.", e);
         }
     }
 
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index 9c9ef2c..4df97a1 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -17,7 +17,6 @@
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-
 import org.easymock.Capture;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -55,12 +54,12 @@
 import org.onosproject.store.cluster.StaticClusterService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.MastershipBasedTimestamp;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.StorageService;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -70,20 +69,37 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
 import static java.util.Arrays.asList;
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.*;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
 import static org.onosproject.net.DefaultAnnotations.union;
 import static org.onosproject.net.Device.Type.SWITCH;
 import static org.onosproject.net.DeviceId.deviceId;
-import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_UPDATED;
+import static org.onosproject.net.device.DeviceEvent.Type.PORT_ADDED;
+import static org.onosproject.net.device.DeviceEvent.Type.PORT_REMOVED;
+import static org.onosproject.net.device.DeviceEvent.Type.PORT_UPDATED;
 
 
 // TODO add tests for remote replication
@@ -157,9 +173,6 @@
     @Before
     public void setUp() throws Exception {
         clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
-        clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
-                                          anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
-        expectLastCall().anyTimes();
         replay(clusterCommunicator);
         ClusterService clusterService = new TestClusterService();