blob: 362038f3f32759b9a5468bdcd40c6f9e39078fba [file] [log] [blame]
Ray Milkey18b44ac2014-08-22 08:29:47 -07001package net.onrc.onos.core.matchaction;
2
3import net.floodlightcontroller.core.IFloodlightProviderService;
4import net.floodlightcontroller.core.IOFSwitch;
5import net.floodlightcontroller.core.internal.OFMessageFuture;
6import net.floodlightcontroller.core.module.IFloodlightService;
7import net.floodlightcontroller.util.MACAddress;
8import net.onrc.onos.api.flowmanager.ConflictDetectionPolicy;
9import net.onrc.onos.core.datagrid.IDatagridService;
10import net.onrc.onos.core.datagrid.IEventChannel;
11import net.onrc.onos.core.datagrid.IEventChannelListener;
12import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
13import net.onrc.onos.core.intent.FlowEntry;
14import net.onrc.onos.core.matchaction.action.Action;
15import net.onrc.onos.core.matchaction.action.OutputAction;
16import net.onrc.onos.core.matchaction.match.Match;
17import net.onrc.onos.core.matchaction.match.PacketMatch;
18import net.onrc.onos.core.util.Dpid;
19import net.onrc.onos.core.util.IdGenerator;
20import net.onrc.onos.core.util.SwitchPort;
21import org.apache.commons.lang3.tuple.Pair;
22import org.projectfloodlight.openflow.protocol.OFBarrierReply;
23import org.slf4j.Logger;
24import org.slf4j.LoggerFactory;
25
26import java.util.ArrayList;
27import java.util.Collections;
28import java.util.EventListener;
29import java.util.HashMap;
30import java.util.HashSet;
31import java.util.List;
32import java.util.Map;
33import java.util.Set;
34import java.util.concurrent.ArrayBlockingQueue;
35import java.util.concurrent.BlockingQueue;
36import java.util.concurrent.ConcurrentHashMap;
37import java.util.concurrent.ConcurrentMap;
38import java.util.concurrent.ExecutionException;
39
40import static com.google.common.base.Preconditions.checkArgument;
41
42
43/**
44 * Manages Match-Action entries.
45 * <p>
46 * TODO: Make all methods thread-safe
47 */
48public class MatchActionComponent implements MatchActionService, IFloodlightService {
49
50 private static final Logger log = LoggerFactory.getLogger(MatchActionService.class);
51 IFlowPusherService pusher;
52 IFloodlightProviderService provider;
53
54 private ConcurrentMap<MatchActionId, MatchAction> matchActionMap = new ConcurrentHashMap<>();
55 private ConcurrentMap<MatchActionOperationsId, MatchActionOperations> matchSetMap =
56 new ConcurrentHashMap<>();
57 // TODO - want something better here for the resolved Queue
58 private BlockingQueue<MatchActionOperationsId> resolvedQueue = new ArrayBlockingQueue<>(100);
59 private BlockingQueue<MatchActionOperations> installationWorkQueue = new ArrayBlockingQueue<>(100);
60
61 private IEventChannel<String, MatchActionOperations> installSetChannel;
62 private IEventChannel<String, SwitchResultList> installSetReplyChannel;
63
64 // Convenience declarations to hide the name space collision on the Operator type
65 private static final net.onrc.onos.core.intent.IntentOperation.Operator INTENT_ADD_OP =
66 net.onrc.onos.core.intent.IntentOperation.Operator.ADD;
67 private static final net.onrc.onos.core.intent.IntentOperation.Operator INTENT_REMOVE_OP =
68 net.onrc.onos.core.intent.IntentOperation.Operator.REMOVE;
69
70 // TODO Single instance for now, should be a work queue of some sort eventually
71 private Thread coordinator;
72 private Thread installer;
73 private final IDatagridService datagrid;
74
75 public MatchActionComponent(final IDatagridService newDatagrid,
76 final IFlowPusherService newPusher,
77 final IFloodlightProviderService newProvider) {
78 datagrid = newDatagrid;
79 pusher = newPusher;
80 provider = newProvider;
81 }
82
83 public void start() {
84 installSetChannel = datagrid.createChannel("onos.matchaction.installSetChannel",
85 String.class,
86 MatchActionOperations.class);
87
88 installSetReplyChannel = datagrid.createChannel("onos.matchaction.installSetReplyChannel",
89 String.class,
90 SwitchResultList.class);
91
92 coordinator = new Coordinator();
93 coordinator.start();
94
95 installer = new InstallerWorker();
96 installer.start();
97 }
98
99 public MatchActionOperationsId installMatchActionOperations(MatchActionOperations matchSet) {
100 if (checkResolved(matchSet)) {
101 matchSet.setState(MatchActionOperationsState.RESOLVED);
102 } else {
103 matchSet.setState(MatchActionOperationsState.INIT);
104 }
105 matchSetMap.put(matchSet.getOperationsId(), matchSet);
106 if (matchSet.getState() == MatchActionOperationsState.RESOLVED) {
107 resolvedQueue.add(matchSet.getOperationsId());
108 }
109 return matchSet.getOperationsId();
110 }
111
112 public MatchActionOperationsState getMatchActionOperationsState(MatchActionOperationsId matchSetId) {
113 MatchActionOperations set = matchSetMap.get(matchSetId);
114 return (set == null) ? null : set.getState();
115 }
116
117 protected boolean checkResolved(MatchActionOperations matchSet) {
118 boolean resolved = true;
119 for (MatchActionOperationsId setId : matchSet.getDependencies()) {
120 MatchActionOperations set = matchSetMap.get(setId);
121 if (set == null || set.getState() != MatchActionOperationsState.RESOLVED) {
122 resolved = false;
123 break;
124 }
125 }
126 return resolved;
127 }
128
129 // TODO need operation too...
130 protected List<MatchAction> getMatchActions(final MatchActionOperations matchSet) {
131
132 final List<MatchAction> result = new ArrayList<>();
133 for (MatchActionOperationEntry op : matchSet.getOperations()) {
134 final MatchAction match = op.getTarget();
135
136 switch(op.getOperator()) {
137 case ADD:
138 matchActionMap.put(match.getId(), match);
139 break;
140
141 case REMOVE:
142 default:
143 throw new UnsupportedOperationException(
144 "Unsupported MatchAction operation" +
145 op.getOperator().toString());
146 }
147 result.add(match);
148 }
149 return result;
150 }
151
152 class Coordinator extends Thread
153 implements IEventChannelListener<String, SwitchResultList> {
154
155 private Map<MatchActionOperationsId, Map<Dpid, SwitchResult>> pendingMatchActionOperationss = new HashMap<>();
156
157 protected Coordinator() {
158 installSetReplyChannel.addListener(this);
159 }
160
161 @Override
162 public void run() {
163 while (true) {
164 // 1. Remove MatchActionOperations(s) from the Global Resolved Queue
165 try {
166 MatchActionOperationsId setId = resolvedQueue.take();
167 processSet(setId);
168 } catch (InterruptedException e) {
169 log.warn("Error taking from resolved queue: {}", e.getMessage());
170 }
171 }
172 }
173
174 private void processSet(MatchActionOperationsId setId) {
175 MatchActionOperations matchSet = matchSetMap.get(setId);
176 matchSet.setState(MatchActionOperationsState.PENDING);
177 matchSetMap.put(setId, matchSet);
178
179 // TODO apply updates to in-memory flow table and resolve conflicts
180 // TODO generate apply and undo sets, using MatchActionOperations for now...
181
182 // build pending switches set for coordinator tracking
183 Map<Dpid, SwitchResult> switches = new HashMap<>();
184 for (MatchAction match : getMatchActions(matchSet)) {
185 SwitchPort sw = match.getSwitchPort();
186 switches.put(sw.getDpid(), new SwitchResult(setId, sw.getDpid()));
187 }
188 pendingMatchActionOperationss.put(setId, switches);
189
190 // distribute apply/undo sets to cluster
191 //installSetChannel.addTransientEntry(setId.toString(), matchSet);
192 }
193
194 @Override
195 public void entryAdded(SwitchResultList value) {
196 updateSwitchResults(value);
197 }
198
199 @Override
200 public void entryRemoved(SwitchResultList value) {
201 // noop
202 }
203
204 @Override
205 public void entryUpdated(SwitchResultList value) {
206 updateSwitchResults(value);
207 }
208
209 private void updateSwitchResults(SwitchResultList results) {
210 if (results == null || results.size() == 0) {
211 return;
212 }
213 MatchActionOperationsId matchSetId = results.get(0).getMatchActionOperationsId();
214
215 // apply updates from results list
216 Map<Dpid, SwitchResult> resultMap = pendingMatchActionOperationss.get(matchSetId);
217 for (SwitchResult result : results) {
218 SwitchResult resultToUpdate = resultMap.get(result.getSwitch());
219 if (resultToUpdate != null) {
220 resultToUpdate.setStatus(result.getStatus());
221 }
222 // else {
223 // TODO error!
224 // }
225 }
226
227 // check to see the overall outcome of the install operation
228 SwitchResult.Status setResult = SwitchResult.Status.SUCCESS;
229 for (SwitchResult result : resultMap.values()) {
230 if (result.getStatus().equals(SwitchResult.Status.FAILURE)) {
231 setResult = SwitchResult.Status.FAILURE;
232 // if any switch fails, we fail the installation
233 break;
234 } else if (!setResult.equals(SwitchResult.Status.FAILURE)
235 && result.getStatus().equals(SwitchResult.Status.UNKNOWN)) {
236 setResult = SwitchResult.Status.UNKNOWN;
237 }
238 }
239 switch (setResult) {
240 case SUCCESS:
241 // mark MatchActionOperations as INSTALLED
242 MatchActionOperations matchSet = matchSetMap.get(matchSetId);
243 matchSet.setState(MatchActionOperationsState.INSTALLED);
244 matchSetMap.replace(matchSetId, matchSet);
245 pendingMatchActionOperationss.remove(matchSetId);
246
247 // TODO update dependent sets as needed
248 break;
249 case FAILURE:
250 // mark MatchActionOperations as FAILED
251 matchSet = matchSetMap.get(matchSetId);
252 matchSet.setState(MatchActionOperationsState.FAILED);
253 matchSetMap.replace(matchSetId, matchSet);
254
255 // TODO instruct installers to install Undo set
256 break;
257 case UNKNOWN:
258 default:
259 // noop, still waiting for results
260 // TODO: check to see if installers are dead after timeout
261 }
262 }
263 }
264
265
266 class InstallerWorker extends Thread {
267
268 // Note: we should consider using an alternative representation for
269 // apply sets
270 protected void install(MatchActionOperations matchSet) {
271 Map<Long, IOFSwitch> switches = provider.getSwitches();
272
273 Set<Pair<Dpid, FlowEntry>> entries = new HashSet<>();
274 Set<IOFSwitch> modifiedSwitches = new HashSet<>();
275
276 // convert flow entries and create pairs
277 for (MatchAction entry : getMatchActions(matchSet)) {
278 Dpid swDpid = entry.getSwitchPort().getDpid();
279 IOFSwitch sw = switches.get(swDpid.value());
280 if (sw == null) {
281 // no active switch, skip this flow entry
282 log.debug("Skipping flow entry: {}", entry);
283 continue;
284 }
285 final List<FlowEntry> flowEntries = getFlowEntry(entry);
286 for (final FlowEntry flowEntry : flowEntries) {
287 entries.add(Pair.of(swDpid, flowEntry));
288 }
289 modifiedSwitches.add(sw);
290 }
291
292 // push flow entries to switches
293 pusher.pushFlowEntries(entries);
294
295 // insert a barrier after each phase on each modifiedSwitch
296 // wait for confirmation messages before proceeding
297 List<Pair<IOFSwitch, OFMessageFuture<OFBarrierReply>>> barriers = new ArrayList<>();
298 for (IOFSwitch sw : modifiedSwitches) {
299 barriers.add(Pair.of(sw, pusher.barrierAsync(new Dpid(sw.getId()))));
300 }
301 List<SwitchResult> switchResults = new ArrayList<>();
302 for (Pair<IOFSwitch, OFMessageFuture<OFBarrierReply>> pair : barriers) {
303 IOFSwitch sw = pair.getLeft();
304 OFMessageFuture<OFBarrierReply> future = pair.getRight();
305 SwitchResult switchResult = new SwitchResult(matchSet.getOperationsId(), new Dpid(
306 sw.getId()));
307 try {
308 future.get();
309 switchResult.setStatus(SwitchResult.Status.SUCCESS);
310 } catch (InterruptedException | ExecutionException e) {
311 log.error("Barrier message not received for sw: {}", sw);
312 switchResult.setStatus(SwitchResult.Status.FAILURE);
313 }
314 switchResults.add(switchResult);
315 }
316
317 // send update message to coordinator
318 // TODO: we might want to use another ID here, i.e. GUID, to avoid
319 // overlap
320 final SwitchResultList switchResultList = new SwitchResultList();
321 switchResultList.addAll(switchResults);
322 installSetReplyChannel.addTransientEntry(matchSet.getOperationsId().toString(),
323 switchResultList);
324 }
325
326 // TODO this should be removed when FlowPusher supports MatchAction
327 private List<FlowEntry> getFlowEntry(MatchAction matchAction) {
328 final Match match = matchAction.getMatch();
329 // Currently we only support Packet based matching
330 checkArgument(match instanceof PacketMatch);
331
332 final PacketMatch packetMatch = (PacketMatch) match;
333 final SwitchPort srcPort = matchAction.getSwitchPort();
334
335 final long switchId = srcPort.getDpid().value();
336 final long srcPortNumber = srcPort.getPortNumber().value();
337 final int srcIp = packetMatch.getSrcIpAddress().address().value();
338 final MACAddress srcMacAddress = packetMatch.getSrcMacAddress();
339 final int dstIp = packetMatch.getDstIpAddress().address().value();
340 final MACAddress dstMacAddress = packetMatch.getDstMacAddress();
341
342 final List<FlowEntry> result = new ArrayList<>();
343
344 for (final Action action : matchAction.getActions()) {
345 if (action instanceof OutputAction) {
346 final OutputAction outputAction = (OutputAction) action;
347 final long dstPortNumber =
348 outputAction.getPortNumber().value();
349
350
351 final FlowEntry entry = new FlowEntry(
352 switchId,
353 srcPortNumber,
354 dstPortNumber,
355 srcMacAddress,
356 dstMacAddress,
357 srcIp,
358 dstIp,
359 INTENT_ADD_OP
360 );
361 result.add(entry);
362 }
363 }
364
365 return result;
366 }
367
368 @Override
369 public void run() {
370 while (true) {
371 // 1. Remove MatchActionOperations(s) from the Global Resolved Queue
372 try {
373 MatchActionOperations operations = installationWorkQueue.take();
374 install(operations);
375 } catch (InterruptedException e) {
376 log.warn("Error taking from installation queue: {}", e.getMessage());
377 }
378 }
379 }
380 }
381
382 class Installer
383 implements IEventChannelListener<String, MatchActionOperations> {
384
385 protected Installer() {
386 installSetChannel.addListener(this);
387 }
388
389
390 @Override
391 public void entryAdded(MatchActionOperations value) {
392 installationWorkQueue.add(value);
393 }
394
395 @Override
396 public void entryRemoved(MatchActionOperations value) {
397 // noop
398 }
399
400 @Override
401 public void entryUpdated(MatchActionOperations value) {
402 installationWorkQueue.add(value);
403 }
404 }
405
406 private final HashSet<MatchAction> currentOperations = new HashSet<>();
407
408 private boolean processMatchActionEntries(
409 final List<MatchActionOperationEntry> entries) {
410 int successfulOperations = 0;
411 for (final MatchActionOperationEntry entry : entries) {
412 if (currentOperations.add(entry.getTarget())) {
413 successfulOperations++;
414 }
415 }
416 return entries.size() == successfulOperations;
417 }
418
419 @Override
420 public boolean addMatchAction(MatchAction matchAction) {
421 return false;
422 }
423
424 @Override
425 public Set<MatchAction> getMatchActions() {
426 return Collections.unmodifiableSet(currentOperations);
427 }
428
429 @Override
430 public boolean executeOperations(final MatchActionOperations operations) {
431 installMatchActionOperations(operations);
432 return processMatchActionEntries(operations.getOperations());
433 }
434
435 @Override
436 public void setConflictDetectionPolicy(ConflictDetectionPolicy policy) {
437 // TODO Auto-generated method stub
438
439 }
440
441 @Override
442 public ConflictDetectionPolicy getConflictDetectionPolicy() {
443 // TODO Auto-generated method stub
444 return null;
445 }
446
447 @Override
448 public void addEventListener(EventListener listener) {
449 // TODO Auto-generated method stub
450
451 }
452
453 @Override
454 public void removeEventListener(EventListener listener) {
455 // TODO Auto-generated method stub
456
457 }
458
459 @Override
460 public IdGenerator<MatchActionId> getMatchActionIdGenerator() {
461 return null;
462 }
463
464 @Override
465 public IdGenerator<MatchActionOperationsId> getMatchActionOperationsIdGenerator() {
466 return null;
467 }
468
469}