Enhancements to the distributed config manager
- Compute and persist a partition config based on the node config
- In the incoming JSON, use the same attribute names as the node
config uses
- Implement separate "id" and "ip" fields to follow the ONOS
encoding
- Remove the "IsEnable" field from inbound JSON; it has no
meaning for ONOS
- Persist and reload the partition config
- Add a "/exit" endpoint to terminate the server
- Add a unit test for basic functionality
Change-Id: I1f1d511fdfc76ec3420661e47b3fe9033ffd070e
diff --git a/tools/test/bin/onos-distributed-manager b/tools/test/bin/onos-distributed-manager
index 1a17f2d..7e8fc3a 100755
--- a/tools/test/bin/onos-distributed-manager
+++ b/tools/test/bin/onos-distributed-manager
@@ -14,15 +14,16 @@
limitations under the License.
"""
-from flask import Flask, jsonify, request
import argparse
import json
+import subprocess
import sys
-import httplib
+import os
from httplib import OK, NOT_FOUND, BAD_REQUEST
-import jsonschema
-from jsonschema import validate
+from subprocess import Popen
+from flask import Flask, jsonify, request
+from jsonschema import validate
"""
Onos Distributed Manager
@@ -30,12 +31,11 @@
Message Samples :
--Adding node
-curl -H "Content-Type: application/json" -X POST
- --data '{"Id":"1.2.3.4","IpAddress":"1.2.3.4","Port":3456,"IsEnable":true}' http://localhost:5000
+curl -H "Content-Type: application/json" -X POST --data '{"id":"node-1","ip":"1.2.3.4","port":3456}' http://localhost:5000
--Updating node
-curl -H "Content-Type: application/json" -X PUT --data '{"Id":"1.2.3.4","IpAddress":"1.2.3.4","Port":3456,"IsEnable":true}' http://localhost:5000
+curl -H "Content-Type: application/json" -X PUT --data '{"id":"node-1","ip":"1.2.3.4","port":3456}' http://localhost:5000
--Deleting node
-curl -H "Content-Type: application/json" -X DELETE --data '{"Id":"1.2.3.4","IpAddress":"1.2.3.4","Port":3456,"IsEnable":true}' http://localhost:5000
+curl -H "Content-Type: application/json" -X DELETE --data '{"id":"node-1","ip":"1.2.3.4","port":3456}' http://localhost:5000
--Getting node data
curl -X GET http://10.15.176.228:5000/cluster.json
@@ -45,12 +45,11 @@
schema = {
"type": "object",
"properties": {
- "Id": {"type": "string"},
- "Port": {"type": "number"},
- "IsEnable": {"type": "boolean"},
- "IpAddress": {"type": "string"},
+ "id": {"type": "string"},
+ "port": {"type": "number"},
+ "ip": {"type": "string"},
},
- "required": ["Id", "Port", "IsEnable", "IpAddress"]
+ "required": ["id", "port", "ip"]
}
@@ -63,14 +62,40 @@
return True
class Manager:
- controller_list = {}
+ cluster_config = {
+ "nodes": [],
+ "name": 315256287,
+ "partitions": []
+ }
persistence_filename = ""
+ def found_node_index(self, content):
+ for index, node in enumerate(self.cluster_config["nodes"]):
+ if node["id"] == content["id"]:
+ return index
+ return None
+
+
def save_to_file(self):
- data = json.dumps(self.controller_list)
+ config = self.cluster_config
+ if len(self.cluster_config["nodes"]) > 0:
+ command = [os.path.join(os.path.dirname(os.path.abspath(__file__)), "onos-gen-partitions"), ""]  # sibling script in the same directory; no developer-local absolute path
+ for controller_node in self.cluster_config["nodes"]:
+ node_string = controller_node["id"]
+ node_string += ":"
+ node_string += controller_node["ip"]
+ node_string += ":"
+ node_string += str(controller_node["port"])
+ command.append(node_string)
+ process = Popen(command, stdout=subprocess.PIPE)
+ config = json.loads(process.communicate()[0])  # read all output and wait for the child to exit
+ self.cluster_config = config
+ else:
+ self.cluster_config["nodes"] = []
+ self.cluster_config["partitions"] = []
try:
with open(self.persistence_filename, 'w') as file:
- file.write(data)
+ file.write(json.dumps(config))
file.close()
except Exception as error:
print error
@@ -80,7 +105,7 @@
try:
with open(self.persistence_filename, 'r') as file:
data = file.read()
- self.controller_list = json.loads(data)
+ self.cluster_config = json.loads(data)
except Exception:
self.save_to_file()
@@ -90,45 +115,46 @@
pagereturn += "<br><h3> Status of Added controllers </h3><br>"
pagereturn += " Id,  Ip,  Port,  Is Active <br> "
- for key in self.controller_list.keys():
- pagereturn += self.controller_list[key]["Id"] + ",  " + \
- self.controller_list[key]["IpAddress"] + ",  " + \
- str(self.controller_list[key]["Port"]) + ",  " + \
- str(self.controller_list[key]["IsEnable"])
+ for key, _ in enumerate(self.cluster_config["nodes"]):  # "nodes" is a list, so iterate over indices (lists have no .keys())
+ pagereturn += self.cluster_config["nodes"][key]["id"] + ",  " + \
+ self.cluster_config["nodes"][key]["ip"] + ",  " + \
+ str(self.cluster_config["nodes"][key]["port"]) + ",  "
pagereturn += " <br>"
return pagereturn, OK
def data_post_handler(self, content):
if not is_valid_node_json(content):
- return "Id, IpAddress, Port, and IsEnable must be specified", \
+ return "id, ip, and port must be specified", \
BAD_REQUEST
- if content["Id"] in self.controller_list:
+ if self.found_node_index(content) is not None:
return "Content Id is already in the list", BAD_REQUEST
else:
- self.controller_list[content["Id"]] = content
+ self.cluster_config["nodes"].append(content)
self.save_to_file()
return "POST called with content", OK
def data_put_handler(self, content):
if not is_valid_node_json(content):
- return "Id, IpAddress, Port, and IsEnable must be specified", \
+ return "id, ip, and port must be specified", \
BAD_REQUEST
- if content["Id"] in self.controller_list:
- self.controller_list[content["Id"]] = content
+ node_index = self.found_node_index(content)
+ if node_index is not None:
+ self.cluster_config["nodes"][node_index] = content
self.save_to_file()
return "Update succeeded", OK
else:
- return "Id %s is not found " % (content["Id"]), NOT_FOUND
+ return "Id %s is not found " % (content["id"]), NOT_FOUND
def data_delete_handler(self, content):
- if content["Id"] in self.controller_list:
- del self.controller_list[content["Id"]]
+ node_index = self.found_node_index(content)
+ if node_index is not None:
+ del self.cluster_config["nodes"][node_index]
self.save_to_file()
return "Deletion succeed.", OK
@@ -142,30 +168,7 @@
def cluster_responder(self):
- cluster_info = dict()
- nodes = list()
- partition = dict()
- # Todo: For first release , only 1 partition implemented
- cluster_members = list()
-
- # "nodes" array
- for controller_id in self.controller_list:
- controller_node = self.controller_list[controller_id]
- if controller_node["IsEnable"]:
- node_data = dict()
- node_data["ip"] = controller_node["IpAddress"]
- node_data["id"] = controller_node["Id"]
- node_data["port"] = controller_node["Port"]
- nodes.append(node_data)
- cluster_members.append(controller_node["Id"])
-
- partition["id"] = 1 # Todo: this will be updated .
- partition["members"] = cluster_members
-
- cluster_info["nodes"] = nodes
- cluster_info["name"] = -1394421542720337000 # Todo: add a real value here
- cluster_info["partitions"] = partition
- return jsonify(cluster_info), OK
+ return jsonify(self.cluster_config), OK
app = Flask(__name__)
manager = Manager()
@@ -190,6 +193,15 @@
return "json required", BAD_REQUEST
+@app.route('/exit', methods=['PUT'])
+def data_exit_handler():
+ func = request.environ.get('werkzeug.server.shutdown')
+ if func is None:
+ raise RuntimeError('Not running with the Werkzeug Server')
+ func()
+ return "", OK
+
+
@app.route('/', methods=['PUT'])
def data_put_handler():
if request.is_json:
diff --git a/tools/test/bin/onos-gen-partitions b/tools/test/bin/onos-gen-partitions
index f5184f2..a982c36 100755
--- a/tools/test/bin/onos-gen-partitions
+++ b/tools/test/bin/onos-gen-partitions
@@ -42,11 +42,23 @@
vars.append(var)
return sorted(vars, key=alphanum_key)
-def get_nodes(ips=None, port=9876):
- node = lambda k: { 'id': k, 'ip': k, 'port': port }
- if not ips:
- ips = [ environ[v] for v in get_OC_vars() ]
- return [ node(v) for v in ips ]
+def get_nodes(ips=None, default_port=9876):
+ node = lambda id, ip, port : { 'id': id, 'ip': ip, 'port': port }
+ result = []
+ if not ips:
+ ips = [ environ[v] for v in get_OC_vars() ]
+ for ip_string in ips:
+ address_tuple = ip_string.split(":")
+ if len(address_tuple) == 3:
+ id=address_tuple[0]
+ ip=address_tuple[1]
+ port=int(address_tuple[2])
+ else:
+ id=ip_string
+ ip=ip_string
+ port=default_port
+ result.append(node(id, ip, port))
+ return result
def generate_partitions(nodes, k, n):
l = deque(nodes)