blob: 391c0026d49ce6d4bbb8997815a49c8a4b447b58 [file] [log] [blame]
/**
* Copyright 2011, Big Switch Networks, Inc.
* Originally created by David Erickson, Stanford University
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
**/
package net.floodlightcontroller.core;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.openflow.protocol.OFFlowMod;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFPacketIn;
import org.openflow.protocol.OFPacketOut;
import org.openflow.protocol.OFType;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.ArrayList;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import net.floodlightcontroller.core.annotations.LogMessageCategory;
import net.floodlightcontroller.core.annotations.LogMessageDoc;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.packetstreamer.thrift.*;
import net.floodlightcontroller.threadpool.IThreadPoolService;
@LogMessageCategory("OpenFlow Message Tracing")
public class OFMessageFilterManager
implements IOFMessageListener, IFloodlightModule, IOFMessageFilterManagerService {
/**
* @author Srini
*/
protected static Logger log = LoggerFactory.getLogger(OFMessageFilterManager.class);
// The port and client reference for packet streaming
protected int serverPort = 9090;
protected final int MaxRetry = 1;
protected static TTransport transport = null;
protected static PacketStreamer.Client packetClient = null;
protected IFloodlightProviderService floodlightProvider = null;
protected IThreadPoolService threadPool = null;
// filter List is a key value pair. Key is the session id,
// value is the filter rules.
protected ConcurrentHashMap<String,
ConcurrentHashMap<String,
String>> filterMap = null;
protected ConcurrentHashMap<String, Long> filterTimeoutMap = null;
protected Timer timer = null;
protected int MAX_FILTERS=5;
protected long MAX_FILTER_TIME= 300000; // maximum filter time is 5 minutes.
protected int TIMER_INTERVAL = 1000; // 1 second time interval.
public static final String SUCCESS = "0";
public static final String FILTER_SETUP_FAILED = "-1001";
public static final String FILTER_NOT_FOUND = "-1002";
public static final String FILTER_LIMIT_REACHED = "-1003";
public static final String FILTER_SESSION_ID_NOT_FOUND = "-1004";
public static final String SERVICE_UNAVAILABLE = "-1005";
public enum FilterResult {
/*
* FILTER_NOT_DEFINED: Filter is not defined
* FILTER_NO_MATCH: Filter is defined and the packet doesn't
* match the filter
* FILTER_MATCH: Filter is defined and the packet matches
* the filter
*/
FILTER_NOT_DEFINED, FILTER_NO_MATCH, FILTER_MATCH
}
protected String addFilter(ConcurrentHashMap<String,String> f, long delta) {
// Create unique session ID.
int prime = 33791;
String s = null;
int i;
if ((filterMap == null) || (filterTimeoutMap == null))
return String.format("%d", FILTER_SETUP_FAILED);
for (i=0; i<MAX_FILTERS; ++i) {
Integer x = prime + i;
s = String.format("%d", x.hashCode());
// implies you can use this key for session id.
if (!filterMap.containsKey(s)) break;
}
if (i==MAX_FILTERS) {
return FILTER_LIMIT_REACHED;
}
filterMap.put(s, f);
if (filterTimeoutMap.containsKey(s)) filterTimeoutMap.remove(s);
filterTimeoutMap.put(s, delta);
// set the timer as there will be no existing timers.
if (filterMap.size() == 1) {
TimeoutFilterTask task = new TimeoutFilterTask(this);
Timer timer = new Timer();
timer.schedule (task, TIMER_INTERVAL);
// Keep the listeners to avoid race condition
//startListening();
}
return s; // the return string is the session ID.
}
public String setupFilter(String sid,
ConcurrentHashMap<String,String> f,
int deltaInMilliSeconds) {
if (sid == null) {
// Delta in filter needs to be milliseconds
log.debug("Adding new filter: {} for {} ms", f, deltaInMilliSeconds);
return addFilter(f, deltaInMilliSeconds);
} else {// this is the session id.
// we will ignore the hash map features.
if (deltaInMilliSeconds > 0)
return refreshFilter(sid, deltaInMilliSeconds);
else
return deleteFilter(sid);
}
}
public int timeoutFilters() {
Iterator<String> i = filterTimeoutMap.keySet().iterator();
while(i.hasNext()) {
String s = i.next();
Long t = filterTimeoutMap.get(s);
if (t != null) {
i.remove();
t -= TIMER_INTERVAL;
if (t > 0) {
filterTimeoutMap.put(s, t);
} else deleteFilter(s);
} else deleteFilter(s);
}
return filterMap.size();
}
protected String refreshFilter(String s, int delta) {
Long t = filterTimeoutMap.get(s);
if (t != null) {
filterTimeoutMap.remove(s);
t += delta; // time is in milliseconds
if (t > MAX_FILTER_TIME) t = MAX_FILTER_TIME;
filterTimeoutMap.put(s, t);
return SUCCESS;
} else return FILTER_SESSION_ID_NOT_FOUND;
}
@LogMessageDoc(level="ERROR",
message="Error while terminating packet " +
"filter session",
explanation="An unknown error occurred while terminating " +
"a packet filter session.",
recommendation=LogMessageDoc.GENERIC_ACTION)
protected String deleteFilter(String sessionId) {
if (filterMap.containsKey(sessionId)) {
filterMap.remove(sessionId);
try {
if (packetClient != null)
packetClient.terminateSession(sessionId);
} catch (TException e) {
log.error("Error while terminating packet " +
"filter session", e);
}
log.debug("Deleted Filter {}. # of filters" +
" remaining: {}", sessionId, filterMap.size());
return SUCCESS;
} else return FILTER_SESSION_ID_NOT_FOUND;
}
public HashSet<String> getMatchedFilters(OFMessage m, FloodlightContext cntx) {
HashSet<String> matchedFilters = new HashSet<String>();
// This default function is written to match on packet ins and
// packet outs.
Ethernet eth = null;
if (m.getType() == OFType.PACKET_IN) {
eth = IFloodlightProviderService.bcStore.get(cntx,
IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
} else if (m.getType() == OFType.PACKET_OUT) {
eth = new Ethernet();
OFPacketOut p = (OFPacketOut) m;
// No MAC match if packetOut doesn't have the packet.
if (p.getPacketData() == null) return null;
eth.deserialize(p.getPacketData(), 0, p.getPacketData().length);
} else if (m.getType() == OFType.FLOW_MOD) {
// flow-mod can't be matched by mac.
return null;
}
if (eth == null) return null;
Iterator<String> filterIt = filterMap.keySet().iterator();
while (filterIt.hasNext()) { // for every filter
boolean filterMatch = false;
String filterSessionId = filterIt.next();
Map<String,String> filter = filterMap.get(filterSessionId);
// If the filter has empty fields, then it is not considered as a match.
if (filter == null || filter.isEmpty()) continue;
Iterator<String> fieldIt = filter.keySet().iterator();
while (fieldIt.hasNext()) {
String filterFieldType = fieldIt.next();
String filterFieldValue = filter.get(filterFieldType);
if (filterFieldType.equals("mac")) {
String srcMac = HexString.toHexString(eth.getSourceMACAddress());
String dstMac = HexString.toHexString(eth.getDestinationMACAddress());
log.debug("srcMac: {}, dstMac: {}", srcMac, dstMac);
if (filterFieldValue.equals(srcMac) ||
filterFieldValue.equals(dstMac)){
filterMatch = true;
} else {
filterMatch = false;
break;
}
}
}
if (filterMatch) {
matchedFilters.add(filterSessionId);
}
}
if (matchedFilters.isEmpty())
return null;
else
return matchedFilters;
}
@LogMessageDoc(level="ERROR",
message="Failed to establish connection with the " +
"packetstreamer server.",
explanation="The message tracing server is not running " +
"or otherwise unavailable.",
recommendation=LogMessageDoc.CHECK_CONTROLLER)
public boolean connectToPSServer() {
int numRetries = 0;
if (transport != null && transport.isOpen()) {
return true;
}
while (numRetries++ < MaxRetry) {
try {
transport = new TFramedTransport(new TSocket("localhost",
serverPort));
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
packetClient = new PacketStreamer.Client(protocol);
log.debug("Have a connection to packetstreamer server " +
"localhost:{}", serverPort);
break;
} catch (TException x) {
try {
// Wait for 1 second before retry
if (numRetries < MaxRetry) {
Thread.sleep(1000);
}
} catch (Exception e) {}
}
}
if (numRetries > MaxRetry) {
log.error("Failed to establish connection with the " +
"packetstreamer server.");
return false;
}
return true;
}
public void disconnectFromPSServer() {
if (transport != null && transport.isOpen()) {
log.debug("Close the connection to packetstreamer server" +
" localhost:{}", serverPort);
transport.close();
}
}
@Override
public String getName() {
return "messageFilterManager";
}
@Override
public boolean isCallbackOrderingPrereq(OFType type, String name) {
return (type == OFType.PACKET_IN && name.equals("devicemanager"));
}
@Override
public boolean isCallbackOrderingPostreq(OFType type, String name) {
return (type == OFType.PACKET_IN && name.equals("learningswitch"));
}
@Override
@LogMessageDoc(level="ERROR",
message="Error while sending packet",
explanation="Failed to send a message to the message " +
"tracing server",
recommendation=LogMessageDoc.CHECK_CONTROLLER)
public Command receive(IOFSwitch sw, OFMessage msg,
FloodlightContext cntx) {
if (filterMap == null || filterMap.isEmpty()) return Command.CONTINUE;
HashSet<String> matchedFilters = null;
if (log.isDebugEnabled()) {
log.debug("Received packet {} from switch {}",
msg, sw.getStringId());
}
matchedFilters = getMatchedFilters(msg, cntx);
if (matchedFilters == null) {
return Command.CONTINUE;
} else {
try {
sendPacket(matchedFilters, sw, msg, cntx, true);
} catch (Exception e) {
log.error("Error while sending packet", e);
}
}
return Command.CONTINUE;
}
public class TimeoutFilterTask extends TimerTask {
OFMessageFilterManager filterManager;
ScheduledExecutorService ses = threadPool.getScheduledExecutor();
public TimeoutFilterTask(OFMessageFilterManager manager) {
filterManager = manager;
}
public void run() {
int x = filterManager.timeoutFilters();
if (x > 0) { // there's at least one filter still active.
Timer timer = new Timer();
timer.schedule(new TimeoutFilterTask(filterManager),
TIMER_INTERVAL);
} else {
// Don't stop the listener to avoid race condition
//stopListening();
}
}
}
public int getNumberOfFilters() {
return filterMap.size();
}
public int getMaxFilterSize() {
return MAX_FILTERS;
}
protected void sendPacket(HashSet<String> matchedFilters, IOFSwitch sw,
OFMessage msg, FloodlightContext cntx, boolean sync)
throws TException {
Message sendMsg = new Message();
Packet packet = new Packet();
ChannelBuffer bb;
sendMsg.setPacket(packet);
List<String> sids = new ArrayList<String>(matchedFilters);
sendMsg.setSessionIDs(sids);
packet.setMessageType(OFMessageType.findByValue((msg.getType().ordinal())));
switch (msg.getType()) {
case PACKET_IN:
OFPacketIn pktIn = (OFPacketIn)msg;
packet.setSwPortTuple(new SwitchPortTuple(sw.getId(),
pktIn.getInPort()));
bb = ChannelBuffers.buffer(pktIn.getLength());
pktIn.writeTo(bb);
packet.setData(OFMessage.getData(sw, msg, cntx));
break;
case PACKET_OUT:
OFPacketOut pktOut = (OFPacketOut)msg;
packet.setSwPortTuple(new SwitchPortTuple(sw.getId(),
pktOut.getInPort()));
bb = ChannelBuffers.buffer(pktOut.getLength());
pktOut.writeTo(bb);
packet.setData(OFMessage.getData(sw, msg, cntx));
break;
case FLOW_MOD:
OFFlowMod offlowMod = (OFFlowMod)msg;
packet.setSwPortTuple(new SwitchPortTuple(sw.getId(),
offlowMod.
getOutPort()));
bb = ChannelBuffers.buffer(offlowMod.getLength());
offlowMod.writeTo(bb);
packet.setData(OFMessage.getData(sw, msg, cntx));
break;
default:
packet.setSwPortTuple(new SwitchPortTuple(sw.getId(),
(short)0));
String strData = "Unknown packet";
packet.setData(strData.getBytes());
break;
}
try {
if (transport == null ||
!transport.isOpen() ||
packetClient == null) {
if (!connectToPSServer()) {
// No need to sendPacket if can't make connection to
// the server
return;
}
}
if (sync) {
log.debug("Send packet sync: {}", packet.toString());
packetClient.pushMessageSync(sendMsg);
} else {
log.debug("Send packet sync: ", packet.toString());
packetClient.pushMessageAsync(sendMsg);
}
} catch (Exception e) {
log.error("Error while sending packet", e);
disconnectFromPSServer();
connectToPSServer();
}
}
// IFloodlightModule methods
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IOFMessageFilterManagerService.class);
return l;
}
@Override
public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
Map<Class<? extends IFloodlightService>,
IFloodlightService> m =
new HashMap<Class<? extends IFloodlightService>,
IFloodlightService>();
// We are the class that implements the service
m.put(IOFMessageFilterManagerService.class, this);
return m;
}
@Override
public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IFloodlightProviderService.class);
l.add(IThreadPoolService.class);
return l;
}
@Override
public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
this.floodlightProvider =
context.getServiceImpl(IFloodlightProviderService.class);
this.threadPool =
context.getServiceImpl(IThreadPoolService.class);
}
@Override
public void startUp(FloodlightModuleContext context) {
// This is our 'constructor'
filterMap = new ConcurrentHashMap<String, ConcurrentHashMap<String,String>>();
filterTimeoutMap = new ConcurrentHashMap<String, Long>();
serverPort =
Integer.parseInt(System.getProperty("net.floodlightcontroller." +
"packetstreamer.port", "9090"));
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
floodlightProvider.addOFMessageListener(OFType.PACKET_OUT, this);
floodlightProvider.addOFMessageListener(OFType.FLOW_MOD, this);
}
}