Package turbogears :: Module scheduler

Source Code for Module turbogears.scheduler

   1  """Module that provides a cron-like task scheduler. 
   2   
   3  This task scheduler is designed to be used from inside your own program. 
   4  You can schedule Python functions to be called at specific intervals or 
   5  days. It uses the standard 'sched' module for the actual task scheduling, 
   6  but provides much more: 
   7   
   8  * repeated tasks (at intervals, or on specific days) 
   9  * error handling (exceptions in tasks don't kill the scheduler) 
  10  * optional to run scheduler in its own thread or separate process 
  11  * optional to run a task in its own thread or separate process 
  12   
  13  If the threading module is available, you can use the various Threaded 
  14  variants of the scheduler and associated tasks. If threading is not 
  15  available, you could still use the forked variants. If fork is also 
  16  not available, all processing is done in a single process, sequentially. 
  17   
  18  There are three Scheduler classes: 
  19   
  20      Scheduler    ThreadedScheduler    ForkedScheduler 
  21   
  22  You usually add new tasks to a scheduler using the add_interval_task or 
  23  add_daytime_task methods, with the appropriate processmethod argument 
  24  to select sequential, threaded or forked processing. NOTE: it is impossible 
  25  to add new tasks to a ForkedScheduler, after the scheduler has been started! 
  26  For more control you can use one of the following Task classes 
  27  and use schedule_task or schedule_task_abs: 
  28   
  29      IntervalTask    ThreadedIntervalTask    ForkedIntervalTask 
  30      SingleTask      ThreadedSingleTask      ForkedSingleTask 
  31      WeekdayTask     ThreadedWeekdayTask     ForkedWeekdayTask 
  32      CronLikeTask    ThreadedCronLikeTask    ForkedCronLikeTask 
  33   
  34  Kronos is the Greek god of time. Kronos scheduler (c) by Irmen de Jong. 
  35   
  36  This module is based on the Kronos scheduler by Irmen de Jong, but has been 
  37  modified to better fit within TurboGears. Vince Spicer and Mathieu Bridon have 
  38  contributed some additions and improvements such as cron-like tasks. Some fixes 
  39  have been made to make it work on Python 2.6 (adaptations to sched module changes). 
  40   
  41  """ 
  42   
  43  __version__ = '2.0' 
  44   
  45  __all__ = [ 
  46   
  47      'ForkedIntervalTask', 
  48      'ForkedMonthdayTask', 
  49      'ForkedScheduler', 
  50      'ForkedSingleTask', 
  51      'ForkedTaskMixin', 
  52      'ForkedWeekdayTask', 
  53      'IntervalTask', 
  54      'MonthdayTask', 
  55      'Scheduler', 
  56      'SingleTask', 
  57      'Task', 
  58      'ThreadedIntervalTask', 
  59      'ThreadedMonthdayTask', 
  60      'ThreadedScheduler', 
  61      'ThreadedSingleTask', 
  62      'ThreadedTaskMixin', 
  63      'ThreadedWeekdayTask', 
  64      'WeekdayTask', 
  65      'add_interval_task', 
  66      'add_monthday_task', 
  67      'add_single_task', 
  68      'add_weekday_task', 
  69      'cancel', 
  70      'method', 
  71  ] 
  72   
  73  import os 
  74  import sys 
  75  import sched 
  76  import datetime 
  77  import time 
  78  import logging 
  79  import weakref 
  80  import atexit 
  81   
  82  try: 
  83      from dateutil import rrule 
  84  except ImportError: 
  85      rrule = None 
  86   
  87  from turbogears.util import Enum 
  88   
  89  log = logging.getLogger('turbogears.scheduler') 
  90   
  91  method = Enum('sequential', 'forked', 'threaded') 
  92   
  93  # bounds for each field of the cron-like syntax 
  94   
  95  MINUTE_BOUNDS = (0, 59) 
  96  HOUR_BOUNDS = (0, 23) 
  97  DOM_BOUNDS = (1, 31) 
  98  MONTH_BOUNDS = (1, 12) 
  99  DOW_BOUNDS = (0, 7) 
 100   
 101  # some fields of the cron-like syntax can be specified as names 
 102   
 103  MONTH_MAPPING = { 
 104      'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 
 105      'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12} 
 106  DOW_MAPPING = { 
 107      'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6} 
 108   
 109   
class Scheduler(object):
    """The Scheduler itself.

    Tasks are queued on a standard ``sched.scheduler`` instance and, when
    they have a name, also recorded in the ``tasks`` dictionary.  This base
    class runs the scheduling loop in the calling thread; the lock hooks
    ``_acquire_lock`` / ``_release_lock`` do nothing here and exist so that
    subclasses can protect the queue.

    """

    def __init__(self):
        self.running = True
        self.sched = sched.scheduler(time.time, self.__delayfunc)
        # task name -> task object (named tasks only)
        self.tasks = dict()

    def __delayfunc(self, delay):
        """The delay function for the sched.scheduler.

        This delay function is basically a time.sleep() that is divided up,
        so that we can check the self.running flag while delaying.
        There is an additional check in here to ensure that the top item of
        the queue hasn't changed.

        """
        period = 5
        if delay < period:
            time.sleep(delay)
        else:
            toptime = self._getqueuetoptime()
            endtime = time.time() + delay
            stoptime = endtime - period
            while self.running and stoptime > time.time():
                self._acquire_lock()
                try:
                    if not self._getqueuetoptime() == toptime:
                        break
                finally:
                    self._release_lock()
                time.sleep(period)
            if not self.running or self._getqueuetoptime() != toptime:
                return
            now = time.time()
            if endtime > now:
                time.sleep(endtime - now)

    def _acquire_lock(self):
        """Hook: lock the task queue (no-op in the base class)."""
        pass

    def _release_lock(self):
        """Hook: unlock the task queue (no-op in the base class)."""
        pass

    def _call_locked(self, func, *args):
        """Return ``func(*args)``, holding the queue lock if running.

        The lock hooks are only used while ``self.running`` is true; when
        the scheduler is not running the call is made directly.

        """
        if not self.running:
            return func(*args)
        self._acquire_lock()
        try:
            return func(*args)
        finally:
            self._release_lock()

    def _check_taskname(self, taskname):
        """Raise ValueError if a task called ``taskname`` is registered."""
        if taskname in self.tasks:
            raise ValueError(
                "A task with the name %s already exists" % taskname)

    def _enqueue(self, enter, when, task):
        """Queue ``task`` via ``enter`` and record it under its name."""
        # The task only gets a weak reference to the scheduler.
        task.event = enter(when, 0, task, (weakref.ref(self),))
        if task.name:
            self.tasks[task.name] = task
        else:
            log.debug("task %s doesn't have a name" % task)

    def _dequeue(self, task):
        """Remove ``task`` from the queue and from the name registry."""
        self.sched.cancel(task.event)
        if task.name and task.name in self.tasks:
            del self.tasks[task.name]

    def add_interval_task(self, action, taskname, initialdelay, interval,
            processmethod, args, kw):
        """Add a new Interval Task to the schedule.

        A very short initialdelay or one of zero cannot be honored, you will
        see a slight delay before the task is first executed. This is because
        the scheduler needs to pick it up in its loop.

        Returns the created task.  Raises ValueError for a negative
        initialdelay, an interval below 1, an unknown processmethod or a
        task name that is already in use.

        """
        if initialdelay < 0 or interval < 1:
            raise ValueError("Delay or interval must be >0")
        # Select the correct IntervalTask class. Not all types may be
        # available, so the class names are only looked up when selected.
        if processmethod == method.sequential:
            TaskClass = IntervalTask
        elif processmethod == method.threaded:
            TaskClass = ThreadedIntervalTask
        elif processmethod == method.forked:
            TaskClass = ForkedIntervalTask
        else:
            raise ValueError("Invalid processmethod")
        self._call_locked(self._check_taskname, taskname)
        task = TaskClass(taskname, interval, action, args or [], kw or {})
        self.schedule_task(task, initialdelay)
        return task

    def add_single_task(self, action, taskname, initialdelay, processmethod,
            args, kw):
        """Add a new task to the scheduler that will only be executed once.

        Returns the created task.  Raises ValueError for a negative
        initialdelay, an unknown processmethod or a task name that is
        already in use.

        """
        if initialdelay < 0:
            raise ValueError("Delay must be >0")
        # Select the correct SingleTask class. Not all types may be available!
        if processmethod == method.sequential:
            TaskClass = SingleTask
        elif processmethod == method.threaded:
            TaskClass = ThreadedSingleTask
        elif processmethod == method.forked:
            TaskClass = ForkedSingleTask
        else:
            raise ValueError("Invalid processmethod")
        self._call_locked(self._check_taskname, taskname)
        task = TaskClass(taskname, action, args or [], kw or {})
        self.schedule_task(task, initialdelay)
        return task

    def add_daytime_task(self, action, taskname, weekdays, monthdays,
            timeonday, processmethod, args, kw):
        """Add a new Day Task (Weekday or Monthday) to the schedule.

        Exactly one of ``weekdays`` and ``monthdays`` must be given.
        Returns the created task.  Raises ValueError if both or neither of
        them are given, for an unknown processmethod or for a task name that
        is already in use.

        """
        if weekdays and monthdays:
            raise ValueError("You can only specify weekdays or monthdays, "
                "not both")
        if not weekdays and not monthdays:
            # Without this check no task would be created below and the
            # method would fail on an unbound local variable.
            raise ValueError("You must specify either weekdays or monthdays")
        # Select the correct task class. Not all types may be available!
        if weekdays:
            days = weekdays
            if processmethod == method.sequential:
                TaskClass = WeekdayTask
            elif processmethod == method.threaded:
                TaskClass = ThreadedWeekdayTask
            elif processmethod == method.forked:
                TaskClass = ForkedWeekdayTask
            else:
                raise ValueError("Invalid processmethod")
        else:
            days = monthdays
            if processmethod == method.sequential:
                TaskClass = MonthdayTask
            elif processmethod == method.threaded:
                TaskClass = ThreadedMonthdayTask
            elif processmethod == method.forked:
                TaskClass = ForkedMonthdayTask
            else:
                raise ValueError("Invalid processmethod")
        self._call_locked(self._check_taskname, taskname)
        task = TaskClass(taskname, days, timeonday, action,
            args or [], kw or {})
        firsttime = task.get_schedule_time(True)
        self.schedule_task_abs(task, firsttime)
        return task

    def add_cron_like_task(self, action, taskname, cron_str,
            processmethod, args, kw):
        """Add a new Cron-like Task to the schedule.

        Returns the created task.  Raises ValueError for an unknown
        processmethod or a task name that is already in use.

        """
        if processmethod == method.sequential:
            TaskClass = CronLikeTask
        elif processmethod == method.threaded:
            TaskClass = ThreadedCronLikeTask
        elif processmethod == method.forked:
            TaskClass = ForkedCronLikeTask
        else:
            raise ValueError("Invalid processmethod")
        self._call_locked(self._check_taskname, taskname)
        task = TaskClass(taskname, action, cron_str, args or [], kw or {})
        firsttime = task.get_schedule_time()
        self.schedule_task_abs(task, firsttime)
        return task

    def schedule_task(self, task, delay):
        """Add a new task to the scheduler with the given delay (seconds).

        Low-level method for internal use.

        """
        self._call_locked(self._enqueue, self.sched.enter, delay, task)

    def schedule_task_abs(self, task, abstime):
        """Add a new task to the scheduler for the given absolute time value.

        Low-level method for internal use.

        """
        self._call_locked(self._enqueue, self.sched.enterabs, abstime, task)

    def start(self):
        """Start the scheduler."""
        self._run()

    def stop(self):
        """Remove all pending tasks and stop the scheduler."""
        self.running = False
        self._clearschedqueue()

    def cancel(self, task):
        """Cancel given scheduled task."""
        self._call_locked(self._dequeue, task)

    def rename(self, taskname, newname):
        """Rename a scheduled task.

        Nothing happens if no task called ``taskname`` is registered.

        """
        if taskname and taskname in self.tasks:
            task = self.tasks.pop(taskname)
            task.name = newname
            self.tasks[newname] = task

    if sys.version_info >= (2, 6):
        # code for sched module of Python >= 2.6

        def _getqueuetoptime(self):
            """Return the time of the first queued event, or None."""
            if len(self.sched._queue):
                return self.sched._queue[0].time

        def _clearschedqueue(self):
            """Drop every queued event."""
            self.sched._queue[:] = []

    else:
        # code for sched module of Python < 2.6

        def _getqueuetoptime(self):
            """Return the time of the first queued event, or None."""
            if len(self.sched.queue) and len(self.sched.queue[0]):
                return self.sched.queue[0][0]

        def _clearschedqueue(self):
            """Drop every queued event."""
            self.sched.queue[:] = []

    def _run(self):
        """Low-level run method to do the actual scheduling loop."""
        period = 0  # increase only slowly
        while self.running:
            try:
                self.sched.run()
            except Exception:
                log.error("ERROR DURING SCHEDULER EXECUTION", exc_info=1)
            # queue is empty; sleep a short while before checking again
            if self.running:
                if period < 5:
                    period += 0.25
                time.sleep(period)
433 434
class Task(object):
    """Abstract base class of all scheduler tasks.

    A task wraps a callable ``action`` together with the positional and
    keyword arguments it is called with.  Concrete subclasses must provide
    ``reschedule``.

    """

    def __init__(self, name, action, args, kw):
        """This is an abstract class.

        ``args`` and ``kw`` may be None, meaning that the action is called
        without positional or keyword arguments respectively.

        """
        self.name = name
        self.action = action
        # Subclasses default these to None; normalise them here so that
        # execute() can always unpack them.
        if args is None:
            args = []
        if kw is None:
            kw = {}
        self.args = args
        self.kw = kw

    def __call__(self, schedulerref):
        """Execute the task action in the scheduler's thread.

        ``schedulerref`` is a callable (a weak reference) returning the
        scheduler; its result is handed to ``reschedule`` afterwards.

        """
        try:
            self.execute()
        except Exception:
            self.handle_exception()
        self.reschedule(schedulerref())

    def reschedule(self, scheduler):
        """This method should be defined in one of the sub classes."""
        raise NotImplementedError("You're using the abstract base class 'Task',"
            " use a concrete class instead")

    def execute(self):
        """Execute the actual task."""
        self.action(*self.args, **self.kw)

    def handle_exception(self):
        """Handle any exception that occurred during task execution."""
        log.error("ERROR DURING TASK EXECUTION", exc_info=1)
465 466
class SingleTask(Task):
    """A task that is executed once and never queued again."""

    def reschedule(self, scheduler):
        """Do nothing: a single task is not put back on the schedule."""
        return None
472 473
class IntervalTask(Task):
    """A task that is repeated every ``interval`` seconds."""

    def __init__(self, name, interval, action, args=None, kw=None):
        self.interval = interval
        Task.__init__(self, name, action, args, kw)

    def reschedule(self, scheduler):
        """Queue the next run ``self.interval`` seconds from now.

        Nothing is queued once the scheduler is no longer running.

        """
        if not scheduler.running:
            return
        scheduler.schedule_task(self, self.interval)
485 486
class DayTaskRescheduler(object):
    """A mixin class that contains the reschedule logic for the DayTasks."""

    def __init__(self, timeonday):
        # (hour, minute) at which the task runs; a tuple or a list
        self.timeonday = timeonday

    def get_schedule_time(self, today):
        """Calculate the time value at which this task is to be scheduled.

        If ``today`` is true the task is scheduled for today, unless the
        time on day has already been reached, in which case it will be
        tomorrow.  If ``today`` is false it is always tomorrow.  The result
        is a timestamp as returned by ``time.mktime``.

        """
        hour, minute = self.timeonday
        now = list(time.localtime())
        # Compare against a tuple so that a list timeonday works as well.
        if not today or (now[3], now[4]) >= (hour, minute):
            # Tomorrow; mktime normalises a day beyond the end of the month.
            now[2] += 1
        # set new time on day (hour, minute) and zero the seconds
        now[3], now[4], now[5] = hour, minute, 0
        # Let mktime work out daylight saving for the target time instead
        # of reusing the flag that applies right now.
        now[8] = -1
        # mktime requires a tuple (or struct_time) on Python 3.
        return time.mktime(tuple(now))

    def reschedule(self, scheduler):
        """Reschedule this task according to the daytime for the task.

        The task is scheduled for tomorrow, for the given daytime.

        """
        # (The execute method in the concrete Task classes will check
        # if the current day is a day on which the task must run).
        if scheduler.running:
            abstime = self.get_schedule_time(False)
            scheduler.schedule_task_abs(self, abstime)
521 522
class WeekdayTask(DayTaskRescheduler, Task):
    """A task that is run on a given weekday.

    The task is called at specific days in a week (1-7), at a fixed time
    on the day.

    """

    def __init__(self, name, weekdays, timeonday, action, args=None, kw=None):
        """Initialize the task.

        Raises TypeError if ``timeonday`` is not a list or tuple of length
        two, or if ``weekdays`` is not a list or tuple.

        """
        # isinstance (rather than an exact type match) also accepts
        # subclasses of list and tuple.
        if not isinstance(timeonday, (list, tuple)) or len(timeonday) != 2:
            raise TypeError("timeonday must be a 2-tuple (hour,minute)")
        if not isinstance(weekdays, (list, tuple)):
            raise TypeError("weekdays must be a sequence of weekday numbers "
                "1-7 (1 is Monday)")
        DayTaskRescheduler.__init__(self, timeonday)
        Task.__init__(self, name, action, args, kw)
        self.days = weekdays

    def execute(self):
        """Run the action if today is one of the configured weekdays."""
        # This is called every day, at the correct time. We only need to
        # check if we should run this task today (this day of the week).
        # tm_wday counts from 0, the configured days count from 1.
        weekday = time.localtime().tm_wday + 1
        if weekday in self.days:
            self.action(*self.args, **self.kw)
547 548
class MonthdayTask(DayTaskRescheduler, Task):
    """A task that is run on a given day every month.

    The task is called at specific days in a month (1-31), at a fixed
    time on the day.

    """

    def __init__(self, name, monthdays, timeonday, action, args=None, kw=None):
        """Initialize the task.

        Raises TypeError if ``timeonday`` is not a list or tuple of length
        two, or if ``monthdays`` is not a list or tuple.

        """
        # isinstance (rather than an exact type match) also accepts
        # subclasses of list and tuple.
        if not isinstance(timeonday, (list, tuple)) or len(timeonday) != 2:
            raise TypeError("timeonday must be a 2-tuple (hour,minute)")
        if not isinstance(monthdays, (list, tuple)):
            raise TypeError("monthdays must be a sequence of monthdays numbers "
                "1-31")
        DayTaskRescheduler.__init__(self, timeonday)
        Task.__init__(self, name, action, args, kw)
        self.days = monthdays

    def execute(self):
        """Run the action if today is one of the configured month days."""
        # This is called every day, at the correct time. We only need to
        # check if we should run this task today (this day of the month).
        if time.localtime().tm_mday in self.days:
            self.action(*self.args, **self.kw)
572 573
574 -class CronLikeTask(Task):
575 """A task that is scheduled based on a cron-like string. 576 577 A cron string is composed of **five** time and date fields, 578 **separated by spaces**. 579 580 Note: Tasks are executed when **all** the time and date fields match 581 the current time. This is an important difference with the UNIX cron. 582 583 The time and date fields are: 584 585 ============= ================================== 586 field allowed values 587 ============= ================================== 588 minute 0-59 589 hour 0-23 590 day of month 1-31 591 month 1-12 (or names, see below) 592 day of week 0-7 (0 or 7 is Sun, or use names) 593 ============= ================================== 594 595 A field may be an **asterisk** (``*``), which always stands for 596 *first-last*. 597 598 **Ranges** of numbers are allowed. Ranges are two numbers separated with 599 a hyphen. The specified range is inclusive. For example, ``8-11`` for 600 an "hours" entry specifies *execution at hours 8, 9, 10 and 11*. 601 602 **Lists** are allowed. A list is a set of numbers (or ranges) separated 603 by commas. Examples: ``1,2,5,9``, ``0-4,8-12``. 604 605 **Step values** can be used in conjunction with ranges. Following a 606 range with ``/<number>`` specifies skips of the number's value through 607 the range. For example,``0-23/2`` can be used in the "hours" field to 608 specify task execution *every other hour* (the alternative in the V7 609 standard is ``0,2,4,6,8,10,12,14,16,18,20,22``). Steps are also 610 permitted after an asterisk, so if you want to say *every two hours*, 611 just use ``*/2``. 612 613 **Names** can also be used for the "month" and "day of week" fields. Use 614 the first three letters of the particular day or month (case doesn't 615 matter). Ranges of mixed names and integer are not permitted. 616 617 """ 618
def __init__(self, name, action, cron_str, args=None, kw=None):
    """Initialize the task.

    ``cron_str`` must consist of five whitespace-separated fields: minute,
    hour, day of month, month and day of week.

    Raises ImportError if the dateutil package is not available and
    ValueError if ``cron_str`` cannot be split into five fields or one of
    the fields is invalid.

    """
    Task.__init__(self, name, action, args, kw)

    if not rrule:
        raise ImportError("The dateutil package must be installed"
            " if you want to use cron-like tasks.")

    self.cron_str = cron_str
    try:
        min_str, hour_str, dom_str, month_str, dow_str = cron_str.split()
    except (AttributeError, ValueError):
        # not a string, or not exactly five fields
        raise ValueError("Invalid value: %s" % cron_str)

    self.minutes = self.__process_str(min_str, MINUTE_BOUNDS)
    self.hours = self.__process_str(hour_str, HOUR_BOUNDS)
    self.doms = self.__process_str(dom_str, DOM_BOUNDS)
    self.months = self.__process_str(month_str, MONTH_BOUNDS,
        mapping=MONTH_MAPPING)

    # dows are somewhat special:
    # * Cron accepts both 0 and 7 for Sunday
    #   => we deal with that using the "% 7" operator and a temporary set
    # * Python starts with Monday = 0 while Cron starts with Sunday = 0
    #   => we deal with that by subtracting 1
    # * (SUN - 1) % 7 = (0 - 1) % 7 = 6
    #   => we need to sort the list
    cron_dows = self.__process_str(dow_str, DOW_BOUNDS, mapping=DOW_MAPPING)
    self.dows = sorted(set([(dow - 1) % 7 for dow in cron_dows]))
650
651 - def __process_str(self, time_str, bounds, mapping=None):
652 """Transforms a field of the cron-like string into a list. 653 654 @param time_str: a field in the cron-like string 655 @type time_str: string 656 657 @param bounds: the acceptable limits for this field 658 @type bounds: 2-tuple of integers 659 660 @param mapping: the mapping between names and integer values 661 @type mapping: dict 662 663 """ 664 freq = 1 665 if '/' in time_str: 666 try: 667 time_str, freq = time_str.split('/') 668 freq = int(freq) 669 except: 670 raise ValueError("Invalid value: '%s'" % time_str) 671 672 if time_str == '*': 673 result = range(bounds[0], bounds[1] + 1) 674 return result[::freq] 675 676 result = list() 677 678 for item in time_str.split(','): 679 if '-' not in item: 680 # ex: time_str = "1,4,23" 681 try: 682 time = int(item) 683 except: 684 if mapping and item.lower() in mapping: 685 time = mapping[item.lower()] 686 else: 687 raise ValueError("Invalid value: '%s'" % time_str) 688 689 if time < bounds[0] or time > bounds [1]: 690 raise ValueError("Invalid value: '%s'" % time_str) 691 692 result.append(time) 693 else: 694 # ex: time_str = "1-4,7-9" 695 try: 696 interval_low, interval_high = item.split('-') 697 except: 698 # an interval can only have one dash 699 raise ValueError("Invalid value: '%s'" % time_str) 700 701 try: 702 # intervals are specified as integers 703 time_low = int(interval_low) 704 time_high = int(interval_high) 705 except: 706 if (mapping and interval_low.lower() in mapping 707 and interval_high.lower() in mapping): 708 # in some cases names can be used (months or dows) 709 time_low = mapping[interval_low.lower()] 710 time_high = mapping[interval_high.lower()] 711 else: 712 raise ValueError("Invalid value: '%s'" % time_str) 713 714 if time_low < bounds[0] or time_high > bounds [1]: 715 raise ValueError("Invalid value: '%s'" % time_str) 716 717 result.extend(range(time_low, time_high + 1)) 718 719 # filter results by frequency 720 return result[::freq]
721
def get_schedule_time(self):
    """Determine the next execution time of the task.

    Returns a timestamp as produced by ``time.mktime``.

    """
    now = datetime.datetime.now()

    # rrule will return `now` as the next execution time if `now` fills the
    # criteria. This has the nasty effect of relaunching the same task over
    # and over during one second.
    # That only happens when `now.second == 0` (which is always the case
    # after the first execution as cron doesn't handle anything below the
    # minute), so let's add one second to `now`, just to be sure
    now += datetime.timedelta(seconds=1)

    occurrences = rrule.rrule(rrule.SECONDLY, count=1, bysecond=0,
        byminute=self.minutes, byhour=self.hours,
        bymonthday=self.doms, bymonth=self.months, byweekday=self.dows,
        dtstart=now)

    # Hand the time tuple to mktime unchanged: Python 3 rejects a list.
    return time.mktime(occurrences[0].timetuple())
740
741 - def reschedule(self, scheduler):
742 """Reschedule this task according to its cron-like string.""" 743 if scheduler.running: 744 abstime = self.get_schedule_time() 745 scheduler.schedule_task_abs(self, abstime)
746 747 748 try: 749 import threading 750
class ThreadedScheduler(Scheduler):
    """A Scheduler whose scheduling loop runs in a thread of its own."""

    def __init__(self):
        Scheduler.__init__(self)
        # set by start(); None as long as no thread has been created
        self.thread = None
        # guards the task queue, see _acquire_lock / _release_lock
        self._lock = threading.Lock()

    def start(self):
        """Create a daemon thread running the scheduling loop."""
        self.thread = threading.Thread(target=self._run)
        self.thread.setDaemon(True)
        self.thread.start()

    def stop(self):
        """Stop the scheduler, then wait for its thread to end."""
        Scheduler.stop(self)
        try:
            self.thread.join()
        except AttributeError:
            # start() was never called, so there is no thread to wait for
            pass

    def _acquire_lock(self):
        """Take the lock that guards the task queue."""
        self._lock.acquire()

    def _release_lock(self):
        """Give back the lock that guards the task queue."""
        self._lock.release()
781 782
class ThreadedTaskMixin(object):
    """Mixin that lets a Task run its action in a separate thread."""

    def __call__(self, schedulerref):
        """Start a thread for the action, then reschedule the task."""
        worker = threading.Thread(target=self.threadedcall)
        worker.start()
        scheduler = schedulerref()
        self.reschedule(scheduler)

    def threadedcall(self):
        """Body of the worker thread.

        The execute() call and its exception handling have to happen
        here, because this is what runs inside the new thread.

        """
        try:
            self.execute()
        except Exception:
            self.handle_exception()
800
class ThreadedIntervalTask(ThreadedTaskMixin, IntervalTask):
    """An IntervalTask whose action runs in a thread of its own."""


class ThreadedSingleTask(ThreadedTaskMixin, SingleTask):
    """A SingleTask whose action runs in a thread of its own."""


class ThreadedWeekdayTask(ThreadedTaskMixin, WeekdayTask):
    """A WeekdayTask whose action runs in a thread of its own."""


class ThreadedMonthdayTask(ThreadedTaskMixin, MonthdayTask):
    """A MonthdayTask whose action runs in a thread of its own."""


class ThreadedCronLikeTask(ThreadedTaskMixin, CronLikeTask):
    """A CronLikeTask whose action runs in a thread of its own."""
820 821 except ImportError: 822 # threading is not available 823 pass 824 825 826 if hasattr(os, 'fork'): 827 import signal 828
class ForkedScheduler(Scheduler):
    """A Scheduler that runs in its own forked process."""

    def __del__(self):
        # Last resort: kill a child process that was never stopped.
        childpid = getattr(self, 'childpid', None)
        if childpid is not None:
            try:
                os.kill(childpid, signal.SIGKILL)
            except OSError:
                # the child process is gone already
                pass

    def start(self):
        """Fork off a new process in which the scheduler will run."""
        pid = os.fork()
        if pid == 0:
            # we are the child
            signal.signal(signal.SIGUSR1, self.signalhandler)
            self._run()
            os._exit(0)
        else:
            # we are the parent
            self.childpid = pid
            # can no longer insert in the scheduler queue
            del self.sched

    def stop(self):
        """Stop the scheduler and wait for the process to finish.

        Does nothing if no child process is known, i.e. the scheduler was
        never started or has been stopped before.

        """
        childpid = getattr(self, 'childpid', None)
        if childpid is None:
            return
        try:
            os.kill(childpid, signal.SIGUSR1)
            os.waitpid(childpid, 0)
        finally:
            # Forget the pid, so that __del__ does not signal a process
            # id that has been reaped and may have been reused.
            del self.childpid

    def signalhandler(self, sig, stack):
        """Handle SIGUSR1 in the child process by stopping the loop."""
        Scheduler.stop(self)
857 858
class ForkedTaskMixin(object):
    """A mixin class to make a Task execute in a separate process."""

    def __call__(self, schedulerref):
        """Execute the task action in its own process.

        The child process runs the action and exits; the parent process
        reschedules the task.

        """
        pid = os.fork()
        if pid == 0:
            # we are the child
            try:
                try:
                    self.execute()
                except Exception:
                    self.handle_exception()
            finally:
                # Always leave the child here.  Without the finally
                # clause an error that is not an Exception subclass (such
                # as SystemExit) would escape and let the child carry on
                # as a copy of the scheduler process.
                os._exit(0)
        else:
            # we are the parent
            self.reschedule(schedulerref())
875 876
class ForkedIntervalTask(ForkedTaskMixin, IntervalTask):
    """An IntervalTask whose action runs in a process of its own."""


class ForkedSingleTask(ForkedTaskMixin, SingleTask):
    """A SingleTask whose action runs in a process of its own."""


class ForkedWeekdayTask(ForkedTaskMixin, WeekdayTask):
    """A WeekdayTask whose action runs in a process of its own."""


class ForkedMonthdayTask(ForkedTaskMixin, MonthdayTask):
    """A MonthdayTask whose action runs in a process of its own."""


class ForkedCronLikeTask(ForkedTaskMixin, CronLikeTask):
    """A CronLikeTask whose action runs in a process of its own."""
896 897 898 _scheduler_instance = None 899
def _get_scheduler():
    """Return the module-wide scheduler, creating it on first use.

    A ThreadedScheduler is preferred, then a ForkedScheduler; the plain
    Scheduler is used when neither class is defined.

    """
    global _scheduler_instance
    if _scheduler_instance:
        return _scheduler_instance
    # A scheduler class that could not be defined shows up as a NameError.
    try:
        instance = ThreadedScheduler()
    except NameError:
        try:
            instance = ForkedScheduler()
        except NameError:
            instance = Scheduler()
            log.debug("Using Sequential scheduler")
        else:
            log.debug("Using Forked scheduler")
    else:
        log.debug("Using threaded scheduler")
    _scheduler_instance = instance
    return instance
919 920
def _start_scheduler():
    """Start the module-wide scheduler and arrange for it to be stopped
    when the interpreter exits.

    Returns the scheduler.

    """
    log.info("Starting the scheduler...")
    scheduler = _get_scheduler()
    scheduler.start()
    atexit.register(_stop_scheduler)
    return scheduler
928 929
def _stop_scheduler():
    """Stop the module-wide scheduler, if one has been created."""
    log.info("Shutting down the scheduler...")
    if _scheduler_instance:
        _get_scheduler().stop()
937 938
def add_interval_task(action, interval, args=None, kw=None,
        initialdelay=0, processmethod=method.threaded, taskname=None):
    """Add a task that is repeated at a fixed interval.

    For example, an initialdelay of 600 and an interval of 60 mean
    "start running after 10 minutes and run every minute after that".

    @param action: The callable that will be called at the time you request
    @param interval: The interval in seconds between executing the action
    @param args: Tuple of positional parameters to pass to the action
    @param kw: Keyword arguments to pass to the action
    @param initialdelay: The number of seconds to wait before the first run
    @param processmethod: By default, each task will be run in a new thread.
        You can also pass in turbogears.scheduler.method.sequential or
        turbogears.scheduler.method.forked.
    @param taskname: Tasks can have a name (stored in task.name), which can
        help if you're trying to keep track of many tasks.

    @return: whatever the scheduler's add_interval_task method returns

    """
    scheduler = _get_scheduler()
    return scheduler.add_interval_task(
        action=action, taskname=taskname, initialdelay=initialdelay,
        interval=interval, processmethod=processmethod, args=args, kw=kw)
966 967
def add_single_task(action, args=None, kw=None,
        initialdelay=0, processmethod=method.threaded, taskname=None):
    """Add a task that is run only once.

    @param action: The callable that will be called at the time you request
    @param args: Tuple of positional parameters to pass to the action
    @param kw: Keyword arguments to pass to the action
    @param initialdelay: The number of seconds to wait before running
    @param processmethod: By default, each task will be run in a new thread.
        You can also pass in turbogears.scheduler.method.sequential or
        turbogears.scheduler.method.forked.
    @param taskname: Tasks can have a name (stored in task.name), which can
        help if you're trying to keep track of many tasks.

    @return: whatever the scheduler's add_single_task method returns

    """
    scheduler = _get_scheduler()
    return scheduler.add_single_task(
        action=action, taskname=taskname, initialdelay=initialdelay,
        processmethod=processmethod, args=args, kw=kw)
991 992
def add_weekday_task(action, weekdays, timeonday, args=None, kw=None,
        processmethod=method.threaded, taskname=None):
    """Schedule an action to run on selected days of the week.

    Weekdays are numbered from 1 to 7, where 1 is Monday. The request is
    forwarded to the add_daytime_task() method of the scheduler obtained
    from _get_scheduler(), with monthdays set to None; that method's return
    value is handed back to the caller.

    @param action: The callable that will be called at the time you request
    @param weekdays: list or tuple of weekdays on which to execute the action
    @param timeonday: tuple (hour, minute), the time of day to run at
    @param args: Tuple of positional parameters to pass to the action
    @param kw: Keyword arguments to pass to the action
    @param processmethod: By default, each task will be run in a new thread.
        You can also pass in turbogears.scheduler.method.sequential or
        turbogears.scheduler.method.forked.
    @param taskname: Tasks can have a name (stored in task.name), which can
        help if you're trying to keep track of many tasks.

    """
    scheduler = _get_scheduler()
    task = scheduler.add_daytime_task(
        action=action, args=args, kw=kw,
        weekdays=weekdays, monthdays=None, timeonday=timeonday,
        processmethod=processmethod, taskname=taskname)
    return task

# Second public name bound to the very same function object.
add_weekly_task = add_weekday_task
def add_monthday_task(action, monthdays, timeonday, args=None, kw=None,
        processmethod=method.threaded, taskname=None):
    """Schedule an action to run on selected days of the month.

    Month days are numbered from 1 to 31. The request is forwarded to the
    add_daytime_task() method of the scheduler obtained from
    _get_scheduler(), with weekdays set to None; that method's return value
    is handed back to the caller.

    @param action: The callable that will be called at the time you request
    @param monthdays: list or tuple of month days on which to execute the
        action
    @param timeonday: tuple (hour, minute), the time of day to run at
    @param args: Tuple of positional parameters to pass to the action
    @param kw: Keyword arguments to pass to the action
    @param processmethod: By default, each task will be run in a new thread.
        You can also pass in turbogears.scheduler.method.sequential or
        turbogears.scheduler.method.forked.
    @param taskname: Tasks can have a name (stored in task.name), which can
        help if you're trying to keep track of many tasks.

    """
    scheduler = _get_scheduler()
    task = scheduler.add_daytime_task(
        action=action, args=args, kw=kw,
        weekdays=None, monthdays=monthdays, timeonday=timeonday,
        processmethod=processmethod, taskname=taskname)
    return task

# Second public name bound to the very same function object.
add_monthly_task = add_monthday_task
def add_cron_like_task(action, cron_str, args=None, kw=None,
        processmethod=method.threaded, taskname=None):
    """Schedule an action according to a cron-like specification string.

    The request is forwarded to the add_cron_like_task() method of the
    scheduler obtained from _get_scheduler(); that method's return value
    is handed back to the caller.

    @param action: The callable that will be called at the time you request
    @param cron_str: The scheduling information, written in a cron-like
        syntax
    @param args: Tuple of positional parameters to pass to the action
    @param kw: Keyword arguments to pass to the action
    @param processmethod: By default, each task will be run in a new thread.
        You can also pass in turbogears.scheduler.method.sequential or
        turbogears.scheduler.method.forked.
    @param taskname: Tasks can have a name (stored in task.name), which can
        help if you're trying to keep track of many tasks.

    """
    scheduler = _get_scheduler()
    task = scheduler.add_cron_like_task(
        action=action, args=args, kw=kw, cron_str=cron_str,
        processmethod=processmethod, taskname=taskname)
    return task
1072 1073
def get_task(taskname):
    """Look up a scheduled task by its name.

    The lookup is done with get() on the tasks attribute of the scheduler
    obtained from _get_scheduler(), and its result is returned as is.

    @param taskname: the name of the task to retrieve

    """
    return _get_scheduler().tasks.get(taskname)
1082 1083
def get_tasks():
    """Return the tasks attribute of the scheduler from _get_scheduler().

    The object is returned as is; no copy is made here.

    """
    return _get_scheduler().tasks
1088 1089
def rename_task(taskname, newname):
    """Give a scheduled task a different name.

    Both arguments are passed on to the rename() method of the scheduler
    obtained from _get_scheduler(). Nothing is returned.

    @param taskname: the current name of the task
    @param newname: the name the task should have afterwards

    """
    _get_scheduler().rename(taskname, newname)
1094 1095
def cancel(task):
    """Cancel a scheduled task.

    The argument is passed on unchanged to the cancel() method of the
    scheduler obtained from _get_scheduler(). Nothing is returned.

    @param task: the task.name of the task to cancel
        NOTE(review): confirm against the scheduler's cancel() whether it
        expects the task name or the task object itself.

    """
    _get_scheduler().cancel(task)
1104