[ONOS-7068] Upgrade to Atomix 2.0.4

Change-Id: I864567084b1ac230254533026f06d4a25e5e0a2d
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index e8dde09..b1bae21 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -87,7 +87,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected UpgradeService upgradeService;
 
-    private static final long DEFAULT_ELECTION_TIMEOUT_MILLIS = 5000;
+    private static final long DEFAULT_ELECTION_TIMEOUT_MILLIS = 250;
     @Property(name = "electionTimeoutMillis", longValue = DEFAULT_ELECTION_TIMEOUT_MILLIS,
             label = "the leader election timeout in milliseconds")
     private long electionTimeoutMillis = DEFAULT_ELECTION_TIMEOUT_MILLIS;
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index 88f30a5..549e73f 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -70,7 +70,7 @@
         <dependency>
             <groupId>io.atomix</groupId>
             <artifactId>atomix</artifactId>
-            <version>2.0.2</version>
+            <version>2.0.4</version>
         </dependency>
 
         <dependency>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index ecba56d..45f40ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -20,6 +20,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import io.atomix.protocols.raft.cluster.MemberId;
@@ -27,6 +28,8 @@
 import io.atomix.protocols.raft.protocol.CloseSessionResponse;
 import io.atomix.protocols.raft.protocol.CommandRequest;
 import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
 import io.atomix.protocols.raft.protocol.KeepAliveRequest;
 import io.atomix.protocols.raft.protocol.KeepAliveResponse;
 import io.atomix.protocols.raft.protocol.MetadataRequest;
@@ -86,6 +89,16 @@
     }
 
     @Override
+    public void registerHeartbeatHandler(Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> function) {
+        clusterCommunicator.addSubscriber(context.heartbeatSubject, serializer::decode, function, serializer::encode);
+    }
+
+    @Override
+    public void unregisterHeartbeatHandler() {
+        clusterCommunicator.removeSubscriber(context.heartbeatSubject);
+    }
+
+    @Override
     public void reset(Collection<MemberId> members, ResetRequest request) {
         Set<NodeId> nodes = members.stream().map(m -> NodeId.nodeId(m.id())).collect(Collectors.toSet());
         clusterCommunicator.multicast(
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
index 2aecd0b..9f1f1bc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
@@ -22,6 +22,7 @@
  */
 final class RaftMessageContext {
     private final String prefix;
+    final MessageSubject heartbeatSubject;
     final MessageSubject openSessionSubject;
     final MessageSubject closeSessionSubject;
     final MessageSubject keepAliveSubject;
@@ -40,6 +41,7 @@
 
     RaftMessageContext(String prefix) {
         this.prefix = prefix;
+        this.heartbeatSubject = getSubject(prefix, "heartbeat");
         this.openSessionSubject = getSubject(prefix, "open");
         this.closeSessionSubject = getSubject(prefix, "close");
         this.keepAliveSubject = getSubject(prefix, "keep-alive");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 097ee46..f58db45 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -29,6 +29,8 @@
 import io.atomix.protocols.raft.protocol.CommandResponse;
 import io.atomix.protocols.raft.protocol.ConfigureRequest;
 import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
 import io.atomix.protocols.raft.protocol.InstallRequest;
 import io.atomix.protocols.raft.protocol.InstallResponse;
 import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -153,6 +155,11 @@
     }
 
     @Override
+    public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, HeartbeatRequest request) {
+        return sendAndReceive(context.heartbeatSubject, request, memberId);
+    }
+
+    @Override
     public void registerOpenSessionHandler(
             Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
         clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
index 0333b00..8350499 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -39,6 +39,8 @@
 import io.atomix.protocols.raft.protocol.CommandResponse;
 import io.atomix.protocols.raft.protocol.ConfigureRequest;
 import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
 import io.atomix.protocols.raft.protocol.InstallRequest;
 import io.atomix.protocols.raft.protocol.InstallResponse;
 import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -106,6 +108,8 @@
             .register(CloseSessionResponse.class)
             .register(KeepAliveRequest.class)
             .register(KeepAliveResponse.class)
+            .register(HeartbeatRequest.class)
+            .register(HeartbeatResponse.class)
             .register(QueryRequest.class)
             .register(QueryResponse.class)
             .register(CommandRequest.class)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 53a7dd1..f5f8a99 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -264,7 +264,8 @@
                 .withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
                 .withReadConsistency(ReadConsistency.LINEARIZABLE)
                 .withCommunicationStrategy(CommunicationStrategy.LEADER)
-                .withTimeout(Duration.ofMillis(timeUnit.toMillis(leaderTimeout)))
+                .withMinTimeout(Duration.ofMillis(timeUnit.toMillis(leaderTimeout)))
+                .withMaxTimeout(Duration.ofSeconds(5))
                 .withMaxRetries(5)
                 .build()
                 .open()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 3a15fce..dbe712d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -45,7 +45,7 @@
 
     private static final int MAX_SEGMENT_SIZE = 1024 * 1024 * 64;
     private static final long ELECTION_TIMEOUT_MILLIS = 2500;
-    private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
+    private static final long HEARTBEAT_INTERVAL_MILLIS = 250;
 
     private final MemberId localMemberId;
     private final StoragePartition partition;
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
index 8720f45..698f015 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixAtomicCounterMapServiceTest.java
@@ -42,7 +42,7 @@
                 .withPrefix("test")
                 .withStorageLevel(StorageLevel.MEMORY)
                 .build());
-        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
 
         AtomixAtomicCounterMapService service = new AtomixAtomicCounterMapService();
         service.put(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
index 6563da5..09ac5ec 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapServiceTest.java
@@ -46,7 +46,7 @@
                 .withPrefix("test")
                 .withStorageLevel(StorageLevel.MEMORY)
                 .build());
-        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
 
         AtomixConsistentMapService service = new AtomixConsistentMapService();
         service.put(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
index fc9d34e..c05ebcb 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapServiceTest.java
@@ -50,7 +50,7 @@
                 .withPrefix("test")
                 .withStorageLevel(StorageLevel.MEMORY)
                 .build());
-        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
 
         AtomixConsistentSetMultimapService service = new AtomixConsistentSetMultimapService();
         service.put(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
index 36d6874..fa7161f 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixCounterServiceTest.java
@@ -42,7 +42,7 @@
                 .withPrefix("test")
                 .withStorageLevel(StorageLevel.MEMORY)
                 .build());
-        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
 
         AtomixCounterService service = new AtomixCounterService();
         service.set(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
index 555b82b..fd92e2b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeServiceTest.java
@@ -59,7 +59,7 @@
                 .withPrefix("test")
                 .withStorageLevel(StorageLevel.MEMORY)
                 .build());
-        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
 
         AtomixDocumentTreeService service = new AtomixDocumentTreeService(ordering);
         service.update(new DefaultCommit<>(
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
index e4c13fb..2d69394 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorServiceTest.java
@@ -15,8 +15,6 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import java.util.concurrent.ScheduledExecutorService;
-
 import io.atomix.protocols.raft.ReadConsistency;
 import io.atomix.protocols.raft.cluster.MemberId;
 import io.atomix.protocols.raft.impl.RaftContext;
@@ -34,6 +32,8 @@
 import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
 import io.atomix.storage.StorageLevel;
 import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
 import io.atomix.utils.concurrent.ThreadContext;
 import org.junit.Test;
 import org.onosproject.cluster.Leadership;
@@ -58,7 +58,7 @@
                 .withPrefix("test")
                 .withStorageLevel(StorageLevel.MEMORY)
                 .build());
-        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
 
         DefaultServiceContext context = mock(DefaultServiceContext.class);
         expect(context.serviceType()).andReturn(ServiceType.from(LEADER_ELECTOR.name())).anyTimes();
@@ -85,10 +85,11 @@
                         "test",
                         ServiceType.from(LEADER_ELECTOR.name()),
                         ReadConsistency.LINEARIZABLE,
+                        100,
                         5000,
                         context,
                         server,
-                        mock(ScheduledExecutorService.class)),
+                        new SingleThreadContextFactory(new AtomixThreadFactory())),
                 System.currentTimeMillis()));
 
         try (SnapshotWriter writer = snapshot.openWriter()) {
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index be24d00..1ade931 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -56,6 +56,8 @@
 import io.atomix.protocols.raft.protocol.CommandResponse;
 import io.atomix.protocols.raft.protocol.ConfigureRequest;
 import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.HeartbeatRequest;
+import io.atomix.protocols.raft.protocol.HeartbeatResponse;
 import io.atomix.protocols.raft.protocol.InstallRequest;
 import io.atomix.protocols.raft.protocol.InstallResponse;
 import io.atomix.protocols.raft.protocol.JoinRequest;
@@ -116,6 +118,8 @@
             .register(CloseSessionResponse.class)
             .register(KeepAliveRequest.class)
             .register(KeepAliveResponse.class)
+            .register(HeartbeatRequest.class)
+            .register(HeartbeatResponse.class)
             .register(QueryRequest.class)
             .register(QueryResponse.class)
             .register(CommandRequest.class)
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
index 6cb8667..6c69abe 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueServiceTest.java
@@ -17,7 +17,6 @@
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.concurrent.ScheduledExecutorService;
 
 import io.atomix.protocols.raft.ReadConsistency;
 import io.atomix.protocols.raft.cluster.MemberId;
@@ -36,6 +35,8 @@
 import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
 import io.atomix.storage.StorageLevel;
 import io.atomix.time.WallClockTimestamp;
+import io.atomix.utils.concurrent.AtomixThreadFactory;
+import io.atomix.utils.concurrent.SingleThreadContextFactory;
 import io.atomix.utils.concurrent.ThreadContext;
 import org.junit.Test;
 import org.onosproject.store.service.Task;
@@ -60,7 +61,7 @@
                 .withPrefix("test")
                 .withStorageLevel(StorageLevel.MEMORY)
                 .build());
-        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
+        Snapshot snapshot = store.newSnapshot(ServiceId.from(1), "test", 2, new WallClockTimestamp());
 
         DefaultServiceContext context = mock(DefaultServiceContext.class);
         expect(context.serviceType()).andReturn(ServiceType.from(WORK_QUEUE.name())).anyTimes();
@@ -79,10 +80,11 @@
                 "test",
                 ServiceType.from(WORK_QUEUE.name()),
                 ReadConsistency.LINEARIZABLE,
+                100,
                 5000,
                 context,
                 server,
-                mock(ScheduledExecutorService.class));
+                new SingleThreadContextFactory(new AtomixThreadFactory()));
 
         AtomixWorkQueueService service = new AtomixWorkQueueService();
         service.init(context);
diff --git a/features/features.xml b/features/features.xml
index e920ab0..707bcec 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -59,7 +59,7 @@
         <bundle>mvn:com.typesafe/config/1.2.1</bundle>
         <bundle>mvn:com.googlecode.concurrent-trees/concurrent-trees/2.6.0</bundle>
         <bundle>mvn:commons-io/commons-io/2.4</bundle>
-        <bundle>mvn:io.atomix/atomix/2.0.2</bundle>
+        <bundle>mvn:io.atomix/atomix/2.0.4</bundle>
 
         <bundle>mvn:org.glassfish.jersey.core/jersey-client/2.25.1</bundle>
 
diff --git a/lib/BUCK b/lib/BUCK
index 478049d..62ade4c 100644
--- a/lib/BUCK
+++ b/lib/BUCK
@@ -1,4 +1,4 @@
-# ***** This file was auto-generated at Tue, 31 Oct 2017 20:36:56 GMT. Do not edit this file manually. *****
+# ***** This file was auto-generated at Fri, 3 Nov 2017 05:46:03 GMT. Do not edit this file manually. *****
 # ***** Use onos-lib-gen *****
 
 pass_thru_pom(
@@ -210,10 +210,10 @@
 
 remote_jar (
   name = 'atomix',
-  out = 'atomix-2.0.2.jar',
-  url = 'mvn:io.atomix:atomix:jar:2.0.2',
-  sha1 = '9b3b05af6337c35bc69922fb7b535c78f07a5f5b',
-  maven_coords = 'io.atomix:atomix:2.0.2',
+  out = 'atomix-2.0.4.jar',
+  url = 'mvn:io.atomix:atomix:jar:2.0.4',
+  sha1 = 'ad9165144b95174ab8c8ba1ae7481e6ca8a699e3',
+  maven_coords = 'io.atomix:atomix:2.0.4',
   visibility = [ 'PUBLIC' ],
 )
 
diff --git a/lib/deps.json b/lib/deps.json
index ffb46ed..f83ebf6 100644
--- a/lib/deps.json
+++ b/lib/deps.json
@@ -120,7 +120,7 @@
     "aopalliance-repackaged": "mvn:org.glassfish.hk2.external:aopalliance-repackaged:2.5.0-b32",
     "amqp-client": "mvn:com.rabbitmq:amqp-client:jar:3.6.1",
     "asm": "mvn:org.ow2.asm:asm:5.0.4",
-    "atomix": "mvn:io.atomix:atomix:2.0.2",
+    "atomix": "mvn:io.atomix:atomix:2.0.4",
     "commons-codec": "mvn:commons-codec:commons-codec:1.10",
     "commons-collections": "mvn:commons-collections:commons-collections:3.2.2",
     "commons-configuration": "mvn:commons-configuration:commons-configuration:1.10",