#!/usr/bin/python3
import requests
import json
import itertools
#
# Creates client-side connectivity json
#
def tapi_client_input(sip_uuids):
    """Build the client-side connectivity-service request body.

    Args:
        sip_uuids: Indexable holding at least two service-interface-point
            UUIDs. Only the elements at index 0 and 1 are used.

    Returns:
        A dict with a single "tapi-connectivity:input" entry containing two
        end-points. Each end-point uses its SIP UUID both as "local-id" and
        as the referenced "service-interface-point-uuid".
    """
    src_uuid = sip_uuids[0]
    dst_uuid = sip_uuids[1]
    return {
        "tapi-connectivity:input": {
            "end-point": [
                {
                    "local-id": src_uuid,
                    "service-interface-point": {
                        "service-interface-point-uuid": src_uuid,
                    },
                },
                {
                    "local-id": dst_uuid,
                    "service-interface-point": {
                        "service-interface-point-uuid": dst_uuid,
                    },
                },
            ],
        },
    }
#
# Creates line-side connectivity json
#
def tapi_line_input(sip_uuids):
    """Build the line-side connectivity-service request body.

    Args:
        sip_uuids: Indexable holding at least two service-interface-point
            UUIDs. Index 0 is used for the "Src_end_point" end-point and
            index 1 for the "Dst_end_point" end-point.

    Returns:
        A dict with a single "tapi-connectivity:input" entry containing two
        bidirectional PHOTONIC_MEDIA end-points with the NMC layer qualifier.
    """
    def end_point(local_id, sip_uuid):
        # Both end-points share every attribute except local-id and SIP UUID.
        return {
            "layer-protocol-qualifier": "tapi-photonic-media:PHOTONIC_LAYER_QUALIFIER_NMC",
            "role": "UNKNOWN",
            "local-id": local_id,
            "direction": "BIDIRECTIONAL",
            "service-interface-point": {
                "service-interface-point-uuid": sip_uuid,
            },
            "protection-role": "WORK",
            "layer-protocol-name": "PHOTONIC_MEDIA",
        }

    return {
        "tapi-connectivity:input": {
            "end-point": [
                end_point("Src_end_point", sip_uuids[0]),
                end_point("Dst_end_point", sip_uuids[1]),
            ],
        },
    }
#
# Obtains TAPI context through restconf
#
def get_context(url_context):
    """Fetch the TAPI context over RESTCONF and return it as parsed JSON.

    Args:
        url_context: URL the GET request is sent to.

    Returns:
        The response body decoded by ``resp.json()``.

    Raises:
        Exception: With message 'GET <status>' when the response status
            code is anything other than 200.
    """
    resp = requests.get(url_context, auth=('onos', 'rocks'))
    if resp.status_code != 200:
        raise Exception('GET {}'.format(resp.status_code))
    return resp.json()
#
# Requests a connectivity service
#
def request_connection(url_connectivity, context):
    """Request a connectivity service between two DSR SIPs of the context.

    The SIPs are sorted by the value of their first name entry, filtered
    down to the DSR ones, and the first and last of that list become the
    two end-points. If only one DSR SIP exists, both end-points refer to it.

    Args:
        url_connectivity: URL the create request is POSTed to.
        context: Parsed TAPI context; must contain
            context["tapi-common:context"]["service-interface-point"].

    Returns:
        The response object of the POST request.

    Raises:
        IndexError: When the context contains no DSR SIP.
        Exception: With message 'POST <status>' when the response status
            code is anything other than 200.
    """
    # All Context SIPs
    sips = context["tapi-common:context"]["service-interface-point"]
    # DSR SIPs, ordered by the value of their first name entry
    ordered_sips = sorted(sips, key=lambda sip: sip["name"][0]["value"])
    esips = [sip for sip in ordered_sips if is_dsr_media(sip)]
    if not esips:
        # Still an IndexError, as indexing the empty list would raise, but
        # with a message that says what is actually missing.
        raise IndexError('no DSR service-interface-point found in context')
    sip_uuids = [sip["uuid"] for sip in (esips[0], esips[-1])]
    for uuid in sip_uuids:
        print(uuid)
    create_input_json = json.dumps(tapi_client_input(sip_uuids))
    print(create_input_json)
    headers = {'Content-type': 'application/json'}
    resp = requests.post(url_connectivity, data=create_input_json, headers=headers, auth=('onos', 'rocks'))
    if resp.status_code != 200:
        raise Exception('POST {}'.format(resp.status_code))
    return resp
#
# Filter method used to keep only SIPs that are photonic_media
#
def is_photonic_media(sip):
    """Return True when the SIP's "layer-protocol-name" is "PHOTONIC_MEDIA".

    Args:
        sip: Mapping describing a service-interface-point; must contain the
            "layer-protocol-name" key.
    """
    return sip["layer-protocol-name"] == "PHOTONIC_MEDIA"
#
# Filter method used to keep only SIPs that are DSR
#
def is_dsr_media(sip):
    """Return True when the SIP's "layer-protocol-name" is "DSR".

    Args:
        sip: Mapping describing a service-interface-point; must contain the
            "layer-protocol-name" key.
    """
    return sip["layer-protocol-name"] == "DSR"
#
# Processes the topology to verify the correctness
#
def process_topology():
    """Placeholder for topology verification; currently does nothing.

    Returns:
        None.
    """
    # TODO use method to parse topology
    # Getting the Topology
    # topology = context["tapi-common:context"]["tapi-topology:topology-context"]["topology"][0]
    # nodes = topology["node"];
    # links = topology["link"];
    return None
#
# Creates a connection by first getting the context, parsing it for SIPs, and then issuing the request.
#
def create_connection(url_context, url_connectivity):
    """Fetch the TAPI context and request a connectivity service from it.

    Args:
        url_context: URL passed to get_context() to retrieve the context.
        url_connectivity: URL passed to request_connection() for the POST.

    Returns:
        Whatever request_connection() returns for the fetched context.
    """
    context = get_context(url_context)
    return request_connection(url_connectivity, context)
#
# Creates a client-side connection: first gets the context and parses it for SIPs that
# connect with each other on the line side, then issues the request.
#
def create_client_connection(url_context, url_connectivity):
    """Fetch the TAPI context and request a client-side connectivity service.

    The request itself is built by request_connection() from the context's
    service-interface-point list.

    Args:
        url_context: URL passed to get_context() to retrieve the context.
        url_connectivity: URL passed to request_connection() for the POST.

    Returns:
        Whatever request_connection() returns for the fetched context.
    """
    context = get_context(url_context)
    # NOTE(review): an earlier unused lookup of
    # context["tapi-connectivity:connectivity-context"] was dropped here. Its
    # result was never read, and the rest of this file reaches context data
    # through the "tapi-common:context" key, so the top-level lookup looks
    # like it could only fail - confirm against a real controller response.
    return request_connection(url_connectivity, context)
#
# Create a line-side connection. Firstly, get the context, parsing for SIPs with photonic_media type,
# and select one pair of them; Secondly, issue the request
#
def create_line_connection(url_context, url_connectivity):
    """Request a line-side connectivity service over the first topology link.

    Takes the first topology of the context and the first link of that
    topology, resolves the link's two node-edge-points to SIP UUIDs via
    extract_photonic_sips(), and POSTs the resulting line-side request.

    Args:
        url_context: URL passed to get_context() to retrieve the context.
        url_connectivity: URL the create request is POSTed to.

    Returns:
        The response object of the POST request.

    Raises:
        AssertionError: When either node-edge-point of the link carries a
            "topology-uuid" different from the selected topology's UUID.
        Exception: With message 'POST <status>' when the response status
            code is anything other than 200.
    """
    context = get_context(url_context)
    print(context)
    sips = context["tapi-common:context"]["service-interface-point"]
    # select the first topo from all topologies
    topo = context["tapi-common:context"]["tapi-topology:topology-context"]["topology"][0]
    # select the first link from all links of topo
    nep_pair = topo["link"][0]["node-edge-point"]
    # both ends of the link must belong to the selected topology
    assert topo["uuid"] == nep_pair[0]["topology-uuid"]
    assert topo["uuid"] == nep_pair[1]["topology-uuid"]
    sip_uuids = extract_photonic_sips(nep_pair, topo, sips)
    create_input_json = json.dumps(tapi_line_input(sip_uuids))
    print(create_input_json)
    headers = {'Content-type': 'application/json'}
    resp = requests.post(url_connectivity, data=create_input_json, headers=headers, auth=('onos', 'rocks'))
    if resp.status_code != 200:
        raise Exception('POST {}'.format(resp.status_code))
    return resp
def extract_photonic_sips(neps, topo, sips):
    """Resolve a pair of node-edge-points to their mapped SIP UUIDs.

    Args:
        neps: Indexable of two node-edge-point references (source at
            index 0, destination at index 1).
        topo: Topology searched by extract_photonic_sip_uuid().
        sips: Service-interface-points searched by extract_photonic_sip().

    Returns:
        A tuple (src_sip_uuid, dst_sip_uuid).
    """
    # parse mapped node and edge point from nep
    src_sip_uuid = extract_photonic_sip_uuid(neps[0], topo)
    dst_sip_uuid = extract_photonic_sip_uuid(neps[1], topo)
    # The SIP records are looked up only to report their names below.
    src_sip = extract_photonic_sip(src_sip_uuid, sips)
    dst_sip = extract_photonic_sip(dst_sip_uuid, sips)
    print("Connection to be built between %s and %s, whose sip_uuid are %s and %s respectively." %
          (src_sip["name"][0]["value"], dst_sip["name"][0]["value"], src_sip_uuid, dst_sip_uuid))
    return src_sip_uuid, dst_sip_uuid
def extract_photonic_sip(sip_uuid, sips):
    """Find the PHOTONIC_MEDIA SIP with the given UUID.

    Args:
        sip_uuid: UUID to look for.
        sips: Iterable of SIP mappings, each with a "uuid" key. The
            "layer-protocol-name" key is read only on a UUID match.

    Returns:
        The first SIP whose "uuid" equals sip_uuid and whose
        "layer-protocol-name" is "PHOTONIC_MEDIA", or None if there is none.
    """
    for sip in sips:
        if sip["uuid"] == sip_uuid and sip["layer-protocol-name"] == "PHOTONIC_MEDIA":
            return sip
    return None
def extract_photonic_sip_uuid(nep, topo):
    """Return the UUID of the SIP mapped to a node-edge-point reference.

    Args:
        nep: Mapping with "node-uuid" and "node-edge-point-uuid" keys.
        topo: Topology mapping whose "node" list is searched. Each node
            matching "node-uuid" must have an "owned-node-edge-point" list.

    Returns:
        The "service-interface-point-uuid" of the single SIP mapped to the
        matching owned node-edge-point, or None when no node / edge point
        matches.

    Raises:
        AssertionError: When the matching edge point does not have exactly
            one mapped service-interface-point.
    """
    for node in topo["node"]:
        if node["uuid"] == nep["node-uuid"]:
            oneps = node["owned-node-edge-point"]
            for onep in oneps:
                if onep["uuid"] == nep["node-edge-point-uuid"]:
                    # check the length equals 1 to verify the 1-to-1 mapping relationship
                    assert len(onep["mapped-service-interface-point"]) == 1
                    sip_uuid = onep["mapped-service-interface-point"][0]["service-interface-point-uuid"]
                    return sip_uuid
    return None
#
# Obtains existing connectivity services
#
def get_connection(url_connectivity, uuid):
    """Retrieve existing connectivity services by POSTing an empty JSON object.

    Args:
        url_connectivity: URL the request is POSTed to.
        uuid: Unused; kept so existing callers keep working.

    Returns:
        The response object of the POST request.

    Raises:
        Exception: With message 'POST <status>' when the response status
            code is anything other than 200.
    """
    # Named "payload" rather than "json" so the imported json module is not
    # shadowed inside this function.
    payload = '{}'
    headers = {'Content-type': 'application/json'}
    resp = requests.post(url_connectivity, data=payload, headers=headers, auth=('onos', 'rocks'))
    if resp.status_code != 200:
        raise Exception('POST {}'.format(resp.status_code))
    return resp