Code clean up: Removed unused code. Fixed comments. Renamed some files.
Change-Id: I78ca1f4a973c3b5356f749680ebe0f4ccde01279
(cherry picked from commit 78be249d5219805d488095ef62603d040394dd99)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java b/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
deleted file mode 100644
index 3c1be0a..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cfg;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cfg.ComponentConfigEvent;
-import org.onosproject.cfg.ComponentConfigStore;
-import org.onosproject.cfg.ComponentConfigStoreDelegate;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.LogicalClockService;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-
-import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_SET;
-import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_UNSET;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of component configurations in a distributed data store
- * that uses optimistic replication and gossip based anti-entropy techniques.
- */
-@Component(immediate = true, enabled = false)
-@Service
-public class GossipComponentConfigStore
- extends AbstractStore<ComponentConfigEvent, ComponentConfigStoreDelegate>
- implements ComponentConfigStore {
-
- private static final String SEP = "#";
-
- private final Logger log = getLogger(getClass());
-
- private EventuallyConsistentMap<String, String> properties;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LogicalClockService clockService;
-
- @Activate
- public void activate() {
- KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API);
-
- properties = storageService.<String, String>eventuallyConsistentMapBuilder()
- .withName("cfg")
- .withSerializer(serializer)
- .withTimestampProvider((k, v) -> clockService.getTimestamp())
- .build();
-
- properties.addListener(new InternalPropertiesListener());
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- properties.destroy();
- log.info("Stopped");
- }
-
- @Override
- public void setProperty(String componentName, String name, String value) {
- properties.put(key(componentName, name), value);
-
- }
-
- @Override
- public void unsetProperty(String componentName, String name) {
- properties.remove(key(componentName, name));
- }
-
- /**
- * Listener to component configuration properties distributed map changes.
- */
- private final class InternalPropertiesListener
- implements EventuallyConsistentMapListener<String, String> {
-
- @Override
- public void event(EventuallyConsistentMapEvent<String, String> event) {
- String[] keys = event.key().split(SEP);
- String value = event.value();
- if (event.type() == PUT) {
- delegate.notify(new ComponentConfigEvent(PROPERTY_SET, keys[0], keys[1], value));
- } else if (event.type() == REMOVE) {
- delegate.notify(new ComponentConfigEvent(PROPERTY_UNSET, keys[0], keys[1], null));
- }
- }
- }
-
- // Generates a key from component name and property name.
- private String key(String componentName, String name) {
- return componentName + SEP + name;
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManagementMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManagementMessageSubjects.java
deleted file mode 100644
index 6cc3023..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManagementMessageSubjects.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-//Not used right now
-public final class ClusterManagementMessageSubjects {
- // avoid instantiation
- private ClusterManagementMessageSubjects() {}
-
- public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterMembershipEvent.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterMembershipEvent.java
deleted file mode 100644
index af628a5..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterMembershipEvent.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import org.onosproject.cluster.ControllerNode;
-
-//Not used right now
-/**
- * Contains information that will be published when a cluster membership event occurs.
- */
-public class ClusterMembershipEvent {
-
- private final ClusterMembershipEventType type;
- private final ControllerNode node;
-
- public ClusterMembershipEvent(ClusterMembershipEventType type, ControllerNode node) {
- this.type = type;
- this.node = node;
- }
-
- public ClusterMembershipEventType type() {
- return type;
- }
-
- public ControllerNode node() {
- return node;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterMembershipEventType.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterMembershipEventType.java
deleted file mode 100644
index 1714e8c..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterMembershipEventType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-//Not used right now
-public enum ClusterMembershipEventType {
- NEW_MEMBER,
- LEAVING_MEMBER,
- UNREACHABLE_MEMBER,
- HEART_BEAT,
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterNodesDelegate.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterNodesDelegate.java
deleted file mode 100644
index 4e4a8be..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterNodesDelegate.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onlab.packet.IpAddress;
-
-// Not used right now
-/**
- * Simple back interface through which connection manager can interact with
- * the cluster store.
- */
-public interface ClusterNodesDelegate {
-
- /**
- * Notifies about cluster node coming online.
- *
- * @param nodeId newly detected cluster node id
- * @param ip node IP listen address
- * @param tcpPort node TCP listen port
- * @return the controller node
- */
- DefaultControllerNode nodeDetected(NodeId nodeId, IpAddress ip,
- int tcpPort);
-
- /**
- * Notifies about cluster node going offline.
- *
- * @param nodeId identifier of the cluster node that vanished
- */
- void nodeVanished(NodeId nodeId);
-
- /**
- * Notifies about remote request to remove node from cluster.
- *
- * @param nodeId identifier of the cluster node that was removed
- */
- void nodeRemoved(NodeId nodeId);
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index b81f4de..58dffce 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -17,10 +17,9 @@
import static org.slf4j.LoggerFactory.getLogger;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Objects;
+import java.util.function.Consumer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -29,39 +28,28 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipStore;
import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
import org.onosproject.store.AbstractStore;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.LeaderElector;
import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
/**
- * Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
+ * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
+ * primitive.
*/
@Service
-@Component(immediate = true, enabled = false)
+@Component(immediate = true, enabled = true)
public class DistributedLeadershipStore
extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
implements LeadershipStore {
- private static final Logger log = getLogger(DistributedLeadershipStore.class);
+ private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -69,20 +57,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
- protected NodeId localNodeId;
- protected ConsistentMap<String, InternalLeadership> leadershipMap;
- protected Map<String, Versioned<InternalLeadership>> leadershipCache = Maps.newConcurrentMap();
+ private NodeId localNodeId;
+ private LeaderElector leaderElector;
- private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
- event -> {
- Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
- Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
- boolean leaderChanged =
- !Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
- boolean candidatesChanged =
- !Sets.symmetricDifference(Sets.newHashSet(oldValue == null ?
- ImmutableSet.<NodeId>of() : oldValue.candidates()),
- Sets.newHashSet(newValue.candidates())).isEmpty();
+ private final Consumer<Change<Leadership>> leadershipChangeListener =
+ change -> {
+ Leadership oldValue = change.oldValue();
+ Leadership newValue = change.newValue();
+ boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
+ boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
LeadershipEvent.Type eventType = null;
if (leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
@@ -93,193 +76,58 @@
if (!leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
- leadershipCache.compute(event.key(), (k, v) -> {
- if (v == null || v.version() < event.newValue().version()) {
- return event.newValue();
- }
- return v;
- });
- notifyDelegate(new LeadershipEvent(eventType, newValue));
+ notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
};
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
- leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
- .withName("onos-leadership")
- .withPartitionsDisabled()
- .withRelaxedReadConsistency()
- .withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
- .build();
- leadershipMap.entrySet().forEach(e -> leadershipCache.put(e.getKey(), e.getValue()));
- leadershipMap.addListener(leadershipChangeListener);
+ leaderElector = storageService.leaderElectorBuilder()
+ .withName("onos-leadership-elections")
+ .build()
+ .asLeaderElector();
+ leaderElector.addChangeListener(leadershipChangeListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
- leadershipMap.removeListener(leadershipChangeListener);
+ leaderElector.removeChangeListener(leadershipChangeListener);
log.info("Stopped");
}
@Override
public Leadership addRegistration(String topic) {
- Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
- v -> v == null || !v.candidates().contains(localNodeId),
- (k, v) -> {
- if (v == null || v.candidates().isEmpty()) {
- return new InternalLeadership(topic,
- localNodeId,
- v == null ? 1 : v.term() + 1,
- System.currentTimeMillis(),
- ImmutableList.of(localNodeId));
- }
- List<NodeId> newCandidates = new ArrayList<>(v.candidates());
- newCandidates.add(localNodeId);
- return new InternalLeadership(topic, v.leader(), v.term(), v.termStartTime(), newCandidates);
- });
- return InternalLeadership.toLeadership(Versioned.valueOrNull(internalLeadership));
+ return leaderElector.run(topic, localNodeId);
}
@Override
public void removeRegistration(String topic) {
- removeRegistration(topic, localNodeId);
- }
-
- private void removeRegistration(String topic, NodeId nodeId) {
- leadershipMap.computeIf(topic,
- v -> v != null && v.candidates().contains(nodeId),
- (k, v) -> {
- List<NodeId> newCandidates = v.candidates()
- .stream()
- .filter(id -> !nodeId.equals(id))
- .collect(Collectors.toList());
- NodeId newLeader = nodeId.equals(v.leader()) ?
- newCandidates.size() > 0 ? newCandidates.get(0) : null : v.leader();
- long newTerm = newLeader == null || Objects.equal(newLeader, v.leader()) ?
- v.term() : v.term() + 1;
- long newTermStartTime = newLeader == null || Objects.equal(newLeader, v.leader()) ?
- v.termStartTime() : System.currentTimeMillis();
- return new InternalLeadership(topic, newLeader, newTerm, newTermStartTime, newCandidates);
- });
+ leaderElector.withdraw(topic);
}
@Override
public void removeRegistration(NodeId nodeId) {
- leadershipMap.entrySet()
- .stream()
- .filter(e -> e.getValue().value().candidates().contains(nodeId))
- .map(e -> e.getKey())
- .forEach(topic -> this.removeRegistration(topic, nodeId));
+ leaderElector.evict(nodeId);
}
@Override
public boolean moveLeadership(String topic, NodeId toNodeId) {
- Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
- v -> v != null &&
- v.candidates().contains(toNodeId) &&
- !Objects.equal(v.leader(), toNodeId),
- (k, v) -> {
- List<NodeId> newCandidates = new ArrayList<>();
- newCandidates.add(toNodeId);
- newCandidates.addAll(v.candidates()
- .stream()
- .filter(id -> !toNodeId.equals(id))
- .collect(Collectors.toList()));
- return new InternalLeadership(topic,
- toNodeId,
- v.term() + 1,
- System.currentTimeMillis(),
- newCandidates);
- });
- return Objects.equal(toNodeId, Versioned.valueOrNull(internalLeadership).leader());
+ return leaderElector.anoint(topic, toNodeId);
}
@Override
public boolean makeTopCandidate(String topic, NodeId nodeId) {
- Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
- v -> v != null &&
- v.candidates().contains(nodeId) &&
- !v.candidates().get(0).equals(nodeId),
- (k, v) -> {
- List<NodeId> newCandidates = new ArrayList<>();
- newCandidates.add(nodeId);
- newCandidates.addAll(v.candidates()
- .stream()
- .filter(id -> !nodeId.equals(id))
- .collect(Collectors.toList()));
- return new InternalLeadership(topic,
- v.leader(),
- v.term(),
- System.currentTimeMillis(),
- newCandidates);
- });
- return internalLeadership != null && nodeId.equals(internalLeadership.value().candidates().get(0));
+ return leaderElector.promote(topic, nodeId);
}
@Override
public Leadership getLeadership(String topic) {
- InternalLeadership internalLeadership = Versioned.valueOrNull(leadershipMap.get(topic));
- return internalLeadership == null ? null : internalLeadership.asLeadership();
+ return leaderElector.getLeadership(topic);
}
@Override
public Map<String, Leadership> getLeaderships() {
- return ImmutableMap.copyOf(Maps.transformValues(leadershipCache, v -> v.value().asLeadership()));
- }
-
- private static class InternalLeadership {
- private final String topic;
- private final NodeId leader;
- private final long term;
- private final long termStartTime;
- private final List<NodeId> candidates;
-
- public InternalLeadership(String topic,
- NodeId leader,
- long term,
- long termStartTime,
- List<NodeId> candidates) {
- this.topic = topic;
- this.leader = leader;
- this.term = term;
- this.termStartTime = termStartTime;
- this.candidates = ImmutableList.copyOf(candidates);
- }
-
- public NodeId leader() {
- return this.leader;
- }
-
- public long term() {
- return term;
- }
-
- public long termStartTime() {
- return termStartTime;
- }
-
- public List<NodeId> candidates() {
- return candidates;
- }
-
- public Leadership asLeadership() {
- return new Leadership(topic, leader == null ?
- null : new Leader(leader, term, termStartTime), candidates);
- }
-
- public static Leadership toLeadership(InternalLeadership internalLeadership) {
- return internalLeadership == null ? null : internalLeadership.asLeadership();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("leader", leader)
- .add("term", term)
- .add("termStartTime", termStartTime)
- .add("candidates", candidates)
- .toString();
- }
+ return leaderElector.getLeaderships();
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NewDistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NewDistributedLeadershipStore.java
deleted file mode 100644
index 2af8a8a..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NewDistributedLeadershipStore.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.Consumer;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipStore;
-import org.onosproject.cluster.LeadershipStoreDelegate;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.Change;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.service.LeaderElector;
-import org.onosproject.store.service.StorageService;
-import org.slf4j.Logger;
-
-/**
- * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
- * primitive.
- */
-@Service
-@Component(immediate = true, enabled = true)
-public class NewDistributedLeadershipStore
- extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
- implements LeadershipStore {
-
- private final Logger log = getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- private NodeId localNodeId;
- private LeaderElector leaderElector;
-
- private final Consumer<Change<Leadership>> leadershipChangeListener =
- change -> {
- Leadership oldValue = change.oldValue();
- Leadership newValue = change.newValue();
- boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
- boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
- LeadershipEvent.Type eventType = null;
- if (leaderChanged && candidatesChanged) {
- eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
- }
- if (leaderChanged && !candidatesChanged) {
- eventType = LeadershipEvent.Type.LEADER_CHANGED;
- }
- if (!leaderChanged && candidatesChanged) {
- eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
- }
- notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
- };
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
- leaderElector = storageService.leaderElectorBuilder()
- .withName("onos-leadership-elections")
- .build()
- .asLeaderElector();
- leaderElector.addChangeListener(leadershipChangeListener);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- leaderElector.removeChangeListener(leadershipChangeListener);
- log.info("Stopped");
- }
-
- @Override
- public Leadership addRegistration(String topic) {
- return leaderElector.run(topic, localNodeId);
- }
-
- @Override
- public void removeRegistration(String topic) {
- leaderElector.withdraw(topic);
- }
-
- @Override
- public void removeRegistration(NodeId nodeId) {
- leaderElector.evict(nodeId);
- }
-
- @Override
- public boolean moveLeadership(String topic, NodeId toNodeId) {
- return leaderElector.anoint(topic, toNodeId);
- }
-
- @Override
- public boolean makeTopCandidate(String topic, NodeId nodeId) {
- return leaderElector.promote(topic, nodeId);
- }
-
- @Override
- public Leadership getLeadership(String topic) {
- return leaderElector.getLeadership(topic);
- }
-
- @Override
- public Map<String, Leadership> getLeaderships() {
- return leaderElector.getLeaderships();
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NodeInfo.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NodeInfo.java
deleted file mode 100644
index cdeacba..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NodeInfo.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-import java.util.Objects;
-
-import org.onosproject.cluster.ControllerNode;
-
-/**
- * Node info read from configuration files during bootstrap.
- */
-public final class NodeInfo {
- private final String id;
- private final String ip;
- private final int tcpPort;
-
- private NodeInfo(String id, String ip, int port) {
- this.id = id;
- this.ip = ip;
- this.tcpPort = port;
- }
-
- /*
- * Needed for serialization.
- */
- private NodeInfo() {
- id = null;
- ip = null;
- tcpPort = 0;
- }
-
- /**
- * Creates a new instance.
- * @param id node id
- * @param ip node ip address
- * @param port tcp port
- * @return NodeInfo
- */
- public static NodeInfo from(String id, String ip, int port) {
- NodeInfo node = new NodeInfo(id, ip, port);
- return node;
- }
-
- /**
- * Returns the NodeInfo for a controller node.
- * @param node controller node
- * @return NodeInfo
- */
- public static NodeInfo of(ControllerNode node) {
- return NodeInfo.from(node.id().toString(), node.ip().toString(), node.tcpPort());
- }
-
- /**
- * Returns node id.
- * @return node id
- */
- public String getId() {
- return id;
- }
-
- /**
- * Returns node ip.
- * @return node ip
- */
- public String getIp() {
- return ip;
- }
-
- /**
- * Returns node port.
- * @return port
- */
- public int getTcpPort() {
- return tcpPort;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, ip, tcpPort);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o instanceof NodeInfo) {
- NodeInfo that = (NodeInfo) o;
- return Objects.equals(this.id, that.id) &&
- Objects.equals(this.ip, that.ip) &&
- Objects.equals(this.tcpPort, that.tcpPort);
- }
- return false;
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("id", id)
- .add("ip", ip)
- .add("tcpPort", tcpPort).toString();
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/package-info.java
index 116be3a..01c7bc7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Implementation of a distributed cluster node store using Hazelcast.
+ * Implementation of a distributed cluster membership store and failure detector.
*/
package org.onosproject.store.cluster.impl;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/config/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/config/impl/package-info.java
index 614779d..3b0871f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/config/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/config/impl/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Implementation of the network configuration distributed store.
+ * Implementation of the distributed network configuration store.
*/
package org.onosproject.store.config.impl;
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentApplicationIdStore.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentApplicationIdStore.java
deleted file mode 100644
index 6b0f3e4..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentApplicationIdStore.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.core.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Map;
-import java.util.Set;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.ApplicationIdStore;
-import org.onosproject.core.DefaultApplicationId;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AtomicCounter;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageException;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-/**
- * ApplicationIdStore implementation on top of {@code AtomicCounter}
- * and {@code ConsistentMap} primitives.
- */
-@Component(immediate = true, enabled = true)
-@Service
-public class ConsistentApplicationIdStore implements ApplicationIdStore {
-
- private final Logger log = getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- private AtomicCounter appIdCounter;
- private ConsistentMap<String, ApplicationId> registeredIds;
- private Map<String, ApplicationId> nameToAppIdCache = Maps.newConcurrentMap();
- private Map<Short, ApplicationId> idToAppIdCache = Maps.newConcurrentMap();
-
- private static final Serializer SERIALIZER = Serializer.using(new KryoNamespace.Builder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .build());
-
- @Activate
- public void activate() {
- appIdCounter = storageService.getAtomicCounter("onos-app-id-counter");
-
- registeredIds = storageService.<String, ApplicationId>consistentMapBuilder()
- .withName("onos-app-ids")
- .withSerializer(SERIALIZER)
- .build();
-
- primeAppIds();
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public Set<ApplicationId> getAppIds() {
- // TODO: Rework this when we have notification support in ConsistentMap.
- primeAppIds();
- return ImmutableSet.copyOf(nameToAppIdCache.values());
- }
-
- @Override
- public ApplicationId getAppId(Short id) {
- if (!idToAppIdCache.containsKey(id)) {
- primeAppIds();
- }
- return idToAppIdCache.get(id);
- }
-
- @Override
- public ApplicationId getAppId(String name) {
- ApplicationId appId = nameToAppIdCache.computeIfAbsent(name, key -> {
- Versioned<ApplicationId> existingAppId = registeredIds.get(key);
- return existingAppId != null ? existingAppId.value() : null;
- });
- if (appId != null) {
- idToAppIdCache.putIfAbsent(appId.id(), appId);
- }
- return appId;
- }
-
- @Override
- public ApplicationId registerApplication(String name) {
- ApplicationId appId = nameToAppIdCache.computeIfAbsent(name, key -> {
- Versioned<ApplicationId> existingAppId = registeredIds.get(name);
- if (existingAppId == null) {
- int id = Tools.retryable(appIdCounter::incrementAndGet, StorageException.class, 1, 2000)
- .get()
- .intValue();
- DefaultApplicationId newAppId = new DefaultApplicationId(id, name);
- existingAppId = registeredIds.putIfAbsent(name, newAppId);
- if (existingAppId != null) {
- return existingAppId.value();
- } else {
- return newAppId;
- }
- } else {
- return existingAppId.value();
- }
- });
- idToAppIdCache.putIfAbsent(appId.id(), appId);
- return appId;
- }
-
- private void primeAppIds() {
- registeredIds.values()
- .stream()
- .map(Versioned::value)
- .forEach(appId -> {
- nameToAppIdCache.putIfAbsent(appId.name(), appId);
- idToAppIdCache.putIfAbsent(appId.id(), appId);
- });
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
new file mode 100644
index 0000000..cc98ac5
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.core.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+
+import java.util.Map;
+import java.util.Set;
+
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.ApplicationIdStore;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+/**
+ * ApplicationIdStore implementation on top of {@code AtomicCounter}
+ * and {@code ConsistentMap} primitives.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedApplicationIdStore implements ApplicationIdStore {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private AtomicCounter appIdCounter;
+ private ConsistentMap<String, ApplicationId> registeredIds;
+ private Map<Short, ApplicationId> idToAppIdCache = Maps.newConcurrentMap();
+ private MapEventListener<String, ApplicationId> mapEventListener = event -> {
+ if (event.type() == MapEvent.Type.INSERT) {
+ idToAppIdCache.put(event.newValue().value().id(), event.newValue().value());
+ }
+ };
+
+ @Activate
+ public void activate() {
+ appIdCounter = storageService.getAtomicCounter("onos-app-id-counter");
+
+ registeredIds = storageService.<String, ApplicationId>consistentMapBuilder()
+ .withName("onos-app-ids")
+ .withSerializer(Serializer.using(KryoNamespaces.API))
+ .withRelaxedReadConsistency()
+ .build();
+
+ primeIdToAppIdCache();
+ registeredIds.addListener(mapEventListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ registeredIds.removeListener(mapEventListener);
+ log.info("Stopped");
+ }
+
+ @Override
+ public Set<ApplicationId> getAppIds() {
+ return ImmutableSet.copyOf(registeredIds.asJavaMap().values());
+ }
+
+ @Override
+ public ApplicationId getAppId(Short id) {
+ if (!idToAppIdCache.containsKey(id)) {
+ primeIdToAppIdCache();
+ }
+ return idToAppIdCache.get(id);
+ }
+
+ @Override
+ public ApplicationId getAppId(String name) {
+ return registeredIds.asJavaMap().get(name);
+ }
+
+ @Override
+ public ApplicationId registerApplication(String name) {
+ return Versioned.valueOrNull(registeredIds.computeIfAbsent(name,
+ key -> new DefaultApplicationId((int) appIdCounter.incrementAndGet(), name)));
+ }
+
+ private void primeIdToAppIdCache() {
+ registeredIds.asJavaMap()
+ .values()
+ .forEach(appId -> {
+ idToAppIdCache.putIfAbsent(appId.id(), appId);
+ });
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentIdBlockStore.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedIdBlockStore.java
similarity index 96%
rename from core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentIdBlockStore.java
rename to core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedIdBlockStore.java
index a68dee8..be2389a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/core/impl/ConsistentIdBlockStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedIdBlockStore.java
@@ -38,7 +38,7 @@
*/
@Component(immediate = true, enabled = true)
@Service
-public class ConsistentIdBlockStore implements IdBlockStore {
+public class DistributedIdBlockStore implements IdBlockStore {
private final Logger log = getLogger(getClass());
private final Map<String, AtomicCounter> topicCounters = Maps.newConcurrentMap();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/LogicalClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/LogicalClockManager.java
index 5c00782..e9956de 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/core/impl/LogicalClockManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/core/impl/LogicalClockManager.java
@@ -34,7 +34,7 @@
import static org.onosproject.security.AppPermission.Type.CLOCK_WRITE;
/**
- * LogicalClockService implementation based on a AtomicCounter.
+ * LogicalClockService implementation based on a {@link AtomicCounter}.
*/
@Component(immediate = true, enabled = true)
@Service
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/package-info.java
index 991f031..5633729 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/core/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/core/impl/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Implementation of a distributed application ID registry store using Hazelcast.
+ * Implementation of a distributed application registry.
*/
package org.onosproject.store.core.impl;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/package-info.java
index f644360..e658a9f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/package-info.java
@@ -14,6 +14,6 @@
* limitations under the License.
*/
/**
- * Implementation of the group store.
+ * Implementation of a distributed group store.
*/
package org.onosproject.store.group.impl;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/package-info.java
index 5610af4..c75a978 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Implementation of the distributed host store using p2p synchronization protocol.
+ * Implementation of a distributed host store.
*/
package org.onosproject.store.host.impl;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
deleted file mode 100644
index c5b6bbd..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ /dev/null
@@ -1,950 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.link.impl;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.AnnotationsUtil;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.DefaultLink;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.Link;
-import org.onosproject.net.Link.Type;
-import org.onosproject.net.LinkKey;
-import org.onosproject.net.SparseAnnotations;
-import org.onosproject.net.device.DeviceClockService;
-import org.onosproject.net.link.DefaultLinkDescription;
-import org.onosproject.net.link.LinkDescription;
-import org.onosproject.net.link.LinkEvent;
-import org.onosproject.net.link.LinkStore;
-import org.onosproject.net.link.LinkStoreDelegate;
-import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.Timestamped;
-import org.onosproject.store.serializers.StoreSerializer;
-import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
-import org.slf4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Predicates.notNull;
-import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.minPriority;
-import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
-import static org.onosproject.net.DefaultAnnotations.merge;
-import static org.onosproject.net.DefaultAnnotations.union;
-import static org.onosproject.net.Link.State.ACTIVE;
-import static org.onosproject.net.Link.State.INACTIVE;
-import static org.onosproject.net.Link.Type.DIRECT;
-import static org.onosproject.net.Link.Type.INDIRECT;
-import static org.onosproject.net.LinkKey.linkKey;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
-import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
-import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of infrastructure links in distributed data store
- * that uses optimistic replication and gossip based techniques.
- */
-//@Component(immediate = true, enabled = false)
-@Service
-public class GossipLinkStore
- extends AbstractStore<LinkEvent, LinkStoreDelegate>
- implements LinkStore {
-
- // Timeout in milliseconds to process links on remote master node
- private static final int REMOTE_MASTER_TIMEOUT = 1000;
-
- // Default delay for ScheduledExecutorService of anti-entropy(BackgroundExecutor)
- private static final long DEFAULT_INITIAL_DELAY = 5;
-
- // Default period for ScheduledExecutorService of anti-entropy(BackgroundExecutor)
- private static final long DEFAULT_PERIOD = 5;
-
- private static long initialDelaySec = DEFAULT_INITIAL_DELAY;
- private static long periodSec = DEFAULT_PERIOD;
-
- private final Logger log = getLogger(getClass());
-
- // Link inventory
- private final ConcurrentMap<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>> linkDescs =
- new ConcurrentHashMap<>();
-
- // Link instance cache
- private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
-
- // Egress and ingress link sets
- private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
- private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
-
- // Remove links
- private final Map<LinkKey, Timestamp> removedLinks = new ConcurrentHashMap<>();
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceClockService deviceClockService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
-
- protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
- KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.STORE_COMMON)
- .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
- .register(InternalLinkEvent.class)
- .register(InternalLinkRemovedEvent.class)
- .register(LinkAntiEntropyAdvertisement.class)
- .register(LinkFragmentId.class)
- .register(LinkInjectedEvent.class)
- .build("GossipLink"));
-
- private ExecutorService executor;
-
- private ScheduledExecutorService backgroundExecutors;
-
- @Activate
- public void activate() {
-
- executor = Executors.newCachedThreadPool(groupedThreads("onos/link", "fg-%d"));
-
- backgroundExecutors =
- newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/link", "bg-%d")));
-
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_UPDATE,
- new InternalLinkEventListener(), executor);
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_REMOVED,
- new InternalLinkRemovedEventListener(), executor);
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
- new InternalLinkAntiEntropyAdvertisementListener(), backgroundExecutors);
- clusterCommunicator.addSubscriber(
- GossipLinkStoreMessageSubjects.LINK_INJECTED,
- new LinkInjectedEventListener(), executor);
-
- // start anti-entropy thread
- backgroundExecutors.scheduleAtFixedRate(new SendAdvertisementTask(),
- initialDelaySec, periodSec, TimeUnit.SECONDS);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
-
- executor.shutdownNow();
-
- backgroundExecutors.shutdownNow();
- try {
- if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
- log.error("Timeout during executor shutdown");
- }
- } catch (InterruptedException e) {
- log.error("Error during executor shutdown", e);
- }
-
- linkDescs.clear();
- links.clear();
- srcLinks.clear();
- dstLinks.clear();
- log.info("Stopped");
- }
-
- @Override
- public int getLinkCount() {
- return links.size();
- }
-
- @Override
- public Iterable<Link> getLinks() {
- return Collections.unmodifiableCollection(links.values());
- }
-
- @Override
- public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
- // lock for iteration
- synchronized (srcLinks) {
- return FluentIterable.from(srcLinks.get(deviceId))
- .transform(lookupLink())
- .filter(notNull())
- .toSet();
- }
- }
-
- @Override
- public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
- // lock for iteration
- synchronized (dstLinks) {
- return FluentIterable.from(dstLinks.get(deviceId))
- .transform(lookupLink())
- .filter(notNull())
- .toSet();
- }
- }
-
- @Override
- public Link getLink(ConnectPoint src, ConnectPoint dst) {
- return links.get(linkKey(src, dst));
- }
-
- @Override
- public Set<Link> getEgressLinks(ConnectPoint src) {
- Set<Link> egress = new HashSet<>();
- //
- // Change `srcLinks` to ConcurrentMap<DeviceId, (Concurrent)Set>
- // to remove this synchronized block, if we hit performance issue.
- // SetMultiMap#get returns wrapped collection to provide modifiable-view.
- // And the wrapped collection is not concurrent access safe.
- //
- // Our use case here does not require returned collection to be modifiable,
- // so the wrapped collection forces us to lock the whole multiset,
- // for benefit we don't need.
- //
- // Same applies to `dstLinks`
- synchronized (srcLinks) {
- for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
- if (linkKey.src().equals(src)) {
- Link link = links.get(linkKey);
- if (link != null) {
- egress.add(link);
- } else {
- log.debug("Egress link for {} was null, skipped", linkKey);
- }
- }
- }
- }
- return egress;
- }
-
- @Override
- public Set<Link> getIngressLinks(ConnectPoint dst) {
- Set<Link> ingress = new HashSet<>();
- synchronized (dstLinks) {
- for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
- if (linkKey.dst().equals(dst)) {
- Link link = links.get(linkKey);
- if (link != null) {
- ingress.add(link);
- } else {
- log.debug("Ingress link for {} was null, skipped", linkKey);
- }
- }
- }
- }
- return ingress;
- }
-
- @Override
- public LinkEvent createOrUpdateLink(ProviderId providerId,
- LinkDescription linkDescription) {
-
- final DeviceId dstDeviceId = linkDescription.dst().deviceId();
- final NodeId localNode = clusterService.getLocalNode().id();
- final NodeId dstNode = mastershipService.getMasterFor(dstDeviceId);
-
- // Process link update only if we're the master of the destination node,
- // otherwise signal the actual master.
- LinkEvent linkEvent = null;
- if (localNode.equals(dstNode)) {
-
- Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
-
- final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
-
- LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
- final Timestamped<LinkDescription> mergedDesc;
- Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
-
- synchronized (map) {
- linkEvent = createOrUpdateLinkInternal(providerId, deltaDesc);
- mergedDesc = map.get(providerId);
- }
-
- if (linkEvent != null) {
- log.debug("Notifying peers of a link update topology event from providerId: "
- + "{} between src: {} and dst: {}",
- providerId, linkDescription.src(), linkDescription.dst());
- notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
- }
-
- } else {
- // Only forward for ConfigProvider
- // Forwarding was added as a workaround for ONOS-490
- if (!providerId.scheme().equals("cfg")) {
- return null;
- }
- // FIXME Temporary hack for NPE (ONOS-1171).
- // Proper fix is to implement forwarding to master on ConfigProvider
- // redo ONOS-490
- if (dstNode == null) {
- // silently ignore
- return null;
- }
-
-
- LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
-
- // TODO check unicast return value
- clusterCommunicator.unicast(linkInjectedEvent,
- GossipLinkStoreMessageSubjects.LINK_INJECTED,
- SERIALIZER::encode,
- dstNode);
- }
-
- return linkEvent;
- }
-
- @Override
- public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
- Link link = getLink(src, dst);
- if (link == null) {
- return null;
- }
-
- if (link.isDurable()) {
- // FIXME: this is not the right thing to call for the gossip store; will not sync link state!!!
- return link.state() == INACTIVE ? null :
- updateLink(linkKey(link.src(), link.dst()), link,
- DefaultLink.builder()
- .providerId(link.providerId())
- .src(link.src())
- .dst(link.dst())
- .type(link.type())
- .state(INACTIVE)
- .isExpected(link.isExpected())
- .annotations(link.annotations())
- .build());
- }
- return removeLink(src, dst);
- }
-
- private LinkEvent createOrUpdateLinkInternal(
- ProviderId providerId,
- Timestamped<LinkDescription> linkDescription) {
-
- final LinkKey key = linkKey(linkDescription.value().src(),
- linkDescription.value().dst());
- Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
-
- synchronized (descs) {
- // if the link was previously removed, we should proceed if and
- // only if this request is more recent.
- Timestamp linkRemovedTimestamp = removedLinks.get(key);
- if (linkRemovedTimestamp != null) {
- if (linkDescription.isNewerThan(linkRemovedTimestamp)) {
- removedLinks.remove(key);
- } else {
- log.trace("Link {} was already removed ignoring.", key);
- return null;
- }
- }
-
- final Link oldLink = links.get(key);
- // update description
- createOrUpdateLinkDescription(descs, providerId, linkDescription);
- final Link newLink = composeLink(descs);
- if (oldLink == null) {
- return createLink(key, newLink);
- }
- return updateLink(key, oldLink, newLink);
- }
- }
-
- // Guarded by linkDescs value (=locking each Link)
- private Timestamped<LinkDescription> createOrUpdateLinkDescription(
- Map<ProviderId, Timestamped<LinkDescription>> descs,
- ProviderId providerId,
- Timestamped<LinkDescription> linkDescription) {
-
- // merge existing annotations
- Timestamped<LinkDescription> existingLinkDescription = descs.get(providerId);
- if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
- log.trace("local info is more up-to-date, ignoring {}.", linkDescription);
- return null;
- }
- Timestamped<LinkDescription> newLinkDescription = linkDescription;
- if (existingLinkDescription != null) {
- // we only allow transition from INDIRECT -> DIRECT
- final Type newType;
- if (existingLinkDescription.value().type() == DIRECT) {
- newType = DIRECT;
- } else {
- newType = linkDescription.value().type();
- }
- SparseAnnotations merged = union(existingLinkDescription.value().annotations(),
- linkDescription.value().annotations());
- newLinkDescription = new Timestamped<>(
- new DefaultLinkDescription(
- linkDescription.value().src(),
- linkDescription.value().dst(),
- newType,
- existingLinkDescription.value().isExpected(),
- merged),
- linkDescription.timestamp());
- }
- return descs.put(providerId, newLinkDescription);
- }
-
- // Creates and stores the link and returns the appropriate event.
- // Guarded by linkDescs value (=locking each Link)
- private LinkEvent createLink(LinkKey key, Link newLink) {
- links.put(key, newLink);
- srcLinks.put(newLink.src().deviceId(), key);
- dstLinks.put(newLink.dst().deviceId(), key);
- return new LinkEvent(LINK_ADDED, newLink);
- }
-
- // Updates, if necessary the specified link and returns the appropriate event.
- // Guarded by linkDescs value (=locking each Link)
- private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
- // Note: INDIRECT -> DIRECT transition only
- // so that BDDP discovered Link will not overwrite LDDP Link
- if (oldLink.state() != newLink.state() ||
- (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
- !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
-
- links.put(key, newLink);
- // strictly speaking following can be omitted
- srcLinks.put(oldLink.src().deviceId(), key);
- dstLinks.put(oldLink.dst().deviceId(), key);
- return new LinkEvent(LINK_UPDATED, newLink);
- }
- return null;
- }
-
- @Override
- public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
- final LinkKey key = linkKey(src, dst);
-
- DeviceId dstDeviceId = dst.deviceId();
- Timestamp timestamp = null;
- try {
- timestamp = deviceClockService.getTimestamp(dstDeviceId);
- } catch (IllegalStateException e) {
- log.debug("Failed to remove link {}, was not the master", key);
- // there are times when this is called before mastership
- // handoff correctly completes.
- return null;
- }
-
- LinkEvent event = removeLinkInternal(key, timestamp);
-
- if (event != null) {
- log.debug("Notifying peers of a link removed topology event for a link "
- + "between src: {} and dst: {}", src, dst);
- notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
- }
- return event;
- }
-
- private static Timestamped<LinkDescription> getPrimaryDescription(
- Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
-
- synchronized (linkDescriptions) {
- for (Entry<ProviderId, Timestamped<LinkDescription>>
- e : linkDescriptions.entrySet()) {
-
- if (!e.getKey().isAncillary()) {
- return e.getValue();
- }
- }
- }
- return null;
- }
-
-
- // TODO: consider slicing out as Timestamp utils
- /**
- * Checks is timestamp is more recent than timestamped object.
- *
- * @param timestamp to check if this is more recent then other
- * @param timestamped object to be tested against
- * @return true if {@code timestamp} is more recent than {@code timestamped}
- * or {@code timestamped is null}
- */
- private static boolean isMoreRecent(Timestamp timestamp, Timestamped<?> timestamped) {
- checkNotNull(timestamp);
- if (timestamped == null) {
- return true;
- }
- return timestamp.compareTo(timestamped.timestamp()) > 0;
- }
-
- private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
- Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions
- = getOrCreateLinkDescriptions(key);
-
- synchronized (linkDescriptions) {
- if (linkDescriptions.isEmpty()) {
- // never seen such link before. keeping timestamp for record
- removedLinks.put(key, timestamp);
- return null;
- }
- // accept removal request if given timestamp is newer than
- // the latest Timestamp from Primary provider
- Timestamped<LinkDescription> prim = getPrimaryDescription(linkDescriptions);
- if (!isMoreRecent(timestamp, prim)) {
- // outdated remove request, ignore
- return null;
- }
- removedLinks.put(key, timestamp);
- Link link = links.remove(key);
- linkDescriptions.clear();
- if (link != null) {
- srcLinks.remove(link.src().deviceId(), key);
- dstLinks.remove(link.dst().deviceId(), key);
- return new LinkEvent(LINK_REMOVED, link);
- }
- return null;
- }
- }
-
- /**
- * Creates concurrent readable, synchronized HashMultimap.
- *
- * @return SetMultimap
- */
- private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
- return synchronizedSetMultimap(
- Multimaps.newSetMultimap(new ConcurrentHashMap<>(),
- () -> Sets.newConcurrentHashSet()));
- }
-
- /**
- * @return primary ProviderID, or randomly chosen one if none exists
- */
- private static ProviderId pickBaseProviderId(
- Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
-
- ProviderId fallBackPrimary = null;
- for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
- if (!e.getKey().isAncillary()) {
- // found primary
- return e.getKey();
- } else if (fallBackPrimary == null) {
- // pick randomly as a fallback in case there is no primary
- fallBackPrimary = e.getKey();
- }
- }
- return fallBackPrimary;
- }
-
- // Guarded by linkDescs value (=locking each Link)
- private Link composeLink(Map<ProviderId, Timestamped<LinkDescription>> descs) {
- ProviderId baseProviderId = pickBaseProviderId(descs);
- Timestamped<LinkDescription> base = descs.get(baseProviderId);
-
- ConnectPoint src = base.value().src();
- ConnectPoint dst = base.value().dst();
- Type type = base.value().type();
- DefaultAnnotations annotations = DefaultAnnotations.builder().build();
- annotations = merge(annotations, base.value().annotations());
-
- for (Entry<ProviderId, Timestamped<LinkDescription>> e : descs.entrySet()) {
- if (baseProviderId.equals(e.getKey())) {
- continue;
- }
-
- // Note: In the long run we should keep track of Description timestamp
- // and only merge conflicting keys when timestamp is newer
- // Currently assuming there will never be a key conflict between
- // providers
-
- // annotation merging. not so efficient, should revisit later
- annotations = merge(annotations, e.getValue().value().annotations());
- }
-
- //boolean isDurable = Objects.equals(annotations.value(AnnotationKeys.DURABLE), "true");
-
- // TEMP
- Link.State initialLinkState = base.value().isExpected() ? ACTIVE : INACTIVE;
- return DefaultLink.builder()
- .providerId(baseProviderId)
- .src(src)
- .dst(dst)
- .type(type)
- .state(initialLinkState)
- .isExpected(base.value().isExpected())
- .annotations(annotations)
- .build();
- }
-
- private Map<ProviderId, Timestamped<LinkDescription>> getOrCreateLinkDescriptions(LinkKey key) {
- Map<ProviderId, Timestamped<LinkDescription>> r;
- r = linkDescs.get(key);
- if (r != null) {
- return r;
- }
- r = new HashMap<>();
- final Map<ProviderId, Timestamped<LinkDescription>> concurrentlyAdded;
- concurrentlyAdded = linkDescs.putIfAbsent(key, r);
- if (concurrentlyAdded != null) {
- return concurrentlyAdded;
- } else {
- return r;
- }
- }
-
- private final Function<LinkKey, Link> lookupLink = new LookupLink();
-
- /**
- * Returns a Function to lookup Link instance using LinkKey from cache.
- *
- * @return lookup link function
- */
- private Function<LinkKey, Link> lookupLink() {
- return lookupLink;
- }
-
- private final class LookupLink implements Function<LinkKey, Link> {
- @Override
- public Link apply(LinkKey input) {
- if (input == null) {
- return null;
- } else {
- return links.get(input);
- }
- }
- }
-
- private void notifyDelegateIfNotNull(LinkEvent event) {
- if (event != null) {
- notifyDelegate(event);
- }
- }
-
- private void broadcastMessage(MessageSubject subject, Object event) {
- clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
- }
-
- private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
- clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
- }
-
- private void notifyPeers(InternalLinkEvent event) {
- broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
- }
-
- private void notifyPeers(InternalLinkRemovedEvent event) {
- broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
- }
-
- // notify peer, silently ignoring error
- private void notifyPeer(NodeId peer, InternalLinkEvent event) {
- try {
- unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
- } catch (IOException e) {
- log.debug("Failed to notify peer {} with message {}", peer, event);
- }
- }
-
- // notify peer, silently ignoring error
- private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
- try {
- unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
- } catch (IOException e) {
- log.debug("Failed to notify peer {} with message {}", peer, event);
- }
- }
-
- /**
- * sets the time to delay first execution for anti-entropy.
- * (scheduleAtFixedRate of ScheduledExecutorService)
- *
- * @param delay the time to delay first execution for anti-entropy
- */
- private void setInitialDelaySec(long delay) {
- checkArgument(delay >= 0, "Initial delay of scheduleAtFixedRate() must be 0 or more");
- initialDelaySec = delay;
- }
-
- /**
- * sets the period between successive execution for anti-entropy.
- * (scheduleAtFixedRate of ScheduledExecutorService)
- *
- * @param period the period between successive execution for anti-entropy
- */
- private void setPeriodSec(long period) {
- checkArgument(period > 0, "Period of scheduleAtFixedRate() must be greater than 0");
- periodSec = period;
- }
-
- private final class SendAdvertisementTask implements Runnable {
-
- @Override
- public void run() {
- if (Thread.currentThread().isInterrupted()) {
- log.debug("Interrupted, quitting");
- return;
- }
-
- try {
- final NodeId self = clusterService.getLocalNode().id();
- Set<ControllerNode> nodes = clusterService.getNodes();
-
- ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
- .transform(toNodeId())
- .toList();
-
- if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
- log.trace("No other peers in the cluster.");
- return;
- }
-
- NodeId peer;
- do {
- int idx = RandomUtils.nextInt(0, nodeIds.size());
- peer = nodeIds.get(idx);
- } while (peer.equals(self));
-
- LinkAntiEntropyAdvertisement ad = createAdvertisement();
-
- if (Thread.currentThread().isInterrupted()) {
- log.debug("Interrupted, quitting");
- return;
- }
-
- try {
- unicastMessage(peer, LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
- } catch (IOException e) {
- log.debug("Failed to send anti-entropy advertisement to {}", peer);
- return;
- }
- } catch (Exception e) {
- // catch all Exception to avoid Scheduled task being suppressed.
- log.error("Exception thrown while sending advertisement", e);
- }
- }
- }
-
- private LinkAntiEntropyAdvertisement createAdvertisement() {
- final NodeId self = clusterService.getLocalNode().id();
-
- Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
- Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
-
- linkDescs.forEach((linkKey, linkDesc) -> {
- synchronized (linkDesc) {
- for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
- linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
- }
- }
- });
-
- linkTombstones.putAll(removedLinks);
-
- return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
- }
-
- private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement ad) {
-
- final NodeId sender = ad.sender();
- boolean localOutdated = false;
-
- for (Entry<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>>
- l : linkDescs.entrySet()) {
-
- final LinkKey key = l.getKey();
- final Map<ProviderId, Timestamped<LinkDescription>> link = l.getValue();
- synchronized (link) {
- Timestamp localLatest = removedLinks.get(key);
-
- for (Entry<ProviderId, Timestamped<LinkDescription>> p : link.entrySet()) {
- final ProviderId providerId = p.getKey();
- final Timestamped<LinkDescription> pDesc = p.getValue();
-
- final LinkFragmentId fragId = new LinkFragmentId(key, providerId);
- // remote
- Timestamp remoteTimestamp = ad.linkTimestamps().get(fragId);
- if (remoteTimestamp == null) {
- remoteTimestamp = ad.linkTombstones().get(key);
- }
- if (remoteTimestamp == null ||
- pDesc.isNewerThan(remoteTimestamp)) {
- // I have more recent link description. update peer.
- notifyPeer(sender, new InternalLinkEvent(providerId, pDesc));
- } else {
- final Timestamp remoteLive = ad.linkTimestamps().get(fragId);
- if (remoteLive != null &&
- remoteLive.compareTo(pDesc.timestamp()) > 0) {
- // I have something outdated
- localOutdated = true;
- }
- }
-
- // search local latest along the way
- if (localLatest == null ||
- pDesc.isNewerThan(localLatest)) {
- localLatest = pDesc.timestamp();
- }
- }
- // Tests if remote remove is more recent then local latest.
- final Timestamp remoteRemove = ad.linkTombstones().get(key);
- if (remoteRemove != null) {
- if (localLatest != null &&
- localLatest.compareTo(remoteRemove) < 0) {
- // remote remove is more recent
- notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
- }
- }
- }
- }
-
- // populate remove info if not known locally
- for (Entry<LinkKey, Timestamp> remoteRm : ad.linkTombstones().entrySet()) {
- final LinkKey key = remoteRm.getKey();
- final Timestamp remoteRemove = remoteRm.getValue();
- // relying on removeLinkInternal to ignore stale info
- notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
- }
-
- if (localOutdated) {
- // send back advertisement to speed up convergence
- try {
- unicastMessage(sender, LINK_ANTI_ENTROPY_ADVERTISEMENT,
- createAdvertisement());
- } catch (IOException e) {
- log.debug("Failed to send back active advertisement");
- }
- }
- }
-
- private final class InternalLinkEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
-
- log.trace("Received link event from peer: {}", message.sender());
- InternalLinkEvent event = SERIALIZER.decode(message.payload());
-
- ProviderId providerId = event.providerId();
- Timestamped<LinkDescription> linkDescription = event.linkDescription();
-
- try {
- notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
- } catch (Exception e) {
- log.warn("Exception thrown handling link event", e);
- }
- }
- }
-
- private final class InternalLinkRemovedEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
-
- log.trace("Received link removed event from peer: {}", message.sender());
- InternalLinkRemovedEvent event = SERIALIZER.decode(message.payload());
-
- LinkKey linkKey = event.linkKey();
- Timestamp timestamp = event.timestamp();
-
- try {
- notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling link removed", e);
- }
- }
- }
-
- private final class InternalLinkAntiEntropyAdvertisementListener
- implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
- log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
- LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- try {
- handleAntiEntropyAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown while handling Link advertisements", e);
- throw e;
- }
- }
- }
-
- private final class LinkInjectedEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
-
- log.trace("Received injected link event from peer: {}", message.sender());
- LinkInjectedEvent linkInjectedEvent = SERIALIZER.decode(message.payload());
-
- ProviderId providerId = linkInjectedEvent.providerId();
- LinkDescription linkDescription = linkInjectedEvent.linkDescription();
-
- final DeviceId deviceId = linkDescription.dst().deviceId();
- if (!deviceClockService.isTimestampAvailable(deviceId)) {
- // workaround for ONOS-1208
- log.warn("Not ready to accept update. Dropping {}", linkDescription);
- return;
- }
-
- try {
- createOrUpdateLink(providerId, linkDescription);
- } catch (Exception e) {
- log.warn("Exception thrown while handling link injected event", e);
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java
deleted file mode 100644
index 541c724..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStoreMessageSubjects.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.link.impl;
-
- import org.onosproject.store.cluster.messaging.MessageSubject;
-
-/**
- * MessageSubjects used by GossipLinkStore peer-peer communication.
- */
-public final class GossipLinkStoreMessageSubjects {
-
- private GossipLinkStoreMessageSubjects() {}
-
- public static final MessageSubject LINK_UPDATE =
- new MessageSubject("peer-link-update");
- public static final MessageSubject LINK_REMOVED =
- new MessageSubject("peer-link-removed");
- public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT =
- new MessageSubject("link-enti-entropy-advertisement");
- public static final MessageSubject LINK_INJECTED =
- new MessageSubject("peer-link-injected");
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/InternalLinkEvent.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/InternalLinkEvent.java
deleted file mode 100644
index e7ec781..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/InternalLinkEvent.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.link.impl;
-
-import com.google.common.base.MoreObjects;
-
-import org.onosproject.net.link.LinkDescription;
-import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.impl.Timestamped;
-
-/**
- * Information published by GossipDeviceStore to notify peers of a device
- * change event.
- */
-public class InternalLinkEvent {
-
- private final ProviderId providerId;
- private final Timestamped<LinkDescription> linkDescription;
-
- protected InternalLinkEvent(
- ProviderId providerId,
- Timestamped<LinkDescription> linkDescription) {
- this.providerId = providerId;
- this.linkDescription = linkDescription;
- }
-
- public ProviderId providerId() {
- return providerId;
- }
-
- public Timestamped<LinkDescription> linkDescription() {
- return linkDescription;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("providerId", providerId)
- .add("linkDescription", linkDescription)
- .toString();
- }
-
- // for serializer
- protected InternalLinkEvent() {
- this.providerId = null;
- this.linkDescription = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/InternalLinkRemovedEvent.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/InternalLinkRemovedEvent.java
deleted file mode 100644
index 92d4645..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/InternalLinkRemovedEvent.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.link.impl;
-
-import org.onosproject.net.LinkKey;
-import org.onosproject.store.Timestamp;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Information published by GossipLinkStore to notify peers of a link
- * being removed.
- */
-public class InternalLinkRemovedEvent {
-
- private final LinkKey linkKey;
- private final Timestamp timestamp;
-
- /**
- * Creates a InternalLinkRemovedEvent.
- * @param linkKey identifier of the removed link.
- * @param timestamp timestamp of when the link was removed.
- */
- public InternalLinkRemovedEvent(LinkKey linkKey, Timestamp timestamp) {
- this.linkKey = linkKey;
- this.timestamp = timestamp;
- }
-
- public LinkKey linkKey() {
- return linkKey;
- }
-
- public Timestamp timestamp() {
- return timestamp;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("linkKey", linkKey)
- .add("timestamp", timestamp)
- .toString();
- }
-
- // for serializer
- @SuppressWarnings("unused")
- private InternalLinkRemovedEvent() {
- linkKey = null;
- timestamp = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkAntiEntropyAdvertisement.java
deleted file mode 100644
index 128c406..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkAntiEntropyAdvertisement.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.link.impl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Map;
-
-import org.onosproject.cluster.NodeId;
-import org.onosproject.net.LinkKey;
-import org.onosproject.store.Timestamp;
-
-/**
- * Link AE Advertisement message.
- */
-public class LinkAntiEntropyAdvertisement {
-
- private final NodeId sender;
- private final Map<LinkFragmentId, Timestamp> linkTimestamps;
- private final Map<LinkKey, Timestamp> linkTombstones;
-
-
- public LinkAntiEntropyAdvertisement(NodeId sender,
- Map<LinkFragmentId, Timestamp> linkTimestamps,
- Map<LinkKey, Timestamp> linkTombstones) {
- this.sender = checkNotNull(sender);
- this.linkTimestamps = checkNotNull(linkTimestamps);
- this.linkTombstones = checkNotNull(linkTombstones);
- }
-
- public NodeId sender() {
- return sender;
- }
-
- public Map<LinkFragmentId, Timestamp> linkTimestamps() {
- return linkTimestamps;
- }
-
- public Map<LinkKey, Timestamp> linkTombstones() {
- return linkTombstones;
- }
-
- // For serializer
- @SuppressWarnings("unused")
- private LinkAntiEntropyAdvertisement() {
- this.sender = null;
- this.linkTimestamps = null;
- this.linkTombstones = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkFragmentId.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkFragmentId.java
deleted file mode 100644
index 082e457..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkFragmentId.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.link.impl;
-
-import java.util.Objects;
-
-import org.onosproject.net.LinkKey;
-import org.onosproject.net.provider.ProviderId;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Identifier for LinkDescription from a Provider.
- */
-public final class LinkFragmentId {
- public final ProviderId providerId;
- public final LinkKey linkKey;
-
- public LinkFragmentId(LinkKey linkKey, ProviderId providerId) {
- this.providerId = providerId;
- this.linkKey = linkKey;
- }
-
- public LinkKey linkKey() {
- return linkKey;
- }
-
- public ProviderId providerId() {
- return providerId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(providerId, linkKey);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof LinkFragmentId)) {
- return false;
- }
- LinkFragmentId that = (LinkFragmentId) obj;
- return Objects.equals(this.linkKey, that.linkKey) &&
- Objects.equals(this.providerId, that.providerId);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("providerId", providerId)
- .add("linkKey", linkKey)
- .toString();
- }
-
- // for serializer
- @SuppressWarnings("unused")
- private LinkFragmentId() {
- this.providerId = null;
- this.linkKey = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkInjectedEvent.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkInjectedEvent.java
deleted file mode 100644
index 99c9928..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/LinkInjectedEvent.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.link.impl;
-
-import com.google.common.base.MoreObjects;
-import org.onosproject.net.link.LinkDescription;
-import org.onosproject.net.provider.ProviderId;
-
-public class LinkInjectedEvent {
-
- ProviderId providerId;
- LinkDescription linkDescription;
-
- public LinkInjectedEvent(ProviderId providerId, LinkDescription linkDescription) {
- this.providerId = providerId;
- this.linkDescription = linkDescription;
- }
-
- public ProviderId providerId() {
- return providerId;
- }
-
- public LinkDescription linkDescription() {
- return linkDescription;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("providerId", providerId)
- .add("linkDescription", linkDescription)
- .toString();
- }
-
- // for serializer
- protected LinkInjectedEvent() {
- this.providerId = null;
- this.linkDescription = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/package-info.java
index 4328988..f32eda4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Implementation of distributed link store using p2p synchronization protocol.
+ * Implementation of distributed link store using eventually consistent map primitive.
*/
package org.onosproject.store.link.impl;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValue.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValue.java
deleted file mode 100644
index b42a68c..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValue.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.mastership.impl;
-
-import static org.onosproject.net.MastershipRole.MASTER;
-import static org.onosproject.net.MastershipRole.NONE;
-import static org.onosproject.net.MastershipRole.STANDBY;
-
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
-import org.onosproject.net.MastershipRole;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.collect.Lists;
-
-/**
- * A structure that holds node mastership roles associated with a
- * {@link org.onosproject.net.DeviceId}. This structure needs to be locked through IMap.
- */
-final class RoleValue {
-
- protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
-
- /**
- * Constructs empty RoleValue.
- */
- public RoleValue() {
- value.put(MastershipRole.MASTER, new LinkedList<>());
- value.put(MastershipRole.STANDBY, new LinkedList<>());
- value.put(MastershipRole.NONE, new LinkedList<>());
- }
-
- /**
- * Constructs copy of specified RoleValue.
- *
- * @param original original to create copy from
- */
- public RoleValue(final RoleValue original) {
- value.put(MASTER, Lists.newLinkedList(original.value.get(MASTER)));
- value.put(STANDBY, Lists.newLinkedList(original.value.get(STANDBY)));
- value.put(NONE, Lists.newLinkedList(original.value.get(NONE)));
- }
-
- // exposing internals for serialization purpose only
- Map<MastershipRole, List<NodeId>> value() {
- return Collections.unmodifiableMap(value);
- }
-
- public List<NodeId> nodesOfRole(MastershipRole type) {
- return value.get(type);
- }
-
- /**
- * Returns the first node to match the MastershipRole, or if there
- * are none, null.
- *
- * @param type the role
- * @return a node ID or null
- */
- public NodeId get(MastershipRole type) {
- return value.get(type).isEmpty() ? null : value.get(type).get(0);
- }
-
- public boolean contains(MastershipRole type, NodeId nodeId) {
- return value.get(type).contains(nodeId);
- }
-
- public MastershipRole getRole(NodeId nodeId) {
- if (contains(MASTER, nodeId)) {
- return MASTER;
- }
- if (contains(STANDBY, nodeId)) {
- return STANDBY;
- }
- return NONE;
- }
-
- /**
- * Associates a node to a certain role.
- *
- * @param type the role
- * @param nodeId the node ID of the node to associate
- * @return true if modified
- */
- public boolean add(MastershipRole type, NodeId nodeId) {
- List<NodeId> nodes = value.get(type);
-
- if (!nodes.contains(nodeId)) {
- return nodes.add(nodeId);
- }
- return false;
- }
-
- /**
- * Removes a node from a certain role.
- *
- * @param type the role
- * @param nodeId the ID of the node to remove
- * @return true if modified
- */
- public boolean remove(MastershipRole type, NodeId nodeId) {
- List<NodeId> nodes = value.get(type);
- if (!nodes.isEmpty()) {
- return nodes.remove(nodeId);
- } else {
- return false;
- }
- }
-
- /**
- * Reassigns a node from one role to another. If the node was not of the
- * old role, it will still be assigned the new role.
- *
- * @param nodeId the Node ID of node changing roles
- * @param from the old role
- * @param to the new role
- * @return true if modified
- */
- public boolean reassign(NodeId nodeId, MastershipRole from, MastershipRole to) {
- boolean modified = remove(from, nodeId);
- modified |= add(to, nodeId);
- return modified;
- }
-
- /**
- * Replaces a node in one role with another node. Even if there is no node to
- * replace, the new node is associated to the role.
- *
- * @param from the old NodeId to replace
- * @param to the new NodeId
- * @param type the role associated with the old NodeId
- * @return true if modified
- */
- public boolean replace(NodeId from, NodeId to, MastershipRole type) {
- boolean modified = remove(type, from);
- modified |= add(type, to);
- return modified;
- }
-
- /**
- * Summarizes this RoleValue as a RoleInfo. Note that master and/or backups
- * may be empty, so the values should be checked for safety.
- *
- * @return the RoleInfo.
- */
- public RoleInfo roleInfo() {
- return new RoleInfo(
- get(MastershipRole.MASTER), nodesOfRole(MastershipRole.STANDBY));
- }
-
- @Override
- public String toString() {
- ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
- for (Map.Entry<MastershipRole, List<NodeId>> el : value.entrySet()) {
- helper.add(el.getKey().toString(), el.getValue());
- }
- return helper.toString();
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValueSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValueSerializer.java
deleted file mode 100644
index afa51a5..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValueSerializer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.mastership.impl;
-
-import java.util.List;
-import java.util.Map;
-
-import org.onosproject.cluster.NodeId;
-import org.onosproject.net.MastershipRole;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-/**
- * Serializer for RoleValues used by {@link org.onosproject.mastership.MastershipStore}.
- */
-public class RoleValueSerializer extends Serializer<RoleValue> {
-
- //RoleValues are assumed to hold a Map of MastershipRoles (an enum)
- //to a List of NodeIds.
-
- @Override
- public RoleValue read(Kryo kryo, Input input, Class<RoleValue> type) {
- RoleValue rv = new RoleValue();
- int size = input.readInt();
- for (int i = 0; i < size; i++) {
- MastershipRole role = MastershipRole.values()[input.readInt()];
- int s = input.readInt();
- for (int j = 0; j < s; j++) {
- rv.add(role, new NodeId(input.readString()));
- }
- }
- return rv;
- }
-
- @Override
- public void write(Kryo kryo, Output output, RoleValue type) {
- final Map<MastershipRole, List<NodeId>> map = type.value();
- output.writeInt(map.size());
-
- for (Map.Entry<MastershipRole, List<NodeId>> el : map.entrySet()) {
- output.writeInt(el.getKey().ordinal());
-
- List<NodeId> nodes = el.getValue();
- output.writeInt(nodes.size());
- for (NodeId n : nodes) {
- output.writeString(n.toString());
- }
- }
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/package-info.java
index 112eac3..c0c2d5f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Implementation of a distributed mastership store using Hazelcast.
+ * Implementation of a distributed mastership store.
*/
package org.onosproject.store.mastership.impl;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/ClusterMessageSerializer.java
deleted file mode 100644
index f1868bf..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/ClusterMessageSerializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.serializers.custom;
-
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
-
- /**
- * Creates a serializer for {@link ClusterMessage}.
- */
- public ClusterMessageSerializer() {
- // does not accept null
- super(false);
- }
-
- @Override
- public void write(Kryo kryo, Output output, ClusterMessage message) {
- kryo.writeClassAndObject(output, message.sender());
- kryo.writeClassAndObject(output, message.subject());
- output.writeInt(message.payload().length);
- output.writeBytes(message.payload());
- }
-
- @Override
- public ClusterMessage read(Kryo kryo, Input input,
- Class<ClusterMessage> type) {
- NodeId sender = (NodeId) kryo.readClassAndObject(input);
- MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
- int payloadSize = input.readInt();
- byte[] payload = input.readBytes(payloadSize);
- return new ClusterMessage(sender, subject, payload);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/MessageSubjectSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/MessageSubjectSerializer.java
deleted file mode 100644
index bf2e3c5..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/MessageSubjectSerializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.serializers.custom;
-
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public final class MessageSubjectSerializer extends Serializer<MessageSubject> {
-
- /**
- * Creates a serializer for {@link MessageSubject}.
- */
- public MessageSubjectSerializer() {
- // non-null, immutable
- super(false, true);
- }
-
-
- @Override
- public void write(Kryo kryo, Output output, MessageSubject object) {
- output.writeString(object.value());
- }
-
- @Override
- public MessageSubject read(Kryo kryo, Input input,
- Class<MessageSubject> type) {
- return new MessageSubject(input.readString());
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/package-info.java
index 22af810..f1c9a43 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/serializers/custom/package-info.java
@@ -15,8 +15,6 @@
*/
/**
- * Cluster messaging and distributed store serializers.
+ * Distributed store serializers.
*/
-//FIXME what is the right name for this package?
-//FIXME can this be moved to onos-core-serializers?
package org.onosproject.store.serializers.custom;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
index f8e7c71..74ab40b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
@@ -17,6 +17,7 @@
package org.onosproject.store.statistic.impl;
import com.google.common.base.Objects;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -38,6 +39,7 @@
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.FlowStatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
import org.osgi.service.component.ComponentContext;
@@ -59,8 +61,6 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -89,6 +89,9 @@
private Map<ConnectPoint, Set<FlowEntry>> current =
new ConcurrentHashMap<>();
+ public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
+ public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
+
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
private NodeId local;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index 1cb9dbe..51552f0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -38,6 +38,7 @@
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.StatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
import org.osgi.service.component.ComponentContext;
@@ -59,8 +60,6 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
import static org.slf4j.LoggerFactory.getLogger;
@@ -85,6 +84,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
+ public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
+
private Map<ConnectPoint, InternalStatisticRepresentation> representations =
new ConcurrentHashMap<>();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/StatisticStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/StatisticStoreMessageSubjects.java
deleted file mode 100644
index ee26582..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/StatisticStoreMessageSubjects.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.statistic.impl;
-
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-/**
- * MessageSubjects used by DistributedStatisticStore peer-peer communication.
- */
-public final class StatisticStoreMessageSubjects {
- private StatisticStoreMessageSubjects() {}
- public static final MessageSubject GET_CURRENT =
- new MessageSubject("peer-return-current");
- public static final MessageSubject GET_PREVIOUS =
- new MessageSubject("peer-return-previous");
-
-}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
deleted file mode 100644
index f7ced12..0000000
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.messaging.impl;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.impl.ClusterNodesDelegate;
-import org.onlab.packet.IpAddress;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests of the cluster communication manager.
- */
-public class ClusterCommunicationManagerTest {
-
- private static final NodeId N1 = new NodeId("n1");
- private static final NodeId N2 = new NodeId("n2");
-
- private static final int P1 = 9881;
- private static final int P2 = 9882;
-
- private static final IpAddress IP = IpAddress.valueOf("127.0.0.1");
-
- private ClusterCommunicationManager ccm1;
- private ClusterCommunicationManager ccm2;
-
- private TestDelegate cnd1 = new TestDelegate();
- private TestDelegate cnd2 = new TestDelegate();
-
- private DefaultControllerNode node1 = new DefaultControllerNode(N1, IP, P1);
- private DefaultControllerNode node2 = new DefaultControllerNode(N2, IP, P2);
-
- @Before
- public void setUp() throws Exception {
-
- NettyMessagingManager messagingService = new NettyMessagingManager();
- messagingService.activate();
-
- ccm1 = new ClusterCommunicationManager();
- ccm1.activate();
-
- ccm2 = new ClusterCommunicationManager();
- ccm2.activate();
-
-// ccm1.initialize(node1, cnd1);
-// ccm2.initialize(node2, cnd2);
- }
-
- @After
- public void tearDown() {
- ccm1.deactivate();
- ccm2.deactivate();
- }
-
- @Ignore("FIXME: failing randomly?")
- @Test
- public void connect() throws Exception {
- cnd1.latch = new CountDownLatch(1);
- cnd2.latch = new CountDownLatch(1);
-
-// ccm1.addNode(node2);
- validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
- validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
- }
-
- @Test
- @Ignore
- public void disconnect() throws Exception {
- cnd1.latch = new CountDownLatch(1);
- cnd2.latch = new CountDownLatch(1);
-
-// ccm1.addNode(node2);
- validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
- validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
-
- cnd1.latch = new CountDownLatch(1);
- cnd2.latch = new CountDownLatch(1);
- ccm1.deactivate();
-//
-// validateDelegateEvent(cnd2, Op.VANISHED, node1.id());
- }
-
- private void validateDelegateEvent(TestDelegate delegate, Op op, NodeId nodeId)
- throws InterruptedException {
- assertTrue("did not connect in time", delegate.latch.await(2500, TimeUnit.MILLISECONDS));
- assertEquals("incorrect event", op, delegate.op);
- assertEquals("incorrect event node", nodeId, delegate.nodeId);
- }
-
- enum Op { DETECTED, VANISHED, REMOVED }
-
- private class TestDelegate implements ClusterNodesDelegate {
-
- Op op;
- CountDownLatch latch;
- NodeId nodeId;
-
- @Override
- public DefaultControllerNode nodeDetected(NodeId nodeId, IpAddress ip, int tcpPort) {
- latch(nodeId, Op.DETECTED);
- return new DefaultControllerNode(nodeId, ip, tcpPort);
- }
-
- @Override
- public void nodeVanished(NodeId nodeId) {
- latch(nodeId, Op.VANISHED);
- }
-
- @Override
- public void nodeRemoved(NodeId nodeId) {
- latch(nodeId, Op.REMOVED);
- }
-
- private void latch(NodeId nodeId, Op op) {
- this.op = op;
- this.nodeId = nodeId;
- latch.countDown();
- }
- }
-}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/ECLinkStoreTest.java
similarity index 84%
rename from core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
rename to core/store/dist/src/test/java/org/onosproject/store/link/impl/ECLinkStoreTest.java
index 8dcc96c..8ae9c82 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/link/impl/ECLinkStoreTest.java
@@ -17,7 +17,6 @@
import com.google.common.collect.Iterables;
-import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -59,8 +58,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
-
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
@@ -76,7 +73,8 @@
/**
* Test of the GossipLinkStoreTest implementation.
*/
-public class GossipLinkStoreTest {
+@Ignore
+public class ECLinkStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final ProviderId PIDA = new ProviderId("of", "bar", true);
@@ -114,10 +112,9 @@
private static final ControllerNode ONOS2 =
new DefaultControllerNode(NID2, IpAddress.valueOf("127.0.0.2"));
- private GossipLinkStore linkStoreImpl;
+ private ECLinkStore linkStoreImpl;
private LinkStore linkStore;
- private final AtomicLong ticker = new AtomicLong();
private DeviceClockService deviceClockService;
private ClusterCommunicationService clusterCommunicator;
@@ -139,7 +136,7 @@
expectLastCall().anyTimes();
replay(clusterCommunicator);
- linkStoreImpl = new GossipLinkStore();
+ linkStoreImpl = new ECLinkStore();
linkStoreImpl.deviceClockService = deviceClockService;
linkStoreImpl.clusterCommunicator = clusterCommunicator;
linkStoreImpl.clusterService = new TestClusterService();
@@ -163,28 +160,10 @@
SparseAnnotations... annotations) {
ConnectPoint src = new ConnectPoint(srcId, srcNum);
ConnectPoint dst = new ConnectPoint(dstId, dstNum);
- reset(clusterCommunicator);
- clusterCommunicator.<InternalLinkEvent>broadcast(
- anyObject(InternalLinkEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
- expectLastCall().anyTimes();
- replay(clusterCommunicator);
linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
verify(clusterCommunicator);
}
- private <T> void resetCommunicatorExpectingSingleBroadcast(
- Capture<T> message,
- Capture<MessageSubject> subject,
- Capture<Function<T, byte[]>> encoder) {
- message.reset();
- subject.reset();
- encoder.reset();
- reset(clusterCommunicator);
- clusterCommunicator.broadcast(capture(message), capture(subject), capture(encoder));
- expectLastCall().once();
- replay(clusterCommunicator);
- }
-
private void putLink(LinkKey key, Type type, SparseAnnotations... annotations) {
putLink(key.src().deviceId(), key.src().port(),
key.dst().deviceId(), key.dst().port(),
@@ -358,57 +337,26 @@
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
- Capture<InternalLinkEvent> message = new Capture<>();
- Capture<MessageSubject> subject = new Capture<>();
- Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
-
- // add link
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultLinkDescription linkDescription = new DefaultLinkDescription(src, dst, INDIRECT);
LinkEvent event = linkStore.createOrUpdateLink(PID,
linkDescription);
- verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject());
assertEquals(LINK_ADDED, event.type());
- // update link type
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event2 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject());
assertEquals(LINK_UPDATED, event2.type());
// no change
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event3 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
- verifyNoBroadcastMessage(message);
assertNull("No change event expected", event3);
}
- private <T> void verifyNoBroadcastMessage(Capture<T> message) {
- assertFalse("No broadcast expected", message.hasCaptured());
- }
-
- private void verifyLinkBroadcastMessage(ProviderId providerId,
- NodeId sender,
- ConnectPoint src,
- ConnectPoint dst,
- Type type,
- Capture<InternalLinkEvent> actualLinkEvent,
- Capture<MessageSubject> actualSubject,
- Capture<Function<InternalLinkEvent, byte[]>> actualEncoder) {
- verify(clusterCommunicator);
- assertTrue(actualLinkEvent.hasCaptured());
- assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE, actualSubject.getValue());
- assertEquals(providerId, actualLinkEvent.getValue().providerId());
- assertLinkDescriptionEquals(src, dst, type, actualLinkEvent.getValue().linkDescription().value());
- }
-
private static void assertLinkDescriptionEquals(ConnectPoint src,
ConnectPoint dst,
Type type,
@@ -424,33 +372,23 @@
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
- Capture<InternalLinkEvent> message = new Capture<>();
- Capture<MessageSubject> subject = new Capture<>();
- Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
-
// add Ancillary link
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, INDIRECT, A1));
- verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, message, subject, encoder);
assertNotNull("Ancillary only link is ignored", event);
// add Primary link
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event2 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, INDIRECT, A2));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
assertEquals(LINK_UPDATED, event2.type());
// update link type
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event3 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
@@ -458,38 +396,30 @@
// no change
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event4 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
- verifyNoBroadcastMessage(message);
assertNull("No change event expected", event4);
// update link annotation (Primary)
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event5 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2_2));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
assertEquals(LINK_UPDATED, event5.type());
// update link annotation (Ancillary)
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, DIRECT, A1_2));
- verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
assertEquals(LINK_UPDATED, event6.type());
// update link type (Ancillary) : ignored
- resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, EDGE));
- verifyNoBroadcastMessage(message);
assertNull("Ancillary change other than annotation is ignored", event7);
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/LinkFragmentIdTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/LinkFragmentIdTest.java
deleted file mode 100644
index 3508d2c..0000000
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/LinkFragmentIdTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.link.impl;
-
-import static org.onosproject.net.DeviceId.deviceId;
-
-import org.junit.Test;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.LinkKey;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.provider.ProviderId;
-import com.google.common.testing.EqualsTester;
-
-public class LinkFragmentIdTest {
-
- private static final ProviderId PID = new ProviderId("of", "foo");
- private static final ProviderId PIDA = new ProviderId("of", "bar", true);
-
- private static final DeviceId DID1 = deviceId("of:foo");
- private static final DeviceId DID2 = deviceId("of:bar");
-
- private static final PortNumber P1 = PortNumber.portNumber(1);
- private static final PortNumber P2 = PortNumber.portNumber(2);
- private static final PortNumber P3 = PortNumber.portNumber(3);
-
- private static final ConnectPoint CP1 = new ConnectPoint(DID1, P1);
- private static final ConnectPoint CP2 = new ConnectPoint(DID2, P2);
-
- private static final ConnectPoint CP3 = new ConnectPoint(DID1, P2);
- private static final ConnectPoint CP4 = new ConnectPoint(DID2, P3);
-
- private static final LinkKey L1 = LinkKey.linkKey(CP1, CP2);
- private static final LinkKey L2 = LinkKey.linkKey(CP3, CP4);
-
- @Test
- public void testEquals() {
- new EqualsTester()
- .addEqualityGroup(new LinkFragmentId(L1, PID),
- new LinkFragmentId(L1, PID))
- .addEqualityGroup(new LinkFragmentId(L2, PID),
- new LinkFragmentId(L2, PID))
- .addEqualityGroup(new LinkFragmentId(L1, PIDA),
- new LinkFragmentId(L1, PIDA))
- .addEqualityGroup(new LinkFragmentId(L2, PIDA),
- new LinkFragmentId(L2, PIDA))
- .testEquals();
- }
-
-}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/RoleValueTest.java b/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/RoleValueTest.java
deleted file mode 100644
index d50bc75..0000000
--- a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/RoleValueTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.mastership.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.onosproject.net.MastershipRole.*;
-
-import org.junit.Test;
-import org.onosproject.cluster.NodeId;
-
-import com.google.common.collect.Sets;
-
-public class RoleValueTest {
-
- private static final RoleValue RV = new RoleValue();
-
- private static final NodeId NID1 = new NodeId("node1");
- private static final NodeId NID2 = new NodeId("node2");
- private static final NodeId NID3 = new NodeId("node3");
-
- @Test
- public void add() {
- assertEquals("faulty initialization: ", 3, RV.value.size());
- RV.add(MASTER, NID1);
- RV.add(STANDBY, NID2);
- RV.add(STANDBY, NID3);
-
- assertEquals("wrong nodeID: ", NID1, RV.get(MASTER));
- assertTrue("wrong nodeIDs: ",
- Sets.newHashSet(NID3, NID2).containsAll(RV.nodesOfRole(STANDBY)));
- }
-}