您的位置:首页 > 编程语言 > Python开发

python基础-管道通信(进程)、线程Condition使用

2017-12-07 17:57 399 查看
管道
管道的概念

管道通信示例进程

线程之定时器
延迟执行

立即执行

阻塞

线程Condition

管道

管道的概念

#创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

#主要方法:
conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

#其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout设成None,操作将无限期地等待数据到达。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果接收的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。

conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收

conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。


管道通信示例(进程)

我们来看一个例子:

from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
left,right=p
left.close()
while True:
try:

baozi=right.recv()
print('%s 收到包子:%s' %(name,baozi))
except EOFError:
right.close()
print("EOFError")
break
def producer(seq,p):
left,right=p
right.close()
for i in seq:
left.send(i)
else:
print("produce close")
left.close()
if __name__ == '__main__':
#在进程前
left,right=Pipe()

c1=Process(target=consumer,args=((left,right),'c1'))
c1.start()

seq=(i for i in range(10))
producer(seq,(left,right))

c1.join()
print('主进程')


输出如下:

E:\python\python_sdk\python.exe E:/python/py_pro/python.py
produce close
c1 收到包子:0
c1 收到包子:1
c1 收到包子:2
c1 收到包子:3
c1 收到包子:4
c1 收到包子:5
c1 收到包子:6
c1 收到包子:7
c1 收到包子:8
c1 收到包子:9
EOFError
主进程

Process finished with exit code 0


线程之定时器

定时器,指定n秒后执行某操作

延迟执行

from threading import Timer

def hello():
print("hello, world")

if __name__ == "__main__":
t = Timer(1, hello)
t.start()
print("main")


输出如下:

main
hello, world


立即执行

from threading import Timer

def hello():
print("hello, world")

if __name__ == "__main__":
t = Timer(0, hello)
t.start()
print("main")


输出如下:

E:\python\python_sdk\python.exe E:/python/py_pro/python.py
hello, world
main

Process finished with exit code 0


阻塞

from threading import Timer

def hello():
print("hello, world")

if __name__ == "__main__":
t = Timer(1, hello)
t.start()
t.join()
print("main")


输出如下:

E:\python\python_sdk\python.exe E:/python/py_pro/python.py
hello, world
main

Process finished with exit code 0


线程Condition

使用Condition对象可以在某些事件触发或者达到特定的条件后才处理数据,Condition除了具有Lock对象的acquire方法和release方法外,还有wait方法、notify方法、notifyAll方法等用于条件处理。

threading.Condition([lock]):创建一个condition,支持从外界引用一个Lock对象(适用于多个condtion共用一个Lock的情况),默认是创建一个新的Lock对象。

acquire()/release():获得/释放 Lock

wait([timeout]):线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。调用wait()会释放Lock,直至该线程被Notify()、NotifyAll()或者超时线程又重新获得Lock.

notify(n=1):通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。

notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程(这个一般用得少)

import threading

def run(n):
con.acquire()
con.wait()
print("run the thread: %s" % n)
con.release()

if __name__ == '__main__':

con = threading.Condition()
for i in range(5):
t = threading.Thread(target=run, args=(i,))
t.start()

while True:
inp = input('>>>')
if inp == 'q':
break
con.acquire()
con.notify(int(inp))
con.release()


输出如下:

E:\python\python_sdk\python.exe E:/python/py_pro/python.py
>>>7
>>>run the thread: 0
run the thread: 1
run the thread: 3
run the thread: 4
run the thread: 2
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: