blob: d4a7430a3f4c2ea73b74be47bf0c96f418fc8448 [file] [log] [blame]
Charles Chan8d316332018-06-19 20:31:57 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.xconnect.impl;
17
pier6fd24fd2018-11-27 11:23:50 -080018import com.google.common.collect.ImmutableList;
pier567465b2018-11-24 11:16:28 -080019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalNotification;
Charles Chan0b1dd7e2018-08-19 19:21:46 -070022import com.google.common.collect.ImmutableMap;
Charles Chan8d316332018-06-19 20:31:57 -070023import com.google.common.collect.ImmutableSet;
pier6fd24fd2018-11-27 11:23:50 -080024import com.google.common.collect.Lists;
Charles Chan8d316332018-06-19 20:31:57 -070025import com.google.common.collect.Sets;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053026import org.onlab.packet.Ethernet;
Charles Chan8d316332018-06-19 20:31:57 -070027import org.onlab.packet.MacAddress;
28import org.onlab.packet.VlanId;
29import org.onlab.util.KryoNamespace;
pier6fd24fd2018-11-27 11:23:50 -080030import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.LeadershipService;
32import org.onosproject.cluster.NodeId;
Charles Chan8d316332018-06-19 20:31:57 -070033import org.onosproject.codec.CodecService;
34import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
Charles Chanc0a499b2019-01-16 15:30:39 -080036import org.onosproject.portloadbalancer.api.PortLoadBalancerEvent;
37import org.onosproject.portloadbalancer.api.PortLoadBalancerId;
38import org.onosproject.portloadbalancer.api.PortLoadBalancerListener;
39import org.onosproject.portloadbalancer.api.PortLoadBalancerService;
Charles Chan8d316332018-06-19 20:31:57 -070040import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.DeviceId;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053043import org.onosproject.net.Host;
44import org.onosproject.net.HostLocation;
Charles Chan8d316332018-06-19 20:31:57 -070045import org.onosproject.net.PortNumber;
46import org.onosproject.net.config.NetworkConfigService;
47import org.onosproject.net.device.DeviceEvent;
48import org.onosproject.net.device.DeviceListener;
49import org.onosproject.net.device.DeviceService;
50import org.onosproject.net.flow.DefaultTrafficSelector;
51import org.onosproject.net.flow.DefaultTrafficTreatment;
52import org.onosproject.net.flow.TrafficSelector;
53import org.onosproject.net.flow.TrafficTreatment;
54import org.onosproject.net.flow.criteria.Criteria;
55import org.onosproject.net.flowobjective.DefaultFilteringObjective;
56import org.onosproject.net.flowobjective.DefaultForwardingObjective;
57import org.onosproject.net.flowobjective.DefaultNextObjective;
Charles Chan48df9ad2018-10-30 18:08:59 -070058import org.onosproject.net.flowobjective.DefaultNextTreatment;
Charles Chan8d316332018-06-19 20:31:57 -070059import org.onosproject.net.flowobjective.DefaultObjectiveContext;
60import org.onosproject.net.flowobjective.FilteringObjective;
61import org.onosproject.net.flowobjective.FlowObjectiveService;
62import org.onosproject.net.flowobjective.ForwardingObjective;
Charles Chan48df9ad2018-10-30 18:08:59 -070063import org.onosproject.net.flowobjective.IdNextTreatment;
Charles Chan8d316332018-06-19 20:31:57 -070064import org.onosproject.net.flowobjective.NextObjective;
Charles Chan48df9ad2018-10-30 18:08:59 -070065import org.onosproject.net.flowobjective.NextTreatment;
Charles Chan8d316332018-06-19 20:31:57 -070066import org.onosproject.net.flowobjective.Objective;
67import org.onosproject.net.flowobjective.ObjectiveContext;
68import org.onosproject.net.flowobjective.ObjectiveError;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053069import org.onosproject.net.host.HostEvent;
70import org.onosproject.net.host.HostListener;
71import org.onosproject.net.host.HostService;
72import org.onosproject.net.intf.InterfaceService;
Charles Chan8d316332018-06-19 20:31:57 -070073import org.onosproject.segmentrouting.SegmentRoutingService;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053074import org.onosproject.segmentrouting.storekey.VlanNextObjectiveStoreKey;
Charles Chan8d316332018-06-19 20:31:57 -070075import org.onosproject.segmentrouting.xconnect.api.XconnectCodec;
76import org.onosproject.segmentrouting.xconnect.api.XconnectDesc;
Charles Chan445659f2019-01-02 13:46:16 -080077import org.onosproject.segmentrouting.xconnect.api.XconnectEndpoint;
Charles Chan8d316332018-06-19 20:31:57 -070078import org.onosproject.segmentrouting.xconnect.api.XconnectKey;
Charles Chan445659f2019-01-02 13:46:16 -080079import org.onosproject.segmentrouting.xconnect.api.XconnectLoadBalancerEndpoint;
80import org.onosproject.segmentrouting.xconnect.api.XconnectPortEndpoint;
Charles Chan8d316332018-06-19 20:31:57 -070081import org.onosproject.segmentrouting.xconnect.api.XconnectService;
82import org.onosproject.store.serializers.KryoNamespaces;
83import org.onosproject.store.service.ConsistentMap;
84import org.onosproject.store.service.MapEvent;
85import org.onosproject.store.service.MapEventListener;
86import org.onosproject.store.service.Serializer;
87import org.onosproject.store.service.StorageService;
88import org.onosproject.store.service.Versioned;
Ray Milkey2bd24a92018-08-17 14:54:17 -070089import org.osgi.service.component.annotations.Activate;
90import org.osgi.service.component.annotations.Component;
91import org.osgi.service.component.annotations.Deactivate;
92import org.osgi.service.component.annotations.Reference;
93import org.osgi.service.component.annotations.ReferenceCardinality;
Charles Chan8d316332018-06-19 20:31:57 -070094import org.slf4j.Logger;
95import org.slf4j.LoggerFactory;
96
97import java.io.Serializable;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +053098import java.util.Collections;
99import java.util.List;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530100import java.util.Optional;
Charles Chan8d316332018-06-19 20:31:57 -0700101import java.util.Set;
102import java.util.concurrent.CompletableFuture;
Charles Chan56542b62018-08-07 12:48:36 -0700103import java.util.concurrent.ExecutorService;
104import java.util.concurrent.Executors;
pier567465b2018-11-24 11:16:28 -0800105import java.util.concurrent.ScheduledExecutorService;
106import java.util.concurrent.TimeUnit;
Charles Chan8d316332018-06-19 20:31:57 -0700107import java.util.function.BiConsumer;
108import java.util.function.Consumer;
109import java.util.stream.Collectors;
110
pier567465b2018-11-24 11:16:28 -0800111import static java.util.concurrent.Executors.newScheduledThreadPool;
Charles Chan56542b62018-08-07 12:48:36 -0700112import static org.onlab.util.Tools.groupedThreads;
113
Ray Milkey2bd24a92018-08-17 14:54:17 -0700114@Component(immediate = true, service = XconnectService.class)
Charles Chan8d316332018-06-19 20:31:57 -0700115public class XconnectManager implements XconnectService {
Ray Milkey2bd24a92018-08-17 14:54:17 -0700116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700117 private CoreService coreService;
118
Ray Milkey2bd24a92018-08-17 14:54:17 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700120 private CodecService codecService;
121
Ray Milkey2bd24a92018-08-17 14:54:17 -0700122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700123 private StorageService storageService;
124
Ray Milkey2bd24a92018-08-17 14:54:17 -0700125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700126 public NetworkConfigService netCfgService;
127
Ray Milkey2bd24a92018-08-17 14:54:17 -0700128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700129 public DeviceService deviceService;
130
Ray Milkey2bd24a92018-08-17 14:54:17 -0700131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700132 public FlowObjectiveService flowObjectiveService;
133
Ray Milkey2bd24a92018-08-17 14:54:17 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pier6fd24fd2018-11-27 11:23:50 -0800135 private LeadershipService leadershipService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
138 private ClusterService clusterService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan8d316332018-06-19 20:31:57 -0700141 public MastershipService mastershipService;
142
Ray Milkey2bd24a92018-08-17 14:54:17 -0700143 @Reference(cardinality = ReferenceCardinality.OPTIONAL)
Charles Chan8d316332018-06-19 20:31:57 -0700144 public SegmentRoutingService srService;
145
Ray Milkey3cad4db2018-10-04 15:13:33 -0700146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530147 public InterfaceService interfaceService;
148
Ray Milkey3cad4db2018-10-04 15:13:33 -0700149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530150 HostService hostService;
151
Charles Chan48df9ad2018-10-30 18:08:59 -0700152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chanc0a499b2019-01-16 15:30:39 -0800153 private PortLoadBalancerService portLoadBalancerService;
Charles Chan48df9ad2018-10-30 18:08:59 -0700154
Charles Chan8d316332018-06-19 20:31:57 -0700155 private static final String APP_NAME = "org.onosproject.xconnect";
pier6fd24fd2018-11-27 11:23:50 -0800156 private static final String ERROR_NOT_LEADER = "Not leader controller";
Charles Chan48df9ad2018-10-30 18:08:59 -0700157 private static final String ERROR_NEXT_OBJ_BUILDER = "Unable to construct next objective builder";
158 private static final String ERROR_NEXT_ID = "Unable to get next id";
Charles Chan8d316332018-06-19 20:31:57 -0700159
160 private static Logger log = LoggerFactory.getLogger(XconnectManager.class);
161
162 private ApplicationId appId;
Charles Chan445659f2019-01-02 13:46:16 -0800163 private ConsistentMap<XconnectKey, Set<XconnectEndpoint>> xconnectStore;
Charles Chan1fb65132018-09-21 11:29:12 -0700164 private ConsistentMap<XconnectKey, Integer> xconnectNextObjStore;
Charles Chan8d316332018-06-19 20:31:57 -0700165
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530166 private ConsistentMap<VlanNextObjectiveStoreKey, Integer> xconnectMulticastNextStore;
167 private ConsistentMap<VlanNextObjectiveStoreKey, List<PortNumber>> xconnectMulticastPortsStore;
168
Charles Chan445659f2019-01-02 13:46:16 -0800169 private final MapEventListener<XconnectKey, Set<XconnectEndpoint>> xconnectListener = new XconnectMapListener();
pier6fd24fd2018-11-27 11:23:50 -0800170 private ExecutorService xConnectExecutor;
Charles Chan8d316332018-06-19 20:31:57 -0700171
pier6fd24fd2018-11-27 11:23:50 -0800172 private final DeviceListener deviceListener = new InternalDeviceListener();
Charles Chan56542b62018-08-07 12:48:36 -0700173 private ExecutorService deviceEventExecutor;
174
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530175 private final HostListener hostListener = new InternalHostListener();
176 private ExecutorService hostEventExecutor;
177
pier567465b2018-11-24 11:16:28 -0800178 // Wait time for the cache
179 private static final int WAIT_TIME_MS = 15000;
Charles Chanc0a499b2019-01-16 15:30:39 -0800180 //The cache is implemented as buffer for waiting the installation of PortLoadBalancer when present
181 private Cache<PortLoadBalancerId, XconnectKey> portLoadBalancerCache;
pier567465b2018-11-24 11:16:28 -0800182 // Executor for the cache
Charles Chanc0a499b2019-01-16 15:30:39 -0800183 private ScheduledExecutorService portLoadBalancerExecutor;
184 // We need to listen for some events to properly installed the xconnect with portloadbalancer
185 private final PortLoadBalancerListener portLoadBalancerListener = new InternalPortLoadBalancerListener();
pier567465b2018-11-24 11:16:28 -0800186
Charles Chan8d316332018-06-19 20:31:57 -0700187 @Activate
188 void activate() {
189 appId = coreService.registerApplication(APP_NAME);
190 codecService.registerCodec(XconnectDesc.class, new XconnectCodec());
191
192 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
193 .register(KryoNamespaces.API)
Charles Chanfbaad962018-07-23 12:53:16 -0700194 .register(XconnectManager.class)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530195 .register(XconnectKey.class)
Charles Chan445659f2019-01-02 13:46:16 -0800196 .register(XconnectEndpoint.class)
197 .register(XconnectPortEndpoint.class)
198 .register(XconnectLoadBalancerEndpoint.class)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530199 .register(VlanNextObjectiveStoreKey.class);
Charles Chan8d316332018-06-19 20:31:57 -0700200
Charles Chan445659f2019-01-02 13:46:16 -0800201 xconnectStore = storageService.<XconnectKey, Set<XconnectEndpoint>>consistentMapBuilder()
Charles Chan8d316332018-06-19 20:31:57 -0700202 .withName("onos-sr-xconnect")
203 .withRelaxedReadConsistency()
204 .withSerializer(Serializer.using(serializer.build()))
205 .build();
pier6fd24fd2018-11-27 11:23:50 -0800206 xConnectExecutor = Executors.newSingleThreadScheduledExecutor(
207 groupedThreads("sr-xconnect-event", "%d", log));
208 xconnectStore.addListener(xconnectListener, xConnectExecutor);
Charles Chan8d316332018-06-19 20:31:57 -0700209
Charles Chan1fb65132018-09-21 11:29:12 -0700210 xconnectNextObjStore = storageService.<XconnectKey, Integer>consistentMapBuilder()
Charles Chan8d316332018-06-19 20:31:57 -0700211 .withName("onos-sr-xconnect-next")
212 .withRelaxedReadConsistency()
213 .withSerializer(Serializer.using(serializer.build()))
214 .build();
215
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530216 xconnectMulticastNextStore = storageService.<VlanNextObjectiveStoreKey, Integer>consistentMapBuilder()
217 .withName("onos-sr-xconnect-l2multicast-next")
218 .withSerializer(Serializer.using(serializer.build()))
219 .build();
220 xconnectMulticastPortsStore = storageService.<VlanNextObjectiveStoreKey, List<PortNumber>>consistentMapBuilder()
221 .withName("onos-sr-xconnect-l2multicast-ports")
222 .withSerializer(Serializer.using(serializer.build()))
223 .build();
224
Charles Chan56542b62018-08-07 12:48:36 -0700225 deviceEventExecutor = Executors.newSingleThreadScheduledExecutor(
226 groupedThreads("sr-xconnect-device-event", "%d", log));
Charles Chan8d316332018-06-19 20:31:57 -0700227 deviceService.addListener(deviceListener);
228
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530229 hostEventExecutor = Executors.newSingleThreadExecutor(
230 groupedThreads("sr-xconnect-host-event", "%d", log));
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530231 hostService.addListener(hostListener);
232
Charles Chanc0a499b2019-01-16 15:30:39 -0800233 portLoadBalancerCache = CacheBuilder.newBuilder()
pier567465b2018-11-24 11:16:28 -0800234 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
Charles Chanc0a499b2019-01-16 15:30:39 -0800235 .removalListener((RemovalNotification<PortLoadBalancerId, XconnectKey> notification) ->
236 log.debug("PortLoadBalancer cache removal event. portLoadBalancerId={}, xConnectKey={}",
pier567465b2018-11-24 11:16:28 -0800237 notification.getKey(), notification.getValue())).build();
Charles Chanc0a499b2019-01-16 15:30:39 -0800238 portLoadBalancerExecutor = newScheduledThreadPool(1,
239 groupedThreads("portLoadBalancerCacheWorker", "-%d", log));
pier567465b2018-11-24 11:16:28 -0800240 // Let's schedule the cleanup of the cache
Charles Chanc0a499b2019-01-16 15:30:39 -0800241 portLoadBalancerExecutor.scheduleAtFixedRate(portLoadBalancerCache::cleanUp, 0,
pier567465b2018-11-24 11:16:28 -0800242 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chanc0a499b2019-01-16 15:30:39 -0800243 portLoadBalancerService.addListener(portLoadBalancerListener);
pier567465b2018-11-24 11:16:28 -0800244
Charles Chan8d316332018-06-19 20:31:57 -0700245 log.info("Started");
246 }
247
248 @Deactivate
249 void deactivate() {
250 xconnectStore.removeListener(xconnectListener);
251 deviceService.removeListener(deviceListener);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530252 hostService.removeListener(hostListener);
pier98345be2019-04-01 23:38:42 -0700253 portLoadBalancerService.removeListener(portLoadBalancerListener);
Charles Chan8d316332018-06-19 20:31:57 -0700254 codecService.unregisterCodec(XconnectDesc.class);
255
Charles Chan56542b62018-08-07 12:48:36 -0700256 deviceEventExecutor.shutdown();
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530257 hostEventExecutor.shutdown();
pier6fd24fd2018-11-27 11:23:50 -0800258 xConnectExecutor.shutdown();
Charles Chanc0a499b2019-01-16 15:30:39 -0800259 portLoadBalancerExecutor.shutdown();
Charles Chan56542b62018-08-07 12:48:36 -0700260
Charles Chan8d316332018-06-19 20:31:57 -0700261 log.info("Stopped");
262 }
263
264 @Override
Charles Chan445659f2019-01-02 13:46:16 -0800265 public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<XconnectEndpoint> endpoints) {
266 log.info("Adding or updating xconnect. deviceId={}, vlanId={}, endpoints={}",
267 deviceId, vlanId, endpoints);
Charles Chan8d316332018-06-19 20:31:57 -0700268 final XconnectKey key = new XconnectKey(deviceId, vlanId);
Charles Chan445659f2019-01-02 13:46:16 -0800269 xconnectStore.put(key, endpoints);
Charles Chan8d316332018-06-19 20:31:57 -0700270 }
271
272 @Override
273 public void removeXonnect(DeviceId deviceId, VlanId vlanId) {
274 log.info("Removing xconnect. deviceId={}, vlanId={}",
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530275 deviceId, vlanId);
Charles Chan8d316332018-06-19 20:31:57 -0700276 final XconnectKey key = new XconnectKey(deviceId, vlanId);
277 xconnectStore.remove(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530278
279 // Cleanup multicasting support, if any.
Charles Chan445659f2019-01-02 13:46:16 -0800280 srService.getPairDeviceId(deviceId).ifPresent(pairDeviceId ->
281 cleanupL2MulticastRule(pairDeviceId, srService.getPairLocalPort(pairDeviceId).get(), vlanId, true)
282 );
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530283
Charles Chan8d316332018-06-19 20:31:57 -0700284 }
285
286 @Override
287 public Set<XconnectDesc> getXconnects() {
288 return xconnectStore.asJavaMap().entrySet().stream()
289 .map(e -> new XconnectDesc(e.getKey(), e.getValue()))
290 .collect(Collectors.toSet());
291 }
292
293 @Override
294 public boolean hasXconnect(ConnectPoint cp) {
Charles Chan445659f2019-01-02 13:46:16 -0800295 return getXconnects().stream().anyMatch(desc ->
296 desc.key().deviceId().equals(cp.deviceId()) && desc.endpoints().stream().anyMatch(ep ->
297 ep.type() == XconnectEndpoint.Type.PORT && ((XconnectPortEndpoint) ep).port().equals(cp.port())
298 )
Charles Chan8d316332018-06-19 20:31:57 -0700299 );
300 }
301
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700302 @Override
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530303 public List<VlanId> getXconnectVlans(DeviceId deviceId, PortNumber port) {
304 return getXconnects().stream()
Charles Chan445659f2019-01-02 13:46:16 -0800305 .filter(desc -> desc.key().deviceId().equals(deviceId) && desc.endpoints().stream().anyMatch(ep ->
306 ep.type() == XconnectEndpoint.Type.PORT && ((XconnectPortEndpoint) ep).port().equals(port)))
307 .map(XconnectDesc::key)
308 .map(XconnectKey::vlanId)
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530309 .collect(Collectors.toList());
310 }
311
312 @Override
313 public boolean isXconnectVlan(DeviceId deviceId, VlanId vlanId) {
pier6fd24fd2018-11-27 11:23:50 -0800314 XconnectKey key = new XconnectKey(deviceId, vlanId);
315 return Versioned.valueOrNull(xconnectStore.get(key)) != null;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530316 }
317
318 @Override
Charles Chan1fb65132018-09-21 11:29:12 -0700319 public ImmutableMap<XconnectKey, Integer> getNext() {
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700320 if (xconnectNextObjStore != null) {
321 return ImmutableMap.copyOf(xconnectNextObjStore.asJavaMap());
322 } else {
323 return ImmutableMap.of();
324 }
325 }
326
327 @Override
pier6fd24fd2018-11-27 11:23:50 -0800328 public int getNextId(DeviceId deviceId, VlanId vlanId) {
329 return Versioned.valueOrElse(xconnectNextObjStore.get(new XconnectKey(deviceId, vlanId)), -1);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530330 }
331
332 @Override
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700333 public void removeNextId(int nextId) {
334 xconnectNextObjStore.entrySet().forEach(e -> {
Charles Chan1fb65132018-09-21 11:29:12 -0700335 if (e.getValue().value() == nextId) {
Charles Chan0b1dd7e2018-08-19 19:21:46 -0700336 xconnectNextObjStore.remove(e.getKey());
337 }
338 });
339 }
340
Charles Chan445659f2019-01-02 13:46:16 -0800341 private class XconnectMapListener implements MapEventListener<XconnectKey, Set<XconnectEndpoint>> {
Charles Chan8d316332018-06-19 20:31:57 -0700342 @Override
Charles Chan445659f2019-01-02 13:46:16 -0800343 public void event(MapEvent<XconnectKey, Set<XconnectEndpoint>> event) {
Charles Chan8d316332018-06-19 20:31:57 -0700344 XconnectKey key = event.key();
Charles Chan445659f2019-01-02 13:46:16 -0800345 Set<XconnectEndpoint> ports = Versioned.valueOrNull(event.newValue());
346 Set<XconnectEndpoint> oldPorts = Versioned.valueOrNull(event.oldValue());
Charles Chan8d316332018-06-19 20:31:57 -0700347
348 switch (event.type()) {
349 case INSERT:
Charles Chan48df9ad2018-10-30 18:08:59 -0700350 populateXConnect(key, ports);
Charles Chan8d316332018-06-19 20:31:57 -0700351 break;
352 case UPDATE:
Charles Chan48df9ad2018-10-30 18:08:59 -0700353 updateXConnect(key, oldPorts, ports);
Charles Chan8d316332018-06-19 20:31:57 -0700354 break;
355 case REMOVE:
Charles Chan48df9ad2018-10-30 18:08:59 -0700356 revokeXConnect(key, oldPorts);
Charles Chan8d316332018-06-19 20:31:57 -0700357 break;
358 default:
359 break;
360 }
361 }
362 }
363
364 private class InternalDeviceListener implements DeviceListener {
pier6fd24fd2018-11-27 11:23:50 -0800365 // Offload the execution to an executor and then process the event
366 // if this instance is the leader of the device
Charles Chan8d316332018-06-19 20:31:57 -0700367 @Override
368 public void event(DeviceEvent event) {
Charles Chan56542b62018-08-07 12:48:36 -0700369 deviceEventExecutor.execute(() -> {
370 DeviceId deviceId = event.subject().id();
pier6fd24fd2018-11-27 11:23:50 -0800371 // Just skip if we are not the leader
372 if (!isLocalLeader(deviceId)) {
373 log.debug("Not the leader of {}. Skip event {}", deviceId, event);
Charles Chan56542b62018-08-07 12:48:36 -0700374 return;
375 }
pier6fd24fd2018-11-27 11:23:50 -0800376 // Populate or revoke according to the device availability
377 if (deviceService.isAvailable(deviceId)) {
378 init(deviceId);
379 } else {
380 cleanup(deviceId);
Charles Chan56542b62018-08-07 12:48:36 -0700381 }
382 });
Charles Chan8d316332018-06-19 20:31:57 -0700383 }
pier6fd24fd2018-11-27 11:23:50 -0800384 // We want to manage only a subset of events and if we are the leader
385 @Override
386 public boolean isRelevant(DeviceEvent event) {
387 return event.type() == DeviceEvent.Type.DEVICE_ADDED ||
388 event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
389 event.type() == DeviceEvent.Type.DEVICE_UPDATED;
390 }
Charles Chan8d316332018-06-19 20:31:57 -0700391 }
392
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530393 private class InternalHostListener implements HostListener {
394 @Override
395 public void event(HostEvent event) {
396 hostEventExecutor.execute(() -> {
397
398 switch (event.type()) {
Sudhir Kumar Mauryafcc42f82019-05-02 03:03:59 -0400399 case HOST_ADDED:
400 case HOST_REMOVED:
401 case HOST_UPDATED:
402 log.trace("Unhandled host event type: {} received. Ignoring.", event.type());
403 break;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530404 case HOST_MOVED:
405 log.trace("Processing host event {}", event);
406
407 Host host = event.subject();
408 Set<HostLocation> prevLocations = event.prevSubject().locations();
409 Set<HostLocation> newLocations = host.locations();
410
411 // Dual-home host port failure
412 // For each old location, in failed and paired devices update L2 vlan groups
413 Sets.difference(prevLocations, newLocations).forEach(prevLocation -> {
414
415 Optional<DeviceId> pairDeviceId = srService.getPairDeviceId(prevLocation.deviceId());
416 Optional<PortNumber> pairLocalPort = srService.getPairLocalPort(prevLocation.deviceId());
417
418 if (pairDeviceId.isPresent() && pairLocalPort.isPresent() && newLocations.stream()
419 .anyMatch(location -> location.deviceId().equals(pairDeviceId.get())) &&
420 hasXconnect(new ConnectPoint(prevLocation.deviceId(), prevLocation.port()))) {
421
422 List<VlanId> xconnectVlans = getXconnectVlans(prevLocation.deviceId(),
423 prevLocation.port());
424 xconnectVlans.forEach(xconnectVlan -> {
425 // Add single-home host into L2 multicast group at paired device side.
426 // Also append ACL rule to forward traffic from paired port to L2 multicast group.
427 newLocations.stream()
428 .filter(location -> location.deviceId().equals(pairDeviceId.get()))
429 .forEach(location -> populateL2Multicast(location.deviceId(),
430 srService.getPairLocalPort(
431 location.deviceId()).get(),
432 xconnectVlan,
433 Collections.singletonList(
434 location.port())));
pier6fd24fd2018-11-27 11:23:50 -0800435 // Ensure pair-port attached to xconnect vlan flooding group
436 // at dual home failed device.
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530437 updateL2Flooding(prevLocation.deviceId(), pairLocalPort.get(), xconnectVlan, true);
438 });
439 }
440 });
441
442 // Dual-home host port restoration
443 // For each new location, reverse xconnect loop prevention groups.
444 Sets.difference(newLocations, prevLocations).forEach(newLocation -> {
445 final Optional<DeviceId> pairDeviceId = srService.getPairDeviceId(newLocation.deviceId());
446 Optional<PortNumber> pairLocalPort = srService.getPairLocalPort(newLocation.deviceId());
447 if (pairDeviceId.isPresent() && pairLocalPort.isPresent() &&
448 hasXconnect((new ConnectPoint(newLocation.deviceId(), newLocation.port())))) {
449
450 List<VlanId> xconnectVlans = getXconnectVlans(newLocation.deviceId(),
451 newLocation.port());
452 xconnectVlans.forEach(xconnectVlan -> {
453 // Remove recovered dual homed port from vlan L2 multicast group
454 prevLocations.stream()
455 .filter(prevLocation -> prevLocation.deviceId().equals(pairDeviceId.get()))
pier6fd24fd2018-11-27 11:23:50 -0800456 .forEach(prevLocation -> revokeL2Multicast(
457 prevLocation.deviceId(),
458 xconnectVlan,
459 Collections.singletonList(newLocation.port()))
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530460 );
461
pier6fd24fd2018-11-27 11:23:50 -0800462 // Remove pair-port from vlan's flooding group at dual home
463 // restored device, if needed.
464 if (!hasAccessPortInMulticastGroup(new VlanNextObjectiveStoreKey(
465 newLocation.deviceId(), xconnectVlan), pairLocalPort.get())) {
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530466 updateL2Flooding(newLocation.deviceId(),
467 pairLocalPort.get(),
468 xconnectVlan,
469 false);
470
471 // Clean L2 multicast group at pair-device; also update store.
472 cleanupL2MulticastRule(pairDeviceId.get(),
473 srService.getPairLocalPort(pairDeviceId.get()).get(),
474 xconnectVlan,
475 false);
476 }
477 });
478 }
479 });
480 break;
481
482 default:
483 log.warn("Unsupported host event type: {} received. Ignoring.", event.type());
484 break;
485 }
486 });
487 }
488 }
489
Charles Chan1fb65132018-09-21 11:29:12 -0700490 private void init(DeviceId deviceId) {
Charles Chan8d316332018-06-19 20:31:57 -0700491 getXconnects().stream()
492 .filter(desc -> desc.key().deviceId().equals(deviceId))
Charles Chan445659f2019-01-02 13:46:16 -0800493 .forEach(desc -> populateXConnect(desc.key(), desc.endpoints()));
Charles Chan8d316332018-06-19 20:31:57 -0700494 }
495
Charles Chan1fb65132018-09-21 11:29:12 -0700496 private void cleanup(DeviceId deviceId) {
Charles Chan8d316332018-06-19 20:31:57 -0700497 xconnectNextObjStore.entrySet().stream()
498 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
499 .forEach(entry -> xconnectNextObjStore.remove(entry.getKey()));
500 log.debug("{} is removed from xConnectNextObjStore", deviceId);
501 }
502
503 /**
504 * Populates XConnect groups and flows for given key.
505 *
Charles Chan445659f2019-01-02 13:46:16 -0800506 * @param key XConnect key
507 * @param endpoints a set of endpoints to be cross-connected
Charles Chan8d316332018-06-19 20:31:57 -0700508 */
Charles Chan445659f2019-01-02 13:46:16 -0800509 private void populateXConnect(XconnectKey key, Set<XconnectEndpoint> endpoints) {
pier6fd24fd2018-11-27 11:23:50 -0800510 if (!isLocalLeader(key.deviceId())) {
511 log.debug("Abort populating XConnect {}: {}", key, ERROR_NOT_LEADER);
Charles Chan8d316332018-06-19 20:31:57 -0700512 return;
513 }
514
Charles Chan445659f2019-01-02 13:46:16 -0800515 int nextId = populateNext(key, endpoints);
Charles Chan48df9ad2018-10-30 18:08:59 -0700516 if (nextId == -1) {
517 log.warn("Fail to populateXConnect {}: {}", key, ERROR_NEXT_ID);
518 return;
519 }
Charles Chan445659f2019-01-02 13:46:16 -0800520 populateFilter(key, endpoints);
Charles Chan48df9ad2018-10-30 18:08:59 -0700521 populateFwd(key, nextId);
Charles Chan8d316332018-06-19 20:31:57 -0700522 populateAcl(key);
523 }
524
525 /**
526 * Populates filtering objectives for given XConnect.
527 *
Charles Chan445659f2019-01-02 13:46:16 -0800528 * @param key XConnect store key
529 * @param endpoints XConnect endpoints
Charles Chan8d316332018-06-19 20:31:57 -0700530 */
Charles Chan445659f2019-01-02 13:46:16 -0800531 private void populateFilter(XconnectKey key, Set<XconnectEndpoint> endpoints) {
Charles Chan48df9ad2018-10-30 18:08:59 -0700532 // FIXME Improve the logic
Charles Chanc0a499b2019-01-16 15:30:39 -0800533 // If port load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
Charles Chan48df9ad2018-10-30 18:08:59 -0700534 // The purpose is to make sure existing XConnect logic can still work on a configured port.
Charles Chan445659f2019-01-02 13:46:16 -0800535 boolean filtered = endpoints.stream()
536 .map(ep -> getNextTreatment(key.deviceId(), ep, false))
Charles Chan48df9ad2018-10-30 18:08:59 -0700537 .allMatch(t -> t.type().equals(NextTreatment.Type.TREATMENT));
538
Charles Chan445659f2019-01-02 13:46:16 -0800539 endpoints.stream()
540 .map(ep -> getPhysicalPorts(key.deviceId(), ep))
Charles Chan48df9ad2018-10-30 18:08:59 -0700541 .flatMap(Set::stream).forEach(port -> {
Charles Chan445659f2019-01-02 13:46:16 -0800542 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port, filtered);
543 ObjectiveContext context = new DefaultObjectiveContext(
544 (objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
545 key, port),
546 (objective, error) ->
547 log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
548 key, port, error));
549 flowObjectiveService.filter(key.deviceId(), filtObjBuilder.add(context));
550 });
Charles Chan8d316332018-06-19 20:31:57 -0700551 }
552
553 /**
554 * Populates next objectives for given XConnect.
555 *
Charles Chan445659f2019-01-02 13:46:16 -0800556 * @param key XConnect store key
557 * @param endpoints XConnect endpoints
Charles Chan48df9ad2018-10-30 18:08:59 -0700558 * @return next id
Charles Chan8d316332018-06-19 20:31:57 -0700559 */
Charles Chan445659f2019-01-02 13:46:16 -0800560 private int populateNext(XconnectKey key, Set<XconnectEndpoint> endpoints) {
pier6fd24fd2018-11-27 11:23:50 -0800561 int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
562 if (nextId != -1) {
Charles Chan1fb65132018-09-21 11:29:12 -0700563 log.debug("NextObj for {} found, id={}", key, nextId);
564 return nextId;
Charles Chan8d316332018-06-19 20:31:57 -0700565 } else {
Charles Chan445659f2019-01-02 13:46:16 -0800566 NextObjective.Builder nextObjBuilder = nextObjBuilder(key, endpoints);
Charles Chan48df9ad2018-10-30 18:08:59 -0700567 if (nextObjBuilder == null) {
568 log.warn("Fail to populate {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
569 return -1;
570 }
Charles Chan8d316332018-06-19 20:31:57 -0700571 ObjectiveContext nextContext = new DefaultObjectiveContext(
572 // To serialize this with kryo
573 (Serializable & Consumer<Objective>) (objective) ->
574 log.debug("XConnect NextObj for {} added", key),
Charles Chanfacfbef2018-08-23 14:30:33 -0700575 (Serializable & BiConsumer<Objective, ObjectiveError>) (objective, error) -> {
576 log.warn("Failed to add XConnect NextObj for {}: {}", key, error);
577 srService.invalidateNextObj(objective.id());
578 });
Charles Chan1fb65132018-09-21 11:29:12 -0700579 NextObjective nextObj = nextObjBuilder.add(nextContext);
Charles Chan8d316332018-06-19 20:31:57 -0700580 flowObjectiveService.next(key.deviceId(), nextObj);
Charles Chan1fb65132018-09-21 11:29:12 -0700581 xconnectNextObjStore.put(key, nextObj.id());
Charles Chan8d316332018-06-19 20:31:57 -0700582 log.debug("NextObj for {} not found. Creating new NextObj with id={}", key, nextObj.id());
Charles Chan1fb65132018-09-21 11:29:12 -0700583 return nextObj.id();
Charles Chan8d316332018-06-19 20:31:57 -0700584 }
Charles Chan8d316332018-06-19 20:31:57 -0700585 }
586
587 /**
588 * Populates bridging forwarding objectives for given XConnect.
589 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530590 * @param key XConnect store key
Charles Chan1fb65132018-09-21 11:29:12 -0700591 * @param nextId next objective id
Charles Chan8d316332018-06-19 20:31:57 -0700592 */
Charles Chan1fb65132018-09-21 11:29:12 -0700593 private void populateFwd(XconnectKey key, int nextId) {
594 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextId);
Charles Chan8d316332018-06-19 20:31:57 -0700595 ObjectiveContext fwdContext = new DefaultObjectiveContext(
596 (objective) -> log.debug("XConnect FwdObj for {} populated", key),
597 (objective, error) ->
598 log.warn("Failed to populate XConnect FwdObj for {}: {}", key, error));
599 flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.add(fwdContext));
600 }
601
602 /**
603 * Populates ACL forwarding objectives for given XConnect.
604 *
605 * @param key XConnect store key
606 */
607 private void populateAcl(XconnectKey key) {
608 ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
609 ObjectiveContext aclContext = new DefaultObjectiveContext(
610 (objective) -> log.debug("XConnect AclObj for {} populated", key),
611 (objective, error) ->
612 log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
613 flowObjectiveService.forward(key.deviceId(), aclObjBuilder.add(aclContext));
614 }
615
616 /**
617 * Revokes XConnect groups and flows for given key.
618 *
Charles Chan445659f2019-01-02 13:46:16 -0800619 * @param key XConnect key
620 * @param endpoints XConnect endpoints
Charles Chan8d316332018-06-19 20:31:57 -0700621 */
Charles Chan445659f2019-01-02 13:46:16 -0800622 private void revokeXConnect(XconnectKey key, Set<XconnectEndpoint> endpoints) {
pier6fd24fd2018-11-27 11:23:50 -0800623 if (!isLocalLeader(key.deviceId())) {
624 log.debug("Abort revoking XConnect {}: {}", key, ERROR_NOT_LEADER);
Charles Chan8d316332018-06-19 20:31:57 -0700625 return;
626 }
627
Charles Chan445659f2019-01-02 13:46:16 -0800628 revokeFilter(key, endpoints);
pier6fd24fd2018-11-27 11:23:50 -0800629 int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
630 if (nextId != -1) {
Charles Chan1fb65132018-09-21 11:29:12 -0700631 revokeFwd(key, nextId, null);
Charles Chan445659f2019-01-02 13:46:16 -0800632 revokeNext(key, endpoints, nextId, null);
Charles Chan8d316332018-06-19 20:31:57 -0700633 } else {
634 log.warn("NextObj for {} does not exist in the store.", key);
635 }
Charles Chan445659f2019-01-02 13:46:16 -0800636 revokeFilter(key, endpoints);
Charles Chan8d316332018-06-19 20:31:57 -0700637 revokeAcl(key);
638 }
639
640 /**
641 * Revokes filtering objectives for given XConnect.
642 *
Charles Chan445659f2019-01-02 13:46:16 -0800643 * @param key XConnect store key
644 * @param endpoints XConnect endpoints
Charles Chan8d316332018-06-19 20:31:57 -0700645 */
Charles Chan445659f2019-01-02 13:46:16 -0800646 private void revokeFilter(XconnectKey key, Set<XconnectEndpoint> endpoints) {
Charles Chan48df9ad2018-10-30 18:08:59 -0700647 // FIXME Improve the logic
Charles Chanc0a499b2019-01-16 15:30:39 -0800648 // If port load balancer is not involved, use filtered port. Otherwise, use unfiltered port.
Charles Chan48df9ad2018-10-30 18:08:59 -0700649 // The purpose is to make sure existing XConnect logic can still work on a configured port.
Charles Chan445659f2019-01-02 13:46:16 -0800650 boolean filtered = endpoints.stream()
651 .map(ep -> getNextTreatment(key.deviceId(), ep, false))
652 .allMatch(t -> t.type().equals(NextTreatment.Type.TREATMENT));
653
654 endpoints.stream()
655 .map(ep -> getPhysicalPorts(key.deviceId(), ep)).
656 flatMap(Set::stream).forEach(port -> {
657 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port, filtered);
658 ObjectiveContext context = new DefaultObjectiveContext(
659 (objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
660 key, port),
661 (objective, error) ->
662 log.warn("Failed to revoke XConnect FilterObj for {} on port {}: {}",
663 key, port, error));
664 flowObjectiveService.filter(key.deviceId(), filtObjBuilder.remove(context));
665 });
Charles Chan8d316332018-06-19 20:31:57 -0700666 }
667
668 /**
669 * Revokes next objectives for given XConnect.
670 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530671 * @param key XConnect store key
Charles Chan445659f2019-01-02 13:46:16 -0800672 * @param endpoints XConnect endpoints
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530673 * @param nextId next objective id
Charles Chan8d316332018-06-19 20:31:57 -0700674 * @param nextFuture completable future for this next objective operation
675 */
Charles Chan445659f2019-01-02 13:46:16 -0800676 private void revokeNext(XconnectKey key, Set<XconnectEndpoint> endpoints, int nextId,
Charles Chan8d316332018-06-19 20:31:57 -0700677 CompletableFuture<ObjectiveError> nextFuture) {
678 ObjectiveContext context = new ObjectiveContext() {
679 @Override
680 public void onSuccess(Objective objective) {
681 log.debug("Previous NextObj for {} removed", key);
682 if (nextFuture != null) {
683 nextFuture.complete(null);
684 }
685 }
686
687 @Override
688 public void onError(Objective objective, ObjectiveError error) {
689 log.warn("Failed to remove previous NextObj for {}: {}", key, error);
690 if (nextFuture != null) {
691 nextFuture.complete(error);
692 }
Charles Chanfacfbef2018-08-23 14:30:33 -0700693 srService.invalidateNextObj(objective.id());
Charles Chan8d316332018-06-19 20:31:57 -0700694 }
695 };
Charles Chan48df9ad2018-10-30 18:08:59 -0700696
Charles Chan445659f2019-01-02 13:46:16 -0800697 NextObjective.Builder nextObjBuilder = nextObjBuilder(key, endpoints, nextId);
Charles Chan48df9ad2018-10-30 18:08:59 -0700698 if (nextObjBuilder == null) {
699 log.warn("Fail to revokeNext {}: {}", key, ERROR_NEXT_OBJ_BUILDER);
700 return;
701 }
Charles Chanc0a499b2019-01-16 15:30:39 -0800702 // Release the port load balancer if present
Charles Chan445659f2019-01-02 13:46:16 -0800703 endpoints.stream()
704 .filter(endpoint -> endpoint.type() == XconnectEndpoint.Type.LOAD_BALANCER)
705 .forEach(endpoint -> {
Charles Chanc0a499b2019-01-16 15:30:39 -0800706 String portLoadBalancerKey = String.valueOf(((XconnectLoadBalancerEndpoint) endpoint).key());
707 portLoadBalancerService.release(new PortLoadBalancerId(key.deviceId(),
708 Integer.parseInt(portLoadBalancerKey)), appId);
Charles Chan445659f2019-01-02 13:46:16 -0800709 });
Charles Chan48df9ad2018-10-30 18:08:59 -0700710 flowObjectiveService.next(key.deviceId(), nextObjBuilder.remove(context));
Charles Chan8d316332018-06-19 20:31:57 -0700711 xconnectNextObjStore.remove(key);
712 }
713
714 /**
715 * Revokes bridging forwarding objectives for given XConnect.
716 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530717 * @param key XConnect store key
718 * @param nextId next objective id
Charles Chan8d316332018-06-19 20:31:57 -0700719 * @param fwdFuture completable future for this forwarding objective operation
720 */
Charles Chan1fb65132018-09-21 11:29:12 -0700721 private void revokeFwd(XconnectKey key, int nextId, CompletableFuture<ObjectiveError> fwdFuture) {
722 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextId);
Charles Chan8d316332018-06-19 20:31:57 -0700723 ObjectiveContext context = new ObjectiveContext() {
724 @Override
725 public void onSuccess(Objective objective) {
726 log.debug("Previous FwdObj for {} removed", key);
727 if (fwdFuture != null) {
728 fwdFuture.complete(null);
729 }
730 }
731
732 @Override
733 public void onError(Objective objective, ObjectiveError error) {
734 log.warn("Failed to remove previous FwdObj for {}: {}", key, error);
735 if (fwdFuture != null) {
736 fwdFuture.complete(error);
737 }
738 }
739 };
740 flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.remove(context));
741 }
742
743 /**
744 * Revokes ACL forwarding objectives for given XConnect.
745 *
746 * @param key XConnect store key
747 */
748 private void revokeAcl(XconnectKey key) {
749 ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
750 ObjectiveContext aclContext = new DefaultObjectiveContext(
751 (objective) -> log.debug("XConnect AclObj for {} populated", key),
752 (objective, error) ->
753 log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
754 flowObjectiveService.forward(key.deviceId(), aclObjBuilder.remove(aclContext));
755 }
756
757 /**
758 * Updates XConnect groups and flows for given key.
759 *
Charles Chan445659f2019-01-02 13:46:16 -0800760 * @param key XConnect key
761 * @param prevEndpoints previous XConnect endpoints
762 * @param endpoints new XConnect endpoints
Charles Chan8d316332018-06-19 20:31:57 -0700763 */
Charles Chan445659f2019-01-02 13:46:16 -0800764 private void updateXConnect(XconnectKey key, Set<XconnectEndpoint> prevEndpoints,
765 Set<XconnectEndpoint> endpoints) {
pier6fd24fd2018-11-27 11:23:50 -0800766 if (!isLocalLeader(key.deviceId())) {
767 log.debug("Abort updating XConnect {}: {}", key, ERROR_NOT_LEADER);
768 return;
769 }
Charles Chan8d316332018-06-19 20:31:57 -0700770 // NOTE: ACL flow doesn't include port information. No need to update it.
771 // Pair port is built-in and thus not going to change. No need to update it.
772
773 // remove old filter
Charles Chan445659f2019-01-02 13:46:16 -0800774 prevEndpoints.stream().filter(prevEndpoint -> !endpoints.contains(prevEndpoint)).forEach(prevEndpoint ->
775 revokeFilter(key, ImmutableSet.of(prevEndpoint)));
Charles Chan8d316332018-06-19 20:31:57 -0700776 // install new filter
Charles Chan445659f2019-01-02 13:46:16 -0800777 endpoints.stream().filter(endpoint -> !prevEndpoints.contains(endpoint)).forEach(endpoint ->
778 populateFilter(key, ImmutableSet.of(endpoint)));
Charles Chan8d316332018-06-19 20:31:57 -0700779
780 CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
781 CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
782
pier6fd24fd2018-11-27 11:23:50 -0800783 int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
784 if (nextId != -1) {
Charles Chan1fb65132018-09-21 11:29:12 -0700785 revokeFwd(key, nextId, fwdFuture);
Charles Chan8d316332018-06-19 20:31:57 -0700786
787 fwdFuture.thenAcceptAsync(fwdStatus -> {
788 if (fwdStatus == null) {
789 log.debug("Fwd removed. Now remove group {}", key);
Charles Chan445659f2019-01-02 13:46:16 -0800790 revokeNext(key, prevEndpoints, nextId, nextFuture);
Charles Chan8d316332018-06-19 20:31:57 -0700791 }
792 });
793
794 nextFuture.thenAcceptAsync(nextStatus -> {
795 if (nextStatus == null) {
796 log.debug("Installing new group and flow for {}", key);
Charles Chan445659f2019-01-02 13:46:16 -0800797 int newNextId = populateNext(key, endpoints);
Charles Chan48df9ad2018-10-30 18:08:59 -0700798 if (newNextId == -1) {
799 log.warn("Fail to updateXConnect {}: {}", key, ERROR_NEXT_ID);
800 return;
801 }
802 populateFwd(key, newNextId);
Charles Chan8d316332018-06-19 20:31:57 -0700803 }
804 });
805 } else {
806 log.warn("NextObj for {} does not exist in the store.", key);
807 }
808 }
809
810 /**
Charles Chan1fb65132018-09-21 11:29:12 -0700811 * Creates a next objective builder for XConnect with given nextId.
Charles Chan8d316332018-06-19 20:31:57 -0700812 *
Charles Chan445659f2019-01-02 13:46:16 -0800813 * @param key XConnect key
814 * @param endpoints XConnect endpoints
Charles Chan48df9ad2018-10-30 18:08:59 -0700815 * @param nextId next objective id
Charles Chan8d316332018-06-19 20:31:57 -0700816 * @return next objective builder
817 */
Charles Chan445659f2019-01-02 13:46:16 -0800818 private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<XconnectEndpoint> endpoints, int nextId) {
Charles Chan8d316332018-06-19 20:31:57 -0700819 TrafficSelector metadata =
820 DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
821 NextObjective.Builder nextObjBuilder = DefaultNextObjective
822 .builder().withId(nextId)
823 .withType(NextObjective.Type.BROADCAST).fromApp(appId)
824 .withMeta(metadata);
Charles Chan48df9ad2018-10-30 18:08:59 -0700825
Charles Chan445659f2019-01-02 13:46:16 -0800826 for (XconnectEndpoint endpoint : endpoints) {
827 NextTreatment nextTreatment = getNextTreatment(key.deviceId(), endpoint, true);
Charles Chan48df9ad2018-10-30 18:08:59 -0700828 if (nextTreatment == null) {
Charles Chanc0a499b2019-01-16 15:30:39 -0800829 // If a PortLoadBalancer is used in the XConnect - putting on hold
Charles Chan445659f2019-01-02 13:46:16 -0800830 if (endpoint.type() == XconnectEndpoint.Type.LOAD_BALANCER) {
Charles Chanc0a499b2019-01-16 15:30:39 -0800831 log.warn("Unable to create nextObj. PortLoadBalancer not ready");
832 String portLoadBalancerKey = String.valueOf(((XconnectLoadBalancerEndpoint) endpoint).key());
833 portLoadBalancerCache.asMap().putIfAbsent(new PortLoadBalancerId(key.deviceId(),
834 Integer.parseInt(portLoadBalancerKey)), key);
pier567465b2018-11-24 11:16:28 -0800835 } else {
836 log.warn("Unable to create nextObj. Null NextTreatment");
837 }
Charles Chan48df9ad2018-10-30 18:08:59 -0700838 return null;
839 }
840 nextObjBuilder.addTreatment(nextTreatment);
841 }
842
Charles Chan8d316332018-06-19 20:31:57 -0700843 return nextObjBuilder;
844 }
845
846 /**
Charles Chan1fb65132018-09-21 11:29:12 -0700847 * Creates a next objective builder for XConnect.
848 *
Charles Chan445659f2019-01-02 13:46:16 -0800849 * @param key XConnect key
850 * @param endpoints Xconnect endpoints
Charles Chan1fb65132018-09-21 11:29:12 -0700851 * @return next objective builder
852 */
Charles Chan445659f2019-01-02 13:46:16 -0800853 private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<XconnectEndpoint> endpoints) {
Charles Chan1fb65132018-09-21 11:29:12 -0700854 int nextId = flowObjectiveService.allocateNextId();
Charles Chan445659f2019-01-02 13:46:16 -0800855 return nextObjBuilder(key, endpoints, nextId);
Charles Chan1fb65132018-09-21 11:29:12 -0700856 }
857
858
859 /**
Charles Chan8d316332018-06-19 20:31:57 -0700860 * Creates a bridging forwarding objective builder for XConnect.
861 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530862 * @param key XConnect key
Charles Chan8d316332018-06-19 20:31:57 -0700863 * @param nextId next ID of the broadcast group for this XConnect key
864 * @return forwarding objective builder
865 */
866 private ForwardingObjective.Builder fwdObjBuilder(XconnectKey key, int nextId) {
867 /*
868 * Driver should treat objectives with MacAddress.NONE and !VlanId.NONE
869 * as the VLAN cross-connect broadcast rules
870 */
871 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
872 sbuilder.matchVlanId(key.vlanId());
873 sbuilder.matchEthDst(MacAddress.NONE);
874
875 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
876 fob.withFlag(ForwardingObjective.Flag.SPECIFIC)
877 .withSelector(sbuilder.build())
878 .nextStep(nextId)
879 .withPriority(XCONNECT_PRIORITY)
880 .fromApp(appId)
881 .makePermanent();
882 return fob;
883 }
884
885 /**
886 * Creates an ACL forwarding objective builder for XConnect.
887 *
888 * @param vlanId cross connect VLAN id
889 * @return forwarding objective builder
890 */
891 private ForwardingObjective.Builder aclObjBuilder(VlanId vlanId) {
892 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
893 sbuilder.matchVlanId(vlanId);
894
895 TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
896
897 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
898 fob.withFlag(ForwardingObjective.Flag.VERSATILE)
899 .withSelector(sbuilder.build())
900 .withTreatment(tbuilder.build())
901 .withPriority(XCONNECT_ACL_PRIORITY)
902 .fromApp(appId)
903 .makePermanent();
904 return fob;
905 }
906
907 /**
908 * Creates a filtering objective builder for XConnect.
909 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530910 * @param key XConnect key
Charles Chan8d316332018-06-19 20:31:57 -0700911 * @param port XConnect ports
Charles Chan48df9ad2018-10-30 18:08:59 -0700912 * @param filtered true if this is a filtered port
Charles Chan8d316332018-06-19 20:31:57 -0700913 * @return next objective builder
914 */
Charles Chan48df9ad2018-10-30 18:08:59 -0700915 private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port, boolean filtered) {
Charles Chan8d316332018-06-19 20:31:57 -0700916 FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
917 fob.withKey(Criteria.matchInPort(port))
Charles Chan8d316332018-06-19 20:31:57 -0700918 .addCondition(Criteria.matchEthDst(MacAddress.NONE))
919 .withPriority(XCONNECT_PRIORITY);
Charles Chan48df9ad2018-10-30 18:08:59 -0700920 if (filtered) {
921 fob.addCondition(Criteria.matchVlanId(key.vlanId()));
922 } else {
923 fob.addCondition(Criteria.matchVlanId(VlanId.ANY));
924 }
Charles Chan8d316332018-06-19 20:31:57 -0700925 return fob.permit().fromApp(appId);
926 }
927
928 /**
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530929 * Updates L2 flooding groups; add pair link into L2 flooding group of given xconnect vlan.
Charles Chan8d316332018-06-19 20:31:57 -0700930 *
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530931 * @param deviceId Device ID
932 * @param port Port details
933 * @param vlanId VLAN ID
934 * @param install Whether to add or revoke pair link addition to flooding group
Charles Chan8d316332018-06-19 20:31:57 -0700935 */
pier6fd24fd2018-11-27 11:23:50 -0800936 private void updateL2Flooding(DeviceId deviceId, PortNumber port, VlanId vlanId, boolean install) {
937 XconnectKey key = new XconnectKey(deviceId, vlanId);
938 // Ensure leadership on device
939 if (!isLocalLeader(deviceId)) {
940 log.debug("Abort updating L2Flood {}: {}", key, ERROR_NOT_LEADER);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530941 return;
Charles Chan8d316332018-06-19 20:31:57 -0700942 }
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530943
944 // Locate L2 flooding group details for given xconnect vlan
pier6fd24fd2018-11-27 11:23:50 -0800945 int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530946 if (nextId == -1) {
947 log.debug("XConnect vlan {} broadcast group for device {} doesn't exists. " +
948 "Aborting pair group linking.", vlanId, deviceId);
949 return;
950 }
951
952 // Add pairing-port group to flooding group
953 TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
954 // treatment.popVlan();
955 treatment.setOutput(port);
956 ObjectiveContext context = new DefaultObjectiveContext(
957 (objective) ->
958 log.debug("Pair port added/removed to vlan {} next objective {} on {}",
959 vlanId, nextId, deviceId),
960 (objective, error) ->
961 log.warn("Failed adding/removing pair port to vlan {} next objective {} on {}." +
962 "Error : {}", vlanId, nextId, deviceId, error)
963 );
964 NextObjective.Builder vlanNextObjectiveBuilder = DefaultNextObjective.builder()
965 .withId(nextId)
966 .withType(NextObjective.Type.BROADCAST)
967 .fromApp(srService.appId())
968 .withMeta(DefaultTrafficSelector.builder().matchVlanId(vlanId).build())
969 .addTreatment(treatment.build());
970 if (install) {
971 flowObjectiveService.next(deviceId, vlanNextObjectiveBuilder.addToExisting(context));
972 } else {
973 flowObjectiveService.next(deviceId, vlanNextObjectiveBuilder.removeFromExisting(context));
974 }
975 log.debug("Submitted next objective {} for vlan: {} in device {}",
976 nextId, vlanId, deviceId);
Charles Chan8d316332018-06-19 20:31:57 -0700977 }
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530978
979 /**
980 * Populate L2 multicast rule on given deviceId that matches given mac, given vlan and
981 * output to given port's L2 mulitcast group.
982 *
983 * @param deviceId Device ID
984 * @param pairPort Pair port number
985 * @param vlanId VLAN ID
986 * @param accessPorts List of access ports to be added into L2 multicast group
987 */
pier6fd24fd2018-11-27 11:23:50 -0800988 private void populateL2Multicast(DeviceId deviceId, PortNumber pairPort,
989 VlanId vlanId, List<PortNumber> accessPorts) {
990 // Ensure enough rights to program pair device
991 if (!srService.shouldProgram(deviceId)) {
992 log.debug("Abort populate L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
993 return;
994 }
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530995
996 boolean multicastGroupExists = true;
997 int vlanMulticastNextId;
pier6fd24fd2018-11-27 11:23:50 -0800998 VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +0530999
1000 // Step 1 : Populate single homed access ports into vlan's L2 multicast group
1001 NextObjective.Builder vlanMulticastNextObjBuilder = DefaultNextObjective
1002 .builder()
1003 .withType(NextObjective.Type.BROADCAST)
1004 .fromApp(srService.appId())
1005 .withMeta(DefaultTrafficSelector.builder().matchVlanId(vlanId)
1006 .matchEthDst(MacAddress.IPV4_MULTICAST).build());
pier6fd24fd2018-11-27 11:23:50 -08001007 vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301008 if (vlanMulticastNextId == -1) {
1009 // Vlan's L2 multicast group doesn't exist; create it, update store and add pair port as sub-group
1010 multicastGroupExists = false;
1011 vlanMulticastNextId = flowObjectiveService.allocateNextId();
pier6fd24fd2018-11-27 11:23:50 -08001012 addMulticastGroupNextObjectiveId(key, vlanMulticastNextId);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301013 vlanMulticastNextObjBuilder.addTreatment(
psneha94e1d302019-07-01 05:34:23 -04001014 DefaultTrafficTreatment.builder().setOutput(pairPort).build()
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301015 );
1016 }
1017 vlanMulticastNextObjBuilder.withId(vlanMulticastNextId);
pier6fd24fd2018-11-27 11:23:50 -08001018 int nextId = vlanMulticastNextId;
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301019 accessPorts.forEach(p -> {
1020 TrafficTreatment.Builder egressAction = DefaultTrafficTreatment.builder();
1021 // Do vlan popup action based on interface configuration
1022 if (interfaceService.getInterfacesByPort(new ConnectPoint(deviceId, p))
1023 .stream().noneMatch(i -> i.vlanTagged().contains(vlanId))) {
1024 egressAction.popVlan();
1025 }
1026 egressAction.setOutput(p);
1027 vlanMulticastNextObjBuilder.addTreatment(egressAction.build());
pier6fd24fd2018-11-27 11:23:50 -08001028 addMulticastGroupPort(key, p);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301029 });
1030 ObjectiveContext context = new DefaultObjectiveContext(
1031 (objective) ->
1032 log.debug("L2 multicast group installed/updated. "
1033 + "NextObject Id {} on {} for subnet {} ",
1034 nextId, deviceId, vlanId),
1035 (objective, error) ->
1036 log.warn("L2 multicast group failed to install/update. "
1037 + " NextObject Id {} on {} for subnet {} : {}",
1038 nextId, deviceId, vlanId, error)
1039 );
1040 if (!multicastGroupExists) {
1041 flowObjectiveService.next(deviceId, vlanMulticastNextObjBuilder.add(context));
1042
1043 // Step 2 : Populate ACL rule; selector = vlan + pair-port, output = vlan L2 multicast group
1044 TrafficSelector.Builder multicastSelector = DefaultTrafficSelector.builder();
1045 multicastSelector.matchEthType(Ethernet.TYPE_VLAN);
1046 multicastSelector.matchInPort(pairPort);
1047 multicastSelector.matchVlanId(vlanId);
1048 ForwardingObjective.Builder vlanMulticastForwardingObj = DefaultForwardingObjective.builder()
1049 .withFlag(ForwardingObjective.Flag.VERSATILE)
1050 .nextStep(vlanMulticastNextId)
1051 .withSelector(multicastSelector.build())
1052 .withPriority(100)
1053 .fromApp(srService.appId())
1054 .makePermanent();
1055 context = new DefaultObjectiveContext(
1056 (objective) -> log.debug("L2 multicasting versatile rule for device {}, port/vlan {}/{} populated",
1057 deviceId,
1058 pairPort,
1059 vlanId),
1060 (objective, error) -> log.warn("Failed to populate L2 multicasting versatile rule for device {}, " +
1061 "ports/vlan {}/{}: {}", deviceId, pairPort, vlanId, error));
1062 flowObjectiveService.forward(deviceId, vlanMulticastForwardingObj.add(context));
1063 } else {
1064 // L2_MULTICAST & BROADCAST are similar structure in subgroups; so going with BROADCAST type.
1065 vlanMulticastNextObjBuilder.withType(NextObjective.Type.BROADCAST);
1066 flowObjectiveService.next(deviceId, vlanMulticastNextObjBuilder.addToExisting(context));
1067 }
1068 }
1069
1070 /**
1071 * Removes access ports from VLAN L2 multicast group on given deviceId.
1072 *
1073 * @param deviceId Device ID
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301074 * @param vlanId VLAN ID
1075 * @param accessPorts List of access ports to be added into L2 multicast group
1076 */
pier6fd24fd2018-11-27 11:23:50 -08001077 private void revokeL2Multicast(DeviceId deviceId, VlanId vlanId, List<PortNumber> accessPorts) {
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301078 // Ensure enough rights to program pair device
1079 if (!srService.shouldProgram(deviceId)) {
pier6fd24fd2018-11-27 11:23:50 -08001080 log.debug("Abort revoke L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301081 return;
1082 }
1083
pier6fd24fd2018-11-27 11:23:50 -08001084 VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
1085
1086 int vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301087 if (vlanMulticastNextId == -1) {
1088 return;
1089 }
1090 NextObjective.Builder vlanMulticastNextObjBuilder = DefaultNextObjective
1091 .builder()
1092 .withType(NextObjective.Type.BROADCAST)
1093 .fromApp(srService.appId())
1094 .withMeta(DefaultTrafficSelector.builder().matchVlanId(vlanId).build())
1095 .withId(vlanMulticastNextId);
1096 accessPorts.forEach(p -> {
1097 TrafficTreatment.Builder egressAction = DefaultTrafficTreatment.builder();
1098 // Do vlan popup action based on interface configuration
1099 if (interfaceService.getInterfacesByPort(new ConnectPoint(deviceId, p))
1100 .stream().noneMatch(i -> i.vlanTagged().contains(vlanId))) {
1101 egressAction.popVlan();
1102 }
1103 egressAction.setOutput(p);
1104 vlanMulticastNextObjBuilder.addTreatment(egressAction.build());
pier6fd24fd2018-11-27 11:23:50 -08001105 removeMulticastGroupPort(key, p);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301106 });
1107 ObjectiveContext context = new DefaultObjectiveContext(
1108 (objective) ->
1109 log.debug("L2 multicast group installed/updated. "
1110 + "NextObject Id {} on {} for subnet {} ",
1111 vlanMulticastNextId, deviceId, vlanId),
1112 (objective, error) ->
1113 log.warn("L2 multicast group failed to install/update. "
1114 + " NextObject Id {} on {} for subnet {} : {}",
1115 vlanMulticastNextId, deviceId, vlanId, error)
1116 );
1117 flowObjectiveService.next(deviceId, vlanMulticastNextObjBuilder.removeFromExisting(context));
1118 }
1119
1120 /**
1121 * Cleans up VLAN L2 multicast group on given deviceId. ACL rules for the group will also be deleted.
1122 * Normally multicast group is not removed if it contains access ports; which can be forced
1123 * by "force" flag
1124 *
1125 * @param deviceId Device ID
1126 * @param pairPort Pair port number
1127 * @param vlanId VLAN ID
1128 * @param force Forceful removal
1129 */
1130 private void cleanupL2MulticastRule(DeviceId deviceId, PortNumber pairPort, VlanId vlanId, boolean force) {
1131
1132 // Ensure enough rights to program pair device
1133 if (!srService.shouldProgram(deviceId)) {
pier6fd24fd2018-11-27 11:23:50 -08001134 log.debug("Abort cleanup L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301135 return;
1136 }
1137
pier6fd24fd2018-11-27 11:23:50 -08001138 VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
1139
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301140 // Ensure L2 multicast group doesn't contain access ports
pier6fd24fd2018-11-27 11:23:50 -08001141 if (hasAccessPortInMulticastGroup(key, pairPort) && !force) {
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301142 return;
1143 }
1144
1145 // Load L2 multicast group details
pier6fd24fd2018-11-27 11:23:50 -08001146 int vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301147 if (vlanMulticastNextId == -1) {
1148 return;
1149 }
1150
1151 // Step 1 : Clear ACL rule; selector = vlan + pair-port, output = vlan L2 multicast group
1152 TrafficSelector.Builder l2MulticastSelector = DefaultTrafficSelector.builder();
1153 l2MulticastSelector.matchEthType(Ethernet.TYPE_VLAN);
1154 l2MulticastSelector.matchInPort(pairPort);
1155 l2MulticastSelector.matchVlanId(vlanId);
1156 ForwardingObjective.Builder vlanMulticastForwardingObj = DefaultForwardingObjective.builder()
1157 .withFlag(ForwardingObjective.Flag.VERSATILE)
1158 .nextStep(vlanMulticastNextId)
1159 .withSelector(l2MulticastSelector.build())
1160 .withPriority(100)
1161 .fromApp(srService.appId())
1162 .makePermanent();
1163 ObjectiveContext context = new DefaultObjectiveContext(
1164 (objective) -> log.debug("L2 multicasting rule for device {}, port/vlan {}/{} deleted", deviceId,
1165 pairPort, vlanId),
1166 (objective, error) -> log.warn("Failed to delete L2 multicasting rule for device {}, " +
1167 "ports/vlan {}/{}: {}", deviceId, pairPort, vlanId, error));
1168 flowObjectiveService.forward(deviceId, vlanMulticastForwardingObj.remove(context));
1169
1170 // Step 2 : Clear L2 multicast group associated with vlan
1171 NextObjective.Builder l2MulticastGroupBuilder = DefaultNextObjective
1172 .builder()
1173 .withId(vlanMulticastNextId)
1174 .withType(NextObjective.Type.BROADCAST)
1175 .fromApp(srService.appId())
1176 .withMeta(DefaultTrafficSelector.builder()
1177 .matchVlanId(vlanId)
1178 .matchEthDst(MacAddress.IPV4_MULTICAST).build())
1179 .addTreatment(DefaultTrafficTreatment.builder().popVlan().setOutput(pairPort).build());
1180 context = new DefaultObjectiveContext(
1181 (objective) ->
1182 log.debug("L2 multicast group with NextObject Id {} deleted on {} for subnet {} ",
1183 vlanMulticastNextId, deviceId, vlanId),
1184 (objective, error) ->
1185 log.warn("L2 multicast group with NextObject Id {} failed to delete on {} for subnet {} : {}",
1186 vlanMulticastNextId, deviceId, vlanId, error)
1187 );
1188 flowObjectiveService.next(deviceId, l2MulticastGroupBuilder.remove(context));
1189
1190 // Finally clear store.
pier6fd24fd2018-11-27 11:23:50 -08001191 removeMulticastGroup(key);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301192 }
1193
pier6fd24fd2018-11-27 11:23:50 -08001194 private int getMulticastGroupNextObjectiveId(VlanNextObjectiveStoreKey key) {
1195 return Versioned.valueOrElse(xconnectMulticastNextStore.get(key), -1);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301196 }
1197
pier6fd24fd2018-11-27 11:23:50 -08001198 private void addMulticastGroupNextObjectiveId(VlanNextObjectiveStoreKey key, int nextId) {
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301199 if (nextId == -1) {
1200 return;
1201 }
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301202 xconnectMulticastNextStore.put(key, nextId);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301203 }
1204
pier6fd24fd2018-11-27 11:23:50 -08001205 private void addMulticastGroupPort(VlanNextObjectiveStoreKey groupKey, PortNumber port) {
1206 xconnectMulticastPortsStore.compute(groupKey, (key, ports) -> {
1207 if (ports == null) {
1208 ports = Lists.newArrayList();
1209 }
1210 ports.add(port);
1211 return ports;
1212 });
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301213 }
1214
pier6fd24fd2018-11-27 11:23:50 -08001215 private void removeMulticastGroupPort(VlanNextObjectiveStoreKey groupKey, PortNumber port) {
1216 xconnectMulticastPortsStore.compute(groupKey, (key, ports) -> {
1217 if (ports != null && !ports.isEmpty()) {
1218 ports.remove(port);
1219 }
1220 return ports;
1221 });
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301222 }
1223
pier6fd24fd2018-11-27 11:23:50 -08001224 private void removeMulticastGroup(VlanNextObjectiveStoreKey groupKey) {
1225 xconnectMulticastPortsStore.remove(groupKey);
1226 xconnectMulticastNextStore.remove(groupKey);
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301227 }
1228
pier6fd24fd2018-11-27 11:23:50 -08001229 private boolean hasAccessPortInMulticastGroup(VlanNextObjectiveStoreKey groupKey, PortNumber pairPort) {
1230 List<PortNumber> ports = Versioned.valueOrElse(xconnectMulticastPortsStore.get(groupKey), ImmutableList.of());
jayakumarthazhath66c9ec12018-10-01 00:51:54 +05301231 return ports.stream().anyMatch(p -> !p.equals(pairPort));
1232 }
1233
pier6fd24fd2018-11-27 11:23:50 -08001234 // Custom-built function, when the device is not available we need a fallback mechanism
1235 private boolean isLocalLeader(DeviceId deviceId) {
1236 if (!mastershipService.isLocalMaster(deviceId)) {
1237 // When the device is available we just check the mastership
1238 if (deviceService.isAvailable(deviceId)) {
1239 return false;
1240 }
1241 // Fallback with Leadership service - device id is used as topic
1242 NodeId leader = leadershipService.runForLeadership(
1243 deviceId.toString()).leaderNodeId();
1244 // Verify if this node is the leader
1245 return clusterService.getLocalNode().id().equals(leader);
1246 }
1247 return true;
1248 }
1249
Charles Chan445659f2019-01-02 13:46:16 -08001250 private Set<PortNumber> getPhysicalPorts(DeviceId deviceId, XconnectEndpoint endpoint) {
1251 if (endpoint.type() == XconnectEndpoint.Type.PORT) {
1252 PortNumber port = ((XconnectPortEndpoint) endpoint).port();
1253 return Sets.newHashSet(port);
Charles Chan48df9ad2018-10-30 18:08:59 -07001254 }
Charles Chan445659f2019-01-02 13:46:16 -08001255 if (endpoint.type() == XconnectEndpoint.Type.LOAD_BALANCER) {
Charles Chanc0a499b2019-01-16 15:30:39 -08001256 PortLoadBalancerId portLoadBalancerId = new PortLoadBalancerId(deviceId,
1257 ((XconnectLoadBalancerEndpoint) endpoint).key());
1258 Set<PortNumber> ports = portLoadBalancerService.getPortLoadBalancer(portLoadBalancerId).ports();
Charles Chan445659f2019-01-02 13:46:16 -08001259 return Sets.newHashSet(ports);
Charles Chan48df9ad2018-10-30 18:08:59 -07001260 }
Charles Chan48df9ad2018-10-30 18:08:59 -07001261 return Sets.newHashSet();
1262 }
1263
Charles Chan445659f2019-01-02 13:46:16 -08001264 private NextTreatment getNextTreatment(DeviceId deviceId, XconnectEndpoint endpoint, boolean reserve) {
1265 if (endpoint.type() == XconnectEndpoint.Type.PORT) {
1266 PortNumber port = ((XconnectPortEndpoint) endpoint).port();
1267 return DefaultNextTreatment.of(DefaultTrafficTreatment.builder().setOutput(port).build());
Charles Chan48df9ad2018-10-30 18:08:59 -07001268 }
Charles Chan445659f2019-01-02 13:46:16 -08001269 if (endpoint.type() == XconnectEndpoint.Type.LOAD_BALANCER) {
Charles Chanc0a499b2019-01-16 15:30:39 -08001270 PortLoadBalancerId portLoadBalancerId = new PortLoadBalancerId(deviceId,
1271 ((XconnectLoadBalancerEndpoint) endpoint).key());
1272 NextTreatment idNextTreatment = IdNextTreatment.of(portLoadBalancerService
1273 .getPortLoadBalancerNext(portLoadBalancerId));
pier567465b2018-11-24 11:16:28 -08001274 // Reserve only one time during next objective creation
1275 if (reserve) {
Charles Chanc0a499b2019-01-16 15:30:39 -08001276 if (!portLoadBalancerService.reserve(portLoadBalancerId, appId)) {
1277 log.warn("Reservation failed for {}", portLoadBalancerId);
pier567465b2018-11-24 11:16:28 -08001278 idNextTreatment = null;
1279 }
1280 }
1281 return idNextTreatment;
Charles Chan48df9ad2018-10-30 18:08:59 -07001282 }
Charles Chan48df9ad2018-10-30 18:08:59 -07001283 return null;
1284 }
pier567465b2018-11-24 11:16:28 -08001285
Charles Chanc0a499b2019-01-16 15:30:39 -08001286 private class InternalPortLoadBalancerListener implements PortLoadBalancerListener {
1287 // Populate xconnect once portloadbalancer is available
pier567465b2018-11-24 11:16:28 -08001288 @Override
Charles Chanc0a499b2019-01-16 15:30:39 -08001289 public void event(PortLoadBalancerEvent event) {
1290 portLoadBalancerExecutor.execute(() -> dequeue(event.subject().portLoadBalancerId()));
pier567465b2018-11-24 11:16:28 -08001291 }
Charles Chanc0a499b2019-01-16 15:30:39 -08001292 // When we receive INSTALLED port load balancing is ready
pier567465b2018-11-24 11:16:28 -08001293 @Override
Charles Chanc0a499b2019-01-16 15:30:39 -08001294 public boolean isRelevant(PortLoadBalancerEvent event) {
1295 return event.type() == PortLoadBalancerEvent.Type.INSTALLED;
pier567465b2018-11-24 11:16:28 -08001296 }
1297 }
1298
1299 // Invalidate the cache and re-start the xconnect installation
Charles Chanc0a499b2019-01-16 15:30:39 -08001300 private void dequeue(PortLoadBalancerId portLoadBalancerId) {
1301 XconnectKey xconnectKey = portLoadBalancerCache.getIfPresent(portLoadBalancerId);
pier567465b2018-11-24 11:16:28 -08001302 if (xconnectKey == null) {
Charles Chanc0a499b2019-01-16 15:30:39 -08001303 log.trace("{} not present in the cache", portLoadBalancerId);
pier567465b2018-11-24 11:16:28 -08001304 return;
1305 }
Charles Chanc0a499b2019-01-16 15:30:39 -08001306 log.debug("Dequeue {}", portLoadBalancerId);
1307 portLoadBalancerCache.invalidate(portLoadBalancerId);
Charles Chan445659f2019-01-02 13:46:16 -08001308 Set<XconnectEndpoint> endpoints = Versioned.valueOrNull(xconnectStore.get(xconnectKey));
1309 if (endpoints == null || endpoints.isEmpty()) {
1310 log.warn("Endpoints not found for XConnect {}", xconnectKey);
pier567465b2018-11-24 11:16:28 -08001311 return;
1312 }
Charles Chan445659f2019-01-02 13:46:16 -08001313 populateXConnect(xconnectKey, endpoints);
Charles Chanc0a499b2019-01-16 15:30:39 -08001314 log.trace("PortLoadBalancer cache size {}", portLoadBalancerCache.size());
pier567465b2018-11-24 11:16:28 -08001315 }
1316
Charles Chan8d316332018-06-19 20:31:57 -07001317}