[ONOS-5451] Implement virtual switch and controller connection handler
- ONOS-5453 that implement OF message encoder/decoder is also covered
- When OFConnectionHandler.connect() is called, it connects to the tenant controller with given vSwitch
- OpenFlow sesstion establishment will covered with ONOS-5452
Change-Id: I0c69d0ceac5aa04590d41f5b26170939ef6f5268
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java
index 0252049..8ee6f64 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java
@@ -42,6 +42,4 @@
* @return true if the switch is connected, false otherwise
*/
boolean isConnected();
-
- // TODO add builder interface
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchCapabilities.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchCapabilities.java
index a385e8b..c9d35e9 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchCapabilities.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchCapabilities.java
@@ -15,10 +15,79 @@
*/
package org.onosproject.ofagent.api;
+import org.projectfloodlight.openflow.protocol.OFCapabilities;
+
+import java.util.Set;
+
/**
* Representation of capabilities of a virtual OpenFlow switch.
*/
public interface OFSwitchCapabilities {
- // TODO implement
+ /**
+ * Returns the capabilities of the switch.
+ *
+ * @return capabilities
+ */
+ Set<OFCapabilities> ofSwitchCapabilities();
+
+ interface Builder {
+
+ /**
+ * Builds a OFSwitchCapabilities object.
+ *
+ * @return OFSwitchCapabilities
+ */
+ OFSwitchCapabilities build();
+
+ /**
+ * Enable OFPC_FLOW_STATS capability.
+ *
+ * @return Builder object
+ */
+ Builder flowStats();
+
+ /**
+ * Enable OFPC_TABLE_STATS capability.
+ *
+ * @return Builder object
+ */
+ Builder tableStats();
+
+ /**
+ * Enable OFPC_PORT_STATS capability.
+ *
+ * @return Builder object
+ */
+ Builder portStats();
+
+ /**
+ * Enable OFPC_GROUP_STATS capability.
+ *
+ * @return Builder object
+ */
+ Builder groupStats();
+
+ /**
+ * Enable OFPC_IP_REASM capability.
+ *
+ * @return Builder object
+ */
+ Builder ipReasm();
+
+ /**
+ * Enable OFPC_QUEUE_STATS capability.
+ *
+ * @return Builder object
+ */
+ Builder queueStats();
+
+ /**
+ * Enable OFPC_PORT_BLOCKED capability.
+ *
+ * @return Builder object
+ */
+ Builder portBlocked();
+ }
+
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java
index de925b5..4f33b51 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java
@@ -121,4 +121,19 @@
* @param msg packet out message with lldp
*/
void processLldp(Channel channel, OFMessage msg);
+
+ /**
+ * Sends hello to the controller.
+ *
+ * @param channel received channel
+ */
+ void sendOfHello(Channel channel);
+
+ /**
+ * Processes echo request from the controllers.
+ *
+ * @param channel received channel
+ * @param msg echo request message
+ */
+ void processEchoRequest(Channel channel, OFMessage msg);
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
new file mode 100644
index 0000000..32956c4
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
+import org.onosproject.ofagent.api.OFController;
+
+/**
+ * Implementation of tenant openflow controller.
+ */
+public class DefaultOFController implements OFController {
+ private IpAddress ip;
+ private TpPort port;
+
+ public DefaultOFController(IpAddress ip, TpPort port) {
+ this.ip = ip;
+ this.port = port;
+ }
+
+ @Override
+ public IpAddress ip() {
+ return ip;
+ }
+
+ @Override
+ public TpPort port() {
+ return port;
+ }
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
index 01068c9..4258321 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
@@ -15,16 +15,27 @@
*/
package org.onosproject.ofagent.impl;
+import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.ofagent.api.OFSwitch;
import org.onosproject.ofagent.api.OFSwitchCapabilities;
import org.projectfloodlight.openflow.protocol.OFControllerRole;
+import org.projectfloodlight.openflow.protocol.OFEchoReply;
+import org.projectfloodlight.openflow.protocol.OFEchoRequest;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFFactory;
+import org.projectfloodlight.openflow.protocol.OFFeaturesReply;
+import org.projectfloodlight.openflow.protocol.OFHello;
import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFVersion;
+import org.projectfloodlight.openflow.types.DatapathId;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,33 +50,42 @@
private static final String ERR_CH_DUPLICATE = "Channel already exists: ";
private static final String ERR_CH_NOT_FOUND = "Channel not found: ";
+ private static final long NUM_BUFFERS = 1024;
+ private static final short NUM_TABLES = 3;
private final Device device;
private final OFSwitchCapabilities capabilities;
+ private final DatapathId datapathId;
private final ConcurrentHashMap<Channel, OFControllerRole> controllerRoleMap
= new ConcurrentHashMap<>();
- private DefaultOFSwitch(Device device, OFSwitchCapabilities capabilities) {
+ protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
+ private int handshakeTransactionIds;
+
+ public DefaultOFSwitch(Device device, OFSwitchCapabilities capabilities) {
this.device = device;
this.capabilities = capabilities;
+ datapathId = getDpidFromDeviceId(device.id());
+ handshakeTransactionIds = -1;
+
}
// TODO add builder
@Override
public Device device() {
- return null;
+ return device;
}
@Override
public OFSwitchCapabilities capabilities() {
- return null;
+ return capabilities;
}
@Override
public boolean isConnected() {
- return false;
+ return !controllerChannels().isEmpty();
}
@Override
@@ -162,10 +182,59 @@
@Override
public void processFeaturesRequest(Channel channel, OFMessage msg) {
// TODO process features request and send reply
+ List<OFMessage> ofMessageList = Lists.newArrayList();
+
+ OFFeaturesReply.Builder frBuilder = FACTORY.buildFeaturesReply()
+ .setDatapathId(datapathId)
+ .setNBuffers(NUM_BUFFERS)
+ .setNTables(NUM_TABLES)
+ .setCapabilities(capabilities.ofSwitchCapabilities())
+ .setXid(msg.getXid());
+
+ ofMessageList.add(frBuilder.build());
+ channel.write(ofMessageList);
+
}
@Override
public void processLldp(Channel channel, OFMessage msg) {
// TODO process lldp
}
+
+ @Override
+ public void sendOfHello(Channel channel) {
+ List<OFMessage> ofMessageList = Lists.newArrayList();
+ OFHello.Builder ofHello = FACTORY.buildHello()
+ .setXid(this.handshakeTransactionIds--);
+
+ ofMessageList.add(ofHello.build());
+ channel.write(ofMessageList);
+ }
+
+ @Override
+ public void processEchoRequest(Channel channel, OFMessage msg) {
+ List<OFMessage> ofMessageList = Lists.newArrayList();
+ OFEchoReply.Builder echoBuilder = FACTORY.buildEchoReply()
+ .setXid(msg.getXid())
+ .setData(((OFEchoRequest) msg).getData());
+
+ ofMessageList.add(echoBuilder.build());
+ channel.write(ofMessageList);
+ }
+
+ private DatapathId getDpidFromDeviceId(DeviceId deviceId) {
+ String deviceIdToString = deviceId.toString().split(":")[1];
+
+ assert (deviceIdToString.length() == 16);
+
+ String resultedHexString = new String();
+ for (int i = 0; i < 8; i++) {
+ resultedHexString = resultedHexString + deviceIdToString.charAt(2 * i)
+ + deviceIdToString.charAt(2 * i + 1);
+ if (i != 7) {
+ resultedHexString += ":";
+ }
+ }
+ return DatapathId.of(resultedHexString);
+ }
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitchCapabilities.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitchCapabilities.java
new file mode 100644
index 0000000..3ac0f11
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitchCapabilities.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.onosproject.ofagent.api.OFSwitchCapabilities;
+import org.projectfloodlight.openflow.protocol.OFCapabilities;
+
+import java.util.Set;
+
+/**
+ * Implementation of openflow switch capabilities.
+ */
+public final class DefaultOFSwitchCapabilities implements OFSwitchCapabilities {
+
+ private final Set<OFCapabilities> ofCapabilities;
+
+ private DefaultOFSwitchCapabilities(Set<OFCapabilities> ofSwitchCapabilities) {
+ this.ofCapabilities = ImmutableSet.copyOf(ofSwitchCapabilities);
+ }
+
+ @Override
+ public Set<OFCapabilities> ofSwitchCapabilities() {
+ return ofCapabilities;
+ }
+
+ /**
+ * Returns DefaultOFSwitchCapabilities builder object.
+ *
+ * @return DefaultOFSwitchCapabilities builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder implements OFSwitchCapabilities.Builder {
+ private Set<OFCapabilities> ofCapabilities;
+
+ private Builder() {
+ ofCapabilities = Sets.newHashSet();
+ }
+
+ @Override
+ public Builder flowStats() {
+ ofCapabilities.add(OFCapabilities.FLOW_STATS);
+ return this;
+ }
+
+ @Override
+ public Builder tableStats() {
+ ofCapabilities.add(OFCapabilities.TABLE_STATS);
+ return this;
+ }
+
+ @Override
+ public Builder portStats() {
+ ofCapabilities.add(OFCapabilities.PORT_STATS);
+ return this;
+ }
+
+ @Override
+ public Builder groupStats() {
+ ofCapabilities.add(OFCapabilities.GROUP_STATS);
+ return this;
+ }
+
+ @Override
+ public Builder ipReasm() {
+ ofCapabilities.add(OFCapabilities.IP_REASM);
+ return this;
+ }
+
+ @Override
+ public Builder queueStats() {
+ ofCapabilities.add(OFCapabilities.QUEUE_STATS);
+ return this;
+ }
+
+ @Override
+ public Builder portBlocked() {
+ ofCapabilities.add(OFCapabilities.PORT_BLOCKED);
+ return this;
+ }
+
+ @Override
+ public OFSwitchCapabilities build() {
+ return new DefaultOFSwitchCapabilities(ofCapabilities);
+ }
+ }
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
index b077af4..38530737 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
@@ -15,15 +15,26 @@
*/
package org.onosproject.ofagent.impl;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.ReferenceCountUtil;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.osgi.ServiceDirectory;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.ofagent.api.OFSwitch;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFFactory;
import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+
/**
* Implementation of OpenFlow channel handler.
* It processes OpenFlow message according to the channel state.
@@ -33,10 +44,12 @@
private final Logger log = LoggerFactory.getLogger(getClass());
private final OFSwitch ofSwitch;
- private Channel channel;
+ private ChannelHandlerContext ctx;
private ChannelState state;
+ protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
+ protected VirtualNetworkService vNetService;
- private enum ChannelState {
+ enum ChannelState {
INIT {
@Override
@@ -49,14 +62,38 @@
@Override
void processOFMessage(final OFChannelHandler handler,
final OFMessage msg) {
- // TODO implement
+
+ switch (msg.getType()) {
+ case HELLO:
+ handler.setState(ChannelState.WAIT_FEATURE_REQUEST);
+ break;
+ default:
+ handler.illegalMessageReceived(msg);
+ break;
+ }
}
},
WAIT_FEATURE_REQUEST {
@Override
void processOFMessage(final OFChannelHandler handler,
final OFMessage msg) {
- // TODO implement
+
+ switch (msg.getType()) {
+ case FEATURES_REQUEST:
+ handler.ofSwitch.processFeaturesRequest(handler.ctx.channel(), msg);
+ handler.setState(ChannelState.ESTABLISHED);
+ break;
+ case ECHO_REQUEST:
+ handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg);
+ break;
+ case ERROR:
+ handler.logErrorClose(handler.ctx, (OFErrorMsg) msg);
+ break;
+ default:
+ handler.illegalMessageReceived(msg);
+ break;
+
+ }
}
},
ESTABLISHED {
@@ -65,9 +102,32 @@
final OFMessage msg) {
// TODO implement
// TODO add this channel to ofSwitch role service
+ switch (msg.getType()) {
+ case STATS_REQUEST:
+ //TODO implement
+ //TODO: use vNetService to build OFPortDesc.
+ break;
+ case SET_CONFIG:
+ //TODO implement
+ break;
+ case GET_CONFIG_REQUEST:
+ //TODO implement
+ break;
+ case BARRIER_REQUEST:
+ //TODO implement
+ break;
+ case ECHO_REQUEST:
+ handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg);
+ break;
+ case ERROR:
+ handler.logErrorClose(handler.ctx, (OFErrorMsg) msg);
+ break;
+ default:
+ handler.unhandledMessageReceived(msg);
+ break;
+ }
}
};
-
abstract void processOFMessage(final OFChannelHandler handler,
final OFMessage msg);
}
@@ -78,39 +138,91 @@
* @param ofSwitch openflow switch that owns this channel
*/
public OFChannelHandler(OFSwitch ofSwitch) {
+ super();
this.ofSwitch = ofSwitch;
+
+ setState(ChannelState.INIT);
+
+ ServiceDirectory services = new DefaultServiceDirectory();
+ vNetService = services.get(VirtualNetworkService.class);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- this.channel = ctx.channel();
+ this.ctx = ctx;
+ log.debug("Channel Active. Send OF_13 Hello to {}", ctx.channel().remoteAddress());
+
+ try {
+ ofSwitch.sendOfHello(ctx.channel());
+ setState(ChannelState.WAIT_HELLO);
+ } catch (Throwable cause) {
+ log.error("Exception occured because of{}", cause.getMessage());
+ }
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- try {
- OFMessage ofMsg = (OFMessage) msg;
- // TODO process OF message
- } finally {
- ReferenceCountUtil.release(msg);
+ try {
+ if (msg instanceof List) {
+ ((List) msg).forEach(ofm -> {
+ state.processOFMessage(this, (OFMessage) ofm);
+ });
+ } else {
+ state.processOFMessage(this, (OFMessage) msg);
+ }
+ } catch (Throwable cause) {
+ log.error("Exception occured {}", cause.getMessage());
}
+
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
- ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (cause instanceof ReadTimeoutException) {
+ log.error("Connection closed because of ReadTimeoutException {}", cause.getMessage());
+ } else if (cause instanceof ClosedChannelException) {
+ log.error("ClosedChannelException occured");
+ return;
+ } else if (cause instanceof RejectedExecutionException) {
+ log.error("Could not process message: queue full");
+ } else if (cause instanceof IOException) {
+ log.error("IOException occured");
+ } else {
+ log.error("Error while processing message from switch {}", cause.getMessage());
+ }
ctx.close();
}
private void setState(ChannelState state) {
this.state = state;
}
+
+ private void logErrorClose(ChannelHandlerContext ctx, OFErrorMsg errorMsg) {
+ log.error("{} from switch {} in state {}",
+ errorMsg,
+ ofSwitch.device().id().toString(),
+ state);
+
+ log.error("Disconnecting...");
+ ctx.close();
+ }
+
+ private void illegalMessageReceived(OFMessage ofMessage) {
+ log.warn("Controller should never send this message {} in current state {}",
+ ofMessage.getType().toString(),
+ state);
+ }
+
+ private void unhandledMessageReceived(OFMessage ofMessage) {
+ log.warn("Unhandled message {} received in state {}. Ignored",
+ ofMessage.getType().toString(),
+ state);
+ }
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
index a3be413..e93b0c9 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
@@ -17,6 +17,7 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
import org.onosproject.ofagent.api.OFSwitch;
/**
@@ -25,6 +26,7 @@
public final class OFChannelInitializer extends ChannelInitializer<SocketChannel> {
private final OFSwitch ofSwitch;
+ private static final int READ_TIMEOUT = 30;
/**
* Default constructor.
@@ -38,6 +40,9 @@
@Override
protected void initChannel(SocketChannel ch) throws Exception {
- // TODO configure OF channel pipeline
+ ch.pipeline().addLast(new OFMessageDecoder())
+ .addLast(new OFMessageEncoder())
+ .addLast(new ReadTimeoutHandler(READ_TIMEOUT))
+ .addLast(new OFChannelHandler(ofSwitch));
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
index 27a6bc5..7f59cf5 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
@@ -15,14 +15,19 @@
*/
package org.onosproject.ofagent.impl;
+import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
import org.onosproject.ofagent.api.OFController;
import org.onosproject.ofagent.api.OFSwitch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -33,12 +38,11 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private static final int MAX_RETRY = 10;
-
- private final AtomicInteger retryCount = new AtomicInteger();
+ private final AtomicInteger retryCount;
private final OFSwitch ofSwitch;
private final OFController controller;
- private final NioEventLoopGroup workGroup;
+ private final EventLoopGroup workGroup;
+ private static final int MAX_RETRY = 3;
/**
* Default constructor.
@@ -48,28 +52,43 @@
* @param workGroup work group for connection
*/
public OFConnectionHandler(OFSwitch ofSwitch, OFController controller,
- NioEventLoopGroup workGroup) {
+ EventLoopGroup workGroup) {
this.ofSwitch = ofSwitch;
this.controller = controller;
this.workGroup = workGroup;
+ this.retryCount = new AtomicInteger();
}
/**
* Creates a connection to the supplied controller.
+ *
*/
public void connect() {
- // TODO initiates a connection to the controller
+
+ SocketAddress remoteAddr = new InetSocketAddress(controller.ip().toInetAddress(), controller.port().toInt());
+
+ log.debug("Connecting to controller {}:{}", controller.ip(), controller.port());
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(workGroup)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .handler(new OFChannelInitializer(ofSwitch));
+
+ bootstrap.connect(remoteAddr).addListener(this);
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
- log.debug("{} is connected to controller {}", ofSwitch.device().id(), controller);
- // TODO do something for a new connection if there's any
+ ofSwitch.addControllerChannel(future.channel());
+ log.debug("Connected to controller {}:{} for device {}",
+ controller.ip(), controller.port(), ofSwitch.device().id());
} else {
- log.debug("{} failed to connect {}, retry..", ofSwitch.device().id(), controller);
- // TODO retry connect if retry count is less than MAX
+ log.info("Failed to connect controller {}:{}. Retry...", controller.ip(), controller.port());
+ if (retryCount.getAndIncrement() < MAX_RETRY) {
+ this.connect();
+ }
}
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
index 79d0f4e..7e3d1d4 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
@@ -18,6 +18,9 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFMessageReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,9 +36,18 @@
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
- log.trace("Received message from {}: {}", ctx.channel().remoteAddress(),
- in.readByte());
- // TODO decode byte message to OFMessage
+ if (!ctx.channel().isActive()) {
+ return;
+ }
+
+ try {
+ OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
+ OFMessage message = reader.readFrom(in);
+ out.add(message);
+ } catch (Throwable cause) {
+ log.error("Exception occured while processing decoding because of {}", cause.getMessage());
+ }
+
}
}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
index 1a67602..3d9f8ee 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
@@ -16,19 +16,50 @@
package org.onosproject.ofagent.impl;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Encodes OFMessage to a byte buffer.
*/
public final class OFMessageEncoder extends MessageToByteEncoder<Iterable<OFMessage>> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
@Override
protected void encode(ChannelHandlerContext ctx, Iterable<OFMessage> msgList, ByteBuf out)
throws Exception {
- // TODO encode OFMessage to ByteBuf
+ if (!ctx.channel().isActive()) {
+ return;
+ }
+
+ if (msgList instanceof Iterable) {
+ msgList.forEach(msg -> {
+ try {
+ ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer();
+ msg.writeTo(byteBuf);
+
+ ctx.writeAndFlush(byteBuf);
+ } catch (Exception e) {
+ log.error("error occured because of {}", e.getMessage());
+ }
+ });
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (cause instanceof EncoderException) {
+ log.error("Connection closed because of EncoderException {}", cause.getMessage());
+ ctx.close();
+ } else {
+ log.error("Exception occured while processing encoding because of {}", cause.getMessage());
+ ctx.close();
+ }
}
}