blob: 391c0026d49ce6d4bbb8997815a49c8a4b447b58 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2 * Copyright 2011, Big Switch Networks, Inc.
3 * Originally created by David Erickson, Stanford University
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may
6 * not use this file except in compliance with the License. You may obtain
7 * a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations
15 * under the License.
16 **/
17
18package net.floodlightcontroller.core;
19
20import java.util.Collection;
21import java.util.HashMap;
22import java.util.HashSet;
23import java.util.Iterator;
24import java.util.Map;
25import java.util.Timer;
26import java.util.TimerTask;
27import java.util.concurrent.ConcurrentHashMap;
28import java.util.concurrent.ScheduledExecutorService;
29
30import org.jboss.netty.buffer.ChannelBuffer;
31import org.jboss.netty.buffer.ChannelBuffers;
32import org.openflow.protocol.OFFlowMod;
33import org.openflow.protocol.OFMessage;
34import org.openflow.protocol.OFPacketIn;
35import org.openflow.protocol.OFPacketOut;
36import org.openflow.protocol.OFType;
37import org.openflow.util.HexString;
38import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
41import java.util.List;
42import java.util.ArrayList;
43import org.apache.thrift.TException;
44import org.apache.thrift.transport.TFramedTransport;
45import org.apache.thrift.transport.TTransport;
46import org.apache.thrift.transport.TSocket;
47import org.apache.thrift.protocol.TBinaryProtocol;
48import org.apache.thrift.protocol.TProtocol;
49
50import net.floodlightcontroller.core.annotations.LogMessageCategory;
51import net.floodlightcontroller.core.annotations.LogMessageDoc;
52import net.floodlightcontroller.core.module.FloodlightModuleContext;
53import net.floodlightcontroller.core.module.FloodlightModuleException;
54import net.floodlightcontroller.core.module.IFloodlightModule;
55import net.floodlightcontroller.core.module.IFloodlightService;
56import net.floodlightcontroller.packet.Ethernet;
57import net.floodlightcontroller.packetstreamer.thrift.*;
58import net.floodlightcontroller.threadpool.IThreadPoolService;
59
60@LogMessageCategory("OpenFlow Message Tracing")
61public class OFMessageFilterManager
62 implements IOFMessageListener, IFloodlightModule, IOFMessageFilterManagerService {
63
64 /**
65 * @author Srini
66 */
67 protected static Logger log = LoggerFactory.getLogger(OFMessageFilterManager.class);
68
69 // The port and client reference for packet streaming
70 protected int serverPort = 9090;
71 protected final int MaxRetry = 1;
72 protected static TTransport transport = null;
73 protected static PacketStreamer.Client packetClient = null;
74
75 protected IFloodlightProviderService floodlightProvider = null;
76 protected IThreadPoolService threadPool = null;
77 // filter List is a key value pair. Key is the session id,
78 // value is the filter rules.
79 protected ConcurrentHashMap<String,
80 ConcurrentHashMap<String,
81 String>> filterMap = null;
82 protected ConcurrentHashMap<String, Long> filterTimeoutMap = null;
83 protected Timer timer = null;
84
85 protected int MAX_FILTERS=5;
86 protected long MAX_FILTER_TIME= 300000; // maximum filter time is 5 minutes.
87 protected int TIMER_INTERVAL = 1000; // 1 second time interval.
88
89 public static final String SUCCESS = "0";
90 public static final String FILTER_SETUP_FAILED = "-1001";
91 public static final String FILTER_NOT_FOUND = "-1002";
92 public static final String FILTER_LIMIT_REACHED = "-1003";
93 public static final String FILTER_SESSION_ID_NOT_FOUND = "-1004";
94 public static final String SERVICE_UNAVAILABLE = "-1005";
95
96 public enum FilterResult {
97 /*
98 * FILTER_NOT_DEFINED: Filter is not defined
99 * FILTER_NO_MATCH: Filter is defined and the packet doesn't
100 * match the filter
101 * FILTER_MATCH: Filter is defined and the packet matches
102 * the filter
103 */
104 FILTER_NOT_DEFINED, FILTER_NO_MATCH, FILTER_MATCH
105 }
106
107 protected String addFilter(ConcurrentHashMap<String,String> f, long delta) {
108
109 // Create unique session ID.
110 int prime = 33791;
111 String s = null;
112 int i;
113
114 if ((filterMap == null) || (filterTimeoutMap == null))
115 return String.format("%d", FILTER_SETUP_FAILED);
116
117 for (i=0; i<MAX_FILTERS; ++i) {
118 Integer x = prime + i;
119 s = String.format("%d", x.hashCode());
120 // implies you can use this key for session id.
121 if (!filterMap.containsKey(s)) break;
122 }
123
124 if (i==MAX_FILTERS) {
125 return FILTER_LIMIT_REACHED;
126 }
127
128 filterMap.put(s, f);
129 if (filterTimeoutMap.containsKey(s)) filterTimeoutMap.remove(s);
130 filterTimeoutMap.put(s, delta);
131
132 // set the timer as there will be no existing timers.
133 if (filterMap.size() == 1) {
134 TimeoutFilterTask task = new TimeoutFilterTask(this);
135 Timer timer = new Timer();
136 timer.schedule (task, TIMER_INTERVAL);
137 // Keep the listeners to avoid race condition
138 //startListening();
139 }
140 return s; // the return string is the session ID.
141 }
142
143 public String setupFilter(String sid,
144 ConcurrentHashMap<String,String> f,
145 int deltaInMilliSeconds) {
146
147 if (sid == null) {
148 // Delta in filter needs to be milliseconds
149 log.debug("Adding new filter: {} for {} ms", f, deltaInMilliSeconds);
150 return addFilter(f, deltaInMilliSeconds);
151 } else {// this is the session id.
152 // we will ignore the hash map features.
153 if (deltaInMilliSeconds > 0)
154 return refreshFilter(sid, deltaInMilliSeconds);
155 else
156 return deleteFilter(sid);
157 }
158 }
159
160 public int timeoutFilters() {
161 Iterator<String> i = filterTimeoutMap.keySet().iterator();
162
163 while(i.hasNext()) {
164 String s = i.next();
165
166 Long t = filterTimeoutMap.get(s);
167 if (t != null) {
168 i.remove();
169 t -= TIMER_INTERVAL;
170 if (t > 0) {
171 filterTimeoutMap.put(s, t);
172 } else deleteFilter(s);
173 } else deleteFilter(s);
174 }
175 return filterMap.size();
176 }
177
178 protected String refreshFilter(String s, int delta) {
179 Long t = filterTimeoutMap.get(s);
180 if (t != null) {
181 filterTimeoutMap.remove(s);
182 t += delta; // time is in milliseconds
183 if (t > MAX_FILTER_TIME) t = MAX_FILTER_TIME;
184 filterTimeoutMap.put(s, t);
185 return SUCCESS;
186 } else return FILTER_SESSION_ID_NOT_FOUND;
187 }
188
189 @LogMessageDoc(level="ERROR",
190 message="Error while terminating packet " +
191 "filter session",
192 explanation="An unknown error occurred while terminating " +
193 "a packet filter session.",
194 recommendation=LogMessageDoc.GENERIC_ACTION)
195 protected String deleteFilter(String sessionId) {
196
197 if (filterMap.containsKey(sessionId)) {
198 filterMap.remove(sessionId);
199 try {
200 if (packetClient != null)
201 packetClient.terminateSession(sessionId);
202 } catch (TException e) {
203 log.error("Error while terminating packet " +
204 "filter session", e);
205 }
206 log.debug("Deleted Filter {}. # of filters" +
207 " remaining: {}", sessionId, filterMap.size());
208 return SUCCESS;
209 } else return FILTER_SESSION_ID_NOT_FOUND;
210 }
211
212 public HashSet<String> getMatchedFilters(OFMessage m, FloodlightContext cntx) {
213
214 HashSet<String> matchedFilters = new HashSet<String>();
215
216 // This default function is written to match on packet ins and
217 // packet outs.
218 Ethernet eth = null;
219
220 if (m.getType() == OFType.PACKET_IN) {
221 eth = IFloodlightProviderService.bcStore.get(cntx,
222 IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
223 } else if (m.getType() == OFType.PACKET_OUT) {
224 eth = new Ethernet();
225 OFPacketOut p = (OFPacketOut) m;
226
227 // No MAC match if packetOut doesn't have the packet.
228 if (p.getPacketData() == null) return null;
229
230 eth.deserialize(p.getPacketData(), 0, p.getPacketData().length);
231 } else if (m.getType() == OFType.FLOW_MOD) {
232 // flow-mod can't be matched by mac.
233 return null;
234 }
235
236 if (eth == null) return null;
237
238 Iterator<String> filterIt = filterMap.keySet().iterator();
239 while (filterIt.hasNext()) { // for every filter
240 boolean filterMatch = false;
241 String filterSessionId = filterIt.next();
242 Map<String,String> filter = filterMap.get(filterSessionId);
243
244 // If the filter has empty fields, then it is not considered as a match.
245 if (filter == null || filter.isEmpty()) continue;
246 Iterator<String> fieldIt = filter.keySet().iterator();
247 while (fieldIt.hasNext()) {
248 String filterFieldType = fieldIt.next();
249 String filterFieldValue = filter.get(filterFieldType);
250 if (filterFieldType.equals("mac")) {
251
252 String srcMac = HexString.toHexString(eth.getSourceMACAddress());
253 String dstMac = HexString.toHexString(eth.getDestinationMACAddress());
254 log.debug("srcMac: {}, dstMac: {}", srcMac, dstMac);
255
256 if (filterFieldValue.equals(srcMac) ||
257 filterFieldValue.equals(dstMac)){
258 filterMatch = true;
259 } else {
260 filterMatch = false;
261 break;
262 }
263 }
264 }
265 if (filterMatch) {
266 matchedFilters.add(filterSessionId);
267 }
268 }
269
270 if (matchedFilters.isEmpty())
271 return null;
272 else
273 return matchedFilters;
274 }
275
276 @LogMessageDoc(level="ERROR",
277 message="Failed to establish connection with the " +
278 "packetstreamer server.",
279 explanation="The message tracing server is not running " +
280 "or otherwise unavailable.",
281 recommendation=LogMessageDoc.CHECK_CONTROLLER)
282 public boolean connectToPSServer() {
283 int numRetries = 0;
284 if (transport != null && transport.isOpen()) {
285 return true;
286 }
287
288 while (numRetries++ < MaxRetry) {
289 try {
290 transport = new TFramedTransport(new TSocket("localhost",
291 serverPort));
292 transport.open();
293
294 TProtocol protocol = new TBinaryProtocol(transport);
295 packetClient = new PacketStreamer.Client(protocol);
296
297 log.debug("Have a connection to packetstreamer server " +
298 "localhost:{}", serverPort);
299 break;
300 } catch (TException x) {
301 try {
302 // Wait for 1 second before retry
303 if (numRetries < MaxRetry) {
304 Thread.sleep(1000);
305 }
306 } catch (Exception e) {}
307 }
308 }
309
310 if (numRetries > MaxRetry) {
311 log.error("Failed to establish connection with the " +
312 "packetstreamer server.");
313 return false;
314 }
315 return true;
316 }
317
318 public void disconnectFromPSServer() {
319 if (transport != null && transport.isOpen()) {
320 log.debug("Close the connection to packetstreamer server" +
321 " localhost:{}", serverPort);
322 transport.close();
323 }
324 }
325
326 @Override
327 public String getName() {
328 return "messageFilterManager";
329 }
330
331 @Override
332 public boolean isCallbackOrderingPrereq(OFType type, String name) {
333 return (type == OFType.PACKET_IN && name.equals("devicemanager"));
334 }
335
336 @Override
337 public boolean isCallbackOrderingPostreq(OFType type, String name) {
338 return (type == OFType.PACKET_IN && name.equals("learningswitch"));
339 }
340
341 @Override
342 @LogMessageDoc(level="ERROR",
343 message="Error while sending packet",
344 explanation="Failed to send a message to the message " +
345 "tracing server",
346 recommendation=LogMessageDoc.CHECK_CONTROLLER)
347 public Command receive(IOFSwitch sw, OFMessage msg,
348 FloodlightContext cntx) {
349
350 if (filterMap == null || filterMap.isEmpty()) return Command.CONTINUE;
351
352 HashSet<String> matchedFilters = null;
353 if (log.isDebugEnabled()) {
354 log.debug("Received packet {} from switch {}",
355 msg, sw.getStringId());
356 }
357
358 matchedFilters = getMatchedFilters(msg, cntx);
359 if (matchedFilters == null) {
360 return Command.CONTINUE;
361 } else {
362 try {
363 sendPacket(matchedFilters, sw, msg, cntx, true);
364 } catch (Exception e) {
365 log.error("Error while sending packet", e);
366 }
367 }
368
369 return Command.CONTINUE;
370 }
371
372
373 public class TimeoutFilterTask extends TimerTask {
374
375 OFMessageFilterManager filterManager;
376 ScheduledExecutorService ses = threadPool.getScheduledExecutor();
377
378 public TimeoutFilterTask(OFMessageFilterManager manager) {
379 filterManager = manager;
380 }
381
382 public void run() {
383 int x = filterManager.timeoutFilters();
384
385 if (x > 0) { // there's at least one filter still active.
386 Timer timer = new Timer();
387 timer.schedule(new TimeoutFilterTask(filterManager),
388 TIMER_INTERVAL);
389 } else {
390 // Don't stop the listener to avoid race condition
391 //stopListening();
392 }
393 }
394 }
395
396 public int getNumberOfFilters() {
397 return filterMap.size();
398 }
399
400 public int getMaxFilterSize() {
401 return MAX_FILTERS;
402 }
403
404 protected void sendPacket(HashSet<String> matchedFilters, IOFSwitch sw,
405 OFMessage msg, FloodlightContext cntx, boolean sync)
406 throws TException {
407 Message sendMsg = new Message();
408 Packet packet = new Packet();
409 ChannelBuffer bb;
410 sendMsg.setPacket(packet);
411
412 List<String> sids = new ArrayList<String>(matchedFilters);
413
414 sendMsg.setSessionIDs(sids);
415 packet.setMessageType(OFMessageType.findByValue((msg.getType().ordinal())));
416
417 switch (msg.getType()) {
418 case PACKET_IN:
419 OFPacketIn pktIn = (OFPacketIn)msg;
420 packet.setSwPortTuple(new SwitchPortTuple(sw.getId(),
421 pktIn.getInPort()));
422 bb = ChannelBuffers.buffer(pktIn.getLength());
423 pktIn.writeTo(bb);
424 packet.setData(OFMessage.getData(sw, msg, cntx));
425 break;
426 case PACKET_OUT:
427 OFPacketOut pktOut = (OFPacketOut)msg;
428 packet.setSwPortTuple(new SwitchPortTuple(sw.getId(),
429 pktOut.getInPort()));
430 bb = ChannelBuffers.buffer(pktOut.getLength());
431 pktOut.writeTo(bb);
432 packet.setData(OFMessage.getData(sw, msg, cntx));
433 break;
434 case FLOW_MOD:
435 OFFlowMod offlowMod = (OFFlowMod)msg;
436 packet.setSwPortTuple(new SwitchPortTuple(sw.getId(),
437 offlowMod.
438 getOutPort()));
439 bb = ChannelBuffers.buffer(offlowMod.getLength());
440 offlowMod.writeTo(bb);
441 packet.setData(OFMessage.getData(sw, msg, cntx));
442 break;
443 default:
444 packet.setSwPortTuple(new SwitchPortTuple(sw.getId(),
445 (short)0));
446 String strData = "Unknown packet";
447 packet.setData(strData.getBytes());
448 break;
449 }
450
451 try {
452 if (transport == null ||
453 !transport.isOpen() ||
454 packetClient == null) {
455 if (!connectToPSServer()) {
456 // No need to sendPacket if can't make connection to
457 // the server
458 return;
459 }
460 }
461 if (sync) {
462 log.debug("Send packet sync: {}", packet.toString());
463 packetClient.pushMessageSync(sendMsg);
464 } else {
465 log.debug("Send packet sync: ", packet.toString());
466 packetClient.pushMessageAsync(sendMsg);
467 }
468 } catch (Exception e) {
469 log.error("Error while sending packet", e);
470 disconnectFromPSServer();
471 connectToPSServer();
472 }
473 }
474
475 // IFloodlightModule methods
476
477 @Override
478 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
479 Collection<Class<? extends IFloodlightService>> l =
480 new ArrayList<Class<? extends IFloodlightService>>();
481 l.add(IOFMessageFilterManagerService.class);
482 return l;
483 }
484
485 @Override
486 public Map<Class<? extends IFloodlightService>, IFloodlightService>
487 getServiceImpls() {
488 Map<Class<? extends IFloodlightService>,
489 IFloodlightService> m =
490 new HashMap<Class<? extends IFloodlightService>,
491 IFloodlightService>();
492 // We are the class that implements the service
493 m.put(IOFMessageFilterManagerService.class, this);
494 return m;
495 }
496
497 @Override
498 public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
499 Collection<Class<? extends IFloodlightService>> l =
500 new ArrayList<Class<? extends IFloodlightService>>();
501 l.add(IFloodlightProviderService.class);
502 l.add(IThreadPoolService.class);
503 return l;
504 }
505
506 @Override
507 public void init(FloodlightModuleContext context)
508 throws FloodlightModuleException {
509 this.floodlightProvider =
510 context.getServiceImpl(IFloodlightProviderService.class);
511 this.threadPool =
512 context.getServiceImpl(IThreadPoolService.class);
513 }
514
515 @Override
516 public void startUp(FloodlightModuleContext context) {
517 // This is our 'constructor'
518
519 filterMap = new ConcurrentHashMap<String, ConcurrentHashMap<String,String>>();
520 filterTimeoutMap = new ConcurrentHashMap<String, Long>();
521 serverPort =
522 Integer.parseInt(System.getProperty("net.floodlightcontroller." +
523 "packetstreamer.port", "9090"));
524
525 floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
526 floodlightProvider.addOFMessageListener(OFType.PACKET_OUT, this);
527 floodlightProvider.addOFMessageListener(OFType.FLOW_MOD, this);
528 }
529}