blob: 743083129f0c9f73ff90d0f6a13eb7a2985a35df [file] [log] [blame]
#
# Copyright (c) 2013 Big Switch Networks, Inc.
#
# Licensed under the Eclipse Public License, Version 1.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.eclipse.org/legal/epl-v10.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
#
from django.core.validators import validate_ipv4_address, MinValueValidator
from django.db import models
from django.forms import ValidationError
from django.test import TestCase, Client
from django.utils import simplejson
import copy
import urllib
import urllib2
from sdncon.rest.validators import validate_mac_address, RangeValidator, ChoicesValidator
from sdncon.rest.config import add_config_handler, reset_last_config_state, model_instances_equal
# These are the test models used in the unit tests
# When running the unit tests, syncdb looks for model definitions
# in this file (i.e. tests.py), so these will be in the complete
# list of models used by the REST code
class RestTestModel(models.Model):
name = models.CharField(max_length=32, primary_key=True)
enabled = models.BooleanField()
min = models.IntegerField(validators=[MinValueValidator(100)])
range = models.IntegerField(validators=[RangeValidator(10,100)])
internal = models.CharField(max_length=24,default='foo')
ip = models.IPAddressField(validators=[validate_ipv4_address])
mac = models.CharField(max_length=20, validators=[validate_mac_address])
ratio = models.FloatField()
COLOR_CHOICES = (
('red', 'Dark Red'),
('blue', 'Navy Blue'),
('green', 'Green'),
('white', 'Ivory White')
)
color = models.CharField(max_length=20, validators=[ChoicesValidator(COLOR_CHOICES)])
def clean(self):
# This is just a dummy check for testing purposes. Typically you'd only
# use the model-level clean function for validating things across multiple fields
if self.range == 50:
raise ValidationError("Dummy model-level validation failure.")
class Rest:
NAME = 'rest-test'
FIELD_INFO = ({'name':'internal', 'private': True},)
class RestTestTagModel(models.Model):
rest_test = models.ForeignKey(RestTestModel)
name = models.CharField(max_length=32)
value = models.CharField(max_length=256)
class Rest:
NAME = 'rest-test-tag'
FIELD_INFO = ({'name':'rest_test', 'rest_name':'rest-test'},)
class RestTestRenamedFieldModel(models.Model):
test = models.CharField(max_length=16,primary_key=True)
class Rest:
NAME = 'rest-test-renamed'
FIELD_INFO = ({'name':'test', 'rest_name':'renamed'},)
class RestDisabledModel(models.Model):
dummy = models.CharField(max_length=32)
class RestCompoundKeyModel(models.Model):
name = models.CharField(max_length=32)
index = models.IntegerField()
extra = models.CharField(max_length=32, default='dummy')
class CassandraSettings:
COMPOUND_KEY_FIELDS = ['name', 'index']
class Rest:
NAME = 'rest-compound-key'
#############################################################################
# These are some client-side utility functions for getting, putting, deleting
# data using the REST API
#############################################################################
def construct_rest_url(path, query_params=None):
url = 'http://localhost:8000/rest/v1/%s/' % path
if query_params:
url += '?'
url += urllib.urlencode(query_params)
return url
def get_rest_data(type, query_param_dict=None):
url = construct_rest_url(type, query_param_dict)
request = urllib2.Request(url)
response = urllib2.urlopen(request)
response_text = response.read()
obj = simplejson.loads(response_text)
return obj
def put_rest_data(obj, type, query_param_dict=None):
url = construct_rest_url(type, query_param_dict)
post_data = simplejson.dumps(obj)
request = urllib2.Request(url, post_data, {'Content-Type':'application/json'})
request.get_method = lambda: 'PUT'
response = urllib2.urlopen(request)
return response.read()
def delete_rest_data(type, query_param_dict=None):
url = construct_rest_url(type, query_param_dict)
request = urllib2.Request(url)
request.get_method = lambda: 'DELETE'
response = urllib2.urlopen(request)
return response.read()
def test_construct_rest_url(path, query_params=None):
url = '/rest/v1/%s/' % path
if query_params:
url += '?'
url += urllib.urlencode(query_params)
return url
def test_construct_rest_model_url(path, query_params=None):
return test_construct_rest_url('model/' + path, query_params)
def test_get_rest_model_data(type, query_params=None):
url = test_construct_rest_model_url(type,query_params)
c = Client()
response = c.get(url)
return response
def test_put_rest_model_data(obj, type, query_params=None):
url = test_construct_rest_model_url(type, query_params)
data = simplejson.dumps(obj)
c = Client()
response = c.put(url , data, 'application/json')
return response
def test_delete_rest_model_data(type, query_params=None):
url = test_construct_rest_model_url(type, query_params)
c = Client()
response = c.delete(url)
return response
def get_sorted_list(data_list, key_name, create_copy=False):
if create_copy:
data_list = copy.deepcopy(data_list)
if key_name:
key_func = lambda item: item.get(key_name)
else:
key_func = lambda item: item[0]
data_list.sort(key = key_func)
return data_list
#############################################################################
# The actual unit test classes
#############################################################################
class BasicFunctionalityTest(TestCase):
REST_TEST_DATA = [
{'name':'foobar','enabled':True,'min':100,'range':30,'ip':'192.168.1.1','mac':'00:01:22:34:56:af','ratio':4.56,'color':'red'},
{'name':'sdnplatform','enabled':False,'min':400,'range':45,'ip':'192.168.1.2','mac':'00:01:22:CC:56:DD','ratio':8.76,'color':'green'},
{'name':'foobar2','enabled':True,'min':1000,'range':100,'ip':'192.168.1.3','mac':'00:01:22:34:56:af','ratio':1,'color':'white'},
]
REST_TEST_TAG_DATA = [
{'id':'1','rest-test':'foobar','name':'one','value':'testit'},
{'id':'2','rest-test':'foobar','name':'two','value':'test2'},
{'id':'3','rest-test':'sdnplatform','name':'three','value':'three'},
]
def setUp(self):
# Create the RestTestModel instances
rest_test_list = []
for rtd in self.REST_TEST_DATA:
rt = RestTestModel(**rtd)
rt.save()
rest_test_list.append(rt)
# Create the RestTestTagModel instances
for rttd in self.REST_TEST_TAG_DATA:
rttd_init = rttd.copy()
for rt in rest_test_list:
if rt.name == rttd['rest-test']:
del rttd_init['rest-test']
rttd_init['rest_test'] = rt
break
else:
raise Exception('Invalid initialization data for REST unit tests')
rtt = RestTestTagModel(**rttd_init)
rtt.save()
rtrf = RestTestRenamedFieldModel(test='test')
rtrf.save()
# Create the RestDisabledModel instance. We're only going to test that
# we can't access this model via the REST API, so we don't need to keep
# track of the data used for the instance
#rdm = RestDisabledModel(dummy='dummy')
#rdm.save()
def check_rest_test_data_result(self, data_list, expected_data_list, message=None):
data_list = get_sorted_list(data_list, 'name', True)
expected_data_list = get_sorted_list(expected_data_list, 'name', True)
self.assertEqual(len(data_list), len(expected_data_list), message)
for i in range(len(data_list)):
data = data_list[i]
expected_data = expected_data_list[i]
self.assertEqual(data['name'], expected_data['name'], message)
self.assertEqual(data['enabled'], expected_data['enabled'], message)
self.assertEqual(data['min'], expected_data['min'], message)
self.assertEqual(data['range'], expected_data['range'], message)
self.assertEqual(data['ip'], expected_data['ip'], message)
self.assertEqual(data['mac'], expected_data['mac'], message)
self.assertAlmostEqual(data['ratio'], expected_data['ratio'], 7, message)
self.assertEqual(data['color'], expected_data['color'], message)
def check_rest_test_tag_data_result(self, data_list, expected_data_list, message=None):
data_list = get_sorted_list(data_list, 'name', True)
expected_data_list = get_sorted_list(expected_data_list, 'name', True)
self.assertEqual(len(data_list), len(expected_data_list), message)
for i in range(len(data_list)):
data = data_list[i]
expected_data = expected_data_list[i]
self.assertEqual(str(data['id']), expected_data['id'], message)
self.assertEqual(data['rest-test'], expected_data['rest-test'], message)
self.assertEqual(data['name'], expected_data['name'], message)
self.assertEqual(data['value'], expected_data['value'], message)
def test_put_one(self):
test_put_rest_model_data({'min':200,'color':'white'}, 'rest-test/foobar')
response = test_get_rest_model_data('rest-test')
data = simplejson.loads(response.content)
rtd_copy = copy.deepcopy(self.REST_TEST_DATA)
rtd_copy[0]['min'] = 200
rtd_copy[0]['color'] = 'white'
self.check_rest_test_data_result(data, rtd_copy)
def test_put_query(self):
test_put_rest_model_data({'min':200,'color':'white'}, 'rest-test', {'enabled':True})
response = test_get_rest_model_data('rest-test')
data = simplejson.loads(response.content)
rtd_copy = copy.deepcopy(self.REST_TEST_DATA)
for rtd in rtd_copy:
if rtd['enabled']:
rtd['min'] = 200
rtd['color'] = 'white'
self.check_rest_test_data_result(data, rtd_copy)
def test_create_one(self):
rtd_copy = copy.deepcopy(self.REST_TEST_DATA)
create_rtd = rtd_copy[0].copy()
create_rtd['name'] = 'test-create'
rtd_copy.append(create_rtd)
test_put_rest_model_data(create_rtd, 'rest-test')
response = test_get_rest_model_data('rest-test')
data = simplejson.loads(response.content)
self.check_rest_test_data_result(data, rtd_copy)
def test_create_many(self):
rtd_copy = copy.deepcopy(self.REST_TEST_DATA)
create_rtd1 = rtd_copy[0].copy()
create_rtd1['name'] = 'test-create1'
create_rtd1['min'] = 2000
rtd_copy.append(create_rtd1)
create_rtd2 = rtd_copy[0].copy()
create_rtd2['name'] = 'test-create2'
create_rtd1['min'] = 4000
rtd_copy.append(create_rtd2)
test_put_rest_model_data([create_rtd1,create_rtd2], 'rest-test')
response = test_get_rest_model_data('rest-test')
data = simplejson.loads(response.content)
self.check_rest_test_data_result(data, rtd_copy)
def test_get(self):
RTD = self.REST_TEST_DATA
RTTD = self.REST_TEST_TAG_DATA
GET_TEST_VECTORS = (
#model_name, query_params, expected_data
('rest-test', None, RTD, 'Failure getting all rest-test instances'),
('rest-test-tag', None, RTTD, 'Failure getting all rest-test-tag instances'),
('rest-test/foobar', None, RTD[0], 'Failure getting single rest-test instance by URL'),
('rest-test', {'name':'foobar'}, [RTD[0]], 'Failure querying for single rest-test instance'),
('rest-test', {'name':'foobar','nolist':True}, RTD[0], 'Failure querying for single rest-test instance using nolist'),
('rest-test', {'enabled':True}, [RTD[0],RTD[2]], 'Failure querying for multiple rest-test instances'),
('rest-test', {'name__startswith':'foobar'}, [RTD[0],RTD[2]], 'Failure querying for multiple rest-test instances using startswith'),
('rest-test', {'orderby':'ratio'}, [RTD[2],RTD[0],RTD[1]], 'Failure querying with ascending orderby'),
('rest-test', {'orderby':'-ratio'}, [RTD[1],RTD[0],RTD[2]], 'Failure querying with descending orderby'),
('rest-test', {'orderby':'enabled,-ratio'}, [RTD[1],RTD[0],RTD[2]], 'Failure querying with multi-field orderby'),
('rest-test-tag', {'rest-test':'foobar'}, [RTTD[0],RTTD[1]], 'Failure querying by ForeignKey value'),
('rest-test-tag', {'rest-test__startswith':'foo'}, [RTTD[0],RTTD[1]], 'Failure querying by ForeignKey value'),
('rest-test-tag', {'rest-test__contains':'swi'}, [RTTD[2]], 'Failure querying by ForeignKey value'),
)
for model_name, query_params, expected_data, message in GET_TEST_VECTORS:
response = test_get_rest_model_data(model_name, query_params)
data = simplejson.loads(response.content)
#print model_name
#self.check_rest_test_data_result(data, expected_data, message)
if type(data) is not list:
data = [data]
#data = get_sorted_list(data, 'name', False)
if type(expected_data) is not list:
expected_data = [expected_data]
#expected_data = get_sorted_list(expected_data, 'name', True)
if model_name.startswith('rest-test-tag'):
self.check_rest_test_tag_data_result(data, expected_data, message)
else:
self.check_rest_test_data_result(data, expected_data, message)
def test_renamed_field(self):
response = test_get_rest_model_data('rest-test-renamed/test')
self.assertEqual(response.status_code,200)
data = simplejson.loads(response.content)
self.assertEqual(data['renamed'], 'test')
def test_delete_one(self):
test_delete_rest_model_data('rest-test/foobar')
response = test_get_rest_model_data('rest-test')
data = simplejson.loads(response.content)
self.check_rest_test_data_result(data, self.REST_TEST_DATA[1:])
def test_delete_query(self):
test_delete_rest_model_data('rest-test', {'name__startswith':'foobar'})
response = test_get_rest_model_data('rest-test')
data = simplejson.loads(response.content)
self.check_rest_test_data_result(data, [self.REST_TEST_DATA[1]])
def test_delete_all(self):
test_delete_rest_model_data('rest-test')
response = test_get_rest_model_data('rest-test')
data = simplejson.loads(response.content)
self.assertEqual(len(data),0)
class NegativeTest(TestCase):
def test_invalid_model_name(self):
response = test_get_rest_model_data('foobar')
self.assertEqual(response.status_code,400)
response_obj = simplejson.loads(response.content)
self.assertEqual(response_obj['error_type'],'RestInvalidDataTypeException')
def test_resource_not_found(self):
response = test_get_rest_model_data('rest-test/foobar')
self.assertEqual(response.status_code,404)
response_obj = simplejson.loads(response.content)
self.assertEqual(response_obj['error_type'],'RestResourceNotFoundException')
def test_invalid_orderby(self):
response = test_get_rest_model_data('rest-test', {'orderby':'foobar'})
self.assertEqual(response.status_code,400)
response_obj = simplejson.loads(response.content)
self.assertEqual(response_obj['error_type'],'RestInvalidOrderByException')
def test_hidden_model(self):
response = test_get_rest_model_data('restdisabledmodel')
self.assertEqual(response.status_code,400)
response_obj = simplejson.loads(response.content)
self.assertEqual(response_obj['error_type'],'RestInvalidDataTypeException')
class ValidationTest(TestCase):
DEFAULT_DATA = {'name':'foobar','enabled':True,'min':100,'range':30,'ip':'192.168.1.1','mac':'00:01:22:34:56:af','ratio':4.56,'color':'red'}
TEST_VECTORS = (
#update-dict, error-type, error-messgae
({'min':99}, ['min'],'Min value validation failed'),
({'range':8}, ['range'],'Min range validation failed'),
({'range':2000}, ['range'],'Max range validation failed'),
({'range':50}, None,'Model-level clean validation failed'),
({'mac':'00:01:02:03:05'}, ['mac'],'Too short MAC address validation failed'),
({'mac':'00:01:02:03:05:99:45'}, ['mac'],'Too long MAC address validation failed'),
({'mac':'00,01,02,03,05,99'}, ['mac'],'MAC address separator char validation failed'),
({'mac':'0r:01:02:03:05:99'}, ['mac'],'MAC address character validation failed'),
({'mac':'123:01:02:03:05:99'}, ['mac'],'MAC address byte length validation failed'),
({'color':'purple'}, ['color'],'Choices validation failed'),
({'ip':'foobar'}, ['ip'], 'Invalid IP address char validation failed'),
({'ip':'192.168.1'}, ['ip'], 'Too short IP address char validation failed'),
({'ip':'192.168.1.0.5'}, ['ip'], 'Too long IP address char validation failed'),
({'ip':'192,168,1,0'}, ['ip'], 'IP address separator char validation failed'),
({'ip':'256.168.1.0'}, ['ip'], 'Out of range IP address byte validation failed'),
({'min':99, 'ip':'256.168.1.0'}, ['min', 'ip'], 'Multiple field validation failed'),
)
def check_response(self, response, invalid_fields, message):
self.assertEqual(response.status_code,400)
response_obj = simplejson.loads(response.content)
self.assertEqual(response_obj['error_type'], 'RestValidationException', message)
if invalid_fields:
self.assertEqual(response_obj.get('model_error'), None)
field_errors = response_obj.get('field_errors')
self.assertNotEqual(field_errors, None)
self.assertEqual(len(invalid_fields), len(field_errors), message)
for field_name in field_errors.keys():
self.assertTrue(field_name in invalid_fields, message)
else:
self.assertEqual(response_obj.get('field_errors'), None)
self.assertNotEqual(response_obj.get('model_error'), None)
# FIXME: Maybe check the description here too?
def test_create_validation(self):
for test_data, invalid_fields, message in self.TEST_VECTORS:
data = self.DEFAULT_DATA.copy()
for name, value in test_data.items():
data[name] = value
put_response = test_put_rest_model_data(data, 'rest-test')
self.check_response(put_response, invalid_fields, message)
def test_update_validation(self):
# First add an instance with the default data and check that it's OK
put_response = test_put_rest_model_data(self.DEFAULT_DATA, 'rest-test')
self.assertEqual(put_response.status_code,200)
for test_data, invalid_fields, message in self.TEST_VECTORS:
put_response = test_put_rest_model_data(test_data, 'rest-test/foobar')
self.check_response(put_response, invalid_fields, message)
class ConfigHandlerTest(TestCase):
TEST_DATA = {'name':'foobar','enabled':True,'min':100,'range':30,'ip':'192.168.1.1','mac':'00:01:22:34:56:af','ratio':4.56,'color':'red'}
test_op = None
test_old_instance = None
test_new_instance = None
test_modified_fields = None
@staticmethod
def reset_config_handler():
ConfigHandlerTest.test_op = None
ConfigHandlerTest.test_old_instance = None
ConfigHandlerTest.test_new_instance = None
ConfigHandlerTest.test_modified_fields = None
@staticmethod
def config_handler(op, old_instance, new_instance, modified_fields):
ConfigHandlerTest.test_op = op
ConfigHandlerTest.test_old_instance = old_instance
ConfigHandlerTest.test_new_instance = new_instance
ConfigHandlerTest.test_modified_fields = modified_fields
def check_config_handler(self, expected_op, expected_old_instance, expected_new_instance, expected_modified_fields):
self.assertEqual(ConfigHandlerTest.test_op, expected_op)
self.assertTrue(model_instances_equal(ConfigHandlerTest.test_old_instance, expected_old_instance))
self.assertTrue(model_instances_equal(ConfigHandlerTest.test_new_instance, expected_new_instance))
self.assertEqual(ConfigHandlerTest.test_modified_fields, expected_modified_fields)
def test_config(self):
reset_last_config_state()
# Install the config handler
field_list = ['internal', 'min', 'mac']
add_config_handler({RestTestModel: field_list}, ConfigHandlerTest.config_handler)
# Check that config handler is triggered on an insert
ConfigHandlerTest.reset_config_handler()
test = RestTestModel(**self.TEST_DATA)
test.save()
self.check_config_handler('INSERT', None, test, None)
# Check that config handler is triggered on an update
ConfigHandlerTest.reset_config_handler()
expected_old = RestTestModel.objects.get(pk='foobar')
test.internal = 'check'
test.min = 125
test.save()
self.check_config_handler('UPDATE', expected_old, test, {'internal': 'check', 'min': 125})
# Check that config handler is not triggered on an update to a field that
# it's not configured to care about
ConfigHandlerTest.reset_config_handler()
test.max = 500
test.save()
self.check_config_handler(None, None, None, None)
# Check that config handler is triggered on a delete
ConfigHandlerTest.reset_config_handler()
test.delete()
# delete() clears the pk which messes up the instance
# comparison logic in check_config_handler, so we hack
# it back to the value it had before the delete
test.name = 'foobar'
self.check_config_handler('DELETE', test, None, None)
class CompoundKeyModelTest(TestCase):
TEST_DATA = [
{'name': 'foo', 'index': 3},
{'name': 'foo', 'index': 7},
{'name': 'bar', 'index': 2, 'extra': 'abc'},
{'name': 'bar', 'index': 4, 'extra': 'test'},
]
def setUp(self):
for data in self.TEST_DATA:
test_put_rest_model_data(data, 'rest-compound-key')
def check_query_results(self, actual_results, expected_results):
self.assertEqual(len(actual_results), len(expected_results))
actual_results = get_sorted_list(actual_results, 'id', False)
expected_results = copy.deepcopy(expected_results)
for expected_result in expected_results:
expected_result['id'] = expected_result['name'] + '|' + str(expected_result['index'])
expected_results = get_sorted_list(expected_results, 'id')
for actual_result, expected_result in zip(actual_results, expected_results):
self.assertEqual(actual_result['id'], expected_result['id'])
self.assertEqual(actual_result['name'], expected_result['name'])
self.assertEqual(actual_result['index'], expected_result['index'])
expected_extra = expected_result.get('extra', 'dummy')
self.assertEqual(actual_result['extra'], expected_extra)
def test_set_up(self):
response = test_get_rest_model_data('rest-compound-key')
self.assertEqual(response.status_code, 200)
actual_results = simplejson.loads(response.content)
self.check_query_results(actual_results, self.TEST_DATA)
def test_delete_by_id(self):
test_delete_rest_model_data('rest-compound-key', {'id': 'foo|3'})
test_delete_rest_model_data('rest-compound-key', {'id': 'bar|4'})
response = test_get_rest_model_data('rest-compound-key')
self.assertEqual(response.status_code, 200)
actual_results = simplejson.loads(response.content)
self.check_query_results(actual_results, self.TEST_DATA[1:-1])
def test_delete_by_fields(self):
test_delete_rest_model_data('rest-compound-key', {'name': 'bar', 'index': 4})
response = test_get_rest_model_data('rest-compound-key')
self.assertEqual(response.status_code, 200)
actual_results = simplejson.loads(response.content)
self.check_query_results(actual_results, self.TEST_DATA[:-1])
class UserDataTest(TestCase):
TEST_DATA = [
# name, data, binary, content-type
('test/foobar', 'hello world\nanother line', False, None),
('test/json-test', '[0,1,2]', False, 'application/json'),
('test/binary-test', '\x01\x02\x03\x04\x55', True, None),
]
MORE_TEST_DATA = [
('moretests/foo1', 'moretest1', False, None),
('moretests/foo2', 'moretest2', False, None),
]
def add_user_data(self, client, user_data):
for name, data, binary, content_type in user_data:
if not content_type:
if binary:
content_type = 'application/octet-stream'
else:
content_type = 'text/plain'
url = test_construct_rest_url('data/' + name, {'binary':binary})
response = client.put(url, data, content_type)
self.assertEquals(response.status_code, 200)
def check_expected_info_list(self, info_list, expected_info_list):
self.assertEqual(type(info_list), list)
info_list = get_sorted_list(info_list, 'name', True)
expected_info_list = get_sorted_list(expected_info_list, None, True)
self.assertEqual(len(info_list), len(expected_info_list))
for i in range(len(info_list)):
data = info_list[i]
expected_data = expected_info_list[i]
expected_name = expected_data[0]
expected_url_path = 'rest/v1/data/' + expected_name + '/'
self.assertEqual(data['name'], expected_name)
self.assertEqual(data['url_path'], expected_url_path)
def get_info_list(self, client, query_params=None):
url = test_construct_rest_url('data', query_params)
get_response = client.get(url)
self.assertEqual(get_response.status_code, 200)
info_list = simplejson.loads(get_response.content)
# The view for listing user data items hard-codes inclusion of some
# *magic* user data items corresponding to the startup-config
# and update-config, depending on the presence of certain files
# in the file system. To make the unit tests work correctly if/when
# either/both of those items are added we do this filtering pass
# over the items returned from the REST API. Ugly hack!
info_list = [item for item in info_list if '-config' not in item['name']]
return info_list
def test_user_data_basic(self):
c = Client()
self.add_user_data(c, self.TEST_DATA)
for name, data, binary, content_type in self.TEST_DATA:
if not content_type:
if binary:
content_type = 'application/octet-stream'
else:
content_type = 'text/plain'
url = test_construct_rest_url('data/' + name)
get_response = c.get(url)
self.assertEqual(get_response.content, data)
self.assertEqual(get_response['Content-Type'], content_type)
def test_user_data_delete(self):
c = Client()
self.add_user_data(c, self.TEST_DATA)
self.add_user_data(c, self.MORE_TEST_DATA)
# Do a delete with a query filter
url = test_construct_rest_url('data', {'name__startswith':'moretests/'})
response = c.delete(url)
self.assertEqual(response.status_code, 200)
info_list = self.get_info_list(c)
expected_info_list = self.TEST_DATA
self.check_expected_info_list(info_list, expected_info_list)
# Do a delete of a specific user data element
url = test_construct_rest_url('data/' + self.TEST_DATA[0][0])
response = c.delete(url)
self.assertEqual(response.status_code, 200)
info_list = self.get_info_list(c)
expected_info_list = self.TEST_DATA[1:]
self.check_expected_info_list(info_list, expected_info_list)
def test_user_data_info(self):
c = Client()
self.add_user_data(c, self.TEST_DATA)
self.add_user_data(c, self.MORE_TEST_DATA)
info_list = self.get_info_list(c)
expected_info_list = self.TEST_DATA + self.MORE_TEST_DATA
self.check_expected_info_list(info_list, expected_info_list)
info_list = self.get_info_list(c, {'name__startswith':'test/'})
expected_info_list = self.TEST_DATA
self.check_expected_info_list(info_list, expected_info_list)