Finished implementation of GossipIntentStore based on the new IntentData-based store API and semantics.
Change-Id: I1a71d075e5d34ab7b9f7c2533d389235d6da1d9a
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 2fc6fd8..17439ef 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -37,7 +37,8 @@
import org.onosproject.store.impl.EventuallyConsistentMapEvent;
import org.onosproject.store.impl.EventuallyConsistentMapImpl;
import org.onosproject.store.impl.EventuallyConsistentMapListener;
-import org.onosproject.store.impl.WallclockClockManager;
+import org.onosproject.store.impl.MultiValuedTimestamp;
+import org.onosproject.store.impl.SystemClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
@@ -58,12 +59,6 @@
private final Logger log = getLogger(getClass());
- /*private EventuallyConsistentMap<IntentId, Intent> intents;
-
- private EventuallyConsistentMap<IntentId, IntentState> intentStates;
-
- private EventuallyConsistentMap<IntentId, List<Intent>> installables;*/
-
// Map of intent key => current intent state
private EventuallyConsistentMap<Key, IntentData> currentState;
@@ -82,36 +77,22 @@
@Activate
public void activate() {
KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API);
- /*intents = new EventuallyConsistentMapImpl<>("intents", clusterService,
- clusterCommunicator,
- intentSerializer,
- new WallclockClockManager<>());
-
- intentStates = new EventuallyConsistentMapImpl<>("intent-states",
- clusterService,
- clusterCommunicator,
- intentSerializer,
- new WallclockClockManager<>());
-
- installables = new EventuallyConsistentMapImpl<>("intent-installables",
- clusterService,
- clusterCommunicator,
- intentSerializer,
- new WallclockClockManager<>());
- */
+ .register(KryoNamespaces.API)
+ .register(IntentData.class)
+ .register(MultiValuedTimestamp.class)
+ .register(SystemClockTimestamp.class);
currentState = new EventuallyConsistentMapImpl<>("intent-current",
clusterService,
clusterCommunicator,
intentSerializer,
- new WallclockClockManager<>());
+ new IntentDataLogicalClockManager<>());
pending = new EventuallyConsistentMapImpl<>("intent-pending",
clusterService,
clusterCommunicator,
intentSerializer, // TODO
- new WallclockClockManager<>());
+ new IntentDataClockManager<>());
currentState.addListener(new InternalIntentStatesListener());
pending.addListener(new InternalPendingListener());
@@ -121,10 +102,6 @@
@Deactivate
public void deactivate() {
-
- /*intents.destroy();
- intentStates.destroy();
- installables.destroy();*/
currentState.destroy();
pending.destroy();
@@ -133,7 +110,6 @@
@Override
public long getIntentCount() {
- //return intents.size();
return currentState.size();
}
@@ -146,99 +122,45 @@
@Override
public IntentState getIntentState(Key intentKey) {
- // TODO: implement this
- return IntentState.FAILED;
+ IntentData data = currentState.get(intentKey);
+ if (data != null) {
+ return data.state();
+ }
+ return null;
}
@Override
public List<Intent> getInstallableIntents(Key intentKey) {
- // TODO: implement this or delete class
+ IntentData data = currentState.get(intentKey);
+ if (data != null) {
+ return data.installables();
+ }
return null;
- /*
- return installables.get(intentId);
- */
}
@Override
public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
- /*
- List<BatchWrite.Operation> failed = new ArrayList<>();
-
- for (BatchWrite.Operation op : batch.operations()) {
- switch (op.type()) {
- case CREATE_INTENT:
- checkArgument(op.args().size() == 1,
- "CREATE_INTENT takes 1 argument. %s", op);
- Intent intent = op.arg(0);
-
- intents.put(intent.id(), intent);
- intentStates.put(intent.id(), INSTALL_REQ);
-
- // TODO remove from pending?
-
-
- break;
- case REMOVE_INTENT:
- checkArgument(op.args().size() == 1,
- "REMOVE_INTENT takes 1 argument. %s", op);
- IntentId intentId = op.arg(0);
-
- intents.remove(intentId);
- intentStates.remove(intentId);
- installables.remove(intentId);
-
- break;
- case SET_STATE:
- checkArgument(op.args().size() == 2,
- "SET_STATE takes 2 arguments. %s", op);
- intent = op.arg(0);
- IntentState newState = op.arg(1);
-
- intentStates.put(intent.id(), newState);
-
- break;
- case SET_INSTALLABLE:
- checkArgument(op.args().size() == 2,
- "SET_INSTALLABLE takes 2 arguments. %s", op);
- intentId = op.arg(0);
- List<Intent> installableIntents = op.arg(1);
-
- installables.put(intentId, installableIntents);
-
- break;
- case REMOVE_INSTALLED:
- checkArgument(op.args().size() == 1,
- "REMOVE_INSTALLED takes 1 argument. %s", op);
- intentId = op.arg(0);
- installables.remove(intentId);
- break;
- default:
- log.warn("Unknown Operation encountered: {}", op);
- failed.add(op);
- break;
- }
- }
-
- return failed;
- */
+ // Deprecated
return null;
}
@Override
public void write(IntentData newData) {
+ log.debug("writing intent {}", newData);
+
// Only the master is modifying the current state. Therefore assume
// this always succeeds
currentState.put(newData.key(), newData);
// if current.put succeeded
- //pending.remove(newData.key(), newData);
+ pending.remove(newData.key(), newData);
- try {
+ /*try {
notifyDelegate(IntentEvent.getEvent(newData));
} catch (IllegalArgumentException e) {
//no-op
log.trace("ignore this exception: {}", e);
- }
+ }*/
}
@Override
@@ -262,14 +184,17 @@
@Override
public void addPending(IntentData data) {
+ log.debug("new call to pending {}", data);
+ if (data.version() == null) {
+ log.debug("updating timestamp");
+ data.setVersion(new SystemClockTimestamp());
+ }
pending.put(data.key(), data);
}
@Override
public boolean isMaster(Intent intent) {
- // TODO
- //return partitionService.isMine(intent.key());
- return false;
+ return partitionService.isMine(intent.key());
}
private void notifyDelegateIfNotNull(IntentEvent event) {
@@ -284,18 +209,16 @@
public void event(
EventuallyConsistentMapEvent<Key, IntentData> event) {
if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
- // TODO check event send logic
IntentEvent externalEvent;
- IntentData intentData = currentState.get(event.key()); // TODO OK if this is null?
+ IntentData intentData = event.value();
- /*
try {
- externalEvent = IntentEvent.getEvent(event.value(), intent);
+ externalEvent = IntentEvent.getEvent(intentData.state(), intentData.intent());
} catch (IllegalArgumentException e) {
externalEvent = null;
}
- notifyDelegateIfNotNull(externalEvent);*/
+ notifyDelegateIfNotNull(externalEvent);
}
}
}
@@ -314,6 +237,13 @@
delegate.process(event.value());
}
}
+
+ try {
+ notifyDelegate(IntentEvent.getEvent(event.value()));
+ } catch (IllegalArgumentException e) {
+ //no-op
+ log.trace("ignore this exception: {}", e);
+ }
}
}
}