一个简单的集群任务调度框架
2011-12-20 16:57
417 查看
说到服务器集群后台的任务调度,这可能是很多网站或者计算集中型方案经常使用到的。
本文不讨论map/reduce级别的任务拆分和调度,本文设计的调度框架只满足以下几点特性:
1)轻量级,代码框架及实现原理非常简单,容易部署
2)集群可扩展,理论上集群机器数量,以及每台机器上的执行任务数都可扩展
3)业务单元化,业务定义的下发任务是具体的、可颗粒化的,本框架不辅助做任务或工作流的拆分,只接受最细颗粒化的任务
实现原理:
1)所有计算节点(这里指一个程序实例)均地位平等
2)任务以一个文件的形式存在,计算节点通过共享文件系统去“抢”任务。
3)所有的计算节点均永久存在,不断的扫描任务文件
4)业务系统下发任务,即直接生成一个文件
我们将计算节点定义为worker,那么worker的主逻辑如下
While(true){
If(find(以前未完成的任务文件)||find(任务文件)){
将该文件增加扩展名+本机ip.实例号
处理任务
将任务文件迁移到finish目录
}
}
以下为python的实现,供参考。
用到的filelocker跨平台文件锁
workerbase使用样例
本文不讨论map/reduce级别的任务拆分和调度,本文设计的调度框架只满足以下几点特性:
1)轻量级,代码框架及实现原理非常简单,容易部署
2)集群可扩展,理论上集群机器数量,以及每台机器上的执行任务数都可扩展
3)业务单元化,业务定义的下发任务是具体的、可颗粒化的,本框架不辅助做任务或工作流的拆分,只接受最细颗粒化的任务
实现原理:
1)所有计算节点(这里指一个程序实例)均地位平等
2)任务以一个文件的形式存在,计算节点通过共享文件系统去“抢”任务。
3)所有的计算节点均永久存在,不断的扫描任务文件
4)业务系统下发任务,即直接生成一个文件
我们将计算节点定义为worker,那么worker的主逻辑如下
While(true){
If(find(以前未完成的任务文件)||find(任务文件)){
将该文件增加扩展名+本机ip.实例号
处理任务
将任务文件迁移到finish目录
}
}
以下为python的实现,供参考。
#encoding=utf8 ''' Created on 2011-9-24 @author: chenggong worker基类 ''' import time import os import re from optparse import OptionParser import filelocker class WorkerBase(object): def __init__(self): self.patten = ".*" self.taskexname = ".txt" def set_task_patten(self,patten): self.patten = patten def set_task_exname(self,exname): self.taskexname = "." + exname.replace(".","") def dowork(self,filename,content): pass def tasklogic(self,filepath): with open(filepath,"r") as filehandle: filelocker.lock(filehandle,filelocker.LOCK_NB) #try lock the task try: self.log("normal",0,"开始执行任务%s"%filepath) fsname = os.path.basename(filepath).split(".")[0] success = self.dowork(fsname,filehandle.read()) except Exception,e: self.log("warning",0,"派生类未捕获异常%s"%str(e)) filelocker.unlock(filehandle) while True: try: if success: self.log("normal",0,"任务%s结束,完成成功"%filepath) finishfile = filepath.split(".")[0]+".finish" if os.path.exists(finishfile): self.log("warning",0,"该任务.finish文件已存在,进行覆盖") os.remove(finishfile) os.rename(filepath,finishfile) else: self.log("normal",0,"任务%s结束,完成失败"%filepath) errorfile = filepath.split(".")[0]+".error" if os.path.exists(errorfile): self.log("warning",0,"该任务.erorr文件已存在,进行覆盖") os.remove(errorfile) os.rename(filepath,errorfile) break except Exception,e: self.log("error",0,"任务执行完毕后改名失败,文件系统异常或任务文件已被损坏!except=%s"%str(e)) time.sleep(5) def start(self): #params taskDir = self.options.dir uuid = self.options.uuid ip = self.options.ip #main loop while True: try: for f in os.listdir(taskDir): filepath = os.path.join(taskDir,f) taskname = os.path.basename(filepath).split(".")[0] try: if(not re.match(self.patten,taskname)): continue except: self.log("fetal",0,"patten=%s,正则表达式格式匹配失败"%self.patten) return fex = os.path.splitext(f)[1] if fex == "."+uuid: #my task self.log("normal",0,"找到未完成任务%s"%str(f)) try: self.tasklogic(filepath) except: self.log("warning",0,"尝试锁定该任务失败,该任务可能已被锁定,uuid=%s可能被多次启用!"%uuid) continue elif fex == self.taskexname: #new task try: os.rename(filepath,"%s.%s.%s"%(filepath,ip,uuid)) self.tasklogic("%s.%s.%s"%(filepath,ip,uuid)) except: self.log("warning",0,"任务文件%s锁定失败,或已被占有"%filepath) continue except: self.log("error",0,"连接任务文件夹%s失败,可能网络已断开.."%taskDir) time.sleep(30) def log(self,level,typeid,msg): logdir = self.options.log if(not os.path.exists(logdir)): os.mkdir(logdir) filename = time.strftime('%Y-%m-%d',time.localtime(time.time()))+".log" t = time.strftime('%H:%M:%S',time.localtime(time.time())) filepath = os.path.join(logdir,filename) with open(filepath,"a") as f: filelocker.lock(f,filelocker.LOCK_EX) #block lock logmsg = "[%8s][%s][%s][%d]%s"%(t,self.options.uuid,level,typeid,msg) f.write(logmsg+"\n") filelocker.unlock(f) print logmsg.decode("utf8").encode("gbk") def set_options(self,options): parser = OptionParser() for opt in options: parser.add_option(opt['option'], dest=opt['value']) ##公用 parser.add_option("-d", dest="dir") parser.add_option("-i", dest="ip") parser.add_option("-u", dest="uuid") parser.add_option("-l", dest="log") (self.options, argvs) = parser.parse_args()
用到的filelocker跨平台文件锁
#encoding=utf8 # portalocker.py - Cross-platform (posix/nt) API for flock-style file locking. # Requires python 1.5.2 or better. """Cross-platform (posix/nt) API for flock-style file locking. Synopsis: import portalocker file = open("somefile", "r+") portalocker.lock(file, portalocker.LOCK_EX) file.seek(12) file.write("foo") file.close() If you know what you're doing, you may choose to portalocker.unlock(file) before closing the file, but why? Methods: lock( file, flags ) unlock( file ) Constants: LOCK_EX LOCK_SH LOCK_NB Exceptions: LockException Notes: For the 'nt' platform, this module requires the Python Extensions for Windows. Be aware that this may not work as expected on Windows 95/98/ME. History: I learned the win32 technique for locking files from sample code provided by John Nielsen <nielsenjf@my-deja.com> in the documentation that accompanies the win32 modules. Author: Jonathan Feinberg <jdf@pobox.com>, Lowell Alleman <lalleman@mfps.com> Version: $Id: portalocker.py 5474 2008-05-16 20:53:50Z lowell $ """ __all__ = [ "lock", "unlock", "LOCK_EX", "LOCK_SH", "LOCK_NB", "LockException", ] import os class LockException(Exception): # Error codes: LOCK_FAILED = 1 if os.name == 'nt': import win32con import win32file import pywintypes LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK LOCK_SH = 0 # the default LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY # is there any reason not to reuse the following structure? __overlapped = pywintypes.OVERLAPPED() elif os.name == 'posix': import fcntl LOCK_EX = fcntl.LOCK_EX LOCK_SH = fcntl.LOCK_SH LOCK_NB = fcntl.LOCK_NB else: raise RuntimeError, "PortaLocker only defined for nt and posix platforms" if os.name == 'nt': def lock(file, flags): hfile = win32file._get_osfhandle(file.fileno()) try: win32file.LockFileEx(hfile, flags, 0, -0x10000, __overlapped) except pywintypes.error, exc_value: # error: (33, 'LockFileEx', 'The process cannot access the file because another process has locked a portion of the file.') if exc_value[0] == 33: raise LockException(LockException.LOCK_FAILED, exc_value[2]) else: # Q: Are there exceptions/codes we should be dealing with here? raise def unlock(file): hfile = win32file._get_osfhandle(file.fileno()) try: win32file.UnlockFileEx(hfile, 0, -0x10000, __overlapped) except pywintypes.error, exc_value: if exc_value[0] == 158: # error: (158, 'UnlockFileEx', 'The segment is already unlocked.') # To match the 'posix' implementation, silently ignore this error pass else: # Q: Are there exceptions/codes we should be dealing with here? raise elif os.name == 'posix': def lock(file, flags): try: fcntl.flock(file.fileno(), flags) except IOError, exc_value: # IOError: [Errno 11] Resource temporarily unavailable if exc_value[0] == 11: raise LockException(LockException.LOCK_FAILED, exc_value[1]) else: raise def unlock(file): fcntl.flock(file.fileno(), fcntl.LOCK_UN) if __name__ == '__main__': from time import time, strftime, localtime import sys log = open('\\\\10.1.10.254\\storage\\log.txt', "a+") lock(log, LOCK_EX) timestamp = strftime("%m/%d/%Y %H:%M:%S\n", localtime(time())) log.write( timestamp ) print "Wrote lines. Hit enter to release lock." dummy = sys.stdin.readline() log.close()
workerbase使用样例
#encoding=utf8 ''' Created on 2011-9-24 @author: chenggong worker例程 ''' from workerbase import WorkerBase import time #派生WorkBase class SampleWorker(WorkerBase): #实现dowork方法 # filepath :任务文件名, # filehandle:任务文件内容 def dowork(self,filepath,content): print "dowork file=%s content=%s"%(filepath,content) print "doing..." #由self.options.xxxx可以获取自己设置的参数 print "myparam=%s %s"%(self.options.test1,self.options.test2) time.sleep(2) #日志提交方法 self.log("debug",0,"可以这样提交日志") #成功则返回True,失败返回False return False ''' 基本命令行参数,调用至少要有以下几个参数 -d 任务文件夹 -l 日志输出文件夹 -i 本机IP -u uuid ''' if __name__ == "__main__": #实例化SampleWorker sampleworker = SampleWorker() #设置自己的任务文件匹配方式,若不设置,则默认为全匹配 #如下,则匹配 xxx-xxx-cut 所有文件 sampleworker.set_task_patten(".*-.*-cut") #设置任务文件扩展名,若不设置,则默认为txt sampleworker.set_task_exname("txt") #设置自己的参数 sampleworker.set_options([{"option":"-a","value":"test1"},{"option":"-b","value":"test2"}]) #开始主循环 sampleworker.start()
相关文章推荐
- 企业级任务调度框架Quartz 三 一个简单的Quartz 例子
- quartz简单demo(一个简单易用的任务调度开源框架)
- 企业级任务调度框架Quartz(3) 一个简单的Quartz 例子
- 调度系统任务创建---创建一个简单调度任务(二)
- 【niubi-job——一个开源的分布式任务调度框架】-----安装教程
- 【niubi-job——一个分布式的任务调度框架】----如何开发一个niubi-job的定时任务
- 使用azkaban 建立一个简单的任务调度系统
- 【niubi-job——一个分布式的任务调度框架】----框架设计原理以及实现
- 任务调度框架Quartz(一) Quartz——一个强大的定时任务调度框架
- 【niubi-job——一个分布式的任务调度框架】----如何开发一个niubi-job的定时任务
- 【niubi-job——一个开源的分布式任务调度框架】-----安装教程
- 任务调度quartz(二)一个简单的调度平台的实现
- niubi-job:一个分布式的任务调度框架设计原理以及实现
- 任务调度框架Quartz(一) Quartz——一个强大的定时任务调度框架
- 【niubi-job——一个分布式的任务调度框架】----FAQ文档
- quartz任务调度框架的简单使用
- 【niubi-job——一个分布式的任务调度框架】----niubi-job这下更牛逼了!
- 【niubi-job——一个分布式的任务调度框架】----niubi-job这下更牛逼了!
- 【niubi-job——一个分布式的任务调度框架】----FAQ文档
- 【niubi-job——一个分布式的任务调度框架】----框架设计原理以及实现