Replace Unified* services with MembershipService for subgroup membership
Change-Id: Iabff173ce3501d1ed300513cac445bb712614bd9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
deleted file mode 100644
index aa96131..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Throwables;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.UnifiedClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.utils.MeteringAgent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
-
-@Component(componentAbstract = true)
-public abstract class AbstractClusterCommunicationManager
- implements ClusterCommunicationService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
- private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
-
- private static final String PRIMITIVE_NAME = "clusterCommunication";
- private static final String SUBJECT_PREFIX = "subject";
- private static final String ENDPOINT_PREFIX = "endpoint";
-
- private static final String SERIALIZING = "serialization";
- private static final String DESERIALIZING = "deserialization";
- private static final String NODE_PREFIX = "node:";
- private static final String ROUND_TRIP_SUFFIX = ".rtt";
- private static final String ONE_WAY_SUFFIX = ".oneway";
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MessagingService messagingService;
-
- private NodeId localNodeId;
-
- /**
- * Returns the type for the given message subject.
- *
- * @param subject the type for the given message subject
- * @return the message subject
- */
- protected abstract String getType(MessageSubject subject);
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public <M> void broadcast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> void broadcastIncludeSelf(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- multicast(message,
- subject,
- encoder,
- clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .collect(Collectors.toSet()));
- }
-
- @Override
- public <M> CompletableFuture<Void> unicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- NodeId toNodeId) {
- checkPermission(CLUSTER_WRITE);
- try {
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
- ).getBytes();
- return doUnicast(subject, payload, toNodeId);
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- @Override
- public <M> void multicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Set<NodeId> nodes) {
- checkPermission(CLUSTER_WRITE);
- byte[] payload = new ClusterMessage(
- localNodeId,
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
- .getBytes();
- nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
- }
-
- @Override
- public <M, R> CompletableFuture<R> sendAndReceive(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- Function<byte[], R> decoder,
- NodeId toNodeId) {
- checkPermission(CLUSTER_WRITE);
- try {
- ClusterMessage envelope = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
- apply(message));
- return sendAndReceive(subject, envelope.getBytes(), toNodeId).
- thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
- return messagingService.sendAsync(nodeEp, getType(subject), payload).whenComplete((r, e) -> context.stop(e));
- }
-
- private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- ControllerNode node = clusterService.getNode(toNodeId);
- checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- MeteringAgent.Context epContext = endpointMeteringAgent.
- startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
- MeteringAgent.Context subjectContext = subjectMeteringAgent.
- startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
- return messagingService.sendAndReceive(nodeEp, getType(subject), payload).
- whenComplete((bytes, throwable) -> {
- subjectContext.stop(throwable);
- epContext.stop(throwable);
- });
- }
-
- @Override
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber,
- ExecutorService executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(getType(subject),
- new InternalClusterMessageHandler(subscriber),
- executor);
- }
-
- @Override
- public void removeSubscriber(MessageSubject subject) {
- checkPermission(CLUSTER_WRITE);
- messagingService.unregisterHandler(getType(subject));
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, R> handler,
- Function<R, byte[]> encoder,
- Executor executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(getType(subject),
- new InternalMessageResponder<M, R>(decoder, encoder, m -> {
- CompletableFuture<R> responseFuture = new CompletableFuture<>();
- executor.execute(() -> {
- try {
- responseFuture.complete(handler.apply(m));
- } catch (Exception e) {
- responseFuture.completeExceptionally(e);
- }
- });
- return responseFuture;
- }));
- }
-
- @Override
- public <M, R> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Function<M, CompletableFuture<R>> handler,
- Function<R, byte[]> encoder) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(getType(subject),
- new InternalMessageResponder<>(decoder, encoder, handler));
- }
-
- @Override
- public <M> void addSubscriber(MessageSubject subject,
- Function<byte[], M> decoder,
- Consumer<M> handler,
- Executor executor) {
- checkPermission(CLUSTER_WRITE);
- messagingService.registerHandler(getType(subject),
- new InternalMessageConsumer<>(decoder, handler),
- executor);
- }
-
- /**
- * Performs the timed function, returning the value it would while timing the operation.
- *
- * @param timedFunction the function to be timed
- * @param meter the metering agent to be used to time the function
- * @param opName the opname to be used when starting the meter
- * @param <A> The param type of the function
- * @param <B> The return type of the function
- * @return the value returned by the timed function
- */
- private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
- MeteringAgent meter, String opName) {
- checkNotNull(timedFunction);
- checkNotNull(meter);
- checkNotNull(opName);
- return new Function<A, B>() {
- @Override
- public B apply(A a) {
- final MeteringAgent.Context context = meter.startTimer(opName);
- B result = null;
- try {
- result = timedFunction.apply(a);
- context.stop(null);
- return result;
- } catch (Exception e) {
- context.stop(e);
- Throwables.propagate(e);
- return null;
- }
- }
- };
- }
-
-
- private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
- private ClusterMessageHandler handler;
-
- public InternalClusterMessageHandler(ClusterMessageHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public byte[] apply(Endpoint sender, byte[] bytes) {
- ClusterMessage message = ClusterMessage.fromBytes(bytes);
- handler.handle(message);
- return message.response();
- }
- }
-
- private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
- private final Function<byte[], M> decoder;
- private final Function<R, byte[]> encoder;
- private final Function<M, CompletableFuture<R>> handler;
-
- public InternalMessageResponder(Function<byte[], M> decoder,
- Function<R, byte[]> encoder,
- Function<M, CompletableFuture<R>> handler) {
- this.decoder = decoder;
- this.encoder = encoder;
- this.handler = handler;
- }
-
- @Override
- public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
- return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
- apply(ClusterMessage.fromBytes(bytes).payload())).
- thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
- }
- }
-
- private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
- private final Function<byte[], M> decoder;
- private final Consumer<M> consumer;
-
- public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
- this.decoder = decoder;
- this.consumer = consumer;
- }
-
- @Override
- public void accept(Endpoint sender, byte[] bytes) {
- consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
- apply(ClusterMessage.fromBytes(bytes).payload()));
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 4602702..868006b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-present Open Networking Foundation
+ * Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,24 +15,326 @@
*/
package org.onosproject.store.cluster.messaging.impl;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.core.VersionService;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.utils.MeteringAgent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
@Component(immediate = true)
@Service
-public class ClusterCommunicationManager extends AbstractClusterCommunicationManager {
+public class ClusterCommunicationManager implements ClusterCommunicationService {
- private static final char VERSION_SEP = '-';
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
+ private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
+
+ private static final String PRIMITIVE_NAME = "clusterCommunication";
+ private static final String SUBJECT_PREFIX = "subject";
+ private static final String ENDPOINT_PREFIX = "endpoint";
+
+ private static final String SERIALIZING = "serialization";
+ private static final String DESERIALIZING = "deserialization";
+ private static final String NODE_PREFIX = "node:";
+ private static final String ROUND_TRIP_SUFFIX = ".rtt";
+ private static final String ONE_WAY_SUFFIX = ".oneway";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private VersionService versionService;
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MessagingService messagingService;
+
+ private NodeId localNodeId;
+
+ /**
+  * Component activation hook: caches the identifier of the local node as
+  * reported by the cluster service, then logs that the component started.
+  */
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+ log.info("Started");
+ }
+
+ /**
+  * Component deactivation hook: only logs that the component stopped.
+  */
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
@Override
- protected String getType(MessageSubject subject) {
- return subject.value() + VERSION_SEP + versionService.version().toString();
+ public <M> void broadcast(M message,
+                           MessageSubject subject,
+                           Function<M, byte[]> encoder) {
+     checkPermission(CLUSTER_WRITE);
+     // Recipients are all nodes known to the cluster service except the local one.
+     Set<NodeId> remoteNodeIds = clusterService.getNodes()
+             .stream()
+             .filter(member -> !Objects.equal(member, clusterService.getLocalNode()))
+             .map(ControllerNode::id)
+             .collect(Collectors.toSet());
+     multicast(message, subject, encoder, remoteNodeIds);
+ }
+
+ /**
+  * Sends the message to every node known to the cluster service, the local
+  * node included.
+  *
+  * @param message message to send
+  * @param subject subject under which the message is sent
+  * @param encoder converts the message to bytes
+  * @param <M> message type
+  */
+ @Override
+ public <M> void broadcastIncludeSelf(M message,
+                                      MessageSubject subject,
+                                      Function<M, byte[]> encoder) {
+     checkPermission(CLUSTER_WRITE);
+     Set<NodeId> allNodeIds = clusterService.getNodes()
+             .stream()
+             .map(ControllerNode::id)
+             .collect(Collectors.toSet());
+     multicast(message, subject, encoder, allNodeIds);
+ }
+
+ /**
+  * Encodes the message, wraps it in a {@link ClusterMessage} stamped with the
+  * local node id and sends it one-way to a single node.
+  *
+  * @param message message to send
+  * @param subject subject under which the message is sent
+  * @param encoder converts the message to bytes
+  * @param toNodeId destination node
+  * @param <M> message type
+  * @return future for the send; any exception raised while encoding or
+  *         dispatching is handed to {@code Tools.exceptionalFuture} instead of being thrown
+  */
+ @Override
+ public <M> CompletableFuture<Void> unicast(M message,
+                                            MessageSubject subject,
+                                            Function<M, byte[]> encoder,
+                                            NodeId toNodeId) {
+     checkPermission(CLUSTER_WRITE);
+     try {
+         Function<M, byte[]> timedEncoder = timeFunction(encoder, subjectMeteringAgent, SERIALIZING);
+         byte[] encoded = timedEncoder.apply(message);
+         ClusterMessage envelope = new ClusterMessage(localNodeId, subject, encoded);
+         return doUnicast(subject, envelope.getBytes(), toNodeId);
+     } catch (Exception failure) {
+         return Tools.exceptionalFuture(failure);
+     }
+ }
+
+ /**
+  * Encodes the message once and sends the same payload one-way to each of
+  * the given nodes. The per-node futures are not inspected.
+  *
+  * @param message message to send
+  * @param subject subject under which the message is sent
+  * @param encoder converts the message to bytes
+  * @param nodes destination nodes
+  * @param <M> message type
+  */
+ @Override
+ public <M> void multicast(M message,
+                           MessageSubject subject,
+                           Function<M, byte[]> encoder,
+                           Set<NodeId> nodes) {
+     checkPermission(CLUSTER_WRITE);
+     Function<M, byte[]> timedEncoder = timeFunction(encoder, subjectMeteringAgent, SERIALIZING);
+     ClusterMessage envelope = new ClusterMessage(localNodeId, subject, timedEncoder.apply(message));
+     byte[] payload = envelope.getBytes();
+     for (NodeId nodeId : nodes) {
+         doUnicast(subject, payload, nodeId);
+     }
+ }
+
+ /**
+  * Encodes the message, sends it to a single node and decodes the reply.
+  * The envelope is stamped with the local node id cached at activation, the
+  * same value {@code unicast} and {@code multicast} use.
+  *
+  * @param message request to send
+  * @param subject subject under which the request is sent
+  * @param encoder converts the request to bytes
+  * @param decoder converts the reply bytes to the result type
+  * @param toNodeId destination node
+  * @param <M> request type
+  * @param <R> reply type
+  * @return future for the decoded reply; any exception raised while encoding or
+  *         dispatching is handed to {@code Tools.exceptionalFuture} instead of being thrown
+  */
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+                                                   MessageSubject subject,
+                                                   Function<M, byte[]> encoder,
+                                                   Function<byte[], R> decoder,
+                                                   NodeId toNodeId) {
+     checkPermission(CLUSTER_WRITE);
+     try {
+         ClusterMessage envelope = new ClusterMessage(
+                 localNodeId,
+                 subject,
+                 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message));
+         return sendAndReceive(subject, envelope.getBytes(), toNodeId)
+                 .thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
+     } catch (Exception e) {
+         return Tools.exceptionalFuture(e);
+     }
+ }
+
+ /**
+  * Sends an already-serialized payload one-way to the given node, timing the
+  * send under the subject's one-way timer.
+  *
+  * @param subject message subject; its string form is the messaging type
+  * @param payload serialized envelope bytes
+  * @param toNodeId destination node
+  * @return future for the send
+  * @throws IllegalArgumentException if the cluster service does not know {@code toNodeId}
+  */
+ private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+     ControllerNode target = clusterService.getNode(toNodeId);
+     checkArgument(target != null, "Unknown nodeId: %s", toNodeId);
+     Endpoint destination = new Endpoint(target.ip(), target.tcpPort());
+     String timerName = subject.toString() + ONE_WAY_SUFFIX;
+     MeteringAgent.Context timer = subjectMeteringAgent.startTimer(timerName);
+     return messagingService.sendAsync(destination, subject.toString(), payload)
+             .whenComplete((ignored, error) -> timer.stop(error));
+ }
+
+ /**
+  * Sends an already-serialized request to the given node and returns the raw
+  * reply, timing the round trip both per endpoint and per subject.
+  *
+  * @param subject message subject; its string form is the messaging type
+  * @param payload serialized envelope bytes
+  * @param toNodeId destination node
+  * @return future for the reply bytes
+  * @throws IllegalArgumentException if the cluster service does not know {@code toNodeId}
+  */
+ private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+     ControllerNode target = clusterService.getNode(toNodeId);
+     checkArgument(target != null, "Unknown nodeId: %s", toNodeId);
+     Endpoint destination = new Endpoint(target.ip(), target.tcpPort());
+     String endpointTimerName = NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX;
+     MeteringAgent.Context endpointTimer = endpointMeteringAgent.startTimer(endpointTimerName);
+     String subjectTimerName = subject.toString() + ROUND_TRIP_SUFFIX;
+     MeteringAgent.Context subjectTimer = subjectMeteringAgent.startTimer(subjectTimerName);
+     return messagingService.sendAndReceive(destination, subject.toString(), payload)
+             .whenComplete((reply, error) -> {
+                 subjectTimer.stop(error);
+                 endpointTimer.stop(error);
+             });
+ }
+
+ /**
+  * Registers a {@link ClusterMessageHandler} for the subject; incoming bytes
+  * are parsed into a {@link ClusterMessage} before the handler is invoked.
+  *
+  * @param subject subject to subscribe to; its string form is the messaging type
+  * @param subscriber handler for incoming messages
+  * @param executor executor passed to the messaging service for this handler
+  */
+ @Override
+ public void addSubscriber(MessageSubject subject,
+                           ClusterMessageHandler subscriber,
+                           ExecutorService executor) {
+     checkPermission(CLUSTER_WRITE);
+     String type = subject.toString();
+     BiFunction<Endpoint, byte[], byte[]> adapter = new InternalClusterMessageHandler(subscriber);
+     messagingService.registerHandler(type, adapter, executor);
+ }
+
+ /**
+  * Unregisters the handler registered under the subject's string form.
+  *
+  * @param subject subject to unsubscribe from
+  */
+ @Override
+ public void removeSubscriber(MessageSubject subject) {
+     checkPermission(CLUSTER_WRITE);
+     String type = subject.toString();
+     messagingService.unregisterHandler(type);
+ }
+
+ /**
+  * Registers a synchronous request/reply handler for the subject. Each
+  * decoded request is handed to {@code handler} on {@code executor}; the
+  * handler's result, or the exception it throws, completes the reply.
+  *
+  * @param subject subject to subscribe to; its string form is the messaging type
+  * @param decoder converts request bytes to the request type
+  * @param handler computes the reply for a request
+  * @param encoder converts the reply to bytes
+  * @param executor executor on which the handler runs
+  * @param <M> request type
+  * @param <R> reply type
+  */
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+                                  Function<byte[], M> decoder,
+                                  Function<M, R> handler,
+                                  Function<R, byte[]> encoder,
+                                  Executor executor) {
+     checkPermission(CLUSTER_WRITE);
+     // Adapt the blocking handler into a future-returning one that runs on the executor.
+     Function<M, CompletableFuture<R>> asyncHandler = request -> {
+         CompletableFuture<R> reply = new CompletableFuture<>();
+         executor.execute(() -> {
+             try {
+                 reply.complete(handler.apply(request));
+             } catch (Exception failure) {
+                 reply.completeExceptionally(failure);
+             }
+         });
+         return reply;
+     };
+     messagingService.registerHandler(subject.toString(),
+             new InternalMessageResponder<M, R>(decoder, encoder, asyncHandler));
+ }
+
+ /**
+  * Registers an asynchronous request/reply handler for the subject.
+  *
+  * @param subject subject to subscribe to; its string form is the messaging type
+  * @param decoder converts request bytes to the request type
+  * @param handler produces a future reply for a request
+  * @param encoder converts the reply to bytes
+  * @param <M> request type
+  * @param <R> reply type
+  */
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+                                  Function<byte[], M> decoder,
+                                  Function<M, CompletableFuture<R>> handler,
+                                  Function<R, byte[]> encoder) {
+     checkPermission(CLUSTER_WRITE);
+     String type = subject.toString();
+     InternalMessageResponder<M, R> responder = new InternalMessageResponder<>(decoder, encoder, handler);
+     messagingService.registerHandler(type, responder);
+ }
+
+ /**
+  * Registers a one-way consumer for the subject; no reply is produced.
+  *
+  * @param subject subject to subscribe to; its string form is the messaging type
+  * @param decoder converts message bytes to the message type
+  * @param handler consumes each decoded message
+  * @param executor executor passed to the messaging service for this handler
+  * @param <M> message type
+  */
+ @Override
+ public <M> void addSubscriber(MessageSubject subject,
+                               Function<byte[], M> decoder,
+                               Consumer<M> handler,
+                               Executor executor) {
+     checkPermission(CLUSTER_WRITE);
+     String type = subject.toString();
+     InternalMessageConsumer<M> sink = new InternalMessageConsumer<>(decoder, handler);
+     messagingService.registerHandler(type, sink, executor);
+ }
+
+ /**
+  * Wraps a function so that each invocation is timed by the given metering agent.
+  * <p>
+  * The returned function starts a timer named {@code opName}, applies
+  * {@code timedFunction}, and stops the timer with {@code null} on success or
+  * with the caught exception on failure. A caught exception is then rethrown
+  * via {@link Throwables#propagate(Throwable)}.
+  *
+  * @param timedFunction the function to be timed
+  * @param meter the metering agent used to time the function
+  * @param opName the operation name used when starting the timer
+  * @param <A> the parameter type of the function
+  * @param <B> the return type of the function
+  * @return a function that delegates to {@code timedFunction} while timing each call
+  * @throws NullPointerException if any argument is null
+  */
+ private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
+                                            MeteringAgent meter, String opName) {
+     checkNotNull(timedFunction);
+     checkNotNull(meter);
+     checkNotNull(opName);
+     return input -> {
+         final MeteringAgent.Context context = meter.startTimer(opName);
+         try {
+             B result = timedFunction.apply(input);
+             context.stop(null);
+             return result;
+         } catch (Exception e) {
+             context.stop(e);
+             // propagate() always throws; "throw" tells the compiler this path never returns.
+             throw Throwables.propagate(e);
+         }
+     };
+ }
+
+
+ /**
+  * Adapts a {@link ClusterMessageHandler} to the byte-level handler shape
+  * registered with the messaging service: parses the bytes into a
+  * {@link ClusterMessage}, passes it to the handler and returns the message's
+  * response. Static because it uses no state of the enclosing manager.
+  */
+ private static class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
+     private final ClusterMessageHandler handler;
+
+     public InternalClusterMessageHandler(ClusterMessageHandler handler) {
+         this.handler = handler;
+     }
+
+     @Override
+     public byte[] apply(Endpoint sender, byte[] bytes) {
+         ClusterMessage message = ClusterMessage.fromBytes(bytes);
+         handler.handle(message);
+         return message.response();
+     }
+ }
+
+ /**
+  * Byte-level request/reply adapter: decodes the payload of the incoming
+  * {@link ClusterMessage}, passes it to the asynchronous handler and encodes
+  * the handler's result. Decoding and encoding are timed under the subject
+  * metering agent.
+  *
+  * @param <M> request type
+  * @param <R> reply type
+  */
+ private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
+     private final Function<byte[], M> decoder;
+     private final Function<R, byte[]> encoder;
+     private final Function<M, CompletableFuture<R>> handler;
+
+     public InternalMessageResponder(Function<byte[], M> decoder,
+                                     Function<R, byte[]> encoder,
+                                     Function<M, CompletableFuture<R>> handler) {
+         this.decoder = decoder;
+         this.encoder = encoder;
+         this.handler = handler;
+     }
+
+     @Override
+     public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
+         Function<byte[], M> timedDecoder = timeFunction(decoder, subjectMeteringAgent, DESERIALIZING);
+         M request = timedDecoder.apply(ClusterMessage.fromBytes(bytes).payload());
+         return handler.apply(request)
+                 .thenApply(reply -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(reply));
+     }
+ }
+
+ /**
+  * Byte-level one-way adapter: decodes the payload of the incoming
+  * {@link ClusterMessage} (timed under the subject metering agent) and hands
+  * the result to the consumer.
+  *
+  * @param <M> message type
+  */
+ private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
+     private final Function<byte[], M> decoder;
+     private final Consumer<M> consumer;
+
+     public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+         this.decoder = decoder;
+         this.consumer = consumer;
+     }
+
+     @Override
+     public void accept(Endpoint sender, byte[] bytes) {
+         Function<byte[], M> timedDecoder = timeFunction(decoder, subjectMeteringAgent, DESERIALIZING);
+         M decoded = timedDecoder.apply(ClusterMessage.fromBytes(bytes).payload());
+         consumer.accept(decoded);
+     }
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
deleted file mode 100644
index 45c84af..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-@Component(immediate = true)
-@Service
-public class UnifiedClusterCommunicationManager
- extends AbstractClusterCommunicationManager
- implements UnifiedClusterCommunicationService {
- @Override
- protected String getType(MessageSubject subject) {
- return subject.value();
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index 29045a2..80753f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -25,12 +25,12 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
@@ -70,10 +70,10 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterService clusterService;
+ protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterCommunicationService clusterCommunicator;
+ protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index 1bfaaed..9d6a016 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -15,21 +15,21 @@
*/
package org.onosproject.store.primitives.impl;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.MembershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapBuilder;
-
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -38,8 +38,8 @@
*/
public class EventuallyConsistentMapBuilderImpl<K, V>
implements EventuallyConsistentMapBuilder<K, V> {
- private final MembershipService clusterService;
- private final ClusterCommunicator clusterCommunicator;
+ private final ClusterService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
private String name;
private KryoNamespace serializer;
@@ -64,9 +64,10 @@
* @param clusterCommunicator cluster communication service
* @param persistenceService persistence service
*/
- public EventuallyConsistentMapBuilderImpl(MembershipService clusterService,
- ClusterCommunicator clusterCommunicator,
- PersistenceService persistenceService) {
+ public EventuallyConsistentMapBuilderImpl(
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ PersistenceService persistenceService) {
this.persistenceService = persistenceService;
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 461245e..462d4ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -15,34 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.util.AbstractAccumulator;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.MembershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.LogicalTimestamp;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.DistributedPrimitive;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -67,6 +39,34 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SlidingWindowCounter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -86,8 +86,8 @@
private final Map<K, MapValue<V>> items;
- private final MembershipService clusterService;
- private final ClusterCommunicator clusterCommunicator;
+ private final ClusterService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
private final Serializer serializer;
private final NodeId localNodeId;
private final PersistenceService persistenceService;
@@ -162,8 +162,8 @@
* @param persistenceService persistence service
*/
EventuallyConsistentMapImpl(String mapName,
- MembershipService clusterService,
- ClusterCommunicator clusterCommunicator,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
KryoNamespace ns,
BiFunction<K, V, Timestamp> timestampProvider,
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 4a92682..5bbd681 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -45,7 +45,7 @@
import org.onosproject.core.Version;
import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionEvent;
@@ -71,7 +71,7 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterCommunicationService clusterCommunicator;
+ protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService metadataService;
@@ -103,24 +103,24 @@
Version targetVersion = upgradeService.getState().target();
currentClusterMetadata.get()
.getPartitions()
- .forEach(partition -> inactivePartitions.put(partition.getId(), new StoragePartition(
- partition,
- sourceVersion,
- null,
- clusterCommunicator,
- clusterService,
- new File(System.getProperty("karaf.data") +
- "/partitions/" + sourceVersion + "/" + partition.getId()))));
- currentClusterMetadata.get()
- .getPartitions()
- .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
- partition,
- targetVersion,
- sourceVersion,
- clusterCommunicator,
- clusterService,
- new File(System.getProperty("karaf.data") +
- "/partitions/" + targetVersion + "/" + partition.getId()))));
+ .forEach(partition -> {
+ inactivePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ sourceVersion,
+ null,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + sourceVersion + "/" + partition.getId())));
+ activePartitions.put(partition.getId(), new StoragePartition(
+ partition,
+ targetVersion,
+ sourceVersion,
+ clusterCommunicator,
+ clusterService,
+ new File(System.getProperty("karaf.data") +
+ "/partitions/" + targetVersion + "/" + partition.getId())));
+ });
// We have to fork existing partitions before we can start inactive partition servers to
// avoid duplicate message handlers when both servers are running.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 8bae2a3..ecba56d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -40,7 +40,7 @@
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.Serializer;
/**
@@ -51,7 +51,7 @@
public RaftClientCommunicator(
String prefix,
Serializer serializer,
- ClusterCommunicator clusterCommunicator) {
+ ClusterCommunicationService clusterCommunicator) {
super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
index 765eb02..1117ab9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
@@ -22,7 +22,7 @@
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.cluster.MemberId;
import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.service.Serializer;
@@ -35,12 +35,12 @@
public abstract class RaftCommunicator {
protected final RaftMessageContext context;
protected final Serializer serializer;
- protected final ClusterCommunicator clusterCommunicator;
+ protected final ClusterCommunicationService clusterCommunicator;
public RaftCommunicator(
RaftMessageContext context,
Serializer serializer,
- ClusterCommunicator clusterCommunicator) {
+ ClusterCommunicationService clusterCommunicator) {
this.context = checkNotNull(context, "context cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 2710a2c..097ee46 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -57,7 +57,6 @@
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.Serializer;
/**
@@ -68,7 +67,7 @@
public RaftServerCommunicator(
String prefix,
Serializer serializer,
- ClusterCommunicator clusterCommunicator) {
+ ClusterCommunicationService clusterCommunicator) {
super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 59c4b17..b73dd61 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -29,10 +29,10 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.PartitionId;
-import org.onosproject.cluster.UnifiedClusterService;
import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionService;
@@ -81,10 +81,10 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterService clusterService;
+ protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UnifiedClusterCommunicationService clusterCommunicator;
+ protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PersistenceService persistenceService;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 414c49c..5458edd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -29,12 +29,12 @@
import com.google.common.collect.ImmutableMap;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.service.RaftService;
-import org.onosproject.cluster.MembershipService;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
@@ -54,8 +54,8 @@
public class StoragePartition implements Managed<StoragePartition> {
private final AtomicBoolean isOpened = new AtomicBoolean(false);
- private final UnifiedClusterCommunicationService clusterCommunicator;
- private final MembershipService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
+ private final ClusterService clusterService;
private final Version version;
private final Version source;
private final File dataFolder;
@@ -85,8 +85,8 @@
Partition partition,
Version version,
Version source,
- UnifiedClusterCommunicationService clusterCommunicator,
- MembershipService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ ClusterService clusterService,
File dataFolder) {
this.partition = partition;
this.version = version;
@@ -191,6 +191,10 @@
return source != null ?
clusterService.getNodes()
.stream()
+ .filter(node -> {
+ Version nodeVersion = clusterService.getVersion(node.id());
+ return nodeVersion != null && nodeVersion.equals(version);
+ })
.map(node -> MemberId.from(node.id().id()))
.collect(Collectors.toList()) :
Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
@@ -202,6 +206,10 @@
} else {
return clusterService.getNodes()
.stream()
+ .filter(node -> {
+ Version nodeVersion = clusterService.getVersion(node.id());
+ return nodeVersion != null && nodeVersion.equals(version);
+ })
.map(node -> MemberId.from(node.id().id()))
.collect(Collectors.toList());
}
@@ -231,13 +239,10 @@
clusterCommunicator);
CompletableFuture<Void> future;
- if (clusterService.getNodes().size() == 1) {
+ if (getMemberIds().size() == 1) {
future = server.fork(version);
} else {
- future = server.join(clusterService.getNodes().stream()
- .filter(node -> !node.id().equals(localNodeId))
- .map(node -> MemberId.from(node.id().id()))
- .collect(Collectors.toList()));
+ future = server.join(getMemberIds());
}
return future.thenRun(() -> this.server = server);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 123c3b6..3a15fce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -28,7 +28,7 @@
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import org.onosproject.core.Version;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
@@ -49,13 +49,13 @@
private final MemberId localMemberId;
private final StoragePartition partition;
- private final UnifiedClusterCommunicationService clusterCommunicator;
+ private final ClusterCommunicationService clusterCommunicator;
private RaftServer server;
public StoragePartitionServer(
StoragePartition partition,
MemberId localMemberId,
- UnifiedClusterCommunicationService clusterCommunicator) {
+ ClusterCommunicationService clusterCommunicator) {
this.partition = partition;
this.localMemberId = localMemberId;
this.clusterCommunicator = clusterCommunicator;