Replace Unified* services with MembershipService for subgroup membership
Change-Id: Iabff173ce3501d1ed300513cac445bb712614bd9
diff --git a/cli/src/main/java/org/onosproject/cli/NodesListCommand.java b/cli/src/main/java/org/onosproject/cli/NodesListCommand.java
index 5a55c99..a0967b3 100644
--- a/cli/src/main/java/org/onosproject/cli/NodesListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/NodesListCommand.java
@@ -22,8 +22,8 @@
import org.apache.karaf.shell.commands.Command;
import org.joda.time.DateTime;
import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.MembershipAdminService;
import org.onosproject.core.Version;
import org.onosproject.utils.Comparators;
@@ -32,7 +32,6 @@
import static com.google.common.collect.Lists.newArrayList;
-
/**
* Lists all controller cluster nodes.
*/
@@ -44,7 +43,7 @@
@Override
protected void execute() {
- MembershipAdminService service = get(MembershipAdminService.class);
+ ClusterAdminService service = get(ClusterAdminService.class);
List<ControllerNode> nodes = newArrayList(service.getNodes());
Collections.sort(nodes, Comparators.NODE_COMPARATOR);
if (outputJson()) {
@@ -68,7 +67,7 @@
}
// Produces JSON structure.
- private JsonNode json(MembershipAdminService service, List<ControllerNode> nodes) {
+ private JsonNode json(ClusterAdminService service, List<ControllerNode> nodes) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode result = mapper.createArrayNode();
ControllerNode self = service.getLocalNode();
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
index e680877..d1c4325 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
@@ -15,14 +15,56 @@
*/
package org.onosproject.cluster;
+import java.util.Set;
+
+import org.onlab.packet.IpAddress;
+
/**
* Service for administering the cluster node membership.
- * <p>
- * This service has a view of the cluster membership that is isolated to the local node's version during upgrades.
- * For an equivalent service that has control over all nodes during an upgrade use
- * {@link UnifiedClusterAdminService}.
- *
- * @see UnifiedClusterAdminService
*/
-public interface ClusterAdminService extends MembershipAdminService {
+public interface ClusterAdminService extends ClusterService {
+
+ /**
+ * Forms cluster configuration based on the specified set of node
+ * information. This method resets and restarts the controller
+ * instance.
+ *
+ * @param nodes set of nodes that form the cluster
+ */
+ void formCluster(Set<ControllerNode> nodes);
+
+ /**
+ * Forms cluster configuration based on the specified set of node
+ * information. This method resets and restarts the controller
+ * instance.
+ *
+ * @param nodes set of nodes that form the cluster
+ * @param partitionSize number of nodes to compose a partition
+ */
+ void formCluster(Set<ControllerNode> nodes, int partitionSize);
+
+ /**
+ * Adds a new controller node to the cluster.
+ *
+ * @param nodeId controller node identifier
+ * @param ip node IP listen address
+ * @param tcpPort tcp listen port
+ * @return newly added node
+ */
+ ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort);
+
+ /**
+ * Removes the specified node from the cluster node list.
+ *
+ * @param nodeId controller node identifier
+ */
+ void removeNode(NodeId nodeId);
+
+ /**
+ * Marks the current node as fully started or not.
+ *
+ * @param started true indicates all components have been started
+ */
+ void markFullyStarted(boolean started);
+
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterService.java
index f17e4d9..e4cf7ec 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ClusterService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterService.java
@@ -15,13 +15,63 @@
*/
package org.onosproject.cluster;
+import java.util.Set;
+
+import org.joda.time.DateTime;
+import org.onosproject.core.Version;
+import org.onosproject.event.ListenerService;
+
/**
* Service for obtaining information about the individual nodes within the controller cluster.
- * <p>
- * This service's view of the nodes in the cluster is isolated to a single version of the software. During upgrades,
- * when multiple versions of the software are running in the same cluster, users of this service will only be able
- * to see nodes running the same version as the local node. This is useful for limiting communication to nodes running
- * the same version of the software.
*/
-public interface ClusterService extends MembershipService {
+public interface ClusterService extends ListenerService<ClusterEvent, ClusterEventListener> {
+
+ /**
+ * Returns the local controller node.
+ *
+ * @return local controller node
+ */
+ ControllerNode getLocalNode();
+
+ /**
+ * Returns the set of current cluster members.
+ *
+ * @return set of cluster members
+ */
+ Set<ControllerNode> getNodes();
+
+ /**
+ * Returns the specified controller node.
+ *
+ * @param nodeId controller node identifier
+ * @return controller node
+ */
+ ControllerNode getNode(NodeId nodeId);
+
+ /**
+ * Returns the availability state of the specified controller node. Note
+ * that this does not imply that all the core and application components
+ * have been fully activated; only that the node has joined the cluster.
+ *
+ * @param nodeId controller node identifier
+ * @return availability state
+ */
+ ControllerNode.State getState(NodeId nodeId);
+
+ /**
+ * Returns the version of the given controller node.
+ *
+ * @param nodeId controller node identifier
+ * @return controller version
+ */
+ Version getVersion(NodeId nodeId);
+
+ /**
+ * Returns the system time when the availability state was last updated.
+ *
+ * @param nodeId controller node identifier
+ * @return system time when the availability state was last updated.
+ */
+ DateTime getLastUpdated(NodeId nodeId);
+
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/ControllerNode.java b/core/api/src/main/java/org/onosproject/cluster/ControllerNode.java
index 4983652..1d1a3ec 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ControllerNode.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ControllerNode.java
@@ -74,7 +74,6 @@
*/
IpAddress ip();
-
/**
* Returns the TCP port on which the node listens for connections.
*
diff --git a/core/api/src/main/java/org/onosproject/cluster/Member.java b/core/api/src/main/java/org/onosproject/cluster/Member.java
new file mode 100644
index 0000000..796299b
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/Member.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Objects;
+
+import org.onosproject.core.Version;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Controller member identity.
+ */
+public final class Member {
+
+ private final NodeId nodeId;
+ private final Version version;
+
+ /**
+ * Creates a new cluster member identifier from the specified string.
+ *
+ * @param nodeId node identifier
+ * @param version node version
+ */
+ public Member(NodeId nodeId, Version version) {
+ this.nodeId = checkNotNull(nodeId);
+ this.version = version;
+ }
+
+ /**
+ * Returns the node identifier.
+ *
+ * @return the node identifier
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Returns the node version.
+ *
+ * @return the node version
+ */
+ public Version version() {
+ return version;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(nodeId, version);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object instanceof Member) {
+ Member member = (Member) object;
+ return member.nodeId.equals(nodeId) && Objects.equals(member.version, version);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("nodeId", nodeId)
+ .add("version", version)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java b/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java
deleted file mode 100644
index 908a963..0000000
--- a/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.cluster;
-
-import java.util.Set;
-
-import org.onlab.packet.IpAddress;
-
-/**
- * Service for administering the cluster node membership.
- */
-public interface MembershipAdminService extends MembershipService {
-
- /**
- * Forms cluster configuration based on the specified set of node
- * information. This method resets and restarts the controller
- * instance.
- *
- * @param nodes set of nodes that form the cluster
- */
- void formCluster(Set<ControllerNode> nodes);
-
- /**
- * Forms cluster configuration based on the specified set of node
- * information. This method resets and restarts the controller
- * instance.
- *
- * @param nodes set of nodes that form the cluster
- * @param partitionSize number of nodes to compose a partition
- */
- void formCluster(Set<ControllerNode> nodes, int partitionSize);
-
- /**
- * Adds a new controller node to the cluster.
- *
- * @param nodeId controller node identifier
- * @param ip node IP listen address
- * @param tcpPort tcp listen port
- * @return newly added node
- */
- ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort);
-
- /**
- * Removes the specified node from the cluster node list.
- *
- * @param nodeId controller node identifier
- */
- void removeNode(NodeId nodeId);
-
- /**
- * Marks the current node as fully started or not.
- *
- * @param started true indicates all components have been started
- */
- void markFullyStarted(boolean started);
-
-}
diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipGroup.java b/core/api/src/main/java/org/onosproject/cluster/MembershipGroup.java
new file mode 100644
index 0000000..6526a1d
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/MembershipGroup.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Set;
+
+import org.onosproject.core.Version;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Membership group.
+ */
+public class MembershipGroup {
+ private final Version version;
+ private final Set<Member> members;
+
+ public MembershipGroup(Version version, Set<Member> members) {
+ this.version = version;
+ this.members = members;
+ }
+
+ /**
+ * Returns the group version.
+ *
+ * @return the group version
+ */
+ public Version version() {
+ return version;
+ }
+
+ /**
+ * Returns the set of members in the group.
+ *
+ * @return the set of members in the group
+ */
+ public Set<Member> members() {
+ return members;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("version", version)
+ .add("members", members)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipService.java b/core/api/src/main/java/org/onosproject/cluster/MembershipService.java
index 7b366f6..f56daf5 100644
--- a/core/api/src/main/java/org/onosproject/cluster/MembershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/MembershipService.java
@@ -15,32 +15,59 @@
*/
package org.onosproject.cluster;
+import java.util.Collection;
import java.util.Set;
-import org.joda.time.DateTime;
import org.onosproject.core.Version;
-import org.onosproject.event.ListenerService;
/**
- * Service for obtaining information about the individual nodes within
- * the controller cluster.
+ * Service for obtaining information about the individual members of the controller cluster.
*/
-public interface MembershipService
- extends ListenerService<ClusterEvent, ClusterEventListener> {
+public interface MembershipService {
/**
- * Returns the local controller node.
+ * Returns the local member.
*
- * @return local controller node
+ * @return local member
*/
- ControllerNode getLocalNode();
+ Member getLocalMember();
/**
- * Returns the set of current cluster members.
+ * Returns the group associated with the local member.
*
- * @return set of cluster members
+ * @return the group associated with the local member
*/
- Set<ControllerNode> getNodes();
+ MembershipGroup getLocalGroup();
+
+ /**
+ * Returns the set of current cluster members in the local group.
+ *
+ * @return set of cluster members in the local group
+ */
+ Set<Member> getMembers();
+
+ /**
+ * Returns the set of membership groups in the cluster.
+ *
+ * @return the set of membership groups in the cluster
+ */
+ Collection<MembershipGroup> getGroups();
+
+ /**
+ * Returns the membership group for the given version.
+ *
+ * @param version the version for which to return the membership group
+ * @return the membership group for the given version
+ */
+ MembershipGroup getGroup(Version version);
+
+ /**
+ * Returns the set of members in the given version.
+ *
+ * @param version the version for which to return the set of members
+ * @return the set of members for the given version
+ */
+ Set<Member> getMembers(Version version);
/**
* Returns the specified controller node.
@@ -48,32 +75,6 @@
* @param nodeId controller node identifier
* @return controller node
*/
- ControllerNode getNode(NodeId nodeId);
-
- /**
- * Returns the availability state of the specified controller node. Note
- * that this does not imply that all the core and application components
- * have been fully activated; only that the node has joined the cluster.
- *
- * @param nodeId controller node identifier
- * @return availability state
- */
- ControllerNode.State getState(NodeId nodeId);
-
- /**
- * Returns the version of the given controller node.
- *
- * @param nodeId controller node identifier
- * @return controller version
- */
- Version getVersion(NodeId nodeId);
-
- /**
- * Returns the system time when the availability state was last updated.
- *
- * @param nodeId controller node identifier
- * @return system time when the availability state was last updated.
- */
- DateTime getLastUpdated(NodeId nodeId);
+ Member getMember(NodeId nodeId);
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java
deleted file mode 100644
index 9fb7407..0000000
--- a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.cluster;
-
-import com.google.common.annotations.Beta;
-
-/**
- * Cluster membership administration service that supports modification of all nodes during an upgrade.
- */
-@Beta
-public interface UnifiedClusterAdminService extends MembershipAdminService {
-}
diff --git a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java
deleted file mode 100644
index f690f3e..0000000
--- a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.cluster;
-
-import com.google.common.annotations.Beta;
-
-/**
- * Unified multi-version cluster membership service.
- * <p>
- * During upgrades, the nodes within a cluster may be running multiple versions of the software.
- * This service has a view of the entire cluster running any version. Users of this service must be careful when
- * communicating with nodes described by this service as compatibility issues can result from communicating across
- * versions. For an equivalent service that has an isolated view of the cluster, see {@link ClusterService}.
- *
- * @see ClusterService
- */
-@Beta
-public interface UnifiedClusterService extends MembershipService {
-}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index dfc558d..7aa5ac6 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -15,15 +15,152 @@
*/
package org.onosproject.store.cluster.messaging;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.onosproject.cluster.NodeId;
+
/**
* Service for assisting communications between controller cluster nodes.
- * <p>
- * Communication via this service is isolated to nodes running a single version of the software. During upgrades, when
- * nodes may be running multiple versions simultaneously, this service prevents nodes running different versions of
- * the software from communicating with each other, thus avoiding compatibility issues. For an equivalent cross-version
- * compatible service, see {@link UnifiedClusterCommunicationService}.
- *
- * @see UnifiedClusterCommunicationService
*/
-public interface ClusterCommunicationService extends ClusterCommunicator {
+public interface ClusterCommunicationService {
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param subscriber message subscriber
+ * @param executor executor to use for running handler.
+ * @deprecated in Cardinal Release
+ */
+ @Deprecated
+ void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
+
+ /**
+ * Broadcasts a message to all controller nodes.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param <M> message type
+ */
+ <M> void broadcast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder);
+
+ /**
+ * Broadcasts a message to all controller nodes including self.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param <M> message type
+ */
+ <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder);
+
+ /**
+ * Sends a message to the specified controller node.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param toNodeId destination node identifier
+ * @param <M> message type
+ * @return future that is completed when the message is sent
+ */
+ <M> CompletableFuture<Void> unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId);
+
+ /**
+ * Multicasts a message to a set of controller nodes.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param nodeIds recipient node identifiers
+ * @param <M> message type
+ */
+ <M> void multicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Set<NodeId> nodeIds);
+
+ /**
+ * Sends a message and expects a reply.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding request to byte[]
+ * @param decoder function for decoding response from byte[]
+ * @param toNodeId recipient node identifier
+ * @param <M> request type
+ * @param <R> reply type
+ * @return reply future
+ */
+ <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param decoder decoder for resurrecting incoming message
+ * @param handler handler function that processes the incoming message and produces a reply
+ * @param encoder encoder for serializing reply
+ * @param executor executor to run this handler on
+ * @param <M> incoming message type
+ * @param <R> reply message type
+ */
+ <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ Executor executor);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param decoder decoder for resurrecting incoming message
+ * @param handler handler function that processes the incoming message and produces a reply
+ * @param encoder encoder for serializing reply
+ * @param <M> incoming message type
+ * @param <R> reply message type
+ */
+ <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param decoder decoder to resurrecting incoming message
+ * @param handler handler for handling message
+ * @param executor executor to run this handler on
+ * @param <M> incoming message type
+ */
+ <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ Executor executor);
+
+ /**
+ * Removes a subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ */
+ void removeSubscriber(MessageSubject subject);
}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java
deleted file mode 100644
index 7d2480e..0000000
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.onosproject.cluster.NodeId;
-
-/**
- * Service for assisting communications between controller cluster nodes.
- */
-public interface ClusterCommunicator {
-
- /**
- * Adds a new subscriber for the specified message subject.
- *
- * @param subject message subject
- * @param subscriber message subscriber
- * @param executor executor to use for running handler.
- * @deprecated in Cardinal Release
- */
- @Deprecated
- void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
-
- /**
- * Broadcasts a message to all controller nodes.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding message to byte[]
- * @param <M> message type
- */
- <M> void broadcast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder);
-
- /**
- * Broadcasts a message to all controller nodes including self.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding message to byte[]
- * @param <M> message type
- */
- <M> void broadcastIncludeSelf(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder);
-
- /**
- * Sends a message to the specified controller node.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding message to byte[]
- * @param toNodeId destination node identifier
- * @param <M> message type
- * @return future that is completed when the message is sent
- */
- <M> CompletableFuture<Void> unicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- NodeId toNodeId);
-
- /**
- * Multicasts a message to a set of controller nodes.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding message to byte[]
- * @param nodeIds recipient node identifiers
- * @param <M> message type
- */
- <M> void multicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Set<NodeId> nodeIds);
-
- /**
- * Sends a message and expects a reply.
- *
- * @param message message to send
- * @param subject message subject
- * @param encoder function for encoding request to byte[]
- * @param decoder function for decoding response from byte[]
- * @param toNodeId recipient node identifier
- * @param <M> request type
- * @param <R> reply type
- * @return reply future
- */
- <M, R> CompletableFuture<R> sendAndReceive(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Function<byte[], R> decoder,
- NodeId toNodeId);
-
- /**
- * Adds a new subscriber for the specified message subject.
- *
- * @param subject message subject
- * @param decoder decoder for resurrecting incoming message
- * @param handler handler function that processes the incoming message and produces a reply
- * @param encoder encoder for serializing reply
- * @param executor executor to run this handler on
- * @param <M> incoming message type
- * @param <R> reply message type
- */
- <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, R> handler,
- Function<R, byte[]> encoder,
- Executor executor);
-
- /**
- * Adds a new subscriber for the specified message subject.
- *
- * @param subject message subject
- * @param decoder decoder for resurrecting incoming message
- * @param handler handler function that processes the incoming message and produces a reply
- * @param encoder encoder for serializing reply
- * @param <M> incoming message type
- * @param <R> reply message type
- */
- <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, CompletableFuture<R>> handler,
- Function<R, byte[]> encoder);
-
- /**
- * Adds a new subscriber for the specified message subject.
- *
- * @param subject message subject
- * @param decoder decoder to resurrecting incoming message
- * @param handler handler for handling message
- * @param executor executor to run this handler on
- * @param <M> incoming message type
- */
- <M> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Consumer<M> handler,
- Executor executor);
-
- /**
- * Removes a subscriber for the specified message subject.
- *
- * @param subject message subject
- */
- void removeSubscriber(MessageSubject subject);
-}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java
deleted file mode 100644
index 042c803..0000000
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging;
-
-import com.google.common.annotations.Beta;
-
-/**
- * Service for unified communication across controller nodes running multiple software versions.
- * <p>
- * This service supports communicating across nodes running different versions of the software simultaneously. During
- * upgrades, when nodes may be running a mixture of versions, this service can be used to coordinate across those
- * versions. But users of this service must be extremely careful to preserve backward/forward compatibility for
- * messages sent across versions. Encoders and decoders used for messages sent/received on this service should
- * support evolving schemas.
- */
-@Beta
-public interface UnifiedClusterCommunicationService extends ClusterCommunicator {
-}
diff --git a/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/MembershipServiceAdapter.java
similarity index 62%
rename from core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java
rename to core/api/src/test/java/org/onosproject/cluster/MembershipServiceAdapter.java
index aca74ec..172c82c 100644
--- a/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/MembershipServiceAdapter.java
@@ -15,52 +15,47 @@
*/
package org.onosproject.cluster;
+import java.util.Collection;
import java.util.Set;
-import org.joda.time.DateTime;
import org.onosproject.core.Version;
/**
- * Compatible cluster service adapter.
+ * Membership service adapter.
*/
-public class UnifiedClusterServiceAdapter implements UnifiedClusterService {
+public class MembershipServiceAdapter implements MembershipService {
@Override
- public ControllerNode getLocalNode() {
+ public Member getLocalMember() {
return null;
}
@Override
- public Set<ControllerNode> getNodes() {
+ public MembershipGroup getLocalGroup() {
return null;
}
@Override
- public ControllerNode getNode(NodeId nodeId) {
+ public Set<Member> getMembers() {
return null;
}
@Override
- public ControllerNode.State getState(NodeId nodeId) {
+ public Collection<MembershipGroup> getGroups() {
return null;
}
@Override
- public Version getVersion(NodeId nodeId) {
+ public MembershipGroup getGroup(Version version) {
return null;
}
@Override
- public DateTime getLastUpdated(NodeId nodeId) {
+ public Set<Member> getMembers(Version version) {
return null;
}
@Override
- public void addListener(ClusterEventListener listener) {
-
- }
-
- @Override
- public void removeListener(ClusterEventListener listener) {
-
+ public Member getMember(NodeId nodeId) {
+ return null;
}
}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index df7feff..fc3d802 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -15,28 +15,48 @@
*/
package org.onosproject.cluster.impl;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.apache.karaf.system.SystemService;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
+import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterAdminService;
+import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataAdminService;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
+import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ClusterStore;
+import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.UnifiedClusterAdminService;
-import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.Partition;
+import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
-import org.onosproject.core.VersionService;
+import org.onosproject.event.AbstractListenerManager;
import org.slf4j.Logger;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
import static org.slf4j.LoggerFactory.getLogger;
@@ -46,122 +66,172 @@
*/
@Component(immediate = true)
@Service
-public class ClusterManager implements ClusterService, ClusterAdminService {
+public class ClusterManager
+ extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
+ implements ClusterService, ClusterAdminService {
+ public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+ private static final int DEFAULT_PARTITION_SIZE = 3;
private final Logger log = getLogger(getClass());
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private UnifiedClusterService clusterService;
+ private ClusterStoreDelegate delegate = new InternalStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private UnifiedClusterAdminService clusterAdminService;
+ protected ClusterMetadataService clusterMetadataService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private VersionService versionService;
+ protected ClusterMetadataAdminService clusterMetadataAdminService;
- private Version version;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterStore store;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected SystemService systemService;
+
+ private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
+ private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
@Activate
public void activate() {
- version = versionService.version();
+ store.setDelegate(delegate);
+ eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
+ clusterMetadataService.addListener(metadataListener);
+ processMetadata(clusterMetadataService.getClusterMetadata());
log.info("Started");
}
@Deactivate
public void deactivate() {
+ clusterMetadataService.removeListener(metadataListener);
+ store.unsetDelegate(delegate);
+ eventDispatcher.removeSink(ClusterEvent.class);
log.info("Stopped");
}
@Override
public ControllerNode getLocalNode() {
checkPermission(CLUSTER_READ);
- return clusterService.getLocalNode();
+ return store.getLocalNode();
}
@Override
public Set<ControllerNode> getNodes() {
checkPermission(CLUSTER_READ);
- return clusterService.getNodes()
- .stream()
- .filter(node -> clusterService.getVersion(node.id()).equals(version))
- .collect(Collectors.toSet());
+ return store.getNodes();
}
@Override
public ControllerNode getNode(NodeId nodeId) {
checkPermission(CLUSTER_READ);
- Version nodeVersion = clusterService.getVersion(nodeId);
- if (nodeVersion != null && nodeVersion.equals(version)) {
- return clusterService.getNode(nodeId);
- }
- return null;
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return store.getNode(nodeId);
}
@Override
public ControllerNode.State getState(NodeId nodeId) {
checkPermission(CLUSTER_READ);
- Version nodeVersion = clusterService.getVersion(nodeId);
- if (nodeVersion != null && nodeVersion.equals(version)) {
- return clusterService.getState(nodeId);
- }
- return null;
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return store.getState(nodeId);
}
@Override
public Version getVersion(NodeId nodeId) {
checkPermission(CLUSTER_READ);
- Version nodeVersion = clusterService.getVersion(nodeId);
- if (nodeVersion != null && nodeVersion.equals(version)) {
- return nodeVersion;
- }
- return null;
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return store.getVersion(nodeId);
}
@Override
public void markFullyStarted(boolean started) {
- clusterAdminService.markFullyStarted(started);
+ store.markFullyStarted(started);
}
@Override
public DateTime getLastUpdated(NodeId nodeId) {
checkPermission(CLUSTER_READ);
- Version nodeVersion = clusterService.getVersion(nodeId);
- if (nodeVersion != null && nodeVersion.equals(version)) {
- return clusterService.getLastUpdated(nodeId);
- }
- return null;
+ return store.getLastUpdated(nodeId);
}
@Override
public void formCluster(Set<ControllerNode> nodes) {
- clusterAdminService.formCluster(nodes);
+ formCluster(nodes, DEFAULT_PARTITION_SIZE);
}
@Override
public void formCluster(Set<ControllerNode> nodes, int partitionSize) {
- clusterAdminService.formCluster(nodes, partitionSize);
- }
+ checkNotNull(nodes, "Nodes cannot be null");
+ checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
- @Override
- public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- return clusterAdminService.addNode(nodeId, ip, tcpPort);
- }
-
- @Override
- public void removeNode(NodeId nodeId) {
- Version nodeVersion = clusterService.getVersion(nodeId);
- if (nodeVersion != null && nodeVersion.equals(version)) {
- clusterAdminService.removeNode(nodeId);
+ ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize));
+ clusterMetadataAdminService.setClusterMetadata(metadata);
+ try {
+ log.warn("Shutting down container for cluster reconfiguration!");
+ // Clean up persistent state associated with previous cluster configuration.
+ Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions");
+ systemService.reboot("now", SystemService.Swipe.NONE);
+ } catch (Exception e) {
+ log.error("Unable to reboot container", e);
}
}
@Override
- public void addListener(ClusterEventListener listener) {
- clusterService.addListener(listener);
+ public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ checkNotNull(ip, "IP address cannot be null");
+ checkArgument(tcpPort > 5000, "TCP port must be > 5000");
+ return store.addNode(nodeId, ip, tcpPort);
}
@Override
- public void removeListener(ClusterEventListener listener) {
- clusterService.removeListener(listener);
+ public void removeNode(NodeId nodeId) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ store.removeNode(nodeId);
+ }
+
+ // Store delegate to re-post events emitted from the store.
+ private class InternalStoreDelegate implements ClusterStoreDelegate {
+ @Override
+ public void notify(ClusterEvent event) {
+ post(event);
+ }
+ }
+
+ private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
+ List<ControllerNode> sorted = new ArrayList<>(nodes);
+ Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
+ Set<Partition> partitions = Sets.newHashSet();
+ // add partitions
+ int length = nodes.size();
+ int count = Math.min(partitionSize, length);
+ for (int i = 0; i < length; i++) {
+ int index = i;
+ Set<NodeId> set = new HashSet<>(count);
+ for (int j = 0; j < count; j++) {
+ set.add(sorted.get((i + j) % length).id());
+ }
+ partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set));
+ }
+ return partitions;
+ }
+
+ /**
+ * Processes metadata by adding and removing nodes from the cluster.
+ */
+ private synchronized void processMetadata(ClusterMetadata metadata) {
+ try {
+ ClusterMetadataDiff examiner =
+ new ClusterMetadataDiff(currentMetadata.get(), metadata);
+ examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
+ examiner.nodesRemoved().forEach(this::removeNode);
+ } finally {
+ currentMetadata.set(metadata);
+ }
+ }
+
+ private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+ @Override
+ public void event(ClusterMetadataEvent event) {
+ processMetadata(event.subject());
+ }
}
}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MembershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MembershipManager.java
new file mode 100644
index 0000000..8d869ca
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MembershipManager.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.Member;
+import org.onosproject.cluster.MembershipGroup;
+import org.onosproject.cluster.MembershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.Version;
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Cluster membership manager.
+ */
+@Component(immediate = true)
+@Service
+public class MembershipManager implements MembershipService {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private Member localMember;
+
+ @Activate
+ public void activate() {
+ localMember = new Member(
+ clusterService.getLocalNode().id(),
+ clusterService.getVersion(clusterService.getLocalNode().id()));
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ private Member toMemberId(ControllerNode node) {
+ return new Member(node.id(), clusterService.getVersion(node.id()));
+ }
+
+ @Override
+ public Member getLocalMember() {
+ return localMember;
+ }
+
+ @Override
+ public MembershipGroup getLocalGroup() {
+ return getGroup(getLocalMember().version());
+ }
+
+ @Override
+ public Set<Member> getMembers() {
+ return clusterService.getNodes().stream()
+ .map(this::toMemberId)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public Collection<MembershipGroup> getGroups() {
+ Map<Version, Set<Member>> groups = Maps.newHashMap();
+ clusterService.getNodes().stream()
+ .map(this::toMemberId)
+ .forEach(member ->
+ groups.computeIfAbsent(member.version(), k -> Sets.newHashSet()).add(member));
+ return Maps.transformEntries(groups, MembershipGroup::new).values();
+ }
+
+ @Override
+ public MembershipGroup getGroup(Version version) {
+ return new MembershipGroup(version, getMembers(version));
+ }
+
+ @Override
+ public Set<Member> getMembers(Version version) {
+ return getMembers()
+ .stream()
+ .filter(m -> Objects.equals(m.version(), version))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public Member getMember(NodeId nodeId) {
+ ControllerNode node = clusterService.getNode(nodeId);
+ return node != null ? toMemberId(node) : null;
+ }
+}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java
deleted file mode 100644
index 3617305..0000000
--- a/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.cluster.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.collect.Sets;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.apache.karaf.system.SystemService;
-import org.joda.time.DateTime;
-import org.onlab.packet.IpAddress;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterMetadata;
-import org.onosproject.cluster.ClusterMetadataAdminService;
-import org.onosproject.cluster.ClusterMetadataDiff;
-import org.onosproject.cluster.ClusterMetadataEvent;
-import org.onosproject.cluster.ClusterMetadataEventListener;
-import org.onosproject.cluster.ClusterMetadataService;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultPartition;
-import org.onosproject.cluster.UnifiedClusterAdminService;
-import org.onosproject.cluster.UnifiedClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.Partition;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.Version;
-import org.onosproject.event.AbstractListenerManager;
-import org.slf4j.Logger;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Implementation of the cluster service.
- */
-@Component(immediate = true)
-@Service
-public class UnifiedClusterManager
- extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
- implements UnifiedClusterService, UnifiedClusterAdminService {
-
- public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
- private static final int DEFAULT_PARTITION_SIZE = 3;
- private final Logger log = getLogger(getClass());
-
- private ClusterStoreDelegate delegate = new InternalStoreDelegate();
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataAdminService clusterMetadataAdminService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterStore store;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected SystemService systemService;
-
- private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
- private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
-
- @Activate
- public void activate() {
- store.setDelegate(delegate);
- eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
- clusterMetadataService.addListener(metadataListener);
- processMetadata(clusterMetadataService.getClusterMetadata());
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- clusterMetadataService.removeListener(metadataListener);
- store.unsetDelegate(delegate);
- eventDispatcher.removeSink(ClusterEvent.class);
- log.info("Stopped");
- }
-
- @Override
- public ControllerNode getLocalNode() {
- checkPermission(CLUSTER_READ);
- return store.getLocalNode();
- }
-
- @Override
- public Set<ControllerNode> getNodes() {
- checkPermission(CLUSTER_READ);
- return store.getNodes();
- }
-
- @Override
- public ControllerNode getNode(NodeId nodeId) {
- checkPermission(CLUSTER_READ);
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return store.getNode(nodeId);
- }
-
- @Override
- public ControllerNode.State getState(NodeId nodeId) {
- checkPermission(CLUSTER_READ);
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return store.getState(nodeId);
- }
-
- @Override
- public Version getVersion(NodeId nodeId) {
- checkPermission(CLUSTER_READ);
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- return store.getVersion(nodeId);
- }
-
- @Override
- public void markFullyStarted(boolean started) {
- store.markFullyStarted(started);
- }
-
- @Override
- public DateTime getLastUpdated(NodeId nodeId) {
- checkPermission(CLUSTER_READ);
- return store.getLastUpdated(nodeId);
- }
-
- @Override
- public void formCluster(Set<ControllerNode> nodes) {
- formCluster(nodes, DEFAULT_PARTITION_SIZE);
- }
-
- @Override
- public void formCluster(Set<ControllerNode> nodes, int partitionSize) {
- checkNotNull(nodes, "Nodes cannot be null");
- checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
-
- ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize));
- clusterMetadataAdminService.setClusterMetadata(metadata);
- try {
- log.warn("Shutting down container for cluster reconfiguration!");
- // Clean up persistent state associated with previous cluster configuration.
- Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions");
- systemService.reboot("now", SystemService.Swipe.NONE);
- } catch (Exception e) {
- log.error("Unable to reboot container", e);
- }
- }
-
- @Override
- public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- checkNotNull(ip, "IP address cannot be null");
- checkArgument(tcpPort > 5000, "TCP port must be > 5000");
- return store.addNode(nodeId, ip, tcpPort);
- }
-
- @Override
- public void removeNode(NodeId nodeId) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- store.removeNode(nodeId);
- }
-
- // Store delegate to re-post events emitted from the store.
- private class InternalStoreDelegate implements ClusterStoreDelegate {
- @Override
- public void notify(ClusterEvent event) {
- post(event);
- }
- }
-
- private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
- List<ControllerNode> sorted = new ArrayList<>(nodes);
- Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
- Set<Partition> partitions = Sets.newHashSet();
- // add partitions
- int length = nodes.size();
- int count = Math.min(partitionSize, length);
- for (int i = 0; i < length; i++) {
- int index = i;
- Set<NodeId> set = new HashSet<>(count);
- for (int j = 0; j < count; j++) {
- set.add(sorted.get((i + j) % length).id());
- }
- partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set));
- }
- return partitions;
- }
-
- /**
- * Processes metadata by adding and removing nodes from the cluster.
- */
- private synchronized void processMetadata(ClusterMetadata metadata) {
- try {
- ClusterMetadataDiff examiner =
- new ClusterMetadataDiff(currentMetadata.get(), metadata);
- examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
- examiner.nodesRemoved().forEach(this::removeNode);
- } finally {
- currentMetadata.set(metadata);
- }
- }
-
- private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
- @Override
- public void event(ClusterMetadataEvent event) {
- processMetadata(event.subject());
- }
- }
-}
diff --git a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
index 410b137..9fdd7a2 100644
--- a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
+++ b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
@@ -28,9 +28,10 @@
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.core.Version;
import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
@@ -70,7 +71,10 @@
protected CoordinationService coordinationService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterService clusterService;
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MembershipService membershipService;
private Version localVersion;
private AtomicValue<Upgrade> state;
@@ -243,13 +247,8 @@
}
// Determine whether any nodes have not been upgraded to the target version.
- boolean upgradeComplete = clusterService.getNodes()
- .stream()
- .allMatch(node -> {
- ControllerNode.State state = clusterService.getState(node.id());
- Version version = clusterService.getVersion(node.id());
- return state.isActive() && version != null && version.equals(upgraded.target());
- });
+ boolean upgradeComplete = membershipService.getGroups().size() == 1
+ && membershipService.getLocalGroup().version().equals(upgraded.target());
// If some nodes have not yet been upgraded, throw an exception.
if (!upgradeComplete) {
@@ -333,13 +332,8 @@
}
// Determine whether any nodes are still running the target version.
- boolean rollbackComplete = clusterService.getNodes()
- .stream()
- .allMatch(node -> {
- ControllerNode.State state = clusterService.getState(node.id());
- Version version = clusterService.getVersion(node.id());
- return state.isActive() && version != null && version.equals(upgraded.source());
- });
+ boolean rollbackComplete = membershipService.getGroups().size() == 1
+ && membershipService.getLocalGroup().version().equals(upgraded.source());
// If some nodes have not yet been downgraded, throw an exception.
if (!rollbackComplete) {
diff --git a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
index 48337cd..78127ee 100644
--- a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
@@ -16,17 +16,24 @@
package org.onosproject.upgrade.impl;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.UnifiedClusterServiceAdapter;
+import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.Member;
+import org.onosproject.cluster.MembershipGroup;
+import org.onosproject.cluster.MembershipServiceAdapter;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.Version;
import org.onosproject.core.VersionServiceAdapter;
@@ -59,7 +66,29 @@
@SuppressWarnings("unchecked")
private UpgradeManager createUpgradeManager(Version version, Upgrade state, List<Version> versions) {
UpgradeManager upgradeManager = new UpgradeManager();
- upgradeManager.clusterService = new UnifiedClusterServiceAdapter() {
+ upgradeManager.membershipService = new MembershipServiceAdapter() {
+ @Override
+ public MembershipGroup getLocalGroup() {
+ return getGroups()
+ .stream()
+ .filter(group -> group.version().equals(version))
+ .findFirst()
+ .get();
+ }
+
+ @Override
+ public Collection<MembershipGroup> getGroups() {
+ AtomicInteger nodeCounter = new AtomicInteger();
+ Map<Version, Set<Member>> groups = Maps.newHashMap();
+ versions.stream().forEach(version -> {
+ groups.computeIfAbsent(version, k -> Sets.newHashSet())
+ .add(new Member(NodeId.nodeId(String.valueOf(nodeCounter.getAndIncrement())), version));
+ });
+ return Maps.transformEntries(groups, MembershipGroup::new).values();
+ }
+ };
+
+ upgradeManager.clusterService = new ClusterServiceAdapter() {
@Override
public Set<ControllerNode> getNodes() {
AtomicInteger nodeCounter = new AtomicInteger();
@@ -84,11 +113,6 @@
}
@Override
- public ControllerNode.State getState(NodeId nodeId) {
- return ControllerNode.State.READY;
- }
-
- @Override
public Version getVersion(NodeId nodeId) {
return versions.get(Integer.parseInt(nodeId.id()));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
deleted file mode 100644
index aa96131..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Throwables;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.UnifiedClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.utils.MeteringAgent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
-
-@Component(componentAbstract = true)
-public abstract class AbstractClusterCommunicationManager
- implements ClusterCommunicationService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
- private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
-
- private static final String PRIMITIVE_NAME = "clusterCommunication";
- private static final String SUBJECT_PREFIX = "subject";
- private static final String ENDPOINT_PREFIX = "endpoint";
-
- private static final String SERIALIZING = "serialization";
- private static final String DESERIALIZING = "deserialization";
- private static final String NODE_PREFIX = "node:";
- private static final String ROUND_TRIP_SUFFIX = ".rtt";
- private static final String ONE_WAY_SUFFIX = ".oneway";
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MessagingService messagingService;
-
- private NodeId localNodeId;
-
- /**
- * Returns the type for the given message subject.
- *
- * @param subject the type for the given message subject
- * @return the message subject
- */
- protected abstract String getType(MessageSubject subject);
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public <M> void broadcast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> void broadcastIncludeSelf(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> CompletableFuture<Void> unicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- NodeId toNodeId) {
- checkPermission(CLUSTER_WRITE);
- try {
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
- ).getBytes();
- return doUnicast(subject, payload, toNodeId);
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- @Override
- public <M> void multicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Set<NodeId> nodes) {
- checkPermission(CLUSTER_WRITE);
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
- .getBytes();
- nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
- }
-
- @Override
- public <M, R> CompletableFuture<R> sendAndReceive(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Function<byte[], R> decoder,
- NodeId toNodeId) {
- checkPermission(CLUSTER_WRITE);
- try {
- ClusterMessage envelope = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
- apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).
- thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
- return messagingService.sendAsync(nodeEp, getType(subject), payload).whenComplete((r, e) -> context.stop(e));
- }
-
- private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- MeteringAgent.Context epContext = endpointMeteringAgent.
- startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
- MeteringAgent.Context subjectContext = subjectMeteringAgent.
- startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
- return messagingService.sendAndReceive(nodeEp, getType(subject), payload).
- whenComplete((bytes, throwable) -> {
- subjectContext.stop(throwable);
- epContext.stop(throwable);
- });
- }
-
- @Override
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber,
- ExecutorService executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(getType(subject),
- new InternalClusterMessageHandler(subscriber),
- executor);
- }
-
- @Override
- public void removeSubscriber(MessageSubject subject) {
- checkPermission(CLUSTER_WRITE);
- messagingService.unregisterHandler(getType(subject));
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, R> handler,
- Function<R, byte[]> encoder,
- Executor executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(getType(subject),
- new InternalMessageResponder<M, R>(decoder, encoder, m -> {
- CompletableFuture<R> responseFuture = new CompletableFuture<>();
- executor.execute(() -> {
- try {
- responseFuture.complete(handler.apply(m));
- } catch (Exception e) {
- responseFuture.completeExceptionally(e);
- }
- });
- return responseFuture;
- }));
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, CompletableFuture<R>> handler,
- Function<R, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(getType(subject),
- new InternalMessageResponder<>(decoder, encoder, handler));
- }
-
- @Override
- public <M> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Consumer<M> handler,
- Executor executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(getType(subject),
- new InternalMessageConsumer<>(decoder, handler),
- executor);
- }
-
- /**
- * Performs the timed function, returning the value it would while timing the operation.
- *
- * @param timedFunction the function to be timed
- * @param meter the metering agent to be used to time the function
- * @param opName the opname to be used when starting the meter
- * @param <A> The param type of the function
- * @param <B> The return type of the function
- * @return the value returned by the timed function
- */
- private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
- MeteringAgent meter, String opName) {
- checkNotNull(timedFunction);
- checkNotNull(meter);
- checkNotNull(opName);
- return new Function<A, B>() {
- @Override
- public B apply(A a) {
- final MeteringAgent.Context context = meter.startTimer(opName);
- B result = null;
- try {
- result = timedFunction.apply(a);
- context.stop(null);
- return result;
- } catch (Exception e) {
- context.stop(e);
- Throwables.propagate(e);
- return null;
- }
- }
- };
- }
-
-
- private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
- private ClusterMessageHandler handler;
-
- public InternalClusterMessageHandler(ClusterMessageHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public byte[] apply(Endpoint sender, byte[] bytes) {
- ClusterMessage message = ClusterMessage.fromBytes(bytes);
- handler.handle(message);
- return message.response();
- }
- }
-
- private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
- private final Function<byte[], M> decoder;
- private final Function<R, byte[]> encoder;
- private final Function<M, CompletableFuture<R>> handler;
-
- public InternalMessageResponder(Function<byte[], M> decoder,
- Function<R, byte[]> encoder,
- Function<M, CompletableFuture<R>> handler) {
- this.decoder = decoder;
- this.encoder = encoder;
- this.handler = handler;
- }
-
- @Override
- public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
- return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
- apply(ClusterMessage.fromBytes(bytes).payload())).
- thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
- }
- }
-
- private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
- private final Function<byte[], M> decoder;
- private final Consumer<M> consumer;
-
- public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
- this.decoder = decoder;
- this.consumer = consumer;
- }
-
- @Override
- public void accept(Endpoint sender, byte[] bytes) {
- consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
- apply(ClusterMessage.fromBytes(bytes).payload()));
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 4602702..868006b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-present Open Networking Foundation
+ * Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,24 +15,326 @@
*/
package org.onosproject.store.cluster.messaging.impl;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.core.VersionService;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.utils.MeteringAgent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
@Component(immediate = true)
@Service
-public class ClusterCommunicationManager extends AbstractClusterCommunicationManager {
+public class ClusterCommunicationManager implements ClusterCommunicationService {
- private static final char VERSION_SEP = '-';
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
+ private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
+
+ private static final String PRIMITIVE_NAME = "clusterCommunication";
+ private static final String SUBJECT_PREFIX = "subject";
+ private static final String ENDPOINT_PREFIX = "endpoint";
+
+ private static final String SERIALIZING = "serialization";
+ private static final String DESERIALIZING = "deserialization";
+ private static final String NODE_PREFIX = "node:";
+ private static final String ROUND_TRIP_SUFFIX = ".rtt";
+ private static final String ONE_WAY_SUFFIX = ".oneway";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private VersionService versionService;
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MessagingService messagingService;
+
+ private NodeId localNodeId;
+
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
@Override
- protected String getType(MessageSubject subject) {
- return subject.value() + VERSION_SEP + versionService.version().toString();
+ public <M> void broadcast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ checkPermission(CLUSTER_WRITE);
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ checkPermission(CLUSTER_WRITE);
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> CompletableFuture<Void> unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId) {
+ checkPermission(CLUSTER_WRITE);
+ try {
+ byte[] payload = new ClusterMessage(
+ localNodeId,
+ subject,
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
+ ).getBytes();
+ return doUnicast(subject, payload, toNodeId);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public <M> void multicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Set<NodeId> nodes) {
+ checkPermission(CLUSTER_WRITE);
+ byte[] payload = new ClusterMessage(
+ localNodeId,
+ subject,
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
+ .getBytes();
+ nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId) {
+ checkPermission(CLUSTER_WRITE);
+ try {
+ ClusterMessage envelope = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
+ apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).
+ thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+ ControllerNode node = clusterService.getNode(toNodeId);
+ checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+ Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
+ MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
+ return messagingService.sendAsync(nodeEp, subject.toString(), payload).whenComplete((r, e) -> context.stop(e));
+ }
+
+ private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+ ControllerNode node = clusterService.getNode(toNodeId);
+ checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+ Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
+ MeteringAgent.Context epContext = endpointMeteringAgent.
+ startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
+ MeteringAgent.Context subjectContext = subjectMeteringAgent.
+ startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
+ return messagingService.sendAndReceive(nodeEp, subject.toString(), payload).
+ whenComplete((bytes, throwable) -> {
+ subjectContext.stop(throwable);
+ epContext.stop(throwable);
+ });
+ }
+
+ @Override
+ public void addSubscriber(MessageSubject subject,
+ ClusterMessageHandler subscriber,
+ ExecutorService executor) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(subject.toString(),
+ new InternalClusterMessageHandler(subscriber),
+ executor);
+ }
+
+ @Override
+ public void removeSubscriber(MessageSubject subject) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.unregisterHandler(subject.toString());
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ Executor executor) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(subject.toString(),
+ new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+ CompletableFuture<R> responseFuture = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ responseFuture.complete(handler.apply(m));
+ } catch (Exception e) {
+ responseFuture.completeExceptionally(e);
+ }
+ });
+ return responseFuture;
+ }));
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(subject.toString(),
+ new InternalMessageResponder<>(decoder, encoder, handler));
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ Executor executor) {
+ checkPermission(CLUSTER_WRITE);
+ messagingService.registerHandler(subject.toString(),
+ new InternalMessageConsumer<>(decoder, handler),
+ executor);
+ }
+
+ /**
+ * Performs the timed function, returning the value it would while timing the operation.
+ *
+ * @param timedFunction the function to be timed
+ * @param meter the metering agent to be used to time the function
+ * @param opName the opname to be used when starting the meter
+ * @param <A> The param type of the function
+ * @param <B> The return type of the function
+ * @return the value returned by the timed function
+ */
+ private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
+ MeteringAgent meter, String opName) {
+ checkNotNull(timedFunction);
+ checkNotNull(meter);
+ checkNotNull(opName);
+ return new Function<A, B>() {
+ @Override
+ public B apply(A a) {
+ final MeteringAgent.Context context = meter.startTimer(opName);
+ B result = null;
+ try {
+ result = timedFunction.apply(a);
+ context.stop(null);
+ return result;
+ } catch (Exception e) {
+ context.stop(e);
+ Throwables.propagate(e);
+ return null;
+ }
+ }
+ };
+ }
+
+
+ private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
+ private ClusterMessageHandler handler;
+
+ public InternalClusterMessageHandler(ClusterMessageHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public byte[] apply(Endpoint sender, byte[] bytes) {
+ ClusterMessage message = ClusterMessage.fromBytes(bytes);
+ handler.handle(message);
+ return message.response();
+ }
+ }
+
+ private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
+ private final Function<byte[], M> decoder;
+ private final Function<R, byte[]> encoder;
+ private final Function<M, CompletableFuture<R>> handler;
+
+ public InternalMessageResponder(Function<byte[], M> decoder,
+ Function<R, byte[]> encoder,
+ Function<M, CompletableFuture<R>> handler) {
+ this.decoder = decoder;
+ this.encoder = encoder;
+ this.handler = handler;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
+ return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+ apply(ClusterMessage.fromBytes(bytes).payload())).
+ thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
+ }
+ }
+
+ private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
+ private final Function<byte[], M> decoder;
+ private final Consumer<M> consumer;
+
+ public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+ this.decoder = decoder;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void accept(Endpoint sender, byte[] bytes) {
+ consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+ apply(ClusterMessage.fromBytes(bytes).payload()));
+ }
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
deleted file mode 100644
index 45c84af..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-@Component(immediate = true)
-@Service
-public class UnifiedClusterCommunicationManager
- extends AbstractClusterCommunicationManager
- implements UnifiedClusterCommunicationService {
- @Override
- protected String getType(MessageSubject subject) {
- return subject.value();
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index 29045a2..80753f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -25,12 +25,12 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
@@ -70,10 +70,10 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterService clusterService;
+ protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterCommunicationService clusterCommunicator;
+ protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index 1bfaaed..9d6a016 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -15,21 +15,21 @@
*/
package org.onosproject.store.primitives.impl;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.MembershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapBuilder;
-
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -38,8 +38,8 @@
*/
public class EventuallyConsistentMapBuilderImpl<K, V>
implements EventuallyConsistentMapBuilder<K, V> {
- private final MembershipService clusterService;
- private final ClusterCommunicator clusterCommunicator;
+ private final ClusterService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
private String name;
private KryoNamespace serializer;
@@ -64,9 +64,10 @@
* @param clusterCommunicator cluster communication service
* @param persistenceService persistence service
*/
- public EventuallyConsistentMapBuilderImpl(MembershipService clusterService,
- ClusterCommunicator clusterCommunicator,
- PersistenceService persistenceService) {
+ public EventuallyConsistentMapBuilderImpl(
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ PersistenceService persistenceService) {
this.persistenceService = persistenceService;
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 461245e..462d4ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -15,34 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.util.AbstractAccumulator;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.MembershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.LogicalTimestamp;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.DistributedPrimitive;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -67,6 +39,34 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SlidingWindowCounter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -86,8 +86,8 @@
private final Map<K, MapValue<V>> items;
- private final MembershipService clusterService;
- private final ClusterCommunicator clusterCommunicator;
+ private final ClusterService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
private final Serializer serializer;
private final NodeId localNodeId;
private final PersistenceService persistenceService;
@@ -162,8 +162,8 @@
* @param persistenceService persistence service
*/
EventuallyConsistentMapImpl(String mapName,
- MembershipService clusterService,
- ClusterCommunicator clusterCommunicator,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
KryoNamespace ns,
BiFunction<K, V, Timestamp> timestampProvider,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 4a92682..5bbd681 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -45,7 +45,7 @@
import org.onosproject.core.Version;
import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionEvent;
@@ -71,7 +71,7 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterCommunicationService clusterCommunicator;
+ protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService metadataService;
@@ -103,24 +103,24 @@
Version targetVersion = upgradeService.getState().target();
currentClusterMetadata.get()
.getPartitions()
- .forEach(partition -> inactivePartitions.put(partition.getId(), new StoragePartition(
- partition,
- sourceVersion,
- null,
- clusterCommunicator,
- clusterService,
- new File(System.getProperty("karaf.data") +
- "/partitions/" + sourceVersion + "/" + partition.getId()))));
- currentClusterMetadata.get()
- .getPartitions()
- .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
- partition,
- targetVersion,
- sourceVersion,
- clusterCommunicator,
- clusterService,
- new File(System.getProperty("karaf.data") +
- "/partitions/" + targetVersion + "/" + partition.getId()))));
+ .forEach(partition -> {
+ inactivePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ sourceVersion,
+ null,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + sourceVersion + "/" + partition.getId())));
+ activePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ targetVersion,
+ sourceVersion,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + targetVersion + "/" + partition.getId())));
+ });
// We have to fork existing partitions before we can start inactive partition servers to
// avoid duplicate message handlers when both servers are running.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 8bae2a3..ecba56d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -40,7 +40,7 @@
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.Serializer;
/**
@@ -51,7 +51,7 @@
public RaftClientCommunicator(
String prefix,
Serializer serializer,
- ClusterCommunicator clusterCommunicator) {
+ ClusterCommunicationService clusterCommunicator) {
super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
index 765eb02..1117ab9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
@@ -22,7 +22,7 @@
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.cluster.MemberId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.service.Serializer;
@@ -35,12 +35,12 @@
public abstract class RaftCommunicator {
protected final RaftMessageContext context;
protected final Serializer serializer;
- protected final ClusterCommunicator clusterCommunicator;
+ protected final ClusterCommunicationService clusterCommunicator;
public RaftCommunicator(
RaftMessageContext context,
Serializer serializer,
- ClusterCommunicator clusterCommunicator) {
+ ClusterCommunicationService clusterCommunicator) {
this.context = checkNotNull(context, "context cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 2710a2c..097ee46 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -57,7 +57,6 @@
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.Serializer;
/**
@@ -68,7 +67,7 @@
public RaftServerCommunicator(
String prefix,
Serializer serializer,
- ClusterCommunicator clusterCommunicator) {
+ ClusterCommunicationService clusterCommunicator) {
super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 59c4b17..b73dd61 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -29,10 +29,10 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.PartitionId;
-import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionService;
@@ -81,10 +81,10 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterService clusterService;
+ protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterCommunicationService clusterCommunicator;
+ protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 414c49c..5458edd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -29,12 +29,12 @@
import com.google.common.collect.ImmutableMap;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.service.RaftService;
-import org.onosproject.cluster.MembershipService;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
@@ -54,8 +54,8 @@
public class StoragePartition implements Managed<StoragePartition> {
private final AtomicBoolean isOpened = new AtomicBoolean(false);
- private final UnifiedClusterCommunicationService clusterCommunicator;
- private final MembershipService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
+ private final ClusterService clusterService;
private final Version version;
private final Version source;
private final File dataFolder;
@@ -85,8 +85,8 @@
Partition partition,
Version version,
Version source,
- UnifiedClusterCommunicationService clusterCommunicator,
- MembershipService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ ClusterService clusterService,
File dataFolder) {
this.partition = partition;
this.version = version;
@@ -191,6 +191,10 @@
return source != null ?
clusterService.getNodes()
.stream()
+ .filter(node -> {
+ Version nodeVersion = clusterService.getVersion(node.id());
+ return nodeVersion != null && nodeVersion.equals(version);
+ })
.map(node -> MemberId.from(node.id().id()))
.collect(Collectors.toList()) :
Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
@@ -202,6 +206,10 @@
} else {
return clusterService.getNodes()
.stream()
+ .filter(node -> {
+ Version nodeVersion = clusterService.getVersion(node.id());
+ return nodeVersion != null && nodeVersion.equals(version);
+ })
.map(node -> MemberId.from(node.id().id()))
.collect(Collectors.toList());
}
@@ -231,13 +239,10 @@
clusterCommunicator);
CompletableFuture<Void> future;
- if (clusterService.getNodes().size() == 1) {
+ if (getMemberIds().size() == 1) {
future = server.fork(version);
} else {
- future = server.join(clusterService.getNodes().stream()
- .filter(node -> !node.id().equals(localNodeId))
- .map(node -> MemberId.from(node.id().id()))
- .collect(Collectors.toList()));
+ future = server.join(getMemberIds());
}
return future.thenRun(() -> this.server = server);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 123c3b6..3a15fce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -28,7 +28,7 @@
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import org.onosproject.core.Version;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
@@ -49,13 +49,13 @@
private final MemberId localMemberId;
private final StoragePartition partition;
- private final UnifiedClusterCommunicationService clusterCommunicator;
+ private final ClusterCommunicationService clusterCommunicator;
private RaftServer server;
public StoragePartitionServer(
StoragePartition partition,
MemberId localMemberId,
- UnifiedClusterCommunicationService clusterCommunicator) {
+ ClusterCommunicationService clusterCommunicator) {
this.partition = partition;
this.localMemberId = localMemberId;
this.clusterCommunicator = clusterCommunicator;