Implement L2 load balancing service
Including event/listener, CLI support
Change-Id: I26f1da578a72f5b3ead413aa5155233fbf9ab2b6
diff --git a/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java b/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java
new file mode 100644
index 0000000..1f58960
--- /dev/null
+++ b/apps/l2lb/src/main/java/org/onosproject/l2lb/app/L2LbManager.java
@@ -0,0 +1,337 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.l2lb.app;
+
+
+import com.google.common.collect.Sets;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.l2lb.api.L2Lb;
+import org.onosproject.l2lb.api.L2LbEvent;
+import org.onosproject.l2lb.api.L2LbAdminService;
+import org.onosproject.l2lb.api.L2LbId;
+import org.onosproject.l2lb.api.L2LbListener;
+import org.onosproject.l2lb.api.L2LbMode;
+import org.onosproject.l2lb.api.L2LbService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.intf.InterfaceService;
+import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Component(
+ immediate = true,
+ service = {
+ L2LbService.class,
+ L2LbAdminService.class
+ }
+)
+public class L2LbManager implements L2LbService, L2LbAdminService {
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private PacketService packetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private InterfaceService interfaceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private FlowObjectiveService flowObjService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private DeviceService deviceService;
+
+ private static final Logger log = getLogger(L2LbManager.class);
+ private static final String APP_NAME = "org.onosproject.l2lb";
+
+ private ApplicationId appId;
+ private ConsistentMap<L2LbId, L2Lb> l2LbStore;
+ private ConsistentMap<L2LbId, Integer> l2LbNextStore;
+ private Set<L2LbListener> listeners = Sets.newConcurrentHashSet();
+
+ private ExecutorService l2LbEventExecutor;
+ private ExecutorService l2LbProvExecutor;
+ private MapEventListener<L2LbId, L2Lb> l2LbStoreListener;
+ // TODO build CLI to view and clear the next store
+ private MapEventListener<L2LbId, Integer> l2LbNextStoreListener;
+
+ @Activate
+ public void activate() {
+ appId = coreService.registerApplication(APP_NAME);
+
+ l2LbEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("l2lb-event", "%d", log));
+ l2LbProvExecutor = Executors.newSingleThreadExecutor(groupedThreads("l2lb-prov", "%d", log));
+ l2LbStoreListener = new L2LbStoreListener();
+ l2LbNextStoreListener = new L2LbNextStoreListener();
+
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(L2Lb.class)
+ .register(L2LbId.class)
+ .register(L2LbMode.class)
+ .build();
+ l2LbStore = storageService.<L2LbId, L2Lb>consistentMapBuilder()
+ .withName("onos-l2lb-store")
+ .withRelaxedReadConsistency()
+ .withSerializer(Serializer.using(serializer))
+ .build();
+ l2LbStore.addListener(l2LbStoreListener);
+ l2LbNextStore = storageService.<L2LbId, Integer>consistentMapBuilder()
+ .withName("onos-l2lb-next-store")
+ .withRelaxedReadConsistency()
+ .withSerializer(Serializer.using(serializer))
+ .build();
+ l2LbNextStore.addListener(l2LbNextStoreListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ l2LbStore.removeListener(l2LbStoreListener);
+ l2LbNextStore.removeListener(l2LbNextStoreListener);
+
+ l2LbEventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public void addListener(L2LbListener listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(L2LbListener listener) {
+ listeners.remove(listener);
+ }
+
+ @Override
+ public L2Lb createOrUpdate(DeviceId deviceId, int key, Set<PortNumber> ports, L2LbMode mode) {
+ L2LbId l2LbId = new L2LbId(deviceId, key);
+ log.debug("Putting {} -> {} {} into L2 load balancer store", l2LbId, mode, ports);
+ return Versioned.valueOrNull(l2LbStore.put(l2LbId, new L2Lb(l2LbId, ports, mode)));
+ }
+
+ @Override
+ public L2Lb remove(DeviceId deviceId, int key) {
+ L2LbId l2LbId = new L2LbId(deviceId, key);
+ log.debug("Removing {} from L2 load balancer store", l2LbId);
+ return Versioned.valueOrNull(l2LbStore.remove(l2LbId));
+ }
+
+ @Override
+ public Map<L2LbId, L2Lb> getL2Lbs() {
+ return l2LbStore.asJavaMap();
+ }
+
+ @Override
+ public L2Lb getL2Lb(DeviceId deviceId, int key) {
+ return Versioned.valueOrNull(l2LbStore.get(new L2LbId(deviceId, key)));
+ }
+
+ @Override
+ public Map<L2LbId, Integer> getL2LbNexts() {
+ return l2LbNextStore.asJavaMap();
+ }
+
+ @Override
+ public int getL2LbNexts(DeviceId deviceId, int key) {
+ return Versioned.valueOrNull(l2LbNextStore.get(new L2LbId(deviceId, key)));
+ }
+
+ private class L2LbStoreListener implements MapEventListener<L2LbId, L2Lb> {
+ public void event(MapEvent<L2LbId, L2Lb> event) {
+ switch (event.type()) {
+ case INSERT:
+ log.debug("L2Lb {} insert new={}, old={}", event.key(), event.newValue(), event.oldValue());
+ post(new L2LbEvent(L2LbEvent.Type.ADDED, event.newValue().value(), null));
+ populateL2Lb(event.newValue().value());
+ break;
+ case REMOVE:
+ log.debug("L2Lb {} remove new={}, old={}", event.key(), event.newValue(), event.oldValue());
+ post(new L2LbEvent(L2LbEvent.Type.REMOVED, null, event.oldValue().value()));
+ revokeL2Lb(event.oldValue().value());
+ break;
+ case UPDATE:
+ log.debug("L2Lb {} update new={}, old={}", event.key(), event.newValue(), event.oldValue());
+ post(new L2LbEvent(L2LbEvent.Type.UPDATED, event.newValue().value(),
+ event.oldValue().value()));
+ updateL2Lb(event.newValue().value(), event.oldValue().value());
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private class L2LbNextStoreListener implements MapEventListener<L2LbId, Integer> {
+ public void event(MapEvent<L2LbId, Integer> event) {
+ switch (event.type()) {
+ case INSERT:
+ log.debug("L2Lb next {} insert new={}, old={}", event.key(), event.newValue(), event.oldValue());
+ break;
+ case REMOVE:
+ log.debug("L2Lb next {} remove new={}, old={}", event.key(), event.newValue(), event.oldValue());
+ break;
+ case UPDATE:
+ log.debug("L2Lb next {} update new={}, old={}", event.key(), event.newValue(), event.oldValue());
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private void post(L2LbEvent l2LbEvent) {
+ l2LbEventExecutor.execute(() -> {
+ for (L2LbListener l : listeners) {
+ l.event(l2LbEvent);
+ }
+ });
+ }
+
+ // TODO repopulate when device reconnect
+ private void populateL2Lb(L2Lb l2Lb) {
+ DeviceId deviceId = l2Lb.l2LbId().deviceId();
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ log.debug("Not the master of {}. Skip populateL2Lb {}", deviceId, l2Lb.l2LbId());
+ return;
+ }
+
+ l2LbProvExecutor.execute(() -> {
+ L2LbObjectiveContext context = new L2LbObjectiveContext(l2Lb.l2LbId());
+ NextObjective nextObj = nextObjBuilder(l2Lb.l2LbId(), l2Lb.ports()).add(context);
+
+ flowObjService.next(deviceId, nextObj);
+ l2LbNextStore.put(l2Lb.l2LbId(), nextObj.id());
+ });
+ }
+
+ private void revokeL2Lb(L2Lb l2Lb) {
+ DeviceId deviceId = l2Lb.l2LbId().deviceId();
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ log.debug("Not the master of {}. Skip revokeL2Lb {}", deviceId, l2Lb.l2LbId());
+ return;
+ }
+
+ l2LbProvExecutor.execute(() -> {
+ l2LbNextStore.remove(l2Lb.l2LbId());
+ // NOTE group is not removed and we rely on the garbage collection mechanism
+ });
+ }
+
+ private void updateL2Lb(L2Lb newL2Lb, L2Lb oldL2Lb) {
+ DeviceId deviceId = newL2Lb.l2LbId().deviceId();
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ log.debug("Not the master of {}. Skip updateL2Lb {}", deviceId, newL2Lb.l2LbId());
+ return;
+ }
+
+ l2LbProvExecutor.execute(() -> {
+ L2LbObjectiveContext context = new L2LbObjectiveContext(newL2Lb.l2LbId());
+ Set<PortNumber> portsToBeAdded = Sets.difference(newL2Lb.ports(), oldL2Lb.ports());
+ Set<PortNumber> portsToBeRemoved = Sets.difference(oldL2Lb.ports(), newL2Lb.ports());
+
+ flowObjService.next(deviceId, nextObjBuilder(newL2Lb.l2LbId(), portsToBeAdded).addToExisting(context));
+ flowObjService.next(deviceId, nextObjBuilder(newL2Lb.l2LbId(), portsToBeRemoved)
+ .removeFromExisting(context));
+ });
+ }
+
+ private NextObjective.Builder nextObjBuilder(L2LbId l2LbId, Set<PortNumber> ports) {
+ return nextObjBuilder(l2LbId, ports, flowObjService.allocateNextId());
+ }
+
+ private NextObjective.Builder nextObjBuilder(L2LbId l2LbId, Set<PortNumber> ports, int nextId) {
+ // TODO replace logical l2lb port
+ TrafficSelector meta = DefaultTrafficSelector.builder()
+ .matchInPort(PortNumber.portNumber(l2LbId.key())).build();
+ NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
+ .withId(nextId)
+ .withMeta(meta)
+ .withType(NextObjective.Type.HASHED)
+ .fromApp(appId);
+ ports.forEach(port -> {
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(port).build();
+ nextObjBuilder.addTreatment(treatment);
+ });
+ return nextObjBuilder;
+ }
+
+ private final class L2LbObjectiveContext implements ObjectiveContext {
+ private final L2LbId l2LbId;
+
+ private L2LbObjectiveContext(L2LbId l2LbId) {
+ this.l2LbId = l2LbId;
+ }
+
+ @Override
+ public void onSuccess(Objective objective) {
+ NextObjective nextObj = (NextObjective) objective;
+ log.debug("Added nextobj {} for L2 load balancer {}", nextObj, l2LbId);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ NextObjective nextObj = (NextObjective) objective;
+ log.debug("Failed to add nextobj {} for L2 load balancer {}", nextObj, l2LbId);
+ }
+
+ }
+}
\ No newline at end of file