blob: d81435fd2e67734b8cc38e2c80070d3497e9e803 [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.intent.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.net.intent.Key;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.impl.EventuallyConsistentMap;
import org.onosproject.store.impl.EventuallyConsistentMapEvent;
import org.onosproject.store.impl.EventuallyConsistentMapImpl;
import org.onosproject.store.impl.EventuallyConsistentMapListener;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.impl.SystemClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
import java.util.List;
import java.util.stream.Collectors;
import static org.onosproject.net.intent.IntentState.FAILED;
import static org.onosproject.net.intent.IntentState.INSTALLED;
import static org.onosproject.net.intent.IntentState.INSTALLING;
import static org.onosproject.net.intent.IntentState.WITHDRAWING;
import static org.onosproject.net.intent.IntentState.WITHDRAWN;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of Intents in a distributed data store that uses optimistic
* replication and gossip based techniques.
*/
@Component(immediate = false, enabled = true)
@Service
public class GossipIntentStore
extends AbstractStore<IntentEvent, IntentStoreDelegate>
implements IntentStore {
private final Logger log = getLogger(getClass());
// Map of intent key => current intent state
private EventuallyConsistentMap<Key, IntentData> currentState;
// Map of intent key => pending intent operation
private EventuallyConsistentMap<Key, IntentData> pending;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PartitionService partitionService;
@Activate
public void activate() {
KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(IntentData.class)
.register(MultiValuedTimestamp.class)
.register(SystemClockTimestamp.class);
currentState = new EventuallyConsistentMapImpl<>("intent-current",
clusterService,
clusterCommunicator,
intentSerializer,
new IntentDataLogicalClockManager<>());
pending = new EventuallyConsistentMapImpl<>("intent-pending",
clusterService,
clusterCommunicator,
intentSerializer, // TODO
new IntentDataClockManager<>());
currentState.addListener(new InternalIntentStatesListener());
pending.addListener(new InternalPendingListener());
log.info("Started");
}
@Deactivate
public void deactivate() {
currentState.destroy();
pending.destroy();
log.info("Stopped");
}
@Override
public long getIntentCount() {
return currentState.size();
}
@Override
public Iterable<Intent> getIntents() {
return currentState.values().stream()
.map(IntentData::intent)
.collect(Collectors.toList());
}
@Override
public IntentState getIntentState(Key intentKey) {
IntentData data = currentState.get(intentKey);
if (data != null) {
return data.state();
}
return null;
}
@Override
public List<Intent> getInstallableIntents(Key intentKey) {
IntentData data = currentState.get(intentKey);
if (data != null) {
return data.installables();
}
return null;
}
@Override
public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
// Deprecated
return null;
}
private IntentData copyData(IntentData original) {
if (original == null) {
return null;
}
IntentData result =
new IntentData(original.intent(), original.state(), original.version());
if (original.installables() != null) {
result.setInstallables(original.installables());
}
return result;
}
/**
* TODO.
* @param currentData
* @param newData
* @return
*/
private boolean isUpdateAcceptable(IntentData currentData, IntentData newData) {
if (currentData == null) {
return true;
} else if (currentData.version().compareTo(newData.version()) < 0) {
return true;
} else if (currentData.version().compareTo(newData.version()) > 0) {
return false;
}
// current and new data versions are the same
IntentState currentState = currentData.state();
IntentState newState = newData.state();
switch (newState) {
case INSTALLING:
if (currentState == INSTALLING) {
return false;
}
// FALLTHROUGH
case INSTALLED:
if (currentState == INSTALLED) {
return false;
} else if (currentState == WITHDRAWING || currentState == WITHDRAWN) {
log.warn("Invalid state transition from {} to {} for intent {}",
currentState, newState, newData.key());
return false;
}
return true;
case WITHDRAWING:
if (currentState == WITHDRAWING) {
return false;
}
// FALLTHROUGH
case WITHDRAWN:
if (currentState == WITHDRAWN) {
return false;
} else if (currentState == INSTALLING || currentState == INSTALLED) {
log.warn("Invalid state transition from {} to {} for intent {}",
currentState, newState, newData.key());
return false;
}
return true;
case FAILED:
if (currentState == FAILED) {
return false;
}
return true;
case COMPILING:
case RECOMPILING:
case INSTALL_REQ:
case WITHDRAW_REQ:
default:
log.warn("Invalid state {} for intent {}", newState, newData.key());
return false;
}
}
@Override
public void write(IntentData newData) {
//log.debug("writing intent {}", newData);
IntentData currentData = currentState.get(newData.key());
if (isUpdateAcceptable(currentData, newData)) {
// Only the master is modifying the current state. Therefore assume
// this always succeeds
currentState.put(newData.key(), copyData(newData));
// if current.put succeeded
pending.remove(newData.key(), newData);
} else {
log.debug("not writing update: {}", newData);
}
/*try {
notifyDelegate(IntentEvent.getEvent(newData));
} catch (IllegalArgumentException e) {
//no-op
log.trace("ignore this exception: {}", e);
}*/
}
@Override
public void batchWrite(Iterable<IntentData> updates) {
updates.forEach(this::write);
}
@Override
public Intent getIntent(Key key) {
IntentData data = currentState.get(key);
if (data != null) {
return data.intent();
}
return null;
}
@Override
public IntentData getIntentData(Key key) {
return copyData(currentState.get(key));
}
@Override
public void addPending(IntentData data) {
log.debug("new call to pending {}", data);
if (data.version() == null) {
log.debug("updating timestamp");
data.setVersion(new SystemClockTimestamp());
}
pending.put(data.key(), copyData(data));
}
@Override
public boolean isMaster(Intent intent) {
return partitionService.isMine(intent.key());
}
private void notifyDelegateIfNotNull(IntentEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
private final class InternalIntentStatesListener implements
EventuallyConsistentMapListener<Key, IntentData> {
@Override
public void event(
EventuallyConsistentMapEvent<Key, IntentData> event) {
if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
IntentEvent externalEvent;
IntentData intentData = event.value();
try {
externalEvent = IntentEvent.getEvent(intentData.state(), intentData.intent());
} catch (IllegalArgumentException e) {
externalEvent = null;
}
notifyDelegateIfNotNull(externalEvent);
}
}
}
private final class InternalPendingListener implements
EventuallyConsistentMapListener<Key, IntentData> {
@Override
public void event(
EventuallyConsistentMapEvent<Key, IntentData> event) {
if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
// The pending intents map has been updated. If we are master for
// this intent's partition, notify the Manager that it should do
// some work.
if (isMaster(event.value().intent())) {
if (delegate != null) {
delegate.process(copyData(event.value()));
}
}
try {
notifyDelegate(IntentEvent.getEvent(event.value()));
} catch (IllegalArgumentException e) {
//no-op
log.trace("ignore this exception: {}", e);
}
}
}
}
}