Adding Gluon App code to support EVPN
Change-Id: I24aee104482a166e412e5e7b42ea1dbf518d51c6
diff --git a/apps/gluon/src/main/java/org/onosproject/gluon/rsc/GluonServer.java b/apps/gluon/src/main/java/org/onosproject/gluon/rsc/GluonServer.java
new file mode 100755
index 0000000..f4f400e
--- /dev/null
+++ b/apps/gluon/src/main/java/org/onosproject/gluon/rsc/GluonServer.java
@@ -0,0 +1,537 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.gluon.rsc;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.util.EntityUtils;
+import org.onlab.osgi.DefaultServiceDirectory;
+import org.onosproject.net.config.NetworkConfigService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.gluon.manager.GluonManager.addServer;
+import static org.onosproject.gluon.manager.GluonManager.deleteServer;
+import static org.onosproject.gluon.manager.GluonManager.getAllServersIP;
+import static org.onosproject.gluon.rsc.GluonConstants.ACTION_DEL;
+import static org.onosproject.gluon.rsc.GluonConstants.ACTION_GET;
+import static org.onosproject.gluon.rsc.GluonConstants.ACTION_SET;
+import static org.onosproject.gluon.rsc.GluonConstants.ACTIVE_SERVER;
+import static org.onosproject.gluon.rsc.GluonConstants.BATCH_PROCESSING;
+import static org.onosproject.gluon.rsc.GluonConstants.BATCH_QUERING;
+import static org.onosproject.gluon.rsc.GluonConstants.BATCH_RECEIVED;
+import static org.onosproject.gluon.rsc.GluonConstants.BATCH_SERVICE_STATUS;
+import static org.onosproject.gluon.rsc.GluonConstants.BATCH_STOPPED;
+import static org.onosproject.gluon.rsc.GluonConstants.DATA_REMOVED;
+import static org.onosproject.gluon.rsc.GluonConstants.DATA_UPDATED;
+import static org.onosproject.gluon.rsc.GluonConstants.E_BATCH_PROCESSING;
+import static org.onosproject.gluon.rsc.GluonConstants.E_BATCH_PROCESSING_URL;
+import static org.onosproject.gluon.rsc.GluonConstants.E_CLIENT_STOP;
+import static org.onosproject.gluon.rsc.GluonConstants.E_REAL_TIME_PROCESSING;
+import static org.onosproject.gluon.rsc.GluonConstants.E_SUBKEYS_PROCESSING;
+import static org.onosproject.gluon.rsc.GluonConstants.GLUON_ACTION;
+import static org.onosproject.gluon.rsc.GluonConstants.GLUON_CREATE_INDEX;
+import static org.onosproject.gluon.rsc.GluonConstants.GLUON_KEY;
+import static org.onosproject.gluon.rsc.GluonConstants.GLUON_MOD_INDEX;
+import static org.onosproject.gluon.rsc.GluonConstants.GLUON_NODE;
+import static org.onosproject.gluon.rsc.GluonConstants.GLUON_NODES;
+import static org.onosproject.gluon.rsc.GluonConstants.GLUON_VALUE;
+import static org.onosproject.gluon.rsc.GluonConstants.INVALID_ACTION;
+import static org.onosproject.gluon.rsc.GluonConstants.KEYS;
+import static org.onosproject.gluon.rsc.GluonConstants.MODE_START;
+import static org.onosproject.gluon.rsc.GluonConstants.MODE_STOP;
+import static org.onosproject.gluon.rsc.GluonConstants.NO_SERVER_AVAIL;
+import static org.onosproject.gluon.rsc.GluonConstants.NO_SUBKEYS_AVAIL;
+import static org.onosproject.gluon.rsc.GluonConstants.PROCESSING_FAILED;
+import static org.onosproject.gluon.rsc.GluonConstants.PROTON;
+import static org.onosproject.gluon.rsc.GluonConstants.REAL_TIME_PROCESSING;
+import static org.onosproject.gluon.rsc.GluonConstants.REAL_TIME_RECEIVED;
+import static org.onosproject.gluon.rsc.GluonConstants.REAL_TIME_SERVICE_STATUS;
+import static org.onosproject.gluon.rsc.GluonConstants.SERVER_RUNNING;
+import static org.onosproject.gluon.rsc.GluonConstants.SERVER_STOPPED;
+import static org.onosproject.gluon.rsc.GluonConstants.STATUS_CODE;
+import static org.onosproject.gluon.rsc.GluonConstants.SUBKEYS_RECEIVED;
+
+
+public class GluonServer {
+
+ private static String protonKeyUri;
+ private static String serverUri;
+
+ private static CloseableHttpAsyncClient httpClient;
+
+ //store gluon server supported subkeys
+ private List<String> subKeys = new LinkedList<>();
+
+ // Lists of gluon servers
+ public Map<String, GluonServer> serverMap = getAllServersIP();
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ // Real time executor thread
+ private final ExecutorService executorRealTimeService = Executors
+ .newSingleThreadExecutor(groupedThreads("EtcdRealTimeMonitor",
+ "executor-%d", log));
+ // Batch executor thread
+ private final ExecutorService executorBatchService = Executors
+ .newSingleThreadExecutor(groupedThreads("EtcdBatchMonitor",
+ "executor-%d", log));
+
+ // Statistics counter
+ private int setCount = 0;
+ private int delCount = 0;
+ private int getCount = 0;
+ // Server etcd version
+ public String version;
+
+ /**
+ * To get Gluon server running version, needs to create at-least one object.
+ */
+ public GluonServer() {
+ }
+
+ /**
+ * Realising server functionality.
+ *
+ * @param etcduri server url
+ * @param targetProtonKey server key type, default net-l3vpn
+ * @param mode server mode start or stop
+ * @param version running server version
+ */
+ public GluonServer(String etcduri, String targetProtonKey,
+ String mode, String version) {
+ this.version = version;
+
+ switch (mode) {
+ // Handling stop mode
+ case MODE_STOP:
+ // return if server is not available into the server list
+ if (!serverMap.containsKey(etcduri)) {
+ log.debug(NO_SERVER_AVAIL);
+ return;
+ }
+ try {
+ // stop batch service executor thread
+ log.debug(BATCH_SERVICE_STATUS,
+ executorBatchService.isShutdown());
+ executorBatchService.shutdown();
+ // stop real time service executor thread
+ log.debug(REAL_TIME_SERVICE_STATUS,
+ executorRealTimeService.isShutdown());
+ executorRealTimeService.shutdown();
+ // closing http client
+ httpClient.close();
+ } catch (IOException io) {
+ log.error(E_CLIENT_STOP, io.getMessage());
+ }
+ // deletes server from gluon server list
+ deleteServer(etcduri);
+ log.debug(SERVER_STOPPED);
+ return;
+ // Handling start mode
+ case MODE_START:
+ if (serverMap.containsKey(etcduri)) {
+ //Returns user CLI if server is already running
+ // and logs all server info into log files
+ log.info(SERVER_RUNNING);
+ log.debug(ACTIVE_SERVER, serverMap.size());
+ return;
+ }
+ // Store gluon manager object and gluon server url
+ addServer(etcduri, this);
+ // Preparing server uri
+ serverUri = etcduri + "/v2" + KEYS;
+ // Preparing server subkeys uri
+ protonKeyUri = PROTON + targetProtonKey;
+ // Starts http client
+ RequestConfig requestConfig = RequestConfig.custom().build();
+ httpClient = HttpAsyncClients.custom()
+ .setDefaultRequestConfig(requestConfig).build();
+ httpClient.start();
+
+ // Start thread to handle and process RealTime data
+ handleRealTimeData(null);
+
+ // Start thread to handle and process batch data,
+ // iff subkeys are available
+ getAllProtonSubkeys(serverUri + protonKeyUri);
+ if (getProtonSubkeys().isEmpty()) {
+ log.debug(NO_SUBKEYS_AVAIL);
+ return;
+ }
+ // handle RealTime data
+ handleBatchData(0);
+ return;
+ default:
+ log.debug(INVALID_ACTION);
+
+ }
+ }
+
+ /**
+ * Handles real time data which is received from Gluon server.
+ *
+ * @param index, It will be used in recursive call of
+ * real time monitoring method.
+ * modified index receive from GluonConfig config file
+ */
+ private void handleRealTimeData(Long index) {
+ String realTimeUri = serverUri + protonKeyUri +
+ "/?wait=true&recursive=true";
+ if (index != null) {
+ realTimeUri += "&waitIndex=" + index;
+ }
+ HttpGet request = new HttpGet(URI.create(realTimeUri));
+ log.info(REAL_TIME_PROCESSING, realTimeUri);
+ // Starts real time executor thread
+ executorRealTimeService.execute(new Runnable() {
+ public void run() {
+ try {
+ httpClient.execute(
+ request, new FutureCallback<HttpResponse>() {
+
+ @Override
+ public void completed(HttpResponse result) {
+ StatusLine statusLine =
+ result.getStatusLine();
+ int statusCode = statusLine.getStatusCode();
+ if (statusCode ==
+ STATUS_CODE &&
+ result.getEntity() != null) {
+ try {
+ String json = EntityUtils
+ .toString(result.getEntity());
+ GluonConfig response =
+ processRealTimeResponse(json);
+ // Recursive call to handle
+ // real time data
+ handleRealTimeData(
+ response.modifiedIndex + 1);
+ } catch (IOException e) {
+ failed(e);
+ }
+ } else {
+ log.error(E_REAL_TIME_PROCESSING);
+ }
+ }
+
+ @Override
+ public void cancelled() {
+ log.debug("Nothing to do with " +
+ "this overridden method");
+ }
+
+ @Override
+ public void failed(Exception e) {
+ log.error(E_REAL_TIME_PROCESSING,
+ e.getMessage());
+ }
+ });
+ } catch (Exception e) {
+ log.error(E_REAL_TIME_PROCESSING, e.getMessage());
+ }
+ }
+ });
+ }
+
+
+ /**
+ * Handles batch data which is received from Gluon server.
+ *
+ * @param subKeyIndex gets all proton subkey value
+ */
+ private void handleBatchData(int subKeyIndex) {
+ String currBatchUri = serverUri + getProtonSubkeys().get(subKeyIndex);
+ HttpGet request = new HttpGet(URI.create(currBatchUri));
+
+ if (0 == subKeyIndex) {
+ log.debug(BATCH_PROCESSING, protonKeyUri);
+ }
+ log.info(BATCH_QUERING, currBatchUri);
+ // Starts batch executor thread
+ executorBatchService.execute(new Runnable() {
+ public void run() {
+ try {
+ httpClient.execute(request, new FutureCallback<HttpResponse>() {
+ @Override
+ public void completed(HttpResponse result) {
+ StatusLine statusLine = result.getStatusLine();
+ int statusCode = statusLine.getStatusCode();
+ if (statusCode == STATUS_CODE &&
+ result.getEntity() != null) {
+ try {
+ String json = EntityUtils
+ .toString(result.getEntity());
+ processBatchResponse(json);
+ // Stop batch executor thread
+ // once all gluon server subkeys processed
+ if (subKeyIndex ==
+ ((getProtonSubkeys().size()) - 1)) {
+ cancelled();
+ return;
+ }
+
+ handleBatchData(subKeyIndex + 1);
+ } catch (IOException e) {
+ failed(e);
+ }
+ } else {
+ log.error(E_BATCH_PROCESSING_URL, currBatchUri);
+ }
+ }
+
+ @Override
+ public void cancelled() {
+ executorBatchService.shutdown();
+ log.debug(BATCH_STOPPED, protonKeyUri);
+ }
+
+ @Override
+ public void failed(Exception e) {
+ log.error(E_BATCH_PROCESSING, e.getMessage());
+ }
+ });
+ } catch (Exception e) {
+ log.error(E_BATCH_PROCESSING, e.getMessage());
+ }
+ }
+ });
+ }
+
+ /**
+ * Parse and process real time json data which is received from Gluon server.
+ *
+ * @param result real time json data
+ * @return GluonConfig response
+ */
+ public GluonConfig processRealTimeResponse(String result) {
+ ObjectMapper mapper = new ObjectMapper();
+ GluonConfig response = null;
+ try {
+ log.info(REAL_TIME_RECEIVED, result);
+ JsonNode jsonNode = mapper.readTree(result);
+ String action = jsonNode.get(GLUON_ACTION).asText();
+ String key = jsonNode.get(GLUON_NODE).get(GLUON_KEY).asText();
+ long mIndex = jsonNode.get(GLUON_NODE)
+ .get(GLUON_MOD_INDEX).asLong();
+ long cIndex = jsonNode.get(GLUON_NODE)
+ .get(GLUON_CREATE_INDEX).asLong();
+ if (action.equals(ACTION_SET)) {
+ String value = jsonNode.get(GLUON_NODE)
+ .get(GLUON_VALUE).asText();
+ JsonNode modifyValue = mapper.readTree(value.replace("\\", ""));
+ response = new GluonConfig(action, key, modifyValue, mIndex,
+ cIndex);
+ setCount++;
+ } else if (action.equals(ACTION_DEL)) {
+ response = new GluonConfig(action, key, null, mIndex, cIndex);
+ delCount++;
+ } else {
+ log.debug(INVALID_ACTION);
+ }
+ } catch (IOException e) {
+ log.error(E_REAL_TIME_PROCESSING, e.getMessage());
+ }
+ processEtcdResponse(response);
+ return response;
+ }
+
+ /**
+ * Parse and process batch json data which is received from Gluon server.
+ *
+ * @param result batch json data
+ * @return GluonConfig response
+ */
+ public GluonConfig processBatchResponse(String result) {
+ ObjectMapper mapper = new ObjectMapper();
+ GluonConfig response = null;
+ try {
+ log.debug(BATCH_RECEIVED, result);
+ JsonNode jsonNode = mapper.readTree(result);
+ log.info("JSON NODE VALUE ARE: {}", jsonNode);
+ String action = jsonNode.get(GLUON_ACTION).asText();
+ JsonNode nodes = jsonNode.get(GLUON_NODE).get(GLUON_NODES);
+ if (null != nodes) {
+ for (JsonNode confNode : nodes) {
+ String key = confNode.get(GLUON_KEY).asText();
+ long mIndex = confNode.get(GLUON_MOD_INDEX).asLong();
+ long cIndex = confNode.get(GLUON_CREATE_INDEX).asLong();
+ String value = confNode.get(GLUON_VALUE).asText();
+ log.info("JSON NODE VALUE ARE 2: {}", value);
+ JsonNode modifyValue = mapper.readTree(value.replace("\\", ""));
+ log.info("JSON NODE MODIFY VALUE ARE 2: {}", modifyValue);
+ response = new GluonConfig(action, key,
+ modifyValue, mIndex, cIndex);
+ getCount++;
+ processEtcdResponse(response);
+
+ }
+ }
+ } catch (IOException e) {
+ log.error(E_BATCH_PROCESSING, e.getMessage());
+ }
+ return response;
+ }
+
+ /**
+ * Gets all the proton subkeys from Gluon server.
+ *
+ * @param subKeyUrl get every proton subkey Url
+ */
+ public void getAllProtonSubkeys(String subKeyUrl) {
+ HttpClient client = HttpClientBuilder.create().build();
+ HttpGet request = new HttpGet(subKeyUrl);
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ HttpResponse result = client.execute(request);
+ StatusLine statusLine = result.getStatusLine();
+ int statusCode = statusLine.getStatusCode();
+ if (statusCode == STATUS_CODE && result.getEntity() != null) {
+ String json = EntityUtils
+ .toString(result.getEntity());
+ log.debug(SUBKEYS_RECEIVED, json);
+ JsonNode jsonNode = mapper.readTree(json);
+ JsonNode nodes = jsonNode.get(GLUON_NODE).get(GLUON_NODES);
+
+ for (JsonNode confNode : nodes) {
+ String key = confNode.get(GLUON_KEY).asText();
+ storeProtonSubkey(key);
+ }
+ }
+ } catch (IOException e) {
+ log.error(E_SUBKEYS_PROCESSING, subKeyUrl);
+ }
+ return;
+ }
+
+ /**
+ * Gets all the proton subkeys from Gluon server.
+ *
+ * @param uri get every proton subkey Url
+ * @return version server version
+ */
+ public String getGluonServerVersion(String uri) {
+ HttpClient client = HttpClientBuilder.create().build();
+ HttpGet request = new HttpGet(uri);
+ ObjectMapper mapper = new ObjectMapper();
+ String version = null;
+ try {
+ HttpResponse result = client.execute(request);
+ StatusLine statusLine = result.getStatusLine();
+ int statusCode = statusLine.getStatusCode();
+ if (statusCode == STATUS_CODE && result.getEntity() != null) {
+ String json = EntityUtils
+ .toString(result.getEntity());
+ JsonNode jsonNode = mapper.readTree(json);
+ version = jsonNode.get("etcdserver").asText();
+ }
+ } catch (IOException e) {
+ log.error(PROCESSING_FAILED);
+ }
+ return version;
+ }
+
+ /**
+ * Gluon data updating and deleting into/from NetworkConfig datastore.
+ * config.apply will raise GluonConfig.class event for add,
+ * get and delete operations.
+ *
+ * @param gluonConfigMessage Etcdresponse data after parsing
+ */
+ public void processEtcdResponse(GluonConfig gluonConfigMessage) {
+
+ NetworkConfigService configService =
+ DefaultServiceDirectory.getService(NetworkConfigService.class);
+ if (gluonConfigMessage.action.equals(ACTION_SET) ||
+ gluonConfigMessage.action.equals(ACTION_GET)) {
+ GluonConfig config = configService
+ .addConfig(gluonConfigMessage.key, GluonConfig.class);
+ config.setEtcdResponse(gluonConfigMessage);
+ config.apply();
+ log.info(DATA_UPDATED);
+ } else if (gluonConfigMessage.action.equals(ACTION_DEL)) {
+ configService.removeConfig(gluonConfigMessage.key,
+ GluonConfig.class);
+ log.info(DATA_REMOVED);
+ } else {
+ log.info(INVALID_ACTION);
+ }
+ }
+
+ /**
+ * Returns set statistics.
+ *
+ * @return setCount
+ */
+ public int getSetCount() {
+ return setCount;
+ }
+
+ /**
+ * Returns get statistics.
+ *
+ * @return getCount
+ */
+ public int getGetCount() {
+ return getCount;
+ }
+
+ /**
+ * Returns delete statistics.
+ *
+ * @return delCount
+ */
+ public int getDelCount() {
+ return delCount;
+ }
+
+ /**
+ * Returns proton subkeys.
+ *
+ * @return subkeys
+ */
+ public List<String> getProtonSubkeys() {
+ return subKeys;
+ }
+
+ /**
+ * store proton subkeys.
+ *
+ * @param keys proton subkey
+ */
+ public void storeProtonSubkey(String keys) {
+ subKeys.add(keys);
+ }
+}
+