Base net-virt CLI files on top of which ONOS specific changes will be done
diff --git a/cli/sdncon/clusterAdmin/__init__.py b/cli/sdncon/clusterAdmin/__init__.py
new file mode 100755
index 0000000..9ab2783
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/__init__.py
@@ -0,0 +1,16 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
diff --git a/cli/sdncon/clusterAdmin/admin.py b/cli/sdncon/clusterAdmin/admin.py
new file mode 100755
index 0000000..2a4a993
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/admin.py
@@ -0,0 +1,22 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+from sdncon.clusterAdmin.models import Customer, Cluster, CustomerUser
+from django.contrib import admin
+
+# Make the cluster-administration models editable through the Django admin site.
+# AdminSite.register() accepts an iterable of model classes as well as a single one.
+admin.site.register([Customer, Cluster, CustomerUser])
diff --git a/cli/sdncon/clusterAdmin/middleware.py b/cli/sdncon/clusterAdmin/middleware.py
new file mode 100755
index 0000000..f66b367
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/middleware.py
@@ -0,0 +1,269 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+import sys, re
+from django.conf import settings
+from django.contrib.auth.views import login
+from django.http import HttpResponseRedirect, HttpResponse
+from django.utils import simplejson
+from django.contrib import auth
+from django.contrib.auth.models import User, AnonymousUser
+
+from .utils import isCloudBuild
+from .models import CustomerUser, Customer, Cluster, AuthToken
+
+import logging
+debugLevel = logging.INFO
+logfile = None
+
+# For testing, uncomment as required
+#debugLevel = logging.DEBUG
+#logfile = 'middleware.log'
+
+def initLogger():
+    """Return the 'middleware' logger, attaching its handlers only on first use."""
+    logger = logging.getLogger('middleware')
+    logger.setLevel(debugLevel)
+    if logger.handlers:  # already configured: do not emit every record twice
+        return logger
+    formatter = logging.Formatter('%(asctime)s [%(name)s] %(levelname)s %(message)s')
+    if logfile:
+        # Optional file handler, enabled through the module-level ``logfile``.
+        file_handler = logging.FileHandler(logfile)
+        file_handler.setFormatter(formatter)
+        logger.addHandler(file_handler)
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(formatter)
+    console_handler.setLevel(debugLevel)
+    logger.addHandler(console_handler)
+    return logger
+
+logger = initLogger()
+
+def is_localhost(request):  # True when the client address is one of the local addresses
+    return request.META['REMOTE_ADDR'] in ('127.0.0.1', '127.0.1.1', 'localhost')
+
+class RequireAuthMiddleware(object):
+    """RequireAuthMiddleware: Middleware to enforce authentication
+
+    When enabled, every Django-powered page except LOGIN_URL and the EXEMPT_URLS
+    prefixes requires an authenticated user; requests from localhost are exempt.
+
+    Unauthenticated page requests are redirected to the login page (LOGIN_URL in settings).
+    Unauthenticated REST calls receive a JSON error body with a 403 Forbidden status.
+    """
+
+    def __init__(self):
+        # For now, enforce authentication only if we are a cloud instance
+        self.enforce_auth = isCloudBuild()
+        self.login_url = getattr(settings, 'LOGIN_URL', '/accounts/login/')
+        self.exempt_urls = [self.login_url] + list(getattr(settings, 'EXEMPT_URLS', []))
+        self.rest_prefix = getattr(settings, 'REST_PREFIX', '/rest/')
+        if self.enforce_auth:
+            logger.info('RequireAuthMiddleware: Enforcing Authentication')
+
+    def process_request(self, request):
+        """Return None to let the request proceed, or a response that denies it."""
+        if not self.enforce_auth or is_localhost(request):
+            return None
+        if not request.user.is_anonymous():
+            return None
+        # Anonymous users may only reach the exempt URL prefixes.
+        if any(request.path.startswith(url) for url in self.exempt_urls):
+            return None
+        return self.redirect(request)
+
+    def process_response(self, request, response):
+        """Tag responses for the login page so clients can detect that auth is required."""
+        if request.path.startswith(self.login_url):
+            response['x-bsc-auth-status'] = 'required'
+        return response
+
+    def redirect(self, request):
+        """Build the denial: JSON 403 for REST paths, login redirect otherwise."""
+        details = (request.path, str(request.user), request.META['REMOTE_ADDR'])
+        if self.rest_prefix in request.path:
+            logger.warn('RequireAuthMiddleware: Unauthenticated REST request: %s, %s, %s' % details)
+            body = simplejson.dumps({'error_type': 'auth', 'description': 'Authentication error'})
+            return HttpResponse(body, 'application/json', 403)
+
+        logger.debug('RequireAuthMiddleware: Redirecting to login page: %s, %s, %s' % details)
+        return HttpResponseRedirect('%s?next=%s' % (self.login_url, request.path))
+
+
+class ClusterAuthenticate(object):
+    """ClusterAuthenticate: Middleware that authenticates/sets credentials for the REST requests
+
+    If this is a REST request:
+        If the request is from localhost
+            let it through unchanged (unless bypass_localhost_check is set)
+        If the user already has an authenticated session
+            validate that this user is authorized to access the requested cluster
+        If it is from an anonymous user:
+            extract the auth token from the request parameters and
+            verify that the token is authorized for the requested cluster
+            (for now, just validate that the user associated with the token
+            is authorized to access the requested cluster. In future, we may do more)
+    """
+
+    token_param_name = 'auth-token'  # Name of the auth token parameter in the query string
+    enforce_auth = isCloudBuild()  # For now, enforce auth only for the cloud instance
+    bypass_localhost_check = False  # Set to True to force the auth token check even on localhost
+
+    def process_request(self, request):
+        """Bind request.user to the user authorized for the cluster named in the URL."""
+        logger.debug('url: ' + request.path)
+        if not self.enforce_auth:
+            logger.debug('ClusterAuthenticate ignored: is disabled')
+            return
+        if is_localhost(request) and not self.bypass_localhost_check:
+            logger.debug('ClusterAuthenticate ignored: is local request')
+            return
+        cluster_name = self.get_req_cluster_name(request)
+        if cluster_name is None:
+            logger.debug('ClusterAuthenticate ignored: no cluster name in request')
+            return
+
+        # Find user and authorized clusters
+        cluster = self.get_cluster_from_name(cluster_name)
+        user = AnonymousUser()
+        if hasattr(request, 'session'):
+            user = auth.get_user(request)
+        logger.debug('User from request: ' + str(user))
+        if user.is_authenticated():
+            allowed_clusters = self.get_allowed_clusters_for_user(user)
+        else:
+            # No authenticated session: fall back to the auth token parameter.
+            token_string = self.get_req_token_string(request)
+            user = self.get_user_for_token(token_string)
+            allowed_clusters = self.get_allowed_clusters_for_token(token_string)
+        logger.debug('Checking authorization for cluster %s in cluster list %s for %s'
+                     % (cluster, allowed_clusters, user))
+
+        # Default to anonymous; only a user cleared for this cluster is kept.
+        if hasattr(request, 'user'):
+            request.user = AnonymousUser()
+            request._cached_user = AnonymousUser()
+        if cluster and allowed_clusters and cluster in allowed_clusters:
+            request.user = user
+            request._cached_user = user
+        return
+
+    def validate_session(self, request):  # placeholder hook: every session is accepted
+        return True
+
+    def get_req_cluster_name(self, request):
+        """Return the 'customer:cluster' component of a REST request path, else None."""
+        path = request.path
+        try:
+            segments = path.split('/')
+            # REST paths carry 'rest' as their first or second component and
+            # the cluster id (recognized by its ':') as the fifth.
+            if 'rest' in (segments[1], segments[2]) and ':' in segments[5]:
+                return segments[5]
+        except (TypeError, IndexError):
+            logger.debug('Type error parsing path for customer: ' + str(path))
+            return None
+        except Exception:
+            logger.debug('Unknown error parsing path for customer: ' + str(path))
+            return None
+        # Not a REST path, or no cluster id at the expected position.
+        return None
+
+    def get_cluster_from_name(self, cluster_name):
+        """Return the Cluster whose id equals cluster_name, or None (logged) if absent."""
+        for candidate in (Cluster.objects.all() if cluster_name else ()):
+            if candidate.id == cluster_name:
+                logger.debug('Mapped request to cluster "%s"' % cluster_name)
+                return candidate
+        logger.warn('Failed to map request to cluster: "%s"' % cluster_name)
+        return None
+
+    def get_req_token_string(self, request):
+        """Return the upper-cased auth token request parameter, or None when absent."""
+        try:
+            raw_token = request.REQUEST[self.token_param_name]
+        except KeyError:
+            logger.debug('No token param in request')
+            return None
+        return raw_token.upper()
+
+    def get_token_from_name(self, token_string):
+        """Return the AuthToken row whose id is token_string, or None if unavailable."""
+        if not token_string:
+            return None
+        # NOTE(review): expiration_date is never checked here -- confirm expiry is enforced.
+        try:
+            return AuthToken.objects.get(id=token_string)
+        except AuthToken.DoesNotExist:
+            logger.debug('Token not found in DB: ' + str(token_string))
+        except Exception:
+            logger.debug('Auth Tokens not configured in DB? - ' + str(token_string))
+        return None
+
+    def get_user_for_token(self, token_string):
+        """Return the user recorded on the token, or AnonymousUser for an unknown token."""
+        token = self.get_token_from_name(token_string)
+        if not token:
+            return AnonymousUser()
+        return token.user
+
+    def get_allowed_clusters_for_token(self, token_string):
+        """Given an auth token, generate the list of clusters for which it gives credentials
+
+        The most specific credential recorded on the token wins:
+        a cluster entry grants just that cluster;
+        otherwise a customer entry grants every cluster of that customer;
+        otherwise the user entry is mapped to clusters via get_allowed_clusters_for_user.
+
+        An unknown (or empty) token grants no clusters.
+        """
+        logger.debug('Got token: ' + str(token_string))
+
+        token = self.get_token_from_name(token_string)
+        if not token:
+            granted = []
+        elif token.cluster is not None:
+            logger.debug('token mapped to cluster ' + str(token.cluster))
+            granted = [token.cluster]
+        elif token.customer is not None:
+            logger.debug('token mapped to customer ' + str(token.customer))
+            granted = Cluster.objects.filter(customer=token.customer)
+        else:
+            logger.debug('token mapped to user ' + str(token.user))
+            granted = self.get_allowed_clusters_for_user(token.user)
+
+        logger.debug('Returning clust list ' + str(granted))
+        return granted
+
+    def get_allowed_clusters_for_user(self, user):
+        """Given a user, generate the list of clusters for which it gives credentials
+
+        The user is mapped to customers through CustomerUser, and each customer
+        to its clusters; a falsy user yields an empty list.
+        """
+        logger.debug('Got user: ' + str(user))
+
+        matched = []
+        # Filtering clusters by customer in the database is reported not to work
+        # here, so every cluster is compared against each membership's customer.
+        for membership in (CustomerUser.objects.filter(user=user) if user else ()):
+            for cluster in Cluster.objects.all():
+                if cluster.customer == membership.customer:
+                    matched.append(cluster)
+
+        logger.debug('Returning cluster list ' + str(matched))
+        return matched
diff --git a/cli/sdncon/clusterAdmin/models.py b/cli/sdncon/clusterAdmin/models.py
new file mode 100755
index 0000000..9090fb6
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/models.py
@@ -0,0 +1,170 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+import datetime
+from django.db import models
+from django.contrib.auth.models import User
+
+# Create your models here.
+class CustomerManager(models.Manager):
+    """Manager that adds a creation helper for Customer rows."""
+
+    def create_customer(self, name, email):
+        """Create and save a Customer with the given name and email.
+
+        The domain part of the email is lower-cased before it is stored; an
+        address without an '@' is stored unchanged.
+        """
+        try:
+            email_name, domain_part = email.strip().split('@', 1)
+        except ValueError:
+            pass
+        else:
+            email = '@'.join([email_name, domain_part.lower()])
+        customer = self.model(customername=name, email=email, is_active=True,
+                              date_joined=datetime.datetime.now())
+        customer.save(using=self._db)
+        return customer
+
+class ClusterManager(models.Manager):
+    """Manager that adds a creation helper for Cluster rows."""
+
+    def create_cluster(self, name, customer):
+        """Create and save a Cluster named ``name`` that belongs to ``customer``.
+
+        The primary key is built as '<customername>:<clustername>'.
+        """
+        cluster_id = ":".join([customer.customername, name])
+        cluster = self.model(id=cluster_id, clustername=name, customer=customer,
+                             is_active=True, date_joined=datetime.datetime.now())
+        cluster.save(using=self._db)
+        return cluster
+
+class CustomerUserManager(models.Manager):
+    """Manager that adds a creation helper for CustomerUser rows."""
+
+    def create_customerUser(self, user, customer):
+        """Create and save the membership of ``user`` in ``customer``."""
+        # Primary key format: customername:username
+        membership_id = ":".join((customer.customername, user.username))
+        membership = self.model(id=membership_id, user=user, customer=customer)
+        membership.save(using=self._db)
+        return membership
+
+class Customer(models.Model):
+    """A customer of the cloud server, keyed by its unique customer name."""
+    customername = models.CharField(
+        verbose_name='customer name',
+        primary_key=True,
+        max_length=30,
+        help_text="30 characters or fewer. Letters, numbers and @/./+/-/_ characters")
+    email = models.EmailField(verbose_name='e-mail address', blank=True)
+    is_active = models.BooleanField(
+        verbose_name='active',
+        default=True,
+        help_text="Designates whether this Customer should be treated as active. Unselect this instead of deleting accounts.")
+    date_joined = models.DateTimeField(verbose_name='date joined', default=datetime.datetime.now)
+    objects = CustomerManager()
+
+    def __unicode__(self):
+        return self.customername
+
+class Cluster(models.Model):
+    """A cluster of nodes in the cloud server, owned by exactly one Customer."""
+    # Primary key; its help text documents the customername:clustername format.
+    id = models.CharField(
+        verbose_name='Cluster ID',
+        primary_key=True,
+        max_length=75,
+        help_text='Unique identifier for the cluster; format is customername:clustername')
+    clustername = models.CharField(
+        verbose_name='cluster name',
+        max_length=30,
+        help_text="Required. 30 characters or fewer. Letters, numbers and @/./+/-/_ characters")
+    is_active = models.BooleanField(
+        verbose_name='active',
+        default=True,
+        help_text="Designates whether this cluster should be treated as active. Unselect this instead of deleting the cluster.")
+    date_joined = models.DateTimeField(verbose_name='date joined', default=datetime.datetime.now)
+    # Deleting a customer deletes its clusters as well (CASCADE).
+    customer = models.ForeignKey(Customer, on_delete=models.CASCADE)
+
+    objects = ClusterManager()
+
+    def __unicode__(self):
+        return self.id
+
+
+class CustomerUser(models.Model):
+    """Temporary model that captures the list of users in a given customer."""
+    id = models.CharField(
+        primary_key=True,
+        verbose_name='Customer User',
+        max_length=75,
+        # Adjacent literals are used instead of a backslash continuation inside the
+        # string, which embedded the next line's indentation in the help text.
+        help_text='Unique relationship that shows the customer which the user belongs; '
+                  'format is customername:username')
+    user = models.ForeignKey(User, on_delete=models.CASCADE)
+    customer = models.ForeignKey(Customer, on_delete=models.CASCADE)
+
+    objects = CustomerUserManager()
+
+    def __unicode__(self):
+        return self.id
+
+
+class AuthToken(models.Model):
+    """
+    Store the authentication token as an ascii string
+    Associate various credential options: user, cluster, customer. At least
+    one of these must be populated to be a valid entry.
+    """
+    id = models.CharField(
+        primary_key=True,
+        max_length=64)
+
+    # on_delete is spelled out, as on the other models' foreign keys; CASCADE is
+    # the behaviour Django versions that allow omitting the argument default to.
+    cluster = models.ForeignKey(
+        Cluster,
+        blank=True, null=True,
+        on_delete=models.CASCADE)
+
+    user = models.ForeignKey(
+        User,
+        blank=True, null=True,
+        on_delete=models.CASCADE)
+
+    customer = models.ForeignKey(
+        Customer,
+        blank=True, null=True,
+        on_delete=models.CASCADE)
+
+    expiration_date = models.DateTimeField(
+        verbose_name='Expiration Date',
+        help_text='Date when the authentication token expires',
+        blank=True, null=True)
+
+    annotation = models.CharField(
+        verbose_name='Annotation',
+        help_text='Track creation information such as user',
+        max_length=512,
+        blank=True, null=True)
+
+    def __unicode__(self):
+        return str(self.id)
+
diff --git a/cli/sdncon/clusterAdmin/tests.py b/cli/sdncon/clusterAdmin/tests.py
new file mode 100755
index 0000000..793e610
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/tests.py
@@ -0,0 +1,39 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+"""
+This file demonstrates two different styles of tests (one doctest and one
+unittest). These will both pass when you run "manage.py test".
+
+Replace these with more appropriate tests for your application.
+"""
+
+from django.test import TestCase
+
+class SimpleTest(TestCase):
+    """Placeholder unittest-style example test."""
+
+    def test_basic_addition(self):
+        """Tests that 1 + 1 always equals 2."""
+        self.assertEqual(1 + 1, 2)
+
+__test__ = {"doctest": """
+Another way to test that 1 + 1 is equal to 2.
+
+>>> 1 + 1 == 2
+True
+"""}
+
diff --git a/cli/sdncon/clusterAdmin/tests/__init__.py b/cli/sdncon/clusterAdmin/tests/__init__.py
new file mode 100755
index 0000000..cdc3d76
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/tests/__init__.py
@@ -0,0 +1,35 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+#from django.contrib.auth.tests.auth_backends import BackendTest, RowlevelBackendTest, AnonymousUserBackendTest, NoAnonymousUserBackendTest
+
+from django.contrib.auth.tests.basic import BasicTestCase
+from django.contrib.auth.tests.decorators import LoginRequiredTestCase
+#from django.contrib.auth.tests.forms import UserCreationFormTest, AuthenticationFormTest, SetPasswordFormTest, PasswordChangeFormTest, UserChangeFormTest, PasswordResetFormTest
+from django.contrib.auth.tests.remote_user \
+ import RemoteUserTest, RemoteUserNoCreateTest, RemoteUserCustomTest
+from django.contrib.auth.tests.models import ProfileTestCase
+from django.contrib.auth.tests.tokens import TokenGeneratorTest
+#from django.contrib.auth.tests.views \
+# import PasswordResetTest, ChangePasswordTest, LoginTest, LogoutTest
+
+
+from sdncon.clusterAdmin.tests.auth_backends import BackendTest, RowlevelBackendTest, AnonymousUserBackendTest, NoAnonymousUserBackendTest
+from sdncon.clusterAdmin.tests.forms import UserCreationFormTest, AuthenticationFormTest, SetPasswordFormTest, PasswordChangeFormTest, UserChangeFormTest, PasswordResetFormTest
+from sdncon.clusterAdmin.tests.views \
+ import PasswordResetTest, ChangePasswordTest, LoginTest, LogoutTest
+
+# The password for the fixture data users is 'password'
diff --git a/cli/sdncon/clusterAdmin/tests/auth_backends.py b/cli/sdncon/clusterAdmin/tests/auth_backends.py
new file mode 100755
index 0000000..ce26859
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/tests/auth_backends.py
@@ -0,0 +1,280 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+import warnings
+
+from django.conf import settings
+from django.contrib.auth.models import User, Group, Permission, AnonymousUser
+from django.contrib.contenttypes.models import ContentType
+from django.test import TestCase
+from django.utils.unittest import skipIf
+
+
+class BackendTest(TestCase):
+    """Permission checks against Django's ModelBackend (all currently skipped)."""
+    backend = 'django.contrib.auth.backends.ModelBackend'
+
+    def setUp(self):
+        self.curr_auth = settings.AUTHENTICATION_BACKENDS
+        settings.AUTHENTICATION_BACKENDS = (self.backend,)
+        User.objects.create_user('test', 'test@example.com', 'test')
+
+    def tearDown(self):
+        settings.AUTHENTICATION_BACKENDS = self.curr_auth
+
+    @skipIf(True, "Do not support")
+    def test_has_perm(self):
+        tester = User.objects.get(username='test')
+        self.assertEqual(tester.has_perm('auth.test'), False)
+        tester.is_staff = True
+        tester.save()
+        self.assertEqual(tester.has_perm('auth.test'), False)
+        tester.is_superuser = True
+        tester.save()
+        self.assertEqual(tester.has_perm('auth.test'), True)
+        # Dropping both flags is expected to remove the permission again.
+        tester.is_staff = tester.is_superuser = False
+        tester.save()
+        self.assertEqual(tester.has_perm('auth.test'), False)
+        # An inactive user is expected to have no permissions, superuser or not.
+        tester.is_staff = tester.is_superuser = True
+        tester.is_active = False
+        tester.save()
+        self.assertEqual(tester.has_perm('auth.test'), False)
+
+    @skipIf(True, "Do not support")
+    def test_custom_perms(self):
+        tester = User.objects.get(username='test')
+        group_type = ContentType.objects.get_for_model(Group)
+        granted = Permission.objects.create(name='test', content_type=group_type, codename='test')
+        tester.user_permissions.add(granted)
+        tester.save()
+
+        # reloading user to purge the _perm_cache
+        tester = User.objects.get(username='test')
+        self.assertEqual(tester.get_all_permissions() == set([u'auth.test']), True)
+        self.assertEqual(tester.get_group_permissions(), set([]))
+        self.assertEqual(tester.has_module_perms('Group'), False)
+        self.assertEqual(tester.has_module_perms('auth'), True)
+        # Grant two more user-level permissions, then reload again.
+        for codename in ('test2', 'test3'):
+            granted = Permission.objects.create(name=codename, content_type=group_type, codename=codename)
+            tester.user_permissions.add(granted)
+            tester.save()
+
+        tester = User.objects.get(username='test')
+        self.assertEqual(tester.get_all_permissions(), set([u'auth.test2', u'auth.test', u'auth.test3']))
+        self.assertEqual(tester.has_perm('test'), False)
+        self.assertEqual(tester.has_perm('auth.test'), True)
+        self.assertEqual(tester.has_perms(['auth.test2', 'auth.test3']), True)
+        group_perm = Permission.objects.create(name='test_group', content_type=group_type, codename='test_group')
+        group = Group.objects.create(name='test_group')
+        group.permissions.add(group_perm)
+        group.save()
+        tester.groups.add(group)
+        tester = User.objects.get(username='test')
+        expected = set([u'auth.test2', u'auth.test', u'auth.test3', u'auth.test_group'])
+        self.assertEqual(tester.get_all_permissions(), expected)
+        self.assertEqual(tester.get_group_permissions(), set([u'auth.test_group']))
+        self.assertEqual(tester.has_perms(['auth.test3', 'auth.test_group']), True)
+
+        anonymous = AnonymousUser()
+        self.assertEqual(anonymous.has_perm('test'), False)
+        self.assertEqual(anonymous.has_perms(['auth.test2', 'auth.test3']), False)
+
+    @skipIf(True, "Do not support")
+    def test_has_no_object_perm(self):
+        """Regressiontest for #12462"""
+        tester = User.objects.get(username='test')
+        group_type = ContentType.objects.get_for_model(Group)
+        granted = Permission.objects.create(name='test', content_type=group_type, codename='test')
+        tester.user_permissions.add(granted)
+        tester.save()
+
+        self.assertEqual(tester.has_perm('auth.test', 'object'), False)
+        self.assertEqual(tester.get_all_permissions('object'), set([]))
+        self.assertEqual(tester.has_perm('auth.test'), True)
+        self.assertEqual(tester.get_all_permissions(), set(['auth.test']))
+
+
+class TestObj(object):
+    """Marker type that SimpleRowlevelBackend treats as the object carrying row-level permissions."""
+
+
+class SimpleRowlevelBackend(object):
+    """Auth backend stub that answers object (row-level) permission queries."""
+    supports_object_permissions = True
+    # This class also supports tests for anonymous user permissions,
+    # via subclasses which just set the 'supports_anonymous_user' attribute.
+
+    def has_perm(self, user, perm, obj=None):
+        if not obj:
+            return  # We only support row level perms
+        if not isinstance(obj, TestObj):
+            return False
+        if user.username == 'test2':
+            return True
+        # not reached due to supports_anonymous_user = False
+        if user.is_anonymous() and perm == 'anon':
+            return True
+        return False
+
+    def has_module_perms(self, user, app_label):
+        return app_label == "app1"
+
+    def get_all_permissions(self, user, obj=None):
+        """Row-level permissions of ``user`` on ``obj``; an empty list without an object."""
+        if not obj:
+            return []  # We only support row level perms
+        if not isinstance(obj, TestObj):
+            return ['none']
+        if user.is_anonymous():
+            return ['anon']
+        # Every other user gets 'simple'; the user named 'test2' also gets 'advanced'.
+        perms = ['simple']
+        if user.username == 'test2':
+            perms.append('advanced')
+        return perms
+
+    def get_group_permissions(self, user, obj=None):
+        """Group-derived row-level permissions of ``user`` on ``obj``."""
+        if not obj:
+            return  # We only support row level perms
+        if not isinstance(obj, TestObj):
+            return ['none']
+        # Membership in the group named 'test_group' is what grants 'group_perm'.
+        group_names = [group.name for group in user.groups.all()]
+        if 'test_group' in group_names:
+            return ['group_perm']
+        return ['none']
+
+
+class RowlevelBackendTest(TestCase):
+    """Tests for auth backend that supports object level permissions"""
+    # NOTE(review): the path below names Django's bundled test backend, not the
+    # SimpleRowlevelBackend defined in this module -- confirm that is intended.
+    backend = 'django.contrib.auth.tests.auth_backends.SimpleRowlevelBackend'
+
+    def setUp(self):
+        self.curr_auth = settings.AUTHENTICATION_BACKENDS
+        settings.AUTHENTICATION_BACKENDS = tuple(self.curr_auth) + (self.backend,)
+        self.user1 = User.objects.create_user('test', 'test@example.com', 'test')
+        self.user2 = User.objects.create_user('test2', 'test2@example.com', 'test')
+        self.user3 = User.objects.create_user('test3', 'test3@example.com', 'test')
+        self.save_warnings_state()
+        warnings.filterwarnings('ignore', category=DeprecationWarning,
+                                module='django.contrib.auth')
+
+    def tearDown(self):
+        settings.AUTHENTICATION_BACKENDS = self.curr_auth
+        self.restore_warnings_state()
+
+    @skipIf(True, "Do not support")
+    def test_has_perm(self):
+        self.assertEqual(self.user1.has_perm('perm', TestObj()), False)
+        self.assertEqual(self.user2.has_perm('perm', TestObj()), True)
+        self.assertEqual(self.user2.has_perm('perm'), False)
+        self.assertEqual(self.user2.has_perms(['simple', 'advanced'], TestObj()), True)
+        self.assertEqual(self.user3.has_perm('perm', TestObj()), False)
+        self.assertEqual(self.user3.has_perm('anon', TestObj()), False)
+        self.assertEqual(self.user3.has_perms(['simple', 'advanced'], TestObj()), False)
+
+    @skipIf(True, "Do not support")
+    def test_get_all_permissions(self):
+        self.assertEqual(self.user1.get_all_permissions(TestObj()), set(['simple']))
+        self.assertEqual(self.user2.get_all_permissions(TestObj()), set(['simple', 'advanced']))
+        self.assertEqual(self.user2.get_all_permissions(), set([]))
+
+    @skipIf(True, "Do not support")
+    def test_get_group_permissions(self):
+        # The backend decides by group name, so no ContentType lookup is needed.
+        group = Group.objects.create(name='test_group')
+        self.user3.groups.add(group)
+        self.assertEqual(self.user3.get_group_permissions(TestObj()), set(['group_perm']))
+
+
+class AnonymousUserBackend(SimpleRowlevelBackend):
+    """SimpleRowlevelBackend variant with supports_anonymous_user switched on."""
+    supports_anonymous_user = True
+
+
+class NoAnonymousUserBackend(SimpleRowlevelBackend):
+    """SimpleRowlevelBackend variant with supports_anonymous_user switched off."""
+    supports_anonymous_user = False
+
+
+class AnonymousUserBackendTest(TestCase):
+    """
+    Tests for AnonymousUser delegating to backend if it has 'supports_anonymous_user' = True
+    """
+    # NOTE(review): this path names Django's bundled copy of the backend -- confirm.
+    backend = 'django.contrib.auth.tests.auth_backends.AnonymousUserBackend'
+
+    def setUp(self):
+        self.curr_auth = settings.AUTHENTICATION_BACKENDS
+        settings.AUTHENTICATION_BACKENDS = (self.backend,)
+        self.user1 = AnonymousUser()
+
+    def tearDown(self):
+        settings.AUTHENTICATION_BACKENDS = self.curr_auth
+
+    @skipIf(True, "Do not support")
+    def test_has_perm(self):
+        for perm, expected in (('perm', False), ('anon', True)):
+            self.assertEqual(self.user1.has_perm(perm, TestObj()), expected)
+
+    @skipIf(True, "Do not support")
+    def test_has_perms(self):
+        for perms, expected in ((['anon'], True), (['anon', 'perm'], False)):
+            self.assertEqual(self.user1.has_perms(perms, TestObj()), expected)
+
+    def test_has_module_perms(self):
+        for app_label, expected in (("app1", True), ("app2", False)):
+            self.assertEqual(self.user1.has_module_perms(app_label), expected)
+
+    @skipIf(True, "Do not support")
+    def test_get_all_permissions(self):
+        self.assertEqual(self.user1.get_all_permissions(TestObj()), set(['anon']))
+
+
+class NoAnonymousUserBackendTest(TestCase):
+    """
+    Tests that AnonymousUser does not delegate to backend if it has 'supports_anonymous_user' = False
+    """
+    backend = 'django.contrib.auth.tests.auth_backends.NoAnonymousUserBackend'
+
+    def setUp(self):
+        self.curr_auth = settings.AUTHENTICATION_BACKENDS
+        settings.AUTHENTICATION_BACKENDS = tuple(self.curr_auth) + (self.backend,)
+        self.user1 = AnonymousUser()
+
+    def tearDown(self):
+        settings.AUTHENTICATION_BACKENDS = self.curr_auth
+
+    @skipIf(True, "Do not support")
+    def test_has_perm(self):
+        for perm in ('perm', 'anon'):
+            self.assertEqual(self.user1.has_perm(perm, TestObj()), False)
+
+    def test_has_perms(self):
+        self.assertEqual(self.user1.has_perms(['anon'], TestObj()), False)
+
+    def test_has_module_perms(self):
+        for app_label in ("app1", "app2"):
+            self.assertEqual(self.user1.has_module_perms(app_label), False)
+
+    def test_get_all_permissions(self):
+        self.assertEqual(self.user1.get_all_permissions(TestObj()), set())
diff --git a/cli/sdncon/clusterAdmin/tests/forms.py b/cli/sdncon/clusterAdmin/tests/forms.py
new file mode 100755
index 0000000..d70101c
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/tests/forms.py
@@ -0,0 +1,270 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+from django.contrib.auth.models import User
+from django.contrib.auth.forms import UserCreationForm, AuthenticationForm, PasswordChangeForm, SetPasswordForm, UserChangeForm, PasswordResetForm
+from django.test import TestCase
+from django.utils.unittest import skipIf
+
+
+class UserCreationFormTest(TestCase):
+
+ fixtures = ['authtestdata.json']
+
+ def test_user_already_exists(self):
+ data = {
+ 'username': 'testclient',
+ 'password1': 'test123',
+ 'password2': 'test123',
+ }
+ form = UserCreationForm(data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form["username"].errors,
+ [u'A user with that username already exists.'])
+
+ def test_invalid_data(self):
+ data = {
+ 'username': 'jsmith!',
+ 'password1': 'test123',
+ 'password2': 'test123',
+ }
+ form = UserCreationForm(data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form["username"].errors,
+ [u'This value may contain only letters, numbers and @/./+/-/_ characters.'])
+
+
+ def test_password_verification(self):
+ # The verification password is incorrect.
+ data = {
+ 'username': 'jsmith',
+ 'password1': 'test123',
+ 'password2': 'test',
+ }
+ form = UserCreationForm(data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form["password2"].errors,
+ [u"The two password fields didn't match."])
+
+
+ def test_both_passwords(self):
+ # One (or both) passwords weren't given
+ data = {'username': 'jsmith'}
+ form = UserCreationForm(data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form['password1'].errors,
+ [u'This field is required.'])
+ self.assertEqual(form['password2'].errors,
+ [u'This field is required.'])
+
+
+ data['password2'] = 'test123'
+ form = UserCreationForm(data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form['password1'].errors,
+ [u'This field is required.'])
+
+ def test_success(self):
+ # The success case.
+
+ data = {
+ 'username': 'jsmith@example.com',
+ 'password1': 'test123',
+ 'password2': 'test123',
+ }
+ form = UserCreationForm(data)
+ self.assertTrue(form.is_valid())
+ u = form.save()
+ self.assertEqual(repr(u), '<User: jsmith@example.com>')
+
+
+class AuthenticationFormTest(TestCase):
+
+ fixtures = ['authtestdata.json']
+
+ def test_invalid_username(self):
+ # The user submits an invalid username.
+
+ data = {
+ 'username': 'jsmith_does_not_exist',
+ 'password': 'test123',
+ }
+ form = AuthenticationForm(None, data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form.non_field_errors(),
+ [u'Please enter a correct username and password. Note that both fields are case-sensitive.'])
+
+ def test_inactive_user(self):
+ # The user is inactive.
+ data = {
+ 'username': 'inactive',
+ 'password': 'password',
+ }
+ form = AuthenticationForm(None, data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form.non_field_errors(),
+ [u'This account is inactive.'])
+
+
+ def test_success(self):
+ # The success case
+ data = {
+ 'username': 'testclient',
+ 'password': 'password',
+ }
+ form = AuthenticationForm(None, data)
+ self.assertTrue(form.is_valid())
+ self.assertEqual(form.non_field_errors(), [])
+
+
+class SetPasswordFormTest(TestCase):
+
+ fixtures = ['authtestdata.json']
+
+ def test_password_verification(self):
+ # The two new passwords do not match.
+ user = User.objects.get(username='testclient')
+ data = {
+ 'new_password1': 'abc123',
+ 'new_password2': 'abc',
+ }
+ form = SetPasswordForm(user, data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form["new_password2"].errors,
+ [u"The two password fields didn't match."])
+
+ def test_success(self):
+ user = User.objects.get(username='testclient')
+ data = {
+ 'new_password1': 'abc123',
+ 'new_password2': 'abc123',
+ }
+ form = SetPasswordForm(user, data)
+ self.assertTrue(form.is_valid())
+
+
+class PasswordChangeFormTest(TestCase):
+
+ fixtures = ['authtestdata.json']
+
+ def test_incorrect_password(self):
+ user = User.objects.get(username='testclient')
+ data = {
+ 'old_password': 'test',
+ 'new_password1': 'abc123',
+ 'new_password2': 'abc123',
+ }
+ form = PasswordChangeForm(user, data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form["old_password"].errors,
+ [u'Your old password was entered incorrectly. Please enter it again.'])
+
+
+ def test_password_verification(self):
+ # The two new passwords do not match.
+ user = User.objects.get(username='testclient')
+ data = {
+ 'old_password': 'password',
+ 'new_password1': 'abc123',
+ 'new_password2': 'abc',
+ }
+ form = PasswordChangeForm(user, data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form["new_password2"].errors,
+ [u"The two password fields didn't match."])
+
+
+ def test_success(self):
+ # The success case.
+ user = User.objects.get(username='testclient')
+ data = {
+ 'old_password': 'password',
+ 'new_password1': 'abc123',
+ 'new_password2': 'abc123',
+ }
+ form = PasswordChangeForm(user, data)
+ self.assertTrue(form.is_valid())
+
+ def test_field_order(self):
+ # Regression test - check the order of fields:
+ user = User.objects.get(username='testclient')
+ self.assertEqual(PasswordChangeForm(user, {}).fields.keys(),
+ ['old_password', 'new_password1', 'new_password2'])
+
+class UserChangeFormTest(TestCase):
+
+ fixtures = ['authtestdata.json']
+
+ @skipIf(True, "Do not support")
+ def test_username_validity(self):
+ user = User.objects.get(username='testclient')
+ data = {'username': 'not valid'}
+ form = UserChangeForm(data, instance=user)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form['username'].errors,
+ [u'This value may contain only letters, numbers and @/./+/-/_ characters.'])
+
+ def test_bug_14242(self):
+ # A regression test, introduce by adding an optimization for the
+ # UserChangeForm.
+
+ class MyUserForm(UserChangeForm):
+ def __init__(self, *args, **kwargs):
+ super(MyUserForm, self).__init__(*args, **kwargs)
+ self.fields['groups'].help_text = 'These groups give users different permissions'
+
+ class Meta(UserChangeForm.Meta):
+ fields = ('groups',)
+
+ # Just check we can create it
+ form = MyUserForm({})
+
+
+class PasswordResetFormTest(TestCase):
+
+ fixtures = ['authtestdata.json']
+
+ def test_invalid_email(self):
+ data = {'email':'not valid'}
+ form = PasswordResetForm(data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form['email'].errors,
+ [u'Enter a valid e-mail address.'])
+
+ def test_nonexistant_email(self):
+ # Test nonexistant email address
+ data = {'email':'foo@bar.com'}
+ form = PasswordResetForm(data)
+ self.assertFalse(form.is_valid())
+ self.assertEqual(form.errors,
+ {'email': [u"That e-mail address doesn't have an associated user account. Are you sure you've registered?"]})
+
+ def test_cleaned_data(self):
+ # Regression test
+ user = User.objects.create_user("jsmith3", "jsmith3@example.com", "test123")
+ data = {'email':'jsmith3@example.com'}
+ form = PasswordResetForm(data)
+ self.assertTrue(form.is_valid())
+ self.assertEqual(form.cleaned_data['email'], u'jsmith3@example.com')
+
+
+ def test_bug_5605(self):
+ # bug #5605, preserve the case of the user name (before the @ in the
+ # email address) when creating a user.
+ user = User.objects.create_user('forms_test2', 'tesT@EXAMple.com', 'test')
+ self.assertEqual(user.email, 'tesT@example.com')
+ user = User.objects.create_user('forms_test3', 'tesT', 'test')
+ self.assertEqual(user.email, 'tesT')
diff --git a/cli/sdncon/clusterAdmin/tests/urls.py b/cli/sdncon/clusterAdmin/tests/urls.py
new file mode 100755
index 0000000..a228549
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/tests/urls.py
@@ -0,0 +1,39 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+from django.conf.urls.defaults import patterns
+from django.contrib.auth.urls import urlpatterns
+from django.contrib.auth.views import password_reset
+from django.contrib.auth.decorators import login_required
+from django.http import HttpResponse
+from django.template import Template, RequestContext
+
+def remote_user_auth_view(request):
+ "Dummy view for remote user tests"
+ t = Template("Username is {{ user }}.")
+ c = RequestContext(request, {})
+ return HttpResponse(t.render(c))
+
+# special urls for auth test cases
+# The routes below are appended to the urlpatterns imported from
+# django.contrib.auth.urls at the top of this file.
+urlpatterns = urlpatterns + patterns('',
+    # logout view reading the redirect target from the 'follow' parameter
+    (r'^logout/custom_query/$', 'django.contrib.auth.views.logout', dict(redirect_field_name='follow')),
+    # logout view with a fixed next_page
+    (r'^logout/next_page/$', 'django.contrib.auth.views.logout', dict(next_page='/somewhere/')),
+    # dummy view defined above in this file
+    (r'^remote_user/$', remote_user_auth_view),
+    # password reset with an explicit from_email
+    (r'^password_reset_from_email/$', 'django.contrib.auth.views.password_reset', dict(from_email='staffmember@example.com')),
+    # password_reset wrapped in login_required, with and without a custom login_url
+    (r'^login_required/$', login_required(password_reset)),
+    (r'^login_required_login_url/$', login_required(password_reset, login_url='/somewhere/')),
+)
+
diff --git a/cli/sdncon/clusterAdmin/tests/views.py b/cli/sdncon/clusterAdmin/tests/views.py
new file mode 100755
index 0000000..335941e
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/tests/views.py
@@ -0,0 +1,311 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+import os
+import re
+import urllib
+
+from django.conf import settings
+from django.contrib.auth import SESSION_KEY, REDIRECT_FIELD_NAME
+from django.contrib.auth.forms import AuthenticationForm
+from django.contrib.sites.models import Site, RequestSite
+from django.contrib.auth.models import User
+from django.test import TestCase
+from django.core import mail
+from django.core.urlresolvers import reverse
+from django.utils.unittest import skipIf
+
+class AuthViewsTestCase(TestCase):
+ """
+ Helper base class for all the follow test cases.
+ """
+ fixtures = ['authtestdata.json']
+ urls = 'django.contrib.auth.tests.urls'
+
+ def setUp(self):
+ self.old_LANGUAGES = settings.LANGUAGES
+ self.old_LANGUAGE_CODE = settings.LANGUAGE_CODE
+ settings.LANGUAGES = (('en', 'English'),)
+ settings.LANGUAGE_CODE = 'en'
+ self.old_TEMPLATE_DIRS = settings.TEMPLATE_DIRS
+ settings.TEMPLATE_DIRS = (
+ os.path.join(
+ os.path.dirname(__file__),
+ 'templates'
+ )
+ ,)
+
+ def tearDown(self):
+ settings.LANGUAGES = self.old_LANGUAGES
+ settings.LANGUAGE_CODE = self.old_LANGUAGE_CODE
+ settings.TEMPLATE_DIRS = self.old_TEMPLATE_DIRS
+
+ def login(self, password='password'):
+ response = self.client.post('/login/', {
+ 'username': 'testclient',
+ 'password': password
+ }
+ )
+ self.assertEquals(response.status_code, 302)
+ self.assert_(response['Location'].endswith(settings.LOGIN_REDIRECT_URL))
+ self.assert_(SESSION_KEY in self.client.session)
+
+class PasswordResetTest(AuthViewsTestCase):
+
+ def test_email_not_found(self):
+ "Error is raised if the provided email address isn't currently registered"
+ response = self.client.get('/password_reset/')
+ self.assertEquals(response.status_code, 200)
+ response = self.client.post('/password_reset/', {'email': 'not_a_real_email@email.com'})
+ self.assertContains(response, "That e-mail address doesn't have an associated user account")
+ self.assertEquals(len(mail.outbox), 0)
+
+ @skipIf(True, "Do not support")
+ def test_email_found(self):
+ "Email is sent if a valid email address is provided for password reset"
+ response = self.client.post('/password_reset/', {'email': 'staffmember@example.com'})
+ self.assertEquals(response.status_code, 302)
+ self.assertEquals(len(mail.outbox), 1)
+ self.assert_("http://" in mail.outbox[0].body)
+ self.assertEquals(settings.DEFAULT_FROM_EMAIL, mail.outbox[0].from_email)
+
+ @skipIf(True, "Do not support")
+ def test_email_found_custom_from(self):
+ "Email is sent if a valid email address is provided for password reset when a custom from_email is provided."
+ response = self.client.post('/password_reset_from_email/', {'email': 'staffmember@example.com'})
+ self.assertEquals(response.status_code, 302)
+ self.assertEquals(len(mail.outbox), 1)
+ self.assertEquals("staffmember@example.com", mail.outbox[0].from_email)
+
+ def _test_confirm_start(self):
+ # Start by creating the email
+ response = self.client.post('/password_reset/', {'email': 'staffmember@example.com'})
+ self.assertEquals(response.status_code, 302)
+ self.assertEquals(len(mail.outbox), 1)
+ return self._read_signup_email(mail.outbox[0])
+
+ def _read_signup_email(self, email):
+ urlmatch = re.search(r"https?://[^/]*(/.*reset/\S*)", email.body)
+ self.assert_(urlmatch is not None, "No URL found in sent email")
+ return urlmatch.group(), urlmatch.groups()[0]
+
+ @skipIf(True, "Do not support")
+ def test_confirm_valid(self):
+ url, path = self._test_confirm_start()
+ response = self.client.get(path)
+ # redirect to a 'complete' page:
+ self.assertEquals(response.status_code, 200)
+ self.assert_("Please enter your new password" in response.content)
+
+ @skipIf(True, "Do not support")
+ def test_confirm_invalid(self):
+ url, path = self._test_confirm_start()
+ # Let's munge the token in the path, but keep the same length,
+ # in case the URLconf will reject a different length.
+ path = path[:-5] + ("0"*4) + path[-1]
+
+ response = self.client.get(path)
+ self.assertEquals(response.status_code, 200)
+ self.assert_("The password reset link was invalid" in response.content)
+
+ @skipIf(True, "Do not support")
+ def test_confirm_invalid_user(self):
+ # Ensure that we get a 200 response for a non-existant user, not a 404
+ response = self.client.get('/reset/123456-1-1/')
+ self.assertEquals(response.status_code, 200)
+ self.assert_("The password reset link was invalid" in response.content)
+
+ @skipIf(True, "Do not support")
+ def test_confirm_invalid_post(self):
+ # Same as test_confirm_invalid, but trying
+ # to do a POST instead.
+ url, path = self._test_confirm_start()
+ path = path[:-5] + ("0"*4) + path[-1]
+
+ response = self.client.post(path, {'new_password1': 'anewpassword',
+ 'new_password2':' anewpassword'})
+ # Check the password has not been changed
+ u = User.objects.get(email='staffmember@example.com')
+ self.assert_(not u.check_password("anewpassword"))
+
+ @skipIf(True, "Do not support")
+ def test_confirm_complete(self):
+ url, path = self._test_confirm_start()
+ response = self.client.post(path, {'new_password1': 'anewpassword',
+ 'new_password2': 'anewpassword'})
+ # It redirects us to a 'complete' page:
+ self.assertEquals(response.status_code, 302)
+ # Check the password has been changed
+ u = User.objects.get(email='staffmember@example.com')
+ self.assert_(u.check_password("anewpassword"))
+
+ # Check we can't use the link again
+ response = self.client.get(path)
+ self.assertEquals(response.status_code, 200)
+ self.assert_("The password reset link was invalid" in response.content)
+
+ @skipIf(True, "Do not support")
+ def test_confirm_different_passwords(self):
+ url, path = self._test_confirm_start()
+ response = self.client.post(path, {'new_password1': 'anewpassword',
+ 'new_password2':' x'})
+ self.assertEquals(response.status_code, 200)
+ self.assert_("The two password fields didn't match" in response.content)
+
+class ChangePasswordTest(AuthViewsTestCase):
+
+ def fail_login(self, password='password'):
+ response = self.client.post('/login/', {
+ 'username': 'testclient',
+ 'password': password
+ }
+ )
+ self.assertEquals(response.status_code, 200)
+ self.assert_("Please enter a correct username and password. Note that both fields are case-sensitive." in response.content)
+
+ def logout(self):
+ response = self.client.get('/logout/')
+
+ def test_password_change_fails_with_invalid_old_password(self):
+ self.login()
+ response = self.client.post('/password_change/', {
+ 'old_password': 'donuts',
+ 'new_password1': 'password1',
+ 'new_password2': 'password1',
+ }
+ )
+ self.assertEquals(response.status_code, 200)
+ self.assert_("Your old password was entered incorrectly. Please enter it again." in response.content)
+
+ def test_password_change_fails_with_mismatched_passwords(self):
+ self.login()
+ response = self.client.post('/password_change/', {
+ 'old_password': 'password',
+ 'new_password1': 'password1',
+ 'new_password2': 'donuts',
+ }
+ )
+ self.assertEquals(response.status_code, 200)
+ self.assert_("The two password fields didn't match." in response.content)
+
+ def test_password_change_succeeds(self):
+ self.login()
+ response = self.client.post('/password_change/', {
+ 'old_password': 'password',
+ 'new_password1': 'password1',
+ 'new_password2': 'password1',
+ }
+ )
+ self.assertEquals(response.status_code, 302)
+ self.assert_(response['Location'].endswith('/password_change/done/'))
+ self.fail_login()
+ self.login(password='password1')
+
+class LoginTest(AuthViewsTestCase):
+
+ @skipIf(True, "Do not support")
+ def test_current_site_in_context_after_login(self):
+ response = self.client.get(reverse('django.contrib.auth.views.login'))
+ self.assertEquals(response.status_code, 200)
+ site = Site.objects.get_current()
+ self.assertEquals(response.context['site'], site)
+ self.assertEquals(response.context['site_name'], site.name)
+ self.assert_(isinstance(response.context['form'], AuthenticationForm),
+ 'Login form is not an AuthenticationForm')
+
+ def test_security_check(self, password='password'):
+ login_url = reverse('django.contrib.auth.views.login')
+
+ # Those URLs should not pass the security check
+ for bad_url in ('http://example.com',
+ 'https://example.com',
+ 'ftp://exampel.com',
+ '//example.com'):
+
+ nasty_url = '%(url)s?%(next)s=%(bad_url)s' % {
+ 'url': login_url,
+ 'next': REDIRECT_FIELD_NAME,
+ 'bad_url': urllib.quote(bad_url)
+ }
+ response = self.client.post(nasty_url, {
+ 'username': 'testclient',
+ 'password': password,
+ }
+ )
+ self.assertEquals(response.status_code, 302)
+ self.assertFalse(bad_url in response['Location'], "%s should be blocked" % bad_url)
+
+ # Now, these URLs have an other URL as a GET parameter and therefore
+ # should be allowed
+ for url_ in ('http://example.com', 'https://example.com',
+ 'ftp://exampel.com', '//example.com'):
+ safe_url = '%(url)s?%(next)s=/view/?param=%(safe_param)s' % {
+ 'url': login_url,
+ 'next': REDIRECT_FIELD_NAME,
+ 'safe_param': urllib.quote(url_)
+ }
+ response = self.client.post(safe_url, {
+ 'username': 'testclient',
+ 'password': password,
+ }
+ )
+ self.assertEquals(response.status_code, 302)
+ self.assertTrue('/view/?param=%s' % url_ in response['Location'], "/view/?param=%s should be allowed" % url_)
+
+
+class LogoutTest(AuthViewsTestCase):
+ urls = 'django.contrib.auth.tests.urls'
+
+ def confirm_logged_out(self):
+ self.assert_(SESSION_KEY not in self.client.session)
+
+ def test_logout_default(self):
+ "Logout without next_page option renders the default template"
+ self.login()
+ response = self.client.get('/logout/')
+ self.assertEquals(200, response.status_code)
+ self.assert_('Logged out' in response.content)
+ self.confirm_logged_out()
+
+ def test_14377(self):
+ # Bug 14377
+ self.login()
+ response = self.client.get('/logout/')
+ self.assertTrue('site' in response.context)
+
+ def test_logout_with_next_page_specified(self):
+ "Logout with next_page option given redirects to specified resource"
+ self.login()
+ response = self.client.get('/logout/next_page/')
+ self.assertEqual(response.status_code, 302)
+ self.assert_(response['Location'].endswith('/somewhere/'))
+ self.confirm_logged_out()
+
+ def test_logout_with_redirect_argument(self):
+ "Logout with query string redirects to specified resource"
+ self.login()
+ response = self.client.get('/logout/?next=/login/')
+ self.assertEqual(response.status_code, 302)
+ self.assert_(response['Location'].endswith('/login/'))
+ self.confirm_logged_out()
+
+ def test_logout_with_custom_redirect_argument(self):
+ "Logout with custom query string redirects to specified resource"
+ self.login()
+ response = self.client.get('/logout/custom_query/?follow=/somewhere/')
+ self.assertEqual(response.status_code, 302)
+ self.assert_(response['Location'].endswith('/somewhere/'))
+ self.confirm_logged_out()
diff --git a/cli/sdncon/clusterAdmin/utils.py b/cli/sdncon/clusterAdmin/utils.py
new file mode 100755
index 0000000..0dea8d5
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/utils.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+import os
+import sdncon
+
+# Directory in which the personality file is looked up (see isCloudBuild).
+BSN_BASE = sdncon.SDN_ROOT
+# Name of the personality file inside BSN_BASE.
+PERSONALITY = 'vm_personality'
+
+"""
+This is a simple conditional decorator, suggested by Claudiu on Stackoverflow.com
+"""
+def conditionally(decorator, condition):
+ def moddec(f):
+ if not condition:
+ return f
+ return decorator(f)
+ return moddec
+
+def isCloudBuild():
+ pf = os.path.join(BSN_BASE, PERSONALITY)
+ return os.path.exists(pf) and "cloud" in open(pf).read()
+
+def uicontext(request):
+ expiredSession = False
+ debug = False
+ try:
+ hostname = request.META['HTTP_HOST']
+ referer = request.META['HTTP_REFERER'].split('/')
+ if referer[2] == hostname:
+ expiredSession = referer[3] not in ('account', 'logout')
+ except:
+ # Ignore any exceptions, like missing REFERER, HOST, etc.
+ pass
+ return {
+ 'isCloudBuild': isCloudBuild(),
+ 'expiredSession': expiredSession,
+ 'debug': debug,
+ }
+
+def abc(f):
+ def blah(*args, **kwargs):
+ print 'abc'
+ return f(*args, **kwargs)
+ return blah
+
+@conditionally(abc, isCloudBuild())
+def test(): pass
+
+# When run as a script, call test() once as a manual check of the decorator.
+if __name__ == '__main__':
+    test()
+
+
diff --git a/cli/sdncon/clusterAdmin/views.py b/cli/sdncon/clusterAdmin/views.py
new file mode 100755
index 0000000..851a79d
--- /dev/null
+++ b/cli/sdncon/clusterAdmin/views.py
@@ -0,0 +1,125 @@
+#
+# Copyright (c) 2013 Big Switch Networks, Inc.
+#
+# Licensed under the Eclipse Public License, Version 1.0 (the
+# "License"); you may not use this file except in compliance with the
+# License. You may obtain a copy of the License at
+#
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+#
+
+# Create your views here.
+
+from django.contrib.auth.decorators import login_required
+from django.utils import simplejson
+from django.http import HttpResponse
+from sdncon.clusterAdmin.models import CustomerUser
+from sdncon.clusterAdmin.utils import conditionally, isCloudBuild
+from sdncon.rest.views import safe_rest_view
+
+# Customer name reported by session_clusterAdmin when no CustomerUser entry
+# matches the requesting user.
+DEFAULT_TENANT = 'default'
+# Passed as the second argument of HttpResponse for the JSON views below.
+JSON_DATA_TYPE = 'application/json'
+
+@safe_rest_view
+def get_node_personality(request):
+ isCloud = isCloudBuild()
+ response_data = {'cloud' : isCloud, 'controller-node' : not isCloud}
+ response_data = simplejson.dumps(response_data)
+ return HttpResponse(response_data, JSON_DATA_TYPE)
+
+@conditionally(login_required, isCloudBuild())
+@safe_rest_view
+def session_clusterAdmin(request):
+ """
+ This returns the customer and clusters, which the current session is associated with.
+ """
+ customer = {}
+ customer['user'] = request.user.username
+ customer['customer'] = DEFAULT_TENANT
+ customer['cluster'] = []
+ if request.user.is_authenticated():
+ cus = CustomerUser.objects.all()
+ for cu in cus:
+ if cu.user.username == request.user.username:
+ customer['customer'] = cu.customer.customername
+ for cluster in cu.customer.cluster_set.all():
+ customer['cluster'].append(cluster.clustername)
+
+ response_data = simplejson.dumps(customer)
+ return HttpResponse(response_data, JSON_DATA_TYPE)
+
+
+import random
+from django.shortcuts import render_to_response, redirect
+from .models import AuthToken
+
+@conditionally(login_required, isCloudBuild())
+def token_view(request):
+ token=''
+ for t in AuthToken.objects.filter(user=request.user):
+ token=t.id
+ return render_to_response('registration/token_view.html', {'token': token})
+
+################################################################
+
+# Alphabet of 20 characters from which token characters are drawn.
+VALID_TOKEN_CHARS = ['3', '4', '6', '7', '9', 'A', 'C', 'E', 'F', 'G',
+                     'H', 'K', 'M', 'N', 'P', 'R', 'T', 'W', 'X', 'Y']
+# Number of random characters in a token, not counting the hyphens.
+TOKEN_LENGTH = 16
+TOKEN_GEN_ATTEMPTS = 16 # Number of times generated token can be in DB before giving up
+
+#
+# Token Specification
+# 16 characters, hyphens every 4 characters
+# ....-....-....-....
+# Key is actually this string, including the -s
+# Characters used must be in the VALID_TOKEN_CHARS array above
+# This gives a token space of 20^16
+#
+
+@conditionally(login_required, isCloudBuild())
+def token_generate(request):
+ """
+ Generate, validate and add an auth token
+ """
+
+ for try_count in range(TOKEN_GEN_ATTEMPTS):
+ token_string = ""
+ for i in range(TOKEN_LENGTH):
+ if i > 0 and i % 4 == 0:
+ token_string += '-'
+ token_string += random.choice(VALID_TOKEN_CHARS)
+ print "Generated token " + token_string
+
+ # See if the token already exists in the DB; race condition here, but should be minimal
+ try:
+ AuthToken.objects.get(id=token_string)
+ print "Token already present in DB, try " + str(try_count)
+ token_string = ""
+ except AuthToken.DoesNotExist: # This is what we want
+ break
+
+ if not token_string:
+ # Should probably raise an exception here
+ print "Failed to generate new auth token"
+ # Return internal server error
+ return
+
+ # Map the token to a customer or cluster
+ # For now, just map one token to the user
+
+ # Delete any existing tokens
+ for t in AuthToken.objects.filter(user=request.user):
+ print 'Deleting token', t
+ t.delete()
+
+ # Add the new token to the DB
+ token = AuthToken(id=token_string, user=request.user)
+ token.save()
+
+ return redirect('sdncon.clusterAdmin.views.token_view')