blob: 8de2b7d4c645ce830f7c5c5b720b22deb3d1e917 [file] [log] [blame]
Charles Chan7f987c52018-07-31 18:22:46 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.l2lb.app;
18
Charles Chan7f987c52018-07-31 18:22:46 -070019import com.google.common.collect.Sets;
20import org.onlab.util.KryoNamespace;
pierddc59d92018-11-20 15:06:43 +010021import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.LeadershipService;
23import org.onosproject.cluster.NodeId;
Charles Chan7f987c52018-07-31 18:22:46 -070024import org.onosproject.core.ApplicationId;
25import org.onosproject.core.CoreService;
26import org.onosproject.l2lb.api.L2Lb;
pierddc59d92018-11-20 15:06:43 +010027import org.onosproject.l2lb.api.L2LbData;
Charles Chan7f987c52018-07-31 18:22:46 -070028import org.onosproject.l2lb.api.L2LbEvent;
29import org.onosproject.l2lb.api.L2LbAdminService;
30import org.onosproject.l2lb.api.L2LbId;
31import org.onosproject.l2lb.api.L2LbListener;
32import org.onosproject.l2lb.api.L2LbMode;
33import org.onosproject.l2lb.api.L2LbService;
34import org.onosproject.mastership.MastershipService;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.PortNumber;
pierddc59d92018-11-20 15:06:43 +010037import org.onosproject.net.device.DeviceEvent;
38import org.onosproject.net.device.DeviceListener;
Charles Chan7f987c52018-07-31 18:22:46 -070039import org.onosproject.net.device.DeviceService;
40import org.onosproject.net.flow.DefaultTrafficSelector;
41import org.onosproject.net.flow.DefaultTrafficTreatment;
42import org.onosproject.net.flow.TrafficSelector;
43import org.onosproject.net.flow.TrafficTreatment;
pier0023ca92018-11-29 10:32:40 -080044import org.onosproject.net.flow.instructions.Instruction;
45import org.onosproject.net.flow.instructions.Instructions;
Charles Chan7f987c52018-07-31 18:22:46 -070046import org.onosproject.net.flowobjective.DefaultNextObjective;
47import org.onosproject.net.flowobjective.FlowObjectiveService;
48import org.onosproject.net.flowobjective.NextObjective;
49import org.onosproject.net.flowobjective.Objective;
50import org.onosproject.net.flowobjective.ObjectiveContext;
51import org.onosproject.net.flowobjective.ObjectiveError;
52import org.onosproject.net.intf.InterfaceService;
53import org.onosproject.net.packet.PacketService;
54import org.onosproject.store.serializers.KryoNamespaces;
55import org.onosproject.store.service.ConsistentMap;
56import org.onosproject.store.service.MapEvent;
57import org.onosproject.store.service.MapEventListener;
58import org.onosproject.store.service.Serializer;
59import org.onosproject.store.service.StorageService;
60import org.onosproject.store.service.Versioned;
61import org.osgi.service.component.annotations.Activate;
62import org.osgi.service.component.annotations.Component;
63import org.osgi.service.component.annotations.Deactivate;
64import org.osgi.service.component.annotations.Reference;
65import org.osgi.service.component.annotations.ReferenceCardinality;
66import org.slf4j.Logger;
67
pier0023ca92018-11-29 10:32:40 -080068import java.util.Collection;
Charles Chan7f987c52018-07-31 18:22:46 -070069import java.util.Map;
70import java.util.Set;
71import java.util.concurrent.ExecutorService;
72import java.util.concurrent.Executors;
pier0023ca92018-11-29 10:32:40 -080073import java.util.stream.Collectors;
Charles Chan7f987c52018-07-31 18:22:46 -070074
75import static org.onlab.util.Tools.groupedThreads;
76import static org.slf4j.LoggerFactory.getLogger;
77
78@Component(
79 immediate = true,
80 service = {
81 L2LbService.class,
82 L2LbAdminService.class
83 }
84)
85public class L2LbManager implements L2LbService, L2LbAdminService {
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 private CoreService coreService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 private PacketService packetService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 private InterfaceService interfaceService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 private StorageService storageService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 private FlowObjectiveService flowObjService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 private MastershipService mastershipService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierddc59d92018-11-20 15:06:43 +0100106 private LeadershipService leadershipService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 private ClusterService clusterService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan7f987c52018-07-31 18:22:46 -0700112 private DeviceService deviceService;
113
114 private static final Logger log = getLogger(L2LbManager.class);
115 private static final String APP_NAME = "org.onosproject.l2lb";
116
117 private ApplicationId appId;
118 private ConsistentMap<L2LbId, L2Lb> l2LbStore;
119 private ConsistentMap<L2LbId, Integer> l2LbNextStore;
pierddc59d92018-11-20 15:06:43 +0100120 // TODO Evaluate if ResourceService is a better option
121 private ConsistentMap<L2LbId, ApplicationId> l2LbResStore;
Charles Chan7f987c52018-07-31 18:22:46 -0700122 private Set<L2LbListener> listeners = Sets.newConcurrentHashSet();
123
124 private ExecutorService l2LbEventExecutor;
125 private ExecutorService l2LbProvExecutor;
pierddc59d92018-11-20 15:06:43 +0100126 private ExecutorService deviceEventExecutor;
127
Charles Chan7f987c52018-07-31 18:22:46 -0700128 private MapEventListener<L2LbId, L2Lb> l2LbStoreListener;
129 // TODO build CLI to view and clear the next store
130 private MapEventListener<L2LbId, Integer> l2LbNextStoreListener;
pierddc59d92018-11-20 15:06:43 +0100131 private MapEventListener<L2LbId, ApplicationId> l2LbResStoreListener;
132 private final DeviceListener deviceListener = new InternalDeviceListener();
Charles Chan7f987c52018-07-31 18:22:46 -0700133
134 @Activate
135 public void activate() {
136 appId = coreService.registerApplication(APP_NAME);
137
pierddc59d92018-11-20 15:06:43 +0100138 l2LbEventExecutor = Executors.newSingleThreadExecutor(
139 groupedThreads("l2lb-event", "%d", log));
140 l2LbProvExecutor = Executors.newSingleThreadExecutor(
141 groupedThreads("l2lb-prov", "%d", log));
142 deviceEventExecutor = Executors.newSingleThreadScheduledExecutor(
143 groupedThreads("l2lb-dev-event", "%d", log));
Charles Chan7f987c52018-07-31 18:22:46 -0700144 l2LbStoreListener = new L2LbStoreListener();
145 l2LbNextStoreListener = new L2LbNextStoreListener();
pierddc59d92018-11-20 15:06:43 +0100146 l2LbResStoreListener = new L2LbResStoreListener();
Charles Chan7f987c52018-07-31 18:22:46 -0700147
148 KryoNamespace serializer = KryoNamespace.newBuilder()
149 .register(KryoNamespaces.API)
150 .register(L2Lb.class)
151 .register(L2LbId.class)
152 .register(L2LbMode.class)
153 .build();
154 l2LbStore = storageService.<L2LbId, L2Lb>consistentMapBuilder()
155 .withName("onos-l2lb-store")
156 .withRelaxedReadConsistency()
157 .withSerializer(Serializer.using(serializer))
158 .build();
159 l2LbStore.addListener(l2LbStoreListener);
160 l2LbNextStore = storageService.<L2LbId, Integer>consistentMapBuilder()
161 .withName("onos-l2lb-next-store")
162 .withRelaxedReadConsistency()
163 .withSerializer(Serializer.using(serializer))
164 .build();
165 l2LbNextStore.addListener(l2LbNextStoreListener);
pierddc59d92018-11-20 15:06:43 +0100166 l2LbResStore = storageService.<L2LbId, ApplicationId>consistentMapBuilder()
167 .withName("onos-l2lb-res-store")
168 .withRelaxedReadConsistency()
169 .withSerializer(Serializer.using(serializer))
170 .build();
171 l2LbResStore.addListener(l2LbResStoreListener);
172
173 deviceService.addListener(deviceListener);
Charles Chan7f987c52018-07-31 18:22:46 -0700174
175 log.info("Started");
176 }
177
178 @Deactivate
179 public void deactivate() {
180 l2LbStore.removeListener(l2LbStoreListener);
181 l2LbNextStore.removeListener(l2LbNextStoreListener);
182
183 l2LbEventExecutor.shutdown();
pierddc59d92018-11-20 15:06:43 +0100184 l2LbProvExecutor.shutdown();
185 deviceEventExecutor.shutdown();
Charles Chan7f987c52018-07-31 18:22:46 -0700186
187 log.info("Stopped");
188 }
189
190 @Override
191 public void addListener(L2LbListener listener) {
192 listeners.add(listener);
193 }
194
195 @Override
196 public void removeListener(L2LbListener listener) {
197 listeners.remove(listener);
198 }
199
200 @Override
201 public L2Lb createOrUpdate(DeviceId deviceId, int key, Set<PortNumber> ports, L2LbMode mode) {
202 L2LbId l2LbId = new L2LbId(deviceId, key);
203 log.debug("Putting {} -> {} {} into L2 load balancer store", l2LbId, mode, ports);
204 return Versioned.valueOrNull(l2LbStore.put(l2LbId, new L2Lb(l2LbId, ports, mode)));
205 }
206
207 @Override
208 public L2Lb remove(DeviceId deviceId, int key) {
209 L2LbId l2LbId = new L2LbId(deviceId, key);
pierddc59d92018-11-20 15:06:43 +0100210 ApplicationId reservation = Versioned.valueOrNull(l2LbResStore.get(l2LbId));
211 // Remove only if it is not used - otherwise it is necessary to release first
212 if (reservation == null) {
213 log.debug("Removing {} from L2 load balancer store", l2LbId);
214 return Versioned.valueOrNull(l2LbStore.remove(l2LbId));
215 }
216 log.warn("Removal {} from L2 load balancer store was not possible " +
217 "due to a previous reservation", l2LbId);
218 return null;
Charles Chan7f987c52018-07-31 18:22:46 -0700219 }
220
221 @Override
222 public Map<L2LbId, L2Lb> getL2Lbs() {
223 return l2LbStore.asJavaMap();
224 }
225
226 @Override
227 public L2Lb getL2Lb(DeviceId deviceId, int key) {
228 return Versioned.valueOrNull(l2LbStore.get(new L2LbId(deviceId, key)));
229 }
230
231 @Override
232 public Map<L2LbId, Integer> getL2LbNexts() {
233 return l2LbNextStore.asJavaMap();
234 }
235
236 @Override
piercc6ca772018-11-24 11:16:28 -0800237 public int getL2LbNext(DeviceId deviceId, int key) {
Charles Chan7f987c52018-07-31 18:22:46 -0700238 return Versioned.valueOrNull(l2LbNextStore.get(new L2LbId(deviceId, key)));
239 }
240
pierddc59d92018-11-20 15:06:43 +0100241 @Override
242 public boolean reserve(L2LbId l2LbId, ApplicationId appId) {
243 // Check if the resource is available
244 ApplicationId reservation = Versioned.valueOrNull(l2LbResStore.get(l2LbId));
245 L2Lb l2Lb = Versioned.valueOrNull(l2LbStore.get(l2LbId));
246 if (reservation == null && l2Lb != null) {
247 log.debug("Reserving {} -> {} into L2 load balancer reservation store", l2LbId, appId);
248 return l2LbResStore.put(l2LbId, appId) == null;
249 } else if (reservation != null && reservation.equals(appId)) {
250 // App try to reserve the resource a second time
251 log.debug("Already reserved {} -> {} skip reservation", l2LbId, appId);
252 return true;
253 }
254 log.warn("Reservation failed {} -> {}", l2LbId, appId);
255 return false;
256 }
257
258 @Override
259 public boolean release(L2LbId l2LbId, ApplicationId appId) {
260 // Check if the resource is reserved
261 ApplicationId reservation = Versioned.valueOrNull(l2LbResStore.get(l2LbId));
262 if (reservation != null && reservation.equals(appId)) {
263 log.debug("Removing {} -> {} from L2 load balancer reservation store", l2LbId, appId);
264 return l2LbResStore.remove(l2LbId) != null;
265 }
266 log.warn("Release failed {} -> {}", l2LbId, appId);
267 return false;
268 }
269
270 @Override
271 public ApplicationId getReservation(L2LbId l2LbId) {
272 return Versioned.valueOrNull(l2LbResStore.get(l2LbId));
273 }
274
275 @Override
276 public Map<L2LbId, ApplicationId> getReservations() {
277 return l2LbResStore.asJavaMap();
278 }
279
Charles Chan7f987c52018-07-31 18:22:46 -0700280 private class L2LbStoreListener implements MapEventListener<L2LbId, L2Lb> {
281 public void event(MapEvent<L2LbId, L2Lb> event) {
282 switch (event.type()) {
283 case INSERT:
284 log.debug("L2Lb {} insert new={}, old={}", event.key(), event.newValue(), event.oldValue());
pierddc59d92018-11-20 15:06:43 +0100285 post(new L2LbEvent(L2LbEvent.Type.ADDED, event.newValue().value().data(), null));
Charles Chan7f987c52018-07-31 18:22:46 -0700286 populateL2Lb(event.newValue().value());
287 break;
288 case REMOVE:
289 log.debug("L2Lb {} remove new={}, old={}", event.key(), event.newValue(), event.oldValue());
pierddc59d92018-11-20 15:06:43 +0100290 post(new L2LbEvent(L2LbEvent.Type.REMOVED, null, event.oldValue().value().data()));
Charles Chan7f987c52018-07-31 18:22:46 -0700291 revokeL2Lb(event.oldValue().value());
292 break;
293 case UPDATE:
294 log.debug("L2Lb {} update new={}, old={}", event.key(), event.newValue(), event.oldValue());
pierddc59d92018-11-20 15:06:43 +0100295 post(new L2LbEvent(L2LbEvent.Type.UPDATED, event.newValue().value().data(),
296 event.oldValue().value().data()));
Charles Chan7f987c52018-07-31 18:22:46 -0700297 updateL2Lb(event.newValue().value(), event.oldValue().value());
298 break;
299 default:
300 break;
301 }
302 }
303 }
304
305 private class L2LbNextStoreListener implements MapEventListener<L2LbId, Integer> {
306 public void event(MapEvent<L2LbId, Integer> event) {
307 switch (event.type()) {
308 case INSERT:
309 log.debug("L2Lb next {} insert new={}, old={}", event.key(), event.newValue(), event.oldValue());
310 break;
311 case REMOVE:
312 log.debug("L2Lb next {} remove new={}, old={}", event.key(), event.newValue(), event.oldValue());
313 break;
314 case UPDATE:
315 log.debug("L2Lb next {} update new={}, old={}", event.key(), event.newValue(), event.oldValue());
316 break;
317 default:
318 break;
319 }
320 }
321 }
322
pierddc59d92018-11-20 15:06:43 +0100323 private class L2LbResStoreListener implements MapEventListener<L2LbId, ApplicationId> {
324 public void event(MapEvent<L2LbId, ApplicationId> event) {
325 switch (event.type()) {
326 case INSERT:
327 log.debug("L2Lb reservation {} insert new={}, old={}", event.key(), event.newValue(),
328 event.oldValue());
329 break;
330 case REMOVE:
331 log.debug("L2Lb reservation {} remove new={}, old={}", event.key(), event.newValue(),
332 event.oldValue());
333 break;
334 case UPDATE:
335 log.debug("L2Lb reservation {} update new={}, old={}", event.key(), event.newValue(),
336 event.oldValue());
337 break;
338 default:
339 break;
340 }
341 }
342 }
343
344 private class InternalDeviceListener implements DeviceListener {
345 // We want to manage only a subset of events and if we are the leader
346 @Override
347 public void event(DeviceEvent event) {
348 deviceEventExecutor.execute(() -> {
349 DeviceId deviceId = event.subject().id();
350 if (!isLocalLeader(deviceId)) {
351 log.debug("Not the leader of {}. Skip event {}", deviceId, event);
352 return;
353 }
354 // Populate or revoke according to the device availability
355 if (deviceService.isAvailable(deviceId)) {
356 init(deviceId);
357 } else {
358 cleanup(deviceId);
359 }
360 });
361 }
362 // Some events related to the devices are skipped
363 @Override
364 public boolean isRelevant(DeviceEvent event) {
365 return event.type() == DeviceEvent.Type.DEVICE_ADDED ||
366 event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
367 event.type() == DeviceEvent.Type.DEVICE_UPDATED;
368 }
369 }
370
Charles Chan7f987c52018-07-31 18:22:46 -0700371 private void post(L2LbEvent l2LbEvent) {
372 l2LbEventExecutor.execute(() -> {
373 for (L2LbListener l : listeners) {
pierddc59d92018-11-20 15:06:43 +0100374 if (l.isRelevant(l2LbEvent)) {
375 l.event(l2LbEvent);
376 }
Charles Chan7f987c52018-07-31 18:22:46 -0700377 }
378 });
379 }
380
pierddc59d92018-11-20 15:06:43 +0100381 private void init(DeviceId deviceId) {
382 l2LbStore.entrySet().stream()
383 .filter(l2lbentry -> l2lbentry.getKey().deviceId().equals(deviceId))
384 .forEach(l2lbentry -> populateL2Lb(l2lbentry.getValue().value()));
385 }
386
387 private void cleanup(DeviceId deviceId) {
388 l2LbStore.entrySet().stream()
389 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
390 .forEach(entry -> l2LbNextStore.remove(entry.getKey()));
391 log.debug("{} is removed from l2LbNextObjStore", deviceId);
392 }
393
Charles Chan7f987c52018-07-31 18:22:46 -0700394 private void populateL2Lb(L2Lb l2Lb) {
395 DeviceId deviceId = l2Lb.l2LbId().deviceId();
pierddc59d92018-11-20 15:06:43 +0100396 if (!isLocalLeader(deviceId)) {
397 log.debug("Not the leader of {}. Skip populateL2Lb {}", deviceId, l2Lb.l2LbId());
Charles Chan7f987c52018-07-31 18:22:46 -0700398 return;
399 }
400
401 l2LbProvExecutor.execute(() -> {
pierddc59d92018-11-20 15:06:43 +0100402 Integer nextid = Versioned.valueOrNull(l2LbNextStore.get(l2Lb.l2LbId()));
403 if (nextid == null) {
404 // Build a new context and new next objective
405 L2LbObjectiveContext context = new L2LbObjectiveContext(l2Lb.l2LbId());
406 NextObjective nextObj = nextObjBuilder(l2Lb.l2LbId(), l2Lb.ports(), nextid).add(context);
407 // Finally submit, store, and register the resource
408 flowObjService.next(deviceId, nextObj);
409 l2LbNextStore.put(l2Lb.l2LbId(), nextObj.id());
410 } else {
411 log.info("NextObj for {} already exists. Skip populateL2Lb", l2Lb.l2LbId());
412 }
Charles Chan7f987c52018-07-31 18:22:46 -0700413 });
414 }
415
416 private void revokeL2Lb(L2Lb l2Lb) {
417 DeviceId deviceId = l2Lb.l2LbId().deviceId();
pierddc59d92018-11-20 15:06:43 +0100418 if (!isLocalLeader(deviceId)) {
419 log.debug("Not the leader of {}. Skip revokeL2Lb {}", deviceId, l2Lb.l2LbId());
Charles Chan7f987c52018-07-31 18:22:46 -0700420 return;
421 }
422
423 l2LbProvExecutor.execute(() -> {
pierddc59d92018-11-20 15:06:43 +0100424 Integer nextid = Versioned.valueOrNull(l2LbNextStore.get(l2Lb.l2LbId()));
425 if (nextid != null) {
426 // Build a new context and remove old next objective
427 L2LbObjectiveContext context = new L2LbObjectiveContext(l2Lb.l2LbId());
428 NextObjective nextObj = nextObjBuilder(l2Lb.l2LbId(), l2Lb.ports(), nextid).remove(context);
429 // Finally submit and invalidate the store
430 flowObjService.next(deviceId, nextObj);
431 l2LbNextStore.remove(l2Lb.l2LbId());
432 } else {
433 log.info("NextObj for {} does not exist. Skip revokeL2Lb", l2Lb.l2LbId());
434 }
Charles Chan7f987c52018-07-31 18:22:46 -0700435 });
436 }
437
438 private void updateL2Lb(L2Lb newL2Lb, L2Lb oldL2Lb) {
439 DeviceId deviceId = newL2Lb.l2LbId().deviceId();
pierddc59d92018-11-20 15:06:43 +0100440 if (!isLocalLeader(deviceId)) {
441 log.debug("Not the leader of {}. Skip updateL2Lb {}", deviceId, newL2Lb.l2LbId());
Charles Chan7f987c52018-07-31 18:22:46 -0700442 return;
443 }
444
445 l2LbProvExecutor.execute(() -> {
pierddc59d92018-11-20 15:06:43 +0100446 Integer nextid = Versioned.valueOrNull(l2LbNextStore.get(newL2Lb.l2LbId()));
447 if (nextid != null) {
448 // Compute modifications and context
449 L2LbObjectiveContext context = new L2LbObjectiveContext(newL2Lb.l2LbId());
450 Set<PortNumber> portsToBeAdded = Sets.difference(newL2Lb.ports(), oldL2Lb.ports());
451 Set<PortNumber> portsToBeRemoved = Sets.difference(oldL2Lb.ports(), newL2Lb.ports());
452 // and send them to the flowobj subsystem
453 if (!portsToBeAdded.isEmpty()) {
454 flowObjService.next(deviceId, nextObjBuilder(newL2Lb.l2LbId(), portsToBeAdded, nextid)
455 .addToExisting(context));
456 } else {
457 log.debug("NextObj for {} nothing to add", newL2Lb.l2LbId());
Charles Chan7f987c52018-07-31 18:22:46 -0700458
pierddc59d92018-11-20 15:06:43 +0100459 }
460 if (!portsToBeRemoved.isEmpty()) {
461 flowObjService.next(deviceId, nextObjBuilder(newL2Lb.l2LbId(), portsToBeRemoved, nextid)
462 .removeFromExisting(context));
463 } else {
464 log.debug("NextObj for {} nothing to remove", newL2Lb.l2LbId());
465 }
466 } else {
467 log.info("NextObj for {} does not exist. Skip updateL2Lb", newL2Lb.l2LbId());
468 }
Charles Chan7f987c52018-07-31 18:22:46 -0700469 });
470 }
471
pierddc59d92018-11-20 15:06:43 +0100472 private NextObjective.Builder nextObjBuilder(L2LbId l2LbId, Set<PortNumber> ports, Integer nextId) {
473 if (nextId == null) {
474 nextId = flowObjService.allocateNextId();
475 }
Charles Chan7f987c52018-07-31 18:22:46 -0700476 // TODO replace logical l2lb port
477 TrafficSelector meta = DefaultTrafficSelector.builder()
478 .matchInPort(PortNumber.portNumber(l2LbId.key())).build();
479 NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
480 .withId(nextId)
481 .withMeta(meta)
482 .withType(NextObjective.Type.HASHED)
483 .fromApp(appId);
484 ports.forEach(port -> {
485 TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(port).build();
486 nextObjBuilder.addTreatment(treatment);
487 });
488 return nextObjBuilder;
489 }
490
pierddc59d92018-11-20 15:06:43 +0100491 // Custom-built function, when the device is not available we need a fallback mechanism
492 private boolean isLocalLeader(DeviceId deviceId) {
493 if (!mastershipService.isLocalMaster(deviceId)) {
494 // When the device is available we just check the mastership
495 if (deviceService.isAvailable(deviceId)) {
496 return false;
497 }
498 // Fallback with Leadership service - device id is used as topic
499 NodeId leader = leadershipService.runForLeadership(
500 deviceId.toString()).leaderNodeId();
501 // Verify if this node is the leader
502 return clusterService.getLocalNode().id().equals(leader);
503 }
504 return true;
505 }
506
Charles Chan7f987c52018-07-31 18:22:46 -0700507 private final class L2LbObjectiveContext implements ObjectiveContext {
508 private final L2LbId l2LbId;
509
510 private L2LbObjectiveContext(L2LbId l2LbId) {
511 this.l2LbId = l2LbId;
512 }
513
514 @Override
515 public void onSuccess(Objective objective) {
516 NextObjective nextObj = (NextObjective) objective;
pierddc59d92018-11-20 15:06:43 +0100517 log.debug("Success {} nextobj {} for L2 load balancer {}", nextObj.op(), nextObj, l2LbId);
pier0023ca92018-11-29 10:32:40 -0800518 l2LbProvExecutor.execute(() -> onSuccessHandler(nextObj, l2LbId));
Charles Chan7f987c52018-07-31 18:22:46 -0700519 }
520
521 @Override
522 public void onError(Objective objective, ObjectiveError error) {
523 NextObjective nextObj = (NextObjective) objective;
pierddc59d92018-11-20 15:06:43 +0100524 log.debug("Failed {} nextobj {} for L2 load balancer {} due to {}", nextObj.op(), nextObj,
525 l2LbId, error);
pier0023ca92018-11-29 10:32:40 -0800526 l2LbProvExecutor.execute(() -> onErrorHandler(nextObj, l2LbId));
527 }
528 }
529
530 private void onSuccessHandler(NextObjective nextObjective, L2LbId l2LbId) {
531 // Operation done
532 L2LbData oldl2LbData = new L2LbData(l2LbId);
533 L2LbData newl2LbData = new L2LbData(l2LbId);
534 // Other operations will not lead to a generation of an event
535 switch (nextObjective.op()) {
536 case ADD:
537 newl2LbData.setNextId(nextObjective.id());
538 post(new L2LbEvent(L2LbEvent.Type.INSTALLED, newl2LbData, oldl2LbData));
539 break;
540 case REMOVE:
541 oldl2LbData.setNextId(nextObjective.id());
542 post(new L2LbEvent(L2LbEvent.Type.UNINSTALLED, newl2LbData, oldl2LbData));
543 break;
544 default:
545 break;
546 }
547 }
548
549 private void onErrorHandler(NextObjective nextObjective, L2LbId l2LbId) {
550 // There was a failure
551 L2LbData l2LbData = new L2LbData(l2LbId);
552 // send FAILED event;
553 switch (nextObjective.op()) {
554 case ADD:
555 // If ADD is failing apps do not know the next id; let's update the store
556 l2LbNextStore.remove(l2LbId);
557 l2LbResStore.remove(l2LbId);
558 l2LbStore.remove(l2LbId);
559 post(new L2LbEvent(L2LbEvent.Type.FAILED, l2LbData, l2LbData));
560 break;
561 case ADD_TO_EXISTING:
562 // If ADD_TO_EXISTING is failing let's remove the failed ports
563 Collection<PortNumber> addedPorts = nextObjective.next().stream()
564 .flatMap(t -> t.allInstructions().stream())
565 .filter(i -> i.type() == Instruction.Type.OUTPUT)
566 .map(i -> ((Instructions.OutputInstruction) i).port())
567 .collect(Collectors.toList());
568 l2LbStore.compute(l2LbId, (key, value) -> {
569 if (value != null && value.ports() != null && !value.ports().isEmpty()) {
570 value.ports().removeAll(addedPorts);
571 }
572 return value;
573 });
574 l2LbData.setNextId(nextObjective.id());
575 post(new L2LbEvent(L2LbEvent.Type.FAILED, l2LbData, l2LbData));
576 break;
577 case REMOVE_FROM_EXISTING:
578 // If REMOVE_TO_EXISTING is failing let's re-add the failed ports
579 Collection<PortNumber> removedPorts = nextObjective.next().stream()
580 .flatMap(t -> t.allInstructions().stream())
581 .filter(i -> i.type() == Instruction.Type.OUTPUT)
582 .map(i -> ((Instructions.OutputInstruction) i).port())
583 .collect(Collectors.toList());
584 l2LbStore.compute(l2LbId, (key, value) -> {
585 if (value != null && value.ports() != null) {
586 value.ports().addAll(removedPorts);
587 }
588 return value;
589 });
590 l2LbData.setNextId(nextObjective.id());
591 post(new L2LbEvent(L2LbEvent.Type.FAILED, l2LbData, l2LbData));
592 break;
593 case VERIFY:
594 case REMOVE:
595 // If ADD/REMOVE_TO_EXISTING, REMOVE and VERIFY are failing let's send
596 // also the info about the next id
597 l2LbData.setNextId(nextObjective.id());
598 post(new L2LbEvent(L2LbEvent.Type.FAILED, l2LbData, l2LbData));
599 break;
600 default:
601 break;
Charles Chan7f987c52018-07-31 18:22:46 -0700602 }
603
604 }
605}