ONOS-1823 and ONOS-1838:Segment Routing Multi-instance Support-1
Change-Id: I3cc848415a609a9c4001d135e51104c62fb2830d
diff --git a/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 9ace6b8..87282b4 100644
--- a/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -22,23 +22,22 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IPv4;
+import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.Event;
import org.onosproject.segmentrouting.grouphandler.DefaultGroupHandler;
import org.onosproject.segmentrouting.grouphandler.NeighborSet;
+import org.onosproject.segmentrouting.grouphandler.NeighborSetNextObjectiveStoreKey;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
-import org.onosproject.net.MastershipRole;
import org.onosproject.net.Port;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flowobjective.FlowObjectiveService;
-import org.onosproject.net.group.Group;
-import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.IntentService;
@@ -51,9 +50,16 @@
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.topology.TopologyService;
import org.onosproject.segmentrouting.config.NetworkConfigManager;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.onosproject.store.service.WallclockClockManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
+import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -112,6 +118,11 @@
private static ScheduledFuture<?> eventHandlerFuture = null;
private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
private Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<DeviceId, DefaultGroupHandler>();
+ // Per device next objective ID store with (device id + neighbor set) as key
+ private EventuallyConsistentMap<NeighborSetNextObjectiveStoreKey,
+ Integer> nsNextObjStore = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
private NetworkConfigManager networkConfigService = new NetworkConfigManager();;
@@ -119,10 +130,34 @@
private static int numOfHandlerExecution = 0;
private static int numOfHandlerScheduled = 0;
+ private KryoNamespace.Builder kryoBuilder = null;
+
@Activate
protected void activate() {
appId = coreService
.registerApplication("org.onosproject.segmentrouting");
+
+ kryoBuilder = new KryoNamespace.Builder()
+ .register(NeighborSetNextObjectiveStoreKey.class,
+ NeighborSet.class,
+ DeviceId.class,
+ URI.class,
+ WallClockTimestamp.class,
+ org.onosproject.cluster.NodeId.class,
+ HashSet.class
+ );
+
+ log.debug("Creating EC map nsnextobjectivestore");
+ EventuallyConsistentMapBuilder<NeighborSetNextObjectiveStoreKey, Integer>
+ nsNextObjMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+ nsNextObjStore = nsNextObjMapBuilder
+ .withName("nsnextobjectivestore")
+ .withSerializer(kryoBuilder)
+ .withClockService(new WallclockClockManager<>())
+ .build();
+ log.trace("Current size {}", nsNextObjStore.size());
+
networkConfigService.init();
deviceConfiguration = new DeviceConfiguration(networkConfigService);
arpHandler = new ArpHandler(this);
@@ -136,20 +171,18 @@
deviceService.addListener(new InternalDeviceListener());
for (Device device : deviceService.getDevices()) {
- if (mastershipService.getLocalRole(device.id()) == MastershipRole.MASTER) {
- DefaultGroupHandler groupHandler = DefaultGroupHandler
- .createGroupHandler(device.id(), appId,
- deviceConfiguration, linkService,
- flowObjectiveService);
- groupHandlerMap.put(device.id(), groupHandler);
- defaultRoutingHandler.populateTtpRules(device.id());
- log.debug("Initiating default group handling for {}", device.id());
- } else {
- log.debug("Activate: Local role {} "
- + "is not MASTER for device {}",
- mastershipService.getLocalRole(device.id()),
- device.id());
- }
+ //Irrespective whether the local is a MASTER or not for this device,
+ //create group handler instance and push default TTP flow rules.
+ //Because in a multi-instance setup, instances can initiate
+ //groups for any devices. Also the default TTP rules are needed
+ //to be pushed before inserting any IP table entries for any device
+ DefaultGroupHandler groupHandler = DefaultGroupHandler
+ .createGroupHandler(device.id(), appId,
+ deviceConfiguration, linkService,
+ flowObjectiveService,
+ nsNextObjStore);
+ groupHandlerMap.put(device.id(), groupHandler);
+ defaultRoutingHandler.populateTtpRules(device.id());
}
defaultRoutingHandler.startPopulationProcess();
@@ -180,8 +213,14 @@
public int getNextObjectiveId(DeviceId deviceId, NeighborSet ns) {
- return (groupHandlerMap.get(deviceId) != null) ? groupHandlerMap
- .get(deviceId).getNextObjectiveId(ns) : -1;
+ if (groupHandlerMap.get(deviceId) != null) {
+ log.trace("getNextObjectiveId query in device {}", deviceId);
+ return groupHandlerMap
+ .get(deviceId).getNextObjectiveId(ns);
+ } else {
+ log.warn("getNextObjectiveId query in device {} not found", deviceId);
+ return -1;
+ }
}
private class InternalPacketProcessor implements PacketProcessor {
@@ -224,12 +263,12 @@
@Override
public void event(DeviceEvent event) {
- if (mastershipService.getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
+ /*if (mastershipService.getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
log.debug("Local role {} is not MASTER for device {}",
mastershipService.getLocalRole(event.subject().id()),
event.subject().id());
return;
- }
+ }*/
switch (event.type()) {
case DEVICE_ADDED:
@@ -245,12 +284,14 @@
private void scheduleEventHandlerIfNotScheduled(Event event) {
- eventQueue.add(event);
- numOfEvents++;
- if (eventHandlerFuture == null || eventHandlerFuture.isDone()) {
- eventHandlerFuture = executorService
- .schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
- numOfHandlerScheduled++;
+ synchronized (eventQueue) {
+ eventQueue.add(event);
+ numOfEvents++;
+ if (eventHandlerFuture == null || eventHandlerFuture.isDone()) {
+ eventHandlerFuture = executorService
+ .schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
+ numOfHandlerScheduled++;
+ }
}
log.trace("numOfEvents {}, numOfEventHanlderScheduled {}", numOfEvents,
@@ -262,44 +303,68 @@
@Override
public void run() {
- numOfHandlerExecution++;
- while (!eventQueue.isEmpty()) {
- Event event = eventQueue.poll();
- if (event.type() == LinkEvent.Type.LINK_ADDED) {
- processLinkAdded((Link) event.subject());
- } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
- processLinkRemoved((Link) event.subject());
- } else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
- processGroupAdded((Group) event.subject());
- } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
- event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
- event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
- if (deviceService.isAvailable(((Device) event.subject()).id())) {
- processDeviceAdded((Device) event.subject());
+ try {
+ synchronized (eventQueue) {
+ numOfHandlerExecution++;
+ while (!eventQueue.isEmpty()) {
+ Event event = eventQueue.poll();
+ if (event.type() == LinkEvent.Type.LINK_ADDED) {
+ processLinkAdded((Link) event.subject());
+ } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
+ processLinkRemoved((Link) event.subject());
+ //} else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
+ // processGroupAdded((Group) event.subject());
+ } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
+ event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
+ event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
+ if (deviceService.isAvailable(((Device) event.subject()).id())) {
+ processDeviceAdded((Device) event.subject());
+ }
+ } else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
+ processPortRemoved((Device) event.subject(),
+ ((DeviceEvent) event).port());
+ } else {
+ log.warn("Unhandled event type: {}", event.type());
+ }
}
- } else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
- processPortRemoved((Device) event.subject(),
- ((DeviceEvent) event).port());
- } else {
- log.warn("Unhandled event type: {}", event.type());
}
+ log.debug("numOfHandlerExecution {} numOfEventHanlderScheduled {} numOfEvents {}",
+ numOfHandlerExecution, numOfHandlerScheduled, numOfEvents);
+ } catch (Exception e) {
+ log.error("SegmentRouting event handler "
+ + "thread thrown an exception: {}", e);
}
- log.debug("numOfHandlerExecution {} numOfEventHanlderScheduled {} numOfEvents {}",
- numOfHandlerExecution, numOfHandlerScheduled, numOfEvents);
}
}
private void processLinkAdded(Link link) {
log.debug("A new link {} was added", link.toString());
- if (mastershipService.getLocalRole(link.src().deviceId()) == MastershipRole.MASTER) {
- DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src()
- .deviceId());
- if (groupHandler != null) {
+ //Irrespective whether the local is a MASTER or not for this device,
+ //create group handler instance and push default TTP flow rules.
+ //Because in a multi-instance setup, instances can initiate
+ //groups for any devices. Also the default TTP rules are needed
+ //to be pushed before inserting any IP table entries for any device
+ DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src()
+ .deviceId());
+ if (groupHandler != null) {
+ groupHandler.linkUp(link);
+ } else {
+ Device device = deviceService.getDevice(link.src().deviceId());
+ if (device != null) {
+ log.warn("processLinkAdded: Link Added "
+ + "Notification without Device Added "
+ + "event, still handling it");
+ processDeviceAdded(device);
+ groupHandler = groupHandlerMap.get(link.src()
+ .deviceId());
groupHandler.linkUp(link);
}
}
- defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
+
+ //defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
+ log.trace("processLinkAdded: re-starting route population process");
+ defaultRoutingHandler.startPopulationProcess();
}
private void processLinkRemoved(Link link) {
@@ -308,20 +373,27 @@
if (groupHandler != null) {
groupHandler.portDown(link.src().port());
}
- defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
- }
-
- private void processGroupAdded(Group group) {
- log.debug("A new group with ID {} was added", group.id());
- defaultRoutingHandler.resumePopulationProcess();
+ //defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
+ log.trace("processLinkRemoved: re-starting route population process");
+ defaultRoutingHandler.startPopulationProcess();
}
private void processDeviceAdded(Device device) {
log.debug("A new device with ID {} was added", device.id());
- defaultRoutingHandler.populateTtpRules(device.id());
- DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device
- .id(), appId, deviceConfiguration, linkService, flowObjectiveService);
+ //Irrespective whether the local is a MASTER or not for this device,
+ //create group handler instance and push default TTP flow rules.
+ //Because in a multi-instance setup, instances can initiate
+ //groups for any devices. Also the default TTP rules are needed
+ //to be pushed before inserting any IP table entries for any device
+ DefaultGroupHandler dgh = DefaultGroupHandler.
+ createGroupHandler(device.id(),
+ appId,
+ deviceConfiguration,
+ linkService,
+ flowObjectiveService,
+ nsNextObjStore);
groupHandlerMap.put(device.id(), dgh);
+ defaultRoutingHandler.populateTtpRules(device.id());
}
private void processPortRemoved(Device device, Port port) {