[ONOS-7054] Implement prototype of ISSU protocol
Change-Id: Id543c0de9c97b68f977c824cbc987b35d81beb2d
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 78d2d4e..2bb1ea3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -15,14 +15,12 @@
*/
package org.onosproject.store.cluster.impl;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
@@ -37,13 +35,21 @@
import org.onosproject.cluster.LeadershipStore;
import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
import org.onosproject.event.Change;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.onosproject.store.service.CoordinationService;
import org.onosproject.store.service.LeaderElector;
-import org.onosproject.store.service.StorageService;
+import org.onosproject.upgrade.UpgradeEvent;
+import org.onosproject.upgrade.UpgradeEventListener;
+import org.onosproject.upgrade.UpgradeService;
import org.slf4j.Logger;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
* primitive.
@@ -54,25 +60,41 @@
extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
implements LeadershipStore {
+ private static final char VERSION_SEP = '|';
+
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
+ protected CoordinationService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VersionService versionService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UpgradeService upgradeService;
private ExecutorService statusChangeHandler;
private NodeId localNodeId;
private LeaderElector leaderElector;
private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
+ private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
private final Consumer<Change<Leadership>> leadershipChangeListener =
change -> {
Leadership oldValue = change.oldValue();
Leadership newValue = change.newValue();
+
+ // If the topic is not relevant to this version, skip the event.
+ if (!isLocalTopic(newValue.topic())) {
+ return;
+ }
+
boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
+
LeadershipEvent.Type eventType = null;
if (leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
@@ -83,7 +105,10 @@
if (!leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
- notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
+ notifyDelegate(new LeadershipEvent(eventType, new Leadership(
+ parseTopic(change.newValue().topic()),
+ change.newValue().leader(),
+ change.newValue().candidates())));
// Update local cache of currently held leaderships
if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
localLeaderCache.put(newValue.topic(), newValue);
@@ -101,18 +126,27 @@
// Service Restored
localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
leaderElector.getLeaderships().forEach((topic, leadership) ->
- notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_RESTORED, leadership)));
+ notifyDelegate(new LeadershipEvent(
+ LeadershipEvent.Type.SERVICE_RESTORED,
+ new Leadership(
+ parseTopic(leadership.topic()),
+ leadership.leader(),
+ leadership.candidates()))));
} else if (status == Status.SUSPENDED) {
// Service Suspended
localLeaderCache.forEach((topic, leadership) ->
- notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_DISRUPTED, leadership)));
+ notifyDelegate(new LeadershipEvent(
+ LeadershipEvent.Type.SERVICE_DISRUPTED,
+ new Leadership(
+ parseTopic(leadership.topic()),
+ leadership.leader(),
+ leadership.candidates()))));
} else {
// Should be only inactive state
return;
}
}
-
@Activate
public void activate() {
statusChangeHandler = Executors.newSingleThreadExecutor(
@@ -124,6 +158,7 @@
.asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
leaderElector.addStatusChangeListener(clientStatusListener);
+ upgradeService.addListener(upgradeListener);
log.info("Started");
}
@@ -131,18 +166,20 @@
public void deactivate() {
leaderElector.removeChangeListener(leadershipChangeListener);
leaderElector.removeStatusChangeListener(clientStatusListener);
+ upgradeService.removeListener(upgradeListener);
statusChangeHandler.shutdown();
log.info("Stopped");
}
@Override
public Leadership addRegistration(String topic) {
- return leaderElector.run(topic, localNodeId);
+ leaderElector.run(getLocalTopic(topic), localNodeId);
+ return getLeadership(topic);
}
@Override
public void removeRegistration(String topic) {
- leaderElector.withdraw(topic);
+ leaderElector.withdraw(getLocalTopic(topic));
}
@Override
@@ -152,21 +189,108 @@
@Override
public boolean moveLeadership(String topic, NodeId toNodeId) {
- return leaderElector.anoint(topic, toNodeId);
+ return leaderElector.anoint(getTopicFor(topic, toNodeId), toNodeId);
}
@Override
public boolean makeTopCandidate(String topic, NodeId nodeId) {
- return leaderElector.promote(topic, nodeId);
+ return leaderElector.promote(getTopicFor(topic, nodeId), nodeId);
}
@Override
public Leadership getLeadership(String topic) {
- return leaderElector.getLeadership(topic);
+ Leadership leadership = leaderElector.getLeadership(getActiveTopic(topic));
+ return leadership != null ? new Leadership(
+ parseTopic(leadership.topic()),
+ leadership.leader(),
+ leadership.candidates()) : null;
}
@Override
public Map<String, Leadership> getLeaderships() {
- return leaderElector.getLeaderships();
+ Map<String, Leadership> leaderships = leaderElector.getLeaderships();
+ return leaderships.entrySet().stream()
+ .filter(e -> isActiveTopic(e.getKey()))
+ .collect(Collectors.toMap(e -> parseTopic(e.getKey()),
+ e -> new Leadership(parseTopic(e.getKey()), e.getValue().leader(), e.getValue().candidates())));
+ }
+
+ /**
+ * Returns a leader elector topic namespaced with the local node's version.
+ *
+ * @param topic the base topic
+ * @return a topic string namespaced with the local node's version
+ */
+ private String getLocalTopic(String topic) {
+ return topic + VERSION_SEP + versionService.version();
+ }
+
+ /**
+ * Returns a leader elector topic namespaced with the current cluster version.
+ *
+ * @param topic the base topic
+ * @return a topic string namespaced with the current cluster version
+ */
+ private String getActiveTopic(String topic) {
+ return topic + VERSION_SEP + upgradeService.getVersion();
+ }
+
+ /**
+ * Returns whether the given topic is a topic for the local version.
+ *
+ * @param topic the topic to check
+ * @return whether the given topic is relevant to the local version
+ */
+ private boolean isLocalTopic(String topic) {
+ return topic.endsWith(versionService.version().toString());
+ }
+
+ /**
+ * Returns whether the given topic is a topic for the current cluster version.
+ *
+ * @param topic the topic to check
+ * @return whether the given topic is relevant to the current cluster version
+ */
+ private boolean isActiveTopic(String topic) {
+ return topic.endsWith(VERSION_SEP + upgradeService.getVersion().toString());
+ }
+
+ /**
+ * Parses a topic string, returning the base topic.
+ *
+ * @param topic the topic string to parse
+ * @return the base topic string
+ */
+ private String parseTopic(String topic) {
+ return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
+ }
+
+ /**
+ * Returns the versioned topic for the given node.
+ *
+     * @param topic the base topic
+     * @param nodeId the node whose version is used to namespace the topic
+ * @return the versioned topic for the given node
+ */
+ private String getTopicFor(String topic, NodeId nodeId) {
+ Version nodeVersion = clusterService.getVersion(nodeId);
+ return nodeVersion != null ? topic + VERSION_SEP + nodeVersion : topic + VERSION_SEP + versionService.version();
+ }
+
+ /**
+ * Internal upgrade event listener.
+ */
+ private class InternalUpgradeEventListener implements UpgradeEventListener {
+ @Override
+ public void event(UpgradeEvent event) {
+ if (event.type() == UpgradeEvent.Type.UPGRADED || event.type() == UpgradeEvent.Type.ROLLED_BACK) {
+ // Iterate through all current leaderships for the new version and trigger events.
+ for (Leadership leadership : getLeaderships().values()) {
+ notifyDelegate(new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED,
+ leadership));
+ }
+ }
+ }
}
}
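
Every leader-elector topic is now namespaced with a software version, using '|' as the separator, so that nodes running different versions during an upgrade hold independent elections while callers continue to see the bare topic. The following standalone sketch (not part of the patch; the topic name and version strings are illustrative) shows how a base topic round-trips through this encoding, mirroring getLocalTopic()/getActiveTopic() and parseTopic() above:

    public final class TopicVersioningSketch {
        private static final char VERSION_SEP = '|';

        // Namespaces a base topic with a version, as getLocalTopic()/getActiveTopic() do.
        static String versioned(String topic, String version) {
            return topic + VERSION_SEP + version;
        }

        // Recovers the base topic, as parseTopic() does.
        static String parse(String topic) {
            return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
        }

        public static void main(String[] args) {
            String topic = "device:of:0000000000000001";    // hypothetical mastership topic
            String local = versioned(topic, "1.11.0");      // e.g. the local node's version
            String active = versioned(topic, "1.10.0");     // e.g. the current cluster version
            System.out.println(local);                      // device:of:0000000000000001|1.11.0
            System.out.println(parse(local).equals(topic)); // true
            System.out.println(local.equals(active));       // false -> separate elections per version
        }
    }
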
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
new file mode 100644
index 0000000..aa96131
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.utils.MeteringAgent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
+
+@Component(componentAbstract = true)
+public abstract class AbstractClusterCommunicationManager
+ implements ClusterCommunicationService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
+ private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
+
+ private static final String PRIMITIVE_NAME = "clusterCommunication";
+ private static final String SUBJECT_PREFIX = "subject";
+ private static final String ENDPOINT_PREFIX = "endpoint";
+
+ private static final String SERIALIZING = "serialization";
+ private static final String DESERIALIZING = "deserialization";
+ private static final String NODE_PREFIX = "node:";
+ private static final String ROUND_TRIP_SUFFIX = ".rtt";
+ private static final String ONE_WAY_SUFFIX = ".oneway";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UnifiedClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MessagingService messagingService;
+
+ private NodeId localNodeId;
+
+ /**
+ * Returns the type for the given message subject.
+ *
+     * @param subject the message subject
+     * @return the type for the given message subject
+ */
+ protected abstract String getType(MessageSubject subject);
+
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public <M> void broadcast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ checkPermission(CLUSTER_WRITE);
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ checkPermission(CLUSTER_WRITE);
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> CompletableFuture<Void> unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId) {
+ checkPermission(CLUSTER_WRITE);
+ try {
+ byte[] payload = new ClusterMessage(
+ localNodeId,
+ subject,
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
+ ).getBytes();
+ return doUnicast(subject, payload, toNodeId);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public <M> void multicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Set<NodeId> nodes) {
+ checkPermission(CLUSTER_WRITE);
+ byte[] payload = new ClusterMessage(
+ localNodeId,
+ subject,
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
+ .getBytes();
+ nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId) {
+ checkPermission(CLUSTER_WRITE);
+ try {
+ ClusterMessage envelope = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
+ apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).
+ thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+ ControllerNode node = clusterService.getNode(toNodeId);
+ checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+ Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
+ MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
+ return messagingService.sendAsync(nodeEp, getType(subject), payload).whenComplete((r, e) -> context.stop(e));
+ }
+
+ private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+ ControllerNode node = clusterService.getNode(toNodeId);
+ checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+ Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
+ MeteringAgent.Context epContext = endpointMeteringAgent.
+ startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
+ MeteringAgent.Context subjectContext = subjectMeteringAgent.
+ startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
+ return messagingService.sendAndReceive(nodeEp, getType(subject), payload).
+ whenComplete((bytes, throwable) -> {
+ subjectContext.stop(throwable);
+ epContext.stop(throwable);
+ });
+ }
+
+ @Override
+ public void addSubscriber(MessageSubject subject,
+ ClusterMessageHandler subscriber,
+ ExecutorService executor) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(getType(subject),
+ new InternalClusterMessageHandler(subscriber),
+ executor);
+ }
+
+ @Override
+ public void removeSubscriber(MessageSubject subject) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.unregisterHandler(getType(subject));
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ Executor executor) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(getType(subject),
+ new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+ CompletableFuture<R> responseFuture = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ responseFuture.complete(handler.apply(m));
+ } catch (Exception e) {
+ responseFuture.completeExceptionally(e);
+ }
+ });
+ return responseFuture;
+ }));
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(getType(subject),
+ new InternalMessageResponder<>(decoder, encoder, handler));
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ Executor executor) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(getType(subject),
+ new InternalMessageConsumer<>(decoder, handler),
+ executor);
+ }
+
+    /**
+     * Wraps the given function in one that times each invocation with the given metering agent.
+     *
+     * @param timedFunction the function to be timed
+     * @param meter the metering agent to be used to time the function
+     * @param opName the operation name to be used when starting the meter
+     * @param <A> the parameter type of the function
+     * @param <B> the return type of the function
+     * @return a function that applies {@code timedFunction} and records its execution time
+     */
+ private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
+ MeteringAgent meter, String opName) {
+ checkNotNull(timedFunction);
+ checkNotNull(meter);
+ checkNotNull(opName);
+ return new Function<A, B>() {
+ @Override
+ public B apply(A a) {
+ final MeteringAgent.Context context = meter.startTimer(opName);
+ B result = null;
+ try {
+ result = timedFunction.apply(a);
+ context.stop(null);
+ return result;
+ } catch (Exception e) {
+ context.stop(e);
+ Throwables.propagate(e);
+ return null;
+ }
+ }
+ };
+ }
+
+
+ private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
+ private ClusterMessageHandler handler;
+
+ public InternalClusterMessageHandler(ClusterMessageHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public byte[] apply(Endpoint sender, byte[] bytes) {
+ ClusterMessage message = ClusterMessage.fromBytes(bytes);
+ handler.handle(message);
+ return message.response();
+ }
+ }
+
+ private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
+ private final Function<byte[], M> decoder;
+ private final Function<R, byte[]> encoder;
+ private final Function<M, CompletableFuture<R>> handler;
+
+ public InternalMessageResponder(Function<byte[], M> decoder,
+ Function<R, byte[]> encoder,
+ Function<M, CompletableFuture<R>> handler) {
+ this.decoder = decoder;
+ this.encoder = encoder;
+ this.handler = handler;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
+ return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+ apply(ClusterMessage.fromBytes(bytes).payload())).
+ thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
+ }
+ }
+
+ private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
+ private final Function<byte[], M> decoder;
+ private final Consumer<M> consumer;
+
+ public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+ this.decoder = decoder;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void accept(Endpoint sender, byte[] bytes) {
+ consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+ apply(ClusterMessage.fromBytes(bytes).payload()));
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 1bdac37..4602702 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,328 +15,24 @@
*/
package org.onosproject.store.cluster.messaging.impl;
-import com.google.common.base.Throwables;
-import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.core.VersionService;
import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.utils.MeteringAgent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
@Component(immediate = true)
@Service
-public class ClusterCommunicationManager
- implements ClusterCommunicationService {
+public class ClusterCommunicationManager extends AbstractClusterCommunicationManager {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
- private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
-
- private static final String PRIMITIVE_NAME = "clusterCommunication";
- private static final String SUBJECT_PREFIX = "subject";
- private static final String ENDPOINT_PREFIX = "endpoint";
-
- private static final String SERIALIZING = "serialization";
- private static final String DESERIALIZING = "deserialization";
- private static final String NODE_PREFIX = "node:";
- private static final String ROUND_TRIP_SUFFIX = ".rtt";
- private static final String ONE_WAY_SUFFIX = ".oneway";
+ private static final char VERSION_SEP = '-';
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MessagingService messagingService;
-
- private NodeId localNodeId;
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
+ private VersionService versionService;
@Override
- public <M> void broadcast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> void broadcastIncludeSelf(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> CompletableFuture<Void> unicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- NodeId toNodeId) {
- checkPermission(CLUSTER_WRITE);
- try {
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
- ).getBytes();
- return doUnicast(subject, payload, toNodeId);
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- @Override
- public <M> void multicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Set<NodeId> nodes) {
- checkPermission(CLUSTER_WRITE);
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
- .getBytes();
- nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
- }
-
- @Override
- public <M, R> CompletableFuture<R> sendAndReceive(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Function<byte[], R> decoder,
- NodeId toNodeId) {
- checkPermission(CLUSTER_WRITE);
- try {
- ClusterMessage envelope = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
- apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).
- thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
- return messagingService.sendAsync(nodeEp, subject.value(), payload).whenComplete((r, e) -> context.stop(e));
- }
-
- private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- MeteringAgent.Context epContext = endpointMeteringAgent.
- startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
- MeteringAgent.Context subjectContext = subjectMeteringAgent.
- startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
- return messagingService.sendAndReceive(nodeEp, subject.value(), payload).
- whenComplete((bytes, throwable) -> {
- subjectContext.stop(throwable);
- epContext.stop(throwable);
- });
- }
-
- @Override
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber,
- ExecutorService executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(subject.value(),
- new InternalClusterMessageHandler(subscriber),
- executor);
- }
-
- @Override
- public void removeSubscriber(MessageSubject subject) {
- checkPermission(CLUSTER_WRITE);
- messagingService.unregisterHandler(subject.value());
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, R> handler,
- Function<R, byte[]> encoder,
- Executor executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(subject.value(),
- new InternalMessageResponder<M, R>(decoder, encoder, m -> {
- CompletableFuture<R> responseFuture = new CompletableFuture<>();
- executor.execute(() -> {
- try {
- responseFuture.complete(handler.apply(m));
- } catch (Exception e) {
- responseFuture.completeExceptionally(e);
- }
- });
- return responseFuture;
- }));
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, CompletableFuture<R>> handler,
- Function<R, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(subject.value(),
- new InternalMessageResponder<>(decoder, encoder, handler));
- }
-
- @Override
- public <M> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Consumer<M> handler,
- Executor executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(subject.value(),
- new InternalMessageConsumer<>(decoder, handler),
- executor);
- }
-
- /**
- * Performs the timed function, returning the value it would while timing the operation.
- *
- * @param timedFunction the function to be timed
- * @param meter the metering agent to be used to time the function
- * @param opName the opname to be used when starting the meter
- * @param <A> The param type of the function
- * @param <B> The return type of the function
- * @return the value returned by the timed function
- */
- private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
- MeteringAgent meter, String opName) {
- checkNotNull(timedFunction);
- checkNotNull(meter);
- checkNotNull(opName);
- return new Function<A, B>() {
- @Override
- public B apply(A a) {
- final MeteringAgent.Context context = meter.startTimer(opName);
- B result = null;
- try {
- result = timedFunction.apply(a);
- context.stop(null);
- return result;
- } catch (Exception e) {
- context.stop(e);
- Throwables.propagate(e);
- return null;
- }
- }
- };
- }
-
-
- private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
- private ClusterMessageHandler handler;
-
- public InternalClusterMessageHandler(ClusterMessageHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public byte[] apply(Endpoint sender, byte[] bytes) {
- ClusterMessage message = ClusterMessage.fromBytes(bytes);
- handler.handle(message);
- return message.response();
- }
- }
-
- private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
- private final Function<byte[], M> decoder;
- private final Function<R, byte[]> encoder;
- private final Function<M, CompletableFuture<R>> handler;
-
- public InternalMessageResponder(Function<byte[], M> decoder,
- Function<R, byte[]> encoder,
- Function<M, CompletableFuture<R>> handler) {
- this.decoder = decoder;
- this.encoder = encoder;
- this.handler = handler;
- }
-
- @Override
- public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
- return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
- apply(ClusterMessage.fromBytes(bytes).payload())).
- thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
- }
- }
-
- private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
- private final Function<byte[], M> decoder;
- private final Consumer<M> consumer;
-
- public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
- this.decoder = decoder;
- this.consumer = consumer;
- }
-
- @Override
- public void accept(Endpoint sender, byte[] bytes) {
- consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
- apply(ClusterMessage.fromBytes(bytes).payload()));
- }
+ protected String getType(MessageSubject subject) {
+ return subject.value() + VERSION_SEP + versionService.version().toString();
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
new file mode 100644
index 0000000..45c84af
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+@Component(immediate = true)
+@Service
+public class UnifiedClusterCommunicationManager
+ extends AbstractClusterCommunicationManager
+ implements UnifiedClusterCommunicationService {
+ @Override
+ protected String getType(MessageSubject subject) {
+ return subject.value();
+ }
+}
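
The two concrete communication managers differ only in how a MessageSubject is mapped to a messaging type: ClusterCommunicationManager suffixes the subject value with the local software version (separated by '-'), so only same-version nodes exchange messages on it, while UnifiedClusterCommunicationManager uses the raw subject value so that all nodes share one handler namespace. A minimal sketch of that mapping (the subject name and version string are illustrative):

    import java.util.function.Function;

    public final class SubjectTypeSketch {
        public static void main(String[] args) {
            String subjectValue = "onos-mastership-role-relinquish"; // hypothetical subject name
            String nodeVersion = "1.11.0";                           // illustrative local version

            // Versioned mapping, as in ClusterCommunicationManager.getType().
            Function<String, String> versioned = subject -> subject + '-' + nodeVersion;

            // Unified mapping, as in UnifiedClusterCommunicationManager.getType().
            Function<String, String> unified = Function.identity();

            System.out.println(versioned.apply(subjectValue)); // onos-mastership-role-relinquish-1.11.0
            System.out.println(unified.apply(subjectValue));   // onos-mastership-role-relinquish
        }
    }
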
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 136df56..5d9f453 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -60,6 +60,7 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
+import org.onosproject.upgrade.UpgradeService;
import org.slf4j.Logger;
import com.google.common.base.Objects;
@@ -90,6 +91,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UpgradeService upgradeService;
+
private NodeId localNodeId;
private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
@@ -155,8 +159,12 @@
String leadershipTopic = createDeviceMastershipTopic(deviceId);
Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
- return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
- ? MastershipRole.MASTER : MastershipRole.STANDBY);
+ NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+ List<NodeId> candidates = leadership == null ?
+ ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+ MastershipRole role = Objects.equal(localNodeId, leader) ?
+ MastershipRole.MASTER : candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
+ return CompletableFuture.completedFuture(role);
}
@Override
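
With version-scoped leadership topics, a node that is neither the leader nor a candidate for a device's topic must now report NONE rather than STANDBY. A small sketch of the role computation above (node IDs are placeholders):

    import java.util.Arrays;
    import java.util.List;

    public final class RoleSketch {
        enum MastershipRole { MASTER, STANDBY, NONE }

        // MASTER if this node leads the topic, STANDBY if it is at least a candidate,
        // NONE otherwise (e.g. a node not registered for the active version's topic).
        static MastershipRole role(String localNodeId, String leader, List<String> candidates) {
            if (localNodeId.equals(leader)) {
                return MastershipRole.MASTER;
            }
            return candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
        }

        public static void main(String[] args) {
            List<String> candidates = Arrays.asList("node-1", "node-2"); // hypothetical candidates
            System.out.println(role("node-1", "node-1", candidates));    // MASTER
            System.out.println(role("node-2", "node-1", candidates));    // STANDBY
            System.out.println(role("node-3", "node-1", candidates));    // NONE
        }
    }
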
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
new file mode 100644
index 0000000..29045a2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.AtomicCounterBuilder;
+import org.onosproject.store.service.AtomicCounterMapBuilder;
+import org.onosproject.store.service.AtomicIdGeneratorBuilder;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.ConsistentMultimapBuilder;
+import org.onosproject.store.service.ConsistentTreeMapBuilder;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.DocumentTreeBuilder;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.LeaderElectorBuilder;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueue;
+import org.slf4j.Logger;
+
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of {@code CoordinationService} that uses a {@link StoragePartition} that spans all the nodes
+ * in the cluster regardless of version.
+ */
+@Service
+@Component(immediate = true)
+public class CoordinationManager implements CoordinationService {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UnifiedClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UnifiedClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PersistenceService persistenceService;
+
+ private StoragePartition partition;
+ private DistributedPrimitiveCreator primitiveCreator;
+
+ @Activate
+ public void activate() {
+ partition = new StoragePartition(
+ new DefaultPartition(
+ PartitionId.SHARED,
+ clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet())),
+ null,
+ null,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") + "/partitions/coordination"));
+ partition.open().join();
+ primitiveCreator = partition.client();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+ clusterCommunicator,
+ persistenceService);
+ }
+
+ @Override
+ public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultConsistentMapBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultDocumentTreeBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
+ return new DefaultConsistentTreeMapBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultConsistentMultimapBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultAtomicCounterMapBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <E> DistributedSetBuilder<E> setBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
+ }
+
+ @Override
+ public AtomicCounterBuilder atomicCounterBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultAtomicCounterBuilder(primitiveCreator);
+ }
+
+ @Override
+ public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultAtomicIdGeneratorBuilder(primitiveCreator);
+ }
+
+ @Override
+ public <V> AtomicValueBuilder<V> atomicValueBuilder() {
+ checkPermission(STORAGE_WRITE);
+ Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
+ () -> this.<String, byte[]>consistentMapBuilder()
+ .withName("onos-atomic-values")
+ .withSerializer(Serializer.using(KryoNamespaces.BASIC));
+ return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
+ }
+
+ @Override
+ public TransactionContextBuilder transactionContextBuilder() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LeaderElectorBuilder leaderElectorBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultLeaderElectorBuilder(primitiveCreator);
+ }
+
+ @Override
+ public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return primitiveCreator.newWorkQueue(name, serializer);
+ }
+
+ @Override
+ public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return primitiveCreator.newAsyncDocumentTree(name, serializer);
+ }
+
+ @Override
+ public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
+ String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return primitiveCreator.newAsyncConsistentSetMultimap(name,
+ serializer);
+ }
+
+ @Override
+ public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
+ String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return primitiveCreator.newAsyncConsistentTreeMap(name, serializer);
+ }
+
+ @Override
+ public <T> Topic<T> getTopic(String name, Serializer serializer) {
+ AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
+ .withName("topic-" + name)
+ .withSerializer(serializer)
+ .build();
+ return new DefaultDistributedTopic<>(atomicValue);
+ }
+}
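
CoordinationService exposes the same primitive builders as StorageService, but every primitive it creates lives in the single shared partition (PartitionId.SHARED) spanning all nodes regardless of version, which is what lets the version-namespaced leader elections above coordinate across an in-progress upgrade. A hypothetical consumer depends on it exactly as it previously depended on StorageService; a sketch using the same builder chain as getTopic() above (component and primitive names are illustrative):

    import org.apache.felix.scr.annotations.Activate;
    import org.apache.felix.scr.annotations.Component;
    import org.apache.felix.scr.annotations.Reference;
    import org.apache.felix.scr.annotations.ReferenceCardinality;
    import org.onosproject.store.serializers.KryoNamespaces;
    import org.onosproject.store.service.AsyncAtomicValue;
    import org.onosproject.store.service.CoordinationService;
    import org.onosproject.store.service.Serializer;

    @Component(immediate = true)
    public class CoordinationUserSketch {

        @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
        protected CoordinationService coordinationService;

        private AsyncAtomicValue<String> flag;

        @Activate
        public void activate() {
            // Primitives built through CoordinationService land in the shared,
            // version-agnostic partition, so both sides of an upgrade see the same state.
            flag = coordinationService.<String>atomicValueBuilder()
                    .withName("example-upgrade-flag")
                    .withSerializer(Serializer.using(KryoNamespaces.BASIC))
                    .build();
        }
    }
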
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index d15451e..1bfaaed 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -16,11 +16,11 @@
package org.onosproject.store.primitives.impl;
import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
@@ -38,8 +38,8 @@
*/
public class EventuallyConsistentMapBuilderImpl<K, V>
implements EventuallyConsistentMapBuilder<K, V> {
- private final ClusterService clusterService;
- private final ClusterCommunicationService clusterCommunicator;
+ private final MembershipService clusterService;
+ private final ClusterCommunicator clusterCommunicator;
private String name;
private KryoNamespace serializer;
@@ -64,8 +64,8 @@
* @param clusterCommunicator cluster communication service
* @param persistenceService persistence service
*/
- public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
+ public EventuallyConsistentMapBuilderImpl(MembershipService clusterService,
+ ClusterCommunicator clusterCommunicator,
PersistenceService persistenceService) {
this.persistenceService = persistenceService;
this.clusterService = checkNotNull(clusterService);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index bed19d5..461245e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -25,13 +25,13 @@
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DistributedPrimitive;
@@ -86,8 +86,8 @@
private final Map<K, MapValue<V>> items;
- private final ClusterService clusterService;
- private final ClusterCommunicationService clusterCommunicator;
+ private final MembershipService clusterService;
+ private final ClusterCommunicator clusterCommunicator;
private final Serializer serializer;
private final NodeId localNodeId;
private final PersistenceService persistenceService;
@@ -162,8 +162,8 @@
* @param persistenceService persistence service
*/
EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
+ MembershipService clusterService,
+ ClusterCommunicator clusterCommunicator,
KryoNamespace ns,
BiFunction<K, V, Timestamp> timestampProvider,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 500b75c..4a92682 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -42,8 +42,10 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionDiff;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionEvent;
@@ -51,6 +53,7 @@
import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.upgrade.UpgradeService;
import org.slf4j.Logger;
import static org.onosproject.security.AppGuard.checkPermission;
@@ -68,7 +71,7 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
+ protected UnifiedClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService metadataService;
@@ -76,7 +79,14 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
- private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UpgradeService upgradeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VersionService versionService;
+
+ private final Map<PartitionId, StoragePartition> inactivePartitions = Maps.newConcurrentMap();
+ private final Map<PartitionId, StoragePartition> activePartitions = Maps.newConcurrentMap();
private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
@@ -85,17 +95,58 @@
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
currentClusterMetadata.set(metadataService.getClusterMetadata());
metadataService.addListener(metadataListener);
- currentClusterMetadata.get()
- .getPartitions()
- .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
- clusterCommunicator,
- clusterService,
- new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
- CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
- .stream()
- .map(StoragePartition::open)
- .toArray(CompletableFuture[]::new));
+ // If an upgrade is currently in progress and this node is an upgraded node, initialize upgrade partitions.
+ CompletableFuture<Void> openFuture;
+ if (upgradeService.isUpgrading() && upgradeService.isLocalUpgraded()) {
+ Version sourceVersion = upgradeService.getState().source();
+ Version targetVersion = upgradeService.getState().target();
+ currentClusterMetadata.get()
+ .getPartitions()
+ .forEach(partition -> inactivePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ sourceVersion,
+ null,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + sourceVersion + "/" + partition.getId()))));
+ currentClusterMetadata.get()
+ .getPartitions()
+ .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ targetVersion,
+ sourceVersion,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + targetVersion + "/" + partition.getId()))));
+
+ // We have to fork existing partitions before we can start inactive partition servers to
+ // avoid duplicate message handlers when both servers are running.
+ openFuture = CompletableFuture.allOf(activePartitions.values().stream()
+ .map(StoragePartition::open)
+ .toArray(CompletableFuture[]::new))
+ .thenCompose(v -> CompletableFuture.allOf(inactivePartitions.values().stream()
+ .map(StoragePartition::open)
+ .toArray(CompletableFuture[]::new)));
+ } else {
+ Version version = versionService.version();
+ currentClusterMetadata.get()
+ .getPartitions()
+ .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ version,
+ null,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + version + "/" + partition.getId()))));
+ openFuture = CompletableFuture.allOf(activePartitions.values().stream()
+ .map(StoragePartition::open)
+ .toArray(CompletableFuture[]::new));
+ }
+
openFuture.join();
log.info("Started");
}
@@ -105,10 +156,13 @@
metadataService.removeListener(metadataListener);
eventDispatcher.removeSink(PartitionEvent.class);
- CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
- .stream()
- .map(StoragePartition::close)
- .toArray(CompletableFuture[]::new));
+ CompletableFuture<Void> closeFuture = CompletableFuture.allOf(
+ CompletableFuture.allOf(inactivePartitions.values().stream()
+ .map(StoragePartition::close)
+ .toArray(CompletableFuture[]::new)),
+ CompletableFuture.allOf(activePartitions.values().stream()
+ .map(StoragePartition::close)
+ .toArray(CompletableFuture[]::new)));
closeFuture.join();
log.info("Stopped");
}
@@ -116,25 +170,25 @@
@Override
public int getNumberOfPartitions() {
checkPermission(PARTITION_READ);
- return partitions.size();
+ return activePartitions.size();
}
@Override
public Set<PartitionId> getAllPartitionIds() {
checkPermission(PARTITION_READ);
- return partitions.keySet();
+ return activePartitions.keySet();
}
@Override
public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
checkPermission(PARTITION_READ);
- return partitions.get(partitionId).client();
+ return activePartitions.get(partitionId).client();
}
@Override
public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
checkPermission(PARTITION_READ);
- StoragePartition partition = partitions.get(partitionId);
+ StoragePartition partition = activePartitions.get(partitionId);
return ImmutableSet.copyOf(partition.getMembers());
}
@@ -148,7 +202,7 @@
@Override
public List<PartitionInfo> partitionInfo() {
- return partitions.values()
+ return activePartitions.values()
.stream()
.flatMap(x -> Tools.stream(x.info()))
.collect(Collectors.toList());
@@ -161,7 +215,7 @@
.values()
.stream()
.filter(PartitionDiff::hasChanged)
- .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
+ .forEach(diff -> activePartitions.get(diff.partitionId()).onUpdate(diff.newValue()));
}
private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
@@ -173,7 +227,7 @@
@Override
public List<PartitionClientInfo> partitionClientInfo() {
- return partitions.values()
+ return activePartitions.values()
.stream()
.map(StoragePartition::client)
.map(StoragePartitionClient::clientInfo)
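
During an upgrade the partition manager keeps two sets of Raft partitions per partition ID: "active" partitions for the target version, forked from the source data, and "inactive" partitions that continue serving the source version, each persisted under a version-specific directory. A condensed, runnable sketch of the activation ordering (paths, versions, and the single partition ID are illustrative; the real code builds StoragePartition instances as shown above):

    import java.io.File;
    import java.util.concurrent.CompletableFuture;

    public final class PartitionActivationSketch {

        // Simplified stand-in for StoragePartition::open.
        static CompletableFuture<Void> open(String role, File dir) {
            System.out.println("opening " + role + " partition at " + dir);
            return CompletableFuture.completedFuture(null);
        }

        public static void main(String[] args) {
            boolean upgradingAndLocalUpgraded = true; // upgradeService.isUpgrading() && isLocalUpgraded()
            String source = "1.10.0";                 // illustrative source version
            String target = "1.11.0";                 // illustrative target version
            String data = "/tmp/karaf.data";          // stand-in for System.getProperty("karaf.data")

            CompletableFuture<Void> openFuture;
            if (upgradingAndLocalUpgraded) {
                // Fork: open the target-version (active) partitions first, then bring the
                // source-version (inactive) partitions back up, so message handlers are not
                // registered twice while both servers run.
                openFuture = open("active", new File(data + "/partitions/" + target + "/1"))
                        .thenCompose(v -> open("inactive", new File(data + "/partitions/" + source + "/1")));
            } else {
                // Normal start: a single set of partitions under the locally running version.
                openFuture = open("active", new File(data + "/partitions/" + target + "/1"));
            }
            openFuture.join();
        }
    }
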
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 40e9fa0..8bae2a3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -40,8 +40,7 @@
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.Serializer;
/**
@@ -50,10 +49,10 @@
public class RaftClientCommunicator extends RaftCommunicator implements RaftClientProtocol {
public RaftClientCommunicator(
- PartitionId partitionId,
+ String prefix,
Serializer serializer,
- ClusterCommunicationService clusterCommunicator) {
- super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+ ClusterCommunicator clusterCommunicator) {
+ super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
index 1117ab9..765eb02 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
@@ -22,7 +22,7 @@
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.cluster.MemberId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.service.Serializer;
@@ -35,12 +35,12 @@
public abstract class RaftCommunicator {
protected final RaftMessageContext context;
protected final Serializer serializer;
- protected final ClusterCommunicationService clusterCommunicator;
+ protected final ClusterCommunicator clusterCommunicator;
public RaftCommunicator(
RaftMessageContext context,
Serializer serializer,
- ClusterCommunicationService clusterCommunicator) {
+ ClusterCommunicator clusterCommunicator) {
this.context = checkNotNull(context, "context cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 9b8f3e6..2710a2c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -56,8 +56,8 @@
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.Serializer;
/**
@@ -66,10 +66,10 @@
public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
public RaftServerCommunicator(
- PartitionId partitionId,
+ String prefix,
Serializer serializer,
- ClusterCommunicationService clusterCommunicator) {
- super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+ ClusterCommunicator clusterCommunicator) {
+ super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 822641c..59c4b17 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,10 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
-import static org.slf4j.LoggerFactory.getLogger;
-
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -26,25 +22,26 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicCounterMapBuilder;
import org.onosproject.store.service.AtomicIdGeneratorBuilder;
@@ -68,7 +65,9 @@
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
-import com.google.common.collect.Maps;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation for {@code StorageService} and {@code StorageAdminService}.
@@ -82,10 +81,10 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
+ protected UnifiedClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
+ protected UnifiedClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
@@ -105,7 +104,7 @@
public void activate() {
Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
partitionService.getAllPartitionIds().stream()
- .filter(id -> !id.equals(PartitionId.from(0)))
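+ // Exclude the shared partition (formerly partition 0) from the federated primitive creator.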
+ .filter(id -> !id.equals(PartitionId.SHARED))
.forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap, BUCKETS);
transactionManager = new TransactionManager(this, partitionService, BUCKETS);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 842e047..414c49c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -29,11 +29,12 @@
import com.google.common.collect.ImmutableMap;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.service.RaftService;
-import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
@@ -53,8 +54,11 @@
public class StoragePartition implements Managed<StoragePartition> {
private final AtomicBoolean isOpened = new AtomicBoolean(false);
- private final ClusterCommunicationService clusterCommunicator;
- private final File logFolder;
+ private final UnifiedClusterCommunicationService clusterCommunicator;
+ private final MembershipService clusterService;
+ private final Version version;
+ private final Version source;
+ private final File dataFolder;
private Partition partition;
private NodeId localNodeId;
private StoragePartitionServer server;
@@ -77,14 +81,20 @@
() -> new AtomixDocumentTreeService(Ordering.INSERTION))
.build();
- public StoragePartition(Partition partition,
- ClusterCommunicationService clusterCommunicator,
- ClusterService clusterService,
- File logFolder) {
+ public StoragePartition(
+ Partition partition,
+ Version version,
+ Version source,
+ UnifiedClusterCommunicationService clusterCommunicator,
+ MembershipService clusterService,
+ File dataFolder) {
this.partition = partition;
+ this.version = version;
+ this.source = source;
this.clusterCommunicator = clusterCommunicator;
+ this.clusterService = clusterService;
this.localNodeId = clusterService.getLocalNode().id();
- this.logFolder = logFolder;
+ this.dataFolder = dataFolder;
}
/**
@@ -97,7 +107,12 @@
@Override
public CompletableFuture<Void> open() {
- if (partition.getMembers().contains(localNodeId)) {
+ if (source != null) {
+ return forkServer(source)
+ .thenCompose(v -> openClient())
+ .thenAccept(v -> isOpened.set(true))
+ .thenApply(v -> null);
+ } else if (partition.getMembers().contains(localNodeId)) {
return openServer()
.thenCompose(v -> openClient())
.thenAccept(v -> isOpened.set(true))
@@ -116,6 +131,43 @@
}
/**
+ * Returns the partition name.
+ *
+ * @return the partition name
+ */
+ public String getName() {
+ return getName(version);
+ }
+
+ /**
+ * Returns the partition name for the given version.
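+ * <p>
+ * Versioned partitions are named {@code partition-<id>-<version>}; a {@code null}
+ * version yields {@code partition-core}.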
+ *
+ * @param version the version for which to return the partition name
+ * @return the partition name for the given version
+ */
+ String getName(Version version) {
+ return version != null ? String.format("partition-%d-%s", partition.getId().id(), version) : "partition-core";
+ }
+
+ /**
+ * Returns the partition version.
+ *
+ * @return the partition version
+ */
+ public Version getVersion() {
+ return version;
+ }
+
+ /**
+ * Returns the partition data folder.
+ *
+ * @return the partition data folder
+ */
+ public File getDataFolder() {
+ return dataFolder;
+ }
+
+ /**
* Returns the identifier of the {@link Partition partition} associated with this instance.
* @return partition identifier
*/
@@ -136,7 +188,23 @@
* @return partition member identifiers
*/
public Collection<MemberId> getMemberIds() {
- return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
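+ // A forked partition (source != null) spans every node known to the membership service;
+ // a regular partition spans only its configured members.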
+ return source != null ?
+ clusterService.getNodes()
+ .stream()
+ .map(node -> MemberId.from(node.id().id()))
+ .collect(Collectors.toList()) :
+ Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+ }
+
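+ /**
+ * Returns the member identifiers for this partition at the given version.
+ *
+ * @param version the partition version for which to return members
+ * @return the partition member identifiers for the given version
+ */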
+ Collection<MemberId> getMemberIds(Version version) {
+ if (source == null || version.equals(source)) {
+ return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+ } else {
+ return clusterService.getNodes()
+ .stream()
+ .map(node -> MemberId.from(node.id().id()))
+ .collect(Collectors.toList());
+ }
}
/**
@@ -144,20 +212,37 @@
* @return future that is completed after the operation is complete
*/
private CompletableFuture<Void> openServer() {
- if (!partition.getMembers().contains(localNodeId) || server != null) {
- return CompletableFuture.completedFuture(null);
- }
- StoragePartitionServer server = new StoragePartitionServer(this,
+ StoragePartitionServer server = new StoragePartitionServer(
+ this,
MemberId.from(localNodeId.id()),
- () -> new RaftServerCommunicator(
- partition.getId(),
- Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
- clusterCommunicator),
- logFolder);
+ clusterCommunicator);
return server.open().thenRun(() -> this.server = server);
}
/**
+ * Forks the server from the given version.
+ *
+ * @param version the version from which to fork the server
+ * @return future to be completed once the server has been forked
+ */
+ private CompletableFuture<Void> forkServer(Version version) {
+ StoragePartitionServer server = new StoragePartitionServer(
+ this,
+ MemberId.from(localNodeId.id()),
+ clusterCommunicator);
+
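+ // On a single-node cluster, fork directly from the local copy of the source-version
+ // partition; otherwise join the forked partition via the other cluster nodes.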
+ CompletableFuture<Void> future;
+ if (clusterService.getNodes().size() == 1) {
+ future = server.fork(version);
+ } else {
+ future = server.join(clusterService.getNodes().stream()
+ .filter(node -> !node.id().equals(localNodeId))
+ .map(node -> MemberId.from(node.id().id()))
+ .collect(Collectors.toList()));
+ }
+ return future.thenRun(() -> this.server = server);
+ }
+
+ /**
* Attempts to join the partition as a new member.
* @return future that is completed after the operation is complete
*/
@@ -168,11 +253,7 @@
.collect(Collectors.toSet());
StoragePartitionServer server = new StoragePartitionServer(this,
MemberId.from(localNodeId.id()),
- () -> new RaftServerCommunicator(
- partition.getId(),
- Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
- clusterCommunicator),
- logFolder);
+ clusterCommunicator);
return server.join(Collections2.transform(otherMembers, n -> MemberId.from(n.id())))
.thenRun(() -> this.server = server);
}
@@ -181,7 +262,7 @@
client = new StoragePartitionClient(this,
MemberId.from(localNodeId.id()),
new RaftClientCommunicator(
- partition.getId(),
+ getName(),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator));
return client.open().thenApply(v -> client);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 8f1ffa3..96e5140 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -92,7 +92,6 @@
log.info("Failed to start client for partition {}", partition.getId(), e);
}
}).thenApply(v -> null);
-
}
@Override
@@ -316,7 +315,7 @@
private RaftClient newRaftClient(RaftClientProtocol protocol) {
return RaftClient.newBuilder()
.withClientId("partition-" + partition.getId())
- .withMemberId(MemberId.from(localMemberId.id()))
+ .withMemberId(localMemberId)
.withProtocol(protocol)
.build();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
index 04c0bdf..1bed89d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
@@ -99,7 +99,7 @@
public PartitionInfo toPartitionInfo() {
Function<RaftMember, String> memberToString =
m -> m == null ? "none" : m.memberId().toString();
- return new PartitionInfo(partitionId.toString(),
+ return new PartitionInfo(partitionId,
leaderTerm,
activeMembers.stream().map(memberToString).collect(Collectors.toList()),
memberToString.apply(leader));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index bb071a2..123c3b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -16,16 +16,19 @@
package org.onosproject.store.primitives.impl;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
@@ -46,23 +49,21 @@
private final MemberId localMemberId;
private final StoragePartition partition;
- private final Supplier<RaftServerProtocol> protocol;
- private final File dataFolder;
+ private final UnifiedClusterCommunicationService clusterCommunicator;
private RaftServer server;
public StoragePartitionServer(
StoragePartition partition,
MemberId localMemberId,
- Supplier<RaftServerProtocol> protocol,
- File dataFolder) {
+ UnifiedClusterCommunicationService clusterCommunicator) {
this.partition = partition;
this.localMemberId = localMemberId;
- this.protocol = protocol;
- this.dataFolder = dataFolder;
+ this.clusterCommunicator = clusterCommunicator;
}
@Override
public CompletableFuture<Void> open() {
+ log.info("Starting server for partition {} ({})", partition.getId(), partition.getVersion());
CompletableFuture<RaftServer> serverOpenFuture;
if (partition.getMemberIds().contains(localMemberId)) {
if (server != null && server.isRunning()) {
@@ -77,9 +78,11 @@
}
return serverOpenFuture.whenComplete((r, e) -> {
if (e == null) {
- log.info("Successfully started server for partition {}", partition.getId());
+ log.info("Successfully started server for partition {} ({})",
+ partition.getId(), partition.getVersion());
} else {
- log.info("Failed to start server for partition {}", partition.getId(), e);
+ log.info("Failed to start server for partition {} ({})",
+ partition.getId(), partition.getVersion(), e);
}
}).thenApply(v -> null);
}
@@ -97,16 +100,68 @@
return server.leave();
}
- private RaftServer buildServer() {
+ /**
+ * Forks the existing partition into a new partition.
+ *
+ * @param version the version from which to fork the server
+ * @return future to be completed once the fork operation is complete
+ */
+ public CompletableFuture<Void> fork(Version version) {
+ log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
- .withName("partition-" + partition.getId())
- .withProtocol(protocol.get())
+ .withName(partition.getName(version))
+ .withType(RaftMember.Type.PASSIVE)
+ .withProtocol(new RaftServerCommunicator(
+ partition.getName(version),
+ Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+ clusterCommunicator))
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withStorage(RaftStorage.newBuilder()
.withStorageLevel(StorageLevel.MAPPED)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
- .withDirectory(dataFolder)
+ .withDirectory(partition.getDataFolder())
+ .withMaxSegmentSize(MAX_SEGMENT_SIZE)
+ .build());
+ StoragePartition.RAFT_SERVICES.forEach(builder::addService);
+ RaftServer server = builder.build();
+ return server.join(partition.getMemberIds(version))
+ .thenCompose(v -> server.shutdown())
+ .thenCompose(v -> {
+ // Delete the cluster configuration file from the forked partition.
+ try {
+ Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+ } catch (IOException e) {
+ log.error("Failed to delete partition configuration: {}", e);
+ }
+
+ // Build and bootstrap a new server.
+ this.server = buildServer();
+ return this.server.bootstrap();
+ }).whenComplete((r, e) -> {
+ if (e == null) {
+ log.info("Successfully forked server for partition {} ({}->{})",
+ partition.getId(), version, partition.getVersion());
+ } else {
+ log.info("Failed to fork server for partition {} ({}->{})",
+ partition.getId(), version, partition.getVersion(), e);
+ }
+ }).thenApply(v -> null);
+ }
+
+ private RaftServer buildServer() {
+ RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
+ .withName(partition.getName())
+ .withProtocol(new RaftServerCommunicator(
+ partition.getName(),
+ Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+ clusterCommunicator))
+ .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
+ .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
+ .withStorage(RaftStorage.newBuilder()
+ .withStorageLevel(StorageLevel.MAPPED)
+ .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
+ .withDirectory(partition.getDataFolder())
.withMaxSegmentSize(MAX_SEGMENT_SIZE)
.build());
StoragePartition.RAFT_SERVICES.forEach(builder::addService);
@@ -114,12 +169,13 @@
}
public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
+ log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
server = buildServer();
return server.join(otherMembers).whenComplete((r, e) -> {
if (e == null) {
- log.info("Successfully joined partition {}", partition.getId());
+ log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
} else {
- log.info("Failed to join partition {}", partition.getId(), e);
+ log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
}
}).thenApply(v -> null);
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index 6ed8ec3..be24d00 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -98,7 +98,6 @@
import org.junit.Before;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
import org.onosproject.store.primitives.impl.RaftClientCommunicator;
import org.onosproject.store.primitives.impl.RaftServerCommunicator;
import org.onosproject.store.service.Serializer;
@@ -382,7 +381,7 @@
RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
.withType(member.getType())
.withProtocol(new RaftServerCommunicator(
- PartitionId.from(1),
+ "partition-1",
PROTOCOL_SERIALIZER,
communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
.withStorage(RaftStorage.newBuilder()
@@ -406,7 +405,7 @@
RaftClient client = RaftClient.newBuilder()
.withMemberId(memberId)
.withProtocol(new RaftClientCommunicator(
- PartitionId.from(1),
+ "partition-1",
PROTOCOL_SERIALIZER,
communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
.build();
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 2de7dcc..d5deae1 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -250,6 +250,7 @@
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
import org.onosproject.ui.model.topo.UiTopoLayoutId;
+import org.onosproject.upgrade.Upgrade;
import java.net.URI;
import java.time.Duration;
@@ -627,6 +628,8 @@
PiCriterion.class,
PiInstruction.class
)
+ .register(Upgrade.class)
+ .register(Upgrade.Status.class)
.build("API");
/**