Package turbogears :: Module scheduler

Source Code for Module turbogears.scheduler

  1  """Module that provides a cron-like task scheduler. 
  2   
  3  This task scheduler is designed to be used from inside your own program. 
  4  You can schedule Python functions to be called at specific intervals or 
  5  days. It uses the standard 'sched' module for the actual task scheduling, 
  6  but provides much more: 
  7      - repeated tasks (at intervals, or on specific days) 
  8      - error handling (exceptions in tasks don't kill the scheduler) 
  9      - optional to run scheduler in its own thread or separate process 
 10      - optional to run a task in its own thread or separate process 
 11   
 12  If the threading module is available, you can use the various Threaded 
 13  variants of the scheduler and associated tasks. If threading is not 
 14  available, you could still use the forked variants. If fork is also 
 15  not available, all processing is done in a single process, sequentially. 
 16   
 17  There are three Scheduler classes: 
 18      Scheduler    ThreadedScheduler    ForkedScheduler 
 19   
 20  You usually add new tasks to a scheduler using the add_interval_task or 
 21  add_daytime_task methods, with the appropriate processmethod argument 
 22  to select sequential, threaded or forked processing. NOTE: it is impossible 
 23  to add new tasks to a ForkedScheduler, after the scheduler has been started! 
 24  For more control you could use one of the following Task classes 
 25  and use schedule_task or schedule_task_abs: 
 26      IntervalTask    ThreadedIntervalTask    ForkedIntervalTask 
 27      WeekdayTask     ThreadedWeekdayTask     ForkedWeekdayTask 
 28      MonthdayTask    ThreadedMonthdayTask    ForkedMonthdayTask 
 29   
 30  Kronos is the Greek God of Time. 
 31   
 32  This module is based on Kronos by Irmen de Jong, but has been modified 
 33  to better fit within TurboGears. Additionally, the original Kronos module 
 34  appeared to no longer be supported or in development. 
 35  """ 
 36  # 
 37  #   $Id: kronos.py,v 1.5 2004/10/06 22:43:49 irmen Exp $ 
 38  # 
 39  #   (c) Irmen de Jong. 
 40  #   This is open-source software, released under the MIT Software License: 
 41  #   http://www.opensource.org/licenses/mit-license.php 
 42  # 
 43   
 44   
 45  import os, sys 
 46  import sched, time 
 47  import traceback 
 48  import weakref 
 49   
 50  from turbogears.util import Enum 
 51   
# Processing methods that callers pass as the ``processmethod`` argument of
# Scheduler.add_interval_task / Scheduler.add_daytime_task to choose between
# the sequential, forked and threaded task classes.
# NOTE(review): Enum comes from turbogears.util; its exact semantics are not
# visible in this file -- the members are only compared with ``==`` here.
method = Enum("sequential", "forked", "threaded")
 53   
class Scheduler:
    """The Scheduler itself.

    Wraps a ``sched.scheduler`` driven by ``time.time`` and a custom delay
    function, and offers helpers to register interval and day based tasks.
    Everything runs sequentially in the caller's thread: ``start()`` blocks
    until ``stop()`` has been called.
    """

    def __init__(self):
        self.running = True
        self.sched = sched.scheduler(time.time, self.__delayfunc)

    def __top_time(self):
        """Return the time of the earliest queued event, or None if there is none."""
        queue = self.sched.queue
        if not queue:
            return None
        # Events are (time, priority, ...) tuples in every sched version.
        return queue[0][0]

    def __delayfunc(self, delay):
        """Sleep for ``delay`` seconds, waking up early when appropriate.

        This delay function is basically a time.sleep() that is divided up,
        so that we can check the self.running flag while delaying. There is
        an additional check to ensure that the top item of the queue hasn't
        changed; if it has (or the queue became empty) we return early so
        that the sched loop re-evaluates the queue.
        """
        if delay < 10:
            time.sleep(delay)
            return
        toptime = self.__top_time()
        if toptime is None:
            # Nothing left to wait for (e.g. the event was just cancelled).
            return
        endtime = time.time() + delay
        period = 5
        stoptime = endtime - period
        while (self.running and stoptime > time.time()
               and self.__top_time() == toptime):
            time.sleep(period)
        if not self.running or self.__top_time() != toptime:
            return
        # Sleep away whatever is left of the final (shorter) period.
        now = time.time()
        if endtime > now:
            time.sleep(endtime - now)

    def _acquire_lock(self):
        """Hook called before the sched queue is modified; a no-op here."""
        pass

    def _release_lock(self):
        """Hook called after the sched queue was modified; a no-op here."""
        pass

    def add_interval_task(self, action, taskname, initialdelay, interval, processmethod, args, kw):
        """Add a new Interval Task to the schedule and return it.

        A very short initialdelay or one of zero cannot be honored, you will
        see a slight delay before the task is first executed. This is because
        the scheduler needs to pick it up in its loop.

        Raises ValueError if initialdelay is negative, interval is smaller
        than 1, or processmethod is not one of the ``method`` values.
        """
        if initialdelay < 0 or interval < 1:
            raise ValueError("delay or interval must be >0")
        # Select the correct IntervalTask class. Not all types may be
        # available, so each class name is only referenced in its own branch.
        if processmethod == method.sequential:
            TaskClass = IntervalTask
        elif processmethod == method.threaded:
            TaskClass = ThreadedIntervalTask
        elif processmethod == method.forked:
            TaskClass = ForkedIntervalTask
        else:
            raise ValueError("invalid processmethod")
        if not args:
            args = []
        if not kw:
            kw = {}
        task = TaskClass(taskname, interval, action, args, kw)
        self.schedule_task(task, initialdelay)
        return task

    def add_daytime_task(self, action, taskname, weekdays, monthdays, timeonday, processmethod, args, kw):
        """Add a new Day Task (Weekday or Monthday) to the schedule and return it.

        Exactly one of ``weekdays`` and ``monthdays`` must be given.

        Raises ValueError if both or neither of them are given, or if
        processmethod is not one of the ``method`` values.
        """
        if weekdays and monthdays:
            raise ValueError("you can only specify weekdays or monthdays, not both")
        if not weekdays and not monthdays:
            # Previously this fell through and failed on an unbound 'task'.
            raise ValueError("you must specify either weekdays or monthdays")
        if not args:
            args = []
        if not kw:
            kw = {}
        if weekdays:
            # Select the correct WeekdayTask class. Not all types may be available!
            if processmethod == method.sequential:
                TaskClass = WeekdayTask
            elif processmethod == method.threaded:
                TaskClass = ThreadedWeekdayTask
            elif processmethod == method.forked:
                TaskClass = ForkedWeekdayTask
            else:
                raise ValueError("invalid processmethod")
            task = TaskClass(taskname, weekdays, timeonday, action, args, kw)
        else:
            # Select the correct MonthdayTask class. Not all types may be available!
            if processmethod == method.sequential:
                TaskClass = MonthdayTask
            elif processmethod == method.threaded:
                TaskClass = ThreadedMonthdayTask
            elif processmethod == method.forked:
                TaskClass = ForkedMonthdayTask
            else:
                raise ValueError("invalid processmethod")
            task = TaskClass(taskname, monthdays, timeonday, action, args, kw)
        firsttime = task.get_schedule_time(True)
        self.schedule_task_abs(task, firsttime)
        return task

    def schedule_task(self, task, delay):
        """Low-level method to add a new task to the scheduler with the given delay (seconds).

        The created sched event is stored on ``task.event``. The task is
        called with a weak reference to this scheduler as its only argument.
        """
        argument = (weakref.ref(self),)
        if not self.running:
            task.event = self.sched.enter(delay, 0, task, argument)
            return
        self._acquire_lock()  # lock the sched queue, if needed
        try:
            task.event = self.sched.enter(delay, 0, task, argument)
        finally:
            self._release_lock()

    def schedule_task_abs(self, task, abstime):
        """Low-level method to add a new task to the scheduler for the given absolute time value.

        The created sched event is stored on ``task.event``. The task is
        called with a weak reference to this scheduler as its only argument.
        """
        argument = (weakref.ref(self),)
        if not self.running:
            task.event = self.sched.enterabs(abstime, 0, task, argument)
            return
        self._acquire_lock()  # lock the sched queue, if needed
        try:
            task.event = self.sched.enterabs(abstime, 0, task, argument)
        finally:
            self._release_lock()

    def start(self):
        """Start the scheduler."""
        self._run()

    def stop(self):
        """Remove all pending tasks and stop the Scheduler."""
        self.running = False
        # 'queue' may be a read-only snapshot (it is a property in newer
        # versions of the sched module), so assigning to a slice of it would
        # leave the real queue untouched. Cancel every event instead.
        for event in list(self.sched.queue):
            try:
                self.sched.cancel(event)
            except ValueError:
                # The event ran or was cancelled in the meantime.
                pass

    def cancel(self, task):
        """Remove ``task`` from the queue, using the event stored on it.

        Raises ValueError (from the sched module) if the event is no longer
        queued.
        """
        self.sched.cancel(task.event)

    def _run(self):
        """Low-level run method to do the actual scheduling loop."""
        while self.running:
            try:
                self.sched.run()
            except Exception:
                # Report the problem but keep the scheduler alive.
                x = sys.exc_info()[1]
                sys.stderr.write("ERROR DURING SCHEDULER EXECUTION %s\n" % (x,))
                sys.stderr.write("".join(traceback.format_exception(*sys.exc_info())) + "\n")
                sys.stderr.write("-" * 20 + "\n")
            # queue is empty; sleep a short while before checking again
            if self.running:
                time.sleep(5)
193 194
class Task:
    """Abstract base class of all scheduler tasks.

    A task wraps a callable ``action`` together with the positional and
    keyword arguments to call it with. Concrete subclasses must provide
    ``reschedule``.
    """

    def __init__(self, name, action, args, kw):
        """This is an abstract class!

        ``args`` and ``kw`` may be None, meaning "no arguments"; they are
        stored as an empty list / dict so that execute() can always unpack
        them.
        """
        if args is None:
            args = []
        if kw is None:
            kw = {}
        self.name = name
        self.action = action
        self.args = args
        self.kw = kw

    def __call__(self, schedulerref):
        """Execute the task action in the scheduler's thread, then reschedule.

        ``schedulerref`` is a callable (a weak reference) returning the
        scheduler, or None when the scheduler no longer exists. Exceptions
        raised by the action are passed to handle_exception() and do not
        prevent rescheduling.
        """
        try:
            self.execute()
        except Exception:
            self.handle_exception(sys.exc_info()[1])
        scheduler = schedulerref()
        if scheduler is not None:
            # Without a live scheduler there is nothing to reschedule on.
            self.reschedule(scheduler)

    def reschedule(self, scheduler):
        """This is an abstract class, this method is defined in one of the sub classes!"""
        raise NotImplementedError("you're using the abstract base class 'Task', use a concrete class instead")

    def execute(self):
        """Execute the actual task."""
        self.action(*self.args, **self.kw)

    def handle_exception(self, exc):
        """Handle any exception that occurred during task execution.

        Writes the exception and the current traceback to sys.stderr.
        """
        sys.stderr.write("ERROR DURING TASK EXECUTION %s\n" % (exc,))
        sys.stderr.write("".join(traceback.format_exception(*sys.exc_info())) + "\n")
        sys.stderr.write("-" * 20 + "\n")
225 226
class IntervalTask(Task):
    """Task that is run again and again, a fixed number of seconds apart."""

    def __init__(self, name, interval, action, args=None, kw=None):
        """Remember the repeat interval (seconds) and set up the base Task."""
        self.interval = interval
        Task.__init__(self, name, action, args, kw)

    def reschedule(self, scheduler):
        """Queue this task on ``scheduler`` again, one interval from now."""
        delay = self.interval
        scheduler.schedule_task(self, delay)
236 237 238
class DayTaskRescheduler:
    """A mixin class that contains the reschedule logic for the DayTasks."""

    def __init__(self, timeonday):
        """Store ``timeonday``, a 2-item (hour, minute) sequence."""
        self.timeonday = timeonday

    def get_schedule_time(self, today):
        """Calculate the time value at which this task is to be scheduled.

        If ``today`` is true the task is scheduled for today when the time on
        day has not passed yet, otherwise for tomorrow. If ``today`` is false
        it is always scheduled for tomorrow. Returns seconds since the epoch
        as produced by time.mktime().
        """
        now = list(time.localtime())
        # Unpack explicitly so that a list is compared as (hour, minute) too;
        # comparing a tuple against a list gives meaningless results.
        hour, minute = self.timeonday
        if not today or (now[3], now[4]) >= (hour, minute):
            now[2] += 1  # tomorrow; mktime normalizes a day past month end
        now[3], now[4] = hour, minute  # set new time on day (hour,minute)
        now[5] = 0  # seconds
        # Let mktime work out daylight saving for the target date instead of
        # reusing the flag that is valid right now.
        now[8] = -1
        return time.mktime(tuple(now))

    def reschedule(self, scheduler):
        """Reschedule this task according to the daytime for the task.

        The task is scheduled for tomorrow, for the given daytime.
        (The execute method in the concrete Task classes will check
        if the current day is a day on which the task must run).
        """
        abstime = self.get_schedule_time(False)
        scheduler.schedule_task_abs(self, abstime)
264 265
class WeekdayTask(DayTaskRescheduler, Task):
    """A task that is called at specific days in a week (1-7), at a fixed time on the day."""

    def __init__(self, name, weekdays, timeonday, action, args=None, kw=None):
        """Set up the task.

        ``weekdays`` is a list or tuple of weekday numbers 1-7 (1 is Monday)
        and ``timeonday`` a 2-item list or tuple (hour, minute).

        Raises TypeError if either of them has the wrong type or length.
        """
        # isinstance also accepts list/tuple subclasses (e.g. named tuples).
        if not isinstance(timeonday, (list, tuple)) or len(timeonday) != 2:
            raise TypeError("timeonday must be a 2-tuple (hour,minute)")
        if not isinstance(weekdays, (list, tuple)):
            raise TypeError("weekdays must be a sequence of weekday numbers 1-7 (1 is Monday)")
        # Store the time as a tuple: the rescheduler compares it against an
        # (hour, minute) tuple, which does not work for a list.
        DayTaskRescheduler.__init__(self, tuple(timeonday))
        Task.__init__(self, name, action, args, kw)
        self.days = weekdays

    def execute(self):
        """Run the action, but only if today is one of the configured weekdays.

        This is called every day, at the correct time. We only need to
        check if we should run this task today (this day of the week).
        """
        weekday = time.localtime().tm_wday + 1  # tm_wday is 0-based, Monday is 0
        if weekday in self.days:
            self.action(*self.args, **self.kw)
283
284 -class MonthdayTask(DayTaskRescheduler, Task):
285 """A task that is called at specific days in a month (1-31), at a fixed time on the day."""
286 - def __init__(self, name, monthdays, timeonday, action, args=None, kw=None):
287 if type(timeonday) not in (list,tuple) or len(timeonday) != 2: 288 raise TypeError("timeonday must be a 2-tuple (hour,minute)") 289 if type(monthdays) not in (list,tuple): 290 raise TypeError("monthdays must be a sequence of monthdays numbers 1-31") 291 DayTaskRescheduler.__init__(self, timeonday) 292 Task.__init__(self, name, action, args, kw) 293