blob: b040147f30b49c6b42ff790292f2f1467e0b95ec [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.portloadbalancer.app;
18
19import com.google.common.collect.ImmutableMap;
20import com.google.common.collect.Sets;
21import org.osgi.service.component.annotations.Activate;
22import org.osgi.service.component.annotations.Component;
23import org.osgi.service.component.annotations.Deactivate;
24import org.osgi.service.component.annotations.Reference;
25import org.osgi.service.component.annotations.ReferenceCardinality;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.portloadbalancer.api.PortLoadBalancer;
33import org.onosproject.portloadbalancer.api.PortLoadBalancerData;
34import org.onosproject.portloadbalancer.api.PortLoadBalancerEvent;
35import org.onosproject.portloadbalancer.api.PortLoadBalancerAdminService;
36import org.onosproject.portloadbalancer.api.PortLoadBalancerId;
37import org.onosproject.portloadbalancer.api.PortLoadBalancerListener;
38import org.onosproject.portloadbalancer.api.PortLoadBalancerMode;
39import org.onosproject.portloadbalancer.api.PortLoadBalancerService;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.PortNumber;
43import org.onosproject.net.device.DeviceEvent;
44import org.onosproject.net.device.DeviceListener;
45import org.onosproject.net.device.DeviceService;
46import org.onosproject.net.flow.DefaultTrafficSelector;
47import org.onosproject.net.flow.DefaultTrafficTreatment;
48import org.onosproject.net.flow.TrafficSelector;
49import org.onosproject.net.flow.TrafficTreatment;
50import org.onosproject.net.flow.instructions.Instruction;
51import org.onosproject.net.flow.instructions.Instructions;
52import org.onosproject.net.flowobjective.DefaultNextObjective;
53import org.onosproject.net.flowobjective.DefaultNextTreatment;
54import org.onosproject.net.flowobjective.FlowObjectiveService;
55import org.onosproject.net.flowobjective.NextObjective;
56import org.onosproject.net.flowobjective.Objective;
57import org.onosproject.net.flowobjective.ObjectiveContext;
58import org.onosproject.net.flowobjective.ObjectiveError;
59import org.onosproject.store.serializers.KryoNamespaces;
60import org.onosproject.store.service.ConsistentMap;
61import org.onosproject.store.service.MapEvent;
62import org.onosproject.store.service.MapEventListener;
63import org.onosproject.store.service.Serializer;
64import org.onosproject.store.service.StorageService;
65import org.onosproject.store.service.Versioned;
66import org.slf4j.Logger;
67
68import java.util.Collection;
69import java.util.Map;
70import java.util.Set;
71import java.util.concurrent.ExecutorService;
72import java.util.concurrent.Executors;
73import java.util.stream.Collectors;
74
75import static org.onlab.util.Tools.groupedThreads;
76import static org.slf4j.LoggerFactory.getLogger;
77
78@Component(
79 immediate = true,
80 service = { PortLoadBalancerService.class, PortLoadBalancerAdminService.class }
81)
82public class PortLoadBalancerManager implements PortLoadBalancerService, PortLoadBalancerAdminService {
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 private CoreService coreService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 private StorageService storageService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 private FlowObjectiveService flowObjService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 private MastershipService mastershipService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 private LeadershipService leadershipService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 private ClusterService clusterService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 private DeviceService deviceService;
104
105 private static final Logger log = getLogger(PortLoadBalancerManager.class);
106 private static final String APP_NAME = "org.onosproject.portloadbalancer";
107
108 private ApplicationId appId;
109 private ConsistentMap<PortLoadBalancerId, PortLoadBalancer> portLoadBalancerStore;
110 private ConsistentMap<PortLoadBalancerId, Integer> portLoadBalancerNextStore;
111 // TODO Evaluate if ResourceService is a better option
112 private ConsistentMap<PortLoadBalancerId, ApplicationId> portLoadBalancerResStore;
113 private Set<PortLoadBalancerListener> listeners = Sets.newConcurrentHashSet();
114
115 private ExecutorService portLoadBalancerEventExecutor;
116 private ExecutorService portLoadBalancerProvExecutor;
117 private ExecutorService deviceEventExecutor;
118
119 private MapEventListener<PortLoadBalancerId, PortLoadBalancer> portLoadBalancerStoreListener;
120 // TODO build CLI to view and clear the next store
121 private MapEventListener<PortLoadBalancerId, Integer> portLoadBalancerNextStoreListener;
122 private MapEventListener<PortLoadBalancerId, ApplicationId> portLoadBalancerResStoreListener;
123 private final DeviceListener deviceListener = new InternalDeviceListener();
124
125 @Activate
126 public void activate() {
127 appId = coreService.registerApplication(APP_NAME);
128
129 portLoadBalancerEventExecutor = Executors.newSingleThreadExecutor(
130 groupedThreads("portloadbalancer-event", "%d", log));
131 portLoadBalancerProvExecutor = Executors.newSingleThreadExecutor(
132 groupedThreads("portloadbalancer-prov", "%d", log));
133 deviceEventExecutor = Executors.newSingleThreadScheduledExecutor(
134 groupedThreads("portloadbalancer-dev-event", "%d", log));
135 portLoadBalancerStoreListener = new PortLoadBalancerStoreListener();
136 portLoadBalancerNextStoreListener = new PortLoadBalancerNextStoreListener();
137 portLoadBalancerResStoreListener = new PortLoadBalancerResStoreListener();
138
139 KryoNamespace serializer = KryoNamespace.newBuilder()
140 .register(KryoNamespaces.API)
141 .register(PortLoadBalancer.class)
142 .register(PortLoadBalancerId.class)
143 .register(PortLoadBalancerMode.class)
144 .build();
145 portLoadBalancerStore = storageService.<PortLoadBalancerId, PortLoadBalancer>consistentMapBuilder()
146 .withName("onos-portloadbalancer-store")
147 .withRelaxedReadConsistency()
148 .withSerializer(Serializer.using(serializer))
149 .build();
150 portLoadBalancerStore.addListener(portLoadBalancerStoreListener);
151 portLoadBalancerNextStore = storageService.<PortLoadBalancerId, Integer>consistentMapBuilder()
152 .withName("onos-portloadbalancer-next-store")
153 .withRelaxedReadConsistency()
154 .withSerializer(Serializer.using(serializer))
155 .build();
156 portLoadBalancerNextStore.addListener(portLoadBalancerNextStoreListener);
157 portLoadBalancerResStore = storageService.<PortLoadBalancerId, ApplicationId>consistentMapBuilder()
158 .withName("onos-portloadbalancer-res-store")
159 .withRelaxedReadConsistency()
160 .withSerializer(Serializer.using(serializer))
161 .build();
162 portLoadBalancerResStore.addListener(portLoadBalancerResStoreListener);
163
164 deviceService.addListener(deviceListener);
165
166 log.info("Started");
167 }
168
169 @Deactivate
170 public void deactivate() {
171 portLoadBalancerStore.removeListener(portLoadBalancerStoreListener);
172 portLoadBalancerNextStore.removeListener(portLoadBalancerNextStoreListener);
173
pier024f44c2019-04-01 23:38:42 -0700174 deviceService.removeListener(deviceListener);
175
Charles Chanf7b1b4b2019-01-16 15:30:39 -0800176 portLoadBalancerEventExecutor.shutdown();
177 portLoadBalancerProvExecutor.shutdown();
178 deviceEventExecutor.shutdown();
179
180 log.info("Stopped");
181 }
182
183 @Override
184 public void addListener(PortLoadBalancerListener listener) {
185 listeners.add(listener);
186 }
187
188 @Override
189 public void removeListener(PortLoadBalancerListener listener) {
190 listeners.remove(listener);
191 }
192
193 @Override
194 public PortLoadBalancer createOrUpdate(PortLoadBalancerId portLoadBalancerId, Set<PortNumber> ports,
195 PortLoadBalancerMode mode) {
196 log.debug("Putting {} -> {} {} into port load balancer store", portLoadBalancerId, mode, ports);
197 return Versioned.valueOrNull(portLoadBalancerStore.put(portLoadBalancerId,
198 new PortLoadBalancer(portLoadBalancerId, ports, mode)));
199 }
200
201 @Override
202 public PortLoadBalancer remove(PortLoadBalancerId portLoadBalancerId) {
203 ApplicationId reservation = Versioned.valueOrNull(portLoadBalancerResStore.get(portLoadBalancerId));
204 // Remove only if it is not used - otherwise it is necessary to release first
205 if (reservation == null) {
206 log.debug("Removing {} from port load balancer store", portLoadBalancerId);
207 return Versioned.valueOrNull(portLoadBalancerStore.remove(portLoadBalancerId));
208 }
209 log.warn("Removal {} from port load balancer store was not possible " +
210 "due to a previous reservation", portLoadBalancerId);
211 return null;
212 }
213
214 @Override
215 public Map<PortLoadBalancerId, PortLoadBalancer> getPortLoadBalancers() {
216 return ImmutableMap.copyOf(portLoadBalancerStore.asJavaMap());
217 }
218
219 @Override
220 public PortLoadBalancer getPortLoadBalancer(PortLoadBalancerId portLoadBalancerId) {
221 return Versioned.valueOrNull(portLoadBalancerStore.get(portLoadBalancerId));
222 }
223
224 @Override
225 public Map<PortLoadBalancerId, Integer> getPortLoadBalancerNexts() {
226 return ImmutableMap.copyOf(portLoadBalancerNextStore.asJavaMap());
227 }
228
229 @Override
230 public int getPortLoadBalancerNext(PortLoadBalancerId portLoadBalancerId) {
231 return Versioned.valueOrNull(portLoadBalancerNextStore.get(portLoadBalancerId));
232 }
233
234 @Override
235 public boolean reserve(PortLoadBalancerId portLoadBalancerId, ApplicationId appId) {
236 // Check if the resource is available
237 ApplicationId reservation = Versioned.valueOrNull(portLoadBalancerResStore.get(portLoadBalancerId));
238 PortLoadBalancer portLoadBalancer = Versioned.valueOrNull(portLoadBalancerStore.get(portLoadBalancerId));
239 if (reservation == null && portLoadBalancer != null) {
240 log.debug("Reserving {} -> {} into port load balancer reservation store", portLoadBalancerId, appId);
241 return portLoadBalancerResStore.put(portLoadBalancerId, appId) == null;
242 } else if (reservation != null && reservation.equals(appId)) {
243 // App try to reserve the resource a second time
244 log.debug("Already reserved {} -> {} skip reservation", portLoadBalancerId, appId);
245 return true;
246 }
247 log.warn("Reservation failed {} -> {}", portLoadBalancerId, appId);
248 return false;
249 }
250
251 @Override
252 public boolean release(PortLoadBalancerId portLoadBalancerId, ApplicationId appId) {
253 // Check if the resource is reserved
254 ApplicationId reservation = Versioned.valueOrNull(portLoadBalancerResStore.get(portLoadBalancerId));
255 if (reservation != null && reservation.equals(appId)) {
256 log.debug("Removing {} -> {} from port load balancer reservation store", portLoadBalancerId, appId);
257 return portLoadBalancerResStore.remove(portLoadBalancerId) != null;
258 }
259 log.warn("Release failed {} -> {}", portLoadBalancerId, appId);
260 return false;
261 }
262
263 @Override
264 public ApplicationId getReservation(PortLoadBalancerId portLoadBalancerId) {
265 return Versioned.valueOrNull(portLoadBalancerResStore.get(portLoadBalancerId));
266 }
267
268 @Override
269 public Map<PortLoadBalancerId, ApplicationId> getReservations() {
270 return portLoadBalancerResStore.asJavaMap();
271 }
272
273 private class PortLoadBalancerStoreListener implements MapEventListener<PortLoadBalancerId, PortLoadBalancer> {
274 public void event(MapEvent<PortLoadBalancerId, PortLoadBalancer> event) {
275 switch (event.type()) {
276 case INSERT:
277 log.debug("PortLoadBalancer {} insert new={}, old={}", event.key(), event.newValue(),
278 event.oldValue());
279 post(new PortLoadBalancerEvent(PortLoadBalancerEvent.Type.ADDED, event.newValue().value().data(),
280 null));
281 populatePortLoadBalancer(event.newValue().value());
282 break;
283 case REMOVE:
284 log.debug("PortLoadBalancer {} remove new={}, old={}", event.key(), event.newValue(),
285 event.oldValue());
286 post(new PortLoadBalancerEvent(PortLoadBalancerEvent.Type.REMOVED, null,
287 event.oldValue().value().data()));
288 revokePortLoadBalancer(event.oldValue().value());
289 break;
290 case UPDATE:
291 log.debug("PortLoadBalancer {} update new={}, old={}", event.key(), event.newValue(),
292 event.oldValue());
293 post(new PortLoadBalancerEvent(PortLoadBalancerEvent.Type.UPDATED, event.newValue().value().data(),
294 event.oldValue().value().data()));
295 updatePortLoadBalancer(event.newValue().value(), event.oldValue().value());
296 break;
297 default:
298 break;
299 }
300 }
301 }
302
303 private class PortLoadBalancerNextStoreListener implements MapEventListener<PortLoadBalancerId, Integer> {
304 public void event(MapEvent<PortLoadBalancerId, Integer> event) {
305 switch (event.type()) {
306 case INSERT:
307 log.debug("PortLoadBalancer next {} insert new={}, old={}", event.key(), event.newValue(),
308 event.oldValue());
309 break;
310 case REMOVE:
311 log.debug("PortLoadBalancer next {} remove new={}, old={}", event.key(), event.newValue(),
312 event.oldValue());
313 break;
314 case UPDATE:
315 log.debug("PortLoadBalancer next {} update new={}, old={}", event.key(), event.newValue(),
316 event.oldValue());
317 break;
318 default:
319 break;
320 }
321 }
322 }
323
324 private class PortLoadBalancerResStoreListener implements MapEventListener<PortLoadBalancerId, ApplicationId> {
325 public void event(MapEvent<PortLoadBalancerId, ApplicationId> event) {
326 switch (event.type()) {
327 case INSERT:
328 log.debug("PortLoadBalancer reservation {} insert new={}, old={}", event.key(), event.newValue(),
329 event.oldValue());
330 break;
331 case REMOVE:
332 log.debug("PortLoadBalancer reservation {} remove new={}, old={}", event.key(), event.newValue(),
333 event.oldValue());
334 break;
335 case UPDATE:
336 log.debug("PortLoadBalancer reservation {} update new={}, old={}", event.key(), event.newValue(),
337 event.oldValue());
338 break;
339 default:
340 break;
341 }
342 }
343 }
344
345 private class InternalDeviceListener implements DeviceListener {
346 // We want to manage only a subset of events and if we are the leader
347 @Override
348 public void event(DeviceEvent event) {
349 deviceEventExecutor.execute(() -> {
350 DeviceId deviceId = event.subject().id();
351 if (!isLocalLeader(deviceId)) {
352 log.debug("Not the leader of {}. Skip event {}", deviceId, event);
353 return;
354 }
355 // Populate or revoke according to the device availability
356 if (deviceService.isAvailable(deviceId)) {
357 init(deviceId);
358 } else {
359 cleanup(deviceId);
360 }
361 });
362 }
363 // Some events related to the devices are skipped
364 @Override
365 public boolean isRelevant(DeviceEvent event) {
366 return event.type() == DeviceEvent.Type.DEVICE_ADDED ||
367 event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
368 event.type() == DeviceEvent.Type.DEVICE_UPDATED;
369 }
370 }
371
372 private void post(PortLoadBalancerEvent portLoadBalancerEvent) {
373 portLoadBalancerEventExecutor.execute(() -> {
374 for (PortLoadBalancerListener l : listeners) {
375 if (l.isRelevant(portLoadBalancerEvent)) {
376 l.event(portLoadBalancerEvent);
377 }
378 }
379 });
380 }
381
382 private void init(DeviceId deviceId) {
383 portLoadBalancerStore.entrySet().stream()
384 .filter(portLoadBalancerEntry -> portLoadBalancerEntry.getKey().deviceId().equals(deviceId))
385 .forEach(portLoadBalancerEntry -> populatePortLoadBalancer(portLoadBalancerEntry.getValue().value()));
386 }
387
388 private void cleanup(DeviceId deviceId) {
389 portLoadBalancerStore.entrySet().stream()
390 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
391 .forEach(entry -> portLoadBalancerNextStore.remove(entry.getKey()));
392 log.debug("{} is removed from portLoadBalancerNextStore", deviceId);
393 }
394
395 private void populatePortLoadBalancer(PortLoadBalancer portLoadBalancer) {
396 DeviceId deviceId = portLoadBalancer.portLoadBalancerId().deviceId();
397 if (!isLocalLeader(deviceId)) {
398 log.debug("Not the leader of {}. Skip populatePortLoadBalancer {}", deviceId,
399 portLoadBalancer.portLoadBalancerId());
400 return;
401 }
402
403 portLoadBalancerProvExecutor.execute(() -> {
404 Integer nextid = Versioned.valueOrNull(portLoadBalancerNextStore
405 .get(portLoadBalancer.portLoadBalancerId()));
406 if (nextid == null) {
407 // Build a new context and new next objective
408 PortLoadBalancerObjectiveContext context =
409 new PortLoadBalancerObjectiveContext(portLoadBalancer.portLoadBalancerId());
410 NextObjective nextObj = nextObjBuilder(portLoadBalancer.portLoadBalancerId(),
411 portLoadBalancer.ports(), nextid).add(context);
412 // Finally submit, store, and register the resource
413 flowObjService.next(deviceId, nextObj);
414 portLoadBalancerNextStore.put(portLoadBalancer.portLoadBalancerId(), nextObj.id());
415 } else {
416 log.info("NextObj for {} already exists. Skip populatePortLoadBalancer",
417 portLoadBalancer.portLoadBalancerId());
418 }
419 });
420 }
421
422 private void revokePortLoadBalancer(PortLoadBalancer portLoadBalancer) {
423 DeviceId deviceId = portLoadBalancer.portLoadBalancerId().deviceId();
424 if (!isLocalLeader(deviceId)) {
425 log.debug("Not the leader of {}. Skip revokePortLoadBalancer {}", deviceId,
426 portLoadBalancer.portLoadBalancerId());
427 return;
428 }
429
430 portLoadBalancerProvExecutor.execute(() -> {
431 Integer nextid = Versioned.valueOrNull(portLoadBalancerNextStore.get(portLoadBalancer
432 .portLoadBalancerId()));
433 if (nextid != null) {
434 // Build a new context and remove old next objective
435 PortLoadBalancerObjectiveContext context =
436 new PortLoadBalancerObjectiveContext(portLoadBalancer.portLoadBalancerId());
437 NextObjective nextObj = nextObjBuilder(portLoadBalancer.portLoadBalancerId(), portLoadBalancer.ports(),
438 nextid).remove(context);
439 // Finally submit and invalidate the store
440 flowObjService.next(deviceId, nextObj);
441 portLoadBalancerNextStore.remove(portLoadBalancer.portLoadBalancerId());
442 } else {
443 log.info("NextObj for {} does not exist. Skip revokePortLoadBalancer",
444 portLoadBalancer.portLoadBalancerId());
445 }
446 });
447 }
448
449 private void updatePortLoadBalancer(PortLoadBalancer newPortLoadBalancer, PortLoadBalancer oldPortLoadBalancer) {
450 DeviceId deviceId = newPortLoadBalancer.portLoadBalancerId().deviceId();
451 if (!isLocalLeader(deviceId)) {
452 log.debug("Not the leader of {}. Skip updatePortLoadBalancer {}", deviceId,
453 newPortLoadBalancer.portLoadBalancerId());
454 return;
455 }
456
457 portLoadBalancerProvExecutor.execute(() -> {
458 Integer nextid = Versioned.valueOrNull(portLoadBalancerNextStore
459 .get(newPortLoadBalancer.portLoadBalancerId()));
460 if (nextid != null) {
461 // Compute modifications and context
462 PortLoadBalancerObjectiveContext context =
463 new PortLoadBalancerObjectiveContext(newPortLoadBalancer.portLoadBalancerId());
464 Set<PortNumber> portsToBeAdded =
465 Sets.difference(newPortLoadBalancer.ports(), oldPortLoadBalancer.ports());
466 Set<PortNumber> portsToBeRemoved =
467 Sets.difference(oldPortLoadBalancer.ports(), newPortLoadBalancer.ports());
468 // and send them to the flowobj subsystem
469 if (!portsToBeAdded.isEmpty()) {
470 flowObjService.next(deviceId, nextObjBuilder(newPortLoadBalancer.portLoadBalancerId(),
471 portsToBeAdded, nextid)
472 .addToExisting(context));
473 } else {
474 log.debug("NextObj for {} nothing to add", newPortLoadBalancer.portLoadBalancerId());
475
476 }
477 if (!portsToBeRemoved.isEmpty()) {
478 flowObjService.next(deviceId, nextObjBuilder(newPortLoadBalancer.portLoadBalancerId(),
479 portsToBeRemoved, nextid)
480 .removeFromExisting(context));
481 } else {
482 log.debug("NextObj for {} nothing to remove", newPortLoadBalancer.portLoadBalancerId());
483 }
484 } else {
485 log.info("NextObj for {} does not exist. Skip updatePortLoadBalancer",
486 newPortLoadBalancer.portLoadBalancerId());
487 }
488 });
489 }
490
491 private NextObjective.Builder nextObjBuilder(PortLoadBalancerId portLoadBalancerId, Set<PortNumber> ports,
492 Integer nextId) {
493 if (nextId == null) {
494 nextId = flowObjService.allocateNextId();
495 }
496 // This metadata is used to pass the key to the driver.
497 // Some driver, e.g. OF-DPA, will use that information while creating load balancing group.
498 // TODO This is not an actual LAG port. In the future, we should extend metadata structure to carry
499 // generic information. We should avoid using in_port in the metadata once generic metadata is available.
500 TrafficSelector meta = DefaultTrafficSelector.builder()
501 .matchInPort(PortNumber.portNumber(portLoadBalancerId.key())).build();
502 NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
503 .withId(nextId)
504 .withMeta(meta)
505 .withType(NextObjective.Type.HASHED)
506 .fromApp(appId);
507 ports.forEach(port -> {
508 TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(port).build();
509 nextObjBuilder.addTreatment(DefaultNextTreatment.of(treatment));
510 });
511 return nextObjBuilder;
512 }
513
514 // Custom-built function, when the device is not available we need a fallback mechanism
515 private boolean isLocalLeader(DeviceId deviceId) {
516 if (!mastershipService.isLocalMaster(deviceId)) {
517 // When the device is available we just check the mastership
518 if (deviceService.isAvailable(deviceId)) {
519 return false;
520 }
521 // Fallback with Leadership service - device id is used as topic
522 NodeId leader = leadershipService.runForLeadership(
523 deviceId.toString()).leaderNodeId();
524 // Verify if this node is the leader
525 return clusterService.getLocalNode().id().equals(leader);
526 }
527 return true;
528 }
529
530 private final class PortLoadBalancerObjectiveContext implements ObjectiveContext {
531 private final PortLoadBalancerId portLoadBalancerId;
532
533 private PortLoadBalancerObjectiveContext(PortLoadBalancerId portLoadBalancerId) {
534 this.portLoadBalancerId = portLoadBalancerId;
535 }
536
537 @Override
538 public void onSuccess(Objective objective) {
539 NextObjective nextObj = (NextObjective) objective;
540 log.debug("Successfully {} nextobj {} for port load balancer {}. NextObj={}",
541 nextObj.op(), nextObj.id(), portLoadBalancerId, nextObj);
542 portLoadBalancerProvExecutor.execute(() -> onSuccessHandler(nextObj, portLoadBalancerId));
543 }
544
545 @Override
546 public void onError(Objective objective, ObjectiveError error) {
547 NextObjective nextObj = (NextObjective) objective;
548 log.warn("Failed to {} nextobj {} for port load balancer {} due to {}. NextObj={}",
549 nextObj.op(), nextObj.id(), portLoadBalancerId, error, nextObj);
550 portLoadBalancerProvExecutor.execute(() -> onErrorHandler(nextObj, portLoadBalancerId));
551 }
552 }
553
554 private void onSuccessHandler(NextObjective nextObjective, PortLoadBalancerId portLoadBalancerId) {
555 // Operation done
556 PortLoadBalancerData oldPortLoadBalancerData = new PortLoadBalancerData(portLoadBalancerId);
557 PortLoadBalancerData newPortLoadBalancerData = new PortLoadBalancerData(portLoadBalancerId);
558 // Other operations will not lead to a generation of an event
559 switch (nextObjective.op()) {
560 case ADD:
561 newPortLoadBalancerData.setNextId(nextObjective.id());
562 post(new PortLoadBalancerEvent(PortLoadBalancerEvent.Type.INSTALLED, newPortLoadBalancerData,
563 oldPortLoadBalancerData));
564 break;
565 case REMOVE:
566 oldPortLoadBalancerData.setNextId(nextObjective.id());
567 post(new PortLoadBalancerEvent(PortLoadBalancerEvent.Type.UNINSTALLED, newPortLoadBalancerData,
568 oldPortLoadBalancerData));
569 break;
570 default:
571 break;
572 }
573 }
574
575 private void onErrorHandler(NextObjective nextObjective, PortLoadBalancerId portLoadBalancerId) {
576 // There was a failure
577 PortLoadBalancerData portLoadBalancerData = new PortLoadBalancerData(portLoadBalancerId);
578 // send FAILED event;
579 switch (nextObjective.op()) {
580 case ADD:
581 // If ADD is failing apps do not know the next id; let's update the store
582 portLoadBalancerNextStore.remove(portLoadBalancerId);
583 portLoadBalancerResStore.remove(portLoadBalancerId);
584 portLoadBalancerStore.remove(portLoadBalancerId);
585 post(new PortLoadBalancerEvent(PortLoadBalancerEvent.Type.FAILED, portLoadBalancerData,
586 portLoadBalancerData));
587 break;
588 case ADD_TO_EXISTING:
589 // If ADD_TO_EXISTING is failing let's remove the failed ports
590 Collection<PortNumber> addedPorts = nextObjective.next().stream()
591 .flatMap(t -> t.allInstructions().stream())
592 .filter(i -> i.type() == Instruction.Type.OUTPUT)
593 .map(i -> ((Instructions.OutputInstruction) i).port())
594 .collect(Collectors.toList());
595 portLoadBalancerStore.compute(portLoadBalancerId, (key, value) -> {
596 if (value != null && value.ports() != null && !value.ports().isEmpty()) {
597 value.ports().removeAll(addedPorts);
598 }
599 return value;
600 });
601 portLoadBalancerData.setNextId(nextObjective.id());
602 post(new PortLoadBalancerEvent(PortLoadBalancerEvent.Type.FAILED, portLoadBalancerData,
603 portLoadBalancerData));
604 break;
605 case REMOVE_FROM_EXISTING:
606 // If REMOVE_TO_EXISTING is failing let's re-add the failed ports
607 Collection<PortNumber> removedPorts = nextObjective.next().stream()
608 .flatMap(t -> t.allInstructions().stream())
609 .filter(i -> i.type() == Instruction.Type.OUTPUT)
610 .map(i -> ((Instructions.OutputInstruction) i).port())
611 .collect(Collectors.toList());
612 portLoadBalancerStore.compute(portLoadBalancerId, (key, value) -> {
613 if (value != null && value.ports() != null) {
614 value.ports().addAll(removedPorts);
615 }
616 return value;
617 });
618 portLoadBalancerData.setNextId(nextObjective.id());
619 post(new PortLoadBalancerEvent(PortLoadBalancerEvent.Type.FAILED, portLoadBalancerData,
620 portLoadBalancerData));
621 break;
622 case VERIFY:
623 case REMOVE:
624 // If ADD/REMOVE_TO_EXISTING, REMOVE and VERIFY are failing let's send
625 // also the info about the next id
626 portLoadBalancerData.setNextId(nextObjective.id());
627 post(new PortLoadBalancerEvent(PortLoadBalancerEvent.Type.FAILED, portLoadBalancerData,
628 portLoadBalancerData));
629 break;
630 default:
631 break;
632 }
633
634 }
635}