blob: 80c1c80342b1430fe158367807c7354605f50b9a [file] [log] [blame]
Charles Chan8d316332018-06-19 20:31:57 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.xconnect.impl;
17
pier6fd24fd2018-11-27 11:23:50 -080018import com.google.common.collect.ImmutableList;
pier567465b2018-11-24 11:16:28 -080019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalNotification;
Charles Chan0b1dd7e2018-08-19 19:21:46 -070022import com.google.common.collect.ImmutableMap;
Charles Chan8d316332018-06-19 20:31:57 -070023import com.google.common.collect.ImmutableSet;
pier6fd24fd2018-11-27 11:23:50 -080024import com.google.common.collect.Lists;
Charles Chan8d316332018-06-19 20:31:57 -070025import com.google.common.collect.Sets;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053026import org.onlab.packet.Ethernet;
Charles Chan8d316332018-06-19 20:31:57 -070027import org.onlab.packet.MacAddress;
28import org.onlab.packet.VlanId;
29import org.onlab.util.KryoNamespace;
pier6fd24fd2018-11-27 11:23:50 -080030import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.LeadershipService;
32import org.onosproject.cluster.NodeId;
Charles Chan8d316332018-06-19 20:31:57 -070033import org.onosproject.codec.CodecService;
34import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
pier567465b2018-11-24 11:16:28 -080036import org.onosproject.l2lb.api.L2LbEvent;
37import org.onosproject.l2lb.api.L2LbId;
38import org.onosproject.l2lb.api.L2LbListener;
Charles Chan48df9ad2018-10-30 18:08:59 -070039import org.onosproject.l2lb.api.L2LbService;
Charles Chan8d316332018-06-19 20:31:57 -070040import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.DeviceId;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053043import org.onosproject.net.Host;
44import org.onosproject.net.HostLocation;
Charles Chan8d316332018-06-19 20:31:57 -070045import org.onosproject.net.PortNumber;
46import org.onosproject.net.config.NetworkConfigService;
47import org.onosproject.net.device.DeviceEvent;
48import org.onosproject.net.device.DeviceListener;
49import org.onosproject.net.device.DeviceService;
50import org.onosproject.net.flow.DefaultTrafficSelector;
51import org.onosproject.net.flow.DefaultTrafficTreatment;
52import org.onosproject.net.flow.TrafficSelector;
53import org.onosproject.net.flow.TrafficTreatment;
54import org.onosproject.net.flow.criteria.Criteria;
55import org.onosproject.net.flowobjective.DefaultFilteringObjective;
56import org.onosproject.net.flowobjective.DefaultForwardingObjective;
57import org.onosproject.net.flowobjective.DefaultNextObjective;
Charles Chan48df9ad2018-10-30 18:08:59 -070058import org.onosproject.net.flowobjective.DefaultNextTreatment;
Charles Chan8d316332018-06-19 20:31:57 -070059import org.onosproject.net.flowobjective.DefaultObjectiveContext;
60import org.onosproject.net.flowobjective.FilteringObjective;
61import org.onosproject.net.flowobjective.FlowObjectiveService;
62import org.onosproject.net.flowobjective.ForwardingObjective;
Charles Chan48df9ad2018-10-30 18:08:59 -070063import org.onosproject.net.flowobjective.IdNextTreatment;
Charles Chan8d316332018-06-19 20:31:57 -070064import org.onosproject.net.flowobjective.NextObjective;
Charles Chan48df9ad2018-10-30 18:08:59 -070065import org.onosproject.net.flowobjective.NextTreatment;
Charles Chan8d316332018-06-19 20:31:57 -070066import org.onosproject.net.flowobjective.Objective;
67import org.onosproject.net.flowobjective.ObjectiveContext;
68import org.onosproject.net.flowobjective.ObjectiveError;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053069import org.onosproject.net.host.HostEvent;
70import org.onosproject.net.host.HostListener;
71import org.onosproject.net.host.HostService;
72import org.onosproject.net.intf.InterfaceService;
Charles Chan8d316332018-06-19 20:31:57 -070073import org.onosproject.segmentrouting.SegmentRoutingService;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053074import org.onosproject.segmentrouting.storekey.VlanNextObjectiveStoreKey;
Charles Chan8d316332018-06-19 20:31:57 -070075import org.onosproject.segmentrouting.xconnect.api.XconnectCodec;
76import org.onosproject.segmentrouting.xconnect.api.XconnectDesc;
Charles Chan445659f2019-01-02 13:46:16 -080077import org.onosproject.segmentrouting.xconnect.api.XconnectEndpoint;
Charles Chan8d316332018-06-19 20:31:57 -070078import org.onosproject.segmentrouting.xconnect.api.XconnectKey;
Charles Chan445659f2019-01-02 13:46:16 -080079import org.onosproject.segmentrouting.xconnect.api.XconnectLoadBalancerEndpoint;
80import org.onosproject.segmentrouting.xconnect.api.XconnectPortEndpoint;
Charles Chan8d316332018-06-19 20:31:57 -070081import org.onosproject.segmentrouting.xconnect.api.XconnectService;
82import org.onosproject.store.serializers.KryoNamespaces;
83import org.onosproject.store.service.ConsistentMap;
84import org.onosproject.store.service.MapEvent;
85import org.onosproject.store.service.MapEventListener;
86import org.onosproject.store.service.Serializer;
87import org.onosproject.store.service.StorageService;
88import org.onosproject.store.service.Versioned;
Ray Milkey2bd24a92018-08-17 14:54:17 -070089import org.osgi.service.component.annotations.Activate;
90import org.osgi.service.component.annotations.Component;
91import org.osgi.service.component.annotations.Deactivate;
92import org.osgi.service.component.annotations.Reference;
93import org.osgi.service.component.annotations.ReferenceCardinality;
Charles Chan8d316332018-06-19 20:31:57 -070094import org.slf4j.Logger;
95import org.slf4j.LoggerFactory;
96
97import java.io.Serializable;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053098import java.util.Collections;
99import java.util.List;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530100import java.util.Optional;
Charles Chan8d316332018-06-19 20:31:57 -0700101import java.util.Set;
102import java.util.concurrent.CompletableFuture;
Charles Chan56542b62018-08-07 12:48:36 -0700103import java.util.concurrent.ExecutorService;
104import java.util.concurrent.Executors;
pier567465b2018-11-24 11:16:28 -0800105import java.util.concurrent.ScheduledExecutorService;
106import java.util.concurrent.TimeUnit;
Charles Chan8d316332018-06-19 20:31:57 -0700107import java.util.function.BiConsumer;
108import java.util.function.Consumer;
109import java.util.stream.Collectors;
110
pier567465b2018-11-24 11:16:28 -0800111import static java.util.concurrent.Executors.newScheduledThreadPool;
Charles Chan56542b62018-08-07 12:48:36 -0700112import static org.onlab.util.Tools.groupedThreads;
113
Ray Milkey2bd24a92018-08-17 14:54:17 -0700114@Component(immediate = true, service = XconnectService.class)
Charles Chan8d316332018-06-19 20:31:57 -0700115public class XconnectManager implements XconnectService {
Ray Milkey2bd24a92018-08-17 14:54:17 -0700116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700117 private CoreService coreService;
118
Ray Milkey2bd24a92018-08-17 14:54:17 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700120 private CodecService codecService;
121
Ray Milkey2bd24a92018-08-17 14:54:17 -0700122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700123 private StorageService storageService;
124
Ray Milkey2bd24a92018-08-17 14:54:17 -0700125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700126 public NetworkConfigService netCfgService;
127
Ray Milkey2bd24a92018-08-17 14:54:17 -0700128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700129 public DeviceService deviceService;
130
Ray Milkey2bd24a92018-08-17 14:54:17 -0700131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700132 public FlowObjectiveService flowObjectiveService;
133
Ray Milkey2bd24a92018-08-17 14:54:17 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pier6fd24fd2018-11-27 11:23:50 -0800135 private LeadershipService leadershipService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
138 private ClusterService clusterService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700141 public MastershipService mastershipService;
142
Ray Milkey2bd24a92018-08-17 14:54:17 -0700143 @Reference(cardinality = ReferenceCardinality.OPTIONAL)
Charles Chan8d316332018-06-19 20:31:57 -0700144 public SegmentRoutingService srService;
145
Ray Milkey3cad4db2018-10-04 15:13:33 -0700146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530147 public InterfaceService interfaceService;
148
Ray Milkey3cad4db2018-10-04 15:13:33 -0700149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530150 HostService hostService;
151
Charles Chan48df9ad2018-10-30 18:08:59 -0700152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan445659f2019-01-02 13:46:16 -0800153 private L2LbService l2LbService;
Charles Chan48df9ad2018-10-30 18:08:59 -0700154
Charles Chan8d316332018-06-19 20:31:57 -0700155 private static final String APP_NAME = "org.onosproject.xconnect";
pier6fd24fd2018-11-27 11:23:50 -0800156 private static final String ERROR_NOT_LEADER = "Not leader controller";
Charles Chan48df9ad2018-10-30 18:08:59 -0700157 private static final String ERROR_NEXT_OBJ_BUILDER = "Unable to construct next objective builder";
158 private static final String ERROR_NEXT_ID = "Unable to get next id";
Charles Chan8d316332018-06-19 20:31:57 -0700159
160 private static Logger log = LoggerFactory.getLogger(XconnectManager.class);
161
162 private ApplicationId appId;
Charles Chan445659f2019-01-02 13:46:16 -0800163 private ConsistentMap<XconnectKey, Set<XconnectEndpoint>> xconnectStore;
Charles Chan1fb65132018-09-21 11:29:12 -0700164 private ConsistentMap<XconnectKey, Integer> xconnectNextObjStore;
Charles Chan8d316332018-06-19 20:31:57 -0700165
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530166 private ConsistentMap<VlanNextObjectiveStoreKey, Integer> xconnectMulticastNextStore;
167 private ConsistentMap<VlanNextObjectiveStoreKey, List<PortNumber>> xconnectMulticastPortsStore;
168
Charles Chan445659f2019-01-02 13:46:16 -0800169 private final MapEventListener<XconnectKey, Set<XconnectEndpoint>> xconnectListener = new XconnectMapListener();
pier6fd24fd2018-11-27 11:23:50 -0800170 private ExecutorService xConnectExecutor;
Charles Chan8d316332018-06-19 20:31:57 -0700171
pier6fd24fd2018-11-27 11:23:50 -0800172 private final DeviceListener deviceListener = new InternalDeviceListener();
Charles Chan56542b62018-08-07 12:48:36 -0700173 private ExecutorService deviceEventExecutor;
174
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530175 private final HostListener hostListener = new InternalHostListener();
176 private ExecutorService hostEventExecutor;
177
pier567465b2018-11-24 11:16:28 -0800178 // Wait time for the cache
179 private static final int WAIT_TIME_MS = 15000;
180 // The cache is used as a buffer while waiting for the installation of the L2Lb, when present
181 private Cache<L2LbId, XconnectKey> l2LbCache;
182 // Executor for the cache
183 private ScheduledExecutorService l2lbExecutor;
pier567465b2018-11-24 11:16:28 -0800184 // We need to listen for some events to properly install the xconnect with l2lb
185 private final L2LbListener l2LbListener = new InternalL2LbListener();
186
Charles Chan8d316332018-06-19 20:31:57 -0700187 @Activate
188 void activate() {
189 appId = coreService.registerApplication(APP_NAME);
190 codecService.registerCodec(XconnectDesc.class, new XconnectCodec());
191
192 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
193 .register(KryoNamespaces.API)
Charles Chanfbaad962018-07-23 12:53:16 -0700194 .register(XconnectManager.class)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530195 .register(XconnectKey.class)
Charles Chan445659f2019-01-02 13:46:16 -0800196 .register(XconnectEndpoint.class)
197 .register(XconnectPortEndpoint.class)
198 .register(XconnectLoadBalancerEndpoint.class)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530199 .register(VlanNextObjectiveStoreKey.class);
Charles Chan8d316332018-06-19 20:31:57 -0700200
Charles Chan445659f2019-01-02 13:46:16 -0800201 xconnectStore = storageService.<XconnectKey, Set<XconnectEndpoint>>consistentMapBuilder()
Charles Chan8d316332018-06-19 20:31:57 -0700202 .withName("onos-sr-xconnect")
203 .withRelaxedReadConsistency()
204 .withSerializer(Serializer.using(serializer.build()))
205 .build();
pier6fd24fd2018-11-27 11:23:50 -0800206 xConnectExecutor = Executors.newSingleThreadScheduledExecutor(
207 groupedThreads("sr-xconnect-event", "%d", log));
208 xconnectStore.addListener(xconnectListener, xConnectExecutor);
Charles Chan8d316332018-06-19 20:31:57 -0700209
Charles Chan1fb65132018-09-21 11:29:12 -0700210 xconnectNextObjStore = storageService.<XconnectKey, Integer>consistentMapBuilder()
Charles Chan8d316332018-06-19 20:31:57 -0700211 .withName("onos-sr-xconnect-next")
212 .withRelaxedReadConsistency()
213 .withSerializer(Serializer.using(serializer.build()))
214 .build();
215
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530216 xconnectMulticastNextStore = storageService.<VlanNextObjectiveStoreKey, Integer>consistentMapBuilder()
217 .withName("onos-sr-xconnect-l2multicast-next")
218 .withSerializer(Serializer.using(serializer.build()))
219 .build();
220 xconnectMulticastPortsStore = storageService.<VlanNextObjectiveStoreKey, List<PortNumber>>consistentMapBuilder()
221 .withName("onos-sr-xconnect-l2multicast-ports")
222 .withSerializer(Serializer.using(serializer.build()))
223 .build();
224
Charles Chan56542b62018-08-07 12:48:36 -0700225 deviceEventExecutor = Executors.newSingleThreadScheduledExecutor(
226 groupedThreads("sr-xconnect-device-event", "%d", log));
Charles Chan8d316332018-06-19 20:31:57 -0700227 deviceService.addListener(deviceListener);
228
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530229 hostEventExecutor = Executors.newSingleThreadExecutor(
230 groupedThreads("sr-xconnect-host-event", "%d", log));
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530231 hostService.addListener(hostListener);
232
pier567465b2018-11-24 11:16:28 -0800233 l2LbCache = CacheBuilder.newBuilder()
234 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
235 .removalListener((RemovalNotification<L2LbId, XconnectKey> notification) ->
236 log.debug("L2Lb cache removal event. l2LbId={}, xConnectKey={}",
237 notification.getKey(), notification.getValue())).build();
238 l2lbExecutor = newScheduledThreadPool(1,
239 groupedThreads("l2LbCacheWorker", "l2LbCacheWorker-%d", log));
240 // Let's schedule the cleanup of the cache
241 l2lbExecutor.scheduleAtFixedRate(l2LbCache::cleanUp, 0,
242 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
243 l2LbService.addListener(l2LbListener);
244
Charles Chan8d316332018-06-19 20:31:57 -0700245 log.info("Started");
246 }
247
248 @Deactivate
249 void deactivate() {
250 xconnectStore.removeListener(xconnectListener);
251 deviceService.removeListener(deviceListener);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530252 hostService.removeListener(hostListener);
Charles Chan8d316332018-06-19 20:31:57 -0700253 codecService.unregisterCodec(XconnectDesc.class);
254
Charles Chan56542b62018-08-07 12:48:36 -0700255 deviceEventExecutor.shutdown();
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530256 hostEventExecutor.shutdown();
pier6fd24fd2018-11-27 11:23:50 -0800257 xConnectExecutor.shutdown();
pier567465b2018-11-24 11:16:28 -0800258 l2lbExecutor.shutdown();
Charles Chan56542b62018-08-07 12:48:36 -0700259
Charles Chan8d316332018-06-19 20:31:57 -0700260 log.info("Stopped");
261 }
262
263 @Override
Charles Chan445659f2019-01-02 13:46:16 -0800264 public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<XconnectEndpoint> endpoints) {
265 log.info("Adding or updating xconnect. deviceId={}, vlanId={}, endpoints={}",
266 deviceId, vlanId, endpoints);
Charles Chan8d316332018-06-19 20:31:57 -0700267 final XconnectKey key = new XconnectKey(deviceId, vlanId);
Charles Chan445659f2019-01-02 13:46:16 -0800268 xconnectStore.put(key, endpoints);
Charles Chan8d316332018-06-19 20:31:57 -0700269 }
270
271 @Override
272 public void removeXonnect(DeviceId deviceId, VlanId vlanId) {
273 log.info("Removing xconnect. deviceId={}, vlanId={}",
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530274 deviceId, vlanId);
Charles Chan8d316332018-06-19 20:31:57 -0700275 final XconnectKey key = new XconnectKey(deviceId, vlanId);
276 xconnectStore.remove(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530277
278 // Cleanup multicasting support, if any.
Charles Chan445659f2019-01-02 13:46:16 -0800279 srService.getPairDeviceId(deviceId).ifPresent(pairDeviceId ->
280 cleanupL2MulticastRule(pairDeviceId, srService.getPairLocalPort(pairDeviceId).get(), vlanId, true)
281 );
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530282
Charles Chan8d316332018-06-19 20:31:57 -0700283 }
284
285 @Override
286 public Set<XconnectDesc> getXconnects() {
287 return xconnectStore.asJavaMap().entrySet().stream()
288 .map(e -> new XconnectDesc(e.getKey(), e.getValue()))
289 .collect(Collectors.toSet());
290 }
291
292 @Override
293 public boolean hasXconnect(ConnectPoint cp) {
Charles Chan445659f2019-01-02 13:46:16 -0800294 return getXconnects().stream().anyMatch(desc ->
295 desc.key().deviceId().equals(cp.deviceId()) && desc.endpoints().stream().anyMatch(ep ->
296 ep.type() == XconnectEndpoint.Type.PORT && ((XconnectPortEndpoint) ep).port().equals(cp.port())
297 )
Charles Chan8d316332018-06-19 20:31:57 -0700298 );
299 }
300
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700301 @Override
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530302 public List<VlanId> getXconnectVlans(DeviceId deviceId, PortNumber port) {
303 return getXconnects().stream()
Charles Chan445659f2019-01-02 13:46:16 -0800304 .filter(desc -> desc.key().deviceId().equals(deviceId) && desc.endpoints().stream().anyMatch(ep ->
305 ep.type() == XconnectEndpoint.Type.PORT && ((XconnectPortEndpoint) ep).port().equals(port)))
306 .map(XconnectDesc::key)
307 .map(XconnectKey::vlanId)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530308 .collect(Collectors.toList());
309 }
310
311 @Override
312 public boolean isXconnectVlan(DeviceId deviceId, VlanId vlanId) {
pier6fd24fd2018-11-27 11:23:50 -0800313 XconnectKey key = new XconnectKey(deviceId, vlanId);
314 return Versioned.valueOrNull(xconnectStore.get(key)) != null;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530315 }
316
317 @Override
Charles Chan1fb65132018-09-21 11:29:12 -0700318 public ImmutableMap<XconnectKey, Integer> getNext() {
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700319 if (xconnectNextObjStore != null) {
320 return ImmutableMap.copyOf(xconnectNextObjStore.asJavaMap());
321 } else {
322 return ImmutableMap.of();
323 }
324 }
325
326 @Override
pier6fd24fd2018-11-27 11:23:50 -0800327 public int getNextId(DeviceId deviceId, VlanId vlanId) {
328 return Versioned.valueOrElse(xconnectNextObjStore.get(new XconnectKey(deviceId, vlanId)), -1);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530329 }
330
331 @Override
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700332 public void removeNextId(int nextId) {
333 xconnectNextObjStore.entrySet().forEach(e -> {
Charles Chan1fb65132018-09-21 11:29:12 -0700334 if (e.getValue().value() == nextId) {
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700335 xconnectNextObjStore.remove(e.getKey());
336 }
337 });
338 }
339
Charles Chan445659f2019-01-02 13:46:16 -0800340 private class XconnectMapListener implements MapEventListener<XconnectKey, Set<XconnectEndpoint>> {
Charles Chan8d316332018-06-19 20:31:57 -0700341 @Override
Charles Chan445659f2019-01-02 13:46:16 -0800342 public void event(MapEvent<XconnectKey, Set<XconnectEndpoint>> event) {
Charles Chan8d316332018-06-19 20:31:57 -0700343 XconnectKey key = event.key();
Charles Chan445659f2019-01-02 13:46:16 -0800344 Set<XconnectEndpoint> ports = Versioned.valueOrNull(event.newValue());
345 Set<XconnectEndpoint> oldPorts = Versioned.valueOrNull(event.oldValue());
Charles Chan8d316332018-06-19 20:31:57 -0700346
347 switch (event.type()) {
348 case INSERT:
Charles Chan48df9ad2018-10-30 18:08:59 -0700349 populateXConnect(key, ports);
Charles Chan8d316332018-06-19 20:31:57 -0700350 break;
351 case UPDATE:
Charles Chan48df9ad2018-10-30 18:08:59 -0700352 updateXConnect(key, oldPorts, ports);
Charles Chan8d316332018-06-19 20:31:57 -0700353 break;
354 case REMOVE:
Charles Chan48df9ad2018-10-30 18:08:59 -0700355 revokeXConnect(key, oldPorts);
Charles Chan8d316332018-06-19 20:31:57 -0700356 break;
357 default:
358 break;
359 }
360 }
361 }
362
363 private class InternalDeviceListener implements DeviceListener {
pier6fd24fd2018-11-27 11:23:50 -0800364 // Offload the execution to an executor and then process the event
365 // if this instance is the leader of the device
Charles Chan8d316332018-06-19 20:31:57 -0700366 @Override
367 public void event(DeviceEvent event) {
Charles Chan56542b62018-08-07 12:48:36 -0700368 deviceEventExecutor.execute(() -> {
369 DeviceId deviceId = event.subject().id();
pier6fd24fd2018-11-27 11:23:50 -0800370 // Just skip if we are not the leader
371 if (!isLocalLeader(deviceId)) {
372 log.debug("Not the leader of {}. Skip event {}", deviceId, event);
Charles Chan56542b62018-08-07 12:48:36 -0700373 return;
374 }
pier6fd24fd2018-11-27 11:23:50 -0800375 // Populate or revoke according to the device availability
376 if (deviceService.isAvailable(deviceId)) {
377 init(deviceId);
378 } else {
379 cleanup(deviceId);
Charles Chan56542b62018-08-07 12:48:36 -0700380 }
381 });
Charles Chan8d316332018-06-19 20:31:57 -0700382 }
pier6fd24fd2018-11-27 11:23:50 -0800383 // We want to manage only a subset of events and if we are the leader
384 @Override
385 public boolean isRelevant(DeviceEvent event) {
386 return event.type() == DeviceEvent.Type.DEVICE_ADDED ||
387 event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
388 event.type() == DeviceEvent.Type.DEVICE_UPDATED;
389 }
Charles Chan8d316332018-06-19 20:31:57 -0700390 }
391
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530392 private class InternalHostListener implements HostListener {
393 @Override
394 public void event(HostEvent event) {
395 hostEventExecutor.execute(() -> {
396
397 switch (event.type()) {
398 case HOST_MOVED:
399 log.trace("Processing host event {}", event);
400
401 Host host = event.subject();
402 Set<HostLocation> prevLocations = event.prevSubject().locations();
403 Set<HostLocation> newLocations = host.locations();
404
405 // Dual-home host port failure
406 // For each old location, in failed and paired devices update L2 vlan groups
407 Sets.difference(prevLocations, newLocations).forEach(prevLocation -> {
408
409 Optional<DeviceId> pairDeviceId = srService.getPairDeviceId(prevLocation.deviceId());
410 Optional<PortNumber> pairLocalPort = srService.getPairLocalPort(prevLocation.deviceId());
411
412 if (pairDeviceId.isPresent() && pairLocalPort.isPresent() && newLocations.stream()
413 .anyMatch(location -> location.deviceId().equals(pairDeviceId.get())) &&
414 hasXconnect(new ConnectPoint(prevLocation.deviceId(), prevLocation.port()))) {
415
416 List<VlanId> xconnectVlans = getXconnectVlans(prevLocation.deviceId(),
417 prevLocation.port());
418 xconnectVlans.forEach(xconnectVlan -> {
419 // Add single-home host into L2 multicast group at paired device side.
420 // Also append ACL rule to forward traffic from paired port to L2 multicast group.
421 newLocations.stream()
422 .filter(location -> location.deviceId().equals(pairDeviceId.get()))
423 .forEach(location -> populateL2Multicast(location.deviceId(),
424 srService.getPairLocalPort(
425 location.deviceId()).get(),
426 xconnectVlan,
427 Collections.singletonList(
428 location.port())));
pier6fd24fd2018-11-27 11:23:50 -0800429 // Ensure pair-port attached to xconnect vlan flooding group
430 // at dual home failed device.
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530431 updateL2Flooding(prevLocation.deviceId(), pairLocalPort.get(), xconnectVlan, true);
432 });
433 }
434 });
435
436 // Dual-home host port restoration
437 // For each new location, reverse xconnect loop prevention groups.
438 Sets.difference(newLocations, prevLocations).forEach(newLocation -> {
439 final Optional<DeviceId> pairDeviceId = srService.getPairDeviceId(newLocation.deviceId());
440 Optional<PortNumber> pairLocalPort = srService.getPairLocalPort(newLocation.deviceId());
441 if (pairDeviceId.isPresent() && pairLocalPort.isPresent() &&
442 hasXconnect((new ConnectPoint(newLocation.deviceId(), newLocation.port())))) {
443
444 List<VlanId> xconnectVlans = getXconnectVlans(newLocation.deviceId(),
445 newLocation.port());
446 xconnectVlans.forEach(xconnectVlan -> {
447 // Remove recovered dual homed port from vlan L2 multicast group
448 prevLocations.stream()
449 .filter(prevLocation -> prevLocation.deviceId().equals(pairDeviceId.get()))
pier6fd24fd2018-11-27 11:23:50 -0800450 .forEach(prevLocation -> revokeL2Multicast(
451 prevLocation.deviceId(),
452 xconnectVlan,
453 Collections.singletonList(newLocation.port()))
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530454 );
455
pier6fd24fd2018-11-27 11:23:50 -0800456 // Remove pair-port from vlan's flooding group at dual home
457 // restored device, if needed.
458 if (!hasAccessPortInMulticastGroup(new VlanNextObjectiveStoreKey(
459 newLocation.deviceId(), xconnectVlan), pairLocalPort.get())) {
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530460 updateL2Flooding(newLocation.deviceId(),
461 pairLocalPort.get(),
462 xconnectVlan,
463 false);
464
465 // Clean L2 multicast group at pair-device; also update store.
466 cleanupL2MulticastRule(pairDeviceId.get(),
467 srService.getPairLocalPort(pairDeviceId.get()).get(),
468 xconnectVlan,
469 false);
470 }
471 });
472 }
473 });
474 break;
475
476 default:
477 log.warn("Unsupported host event type: {} received. Ignoring.", event.type());
478 break;
479 }
480 });
481 }
482 }
483
Charles Chan1fb65132018-09-21 11:29:12 -0700484 private void init(DeviceId deviceId) {
Charles Chan8d316332018-06-19 20:31:57 -0700485 getXconnects().stream()
486 .filter(desc -> desc.key().deviceId().equals(deviceId))
Charles Chan445659f2019-01-02 13:46:16 -0800487 .forEach(desc -> populateXConnect(desc.key(), desc.endpoints()));
Charles Chan8d316332018-06-19 20:31:57 -0700488 }
489
Charles Chan1fb65132018-09-21 11:29:12 -0700490 private void cleanup(DeviceId deviceId) {
Charles Chan8d316332018-06-19 20:31:57 -0700491 xconnectNextObjStore.entrySet().stream()
492 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
493 .forEach(entry -> xconnectNextObjStore.remove(entry.getKey()));
494 log.debug("{} is removed from xConnectNextObjStore", deviceId);
495 }
496
497 /**
498 * Populates XConnect groups and flows for given key.
499 *
Charles Chan445659f2019-01-02 13:46:16 -0800500 * @param key XConnect key
501 * @param endpoints a set of endpoints to be cross-connected
Charles Chan8d316332018-06-19 20:31:57 -0700502 */
Charles Chan445659f2019-01-02 13:46:16 -0800503 private void populateXConnect(XconnectKey key, Set<XconnectEndpoint> endpoints) {
pier6fd24fd2018-11-27 11:23:50 -0800504 if (!isLocalLeader(key.deviceId())) {
505 log.debug("Abort populating XConnect {}: {}", key, ERROR_NOT_LEADER);
Charles Chan8d316332018-06-19 20:31:57 -0700506 return;
507 }
508
Charles Chan445659f2019-01-02 13:46:16 -0800509 int nextId = populateNext(key, endpoints);
Charles Chan48df9ad2018-10-30 18:08:59 -0700510 if (nextId == -1) {
511 log.warn("Fail to populateXConnect {}: {}", key, ERROR_NEXT_ID);
512 return;
513 }
Charles Chan445659f2019-01-02 13:46:16 -0800514 populateFilter(key, endpoints);
Charles Chan48df9ad2018-10-30 18:08:59 -0700515 populateFwd(key, nextId);
Charles Chan8d316332018-06-19 20:31:57 -0700516 populateAcl(key);
517 }
518
519 /**
520 * Populates filtering objectives for given XConnect.
521 *
Charles Chan445659f2019-01-02 13:46:16 -0800522 * @param key XConnect store key
523 * @param endpoints XConnect endpoints
Charles Chan8d316332018-06-19 20:31:57 -0700524 */
Charles Chan445659f2019-01-02 13:46:16 -0800525 private void populateFilter(XconnectKey key, Set<XconnectEndpoint> endpoints) {
Charles Chan48df9ad2018-10-30 18:08:59 -0700526 // FIXME Improve the logic
527 // If L2 load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
528 // The purpose is to make sure existing XConnect logic can still work on a configured port.
Charles Chan445659f2019-01-02 13:46:16 -0800529 boolean filtered = endpoints.stream()
530 .map(ep -> getNextTreatment(key.deviceId(), ep, false))
Charles Chan48df9ad2018-10-30 18:08:59 -0700531 .allMatch(t -> t.type().equals(NextTreatment.Type.TREATMENT));
532
Charles Chan445659f2019-01-02 13:46:16 -0800533 endpoints.stream()
534 .map(ep -> getPhysicalPorts(key.deviceId(), ep))
Charles Chan48df9ad2018-10-30 18:08:59 -0700535 .flatMap(Set::stream).forEach(port -> {
Charles Chan445659f2019-01-02 13:46:16 -0800536 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port, filtered);
537 ObjectiveContext context = new DefaultObjectiveContext(
538 (objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
539 key, port),
540 (objective, error) ->
541 log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
542 key, port, error));
543 flowObjectiveService.filter(key.deviceId(), filtObjBuilder.add(context));
544 });
Charles Chan8d316332018-06-19 20:31:57 -0700545 }
546
547 /**
548 * Populates next objectives for given XConnect.
549 *
Charles Chan445659f2019-01-02 13:46:16 -0800550 * @param key XConnect store key
551 * @param endpoints XConnect endpoints
Charles Chan48df9ad2018-10-30 18:08:59 -0700552 * @return next id
Charles Chan8d316332018-06-19 20:31:57 -0700553 */
Charles Chan445659f2019-01-02 13:46:16 -0800554 private int populateNext(XconnectKey key, Set<XconnectEndpoint> endpoints) {
pier6fd24fd2018-11-27 11:23:50 -0800555 int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
556 if (nextId != -1) {
Charles Chan1fb65132018-09-21 11:29:12 -0700557 log.debug("NextObj for {} found, id={}", key, nextId);
558 return nextId;
Charles Chan8d316332018-06-19 20:31:57 -0700559 } else {
Charles Chan445659f2019-01-02 13:46:16 -0800560 NextObjective.Builder nextObjBuilder = nextObjBuilder(key, endpoints);
Charles Chan48df9ad2018-10-30 18:08:59 -0700561 if (nextObjBuilder == null) {
562 log.warn("Fail to populate {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
563 return -1;
564 }
Charles Chan8d316332018-06-19 20:31:57 -0700565 ObjectiveContext nextContext = new DefaultObjectiveContext(
566 // To serialize this with kryo
567 (Serializable & Consumer<Objective>) (objective) ->
568 log.debug("XConnect NextObj for {} added", key),
Charles Chanfacfbef2018-08-23 14:30:33 -0700569 (Serializable & BiConsumer<Objective, ObjectiveError>) (objective, error) -> {
570 log.warn("Failed to add XConnect NextObj for {}: {}", key, error);
571 srService.invalidateNextObj(objective.id());
572 });
Charles Chan1fb65132018-09-21 11:29:12 -0700573 NextObjective nextObj = nextObjBuilder.add(nextContext);
Charles Chan8d316332018-06-19 20:31:57 -0700574 flowObjectiveService.next(key.deviceId(), nextObj);
Charles Chan1fb65132018-09-21 11:29:12 -0700575 xconnectNextObjStore.put(key, nextObj.id());
Charles Chan8d316332018-06-19 20:31:57 -0700576 log.debug("NextObj for {} not found. Creating new NextObj with id={}", key, nextObj.id());
Charles Chan1fb65132018-09-21 11:29:12 -0700577 return nextObj.id();
Charles Chan8d316332018-06-19 20:31:57 -0700578 }
Charles Chan8d316332018-06-19 20:31:57 -0700579 }
580
581 /**
582 * Populates bridging forwarding objectives for given XConnect.
583 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530584 * @param key XConnect store key
Charles Chan1fb65132018-09-21 11:29:12 -0700585 * @param nextId next objective id
Charles Chan8d316332018-06-19 20:31:57 -0700586 */
Charles Chan1fb65132018-09-21 11:29:12 -0700587 private void populateFwd(XconnectKey key, int nextId) {
588 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextId);
Charles Chan8d316332018-06-19 20:31:57 -0700589 ObjectiveContext fwdContext = new DefaultObjectiveContext(
590 (objective) -> log.debug("XConnect FwdObj for {} populated", key),
591 (objective, error) ->
592 log.warn("Failed to populate XConnect FwdObj for {}: {}", key, error));
593 flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.add(fwdContext));
594 }
595
596 /**
597 * Populates ACL forwarding objectives for given XConnect.
598 *
599 * @param key XConnect store key
600 */
601 private void populateAcl(XconnectKey key) {
602 ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
603 ObjectiveContext aclContext = new DefaultObjectiveContext(
604 (objective) -> log.debug("XConnect AclObj for {} populated", key),
605 (objective, error) ->
606 log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
607 flowObjectiveService.forward(key.deviceId(), aclObjBuilder.add(aclContext));
608 }
609
610 /**
611 * Revokes XConnect groups and flows for given key.
612 *
Charles Chan445659f2019-01-02 13:46:16 -0800613 * @param key XConnect key
614 * @param endpoints XConnect endpoints
Charles Chan8d316332018-06-19 20:31:57 -0700615 */
Charles Chan445659f2019-01-02 13:46:16 -0800616 private void revokeXConnect(XconnectKey key, Set<XconnectEndpoint> endpoints) {
pier6fd24fd2018-11-27 11:23:50 -0800617 if (!isLocalLeader(key.deviceId())) {
618 log.debug("Abort revoking XConnect {}: {}", key, ERROR_NOT_LEADER);
Charles Chan8d316332018-06-19 20:31:57 -0700619 return;
620 }
621
Charles Chan445659f2019-01-02 13:46:16 -0800622 revokeFilter(key, endpoints);
pier6fd24fd2018-11-27 11:23:50 -0800623 int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
624 if (nextId != -1) {
Charles Chan1fb65132018-09-21 11:29:12 -0700625 revokeFwd(key, nextId, null);
Charles Chan445659f2019-01-02 13:46:16 -0800626 revokeNext(key, endpoints, nextId, null);
Charles Chan8d316332018-06-19 20:31:57 -0700627 } else {
628 log.warn("NextObj for {} does not exist in the store.", key);
629 }
Charles Chan445659f2019-01-02 13:46:16 -0800630 revokeFilter(key, endpoints);
Charles Chan8d316332018-06-19 20:31:57 -0700631 revokeAcl(key);
632 }
633
634 /**
635 * Revokes filtering objectives for given XConnect.
636 *
Charles Chan445659f2019-01-02 13:46:16 -0800637 * @param key XConnect store key
638 * @param endpoints XConnect endpoints
Charles Chan8d316332018-06-19 20:31:57 -0700639 */
Charles Chan445659f2019-01-02 13:46:16 -0800640 private void revokeFilter(XconnectKey key, Set<XconnectEndpoint> endpoints) {
Charles Chan48df9ad2018-10-30 18:08:59 -0700641 // FIXME Improve the logic
642 // If L2 load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
643 // The purpose is to make sure existing XConnect logic can still work on a configured port.
Charles Chan445659f2019-01-02 13:46:16 -0800644 boolean filtered = endpoints.stream()
645 .map(ep -> getNextTreatment(key.deviceId(), ep, false))
646 .allMatch(t -> t.type().equals(NextTreatment.Type.TREATMENT));
647
648 endpoints.stream()
649 .map(ep -> getPhysicalPorts(key.deviceId(), ep)).
650 flatMap(Set::stream).forEach(port -> {
651 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port, filtered);
652 ObjectiveContext context = new DefaultObjectiveContext(
653 (objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
654 key, port),
655 (objective, error) ->
656 log.warn("Failed to revoke XConnect FilterObj for {} on port {}: {}",
657 key, port, error));
658 flowObjectiveService.filter(key.deviceId(), filtObjBuilder.remove(context));
659 });
Charles Chan8d316332018-06-19 20:31:57 -0700660 }
661
662 /**
663 * Revokes next objectives for given XConnect.
664 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530665 * @param key XConnect store key
Charles Chan445659f2019-01-02 13:46:16 -0800666 * @param endpoints XConnect endpoints
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530667 * @param nextId next objective id
Charles Chan8d316332018-06-19 20:31:57 -0700668 * @param nextFuture completable future for this next objective operation
669 */
Charles Chan445659f2019-01-02 13:46:16 -0800670 private void revokeNext(XconnectKey key, Set<XconnectEndpoint> endpoints, int nextId,
Charles Chan8d316332018-06-19 20:31:57 -0700671 CompletableFuture<ObjectiveError> nextFuture) {
672 ObjectiveContext context = new ObjectiveContext() {
673 @Override
674 public void onSuccess(Objective objective) {
675 log.debug("Previous NextObj for {} removed", key);
676 if (nextFuture != null) {
677 nextFuture.complete(null);
678 }
679 }
680
681 @Override
682 public void onError(Objective objective, ObjectiveError error) {
683 log.warn("Failed to remove previous NextObj for {}: {}", key, error);
684 if (nextFuture != null) {
685 nextFuture.complete(error);
686 }
Charles Chanfacfbef2018-08-23 14:30:33 -0700687 srService.invalidateNextObj(objective.id());
Charles Chan8d316332018-06-19 20:31:57 -0700688 }
689 };
Charles Chan48df9ad2018-10-30 18:08:59 -0700690
Charles Chan445659f2019-01-02 13:46:16 -0800691 NextObjective.Builder nextObjBuilder = nextObjBuilder(key, endpoints, nextId);
Charles Chan48df9ad2018-10-30 18:08:59 -0700692 if (nextObjBuilder == null) {
693 log.warn("Fail to revokeNext {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
694 return;
695 }
pier567465b2018-11-24 11:16:28 -0800696 // Release the L2Lbs if present
Charles Chan445659f2019-01-02 13:46:16 -0800697 endpoints.stream()
698 .filter(endpoint -> endpoint.type() == XconnectEndpoint.Type.LOAD_BALANCER)
699 .forEach(endpoint -> {
700 String l2LbKey = String.valueOf(((XconnectLoadBalancerEndpoint) endpoint).key());
701 l2LbService.release(new L2LbId(key.deviceId(), Integer.parseInt(l2LbKey)), appId);
702 });
Charles Chan48df9ad2018-10-30 18:08:59 -0700703 flowObjectiveService.next(key.deviceId(), nextObjBuilder.remove(context));
Charles Chan8d316332018-06-19 20:31:57 -0700704 xconnectNextObjStore.remove(key);
705 }
706
707 /**
708 * Revokes bridging forwarding objectives for given XConnect.
709 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530710 * @param key XConnect store key
711 * @param nextId next objective id
Charles Chan8d316332018-06-19 20:31:57 -0700712 * @param fwdFuture completable future for this forwarding objective operation
713 */
Charles Chan1fb65132018-09-21 11:29:12 -0700714 private void revokeFwd(XconnectKey key, int nextId, CompletableFuture<ObjectiveError> fwdFuture) {
715 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextId);
Charles Chan8d316332018-06-19 20:31:57 -0700716 ObjectiveContext context = new ObjectiveContext() {
717 @Override
718 public void onSuccess(Objective objective) {
719 log.debug("Previous FwdObj for {} removed", key);
720 if (fwdFuture != null) {
721 fwdFuture.complete(null);
722 }
723 }
724
725 @Override
726 public void onError(Objective objective, ObjectiveError error) {
727 log.warn("Failed to remove previous FwdObj for {}: {}", key, error);
728 if (fwdFuture != null) {
729 fwdFuture.complete(error);
730 }
731 }
732 };
733 flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.remove(context));
734 }
735
736 /**
737 * Revokes ACL forwarding objectives for given XConnect.
738 *
739 * @param key XConnect store key
740 */
741 private void revokeAcl(XconnectKey key) {
742 ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
743 ObjectiveContext aclContext = new DefaultObjectiveContext(
744 (objective) -> log.debug("XConnect AclObj for {} populated", key),
745 (objective, error) ->
746 log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
747 flowObjectiveService.forward(key.deviceId(), aclObjBuilder.remove(aclContext));
748 }
749
750 /**
751 * Updates XConnect groups and flows for given key.
752 *
Charles Chan445659f2019-01-02 13:46:16 -0800753 * @param key XConnect key
754 * @param prevEndpoints previous XConnect endpoints
755 * @param endpoints new XConnect endpoints
Charles Chan8d316332018-06-19 20:31:57 -0700756 */
Charles Chan445659f2019-01-02 13:46:16 -0800757 private void updateXConnect(XconnectKey key, Set<XconnectEndpoint> prevEndpoints,
758 Set<XconnectEndpoint> endpoints) {
pier6fd24fd2018-11-27 11:23:50 -0800759 if (!isLocalLeader(key.deviceId())) {
760 log.debug("Abort updating XConnect {}: {}", key, ERROR_NOT_LEADER);
761 return;
762 }
Charles Chan8d316332018-06-19 20:31:57 -0700763 // NOTE: ACL flow doesn't include port information. No need to update it.
764 // Pair port is built-in and thus not going to change. No need to update it.
765
766 // remove old filter
Charles Chan445659f2019-01-02 13:46:16 -0800767 prevEndpoints.stream().filter(prevEndpoint -> !endpoints.contains(prevEndpoint)).forEach(prevEndpoint ->
768 revokeFilter(key, ImmutableSet.of(prevEndpoint)));
Charles Chan8d316332018-06-19 20:31:57 -0700769 // install new filter
Charles Chan445659f2019-01-02 13:46:16 -0800770 endpoints.stream().filter(endpoint -> !prevEndpoints.contains(endpoint)).forEach(endpoint ->
771 populateFilter(key, ImmutableSet.of(endpoint)));
Charles Chan8d316332018-06-19 20:31:57 -0700772
773 CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
774 CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
775
pier6fd24fd2018-11-27 11:23:50 -0800776 int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
777 if (nextId != -1) {
Charles Chan1fb65132018-09-21 11:29:12 -0700778 revokeFwd(key, nextId, fwdFuture);
Charles Chan8d316332018-06-19 20:31:57 -0700779
780 fwdFuture.thenAcceptAsync(fwdStatus -> {
781 if (fwdStatus == null) {
782 log.debug("Fwd removed. Now remove group {}", key);
Charles Chan445659f2019-01-02 13:46:16 -0800783 revokeNext(key, prevEndpoints, nextId, nextFuture);
Charles Chan8d316332018-06-19 20:31:57 -0700784 }
785 });
786
787 nextFuture.thenAcceptAsync(nextStatus -> {
788 if (nextStatus == null) {
789 log.debug("Installing new group and flow for {}", key);
Charles Chan445659f2019-01-02 13:46:16 -0800790 int newNextId = populateNext(key, endpoints);
Charles Chan48df9ad2018-10-30 18:08:59 -0700791 if (newNextId == -1) {
792 log.warn("Fail to updateXConnect {}: {}", key, ERROR_NEXT_ID);
793 return;
794 }
795 populateFwd(key, newNextId);
Charles Chan8d316332018-06-19 20:31:57 -0700796 }
797 });
798 } else {
799 log.warn("NextObj for {} does not exist in the store.", key);
800 }
801 }
802
803 /**
Charles Chan1fb65132018-09-21 11:29:12 -0700804 * Creates a next objective builder for XConnect with given nextId.
Charles Chan8d316332018-06-19 20:31:57 -0700805 *
Charles Chan445659f2019-01-02 13:46:16 -0800806 * @param key XConnect key
807 * @param endpoints XConnect endpoints
Charles Chan48df9ad2018-10-30 18:08:59 -0700808 * @param nextId next objective id
Charles Chan8d316332018-06-19 20:31:57 -0700809 * @return next objective builder
810 */
Charles Chan445659f2019-01-02 13:46:16 -0800811 private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<XconnectEndpoint> endpoints, int nextId) {
Charles Chan8d316332018-06-19 20:31:57 -0700812 TrafficSelector metadata =
813 DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
814 NextObjective.Builder nextObjBuilder = DefaultNextObjective
815 .builder().withId(nextId)
816 .withType(NextObjective.Type.BROADCAST).fromApp(appId)
817 .withMeta(metadata);
Charles Chan48df9ad2018-10-30 18:08:59 -0700818
Charles Chan445659f2019-01-02 13:46:16 -0800819 for (XconnectEndpoint endpoint : endpoints) {
820 NextTreatment nextTreatment = getNextTreatment(key.deviceId(), endpoint, true);
Charles Chan48df9ad2018-10-30 18:08:59 -0700821 if (nextTreatment == null) {
pier567465b2018-11-24 11:16:28 -0800822 // If a L2Lb is used in the XConnect - putting on hold
Charles Chan445659f2019-01-02 13:46:16 -0800823 if (endpoint.type() == XconnectEndpoint.Type.LOAD_BALANCER) {
pier567465b2018-11-24 11:16:28 -0800824 log.warn("Unable to create nextObj. L2Lb not ready");
Charles Chan445659f2019-01-02 13:46:16 -0800825 String l2LbKey = String.valueOf(((XconnectLoadBalancerEndpoint) endpoint).key());
pier567465b2018-11-24 11:16:28 -0800826 l2LbCache.asMap().putIfAbsent(new L2LbId(key.deviceId(), Integer.parseInt(l2LbKey)),
827 key);
828 } else {
829 log.warn("Unable to create nextObj. Null NextTreatment");
830 }
Charles Chan48df9ad2018-10-30 18:08:59 -0700831 return null;
832 }
833 nextObjBuilder.addTreatment(nextTreatment);
834 }
835
Charles Chan8d316332018-06-19 20:31:57 -0700836 return nextObjBuilder;
837 }
838
839 /**
Charles Chan1fb65132018-09-21 11:29:12 -0700840 * Creates a next objective builder for XConnect.
841 *
Charles Chan445659f2019-01-02 13:46:16 -0800842 * @param key XConnect key
843 * @param endpoints Xconnect endpoints
Charles Chan1fb65132018-09-21 11:29:12 -0700844 * @return next objective builder
845 */
Charles Chan445659f2019-01-02 13:46:16 -0800846 private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<XconnectEndpoint> endpoints) {
Charles Chan1fb65132018-09-21 11:29:12 -0700847 int nextId = flowObjectiveService.allocateNextId();
Charles Chan445659f2019-01-02 13:46:16 -0800848 return nextObjBuilder(key, endpoints, nextId);
Charles Chan1fb65132018-09-21 11:29:12 -0700849 }
850
851
852 /**
Charles Chan8d316332018-06-19 20:31:57 -0700853 * Creates a bridging forwarding objective builder for XConnect.
854 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530855 * @param key XConnect key
Charles Chan8d316332018-06-19 20:31:57 -0700856 * @param nextId next ID of the broadcast group for this XConnect key
857 * @return forwarding objective builder
858 */
859 private ForwardingObjective.Builder fwdObjBuilder(XconnectKey key, int nextId) {
860 /*
861 * Driver should treat objectives with MacAddress.NONE and !VlanId.NONE
862 * as the VLAN cross-connect broadcast rules
863 */
864 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
865 sbuilder.matchVlanId(key.vlanId());
866 sbuilder.matchEthDst(MacAddress.NONE);
867
868 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
869 fob.withFlag(ForwardingObjective.Flag.SPECIFIC)
870 .withSelector(sbuilder.build())
871 .nextStep(nextId)
872 .withPriority(XCONNECT_PRIORITY)
873 .fromApp(appId)
874 .makePermanent();
875 return fob;
876 }
877
878 /**
879 * Creates an ACL forwarding objective builder for XConnect.
880 *
881 * @param vlanId cross connect VLAN id
882 * @return forwarding objective builder
883 */
884 private ForwardingObjective.Builder aclObjBuilder(VlanId vlanId) {
885 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
886 sbuilder.matchVlanId(vlanId);
887
888 TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
889
890 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
891 fob.withFlag(ForwardingObjective.Flag.VERSATILE)
892 .withSelector(sbuilder.build())
893 .withTreatment(tbuilder.build())
894 .withPriority(XCONNECT_ACL_PRIORITY)
895 .fromApp(appId)
896 .makePermanent();
897 return fob;
898 }
899
900 /**
901 * Creates a filtering objective builder for XConnect.
902 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530903 * @param key XConnect key
Charles Chan8d316332018-06-19 20:31:57 -0700904 * @param port XConnect ports
Charles Chan48df9ad2018-10-30 18:08:59 -0700905 * @param filtered true if this is a filtered port
Charles Chan8d316332018-06-19 20:31:57 -0700906 * @return next objective builder
907 */
Charles Chan48df9ad2018-10-30 18:08:59 -0700908 private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port, boolean filtered) {
Charles Chan8d316332018-06-19 20:31:57 -0700909 FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
910 fob.withKey(Criteria.matchInPort(port))
Charles Chan8d316332018-06-19 20:31:57 -0700911 .addCondition(Criteria.matchEthDst(MacAddress.NONE))
912 .withPriority(XCONNECT_PRIORITY);
Charles Chan48df9ad2018-10-30 18:08:59 -0700913 if (filtered) {
914 fob.addCondition(Criteria.matchVlanId(key.vlanId()));
915 } else {
916 fob.addCondition(Criteria.matchVlanId(VlanId.ANY));
917 }
Charles Chan8d316332018-06-19 20:31:57 -0700918 return fob.permit().fromApp(appId);
919 }
920
921 /**
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530922 * Updates L2 flooding groups; add pair link into L2 flooding group of given xconnect vlan.
Charles Chan8d316332018-06-19 20:31:57 -0700923 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530924 * @param deviceId Device ID
925 * @param port Port details
926 * @param vlanId VLAN ID
927 * @param install Whether to add or revoke pair link addition to flooding group
Charles Chan8d316332018-06-19 20:31:57 -0700928 */
pier6fd24fd2018-11-27 11:23:50 -0800929 private void updateL2Flooding(DeviceId deviceId, PortNumber port, VlanId vlanId, boolean install) {
930 XconnectKey key = new XconnectKey(deviceId, vlanId);
931 // Ensure leadership on device
932 if (!isLocalLeader(deviceId)) {
933 log.debug("Abort updating L2Flood {}: {}", key, ERROR_NOT_LEADER);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530934 return;
Charles Chan8d316332018-06-19 20:31:57 -0700935 }
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530936
937 // Locate L2 flooding group details for given xconnect vlan
pier6fd24fd2018-11-27 11:23:50 -0800938 int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530939 if (nextId == -1) {
940 log.debug("XConnect vlan {} broadcast group for device {} doesn't exists. " +
941 "Aborting pair group linking.", vlanId, deviceId);
942 return;
943 }
944
945 // Add pairing-port group to flooding group
946 TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
947 // treatment.popVlan();
948 treatment.setOutput(port);
949 ObjectiveContext context = new DefaultObjectiveContext(
950 (objective) ->
951 log.debug("Pair port added/removed to vlan {} next objective {} on {}",
952 vlanId, nextId, deviceId),
953 (objective, error) ->
954 log.warn("Failed adding/removing pair port to vlan {} next objective {} on {}." +
955 "Error : {}", vlanId, nextId, deviceId, error)
956 );
957 NextObjective.Builder vlanNextObjectiveBuilder = DefaultNextObjective.builder()
958 .withId(nextId)
959 .withType(NextObjective.Type.BROADCAST)
960 .fromApp(srService.appId())
961 .withMeta(DefaultTrafficSelector.builder().matchVlanId(vlanId).build())
962 .addTreatment(treatment.build());
963 if (install) {
964 flowObjectiveService.next(deviceId, vlanNextObjectiveBuilder.addToExisting(context));
965 } else {
966 flowObjectiveService.next(deviceId, vlanNextObjectiveBuilder.removeFromExisting(context));
967 }
968 log.debug("Submitted next objective {} for vlan: {} in device {}",
969 nextId, vlanId, deviceId);
Charles Chan8d316332018-06-19 20:31:57 -0700970 }
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530971
972 /**
973 * Populate L2 multicast rule on given deviceId that matches given mac, given vlan and
974 * output to given port's L2 mulitcast group.
975 *
976 * @param deviceId Device ID
977 * @param pairPort Pair port number
978 * @param vlanId VLAN ID
979 * @param accessPorts List of access ports to be added into L2 multicast group
980 */
pier6fd24fd2018-11-27 11:23:50 -0800981 private void populateL2Multicast(DeviceId deviceId, PortNumber pairPort,
982 VlanId vlanId, List<PortNumber> accessPorts) {
983 // Ensure enough rights to program pair device
984 if (!srService.shouldProgram(deviceId)) {
985 log.debug("Abort populate L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
986 return;
987 }
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530988
989 boolean multicastGroupExists = true;
990 int vlanMulticastNextId;
pier6fd24fd2018-11-27 11:23:50 -0800991 VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530992
993 // Step 1 : Populate single homed access ports into vlan's L2 multicast group
994 NextObjective.Builder vlanMulticastNextObjBuilder = DefaultNextObjective
995 .builder()
996 .withType(NextObjective.Type.BROADCAST)
997 .fromApp(srService.appId())
998 .withMeta(DefaultTrafficSelector.builder().matchVlanId(vlanId)
999 .matchEthDst(MacAddress.IPV4_MULTICAST).build());
pier6fd24fd2018-11-27 11:23:50 -08001000 vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301001 if (vlanMulticastNextId == -1) {
1002 // Vlan's L2 multicast group doesn't exist; create it, update store and add pair port as sub-group
1003 multicastGroupExists = false;
1004 vlanMulticastNextId = flowObjectiveService.allocateNextId();
pier6fd24fd2018-11-27 11:23:50 -08001005 addMulticastGroupNextObjectiveId(key, vlanMulticastNextId);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301006 vlanMulticastNextObjBuilder.addTreatment(
1007 DefaultTrafficTreatment.builder().popVlan().setOutput(pairPort).build()
1008 );
1009 }
1010 vlanMulticastNextObjBuilder.withId(vlanMulticastNextId);
pier6fd24fd2018-11-27 11:23:50 -08001011 int nextId = vlanMulticastNextId;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301012 accessPorts.forEach(p -> {
1013 TrafficTreatment.Builder egressAction = DefaultTrafficTreatment.builder();
1014 // Do vlan popup action based on interface configuration
1015 if (interfaceService.getInterfacesByPort(new ConnectPoint(deviceId, p))
1016 .stream().noneMatch(i -> i.vlanTagged().contains(vlanId))) {
1017 egressAction.popVlan();
1018 }
1019 egressAction.setOutput(p);
1020 vlanMulticastNextObjBuilder.addTreatment(egressAction.build());
pier6fd24fd2018-11-27 11:23:50 -08001021 addMulticastGroupPort(key, p);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301022 });
1023 ObjectiveContext context = new DefaultObjectiveContext(
1024 (objective) ->
1025 log.debug("L2 multicast group installed/updated. "
1026 + "NextObject Id {} on {} for subnet {} ",
1027 nextId, deviceId, vlanId),
1028 (objective, error) ->
1029 log.warn("L2 multicast group failed to install/update. "
1030 + " NextObject Id {} on {} for subnet {} : {}",
1031 nextId, deviceId, vlanId, error)
1032 );
1033 if (!multicastGroupExists) {
1034 flowObjectiveService.next(deviceId, vlanMulticastNextObjBuilder.add(context));
1035
1036 // Step 2 : Populate ACL rule; selector = vlan + pair-port, output = vlan L2 multicast group
1037 TrafficSelector.Builder multicastSelector = DefaultTrafficSelector.builder();
1038 multicastSelector.matchEthType(Ethernet.TYPE_VLAN);
1039 multicastSelector.matchInPort(pairPort);
1040 multicastSelector.matchVlanId(vlanId);
1041 ForwardingObjective.Builder vlanMulticastForwardingObj = DefaultForwardingObjective.builder()
1042 .withFlag(ForwardingObjective.Flag.VERSATILE)
1043 .nextStep(vlanMulticastNextId)
1044 .withSelector(multicastSelector.build())
1045 .withPriority(100)
1046 .fromApp(srService.appId())
1047 .makePermanent();
1048 context = new DefaultObjectiveContext(
1049 (objective) -> log.debug("L2 multicasting versatile rule for device {}, port/vlan {}/{} populated",
1050 deviceId,
1051 pairPort,
1052 vlanId),
1053 (objective, error) -> log.warn("Failed to populate L2 multicasting versatile rule for device {}, " +
1054 "ports/vlan {}/{}: {}", deviceId, pairPort, vlanId, error));
1055 flowObjectiveService.forward(deviceId, vlanMulticastForwardingObj.add(context));
1056 } else {
1057 // L2_MULTICAST & BROADCAST are similar structure in subgroups; so going with BROADCAST type.
1058 vlanMulticastNextObjBuilder.withType(NextObjective.Type.BROADCAST);
1059 flowObjectiveService.next(deviceId, vlanMulticastNextObjBuilder.addToExisting(context));
1060 }
1061 }
1062
1063 /**
1064 * Removes access ports from VLAN L2 multicast group on given deviceId.
1065 *
1066 * @param deviceId Device ID
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301067 * @param vlanId VLAN ID
1068 * @param accessPorts List of access ports to be added into L2 multicast group
1069 */
pier6fd24fd2018-11-27 11:23:50 -08001070 private void revokeL2Multicast(DeviceId deviceId, VlanId vlanId, List<PortNumber> accessPorts) {
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301071 // Ensure enough rights to program pair device
1072 if (!srService.shouldProgram(deviceId)) {
pier6fd24fd2018-11-27 11:23:50 -08001073 log.debug("Abort revoke L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301074 return;
1075 }
1076
pier6fd24fd2018-11-27 11:23:50 -08001077 VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
1078
1079 int vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301080 if (vlanMulticastNextId == -1) {
1081 return;
1082 }
1083 NextObjective.Builder vlanMulticastNextObjBuilder = DefaultNextObjective
1084 .builder()
1085 .withType(NextObjective.Type.BROADCAST)
1086 .fromApp(srService.appId())
1087 .withMeta(DefaultTrafficSelector.builder().matchVlanId(vlanId).build())
1088 .withId(vlanMulticastNextId);
1089 accessPorts.forEach(p -> {
1090 TrafficTreatment.Builder egressAction = DefaultTrafficTreatment.builder();
1091 // Do vlan popup action based on interface configuration
1092 if (interfaceService.getInterfacesByPort(new ConnectPoint(deviceId, p))
1093 .stream().noneMatch(i -> i.vlanTagged().contains(vlanId))) {
1094 egressAction.popVlan();
1095 }
1096 egressAction.setOutput(p);
1097 vlanMulticastNextObjBuilder.addTreatment(egressAction.build());
pier6fd24fd2018-11-27 11:23:50 -08001098 removeMulticastGroupPort(key, p);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301099 });
1100 ObjectiveContext context = new DefaultObjectiveContext(
1101 (objective) ->
1102 log.debug("L2 multicast group installed/updated. "
1103 + "NextObject Id {} on {} for subnet {} ",
1104 vlanMulticastNextId, deviceId, vlanId),
1105 (objective, error) ->
1106 log.warn("L2 multicast group failed to install/update. "
1107 + " NextObject Id {} on {} for subnet {} : {}",
1108 vlanMulticastNextId, deviceId, vlanId, error)
1109 );
1110 flowObjectiveService.next(deviceId, vlanMulticastNextObjBuilder.removeFromExisting(context));
1111 }
1112
1113 /**
1114 * Cleans up VLAN L2 multicast group on given deviceId. ACL rules for the group will also be deleted.
1115 * Normally multicast group is not removed if it contains access ports; which can be forced
1116 * by "force" flag
1117 *
1118 * @param deviceId Device ID
1119 * @param pairPort Pair port number
1120 * @param vlanId VLAN ID
1121 * @param force Forceful removal
1122 */
1123 private void cleanupL2MulticastRule(DeviceId deviceId, PortNumber pairPort, VlanId vlanId, boolean force) {
1124
1125 // Ensure enough rights to program pair device
1126 if (!srService.shouldProgram(deviceId)) {
pier6fd24fd2018-11-27 11:23:50 -08001127 log.debug("Abort cleanup L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301128 return;
1129 }
1130
pier6fd24fd2018-11-27 11:23:50 -08001131 VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
1132
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301133 // Ensure L2 multicast group doesn't contain access ports
pier6fd24fd2018-11-27 11:23:50 -08001134 if (hasAccessPortInMulticastGroup(key, pairPort) && !force) {
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301135 return;
1136 }
1137
1138 // Load L2 multicast group details
pier6fd24fd2018-11-27 11:23:50 -08001139 int vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301140 if (vlanMulticastNextId == -1) {
1141 return;
1142 }
1143
1144 // Step 1 : Clear ACL rule; selector = vlan + pair-port, output = vlan L2 multicast group
1145 TrafficSelector.Builder l2MulticastSelector = DefaultTrafficSelector.builder();
1146 l2MulticastSelector.matchEthType(Ethernet.TYPE_VLAN);
1147 l2MulticastSelector.matchInPort(pairPort);
1148 l2MulticastSelector.matchVlanId(vlanId);
1149 ForwardingObjective.Builder vlanMulticastForwardingObj = DefaultForwardingObjective.builder()
1150 .withFlag(ForwardingObjective.Flag.VERSATILE)
1151 .nextStep(vlanMulticastNextId)
1152 .withSelector(l2MulticastSelector.build())
1153 .withPriority(100)
1154 .fromApp(srService.appId())
1155 .makePermanent();
1156 ObjectiveContext context = new DefaultObjectiveContext(
1157 (objective) -> log.debug("L2 multicasting rule for device {}, port/vlan {}/{} deleted", deviceId,
1158 pairPort, vlanId),
1159 (objective, error) -> log.warn("Failed to delete L2 multicasting rule for device {}, " +
1160 "ports/vlan {}/{}: {}", deviceId, pairPort, vlanId, error));
1161 flowObjectiveService.forward(deviceId, vlanMulticastForwardingObj.remove(context));
1162
1163 // Step 2 : Clear L2 multicast group associated with vlan
1164 NextObjective.Builder l2MulticastGroupBuilder = DefaultNextObjective
1165 .builder()
1166 .withId(vlanMulticastNextId)
1167 .withType(NextObjective.Type.BROADCAST)
1168 .fromApp(srService.appId())
1169 .withMeta(DefaultTrafficSelector.builder()
1170 .matchVlanId(vlanId)
1171 .matchEthDst(MacAddress.IPV4_MULTICAST).build())
1172 .addTreatment(DefaultTrafficTreatment.builder().popVlan().setOutput(pairPort).build());
1173 context = new DefaultObjectiveContext(
1174 (objective) ->
1175 log.debug("L2 multicast group with NextObject Id {} deleted on {} for subnet {} ",
1176 vlanMulticastNextId, deviceId, vlanId),
1177 (objective, error) ->
1178 log.warn("L2 multicast group with NextObject Id {} failed to delete on {} for subnet {} : {}",
1179 vlanMulticastNextId, deviceId, vlanId, error)
1180 );
1181 flowObjectiveService.next(deviceId, l2MulticastGroupBuilder.remove(context));
1182
1183 // Finally clear store.
pier6fd24fd2018-11-27 11:23:50 -08001184 removeMulticastGroup(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301185 }
1186
pier6fd24fd2018-11-27 11:23:50 -08001187 private int getMulticastGroupNextObjectiveId(VlanNextObjectiveStoreKey key) {
1188 return Versioned.valueOrElse(xconnectMulticastNextStore.get(key), -1);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301189 }
1190
pier6fd24fd2018-11-27 11:23:50 -08001191 private void addMulticastGroupNextObjectiveId(VlanNextObjectiveStoreKey key, int nextId) {
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301192 if (nextId == -1) {
1193 return;
1194 }
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301195 xconnectMulticastNextStore.put(key, nextId);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301196 }
1197
pier6fd24fd2018-11-27 11:23:50 -08001198 private void addMulticastGroupPort(VlanNextObjectiveStoreKey groupKey, PortNumber port) {
1199 xconnectMulticastPortsStore.compute(groupKey, (key, ports) -> {
1200 if (ports == null) {
1201 ports = Lists.newArrayList();
1202 }
1203 ports.add(port);
1204 return ports;
1205 });
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301206 }
1207
pier6fd24fd2018-11-27 11:23:50 -08001208 private void removeMulticastGroupPort(VlanNextObjectiveStoreKey groupKey, PortNumber port) {
1209 xconnectMulticastPortsStore.compute(groupKey, (key, ports) -> {
1210 if (ports != null && !ports.isEmpty()) {
1211 ports.remove(port);
1212 }
1213 return ports;
1214 });
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301215 }
1216
pier6fd24fd2018-11-27 11:23:50 -08001217 private void removeMulticastGroup(VlanNextObjectiveStoreKey groupKey) {
1218 xconnectMulticastPortsStore.remove(groupKey);
1219 xconnectMulticastNextStore.remove(groupKey);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301220 }
1221
pier6fd24fd2018-11-27 11:23:50 -08001222 private boolean hasAccessPortInMulticastGroup(VlanNextObjectiveStoreKey groupKey, PortNumber pairPort) {
1223 List<PortNumber> ports = Versioned.valueOrElse(xconnectMulticastPortsStore.get(groupKey), ImmutableList.of());
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301224 return ports.stream().anyMatch(p -> !p.equals(pairPort));
1225 }
1226
pier6fd24fd2018-11-27 11:23:50 -08001227 // Custom-built function, when the device is not available we need a fallback mechanism
1228 private boolean isLocalLeader(DeviceId deviceId) {
1229 if (!mastershipService.isLocalMaster(deviceId)) {
1230 // When the device is available we just check the mastership
1231 if (deviceService.isAvailable(deviceId)) {
1232 return false;
1233 }
1234 // Fallback with Leadership service - device id is used as topic
1235 NodeId leader = leadershipService.runForLeadership(
1236 deviceId.toString()).leaderNodeId();
1237 // Verify if this node is the leader
1238 return clusterService.getLocalNode().id().equals(leader);
1239 }
1240 return true;
1241 }
1242
Charles Chan445659f2019-01-02 13:46:16 -08001243 private Set<PortNumber> getPhysicalPorts(DeviceId deviceId, XconnectEndpoint endpoint) {
1244 if (endpoint.type() == XconnectEndpoint.Type.PORT) {
1245 PortNumber port = ((XconnectPortEndpoint) endpoint).port();
1246 return Sets.newHashSet(port);
Charles Chan48df9ad2018-10-30 18:08:59 -07001247 }
Charles Chan445659f2019-01-02 13:46:16 -08001248 if (endpoint.type() == XconnectEndpoint.Type.LOAD_BALANCER) {
1249 L2LbId l2LbId = new L2LbId(deviceId, ((XconnectLoadBalancerEndpoint) endpoint).key());
1250 Set<PortNumber> ports = l2LbService.getL2Lb(l2LbId).ports();
1251 return Sets.newHashSet(ports);
Charles Chan48df9ad2018-10-30 18:08:59 -07001252 }
Charles Chan48df9ad2018-10-30 18:08:59 -07001253 return Sets.newHashSet();
1254 }
1255
Charles Chan445659f2019-01-02 13:46:16 -08001256 private NextTreatment getNextTreatment(DeviceId deviceId, XconnectEndpoint endpoint, boolean reserve) {
1257 if (endpoint.type() == XconnectEndpoint.Type.PORT) {
1258 PortNumber port = ((XconnectPortEndpoint) endpoint).port();
1259 return DefaultNextTreatment.of(DefaultTrafficTreatment.builder().setOutput(port).build());
Charles Chan48df9ad2018-10-30 18:08:59 -07001260 }
Charles Chan445659f2019-01-02 13:46:16 -08001261 if (endpoint.type() == XconnectEndpoint.Type.LOAD_BALANCER) {
1262 L2LbId l2LbId = new L2LbId(deviceId, ((XconnectLoadBalancerEndpoint) endpoint).key());
1263 NextTreatment idNextTreatment = IdNextTreatment.of(l2LbService.getL2LbNext(l2LbId));
pier567465b2018-11-24 11:16:28 -08001264 // Reserve only one time during next objective creation
1265 if (reserve) {
Charles Chan445659f2019-01-02 13:46:16 -08001266 if (!l2LbService.reserve(l2LbId, appId)) {
pier567465b2018-11-24 11:16:28 -08001267 log.warn("Reservation failed for {}", l2LbId);
1268 idNextTreatment = null;
1269 }
1270 }
1271 return idNextTreatment;
Charles Chan48df9ad2018-10-30 18:08:59 -07001272 }
Charles Chan48df9ad2018-10-30 18:08:59 -07001273 return null;
1274 }
pier567465b2018-11-24 11:16:28 -08001275
pier567465b2018-11-24 11:16:28 -08001276 private class InternalL2LbListener implements L2LbListener {
1277 // Populate xconnect once l2lb is available
1278 @Override
1279 public void event(L2LbEvent event) {
1280 l2lbExecutor.execute(() -> dequeue(event.subject().l2LbId()));
1281 }
1282 // When we receive INSTALLED l2 load balancing is ready
1283 @Override
1284 public boolean isRelevant(L2LbEvent event) {
1285 return event.type() == L2LbEvent.Type.INSTALLED;
1286 }
1287 }
1288
1289 // Invalidate the cache and re-start the xconnect installation
1290 private void dequeue(L2LbId l2LbId) {
1291 XconnectKey xconnectKey = l2LbCache.getIfPresent(l2LbId);
1292 if (xconnectKey == null) {
1293 log.trace("{} not present in the cache", l2LbId);
1294 return;
1295 }
1296 log.debug("Dequeue {}", l2LbId);
1297 l2LbCache.invalidate(l2LbId);
Charles Chan445659f2019-01-02 13:46:16 -08001298 Set<XconnectEndpoint> endpoints = Versioned.valueOrNull(xconnectStore.get(xconnectKey));
1299 if (endpoints == null || endpoints.isEmpty()) {
1300 log.warn("Endpoints not found for XConnect {}", xconnectKey);
pier567465b2018-11-24 11:16:28 -08001301 return;
1302 }
Charles Chan445659f2019-01-02 13:46:16 -08001303 populateXConnect(xconnectKey, endpoints);
pier567465b2018-11-24 11:16:28 -08001304 log.trace("L2Lb cache size {}", l2LbCache.size());
1305 }
1306
Charles Chan8d316332018-06-19 20:31:57 -07001307}