[ONOS-6163] (vnet) revise intent service for vnets

Revise intent service for virtual networks.

Change-Id: I4955cde2296e58d8f0e356d19483e84f51193318
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentManager.java
index a133c66..e40bf4e 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentManager.java
@@ -16,7 +16,7 @@
 
 package org.onosproject.incubator.net.virtual.impl;
 
-import com.google.common.collect.Iterators;
+import org.onlab.util.Tools;
 import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.incubator.net.virtual.VirtualNetworkIntent;
 import org.onosproject.incubator.net.virtual.VirtualNetworkIntentStore;
@@ -25,26 +25,46 @@
 import org.onosproject.incubator.net.virtual.VirtualPort;
 import org.onosproject.incubator.net.virtual.VnetService;
 import org.onosproject.incubator.net.virtual.event.AbstractVirtualListenerManager;
+import org.onosproject.incubator.net.virtual.impl.intent.phase.VirtualFinalIntentProcessPhase;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentInstallCoordinator;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentAccumulator;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentCompilerRegistry;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentInstallerRegistry;
+import org.onosproject.incubator.net.virtual.impl.intent.phase.VirtualIntentProcessPhase;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentProcessor;
+import org.onosproject.incubator.net.virtual.impl.intent.VirtualIntentSkipped;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
+import org.onosproject.net.group.GroupService;
 import org.onosproject.net.intent.Intent;
+import org.onosproject.net.intent.IntentBatchDelegate;
 import org.onosproject.net.intent.IntentData;
 import org.onosproject.net.intent.IntentEvent;
 import org.onosproject.net.intent.IntentListener;
-import org.onosproject.net.intent.WorkPartitionService;
+import org.onosproject.net.intent.IntentStoreDelegate;
 import org.onosproject.net.intent.IntentService;
 import org.onosproject.net.intent.IntentState;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.resource.ResourceConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.*;
+import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
+import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.incubator.net.virtual.impl.intent.phase.VirtualIntentProcessPhase.newInitialPhase;
+import static org.onosproject.net.intent.IntentState.FAILED;
 
 /**
  * Intent service implementation built on the virtual network service.
@@ -55,6 +75,9 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final int DEFAULT_NUM_THREADS = 12;
+    private int numThreads = DEFAULT_NUM_THREADS;
+
     private static final String NETWORK_ID_NULL = "Network ID cannot be null";
     private static final String DEVICE_NULL = "Device cannot be null";
     private static final String INTENT_NULL = "Intent cannot be null";
@@ -63,10 +86,28 @@
     private static final String INTENT_KEY_NULL = "Intent key cannot be null";
     private static final String CP_NULL = "Connect Point cannot be null";
 
-    protected IntentService intentService;
-    protected VirtualNetworkStore store;
+    //FIXME: Tracker service for vnet.
+
+    //ONOS core services
+    protected VirtualNetworkStore virtualNetworkStore;
     protected VirtualNetworkIntentStore intentStore;
-    protected WorkPartitionService partitionService;
+
+    //Virtual network services
+    protected GroupService groupService;
+
+    private final IntentBatchDelegate batchDelegate = new InternalBatchDelegate();
+    private final InternalIntentProcessor processor = new InternalIntentProcessor();
+    private final IntentStoreDelegate delegate = new InternalStoreDelegate();
+    private final VirtualIntentCompilerRegistry compilerRegistry =
+            VirtualIntentCompilerRegistry.getInstance();
+    private final VirtualIntentInstallerRegistry installerRegistry =
+            VirtualIntentInstallerRegistry.getInstance();
+    private final VirtualIntentAccumulator accumulator =
+            new VirtualIntentAccumulator(batchDelegate);
+
+    private VirtualIntentInstallCoordinator installCoordinator;
+    private ExecutorService batchExecutor;
+    private ExecutorService workerExecutor;
 
     /**
      * Creates a new VirtualNetworkIntentService object.
@@ -79,10 +120,18 @@
 
         super(virtualNetworkManager, networkId, IntentEvent.class);
 
-        this.store = serviceDirectory.get(VirtualNetworkStore.class);
+        this.virtualNetworkStore = serviceDirectory.get(VirtualNetworkStore.class);
         this.intentStore = serviceDirectory.get(VirtualNetworkIntentStore.class);
-        this.intentService = serviceDirectory.get(IntentService.class);
-        this.partitionService = serviceDirectory.get(WorkPartitionService.class);
+
+        this.groupService = manager.get(networkId, GroupService.class);
+
+        intentStore.setDelegate(networkId, delegate);
+        batchExecutor = newSingleThreadExecutor(groupedThreads("onos/intent", "batch", log));
+        workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d", log));
+
+        installCoordinator = new VirtualIntentInstallCoordinator(networkId, installerRegistry, intentStore);
+        log.info("Started");
+
     }
 
     @Override
@@ -91,7 +140,8 @@
         checkState(intent instanceof VirtualNetworkIntent, "Only VirtualNetworkIntent is supported.");
         checkArgument(validateIntent((VirtualNetworkIntent) intent), "Invalid Intent");
 
-        intentService.submit(intent);
+        IntentData data = IntentData.submit(intent);
+        intentStore.addPending(networkId, data);
     }
 
     /**
@@ -146,37 +196,19 @@
     @Override
     public void withdraw(Intent intent) {
         checkNotNull(intent, INTENT_NULL);
-        // Withdraws the physical intents created due to the virtual intents.
-        store.getTunnelIds(intent).forEach(tunnelId -> {
-            Key intentKey = Key.of(tunnelId.id(), intent.appId());
-            Intent physicalIntent = intentService.getIntent(intentKey);
-            checkNotNull(physicalIntent, INTENT_NULL);
-
-            // Withdraw the physical intent(s)
-            log.debug("Withdrawing pt-pt intent: " + physicalIntent);
-            intentService.withdraw(physicalIntent);
-        });
-        // Now withdraw the virtual intent
-        log.debug("Withdrawing virtual intent: " + intent);
-        intentService.withdraw(intent);
+        IntentData data = IntentData.withdraw(intent);
+        intentStore.addPending(networkId, data);
     }
 
     @Override
     public void purge(Intent intent) {
         checkNotNull(intent, INTENT_NULL);
-        // Purges the physical intents created for each tunnelId.
-        store.getTunnelIds(intent)
-                .forEach(tunnelId -> {
-                    Key intentKey = Key.of(tunnelId.id(), intent.appId());
-                    Intent physicalIntent = intentService.getIntent(intentKey);
-                    checkNotNull(physicalIntent, INTENT_NULL);
 
-                    // Purge the physical intent(s)
-                    intentService.purge(physicalIntent);
-                    store.removeTunnelId(intent, tunnelId);
-                });
-        // Now purge the virtual intent
-        intentService.purge(intent);
+        IntentData data = IntentData.purge(intent);
+        intentStore.addPending(networkId, data);
+
+        // remove associated group if there is one
+        // FIXME: Remove P2P intent for vnets
     }
 
     @Override
@@ -192,7 +224,9 @@
 
     @Override
     public void addPending(IntentData intentData) {
-        intentService.addPending(intentData);
+        checkNotNull(intentData, INTENT_NULL);
+        //TODO we might consider further checking / assertions
+        intentStore.addPending(networkId, intentData);
     }
 
     @Override
@@ -202,38 +236,177 @@
 
     @Override
     public long getIntentCount() {
-        return Iterators.size(getIntents().iterator());
+        return intentStore.getIntentCount(networkId);
     }
 
     @Override
     public IntentState getIntentState(Key intentKey) {
         checkNotNull(intentKey, KEY_NULL);
-        return Optional.ofNullable(intentStore.getIntentData(networkId, intentKey))
-                .map(IntentData::state)
-                .orElse(null);
+        return intentStore.getIntentState(networkId, intentKey);
     }
 
     @Override
     public List<Intent> getInstallableIntents(Key intentKey) {
-        List<Intent> intents = new ArrayList<>();
-        getIntentData().forEach(intentData -> {
-            if (intentData.intent().key().equals(intentKey)) {
-                intents.addAll(intentData.installables());
-            }
-        });
-        return intents;
+        return intentStore.getInstallableIntents(networkId, intentKey);
     }
 
     @Override
     public boolean isLocal(Key intentKey) {
-        checkNotNull(intentKey, INTENT_KEY_NULL);
-        Intent intent = getIntent(intentKey);
-        checkNotNull(intent, INTENT_NULL);
-        return partitionService.isMine(intentKey, Key::hash);
+        return intentStore.isMaster(networkId, intentKey);
     }
 
     @Override
     public Iterable<Intent> getPending() {
-        return null;
+        return intentStore.getPending(networkId);
+    }
+
+    // Store delegate to re-post events emitted from the store.
+    private class InternalStoreDelegate implements IntentStoreDelegate {
+        @Override
+        public void notify(IntentEvent event) {
+            post(event);
+            switch (event.type()) {
+                case WITHDRAWN:
+                    //FIXME: release resources
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        @Override
+        public void process(IntentData data) {
+            accumulator.add(data);
+        }
+
+        @Override
+        public void onUpdate(IntentData intentData) {
+            //FIXME: track intent
+        }
+
+        private void releaseResources(Intent intent) {
+            // If a resource group is set on the intent, the resource consumer is
+            // set equal to it. Otherwise it's set to the intent key
+            ResourceConsumer resourceConsumer =
+                    intent.resourceGroup() != null ? intent.resourceGroup() : intent.key();
+
+            // By default the resource doesn't get released
+            boolean removeResource = false;
+
+            if (intent.resourceGroup() == null) {
+                // If the intent doesn't have a resource group, it means the
+                // resource was registered using the intent key, so it can be
+                // released
+                removeResource = true;
+            } else {
+                // When a resource group is set, we make sure there are no other
+                // intents using the same resource group, before deleting the
+                // related resources.
+                Long remainingIntents =
+                        Tools.stream(intentStore.getIntents(networkId))
+                                .filter(i -> {
+                                    return i.resourceGroup() != null
+                                            && i.resourceGroup().equals(intent.resourceGroup());
+                                })
+                                .count();
+                if (remainingIntents == 0) {
+                    removeResource = true;
+                }
+            }
+
+            if (removeResource) {
+                // Release resources allocated to withdrawn intent
+                // FIXME: confirm resources are released
+            }
+        }
+    }
+
+    private class InternalBatchDelegate implements IntentBatchDelegate {
+        @Override
+        public void execute(Collection<IntentData> operations) {
+            log.debug("Execute {} operation(s).", operations.size());
+            log.trace("Execute operations: {}", operations);
+
+            // batchExecutor is single-threaded, so only one batch is in flight at a time
+            CompletableFuture.runAsync(() -> {
+                // process intent until the phase reaches one of the final phases
+                List<CompletableFuture<IntentData>> futures = operations.stream()
+                        .map(data -> {
+                            log.debug("Start processing of {} {}@{}", data.request(), data.key(), data.version());
+                            return data;
+                        })
+                        .map(x -> CompletableFuture.completedFuture(x)
+                                .thenApply(VirtualNetworkIntentManager.this::createInitialPhase)
+                                .thenApplyAsync(VirtualIntentProcessPhase::process, workerExecutor)
+                                .thenApply(VirtualFinalIntentProcessPhase::data)
+                                .exceptionally(e -> {
+                                    // When the future fails, we update the Intent to simulate the failure of
+                                    // the installation/withdrawal phase and we save in the current map. In
+                                    // the next round the CleanUp Thread will pick this Intent again.
+                                    log.warn("Future failed", e);
+                                    log.warn("Intent {} - state {} - request {}",
+                                             x.key(), x.state(), x.request());
+                                    switch (x.state()) {
+                                        case INSTALL_REQ:
+                                        case INSTALLING:
+                                        case WITHDRAW_REQ:
+                                        case WITHDRAWING:
+                                            // TODO should we swtich based on current
+                                            IntentData current = intentStore.getIntentData(networkId, x.key());
+                                            return IntentData.nextState(current, FAILED);
+                                        default:
+                                            return null;
+                                    }
+                                }))
+                        .collect(Collectors.toList());
+
+                // write multiple data to store in order
+                intentStore.batchWrite(networkId, Tools.allOf(futures).join().stream()
+                                         .filter(Objects::nonNull)
+                                         .collect(Collectors.toList()));
+            }, batchExecutor).exceptionally(e -> {
+                log.error("Error submitting batches:", e);
+                // FIXME incomplete Intents should be cleaned up
+                //       (transition to FAILED, etc.)
+
+                // the batch has failed
+                // TODO: maybe we should do more?
+                log.error("Walk the plank, matey...");
+                return null;
+            }).thenRun(accumulator::ready);
+
+        }
+    }
+
+    private VirtualIntentProcessPhase createInitialPhase(IntentData data) {
+        IntentData pending = intentStore.getPendingData(networkId, data.key());
+        if (pending == null || pending.version().isNewerThan(data.version())) {
+            /*
+                If the pending map is null, then this intent was compiled by a
+                previous batch iteration, so we can skip it.
+                If the pending map has a newer request, it will get compiled as
+                part of the next batch, so we can skip it.
+             */
+            return VirtualIntentSkipped.getPhase();
+        }
+        IntentData current = intentStore.getIntentData(networkId, data.key());
+        return newInitialPhase(networkId, processor, data, current);
+    }
+
+    private class InternalIntentProcessor implements VirtualIntentProcessor {
+        @Override
+        public List<Intent> compile(NetworkId networkId,
+                                    Intent intent,
+                                    List<Intent> previousInstallables) {
+            return compilerRegistry.compile(networkId, intent, previousInstallables);
+        }
+
+        @Override
+        public void apply(NetworkId networkId,
+                          Optional<IntentData> toUninstall,
+                          Optional<IntentData> toInstall) {
+
+            installCoordinator.installIntents(toUninstall, toInstall);
+        }
     }
 }