您的位置:首页 > 编程语言 > Python开发

python多线程限制并发数示例

2018-02-06 18:12 281 查看
#coding: utf-8
#!/usr/bin/env python
import Queue
import threading
import time

prolock = threading.Lock()

# 定义同时队列数
queue = Queue.Queue(maxsize=10)

# 定义任务初值值及最大值
taskidx = 0
maxidx = 100

# 生成任务列表
def taskList():
task = []
for i in range(100):
task.append("task" + str(i))
return task

# 把任务放入队列中
class Producer(threading.Thread):
def __init__(self, name, queue):
self.__name = name
self.__queue = queue
super(Producer, self).__init__()

def run(self):
while True:
global taskidx, prolock, maxidx
time.sleep(4)
prolock.acquire()
print 'Producer name: %s' % (self.__name)
if maxidx == taskidx:
prolock.release()
break
ips = taskList()
ip = ips[taskidx]
self.__queue.put(ip)
taskidx = taskidx + 1
prolock.release()

# 线程处理任务
class Consumer(threading.Thread):
def __init__(self, name, queue):
self.__name = name
self.__queue = queue
super(Consumer, self).__init__()

def run(self):
while True:
ip = self.__queue.get()
print 'Consumer name: %s' % (self.__name)
consumer_process(ip)
self.__queue.task_done()

def consumer_process(ip):
time.sleep(1)
print ip

def startProducer(thread_num):
t_produce = []
for i in range(thread_num):
p = Producer("producer"+str(i), queue)
p.setDaemon(True)
p.start()
t_produce.append(p)
return t_produce

def startConsumer(thread_num):
t_consumer = []
for i in range(thread_num):
c = Consumer("Consumer"+str(i), queue)
c.setDaemon(True)
c.start()
t_consumer.append(c)
return t_consumer

def main():
t_produce = startProducer(3)
t_consumer = startConsumer(5)

# 确保所有的任务都生成
for p in t_produce:
p.join()

# 等待处理完所有任务
queue.join()

if __name__ == '__main__':
main()
print '------end-------'


一般生成任务都会比较快,可以使用单线程来生成任务,示例如下:

#coding: utf-8
#!/usr/bin/env python
import Queue
import threading
import time

# 定义同时处理任务数
queue = Queue.Queue(maxsize=3)

# 生成任务列表
def taskList():
task = []
for i in range(100):
task.append("task" + str(i))
return task

# 把任务放入队列中
class Producer(threading.Thread):
def __init__(self, name, queue):
self.__name = name
self.__queue = queue
super(Producer, self).__init__()

def run(self):
for ip in taskList():
self.__queue.put(ip)

# 线程处理任务
class Consumer(threading.Thread):
def __init__(self, name, queue):
self.__name = name
self.__queue = queue
super(Consumer, self).__init__()

def run(self):
while True:
ip = self.__queue.get()
print 'Consumer name: %s' % (self.__name)
consumer_process(ip)
self.__queue.task_done()

def consumer_process(ip):
time.sleep(1)
print ip

def startConsumer(thread_num):
t_consumer = []
for i in range(thread_num):
c = Consumer(i, queue)
c.setDaemon(True)
c.start()
t_consumer.append(c)
return t_consumer

def main():
p = Producer("Producer task0", queue)
p.setDaemon(True)
p.start()
startConsumer(9)

# 确保所有的任务都生成
p.join()

# 等待处理完所有任务
queue.join()

if __name__ == '__main__':
main()
print '------end-------'
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: