Package turbogears :: Module testutil

Source Code for Module turbogears.testutil

  1  import Cookie 
  2  import cStringIO as StringIO 
  3  import os 
  4  import types 
  5  import logging 
  6  import string 
  7  import unittest 
  8   
  9  import cherrypy 
 10  from cherrypy._cphttptools import Request, Response 
 11   
 12  from webtest import TestApp 
 13   
 14  try: 
 15      import sqlobject 
 16      from sqlobject.inheritance import InheritableSQLObject 
 17  except ImportError: 
 18      sqlobject = None 
 19  try: 
 20      import sqlalchemy 
 21  except ImportError: 
 22      sqlalchemy = None 
 23   
 24  from turbogears import (config, controllers, database, startup, update_config, 
 25      validators) 
 26  from turbogears.identity import current_provider 
 27  from turbogears.util import get_model, deprecated 
 28   
 29   
 30  # For clean tests, remove all compiled Kid templates 
 31  for w in os.walk('.'): 
 32      if not os.sep + '.' in w[0]: 
 33          for f in w[2]: 
 34              if f.endswith('.kid'): 
 35                  f = os.path.join(w[0], f[:-3] + 'pyc') 
 36                  if os.path.exists(f): 
 37                      os.remove(f) 
 38   
 39  # Load test configuration 
 40  if os.path.exists('test.cfg'): 
 41      # Look for a 'config' package 
 42      for dirpath, dirs, dummy2 in os.walk('.'): 
 43          basename = os.path.basename(dirpath) 
 44          dirname = os.path.basename(os.path.dirname(dirpath)) 
 45          init_py = os.path.join(dirpath, '__init__.py') 
 46          if basename == 'config' and dirname[0] in string.ascii_letters + '_' \ 
 47                  and os.path.exists(init_py): 
 48              modulename = "%s.app" % dirpath[2:].replace(os.sep, ".") 
 49              break 
 50      else: 
 51          modulename = None 
 52      # XXX This is a temporary workaround, the code above to find the config 
 53      # package should really be improved and moved elsewhere. 
 54      # See http://trac.turbogears.org/ticket/2043 
 55      try: 
 56          update_config(configfile="test.cfg", modulename=modulename) 
 57      except ImportError, exc: 
 58          import warnings 
 59          warnings.warn("Could not import configuration from module: %s" % exc, 
 60              RuntimeWarning) 
 61          update_config(configfile="test.cfg", modulename=None) 
 62  else: 
 63      database.set_db_uri("sqlite:///:memory:") 
 64   
 65  config.update({'global': 
 66          {'autoreload.on': False, 'tg.new_style_logging': True}}) 
# Main testutil interface functions for setting up & mounting a test app
# and starting/stopping it.

def mount(controller, path="/"):
    """Mount a controller at a path and return the WSGI application.

    A controller mounted at '/' becomes ``cherrypy.root``; any other path
    is registered through ``cherrypy.tree.mount``.

    """
    if path != '/':
        cherrypy.tree.mount(controller, path)
    else:
        cherrypy.root = controller
    return make_wsgiapp()
79
def unmount():
    """Remove the whole application from the object traversal tree."""
    # CP2 offers no clean way to remove a single subtree, so the only use
    # case handled here is dropping the entire application.
    # Supposedly, a partial unmount is possible with CP3 using:
    # del cherrypy.tree.apps[path]
    cherrypy.tree.mount_points = {}
    cherrypy.root = None
88
def make_wsgiapp():
    """Return CherryPy's WSGI application callable (``_cpwsgi.wsgiApp``)."""
    return cherrypy._cpwsgi.wsgiApp
93
def make_app(controller=None):
    """Return a WebTest.TestApp instance wrapping the CherryPy application.

    If a controller is given, it is called without arguments and the result
    is mounted at the root level. Otherwise the already mounted root is used.

    """
    if not controller:
        return TestApp(make_wsgiapp())
    return TestApp(mount(controller(), '/'))
106
def start_server():
    """Start CherryPy and TurboGears unless they are already started.

    The config flags "cp_started" and "server_started" record what has
    been started so that repeated calls do nothing.

    """
    cp_running = config.get("cp_started")
    if not cp_running:
        cherrypy.server.start(server_class=None, init_only=True)
        config.update({"cp_started" : True})

    tg_running = config.get("server_started")
    if not tg_running:
        startup.startTurboGears()
        config.update({"server_started" : True})
116
def stop_server(tg_only=False):
    """Unmount the application and stop the server.

    Use tg_only=True to leave CherryPy running (for faster tests).

    """
    unmount()

    if config.get("cp_started"):
        if not tg_only:
            cherrypy.server.stop()
            config.update({"cp_started" : False})

    if config.get("server_started"):
        startup.stopTurboGears()
        config.update({"server_started" : False})
# misc test utility classes & functions

# Log categories currently being captured by capture_log(); None when idle.
_currentcat = None


class MemoryListHandler(logging.Handler):
    """Logging handler that stores formatted records in a list."""

    def __init__(self):
        logging.Handler.__init__(self, level=logging.DEBUG)
        # Formatted messages collected since the last reset.
        self.log = []

    def emit(self, record):
        """Echo the record to stdout and remember its formatted text."""
        formatted = self.format(record)
        print("Got record: %s" % record)
        print("formatted as: %s" % formatted)
        self.log.append(formatted)

    def print_log(self):
        """Print the collected messages, one per line, then clear them."""
        print("\n".join(self.log))
        self.log = []

    def get_log(self):
        """Return the collected messages and start a fresh, empty list."""
        collected, self.log = self.log, []
        return collected


# Shared handler instance attached to loggers by capture_log().
_memhandler = MemoryListHandler()
def catch_validation_errors(widget, value):
    """Validate value with widget and return a (value, errors) pair.

    On success errors is an empty dict. When validation raises
    validators.Invalid, value is returned unchanged and errors is the
    result of the exception's unpack_errors(), or the exception itself
    if it has no such method. (For testing purposes.)

    """
    errors = {}
    try:
        value = widget.validate(value)
    except validators.Invalid as invalid:
        errors = invalid
        try:
            errors = invalid.unpack_errors()
        except AttributeError:
            pass
    return value, errors
170
def capture_log(category):
    """Capture log for one category.

    The category can either be a single category (a string like 'foo.bar')
    or a list or tuple of them. Each named logger is set to DEBUG level and
    gets the shared in-memory handler attached. You *must* reset the capture
    when you're done (e.g. by calling get_log()) before capturing again.

    """
    global _currentcat
    assert not _currentcat, "_currentcat not cleared. Use get_log to reset."
    if not isinstance(category, (list, tuple)):
        category = [category]
    _currentcat = category
    for name in category:
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)
        logger.addHandler(_memhandler)
187
def _reset_logging():
    """Detach the memory handler from all loggers being captured."""
    global _currentcat
    if _currentcat:
        for name in _currentcat:
            logging.getLogger(name).removeHandler(_memhandler)
        _currentcat = None
197 206
def get_log():
    """Return the list of log messages captured by capture_log.

    The temporarily added handlers are removed first, then the captured
    messages are returned and the in-memory log is emptied.

    """
    _reset_logging()
    captured = _memhandler.get_log()
    return captured
215
def sqlalchemy_cleanup():
    """Clear the SQLAlchemy metadata, dispose the engine and clear mappers."""
    metadata = database.metadata
    metadata.clear()
    try:
        metadata.dispose()
    except AttributeError: # not threadlocal
        engine = metadata.bind
        if engine:
            engine.dispose()
    database._engine = None
    sqlalchemy.orm.clear_mappers()
225
# Base classes for unit test cases

class TGTest(unittest.TestCase):
    """A WebTest enabled unit testing class.

    To use, subclass & set root to your controller (it is called without
    arguments when the test app is built), or set app to a webtest.TestApp
    instance.

    In your tests, use self.app to make WebTest calls.

    """
    # Controller to mount at '/' when no app is given.
    root = None
    # webtest.TestApp used by the tests; built from root if left unset.
    app = None
    # Passed to stop_server() as tg_only in tearDown.
    stop_tg_only = False
    # Copy of the configuration taken in setUp and re-applied in tearDown.
    config = None

    def setUp(self):
        """Set up the WebTest by starting the server.

        You should override this and make sure you have properly
        mounted a root for your server before calling super,
        or simply pass a root controller to super.
        Otherwise the CherryPy filters for TurboGears will not be used.

        """
        assert self.root or self.app, "Either self.root or self.app must be set"
        if not self.app:
            self.app = make_app(self.root)
        if not self.config:
            self.config = config.copy()
        start_server()

    def tearDown(self):
        """Tear down the WebTest by stopping the server.

        The configuration saved in setUp is re-applied even when stopping
        the server fails, so that config changes made by one test do not
        leak into the following ones.

        """
        try:
            stop_server(tg_only=self.stop_tg_only)
        finally:
            config.update(self.config)

    def login_user(self, user):
        """Log a specified user object into the system.

        Posts the user's user_name and password to the URL configured as
        'identity.failure_url'.

        """
        self.app.post(config.get('identity.failure_url'), dict(
            user_name=user.user_name, password=user.password, login='Login'))
269
class BrowsingSession(object):
    """A minimal browser-like session on top of a WebTest application.

    After each goto() the session exposes the raw body (response), the
    WebTest response object (full_response), the status and the cookies
    set by the server, which are sent back on the next request.

    """

    def __init__(self):
        self.visit = None
        self.response, self.status = None, None
        self.cookie = Cookie.SimpleCookie()
        self.app = make_app()

    def goto(self, *args, **kwargs):
        """Issue a GET request through the test app and record the result.

        All arguments are passed on to self.app.get(). If cookies were set
        by the previous response, they are sent in a 'Cookie' header.

        When the Content-Type header carries a parameter naming a usable
        encoding, the decoded body is stored as self.unicode_response;
        otherwise that attribute does not exist after the call.

        """
        if self.cookie:
            headers = kwargs.setdefault('headers', {})
            headers['Cookie'] = self.cookie_encoded
        response = self.app.get(*args, **kwargs)

        # Forget the decoded body of a previous request so that a stale
        # value can never be mistaken for the current page.
        if hasattr(self, 'unicode_response'):
            del self.unicode_response

        # If we were given an encoding in the content type we should use it to
        # decode the response:
        ctype_parts = response.headers['Content-Type'].split(';')
        for parameter in ctype_parts[1:]:
            pieces = parameter.strip().split('=', 1)
            if len(pieces) != 2:
                # Not an attribute=value pair, so it cannot name an encoding.
                continue
            # The value may be quoted, e.g. charset="utf-8".
            value = pieces[1].strip().strip('"\'')
            try:
                self.unicode_response = response.body.decode(value)
                break
            except (LookupError, ValueError, TypeError):
                # If the named encoding doesn't work then it doesn't work. We
                # just won't create the unicode_response field.
                pass

        self.response = response.body
        self.full_response = response
        self.status = response.status
        self.cookie = response.cookies_set
        self.cookie_encoded = response.headers.get('Set-Cookie', '')
303
class DummySession:
    """Minimal stand-in for a session object."""

    # Nothing is queued for loading.
    to_be_loaded = None
    # The storage "class" is the built-in dict type itself.
    session_storage = dict
310
class DummyRequest:
    """Minimal stand-in for a request object."""

    remote_host = "127.0.0.1"

    def __init__(self, method='GET', path='/', headers=None):
        # A missing (or empty) headers argument is replaced by a new dict.
        if not headers:
            headers = {}
        self.method = method
        self.path = path
        self.base = ''
        self.headers = headers
        self._session = DummySession()
        # Template engine name, taken from 'tg.defaultview' (default 'kid').
        self.tg_template_enginename = config.get('tg.defaultview', 'kid')

    def purge__(self):
        """Do nothing."""
        pass
327
class DummyResponse:
    """A very simple dummy response."""

    # Kept for code that reads the attribute on the class itself.
    headers = {}

    def __init__(self):
        # Give every instance its own dict; with only the class attribute,
        # headers set on one response would show up on all the others.
        self.headers = {}
333
class AbstractDBTest(unittest.TestCase):
    """A database enabled unit testing class.

    Subclasses create the database before each unit test and destroy it
    afterwards. You must set the model attribute in order for this class
    to function correctly.

    """
    model = None

    def setUp(self):
        """Must be provided by the ORM specific subclass."""
        raise NotImplementedError()

    def tearDown(self):
        """Must be provided by the ORM specific subclass."""
        raise NotImplementedError()
350
class DBTestSO(AbstractDBTest):
    """Database test case for SQLObject models.

    Creates the tables of all SQLObject classes found in the model before
    each test and drops them again afterwards.

    """

    def _get_soClasses(self):
        """Return the candidate objects of the model.

        If the model has a soClasses attribute, the model attributes named
        there are returned in that order; otherwise all values of the
        model's namespace are returned.

        """
        try:
            return [self.model.__dict__[x] for x in self.model.soClasses]
        except AttributeError:
            return self.model.__dict__.values()

    def _is_table_class(self, item):
        """Tell whether item is a concrete SQLObject class with a table."""
        return (isinstance(item, types.TypeType)
            and issubclass(item, sqlobject.SQLObject)
            and item != sqlobject.SQLObject
            and item != InheritableSQLObject)

    def setUp(self):
        """Create all model tables, then apply the collected constraints.

        Raises an Exception if no model is set and none can be found.

        """
        if not self.model:
            self.model = get_model()
            if not self.model:
                raise Exception("Unable to run database tests without a model")

        # list of constraints we will collect
        constraints = list()
        # Last class a table was created for; its connection is borrowed
        # below. The loop variable itself must not be used for this, since
        # the last object iterated over need not be an SQLObject class.
        connection_owner = None

        for item in self._get_soClasses():
            if self._is_table_class(item):
                # create table without applying constraints, collect
                # all the constraints for later creation.
                # see http://sqlobject.org/FAQ.html#mutually-referencing-tables
                # for more info
                collected_constraints = item.createTable(ifNotExists=True,
                    applyConstraints=False)
                connection_owner = item

                if collected_constraints:
                    constraints.extend(collected_constraints)

        # now that all tables are created, add the constraints we collected
        for postponed_constraint in constraints:
            connection_owner._connection.query(postponed_constraint)

    def tearDown(self):
        """Roll back all transactions and drop the model tables."""
        database.rollback_all()
        # Drop in reverse order of creation.
        for item in reversed(list(self._get_soClasses())):
            if self._is_table_class(item):
                item.dropTable(ifExists=True, cascade=True)
395
class DBTestSA(AbstractDBTest):
    """Database test case for SQLAlchemy models."""

    def setUp(self):
        """Get the engine and create all tables known to the metadata."""
        database.get_engine()
        database.metadata.create_all()

    def tearDown(self):
        """Drop all tables known to the metadata."""
        database.metadata.drop_all()
# Determine which class to use for "DBTest". Setup & teardown should behave
# similarly regardless of which ORM you choose.
if config.get("sqlobject.dburi"):
    DBTest = DBTestSO
elif config.get("sqlalchemy.dburi"):
    DBTest = DBTestSA
else:
    raise Exception("Unable to find sqlalchemy or sqlobject dburi")


# Deprecated functions kept for backward compatibility

start_cp = deprecated('start_cp is superceded by start_server')(start_server)

reset_cp = deprecated('reset_cp has been superceded by unmount.')(unmount)

# User installed by set_identity_user() and read by attach_identity().
test_user = None
@deprecated()
def set_identity_user(user):
    """Remember user in the module-level test_user.

    attach_identity() uses it to set the identity of a request.

    """
    global test_user
    test_user = user
430
@deprecated()
def attach_identity(req):
    """Set req.identity if the 'identity.on' config setting is enabled.

    The identity of test_user is used when a test user is set and the
    provider returns a true value for it; otherwise the anonymous
    identity is used.

    """
    if not config.get("identity.on", False):
        return
    identity = None
    if test_user:
        identity = current_provider.authenticated_identity(test_user)
    if not identity:
        identity = current_provider.anonymous_identity()
    req.identity = identity
437
@deprecated("create_request is deprecated. See TestMigration on the TG Wiki")
def create_request(request, method="GET", protocol="HTTP/1.1",
        headers={}, rfile=None, clientAddress="127.0.0.1",
        remoteHost="localhost", scheme="http"):
    """Run a request through CherryPy and return the response object.

    request is the request path placed on the request line between method
    and protocol. headers may be a dict or a sequence of (name, value)
    pairs; a ("Host", "localhost") header is always appended. rfile is the
    request body as a file-like object (an empty one is used if missing).

    Returns cherrypy.serving.response after the request has been run.

    """
    start_server()
    if not rfile:
        rfile = StringIO.StringIO("")
    # Always build a new list: the Host header appended below must not end
    # up in the sequence (or the shared default dict) passed by the caller.
    if isinstance(headers, dict):
        headerList = list(headers.items())
    else:
        headerList = list(headers)
    headerList.append(("Host", "localhost"))
    if not hasattr(cherrypy.root, "started"):
        startup.startTurboGears()
        cherrypy.root.started = True
    req = Request(clientAddress, 80, remoteHost, scheme)
    cherrypy.serving.request = req
    attach_identity(req)
    cherrypy.serving.response = Response()
    req.run(" ".join((method, request, protocol)), headerList, rfile)
    return cherrypy.serving.response

# Use of createRequest will also emit deprecation warnings.
createRequest = create_request
def _return_directly(output, *args):
    """Return output unchanged, ignoring any further arguments.

    Temporarily installed as controllers._process_output by
    call_with_request().

    """
    return output
465
@deprecated("Please see the TestMigration page in the TG wiki.")
def call(method, *args, **kw):
    """Call method with a DummyRequest in place and return its output."""
    start_server()
    result = call_with_request(method, DummyRequest(), *args, **kw)
    return result[0]
471
@deprecated("Please see the TestMigration page in the TG wiki.")
def call_with_request(method, request, *args, **kw):
    """More fine-grained version of call method.

    This allows using request/response: request is installed as
    cherrypy.serving.request while method(*args, **kw) runs, and the
    function returns an (output, response) pair.

    While method runs, controllers._process_output is replaced so that the
    output is passed through unprocessed. The original function is always
    restored and the serving request always removed afterwards, even if
    attaching the identity or the method itself fails.

    """
    cherrypy.serving.response = Response()
    cherrypy.serving.request = request
    orig_proc_output = controllers._process_output
    controllers._process_output = _return_directly
    try:
        if not hasattr(request, "identity"):
            attach_identity(request)
        output = method(*args, **kw)
    finally:
        # Restore first, so that this happens whatever the cleanup of the
        # serving request does.
        controllers._process_output = orig_proc_output
        del cherrypy.serving.request
    response = cherrypy.serving.response
    return output, response
# Public API. We don't expose deprecated functions.
# NOTE: every entry needs its own trailing comma; adjacent string literals
# are silently concatenated into a single (non-existent) name.
# NOTE(review): print_log is not defined in the part of the file reviewed
# here -- confirm that it exists at module level.
__all__ = [
    "BrowsingSession",
    "DBTest",
    "DBTestSA",
    "DBTestSO",
    "DummyRequest",
    "DummyResponse",
    "DummySession",
    "TGTest",
    "capture_log",
    "make_app",
    "make_wsgiapp",
    "mount",
    "print_log",
    "get_log",
    "sqlalchemy_cleanup",
    "start_server",
    "stop_server",
    "unmount",
]