Enhancements to the distributed config manager

- Compute and persist a partition config based on the node config
- In the incoming JSON, use the same attribute names as the node
  config uses
- Implement separate "id" and "ip" fields to follow how ONOS
  encoding works
- Remove the "IsEnable" field from the inbound JSON; it has no
  meaning for ONOS
- Persist and reload the partition config
- Add an "/exit" endpoint to terminate the server
- Add a unit test for basic functionality

Change-Id: I1f1d511fdfc76ec3420661e47b3fe9033ffd070e
diff --git a/tools/test/bin/onos-distributed-manager b/tools/test/bin/onos-distributed-manager
index 1a17f2d..7e8fc3a 100755
--- a/tools/test/bin/onos-distributed-manager
+++ b/tools/test/bin/onos-distributed-manager
@@ -14,15 +14,16 @@
  limitations under the License.
 """
 
-from flask import Flask, jsonify, request
 import argparse
 import json
+import subprocess
 import sys
-import httplib
+import os
 from httplib import OK, NOT_FOUND, BAD_REQUEST
-import jsonschema
-from jsonschema import validate
+from subprocess import Popen
 
+from flask import Flask, jsonify, request
+from jsonschema import validate
 
 """
 Onos Distributed Manager
@@ -30,12 +31,11 @@
 
 Message Samples :
 --Adding node
-curl  -H "Content-Type: application/json" -X POST
-     --data '{"Id":"1.2.3.4","IpAddress":"1.2.3.4","Port":3456,"IsEnable":true}' http://localhost:5000
+curl  -H "Content-Type: application/json" -X POST --data '{"id":"node-1","ip":"1.2.3.4","port":3456}' http://localhost:5000
 --Updating node
-curl  -H "Content-Type: application/json" -X PUT --data '{"Id":"1.2.3.4","IpAddress":"1.2.3.4","Port":3456,"IsEnable":true}' http://localhost:5000
+curl  -H "Content-Type: application/json" -X PUT --data '{"id":"node-1","ip":"1.2.3.4","port":3456}' http://localhost:5000
 --Deleting node
-curl  -H "Content-Type: application/json" -X DELETE --data '{"Id":"1.2.3.4","IpAddress":"1.2.3.4","Port":3456,"IsEnable":true}' http://localhost:5000
+curl  -H "Content-Type: application/json" -X DELETE --data '{"id":"node-1","ip":"1.2.3.4","port":3456}' http://localhost:5000
 --Getting node data
 curl  -X GET  http://10.15.176.228:5000/cluster.json
 
@@ -45,12 +45,11 @@
 schema = {
     "type": "object",
     "properties": {
-        "Id": {"type": "string"},
-        "Port": {"type": "number"},
-        "IsEnable": {"type": "boolean"},
-        "IpAddress": {"type": "string"},
+        "id": {"type": "string"},
+        "port": {"type": "number"},
+        "ip": {"type": "string"},
     },
-    "required": ["Id", "Port", "IsEnable", "IpAddress"]
+    "required": ["id", "port", "ip"]
 }
 
 
@@ -63,14 +62,40 @@
     return True
 
 class Manager:
-    controller_list = {}
+    cluster_config = {
+        "nodes": [],
+        "name": 315256287,
+        "partitions": []
+    }
     persistence_filename = ""
 
+    def found_node_index(self, content):
+        """Return the index of the node whose "id" equals content["id"], or None."""
+        nodes = self.cluster_config["nodes"]
+        hits = (pos for pos, entry in enumerate(nodes)
+                if entry["id"] == content["id"])
+        return next(hits, None)
+
     def save_to_file(self):
-        data = json.dumps(self.controller_list)
+        """Regenerate partitions for the current nodes and persist the config."""
+        config = self.cluster_config
+        if self.cluster_config["nodes"]:
+            # onos-gen-partitions sits beside this script in tools/test/bin.
+            bin_dir = os.path.dirname(os.path.realpath(__file__))
+            command = [os.path.join(bin_dir, "onos-gen-partitions"), ""]
+            for node in self.cluster_config["nodes"]:
+                command.append(
+                    "%s:%s:%s" % (node["id"], node["ip"], node["port"]))
+            process = Popen(command, stdout=subprocess.PIPE)
+            config = json.load(process.stdout)
+            process.wait()  # reap the generator once its output is consumed
+            self.cluster_config = config
+        else:
+            self.cluster_config["nodes"] = []
+            self.cluster_config["partitions"] = []
         try:
             with open(self.persistence_filename, 'w') as file:
-                file.write(data)
+                file.write(json.dumps(config))
                 file.close()
         except Exception as error:
             print error
@@ -80,7 +105,7 @@
         try:
             with open(self.persistence_filename, 'r') as file:
                 data = file.read()
-                self.controller_list = json.loads(data)
+                self.cluster_config = json.loads(data)
         except Exception:
             self.save_to_file()
 
@@ -90,45 +115,46 @@
         pagereturn += "<br><h3> Status of Added controllers  </h3><br>"
         pagereturn += " Id,&emsp; Ip,&emsp; Port,&emsp; Is Active <br> "
 
-        for key in self.controller_list.keys():
-            pagereturn += self.controller_list[key]["Id"] + ",&emsp; " + \
-                          self.controller_list[key]["IpAddress"] + ",&emsp; " + \
-                          str(self.controller_list[key]["Port"]) + ",&emsp; " + \
-                          str(self.controller_list[key]["IsEnable"])
+        for node in self.cluster_config["nodes"]:  # "nodes" is a list of dicts
+            pagereturn += node["id"] + ",&emsp; " + \
+                          node["ip"] + ",&emsp; " + \
+                          str(node["port"]) + ",&emsp; "
             pagereturn += " <br>"
 
         return pagereturn, OK
 
     def data_post_handler(self, content):
         if not is_valid_node_json(content):
-            return "Id, IpAddress, Port, and IsEnable must be specified", \
+            return "id, ip, and port must be specified", \
                    BAD_REQUEST
 
-        if content["Id"] in self.controller_list:
+        if self.found_node_index(content) is not None:
             return "Content Id is already in the list", BAD_REQUEST
 
         else:
-            self.controller_list[content["Id"]] = content
+            self.cluster_config["nodes"].append(content)
             self.save_to_file()
 
         return "POST called with content", OK
 
     def data_put_handler(self, content):
         if not is_valid_node_json(content):
-            return "Id, IpAddress, Port, and IsEnable must be specified", \
+            return "id, ip, and port must be specified", \
                    BAD_REQUEST
 
-        if content["Id"] in self.controller_list:
-            self.controller_list[content["Id"]] = content
+        node_index = self.found_node_index(content)
+        if node_index is not None:
+            self.cluster_config["nodes"][node_index] = content
             self.save_to_file()
             return "Update succeeded", OK
 
         else:
-            return "Id %s is not found " % (content["Id"]), NOT_FOUND
+            return "Id %s is not found " % (content["id"]), NOT_FOUND
 
     def data_delete_handler(self, content):
-        if content["Id"] in self.controller_list:
-            del self.controller_list[content["Id"]]
+        node_index = self.found_node_index(content)
+        if node_index is not None:
+            del self.cluster_config["nodes"][node_index]
             self.save_to_file()
             return "Deletion succeed.", OK
 
@@ -142,30 +168,7 @@
 
     def cluster_responder(self):
 
-        cluster_info = dict()
-        nodes = list()
-        partition = dict()
-        # Todo: For first release , only 1 partition implemented
-        cluster_members = list()
-
-        # "nodes" array
-        for controller_id in self.controller_list:
-            controller_node = self.controller_list[controller_id]
-            if controller_node["IsEnable"]:
-                node_data = dict()
-                node_data["ip"] = controller_node["IpAddress"]
-                node_data["id"] = controller_node["Id"]
-                node_data["port"] = controller_node["Port"]
-                nodes.append(node_data)
-                cluster_members.append(controller_node["Id"])
-
-        partition["id"] = 1  # Todo: this will be updated .
-        partition["members"] = cluster_members
-
-        cluster_info["nodes"] = nodes
-        cluster_info["name"] = -1394421542720337000 # Todo: add a real value here
-        cluster_info["partitions"] = partition
-        return jsonify(cluster_info), OK
+        return jsonify(self.cluster_config), OK
 
 app = Flask(__name__)
 manager = Manager()
@@ -190,6 +193,15 @@
         return "json required", BAD_REQUEST
 
 
+@app.route('/exit', methods=['PUT'])
+def data_exit_handler():
+    func = request.environ.get('werkzeug.server.shutdown')
+    if func is None:
+        raise RuntimeError('Not running with the Werkzeug Server')
+    func()
+    return "", OK
+
+
 @app.route('/', methods=['PUT'])
 def data_put_handler():
     if request.is_json:
diff --git a/tools/test/bin/onos-gen-partitions b/tools/test/bin/onos-gen-partitions
index f5184f2..a982c36 100755
--- a/tools/test/bin/onos-gen-partitions
+++ b/tools/test/bin/onos-gen-partitions
@@ -42,11 +42,23 @@
       vars.append(var)
   return sorted(vars, key=alphanum_key)
 
-def get_nodes(ips=None, port=9876):
-  node = lambda k: { 'id': k, 'ip': k, 'port': port }
-  if not ips:
-    ips = [ environ[v] for v in get_OC_vars() ]
-  return [ node(v) for v in ips ]
+def get_nodes(ips=None, default_port=9876):
+  """Build node dicts from 'id:ip:port' or bare address strings.
+
+  A string without exactly three ':'-separated fields serves as both id and
+  ip with default_port; empty ips reads the variables from get_OC_vars().
+  """
+  if not ips:
+    ips = [ environ[v] for v in get_OC_vars() ]
+  nodes = []
+  for ip_string in ips:
+    fields = ip_string.split(":")
+    if len(fields) == 3:
+      node_id, ip, port = fields[0], fields[1], int(fields[2])
+    else:
+      node_id, ip, port = ip_string, ip_string, default_port
+    nodes.append({ 'id': node_id, 'ip': ip, 'port': port })
+  return nodes
 
 def generate_partitions(nodes, k, n):
   l = deque(nodes)
diff --git a/tools/test/tests/onos-distributed-manager-test b/tools/test/tests/onos-distributed-manager-test
new file mode 100755
index 0000000..029acbf
--- /dev/null
+++ b/tools/test/tests/onos-distributed-manager-test
@@ -0,0 +1,112 @@
+#! /usr/bin/env python
+
+import unittest
+from httplib import OK, BAD_REQUEST
+
+import requests
+import json
+from subprocess import Popen
+import time
+
+
+class TestOnosDistributedManager(unittest.TestCase):
+
+    base_url = 'http://localhost:5000/'
+    cluster_json_url = base_url + 'cluster.json'
+
+    def setUp(self):
+        command = ["onos-distributed-manager"]
+        self.server_process = Popen(command)
+        time.sleep(1)
+
+    def tearDown(self):
+        requests.put(self.base_url + 'exit')
+
+    def test_operations(self):
+        node1_json = '{"id":"node1","ip":"10.128.11.161","port":9876}'
+        node2_json = '{"id":"node2","ip":"10.128.11.162","port":9876}'
+        node3_json = '{"id":"node3","ip":"10.128.11.163","port":9876}'
+        node4_json = '{"id":"node4","ip":"10.128.11.164","port":9876}'
+        node5_json = '{"id":"node5","ip":"10.128.11.165","port":9876}'
+        new_node5_json = '{"id":"node5","ip":"10.128.11.265","port":1234}'
+
+        node1_dict = json.loads(node1_json)
+        new_node5_dict = json.loads(new_node5_json)
+
+        post_headers = { 'Content-Type': 'application/json' }
+
+        def check_list_lengths(expected_length):
+            json_request = requests.get(self.cluster_json_url)
+            self.assertEqual(json_request.status_code, OK)
+            json_object = json_request.json()
+            self.assertEqual(len(json_object["nodes"]), expected_length)
+            self.assertEqual(len(json_object["partitions"]), expected_length)
+
+        def post_node(node_json):
+            request = requests.post(self.base_url,
+                                    data=node_json,
+                                    headers=post_headers)
+            self.assertEqual(request.status_code, OK)
+
+        # Test initial configuration - node list should be empty
+        check_list_lengths(0)
+
+        # Test post operation to add a node
+        post_node(node1_json)
+
+        json_request = requests.get(self.cluster_json_url)
+        self.assertEqual(json_request.status_code, OK)
+        json_object = json_request.json()
+        self.assertEqual(len(json_object["nodes"]), 1)
+        self.assertEqual(len(json_object["partitions"]), 1)
+        self.assertIn(node1_dict, json_object["nodes"])
+        self.assertIn(node1_dict["id"], json_object["partitions"][0]["members"])
+
+        # Re-posting the same node should cause an error
+        json_request = requests.post(self.base_url,
+                                     data=node1_json,
+                                     headers=post_headers)
+        self.assertEqual(json_request.status_code, BAD_REQUEST)
+        check_list_lengths(1)
+
+        # Add 4 more nodes and check the partitions generated
+        post_node(node2_json)
+        post_node(node3_json)
+        post_node(node4_json)
+        post_node(node5_json)
+
+        check_list_lengths(5)
+        json_request = requests.get(self.cluster_json_url)
+        self.assertEqual(json_request.status_code, OK)
+        json_object = json_request.json()
+        self.assertEqual(["node4", "node5", "node1"],
+                         json_object["partitions"][3]["members"])
+
+        # modify a node
+        json_request = requests.put(self.base_url,
+                                    data=new_node5_json,
+                                    headers=post_headers)
+        self.assertEqual(json_request.status_code, OK)
+        json_request = requests.get(self.cluster_json_url)
+        self.assertEqual(json_request.status_code, OK)
+        json_object = json_request.json()
+        self.assertIn(new_node5_dict, json_object["nodes"])
+
+        # delete all the nodes
+        def delete_node(json_string):
+            request = requests.delete(self.base_url,
+                                      data=json_string,
+                                      headers=post_headers)
+            self.assertEqual(request.status_code, OK)
+
+        delete_node(new_node5_json)
+        delete_node(node4_json)
+        delete_node(node3_json)
+        delete_node(node2_json)
+        delete_node(node1_json)
+
+        # make sure that no nodes remain
+        check_list_lengths(0)
+
+if __name__ == '__main__':
+    unittest.main()