blob: 1f5896095740c3cce589e90f4ae7654dfcaefc72 [file] [log] [blame]
Charles Chan7f987c52018-07-31 18:22:46 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.l2lb.app;
18
19
20import com.google.common.collect.Sets;
21import org.onlab.util.KryoNamespace;
22import org.onosproject.core.ApplicationId;
23import org.onosproject.core.CoreService;
24import org.onosproject.l2lb.api.L2Lb;
25import org.onosproject.l2lb.api.L2LbEvent;
26import org.onosproject.l2lb.api.L2LbAdminService;
27import org.onosproject.l2lb.api.L2LbId;
28import org.onosproject.l2lb.api.L2LbListener;
29import org.onosproject.l2lb.api.L2LbMode;
30import org.onosproject.l2lb.api.L2LbService;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.PortNumber;
34import org.onosproject.net.device.DeviceService;
35import org.onosproject.net.flow.DefaultTrafficSelector;
36import org.onosproject.net.flow.DefaultTrafficTreatment;
37import org.onosproject.net.flow.TrafficSelector;
38import org.onosproject.net.flow.TrafficTreatment;
39import org.onosproject.net.flowobjective.DefaultNextObjective;
40import org.onosproject.net.flowobjective.FlowObjectiveService;
41import org.onosproject.net.flowobjective.NextObjective;
42import org.onosproject.net.flowobjective.Objective;
43import org.onosproject.net.flowobjective.ObjectiveContext;
44import org.onosproject.net.flowobjective.ObjectiveError;
45import org.onosproject.net.intf.InterfaceService;
46import org.onosproject.net.packet.PacketService;
47import org.onosproject.store.serializers.KryoNamespaces;
48import org.onosproject.store.service.ConsistentMap;
49import org.onosproject.store.service.MapEvent;
50import org.onosproject.store.service.MapEventListener;
51import org.onosproject.store.service.Serializer;
52import org.onosproject.store.service.StorageService;
53import org.onosproject.store.service.Versioned;
54import org.osgi.service.component.annotations.Activate;
55import org.osgi.service.component.annotations.Component;
56import org.osgi.service.component.annotations.Deactivate;
57import org.osgi.service.component.annotations.Reference;
58import org.osgi.service.component.annotations.ReferenceCardinality;
59import org.slf4j.Logger;
60
61import java.util.Map;
62import java.util.Set;
63import java.util.concurrent.ExecutorService;
64import java.util.concurrent.Executors;
65
66import static org.onlab.util.Tools.groupedThreads;
67import static org.slf4j.LoggerFactory.getLogger;
68
69@Component(
70 immediate = true,
71 service = {
72 L2LbService.class,
73 L2LbAdminService.class
74 }
75)
76public class L2LbManager implements L2LbService, L2LbAdminService {
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 private CoreService coreService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 private PacketService packetService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 private InterfaceService interfaceService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 private StorageService storageService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 private FlowObjectiveService flowObjService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 private MastershipService mastershipService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 private DeviceService deviceService;
98
99 private static final Logger log = getLogger(L2LbManager.class);
100 private static final String APP_NAME = "org.onosproject.l2lb";
101
102 private ApplicationId appId;
103 private ConsistentMap<L2LbId, L2Lb> l2LbStore;
104 private ConsistentMap<L2LbId, Integer> l2LbNextStore;
105 private Set<L2LbListener> listeners = Sets.newConcurrentHashSet();
106
107 private ExecutorService l2LbEventExecutor;
108 private ExecutorService l2LbProvExecutor;
109 private MapEventListener<L2LbId, L2Lb> l2LbStoreListener;
110 // TODO build CLI to view and clear the next store
111 private MapEventListener<L2LbId, Integer> l2LbNextStoreListener;
112
113 @Activate
114 public void activate() {
115 appId = coreService.registerApplication(APP_NAME);
116
117 l2LbEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("l2lb-event", "%d", log));
118 l2LbProvExecutor = Executors.newSingleThreadExecutor(groupedThreads("l2lb-prov", "%d", log));
119 l2LbStoreListener = new L2LbStoreListener();
120 l2LbNextStoreListener = new L2LbNextStoreListener();
121
122 KryoNamespace serializer = KryoNamespace.newBuilder()
123 .register(KryoNamespaces.API)
124 .register(L2Lb.class)
125 .register(L2LbId.class)
126 .register(L2LbMode.class)
127 .build();
128 l2LbStore = storageService.<L2LbId, L2Lb>consistentMapBuilder()
129 .withName("onos-l2lb-store")
130 .withRelaxedReadConsistency()
131 .withSerializer(Serializer.using(serializer))
132 .build();
133 l2LbStore.addListener(l2LbStoreListener);
134 l2LbNextStore = storageService.<L2LbId, Integer>consistentMapBuilder()
135 .withName("onos-l2lb-next-store")
136 .withRelaxedReadConsistency()
137 .withSerializer(Serializer.using(serializer))
138 .build();
139 l2LbNextStore.addListener(l2LbNextStoreListener);
140
141 log.info("Started");
142 }
143
144 @Deactivate
145 public void deactivate() {
146 l2LbStore.removeListener(l2LbStoreListener);
147 l2LbNextStore.removeListener(l2LbNextStoreListener);
148
149 l2LbEventExecutor.shutdown();
150
151 log.info("Stopped");
152 }
153
154 @Override
155 public void addListener(L2LbListener listener) {
156 listeners.add(listener);
157 }
158
159 @Override
160 public void removeListener(L2LbListener listener) {
161 listeners.remove(listener);
162 }
163
164 @Override
165 public L2Lb createOrUpdate(DeviceId deviceId, int key, Set<PortNumber> ports, L2LbMode mode) {
166 L2LbId l2LbId = new L2LbId(deviceId, key);
167 log.debug("Putting {} -> {} {} into L2 load balancer store", l2LbId, mode, ports);
168 return Versioned.valueOrNull(l2LbStore.put(l2LbId, new L2Lb(l2LbId, ports, mode)));
169 }
170
171 @Override
172 public L2Lb remove(DeviceId deviceId, int key) {
173 L2LbId l2LbId = new L2LbId(deviceId, key);
174 log.debug("Removing {} from L2 load balancer store", l2LbId);
175 return Versioned.valueOrNull(l2LbStore.remove(l2LbId));
176 }
177
178 @Override
179 public Map<L2LbId, L2Lb> getL2Lbs() {
180 return l2LbStore.asJavaMap();
181 }
182
183 @Override
184 public L2Lb getL2Lb(DeviceId deviceId, int key) {
185 return Versioned.valueOrNull(l2LbStore.get(new L2LbId(deviceId, key)));
186 }
187
188 @Override
189 public Map<L2LbId, Integer> getL2LbNexts() {
190 return l2LbNextStore.asJavaMap();
191 }
192
193 @Override
194 public int getL2LbNexts(DeviceId deviceId, int key) {
195 return Versioned.valueOrNull(l2LbNextStore.get(new L2LbId(deviceId, key)));
196 }
197
198 private class L2LbStoreListener implements MapEventListener<L2LbId, L2Lb> {
199 public void event(MapEvent<L2LbId, L2Lb> event) {
200 switch (event.type()) {
201 case INSERT:
202 log.debug("L2Lb {} insert new={}, old={}", event.key(), event.newValue(), event.oldValue());
203 post(new L2LbEvent(L2LbEvent.Type.ADDED, event.newValue().value(), null));
204 populateL2Lb(event.newValue().value());
205 break;
206 case REMOVE:
207 log.debug("L2Lb {} remove new={}, old={}", event.key(), event.newValue(), event.oldValue());
208 post(new L2LbEvent(L2LbEvent.Type.REMOVED, null, event.oldValue().value()));
209 revokeL2Lb(event.oldValue().value());
210 break;
211 case UPDATE:
212 log.debug("L2Lb {} update new={}, old={}", event.key(), event.newValue(), event.oldValue());
213 post(new L2LbEvent(L2LbEvent.Type.UPDATED, event.newValue().value(),
214 event.oldValue().value()));
215 updateL2Lb(event.newValue().value(), event.oldValue().value());
216 break;
217 default:
218 break;
219 }
220 }
221 }
222
223 private class L2LbNextStoreListener implements MapEventListener<L2LbId, Integer> {
224 public void event(MapEvent<L2LbId, Integer> event) {
225 switch (event.type()) {
226 case INSERT:
227 log.debug("L2Lb next {} insert new={}, old={}", event.key(), event.newValue(), event.oldValue());
228 break;
229 case REMOVE:
230 log.debug("L2Lb next {} remove new={}, old={}", event.key(), event.newValue(), event.oldValue());
231 break;
232 case UPDATE:
233 log.debug("L2Lb next {} update new={}, old={}", event.key(), event.newValue(), event.oldValue());
234 break;
235 default:
236 break;
237 }
238 }
239 }
240
241 private void post(L2LbEvent l2LbEvent) {
242 l2LbEventExecutor.execute(() -> {
243 for (L2LbListener l : listeners) {
244 l.event(l2LbEvent);
245 }
246 });
247 }
248
249 // TODO repopulate when device reconnect
250 private void populateL2Lb(L2Lb l2Lb) {
251 DeviceId deviceId = l2Lb.l2LbId().deviceId();
252 if (!mastershipService.isLocalMaster(deviceId)) {
253 log.debug("Not the master of {}. Skip populateL2Lb {}", deviceId, l2Lb.l2LbId());
254 return;
255 }
256
257 l2LbProvExecutor.execute(() -> {
258 L2LbObjectiveContext context = new L2LbObjectiveContext(l2Lb.l2LbId());
259 NextObjective nextObj = nextObjBuilder(l2Lb.l2LbId(), l2Lb.ports()).add(context);
260
261 flowObjService.next(deviceId, nextObj);
262 l2LbNextStore.put(l2Lb.l2LbId(), nextObj.id());
263 });
264 }
265
266 private void revokeL2Lb(L2Lb l2Lb) {
267 DeviceId deviceId = l2Lb.l2LbId().deviceId();
268 if (!mastershipService.isLocalMaster(deviceId)) {
269 log.debug("Not the master of {}. Skip revokeL2Lb {}", deviceId, l2Lb.l2LbId());
270 return;
271 }
272
273 l2LbProvExecutor.execute(() -> {
274 l2LbNextStore.remove(l2Lb.l2LbId());
275 // NOTE group is not removed and we rely on the garbage collection mechanism
276 });
277 }
278
279 private void updateL2Lb(L2Lb newL2Lb, L2Lb oldL2Lb) {
280 DeviceId deviceId = newL2Lb.l2LbId().deviceId();
281 if (!mastershipService.isLocalMaster(deviceId)) {
282 log.debug("Not the master of {}. Skip updateL2Lb {}", deviceId, newL2Lb.l2LbId());
283 return;
284 }
285
286 l2LbProvExecutor.execute(() -> {
287 L2LbObjectiveContext context = new L2LbObjectiveContext(newL2Lb.l2LbId());
288 Set<PortNumber> portsToBeAdded = Sets.difference(newL2Lb.ports(), oldL2Lb.ports());
289 Set<PortNumber> portsToBeRemoved = Sets.difference(oldL2Lb.ports(), newL2Lb.ports());
290
291 flowObjService.next(deviceId, nextObjBuilder(newL2Lb.l2LbId(), portsToBeAdded).addToExisting(context));
292 flowObjService.next(deviceId, nextObjBuilder(newL2Lb.l2LbId(), portsToBeRemoved)
293 .removeFromExisting(context));
294 });
295 }
296
297 private NextObjective.Builder nextObjBuilder(L2LbId l2LbId, Set<PortNumber> ports) {
298 return nextObjBuilder(l2LbId, ports, flowObjService.allocateNextId());
299 }
300
301 private NextObjective.Builder nextObjBuilder(L2LbId l2LbId, Set<PortNumber> ports, int nextId) {
302 // TODO replace logical l2lb port
303 TrafficSelector meta = DefaultTrafficSelector.builder()
304 .matchInPort(PortNumber.portNumber(l2LbId.key())).build();
305 NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
306 .withId(nextId)
307 .withMeta(meta)
308 .withType(NextObjective.Type.HASHED)
309 .fromApp(appId);
310 ports.forEach(port -> {
311 TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(port).build();
312 nextObjBuilder.addTreatment(treatment);
313 });
314 return nextObjBuilder;
315 }
316
317 private final class L2LbObjectiveContext implements ObjectiveContext {
318 private final L2LbId l2LbId;
319
320 private L2LbObjectiveContext(L2LbId l2LbId) {
321 this.l2LbId = l2LbId;
322 }
323
324 @Override
325 public void onSuccess(Objective objective) {
326 NextObjective nextObj = (NextObjective) objective;
327 log.debug("Added nextobj {} for L2 load balancer {}", nextObj, l2LbId);
328 }
329
330 @Override
331 public void onError(Objective objective, ObjectiveError error) {
332 NextObjective nextObj = (NextObjective) objective;
333 log.debug("Failed to add nextobj {} for L2 load balancer {}", nextObj, l2LbId);
334 }
335
336 }
337}