Enhancements to the distributed config manager
- Compute and persist a partition config based on the node config
- In the incoming JSON, use the same attribute names as the node
config uses
- implement separate "id" and "ip" fields to follow how ONOS
encoding works
- removed "IsEnable" field from inbound JSON, it has no
meaning for ONOS
- persist and reload the partition config
- add a "/exit" endpoint to terminate the server
- add unit test for basic functionality
Change-Id: I1f1d511fdfc76ec3420661e47b3fe9033ffd070e
diff --git a/tools/test/bin/onos-distributed-manager b/tools/test/bin/onos-distributed-manager
index 1a17f2d..7e8fc3a 100755
--- a/tools/test/bin/onos-distributed-manager
+++ b/tools/test/bin/onos-distributed-manager
@@ -14,15 +14,16 @@
limitations under the License.
"""
-from flask import Flask, jsonify, request
import argparse
import json
+import subprocess
import sys
-import httplib
+import os
from httplib import OK, NOT_FOUND, BAD_REQUEST
-import jsonschema
-from jsonschema import validate
+from subprocess import Popen
+from flask import Flask, jsonify, request
+from jsonschema import validate
"""
Onos Distributed Manager
@@ -30,12 +31,11 @@
Message Samples :
--Adding node
-curl -H "Content-Type: application/json" -X POST
- --data '{"Id":"1.2.3.4","IpAddress":"1.2.3.4","Port":3456,"IsEnable":true}' http://localhost:5000
+curl -H "Content-Type: application/json" -X POST --data '{"id":"node-1","ip":"1.2.3.4","port":3456}' http://localhost:5000
--Updating node
-curl -H "Content-Type: application/json" -X PUT --data '{"Id":"1.2.3.4","IpAddress":"1.2.3.4","Port":3456,"IsEnable":true}' http://localhost:5000
+curl -H "Content-Type: application/json" -X PUT --data '{"id":"node-1","ip":"1.2.3.4","port":3456}' http://localhost:5000
--Deleting node
-curl -H "Content-Type: application/json" -X DELETE --data '{"Id":"1.2.3.4","IpAddress":"1.2.3.4","Port":3456,"IsEnable":true}' http://localhost:5000
+curl -H "Content-Type: application/json" -X DELETE --data '{"id":"node-1","ip":"1.2.3.4","port":3456}' http://localhost:5000
--Getting node data
curl -X GET http://10.15.176.228:5000/cluster.json
@@ -45,12 +45,11 @@
schema = {
"type": "object",
"properties": {
- "Id": {"type": "string"},
- "Port": {"type": "number"},
- "IsEnable": {"type": "boolean"},
- "IpAddress": {"type": "string"},
+ "id": {"type": "string"},
+ "port": {"type": "number"},
+ "ip": {"type": "string"},
},
- "required": ["Id", "Port", "IsEnable", "IpAddress"]
+ "required": ["id", "port", "ip"]
}
@@ -63,14 +62,40 @@
return True
class Manager:
- controller_list = {}
+ cluster_config = {
+ "nodes": [],
+ "name": 315256287,
+ "partitions": []
+ }
persistence_filename = ""
+ def found_node_index(self, content):
+ for index, node in enumerate(self.cluster_config["nodes"]):
+ if node["id"] == content["id"]:
+ return index
+ return None
+
+
def save_to_file(self):
- data = json.dumps(self.controller_list)
+ config = self.cluster_config
+ if len(self.cluster_config["nodes"]) > 0:
+ command = ["/Users/ray/Documents/work/onos-next/tools/test/bin/onos-gen-partitions", "" ]
+ for controller_node in self.cluster_config["nodes"]:
+ node_string = controller_node["id"]
+ node_string += ":"
+ node_string += controller_node["ip"]
+ node_string += ":"
+ node_string += str(controller_node["port"])
+ command.append(node_string)
+ process = Popen(command, stdout=subprocess.PIPE)
+ config = json.load(process.stdout)
+ self.cluster_config = config
+ else:
+ self.cluster_config["nodes"] = []
+ self.cluster_config["partitions"] = []
try:
with open(self.persistence_filename, 'w') as file:
- file.write(data)
+ file.write(json.dumps(config))
file.close()
except Exception as error:
print error
@@ -80,7 +105,7 @@
try:
with open(self.persistence_filename, 'r') as file:
data = file.read()
- self.controller_list = json.loads(data)
+ self.cluster_config = json.loads(data)
except Exception:
self.save_to_file()
@@ -90,45 +115,46 @@
pagereturn += "<br><h3> Status of Added controllers </h3><br>"
pagereturn += " Id,  Ip,  Port,  Is Active <br> "
- for key in self.controller_list.keys():
- pagereturn += self.controller_list[key]["Id"] + ",  " + \
- self.controller_list[key]["IpAddress"] + ",  " + \
- str(self.controller_list[key]["Port"]) + ",  " + \
- str(self.controller_list[key]["IsEnable"])
+ for key in self.cluster_config["nodes"].keys():
+ pagereturn += self.cluster_config["nodes"][key]["id"] + ",  " + \
+ self.cluster_config["nodes"][key]["ip"] + ",  " + \
+ str(self.cluster_config["nodes"][key]["port"]) + ",  "
pagereturn += " <br>"
return pagereturn, OK
def data_post_handler(self, content):
if not is_valid_node_json(content):
- return "Id, IpAddress, Port, and IsEnable must be specified", \
+ return "id, ip, and port must be specified", \
BAD_REQUEST
- if content["Id"] in self.controller_list:
+ if self.found_node_index(content) is not None:
return "Content Id is already in the list", BAD_REQUEST
else:
- self.controller_list[content["Id"]] = content
+ self.cluster_config["nodes"].append(content)
self.save_to_file()
return "POST called with content", OK
def data_put_handler(self, content):
if not is_valid_node_json(content):
- return "Id, IpAddress, Port, and IsEnable must be specified", \
+ return "id, ip, and port must be specified", \
BAD_REQUEST
- if content["Id"] in self.controller_list:
- self.controller_list[content["Id"]] = content
+ node_index = self.found_node_index(content)
+ if node_index is not None:
+ self.cluster_config["nodes"][node_index] = content
self.save_to_file()
return "Update succeeded", OK
else:
- return "Id %s is not found " % (content["Id"]), NOT_FOUND
+ return "Id %s is not found " % (content["id"]), NOT_FOUND
def data_delete_handler(self, content):
- if content["Id"] in self.controller_list:
- del self.controller_list[content["Id"]]
+ node_index = self.found_node_index(content)
+ if node_index is not None:
+ del self.cluster_config["nodes"][node_index]
self.save_to_file()
return "Deletion succeed.", OK
@@ -142,30 +168,7 @@
def cluster_responder(self):
- cluster_info = dict()
- nodes = list()
- partition = dict()
- # Todo: For first release , only 1 partition implemented
- cluster_members = list()
-
- # "nodes" array
- for controller_id in self.controller_list:
- controller_node = self.controller_list[controller_id]
- if controller_node["IsEnable"]:
- node_data = dict()
- node_data["ip"] = controller_node["IpAddress"]
- node_data["id"] = controller_node["Id"]
- node_data["port"] = controller_node["Port"]
- nodes.append(node_data)
- cluster_members.append(controller_node["Id"])
-
- partition["id"] = 1 # Todo: this will be updated .
- partition["members"] = cluster_members
-
- cluster_info["nodes"] = nodes
- cluster_info["name"] = -1394421542720337000 # Todo: add a real value here
- cluster_info["partitions"] = partition
- return jsonify(cluster_info), OK
+ return jsonify(self.cluster_config), OK
app = Flask(__name__)
manager = Manager()
@@ -190,6 +193,15 @@
return "json required", BAD_REQUEST
+@app.route('/exit', methods=['PUT'])
+def data_exit_handler():
+ func = request.environ.get('werkzeug.server.shutdown')
+ if func is None:
+ raise RuntimeError('Not running with the Werkzeug Server')
+ func()
+ return "", OK
+
+
@app.route('/', methods=['PUT'])
def data_put_handler():
if request.is_json:
diff --git a/tools/test/bin/onos-gen-partitions b/tools/test/bin/onos-gen-partitions
index f5184f2..a982c36 100755
--- a/tools/test/bin/onos-gen-partitions
+++ b/tools/test/bin/onos-gen-partitions
@@ -42,11 +42,23 @@
vars.append(var)
return sorted(vars, key=alphanum_key)
-def get_nodes(ips=None, port=9876):
- node = lambda k: { 'id': k, 'ip': k, 'port': port }
- if not ips:
- ips = [ environ[v] for v in get_OC_vars() ]
- return [ node(v) for v in ips ]
+def get_nodes(ips=None, default_port=9876):
+ node = lambda id, ip, port : { 'id': id, 'ip': ip, 'port': port }
+ result = []
+ if not ips:
+ ips = [ environ[v] for v in get_OC_vars() ]
+ for ip_string in ips:
+ address_tuple = ip_string.split(":")
+ if len(address_tuple) == 3:
+ id=address_tuple[0]
+ ip=address_tuple[1]
+ port=int(address_tuple[2])
+ else:
+ id=ip_string
+ ip=ip_string
+ port=default_port
+ result.append(node(id, ip, port))
+ return result
def generate_partitions(nodes, k, n):
l = deque(nodes)
diff --git a/tools/test/tests/onos-distributed-manager-test b/tools/test/tests/onos-distributed-manager-test
new file mode 100755
index 0000000..029acbf
--- /dev/null
+++ b/tools/test/tests/onos-distributed-manager-test
@@ -0,0 +1,112 @@
+#! /usr/bin/env python
+
+import unittest
+from httplib import OK, BAD_REQUEST
+
+import requests
+import json
+from subprocess import Popen
+import time
+
+
+class TestOnosDistributedManager(unittest.TestCase):
+
+ base_url = 'http://localhost:5000/'
+ cluster_json_url = base_url + 'cluster.json'
+
+ def setUp(self):
+ command = ["onos-distributed-manager"]
+ self.server_process = Popen(command)
+ time.sleep(1)
+
+ def tearDown(self):
+ requests.put(self.base_url + 'exit')
+
+ def test_operations(self):
+ node1_json = '{"id":"node1","ip":"10.128.11.161","port":9876}'
+ node2_json = '{"id":"node2","ip":"10.128.11.162","port":9876}'
+ node3_json = '{"id":"node3","ip":"10.128.11.163","port":9876}'
+ node4_json = '{"id":"node4","ip":"10.128.11.164","port":9876}'
+ node5_json = '{"id":"node5","ip":"10.128.11.165","port":9876}'
+ new_node5_json = '{"id":"node5","ip":"10.128.11.265","port":1234}'
+
+ node1_dict = json.loads(node1_json)
+ new_node5_dict = json.loads(new_node5_json)
+
+ post_headers = { 'Content-Type': 'application/json' }
+
+ def check_list_lengths(expected_length):
+ json_request = requests.get(self.cluster_json_url)
+ self.assertEqual(json_request.status_code, OK)
+ json_object = json_request.json()
+ self.assertEqual(len(json_object["nodes"]), expected_length)
+ self.assertEqual(len(json_object["partitions"]), expected_length)
+
+ def post_node(node_json):
+ request = requests.post(self.base_url,
+ data=node_json,
+ headers=post_headers)
+ self.assertEqual(request.status_code, OK)
+
+ # Test initial configuration - node list should be empty
+ check_list_lengths(0)
+
+ # Test post operation to add a node
+ post_node(node1_json)
+
+ json_request = requests.get(self.cluster_json_url)
+ self.assertEqual(json_request.status_code, OK)
+ json_object = json_request.json()
+ self.assertEqual(len(json_object["nodes"]), 1)
+ self.assertEqual(len(json_object["partitions"]), 1)
+ self.assertIn(node1_dict, json_object["nodes"])
+ self.assertIn(node1_dict["id"], json_object["partitions"][0]["members"])
+
+ # Re-posting the same node should cause an error
+ json_request = requests.post(self.base_url,
+ data=node1_json,
+ headers=post_headers)
+ self.assertEqual(json_request.status_code, BAD_REQUEST)
+ check_list_lengths(1)
+
+ # Add 4 more nodes and check the partitions generated
+ post_node(node2_json)
+ post_node(node3_json)
+ post_node(node4_json)
+ post_node(node5_json)
+
+ check_list_lengths(5)
+ json_request = requests.get(self.cluster_json_url)
+ self.assertEqual(json_request.status_code, OK)
+ json_object = json_request.json()
+ self.assertEqual(["node4", "node5", "node1"],
+ json_object["partitions"][3]["members"])
+
+ # modify a node
+ json_request = requests.put(self.base_url,
+ data=new_node5_json,
+ headers=post_headers)
+ self.assertEqual(json_request.status_code, OK)
+ json_request = requests.get(self.cluster_json_url)
+ self.assertEqual(json_request.status_code, OK)
+ json_object = json_request.json()
+ self.assertIn(new_node5_dict, json_object["nodes"])
+
+ # delete all the nodes
+ def delete_node(json_string):
+ request = requests.delete(self.base_url,
+ data=json_string,
+ headers=post_headers)
+ self.assertEqual(request.status_code, OK)
+
+ delete_node(new_node5_json)
+ delete_node(node4_json)
+ delete_node(node3_json)
+ delete_node(node2_json)
+ delete_node(node1_json)
+
+ # make sure that no nodes remain
+ check_list_lengths(0)
+
+if __name__ == '__main__':
+ unittest.main()