[ONOS-6163] (vnet) revise intent service for vnets
Revise intent service for virtual networks.
Change-Id: I4955cde2296e58d8f0e356d19483e84f51193318
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentManager.java
index a133c66..e40bf4e 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentManager.java
@@ -16,7 +16,7 @@
package org.onosproject.incubator.net.virtual.impl;
-import com.google.common.collect.Iterators;
+import org.onlab.util.Tools;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkIntent;
import org.onosproject.incubator.net.virtual.VirtualNetworkIntentStore;
@@ -25,26 +25,46 @@
import org.onosproject.incubator.net.virtual.VirtualPort;
import org.onosproject.incubator.net.virtual.VnetService;
import org.onosproject.incubator.net.virtual.event.AbstractVirtualListenerManager;
+import org.onosproject.incubator.net.virtual.impl.intent.phase.VirtualFinalIntentProcessPhase;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentInstallCoordinator;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentAccumulator;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentCompilerRegistry;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentInstallerRegistry;
+import org.onosproject.incubator.net.virtual.impl.intent.phase.VirtualIntentProcessPhase;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentProcessor;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentSkipped;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.group.GroupService;
import org.onosproject.net.intent.Intent;
+import org.onosproject.net.intent.IntentBatchDelegate;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
-import org.onosproject.net.intent.WorkPartitionService;
+import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.resource.ResourceConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.*;
+import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
+import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.incubator.net.virtual.impl.intent.phase.VirtualIntentProcessPhase.newInitialPhase;
+import static org.onosproject.net.intent.IntentState.FAILED;
/**
* Intent service implementation built on the virtual network service.
@@ -55,6 +75,9 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final int DEFAULT_NUM_THREADS = 12;
+ private int numThreads = DEFAULT_NUM_THREADS;
+
private static final String NETWORK_ID_NULL = "Network ID cannot be null";
private static final String DEVICE_NULL = "Device cannot be null";
private static final String INTENT_NULL = "Intent cannot be null";
@@ -63,10 +86,28 @@
private static final String INTENT_KEY_NULL = "Intent key cannot be null";
private static final String CP_NULL = "Connect Point cannot be null";
- protected IntentService intentService;
- protected VirtualNetworkStore store;
+ //FIXME: Tracker service for vnet.
+
+ //ONOS core services
+ protected VirtualNetworkStore virtualNetworkStore;
protected VirtualNetworkIntentStore intentStore;
- protected WorkPartitionService partitionService;
+
+ //Virtual network services
+ protected GroupService groupService;
+
+ private final IntentBatchDelegate batchDelegate = new InternalBatchDelegate();
+ private final InternalIntentProcessor processor = new InternalIntentProcessor();
+ private final IntentStoreDelegate delegate = new InternalStoreDelegate();
+ private final VirtualIntentCompilerRegistry compilerRegistry =
+ VirtualIntentCompilerRegistry.getInstance();
+ private final VirtualIntentInstallerRegistry installerRegistry =
+ VirtualIntentInstallerRegistry.getInstance();
+ private final VirtualIntentAccumulator accumulator =
+ new VirtualIntentAccumulator(batchDelegate);
+
+ private VirtualIntentInstallCoordinator installCoordinator;
+ private ExecutorService batchExecutor;
+ private ExecutorService workerExecutor;
/**
* Creates a new VirtualNetworkIntentService object.
@@ -79,10 +120,18 @@
super(virtualNetworkManager, networkId, IntentEvent.class);
- this.store = serviceDirectory.get(VirtualNetworkStore.class);
+ this.virtualNetworkStore = serviceDirectory.get(VirtualNetworkStore.class);
this.intentStore = serviceDirectory.get(VirtualNetworkIntentStore.class);
- this.intentService = serviceDirectory.get(IntentService.class);
- this.partitionService = serviceDirectory.get(WorkPartitionService.class);
+
+ this.groupService = manager.get(networkId, GroupService.class);
+
+ intentStore.setDelegate(networkId, delegate);
+ batchExecutor = newSingleThreadExecutor(groupedThreads("onos/intent", "batch", log));
+ workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d", log));
+
+ installCoordinator = new VirtualIntentInstallCoordinator(networkId, installerRegistry, intentStore);
+ log.info("Started");
+
}
@Override
@@ -91,7 +140,8 @@
checkState(intent instanceof VirtualNetworkIntent, "Only VirtualNetworkIntent is supported.");
checkArgument(validateIntent((VirtualNetworkIntent) intent), "Invalid Intent");
- intentService.submit(intent);
+ IntentData data = IntentData.submit(intent);
+ intentStore.addPending(networkId, data);
}
/**
@@ -146,37 +196,19 @@
@Override
public void withdraw(Intent intent) {
checkNotNull(intent, INTENT_NULL);
- // Withdraws the physical intents created due to the virtual intents.
- store.getTunnelIds(intent).forEach(tunnelId -> {
- Key intentKey = Key.of(tunnelId.id(), intent.appId());
- Intent physicalIntent = intentService.getIntent(intentKey);
- checkNotNull(physicalIntent, INTENT_NULL);
-
- // Withdraw the physical intent(s)
- log.debug("Withdrawing pt-pt intent: " + physicalIntent);
- intentService.withdraw(physicalIntent);
- });
- // Now withdraw the virtual intent
- log.debug("Withdrawing virtual intent: " + intent);
- intentService.withdraw(intent);
+ IntentData data = IntentData.withdraw(intent);
+ intentStore.addPending(networkId, data);
}
@Override
public void purge(Intent intent) {
checkNotNull(intent, INTENT_NULL);
- // Purges the physical intents created for each tunnelId.
- store.getTunnelIds(intent)
- .forEach(tunnelId -> {
- Key intentKey = Key.of(tunnelId.id(), intent.appId());
- Intent physicalIntent = intentService.getIntent(intentKey);
- checkNotNull(physicalIntent, INTENT_NULL);
- // Purge the physical intent(s)
- intentService.purge(physicalIntent);
- store.removeTunnelId(intent, tunnelId);
- });
- // Now purge the virtual intent
- intentService.purge(intent);
+ IntentData data = IntentData.purge(intent);
+ intentStore.addPending(networkId, data);
+
+ // remove associated group if there is one
+ // FIXME: Remove P2P intent for vnets
}
@Override
@@ -192,7 +224,9 @@
@Override
public void addPending(IntentData intentData) {
- intentService.addPending(intentData);
+ checkNotNull(intentData, INTENT_NULL);
+ //TODO we might consider further checking / assertions
+ intentStore.addPending(networkId, intentData);
}
@Override
@@ -202,38 +236,177 @@
@Override
public long getIntentCount() {
- return Iterators.size(getIntents().iterator());
+ return intentStore.getIntentCount(networkId);
}
@Override
public IntentState getIntentState(Key intentKey) {
checkNotNull(intentKey, KEY_NULL);
- return Optional.ofNullable(intentStore.getIntentData(networkId, intentKey))
- .map(IntentData::state)
- .orElse(null);
+ return intentStore.getIntentState(networkId, intentKey);
}
@Override
public List<Intent> getInstallableIntents(Key intentKey) {
- List<Intent> intents = new ArrayList<>();
- getIntentData().forEach(intentData -> {
- if (intentData.intent().key().equals(intentKey)) {
- intents.addAll(intentData.installables());
- }
- });
- return intents;
+ return intentStore.getInstallableIntents(networkId, intentKey);
}
@Override
public boolean isLocal(Key intentKey) {
- checkNotNull(intentKey, INTENT_KEY_NULL);
- Intent intent = getIntent(intentKey);
- checkNotNull(intent, INTENT_NULL);
- return partitionService.isMine(intentKey, Key::hash);
+ return intentStore.isMaster(networkId, intentKey);
}
@Override
public Iterable<Intent> getPending() {
- return null;
+ return intentStore.getPending(networkId);
+ }
+
+    // Store delegate to re-post events emitted from the store and to queue pending data on the accumulator.
+    private class InternalStoreDelegate implements IntentStoreDelegate {
+        @Override
+        public void notify(IntentEvent event) {
+            post(event);
+            switch (event.type()) {
+                case WITHDRAWN:
+                    //FIXME: release resources
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        @Override
+        public void process(IntentData data) {
+            accumulator.add(data);
+        }
+
+        @Override
+        public void onUpdate(IntentData intentData) {
+            //FIXME: track intent
+        }
+
+        // Works out whether the resources held for the given intent may be released.
+        // NOTE(review): notify() does not call this yet, and the release step itself is still a FIXME.
+        private void releaseResources(Intent intent) {
+            // If a resource group is set on the intent, the resource consumer is
+            // set equal to it. Otherwise it's set to the intent key
+            ResourceConsumer resourceConsumer =
+                    intent.resourceGroup() != null ? intent.resourceGroup() : intent.key();
+
+            // By default the resource doesn't get released
+            boolean removeResource = false;
+
+            if (intent.resourceGroup() == null) {
+                // If the intent doesn't have a resource group, it means the
+                // resource was registered using the intent key, so it can be
+                // released
+                removeResource = true;
+            } else {
+                // When a resource group is set, we make sure there are no other
+                // intents using the same resource group, before deleting the
+                // related resources. The count stays a primitive long (no boxing).
+                long remainingIntents =
+                        Tools.stream(intentStore.getIntents(networkId))
+                                .filter(i -> i.resourceGroup() != null
+                                        && i.resourceGroup().equals(intent.resourceGroup()))
+                                .count();
+                if (remainingIntents == 0) {
+                    removeResource = true;
+                }
+            }
+
+            if (removeResource) {
+                // Release resources allocated to withdrawn intent
+                // FIXME: confirm resources are released
+            }
+        }
+    }
+ // Batch delegate given to the accumulator: processes one batch of intent operations and writes the results to the store.
+ private class InternalBatchDelegate implements IntentBatchDelegate {
+ @Override
+ public void execute(Collection<IntentData> operations) {
+ log.debug("Execute {} operation(s).", operations.size());
+ log.trace("Execute operations: {}", operations);
+
+ // batchExecutor is single-threaded, so only one batch is in flight at a time
+ CompletableFuture.runAsync(() -> {
+ // process each intent until its phase reaches one of the final phases; process() runs on workerExecutor
+ List<CompletableFuture<IntentData>> futures = operations.stream()
+ .map(data -> {
+ log.debug("Start processing of {} {}@{}", data.request(), data.key(), data.version());
+ return data;
+ })
+ .map(x -> CompletableFuture.completedFuture(x)
+ .thenApply(VirtualNetworkIntentManager.this::createInitialPhase)
+ .thenApplyAsync(VirtualIntentProcessPhase::process, workerExecutor)
+ .thenApply(VirtualFinalIntentProcessPhase::data)
+ .exceptionally(e -> {
+ // When the future fails, we update the Intent to simulate the failure of
+ // the installation/withdrawal phase and we save in the current map. In
+ // the next round the CleanUp Thread will pick this Intent again.
+ log.warn("Future failed", e);
+ log.warn("Intent {} - state {} - request {}",
+ x.key(), x.state(), x.request());
+ switch (x.state()) {
+ case INSTALL_REQ:
+ case INSTALLING:
+ case WITHDRAW_REQ:
+ case WITHDRAWING:
+ // TODO should we switch based on current
+ IntentData current = intentStore.getIntentData(networkId, x.key());
+ return IntentData.nextState(current, FAILED);
+ default:
+ return null; // null results are dropped by the nonNull filter before batchWrite
+ }
+ }))
+ .collect(Collectors.toList());
+
+ // write multiple data to store in order (null results are filtered out first)
+ intentStore.batchWrite(networkId, Tools.allOf(futures).join().stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList()));
+ }, batchExecutor).exceptionally(e -> {
+ log.error("Error submitting batches:", e);
+ // FIXME incomplete Intents should be cleaned up
+ // (transition to FAILED, etc.)
+
+ // the batch has failed
+ // TODO: maybe we should do more?
+ log.error("Walk the plank, matey...");
+ return null;
+ }).thenRun(accumulator::ready); // exceptionally() above recovers, so this also runs after a failed batch
+
+ }
+ }
+
+ private VirtualIntentProcessPhase createInitialPhase(IntentData data) {
+ IntentData pending = intentStore.getPendingData(networkId, data.key());
+ if (pending == null || pending.version().isNewerThan(data.version())) {
+ /*
+ If the pending map is null, then this intent was compiled by a
+ previous batch iteration, so we can skip it.
+ If the pending map has a newer request, it will get compiled as
+ part of the next batch, so we can skip it.
+ */
+ return VirtualIntentSkipped.getPhase();
+ }
+ IntentData current = intentStore.getIntentData(networkId, data.key());
+ return newInitialPhase(networkId, processor, data, current);
+ }
+
+    // Processor passed to newInitialPhase(); it forwards compilation to the compiler
+    // registry and installation to the install coordinator.
+    private class InternalIntentProcessor implements VirtualIntentProcessor {
+        @Override
+        public List<Intent> compile(NetworkId networkId, Intent intent, List<Intent> previousInstallables) {
+            return compilerRegistry.compile(networkId, intent, previousInstallables);
+        }
+
+        // The networkId argument is not forwarded: the coordinator was constructed with
+        // this manager's network ID.
+        @Override
+        public void apply(NetworkId networkId,
+                          Optional<IntentData> toUninstall, Optional<IntentData> toInstall) {
+            installCoordinator.installIntents(toUninstall, toInstall);
+        }
+    }
}