您的位置:首页 > 编程语言 > Python开发

Python 3.X 实现定时器 Timer,制作抽象的Timer定时器基类

2016-12-16 22:57 861 查看
Python 在不依赖第三方库的前提下,对于定时器的实现并不是很完美,但是这不意味着我们无法实现。

阅读了网上的一些资料,得出一些结论,顺手写了一个基类的定时器(Python3)

BaseTimer:

1 # -*- coding: utf-8 -*-
2
3
4 from abc import ABCMeta, abstractmethod
5 import time
6 import threading
7
8 class BaseTimer(threading.Thread):
9     """
10     基础定时器抽象类,可以随意继承本类并且实现exec抽象方法即可定时产生任务
11     """
12     __metaclass__ = ABCMeta
13     def __init__(self,howtime=1.0,enduring=True):
14         """
15         howtime 每次定时的时间
16         enduring 是否是一个持久性的任务,用这个可以控制开关
17         """
18
19         self.enduring = enduring
20         self.howtime = howtime
21         threading.Thread.__init__(self)
22
23     def run(self):
24         time.sleep(self.howtime)  #至少执行一次 任务
25         self.exec()
26         while self.enduring:    #是否持久,或者是否进行运行
27             time.sleep(self.howtime)
28             self.exec()    #每次开始执行任务
29
30     @abstractmethod
31     def exec(self):
32         """抽象方法,子类实现"""
33         pass
34
35     def destroy(self):
36         """销毁自身"""
37         self.enduring = False
38         del self
39
40     def stop(self):
41         self.enduring = False
42
43     def restart(self):
44         self.enduring = True
45
46     def get_status(self):
47         return self.enduring
48


如何使用?

我们来建立一个新的任务,这个任务是过一段时间就输出:

1 class TimerMask(BaseTimer):
2     """定时任务类,不同的业务逻辑"""
3     def __init__(self,howtime=1.0,enduring=True):
4         BaseTimer.__init__(self,howtime,enduring)
5         self.ws=0
6
7     def exec(self):
8         print("HelloWorld!")
9         self.ws = self.ws + 1  #这里是过0.8秒输出一次
10         if self.ws >5:
11             self.destroy()


加入如下:

1 if __name__ == "__main__":
2     Q_w = 0
3     w = TimerMask(howtime=0.8)
4     print("-")
5     w.start()
6     #这里线程输出这些,做其他事情的
7     while True:
8         time.sleep(0.4) #0.4秒
9         print("- ",Q_w," - WMask:",w)
10         Q_w += 1
11         pass


输出:

1 '''
2 使用sched模块实现的timer,sched模块不是循环的,一次调度被执行后就Over了,如果想再执行,可以使用while循环的方式不停的调用该方法
3 Created on 2013-7-31
4
5 @author: Eric
6 '''
7 import time, sched
8
9 #被调度触发的函数
10 def event_func(msg):
11     print("Current Time:", time.strftime("%y-%m-%d %H:%M:%S"), 'msg:', msg)
12
13 def run_function():
14     #初始化sched模块的scheduler类
15     s = sched.scheduler(time.time, time.sleep)
16     #设置一个调度,因为time.sleep()的时间是一秒,所以timer的间隔时间就是sleep的时间,加上enter的第一个参数
17     s.enter(0, 2, event_func, ("Timer event.",))
18     s.run()
19
20 def timer1():
21     while True:
22         #sched模块不是循环的,一次调度被执行后就Over了,如果想再执行,可以使用while循环的方式不停的调用该方法
23         time.sleep(1)
24         run_function()
25
26 if __name__ == "__main__":
27     timer1()


View Code

感谢耐心阅读,希望对你有帮助。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: