blob: 52e7dd9182bb5a3f7d7aab6116b041cfb49ce5ea [file] [log] [blame]
Ray Milkey18b44ac2014-08-22 08:29:47 -07001package net.onrc.onos.core.matchaction;
2
Ray Milkey18b44ac2014-08-22 08:29:47 -07003import java.util.ArrayList;
4import java.util.Collections;
5import java.util.EventListener;
6import java.util.HashMap;
7import java.util.HashSet;
8import java.util.List;
9import java.util.Map;
10import java.util.Set;
11import java.util.concurrent.ArrayBlockingQueue;
12import java.util.concurrent.BlockingQueue;
13import java.util.concurrent.ConcurrentHashMap;
14import java.util.concurrent.ConcurrentMap;
15import java.util.concurrent.ExecutionException;
16
Brian O'Connoraa5a7b92014-08-29 14:45:18 -070017import net.floodlightcontroller.core.IFloodlightProviderService;
18import net.floodlightcontroller.core.internal.OFMessageFuture;
19import net.floodlightcontroller.core.module.IFloodlightService;
20import net.onrc.onos.api.flowmanager.ConflictDetectionPolicy;
21import net.onrc.onos.core.datagrid.IDatagridService;
22import net.onrc.onos.core.datagrid.IEventChannel;
23import net.onrc.onos.core.datagrid.IEventChannelListener;
24import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
Ray Milkey626a2b12014-09-02 11:19:06 -070025import net.onrc.onos.core.registry.IControllerRegistryService;
Brian O'Connoraa5a7b92014-08-29 14:45:18 -070026import net.onrc.onos.core.util.Dpid;
Ray Milkey626a2b12014-09-02 11:19:06 -070027import net.onrc.onos.core.util.IdBlockAllocator;
Brian O'Connoraa5a7b92014-08-29 14:45:18 -070028import net.onrc.onos.core.util.IdGenerator;
29import net.onrc.onos.core.util.SwitchPort;
30
31import org.apache.commons.lang3.tuple.Pair;
32import org.projectfloodlight.openflow.protocol.OFBarrierReply;
33import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
Ray Milkey18b44ac2014-08-22 08:29:47 -070035
Ray Milkey18b44ac2014-08-22 08:29:47 -070036/**
37 * Manages Match-Action entries.
38 * <p>
39 * TODO: Make all methods thread-safe
40 */
41public class MatchActionComponent implements MatchActionService, IFloodlightService {
42
43 private static final Logger log = LoggerFactory.getLogger(MatchActionService.class);
44 IFlowPusherService pusher;
45 IFloodlightProviderService provider;
46
47 private ConcurrentMap<MatchActionId, MatchAction> matchActionMap = new ConcurrentHashMap<>();
48 private ConcurrentMap<MatchActionOperationsId, MatchActionOperations> matchSetMap =
49 new ConcurrentHashMap<>();
50 // TODO - want something better here for the resolved Queue
51 private BlockingQueue<MatchActionOperationsId> resolvedQueue = new ArrayBlockingQueue<>(100);
52 private BlockingQueue<MatchActionOperations> installationWorkQueue = new ArrayBlockingQueue<>(100);
53
54 private IEventChannel<String, MatchActionOperations> installSetChannel;
55 private IEventChannel<String, SwitchResultList> installSetReplyChannel;
56
Ray Milkey18b44ac2014-08-22 08:29:47 -070057 // TODO Single instance for now, should be a work queue of some sort eventually
58 private Thread coordinator;
59 private Thread installer;
60 private final IDatagridService datagrid;
Ray Milkey626a2b12014-09-02 11:19:06 -070061 private IControllerRegistryService registryService;
62
63 private MatchActionIdGeneratorWithIdBlockAllocator matchActionIdGenerator;
64 private MatchActionOperationsIdGeneratorWithIdBlockAllocator matchActionOperationsIdGenerator;
Ray Milkey18b44ac2014-08-22 08:29:47 -070065
66 public MatchActionComponent(final IDatagridService newDatagrid,
67 final IFlowPusherService newPusher,
Ray Milkey626a2b12014-09-02 11:19:06 -070068 final IFloodlightProviderService newProvider,
69 final IControllerRegistryService newRegistryService) {
Ray Milkey18b44ac2014-08-22 08:29:47 -070070 datagrid = newDatagrid;
71 pusher = newPusher;
72 provider = newProvider;
Ray Milkey626a2b12014-09-02 11:19:06 -070073 registryService = newRegistryService;
Ray Milkey18b44ac2014-08-22 08:29:47 -070074 }
75
76 public void start() {
Ray Milkey626a2b12014-09-02 11:19:06 -070077 IdBlockAllocator idBlockAllocator = registryService;
78 matchActionIdGenerator =
79 new MatchActionIdGeneratorWithIdBlockAllocator(idBlockAllocator);
80 matchActionOperationsIdGenerator =
81 new MatchActionOperationsIdGeneratorWithIdBlockAllocator(idBlockAllocator);
82
Ray Milkey18b44ac2014-08-22 08:29:47 -070083 installSetChannel = datagrid.createChannel("onos.matchaction.installSetChannel",
84 String.class,
85 MatchActionOperations.class);
86
87 installSetReplyChannel = datagrid.createChannel("onos.matchaction.installSetReplyChannel",
88 String.class,
89 SwitchResultList.class);
90
91 coordinator = new Coordinator();
92 coordinator.start();
93
94 installer = new InstallerWorker();
95 installer.start();
96 }
97
98 public MatchActionOperationsId installMatchActionOperations(MatchActionOperations matchSet) {
99 if (checkResolved(matchSet)) {
100 matchSet.setState(MatchActionOperationsState.RESOLVED);
101 } else {
102 matchSet.setState(MatchActionOperationsState.INIT);
103 }
104 matchSetMap.put(matchSet.getOperationsId(), matchSet);
105 if (matchSet.getState() == MatchActionOperationsState.RESOLVED) {
106 resolvedQueue.add(matchSet.getOperationsId());
107 }
108 return matchSet.getOperationsId();
109 }
110
111 public MatchActionOperationsState getMatchActionOperationsState(MatchActionOperationsId matchSetId) {
112 MatchActionOperations set = matchSetMap.get(matchSetId);
113 return (set == null) ? null : set.getState();
114 }
115
116 protected boolean checkResolved(MatchActionOperations matchSet) {
117 boolean resolved = true;
118 for (MatchActionOperationsId setId : matchSet.getDependencies()) {
119 MatchActionOperations set = matchSetMap.get(setId);
120 if (set == null || set.getState() != MatchActionOperationsState.RESOLVED) {
121 resolved = false;
122 break;
123 }
124 }
125 return resolved;
126 }
127
Ray Milkey18b44ac2014-08-22 08:29:47 -0700128 class Coordinator extends Thread
129 implements IEventChannelListener<String, SwitchResultList> {
130
131 private Map<MatchActionOperationsId, Map<Dpid, SwitchResult>> pendingMatchActionOperationss = new HashMap<>();
132
133 protected Coordinator() {
134 installSetReplyChannel.addListener(this);
135 }
136
137 @Override
138 public void run() {
139 while (true) {
140 // 1. Remove MatchActionOperations(s) from the Global Resolved Queue
141 try {
142 MatchActionOperationsId setId = resolvedQueue.take();
143 processSet(setId);
144 } catch (InterruptedException e) {
145 log.warn("Error taking from resolved queue: {}", e.getMessage());
146 }
147 }
148 }
149
150 private void processSet(MatchActionOperationsId setId) {
151 MatchActionOperations matchSet = matchSetMap.get(setId);
152 matchSet.setState(MatchActionOperationsState.PENDING);
153 matchSetMap.put(setId, matchSet);
154
155 // TODO apply updates to in-memory flow table and resolve conflicts
156 // TODO generate apply and undo sets, using MatchActionOperations for now...
157
158 // build pending switches set for coordinator tracking
159 Map<Dpid, SwitchResult> switches = new HashMap<>();
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700160 for (MatchActionOperationEntry matchActionOp : matchSet.getOperations()) {
161 MatchAction matchAction = matchActionOp.getTarget();
162 SwitchPort sw = matchAction.getSwitchPort();
Ray Milkey18b44ac2014-08-22 08:29:47 -0700163 switches.put(sw.getDpid(), new SwitchResult(setId, sw.getDpid()));
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700164 switch(matchActionOp.getOperator()) {
165 case ADD:
166 matchActionMap.put(matchAction.getId(), matchAction);
167 break;
168 case REMOVE:
169 //TODO
170 default:
171 throw new UnsupportedOperationException(
172 "Unsupported MatchAction operation" +
173 matchActionOp.getOperator().toString());
174 }
Ray Milkey18b44ac2014-08-22 08:29:47 -0700175 }
176 pendingMatchActionOperationss.put(setId, switches);
177
178 // distribute apply/undo sets to cluster
179 //installSetChannel.addTransientEntry(setId.toString(), matchSet);
180 }
181
182 @Override
183 public void entryAdded(SwitchResultList value) {
184 updateSwitchResults(value);
185 }
186
187 @Override
188 public void entryRemoved(SwitchResultList value) {
189 // noop
190 }
191
192 @Override
193 public void entryUpdated(SwitchResultList value) {
194 updateSwitchResults(value);
195 }
196
197 private void updateSwitchResults(SwitchResultList results) {
198 if (results == null || results.size() == 0) {
199 return;
200 }
201 MatchActionOperationsId matchSetId = results.get(0).getMatchActionOperationsId();
202
203 // apply updates from results list
204 Map<Dpid, SwitchResult> resultMap = pendingMatchActionOperationss.get(matchSetId);
205 for (SwitchResult result : results) {
206 SwitchResult resultToUpdate = resultMap.get(result.getSwitch());
207 if (resultToUpdate != null) {
208 resultToUpdate.setStatus(result.getStatus());
209 }
210 // else {
211 // TODO error!
212 // }
213 }
214
215 // check to see the overall outcome of the install operation
216 SwitchResult.Status setResult = SwitchResult.Status.SUCCESS;
217 for (SwitchResult result : resultMap.values()) {
218 if (result.getStatus().equals(SwitchResult.Status.FAILURE)) {
219 setResult = SwitchResult.Status.FAILURE;
220 // if any switch fails, we fail the installation
221 break;
222 } else if (!setResult.equals(SwitchResult.Status.FAILURE)
223 && result.getStatus().equals(SwitchResult.Status.UNKNOWN)) {
224 setResult = SwitchResult.Status.UNKNOWN;
225 }
226 }
227 switch (setResult) {
228 case SUCCESS:
229 // mark MatchActionOperations as INSTALLED
230 MatchActionOperations matchSet = matchSetMap.get(matchSetId);
231 matchSet.setState(MatchActionOperationsState.INSTALLED);
232 matchSetMap.replace(matchSetId, matchSet);
233 pendingMatchActionOperationss.remove(matchSetId);
234
235 // TODO update dependent sets as needed
236 break;
237 case FAILURE:
238 // mark MatchActionOperations as FAILED
239 matchSet = matchSetMap.get(matchSetId);
240 matchSet.setState(MatchActionOperationsState.FAILED);
241 matchSetMap.replace(matchSetId, matchSet);
242
243 // TODO instruct installers to install Undo set
244 break;
245 case UNKNOWN:
246 default:
247 // noop, still waiting for results
248 // TODO: check to see if installers are dead after timeout
249 }
250 }
251 }
252
253
254 class InstallerWorker extends Thread {
255
256 // Note: we should consider using an alternative representation for
257 // apply sets
258 protected void install(MatchActionOperations matchSet) {
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700259 Set<Long> masterDpids = provider.getAllMasterSwitchDpids();
Ray Milkey18b44ac2014-08-22 08:29:47 -0700260
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700261 Set<MatchActionOperationEntry> installSet = new HashSet<>();
262 Set<Dpid> modifiedSwitches = new HashSet<>();
Ray Milkey18b44ac2014-08-22 08:29:47 -0700263
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700264 for (MatchActionOperationEntry matchActionOp : matchSet.getOperations()) {
265 MatchAction matchAction = matchActionOp.getTarget();
266 Dpid dpid = matchAction.getSwitchPort().getDpid();
267 if (masterDpids.contains(dpid.value())) {
268 // only install if we are the master
269 // TODO this optimization will introduce some nice race
270 // conditions on failure requiring mastership change
271 installSet.add(matchActionOp);
272 modifiedSwitches.add(dpid);
Ray Milkey18b44ac2014-08-22 08:29:47 -0700273 }
Ray Milkey18b44ac2014-08-22 08:29:47 -0700274 }
275
276 // push flow entries to switches
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700277 pusher.pushMatchActions(installSet);
Ray Milkey18b44ac2014-08-22 08:29:47 -0700278
279 // insert a barrier after each phase on each modifiedSwitch
280 // wait for confirmation messages before proceeding
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700281 List<Pair<Dpid, OFMessageFuture<OFBarrierReply>>> barriers = new ArrayList<>();
282 for (Dpid dpid : modifiedSwitches) {
283 barriers.add(Pair.of(dpid, pusher.barrierAsync(dpid)));
Ray Milkey18b44ac2014-08-22 08:29:47 -0700284 }
285 List<SwitchResult> switchResults = new ArrayList<>();
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700286 for (Pair<Dpid, OFMessageFuture<OFBarrierReply>> pair : barriers) {
287 Dpid dpid = pair.getLeft();
Ray Milkey18b44ac2014-08-22 08:29:47 -0700288 OFMessageFuture<OFBarrierReply> future = pair.getRight();
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700289 SwitchResult switchResult = new SwitchResult(matchSet.getOperationsId(),
290 dpid);
Ray Milkey18b44ac2014-08-22 08:29:47 -0700291 try {
292 future.get();
293 switchResult.setStatus(SwitchResult.Status.SUCCESS);
294 } catch (InterruptedException | ExecutionException e) {
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700295 log.error("Barrier message not received for sw: {}", dpid);
Ray Milkey18b44ac2014-08-22 08:29:47 -0700296 switchResult.setStatus(SwitchResult.Status.FAILURE);
297 }
298 switchResults.add(switchResult);
299 }
300
301 // send update message to coordinator
302 // TODO: we might want to use another ID here, i.e. GUID, to avoid
303 // overlap
304 final SwitchResultList switchResultList = new SwitchResultList();
305 switchResultList.addAll(switchResults);
306 installSetReplyChannel.addTransientEntry(matchSet.getOperationsId().toString(),
307 switchResultList);
308 }
309
Ray Milkey18b44ac2014-08-22 08:29:47 -0700310 @Override
311 public void run() {
312 while (true) {
313 // 1. Remove MatchActionOperations(s) from the Global Resolved Queue
314 try {
315 MatchActionOperations operations = installationWorkQueue.take();
316 install(operations);
317 } catch (InterruptedException e) {
318 log.warn("Error taking from installation queue: {}", e.getMessage());
319 }
320 }
321 }
322 }
323
324 class Installer
325 implements IEventChannelListener<String, MatchActionOperations> {
326
327 protected Installer() {
328 installSetChannel.addListener(this);
329 }
330
331
332 @Override
333 public void entryAdded(MatchActionOperations value) {
334 installationWorkQueue.add(value);
335 }
336
337 @Override
338 public void entryRemoved(MatchActionOperations value) {
339 // noop
340 }
341
342 @Override
343 public void entryUpdated(MatchActionOperations value) {
344 installationWorkQueue.add(value);
345 }
346 }
347
348 private final HashSet<MatchAction> currentOperations = new HashSet<>();
349
350 private boolean processMatchActionEntries(
351 final List<MatchActionOperationEntry> entries) {
352 int successfulOperations = 0;
353 for (final MatchActionOperationEntry entry : entries) {
354 if (currentOperations.add(entry.getTarget())) {
355 successfulOperations++;
356 }
357 }
358 return entries.size() == successfulOperations;
359 }
360
361 @Override
362 public boolean addMatchAction(MatchAction matchAction) {
363 return false;
364 }
365
366 @Override
367 public Set<MatchAction> getMatchActions() {
368 return Collections.unmodifiableSet(currentOperations);
369 }
370
371 @Override
372 public boolean executeOperations(final MatchActionOperations operations) {
373 installMatchActionOperations(operations);
374 return processMatchActionEntries(operations.getOperations());
375 }
376
377 @Override
378 public void setConflictDetectionPolicy(ConflictDetectionPolicy policy) {
379 // TODO Auto-generated method stub
380
381 }
382
383 @Override
384 public ConflictDetectionPolicy getConflictDetectionPolicy() {
385 // TODO Auto-generated method stub
386 return null;
387 }
388
389 @Override
390 public void addEventListener(EventListener listener) {
391 // TODO Auto-generated method stub
392
393 }
394
395 @Override
396 public void removeEventListener(EventListener listener) {
397 // TODO Auto-generated method stub
398
399 }
400
401 @Override
402 public IdGenerator<MatchActionId> getMatchActionIdGenerator() {
Ray Milkey626a2b12014-09-02 11:19:06 -0700403 return matchActionIdGenerator;
Ray Milkey18b44ac2014-08-22 08:29:47 -0700404 }
405
406 @Override
407 public IdGenerator<MatchActionOperationsId> getMatchActionOperationsIdGenerator() {
Ray Milkey626a2b12014-09-02 11:19:06 -0700408 return matchActionOperationsIdGenerator;
Ray Milkey18b44ac2014-08-22 08:29:47 -0700409 }
410
411}