Package turbogears :: Package identity :: Module saprovider

Source Code for Module turbogears.identity.saprovider

  1  from sqlalchemy.orm import class_mapper 
  2   
  3  from turbogears import config, identity 
  4  from turbogears.database import session 
  5  from turbogears.util import load_class 
  6   
  7  import logging 
  8  log = logging.getLogger("turbogears.identity.saprovider") 
  9   
 10  try: 
 11      set, frozenset 
 12  except NameError: # Python 2.3 
 13      from sets import Set as set, ImmutableSet as frozenset 
 14   
 15   
 16  # Global class references -- 
 17  # these will be set when the provider is initialised. 
 18  user_class = None 
 19  group_class = None 
 20  permission_class = None 
 21  visit_class = None 
 22   
 23   
class SqlAlchemyIdentity(object):
    """Identity that uses a model from a database (via SQLAlchemy).

    An identity pairs an optional visit key with an optional user object.
    The ``user``, ``permissions``, ``groups`` and ``group_ids`` properties
    are computed on first access and then cached on the instance.

    """

    def __init__(self, visit_key=None, user=None):
        """Create the identity and link it to the visit when possible.

        visit_key: key of the visit this identity belongs to, or None.
        user: an already loaded user object, or None to have the user
            looked up lazily through the visit.

        """
        self.visit_key = visit_key
        if user:
            self._user = user
            # login() reads self._user, so the visit can only be linked
            # once a user has actually been supplied.
            if visit_key is not None:
                self.login()

    def _get_user(self):
        """Get user instance for this identity (None if there is none)."""
        try:
            return self._user
        except AttributeError:
            # User hasn't already been set
            pass
        # Attempt to load the user. After this code executes, there *will*
        # be a _user attribute, even if the value is None.
        visit = self.visit_link
        self._user = visit and user_class.query.get(visit.user_id)
        return self._user
    user = property(_get_user)

    def _get_user_name(self):
        """Get user name of this identity, or None without a user."""
        if not self.user:
            return None
        return self.user.user_name
    user_name = property(_get_user_name)

    def _get_user_id(self):
        """Get user id of this identity, or None without a user."""
        if not self.user:
            return None
        return self.user.user_id
    user_id = property(_get_user_id)

    def _get_anonymous(self):
        """Return true if not logged in."""
        return not self.user
    anonymous = property(_get_anonymous)

    def _get_permissions(self):
        """Get the frozenset of permission names of this identity."""
        try:
            return self._permissions
        except AttributeError:
            # Permissions haven't been computed yet
            pass
        if not self.user:
            self._permissions = frozenset()
        else:
            self._permissions = frozenset(
                [p.permission_name for p in self.user.permissions])
        return self._permissions
    permissions = property(_get_permissions)

    def _get_groups(self):
        """Get the frozenset of group names of this identity."""
        try:
            return self._groups
        except AttributeError:
            # Groups haven't been computed yet
            pass
        if not self.user:
            self._groups = frozenset()
        else:
            self._groups = frozenset(
                [g.group_name for g in self.user.groups])
        return self._groups
    groups = property(_get_groups)

    def _get_group_ids(self):
        """Get the frozenset of group IDs of this identity."""
        try:
            return self._group_ids
        except AttributeError:
            # Group IDs haven't been computed yet
            pass
        if not self.user:
            self._group_ids = frozenset()
        else:
            self._group_ids = frozenset(
                [g.group_id for g in self.user.groups])
        return self._group_ids
    group_ids = property(_get_group_ids)

    def _get_visit_link(self):
        """Get the visit record linked to this identity, or None.

        NOTE(review): the definition of this accessor was missing from the
        listing this class was recovered from, although the ``visit_link``
        property below requires it. The body is reconstructed from how
        ``visit_link`` is used by ``_get_user``, ``login`` and ``logout``
        (an object with ``visit_key`` and ``user_id``, or a false value);
        confirm against the upstream module.

        """
        if self.visit_key is None:
            return None
        return visit_class.query.filter_by(visit_key=self.visit_key).first()
    visit_link = property(_get_visit_link)

    def _get_login_url(self):
        """Get the URL for the login page."""
        return identity.get_failure_url()
    login_url = property(_get_login_url)

    def login(self):
        """Set the link between this identity and the visit.

        An existing visit record is pointed at the current user; otherwise
        a new record is created for ``visit_key``. The session is flushed
        in both cases. Requires ``self._user`` to be set.

        """
        visit = self.visit_link
        if visit:
            visit.user_id = self._user.user_id
        else:
            visit = visit_class()
            visit.visit_key = self.visit_key
            visit.user_id = self._user.user_id
        session.flush()

    def logout(self):
        """Remove the link between this identity and the visit.

        The visit record, if any, is deleted and flushed, and the current
        identity is replaced by a fresh anonymous one.

        """
        visit = self.visit_link
        if visit:
            session.delete(visit)
            session.flush()
        # Clear the current identity
        identity.set_current_identity(SqlAlchemyIdentity())
class SqlAlchemyIdentityProvider(object):
    """IdentityProvider that uses a model from a database (via SQLAlchemy)."""

    def __init__(self):
        """Load the configured model classes and set up password encryption.

        The user, group, permission and visit classes are read from the
        ``identity.saprovider.model.*`` settings and stored in the
        module-level ``user_class``, ``group_class``, ``permission_class``
        and ``visit_class`` names. ``self.encrypt_password`` becomes a
        one-argument callable bound to the configured algorithm.

        """
        super(SqlAlchemyIdentityProvider, self).__init__()
        global user_class, group_class, permission_class, visit_class

        setting = config.get
        user_path = setting("identity.saprovider.model.user", None)
        user_class = load_class(user_path)
        group_path = setting("identity.saprovider.model.group", None)
        group_class = load_class(group_path)
        permission_path = setting("identity.saprovider.model.permission", None)
        permission_class = load_class(permission_path)
        visit_path = setting("identity.saprovider.model.visit", None)
        log.info("Loading: %s", visit_path)
        visit_class = load_class(visit_path)

        # Default encryption algorithm is to use plain text passwords
        algorithm = setting("identity.saprovider.encryption_algorithm", None)
        self.encrypt_password = (
            lambda pw: identity._encrypt_password(algorithm, pw))

    def create_provider_model(self):
        """Create the database tables if they don't already exist."""
        models = (user_class, group_class, permission_class, visit_class)
        for model in models:
            class_mapper(model).local_table.create(checkfirst=True)

    def validate_identity(self, user_name, password, visit_key):
        """Validate the identity represented by user_name using the password.

        Returns None when no user has that name or the password does not
        validate. Otherwise returns an identity object, linked to
        visit_key, with the following properties:
            user_name: original user name
            user: a provider dependant object (TG_User or similar)
            groups: a set of group names
            permissions: a set of permission names

        """
        account = user_class.query.filter_by(user_name=user_name).first()
        if not account:
            log.warning("No such user: %s", user_name)
            return None

        password_ok = self.validate_password(account, user_name, password)
        if not password_ok:
            log.info("Passwords don't match for user: %s", user_name)
            return None

        log.info("Associating user (%s) with visit (%s)",
            user_name, visit_key)
        return SqlAlchemyIdentity(visit_key, account)

    def validate_password(self, user, user_name, password):
        """Check the given password against the user's stored password.

        The password is passed through ``self.encrypt_password`` and
        compared with ``user.password``.

        Note: user_name is not used here, but is kept in the signature for
        external password validation schemes that override this method.
        To check passwords against an external source (i.e. PAM, LDAP,
        Windows domain, etc), subclass SqlAlchemyIdentityProvider and
        override this method.

        """
        stored = user.password
        return stored == self.encrypt_password(password)

    def load_identity(self, visit_key):
        """Return the identity associated with the given visit key.

        An identity object is always returned; its user is resolved from
        the visit when first accessed. The object has the following
        properties:
            user_name: original user name
            user: a provider dependant object (TG_User or similar)
            groups: a set of group names
            permissions: a set of permission names

        """
        return SqlAlchemyIdentity(visit_key)

    def anonymous_identity(self):
        """Return an identity with neither a visit key nor a user.

        The object has the following properties:
            user_name: original user name
            user: a provider dependant object (TG_User or similar)
            groups: a set of group names
            permissions: a set of permission names

        """
        return SqlAlchemyIdentity()

    def authenticated_identity(self, user):
        """Construct Identity object for users with no visit_key."""
        return SqlAlchemyIdentity(user=user)