[ONOS-6211] Top node delete and accumulator addition.
Change-Id: Icad1e6919569fc7dc1de4d08b86f912e1a9d88a5
diff --git a/apps/l3vpn/netl3vpn/src/main/java/org/onosproject/l3vpn/netl3vpn/impl/NetL3vpnManager.java b/apps/l3vpn/netl3vpn/src/main/java/org/onosproject/l3vpn/netl3vpn/impl/NetL3vpnManager.java
index 6121d7b..ca16eca 100644
--- a/apps/l3vpn/netl3vpn/src/main/java/org/onosproject/l3vpn/netl3vpn/impl/NetL3vpnManager.java
+++ b/apps/l3vpn/netl3vpn/src/main/java/org/onosproject/l3vpn/netl3vpn/impl/NetL3vpnManager.java
@@ -20,6 +20,8 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
@@ -28,6 +30,8 @@
import org.onosproject.config.DynamicConfigEvent;
import org.onosproject.config.DynamicConfigListener;
import org.onosproject.config.DynamicConfigService;
+import org.onosproject.config.FailedException;
+import org.onosproject.config.Filter;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
@@ -57,6 +61,7 @@
import org.onosproject.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.l3vpn.svc.rev20160730.ietfl3vpnsvc.accessvpnpolicy.VpnAttachment;
import org.onosproject.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.l3vpn.svc.rev20160730.ietfl3vpnsvc.accessvpnpolicy.vpnattachment.AttachmentFlavor;
import org.onosproject.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.l3vpn.svc.rev20160730.ietfl3vpnsvc.accessvpnpolicy.vpnattachment.attachmentflavor.DefaultVpnId;
+import org.onosproject.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.l3vpn.svc.rev20160730.ietfl3vpnsvc.l3vpnsvc.DefaultSites;
import org.onosproject.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.l3vpn.svc.rev20160730.ietfl3vpnsvc.l3vpnsvc.Sites;
import org.onosproject.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.l3vpn.svc.rev20160730.ietfl3vpnsvc.l3vpnsvc.VpnServices;
import org.onosproject.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.l3vpn.svc.rev20160730.ietfl3vpnsvc.l3vpnsvc.sites.Site;
@@ -99,28 +104,33 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.Timer;
-import static org.onlab.util.Tools.groupedThreads;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.config.DynamicConfigEvent.Type.NODE_ADDED;
import static org.onosproject.config.DynamicConfigEvent.Type.NODE_DELETED;
import static org.onosproject.l3vpn.netl3vpn.VpnType.HUB;
import static org.onosproject.l3vpn.netl3vpn.impl.BgpConstructionUtil.createBgpInfo;
import static org.onosproject.l3vpn.netl3vpn.impl.InsConstructionUtil.createInstance;
+import static org.onosproject.l3vpn.netl3vpn.impl.InsConstructionUtil.deleteInstance;
import static org.onosproject.l3vpn.netl3vpn.impl.IntConstructionUtil.createInterface;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.BEARER_NULL;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.CONS_HUNDRED;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.DEVICE_INFO_NULL;
-import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.EVENT_NOT_SUPPORTED;
+import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.EVENT_NULL;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.ID_LIMIT;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.ID_LIMIT_EXCEEDED;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.INT_INFO_NULL;
+import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.IP;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.IP_INT_INFO_NULL;
-import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.MG_MT_ADD;
+import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.MAX_BATCH_MS;
+import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.MAX_EVENTS;
+import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.MAX_IDLE_MS;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.PORT_NAME;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.SITE_ROLE_NULL;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.SITE_VPN_MISMATCH;
+import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.TIMER;
+import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.UNKNOWN_EVENT;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.VPN_ATTACHMENT_NULL;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.VPN_POLICY_NOT_SUPPORTED;
import static org.onosproject.l3vpn.netl3vpn.impl.NetL3VpnUtil.VPN_TYPE_UNSUPPORTED;
@@ -144,11 +154,18 @@
private static final String APP_ID = "org.onosproject.app.l3vpn";
private static final String L3_VPN_ID_TOPIC = "l3vpn-id";
- private static final String ONOS_NET_L3VPN = "onos/netl3vpn";
- private static final String EVENT_HANDLER = "event-handler-%d";
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final DynamicConfigListener configListener =
+ new InternalConfigListener();
+
+ private final Accumulator<DynamicConfigEvent> accumulator =
+ new InternalEventAccumulator();
+
+ private final InternalLeadershipListener leadershipEventListener =
+ new InternalLeadershipListener();
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@@ -177,23 +194,19 @@
protected ClusterService clusterService;
protected IdGenerator l3VpnIdGen;
+
private NodeId localNodeId;
+
private ApplicationId appId;
- private final DynamicConfigListener configListener = new
- InternalConfigListener();
-
- private final InternalLeadershipListener leadershipEventListener =
- new InternalLeadershipListener();
-
private ModelRegistrationParam regParam;
- private ExecutorService executor = Executors.newSingleThreadExecutor(
- groupedThreads(ONOS_NET_L3VPN, EVENT_HANDLER, log));
-
private ResourceId id;
+
private ResourceId module;
+
private ResourceId sites;
+
private boolean isElectedLeader = false;
@Activate
@@ -307,9 +320,7 @@
DefaultModelObjectData.Builder data = DefaultModelObjectData.builder()
.identifier(modelId);
ResourceData resData = modelConverter.createDataNode(data.build());
- //TODO: change this when yang runtime manager implements model
- // converter api.
- return null;
+ return resData.resourceId();
}
/**
@@ -317,14 +328,26 @@
* The root object is then used for l3VPN processing.
*
* @param storeId store resource id
+ * @param node data node
*/
- private void processCreateFromStore(ResourceId storeId) {
+ private void processCreateFromStore(ResourceId storeId, DataNode node) {
if (isElectedLeader) {
- List<ModelObject> objects = getModelObjects(storeId, module);
- for (ModelObject obj : objects) {
- if (obj instanceof DefaultL3VpnSvc) {
- DefaultL3VpnSvc l3VpnSvc = (DefaultL3VpnSvc) obj;
- createGlobalConfig(l3VpnSvc);
+ List<NodeKey> keys = storeId.nodeKeys();
+ List<ModelObject> objects = null;
+ if (keys.size() == 1) {
+ objects = getModelObjects(node, module);
+ } else if (keys.size() == 2) {
+ objects = getModelObjects(node, id);
+ }
+ if (objects != null) {
+ for (ModelObject obj : objects) {
+ if (obj instanceof DefaultL3VpnSvc) {
+ DefaultL3VpnSvc l3VpnSvc = (DefaultL3VpnSvc) obj;
+ createGlobalConfig(l3VpnSvc);
+ } else if (obj instanceof DefaultSites) {
+ DefaultSites sites = (DefaultSites) obj;
+ createInterfaceConfig(sites);
+ }
}
}
}
@@ -332,29 +355,31 @@
/**
* Processes delete request from the store, by taking the root object.
- * The root object is then used for l3VPN processing.
+ * The root object would have got deleted from store. So all the
+ * configurations are removed.
*
- * @param storeId store resource id
+ * @param dataNode data node
*/
- private void processDeleteFromStore(ResourceId storeId) {
+ private void processDeleteFromStore(DataNode dataNode) {
if (isElectedLeader) {
- //TODO: add delete logic here.
+ if (dataNode == null) {
+ //TODO: Delete for inner nodes.
+ deleteGlobalConfig(null);
+ }
}
}
/**
- * Returns model objects of the store. The data node is read from the
- * config store. This returns the resource id's node. So the node's
- * parent resource id is taken and the data node is given to model
- * converter.
+ * Returns model objects of the store. The data node read from store
+ * gives the particular node. So the node's parent resource id is taken
+ * and the data node is given to model converter.
*
- * @param storeId store resource id
- * @param appId parent resource id
+ * @param dataNode data node from store
+ * @param appId parent resource id
* @return model objects
*/
- public List<ModelObject> getModelObjects(ResourceId storeId,
+ public List<ModelObject> getModelObjects(DataNode dataNode,
ResourceId appId) {
- DataNode dataNode = configService.readNode(storeId, null);
ResourceData data = getResourceData(dataNode, appId);
ModelObjectData modelData = modelConverter.createModel(data);
return modelData.modelObjects();
@@ -371,18 +396,25 @@
ResourceId rsId = event.subject();
List<NodeKey> storeKeys = rsId.nodeKeys();
List<NodeKey> regKeys = id.nodeKeys();
+ List<NodeKey> sitesKeys = sites.nodeKeys();
if (storeKeys != null) {
- if (storeKeys.size() == 1) {
+ int storeSize = storeKeys.size();
+ if (storeSize == 1) {
return storeKeys.get(0).equals(regKeys.get(1)) &&
(event.type() == NODE_ADDED ||
event.type() == NODE_DELETED);
+ } else if (storeSize == 2) {
+ return (storeKeys.get(0).equals(sitesKeys.get(1))) &&
+ storeKeys.get(1).equals(sitesKeys.get(2)) &&
+ (event.type() == NODE_ADDED ||
+ event.type() == NODE_DELETED);
}
}
return false;
}
/***
- * Creation of all configuration in standard device model.
+ * Creates all configuration in the standard device model.
*
* @param l3VpnSvc l3VPN service object
*/
@@ -475,9 +507,7 @@
throw new NetL3VpnException(SITE_ROLE_NULL);
}
VpnType role = getRole(vpnId.siteRole());
- VpnSiteRole vpnRole = new VpnSiteRole(
- String.valueOf(vpnId.vpnId()), role);
- return vpnRole;
+ return new VpnSiteRole(String.valueOf(vpnId.vpnId()), role);
}
/**
@@ -593,7 +623,7 @@
*/
public DeviceId getId(String ip) {
for (Device device : deviceService.getAvailableDevices()) {
- String val = device.annotations().value(MG_MT_ADD);
+ String val = device.annotations().value(IP);
if (ip.equals(val)) {
return device.id();
}
@@ -877,6 +907,123 @@
}
/**
+ * Creates all configuration in the standard device model.
+ *
+ * @param l3VpnSvc l3 VPN service
+ */
+ void deleteGlobalConfig(L3VpnSvc l3VpnSvc) {
+ deleteGlobalVpn(l3VpnSvc);
+ //TODO: Site and access deletion needs to be added.
+ }
+
+ /**
+ * Deletes the global VPN from the device model and delete from the device.
+ *
+ * @param l3VpnSvc L3 VPN service
+ */
+ private void deleteGlobalVpn(L3VpnSvc l3VpnSvc) {
+ Map<String, VpnInstance> insMap = l3VpnStore.getVpnInstances();
+ //TODO: check for VPN delete deleting interface from store.
+ if (l3VpnSvc == null || l3VpnSvc.vpnServices() == null ||
+ l3VpnSvc.vpnServices().vpnSvc() == null) {
+ for (Map.Entry<String, VpnInstance> vpnMap : insMap.entrySet()) {
+ deleteVpnInstance(vpnMap.getValue(), false);
+ }
+ return;
+ }
+ List<VpnSvc> vpnList = l3VpnSvc.vpnServices().vpnSvc();
+ for (Map.Entry<String, VpnInstance> vpnMap : insMap.entrySet()) {
+ boolean isPresent = isVpnPresent(vpnMap.getKey(), vpnList);
+ if (!isPresent) {
+ deleteVpnInstance(vpnMap.getValue(), false);
+ }
+ }
+ }
+
+ /**
+ * Returns true if the VPN in the distributed map is also present in the
+ * service; false otherwise.
+ *
+ * @param vpnName VPN name from store
+ * @param vpnList VPN list from service
+ * @return true if VPN available; false otherwise
+ */
+ private boolean isVpnPresent(String vpnName, List<VpnSvc> vpnList) {
+ for (VpnSvc svc : vpnList) {
+ if (svc.vpnId().string().equals(vpnName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Deletes the VPN instance by constructing standard device model of
+ * instances.
+ *
+ * @param instance VPN instance
+ * @param isIntDeleted if interface already removed.
+ */
+ private void deleteVpnInstance(VpnInstance instance, boolean isIntDeleted) {
+ Map<DeviceId, DeviceInfo> devices = instance.devInfo();
+ for (Map.Entry<DeviceId, DeviceInfo> device : devices.entrySet()) {
+ Map<AccessInfo, InterfaceInfo> intMap =
+ l3VpnStore.getInterfaceInfo();
+ NetworkInstances ins = deleteInstance(instance.vpnName());
+ DeviceInfo dev = device.getValue();
+
+ ModelObjectData devMod = getVpnCreateModObj(
+ intMap, ins, dev.deviceId().toString());
+ ModelObjectData driMod = dev.processDeleteInstance(driverService,
+ devMod);
+ ResourceData resData = modelConverter.createDataNode(driMod);
+ deleteFromStore(resData);
+ if (!isIntDeleted) {
+ //TODO: Remove from store.
+ remInterfaceFromMap(dev);
+ }
+ }
+ l3VpnStore.removeVpnInstance(instance.vpnName());
+ }
+
+ /**
+ * Deletes the resource data that is received from the driver, after
+ * converting from the model object data.
+ *
+ * @param resData resource data
+ */
+ private void deleteFromStore(ResourceData resData) {
+ if (resData != null) {
+ configService.deleteNode(resData.resourceId());
+ }
+ }
+
+ /**
+ * Removes the interface from the app distributed map, if the driver
+ * interfaces are already removed from the store.
+ *
+ * @param deviceInfo device info
+ */
+ private void remInterfaceFromMap(DeviceInfo deviceInfo) {
+ List<AccessInfo> accesses = deviceInfo.accesses();
+ for (AccessInfo access : accesses) {
+ l3VpnStore.removeInterfaceInfo(access);
+ }
+ deviceInfo.ifNames(null);
+ deviceInfo.accesses(null);
+ }
+
+ /**
+ * Signals that the leadership has changed.
+ *
+ * @param isLeader true if this instance is now the leader, otherwise false
+ */
+ private void leaderChanged(boolean isLeader) {
+ log.debug("Leader changed: {}", isLeader);
+ isElectedLeader = isLeader;
+ }
+
+ /**
* Representation of internal listener, listening for dynamic config event.
*/
private class InternalConfigListener implements DynamicConfigListener {
@@ -888,36 +1035,50 @@
@Override
public void event(DynamicConfigEvent event) {
- executor.execute(() -> {
- try {
- ResourceId rsId = event.subject();
- switch (event.type()) {
- case NODE_ADDED:
- processCreateFromStore(rsId);
- break;
-
- case NODE_DELETED:
- //TODO: To be committed.
- break;
-
- default:
- throw new NetL3VpnException(EVENT_NOT_SUPPORTED);
- }
- } catch (Exception e) {
- log.warn("Failed to process {}", event, e);
- }
- });
+ accumulator.add(event);
}
}
/**
- * Signals that the leadership has changed.
- *
- * @param isLeader true if this instance is now the leader, otherwise false
+ * Accumulates events to allow processing after a desired number of
+ * events were accumulated.
*/
- private void leaderChanged(boolean isLeader) {
- log.debug("Leader changed: {}", isLeader);
- isElectedLeader = isLeader;
+ private class InternalEventAccumulator extends
+ AbstractAccumulator<DynamicConfigEvent> {
+
+ /**
+ * Constructs the event accumulator with timer and event limit.
+ */
+ protected InternalEventAccumulator() {
+ super(new Timer(TIMER), MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
+ }
+
+ @Override
+ public void processItems(List<DynamicConfigEvent> events) {
+ for (DynamicConfigEvent event : events) {
+ checkNotNull(event, EVENT_NULL);
+ Filter filter = new Filter();
+ DataNode node;
+ try {
+ node = configService.readNode(event.subject(), filter);
+ } catch (FailedException e) {
+ node = null;
+ }
+ switch (event.type()) {
+ case NODE_ADDED:
+ processCreateFromStore(event.subject(), node);
+ break;
+
+ case NODE_DELETED:
+ processDeleteFromStore(node);
+ break;
+
+ default:
+ log.warn(UNKNOWN_EVENT, event.type());
+ break;
+ }
+ }
+ }
}
/**