1 """Module that provides a cron-like task scheduler.
2
3 This task scheduler is designed to be used from inside your own program.
4 You can schedule Python functions to be called at specific intervals or
5 days. It uses the standard 'sched' module for the actual task scheduling,
6 but provides much more:
7 - repeated tasks (at intervals, or on specific days)
8 - error handling (exceptions in tasks don't kill the scheduler)
9 - optional to run scheduler in its own thread or separate process
10 - optional to run a task in its own thread or separate process
11
12 If the threading module is available, you can use the various Threaded
13 variants of the scheduler and associated tasks. If threading is not
14 available, you could still use the forked variants. If fork is also
15 not available, all processing is done in a single process, sequentially.
16
17 There are three Scheduler classes:
18 Scheduler ThreadedScheduler ForkedScheduler
19
20 You usually add new tasks to a scheduler using the add_interval_task or
21 add_daytime_task methods, with the appropriate processmethod argument
22 to select sequential, threaded or forked processing. NOTE: it is impossible
23 to add new tasks to a ForkedScheduler, after the scheduler has been started!
24 For more control you could use one of the following Task classes
25 and use schedule_task or schedule_task_abs:
26 IntervalTask ThreadedIntervalTask ForkedIntervalTask
27 WeekdayTask ThreadedWeekdayTask ForkedWeekdayTask
28 MonthdayTask ThreadedMonthdayTask ForkedMonthdayTask
29
30 Kronos is the Greek God of Time.
31
32 This module is based on Kronos by Irmen de Jong, but has been modified
33 to better fit within TurboGears. Additionally, this module appeared to
34 no longer be supported/in development.
35 """
36
37
38
39
40
41
42
43
44
45 import os, sys
46 import sched, time
47 import traceback
48 import weakref
49
50 from turbogears.util import Enum
51
52 method = Enum("sequential", "forked", "threaded")
53
55 """The Scheduler itself."""
56
58 self.running=True
59 self.sched = sched.scheduler(time.time, self.__delayfunc)
60
62
63
64
65
66 if delay<10:
67 time.sleep(delay)
68 else:
69 toptime = self.sched.queue[0][0]
70 endtime = time.time() + delay
71 period=5
72 stoptime = endtime - period
73 while self.running and stoptime > time.time() and \
74 self.sched.queue[0][0] == toptime:
75 time.sleep(period)
76 if not self.running or self.sched.queue[0][0] != toptime:
77 return
78 now = time.time()
79 if endtime > now:
80 time.sleep(endtime - now)
81
84
def add_interval_task(self, action, taskname, initialdelay, interval,
                      processmethod, args=None, kw=None):
    """Add a new Interval Task to the schedule.

    A very short initialdelay, or one of zero, cannot be honored: you will
    see a slight delay before the task is first executed, because the
    scheduler needs to pick the task up in its loop.

    Arguments:
    action        -- the callable handed to the task.
    taskname      -- name given to the task.
    initialdelay  -- delay before the first run, passed to schedule_task;
                     must be >= 0.
    interval      -- repeat interval handed to the task; must be >= 1.
    processmethod -- method.sequential, method.threaded or method.forked;
                     selects which IntervalTask variant is created.
    args          -- positional arguments for the task; any false value
                     (including the default None) becomes an empty list.
    kw            -- keyword arguments for the task; any false value
                     (including the default None) becomes an empty dict.

    Returns the task object that was created and scheduled.

    Raises ValueError when initialdelay or interval is out of range, or
    when processmethod is not one of the three known values.
    """
    if initialdelay < 0 or interval < 1:
        raise ValueError(
            "initialdelay must be >= 0 and interval must be >= 1")

    # Keep this an if/elif chain: each task class name is only looked up
    # in the branch that needs it, so a variant that is not defined is
    # never touched unless it is actually requested.
    if processmethod == method.sequential:
        TaskClass = IntervalTask
    elif processmethod == method.threaded:
        TaskClass = ThreadedIntervalTask
    elif processmethod == method.forked:
        TaskClass = ForkedIntervalTask
    else:
        raise ValueError("invalid processmethod")

    if not args:
        args = []
    if not kw:
        kw = {}

    task = TaskClass(taskname, interval, action, args, kw)
    self.schedule_task(task, initialdelay)
    return task
107
def add_daytime_task(self, action, taskname, weekdays, monthdays, timeonday,
                     processmethod, args=None, kw=None):
    """Add a new Day Task (Weekday or Monthday) to the schedule.

    Exactly one of weekdays and monthdays must be given; the other must be
    a false value (None or an empty sequence).

    Arguments:
    action        -- the callable handed to the task.
    taskname      -- name given to the task.
    weekdays      -- sequence of weekday numbers 1-7 (1 is Monday).
    monthdays     -- sequence of monthday numbers 1-31.
    timeonday     -- 2-tuple (hour, minute) at which the task runs.
    processmethod -- method.sequential, method.threaded or method.forked;
                     selects which task variant is created.
    args          -- positional arguments for the task; any false value
                     (including the default None) becomes an empty list.
    kw            -- keyword arguments for the task; any false value
                     (including the default None) becomes an empty dict.

    Returns the task object that was created and scheduled.

    Raises ValueError when both or neither of weekdays/monthdays are
    given, or when processmethod is not one of the three known values.
    """
    if weekdays and monthdays:
        raise ValueError("you can only specify weekdays or monthdays, not both")
    if not weekdays and not monthdays:
        # Without this guard no task would ever be created and the code
        # below would fail with an UnboundLocalError instead.
        raise ValueError("you must specify either weekdays or monthdays")

    if not args:
        args = []
    if not kw:
        kw = {}

    # Keep these as if/elif chains: each task class name is only looked up
    # in the branch that needs it, so a variant that is not defined is
    # never touched unless it is actually requested.
    if weekdays:
        if processmethod == method.sequential:
            TaskClass = WeekdayTask
        elif processmethod == method.threaded:
            TaskClass = ThreadedWeekdayTask
        elif processmethod == method.forked:
            TaskClass = ForkedWeekdayTask
        else:
            raise ValueError("invalid processmethod")
        task = TaskClass(taskname, weekdays, timeonday, action, args, kw)
    else:
        if processmethod == method.sequential:
            TaskClass = MonthdayTask
        elif processmethod == method.threaded:
            TaskClass = ThreadedMonthdayTask
        elif processmethod == method.forked:
            TaskClass = ForkedMonthdayTask
        else:
            raise ValueError("invalid processmethod")
        task = TaskClass(taskname, monthdays, timeonday, action, args, kw)

    # Ask the task for its first absolute run time and queue it there.
    firsttime = task.get_schedule_time(True)
    self.schedule_task_abs(task, firsttime)
    return task
141
143 """Low-level method to add a new task to the scheduler with the given delay (seconds)."""
144 if self.running:
145 self._acquire_lock()
146 try:
147 task.event = self.sched.enter(delay, 0, task,
148 (weakref.ref(self),) )
149 finally:
150 self._release_lock()
151 else:
152 task.event = self.sched.enter(delay, 0, task,
153 (weakref.ref(self),) )
154
156 """Low-level method to add a new task to the scheduler for the given absolute time value."""
157 if self.running:
158 self._acquire_lock()
159 try:
160 task.event = self.sched.enterabs(abstime, 0, task,
161 (weakref.ref(self),) )
162 finally:
163 self._release_lock()
164 else:
165 task.event = self.sched.enterabs(abstime, 0, task,
166 (weakref.ref(self),) )
167
168
170 """Start the scheduler."""
171 self._run()
172
174 """Remove all pending tasks and stop the Scheduler."""
175 self.running=False
176 self.sched.queue[:]=[]
177
179 self.sched.cancel(task.event)
180
182
183 while self.running:
184 try:
185 self.sched.run()
186 except Exception,x:
187 print >>sys.stderr, "ERROR DURING SCHEDULER EXECUTION",x
188 print >>sys.stderr, "".join(traceback.format_exception(*sys.exc_info()))
189 print >>sys.stderr, "-"*20
190
191 if self.running:
192 time.sleep(5)
193
194
196 """Abstract base class of all scheduler tasks"""
197 - def __init__(self, name, action, args, kw):
203
205 """Execute the task action in the scheduler's thread."""
206 try:
207 self.execute()
208 except Exception,x:
209 self.handle_exception(x)
210 self.reschedule(schedulerref())
211
213 """This is an abstract class, this method is defined in one of the sub classes!"""
214 raise NotImplementedError("you're using the abstract base class 'Task', use a concrete class instead")
215
217 """Execute the actual task."""
218 self.action(*self.args, **self.kw)
219
221 """Handle any exception that occurred during task execution."""
222 print >>sys.stderr, "ERROR DURING TASK EXECUTION",exc
223 print >>sys.stderr,"".join(traceback.format_exception(*sys.exc_info()))
224 print >>sys.stderr,"-"*20
225
226
228 """A repeated task that occurs at certain intervals (in seconds)."""
229 - def __init__(self, name, interval, action, args=None, kw=None):
232
236
237
238
240 """A mixin class that contains the reschedule logic for the DayTasks."""
242 self.timeonday=timeonday
243
245 """Calculate the time value at which this task is to be scheduled."""
246 now=list(time.localtime())
247 if today:
248
249 if (now[3], now[4]) >= self.timeonday:
250 now[2]+=1
251 else:
252 now[2]+=1
253 now[3], now[4] = self.timeonday
254 now[5]=0
255 return time.mktime(now)
256
264
265
267 """A task that is called at specific days in a week (1-7), at a fixed time on the day."""
def __init__(self, name, weekdays, timeonday, action, args=None, kw=None):
    """Set up a task that runs on the given weekdays at a fixed time.

    Arguments:
    name      -- task name, passed on to Task.__init__.
    weekdays  -- list or tuple of weekday numbers 1-7 (1 is Monday).
    timeonday -- list or tuple of exactly two items: (hour, minute).
    action    -- the callable to run, passed on to Task.__init__.
    args, kw  -- arguments for the action, passed on to Task.__init__.

    Raises TypeError when timeonday is not a 2-item list/tuple or when
    weekdays is not a list/tuple.
    """
    # isinstance (rather than an exact type() match) also accepts
    # subclasses of list and tuple, such as namedtuples.
    if not isinstance(timeonday, (list, tuple)) or len(timeonday) != 2:
        raise TypeError("timeonday must be a 2-tuple (hour,minute)")
    if not isinstance(weekdays, (list, tuple)):
        raise TypeError("weekdays must be a sequence of weekday numbers 1-7 (1 is Monday)")
    DayTaskRescheduler.__init__(self, timeonday)
    Task.__init__(self, name, action, args, kw)
    self.days = weekdays
276
278
279
280 weekday=time.localtime().tm_wday+1
281 if weekday in self.days:
282 self.action(*self.args, **self.kw)
283
285 """A task that is called at specific days in a month (1-31), at a fixed time on the day."""
286 - def __init__(self, name, monthdays, timeonday, action, args=None, kw=None):
287 if type(timeonday) not in (list,tuple) or len(timeonday) != 2:
288 raise TypeError("timeonday must be a 2-tuple (hour,minute)")
289 if type(monthdays) not in (list,tuple):
290 raise TypeError("monthdays must be a sequence of monthdays numbers 1-31")
291 DayTaskRescheduler.__init__(self, timeonday)
292 Task.__init__(self, name, action, args, kw)
293