Started refactoring Intent Manager
Introduced IntentData and reworked APIs
Change-Id: I1fa437ceb1b72c4017ac2da1573bfbeb64c0632a
diff --git a/core/api/src/main/java/org/onosproject/event/AbstractEventAccumulator.java b/core/api/src/main/java/org/onosproject/event/AbstractEventAccumulator.java
index 3e0372f..f3fc65f 100644
--- a/core/api/src/main/java/org/onosproject/event/AbstractEventAccumulator.java
+++ b/core/api/src/main/java/org/onosproject/event/AbstractEventAccumulator.java
@@ -15,35 +15,18 @@
*/
package org.onosproject.event;
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.onlab.util.AbstractAccumulator;
-import java.util.List;
import java.util.Timer;
-import java.util.TimerTask;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
/**
* Base implementation of an event accumulator. It allows triggering based on
* event inter-arrival time threshold, maximum batch life threshold and maximum
* batch size.
*/
-public abstract class AbstractEventAccumulator implements EventAccumulator {
-
- private Logger log = LoggerFactory.getLogger(AbstractEventAccumulator.class);
-
- private final Timer timer;
- private final int maxEvents;
- private final int maxBatchMillis;
- private final int maxIdleMillis;
-
- private TimerTask idleTask = new ProcessorTask();
- private TimerTask maxTask = new ProcessorTask();
-
- private List<Event> events = Lists.newArrayList();
+public abstract class AbstractEventAccumulator
+ extends AbstractAccumulator<Event>
+ implements EventAccumulator {
/**
* Creates an event accumulator capable of triggering on the specified
@@ -59,108 +42,6 @@
*/
protected AbstractEventAccumulator(Timer timer, int maxEvents,
int maxBatchMillis, int maxIdleMillis) {
- this.timer = checkNotNull(timer, "Timer cannot be null");
-
- checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
- checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
- checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
-
- this.maxEvents = maxEvents;
- this.maxBatchMillis = maxBatchMillis;
- this.maxIdleMillis = maxIdleMillis;
- }
-
- @Override
- public void add(Event event) {
- idleTask = cancelIfActive(idleTask);
- events.add(event);
-
- // Did we hit the max event threshold?
- if (events.size() == maxEvents) {
- maxTask = cancelIfActive(maxTask);
- schedule(1);
- } else {
- // Otherwise, schedule idle task and if this is a first event
- // also schedule the max batch age task.
- idleTask = schedule(maxIdleMillis);
- if (events.size() == 1) {
- maxTask = schedule(maxBatchMillis);
- }
- }
- }
-
- // Schedules a new processor task given number of millis in the future.
- private TimerTask schedule(int millis) {
- TimerTask task = new ProcessorTask();
- timer.schedule(task, millis);
- return task;
- }
-
- // Cancels the specified task if it is active.
- private TimerTask cancelIfActive(TimerTask task) {
- if (task != null) {
- task.cancel();
- }
- return task;
- }
-
- // Task for triggering processing of accumulated events
- private class ProcessorTask extends TimerTask {
- @Override
- public void run() {
- try {
- idleTask = cancelIfActive(idleTask);
- maxTask = cancelIfActive(maxTask);
- processEvents(finalizeCurrentBatch());
- } catch (Exception e) {
- log.warn("Unable to process batch due to {}", e.getMessage());
- }
- }
- }
-
- // Demotes and returns the current batch of events and promotes a new one.
- private synchronized List<Event> finalizeCurrentBatch() {
- List<Event> toBeProcessed = events;
- events = Lists.newArrayList();
- return toBeProcessed;
- }
-
- /**
- * Returns the backing timer.
- *
- * @return backing timer
- */
- public Timer timer() {
- return timer;
- }
-
- /**
- * Returns the maximum number of events allowed to accumulate before
- * processing is triggered.
- *
- * @return max number of events
- */
- public int maxEvents() {
- return maxEvents;
- }
-
- /**
- * Returns the maximum number of millis allowed to expire since the first
- * event before processing is triggered.
- *
- * @return max number of millis a batch is allowed to last
- */
- public int maxBatchMillis() {
- return maxBatchMillis;
- }
-
- /**
- * Returns the maximum number of millis allowed to expire since the last
- * event arrival before processing is triggered.
- *
- * @return max number of millis since the last event
- */
- public int maxIdleMillis() {
- return maxIdleMillis;
+ super(timer, maxEvents, maxBatchMillis, maxIdleMillis);
}
}
diff --git a/core/api/src/main/java/org/onosproject/event/EventAccumulator.java b/core/api/src/main/java/org/onosproject/event/EventAccumulator.java
index 52928ae..78acfa0 100644
--- a/core/api/src/main/java/org/onosproject/event/EventAccumulator.java
+++ b/core/api/src/main/java/org/onosproject/event/EventAccumulator.java
@@ -15,27 +15,11 @@
*/
package org.onosproject.event;
-import java.util.List;
+import org.onlab.util.Accumulator;
/**
* Abstraction of an accumulator capable of collecting events and at some
* point in time triggers processing of all previously accumulated events.
*/
-public interface EventAccumulator {
-
- /**
- * Adds an event to the current batch. This operation may, or may not
- * trigger processing of the current batch of events.
- *
- * @param event event to be added to the current batch
- */
- void add(Event event);
-
- /**
- * Processes the specified list of accumulated events.
- *
- * @param events list of accumulated events
- */
- void processEvents(List<Event> events);
-
+public interface EventAccumulator extends Accumulator<Event> {
}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/Intent.java b/core/api/src/main/java/org/onosproject/net/intent/Intent.java
index 19170ae..1146013 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/Intent.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/Intent.java
@@ -36,7 +36,7 @@
private final IntentId id;
private final ApplicationId appId;
- private final String key;
+ private final String key; // TODO make this a class
private final Collection<NetworkResource> resources;
@@ -156,4 +156,8 @@
idGenerator = null;
}
}
+
+ public String key() {
+ return key;
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentData.java b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
new file mode 100644
index 0000000..0f2bd84
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.intent;
+
+import com.google.common.collect.ImmutableList;
+import org.onosproject.store.Timestamp;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A wrapper class that contains an intent, its state, and other metadata for
+ * internal use.
+ */
+public class IntentData { //FIXME need to make this "immutable"
+ // manager should be able to mutate a local copy while processing
+ private Intent intent;
+
+ private IntentState state;
+ private Timestamp version;
+
+ private List<Intent> installables;
+
+ public IntentData(Intent intent, IntentState state, Timestamp version) {
+ this.intent = intent;
+ this.state = state;
+ this.version = version;
+ }
+
+ // kryo constructor
+ protected IntentData() {
+ }
+
+ public Intent intent() {
+ return intent;
+ }
+
+ public IntentState state() {
+ return state;
+ }
+
+ public String key() {
+ return intent.key();
+ }
+
+ public void setState(IntentState newState) {
+ this.state = newState;
+ }
+
+ public void setInstallables(List<Intent> installables) {
+ this.installables = ImmutableList.copyOf(installables);
+ }
+
+ public List<Intent> installables() {
+ return installables;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(intent, version);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final IntentData other = (IntentData) obj;
+ return Objects.equals(this.intent, other.intent)
+ && Objects.equals(this.version, other.version);
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentOperation.java b/core/api/src/main/java/org/onosproject/net/intent/IntentOperation.java
index afb0010..c25c28f 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentOperation.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentOperation.java
@@ -27,9 +27,7 @@
public final class IntentOperation {
private final Type type;
- private final IntentId intentId;
private final Intent intent;
- //FIXME consider pulling the key out (we will hash based on key)
/**
* Operation type.
@@ -62,12 +60,10 @@
* Creates an intent operation.
*
* @param type operation type
- * @param intentId identifier of the intent subject to the operation
* @param intent intent subject
*/
- IntentOperation(Type type, IntentId intentId, Intent intent) {
+ public IntentOperation(Type type, Intent intent) {
this.type = checkNotNull(type);
- this.intentId = checkNotNull(intentId);
this.intent = intent;
}
@@ -86,7 +82,11 @@
* @return intent identifier
*/
public IntentId intentId() {
- return intentId;
+ return intent.id();
+ }
+
+ public String key() {
+ return intent.key();
}
/**
@@ -101,7 +101,7 @@
@Override
public int hashCode() {
- return Objects.hash(type, intentId, intent);
+ return Objects.hash(type, intent);
}
@Override
@@ -114,7 +114,6 @@
}
final IntentOperation other = (IntentOperation) obj;
return Objects.equals(this.type, other.type) &&
- Objects.equals(this.intentId, other.intentId) &&
Objects.equals(this.intent, other.intent);
}
@@ -123,7 +122,6 @@
public String toString() {
return toStringHelper(this)
.add("type", type)
- .add("intentId", intentId)
.add("intent", intent)
.toString();
}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentOperations.java b/core/api/src/main/java/org/onosproject/net/intent/IntentOperations.java
index 2f30fa4..d1d4c3d 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentOperations.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentOperations.java
@@ -31,7 +31,7 @@
/**
* Batch of intent submit/withdraw/replace operations.
*/
-@Deprecated
+@Deprecated //DELETEME
public final class IntentOperations {
private final List<IntentOperation> operations;
@@ -120,7 +120,7 @@
*/
public Builder addSubmitOperation(Intent intent) {
checkNotNull(intent, "Intent cannot be null");
- builder.add(new IntentOperation(SUBMIT, intent.id(), intent));
+ builder.add(new IntentOperation(SUBMIT, intent));
return this;
}
@@ -134,7 +134,7 @@
public Builder addReplaceOperation(IntentId oldIntentId, Intent newIntent) {
checkNotNull(oldIntentId, "Intent ID cannot be null");
checkNotNull(newIntent, "Intent cannot be null");
- builder.add(new IntentOperation(REPLACE, oldIntentId, newIntent));
+ builder.add(new IntentOperation(REPLACE, newIntent)); //FIXME
return this;
}
@@ -146,7 +146,7 @@
*/
public Builder addWithdrawOperation(IntentId intentId) {
checkNotNull(intentId, "Intent ID cannot be null");
- builder.add(new IntentOperation(WITHDRAW, intentId, null));
+ builder.add(new IntentOperation(WITHDRAW, null)); //FIXME
return this;
}
@@ -158,7 +158,7 @@
*/
public Builder addUpdateOperation(IntentId intentId) {
checkNotNull(intentId, "Intent ID cannot be null");
- builder.add(new IntentOperation(UPDATE, intentId, null));
+ builder.add(new IntentOperation(UPDATE, null)); //FIXME
return this;
}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentState.java b/core/api/src/main/java/org/onosproject/net/intent/IntentState.java
index e4d7423..73b337c 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentState.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentState.java
@@ -30,7 +30,7 @@
* Intents will also pass through this state when they are updated.
* </p>
*/
- INSTALL_REQ,
+ INSTALL_REQ, // TODO submit_REQ?
/**
* Signifies that the intent is being compiled into installable intents.
@@ -66,7 +66,7 @@
* previously failed to be installed.
* </p>
*/
- RECOMPILING,
+ RECOMPILING, // TODO perhaps repurpose as BROKEN.
/**
* Indicates that an application has requested that an intent be withdrawn.
@@ -92,5 +92,5 @@
* Signifies that the intent has failed compiling, installing or
* recompiling states.
*/
- FAILED
+ FAILED //TODO consider renaming to UNSAT.
}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
index 534279b..e951389 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
@@ -76,9 +76,9 @@
/**
* Adds a new operation, which should be persisted and delegated.
*
- * @param op operation
+ * @param intent intent data object
*/
- default void add(IntentOperation op) {} //FIXME remove when impl.
+ default void addPending(IntentData intent) {} //FIXME remove when impl.
/**
* Checks to see whether the calling instance is the master for processing
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentStoreDelegate.java b/core/api/src/main/java/org/onosproject/net/intent/IntentStoreDelegate.java
index ab56c99..6d0f87e 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentStoreDelegate.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentStoreDelegate.java
@@ -23,10 +23,10 @@
public interface IntentStoreDelegate extends StoreDelegate<IntentEvent> {
/**
- * Provides an intent operation that should be processed (compiled and
+ * Provides an intent data object that should be processed (compiled and
* installed) by this manager.
*
- * @param op intent operation
+ * @param intentData intent data object
*/
- void process(IntentOperation op);
+ void process(IntentData intentData);
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentAccumulator.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentAccumulator.java
new file mode 100644
index 0000000..bc7a7f5
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentAccumulator.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.intent.impl;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.AbstractAccumulator;
+import org.onosproject.net.intent.IntentData;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+
+/**
+ * An accumulator for building batches of intent operations. Only one batch should
+ * be in process per instance at a time.
+ */
+public class IntentAccumulator extends AbstractAccumulator<IntentData> {
+
+ private static final int DEFAULT_MAX_EVENTS = 1000;
+ private static final int DEFAULT_MAX_IDLE_MS = 10;
+ private static final int DEFAULT_MAX_BATCH_MS = 50;
+
+ // FIXME: Replace with a system-wide timer instance;
+ // TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
+ private static final Timer TIMER = new Timer("intent-op-batching");
+
+ /**
+ * Creates an intent operation accumulator.
+ */
+ protected IntentAccumulator() {
+ super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
+ }
+
+ @Override
+ public void processEvents(List<IntentData> ops) {
+ Map<String, IntentData> opMap = reduce(ops);
+ // FIXME kick off the work
+ //for (IntentData data : opMap.values()) {}
+ }
+
+ private Map<String, IntentData> reduce(List<IntentData> ops) {
+ Map<String, IntentData> map = Maps.newHashMap();
+ for (IntentData op : ops) {
+ map.put(op.key(), op);
+ }
+ //TODO check the version... or maybe store will handle this.
+ return map;
+ }
+}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 685fa70..f94424f 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -37,6 +37,7 @@
import org.onosproject.net.intent.IntentBatchDelegate;
import org.onosproject.net.intent.IntentBatchService;
import org.onosproject.net.intent.IntentCompiler;
+import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentException;
import org.onosproject.net.intent.IntentExtensionService;
@@ -128,6 +129,8 @@
private final IntentBatchDelegate batchDelegate = new InternalBatchDelegate();
private IdGenerator idGenerator;
+ private final IntentAccumulator accumulator = new IntentAccumulator();
+
@Activate
public void activate() {
store.setDelegate(delegate);
@@ -154,32 +157,41 @@
@Override
public void submit(Intent intent) {
checkNotNull(intent, INTENT_NULL);
- execute(IntentOperations.builder(intent.appId())
- .addSubmitOperation(intent).build());
+ IntentData data = new IntentData(intent, IntentState.INSTALL_REQ, null);
+ //FIXME timestamp?
+ store.addPending(data);
}
@Override
public void withdraw(Intent intent) {
checkNotNull(intent, INTENT_NULL);
- execute(IntentOperations.builder(intent.appId())
- .addWithdrawOperation(intent.id()).build());
+ IntentData data = new IntentData(intent, IntentState.WITHDRAW_REQ, null);
+ //FIXME timestamp?
+ store.addPending(data);
}
@Override
public void replace(IntentId oldIntentId, Intent newIntent) {
- checkNotNull(oldIntentId, INTENT_ID_NULL);
- checkNotNull(newIntent, INTENT_NULL);
- execute(IntentOperations.builder(newIntent.appId())
- .addReplaceOperation(oldIntentId, newIntent)
- .build());
+ throw new UnsupportedOperationException("replace is not implemented");
}
@Override
public void execute(IntentOperations operations) {
- if (operations.operations().isEmpty()) {
- return;
+ for (IntentOperation op : operations.operations()) {
+ switch (op.type()) {
+ case SUBMIT:
+ case UPDATE:
+ submit(op.intent());
+ break;
+ case WITHDRAW:
+ withdraw(op.intent());
+ break;
+ // REPLACE shares the default branch below: unsupported
+ case REPLACE:
+ default:
+ throw new UnsupportedOperationException("replace not supported");
+ }
}
- batchService.addIntentOperations(operations);
}
@Override
@@ -382,8 +394,8 @@
}
@Override
- public void process(IntentOperation op) {
- //FIXME
+ public void process(IntentData data) {
+ accumulator.add(data);
}
}
@@ -488,6 +500,7 @@
}
}
+ // TODO pull out the IntentUpdate inner classes
private class InstallRequest implements IntentUpdate {
private final Intent intent;
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java
index d2e102e..f3a5cf0 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java
@@ -17,19 +17,19 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.net.intent.BatchWrite;
+import org.onosproject.net.intent.BatchWrite.Operation;
import org.onosproject.net.intent.Intent;
+import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentId;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
-import org.onosproject.net.intent.BatchWrite.Operation;
import org.onosproject.store.AbstractStore;
import org.slf4j.Logger;
@@ -38,8 +38,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Preconditions.*;
import static org.onosproject.net.intent.IntentState.WITHDRAWN;
import static org.slf4j.LoggerFactory.getLogger;
@@ -50,10 +49,13 @@
implements IntentStore {
private final Logger log = getLogger(getClass());
+
+ // current state maps FIXME: make this an IntentData map
private final Map<IntentId, Intent> intents = new ConcurrentHashMap<>();
private final Map<IntentId, IntentState> states = new ConcurrentHashMap<>();
private final Map<IntentId, List<Intent>> installable = new ConcurrentHashMap<>();
+ private final Map<String, IntentData> pending = new ConcurrentHashMap<>(); //String is "key"
@Activate
public void activate() {
@@ -203,4 +205,19 @@
}
return failed;
}
+
+ @Override
+ public void addPending(IntentData data) {
+ //FIXME need to compare versions
+ pending.put(data.key(), data);
+ checkNotNull(delegate, "Store delegate is not set")
+ .process(data);
+ }
+ // FIXME!!! pending.remove(intent.key()); // TODO check version
+
+
+ @Override
+ public boolean isMaster(Intent intent) {
+ return true;
+ }
}