Upgrade Atomix to Raft final version
Change-Id: I834a3db17bca69855901abb967218135d3547bee
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index 56e89b1..c546bda 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
- <version>2.0.0-raft-beta1</version>
+ <version>2.0.0-raft-final</version>
</dependency>
<dependency>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
index 6123d38..2aecd0b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
@@ -36,6 +36,7 @@
final MessageSubject pollSubject;
final MessageSubject voteSubject;
final MessageSubject appendSubject;
+ final MessageSubject transferSubject;
RaftMessageContext(String prefix) {
this.prefix = prefix;
@@ -53,6 +54,7 @@
this.pollSubject = getSubject(prefix, "poll");
this.voteSubject = getSubject(prefix, "vote");
this.appendSubject = getSubject(prefix, "append");
+ this.transferSubject = getSubject(prefix, "transfer");
}
private static MessageSubject getSubject(String prefix, String type) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index f345459..9b8f3e6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -50,6 +50,8 @@
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.TransferRequest;
+import io.atomix.protocols.raft.protocol.TransferResponse;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.session.SessionId;
@@ -141,6 +143,11 @@
}
@Override
+ public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
+ return sendAndReceive(context.transferSubject, request, memberId);
+ }
+
+ @Override
public void publish(MemberId memberId, PublishRequest request) {
clusterCommunicator.unicast(request,
context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id()));
@@ -290,6 +297,16 @@
}
@Override
+ public void registerTransferHandler(Function<TransferRequest, CompletableFuture<TransferResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.transferSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterTransferHandler() {
+ clusterCommunicator.removeSubscriber(context.transferSubject);
+ }
+
+ @Override
public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) {
clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index 1c31c55..22f59f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -144,7 +144,7 @@
public void install(SnapshotReader reader) {
listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
- listeners.put(sessionId, getSessions().getSession(sessionId));
+ listeners.put(sessionId, sessions().getSession(sessionId));
}
preparedKeys = reader.readObject(serializer()::decode);
map = reader.readObject(serializer()::decode);
@@ -730,7 +730,7 @@
return PrepareResult.OK;
}
} catch (Exception e) {
- getLogger().warn("Failure applying {}", commit, e);
+ logger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
}
}
@@ -752,7 +752,7 @@
this.currentVersion = commit.index();
return commitTransaction(transactionScope);
} catch (Exception e) {
- getLogger().warn("Failure applying {}", commit, e);
+ logger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
} finally {
discardTombstones();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 44317f3..6478eef 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -130,7 +130,7 @@
listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(serializer::decode)) {
- listeners.put(sessionId, getSessions().getSession(sessionId));
+ listeners.put(sessionId, sessions().getSession(sessionId));
}
backingMap = reader.readObject(serializer::decode);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
index b87f784..717f362 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
@@ -103,7 +103,7 @@
@Override
public Listener read(Kryo kryo, Input input, Class<Listener> type) {
- return new Listener(getSessions().getSession(input.readLong()),
+ return new Listener(sessions().getSession(input.readLong()),
kryo.readObjectOrNull(input, DocumentPath.class));
}
}, Listener.class)
@@ -273,7 +273,7 @@
} catch (NoSuchDocumentPathException e) {
result = DocumentTreeResult.invalidPath();
} catch (Exception e) {
- getLogger().error("Failed to apply {} to state machine", commit.value(), e);
+ logger().error("Failed to apply {} to state machine", commit.value(), e);
throw Throwables.propagate(e);
}
return result;
@@ -365,7 +365,7 @@
return PrepareResult.OK;
}
} catch (Exception e) {
- getLogger().warn("Failure applying {}", commit, e);
+ logger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
}
}
@@ -409,7 +409,7 @@
try {
previousValue = docTree.removeNode(path);
} catch (NoSuchDocumentPathException e) {
- getLogger().info("Value is being inserted first time");
+ logger().info("Value is being inserted first time");
}
if (record.value() != null) {
@@ -454,7 +454,7 @@
try {
return commitTransaction(transactionScope);
} catch (Exception e) {
- getLogger().warn("Failure applying {}", commit, e);
+ logger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java
index f8cfea9..b259cd1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java
@@ -88,18 +88,18 @@
writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
writer.writeObject(termCounters, SERIALIZER::encode);
writer.writeObject(elections, SERIALIZER::encode);
- getLogger().debug("Took state machine snapshot");
+ logger().debug("Took state machine snapshot");
}
@Override
public void install(SnapshotReader reader) {
listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
- listeners.put(sessionId, getSessions().getSession(sessionId));
+ listeners.put(sessionId, sessions().getSession(sessionId));
}
termCounters = reader.readObject(SERIALIZER::decode);
elections = reader.readObject(SERIALIZER::decode);
- getLogger().debug("Reinstated state machine from snapshot");
+ logger().debug("Reinstated state machine from snapshot");
}
@Override
@@ -176,7 +176,7 @@
}
return newLeadership;
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -196,7 +196,7 @@
notifyLeadershipChange(oldLeadership, newLeadership);
}
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -221,7 +221,7 @@
electionState.leader() != null &&
commit.value().nodeId().equals(electionState.leader().nodeId()));
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -246,7 +246,7 @@
}
return true;
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -270,7 +270,7 @@
});
notifyLeadershipChanges(changes);
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -285,7 +285,7 @@
try {
return leadership(topic);
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -303,7 +303,7 @@
return leader != null && leader.nodeId().equals(nodeId);
}).keySet());
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -319,7 +319,7 @@
result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
return result;
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
index 08a894b..b1d6698 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
@@ -90,7 +90,7 @@
public void install(SnapshotReader reader) {
registeredWorkers = Maps.newHashMap();
for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
- registeredWorkers.put(sessionId, getSessions().getSession(sessionId));
+ registeredWorkers.put(sessionId, sessions().getSession(sessionId));
}
assignments = reader.readObject(SERIALIZER::decode);
unassignedTasks = reader.readObject(SERIALIZER::decode);
@@ -168,7 +168,7 @@
})
.collect(Collectors.toCollection(ArrayList::new));
} catch (Exception e) {
- getLogger().warn("State machine update failed", e);
+ logger().warn("State machine update failed", e);
throw Throwables.propagate(e);
}
}
@@ -185,7 +185,7 @@
}
});
} catch (Exception e) {
- getLogger().warn("State machine update failed", e);
+ logger().warn("State machine update failed", e);
throw Throwables.propagate(e);
}
}
diff --git a/features/features.xml b/features/features.xml
index 9d576b8..bd36ee4 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -59,7 +59,7 @@
<bundle>mvn:com.typesafe/config/1.2.1</bundle>
<bundle>mvn:com.googlecode.concurrent-trees/concurrent-trees/2.6.0</bundle>
<bundle>mvn:commons-io/commons-io/2.4</bundle>
- <bundle>mvn:io.atomix/atomix/2.0.0-raft-beta1</bundle>
+ <bundle>mvn:io.atomix/atomix/2.0.0-raft-final</bundle>
<bundle>mvn:org.glassfish.jersey.core/jersey-client/2.25.1</bundle>
diff --git a/lib/BUCK b/lib/BUCK
index b1f431f..76201f1 100644
--- a/lib/BUCK
+++ b/lib/BUCK
@@ -1,4 +1,4 @@
-# ***** This file was auto-generated at Fri, 25 Aug 2017 19:51:50 GMT. Do not edit this file manually. *****
+# ***** This file was auto-generated at Tue, 29 Aug 2017 19:49:54 GMT. Do not edit this file manually. *****
# ***** Use onos-lib-gen *****
pass_thru_pom(
@@ -178,10 +178,10 @@
remote_jar (
name = 'atomix',
- out = 'atomix-2.0.0-raft-beta1.jar',
- url = 'mvn:io.atomix:atomix:jar:2.0.0-raft-beta1',
- sha1 = 'ecae1e33896035aff718e0bdf9030a32ab3f81a1',
- maven_coords = 'io.atomix:atomix:2.0.0-raft-beta1',
+ out = 'atomix-2.0.0-raft-final.jar',
+ url = 'mvn:io.atomix:atomix:jar:2.0.0-raft-final',
+ sha1 = '75ded9852e3d45ca4cbb3976a9ce39062e13fc0a',
+ maven_coords = 'io.atomix:atomix:2.0.0-raft-final',
visibility = [ 'PUBLIC' ],
)
diff --git a/lib/deps.json b/lib/deps.json
index 4c2ee23..e6026b0 100644
--- a/lib/deps.json
+++ b/lib/deps.json
@@ -112,7 +112,7 @@
"aopalliance-repackaged": "mvn:org.glassfish.hk2.external:aopalliance-repackaged:2.5.0-b32",
"amqp-client": "mvn:com.rabbitmq:amqp-client:jar:3.6.1",
"asm": "mvn:org.ow2.asm:asm:5.0.4",
- "atomix": "mvn:io.atomix:atomix:2.0.0-raft-beta1",
+ "atomix": "mvn:io.atomix:atomix:2.0.0-raft-final",
"commons-codec": "mvn:commons-codec:commons-codec:1.10",
"commons-collections": "mvn:commons-collections:commons-collections:3.2.2",
"commons-configuration": "mvn:commons-configuration:commons-configuration:1.10",