/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.gluon.rsc;
17
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onosproject.net.config.NetworkConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.gluon.manager.GluonManager.addServer;
import static org.onosproject.gluon.manager.GluonManager.deleteServer;
import static org.onosproject.gluon.manager.GluonManager.getAllServersIP;
import static org.onosproject.gluon.rsc.GluonConstants.ACTION_DEL;
import static org.onosproject.gluon.rsc.GluonConstants.ACTION_GET;
import static org.onosproject.gluon.rsc.GluonConstants.ACTION_SET;
import static org.onosproject.gluon.rsc.GluonConstants.ACTIVE_SERVER;
import static org.onosproject.gluon.rsc.GluonConstants.BATCH_PROCESSING;
import static org.onosproject.gluon.rsc.GluonConstants.BATCH_QUERING;
import static org.onosproject.gluon.rsc.GluonConstants.BATCH_RECEIVED;
import static org.onosproject.gluon.rsc.GluonConstants.BATCH_SERVICE_STATUS;
import static org.onosproject.gluon.rsc.GluonConstants.BATCH_STOPPED;
import static org.onosproject.gluon.rsc.GluonConstants.DATA_REMOVED;
import static org.onosproject.gluon.rsc.GluonConstants.DATA_UPDATED;
import static org.onosproject.gluon.rsc.GluonConstants.E_BATCH_PROCESSING;
import static org.onosproject.gluon.rsc.GluonConstants.E_BATCH_PROCESSING_URL;
import static org.onosproject.gluon.rsc.GluonConstants.E_CLIENT_STOP;
import static org.onosproject.gluon.rsc.GluonConstants.E_REAL_TIME_PROCESSING;
import static org.onosproject.gluon.rsc.GluonConstants.E_SUBKEYS_PROCESSING;
import static org.onosproject.gluon.rsc.GluonConstants.GLUON_ACTION;
import static org.onosproject.gluon.rsc.GluonConstants.GLUON_CREATE_INDEX;
import static org.onosproject.gluon.rsc.GluonConstants.GLUON_KEY;
import static org.onosproject.gluon.rsc.GluonConstants.GLUON_MOD_INDEX;
import static org.onosproject.gluon.rsc.GluonConstants.GLUON_NODE;
import static org.onosproject.gluon.rsc.GluonConstants.GLUON_NODES;
import static org.onosproject.gluon.rsc.GluonConstants.GLUON_VALUE;
import static org.onosproject.gluon.rsc.GluonConstants.INVALID_ACTION;
import static org.onosproject.gluon.rsc.GluonConstants.KEYS;
import static org.onosproject.gluon.rsc.GluonConstants.MODE_START;
import static org.onosproject.gluon.rsc.GluonConstants.MODE_STOP;
import static org.onosproject.gluon.rsc.GluonConstants.NO_SERVER_AVAIL;
import static org.onosproject.gluon.rsc.GluonConstants.NO_SUBKEYS_AVAIL;
import static org.onosproject.gluon.rsc.GluonConstants.PROCESSING_FAILED;
import static org.onosproject.gluon.rsc.GluonConstants.PROTON;
import static org.onosproject.gluon.rsc.GluonConstants.REAL_TIME_PROCESSING;
import static org.onosproject.gluon.rsc.GluonConstants.REAL_TIME_RECEIVED;
import static org.onosproject.gluon.rsc.GluonConstants.REAL_TIME_SERVICE_STATUS;
import static org.onosproject.gluon.rsc.GluonConstants.SERVER_RUNNING;
import static org.onosproject.gluon.rsc.GluonConstants.SERVER_STOPPED;
import static org.onosproject.gluon.rsc.GluonConstants.STATUS_CODE;
import static org.onosproject.gluon.rsc.GluonConstants.SUBKEYS_RECEIVED;
85
86
87public class GluonServer {
88
Ray Milkey06297ed2018-01-22 17:13:41 -080089 private String protonKeyUri;
90 private String serverUri;
Mohammad Shahid0cf9c0e2017-08-09 15:58:19 +053091
Ray Milkey06297ed2018-01-22 17:13:41 -080092 private CloseableHttpAsyncClient httpClient;
Mohammad Shahid0cf9c0e2017-08-09 15:58:19 +053093
94 //store gluon server supported subkeys
95 private List<String> subKeys = new LinkedList<>();
96
97 // Lists of gluon servers
98 public Map<String, GluonServer> serverMap = getAllServersIP();
99
100 private final Logger log = LoggerFactory.getLogger(getClass());
101
102 // Real time executor thread
103 private final ExecutorService executorRealTimeService = Executors
104 .newSingleThreadExecutor(groupedThreads("EtcdRealTimeMonitor",
105 "executor-%d", log));
106 // Batch executor thread
107 private final ExecutorService executorBatchService = Executors
108 .newSingleThreadExecutor(groupedThreads("EtcdBatchMonitor",
109 "executor-%d", log));
110
111 // Statistics counter
112 private int setCount = 0;
113 private int delCount = 0;
114 private int getCount = 0;
115 // Server etcd version
116 public String version;
117
118 /**
119 * To get Gluon server running version, needs to create at-least one object.
120 */
121 public GluonServer() {
122 }
123
124 /**
125 * Realising server functionality.
126 *
127 * @param etcduri server url
128 * @param targetProtonKey server key type, default net-l3vpn
129 * @param mode server mode start or stop
130 * @param version running server version
131 */
132 public GluonServer(String etcduri, String targetProtonKey,
133 String mode, String version) {
134 this.version = version;
135
136 switch (mode) {
137 // Handling stop mode
138 case MODE_STOP:
139 // return if server is not available into the server list
140 if (!serverMap.containsKey(etcduri)) {
141 log.debug(NO_SERVER_AVAIL);
142 return;
143 }
144 try {
145 // stop batch service executor thread
146 log.debug(BATCH_SERVICE_STATUS,
147 executorBatchService.isShutdown());
148 executorBatchService.shutdown();
149 // stop real time service executor thread
150 log.debug(REAL_TIME_SERVICE_STATUS,
151 executorRealTimeService.isShutdown());
152 executorRealTimeService.shutdown();
153 // closing http client
154 httpClient.close();
155 } catch (IOException io) {
156 log.error(E_CLIENT_STOP, io.getMessage());
157 }
158 // deletes server from gluon server list
159 deleteServer(etcduri);
160 log.debug(SERVER_STOPPED);
161 return;
162 // Handling start mode
163 case MODE_START:
164 if (serverMap.containsKey(etcduri)) {
165 //Returns user CLI if server is already running
166 // and logs all server info into log files
167 log.info(SERVER_RUNNING);
168 log.debug(ACTIVE_SERVER, serverMap.size());
169 return;
170 }
171 // Store gluon manager object and gluon server url
172 addServer(etcduri, this);
173 // Preparing server uri
174 serverUri = etcduri + "/v2" + KEYS;
175 // Preparing server subkeys uri
176 protonKeyUri = PROTON + targetProtonKey;
177 // Starts http client
178 RequestConfig requestConfig = RequestConfig.custom().build();
179 httpClient = HttpAsyncClients.custom()
180 .setDefaultRequestConfig(requestConfig).build();
181 httpClient.start();
182
183 // Start thread to handle and process RealTime data
184 handleRealTimeData(null);
185
186 // Start thread to handle and process batch data,
187 // iff subkeys are available
188 getAllProtonSubkeys(serverUri + protonKeyUri);
189 if (getProtonSubkeys().isEmpty()) {
190 log.debug(NO_SUBKEYS_AVAIL);
191 return;
192 }
193 // handle RealTime data
194 handleBatchData(0);
195 return;
196 default:
197 log.debug(INVALID_ACTION);
198
199 }
200 }
201
202 /**
203 * Handles real time data which is received from Gluon server.
204 *
205 * @param index, It will be used in recursive call of
206 * real time monitoring method.
207 * modified index receive from GluonConfig config file
208 */
209 private void handleRealTimeData(Long index) {
210 String realTimeUri = serverUri + protonKeyUri +
211 "/?wait=true&recursive=true";
212 if (index != null) {
213 realTimeUri += "&waitIndex=" + index;
214 }
215 HttpGet request = new HttpGet(URI.create(realTimeUri));
216 log.info(REAL_TIME_PROCESSING, realTimeUri);
217 // Starts real time executor thread
218 executorRealTimeService.execute(new Runnable() {
219 public void run() {
220 try {
221 httpClient.execute(
222 request, new FutureCallback<HttpResponse>() {
223
224 @Override
225 public void completed(HttpResponse result) {
226 StatusLine statusLine =
227 result.getStatusLine();
228 int statusCode = statusLine.getStatusCode();
229 if (statusCode ==
230 STATUS_CODE &&
231 result.getEntity() != null) {
232 try {
233 String json = EntityUtils
234 .toString(result.getEntity());
235 GluonConfig response =
236 processRealTimeResponse(json);
237 // Recursive call to handle
238 // real time data
239 handleRealTimeData(
240 response.modifiedIndex + 1);
241 } catch (IOException e) {
242 failed(e);
243 }
244 } else {
245 log.error(E_REAL_TIME_PROCESSING);
246 }
247 }
248
249 @Override
250 public void cancelled() {
251 log.debug("Nothing to do with " +
252 "this overridden method");
253 }
254
255 @Override
256 public void failed(Exception e) {
257 log.error(E_REAL_TIME_PROCESSING,
258 e.getMessage());
259 }
260 });
261 } catch (Exception e) {
262 log.error(E_REAL_TIME_PROCESSING, e.getMessage());
263 }
264 }
265 });
266 }
267
268
269 /**
270 * Handles batch data which is received from Gluon server.
271 *
272 * @param subKeyIndex gets all proton subkey value
273 */
274 private void handleBatchData(int subKeyIndex) {
275 String currBatchUri = serverUri + getProtonSubkeys().get(subKeyIndex);
276 HttpGet request = new HttpGet(URI.create(currBatchUri));
277
278 if (0 == subKeyIndex) {
279 log.debug(BATCH_PROCESSING, protonKeyUri);
280 }
281 log.info(BATCH_QUERING, currBatchUri);
282 // Starts batch executor thread
283 executorBatchService.execute(new Runnable() {
284 public void run() {
285 try {
286 httpClient.execute(request, new FutureCallback<HttpResponse>() {
287 @Override
288 public void completed(HttpResponse result) {
289 StatusLine statusLine = result.getStatusLine();
290 int statusCode = statusLine.getStatusCode();
291 if (statusCode == STATUS_CODE &&
292 result.getEntity() != null) {
293 try {
294 String json = EntityUtils
295 .toString(result.getEntity());
296 processBatchResponse(json);
297 // Stop batch executor thread
298 // once all gluon server subkeys processed
299 if (subKeyIndex ==
300 ((getProtonSubkeys().size()) - 1)) {
301 cancelled();
302 return;
303 }
304
305 handleBatchData(subKeyIndex + 1);
306 } catch (IOException e) {
307 failed(e);
308 }
309 } else {
310 log.error(E_BATCH_PROCESSING_URL, currBatchUri);
311 }
312 }
313
314 @Override
315 public void cancelled() {
316 executorBatchService.shutdown();
317 log.debug(BATCH_STOPPED, protonKeyUri);
318 }
319
320 @Override
321 public void failed(Exception e) {
322 log.error(E_BATCH_PROCESSING, e.getMessage());
323 }
324 });
325 } catch (Exception e) {
326 log.error(E_BATCH_PROCESSING, e.getMessage());
327 }
328 }
329 });
330 }
331
332 /**
333 * Parse and process real time json data which is received from Gluon server.
334 *
335 * @param result real time json data
336 * @return GluonConfig response
337 */
338 public GluonConfig processRealTimeResponse(String result) {
339 ObjectMapper mapper = new ObjectMapper();
340 GluonConfig response = null;
341 try {
342 log.info(REAL_TIME_RECEIVED, result);
343 JsonNode jsonNode = mapper.readTree(result);
344 String action = jsonNode.get(GLUON_ACTION).asText();
345 String key = jsonNode.get(GLUON_NODE).get(GLUON_KEY).asText();
346 long mIndex = jsonNode.get(GLUON_NODE)
347 .get(GLUON_MOD_INDEX).asLong();
348 long cIndex = jsonNode.get(GLUON_NODE)
349 .get(GLUON_CREATE_INDEX).asLong();
350 if (action.equals(ACTION_SET)) {
351 String value = jsonNode.get(GLUON_NODE)
352 .get(GLUON_VALUE).asText();
353 JsonNode modifyValue = mapper.readTree(value.replace("\\", ""));
354 response = new GluonConfig(action, key, modifyValue, mIndex,
355 cIndex);
356 setCount++;
357 } else if (action.equals(ACTION_DEL)) {
358 response = new GluonConfig(action, key, null, mIndex, cIndex);
359 delCount++;
360 } else {
361 log.debug(INVALID_ACTION);
362 }
363 } catch (IOException e) {
364 log.error(E_REAL_TIME_PROCESSING, e.getMessage());
365 }
366 processEtcdResponse(response);
367 return response;
368 }
369
370 /**
371 * Parse and process batch json data which is received from Gluon server.
372 *
373 * @param result batch json data
374 * @return GluonConfig response
375 */
376 public GluonConfig processBatchResponse(String result) {
377 ObjectMapper mapper = new ObjectMapper();
378 GluonConfig response = null;
379 try {
380 log.debug(BATCH_RECEIVED, result);
381 JsonNode jsonNode = mapper.readTree(result);
382 log.info("JSON NODE VALUE ARE: {}", jsonNode);
383 String action = jsonNode.get(GLUON_ACTION).asText();
384 JsonNode nodes = jsonNode.get(GLUON_NODE).get(GLUON_NODES);
385 if (null != nodes) {
386 for (JsonNode confNode : nodes) {
387 String key = confNode.get(GLUON_KEY).asText();
388 long mIndex = confNode.get(GLUON_MOD_INDEX).asLong();
389 long cIndex = confNode.get(GLUON_CREATE_INDEX).asLong();
390 String value = confNode.get(GLUON_VALUE).asText();
391 log.info("JSON NODE VALUE ARE 2: {}", value);
392 JsonNode modifyValue = mapper.readTree(value.replace("\\", ""));
393 log.info("JSON NODE MODIFY VALUE ARE 2: {}", modifyValue);
394 response = new GluonConfig(action, key,
395 modifyValue, mIndex, cIndex);
396 getCount++;
397 processEtcdResponse(response);
398
399 }
400 }
401 } catch (IOException e) {
402 log.error(E_BATCH_PROCESSING, e.getMessage());
403 }
404 return response;
405 }
406
407 /**
408 * Gets all the proton subkeys from Gluon server.
409 *
410 * @param subKeyUrl get every proton subkey Url
411 */
412 public void getAllProtonSubkeys(String subKeyUrl) {
413 HttpClient client = HttpClientBuilder.create().build();
414 HttpGet request = new HttpGet(subKeyUrl);
415 ObjectMapper mapper = new ObjectMapper();
416 try {
417 HttpResponse result = client.execute(request);
418 StatusLine statusLine = result.getStatusLine();
419 int statusCode = statusLine.getStatusCode();
420 if (statusCode == STATUS_CODE && result.getEntity() != null) {
421 String json = EntityUtils
422 .toString(result.getEntity());
423 log.debug(SUBKEYS_RECEIVED, json);
424 JsonNode jsonNode = mapper.readTree(json);
425 JsonNode nodes = jsonNode.get(GLUON_NODE).get(GLUON_NODES);
426
427 for (JsonNode confNode : nodes) {
428 String key = confNode.get(GLUON_KEY).asText();
429 storeProtonSubkey(key);
430 }
431 }
432 } catch (IOException e) {
433 log.error(E_SUBKEYS_PROCESSING, subKeyUrl);
434 }
435 return;
436 }
437
438 /**
439 * Gets all the proton subkeys from Gluon server.
440 *
441 * @param uri get every proton subkey Url
442 * @return version server version
443 */
444 public String getGluonServerVersion(String uri) {
445 HttpClient client = HttpClientBuilder.create().build();
446 HttpGet request = new HttpGet(uri);
447 ObjectMapper mapper = new ObjectMapper();
448 String version = null;
449 try {
450 HttpResponse result = client.execute(request);
451 StatusLine statusLine = result.getStatusLine();
452 int statusCode = statusLine.getStatusCode();
453 if (statusCode == STATUS_CODE && result.getEntity() != null) {
454 String json = EntityUtils
455 .toString(result.getEntity());
456 JsonNode jsonNode = mapper.readTree(json);
457 version = jsonNode.get("etcdserver").asText();
458 }
459 } catch (IOException e) {
460 log.error(PROCESSING_FAILED);
461 }
462 return version;
463 }
464
465 /**
466 * Gluon data updating and deleting into/from NetworkConfig datastore.
467 * config.apply will raise GluonConfig.class event for add,
468 * get and delete operations.
469 *
470 * @param gluonConfigMessage Etcdresponse data after parsing
471 */
472 public void processEtcdResponse(GluonConfig gluonConfigMessage) {
473
474 NetworkConfigService configService =
475 DefaultServiceDirectory.getService(NetworkConfigService.class);
476 if (gluonConfigMessage.action.equals(ACTION_SET) ||
477 gluonConfigMessage.action.equals(ACTION_GET)) {
478 GluonConfig config = configService
479 .addConfig(gluonConfigMessage.key, GluonConfig.class);
480 config.setEtcdResponse(gluonConfigMessage);
481 config.apply();
482 log.info(DATA_UPDATED);
483 } else if (gluonConfigMessage.action.equals(ACTION_DEL)) {
484 configService.removeConfig(gluonConfigMessage.key,
485 GluonConfig.class);
486 log.info(DATA_REMOVED);
487 } else {
488 log.info(INVALID_ACTION);
489 }
490 }
491
492 /**
493 * Returns set statistics.
494 *
495 * @return setCount
496 */
497 public int getSetCount() {
498 return setCount;
499 }
500
501 /**
502 * Returns get statistics.
503 *
504 * @return getCount
505 */
506 public int getGetCount() {
507 return getCount;
508 }
509
510 /**
511 * Returns delete statistics.
512 *
513 * @return delCount
514 */
515 public int getDelCount() {
516 return delCount;
517 }
518
519 /**
520 * Returns proton subkeys.
521 *
522 * @return subkeys
523 */
524 public List<String> getProtonSubkeys() {
525 return subKeys;
526 }
527
528 /**
529 * store proton subkeys.
530 *
531 * @param keys proton subkey
532 */
533 public void storeProtonSubkey(String keys) {
534 subKeys.add(keys);
535 }
536}
537