您的位置:首页 > 编程语言 > Go语言

Go-----------Chan 任务队列

2016-04-10 00:00 495 查看
摘要: Go语言实现的基于Chan的任务队列

package main
import (
"fmt"
"log"
"os"
"sync"
"time"
)

// gLog is the package-wide logger. It starts out nil and must be assigned
// before any code that logs through it runs.
var gLog *log.Logger

// GoThreadPoolData is a single task element carried through the task queue.
type GoThreadPoolData struct {
	// data is the payload string attached to the task.
	data string
}

// DoWork is the task handler invoked for a task element.
// It is currently a placeholder and performs no work.
func (d *GoThreadPoolData) DoWork() {
	// Debug aid, intentionally disabled:
	// fmt.Println("do work ", d.data)
}

// GoThreadPoolSignal is the element type sent over the pool's signal queue.
type GoThreadPoolSignal struct {
	// data is an integer value carried by the signal.
	data int
}

// GoThreadPool is a task queue built on channels: tasks travel over
// QueueData and control signals over QueueSignal.
type GoThreadPool struct {
	// QueueData is the task queue.
	QueueData chan GoThreadPoolData
	// QueueSignal is the signal queue.
	QueueSignal chan GoThreadPoolSignal
	// ThreadCount is the number of worker goroutines that process the task queue.
	ThreadCount int
	// QueueSize is the buffer size of the task queue.
	QueueSize int
	// mutexWaitToDealDataCount guards WaitToDealDataCount.
	mutexWaitToDealDataCount sync.Mutex
	// WaitToDealDataCount is the number of tasks still waiting to be handled.
	WaitToDealDataCount int
	// pfWorkCallBack is a function hook reserved for extension.
	pfWorkCallBack func()
}

//任务队列类----成员函数  Init
func (self *GoThreadPool) Init(BufferSize int, ThreadCount int, pfWorkCallBack func()) {
self.QueueData = make(chan GoThreadPoolData, BufferSize)
self.QueueSignal = make(chan GoThreadPoolSignal)
self.ThreadCount = ThreadCount
self.QueueSize = BufferSize
self.pfWorkCallBack = pfWorkCallBack
for i := 0; i < ThreadCount; i++ {
go func() {
for {
select {
case task, ok := <-self.QueueData:
if ok {
task.DoWork()
}

self.mutexWaitToDealDataCount.Lock()
&nbs
3ff0
p;            self.WaitToDealDataCount--
//fmt.Println("wait to deal", self.WaitToDealDataCount)
gLog.Println("wait to deal", self.WaitToDealDataCount)
self.mutexWaitToDealDataCount.Unlock()

case task, ok := <-self.QueueSignal:
if ok {
fmt.Println("recieve end signal", task)
gLog.Println("recieve end signal", task)
break
}
}
}
fmt.Println("thread exit")
}()
}
}

// UnInit sends one signal element per configured worker (ThreadCount sends in
// total) on the signal queue, printing a progress line after each send.
// Every send blocks until a receiver takes the value, because the signal
// channel created by Init is unbuffered.
func (self *GoThreadPool) UnInit() {
	stop := GoThreadPoolSignal{1}
	for sent := 0; sent < self.ThreadCount; sent++ {
		self.QueueSignal <- stop
		fmt.Println("add signal", sent, stop.data, self.ThreadCount)
	}
}

// IsFull reports whether the number of pending tasks has reached QueueSize.
// The counter is read under its mutex; the comparison happens after the lock
// is released.
func (self *GoThreadPool) IsFull() bool {
	self.mutexWaitToDealDataCount.Lock()
	pending := self.WaitToDealDataCount
	self.mutexWaitToDealDataCount.Unlock()

	return pending >= self.QueueSize
}

// AddJob enqueues a copy of *data on the task queue if the pool has room.
//
// The capacity check and the counter increment happen in one critical
// section, and the counter is only incremented for jobs that are actually
// accepted. A rejected job therefore leaves the pending count untouched
// instead of inflating it forever. The value printed is a snapshot taken
// under the mutex, so it is not read concurrently with worker updates.
//
// When the pool is full the job is dropped and a message is printed; no
// error is returned to the caller.
func (self *GoThreadPool) AddJob(data *GoThreadPoolData) {
	self.mutexWaitToDealDataCount.Lock()
	full := self.WaitToDealDataCount >= self.QueueSize
	if !full {
		// Reserve the slot before sending so concurrent producers cannot
		// oversubscribe the buffered channel.
		self.WaitToDealDataCount++
	}
	pending := self.WaitToDealDataCount
	self.mutexWaitToDealDataCount.Unlock()

	if full {
		fmt.Println(" job is full ", data.data, pending)
		return
	}

	self.QueueData <- *data
	fmt.Println("add job ", data.data, pending)
}
// WorkThreadFuncTest is an empty callback with the func() signature; it does
// nothing when called.
func WorkThreadFuncTest() {}

// main is a demo driver for GoThreadPool. It opens test.log, installs the
// package logger, starts a pool with a 1000-slot queue and 8 workers, and
// submits 100000 jobs from a producer goroutine, which then shuts the pool
// down. The main goroutine stays alive for a fixed period so the background
// goroutines can run.
func main() {
	// O_APPEND makes each run add to the end of the log. Without it, writes
	// start at offset 0 of an existing file and overwrite the head of the old
	// log while leaving its stale tail in place.
	logfile, err := os.OpenFile("test.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
	if err != nil {
		fmt.Printf("%s\r\n", err.Error())
		os.Exit(-1)
	}
	defer logfile.Close()
	gLog = log.New(logfile, "\r\n", log.Ldate|log.Ltime|log.Llongfile)
	gLog.Println("hello")
	gLog.Println("oh....")

	pool := new(GoThreadPool)
	pool.Init(1000, 8, WorkThreadFuncTest)
	go func() {
		for i := 0; i < 100000; i++ {
			data := GoThreadPoolData{"GoThreadPoolData"}
			pool.AddJob(&data)

			fmt.Println("GoThreadPoolData", i)
			gLog.Println("GoThreadPoolData", i)
		}
		// This is the producer's last action, so it can block here directly
		// instead of spawning another goroutine just to send the signals.
		pool.UnInit()
	}()

	// Crude keep-alive: the same duration as before, now with explicit units.
	time.Sleep(10000 * time.Second)
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  go 任务队列