[ONOS-7054] Implement prototype of ISSU protocol
Change-Id: Id543c0de9c97b68f977c824cbc987b35d81beb2d
diff --git a/cli/src/main/java/org/onosproject/cli/IssuCommand.java b/cli/src/main/java/org/onosproject/cli/IssuCommand.java
new file mode 100644
index 0000000..1f0cb36
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/IssuCommand.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cli;
+
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.upgrade.UpgradeAdminService;
+import org.onosproject.upgrade.UpgradeService;
+
+/**
+ * Commands for managing upgrades.
+ */
+@Command(scope = "onos", name = "issu",
+ description = "Manages upgrades")
+public class IssuCommand extends AbstractShellCommand {
+
+ static final String INIT = "init";
+ static final String UPGRADE = "upgrade";
+ static final String COMMIT = "commit";
+ static final String ROLLBACK = "rollback";
+ static final String RESET = "reset";
+ static final String STATUS = "status";
+ static final String VERSION = "version";
+
+ @Argument(index = 0, name = "command",
+ description = "Command name (init|upgrade|commit|rollback|status|version)",
+ required = false, multiValued = false)
+ String command = null;
+
+ @Override
+ protected void execute() {
+ UpgradeService upgradeService = get(UpgradeService.class);
+ UpgradeAdminService upgradeAdminService = get(UpgradeAdminService.class);
+ if (command == null) {
+ print("source=%s, target=%s, status=%s, upgraded=%b, active=%b",
+ upgradeService.getState().source(),
+ upgradeService.getState().target(),
+ upgradeService.getState().status(),
+ upgradeService.isLocalUpgraded(),
+ upgradeService.isLocalActive());
+ } else if (command.equals(INIT)) {
+ upgradeAdminService.initialize();
+ print("Initialized");
+ } else if (command.equals(UPGRADE)) {
+ upgradeAdminService.upgrade();
+ print("Upgraded");
+ } else if (command.equals(COMMIT)) {
+ upgradeAdminService.commit();
+ print("Committed version %s", upgradeService.getVersion());
+ } else if (command.equals(ROLLBACK)) {
+ upgradeAdminService.rollback();
+ print("Rolled back to version %s", upgradeService.getVersion());
+ } else if (command.equals(RESET)) {
+ upgradeAdminService.reset();
+ print("Reset version %s", upgradeService.getVersion());
+ } else if (command.equals(STATUS)) {
+ print("%s", upgradeService.getState().status());
+ } else if (command.equals(VERSION)) {
+ print("%s", upgradeService.getVersion());
+ } else {
+ print("Unsupported command: %s", command);
+ }
+ }
+
+}
diff --git a/cli/src/main/java/org/onosproject/cli/NodesListCommand.java b/cli/src/main/java/org/onosproject/cli/NodesListCommand.java
index bf53219..5a55c99 100644
--- a/cli/src/main/java/org/onosproject/cli/NodesListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/NodesListCommand.java
@@ -22,8 +22,8 @@
import org.apache.karaf.shell.commands.Command;
import org.joda.time.DateTime;
import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.MembershipAdminService;
import org.onosproject.core.Version;
import org.onosproject.utils.Comparators;
@@ -44,7 +44,7 @@
@Override
protected void execute() {
- ClusterAdminService service = get(ClusterAdminService.class);
+ MembershipAdminService service = get(MembershipAdminService.class);
List<ControllerNode> nodes = newArrayList(service.getNodes());
Collections.sort(nodes, Comparators.NODE_COMPARATOR);
if (outputJson()) {
@@ -68,7 +68,7 @@
}
// Produces JSON structure.
- private JsonNode json(ClusterAdminService service, List<ControllerNode> nodes) {
+ private JsonNode json(MembershipAdminService service, List<ControllerNode> nodes) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode result = mapper.createArrayNode();
ControllerNode self = service.getLocalNode();
diff --git a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
index 0922d39..c9df46a 100644
--- a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
@@ -66,7 +66,7 @@
boolean first = true;
for (String member : Ordering.natural().sortedCopy(info.members())) {
if (first) {
- print(SERVER_FMT, info.name(), info.term(), member,
+ print(SERVER_FMT, info.id(), info.term(), member,
member.equals(info.leader()) ? "*" : "");
first = false;
} else {
@@ -130,7 +130,7 @@
info.members().forEach(members::add);
// Complete the partition attributes and add it to the array
- partition.put("name", info.name())
+ partition.put("name", info.id().toString())
.put("term", info.term())
.put("leader", info.leader());
partitions.add(partition);
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 1ffeed4..9e176b6 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -32,6 +32,11 @@
<command>
<action class="org.onosproject.cli.SummaryCommand"/>
</command>
+
+ <command>
+ <action class="org.onosproject.cli.IssuCommand"/>
+ </command>
+
<command>
<action class="org.onosproject.cli.security.ReviewCommand"/>
<completers>
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
index 625183a..e680877 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
@@ -15,56 +15,14 @@
*/
package org.onosproject.cluster;
-import org.onlab.packet.IpAddress;
-
-import java.util.Set;
-
/**
* Service for administering the cluster node membership.
+ * <p>
+ * This service has a view of the cluster membership that is isolated to the local node's version during upgrades.
+ * For an equivalent service that has control over all nodes during an upgrade use
+ * {@link UnifiedClusterAdminService}.
+ *
+ * @see UnifiedClusterAdminService
*/
-public interface ClusterAdminService extends ClusterService {
-
- /**
- * Forms cluster configuration based on the specified set of node
- * information. This method resets and restarts the controller
- * instance.
- *
- * @param nodes set of nodes that form the cluster
- */
- void formCluster(Set<ControllerNode> nodes);
-
- /**
- * Forms cluster configuration based on the specified set of node
- * information. This method resets and restarts the controller
- * instance.
- *
- * @param nodes set of nodes that form the cluster
- * @param partitionSize number of nodes to compose a partition
- */
- void formCluster(Set<ControllerNode> nodes, int partitionSize);
-
- /**
- * Adds a new controller node to the cluster.
- *
- * @param nodeId controller node identifier
- * @param ip node IP listen address
- * @param tcpPort tcp listen port
- * @return newly added node
- */
- ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort);
-
- /**
- * Removes the specified node from the cluster node list.
- *
- * @param nodeId controller node identifier
- */
- void removeNode(NodeId nodeId);
-
- /**
- * Marks the current node as fully started or not.
- *
- * @param started true indicates all components have been started
- */
- void markFullyStarted(boolean started);
-
+public interface ClusterAdminService extends MembershipAdminService {
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterService.java
index 964357f..f17e4d9 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ClusterService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterService.java
@@ -15,65 +15,13 @@
*/
package org.onosproject.cluster;
-import java.util.Set;
-
-import org.joda.time.DateTime;
-import org.onosproject.core.Version;
-import org.onosproject.event.ListenerService;
-
/**
- * Service for obtaining information about the individual nodes within
- * the controller cluster.
+ * Service for obtaining information about the individual nodes within the controller cluster.
+ * <p>
+ * This service's view of the nodes in the cluster is isolated to a single version of the software. During upgrades,
+ * when multiple versions of the software are running in the same cluster, users of this service will only be able
+ * to see nodes running the same version as the local node. This is useful for limiting communication to nodes running
+ * the same version of the software.
*/
-public interface ClusterService
- extends ListenerService<ClusterEvent, ClusterEventListener> {
-
- /**
- * Returns the local controller node.
- *
- * @return local controller node
- */
- ControllerNode getLocalNode();
-
- /**
- * Returns the set of current cluster members.
- *
- * @return set of cluster members
- */
- Set<ControllerNode> getNodes();
-
- /**
- * Returns the specified controller node.
- *
- * @param nodeId controller node identifier
- * @return controller node
- */
- ControllerNode getNode(NodeId nodeId);
-
- /**
- * Returns the availability state of the specified controller node. Note
- * that this does not imply that all the core and application components
- * have been fully activated; only that the node has joined the cluster.
- *
- * @param nodeId controller node identifier
- * @return availability state
- */
- ControllerNode.State getState(NodeId nodeId);
-
- /**
- * Returns the version of the given controller node.
- *
- * @param nodeId controller node identifier
- * @return controller version
- */
- Version getVersion(NodeId nodeId);
-
- /**
- * Returns the system time when the availability state was last updated.
- *
- * @param nodeId controller node identifier
- * @return system time when the availability state was last updated.
- */
- DateTime getLastUpdated(NodeId nodeId);
-
+public interface ClusterService extends MembershipService {
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java b/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java
new file mode 100644
index 0000000..908a963
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Set;
+
+import org.onlab.packet.IpAddress;
+
+/**
+ * Service for administering the cluster node membership.
+ */
+public interface MembershipAdminService extends MembershipService {
+
+ /**
+ * Forms cluster configuration based on the specified set of node
+ * information. This method resets and restarts the controller
+ * instance.
+ *
+ * @param nodes set of nodes that form the cluster
+ */
+ void formCluster(Set<ControllerNode> nodes);
+
+ /**
+ * Forms cluster configuration based on the specified set of node
+ * information. This method resets and restarts the controller
+ * instance.
+ *
+ * @param nodes set of nodes that form the cluster
+ * @param partitionSize number of nodes to compose a partition
+ */
+ void formCluster(Set<ControllerNode> nodes, int partitionSize);
+
+ /**
+ * Adds a new controller node to the cluster.
+ *
+ * @param nodeId controller node identifier
+ * @param ip node IP listen address
+ * @param tcpPort tcp listen port
+ * @return newly added node
+ */
+ ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort);
+
+ /**
+ * Removes the specified node from the cluster node list.
+ *
+ * @param nodeId controller node identifier
+ */
+ void removeNode(NodeId nodeId);
+
+ /**
+ * Marks the current node as fully started or not.
+ *
+ * @param started true indicates all components have been started
+ */
+ void markFullyStarted(boolean started);
+
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipService.java b/core/api/src/main/java/org/onosproject/cluster/MembershipService.java
new file mode 100644
index 0000000..7b366f6
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/MembershipService.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Set;
+
+import org.joda.time.DateTime;
+import org.onosproject.core.Version;
+import org.onosproject.event.ListenerService;
+
+/**
+ * Service for obtaining information about the individual nodes within
+ * the controller cluster.
+ */
+public interface MembershipService
+ extends ListenerService<ClusterEvent, ClusterEventListener> {
+
+ /**
+ * Returns the local controller node.
+ *
+ * @return local controller node
+ */
+ ControllerNode getLocalNode();
+
+ /**
+ * Returns the set of current cluster members.
+ *
+ * @return set of cluster members
+ */
+ Set<ControllerNode> getNodes();
+
+ /**
+ * Returns the specified controller node.
+ *
+ * @param nodeId controller node identifier
+ * @return controller node
+ */
+ ControllerNode getNode(NodeId nodeId);
+
+ /**
+ * Returns the availability state of the specified controller node. Note
+ * that this does not imply that all the core and application components
+ * have been fully activated; only that the node has joined the cluster.
+ *
+ * @param nodeId controller node identifier
+ * @return availability state
+ */
+ ControllerNode.State getState(NodeId nodeId);
+
+ /**
+ * Returns the version of the given controller node.
+ *
+ * @param nodeId controller node identifier
+ * @return controller version
+ */
+ Version getVersion(NodeId nodeId);
+
+ /**
+ * Returns the system time when the availability state was last updated.
+ *
+ * @param nodeId controller node identifier
+ * @return system time when the availability state was last updated.
+ */
+ DateTime getLastUpdated(NodeId nodeId);
+
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/PartitionId.java b/core/api/src/main/java/org/onosproject/cluster/PartitionId.java
index f8b65e7..39852de 100644
--- a/core/api/src/main/java/org/onosproject/cluster/PartitionId.java
+++ b/core/api/src/main/java/org/onosproject/cluster/PartitionId.java
@@ -25,6 +25,11 @@
public class PartitionId extends Identifier<Integer> implements Comparable<PartitionId> {
/**
+ * The {@code PartitionId} for the shared coordination partition.
+ */
+ public static final PartitionId SHARED = PartitionId.from(0);
+
+ /**
* Creates a partition identifier from an integer.
*
* @param id input integer
diff --git a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java
new file mode 100644
index 0000000..9fb7407
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Cluster membership administration service that supports modification of all nodes during an upgrade.
+ */
+@Beta
+public interface UnifiedClusterAdminService extends MembershipAdminService {
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java
new file mode 100644
index 0000000..f690f3e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Unified multi-version cluster membership service.
+ * <p>
+ * During upgrades, the nodes within a cluster may be running multiple versions of the software.
+ * This service has a view of the entire cluster running any version. Users of this service must be careful when
+ * communicating with nodes described by this service as compatibility issues can result from communicating across
+ * versions. For an equivalent service that has an isolated view of the cluster, see {@link ClusterService}.
+ *
+ * @see ClusterService
+ */
+@Beta
+public interface UnifiedClusterService extends MembershipService {
+}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index 4a46885..dfc558d 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -15,152 +15,15 @@
*/
package org.onosproject.store.cluster.messaging;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.onosproject.cluster.NodeId;
-
/**
* Service for assisting communications between controller cluster nodes.
+ * <p>
+ * Communication via this service is isolated to nodes running a single version of the software. During upgrades, when
+ * nodes may be running multiple versions simultaneously, this service prevents nodes running different versions of
+ * the software from communicating with each other, thus avoiding compatibility issues. For an equivalent cross-version
+ * compatible service, see {@link UnifiedClusterCommunicationService}.
+ *
+ * @see UnifiedClusterCommunicationService
*/
-public interface ClusterCommunicationService {
-
- /**
- * Adds a new subscriber for the specified message subject.
- *
- * @param subject message subject
- * @param subscriber message subscriber
- * @param executor executor to use for running handler.
- * @deprecated in Cardinal Release
- */
- @Deprecated
- void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
-
- /**
- * Broadcasts a message to all controller nodes.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding message to byte[]
- * @param <M> message type
- */
- <M> void broadcast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder);
-
- /**
- * Broadcasts a message to all controller nodes including self.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding message to byte[]
- * @param <M> message type
- */
- <M> void broadcastIncludeSelf(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder);
-
- /**
- * Sends a message to the specified controller node.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding message to byte[]
- * @param toNodeId destination node identifier
- * @param <M> message type
- * @return future that is completed when the message is sent
- */
- <M> CompletableFuture<Void> unicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- NodeId toNodeId);
-
- /**
- * Multicasts a message to a set of controller nodes.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding message to byte[]
- * @param nodeIds recipient node identifiers
- * @param <M> message type
- */
- <M> void multicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Set<NodeId> nodeIds);
-
- /**
- * Sends a message and expects a reply.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding request to byte[]
- * @param decoder function for decoding response from byte[]
- * @param toNodeId recipient node identifier
- * @param <M> request type
- * @param <R> reply type
- * @return reply future
- */
- <M, R> CompletableFuture<R> sendAndReceive(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Function<byte[], R> decoder,
- NodeId toNodeId);
-
- /**
- * Adds a new subscriber for the specified message subject.
- *
- * @param subject message subject
- * @param decoder decoder for resurrecting incoming message
- * @param handler handler function that processes the incoming message and produces a reply
- * @param encoder encoder for serializing reply
- * @param executor executor to run this handler on
- * @param <M> incoming message type
- * @param <R> reply message type
- */
- <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, R> handler,
- Function<R, byte[]> encoder,
- Executor executor);
-
- /**
- * Adds a new subscriber for the specified message subject.
- *
- * @param subject message subject
- * @param decoder decoder for resurrecting incoming message
- * @param handler handler function that processes the incoming message and produces a reply
- * @param encoder encoder for serializing reply
- * @param <M> incoming message type
- * @param <R> reply message type
- */
- <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, CompletableFuture<R>> handler,
- Function<R, byte[]> encoder);
-
- /**
- * Adds a new subscriber for the specified message subject.
- *
- * @param subject message subject
- * @param decoder decoder to resurrecting incoming message
- * @param handler handler for handling message
- * @param executor executor to run this handler on
- * @param <M> incoming message type
- */
- <M> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Consumer<M> handler,
- Executor executor);
-
- /**
- * Removes a subscriber for the specified message subject.
- *
- * @param subject message subject
- */
- void removeSubscriber(MessageSubject subject);
+public interface ClusterCommunicationService extends ClusterCommunicator {
}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java
new file mode 100644
index 0000000..7d2480e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.onosproject.cluster.NodeId;
+
+/**
+ * Service for assisting communications between controller cluster nodes.
+ */
+public interface ClusterCommunicator {
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param subscriber message subscriber
+ * @param executor executor to use for running handler.
+ * @deprecated in Cardinal Release
+ */
+ @Deprecated
+ void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
+
+ /**
+ * Broadcasts a message to all controller nodes.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param <M> message type
+ */
+ <M> void broadcast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder);
+
+ /**
+ * Broadcasts a message to all controller nodes including self.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param <M> message type
+ */
+ <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder);
+
+ /**
+ * Sends a message to the specified controller node.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param toNodeId destination node identifier
+ * @param <M> message type
+ * @return future that is completed when the message is sent
+ */
+ <M> CompletableFuture<Void> unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId);
+
+ /**
+ * Multicasts a message to a set of controller nodes.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param nodeIds recipient node identifiers
+ * @param <M> message type
+ */
+ <M> void multicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Set<NodeId> nodeIds);
+
+ /**
+ * Sends a message and expects a reply.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding request to byte[]
+ * @param decoder function for decoding response from byte[]
+ * @param toNodeId recipient node identifier
+ * @param <M> request type
+ * @param <R> reply type
+ * @return reply future
+ */
+ <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param decoder decoder for resurrecting incoming message
+ * @param handler handler function that processes the incoming message and produces a reply
+ * @param encoder encoder for serializing reply
+ * @param executor executor to run this handler on
+ * @param <M> incoming message type
+ * @param <R> reply message type
+ */
+ <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ Executor executor);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param decoder decoder for resurrecting incoming message
+ * @param handler handler function that processes the incoming message and produces a reply
+ * @param encoder encoder for serializing reply
+ * @param <M> incoming message type
+ * @param <R> reply message type
+ */
+ <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param decoder decoder to resurrecting incoming message
+ * @param handler handler for handling message
+ * @param executor executor to run this handler on
+ * @param <M> incoming message type
+ */
+ <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ Executor executor);
+
+ /**
+ * Removes a subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ */
+ void removeSubscriber(MessageSubject subject);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java
new file mode 100644
index 0000000..042c803
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Service for unified communication across controller nodes running multiple software versions.
+ * <p>
+ * This service supports communicating across nodes running different versions of the software simultaneously. During
+ * upgrades, when nodes may be running a mixture of versions, this service can be used to coordinate across those
+ * versions. But users of this service must be extremely careful to preserve backward/forward compatibility for
+ * messages sent across versions. Encoders and decoders used for messages sent/received on this service should
+ * support evolving schemas.
+ */
+@Beta
+public interface UnifiedClusterCommunicationService extends ClusterCommunicator {
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
index a9547c9..adca843 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
@@ -22,6 +22,11 @@
*/
public interface AtomicValue<V> extends DistributedPrimitive {
+ @Override
+ default Type primitiveType() {
+ return Type.VALUE;
+ }
+
/**
* Atomically sets the value to the given updated value if the current value is equal to the expected value.
* <p>
diff --git a/core/api/src/main/java/org/onosproject/store/service/CoordinationService.java b/core/api/src/main/java/org/onosproject/store/service/CoordinationService.java
new file mode 100644
index 0000000..8725a06
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/CoordinationService.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Cross-version storage/coordination service.
+ * <p>
+ * This is a special type of {@link PrimitiveService} that differs semantically from {@link StorageService} in that
+ * it supports cross-version backward/forward compatible storage. During upgrades, when nodes are running different
+ * versions of the software, this service guarantees that cross-version compatibility will be maintained and provides
+ * shared compatible primitives for coordinating across versions. Users must ensure that all objects stored in
+ * primitives created via this service are stored using a serialization format that is backward/forward compatible,
+ * e.g. using {@link org.onlab.util.KryoNamespace.Builder#setCompatible(boolean)}.
+ *
+ * @see org.onlab.util.KryoNamespace.Builder#setCompatible(boolean)
+ * @see StorageService
+ */
+@Beta
+public interface CoordinationService extends PrimitiveService {
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/PartitionInfo.java b/core/api/src/main/java/org/onosproject/store/service/PartitionInfo.java
index fb903b8..c3c3835 100644
--- a/core/api/src/main/java/org/onosproject/store/service/PartitionInfo.java
+++ b/core/api/src/main/java/org/onosproject/store/service/PartitionInfo.java
@@ -15,15 +15,16 @@
*/
package org.onosproject.store.service;
-import com.google.common.collect.ImmutableList;
-
import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.onosproject.cluster.PartitionId;
+
/**
* Contains information about a database partition.
*/
public class PartitionInfo {
- private final String name;
+ private final PartitionId partitionId;
private final long term;
private final List<String> members;
private final String leader;
@@ -31,25 +32,25 @@
/**
* Class constructor.
*
- * @param name partition name
+ * @param partitionId partition identifier
* @param term term number
* @param members partition members
* @param leader leader name
*/
- public PartitionInfo(String name, long term, List<String> members, String leader) {
- this.name = name;
+ public PartitionInfo(PartitionId partitionId, long term, List<String> members, String leader) {
+ this.partitionId = partitionId;
this.term = term;
this.members = ImmutableList.copyOf(members);
this.leader = leader;
}
/**
- * Returns the name of the partition.
+ * Returns the partition ID.
*
- * @return partition name
+ * @return partition ID
*/
- public String name() {
- return name;
+ public PartitionId id() {
+ return partitionId;
}
/**
diff --git a/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
new file mode 100644
index 0000000..7be0480
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Primitive service.
+ * <p>
+ * This service provides builders for various distributed primitives.
+ * <p>
+ * It is expected that services and applications will leverage the primitives indirectly provided by
+ * this service for their distributed state management and coordination.
+ */
+public interface PrimitiveService {
+
+ /**
+ * Creates a new EventuallyConsistentMapBuilder.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @return builder for an eventually consistent map
+ */
+ <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
+
+ /**
+ * Creates a new ConsistentMapBuilder.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @return builder for a consistent map
+ */
+ <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
+
+ /**
+ * Creates a new ConsistentMapBuilder.
+ *
+ * @param <V> value type
+ * @return builder for a document tree
+ */
+ <V> DocumentTreeBuilder<V> documentTreeBuilder();
+
+ /**
+ * Creates a new {@code AsyncConsistentTreeMapBuilder}.
+ *
+ * @param <V> value type
+ * @return builder for a consistent tree map
+ */
+ <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder();
+
+ /**
+ * Creates a new {@code AsyncConsistentSetMultimapBuilder}.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @return builder for a set based consistent multimap
+ */
+ <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder();
+
+ /**
+ * Creates a new {@code AtomicCounterMapBuilder}.
+ *
+ * @param <K> key type
+ * @return builder for an atomic counter map
+ */
+ <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder();
+
+ /**
+ * Creates a new DistributedSetBuilder.
+ *
+ * @param <E> set element type
+ * @return builder for a distributed set
+ */
+ <E> DistributedSetBuilder<E> setBuilder();
+
+ /**
+ * Creates a new AtomicCounterBuilder.
+ *
+ * @return atomic counter builder
+ */
+ AtomicCounterBuilder atomicCounterBuilder();
+
+ /**
+ * Creates a new AtomicIdGeneratorBuilder.
+ *
+ * @return atomic ID generator builder
+ */
+ AtomicIdGeneratorBuilder atomicIdGeneratorBuilder();
+
+ /**
+ * Creates a new AtomicValueBuilder.
+ *
+ * @param <V> atomic value type
+ * @return atomic value builder
+ */
+ <V> AtomicValueBuilder<V> atomicValueBuilder();
+
+ /**
+ * Creates a new LeaderElectorBuilder.
+ *
+ * @return leader elector builder
+ */
+ LeaderElectorBuilder leaderElectorBuilder();
+
+ /**
+ * Creates a new transaction context builder.
+ *
+ * @return a builder for a transaction context.
+ */
+ TransactionContextBuilder transactionContextBuilder();
+
+ /**
+ * Returns an instance of {@code AsyncAtomicCounter} with specified name.
+ * @param name counter name
+ *
+ * @return AsyncAtomicCounter instance
+ */
+ default AsyncAtomicCounter getAsyncAtomicCounter(String name) {
+ return atomicCounterBuilder().withName(name).build();
+ }
+
+ /**
+ * Returns an instance of {@code AsyncAtomicIdGenerator} with specified name.
+ *
+ * @param name ID generator name
+ * @return AsyncAtomicIdGenerator instance
+ */
+ default AsyncAtomicIdGenerator getAsyncAtomicIdGenerator(String name) {
+ return atomicIdGeneratorBuilder().withName(name).build();
+ }
+
+ /**
+ * Returns an instance of {@code AtomicCounter} with specified name.
+ * @param name counter name
+ *
+ * @return AtomicCounter instance
+ */
+ default AtomicCounter getAtomicCounter(String name) {
+ return getAsyncAtomicCounter(name).asAtomicCounter();
+ }
+
+ /**
+ * Returns an instance of {@code AtomicIdGenerator} with specified name.
+ *
+ * @param name ID generator name
+ * @return AtomicIdGenerator instance
+ */
+ default AtomicIdGenerator getAtomicIdGenerator(String name) {
+ return getAsyncAtomicIdGenerator(name).asAtomicIdGenerator();
+ }
+
+ /**
+ * Returns an instance of {@code WorkQueue} with specified name.
+ *
+ * @param <E> work element type
+ * @param name work queue name
+ * @param serializer serializer
+ * @return WorkQueue instance
+ */
+ <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
+
+ /**
+ * Returns an instance of {@code AsyncDocumentTree} with specified name.
+ *
+ * @param <V> tree node value type
+ * @param name document tree name
+ * @param serializer serializer
+ * @return AsyncDocumentTree instance
+ */
+ <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer);
+
+ /** Returns a set backed instance of {@code AsyncConsistentMultimap} with
+ * the specified name.
+ *
+ * @param name the multimap name
+ * @param serializer serializer
+ * @param <K> key type
+ * @param <V> value type
+ * @return set backed {@code AsyncConsistentMultimap} instance
+ */
+ <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name,
+ Serializer serializer);
+
+ /**
+ * Returns an instance of {@code AsyncConsistentTreeMap} with the specified
+ * name.
+ *
+ * @param name the treemap name
+ * @param serializer serializer
+ * @param <V> value type
+ * @return set backed {@code AsyncConsistentTreeMap} instance
+ */
+ <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name,
+ Serializer serializer);
+
+ /**
+ * Returns an instance of {@code Topic} with specified name.
+ *
+ * @param <T> topic message type
+ * @param name topic name
+ * @param serializer serializer
+ *
+ * @return Topic instance
+ */
+ <T> Topic<T> getTopic(String name, Serializer serializer);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index d863983..94843fc 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -23,195 +23,5 @@
* It is expected that services and applications will leverage the primitives indirectly provided by
* this service for their distributed state management and coordination.
*/
-public interface StorageService {
-
- /**
- * Creates a new EventuallyConsistentMapBuilder.
- *
- * @param <K> key type
- * @param <V> value type
- * @return builder for an eventually consistent map
- */
- <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
-
- /**
- * Creates a new ConsistentMapBuilder.
- *
- * @param <K> key type
- * @param <V> value type
- * @return builder for a consistent map
- */
- <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
-
- /**
- * Creates a new ConsistentMapBuilder.
- *
- * @param <V> value type
- * @return builder for a consistent map
- */
- <V> DocumentTreeBuilder<V> documentTreeBuilder();
-
- /**
- * Creates a new {@code AsyncConsistentTreeMapBuilder}.
- *
- * @param <V> value type
- * @return builder for a async consistent tree map
- */
- <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder();
-
- /**
- * Creates a new {@code AsyncConsistentSetMultimapBuilder}.
- *
- * @param <K> key type
- * @param <V> value type
- * @return builder for a set based async consistent multimap
- */
- <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder();
-
- /**
- * Creates a new {@code AtomicCounterMapBuilder}.
- *
- * @param <K> key type
- * @return builder for an atomic counter map
- */
- <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder();
-
- /**
- * Creates a new DistributedSetBuilder.
- *
- * @param <E> set element type
- * @return builder for an distributed set
- */
- <E> DistributedSetBuilder<E> setBuilder();
-
- /**
- * Creates a new AtomicCounterBuilder.
- *
- * @return atomic counter builder
- */
- AtomicCounterBuilder atomicCounterBuilder();
-
- /**
- * Creates a new AtomicIdGeneratorBuilder.
- *
- * @return atomic ID generator builder
- */
- AtomicIdGeneratorBuilder atomicIdGeneratorBuilder();
-
- /**
- * Creates a new AtomicValueBuilder.
- *
- * @param <V> atomic value type
- * @return atomic value builder
- */
- <V> AtomicValueBuilder<V> atomicValueBuilder();
-
- /**
- * Creates a new LeaderElectorBuilder.
- *
- * @return leader elector builder
- */
- LeaderElectorBuilder leaderElectorBuilder();
-
- /**
- * Creates a new transaction context builder.
- *
- * @return a builder for a transaction context.
- */
- TransactionContextBuilder transactionContextBuilder();
-
- /**
- * Returns an instance of {@code AsyncAtomicCounter} with specified name.
- * @param name counter name
- *
- * @return AsyncAtomicCounter instance
- */
- default AsyncAtomicCounter getAsyncAtomicCounter(String name) {
- return atomicCounterBuilder().withName(name).build();
- }
-
- /**
- * Returns an instance of {@code AsyncAtomicIdGenerator} with specified name.
- *
- * @param name ID generator name
- * @return AsyncAtomicIdGenerator instance
- */
- default AsyncAtomicIdGenerator getAsyncAtomicIdGenerator(String name) {
- return atomicIdGeneratorBuilder().withName(name).build();
- }
-
- /**
- * Returns an instance of {@code AtomicCounter} with specified name.
- * @param name counter name
- *
- * @return AtomicCounter instance
- */
- default AtomicCounter getAtomicCounter(String name) {
- return getAsyncAtomicCounter(name).asAtomicCounter();
- }
-
- /**
- * Returns an instance of {@code AtomicIdGenerator} with specified name.
- *
- * @param name ID generator name
- * @return AtomicIdGenerator instance
- */
- default AtomicIdGenerator getAtomicIdGenerator(String name) {
- return getAsyncAtomicIdGenerator(name).asAtomicIdGenerator();
- }
-
- /**
- * Returns an instance of {@code WorkQueue} with specified name.
- *
- * @param <E> work element type
- * @param name work queue name
- * @param serializer serializer
- * @return WorkQueue instance
- */
- <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
-
- /**
- * Returns an instance of {@code AsyncDocumentTree} with specified name.
- *
- * @param <V> tree node value type
- * @param name document tree name
- * @param serializer serializer
- * @return AsyncDocumentTree instance
- */
- <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer);
-
- /** Returns a set backed instance of {@code AsyncConsistentMultimap} with
- * the specified name.
- *
- * @param name the multimap name
- * @param serializer serializer
- * @param <K> key type
- * @param <V> value type
- * @return set backed {@code AsyncConsistentMultimap} instance
- */
- <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name,
- Serializer serializer);
-
- /**
- * Returns an instance of {@code AsyncConsistentTreeMap} with the specified
- * name.
- *
- * @param name the treemap name
- * @param serializer serializer
- * @param <V> value type
- * @return set backed {@code AsyncConsistentTreeMap} instance
- */
- <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name,
- Serializer serializer);
-
- /**
- * Returns an instance of {@code Topic} with specified name.
- *
- * @param <T> topic message type
- * @param name topic name
- * @param serializer serializer
- *
- * @return Topic instance
- */
- <T> Topic<T> getTopic(String name, Serializer serializer);
+public interface StorageService extends PrimitiveService {
}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/Upgrade.java b/core/api/src/main/java/org/onosproject/upgrade/Upgrade.java
new file mode 100644
index 0000000..79fa7ba
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/Upgrade.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.core.Version;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Represents the state of an upgrade.
+ * <p>
+ * An upgrade consists of a {@link #source() source} and {@link #target() target} version and an upgrade
+ * {@link #status()}.
+ */
+@Beta
+public class Upgrade {
+
+ /**
+ * Represents the phase of the upgrade protocol.
+ */
+ @Beta
+ public enum Status {
+
+ /**
+ * Represents state in which no upgrade has been initialized.
+ */
+ INACTIVE(false, false),
+
+ /**
+ * Indicates that an upgrade is being initialized.
+ */
+ INITIALIZING(true, false),
+
+ /**
+ * Indicates that an upgrade has been initialized.
+ */
+ INITIALIZED(true, false),
+
+ /**
+ * Indicates that an upgrade is in progress.
+ */
+ UPGRADING(true, true),
+
+ /**
+ * Indicates that an upgrade is complete.
+ */
+ UPGRADED(true, true),
+
+ /**d
+ * Indicates that an upgrade is being committed.
+ */
+ COMMITTING(true, true),
+
+ /**
+ * Indicates that an upgrade has been committed.
+ */
+ COMMITTED(false, true),
+
+ /**
+ * Indicates that an upgrade is being rolled back.
+ */
+ ROLLING_BACK(true, false),
+
+ /**
+ * Indicates that an upgrade has been rolled back.
+ */
+ ROLLED_BACK(true, false),
+
+ /**
+ * Indicates that an upgrade is being reset.
+ */
+ RESETTING(true, false),
+
+ /**
+ * Indicates that an upgrade has been reset.
+ */
+ RESET(false, false);
+
+ private final boolean active;
+ private final boolean upgraded;
+
+ Status(boolean active, boolean upgraded) {
+ this.active = active;
+ this.upgraded = upgraded;
+ }
+
+ /**
+ * Returns whether the upgrade status is active.
+ *
+ * @return whether the upgrade status is active
+ */
+ public boolean active() {
+ return active;
+ }
+
+ /**
+ * Returns whether the upgraded version is active.
+ *
+ * @return whether the upgraded version is active
+ */
+ public boolean upgraded() {
+ return upgraded;
+ }
+ }
+
+ private final Version source;
+ private final Version target;
+ private final Status status;
+
+ public Upgrade(Version source, Version target, Status status) {
+ this.source = source;
+ this.target = target;
+ this.status = status;
+ }
+
+ /**
+ * Returns the source version.
+ *
+ * @return the source version
+ */
+ public Version source() {
+ return source;
+ }
+
+ /**
+ * Returns the target version.
+ *
+ * @return the target version
+ */
+ public Version target() {
+ return target;
+ }
+
+ /**
+ * Returns the upgrade status.
+ *
+ * @return the upgrade status
+ */
+ public Status status() {
+ return status;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("source", source)
+ .add("target", target)
+ .add("status", status)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/UpgradeAdminService.java b/core/api/src/main/java/org/onosproject/upgrade/UpgradeAdminService.java
new file mode 100644
index 0000000..38c1bb1
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/UpgradeAdminService.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Abstraction for executing the stages of the upgrade process.
+ * <p>
+ * Upgrades are performed in three phases:
+ * <ul>
+ * <li>{@code initialize} - Initializes an upgrade</li>
+ * <li>{@code upgrade} - Performs an upgrade, transferring device mastership from the current version to the
+ * upgraded version</li>
+ * <li>{@code commit} or {@code rollback} - Completes or rolls back an upgrade, transferring mastership back
+ * to nodes running the previous version</li>
+ * </ul>
+ */
+@Beta
+public interface UpgradeAdminService {
+
+ /**
+ * Initializes an upgrade.
+ * <p>
+ * This method must be called to initialize an upgrade and prior to physically upgrading any nodes.
+ *
+ * @throws IllegalStateException if an upgrade is already in progress
+ */
+ void initialize();
+
+ /**
+ * Performs an upgrade, transferring device mastership to upgraded nodes.
+ * <p>
+ * This method transfers mastership from the current version of the software to the upgraded version. Thus,
+ * a subset of the nodes in the cluster must have been physically upgraded and restarted prior to executing this
+ * phase of the upgrade protocol.
+ *
+ * @throws IllegalStateException if no upgrade has been initialized
+ */
+ void upgrade();
+
+ /**
+ * Commits an upgrade.
+ * <p>
+ * Completes the upgrade process, committing the new cluster version.
+ *
+ * @throws IllegalStateException if no upgrade is in progress or not all nodes have been upgraded
+ */
+ void commit();
+
+ /**
+ * Rolls back an upgrade.
+ * <p>
+ * When an upgrade is rolled back, mastership is transferred from upgraded nodes back to nodes running the
+ * version of the software prior to the upgrade.
+ *
+ * @throws IllegalStateException if no upgrade is in progress
+ */
+ void rollback();
+
+ /**
+ * Resets an upgrade.
+ * <p>
+ * When an upgrade is rolled back, once nodes have been restored to the previos version the upgrade must be reset
+ * to restore the upgrade state to {@link Upgrade.Status#INACTIVE}.
+ *
+ * @throws IllegalStateException if nodes have not been restored to the previous state
+ */
+ void reset();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/UpgradeEvent.java b/core/api/src/main/java/org/onosproject/upgrade/UpgradeEvent.java
new file mode 100644
index 0000000..43cf523
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/UpgradeEvent.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import java.util.Objects;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects;
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Upgrade event.
+ */
+@Beta
+public class UpgradeEvent extends AbstractEvent<UpgradeEvent.Type, Upgrade> {
+
+ /**
+ * Type of upgrade-related events.
+ */
+ @Beta
+ public enum Type {
+
+ /**
+ * Indicates that a new upgrade was initialized.
+ */
+ INITIALIZED,
+
+ /**
+ * Indicates that mastership was reassigned to the upgraded cluster.
+ */
+ UPGRADED,
+
+ /**
+ * Indicates that an upgrade was committed.
+ */
+ COMMITTED,
+
+ /**
+ * Indicates that an upgrade was rolled back.
+ */
+ ROLLED_BACK,
+
+ /**
+ * Indicates that an upgrade was reset.
+ */
+ RESET,
+ }
+
+ /**
+ * Creates an event of a given type and for the specified state and the
+ * current time.
+ *
+ * @param type upgrade event type
+ * @param state upgrade state
+ */
+ public UpgradeEvent(UpgradeEvent.Type type, Upgrade state) {
+ super(type, state);
+ }
+
+ /**
+ * Creates an event of a given type and for the specified state and time.
+ *
+ * @param type upgrade event type
+ * @param state upgrade state
+ * @param time occurrence time
+ */
+ public UpgradeEvent(UpgradeEvent.Type type, Upgrade state, long time) {
+ super(type, state, time);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type(), subject(), time());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof UpgradeEvent) {
+ final UpgradeEvent other = (UpgradeEvent) obj;
+ return Objects.equals(this.type(), other.type()) &&
+ Objects.equals(this.subject(), other.subject()) &&
+ Objects.equals(this.time(), other.time());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this.getClass())
+ .add("type", type())
+ .add("subject", subject())
+ .add("time", time())
+ .toString();
+ }
+
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/UpgradeEventListener.java b/core/api/src/main/java/org/onosproject/upgrade/UpgradeEventListener.java
new file mode 100644
index 0000000..876d35a
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/UpgradeEventListener.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.EventListener;
+
+/**
+ * Upgrade event listener.
+ */
+@Beta
+public interface UpgradeEventListener extends EventListener<UpgradeEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/UpgradeService.java b/core/api/src/main/java/org/onosproject/upgrade/UpgradeService.java
new file mode 100644
index 0000000..ba5889c
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/UpgradeService.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.core.Version;
+import org.onosproject.event.ListenerService;
+
+/**
+ * Upgrade service.
+ */
+@Beta
+public interface UpgradeService
+ extends ListenerService<UpgradeEvent, UpgradeEventListener> {
+
+ /**
+ * Returns whether an upgrade is in progress.
+ * <p>
+ * An upgrade is in progress if the upgrade {@link Upgrade.Status} is active, e.g.
+ * {@link Upgrade.Status#INITIALIZED}, {@link Upgrade.Status#UPGRADED}, etc.
+ *
+ * @return indicates whether an upgrade is in progress
+ */
+ boolean isUpgrading();
+
+ /**
+ * Returns the current upgrade state.
+ *
+ * @return the current upgrade state
+ */
+ Upgrade getState();
+
+ /**
+ * Returns the currently active software version.
+ * <p>
+ * The returned version is representative of the version currently in control of the network. When the upgrade
+ * transitions to the {@link Upgrade.Status#UPGRADING} state, control over the network is transferred from
+ * {@link Upgrade#source()} nodes to {@link Upgrade#target()} nodes, and the version returned by this method
+ * represents that change.
+ *
+ * @return the software version
+ */
+ Version getVersion();
+
+ /**
+ * Returns whether the local node is active.
+ * <p>
+ * The local node will be active if its {@link Version} matches the version returned by {@link #getVersion()}.
+ *
+ * @return indicates whether the local node is active according to its version
+ */
+ boolean isLocalActive();
+
+ /**
+ * Returns whether the local node is an upgraded node.
+ *
+ * @return {@code true} if the local node's version matches {@link Upgrade#target()}
+ */
+ boolean isLocalUpgraded();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/package-info.java b/core/api/src/main/java/org/onosproject/upgrade/package-info.java
new file mode 100644
index 0000000..5f19a54
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Abstractions for managing software upgrades.
+ */
+package org.onosproject.upgrade;
diff --git a/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java
new file mode 100644
index 0000000..aca74ec
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Set;
+
+import org.joda.time.DateTime;
+import org.onosproject.core.Version;
+
+/**
+ * Compatible cluster service adapter.
+ */
+public class UnifiedClusterServiceAdapter implements UnifiedClusterService {
+ @Override
+ public ControllerNode getLocalNode() {
+ return null;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return null;
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public ControllerNode.State getState(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public Version getVersion(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public DateTime getLastUpdated(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public void addListener(ClusterEventListener listener) {
+
+ }
+
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+
+ }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AsyncAtomicValueAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AsyncAtomicValueAdapter.java
new file mode 100644
index 0000000..0aefe8c
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/AsyncAtomicValueAdapter.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async atomic value adapter.
+ */
+public class AsyncAtomicValueAdapter<V> implements AsyncAtomicValue<V> {
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<V> get() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<V> getAndSet(V value) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> set(V value) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
+ return null;
+ }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AtomicValueAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AtomicValueAdapter.java
new file mode 100644
index 0000000..e6a4e2f
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/AtomicValueAdapter.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Atomic value adapter.
+ */
+public class AtomicValueAdapter<V> implements AtomicValue<V> {
+ private final String name;
+
+ public AtomicValueAdapter() {
+ this(null);
+ }
+
+ public AtomicValueAdapter(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public Type primitiveType() {
+ return null;
+ }
+
+ @Override
+ public boolean compareAndSet(V expect, V update) {
+ return false;
+ }
+
+ @Override
+ public V get() {
+ return null;
+ }
+
+ @Override
+ public V getAndSet(V value) {
+ return null;
+ }
+
+ @Override
+ public void set(V value) {
+
+ }
+
+ @Override
+ public void addListener(AtomicValueEventListener<V> listener) {
+
+ }
+
+ @Override
+ public void removeListener(AtomicValueEventListener<V> listener) {
+
+ }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
new file mode 100644
index 0000000..f05d9aa
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Coordination service adapter.
+ */
+public class CoordinationServiceAdapter implements CoordinationService {
+ @Override
+ public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
+ return null;
+ }
+
+ @Override
+ public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+ return null;
+ }
+
+ @Override
+ public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
+ return null;
+ }
+
+ @Override
+ public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
+ return null;
+ }
+
+ @Override
+ public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
+ return null;
+ }
+
+ @Override
+ public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+ return null;
+ }
+
+ @Override
+ public <E> DistributedSetBuilder<E> setBuilder() {
+ return null;
+ }
+
+ @Override
+ public AtomicCounterBuilder atomicCounterBuilder() {
+ return null;
+ }
+
+ @Override
+ public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
+ return null;
+ }
+
+ @Override
+ public <V> AtomicValueBuilder<V> atomicValueBuilder() {
+ return null;
+ }
+
+ @Override
+ public LeaderElectorBuilder leaderElectorBuilder() {
+ return null;
+ }
+
+ @Override
+ public TransactionContextBuilder transactionContextBuilder() {
+ return null;
+ }
+
+ @Override
+ public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+ return null;
+ }
+
+ @Override
+ public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+ return null;
+ }
+
+ @Override
+ public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name, Serializer serializer) {
+ return null;
+ }
+
+ @Override
+ public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name, Serializer serializer) {
+ return null;
+ }
+
+ @Override
+ public <T> Topic<T> getTopic(String name, Serializer serializer) {
+ return null;
+ }
+}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index 531a6fa..df7feff 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -15,7 +15,8 @@
*/
package org.onosproject.cluster.impl;
-import com.google.common.collect.Sets;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -23,41 +24,19 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.apache.karaf.system.SystemService;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
-import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterAdminService;
-import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterMetadata;
-import org.onosproject.cluster.ClusterMetadataAdminService;
-import org.onosproject.cluster.ClusterMetadataDiff;
-import org.onosproject.cluster.ClusterMetadataEvent;
-import org.onosproject.cluster.ClusterMetadataEventListener;
-import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.UnifiedClusterAdminService;
+import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.Partition;
-import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
-import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.core.VersionService;
import org.slf4j.Logger;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
import static org.slf4j.LoggerFactory.getLogger;
@@ -67,172 +46,122 @@
*/
@Component(immediate = true)
@Service
-public class ClusterManager
- extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
- implements ClusterService, ClusterAdminService {
+public class ClusterManager implements ClusterService, ClusterAdminService {
- public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
- private static final int DEFAULT_PARTITION_SIZE = 3;
private final Logger log = getLogger(getClass());
- private ClusterStoreDelegate delegate = new InternalStoreDelegate();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private UnifiedClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
+ private UnifiedClusterAdminService clusterAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataAdminService clusterMetadataAdminService;
+ private VersionService versionService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterStore store;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected SystemService systemService;
-
- private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
- private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+ private Version version;
@Activate
public void activate() {
- store.setDelegate(delegate);
- eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
- clusterMetadataService.addListener(metadataListener);
- processMetadata(clusterMetadataService.getClusterMetadata());
+ version = versionService.version();
log.info("Started");
}
@Deactivate
public void deactivate() {
- clusterMetadataService.removeListener(metadataListener);
- store.unsetDelegate(delegate);
- eventDispatcher.removeSink(ClusterEvent.class);
log.info("Stopped");
}
@Override
public ControllerNode getLocalNode() {
checkPermission(CLUSTER_READ);
- return store.getLocalNode();
+ return clusterService.getLocalNode();
}
@Override
public Set<ControllerNode> getNodes() {
checkPermission(CLUSTER_READ);
- return store.getNodes();
+ return clusterService.getNodes()
+ .stream()
+ .filter(node -> clusterService.getVersion(node.id()).equals(version))
+ .collect(Collectors.toSet());
}
@Override
public ControllerNode getNode(NodeId nodeId) {
checkPermission(CLUSTER_READ);
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return store.getNode(nodeId);
+ Version nodeVersion = clusterService.getVersion(nodeId);
+ if (nodeVersion != null && nodeVersion.equals(version)) {
+ return clusterService.getNode(nodeId);
+ }
+ return null;
}
@Override
public ControllerNode.State getState(NodeId nodeId) {
checkPermission(CLUSTER_READ);
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return store.getState(nodeId);
+ Version nodeVersion = clusterService.getVersion(nodeId);
+ if (nodeVersion != null && nodeVersion.equals(version)) {
+ return clusterService.getState(nodeId);
+ }
+ return null;
}
@Override
public Version getVersion(NodeId nodeId) {
checkPermission(CLUSTER_READ);
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return store.getVersion(nodeId);
+ Version nodeVersion = clusterService.getVersion(nodeId);
+ if (nodeVersion != null && nodeVersion.equals(version)) {
+ return nodeVersion;
+ }
+ return null;
}
@Override
public void markFullyStarted(boolean started) {
- store.markFullyStarted(started);
+ clusterAdminService.markFullyStarted(started);
}
@Override
public DateTime getLastUpdated(NodeId nodeId) {
checkPermission(CLUSTER_READ);
- return store.getLastUpdated(nodeId);
+ Version nodeVersion = clusterService.getVersion(nodeId);
+ if (nodeVersion != null && nodeVersion.equals(version)) {
+ return clusterService.getLastUpdated(nodeId);
+ }
+ return null;
}
@Override
public void formCluster(Set<ControllerNode> nodes) {
- formCluster(nodes, DEFAULT_PARTITION_SIZE);
+ clusterAdminService.formCluster(nodes);
}
@Override
public void formCluster(Set<ControllerNode> nodes, int partitionSize) {
- checkNotNull(nodes, "Nodes cannot be null");
- checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
-
- ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize));
- clusterMetadataAdminService.setClusterMetadata(metadata);
- try {
- log.warn("Shutting down container for cluster reconfiguration!");
- // Clean up persistent state associated with previous cluster configuration.
- Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions");
- systemService.reboot("now", SystemService.Swipe.NONE);
- } catch (Exception e) {
- log.error("Unable to reboot container", e);
- }
+ clusterAdminService.formCluster(nodes, partitionSize);
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- checkNotNull(ip, "IP address cannot be null");
- checkArgument(tcpPort > 5000, "TCP port must be > 5000");
- return store.addNode(nodeId, ip, tcpPort);
+ return clusterAdminService.addNode(nodeId, ip, tcpPort);
}
@Override
public void removeNode(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- store.removeNode(nodeId);
- }
-
- // Store delegate to re-post events emitted from the store.
- private class InternalStoreDelegate implements ClusterStoreDelegate {
- @Override
- public void notify(ClusterEvent event) {
- post(event);
+ Version nodeVersion = clusterService.getVersion(nodeId);
+ if (nodeVersion != null && nodeVersion.equals(version)) {
+ clusterAdminService.removeNode(nodeId);
}
}
- private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
- List<ControllerNode> sorted = new ArrayList<>(nodes);
- Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
- Set<Partition> partitions = Sets.newHashSet();
- // add partitions
- int length = nodes.size();
- int count = Math.min(partitionSize, length);
- for (int i = 0; i < length; i++) {
- int index = i;
- Set<NodeId> set = new HashSet<>(count);
- for (int j = 0; j < count; j++) {
- set.add(sorted.get((i + j) % length).id());
- }
- partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set));
- }
- return partitions;
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ clusterService.addListener(listener);
}
- /**
- * Processes metadata by adding and removing nodes from the cluster.
- */
- private synchronized void processMetadata(ClusterMetadata metadata) {
- try {
- ClusterMetadataDiff examiner =
- new ClusterMetadataDiff(currentMetadata.get(), metadata);
- examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
- examiner.nodesRemoved().forEach(this::removeNode);
- } finally {
- currentMetadata.set(metadata);
- }
- }
-
- private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
- @Override
- public void event(ClusterMetadataEvent event) {
- processMetadata(event.subject());
- }
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+ clusterService.removeListener(listener);
}
}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java
new file mode 100644
index 0000000..3617305
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2014-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.karaf.system.SystemService;
+import org.joda.time.DateTime;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataAdminService;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
+import org.onosproject.cluster.ClusterMetadataService;
+import org.onosproject.cluster.ClusterStore;
+import org.onosproject.cluster.ClusterStoreDelegate;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.UnifiedClusterAdminService;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.Partition;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.Version;
+import org.onosproject.event.AbstractListenerManager;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of the cluster service.
+ */
+@Component(immediate = true)
+@Service
+public class UnifiedClusterManager
+ extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
+ implements UnifiedClusterService, UnifiedClusterAdminService {
+
+ public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+ private static final int DEFAULT_PARTITION_SIZE = 3;
+ private final Logger log = getLogger(getClass());
+
+ private ClusterStoreDelegate delegate = new InternalStoreDelegate();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterMetadataService clusterMetadataService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterMetadataAdminService clusterMetadataAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterStore store;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected SystemService systemService;
+
+ private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
+ private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+
+ @Activate
+ public void activate() {
+ store.setDelegate(delegate);
+ eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
+ clusterMetadataService.addListener(metadataListener);
+ processMetadata(clusterMetadataService.getClusterMetadata());
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterMetadataService.removeListener(metadataListener);
+ store.unsetDelegate(delegate);
+ eventDispatcher.removeSink(ClusterEvent.class);
+ log.info("Stopped");
+ }
+
+ @Override
+ public ControllerNode getLocalNode() {
+ checkPermission(CLUSTER_READ);
+ return store.getLocalNode();
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ checkPermission(CLUSTER_READ);
+ return store.getNodes();
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ checkPermission(CLUSTER_READ);
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return store.getNode(nodeId);
+ }
+
+ @Override
+ public ControllerNode.State getState(NodeId nodeId) {
+ checkPermission(CLUSTER_READ);
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return store.getState(nodeId);
+ }
+
+ @Override
+ public Version getVersion(NodeId nodeId) {
+ checkPermission(CLUSTER_READ);
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return store.getVersion(nodeId);
+ }
+
+ @Override
+ public void markFullyStarted(boolean started) {
+ store.markFullyStarted(started);
+ }
+
+ @Override
+ public DateTime getLastUpdated(NodeId nodeId) {
+ checkPermission(CLUSTER_READ);
+ return store.getLastUpdated(nodeId);
+ }
+
+ @Override
+ public void formCluster(Set<ControllerNode> nodes) {
+ formCluster(nodes, DEFAULT_PARTITION_SIZE);
+ }
+
+ @Override
+ public void formCluster(Set<ControllerNode> nodes, int partitionSize) {
+ checkNotNull(nodes, "Nodes cannot be null");
+ checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
+
+ ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize));
+ clusterMetadataAdminService.setClusterMetadata(metadata);
+ try {
+ log.warn("Shutting down container for cluster reconfiguration!");
+ // Clean up persistent state associated with previous cluster configuration.
+ Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions");
+ systemService.reboot("now", SystemService.Swipe.NONE);
+ } catch (Exception e) {
+ log.error("Unable to reboot container", e);
+ }
+ }
+
+ @Override
+ public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ checkNotNull(ip, "IP address cannot be null");
+ checkArgument(tcpPort > 5000, "TCP port must be > 5000");
+ return store.addNode(nodeId, ip, tcpPort);
+ }
+
+ @Override
+ public void removeNode(NodeId nodeId) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ store.removeNode(nodeId);
+ }
+
+ // Store delegate to re-post events emitted from the store.
+ private class InternalStoreDelegate implements ClusterStoreDelegate {
+ @Override
+ public void notify(ClusterEvent event) {
+ post(event);
+ }
+ }
+
+ private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
+ List<ControllerNode> sorted = new ArrayList<>(nodes);
+ Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
+ Set<Partition> partitions = Sets.newHashSet();
+ // add partitions
+ int length = nodes.size();
+ int count = Math.min(partitionSize, length);
+ for (int i = 0; i < length; i++) {
+ int index = i;
+ Set<NodeId> set = new HashSet<>(count);
+ for (int j = 0; j < count; j++) {
+ set.add(sorted.get((i + j) % length).id());
+ }
+ partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set));
+ }
+ return partitions;
+ }
+
+ /**
+ * Processes metadata by adding and removing nodes from the cluster.
+ */
+ private synchronized void processMetadata(ClusterMetadata metadata) {
+ try {
+ ClusterMetadataDiff examiner =
+ new ClusterMetadataDiff(currentMetadata.get(), metadata);
+ examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
+ examiner.nodesRemoved().forEach(this::removeNode);
+ } finally {
+ currentMetadata.set(metadata);
+ }
+ }
+
+ private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+ @Override
+ public void event(ClusterMetadataEvent event) {
+ processMetadata(event.subject());
+ }
+ }
+}
diff --git a/core/net/src/main/java/org/onosproject/core/impl/VersionManager.java b/core/net/src/main/java/org/onosproject/core/impl/VersionManager.java
index e734f7d..12210a9 100644
--- a/core/net/src/main/java/org/onosproject/core/impl/VersionManager.java
+++ b/core/net/src/main/java/org/onosproject/core/impl/VersionManager.java
@@ -54,6 +54,7 @@
// version file not found, using default
log.trace("Version file not found", e);
}
+ log.info("Started");
}
@Override
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index d2b2536..bbf6d39 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -463,7 +463,7 @@
// isReachable but was not MASTER or STANDBY, get a role and apply
// Note: NONE triggers request to MastershipService
- reassertRole(deviceId, NONE);
+ reassertRole(deviceId, mastershipService.getLocalRole(deviceId));
}
}
@@ -819,49 +819,32 @@
private void reassertRole(final DeviceId did,
final MastershipRole nextRole) {
- MastershipRole myNextRole = nextRole;
- if (myNextRole == NONE) {
- try {
- mastershipService.requestRoleFor(did).get();
- MastershipTerm term = termService.getMastershipTerm(did);
- if (term != null && localNodeId.equals(term.master())) {
- myNextRole = MASTER;
- } else {
- myNextRole = STANDBY;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("Interrupted waiting for Mastership", e);
- } catch (ExecutionException e) {
- log.error("Encountered an error waiting for Mastership", e);
- }
- }
-
- switch (myNextRole) {
+ switch (nextRole) {
case MASTER:
final Device device = getDevice(did);
if ((device != null) && !isAvailable(did)) {
store.markOnline(did);
}
// TODO: should apply role only if there is mismatch
- log.debug("Applying role {} to {}", myNextRole, did);
+ log.debug("Applying role {} to {}", nextRole, did);
if (!applyRoleAndProbe(did, MASTER)) {
- log.warn("Unsuccessful applying role {} to {}", myNextRole, did);
+ log.warn("Unsuccessful applying role {} to {}", nextRole, did);
// immediately failed to apply role
mastershipService.relinquishMastership(did);
// FIXME disconnect?
}
break;
case STANDBY:
- log.debug("Applying role {} to {}", myNextRole, did);
+ log.debug("Applying role {} to {}", nextRole, did);
if (!applyRoleAndProbe(did, STANDBY)) {
- log.warn("Unsuccessful applying role {} to {}", myNextRole, did);
+ log.warn("Unsuccessful applying role {} to {}", nextRole, did);
// immediately failed to apply role
mastershipService.relinquishMastership(did);
// FIXME disconnect?
}
break;
case NONE:
+ break;
default:
// should never reach here
log.error("You didn't see anything. I did not exist.");
diff --git a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
new file mode 100644
index 0000000..ce55eda
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade.impl;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.service.AtomicValueEvent;
+import org.onosproject.store.service.AtomicValueEventListener;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.upgrade.Upgrade;
+import org.onosproject.upgrade.UpgradeAdminService;
+import org.onosproject.upgrade.UpgradeEvent;
+import org.onosproject.upgrade.UpgradeEventListener;
+import org.onosproject.upgrade.UpgradeService;
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Upgrade service implementation.
+ * <p>
+ * This implementation uses the {@link CoordinationService} to store upgrade state in a version-agnostic primitive.
+ * Upgrade state can be seen by current and future version nodes.
+ */
+@Component(immediate = true)
+@Service
+public class UpgradeManager
+ extends AbstractListenerManager<UpgradeEvent, UpgradeEventListener>
+ implements UpgradeService, UpgradeAdminService {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VersionService versionService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoordinationService coordinationService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UnifiedClusterService clusterService;
+
+ private Version localVersion;
+ private AtomicValue<Upgrade> state;
+ private final AtomicReference<Upgrade> currentState = new AtomicReference<>();
+ private final AtomicValueEventListener<Upgrade> stateListener = event -> handleChange(event);
+
+ @Activate
+ public void activate() {
+ state = coordinationService.<Upgrade>atomicValueBuilder()
+ .withName("onos-upgrade-state")
+ .withSerializer(Serializer.using(KryoNamespaces.API))
+ .build()
+ .asAtomicValue();
+ localVersion = versionService.version();
+
+ currentState.set(state.get());
+ if (currentState.get() == null) {
+ currentState.set(new Upgrade(localVersion, localVersion, Upgrade.Status.INACTIVE));
+ state.set(currentState.get());
+ }
+
+ Upgrade upgrade = currentState.get();
+
+ // If the upgrade state is not initialized, ensure this node matches the version of the cluster.
+ if (!upgrade.status().active() && !Objects.equals(upgrade.source(), localVersion)) {
+ log.error("Node version {} inconsistent with cluster version {}", localVersion, upgrade.source());
+ throw new IllegalStateException("Node version " + localVersion +
+ " inconsistent with cluster version " + upgrade.source());
+ }
+
+ // If the upgrade state is initialized then check the node version.
+ if (upgrade.status() == Upgrade.Status.INITIALIZED) {
+ // If the source version equals the target version, attempt to update the target version.
+ if (Objects.equals(upgrade.source(), upgrade.target()) && !Objects.equals(upgrade.target(), localVersion)) {
+ upgrade = new Upgrade(upgrade.source(), localVersion, upgrade.status());
+ currentState.set(upgrade);
+ state.set(upgrade);
+ }
+ }
+
+ // If the upgrade status is active, verify that the local version matches the upgrade version.
+ if (upgrade.status().active() && !Objects.equals(upgrade.source(), upgrade.target())) {
+ // If the upgrade source/target are not equal, validate that the node's version is consistent
+ // with versions in the upgrade. There are two possibilities: that a not-yet-upgraded node is being
+ // restarted, or that a node has been upgraded, so we need to check that this node is running either
+ // the source or target version.
+ if (!Objects.equals(localVersion, upgrade.source()) && !Objects.equals(localVersion, upgrade.target())) {
+ log.error("Cannot upgrade node to version {}; Upgrade to {} already in progress",
+ localVersion, upgrade.target());
+ throw new IllegalStateException("Cannot upgrade node to version " + localVersion + "; Upgrade to " +
+ upgrade.target() + " already in progress");
+ }
+ }
+
+ state.addListener(stateListener);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ state.removeListener(stateListener);
+ log.info("Stopped");
+ }
+
+ @Override
+ public boolean isUpgrading() {
+ return getState().status().active();
+ }
+
+ @Override
+ public Upgrade getState() {
+ return currentState.get();
+ }
+
+ @Override
+ public Version getVersion() {
+ Upgrade upgrade = currentState.get();
+ return upgrade.status().upgraded()
+ ? upgrade.target()
+ : upgrade.source();
+ }
+
+ @Override
+ public boolean isLocalActive() {
+ return localVersion.equals(getVersion());
+ }
+
+ @Override
+ public boolean isLocalUpgraded() {
+ Upgrade upgrade = currentState.get();
+ return upgrade.status().active()
+ && !upgrade.source().equals(upgrade.target())
+ && localVersion.equals(upgrade.target());
+ }
+
+ @Override
+ public void initialize() {
+ Upgrade inactive = currentState.get();
+
+ // If the current upgrade status is active, fail initialization.
+ if (inactive.status().active()) {
+ throw new IllegalStateException("Upgrade already active");
+ }
+
+ // Set the upgrade status to INITIALIZING.
+ Upgrade initializing = new Upgrade(
+ localVersion,
+ localVersion,
+ Upgrade.Status.INITIALIZING);
+ if (!state.compareAndSet(inactive, initializing)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(initializing);
+
+ // Set the upgrade status to INITIALIZED.
+ Upgrade initialized = new Upgrade(
+ initializing.source(),
+ initializing.target(),
+ Upgrade.Status.INITIALIZED);
+ if (!state.compareAndSet(initializing, initialized)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(initialized);
+ }
+ }
+ }
+
+ @Override
+ public void upgrade() {
+ Upgrade initialized = currentState.get();
+
+ // If the current upgrade status is not INITIALIZED, throw an exception.
+ if (initialized.status() != Upgrade.Status.INITIALIZED) {
+ throw new IllegalStateException("Upgrade not initialized");
+ }
+
+ // Set the upgrade status to UPGRADING.
+ Upgrade upgrading = new Upgrade(
+ initialized.source(),
+ initialized.target(),
+ Upgrade.Status.UPGRADING);
+ if (!state.compareAndSet(initialized, upgrading)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(upgrading);
+
+ // Set the upgrade status to UPGRADED.
+ Upgrade upgraded = new Upgrade(
+ upgrading.source(),
+ upgrading.target(),
+ Upgrade.Status.UPGRADED);
+ if (!state.compareAndSet(upgrading, upgraded)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(upgraded);
+ }
+ }
+ }
+
+ @Override
+ public void commit() {
+ Upgrade upgraded = currentState.get();
+
+ // If the current upgrade status is not UPGRADED, throw an exception.
+ if (upgraded.status() != Upgrade.Status.UPGRADED) {
+ throw new IllegalStateException("Upgrade not performed");
+ }
+
+ // Determine whether any nodes have not been upgraded to the target version.
+ boolean upgradeComplete = clusterService.getNodes()
+ .stream()
+ .allMatch(node -> {
+ ControllerNode.State state = clusterService.getState(node.id());
+ Version version = clusterService.getVersion(node.id());
+ return state.isActive() && version != null && version.equals(upgraded.target());
+ });
+
+ // If some nodes have not yet been upgraded, throw an exception.
+ if (!upgradeComplete) {
+ throw new IllegalStateException("Some nodes have not yet been upgraded to version " + upgraded.target());
+ }
+
+ // Set the upgrade status to COMMITTING.
+ Upgrade committing = new Upgrade(
+ upgraded.source(),
+ upgraded.target(),
+ Upgrade.Status.COMMITTING);
+ if (!state.compareAndSet(upgraded, committing)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(committing);
+
+ // Set the upgrade status to COMMITTED.
+ Upgrade committed = new Upgrade(
+ committing.source(),
+ committing.target(),
+ Upgrade.Status.COMMITTED);
+ if (!state.compareAndSet(committing, committed)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(committed);
+
+ // Set the upgrade status to INACTIVE.
+ Upgrade inactive = new Upgrade(
+ localVersion,
+ localVersion,
+ Upgrade.Status.INACTIVE);
+ if (!state.compareAndSet(committed, inactive)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(inactive);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void rollback() {
+ Upgrade upgraded = currentState.get();
+
+ // If the current upgrade status is not UPGRADED, throw an exception.
+ if (upgraded.status() != Upgrade.Status.UPGRADED) {
+ throw new IllegalStateException("Upgrade not performed");
+ }
+
+ // Set the upgrade status to ROLLING_BACK.
+ Upgrade rollingBack = new Upgrade(
+ upgraded.source(),
+ upgraded.target(),
+ Upgrade.Status.ROLLING_BACK);
+ if (!state.compareAndSet(upgraded, rollingBack)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(rollingBack);
+
+ // Set the upgrade status to ROLLED_BACK.
+ Upgrade rolledBack = new Upgrade(
+ rollingBack.source(),
+ rollingBack.target(),
+ Upgrade.Status.ROLLED_BACK);
+ if (!state.compareAndSet(rollingBack, rolledBack)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(rolledBack);
+ }
+ }
+ }
+
+ @Override
+ public void reset() {
+ Upgrade upgraded = currentState.get();
+
+ // If the current upgrade status is not INITIALIZED or ROLLED_BACK, throw an exception.
+ if (upgraded.status() != Upgrade.Status.INITIALIZED
+ && upgraded.status() != Upgrade.Status.ROLLED_BACK) {
+ throw new IllegalStateException("Upgrade not rolled back");
+ }
+
+ // Determine whether any nodes are still running the target version.
+ boolean rollbackComplete = clusterService.getNodes()
+ .stream()
+ .allMatch(node -> {
+ ControllerNode.State state = clusterService.getState(node.id());
+ Version version = clusterService.getVersion(node.id());
+ return state.isActive() && version != null && version.equals(upgraded.source());
+ });
+
+ // If some nodes have not yet been downgraded, throw an exception.
+ if (!rollbackComplete) {
+ throw new IllegalStateException("Some nodes have not yet been downgraded to version " + upgraded.source());
+ }
+
+ // Set the upgrade status to RESETTING.
+ Upgrade resetting = new Upgrade(
+ upgraded.source(),
+ upgraded.target(),
+ Upgrade.Status.RESETTING);
+ if (!state.compareAndSet(upgraded, resetting)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(resetting);
+
+ // Set the upgrade status to RESET.
+ Upgrade reset = new Upgrade(
+ resetting.source(),
+ resetting.target(),
+ Upgrade.Status.RESET);
+ if (!state.compareAndSet(resetting, reset)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(reset);
+
+ // Set the upgrade status to INACTIVE.
+ Upgrade inactive = new Upgrade(
+ localVersion,
+ localVersion,
+ Upgrade.Status.INACTIVE);
+ if (!state.compareAndSet(reset, inactive)) {
+ throw new IllegalStateException("Concurrent upgrade modification");
+ } else {
+ currentState.set(inactive);
+ }
+ }
+ }
+ }
+
+ private void handleChange(AtomicValueEvent<Upgrade> event) {
+ currentState.set(event.newValue());
+ switch (event.newValue().status()) {
+ case INITIALIZED:
+ post(new UpgradeEvent(UpgradeEvent.Type.INITIALIZED, event.newValue()));
+ break;
+ case UPGRADED:
+ post(new UpgradeEvent(UpgradeEvent.Type.UPGRADED, event.newValue()));
+ break;
+ case COMMITTED:
+ post(new UpgradeEvent(UpgradeEvent.Type.COMMITTED, event.newValue()));
+ break;
+ case ROLLED_BACK:
+ post(new UpgradeEvent(UpgradeEvent.Type.ROLLED_BACK, event.newValue()));
+ break;
+ case RESET:
+ post(new UpgradeEvent(UpgradeEvent.Type.RESET, event.newValue()));
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/core/net/src/main/java/org/onosproject/upgrade/impl/package-info.java b/core/net/src/main/java/org/onosproject/upgrade/impl/package-info.java
new file mode 100644
index 0000000..013c93a
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/upgrade/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Software upgrade management.
+ */
+package org.onosproject.upgrade.impl;
diff --git a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
new file mode 100644
index 0000000..4137744
--- /dev/null
+++ b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade.impl;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.UnifiedClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionServiceAdapter;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AsyncAtomicValueAdapter;
+import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.service.AtomicValueAdapter;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.CoordinationServiceAdapter;
+import org.onosproject.upgrade.Upgrade;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Upgrade manager test.
+ */
+public class UpgradeManagerTest {
+
+ /**
+ * Creates a new upgrade manager to test.
+ *
+ * @param version the local node version
+ * @param state the initial upgrade state
+ * @param versions a list of controller node versions
+ * @return the activated upgrade manager
+ */
+ @SuppressWarnings("unchecked")
+ private UpgradeManager createUpgradeManager(Version version, Upgrade state, List<Version> versions) {
+ UpgradeManager upgradeManager = new UpgradeManager();
+ upgradeManager.clusterService = new UnifiedClusterServiceAdapter() {
+ @Override
+ public Set<ControllerNode> getNodes() {
+ AtomicInteger nodeCounter = new AtomicInteger();
+ return versions.stream()
+ .map(v -> {
+ int nodeId = nodeCounter.getAndIncrement();
+ return new DefaultControllerNode(
+ NodeId.nodeId(String.valueOf(nodeId)),
+ IpAddress.valueOf("127.0.0.1"),
+ nodeId);
+ })
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public ControllerNode.State getState(NodeId nodeId) {
+ return ControllerNode.State.READY;
+ }
+
+ @Override
+ public Version getVersion(NodeId nodeId) {
+ return versions.get(Integer.parseInt(nodeId.id()));
+ }
+ };
+
+ upgradeManager.versionService = new VersionServiceAdapter() {
+ @Override
+ public Version version() {
+ return version;
+ }
+ };
+
+ upgradeManager.coordinationService = new CoordinationServiceAdapter() {
+ @Override
+ public <V> AtomicValueBuilder<V> atomicValueBuilder() {
+ return new AtomicValueBuilder<V>() {
+ @Override
+ public AsyncAtomicValue<V> build() {
+ return new AsyncAtomicValueAdapter() {
+ @Override
+ public AtomicValue asAtomicValue() {
+ return new AtomicValueAdapter() {
+ private Object value = state;
+
+ @Override
+ public void set(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ public Object get() {
+ return value;
+ }
+
+ @Override
+ public boolean compareAndSet(Object expect, Object update) {
+ if ((value == null && expect == null)
+ || (value != null && value.equals(expect))) {
+ value = update;
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+ };
+ }
+ };
+ }
+ };
+
+ upgradeManager.activate();
+ return upgradeManager;
+ }
+
+ @Test
+ public void testFailedCommit() throws Exception {
+ UpgradeManager upgradeManager = createUpgradeManager(
+ Version.version("1.0.0"),
+ new Upgrade(Version.version("1.0.0"), Version.version("1.0.0"), Upgrade.Status.INACTIVE),
+ Arrays.asList(Version.version("1.0.0"), Version.version("1.0.0"), Version.version("1.0.1")));
+
+ assertEquals(Upgrade.Status.INACTIVE, upgradeManager.getState().status());
+ assertTrue(upgradeManager.isLocalActive());
+ assertFalse(upgradeManager.isLocalUpgraded());
+
+ upgradeManager.initialize();
+
+ assertEquals(Upgrade.Status.INITIALIZED, upgradeManager.getState().status());
+ assertEquals(Version.version("1.0.0"), upgradeManager.getState().source());
+ assertEquals(Version.version("1.0.0"), upgradeManager.getState().target());
+ assertEquals(Version.version("1.0.0"), upgradeManager.getVersion());
+ assertTrue(upgradeManager.isLocalActive());
+ assertFalse(upgradeManager.isLocalUpgraded());
+
+ upgradeManager.upgrade();
+ assertEquals(Upgrade.Status.UPGRADED, upgradeManager.getState().status());
+
+ try {
+ upgradeManager.commit();
+ fail();
+ } catch (IllegalStateException e) {
+ }
+ }
+
+ @Test
+ public void testSuccessfulCommit() throws Exception {
+ UpgradeManager upgradeManager = createUpgradeManager(
+ Version.version("1.0.1"),
+ new Upgrade(Version.version("1.0.0"), Version.version("1.0.1"), Upgrade.Status.UPGRADED),
+ Arrays.asList(Version.version("1.0.1"), Version.version("1.0.1"), Version.version("1.0.1")));
+
+ assertEquals(Upgrade.Status.UPGRADED, upgradeManager.getState().status());
+ assertTrue(upgradeManager.isLocalActive());
+ assertTrue(upgradeManager.isLocalUpgraded());
+
+ upgradeManager.commit();
+ assertEquals(Upgrade.Status.INACTIVE, upgradeManager.getState().status());
+ }
+
+ @Test
+ public void testFailedReset() throws Exception {
+ UpgradeManager upgradeManager = createUpgradeManager(
+ Version.version("1.0.0"),
+ new Upgrade(Version.version("1.0.0"), Version.version("1.0.1"), Upgrade.Status.INITIALIZED),
+ Arrays.asList(Version.version("1.0.0"), Version.version("1.0.0"), Version.version("1.0.1")));
+
+ assertEquals(Upgrade.Status.INITIALIZED, upgradeManager.getState().status());
+ assertEquals(Version.version("1.0.0"), upgradeManager.getState().source());
+ assertEquals(Version.version("1.0.1"), upgradeManager.getState().target());
+ assertEquals(Version.version("1.0.0"), upgradeManager.getVersion());
+ assertTrue(upgradeManager.isLocalActive());
+ assertFalse(upgradeManager.isLocalUpgraded());
+
+ upgradeManager.upgrade();
+ assertEquals(Upgrade.Status.UPGRADED, upgradeManager.getState().status());
+ assertEquals(Version.version("1.0.1"), upgradeManager.getVersion());
+
+ upgradeManager.rollback();
+ assertEquals(Upgrade.Status.ROLLED_BACK, upgradeManager.getState().status());
+
+ try {
+ upgradeManager.reset();
+ fail();
+ } catch (IllegalStateException e) {
+ }
+ }
+
+ @Test
+ public void testSuccessfulResetFromInitialized() throws Exception {
+ UpgradeManager upgradeManager = createUpgradeManager(
+ Version.version("1.0.0"),
+ new Upgrade(Version.version("1.0.0"), Version.version("1.0.0"), Upgrade.Status.INITIALIZED),
+ Arrays.asList(Version.version("1.0.0"), Version.version("1.0.0"), Version.version("1.0.0")));
+
+ assertEquals(Upgrade.Status.INITIALIZED, upgradeManager.getState().status());
+ assertTrue(upgradeManager.isLocalActive());
+ assertFalse(upgradeManager.isLocalUpgraded());
+
+ upgradeManager.reset();
+ assertEquals(Upgrade.Status.INACTIVE, upgradeManager.getState().status());
+ }
+
+ @Test
+ public void testSuccessfulResetFromRolledBack() throws Exception {
+ UpgradeManager upgradeManager = createUpgradeManager(
+ Version.version("1.0.0"),
+ new Upgrade(Version.version("1.0.0"), Version.version("1.0.1"), Upgrade.Status.ROLLED_BACK),
+ Arrays.asList(Version.version("1.0.0"), Version.version("1.0.0"), Version.version("1.0.0")));
+
+ assertEquals(Upgrade.Status.ROLLED_BACK, upgradeManager.getState().status());
+ assertTrue(upgradeManager.isLocalActive());
+ assertFalse(upgradeManager.isLocalUpgraded());
+
+ upgradeManager.reset();
+ assertEquals(Upgrade.Status.INACTIVE, upgradeManager.getState().status());
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 78d2d4e..2bb1ea3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -15,14 +15,12 @@
*/
package org.onosproject.store.cluster.impl;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
@@ -37,13 +35,21 @@
import org.onosproject.cluster.LeadershipStore;
import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
import org.onosproject.event.Change;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.onosproject.store.service.CoordinationService;
import org.onosproject.store.service.LeaderElector;
-import org.onosproject.store.service.StorageService;
+import org.onosproject.upgrade.UpgradeEvent;
+import org.onosproject.upgrade.UpgradeEventListener;
+import org.onosproject.upgrade.UpgradeService;
import org.slf4j.Logger;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
* primitive.
@@ -54,25 +60,41 @@
extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
implements LeadershipStore {
+ private static final char VERSION_SEP = '|';
+
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
+ protected CoordinationService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VersionService versionService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UpgradeService upgradeService;
private ExecutorService statusChangeHandler;
private NodeId localNodeId;
private LeaderElector leaderElector;
private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
+ private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
private final Consumer<Change<Leadership>> leadershipChangeListener =
change -> {
Leadership oldValue = change.oldValue();
Leadership newValue = change.newValue();
+
+ // If the topic is not relevant to this version, skip the event.
+ if (!isLocalTopic(newValue.topic())) {
+ return;
+ }
+
boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
+
LeadershipEvent.Type eventType = null;
if (leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
@@ -83,7 +105,10 @@
if (!leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
- notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
+ notifyDelegate(new LeadershipEvent(eventType, new Leadership(
+ parseTopic(change.newValue().topic()),
+ change.newValue().leader(),
+ change.newValue().candidates())));
// Update local cache of currently held leaderships
if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
localLeaderCache.put(newValue.topic(), newValue);
@@ -101,18 +126,27 @@
// Service Restored
localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
leaderElector.getLeaderships().forEach((topic, leadership) ->
- notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_RESTORED, leadership)));
+ notifyDelegate(new LeadershipEvent(
+ LeadershipEvent.Type.SERVICE_RESTORED,
+ new Leadership(
+ parseTopic(leadership.topic()),
+ leadership.leader(),
+ leadership.candidates()))));
} else if (status == Status.SUSPENDED) {
// Service Suspended
localLeaderCache.forEach((topic, leadership) ->
- notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_DISRUPTED, leadership)));
+ notifyDelegate(new LeadershipEvent(
+ LeadershipEvent.Type.SERVICE_DISRUPTED,
+ new Leadership(
+ parseTopic(leadership.topic()),
+ leadership.leader(),
+ leadership.candidates()))));
} else {
// Should be only inactive state
return;
}
}
-
@Activate
public void activate() {
statusChangeHandler = Executors.newSingleThreadExecutor(
@@ -124,6 +158,7 @@
.asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
leaderElector.addStatusChangeListener(clientStatusListener);
+ upgradeService.addListener(upgradeListener);
log.info("Started");
}
@@ -131,18 +166,20 @@
public void deactivate() {
leaderElector.removeChangeListener(leadershipChangeListener);
leaderElector.removeStatusChangeListener(clientStatusListener);
+ upgradeService.removeListener(upgradeListener);
statusChangeHandler.shutdown();
log.info("Stopped");
}
@Override
public Leadership addRegistration(String topic) {
- return leaderElector.run(topic, localNodeId);
+ leaderElector.run(getLocalTopic(topic), localNodeId);
+ return getLeadership(topic);
}
@Override
public void removeRegistration(String topic) {
- leaderElector.withdraw(topic);
+ leaderElector.withdraw(getLocalTopic(topic));
}
@Override
@@ -152,21 +189,108 @@
@Override
public boolean moveLeadership(String topic, NodeId toNodeId) {
- return leaderElector.anoint(topic, toNodeId);
+ return leaderElector.anoint(getTopicFor(topic, toNodeId), toNodeId);
}
@Override
public boolean makeTopCandidate(String topic, NodeId nodeId) {
- return leaderElector.promote(topic, nodeId);
+ return leaderElector.promote(getTopicFor(topic, nodeId), nodeId);
}
@Override
public Leadership getLeadership(String topic) {
- return leaderElector.getLeadership(topic);
+ Leadership leadership = leaderElector.getLeadership(getActiveTopic(topic));
+ return leadership != null ? new Leadership(
+ parseTopic(leadership.topic()),
+ leadership.leader(),
+ leadership.candidates()) : null;
}
@Override
public Map<String, Leadership> getLeaderships() {
- return leaderElector.getLeaderships();
+ Map<String, Leadership> leaderships = leaderElector.getLeaderships();
+ return leaderships.entrySet().stream()
+ .filter(e -> isActiveTopic(e.getKey()))
+ .collect(Collectors.toMap(e -> parseTopic(e.getKey()),
+ e -> new Leadership(parseTopic(e.getKey()), e.getValue().leader(), e.getValue().candidates())));
+ }
+
+ /**
+ * Returns a leader elector topic namespaced with the local node's version.
+ *
+ * @param topic the base topic
+ * @return a topic string namespaced with the local node's version
+ */
+ private String getLocalTopic(String topic) {
+ return topic + VERSION_SEP + versionService.version();
+ }
+
+ /**
+ * Returns a leader elector topic namespaced with the current cluster version.
+ *
+ * @param topic the base topic
+ * @return a topic string namespaced with the current cluster version
+ */
+ private String getActiveTopic(String topic) {
+ return topic + VERSION_SEP + upgradeService.getVersion();
+ }
+
+ /**
+ * Returns whether the given topic is a topic for the local version.
+ *
+ * @param topic the topic to check
+ * @return whether the given topic is relevant to the local version
+ */
+ private boolean isLocalTopic(String topic) {
+ return topic.endsWith(versionService.version().toString());
+ }
+
+ /**
+ * Returns whether the given topic is a topic for the current cluster version.
+ *
+ * @param topic the topic to check
+ * @return whether the given topic is relevant to the current cluster version
+ */
+ private boolean isActiveTopic(String topic) {
+ return topic.endsWith(VERSION_SEP + upgradeService.getVersion().toString());
+ }
+
+ /**
+ * Parses a topic string, returning the base topic.
+ *
+ * @param topic the topic string to parse
+ * @return the base topic string
+ */
+ private String parseTopic(String topic) {
+ return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
+ }
+
+ /**
+ * Returns the versioned topic for the given node.
+ *
+ * @param topic the topic for the given node
+ * @param nodeId the node for which to return the namespaced topic
+ * @return the versioned topic for the given node
+ */
+ private String getTopicFor(String topic, NodeId nodeId) {
+ Version nodeVersion = clusterService.getVersion(nodeId);
+ return nodeVersion != null ? topic + VERSION_SEP + nodeVersion : topic + VERSION_SEP + versionService.version();
+ }
+
+ /**
+ * Internal upgrade event listener.
+ */
+ private class InternalUpgradeEventListener implements UpgradeEventListener {
+ @Override
+ public void event(UpgradeEvent event) {
+ if (event.type() == UpgradeEvent.Type.UPGRADED || event.type() == UpgradeEvent.Type.ROLLED_BACK) {
+ // Iterate through all current leaderships for the new version and trigger events.
+ for (Leadership leadership : getLeaderships().values()) {
+ notifyDelegate(new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED,
+ leadership));
+ }
+ }
+ }
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
new file mode 100644
index 0000000..aa96131
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.utils.MeteringAgent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
+
+@Component(componentAbstract = true)
+public abstract class AbstractClusterCommunicationManager
+ implements ClusterCommunicationService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
+ private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
+
+ private static final String PRIMITIVE_NAME = "clusterCommunication";
+ private static final String SUBJECT_PREFIX = "subject";
+ private static final String ENDPOINT_PREFIX = "endpoint";
+
+ private static final String SERIALIZING = "serialization";
+ private static final String DESERIALIZING = "deserialization";
+ private static final String NODE_PREFIX = "node:";
+ private static final String ROUND_TRIP_SUFFIX = ".rtt";
+ private static final String ONE_WAY_SUFFIX = ".oneway";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UnifiedClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MessagingService messagingService;
+
+ private NodeId localNodeId;
+
+ /**
+ * Returns the type for the given message subject.
+ *
+ * @param subject the type for the given message subject
+ * @return the message subject
+ */
+ protected abstract String getType(MessageSubject subject);
+
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public <M> void broadcast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ checkPermission(CLUSTER_WRITE);
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ checkPermission(CLUSTER_WRITE);
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> CompletableFuture<Void> unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId) {
+ checkPermission(CLUSTER_WRITE);
+ try {
+ byte[] payload = new ClusterMessage(
+ localNodeId,
+ subject,
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
+ ).getBytes();
+ return doUnicast(subject, payload, toNodeId);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public <M> void multicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Set<NodeId> nodes) {
+ checkPermission(CLUSTER_WRITE);
+ byte[] payload = new ClusterMessage(
+ localNodeId,
+ subject,
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
+ .getBytes();
+ nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId) {
+ checkPermission(CLUSTER_WRITE);
+ try {
+ ClusterMessage envelope = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
+ apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).
+ thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+ ControllerNode node = clusterService.getNode(toNodeId);
+ checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+ Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
+ MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
+ return messagingService.sendAsync(nodeEp, getType(subject), payload).whenComplete((r, e) -> context.stop(e));
+ }
+
+ private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+ ControllerNode node = clusterService.getNode(toNodeId);
+ checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+ Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
+ MeteringAgent.Context epContext = endpointMeteringAgent.
+ startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
+ MeteringAgent.Context subjectContext = subjectMeteringAgent.
+ startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
+ return messagingService.sendAndReceive(nodeEp, getType(subject), payload).
+ whenComplete((bytes, throwable) -> {
+ subjectContext.stop(throwable);
+ epContext.stop(throwable);
+ });
+ }
+
+ @Override
+ public void addSubscriber(MessageSubject subject,
+ ClusterMessageHandler subscriber,
+ ExecutorService executor) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(getType(subject),
+ new InternalClusterMessageHandler(subscriber),
+ executor);
+ }
+
+ @Override
+ public void removeSubscriber(MessageSubject subject) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.unregisterHandler(getType(subject));
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ Executor executor) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(getType(subject),
+ new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+ CompletableFuture<R> responseFuture = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ responseFuture.complete(handler.apply(m));
+ } catch (Exception e) {
+ responseFuture.completeExceptionally(e);
+ }
+ });
+ return responseFuture;
+ }));
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(getType(subject),
+ new InternalMessageResponder<>(decoder, encoder, handler));
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ Executor executor) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(getType(subject),
+ new InternalMessageConsumer<>(decoder, handler),
+ executor);
+ }
+
+ /**
+ * Performs the timed function, returning the value it would while timing the operation.
+ *
+ * @param timedFunction the function to be timed
+ * @param meter the metering agent to be used to time the function
+ * @param opName the opname to be used when starting the meter
+ * @param <A> The param type of the function
+ * @param <B> The return type of the function
+ * @return the value returned by the timed function
+ */
+ private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
+ MeteringAgent meter, String opName) {
+ checkNotNull(timedFunction);
+ checkNotNull(meter);
+ checkNotNull(opName);
+ return new Function<A, B>() {
+ @Override
+ public B apply(A a) {
+ final MeteringAgent.Context context = meter.startTimer(opName);
+ B result = null;
+ try {
+ result = timedFunction.apply(a);
+ context.stop(null);
+ return result;
+ } catch (Exception e) {
+ context.stop(e);
+ Throwables.propagate(e);
+ return null;
+ }
+ }
+ };
+ }
+
+
+ private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
+ private ClusterMessageHandler handler;
+
+ public InternalClusterMessageHandler(ClusterMessageHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public byte[] apply(Endpoint sender, byte[] bytes) {
+ ClusterMessage message = ClusterMessage.fromBytes(bytes);
+ handler.handle(message);
+ return message.response();
+ }
+ }
+
+ private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
+ private final Function<byte[], M> decoder;
+ private final Function<R, byte[]> encoder;
+ private final Function<M, CompletableFuture<R>> handler;
+
+ public InternalMessageResponder(Function<byte[], M> decoder,
+ Function<R, byte[]> encoder,
+ Function<M, CompletableFuture<R>> handler) {
+ this.decoder = decoder;
+ this.encoder = encoder;
+ this.handler = handler;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
+ return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+ apply(ClusterMessage.fromBytes(bytes).payload())).
+ thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
+ }
+ }
+
+ private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
+ private final Function<byte[], M> decoder;
+ private final Consumer<M> consumer;
+
+ public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+ this.decoder = decoder;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void accept(Endpoint sender, byte[] bytes) {
+ consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+ apply(ClusterMessage.fromBytes(bytes).payload()));
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 1bdac37..4602702 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,328 +15,24 @@
*/
package org.onosproject.store.cluster.messaging.impl;
-import com.google.common.base.Throwables;
-import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.core.VersionService;
import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.utils.MeteringAgent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
@Component(immediate = true)
@Service
-public class ClusterCommunicationManager
- implements ClusterCommunicationService {
+public class ClusterCommunicationManager extends AbstractClusterCommunicationManager {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
- private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
-
- private static final String PRIMITIVE_NAME = "clusterCommunication";
- private static final String SUBJECT_PREFIX = "subject";
- private static final String ENDPOINT_PREFIX = "endpoint";
-
- private static final String SERIALIZING = "serialization";
- private static final String DESERIALIZING = "deserialization";
- private static final String NODE_PREFIX = "node:";
- private static final String ROUND_TRIP_SUFFIX = ".rtt";
- private static final String ONE_WAY_SUFFIX = ".oneway";
+ private static final char VERSION_SEP = '-';
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MessagingService messagingService;
-
- private NodeId localNodeId;
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
+ private VersionService versionService;
@Override
- public <M> void broadcast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> void broadcastIncludeSelf(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> CompletableFuture<Void> unicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- NodeId toNodeId) {
- checkPermission(CLUSTER_WRITE);
- try {
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
- ).getBytes();
- return doUnicast(subject, payload, toNodeId);
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- @Override
- public <M> void multicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Set<NodeId> nodes) {
- checkPermission(CLUSTER_WRITE);
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
- .getBytes();
- nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
- }
-
- @Override
- public <M, R> CompletableFuture<R> sendAndReceive(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Function<byte[], R> decoder,
- NodeId toNodeId) {
- checkPermission(CLUSTER_WRITE);
- try {
- ClusterMessage envelope = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
- apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).
- thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
- return messagingService.sendAsync(nodeEp, subject.value(), payload).whenComplete((r, e) -> context.stop(e));
- }
-
- private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- MeteringAgent.Context epContext = endpointMeteringAgent.
- startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
- MeteringAgent.Context subjectContext = subjectMeteringAgent.
- startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
- return messagingService.sendAndReceive(nodeEp, subject.value(), payload).
- whenComplete((bytes, throwable) -> {
- subjectContext.stop(throwable);
- epContext.stop(throwable);
- });
- }
-
- @Override
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber,
- ExecutorService executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(subject.value(),
- new InternalClusterMessageHandler(subscriber),
- executor);
- }
-
- @Override
- public void removeSubscriber(MessageSubject subject) {
- checkPermission(CLUSTER_WRITE);
- messagingService.unregisterHandler(subject.value());
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, R> handler,
- Function<R, byte[]> encoder,
- Executor executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(subject.value(),
- new InternalMessageResponder<M, R>(decoder, encoder, m -> {
- CompletableFuture<R> responseFuture = new CompletableFuture<>();
- executor.execute(() -> {
- try {
- responseFuture.complete(handler.apply(m));
- } catch (Exception e) {
- responseFuture.completeExceptionally(e);
- }
- });
- return responseFuture;
- }));
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, CompletableFuture<R>> handler,
- Function<R, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(subject.value(),
- new InternalMessageResponder<>(decoder, encoder, handler));
- }
-
- @Override
- public <M> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Consumer<M> handler,
- Executor executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(subject.value(),
- new InternalMessageConsumer<>(decoder, handler),
- executor);
- }
-
- /**
- * Performs the timed function, returning the value it would while timing the operation.
- *
- * @param timedFunction the function to be timed
- * @param meter the metering agent to be used to time the function
- * @param opName the opname to be used when starting the meter
- * @param <A> The param type of the function
- * @param <B> The return type of the function
- * @return the value returned by the timed function
- */
- private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
- MeteringAgent meter, String opName) {
- checkNotNull(timedFunction);
- checkNotNull(meter);
- checkNotNull(opName);
- return new Function<A, B>() {
- @Override
- public B apply(A a) {
- final MeteringAgent.Context context = meter.startTimer(opName);
- B result = null;
- try {
- result = timedFunction.apply(a);
- context.stop(null);
- return result;
- } catch (Exception e) {
- context.stop(e);
- Throwables.propagate(e);
- return null;
- }
- }
- };
- }
-
-
- private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
- private ClusterMessageHandler handler;
-
- public InternalClusterMessageHandler(ClusterMessageHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public byte[] apply(Endpoint sender, byte[] bytes) {
- ClusterMessage message = ClusterMessage.fromBytes(bytes);
- handler.handle(message);
- return message.response();
- }
- }
-
- private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
- private final Function<byte[], M> decoder;
- private final Function<R, byte[]> encoder;
- private final Function<M, CompletableFuture<R>> handler;
-
- public InternalMessageResponder(Function<byte[], M> decoder,
- Function<R, byte[]> encoder,
- Function<M, CompletableFuture<R>> handler) {
- this.decoder = decoder;
- this.encoder = encoder;
- this.handler = handler;
- }
-
- @Override
- public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
- return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
- apply(ClusterMessage.fromBytes(bytes).payload())).
- thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
- }
- }
-
- private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
- private final Function<byte[], M> decoder;
- private final Consumer<M> consumer;
-
- public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
- this.decoder = decoder;
- this.consumer = consumer;
- }
-
- @Override
- public void accept(Endpoint sender, byte[] bytes) {
- consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
- apply(ClusterMessage.fromBytes(bytes).payload()));
- }
+ protected String getType(MessageSubject subject) {
+ return subject.value() + VERSION_SEP + versionService.version().toString();
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
new file mode 100644
index 0000000..45c84af
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+@Component(immediate = true)
+@Service
+public class UnifiedClusterCommunicationManager
+ extends AbstractClusterCommunicationManager
+ implements UnifiedClusterCommunicationService {
+ @Override
+ protected String getType(MessageSubject subject) {
+ return subject.value();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 136df56..5d9f453 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -60,6 +60,7 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
+import org.onosproject.upgrade.UpgradeService;
import org.slf4j.Logger;
import com.google.common.base.Objects;
@@ -90,6 +91,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UpgradeService upgradeService;
+
private NodeId localNodeId;
private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
@@ -155,8 +159,12 @@
String leadershipTopic = createDeviceMastershipTopic(deviceId);
Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
- return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
- ? MastershipRole.MASTER : MastershipRole.STANDBY);
+ NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+ List<NodeId> candidates = leadership == null ?
+ ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+ MastershipRole role = Objects.equal(localNodeId, leader) ?
+ MastershipRole.MASTER : candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
+ return CompletableFuture.completedFuture(role);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
new file mode 100644
index 0000000..29045a2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.AtomicCounterBuilder;
+import org.onosproject.store.service.AtomicCounterMapBuilder;
+import org.onosproject.store.service.AtomicIdGeneratorBuilder;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.ConsistentMultimapBuilder;
+import org.onosproject.store.service.ConsistentTreeMapBuilder;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.DocumentTreeBuilder;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.LeaderElectorBuilder;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueue;
+import org.slf4j.Logger;
+
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of {@code CoordinationService} that uses a {@link StoragePartition} that spans all the nodes
+ * in the cluster regardless of version.
+ */
+@Service
+@Component(immediate = true)
+public class CoordinationManager implements CoordinationService {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UnifiedClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UnifiedClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PersistenceService persistenceService;
+
+ private StoragePartition partition;
+ private DistributedPrimitiveCreator primitiveCreator;
+
+ @Activate
+ public void activate() {
+ partition = new StoragePartition(
+ new DefaultPartition(
+ PartitionId.SHARED,
+ clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet())),
+ null,
+ null,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") + "/partitions/coordination"));
+ partition.open().join();
+ primitiveCreator = partition.client();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+ clusterCommunicator,
+ persistenceService);
+ }
+
+ @Override
+ public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultConsistentMapBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultDocumentTreeBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
+ return new DefaultConsistentTreeMapBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultConsistentMultimapBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultAtomicCounterMapBuilder<>(primitiveCreator);
+ }
+
+ @Override
+ public <E> DistributedSetBuilder<E> setBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
+ }
+
+ @Override
+ public AtomicCounterBuilder atomicCounterBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultAtomicCounterBuilder(primitiveCreator);
+ }
+
+ @Override
+ public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultAtomicIdGeneratorBuilder(primitiveCreator);
+ }
+
+ @Override
+ public <V> AtomicValueBuilder<V> atomicValueBuilder() {
+ checkPermission(STORAGE_WRITE);
+ Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
+ () -> this.<String, byte[]>consistentMapBuilder()
+ .withName("onos-atomic-values")
+ .withSerializer(Serializer.using(KryoNamespaces.BASIC));
+ return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
+ }
+
+ @Override
+ public TransactionContextBuilder transactionContextBuilder() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LeaderElectorBuilder leaderElectorBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new DefaultLeaderElectorBuilder(primitiveCreator);
+ }
+
+ @Override
+ public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return primitiveCreator.newWorkQueue(name, serializer);
+ }
+
+ @Override
+ public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return primitiveCreator.newAsyncDocumentTree(name, serializer);
+ }
+
+ @Override
+ public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
+ String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return primitiveCreator.newAsyncConsistentSetMultimap(name,
+ serializer);
+ }
+
+ @Override
+ public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
+ String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return primitiveCreator.newAsyncConsistentTreeMap(name, serializer);
+ }
+
+ @Override
+ public <T> Topic<T> getTopic(String name, Serializer serializer) {
+ AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
+ .withName("topic-" + name)
+ .withSerializer(serializer)
+ .build();
+ return new DefaultDistributedTopic<>(atomicValue);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index d15451e..1bfaaed 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -16,11 +16,11 @@
package org.onosproject.store.primitives.impl;
import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
@@ -38,8 +38,8 @@
*/
public class EventuallyConsistentMapBuilderImpl<K, V>
implements EventuallyConsistentMapBuilder<K, V> {
- private final ClusterService clusterService;
- private final ClusterCommunicationService clusterCommunicator;
+ private final MembershipService clusterService;
+ private final ClusterCommunicator clusterCommunicator;
private String name;
private KryoNamespace serializer;
@@ -64,8 +64,8 @@
* @param clusterCommunicator cluster communication service
* @param persistenceService persistence service
*/
- public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
+ public EventuallyConsistentMapBuilderImpl(MembershipService clusterService,
+ ClusterCommunicator clusterCommunicator,
PersistenceService persistenceService) {
this.persistenceService = persistenceService;
this.clusterService = checkNotNull(clusterService);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index bed19d5..461245e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -25,13 +25,13 @@
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DistributedPrimitive;
@@ -86,8 +86,8 @@
private final Map<K, MapValue<V>> items;
- private final ClusterService clusterService;
- private final ClusterCommunicationService clusterCommunicator;
+ private final MembershipService clusterService;
+ private final ClusterCommunicator clusterCommunicator;
private final Serializer serializer;
private final NodeId localNodeId;
private final PersistenceService persistenceService;
@@ -162,8 +162,8 @@
* @param persistenceService persistence service
*/
EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
+ MembershipService clusterService,
+ ClusterCommunicator clusterCommunicator,
KryoNamespace ns,
BiFunction<K, V, Timestamp> timestampProvider,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 500b75c..4a92682 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -42,8 +42,10 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionDiff;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionEvent;
@@ -51,6 +53,7 @@
import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.upgrade.UpgradeService;
import org.slf4j.Logger;
import static org.onosproject.security.AppGuard.checkPermission;
@@ -68,7 +71,7 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
+ protected UnifiedClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService metadataService;
@@ -76,7 +79,14 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
- private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected UpgradeService upgradeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VersionService versionService;
+
+ private final Map<PartitionId, StoragePartition> inactivePartitions = Maps.newConcurrentMap();
+ private final Map<PartitionId, StoragePartition> activePartitions = Maps.newConcurrentMap();
private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
@@ -85,17 +95,58 @@
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
currentClusterMetadata.set(metadataService.getClusterMetadata());
metadataService.addListener(metadataListener);
- currentClusterMetadata.get()
- .getPartitions()
- .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
- clusterCommunicator,
- clusterService,
- new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
- CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
- .stream()
- .map(StoragePartition::open)
- .toArray(CompletableFuture[]::new));
+ // If an upgrade is currently in progress and this node is an upgraded node, initialize upgrade partitions.
+ CompletableFuture<Void> openFuture;
+ if (upgradeService.isUpgrading() && upgradeService.isLocalUpgraded()) {
+ Version sourceVersion = upgradeService.getState().source();
+ Version targetVersion = upgradeService.getState().target();
+ currentClusterMetadata.get()
+ .getPartitions()
+ .forEach(partition -> inactivePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ sourceVersion,
+ null,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + sourceVersion + "/" + partition.getId()))));
+ currentClusterMetadata.get()
+ .getPartitions()
+ .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ targetVersion,
+ sourceVersion,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + targetVersion + "/" + partition.getId()))));
+
+ // We have to fork existing partitions before we can start inactive partition servers to
+ // avoid duplicate message handlers when both servers are running.
+ openFuture = CompletableFuture.allOf(activePartitions.values().stream()
+ .map(StoragePartition::open)
+ .toArray(CompletableFuture[]::new))
+ .thenCompose(v -> CompletableFuture.allOf(inactivePartitions.values().stream()
+ .map(StoragePartition::open)
+ .toArray(CompletableFuture[]::new)));
+ } else {
+ Version version = versionService.version();
+ currentClusterMetadata.get()
+ .getPartitions()
+ .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ version,
+ null,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + version + "/" + partition.getId()))));
+ openFuture = CompletableFuture.allOf(activePartitions.values().stream()
+ .map(StoragePartition::open)
+ .toArray(CompletableFuture[]::new));
+ }
+
openFuture.join();
log.info("Started");
}
@@ -105,10 +156,13 @@
metadataService.removeListener(metadataListener);
eventDispatcher.removeSink(PartitionEvent.class);
- CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
- .stream()
- .map(StoragePartition::close)
- .toArray(CompletableFuture[]::new));
+ CompletableFuture<Void> closeFuture = CompletableFuture.allOf(
+ CompletableFuture.allOf(inactivePartitions.values().stream()
+ .map(StoragePartition::close)
+ .toArray(CompletableFuture[]::new)),
+ CompletableFuture.allOf(activePartitions.values().stream()
+ .map(StoragePartition::close)
+ .toArray(CompletableFuture[]::new)));
closeFuture.join();
log.info("Stopped");
}
@@ -116,25 +170,25 @@
@Override
public int getNumberOfPartitions() {
checkPermission(PARTITION_READ);
- return partitions.size();
+ return activePartitions.size();
}
@Override
public Set<PartitionId> getAllPartitionIds() {
checkPermission(PARTITION_READ);
- return partitions.keySet();
+ return activePartitions.keySet();
}
@Override
public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
checkPermission(PARTITION_READ);
- return partitions.get(partitionId).client();
+ return activePartitions.get(partitionId).client();
}
@Override
public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
checkPermission(PARTITION_READ);
- StoragePartition partition = partitions.get(partitionId);
+ StoragePartition partition = activePartitions.get(partitionId);
return ImmutableSet.copyOf(partition.getMembers());
}
@@ -148,7 +202,7 @@
@Override
public List<PartitionInfo> partitionInfo() {
- return partitions.values()
+ return activePartitions.values()
.stream()
.flatMap(x -> Tools.stream(x.info()))
.collect(Collectors.toList());
@@ -161,7 +215,7 @@
.values()
.stream()
.filter(PartitionDiff::hasChanged)
- .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
+ .forEach(diff -> activePartitions.get(diff.partitionId()).onUpdate(diff.newValue()));
}
private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
@@ -173,7 +227,7 @@
@Override
public List<PartitionClientInfo> partitionClientInfo() {
- return partitions.values()
+ return activePartitions.values()
.stream()
.map(StoragePartition::client)
.map(StoragePartitionClient::clientInfo)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 40e9fa0..8bae2a3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -40,8 +40,7 @@
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.Serializer;
/**
@@ -50,10 +49,10 @@
public class RaftClientCommunicator extends RaftCommunicator implements RaftClientProtocol {
public RaftClientCommunicator(
- PartitionId partitionId,
+ String prefix,
Serializer serializer,
- ClusterCommunicationService clusterCommunicator) {
- super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+ ClusterCommunicator clusterCommunicator) {
+ super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
index 1117ab9..765eb02 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
@@ -22,7 +22,7 @@
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.cluster.MemberId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.service.Serializer;
@@ -35,12 +35,12 @@
public abstract class RaftCommunicator {
protected final RaftMessageContext context;
protected final Serializer serializer;
- protected final ClusterCommunicationService clusterCommunicator;
+ protected final ClusterCommunicator clusterCommunicator;
public RaftCommunicator(
RaftMessageContext context,
Serializer serializer,
- ClusterCommunicationService clusterCommunicator) {
+ ClusterCommunicator clusterCommunicator) {
this.context = checkNotNull(context, "context cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 9b8f3e6..2710a2c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -56,8 +56,8 @@
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.Serializer;
/**
@@ -66,10 +66,10 @@
public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
public RaftServerCommunicator(
- PartitionId partitionId,
+ String prefix,
Serializer serializer,
- ClusterCommunicationService clusterCommunicator) {
- super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+ ClusterCommunicator clusterCommunicator) {
+ super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 822641c..59c4b17 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,10 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
-import static org.slf4j.LoggerFactory.getLogger;
-
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -26,25 +22,26 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicCounterMapBuilder;
import org.onosproject.store.service.AtomicIdGeneratorBuilder;
@@ -68,7 +65,9 @@
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
-import com.google.common.collect.Maps;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation for {@code StorageService} and {@code StorageAdminService}.
@@ -82,10 +81,10 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
+ protected UnifiedClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
+ protected UnifiedClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
@@ -105,7 +104,7 @@
public void activate() {
Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
partitionService.getAllPartitionIds().stream()
- .filter(id -> !id.equals(PartitionId.from(0)))
+ .filter(id -> !id.equals(PartitionId.SHARED))
.forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap, BUCKETS);
transactionManager = new TransactionManager(this, partitionService, BUCKETS);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 842e047..414c49c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -29,11 +29,12 @@
import com.google.common.collect.ImmutableMap;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.service.RaftService;
-import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
@@ -53,8 +54,11 @@
public class StoragePartition implements Managed<StoragePartition> {
private final AtomicBoolean isOpened = new AtomicBoolean(false);
- private final ClusterCommunicationService clusterCommunicator;
- private final File logFolder;
+ private final UnifiedClusterCommunicationService clusterCommunicator;
+ private final MembershipService clusterService;
+ private final Version version;
+ private final Version source;
+ private final File dataFolder;
private Partition partition;
private NodeId localNodeId;
private StoragePartitionServer server;
@@ -77,14 +81,20 @@
() -> new AtomixDocumentTreeService(Ordering.INSERTION))
.build();
- public StoragePartition(Partition partition,
- ClusterCommunicationService clusterCommunicator,
- ClusterService clusterService,
- File logFolder) {
+ public StoragePartition(
+ Partition partition,
+ Version version,
+ Version source,
+ UnifiedClusterCommunicationService clusterCommunicator,
+ MembershipService clusterService,
+ File dataFolder) {
this.partition = partition;
+ this.version = version;
+ this.source = source;
this.clusterCommunicator = clusterCommunicator;
+ this.clusterService = clusterService;
this.localNodeId = clusterService.getLocalNode().id();
- this.logFolder = logFolder;
+ this.dataFolder = dataFolder;
}
/**
@@ -97,7 +107,12 @@
@Override
public CompletableFuture<Void> open() {
- if (partition.getMembers().contains(localNodeId)) {
+ if (source != null) {
+ return forkServer(source)
+ .thenCompose(v -> openClient())
+ .thenAccept(v -> isOpened.set(true))
+ .thenApply(v -> null);
+ } else if (partition.getMembers().contains(localNodeId)) {
return openServer()
.thenCompose(v -> openClient())
.thenAccept(v -> isOpened.set(true))
@@ -116,6 +131,43 @@
}
/**
+ * Returns the partition name.
+ *
+ * @return the partition name
+ */
+ public String getName() {
+ return getName(version);
+ }
+
+ /**
+ * Returns the partition name for the given version.
+ *
+ * @param version the version for which to return the partition name
+ * @return the partition name for the given version
+ */
+ String getName(Version version) {
+ return version != null ? String.format("partition-%d-%s", partition.getId().id(), version) : "partition-core";
+ }
+
+ /**
+ * Returns the partition version.
+ *
+ * @return the partition version
+ */
+ public Version getVersion() {
+ return version;
+ }
+
+ /**
+ * Returns the partition data folder.
+ *
+ * @return the partition data folder
+ */
+ public File getDataFolder() {
+ return dataFolder;
+ }
+
+ /**
* Returns the identifier of the {@link Partition partition} associated with this instance.
* @return partition identifier
*/
@@ -136,7 +188,23 @@
* @return partition member identifiers
*/
public Collection<MemberId> getMemberIds() {
- return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+ return source != null ?
+ clusterService.getNodes()
+ .stream()
+ .map(node -> MemberId.from(node.id().id()))
+ .collect(Collectors.toList()) :
+ Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+ }
+
+ Collection<MemberId> getMemberIds(Version version) {
+ if (source == null || version.equals(source)) {
+ return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+ } else {
+ return clusterService.getNodes()
+ .stream()
+ .map(node -> MemberId.from(node.id().id()))
+ .collect(Collectors.toList());
+ }
}
/**
@@ -144,20 +212,37 @@
* @return future that is completed after the operation is complete
*/
private CompletableFuture<Void> openServer() {
- if (!partition.getMembers().contains(localNodeId) || server != null) {
- return CompletableFuture.completedFuture(null);
- }
- StoragePartitionServer server = new StoragePartitionServer(this,
+ StoragePartitionServer server = new StoragePartitionServer(
+ this,
MemberId.from(localNodeId.id()),
- () -> new RaftServerCommunicator(
- partition.getId(),
- Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
- clusterCommunicator),
- logFolder);
+ clusterCommunicator);
return server.open().thenRun(() -> this.server = server);
}
/**
+ * Forks the server from the given version.
+ *
+ * @return future to be completed once the server has been forked
+ */
+ private CompletableFuture<Void> forkServer(Version version) {
+ StoragePartitionServer server = new StoragePartitionServer(
+ this,
+ MemberId.from(localNodeId.id()),
+ clusterCommunicator);
+
+ CompletableFuture<Void> future;
+ if (clusterService.getNodes().size() == 1) {
+ future = server.fork(version);
+ } else {
+ future = server.join(clusterService.getNodes().stream()
+ .filter(node -> !node.id().equals(localNodeId))
+ .map(node -> MemberId.from(node.id().id()))
+ .collect(Collectors.toList()));
+ }
+ return future.thenRun(() -> this.server = server);
+ }
+
+ /**
* Attempts to join the partition as a new member.
* @return future that is completed after the operation is complete
*/
@@ -168,11 +253,7 @@
.collect(Collectors.toSet());
StoragePartitionServer server = new StoragePartitionServer(this,
MemberId.from(localNodeId.id()),
- () -> new RaftServerCommunicator(
- partition.getId(),
- Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
- clusterCommunicator),
- logFolder);
+ clusterCommunicator);
return server.join(Collections2.transform(otherMembers, n -> MemberId.from(n.id())))
.thenRun(() -> this.server = server);
}
@@ -181,7 +262,7 @@
client = new StoragePartitionClient(this,
MemberId.from(localNodeId.id()),
new RaftClientCommunicator(
- partition.getId(),
+ getName(),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator));
return client.open().thenApply(v -> client);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 8f1ffa3..96e5140 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -92,7 +92,6 @@
log.info("Failed to start client for partition {}", partition.getId(), e);
}
}).thenApply(v -> null);
-
}
@Override
@@ -316,7 +315,7 @@
private RaftClient newRaftClient(RaftClientProtocol protocol) {
return RaftClient.newBuilder()
.withClientId("partition-" + partition.getId())
- .withMemberId(MemberId.from(localMemberId.id()))
+ .withMemberId(localMemberId)
.withProtocol(protocol)
.build();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
index 04c0bdf..1bed89d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
@@ -99,7 +99,7 @@
public PartitionInfo toPartitionInfo() {
Function<RaftMember, String> memberToString =
m -> m == null ? "none" : m.memberId().toString();
- return new PartitionInfo(partitionId.toString(),
+ return new PartitionInfo(partitionId,
leaderTerm,
activeMembers.stream().map(memberToString).collect(Collectors.toList()),
memberToString.apply(leader));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index bb071a2..123c3b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -16,16 +16,19 @@
package org.onosproject.store.primitives.impl;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
@@ -46,23 +49,21 @@
private final MemberId localMemberId;
private final StoragePartition partition;
- private final Supplier<RaftServerProtocol> protocol;
- private final File dataFolder;
+ private final UnifiedClusterCommunicationService clusterCommunicator;
private RaftServer server;
public StoragePartitionServer(
StoragePartition partition,
MemberId localMemberId,
- Supplier<RaftServerProtocol> protocol,
- File dataFolder) {
+ UnifiedClusterCommunicationService clusterCommunicator) {
this.partition = partition;
this.localMemberId = localMemberId;
- this.protocol = protocol;
- this.dataFolder = dataFolder;
+ this.clusterCommunicator = clusterCommunicator;
}
@Override
public CompletableFuture<Void> open() {
+ log.info("Starting server for partition {} ({})", partition.getId(), partition.getVersion());
CompletableFuture<RaftServer> serverOpenFuture;
if (partition.getMemberIds().contains(localMemberId)) {
if (server != null && server.isRunning()) {
@@ -77,9 +78,11 @@
}
return serverOpenFuture.whenComplete((r, e) -> {
if (e == null) {
- log.info("Successfully started server for partition {}", partition.getId());
+ log.info("Successfully started server for partition {} ({})",
+ partition.getId(), partition.getVersion());
} else {
- log.info("Failed to start server for partition {}", partition.getId(), e);
+ log.info("Failed to start server for partition {} ({})",
+ partition.getId(), partition.getVersion(), e);
}
}).thenApply(v -> null);
}
@@ -97,16 +100,68 @@
return server.leave();
}
- private RaftServer buildServer() {
+ /**
+ * Forks the existing partition into a new partition.
+ *
+ * @param version the version from which to fork the server
+ * @return future to be completed once the fork operation is complete
+ */
+ public CompletableFuture<Void> fork(Version version) {
+ log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
- .withName("partition-" + partition.getId())
- .withProtocol(protocol.get())
+ .withName(partition.getName(version))
+ .withType(RaftMember.Type.PASSIVE)
+ .withProtocol(new RaftServerCommunicator(
+ partition.getName(version),
+ Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+ clusterCommunicator))
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withStorage(RaftStorage.newBuilder()
.withStorageLevel(StorageLevel.MAPPED)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
- .withDirectory(dataFolder)
+ .withDirectory(partition.getDataFolder())
+ .withMaxSegmentSize(MAX_SEGMENT_SIZE)
+ .build());
+ StoragePartition.RAFT_SERVICES.forEach(builder::addService);
+ RaftServer server = builder.build();
+ return server.join(partition.getMemberIds(version))
+ .thenCompose(v -> server.shutdown())
+ .thenCompose(v -> {
+ // Delete the cluster configuration file from the forked partition.
+ try {
+ Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+ } catch (IOException e) {
+ log.error("Failed to delete partition configuration: {}", e);
+ }
+
+ // Build and bootstrap a new server.
+ this.server = buildServer();
+ return this.server.bootstrap();
+ }).whenComplete((r, e) -> {
+ if (e == null) {
+ log.info("Successfully forked server for partition {} ({}->{})",
+ partition.getId(), version, partition.getVersion());
+ } else {
+ log.info("Failed to fork server for partition {} ({}->{})",
+ partition.getId(), version, partition.getVersion(), e);
+ }
+ }).thenApply(v -> null);
+ }
+
+ private RaftServer buildServer() {
+ RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
+ .withName(partition.getName())
+ .withProtocol(new RaftServerCommunicator(
+ partition.getName(),
+ Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+ clusterCommunicator))
+ .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
+ .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
+ .withStorage(RaftStorage.newBuilder()
+ .withStorageLevel(StorageLevel.MAPPED)
+ .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
+ .withDirectory(partition.getDataFolder())
.withMaxSegmentSize(MAX_SEGMENT_SIZE)
.build());
StoragePartition.RAFT_SERVICES.forEach(builder::addService);
@@ -114,12 +169,13 @@
}
public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
+ log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
server = buildServer();
return server.join(otherMembers).whenComplete((r, e) -> {
if (e == null) {
- log.info("Successfully joined partition {}", partition.getId());
+ log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
} else {
- log.info("Failed to join partition {}", partition.getId(), e);
+ log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
}
}).thenApply(v -> null);
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index 6ed8ec3..be24d00 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -98,7 +98,6 @@
import org.junit.Before;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
import org.onosproject.store.primitives.impl.RaftClientCommunicator;
import org.onosproject.store.primitives.impl.RaftServerCommunicator;
import org.onosproject.store.service.Serializer;
@@ -382,7 +381,7 @@
RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
.withType(member.getType())
.withProtocol(new RaftServerCommunicator(
- PartitionId.from(1),
+ "partition-1",
PROTOCOL_SERIALIZER,
communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
.withStorage(RaftStorage.newBuilder()
@@ -406,7 +405,7 @@
RaftClient client = RaftClient.newBuilder()
.withMemberId(memberId)
.withProtocol(new RaftClientCommunicator(
- PartitionId.from(1),
+ "partition-1",
PROTOCOL_SERIALIZER,
communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
.build();
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 2de7dcc..d5deae1 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -250,6 +250,7 @@
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
import org.onosproject.ui.model.topo.UiTopoLayoutId;
+import org.onosproject.upgrade.Upgrade;
import java.net.URI;
import java.time.Duration;
@@ -627,6 +628,8 @@
PiCriterion.class,
PiInstruction.class
)
+ .register(Upgrade.class)
+ .register(Upgrade.Status.class)
.build("API");
/**
diff --git a/tools/dev/bash_profile b/tools/dev/bash_profile
index 3a3c3c6..991cfaf 100644
--- a/tools/dev/bash_profile
+++ b/tools/dev/bash_profile
@@ -174,6 +174,25 @@
echo $OCI
}
+# Sets minority (OCMI) and majority (OCMA) variables
+function setMinorityMajority {
+ nodes=($(env | grep 'OC[0-9]*=' | sort | cut -d= -f2))
+ middle=$(expr "${#nodes[@]}" / "2")
+ index=0
+ min=1
+ maj=1
+ for node in "${nodes[@]}"; do
+ if [ "$index" -gt "$middle" ]; then
+ export OCMI${min}=${node}
+ min=$(expr $min + 1)
+ else
+ export OCMA${maj}=${node}
+ maj=$(expr $maj + 1)
+ fi
+ index=$(expr $index + 1)
+ done
+}
+
# ON.Lab shared test cell warden address
export CELL_WARDEN="10.254.1.19"
export CELL_SLAVES="$CELL_WARDEN 10.254.1.18 10.254.1.17"
@@ -184,6 +203,8 @@
unset OCI OCN OCT ONOS_INSTANCES ONOS_FEATURES
unset ONOS_USER ONOS_GROUP ONOS_WEB_USER ONOS_WEB_PASS
unset $(env | sed -n 's:\(^OC[0-9]\{1,\}\)=.*:\1 :g p')
+ unset $(env | sed -n 's:\(^OCMI[0-9]\{1,\}\)=.*:\1 :g p')
+ unset $(env | sed -n 's:\(^OCMA[0-9]\{1,\}\)=.*:\1 :g p')
}
# Applies the settings in the specified cell file or lists current cell definition
@@ -204,6 +225,7 @@
. $aux
rm -f $aux
setPrimaryInstance 1 >/dev/null
+ setMinorityMajority >/dev/null
onos-verify-cell
topo default >/dev/null
;;
diff --git a/tools/test/bin/onos-install b/tools/test/bin/onos-install
index cc00da0..65b7a42 100755
--- a/tools/test/bin/onos-install
+++ b/tools/test/bin/onos-install
@@ -36,7 +36,7 @@
onos-check-bits
-while getopts fnm: o; do
+while getopts fnvm: o; do
case "$o" in
f) uninstall=true;;
u) noupstart=true; noinitd=true; nosysd=true;;
@@ -44,6 +44,7 @@
s) nosysd=true;;
n) nostart=true;;
m) mvn_settings=$OPTARG;;
+ v) upgrade=true;;
esac
done
let OPC=$OPTIND-1
@@ -60,7 +61,7 @@
[ ! -z "$mvn_settings" ] && scp -q $mvn_settings $remote:/tmp/settings.xml
ssh -tt $remote "
- [ -d $ONOS_INSTALL_DIR/bin ] && echo \"ONOS is already installed\" && exit 1
+ [ -z "$upgrade" ] && [ -d $ONOS_INSTALL_DIR/bin ] && echo \"ONOS is already installed\" && exit 1
# Prepare a landing zone and unroll the bits
sudo mkdir -p $ONOS_INSTALL_DIR && sudo chown ${ONOS_USER}:${ONOS_GROUP} $ONOS_INSTALL_DIR
@@ -81,6 +82,12 @@
# Set up correct user to run onos-service
echo 'export ONOS_USER=$ONOS_USER' >> $ONOS_INSTALL_DIR/options
+ # If the upgrade flag is set, append ".upgrade" to the version string.
+ if [ ! -z "$upgrade" ]
+ then
+ echo '.upgrade' >> $ONOS_INSTALL_DIR/VERSION
+ fi
+
# Remove any previous ON.Lab bits from ~/.m2 repo.
rm -fr ~/.m2/repository/org/onosproject
diff --git a/tools/test/scenarios/upgrade-rollback.xml b/tools/test/scenarios/upgrade-rollback.xml
new file mode 100644
index 0000000..4e58829
--- /dev/null
+++ b/tools/test/scenarios/upgrade-rollback.xml
@@ -0,0 +1,125 @@
+<!--
+ ~ Copyright 2017-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<scenario name="upgrade-rollback" description="ONOS cluster upgrade and rollback">
+ <group name="Upgrade-Rollback">
+ <step name="Push-Bits" exec="onos-push-bits-through-proxy" if="${OCT}"/>
+
+ <step name="Initialize-Upgrade"
+ exec="onos ${OC1} issu init"/>
+
+ <group name="Phase-1">
+ <sequential var="${OCMI#}"
+ starts="Phase-One-Stop-Service-${#}"
+ ends="Phase-One-Wait-for-Start-${#-1}">
+ <step name="Phase-One-Stop-Service-${#}"
+ exec="onos-service ${OCMI#} stop"
+ requires="Initialize-Upgrade"/>
+
+ <step name="Phase-One-Wait-for-Stop-${#}"
+ exec="onos-wait-for-stop ${OCMI#}"
+ requires="~Phase-One-Stop-Service-${#}"/>
+
+ <step name="Phase-One-Uninstall-${#}"
+ exec="onos-uninstall ${OCMI#}"
+ requires="~Phase-One-Wait-for-Stop-${#}"/>
+
+ <step name="Phase-One-Push-Bits-${#}"
+ exec="onos-push-bits ${OCMI#}"
+ unless="${OCT}"
+ requires="~Phase-One-Stop-Service-${#}"/>
+
+ <step name="Phase-One-Install-Upgrade-${#}"
+ exec="onos-install -v ${OCMI#}"
+ requires="Phase-One-Push-Bits-${#},Push-Bits,Phase-One-Uninstall-${#}"/>
+
+ <step name="Phase-One-Secure-SSH-${#}"
+ exec="onos-secure-ssh -u ${ONOS_WEB_USER} -p ${ONOS_WEB_PASS} ${OCMI#}"
+ requires="~Phase-One-Install-Upgrade-${#}"/>
+
+ <step name="Phase-One-Wait-for-Start-${#}"
+ exec="onos-wait-for-start ${OCMI#}"
+ requires="Phase-One-Secure-SSH-${#}"/>
+ </sequential>
+ </group>
+
+ <step name="Run-Upgrade"
+ exec="onos ${OC1} issu upgrade"
+ requires="Phase-1"/>
+
+ <step name="Run-Rollback"
+ exec="onos ${OC1} issu rollback"
+ requires="Run-Upgrade"/>
+
+ <group name="Phase-2" requires="Run-Rollback">
+ <sequential var="${OCMI#}"
+ starts="Phase-Two-Stop-Service-${#}"
+ ends="Phase-Two-Wait-for-Start-${#-1}">
+ <step name="Phase-Two-Stop-Service-${#}"
+ exec="onos-service ${OCMI#} stop"
+ requires="Run-Rollback"/>
+
+ <step name="Phase-Two-Wait-for-Stop-${#}"
+ exec="onos-wait-for-stop ${OCMI#}"
+ requires="~Phase-Two-Stop-Service-${#}"/>
+
+ <step name="Phase-Two-Uninstall-${#}"
+ exec="onos-uninstall ${OCMI#}"
+ requires="~Phase-Two-Wait-for-Stop-${#}"/>
+
+ <step name="Phase-Two-Push-Bits-${#}"
+ exec="onos-push-bits ${OCMI#}"
+ unless="${OCT}"
+ requires="~Phase-Two-Stop-Service-${#}"/>
+
+ <step name="Phase-Two-Install-Upgrade-${#}"
+ exec="onos-install ${OCMI#}"
+ requires="Phase-Two-Push-Bits-${#},Push-Bits,Phase-Two-Uninstall-${#}"/>
+
+ <step name="Phase-Two-Secure-SSH-${#}"
+ exec="onos-secure-ssh -u ${ONOS_WEB_USER} -p ${ONOS_WEB_PASS} ${OCMI#}"
+ requires="~Phase-Two-Install-Upgrade-${#}"/>
+
+ <step name="Phase-Two-Wait-for-Start-${#}"
+ exec="onos-wait-for-start ${OCMI#}"
+ requires="Phase-Two-Secure-SSH-${#}"/>
+ </sequential>
+ </group>
+
+ <step name="Reset-Upgrade"
+ exec="onos ${OC1} issu reset"
+ requires="Phase-2"/>
+
+ <group name="Verify-Rollback" requires="Reset-Upgrade">
+ <parallel var="${OC#}">
+ <step name="Check-Nodes-${#}"
+ exec="onos-check-nodes ${OC#}"
+ delay="3"
+ requires="Reset-Upgrade"/>
+ <step name="Check-Components-${#}"
+ exec="onos-check-components ${OC#}"
+ delay="5"
+ requires="~Check-Nodes-${#}"/>
+
+ <step name="Check-Logs-${#}"
+ exec="onos-check-logs ${OC#}"
+ requires="~Check-Components-${#}"/>
+ <step name="Check-Apps-${#}"
+ exec="onos-check-apps ${OC#} ${ONOS_APPS} includes"
+ requires="~Check-Components-${#}"/>
+ </parallel>
+ </group>
+ </group>
+</scenario>
\ No newline at end of file
diff --git a/tools/test/scenarios/upgrade.xml b/tools/test/scenarios/upgrade.xml
new file mode 100644
index 0000000..e675632
--- /dev/null
+++ b/tools/test/scenarios/upgrade.xml
@@ -0,0 +1,121 @@
+<!--
+ ~ Copyright 2017-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<scenario name="upgrade" description="ONOS cluster upgrade">
+ <group name="Upgrade">
+ <step name="Push-Bits" exec="onos-push-bits-through-proxy" if="${OCT}"/>
+
+ <step name="Initialize-Upgrade"
+ exec="onos ${OC1} issu init"/>
+
+ <group name="Phase-1">
+ <sequential var="${OCMI#}"
+ starts="Phase-One-Stop-Service-${#}"
+ ends="Phase-One-Wait-for-Start-${#-1}">
+ <step name="Phase-One-Stop-Service-${#}"
+ exec="onos-service ${OCMI#} stop"
+ requires="Initialize-Upgrade"/>
+
+ <step name="Phase-One-Wait-for-Stop-${#}"
+ exec="onos-wait-for-stop ${OCMI#}"
+ requires="~Phase-One-Stop-Service-${#}"/>
+
+ <step name="Phase-One-Uninstall-${#}"
+ exec="onos-uninstall ${OCMI#}"
+ requires="~Phase-One-Wait-for-Stop-${#}"/>
+
+ <step name="Phase-One-Push-Bits-${#}"
+ exec="onos-push-bits ${OCMI#}"
+ unless="${OCT}"
+ requires="~Phase-One-Stop-Service-${#}"/>
+
+ <step name="Phase-One-Install-Upgrade-${#}"
+ exec="onos-install -v ${OCMI#}"
+ requires="Phase-One-Push-Bits-${#},Push-Bits,Phase-One-Uninstall-${#}"/>
+
+ <step name="Phase-One-Secure-SSH-${#}"
+ exec="onos-secure-ssh -u ${ONOS_WEB_USER} -p ${ONOS_WEB_PASS} ${OCMI#}"
+ requires="~Phase-One-Install-Upgrade-${#}"/>
+
+ <step name="Phase-One-Wait-for-Start-${#}"
+ exec="onos-wait-for-start ${OCMI#}"
+ requires="Phase-One-Secure-SSH-${#}"/>
+ </sequential>
+ </group>
+
+ <step name="Run-Upgrade"
+ exec="onos ${OC1} issu upgrade"
+ requires="Phase-1"/>
+
+ <group name="Phase-2" requires="Run-Upgrade">
+ <sequential var="${OCMA#}"
+ starts="Phase-Two-Stop-Service-${#}"
+ ends="Phase-Two-Wait-for-Start-${#-1}">
+ <step name="Phase-Two-Stop-Service-${#}"
+ exec="onos-service ${OCMA#} stop"
+ requires="Run-Upgrade"/>
+
+ <step name="Phase-Two-Wait-for-Stop-${#}"
+ exec="onos-wait-for-stop ${OCMA#}"
+ requires="~Phase-Two-Stop-Service-${#}"/>
+
+ <step name="Phase-Two-Uninstall-${#}"
+ exec="onos-uninstall ${OCMA#}"
+ requires="~Phase-Two-Wait-for-Stop-${#}"/>
+
+ <step name="Phase-Two-Push-Bits-${#}"
+ exec="onos-push-bits ${OCMA#}"
+ unless="${OCT}"
+ requires="~Phase-Two-Stop-Service-${#}"/>
+
+ <step name="Phase-Two-Install-Upgrade-${#}"
+ exec="onos-install -v ${OCMA#}"
+ requires="Phase-Two-Push-Bits-${#},Push-Bits,Phase-Two-Uninstall-${#}"/>
+
+ <step name="Phase-Two-Secure-SSH-${#}"
+ exec="onos-secure-ssh -u ${ONOS_WEB_USER} -p ${ONOS_WEB_PASS} ${OCMA#}"
+ requires="~Phase-Two-Install-Upgrade-${#}"/>
+
+ <step name="Phase-Two-Wait-for-Start-${#}"
+ exec="onos-wait-for-start ${OCMA#}"
+ requires="Phase-Two-Secure-SSH-${#}"/>
+ </sequential>
+ </group>
+
+ <step name="Commit-Upgrade"
+ exec="onos ${OC1} issu commit"
+ requires="Phase-2"/>
+
+ <group name="Verify-Upgrade" requires="Commit-Upgrade">
+ <parallel var="${OC#}">
+ <step name="Check-Nodes-${#}"
+ exec="onos-check-nodes ${OC#}"
+ delay="3"
+ requires="Commit-Upgrade"/>
+ <step name="Check-Components-${#}"
+ exec="onos-check-components ${OC#}"
+ delay="5"
+ requires="~Check-Nodes-${#}"/>
+
+ <step name="Check-Logs-${#}"
+ exec="onos-check-logs ${OC#}"
+ requires="~Check-Components-${#}"/>
+ <step name="Check-Apps-${#}"
+ exec="onos-check-apps ${OC#} ${ONOS_APPS} includes"
+ requires="~Check-Components-${#}"/>
+ </parallel>
+ </group>
+ </group>
+</scenario>
\ No newline at end of file
diff --git a/web/gui/src/main/java/org/onosproject/ui/impl/PartitionViewMessageHandler.java b/web/gui/src/main/java/org/onosproject/ui/impl/PartitionViewMessageHandler.java
index a0769da..7537871 100644
--- a/web/gui/src/main/java/org/onosproject/ui/impl/PartitionViewMessageHandler.java
+++ b/web/gui/src/main/java/org/onosproject/ui/impl/PartitionViewMessageHandler.java
@@ -88,7 +88,7 @@
}
private void populateRow(TableModel.Row row, PartitionInfo p) {
- row.cell(NAME, p.name())
+ row.cell(NAME, p.id())
.cell(TERM, p.term())
.cell(LEADER, p.leader())
.cell(MEMBERS, p.members());