Upgrade Atomix to Raft final version
Change-Id: I834a3db17bca69855901abb967218135d3547bee
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
index 56e89b1..c546bda 100644
--- a/core/store/primitives/pom.xml
+++ b/core/store/primitives/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
- <version>2.0.0-raft-beta1</version>
+ <version>2.0.0-raft-final</version>
</dependency>
<dependency>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
index 6123d38..2aecd0b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
@@ -36,6 +36,7 @@
final MessageSubject pollSubject;
final MessageSubject voteSubject;
final MessageSubject appendSubject;
+ final MessageSubject transferSubject;
RaftMessageContext(String prefix) {
this.prefix = prefix;
@@ -53,6 +54,7 @@
this.pollSubject = getSubject(prefix, "poll");
this.voteSubject = getSubject(prefix, "vote");
this.appendSubject = getSubject(prefix, "append");
+ this.transferSubject = getSubject(prefix, "transfer");
}
private static MessageSubject getSubject(String prefix, String type) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index f345459..9b8f3e6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -50,6 +50,8 @@
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.TransferRequest;
+import io.atomix.protocols.raft.protocol.TransferResponse;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.session.SessionId;
@@ -141,6 +143,11 @@
}
@Override
+ public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
+ return sendAndReceive(context.transferSubject, request, memberId);
+ }
+
+ @Override
public void publish(MemberId memberId, PublishRequest request) {
clusterCommunicator.unicast(request,
context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id()));
@@ -290,6 +297,16 @@
}
@Override
+ public void registerTransferHandler(Function<TransferRequest, CompletableFuture<TransferResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.transferSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterTransferHandler() {
+ clusterCommunicator.removeSubscriber(context.transferSubject);
+ }
+
+ @Override
public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) {
clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
index 1c31c55..22f59f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapService.java
@@ -144,7 +144,7 @@
public void install(SnapshotReader reader) {
listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(serializer()::decode)) {
- listeners.put(sessionId, getSessions().getSession(sessionId));
+ listeners.put(sessionId, sessions().getSession(sessionId));
}
preparedKeys = reader.readObject(serializer()::decode);
map = reader.readObject(serializer()::decode);
@@ -730,7 +730,7 @@
return PrepareResult.OK;
}
} catch (Exception e) {
- getLogger().warn("Failure applying {}", commit, e);
+ logger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
}
}
@@ -752,7 +752,7 @@
this.currentVersion = commit.index();
return commitTransaction(transactionScope);
} catch (Exception e) {
- getLogger().warn("Failure applying {}", commit, e);
+ logger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
} finally {
discardTombstones();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 44317f3..6478eef 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -130,7 +130,7 @@
listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(serializer::decode)) {
- listeners.put(sessionId, getSessions().getSession(sessionId));
+ listeners.put(sessionId, sessions().getSession(sessionId));
}
backingMap = reader.readObject(serializer::decode);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
index b87f784..717f362 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
@@ -103,7 +103,7 @@
@Override
public Listener read(Kryo kryo, Input input, Class<Listener> type) {
- return new Listener(getSessions().getSession(input.readLong()),
+ return new Listener(sessions().getSession(input.readLong()),
kryo.readObjectOrNull(input, DocumentPath.class));
}
}, Listener.class)
@@ -273,7 +273,7 @@
} catch (NoSuchDocumentPathException e) {
result = DocumentTreeResult.invalidPath();
} catch (Exception e) {
- getLogger().error("Failed to apply {} to state machine", commit.value(), e);
+ logger().error("Failed to apply {} to state machine", commit.value(), e);
throw Throwables.propagate(e);
}
return result;
@@ -365,7 +365,7 @@
return PrepareResult.OK;
}
} catch (Exception e) {
- getLogger().warn("Failure applying {}", commit, e);
+ logger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
}
}
@@ -409,7 +409,7 @@
try {
previousValue = docTree.removeNode(path);
} catch (NoSuchDocumentPathException e) {
- getLogger().info("Value is being inserted first time");
+ logger().info("Value is being inserted first time");
}
if (record.value() != null) {
@@ -454,7 +454,7 @@
try {
return commitTransaction(transactionScope);
} catch (Exception e) {
- getLogger().warn("Failure applying {}", commit, e);
+ logger().warn("Failure applying {}", commit, e);
throw Throwables.propagate(e);
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java
index f8cfea9..b259cd1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java
@@ -88,18 +88,18 @@
writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
writer.writeObject(termCounters, SERIALIZER::encode);
writer.writeObject(elections, SERIALIZER::encode);
- getLogger().debug("Took state machine snapshot");
+ logger().debug("Took state machine snapshot");
}
@Override
public void install(SnapshotReader reader) {
listeners = new LinkedHashMap<>();
for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
- listeners.put(sessionId, getSessions().getSession(sessionId));
+ listeners.put(sessionId, sessions().getSession(sessionId));
}
termCounters = reader.readObject(SERIALIZER::decode);
elections = reader.readObject(SERIALIZER::decode);
- getLogger().debug("Reinstated state machine from snapshot");
+ logger().debug("Reinstated state machine from snapshot");
}
@Override
@@ -176,7 +176,7 @@
}
return newLeadership;
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -196,7 +196,7 @@
notifyLeadershipChange(oldLeadership, newLeadership);
}
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -221,7 +221,7 @@
electionState.leader() != null &&
commit.value().nodeId().equals(electionState.leader().nodeId()));
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -246,7 +246,7 @@
}
return true;
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -270,7 +270,7 @@
});
notifyLeadershipChanges(changes);
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -285,7 +285,7 @@
try {
return leadership(topic);
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -303,7 +303,7 @@
return leader != null && leader.nodeId().equals(nodeId);
}).keySet());
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
@@ -319,7 +319,7 @@
result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
return result;
} catch (Exception e) {
- getLogger().error("State machine operation failed", e);
+ logger().error("State machine operation failed", e);
throw Throwables.propagate(e);
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
index 08a894b..b1d6698 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixWorkQueueService.java
@@ -90,7 +90,7 @@
public void install(SnapshotReader reader) {
registeredWorkers = Maps.newHashMap();
for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
- registeredWorkers.put(sessionId, getSessions().getSession(sessionId));
+ registeredWorkers.put(sessionId, sessions().getSession(sessionId));
}
assignments = reader.readObject(SERIALIZER::decode);
unassignedTasks = reader.readObject(SERIALIZER::decode);
@@ -168,7 +168,7 @@
})
.collect(Collectors.toCollection(ArrayList::new));
} catch (Exception e) {
- getLogger().warn("State machine update failed", e);
+ logger().warn("State machine update failed", e);
throw Throwables.propagate(e);
}
}
@@ -185,7 +185,7 @@
}
});
} catch (Exception e) {
- getLogger().warn("State machine update failed", e);
+ logger().warn("State machine update failed", e);
throw Throwables.propagate(e);
}
}