ONOS-2322: Support for periodic purging of ECMap tombstones

Change-Id: I6fe5475a472c383c4a51bd61446fba8f1dba1d37
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
index 3b4f36c..3d9dc4e 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImplTest.java
@@ -103,7 +103,7 @@
             new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
 
     private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
-    private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
+    private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
 
     /*
      * Serialization is a bit tricky here. We need to serialize in the tests
@@ -144,9 +144,15 @@
         // delegate to our ClusterCommunicationService implementation. This
         // allows us to get a reference to the map's internal cluster message
         // handlers so we can induce events coming in from a peer.
-        clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
+        clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
                 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
-        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
+        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
+        clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
+                                                          anyObject(Function.class),
+                                                          anyObject(Function.class),
+                                                          anyObject(Function.class),
+                                                          anyObject(Executor.class));
+        expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
 
         replay(clusterCommunicator);
 
@@ -798,8 +804,16 @@
                 Executor executor) {
             if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
                 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
-            } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
-                antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
+            } else {
+                throw new RuntimeException("Unexpected message subject " + subject.toString());
+            }
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject,
+                Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+            if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
+                antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
             } else {
                 throw new RuntimeException("Unexpected message subject " + subject.toString());
             }