blob: a4153162d74be4a70df2cad75c177e0d303342f1 [file] [log] [blame]
Ray Milkey18b44ac2014-08-22 08:29:47 -07001package net.onrc.onos.core.matchaction;
2
Ray Milkey18b44ac2014-08-22 08:29:47 -07003import java.util.ArrayList;
4import java.util.Collections;
5import java.util.EventListener;
6import java.util.HashMap;
7import java.util.HashSet;
8import java.util.List;
9import java.util.Map;
10import java.util.Set;
11import java.util.concurrent.ArrayBlockingQueue;
12import java.util.concurrent.BlockingQueue;
13import java.util.concurrent.ConcurrentHashMap;
14import java.util.concurrent.ConcurrentMap;
15import java.util.concurrent.ExecutionException;
16
Brian O'Connoraa5a7b92014-08-29 14:45:18 -070017import net.floodlightcontroller.core.IFloodlightProviderService;
18import net.floodlightcontroller.core.internal.OFMessageFuture;
19import net.floodlightcontroller.core.module.IFloodlightService;
20import net.onrc.onos.api.flowmanager.ConflictDetectionPolicy;
21import net.onrc.onos.core.datagrid.IDatagridService;
22import net.onrc.onos.core.datagrid.IEventChannel;
23import net.onrc.onos.core.datagrid.IEventChannelListener;
24import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
25import net.onrc.onos.core.util.Dpid;
26import net.onrc.onos.core.util.IdGenerator;
27import net.onrc.onos.core.util.SwitchPort;
28
29import org.apache.commons.lang3.tuple.Pair;
30import org.projectfloodlight.openflow.protocol.OFBarrierReply;
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
Ray Milkey18b44ac2014-08-22 08:29:47 -070033
34
35/**
36 * Manages Match-Action entries.
37 * <p>
38 * TODO: Make all methods thread-safe
39 */
40public class MatchActionComponent implements MatchActionService, IFloodlightService {
41
42 private static final Logger log = LoggerFactory.getLogger(MatchActionService.class);
43 IFlowPusherService pusher;
44 IFloodlightProviderService provider;
45
46 private ConcurrentMap<MatchActionId, MatchAction> matchActionMap = new ConcurrentHashMap<>();
47 private ConcurrentMap<MatchActionOperationsId, MatchActionOperations> matchSetMap =
48 new ConcurrentHashMap<>();
49 // TODO - want something better here for the resolved Queue
50 private BlockingQueue<MatchActionOperationsId> resolvedQueue = new ArrayBlockingQueue<>(100);
51 private BlockingQueue<MatchActionOperations> installationWorkQueue = new ArrayBlockingQueue<>(100);
52
53 private IEventChannel<String, MatchActionOperations> installSetChannel;
54 private IEventChannel<String, SwitchResultList> installSetReplyChannel;
55
Ray Milkey18b44ac2014-08-22 08:29:47 -070056 // TODO Single instance for now, should be a work queue of some sort eventually
57 private Thread coordinator;
58 private Thread installer;
59 private final IDatagridService datagrid;
60
61 public MatchActionComponent(final IDatagridService newDatagrid,
62 final IFlowPusherService newPusher,
63 final IFloodlightProviderService newProvider) {
64 datagrid = newDatagrid;
65 pusher = newPusher;
66 provider = newProvider;
67 }
68
69 public void start() {
70 installSetChannel = datagrid.createChannel("onos.matchaction.installSetChannel",
71 String.class,
72 MatchActionOperations.class);
73
74 installSetReplyChannel = datagrid.createChannel("onos.matchaction.installSetReplyChannel",
75 String.class,
76 SwitchResultList.class);
77
78 coordinator = new Coordinator();
79 coordinator.start();
80
81 installer = new InstallerWorker();
82 installer.start();
83 }
84
85 public MatchActionOperationsId installMatchActionOperations(MatchActionOperations matchSet) {
86 if (checkResolved(matchSet)) {
87 matchSet.setState(MatchActionOperationsState.RESOLVED);
88 } else {
89 matchSet.setState(MatchActionOperationsState.INIT);
90 }
91 matchSetMap.put(matchSet.getOperationsId(), matchSet);
92 if (matchSet.getState() == MatchActionOperationsState.RESOLVED) {
93 resolvedQueue.add(matchSet.getOperationsId());
94 }
95 return matchSet.getOperationsId();
96 }
97
98 public MatchActionOperationsState getMatchActionOperationsState(MatchActionOperationsId matchSetId) {
99 MatchActionOperations set = matchSetMap.get(matchSetId);
100 return (set == null) ? null : set.getState();
101 }
102
103 protected boolean checkResolved(MatchActionOperations matchSet) {
104 boolean resolved = true;
105 for (MatchActionOperationsId setId : matchSet.getDependencies()) {
106 MatchActionOperations set = matchSetMap.get(setId);
107 if (set == null || set.getState() != MatchActionOperationsState.RESOLVED) {
108 resolved = false;
109 break;
110 }
111 }
112 return resolved;
113 }
114
Ray Milkey18b44ac2014-08-22 08:29:47 -0700115 class Coordinator extends Thread
116 implements IEventChannelListener<String, SwitchResultList> {
117
118 private Map<MatchActionOperationsId, Map<Dpid, SwitchResult>> pendingMatchActionOperationss = new HashMap<>();
119
120 protected Coordinator() {
121 installSetReplyChannel.addListener(this);
122 }
123
124 @Override
125 public void run() {
126 while (true) {
127 // 1. Remove MatchActionOperations(s) from the Global Resolved Queue
128 try {
129 MatchActionOperationsId setId = resolvedQueue.take();
130 processSet(setId);
131 } catch (InterruptedException e) {
132 log.warn("Error taking from resolved queue: {}", e.getMessage());
133 }
134 }
135 }
136
137 private void processSet(MatchActionOperationsId setId) {
138 MatchActionOperations matchSet = matchSetMap.get(setId);
139 matchSet.setState(MatchActionOperationsState.PENDING);
140 matchSetMap.put(setId, matchSet);
141
142 // TODO apply updates to in-memory flow table and resolve conflicts
143 // TODO generate apply and undo sets, using MatchActionOperations for now...
144
145 // build pending switches set for coordinator tracking
146 Map<Dpid, SwitchResult> switches = new HashMap<>();
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700147 for (MatchActionOperationEntry matchActionOp : matchSet.getOperations()) {
148 MatchAction matchAction = matchActionOp.getTarget();
149 SwitchPort sw = matchAction.getSwitchPort();
Ray Milkey18b44ac2014-08-22 08:29:47 -0700150 switches.put(sw.getDpid(), new SwitchResult(setId, sw.getDpid()));
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700151 switch(matchActionOp.getOperator()) {
152 case ADD:
153 matchActionMap.put(matchAction.getId(), matchAction);
154 break;
155 case REMOVE:
156 //TODO
157 default:
158 throw new UnsupportedOperationException(
159 "Unsupported MatchAction operation" +
160 matchActionOp.getOperator().toString());
161 }
Ray Milkey18b44ac2014-08-22 08:29:47 -0700162 }
163 pendingMatchActionOperationss.put(setId, switches);
164
165 // distribute apply/undo sets to cluster
166 //installSetChannel.addTransientEntry(setId.toString(), matchSet);
167 }
168
169 @Override
170 public void entryAdded(SwitchResultList value) {
171 updateSwitchResults(value);
172 }
173
174 @Override
175 public void entryRemoved(SwitchResultList value) {
176 // noop
177 }
178
179 @Override
180 public void entryUpdated(SwitchResultList value) {
181 updateSwitchResults(value);
182 }
183
184 private void updateSwitchResults(SwitchResultList results) {
185 if (results == null || results.size() == 0) {
186 return;
187 }
188 MatchActionOperationsId matchSetId = results.get(0).getMatchActionOperationsId();
189
190 // apply updates from results list
191 Map<Dpid, SwitchResult> resultMap = pendingMatchActionOperationss.get(matchSetId);
192 for (SwitchResult result : results) {
193 SwitchResult resultToUpdate = resultMap.get(result.getSwitch());
194 if (resultToUpdate != null) {
195 resultToUpdate.setStatus(result.getStatus());
196 }
197 // else {
198 // TODO error!
199 // }
200 }
201
202 // check to see the overall outcome of the install operation
203 SwitchResult.Status setResult = SwitchResult.Status.SUCCESS;
204 for (SwitchResult result : resultMap.values()) {
205 if (result.getStatus().equals(SwitchResult.Status.FAILURE)) {
206 setResult = SwitchResult.Status.FAILURE;
207 // if any switch fails, we fail the installation
208 break;
209 } else if (!setResult.equals(SwitchResult.Status.FAILURE)
210 && result.getStatus().equals(SwitchResult.Status.UNKNOWN)) {
211 setResult = SwitchResult.Status.UNKNOWN;
212 }
213 }
214 switch (setResult) {
215 case SUCCESS:
216 // mark MatchActionOperations as INSTALLED
217 MatchActionOperations matchSet = matchSetMap.get(matchSetId);
218 matchSet.setState(MatchActionOperationsState.INSTALLED);
219 matchSetMap.replace(matchSetId, matchSet);
220 pendingMatchActionOperationss.remove(matchSetId);
221
222 // TODO update dependent sets as needed
223 break;
224 case FAILURE:
225 // mark MatchActionOperations as FAILED
226 matchSet = matchSetMap.get(matchSetId);
227 matchSet.setState(MatchActionOperationsState.FAILED);
228 matchSetMap.replace(matchSetId, matchSet);
229
230 // TODO instruct installers to install Undo set
231 break;
232 case UNKNOWN:
233 default:
234 // noop, still waiting for results
235 // TODO: check to see if installers are dead after timeout
236 }
237 }
238 }
239
240
241 class InstallerWorker extends Thread {
242
243 // Note: we should consider using an alternative representation for
244 // apply sets
245 protected void install(MatchActionOperations matchSet) {
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700246 Set<Long> masterDpids = provider.getAllMasterSwitchDpids();
Ray Milkey18b44ac2014-08-22 08:29:47 -0700247
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700248 Set<MatchActionOperationEntry> installSet = new HashSet<>();
249 Set<Dpid> modifiedSwitches = new HashSet<>();
Ray Milkey18b44ac2014-08-22 08:29:47 -0700250
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700251 for (MatchActionOperationEntry matchActionOp : matchSet.getOperations()) {
252 MatchAction matchAction = matchActionOp.getTarget();
253 Dpid dpid = matchAction.getSwitchPort().getDpid();
254 if (masterDpids.contains(dpid.value())) {
255 // only install if we are the master
256 // TODO this optimization will introduce some nice race
257 // conditions on failure requiring mastership change
258 installSet.add(matchActionOp);
259 modifiedSwitches.add(dpid);
Ray Milkey18b44ac2014-08-22 08:29:47 -0700260 }
Ray Milkey18b44ac2014-08-22 08:29:47 -0700261 }
262
263 // push flow entries to switches
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700264 pusher.pushMatchActions(installSet);
Ray Milkey18b44ac2014-08-22 08:29:47 -0700265
266 // insert a barrier after each phase on each modifiedSwitch
267 // wait for confirmation messages before proceeding
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700268 List<Pair<Dpid, OFMessageFuture<OFBarrierReply>>> barriers = new ArrayList<>();
269 for (Dpid dpid : modifiedSwitches) {
270 barriers.add(Pair.of(dpid, pusher.barrierAsync(dpid)));
Ray Milkey18b44ac2014-08-22 08:29:47 -0700271 }
272 List<SwitchResult> switchResults = new ArrayList<>();
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700273 for (Pair<Dpid, OFMessageFuture<OFBarrierReply>> pair : barriers) {
274 Dpid dpid = pair.getLeft();
Ray Milkey18b44ac2014-08-22 08:29:47 -0700275 OFMessageFuture<OFBarrierReply> future = pair.getRight();
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700276 SwitchResult switchResult = new SwitchResult(matchSet.getOperationsId(),
277 dpid);
Ray Milkey18b44ac2014-08-22 08:29:47 -0700278 try {
279 future.get();
280 switchResult.setStatus(SwitchResult.Status.SUCCESS);
281 } catch (InterruptedException | ExecutionException e) {
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700282 log.error("Barrier message not received for sw: {}", dpid);
Ray Milkey18b44ac2014-08-22 08:29:47 -0700283 switchResult.setStatus(SwitchResult.Status.FAILURE);
284 }
285 switchResults.add(switchResult);
286 }
287
288 // send update message to coordinator
289 // TODO: we might want to use another ID here, i.e. GUID, to avoid
290 // overlap
291 final SwitchResultList switchResultList = new SwitchResultList();
292 switchResultList.addAll(switchResults);
293 installSetReplyChannel.addTransientEntry(matchSet.getOperationsId().toString(),
294 switchResultList);
295 }
296
Ray Milkey18b44ac2014-08-22 08:29:47 -0700297 @Override
298 public void run() {
299 while (true) {
300 // 1. Remove MatchActionOperations(s) from the Global Resolved Queue
301 try {
302 MatchActionOperations operations = installationWorkQueue.take();
303 install(operations);
304 } catch (InterruptedException e) {
305 log.warn("Error taking from installation queue: {}", e.getMessage());
306 }
307 }
308 }
309 }
310
311 class Installer
312 implements IEventChannelListener<String, MatchActionOperations> {
313
314 protected Installer() {
315 installSetChannel.addListener(this);
316 }
317
318
319 @Override
320 public void entryAdded(MatchActionOperations value) {
321 installationWorkQueue.add(value);
322 }
323
324 @Override
325 public void entryRemoved(MatchActionOperations value) {
326 // noop
327 }
328
329 @Override
330 public void entryUpdated(MatchActionOperations value) {
331 installationWorkQueue.add(value);
332 }
333 }
334
335 private final HashSet<MatchAction> currentOperations = new HashSet<>();
336
337 private boolean processMatchActionEntries(
338 final List<MatchActionOperationEntry> entries) {
339 int successfulOperations = 0;
340 for (final MatchActionOperationEntry entry : entries) {
341 if (currentOperations.add(entry.getTarget())) {
342 successfulOperations++;
343 }
344 }
345 return entries.size() == successfulOperations;
346 }
347
348 @Override
349 public boolean addMatchAction(MatchAction matchAction) {
350 return false;
351 }
352
353 @Override
354 public Set<MatchAction> getMatchActions() {
355 return Collections.unmodifiableSet(currentOperations);
356 }
357
358 @Override
359 public boolean executeOperations(final MatchActionOperations operations) {
360 installMatchActionOperations(operations);
361 return processMatchActionEntries(operations.getOperations());
362 }
363
364 @Override
365 public void setConflictDetectionPolicy(ConflictDetectionPolicy policy) {
366 // TODO Auto-generated method stub
367
368 }
369
370 @Override
371 public ConflictDetectionPolicy getConflictDetectionPolicy() {
372 // TODO Auto-generated method stub
373 return null;
374 }
375
376 @Override
377 public void addEventListener(EventListener listener) {
378 // TODO Auto-generated method stub
379
380 }
381
382 @Override
383 public void removeEventListener(EventListener listener) {
384 // TODO Auto-generated method stub
385
386 }
387
388 @Override
389 public IdGenerator<MatchActionId> getMatchActionIdGenerator() {
390 return null;
391 }
392
393 @Override
394 public IdGenerator<MatchActionOperationsId> getMatchActionOperationsIdGenerator() {
395 return null;
396 }
397
398}