blob: eabee8079a11dfce55fc7e57695079d765c68a82 [file] [log] [blame]
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.net.intent.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.onos.net.intent.IntentState.COMPILING;
import static org.onlab.onos.net.intent.IntentState.FAILED;
import static org.onlab.onos.net.intent.IntentState.INSTALLED;
import static org.onlab.onos.net.intent.IntentState.INSTALLING;
import static org.onlab.onos.net.intent.IntentState.RECOMPILING;
import static org.onlab.onos.net.intent.IntentState.WITHDRAWING;
import static org.onlab.onos.net.intent.IntentState.WITHDRAWN;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
import org.onlab.onos.net.intent.IntentEvent;
import org.onlab.onos.net.intent.IntentException;
import org.onlab.onos.net.intent.IntentExtensionService;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentInstaller;
import org.onlab.onos.net.intent.IntentListener;
import org.onlab.onos.net.intent.IntentOperations;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* An implementation of Intent Manager.
*/
@Component(immediate = true)
@Service
public class IntentManager
implements IntentService, IntentExtensionService {
// Logger obtained for the runtime class of this instance.
private final Logger log = getLogger(getClass());
// Messages handed to checkNotNull() when a required argument is null.
public static final String INTENT_NULL = "Intent cannot be null";
public static final String INTENT_ID_NULL = "Intent ID cannot be null";
// Collections for compiler, installer, and listener are ONOS instance local
// Compilers and installers are keyed by the intent class they were registered for.
private final ConcurrentMap<Class<? extends Intent>,
IntentCompiler<? extends Intent>> compilers = new ConcurrentHashMap<>();
private final ConcurrentMap<Class<? extends Intent>,
IntentInstaller<? extends Intent>> installers = new ConcurrentHashMap<>();
// Registered with the event dispatcher as the IntentEvent sink in activate().
private final AbstractListenerRegistry<IntentEvent, IntentListener>
listenerRegistry = new AbstractListenerRegistry<>();
// Runs IntentTask instances; created in activate() and shut down in deactivate().
private ExecutorService executor;
// Runs IntentInstallMonitor instances; created in activate() and shut down in deactivate().
private ExecutorService monitorExecutor;
// Delegates handed to the intent store and the tracker service in activate().
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
// Collaborating services, each declared as a mandatory unary reference.
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentStore store;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ObjectiveTrackerService trackerService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
/**
 * Activates the component.
 *
 * The executors are created before the delegates are registered: both
 * delegates submit work to them (the store delegate schedules compilation
 * on {@code executor}, the topology delegate ends up scheduling monitors on
 * {@code monitorExecutor}), so a callback arriving right after registration
 * must never observe a {@code null} executor.
 */
@Activate
public void activate() {
    executor = newSingleThreadExecutor(namedThreads("onos-intents"));
    monitorExecutor = newSingleThreadExecutor(namedThreads("onos-intent-monitor"));
    store.setDelegate(delegate);
    trackerService.setDelegate(topoDelegate);
    eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
    log.info("Started");
}
/**
 * Deactivates the component: detaches both delegates, removes the intent
 * event sink and shuts down the two executors.
 */
@Deactivate
public void deactivate() {
    // Stop receiving callbacks and events first.
    store.unsetDelegate(delegate);
    trackerService.unsetDelegate(topoDelegate);
    eventDispatcher.removeSink(IntentEvent.class);

    // Then shut the workers down, intent executor first.
    for (ExecutorService worker : new ExecutorService[] {executor, monitorExecutor}) {
        worker.shutdown();
    }
    log.info("Stopped");
}
/**
 * Submits an intent: registers a fallback compiler for its class if needed,
 * creates it in the store and, when the store yields an event, posts that
 * event and schedules the compiling phase.
 *
 * @param intent intent to submit; must not be null
 */
@Override
public void submit(Intent intent) {
    checkNotNull(intent, INTENT_NULL);
    registerSubclassCompilerIfNeeded(intent);

    final IntentEvent submitted = store.createIntent(intent);
    if (submitted == null) {
        // The store produced no event, so nothing is posted or scheduled.
        return;
    }
    eventDispatcher.post(submitted);
    executor.execute(new IntentTask(COMPILING, intent));
}
/**
 * Schedules the withdrawing phase for the given intent on the intent executor.
 *
 * @param intent intent to withdraw; must not be null
 */
@Override
public void withdraw(Intent intent) {
    checkNotNull(intent, INTENT_NULL);
    final Runnable withdrawal = new IntentTask(WITHDRAWING, intent);
    executor.execute(withdrawal);
}
// FIXME: implement this method
/**
 * Not implemented yet.
 *
 * @param oldIntentId identifier of the intent to be replaced
 * @param newIntent replacement intent
 * @throws UnsupportedOperationException always
 */
@Override
public void replace(IntentId oldIntentId, Intent newIntent) {
    // The message previously named execute(), which sent readers to the wrong method.
    throw new UnsupportedOperationException("replace() is not implemented yet");
}
// FIXME: implement this method
/**
 * Not implemented yet.
 *
 * @param operations batch of intent operations
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Future<IntentOperations> execute(IntentOperations operations) {
    final String reason = "execute() is not implemented yet";
    throw new UnsupportedOperationException(reason);
}
/**
 * Returns the intents reported by the intent store.
 *
 * @return intents from the store
 */
@Override
public Iterable<Intent> getIntents() {
    final Iterable<Intent> stored = store.getIntents();
    return stored;
}
/**
 * Returns the intent count reported by the intent store.
 *
 * @return number of intents according to the store
 */
@Override
public long getIntentCount() {
    final long count = store.getIntentCount();
    return count;
}
/**
 * Looks up an intent in the store by its identifier.
 *
 * @param id intent identifier; must not be null
 * @return whatever the store returns for that identifier
 */
@Override
public Intent getIntent(IntentId id) {
    // checkNotNull hands back the validated reference.
    final IntentId key = checkNotNull(id, INTENT_ID_NULL);
    return store.getIntent(key);
}
/**
 * Looks up the state the store holds for the given intent identifier.
 *
 * @param id intent identifier; must not be null
 * @return whatever state the store returns for that identifier
 */
@Override
public IntentState getIntentState(IntentId id) {
    // checkNotNull hands back the validated reference.
    final IntentId key = checkNotNull(id, INTENT_ID_NULL);
    return store.getIntentState(key);
}
/**
 * Looks up the installable intents the store associates with the given
 * intent identifier.
 *
 * @param intentId intent identifier; must not be null
 * @return whatever list the store returns for that identifier
 */
@Override
public List<Intent> getInstallableIntents(IntentId intentId) {
    // checkNotNull hands back the validated reference.
    final IntentId key = checkNotNull(intentId, INTENT_ID_NULL);
    return store.getInstallableIntents(key);
}
/**
 * Adds the given listener to this manager's listener registry.
 *
 * @param listener listener to add
 */
@Override
public void addListener(IntentListener listener) {
listenerRegistry.addListener(listener);
}
/**
 * Removes the given listener from this manager's listener registry.
 *
 * @param listener listener to remove
 */
@Override
public void removeListener(IntentListener listener) {
listenerRegistry.removeListener(listener);
}
/**
 * Stores the compiler in the instance-local compiler map under the given
 * intent class.
 *
 * @param cls intent class used as the map key
 * @param compiler compiler to associate with that class
 * @param <T> the type of intent
 */
@Override
public <T extends Intent> void registerCompiler(Class<T> cls, IntentCompiler<T> compiler) {
compilers.put(cls, compiler);
}
/**
 * Removes the entry for the given intent class from the instance-local
 * compiler map.
 *
 * @param cls intent class whose compiler entry is removed
 * @param <T> the type of intent
 */
@Override
public <T extends Intent> void unregisterCompiler(Class<T> cls) {
compilers.remove(cls);
}
/**
 * Returns an immutable copy of the compiler map.
 *
 * @return copy of the intent-class-to-compiler associations
 */
@Override
public Map<Class<? extends Intent>, IntentCompiler<? extends Intent>> getCompilers() {
    final Map<Class<? extends Intent>, IntentCompiler<? extends Intent>> snapshot =
            ImmutableMap.copyOf(compilers);
    return snapshot;
}
/**
 * Stores the installer in the instance-local installer map under the given
 * intent class.
 *
 * @param cls intent class used as the map key
 * @param installer installer to associate with that class
 * @param <T> the type of intent
 */
@Override
public <T extends Intent> void registerInstaller(Class<T> cls, IntentInstaller<T> installer) {
installers.put(cls, installer);
}
/**
 * Removes the entry for the given intent class from the instance-local
 * installer map.
 *
 * @param cls intent class whose installer entry is removed
 * @param <T> the type of intent
 */
@Override
public <T extends Intent> void unregisterInstaller(Class<T> cls) {
installers.remove(cls);
}
/**
 * Returns an immutable copy of the installer map.
 *
 * @return copy of the intent-class-to-installer associations
 */
@Override
public Map<Class<? extends Intent>, IntentInstaller<? extends Intent>> getInstallers() {
    final Map<Class<? extends Intent>, IntentInstaller<? extends Intent>> snapshot =
            ImmutableMap.copyOf(installers);
    return snapshot;
}
/**
 * Finds the compiler registered for the exact runtime class of an intent.
 *
 * @param intent intent whose compiler is wanted
 * @param <T> the type of intent
 * @return compiler registered under the intent's class
 * @throws IntentException if no compiler is registered under that class
 */
private <T extends Intent> IntentCompiler<T> getCompiler(T intent) {
    final Class<? extends Intent> type = intent.getClass();
    @SuppressWarnings("unchecked")
    final IntentCompiler<T> found = (IntentCompiler<T>) compilers.get(type);
    if (found != null) {
        return found;
    }
    throw new IntentException("no compiler for class " + type);
}
/**
 * Finds the installer registered for the exact runtime class of an
 * installable intent.
 *
 * @param intent installable intent whose installer is wanted
 * @param <T> the type of installable intent
 * @return installer registered under the intent's class
 * @throws IntentException if no installer is registered under that class
 */
private <T extends Intent> IntentInstaller<T> getInstaller(T intent) {
    final Class<? extends Intent> type = intent.getClass();
    @SuppressWarnings("unchecked")
    final IntentInstaller<T> found = (IntentInstaller<T>) installers.get(type);
    if (found != null) {
        return found;
    }
    throw new IntentException("no installer for class " + type);
}
/**
 * Runs the compiling phase: records the COMPILING state, compiles the
 * intent, stores the resulting installable intents and proceeds to the
 * installing phase. Any exception along the way marks the intent FAILED.
 *
 * @param intent intent to be compiled
 */
private void executeCompilingPhase(Intent intent) {
    // Flag the intent as being compiled before any work starts.
    store.setState(intent, COMPILING);
    try {
        // Reduce the intent to its installable derivatives, remember them
        // against the top-level intent, then move on to installation.
        final List<Intent> derivatives = compileIntent(intent);
        store.addInstallableIntents(intent.id(), derivatives);
        executeInstallingPhase(intent);
    } catch (Exception e) {
        log.warn("Unable to compile intent {} due to:", intent.id(), e);
        // Compilation did not go through; record the failure and announce
        // it only when the store produced an event.
        final IntentEvent failure = store.setState(intent, FAILED);
        if (failure == null) {
            return;
        }
        eventDispatcher.post(failure);
    }
}
/**
 * Compiles an intent recursively until only installable intents remain.
 *
 * An installable intent is returned as a single-element list. Otherwise the
 * intent is handed to its compiler and every resulting intent is compiled
 * in turn.
 *
 * @param intent intent
 * @return result of compilation
 * @throws IntentException if no compiler can be found for an intent
 *         encountered during compilation
 */
private List<Intent> compileIntent(Intent intent) {
    if (intent.isInstallable()) {
        return ImmutableList.of(intent);
    }
    // Intents produced by a compiler never pass through submit(), so make
    // sure a compiler registered for a parent class is also picked up here
    // before the exact-class lookup in getCompiler().
    registerSubclassCompilerIfNeeded(intent);
    List<Intent> installable = new ArrayList<>();
    for (Intent compiled : getCompiler(intent).compile(intent)) {
        installable.addAll(compileIntent(compiled));
    }
    return installable;
}
/**
 * Runs the installing phase: records the INSTALLING state, asks the
 * installer of every installable derivative for its flow rule batches and
 * hands the collected batches to an install monitor that finishes with the
 * INSTALLED state. On any exception the intent is uninstalled with
 * RECOMPILING as the follow-up state.
 *
 * @param intent intent to be installed
 */
private void executeInstallingPhase(Intent intent) {
    // Flag the intent as being installed.
    store.setState(intent, INSTALLING);
    final List<FlowRuleBatchOperation> pendingBatches = new ArrayList<>();
    try {
        final List<Intent> derivatives = store.getInstallableIntents(intent.id());
        if (derivatives != null) {
            for (Intent derivative : derivatives) {
                registerSubclassInstallerIfNeeded(derivative);
                trackerService.addTrackedResources(intent.id(), derivative.resources());
                pendingBatches.addAll(getInstaller(derivative).install(derivative));
            }
        }
        // FIXME we have to wait for the installable intents
        //eventDispatcher.post(store.setState(intent, INSTALLED));
        monitorExecutor.execute(new IntentInstallMonitor(intent, pendingBatches, INSTALLED));
    } catch (Exception e) {
        log.warn("Unable to install intent {} due to:", intent.id(), e);
        // Installation failed: back out and let the monitor trigger recompilation.
        uninstallIntent(intent, RECOMPILING);
        // FIXME
        //executeRecompilingPhase(intent);
    }
}
/**
 * Recompiles the specified intent.
 *
 * If recompilation yields the same installable intents as before, or fails
 * with an exception, the intent is marked FAILED; otherwise the new
 * installable intents are stored and the installing phase is started.
 *
 * @param intent intent to be recompiled
 */
private void executeRecompilingPhase(Intent intent) {
    // Indicate that the intent is entering the recompiling phase.
    store.setState(intent, RECOMPILING);
    try {
        // Compile the intent into installable derivatives.
        List<Intent> installable = compileIntent(intent);
        // If all went well, compare the existing list of installable
        // intents with the newly compiled list. If they are the same,
        // bail out, since the previous approach was determined not to
        // be viable.
        List<Intent> originalInstallable = store.getInstallableIntents(intent.id());
        if (Objects.equals(originalInstallable, installable)) {
            // Post only when the store produced an event, as the compiling
            // phase does for the same setState call.
            IntentEvent event = store.setState(intent, FAILED);
            if (event != null) {
                eventDispatcher.post(event);
            }
        } else {
            // Otherwise, re-associate the newly compiled installable intents
            // with the top-level intent and kick off installing phase.
            store.addInstallableIntents(intent.id(), installable);
            executeInstallingPhase(intent);
        }
    } catch (Exception e) {
        log.warn("Unable to recompile intent {} due to:", intent.id(), e);
        // If compilation failed, mark the intent as failed.
        IntentEvent event = store.setState(intent, FAILED);
        if (event != null) {
            eventDispatcher.post(event);
        }
    }
}
/**
 * Withdraws the specified intent: records the WITHDRAWING state, then
 * uninstalls all of its associated installable derivatives with WITHDRAWN
 * as the follow-up state.
 *
 * @param intent intent to be withdrawn
 */
private void executeWithdrawingPhase(Intent intent) {
// Indicate that the intent is being withdrawn.
store.setState(intent, WITHDRAWING);
// The WITHDRAWN state is passed on as the next state for the uninstall work.
uninstallIntent(intent, WITHDRAWN);
// If all went well, disassociate the top-level intent with its
// installable derivatives and mark it as withdrawn.
// FIXME need to clean up
//store.removeInstalledIntents(intent.id());
// FIXME
//eventDispatcher.post(store.setState(intent, WITHDRAWN));
}
/**
 * Collects the uninstall batches of every installable intent associated
 * with the given intent and hands them to an install monitor.
 *
 * @param intent intent to be uninstalled
 * @param nextState state passed to the monitor as the follow-up state
 */
private void uninstallIntent(Intent intent, IntentState nextState) {
    final List<FlowRuleBatchOperation> pendingBatches = new ArrayList<>();
    try {
        final List<Intent> derivatives = store.getInstallableIntents(intent.id());
        if (derivatives != null) {
            for (Intent derivative : derivatives) {
                pendingBatches.addAll(getInstaller(derivative).uninstall(derivative));
            }
        }
        monitorExecutor.execute(new IntentInstallMonitor(intent, pendingBatches, nextState));
    } catch (IntentException e) {
        // Only IntentException is handled here; the monitor is not scheduled in that case.
        log.warn("Unable to uninstall intent {} due to:", intent.id(), e);
    }
}
/**
 * Makes sure the runtime class of the intent has a compiler entry. When
 * none exists, the class hierarchy is walked upwards and the first compiler
 * found under an ancestor class is registered under the intent's own class.
 * Nothing is registered if no ancestor has a compiler.
 *
 * @param intent intent
 */
private void registerSubclassCompilerIfNeeded(Intent intent) {
    final Class<? extends Intent> concrete = intent.getClass();
    if (compilers.containsKey(concrete)) {
        return;
    }
    for (Class<?> ancestor = concrete; ancestor != Object.class; ancestor = ancestor.getSuperclass()) {
        // Only classes within the Intent hierarchy are of interest.
        if (!Intent.class.isAssignableFrom(ancestor)) {
            continue;
        }
        final IntentCompiler<?> inherited = compilers.get(ancestor);
        if (inherited != null) {
            compilers.put(concrete, inherited);
            return;
        }
    }
}
/**
 * Makes sure the runtime class of the intent has an installer entry. When
 * none exists, the class hierarchy is walked upwards and the first installer
 * found under an ancestor class is registered under the intent's own class.
 * Nothing is registered if no ancestor has an installer.
 *
 * @param intent intent
 */
private void registerSubclassInstallerIfNeeded(Intent intent) {
    final Class<? extends Intent> concrete = intent.getClass();
    if (installers.containsKey(concrete)) {
        return;
    }
    for (Class<?> ancestor = concrete; ancestor != Object.class; ancestor = ancestor.getSuperclass()) {
        // Only classes within the Intent hierarchy are of interest.
        if (!Intent.class.isAssignableFrom(ancestor)) {
            continue;
        }
        final IntentInstaller<?> inherited = installers.get(ancestor);
        if (inherited != null) {
            installers.put(concrete, inherited);
            return;
        }
    }
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements IntentStoreDelegate {
    /**
     * Posts the event to the event dispatcher and, for SUBMITTED events,
     * schedules the compiling phase for the event's subject.
     *
     * @param event event emitted by the store
     */
    @Override
    public void notify(IntentEvent event) {
        eventDispatcher.post(event);
        if (event.type() != IntentEvent.Type.SUBMITTED) {
            return;
        }
        executor.execute(new IntentTask(COMPILING, event.subject()));
    }
}
// Topology change delegate
private class InternalTopoChangeDelegate implements TopologyChangeDelegate {
    /**
     * Uninstalls each listed intent with RECOMPILING as the follow-up state
     * and, if requested, re-runs the compiling phase for every intent whose
     * state is FAILED.
     *
     * @param intentIds identifiers of the intents to recompile
     * @param compileAllFailed whether all currently failed intents should
     *        be compiled as well
     */
    @Override
    public void triggerCompile(Iterable<IntentId> intentIds,
                               boolean compileAllFailed) {
        // Attempt recompilation of the specified intents first.
        for (IntentId intentId : intentIds) {
            Intent intent = getIntent(intentId);
            if (intent == null) {
                // No intent is stored under this id; skip it so the
                // remaining ids and the failed-intent pass still run.
                log.debug("Intent {} not found; skipping recompilation", intentId);
                continue;
            }
            uninstallIntent(intent, RECOMPILING);
            //FIXME
            //executeRecompilingPhase(intent);
        }
        if (compileAllFailed) {
            // If required, compile all currently failed intents.
            for (Intent intent : getIntents()) {
                if (getIntentState(intent.id()) == FAILED) {
                    executeCompilingPhase(intent);
                }
            }
        }
    }
}
// Auxiliary runnable to perform asynchronous tasks.
private class IntentTask implements Runnable {
    private final IntentState phase;
    private final Intent target;

    /**
     * Creates a task that runs one phase for one intent.
     *
     * @param state phase selector; COMPILING, RECOMPILING and WITHDRAWING are acted upon
     * @param intent intent the phase is run for
     */
    public IntentTask(IntentState state, Intent intent) {
        this.phase = state;
        this.target = intent;
    }

    /**
     * Dispatches to the phase method matching the selector; any other
     * selector results in no action.
     */
    @Override
    public void run() {
        if (phase == COMPILING) {
            executeCompilingPhase(target);
            return;
        }
        if (phase == RECOMPILING) {
            executeRecompilingPhase(target);
            return;
        }
        if (phase == WITHDRAWING) {
            executeWithdrawingPhase(target);
        }
    }
}
/**
 * Auxiliary runnable that applies a list of flow rule batches one after
 * another and, once no batch is outstanding, moves the intent on to its
 * next state. While batches are outstanding it resubmits itself to the
 * monitor executor.
 */
private class IntentInstallMonitor implements Runnable {
    private final Intent intent;
    private final List<FlowRuleBatchOperation> work;
    private final List<Future<CompletedBatchOperation>> futures;
    private final IntentState nextState;

    /**
     * Creates the monitor and immediately applies the first batch, if any.
     *
     * @param intent intent the batches belong to
     * @param work batches still to be applied; consumed from the front
     * @param nextState state to move to once all batches are processed
     */
    public IntentInstallMonitor(Intent intent,
                                List<FlowRuleBatchOperation> work,
                                IntentState nextState) {
        this.intent = intent;
        this.work = work;
        // TODO how many Futures can be outstanding? one?
        this.futures = Lists.newLinkedList();
        this.nextState = nextState;
        // TODO need to kick off the first batch sometime, why not now?
        // applyNextBatch() returns null when there is no work. Tracking that
        // null would make processFutures() fail with a NullPointerException
        // and the intent would never reach its next state.
        Future<CompletedBatchOperation> first = applyNextBatch();
        if (first != null) {
            futures.add(first);
        }
    }

    /**
     * Update the intent store with the next status for this intent.
     */
    private void updateIntent() {
        if (nextState == RECOMPILING) {
            executor.execute(new IntentTask(nextState, intent));
        } else if (nextState == INSTALLED || nextState == WITHDRAWN) {
            // Post only when the store produced an event.
            IntentEvent event = store.setState(intent, nextState);
            if (event != null) {
                eventDispatcher.post(event);
            }
        } else {
            log.warn("Invalid next intent state {} for intent {}", nextState, intent);
        }
    }

    /**
     * Applies the next batch.
     *
     * @return future of the applied batch, or null when no work is left
     */
    private Future<CompletedBatchOperation> applyNextBatch() {
        if (work.isEmpty()) {
            return null;
        }
        FlowRuleBatchOperation batch = work.remove(0);
        return flowRuleService.applyBatch(batch);
    }

    /**
     * Iterate through the pending futures, and remove them when they have completed.
     */
    private void processFutures() {
        List<Future<CompletedBatchOperation>> newFutures = Lists.newArrayList();
        for (Iterator<Future<CompletedBatchOperation>> i = futures.iterator(); i.hasNext();) {
            Future<CompletedBatchOperation> future = i.next();
            try {
                // TODO: we may want to get the future here and go back to the future.
                CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
                if (completed.isSuccess()) {
                    Future<CompletedBatchOperation> newFuture = applyNextBatch();
                    if (newFuture != null) {
                        // we'll add this later so that we don't get a ConcurrentModException
                        newFutures.add(newFuture);
                    }
                } else {
                    // TODO check if future succeeded and if not report fail items
                    log.warn("Failed items: {}", completed.failedItems());
                    // TODO revert....
                    //uninstallIntent(intent, RECOMPILING);
                }
                i.remove();
            } catch (TimeoutException te) {
                // Not finished yet; keep the future for the next pass.
                log.debug("Installation of intent {} is still pending", intent);
            } catch (ExecutionException ee) {
                // The batch terminated with an exception, so polling it again
                // would throw again forever. Treat it like a failed batch:
                // report it and stop tracking the future.
                // TODO revert....
                log.warn("Batch for intent {} failed due to:", intent.id(), ee);
                i.remove();
            } catch (InterruptedException ie) {
                // Preserve the interrupt for whoever owns this thread and
                // leave the remaining futures for a later pass.
                Thread.currentThread().interrupt();
                log.debug("Interrupted while monitoring intent {}", intent);
                break;
            }
        }
        futures.addAll(newFutures);
    }

    @Override
    public void run() {
        processFutures();
        if (futures.isEmpty()) {
            // woohoo! we are done!
            updateIntent();
        } else {
            // resubmit ourselves if we are not done yet
            monitorExecutor.submit(this);
        }
    }
}
}