ONOS-5691 ONOS-5742 Fixing intent framework difficulties
- Refactoring AbstractAccumulator to use less blocking synchronization
- Fixing bug in AbstractAccumulator that could leave some items
without firing
- Updated IntentStore for resubmitting pending operations
Change-Id: Iaf240da65e11ceb7d7d745cf4e25bfb8c26ed1eb
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentData.java b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
index feada17..c30993e 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
@@ -89,6 +89,29 @@
}
/**
+ * Creates a new intent data object.
+ *
+ * @param intent intent this metadata references
+ * @param state intent state
+ * @param request intent request
+ * @param version version of the intent for this key
+ * @param origin ID of the node where the data was originally created
+ */
+ public IntentData(Intent intent, IntentState state, IntentState request, Timestamp version, NodeId origin) {
+ checkNotNull(intent);
+ checkNotNull(state);
+ checkNotNull(request);
+ checkNotNull(version);
+ checkNotNull(origin);
+
+ this.intent = intent;
+ this.state = state;
+ this.request = request;
+ this.version = version;
+ this.origin = origin;
+ }
+
+ /**
* Copy constructor.
*
* @param intentData intent data to copy
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
index f6f69c5..6c0dff4 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
@@ -131,6 +131,15 @@
Iterable<IntentData> getPendingData();
/**
+ * Returns the intent data object that are pending processing for a specfied
+ * key.
+ *
+ * @param intentKey key to look up
+ * @return pending intent data object
+ */
+ IntentData getPendingData(Key intentKey);
+
+ /**
* Returns the intent data objects that are pending processing for longer
* than the specified duration.
*
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java
index 41ce036..02d7eb2 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java
@@ -195,6 +195,11 @@
}
@Override
+ public IntentData getPendingData(Key intentKey) {
+ return pending.get(intentKey);
+ }
+
+ @Override
public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
long older = System.nanoTime() - olderThan * 1_000_000; //convert ms to ns
final SystemClockTimestamp time = new SystemClockTimestamp(older);
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
index 56ee709..bb180c8 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
@@ -210,6 +210,13 @@
private void cleanup() {
int corruptCount = 0, failedCount = 0, stuckCount = 0, pendingCount = 0;
+ // Check the pending map first, because the check of the current map
+ // will add items to the pending map.
+ for (IntentData intentData : store.getPendingData(true, periodMs)) {
+ resubmitPendingRequest(intentData);
+ pendingCount++;
+ }
+
for (IntentData intentData : store.getIntentData(true, periodMs)) {
switch (intentData.state()) {
case FAILED:
@@ -231,11 +238,6 @@
}
}
- for (IntentData intentData : store.getPendingData(true, periodMs)) {
- resubmitPendingRequest(intentData);
- pendingCount++;
- }
-
if (corruptCount + failedCount + stuckCount + pendingCount > 0) {
log.debug("Intent cleanup ran and resubmitted {} corrupt, {} failed, {} stuck, and {} pending intents",
corruptCount, failedCount, stuckCount, pendingCount);
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
index 8ea910b..631151b 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
@@ -53,7 +53,7 @@
*/
class IntentInstaller {
- private static final Logger log = getLogger(IntentManager.class);
+ private static final Logger log = getLogger(IntentInstaller.class);
private IntentStore store;
private ObjectiveTrackerService trackerService;
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 12c636c..5524d63 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -49,6 +49,7 @@
import org.onosproject.net.intent.impl.compiler.PointToPointIntentCompiler;
import org.onosproject.net.intent.impl.phase.FinalIntentProcessPhase;
import org.onosproject.net.intent.impl.phase.IntentProcessPhase;
+import org.onosproject.net.intent.impl.phase.Skipped;
import org.osgi.service.component.ComponentContext;
import org.onosproject.net.resource.ResourceService;
import org.slf4j.Logger;
@@ -428,7 +429,8 @@
//FIXME
log.warn("Future failed: {}", e);
return null;
- })).collect(Collectors.toList());
+ }))
+ .collect(Collectors.toList());
// write multiple data to store in order
store.batchWrite(Tools.allOf(futures).join().stream()
@@ -449,6 +451,16 @@
}
private IntentProcessPhase createInitialPhase(IntentData data) {
+ IntentData pending = store.getPendingData(data.key());
+ if (pending == null || pending.version().isNewerThan(data.version())) {
+ /*
+ If the pending map is null, then this intent was compiled by a
+ previous batch iteration, so we can skip it.
+ If the pending map has a newer request, it will get compiled as
+ part of the next batch, so we can skip it.
+ */
+ return Skipped.getPhase();
+ }
IntentData current = store.getIntentData(data.key());
return newInitialPhase(processor, data, current);
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/phase/Skipped.java b/core/net/src/main/java/org/onosproject/net/intent/impl/phase/Skipped.java
new file mode 100644
index 0000000..a8afde6
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/phase/Skipped.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.intent.impl.phase;
+
+import org.onosproject.net.intent.IntentData;
+
+/**
+ * Represents a phase where an intent is not compiled.
+ * <p>
+ * This should be used if a new version of the intent will immediately override
+ * this one.
+ * </p>
+ */
+public final class Skipped extends FinalIntentProcessPhase {
+
+ private static final Skipped SINGLETON = new Skipped();
+
+ /**
+ * Returns a shared skipped phase.
+ *
+ * @return skipped phase
+ */
+ public static Skipped getPhase() {
+ return SINGLETON;
+ }
+
+ // Prevent object construction; use getPhase()
+ private Skipped() {
+ }
+
+ @Override
+ public IntentData data() {
+ return null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index c7dfb77..284a973 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -35,11 +35,11 @@
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
+import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.net.intent.Key;
-import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.Timestamp;
import org.onosproject.store.serializers.KryoNamespaces;
@@ -141,19 +141,26 @@
.withName("intent-current")
.withSerializer(intentSerializer)
.withTimestampProvider((key, intentData) ->
- new MultiValuedTimestamp<>(
- intentData == null ? new WallClockTimestamp() :
- intentData.version(),
- sequenceNumber.getAndIncrement()))
+ new MultiValuedTimestamp<>(intentData == null ?
+ new WallClockTimestamp() : intentData.version(),
+ sequenceNumber.getAndIncrement()))
.withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData));
EventuallyConsistentMapBuilder pendingECMapBuilder =
storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
.withName("intent-pending")
.withSerializer(intentSerializer)
- .withTimestampProvider((key, intentData) -> intentData == null ?
- new MultiValuedTimestamp<>(new WallClockTimestamp(), System.nanoTime()) :
- new MultiValuedTimestamp<>(intentData.version(), System.nanoTime()))
+ .withTimestampProvider((key, intentData) ->
+ /*
+ We always want to accept new values in the pending map,
+ so we should use a high performance logical clock.
+ */
+ /*
+ TODO We use the wall clock for the time being, but
+ this could result in issues if there is clock skew
+ across instances.
+ */
+ new MultiValuedTimestamp<>(new WallClockTimestamp(), System.nanoTime()))
.withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData));
if (initiallyPersistent) {
currentECMapBuilder = currentECMapBuilder.withPersistence();
@@ -285,17 +292,19 @@
} else {
currentMap.put(newData.key(), new IntentData(newData));
}
-
- // Remove the intent data from the pending map if the newData is more
- // recent or equal to the existing entry.
- pendingMap.compute(newData.key(), (key, existingValue) -> {
- if (existingValue == null || !existingValue.version().isNewerThan(newData.version())) {
- return null;
- } else {
- return existingValue;
- }
- });
}
+ /*
+ * Remove the intent data from the pending map if the newData is more
+ * recent or equal to the existing entry. No matter if it is an acceptable
+ * update or not.
+ */
+ pendingMap.compute(newData.key(), (key, existingValue) -> {
+ if (existingValue == null || !existingValue.version().isNewerThan(newData.version())) {
+ return null;
+ } else {
+ return existingValue;
+ }
+ });
}
private Collection<NodeId> getPeerNodes(Key key, IntentData data) {
@@ -363,7 +372,12 @@
Timestamp version = data.version() != null ? data.version() : new WallClockTimestamp();
pendingMap.compute(data.key(), (key, existingValue) -> {
if (existingValue == null || existingValue.version().isOlderThan(version)) {
- return new IntentData(data.intent(), data.state(),
+ /*
+ * This avoids to create Intent with state == request, which can
+ * be problematic if the Intent state is different from *REQ
+ * {INSTALL_, WITHDRAW_ and PURGE_}.
+ */
+ return new IntentData(data.intent(), data.state(), data.request(),
version, clusterService.getLocalNode().id());
} else {
return existingValue;
@@ -389,6 +403,11 @@
}
@Override
+ public IntentData getPendingData(Key intentKey) {
+ return pendingMap.get(intentKey);
+ }
+
+ @Override
public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
long now = System.currentTimeMillis();
final WallClockTimestamp time = new WallClockTimestamp(now - olderThan);
@@ -403,7 +422,6 @@
@Override
public void event(EventuallyConsistentMapEvent<Key, IntentData> event) {
IntentData intentData = event.value();
-
if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
// The current intents map has been updated. If we are master for
// this intent's partition, notify the Manager that it should