Implemented a PartitionManager to keep track of partitions
assigned to instances.
Also partially updated GossipIntentStore to the new API: the newly added
methods are still TODO stubs, so this work is not yet complete.
Change-Id: I64d1779b669de51c35da686b65006a80ac4819b0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index ba348d4..f3917c9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -26,11 +26,14 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.Intent;
+import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentId;
+import org.onosproject.net.intent.IntentOperation;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
+import org.onosproject.net.intent.Key;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.impl.EventuallyConsistentMap;
@@ -66,12 +69,18 @@
private EventuallyConsistentMap<IntentId, List<Intent>> installables;
+ // Map of intent key (held as a String) => pending intent operation
+ private EventuallyConsistentMap<String, IntentOperation> pending;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PartitionService partitionService;
+
@Activate
public void activate() {
KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
@@ -93,16 +102,25 @@
intentSerializer,
new WallclockClockManager<>());
+ pending = new EventuallyConsistentMapImpl<>("intent-pending",
+ clusterService,
+ clusterCommunicator,
+ intentSerializer, // TODO
+ new WallclockClockManager<>());
+
intentStates.addListener(new InternalIntentStatesListener());
+ pending.addListener(new InternalPendingListener());
log.info("Started");
}
@Deactivate
public void deactivate() {
+ // Destroy intents, intentStates, installables and pending, then log that the store has stopped.
intents.destroy();
intentStates.destroy();
installables.destroy();
+ pending.destroy();
log.info("Stopped");
}
@@ -148,6 +166,9 @@
intents.put(intent.id(), intent);
intentStates.put(intent.id(), INSTALL_REQ);
+ // TODO remove from pending?
+
+
break;
case REMOVE_INTENT:
checkArgument(op.args().size() == 1,
@@ -193,6 +214,41 @@
return failed;
}
+ @Override
+ public void write(IntentData newData) {
+ // TODO: not implemented yet; newData is currently ignored and nothing is stored.
+ }
+
+ @Override
+ public void batchWrite(Iterable<IntentData> updates) {
+ // TODO: not implemented yet; updates is currently ignored and nothing is stored.
+ }
+
+ @Override
+ public Intent getIntent(Key key) {
+ return null; // TODO: stub; always returns null, whatever key is passed.
+ }
+
+ @Override
+ public IntentData getIntentData(Key key) {
+ return null; // TODO: stub; always returns null, whatever key is passed.
+ }
+
+ @Override
+ public void addPending(IntentData data) {
+ // TODO implement: currently a no-op, so data is never put into the pending map.
+
+ // Planned: check the intent versions, then store the entry. NOTE(review): the sketch below uses 'op', which is not in scope here (the parameter is 'data', and pending holds IntentOperation values).
+ //pending.put(op.key(), op);
+ }
+
+ @Override
+ public boolean isMaster(Intent intent) {
+ // TODO: stub; always returns false until the partitionService check below is enabled.
+ //return partitionService.isMine(intent.key());
+ return false;
+ }
+
private void notifyDelegateIfNotNull(IntentEvent event) {
if (event != null) {
notifyDelegate(event);
@@ -219,5 +275,22 @@
}
}
+ private final class InternalPendingListener implements
+ EventuallyConsistentMapListener<String, IntentOperation> {
+ @Override
+ public void event(
+ EventuallyConsistentMapEvent<String, IntentOperation> event) {
+ if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
+ // The pending intents map has been updated (only PUT events are handled).
+ // If isMaster() reports that we are master for this intent's partition,
+ // notify the Manager that they should do some work.
+ if (isMaster(event.value().intent())) {
+ // TODO delegate.process(event.value()); for now this branch only logs.
+ log.debug("implement this");
+ }
+ }
+ }
+ }
+
}