blob: e26841b60b8783a23ccfb7fb47307f80f50d04f7 [file] [log] [blame]
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
import org.onosproject.incubator.net.virtual.VirtualNetworkListener;
import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFAgentAdminService;
import org.onosproject.ofagent.api.OFAgentEvent;
import org.onosproject.ofagent.api.OFAgentListener;
import org.onosproject.ofagent.api.OFAgentService;
import org.onosproject.ofagent.api.OFAgentStore;
import org.onosproject.ofagent.api.OFAgentStoreDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
import static org.onosproject.ofagent.api.OFAgent.State.STOPPED;
/**
* Implementation of OpenFlow agent service.
*/
@Component(immediate = true)
@Service
public class OFAgentManager extends ListenerRegistry<OFAgentEvent, OFAgentListener>
implements OFAgentService, OFAgentAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String MSG_OFAGENT = "OFAgent for network %s %s";
private static final String MSG_CREATED = "created";
private static final String MSG_UPDATED = "updated";
private static final String MSG_REMOVED = "removed";
private static final String MSG_STARTED = "started";
private static final String MSG_STOPPED = "stopped";
private static final String MSG_IN_STARTED = "is already in active state, do nothing";
private static final String MSG_IN_STOPPED = "is already in inactive state, do nothing";
private static final String ERR_NULL_OFAGENT = "OFAgent cannot be null";
private static final String ERR_NULL_NETID = "Network ID cannot be null";
private static final String ERR_NOT_EXIST = "does not exist";
private static final String ERR_IN_USE = "is in start state, stop the agent first";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VirtualNetworkService virtualNetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OFAgentStore ofAgentStore;
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final LeadershipEventListener leadershipListener = new InternalLeadershipListener();
private final VirtualNetworkListener virtualNetListener = new InternalVirtualNetworkListener();
private final OFAgentStoreDelegate delegate = new InternalOFAgentStoreDelegate();
private ApplicationId appId;
private NodeId localId;
@Activate
protected void activate() {
appId = coreService.registerApplication(APPLICATION_NAME);
localId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
ofAgentStore.setDelegate(delegate);
virtualNetService.addListener(virtualNetListener);
leadershipService.addListener(leadershipListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
leadershipService.removeListener(leadershipListener);
virtualNetService.removeListener(virtualNetListener);
ofAgentStore.unsetDelegate(delegate);
ofAgentStore.ofAgents().stream()
.filter(ofAgent -> ofAgent.state() == STARTED)
.forEach(ofAgent -> stopAgent(ofAgent.networkId()));
eventExecutor.shutdown();
leadershipService.withdraw(appId.name());
log.info("Stopped");
}
@Override
public void createAgent(OFAgent ofAgent) {
checkNotNull(ofAgent, ERR_NULL_OFAGENT);
if (ofAgent.state() == STARTED) {
log.warn(String.format(MSG_OFAGENT, ofAgent.networkId(), ERR_IN_USE));
return;
}
// TODO check if the virtual network exists
ofAgentStore.createOfAgent(ofAgent);
log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_CREATED));
}
@Override
public void updateAgent(OFAgent ofAgent) {
checkNotNull(ofAgent, ERR_NULL_OFAGENT);
ofAgentStore.updateOfAgent(ofAgent);
log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_UPDATED));
}
@Override
public OFAgent removeAgent(NetworkId networkId) {
checkNotNull(networkId, ERR_NULL_NETID);
synchronized (this) {
OFAgent existing = ofAgentStore.ofAgent(networkId);
if (existing == null) {
final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
throw new IllegalStateException(error);
}
if (existing.state() == STARTED) {
final String error = String.format(MSG_OFAGENT, networkId, ERR_IN_USE);
throw new IllegalStateException(error);
}
log.info(String.format(MSG_OFAGENT, networkId, MSG_REMOVED));
return ofAgentStore.removeOfAgent(networkId);
}
}
@Override
public void startAgent(NetworkId networkId) {
checkNotNull(networkId, ERR_NULL_NETID);
synchronized (this) {
OFAgent existing = ofAgentStore.ofAgent(networkId);
if (existing == null) {
final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
throw new IllegalStateException(error);
}
if (existing.state() == STARTED) {
final String error = String.format(MSG_OFAGENT, networkId, MSG_IN_STARTED);
throw new IllegalStateException(error);
}
OFAgent updated = DefaultOFAgent.builder()
.from(existing).state(STARTED).build();
ofAgentStore.updateOfAgent(updated);
log.info(String.format(MSG_OFAGENT, networkId, MSG_STARTED));
}
}
@Override
public void stopAgent(NetworkId networkId) {
checkNotNull(networkId, ERR_NULL_NETID);
synchronized (this) {
OFAgent existing = ofAgentStore.ofAgent(networkId);
if (existing == null) {
final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
throw new IllegalStateException(error);
}
if (existing.state() == STOPPED) {
final String error = String.format(MSG_OFAGENT, networkId, MSG_IN_STOPPED);
throw new IllegalStateException(error);
}
OFAgent updated = DefaultOFAgent.builder()
.from(existing).state(STOPPED).build();
ofAgentStore.updateOfAgent(updated);
log.info(String.format(MSG_OFAGENT, networkId, MSG_STOPPED));
}
}
@Override
public Set<OFAgent> agents() {
return ofAgentStore.ofAgents();
}
@Override
public OFAgent agent(NetworkId networkId) {
checkNotNull(networkId, ERR_NULL_NETID);
return ofAgentStore.ofAgent(networkId);
}
private class InternalLeadershipListener implements LeadershipEventListener {
@Override
public boolean isRelevant(LeadershipEvent event) {
// TODO check if local node is relevant to the leadership change event
return false;
}
@Override
public void event(LeadershipEvent event) {
switch (event.type()) {
case LEADER_CHANGED:
case LEADER_AND_CANDIDATES_CHANGED:
// TODO handle leadership changed events -> restart agents?
default:
break;
}
}
}
private class InternalVirtualNetworkListener implements VirtualNetworkListener {
@Override
public boolean isRelevant(VirtualNetworkEvent event) {
// do not allow without leadership
return Objects.equals(localId, leadershipService.getLeader(appId.name()));
}
@Override
public void event(VirtualNetworkEvent event) {
switch (event.type()) {
case NETWORK_UPDATED:
// TODO handle virtual network stopped -> stop agent
break;
case NETWORK_REMOVED:
// TODO remove related OFAgent -> stop agent
break;
case NETWORK_ADDED:
case VIRTUAL_DEVICE_ADDED:
case VIRTUAL_DEVICE_UPDATED:
case VIRTUAL_DEVICE_REMOVED:
case VIRTUAL_PORT_ADDED:
case VIRTUAL_PORT_UPDATED:
case VIRTUAL_PORT_REMOVED:
default:
// do nothing
break;
}
}
}
private class InternalOFAgentStoreDelegate implements OFAgentStoreDelegate {
@Override
public void notify(OFAgentEvent event) {
if (event != null) {
log.trace("send ofagent event {}", event);
process(event);
}
}
}
}