ONOS-5450 Initial implementation of OFAgent
- Refactored OFAgent to be immutable
- Added OFAgentStore and OFAgentEvent
- Implemented OFAgentManager and OFSwitchManager
- Added unit tests
Change-Id: Ie39ad2db9e6bd6259a062371b3ffe116b8c8cc52
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java
index 6a6ba06..cac1e03 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFAgent.java
@@ -15,111 +15,134 @@
*/
package org.onosproject.ofagent.impl;
-import io.netty.channel.nio.NioEventLoopGroup;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
import org.onosproject.incubator.net.virtual.NetworkId;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
-import org.onosproject.net.flow.FlowRuleEvent;
-import org.onosproject.net.flow.FlowRuleListener;
-import org.onosproject.net.packet.PacketContext;
-import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFController;
-import org.onosproject.ofagent.api.OFSwitch;
-import java.util.Map;
+import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Preconditions.checkNotNull;
/**
- * Implementation of OF agent.
+ * Implementation of OpenFlow agent.
*/
public final class DefaultOFAgent implements OFAgent {
private final NetworkId networkId;
- private final Map<Class<?>, Object> services;
private final Set<OFController> controllers;
- private final ExecutorService eventExecutor;
- private final NioEventLoopGroup ioWorker;
-
- private final ConcurrentHashMap<DeviceId, OFSwitch> switchMap = new ConcurrentHashMap<>();
- private final DeviceListener deviceListener = new InternalDeviceListener();
- private final FlowRuleListener flowRuleListener = new InternalFlowRuleListener();
- private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
+ private final State state;
private DefaultOFAgent(NetworkId networkId,
- Map<Class<?>, Object> services,
Set<OFController> controllers,
- ExecutorService eventExecutor,
- NioEventLoopGroup ioWorker) {
+ State state) {
this.networkId = networkId;
- this.services = services;
this.controllers = controllers;
- this.eventExecutor = eventExecutor;
- this.ioWorker = ioWorker;
+ this.state = state;
}
@Override
public NetworkId networkId() {
- return null;
+ return networkId;
}
@Override
public Set<OFController> controllers() {
- return null;
+ return controllers;
}
@Override
- public void start() {
- // TODO add listeners to the services
- // TODO connect all virtual devices in this network to the controllers
+ public State state() {
+ return state;
}
@Override
- public void stop() {
- // TODO remove listeners from the services
- // TODO disconnect all active connections
+ public int hashCode() {
+ return Objects.hash(networkId);
}
- private void connect(OFSwitch ofSwitch, OFController controller) {
- // TODO connect the switch to the controller
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof DefaultOFAgent) {
+ DefaultOFAgent that = (DefaultOFAgent) obj;
+ if (Objects.equals(networkId, that.networkId)) {
+ return true;
+ }
+ }
+ return false;
}
- private void disconnect(OFSwitch ofSwitch, OFController controller) {
- // TODO disconnect the controller from the ofSwitch
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("networkId", this.networkId)
+ .add("controllers", this.controllers)
+ .add("state", this.state)
+ .toString();
}
- private class InternalFlowRuleListener implements FlowRuleListener {
+ /**
+ * Returns new builder instance.
+ *
+ * @return default ofagent builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Returns new builder instance from the existing agent.
+ *
+ * @param ofAgent the existing agent
+ * @return default ofagent builder
+ */
+ public static Builder builder(OFAgent ofAgent) {
+ return new Builder()
+ .networkId(ofAgent.networkId())
+ .controllers(ofAgent.controllers())
+ .state(ofAgent.state());
+ }
+
+ public static final class Builder implements OFAgent.Builder {
+
+ private NetworkId networkId;
+ private Set<OFController> controllers;
+ private State state;
+
+ private Builder() {
+ }
@Override
- public void event(FlowRuleEvent event) {
- // TODO handle flow rule event
- }
- }
+ public OFAgent build() {
+ checkNotNull(networkId, "Network ID cannot be null");
+ checkNotNull(state, "State cannot be null");
+ controllers = controllers == null ? ImmutableSet.of() : controllers;
- private class InternalDeviceListener implements DeviceListener {
+ return new DefaultOFAgent(networkId, controllers, state);
+ }
@Override
- public void event(DeviceEvent event) {
- // TODO handle device event
- // device detected: connect the device to controllers
- // device removed: disconnect and remove the switch from the map
- // device state available: connect the switch to the controllers
- // device state unavailable: disconnect the switch from the controllers
- // port added: send out features reply
- // port status change
+ public Builder networkId(NetworkId networkId) {
+ this.networkId = networkId;
+ return this;
}
- }
-
- private class InternalPacketProcessor implements PacketProcessor {
@Override
- public void process(PacketContext context) {
- // TODO handle packet-in
+ public Builder controllers(Set<OFController> controllers) {
+ this.controllers = controllers;
+ return this;
+ }
+
+ @Override
+ public Builder state(State state) {
+ this.state = state;
+ return this;
}
}
-
- // TODO implement builder
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
index 32956c4..310c5a4 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
@@ -15,22 +15,41 @@
*/
package org.onosproject.ofagent.impl;
+import com.google.common.base.MoreObjects;
import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
import org.onosproject.ofagent.api.OFController;
-/**
- * Implementation of tenant openflow controller.
- */
-public class DefaultOFController implements OFController {
- private IpAddress ip;
- private TpPort port;
+import java.util.Objects;
- public DefaultOFController(IpAddress ip, TpPort port) {
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the default OpenFlow controller.
+ */
+public final class DefaultOFController implements OFController {
+
+ private final IpAddress ip;
+ private final TpPort port;
+
+ private DefaultOFController(IpAddress ip, TpPort port) {
this.ip = ip;
this.port = port;
}
+ /**
+ * Returns new OpenFlow controller with the supplied IP and port.
+ *
+ * @param ip ip address
+ * @param port port number
+ * @return openflow controller
+ */
+ public static DefaultOFController of(IpAddress ip, TpPort port) {
+ checkNotNull(ip, "Controller IP address cannot be null");
+ checkNotNull(port, "Controller port address cannot be null");
+ return new DefaultOFController(ip, port);
+ }
+
@Override
public IpAddress ip() {
return ip;
@@ -40,4 +59,33 @@
public TpPort port() {
return port;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ip, port);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof DefaultOFController) {
+ DefaultOFController that = (DefaultOFController) obj;
+ if (Objects.equals(ip, that.ip) &&
+ Objects.equals(port, that.port)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("ip", this.ip)
+ .add("port", this.port)
+ .toString();
+ }
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
index 4258321..878fae0 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
@@ -15,10 +15,8 @@
*/
package org.onosproject.ofagent.impl;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableSet;
import io.netty.channel.Channel;
-import org.onosproject.net.Device;
-import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.packet.InboundPacket;
@@ -35,7 +33,7 @@
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.types.DatapathId;
-import java.util.List;
+import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,7 +42,7 @@
import static org.projectfloodlight.openflow.protocol.OFControllerRole.*;
/**
- * Implementation of OF switch.
+ * Implementation of the default OpenFlow switch.
*/
public final class DefaultOFSwitch implements OFSwitch {
@@ -53,49 +51,34 @@
private static final long NUM_BUFFERS = 1024;
private static final short NUM_TABLES = 3;
- private final Device device;
+ private final DatapathId dpId;
private final OFSwitchCapabilities capabilities;
- private final DatapathId datapathId;
private final ConcurrentHashMap<Channel, OFControllerRole> controllerRoleMap
= new ConcurrentHashMap<>();
+ private static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
- protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
- private int handshakeTransactionIds;
+ private int handshakeTransactionIds = -1;
- public DefaultOFSwitch(Device device, OFSwitchCapabilities capabilities) {
- this.device = device;
+ private DefaultOFSwitch(DatapathId dpid, OFSwitchCapabilities capabilities) {
+ this.dpId = dpid;
this.capabilities = capabilities;
- datapathId = getDpidFromDeviceId(device.id());
- handshakeTransactionIds = -1;
-
}
- // TODO add builder
+ public static DefaultOFSwitch of(DatapathId dpid, OFSwitchCapabilities capabilities) {
+ checkNotNull(dpid, "DPID cannot be null");
+ checkNotNull(capabilities, "OF capabilities cannot be null");
+ return new DefaultOFSwitch(dpid, capabilities);
+ }
@Override
- public Device device() {
- return device;
+ public DatapathId dpid() {
+ return this.dpId;
}
@Override
public OFSwitchCapabilities capabilities() {
- return capabilities;
- }
-
- @Override
- public boolean isConnected() {
- return !controllerChannels().isEmpty();
- }
-
- @Override
- public void started() {
- // TODO do some initial setups
- }
-
- @Override
- public void stopped() {
- // TODO implement
+ return this.capabilities;
}
@Override
@@ -136,7 +119,7 @@
@Override
public Set<Channel> controllerChannels() {
- return null;
+ return ImmutableSet.copyOf(controllerRoleMap.keySet());
}
@Override
@@ -181,19 +164,14 @@
@Override
public void processFeaturesRequest(Channel channel, OFMessage msg) {
- // TODO process features request and send reply
- List<OFMessage> ofMessageList = Lists.newArrayList();
-
- OFFeaturesReply.Builder frBuilder = FACTORY.buildFeaturesReply()
- .setDatapathId(datapathId)
+ OFFeaturesReply ofFeaturesReply = FACTORY.buildFeaturesReply()
+ .setDatapathId(dpId)
.setNBuffers(NUM_BUFFERS)
.setNTables(NUM_TABLES)
.setCapabilities(capabilities.ofSwitchCapabilities())
- .setXid(msg.getXid());
-
- ofMessageList.add(frBuilder.build());
- channel.write(ofMessageList);
-
+ .setXid(msg.getXid())
+ .build();
+ channel.writeAndFlush(Collections.singletonList(ofFeaturesReply));
}
@Override
@@ -203,38 +181,18 @@
@Override
public void sendOfHello(Channel channel) {
- List<OFMessage> ofMessageList = Lists.newArrayList();
- OFHello.Builder ofHello = FACTORY.buildHello()
- .setXid(this.handshakeTransactionIds--);
-
- ofMessageList.add(ofHello.build());
- channel.write(ofMessageList);
+ OFHello ofHello = FACTORY.buildHello()
+ .setXid(this.handshakeTransactionIds--)
+ .build();
+ channel.writeAndFlush(Collections.singletonList(ofHello));
}
@Override
public void processEchoRequest(Channel channel, OFMessage msg) {
- List<OFMessage> ofMessageList = Lists.newArrayList();
- OFEchoReply.Builder echoBuilder = FACTORY.buildEchoReply()
+ OFEchoReply ofEchoReply = FACTORY.buildEchoReply()
.setXid(msg.getXid())
- .setData(((OFEchoRequest) msg).getData());
-
- ofMessageList.add(echoBuilder.build());
- channel.write(ofMessageList);
- }
-
- private DatapathId getDpidFromDeviceId(DeviceId deviceId) {
- String deviceIdToString = deviceId.toString().split(":")[1];
-
- assert (deviceIdToString.length() == 16);
-
- String resultedHexString = new String();
- for (int i = 0; i < 8; i++) {
- resultedHexString = resultedHexString + deviceIdToString.charAt(2 * i)
- + deviceIdToString.charAt(2 * i + 1);
- if (i != 7) {
- resultedHexString += ":";
- }
- }
- return DatapathId.of(resultedHexString);
+ .setData(((OFEchoRequest) msg).getData())
+ .build();
+ channel.writeAndFlush(Collections.singletonList(ofEchoReply));
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DistributedOFAgentStore.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DistributedOFAgentStore.java
new file mode 100644
index 0000000..9f1f88f
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DistributedOFAgentStore.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.ofagent.api.OFAgent;
+import org.onosproject.ofagent.api.OFAgentEvent;
+import org.onosproject.ofagent.api.OFAgentEvent.Type;
+import org.onosproject.ofagent.api.OFAgentStore;
+import org.onosproject.ofagent.api.OFAgentStoreDelegate;
+import org.onosproject.ofagent.api.OFController;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
+import static org.onosproject.ofagent.api.OFAgentEvent.Type.*;
+import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of the {@link OFAgentStore} backed by a consistent map.
+ */
+@Service
+@Component(immediate = true)
+public class DistributedOFAgentStore extends AbstractStore<OFAgentEvent, OFAgentStoreDelegate>
+ implements OFAgentStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final String ERR_NOT_FOUND = " does not exist";
+ private static final String ERR_DUPLICATE = " already exists";
+
+ private static final KryoNamespace SERIALIZER_OFAGENT = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(OFAgent.class)
+ .register(OFAgent.State.class)
+ .register(NetworkId.class)
+ .register(DefaultOFAgent.class)
+ .register(OFController.class)
+ .register(DefaultOFController.class)
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private final MapEventListener<NetworkId, OFAgent> ofAgentMapListener = new OFAgentMapListener();
+
+ private ConsistentMap<NetworkId, OFAgent> ofAgentStore;
+
+ @Activate
+ protected void activate() {
+ ApplicationId appId = coreService.registerApplication(APPLICATION_NAME);
+ ofAgentStore = storageService.<NetworkId, OFAgent>consistentMapBuilder()
+ .withSerializer(Serializer.using(SERIALIZER_OFAGENT))
+ .withName("ofagentstore")
+ .withApplicationId(appId)
+ .build();
+ ofAgentStore.addListener(ofAgentMapListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ ofAgentStore.removeListener(ofAgentMapListener);
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public void createOfAgent(OFAgent ofAgent) {
+ ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> {
+ final String error = ofAgent.networkId() + ERR_DUPLICATE;
+ checkArgument(existing == null, error);
+ return ofAgent;
+ });
+ }
+
+ @Override
+ public void updateOfAgent(OFAgent ofAgent) {
+ ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> {
+ final String error = ofAgent.networkId() + ERR_NOT_FOUND;
+ checkArgument(existing != null, error);
+ return ofAgent;
+ });
+ }
+
+ @Override
+ public OFAgent removeOfAgent(NetworkId networkId) {
+ Versioned<OFAgent> ofAgent = ofAgentStore.remove(networkId);
+ return ofAgent == null ? null : ofAgent.value();
+ }
+
+ @Override
+ public OFAgent ofAgent(NetworkId networkId) {
+ Versioned<OFAgent> ofAgent = ofAgentStore.get(networkId);
+ return ofAgent == null ? null : ofAgent.value();
+ }
+
+ @Override
+ public Set<OFAgent> ofAgents() {
+ Set<OFAgent> ofAgents = ofAgentStore.values().stream()
+ .map(Versioned::value)
+ .collect(Collectors.toSet());
+ return ImmutableSet.copyOf(ofAgents);
+ }
+
+ private class OFAgentMapListener implements MapEventListener<NetworkId, OFAgent> {
+
+ @Override
+ public void event(MapEvent<NetworkId, OFAgent> event) {
+ switch (event.type()) {
+ case INSERT:
+ eventExecutor.execute(() -> {
+ log.debug("OFAgent for network {} created", event.key());
+ notifyDelegate(new OFAgentEvent(
+ Type.OFAGENT_CREATED,
+ event.newValue().value()));
+ });
+ break;
+ case UPDATE:
+ eventExecutor.execute(() -> {
+ log.debug("OFAgent for network {} updated", event.key());
+ processUpdated(event.oldValue().value(), event.newValue().value());
+ });
+ break;
+ case REMOVE:
+ eventExecutor.execute(() -> {
+ log.debug("OFAgent for network {} removed", event.key());
+ notifyDelegate(new OFAgentEvent(
+ Type.OFAGENT_REMOVED,
+ event.oldValue().value()));
+ });
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void processUpdated(OFAgent oldValue, OFAgent newValue) {
+ if (!oldValue.controllers().equals(newValue.controllers())) {
+ oldValue.controllers().stream()
+ .filter(controller -> !newValue.controllers().contains(controller))
+ .forEach(controller -> notifyDelegate(new OFAgentEvent(
+ OFAGENT_CONTROLLER_REMOVED,
+ newValue,
+ controller)
+ ));
+
+ newValue.controllers().stream()
+ .filter(controller -> !oldValue.controllers().contains(controller))
+ .forEach(controller -> notifyDelegate(new OFAgentEvent(
+ OFAGENT_CONTROLLER_ADDED,
+ newValue,
+ controller
+ )));
+ }
+
+ if (oldValue.state() != newValue.state()) {
+ Type eventType = newValue.state() == STARTED ? OFAGENT_STARTED : OFAGENT_STOPPED;
+ notifyDelegate(new OFAgentEvent(eventType, newValue));
+ }
+ }
+ }
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java
index 3005f73..ce7c30f 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFAgentManager.java
@@ -15,99 +15,258 @@
*/
package org.onosproject.ofagent.impl;
-import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.ListenerRegistry;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
import org.onosproject.incubator.net.virtual.VirtualNetworkListener;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.ofagent.api.OFAgent;
+import org.onosproject.ofagent.api.OFAgentAdminService;
+import org.onosproject.ofagent.api.OFAgentEvent;
+import org.onosproject.ofagent.api.OFAgentListener;
import org.onosproject.ofagent.api.OFAgentService;
-import org.onosproject.ofagent.api.OFController;
+import org.onosproject.ofagent.api.OFAgentStore;
+import org.onosproject.ofagent.api.OFAgentStoreDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
+import static org.onosproject.ofagent.api.OFAgent.State.STOPPED;
/**
* Implementation of OpenFlow agent service.
*/
@Component(immediate = true)
@Service
-public class OFAgentManager implements OFAgentService {
+public class OFAgentManager extends ListenerRegistry<OFAgentEvent, OFAgentListener>
+ implements OFAgentService, OFAgentAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
- // TODO make it to be configurable with component config
- private static final int NUM_OF_THREADS = 1;
- private final ExecutorService eventExecutor = newFixedThreadPool(
- NUM_OF_THREADS,
- groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private static final String MSG_OFAGENT = "OFAgent for network %s %s";
+ private static final String MSG_CREATED = "created";
+ private static final String MSG_UPDATED = "updated";
+ private static final String MSG_REMOVED = "removed";
+ private static final String MSG_IN_STARTED = "is already in active state, do nothing";
+ private static final String MSG_IN_STOPPED = "is already in inactive state, do nothing";
- // TODO change it to ConsistentMap and support multi-instance mode
- private ConcurrentHashMap<NetworkId, OFAgent> agentMap = new ConcurrentHashMap<>();
- private NioEventLoopGroup ioWorker;
+ private static final String ERR_NULL_OFAGENT = "OFAgent cannot be null";
+ private static final String ERR_NULL_NETID = "Network ID cannot be null";
+ private static final String ERR_NOT_EXIST = "does not exist";
+ private static final String ERR_IN_USE = "is in start state, stop the agent first";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VirtualNetworkService virtualNetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OFAgentStore ofAgentStore;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private final LeadershipEventListener leadershipListener = new InternalLeadershipListener();
+ private final VirtualNetworkListener virtualNetListener = new InternalVirtualNetworkListener();
+ private final OFAgentStoreDelegate delegate = new InternalOFAgentStoreDelegate();
+
+ private ApplicationId appId;
+ private NodeId localId;
@Activate
protected void activate() {
- // TODO listen to the virtual network event
- ioWorker = new NioEventLoopGroup();
+ appId = coreService.registerApplication(APPLICATION_NAME);
+ localId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+
+ ofAgentStore.setDelegate(delegate);
+ virtualNetService.addListener(virtualNetListener);
+ leadershipService.addListener(leadershipListener);
+
log.info("Started");
}
@Deactivate
protected void deactivate() {
- ioWorker.shutdownGracefully();
+ leadershipService.removeListener(leadershipListener);
+ virtualNetService.removeListener(virtualNetListener);
+ ofAgentStore.unsetDelegate(delegate);
+ ofAgentStore.ofAgents().forEach(ofAgent -> stopAgent(ofAgent.networkId()));
+
eventExecutor.shutdown();
+ leadershipService.withdraw(appId.name());
+
log.info("Stopped");
}
@Override
- public Set<OFAgent> agents() {
- // TODO return existing agents
- return null;
+ public void createAgent(OFAgent ofAgent) {
+ checkNotNull(ofAgent, ERR_NULL_OFAGENT);
+ if (ofAgent.state() == STARTED) {
+ log.warn(String.format(MSG_OFAGENT, ofAgent.networkId(), ERR_IN_USE));
+ return;
+ }
+ ofAgentStore.createOfAgent(ofAgent);
+ log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_CREATED));
}
@Override
- public void createAgent(NetworkId networkId, OFController... controllers) {
- // TODO create OFAgent instance with the given network ID, controllers
- // TODO and device, flowRule, link, and packet service for the virtual network
- // TODO start the OFAgent only if the virtual network state is active
+ public void updateAgent(OFAgent ofAgent) {
+ checkNotNull(ofAgent, ERR_NULL_OFAGENT);
+ ofAgentStore.updateOfAgent(ofAgent);
+ log.info(String.format(MSG_OFAGENT, ofAgent.networkId(), MSG_UPDATED));
}
@Override
public void removeAgent(NetworkId networkId) {
- // TODO stop and remove the OFAgent for the network
+ checkNotNull(networkId, ERR_NULL_NETID);
+ synchronized (this) {
+ OFAgent existing = ofAgentStore.ofAgent(networkId);
+ if (existing == null) {
+ final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
+ throw new IllegalStateException(error);
+ }
+ if (existing.state() == STARTED) {
+ final String error = String.format(MSG_OFAGENT, networkId, ERR_IN_USE);
+ throw new IllegalStateException(error);
+ }
+ ofAgentStore.removeOfAgent(networkId);
+ log.info(String.format(MSG_OFAGENT, networkId, MSG_REMOVED));
+ }
}
@Override
public void startAgent(NetworkId networkId) {
- // TODO starts the agent for the network
+ checkNotNull(networkId, ERR_NULL_NETID);
+ synchronized (this) {
+ OFAgent existing = ofAgentStore.ofAgent(networkId);
+ if (existing == null) {
+ final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
+ throw new IllegalStateException(error);
+ }
+ if (existing.state() == STARTED) {
+ log.warn(String.format(MSG_OFAGENT, networkId, MSG_IN_STARTED));
+ return;
+ }
+ OFAgent updated = DefaultOFAgent.builder(existing).state(STARTED).build();
+ ofAgentStore.updateOfAgent(updated);
+ }
}
@Override
public void stopAgent(NetworkId networkId) {
- // TODO stops the agent for the network
+ checkNotNull(networkId, ERR_NULL_NETID);
+ synchronized (this) {
+ OFAgent existing = ofAgentStore.ofAgent(networkId);
+ if (existing == null) {
+ final String error = String.format(MSG_OFAGENT, networkId, ERR_NOT_EXIST);
+ throw new IllegalStateException(error);
+ }
+ if (existing.state() == STOPPED) {
+ log.warn(String.format(MSG_OFAGENT, networkId, MSG_IN_STOPPED));
+ return;
+ }
+ OFAgent updated = DefaultOFAgent.builder(existing).state(STOPPED).build();
+ ofAgentStore.updateOfAgent(updated);
+ }
}
@Override
- public boolean isActive(NetworkId networkId) {
- // TODO manage the OF agent status
- return false;
+ public Set<OFAgent> agents() {
+ return ofAgentStore.ofAgents();
+ }
+
+ @Override
+ public OFAgent agent(NetworkId networkId) {
+ checkNotNull(networkId, ERR_NULL_NETID);
+ return ofAgentStore.ofAgent(networkId);
+ }
+
+ private class InternalLeadershipListener implements LeadershipEventListener {
+
+ @Override
+ public boolean isRelevant(LeadershipEvent event) {
+ // TODO check if local node is relevant to the leadership change event
+ return false;
+ }
+
+ @Override
+ public void event(LeadershipEvent event) {
+ switch (event.type()) {
+ case LEADER_CHANGED:
+ case LEADER_AND_CANDIDATES_CHANGED:
+ // TODO handle leadership changed events -> restart agents?
+ default:
+ break;
+ }
+ }
}
private class InternalVirtualNetworkListener implements VirtualNetworkListener {
@Override
+ public boolean isRelevant(VirtualNetworkEvent event) {
+ // handle virtual network events only when the local node is the current leader
+ return Objects.equals(localId, leadershipService.getLeader(appId.name()));
+ }
+
+ @Override
public void event(VirtualNetworkEvent event) {
- // TODO handle virtual network start and stop
+ switch (event.type()) {
+ case NETWORK_UPDATED:
+ // TODO handle virtual network stopped -> stop agent
+ break;
+ case NETWORK_REMOVED:
+ // TODO remove related OFAgent -> stop agent
+ break;
+ case NETWORK_ADDED:
+ case VIRTUAL_DEVICE_ADDED:
+ case VIRTUAL_DEVICE_UPDATED:
+ case VIRTUAL_DEVICE_REMOVED:
+ case VIRTUAL_PORT_ADDED:
+ case VIRTUAL_PORT_UPDATED:
+ case VIRTUAL_PORT_REMOVED:
+ default:
+ // do nothing
+ break;
+ }
+ }
+ }
+
+ private class InternalOFAgentStoreDelegate implements OFAgentStoreDelegate {
+
+ @Override
+ public void notify(OFAgentEvent event) {
+ if (event != null) {
+ log.trace("send ofagent event {}", event);
+ process(event);
+ }
}
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
index 38530737..6514b3c 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
@@ -15,39 +15,35 @@
*/
package org.onosproject.ofagent.impl;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.ReadTimeoutException;
-import org.onlab.osgi.DefaultServiceDirectory;
-import org.onlab.osgi.ServiceDirectory;
-import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.ofagent.api.OFSwitch;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
-import org.projectfloodlight.openflow.protocol.OFFactories;
-import org.projectfloodlight.openflow.protocol.OFFactory;
import org.projectfloodlight.openflow.protocol.OFMessage;
-import org.projectfloodlight.openflow.protocol.OFVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
-import java.util.List;
import java.util.concurrent.RejectedExecutionException;
+import static org.onosproject.ofagent.impl.OFChannelHandler.ChannelState.INIT;
+
/**
* Implementation of OpenFlow channel handler.
* It processes OpenFlow message according to the channel state.
*/
public final class OFChannelHandler extends ChannelDuplexHandler {
+ private static final String MSG_CHANNEL_STATE = "Set channel(%s) state: %s";
+
private final Logger log = LoggerFactory.getLogger(getClass());
private final OFSwitch ofSwitch;
- private ChannelHandlerContext ctx;
+ private Channel channel;
private ChannelState state;
- protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
- protected VirtualNetworkService vNetService;
enum ChannelState {
@@ -62,7 +58,6 @@
@Override
void processOFMessage(final OFChannelHandler handler,
final OFMessage msg) {
-
switch (msg.getType()) {
case HELLO:
handler.setState(ChannelState.WAIT_FEATURE_REQUEST);
@@ -77,17 +72,16 @@
@Override
void processOFMessage(final OFChannelHandler handler,
final OFMessage msg) {
-
switch (msg.getType()) {
case FEATURES_REQUEST:
- handler.ofSwitch.processFeaturesRequest(handler.ctx.channel(), msg);
+ handler.ofSwitch.processFeaturesRequest(handler.channel, msg);
handler.setState(ChannelState.ESTABLISHED);
break;
case ECHO_REQUEST:
- handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg);
+ handler.ofSwitch.processEchoRequest(handler.channel, msg);
break;
case ERROR:
- handler.logErrorClose(handler.ctx, (OFErrorMsg) msg);
+ handler.logErrorClose((OFErrorMsg) msg);
break;
default:
handler.illegalMessageReceived(msg);
@@ -117,10 +111,10 @@
//TODO implement
break;
case ECHO_REQUEST:
- handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg);
+ handler.ofSwitch.processEchoRequest(handler.channel, msg);
break;
case ERROR:
- handler.logErrorClose(handler.ctx, (OFErrorMsg) msg);
+ handler.logErrorClose((OFErrorMsg) msg);
break;
default:
handler.unhandledMessageReceived(msg);
@@ -128,8 +122,8 @@
}
}
};
- abstract void processOFMessage(final OFChannelHandler handler,
- final OFMessage msg);
+
+ abstract void processOFMessage(final OFChannelHandler handler, final OFMessage msg);
}
/**
@@ -140,47 +134,36 @@
 public OFChannelHandler(OFSwitch ofSwitch) {
 super();
 this.ofSwitch = ofSwitch;
-
- setState(ChannelState.INIT);
-
- ServiceDirectory services = new DefaultServiceDirectory();
- vNetService = services.get(VirtualNetworkService.class);
+ // no channel is attached yet; setState() skips its channel logging for INIT
+ setState(INIT);
 }
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
- this.ctx = ctx;
- log.debug("Channel Active. Send OF_13 Hello to {}", ctx.channel().remoteAddress());
-
+ // keep the channel itself; the other handler methods use this field
+ this.channel = ctx.channel();
+ // FIXME move this to channel handler and add channel when OF handshake is done
+ ofSwitch.addControllerChannel(channel);
 try {
- ofSwitch.sendOfHello(ctx.channel());
+ ofSwitch.sendOfHello(channel);
+ log.trace("Send OF_13 Hello to {}", channel.remoteAddress());
 setState(ChannelState.WAIT_HELLO);
- } catch (Throwable cause) {
- log.error("Exception occured because of{}", cause.getMessage());
+ } catch (Exception ex) {
+ // the failure is only logged: the state is not advanced and the channel is not closed here
+ log.error("Failed sending OF_13 Hello to {} for {}", channel.remoteAddress(), ex.getMessage());
 }
 }
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
+ public void channelInactive(ChannelHandlerContext ctx) {
+ // the connection is gone: detach this channel from the switch
+ ofSwitch.deleteControllerChannel(channel);
+ log.info("Device {} disconnected from controller {}", ofSwitch.dpid(), channel.remoteAddress());
+ }
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 try {
- if (msg instanceof List) {
- ((List) msg).forEach(ofm -> {
- state.processOFMessage(this, (OFMessage) ofm);
- });
- } else {
- state.processOFMessage(this, (OFMessage) msg);
- }
- } catch (Throwable cause) {
- log.error("Exception occured {}", cause.getMessage());
+ // the current channel state decides how the message is handled
+ state.processOFMessage(this, (OFMessage) msg);
+ } catch (Exception ex) {
+ // NOTE(review): fireExceptionCaught() passes the error to the next handler in
+ // the pipeline, not to this class's exceptionCaught() - confirm this is intended
+ ctx.fireExceptionCaught(ex);
 }
-
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx)
- throws Exception {
 }
@Override
@@ -188,41 +171,40 @@
if (cause instanceof ReadTimeoutException) {
log.error("Connection closed because of ReadTimeoutException {}", cause.getMessage());
} else if (cause instanceof ClosedChannelException) {
- log.error("ClosedChannelException occured");
+ log.error("ClosedChannelException occurred");
return;
} else if (cause instanceof RejectedExecutionException) {
log.error("Could not process message: queue full");
} else if (cause instanceof IOException) {
- log.error("IOException occured");
+ log.error("IOException occurred");
} else {
log.error("Error while processing message from switch {}", cause.getMessage());
}
- ctx.close();
+ channel.close();
}
private void setState(ChannelState state) {
this.state = state;
+ if (state != INIT) {
+ log.debug(String.format(MSG_CHANNEL_STATE, channel.remoteAddress(), state.name()));
+ }
}
- private void logErrorClose(ChannelHandlerContext ctx, OFErrorMsg errorMsg) {
+ private void logErrorClose(OFErrorMsg errorMsg) {
+ // log the received OpenFlow error together with the current state, then close the channel
 log.error("{} from switch {} in state {}",
 errorMsg,
- ofSwitch.device().id().toString(),
+ ofSwitch.dpid(),
 state);
-
- log.error("Disconnecting...");
- ctx.close();
+ channel.close();
 }
 private void illegalMessageReceived(OFMessage ofMessage) {
+ // the message is only reported; the channel and its state are left as they are
 log.warn("Controller should never send this message {} in current state {}",
- ofMessage.getType().toString(),
- state);
+ ofMessage.getType(), state);
 }
 private void unhandledMessageReceived(OFMessage ofMessage) {
+ // the message is only reported; no reply is sent and the state is unchanged
- log.warn("Unhandled message {} received in state {}. Ignored",
- ofMessage.getType().toString(),
- state);
+ log.warn("Unexpected message {} received in state {}",
+ ofMessage.getType(), state);
 }
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
index e93b0c9..58df71c 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
@@ -39,7 +39,6 @@
@Override
protected void initChannel(SocketChannel ch) throws Exception {
-
ch.pipeline().addLast(new OFMessageDecoder())
.addLast(new OFMessageEncoder())
.addLast(new ReadTimeoutHandler(READ_TIMEOUT))
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
index 7f59cf5..ed897ff 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
@@ -38,10 +38,17 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String MSG_STATE = "Device %s %s to controller %s:%s";
+ private static final String MSG_CONNECTING = "connecting";
+ private static final String MSG_CONNECTED = "connected";
+ private static final String MSG_FAILED = "failed to connect";
+
private final AtomicInteger retryCount;
private final OFSwitch ofSwitch;
private final OFController controller;
private final EventLoopGroup workGroup;
+
+ // TODO make this value configurable
private static final int MAX_RETRY = 3;
/**
@@ -61,32 +68,40 @@
/**
* Creates a connection to the supplied controller.
- *
*/
 public void connect() {
-
- SocketAddress remoteAddr = new InetSocketAddress(controller.ip().toInetAddress(), controller.port().toInt());
-
- log.debug("Connecting to controller {}:{}", controller.ip(), controller.port());
+ SocketAddress remoteAddr = new InetSocketAddress(
+ controller.ip().toInetAddress(), controller.port().toInt());
 Bootstrap bootstrap = new Bootstrap();
 bootstrap.group(workGroup)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.SO_KEEPALIVE, true)
 .handler(new OFChannelInitializer(ofSwitch));
+ log.debug(String.format(MSG_STATE,
+ ofSwitch.dpid(),
+ MSG_CONNECTING,
+ controller.ip(),
+ controller.port()));
+ // the attempt is asynchronous; its outcome is delivered to operationComplete()
+ // because this object is registered as the listener of the connect future
 bootstrap.connect(remoteAddr).addListener(this);
 }
@Override
public void operationComplete(ChannelFuture future) throws Exception {
-
if (future.isSuccess()) {
- ofSwitch.addControllerChannel(future.channel());
- log.debug("Connected to controller {}:{} for device {}",
- controller.ip(), controller.port(), ofSwitch.device().id());
+ log.info(String.format(MSG_STATE,
+ ofSwitch.dpid(),
+ MSG_CONNECTED,
+ controller.ip(),
+ controller.port()));
} else {
- log.info("Failed to connect controller {}:{}. Retry...", controller.ip(), controller.port());
- if (retryCount.getAndIncrement() < MAX_RETRY) {
+ if (retryCount.getAndIncrement() > MAX_RETRY) {
+ log.warn(String.format(MSG_STATE,
+ ofSwitch.dpid(),
+ MSG_FAILED,
+ controller.ip(),
+ controller.port()));
+ } else {
this.connect();
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
index 7e3d1d4..0e07d41 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
@@ -32,22 +32,20 @@
public final class OFMessageDecoder extends ByteToMessageDecoder {
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
-
if (!ctx.channel().isActive()) {
return;
}
try {
- OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
OFMessage message = reader.readFrom(in);
out.add(message);
} catch (Throwable cause) {
- log.error("Exception occured while processing decoding because of {}", cause.getMessage());
+ log.error("Failed decode OF message for {}", cause.getMessage());
}
-
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
index 3d9f8ee..67d0ccc 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
@@ -16,50 +16,24 @@
package org.onosproject.ofagent.impl;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import org.projectfloodlight.openflow.protocol.OFMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Encodes OFMessage to a byte buffer.
*/
 public final class OFMessageEncoder extends MessageToByteEncoder<Iterable<OFMessage>> {
- private final Logger log = LoggerFactory.getLogger(getClass());
 @Override
 protected void encode(ChannelHandlerContext ctx, Iterable<OFMessage> msgList, ByteBuf out)
 throws Exception {
-
+ // nothing is written once the channel is no longer active
 if (!ctx.channel().isActive()) {
 return;
 }
- if (msgList instanceof Iterable) {
- msgList.forEach(msg -> {
- try {
- ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer();
- msg.writeTo(byteBuf);
-
- ctx.writeAndFlush(byteBuf);
- } catch (Exception e) {
- log.error("error occured because of {}", e.getMessage());
- }
- });
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- if (cause instanceof EncoderException) {
- log.error("Connection closed because of EncoderException {}", cause.getMessage());
- ctx.close();
- } else {
- log.error("Exception occured while processing encoding because of {}", cause.getMessage());
- ctx.close();
+ // write the messages of the batch one after another into the outbound buffer
+ for (OFMessage ofm : msgList) {
+ ofm.writeTo(out);
 }
 }
 }
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFSwitchManager.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFSwitchManager.java
new file mode 100644
index 0000000..f522d68
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFSwitchManager.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.onosproject.ofagent.api.OFAgent;
+import org.onosproject.ofagent.api.OFAgentEvent;
+import org.onosproject.ofagent.api.OFAgentListener;
+import org.onosproject.ofagent.api.OFAgentService;
+import org.onosproject.ofagent.api.OFController;
+import org.onosproject.ofagent.api.OFSwitch;
+import org.onosproject.ofagent.api.OFSwitchCapabilities;
+import org.onosproject.ofagent.api.OFSwitchService;
+import org.projectfloodlight.openflow.types.DatapathId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME;
+
+/**
+ * Manages OF switches.
+ */
+@Component(immediate = true)
+@Service
+public class OFSwitchManager implements OFSwitchService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final OFSwitchCapabilities DEFAULT_CAPABILITIES = DefaultOFSwitchCapabilities.builder()
+ .flowStats()
+ .tableStats()
+ .portStats()
+ .groupStats()
+ .queueStats()
+ .ipReasm()
+ .portBlocked()
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VirtualNetworkService virtualNetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OFAgentService ofAgentService;
+
+ private final ConcurrentHashMap<DeviceId, OFSwitch> ofSwitchMap = new ConcurrentHashMap<>();
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+ private final OFAgentListener ofAgentListener = new InternalOFAgentListener();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+ private final FlowRuleListener flowRuleListener = new InternalFlowRuleListener();
+ private final PacketProcessor packetProcessor = new InternalPacketProcessor();
+
+ private NioEventLoopGroup ioWorker;
+ private ApplicationId appId;
+ private NodeId localId;
+
+ /**
+ * Activates the component: registers the application, resolves the local node ID,
+ * creates the I/O event loop group and starts listening for OFAgent events.
+ */
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(APPLICATION_NAME);
+ localId = clusterService.getLocalNode().id();
+ ioWorker = new NioEventLoopGroup();
+ ofAgentService.addListener(ofAgentListener);
+
+ log.info("Started");
+ }
+
+ /**
+ * Deactivates the component: stops listening for OFAgent events, runs the stop
+ * procedure for every agent known to the agent service, then shuts down the
+ * I/O event loop group and the event executor.
+ */
+ @Deactivate
+ protected void deactivate() {
+ ofAgentService.removeListener(ofAgentListener);
+ ofAgentService.agents().forEach(this::processOFAgentStopped);
+
+ ioWorker.shutdownGracefully();
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public Set<OFSwitch> ofSwitches() {
+ // immutable copy of the switches registered at the time of the call
+ return ImmutableSet.copyOf(ofSwitchMap.values());
+ }
+
+    @Override
+    public Set<OFSwitch> ofSwitches(NetworkId networkId) {
+        // map each available device of the network to its switch; devices
+        // without a registered switch are left out of the result
+        return devices(networkId).stream()
+                .map(ofSwitchMap::get)
+                .filter(Objects::nonNull)
+                .collect(Collectors.collectingAndThen(Collectors.toSet(), ImmutableSet::copyOf));
+    }
+
+    /**
+     * Creates an OpenFlow switch with the default capabilities for the given
+     * device and registers it under the device ID.
+     *
+     * @param deviceId device identifier
+     */
+    private void addOFSwitch(DeviceId deviceId) {
+        DatapathId dpid = dpidWithDeviceId(deviceId);
+        ofSwitchMap.put(deviceId, DefaultOFSwitch.of(dpid, DEFAULT_CAPABILITIES));
+        log.debug("Added virtual OF switch for {}", deviceId);
+        // TODO connect controllers if the agent is in started state
+    }
+
+    /**
+     * Unregisters the OpenFlow switch of the given device, if there is one.
+     *
+     * @param deviceId device identifier
+     */
+    private void deleteOFSwitch(DeviceId deviceId) {
+        // TODO disconnect switch if it has active connection
+        if (ofSwitchMap.remove(deviceId) == null) {
+            // no switch was registered for this device
+            return;
+        }
+        log.debug("Removed virtual OFSwitch for {}", deviceId);
+    }
+
+    /**
+     * Requests a connection from the switch to each of the given controllers.
+     *
+     * @param ofSwitch    switch to connect
+     * @param controllers controllers to connect to
+     */
+    private void connectController(OFSwitch ofSwitch, Set<OFController> controllers) {
+        for (OFController controller : controllers) {
+            // one connection handler per switch/controller pair
+            new OFConnectionHandler(ofSwitch, controller, ioWorker).connect();
+        }
+        log.debug("Connection requested for {}, controller:{}", ofSwitch.dpid(), controllers);
+    }
+
+    /**
+     * Requests disconnection of the switch's channels that lead to the given controllers.
+     *
+     * @param ofSwitch    switch to disconnect
+     * @param controllers controllers to disconnect from
+     */
+    private void disconnectController(OFSwitch ofSwitch, Set<OFController> controllers) {
+        Set<SocketAddress> targets = controllers.stream()
+                .map(controller -> new InetSocketAddress(
+                        controller.ip().toInetAddress(),
+                        controller.port().toInt()))
+                .collect(Collectors.toSet());
+
+        // only channels whose remote address is one of the targets are affected
+        ofSwitch.controllerChannels().stream()
+                .filter(ch -> targets.contains(ch.remoteAddress()))
+                .forEach(ChannelOutboundInvoker::disconnect);
+        log.debug("Disconnection requested for {}, controller:{}", ofSwitch.dpid(), controllers);
+    }
+
+    /**
+     * Returns the IDs of the devices the virtual network's device service
+     * reports as available.
+     *
+     * @param networkId virtual network identifier
+     * @return immutable set of device IDs
+     */
+    private Set<DeviceId> devices(NetworkId networkId) {
+        DeviceService service = virtualNetService.get(networkId, DeviceService.class);
+        return Tools.stream(service.getAvailableDevices())
+                .map(Device::id)
+                .collect(Collectors.collectingAndThen(Collectors.toSet(), ImmutableSet::copyOf));
+    }
+
+    /**
+     * Derives a datapath ID from a device ID. The part of the device ID after
+     * the first colon must consist of 16 characters; they are regrouped into
+     * eight colon-separated pairs and parsed as the datapath ID.
+     *
+     * @param deviceId device identifier
+     * @return datapath ID
+     * @throws IllegalArgumentException if the device ID has no part after the
+     *                                  colon or that part is not 16 characters long
+     */
+    private DatapathId dpidWithDeviceId(DeviceId deviceId) {
+        String[] parts = deviceId.toString().split(":");
+        // reject IDs without a second part instead of failing on the array access
+        checkArgument(parts.length > 1, "Invalid device ID " + deviceId);
+        String strDeviceId = parts[1];
+        checkArgument(strDeviceId.length() == 16, "Invalid device ID " + strDeviceId);
+
+        // 16 characters plus 7 separators
+        StringBuilder hexString = new StringBuilder(23);
+        for (int i = 0; i < strDeviceId.length(); i += 2) {
+            if (i > 0) {
+                hexString.append(':');
+            }
+            hexString.append(strDeviceId, i, i + 2);
+        }
+        return DatapathId.of(hexString.toString());
+    }
+
+    /**
+     * Registers a switch for every available device of the agent's network and
+     * starts listening for device events of that network.
+     *
+     * @param ofAgent created agent
+     */
+    private void processOFAgentCreated(OFAgent ofAgent) {
+        NetworkId networkId = ofAgent.networkId();
+        for (DeviceId deviceId : devices(networkId)) {
+            addOFSwitch(deviceId);
+        }
+        virtualNetService.get(networkId, DeviceService.class).addListener(deviceListener);
+    }
+
+    /**
+     * Unregisters the switches of the available devices of the agent's network
+     * and stops listening for device events of that network.
+     *
+     * @param ofAgent removed agent
+     */
+    private void processOFAgentRemoved(OFAgent ofAgent) {
+        NetworkId networkId = ofAgent.networkId();
+        for (DeviceId deviceId : devices(networkId)) {
+            deleteOFSwitch(deviceId);
+        }
+        virtualNetService.get(networkId, DeviceService.class).removeListener(deviceListener);
+    }
+
+    /**
+     * Connects the registered switches of the agent's network to the agent's
+     * controllers, then registers the packet processor and the flow rule
+     * listener with the services of that network.
+     *
+     * @param ofAgent started agent
+     */
+    private void processOFAgentStarted(OFAgent ofAgent) {
+        NetworkId networkId = ofAgent.networkId();
+        for (DeviceId deviceId : devices(networkId)) {
+            OFSwitch ofSwitch = ofSwitchMap.get(deviceId);
+            if (ofSwitch == null) {
+                // no switch registered for this device
+                continue;
+            }
+            connectController(ofSwitch, ofAgent.controllers());
+        }
+
+        virtualNetService.get(networkId, PacketService.class)
+                .addProcessor(packetProcessor, PacketProcessor.director(0));
+        virtualNetService.get(networkId, FlowRuleService.class)
+                .addListener(flowRuleListener);
+    }
+
+    /**
+     * Disconnects the registered switches of the agent's network from the
+     * agent's controllers, then removes the packet processor and the flow rule
+     * listener from the services of that network.
+     *
+     * @param ofAgent stopped agent
+     */
+    private void processOFAgentStopped(OFAgent ofAgent) {
+        NetworkId networkId = ofAgent.networkId();
+        for (DeviceId deviceId : devices(networkId)) {
+            OFSwitch ofSwitch = ofSwitchMap.get(deviceId);
+            if (ofSwitch == null) {
+                // no switch registered for this device
+                continue;
+            }
+            disconnectController(ofSwitch, ofAgent.controllers());
+        }
+
+        virtualNetService.get(networkId, PacketService.class)
+                .removeProcessor(packetProcessor);
+        virtualNetService.get(networkId, FlowRuleService.class)
+                .removeListener(flowRuleListener);
+    }
+
+    /**
+     * Reacts to OFAgent events while the local node is the leader of the topic
+     * named after the application. Created, removed, started and stopped events
+     * are processed on the event executor; controller changes are not handled yet.
+     */
+    private class InternalOFAgentListener implements OFAgentListener {
+
+        @Override
+        public boolean isRelevant(OFAgentEvent event) {
+            NodeId leader = leadershipService.getLeader(appId.name());
+            return Objects.equals(localId, leader);
+        }
+
+        @Override
+        public void event(OFAgentEvent event) {
+            switch (event.type()) {
+                case OFAGENT_CREATED:
+                    eventExecutor.execute(() -> handleCreated(event.subject()));
+                    break;
+                case OFAGENT_REMOVED:
+                    eventExecutor.execute(() -> handleRemoved(event.subject()));
+                    break;
+                case OFAGENT_CONTROLLER_ADDED:
+                    // TODO handle additional controller
+                    break;
+                case OFAGENT_CONTROLLER_REMOVED:
+                    // TODO handle removed controller
+                    break;
+                case OFAGENT_STARTED:
+                    eventExecutor.execute(() -> handleStarted(event.subject()));
+                    break;
+                case OFAGENT_STOPPED:
+                    eventExecutor.execute(() -> handleStopped(event.subject()));
+                    break;
+                default:
+                    // do nothing
+                    break;
+            }
+        }
+
+        // Logs and processes the creation of an agent.
+        private void handleCreated(OFAgent ofAgent) {
+            log.debug("Processing OFAgent created: {}", ofAgent);
+            processOFAgentCreated(ofAgent);
+        }
+
+        // Logs and processes the removal of an agent.
+        private void handleRemoved(OFAgent ofAgent) {
+            log.debug("Processing OFAgent removed: {}", ofAgent);
+            processOFAgentRemoved(ofAgent);
+        }
+
+        // Logs and processes the start of an agent.
+        private void handleStarted(OFAgent ofAgent) {
+            log.debug("Processing OFAgent started: {}", ofAgent);
+            processOFAgentStarted(ofAgent);
+        }
+
+        // Logs and processes the stop of an agent.
+        private void handleStopped(OFAgent ofAgent) {
+            log.debug("Processing OFAgent stopped: {}", ofAgent);
+            processOFAgentStopped(ofAgent);
+        }
+    }
+
+    /**
+     * Keeps the switch registry in step with device events: a switch is added
+     * when a device is added and deleted when a device is removed, both on the
+     * event executor. All other event types are not handled yet.
+     */
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            switch (event.type()) {
+                case DEVICE_ADDED:
+                    eventExecutor.execute(() -> {
+                        Device device = event.subject();
+                        log.debug("Processing device added: {}", device);
+                        addOFSwitch(device.id());
+                    });
+                    break;
+                case DEVICE_REMOVED:
+                    eventExecutor.execute(() -> {
+                        Device device = event.subject();
+                        // the message names the removal; it used to repeat "added"
+                        log.debug("Processing device removed: {}", device);
+                        deleteOFSwitch(device.id());
+                    });
+                    break;
+                case DEVICE_AVAILABILITY_CHANGED:
+                    // TODO handle event
+                    break;
+                case DEVICE_UPDATED:
+                case DEVICE_SUSPENDED:
+                case PORT_ADDED:
+                    // TODO handle event
+                case PORT_REMOVED:
+                    // TODO handle event
+                case PORT_STATS_UPDATED:
+                case PORT_UPDATED:
+                default:
+                    break;
+            }
+        }
+    }
+
+ /**
+ * Packet processor that is added to the packet service of a started agent's
+ * virtual network. Placeholder: packets are not handled yet.
+ */
+ private class InternalPacketProcessor implements PacketProcessor {
+
+ @Override
+ public void process(PacketContext context) {
+ // TODO handle packet-in
+ }
+ }
+
+ /**
+ * Flow rule listener that is added to the flow rule service of a started
+ * agent's virtual network. Placeholder: events are not handled yet.
+ */
+ private class InternalFlowRuleListener implements FlowRuleListener {
+
+ @Override
+ public void event(FlowRuleEvent event) {
+ // TODO handle flow rule event
+ }
+ }
+}