Package turbogears :: Module testutil

Source Code for Module turbogears.testutil

  1  """TurboGears Test Utilities.""" 
  2   
  3  __all__ = ['BrowsingSession', 'DBTest', 'DBTestSA', 'DBTestSO', 
  4      'DummySession', 'TGTest', 
  5      'capture_log', 'make_app', 'make_wsgiapp', 'mount', 
  6      'print_log', 'get_log', 'sqlalchemy_cleanup', 
  7      'start_server', 'stop_server', 'unmount'] 
  8   
  9  import os 
 10  import types 
 11  import logging 
 12  import string 
 13  import unittest 
 14   
 15  import cherrypy 
 16  from cherrypy.process.wspbus import states 
 17   
 18  from webtest import TestApp 
 19   
 20  try: 
 21      import sqlobject 
 22      from sqlobject.inheritance import InheritableSQLObject 
 23  except ImportError: 
 24      sqlobject = None 
 25  try: 
 26      import sqlalchemy 
 27  except ImportError: 
 28      sqlalchemy = None 
 29   
 30  from turbogears import config, database, startup, update_config, validators 
 31  from turbogears.util import get_model 
 32   
 33   
 34  # Main testutil interface functions for setting up & mounting a test app 
 35  # and starting/stopping it: 
 36   
37 -def unmount():
38 """Remove an application from the object traversal tree.""" 39 for app in cherrypy.tree.apps.keys(): 40 del cherrypy.tree.apps[app]
41 42
43 -def mount(controller, path='/'):
44 """Mount a controller at a path. Returns a WSGI application.""" 45 startup.config_static() 46 if path == '/': 47 startup.config_root() 48 config.update({'environment': 'test_suite', 'log.screen': False}) 49 cherrypy.tree.mount(controller, path, config=config.app) 50 return make_wsgiapp()
51 52
53 -def make_wsgiapp():
54 """Return a WSGI application from CherryPy.""" 55 return cherrypy.tree
56 57
58 -def make_app(controller=None):
59 """Return a WebTest.TestApp instance from CherryPy. 60 61 If a Controller object is provided, it will be mounted at the root level. 62 If not, it'll look for an already mounted root. 63 64 """ 65 if controller: 66 wsgiapp = mount(controller(), '/') 67 else: 68 wsgiapp = make_wsgiapp() 69 return TestApp(wsgiapp)
70 71
72 -def start_server(tg_only=True):
73 """Start the server if it's not already started. 74 75 Use tg_only=False to run the CherryPy engine as well. 76 77 """ 78 if not tg_only and not config.get('cp_started'): 79 cherrypy.engine.start() 80 config.update({'cp_started': True}) 81 82 if not config.get('server_started'): 83 startup.start_turbogears() 84 config.update({'server_started': True})
85 86
87 -def stop_server(tg_only=False):
88 """Stop the server and unmount the application. 89 90 Use tg_only=True to leave CherryPy running (for faster tests). 91 92 """ 93 if config.get('server_started'): 94 startup.stop_turbogears() 95 config.update({'server_started': False}) 96 97 unmount() 98 99 if not tg_only and config.get('cp_started'): 100 if cherrypy.engine.state != states.STOPPED: 101 cherrypy.engine.exit() 102 config.update({'cp_started': False})
103 104 105 # Miscellaneous test utility classes & functions: 106 107 _currentcat = None 108
109 -class MemoryListHandler(logging.Handler):
110
111 - def __init__(self):
112 logging.Handler.__init__(self, level=logging.DEBUG) 113 self.log = []
114
115 - def emit(self, record):
116 print "Got record: %s" % record 117 print "formatted as: %s" % self.format(record) 118 self.log.append(self.format(record))
119
120 - def print_log(self):
121 print '\n'.join(self.log) 122 self.log = []
123
124 - def get_log(self):
125 log = self.log 126 self.log = [] 127 return log
128 129 130 _memhandler = MemoryListHandler() 131
132 -def catch_validation_errors(widget, value):
133 """Catch and unpack validation errors (for testing purposes).""" 134 try: 135 value = widget.validate(value) 136 except validators.Invalid, errors: 137 try: 138 errors = errors.unpack_errors() 139 except AttributeError: 140 pass 141 else: 142 errors = {} 143 return value, errors
144 145
146 -def capture_log(category):
147 """Capture log for one category. 148 149 The category can either be a single category (a string like 'foo.bar') 150 or a list of them. You *must* call print_log() to reset when you're done. 151 152 """ 153 global _currentcat 154 assert not _currentcat, "_currentcat not cleared. Use get_log to reset." 155 if not isinstance(category, list) and not isinstance(category, tuple): 156 category = [category] 157 _currentcat = category 158 for cat in category: 159 log = logging.getLogger(cat) 160 log.setLevel(logging.DEBUG) 161 log.addHandler(_memhandler)
162 163
164 -def _reset_logging():
165 """Manage the resetting of the loggers.""" 166 global _currentcat 167 if not _currentcat: 168 return 169 for cat in _currentcat: 170 log = logging.getLogger(cat) 171 log.removeHandler(_memhandler) 172 _currentcat = None
173 174 183 184
185 -def get_log():
186 """Return the list of log messages captured by capture_log. 187 188 Resets that log and resets the temporarily added handlers. 189 190 """ 191 _reset_logging() 192 return _memhandler.get_log()
193 194
195 -def sqlalchemy_cleanup():
196 database.metadata.clear() 197 try: 198 database.metadata.dispose() 199 except AttributeError: # not threadlocal 200 if database.metadata.bind: 201 database.metadata.bind.dispose() 202 database._engine = None 203 sqlalchemy.orm.clear_mappers()
204 205 206 # Base classes for unit test cases 207
208 -class TGTest(unittest.TestCase):
209 """A WebTest enabled unit testing class. 210 211 To use, subclass and set root to your controller object, or set app to a 212 webtest.TestApp instance. 213 214 In your tests, use self.app to make WebTest calls. 215 216 """ 217 218 root = None 219 app = None 220 stop_tg_only = False 221 config = None 222
223 - def setUp(self):
224 """Set up the WebTest by starting the server. 225 226 You should override this and make sure you have properly 227 mounted a root for your server before calling super, 228 or simply pass a root controller to super. 229 Otherwise the CherryPy hooks for TurboGears will not be used. 230 231 """ 232 assert self.root or self.app, "Either self.root or self.app must be set" 233 if not self.app: 234 self.app = make_app(self.root) 235 if not self.config: 236 self.config = config.copy() 237 start_server()
238
239 - def tearDown(self):
240 """Tear down the WebTest by stopping the server.""" 241 stop_server(tg_only=self.stop_tg_only) 242 config.update(self.config)
243 244
245 - def login_user(self, user):
246 """Log a specified user object into the system.""" 247 self.app.post(config.get('identity.failure_url'), dict( 248 user_name=user.user_name, password=user.password, login='Login'))
249 250
251 -class BrowsingSession(object):
252
253 - def __init__(self):
254 self.visit = None 255 self.response, self.status = None, None 256 self.cookie = {} 257 self.app = make_app()
258
259 - def goto(self, path, headers=None, **kwargs):
260 if headers is None: 261 headers = {} 262 if self.cookie: 263 headers['Cookie'] = self.cookie_encoded 264 response = self.app.get(path, headers=headers, **kwargs) 265 266 # If we were given an encoding in the content type we should use it to 267 # decode the response: 268 ctype_parts = response.headers['Content-Type'].split(';') 269 for parameter in ctype_parts[1:]: 270 value = parameter.strip().split('=', 1)[-1] 271 try: 272 self.unicode_response = response.body.decode(value) 273 break 274 except: 275 # If the named encoding doesn't work then it doesn't work. We 276 # just won't create the unicode_response field. 277 pass 278 279 self.response = response.body 280 self.full_response = response 281 self.status = response.status 282 self.cookie = response.cookies_set 283 self.cookie_encoded = response.headers.get('Set-Cookie', '')
284 285
286 -class DummySession:
287 """A very simple dummy session.""" 288 289 session_storage = dict 290 to_be_loaded = None
291 292
293 -class AbstractDBTest(unittest.TestCase):
294 """A database enabled unit testing class. 295 296 Creates and destroys your database before and after each unit test. 297 You must set the model attribute in order for this class to 298 function correctly. 299 300 """ 301 model = None 302
303 - def setUp(self):
304 raise NotImplementedError()
305
306 - def tearDown(self):
307 raise NotImplementedError()
308 309
310 -class DBTestSO(AbstractDBTest):
311
312 - def _get_soClasses(self):
313 try: 314 return [self.model.__dict__[x] for x in self.model.soClasses] 315 except AttributeError: 316 return self.model.__dict__.values()
317
318 - def setUp(self):
319 if not self.model: 320 self.model = get_model() 321 if not self.model: 322 raise Exception("Unable to run database tests without a model") 323 324 # list of constraints we will collect 325 constraints = list() 326 327 for item in self._get_soClasses(): 328 if (isinstance(item, types.TypeType) 329 and issubclass(item, sqlobject.SQLObject) 330 and item is not sqlobject.SQLObject 331 and item is not InheritableSQLObject): 332 # create table without applying constraints, collect 333 # all the constraints for later creation. 334 # see http://sqlobject.org/FAQ.html#mutually-referencing-tables 335 # for more info 336 collected_constraints = item.createTable(ifNotExists=True, 337 applyConstraints=False) 338 339 if collected_constraints: 340 constraints.extend(collected_constraints) 341 342 # now that all tables are created, add the constraints we collected 343 for postponed_constraint in constraints: 344 # item is the last processed item and we borrow its connection 345 item._connection.query(postponed_constraint)
346
347 - def tearDown(self):
348 database.rollback_all() 349 for item in reversed(self._get_soClasses()): 350 if (isinstance(item, types.TypeType) 351 and issubclass(item, sqlobject.SQLObject) 352 and item is not sqlobject.SQLObject 353 and item is not InheritableSQLObject): 354 item.dropTable(ifExists=True, cascade=True)
355 356
357 -class DBTestSA(AbstractDBTest):
358
359 - def setUp(self):
360 database.get_engine() 361 database.metadata.create_all()
362
363 - def tearDown(self):
364 database.metadata.drop_all()
365 366 367 # Set up the test environment (will be run when the module is imported): 368 369 # For clean tests, remove all compiled Kid templates 370 for w in os.walk('.'): 371 if not os.sep + '.' in w[0]: 372 for f in w[2]: 373 if f.endswith('.kid'): 374 f = os.path.join(w[0], f[:-3] + 'pyc') 375 if os.path.exists(f): 376 os.remove(f) 377 378 # Load test configuration 379 if os.path.exists('test.cfg'): 380 # Look for a 'config' package 381 for dirpath, dirs, dummy2 in os.walk('.'): 382 basename = os.path.basename(dirpath) 383 dirname = os.path.basename(os.path.dirname(dirpath)) 384 init_py = os.path.join(dirpath, '__init__.py') 385 if (basename == 'config' and dirname[0] in string.ascii_letters + '_' 386 and os.path.exists(init_py)): 387 modulename = '%s.app' % dirpath[2:].replace(os.sep, '.') 388 break 389 else: 390 modulename = None 391 # XXX This is a temporary workaround, the code above to find the config 392 # package should really be improved and moved elsewhere. 393 # See http://trac.turbogears.org/ticket/2043 394 try: 395 update_config(configfile='test.cfg', modulename=modulename) 396 except ImportError, exc: 397 import warnings 398 warnings.warn("Could not import configuration from module: %s" % exc, 399 RuntimeWarning) 400 update_config(configfile='test.cfg', modulename=None) 401 else: 402 database.set_db_uri('sqlite:///:memory:') 403 404 config.update({'global': {'engine.autoreload.on': False}}) 405 406 # Determine which class to use for "DBTest". 407 # SetUp & TearDown should behave similarly regardless of which ORM you choose. 408 409 if config.get('sqlobject.dburi'): 410 DBTest = DBTestSO 411 elif config.get('sqlalchemy.dburi'): 412 DBTest = DBTestSA 413 else: 414 raise Exception("Unable to find SQLAlchemy or SQLObject dburi") 415