Updates to ECM interface
Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209
Adding origin to IntentData and using it to pick the GossipIntentStore peer
Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index bbdbdb7..2bc992b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.intent.impl;
+import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,8 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
@@ -41,7 +44,10 @@
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import static org.onosproject.net.intent.IntentState.*;
@@ -51,6 +57,8 @@
* Manages inventory of Intents in a distributed data store that uses optimistic
* replication and gossip based techniques.
*/
+// FIXME: we should listen for leadership changes. If the local instance has just
+// become a leader, it should scan the pending map and process its entries.
@Component(immediate = false, enabled = true)
@Service
public class GossipIntentStore
@@ -86,15 +94,17 @@
clusterService,
clusterCommunicator,
intentSerializer,
- new IntentDataLogicalClockManager<>());
+ new IntentDataLogicalClockManager<>(),
+ (key, intentData) -> getPeerNodes(key, intentData));
pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
clusterService,
clusterCommunicator,
intentSerializer, // TODO
- new IntentDataClockManager<>());
+ new IntentDataClockManager<>(),
+ (key, intentData) -> getPeerNodes(key, intentData));
- currentMap.addListener(new InternalIntentStatesListener());
+ currentMap.addListener(new InternalCurrentListener());
pendingMap.addListener(new InternalPendingListener());
log.info("Started");
@@ -226,7 +236,6 @@
@Override
public void write(IntentData newData) {
IntentData currentData = currentMap.get(newData.key());
-
if (isUpdateAcceptable(currentData, newData)) {
// Only the master is modifying the current state. Therefore assume
// this always succeeds
@@ -239,6 +248,34 @@
}
}
+    /**
+     * Selects the peer node(s) that an update for the given intent should be
+     * sent to.
+     * <p>
+     * If the local node is both the leader for the key and the origin of the
+     * data, a random node is chosen. If it is only the leader, the origin is
+     * chosen; if it is only the origin, the leader is chosen. Whenever the
+     * intended peer is unknown ({@code null}), a random node is used instead,
+     * because {@link ImmutableList#of} rejects {@code null} elements.
+     *
+     * @param key  intent key used to look up the leader
+     * @param data intent data carrying the origin node; may be null
+     * @return the node(s) to send the update to
+     */
+    private Iterable<NodeId> getPeerNodes(Key key, IntentData data) {
+        NodeId master = partitionService.getLeader(key);
+        NodeId origin = (data != null) ? data.origin() : null;
+        NodeId me = clusterService.getLocalNode().id();
+        boolean isMaster = Objects.equals(master, me);
+        boolean isOrigin = Objects.equals(origin, me);
+        if (isMaster && isOrigin) {
+            return ImmutableList.of(getRandomNode());
+        } else if (isMaster) {
+            // origin is null when data is null or its origin was never set
+            return ImmutableList.of(origin != null ? origin : getRandomNode());
+        } else if (isOrigin) {
+            // the leader lookup may not have produced a node
+            return ImmutableList.of(master != null ? master : getRandomNode());
+        } else {
+            // The local node is neither leader nor origin; this is not expected,
+            // so record it and still pick a usable peer.
+            log.warn("Local node is neither master nor origin for intent {}; master = {}, origin = {}",
+                     key, master, origin);
+            return ImmutableList.of(master != null ? master : getRandomNode());
+        }
+    }
+
+    /**
+     * Picks a random cluster node, preferring nodes other than the local one.
+     *
+     * @return the id of a randomly chosen non-local node, or the local node id
+     *         when no other node is known
+     */
+    private NodeId getRandomNode() {
+        NodeId me = clusterService.getLocalNode().id();
+        List<NodeId> others = clusterService.getNodes().stream()
+                .map(ControllerNode::id)
+                .filter(id -> !Objects.equals(id, me))
+                .collect(Collectors.toCollection(ArrayList::new));
+        if (others.isEmpty()) {
+            // No other node is known: fall back to the local id rather than
+            // throwing on get(0) or handing callers a null.
+            return me;
+        }
+        Collections.shuffle(others);
+        return others.get(0);
+    }
+
@Override
public void batchWrite(Iterable<IntentData> updates) {
updates.forEach(this::write);
@@ -263,6 +300,7 @@
if (data.version() == null) {
data.setVersion(new WallClockTimestamp());
}
+ data.setOrigin(clusterService.getLocalNode().id());
pendingMap.put(data.key(), copyData(data));
}
@@ -292,7 +330,7 @@
}
}
- private final class InternalIntentStatesListener implements
+ private final class InternalCurrentListener implements
EventuallyConsistentMapListener<Key, IntentData> {
@Override
public void event(