您的位置:首页 > 编程语言

并发编程(进程、线程)

2017-05-08 15:37 218 查看

1、空间多路复用是什么?

考虑一个场景:浏览器,网易云音乐以及notepad++ 三个软件只能顺序执行是怎样一种场景呢?

另外,假如有两个程序A和B,程序A在执行到一半的过程中,需要读取大量的数据输入(I/O操作),而此时CPU只能静静地等待任务A读取完数据才能继续执行,这样就白白浪费了CPU资源。你是不是已经想到在程序A读取数据的过程中,让程序B去执行,当程序A读取完数据之后,让程序B暂停。聪明,这当然没问题,但这里有一个关键词:切换;

补充:

主线程 ----------》开启子线程后-------------他们会在同时一起工作,这就是并发,也可说实现了异步;

应用:例如有一个Django WEB API 的视图函数, 接收到request后,需要操作数据库等等,如果response中无需数据库数据,就可以把这些数据库操作,做成线程,先return response给用户;

 

GIL:Global interpreter lock (全局解释器锁):

是Python创始人 Guido van Rossum(吉多·范罗苏姆)龟叔,在CPython解释器层面设计的一把线程锁(只针对CPython),规定同一个时刻只能有一个 线程去使用 1个CPU;

缺陷:Python 无法实现并行(多个同时在不同CPU上执行)和多核时代无缘了;

优点:武断得解决了 多线程之间 争强资源问题

 

 

2、进程:

2.0 进程的概念

进程就是一个程序在一个数据集上的一次动态执行过程(正在运行的软件就是进程)。进程一般由程序、数据集、进程控制块三部分组成;

程序:执行的内容(要做什么)

数据集:执行过程中用到的数据

进程控制块:操作系统切换该进程时保存的状态  

操作系统切换进程的规则:进程出现IO操作、和固定时间

 

举栗子说明:

举个栗子:
想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。他有做生日蛋糕的食谱,厨房里有所需的原料:面粉、鸡蛋、糖、香草汁等。在这个比喻中,做蛋糕的食谱就是程序(即用适当形式描述的算法)计算机科学家就是处理器(cpu),而做蛋糕的各种原料就是输入数据。进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。现在假设计算机科学家的儿子哭着跑了进来,说他的头被一只蜜蜂蛰了。计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理蛰伤。这里,我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治),每个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完之后,这位计算机科学家又回来做蛋糕,从他
离开时的那一步继续做下去。

CPU就是这个科学家

食谱就是程序

面粉、鸡蛋、糖、香草汁等就是数据集

被一只蜜蜂蛰了。计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态)就是进程控制块;

 

2.1 进程的状态

运行:程序正在被CPU执行

休眠:程序执行了sleep函数,或者遇到网络IO等待远程服务端response

等待:程序当前没有被CPU执行

 

ps:

僵尸进程:父进程----》fork1个子进程, 子进程退出了,但是父进程是个死循环/ 没有调用wait()方法对子进程进行回收,此时子进程沦为僵尸进程;

解决:僵尸进程的父进程 kill -9掉,使其沦为孤儿进程,被init进程回收。

孤儿进程:父进程退出,父进程fork的子进程全部变成孤儿进程,最后由init进程回收;

 

2.2 创建、回收子进程

os.fork()和os.wait()

介绍:

A. os.fork()是调用Linux系统的fork()创建1个进程,该函数被调用1次 return 2次,返回0代表子进程执行/返回其他代表父进程执

B.getpid()来获取自己的pid;也可以调用getppid()来获取父进程的id;os.wait() 用于父进程 等待子进程结束,对其回收!

C.fork之后,操作系统会复制一个与父进程完全相同的子进程,
虽说是父子关系,但是在操作系统看来,他们更像兄弟关系,这2个进程共享代码空间,但是数据空间是互相独立的,
子进程数据空间中的内容是父进程的完整拷贝,指令指针也完全相同。
子进程从 父进程fork子进程的代码位置开始执行!(fork嘛! 分叉!)
# !/usr/bin/env python
# -*- coding:utf-8 -*-

import os,time
print ('程序开始 (%s) start...' %os.getpid())

pid = os.fork()

if pid==0:                                                  #0代表是子进程执行
print ('我是子进程1!我的进程ID是%s'%(os.getpid()))
else:                                                         #其他代表父进程执行
print ('我是父进程!我的进程ID是%s' %(os.getpid()))
os.wait()
pid1=os.fork()                                           #再次创建子进程
if pid1==0:
print('我是子进程2!我的进程ID是%s' % (os.getpid()))
else:
print('我是父进程!我的进程ID是%s' % (os.getpid()))
os.wait()                                            #等待函数:用于父进程 等待子进程结束,对其回收!

 

 

exex协议簇:

execl(file, arg0,arg1,...) 用参数列表arg0, arg1 等等执行文件

execv(file, arglist) 除了使用参数向量列表,其他的和execl()相同

execle(file, arg0,arg1,... env) 和execl 相同,但提供了环境变量字典env

execve(file,arglist, env) 除了带有参数向量列表,其他的和execle()相同

execlp(cmd, arg0,arg1,...) 于execl()相同,但是在用户的搜索路径下搜索完全的文件路径名

execvp(cmd, arglist) 除了带有参数向量列表,与execlp()相同

execlpe(cmd, arg0, arg1,... env) 和execlp 相同,但提供了环境变量字典env

execvpe(cmd,arglist, env) 和execvp 相同,但提供了环境变量字典env



import os print 'start' os.execl('./b.py','') ''' 在fork后的子进程中使用exec函数族,可以装入和运行其它程序(子进程替换原有进程,和>父进程做不同的事)。 ''' print('end') #---------------------------------------------------------- b.py文件 print('1111111111111')

 

 

2.3 进程间通信方式(IPC Inter-Process Communication)

 

在同一台计算机中的进程相互通信的方式主要有:管道(pipe)、信号(Signal)、信号量(Semaphore)、消息队列(Message)、共享内存(shared memory),其中信号量、消息队列、共享内存被称为IPC机制

不同机器之间的进程通讯可以使用套接字技术。SOCKET

 

方式1:管道

无名管道无法解决无父子关系的进程间通信

a.无名管道

import threading,time
def threading1():
time.sleep(5)
print('子进程结束!')

t1=threading.Thread(target=threading1)
print('主线程结束')
主线程退出,子线程未执行完毕;

 

print("老子是主线程")    #1、主线程先执行
def tingge():   #主线程
print("听歌")
time.sleep(6)
print("听歌结束!")

def xieboke():
print("写博客")
time.sleep(5)
print("写博客结束")

t1=threading.Thread(target=tingge) #2、主线程创建 子线程1
t2=threading.Thread(target=xieboke)#3、主线程创建 子线程2
t1.start()   # 线程1启动和主线程一起执行
t1.join()    #等线程1执行完了之后,再往下执行,启动线程2
t2.start()   #线程2启动和主线程一起执行
t2.join()    #等线程2执行完了之后,再往下执行,print("结束")
print("结束")

 

3、setDaemon(True):将线程声明为守护线程,必须在start() 方法调用之前设置,如果线程被置为守护线程,将会和主线程(主线程执行完了,最后退出)一起结束;

守护线程:进程中所有非守护线程结束时,守护线程随之结束。

 

import threading
from time import ctime,sleep
import time

def Music(name):

print ("Begin listening 我是T1线程to {name}. {time}".format(name=name,time=ctime()))
sleep(3)
print("end listening我是T1线程后半部分 {time}".format(time=ctime()))

def Blog(title):

print ("Begin recording我是T2线程 the {title}. {time}".format(title=title,time=ctime()))
sleep(3)
print('end recording 我是T2线程后半部分{time}'.format(time=ctime()))

threads = []

t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

#t2.setDaemon(True)
t1.setDaemon(True)#如果你设置一个线程为守护线程,就表示你在说这个线程会在 其他非守护线程执行完之后自动退出,。(应用于监听、日志打印服务)
for t in threads:

t.start()

# t1.join() #先执行主线程 T1线程执行完了,执行
# t2.join()    #  考虑这三种join位置下的结果?

print ("all over %s" %ctime())

 

执行结果;

Begin listening 我是T1线程to FILL ME. Mon May  8 20:08:25 2017
Begin recording我是T2线程 the . Mon May  8 20:08:25 2017
all over Mon May  8 20:08:25 2017
end listening我是T1线程后半部分 Mon May  8 20:08:28 2017

 

 

4、threading其他方法

Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。

threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

-------------------------------------------------------------------------------------------------------------

 

5、线程资源争用现象 

 1、开100个线程对全局变量sum进行-1操作(正常执行)

import time
import threading

def subNum():
global num     #1、每个线程中都获取 全局变量 num=100
         #2、每个遇到IO操作,切换 到下一个线程,循环上次拿到num=100
num -=1         #3、每个线程睡醒之后。。。 对全局变量num-1(100-1)线程结束
num = 100
thread_list = []

for i in range(100): #开启100个线程
t = threading.Thread(target=subNum)
t.start()
thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
t.join()

print('Result: ', num)

执行结果:

  Result:  0

 

各个线程争用 进程数据集现象

import time
import threading

def addNum():
global num           #每个线程中都获取这个全局变量 num=100
temp=num             #线程创建temp变量
time.sleep(0.01)     #线程遇到IO操作,切换下一个线程一直到第100个线程, (切换前保存当前状态 temp=100,)
num =temp-1          #线程睡醒拿到 之前保存的状态 变量temp=100   进行temp-1操作 100-1=99  线程执行结束;
print(num)
num = 100  #设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
t.join()

print('Result: ',num)

Result:  99

 

线程间公共资源争用效果:

 

 

 

 

 

6、同步锁(互斥锁)

由于一个进程里出现多个线程,线程切换时会导致资源争用,所以要有一把互斥锁,在某1线程,拿到公共数据集时把数据锁住,其他线程等待锁释放之后,在使用资源;

def addNum():
mutexA = threading.Lock()
global num
mutexA.acquire() # 对 取得公共资源加锁
temp=num
time.sleep(0.1)
num =temp-1  #
mutexA.release()  #释放锁
num = 100

thread_list = []

for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
t.join()

print('Result: ', num)

执行结果:

99(恢复正常)

 

7、递归锁

由于多线程使用 互斥锁,都导致死锁,所以在封装了互斥锁的基础上,出现了递归(内部有count计算器,如果有线程拿到了递归锁,其他线程只能等待)

死锁现象

class Mythread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)

def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()  #thread-1 首先取得同步锁 A
print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
mutexB.acquire() #thread 1 又取得同步锁 B
print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
mutexB.release()  #hread-1 又释放同步锁B
mutexA.release()  #thread1 又释放同步锁A

def func2(self):
mutexB.acquire()  #由于 其他线程都在 GIL锁中,在他们竞争的期间, hread-1 又取得同步锁
print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
time.sleep(0.2)    #由于thread1 出现IO操作,所以GIL锁中 其他线程 thread2竞争出来

mutexA.acquire()  #thread2得到同步锁,由于hread-1 还没有释放同步锁,导致死锁
print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
mutexA.release()
mutexB.release()

if __name__ == '__main__':
print("开始")

for i in range(0,10):
myt_hread=Mythread()
myt_hread.start()

 

递归锁出现

在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

mutex = threading.RLock()           #创建一把递归锁
class Mythread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)

def run(self):
self.func1()
self.func2()
def func1(self):
mutex.acquire()  #hread-1 首先取得同步锁
print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
mutex.acquire()
print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
mutex.release()
mutex.release()

def func2(self):
mutex.acquire()
print("I am %s , get res: %s---%s" % (self.name, "ResB", time.time()))
time.sleep(0.2)

mutex.acquire()
print("I am %s , get res: %s---%s" % (self.name, "ResA", time.time()))
mutex.release()
mutex.release()

if __name__ == '__main__':
print("开始")

for i in range(0,10):
myt_hread=Mythread()
myt_hread.start()

  

 

 

 

 

8、线程的Event对象

线程的一个关键特性是每个线程都是独立运行且状态不可预测,为了解决这些问题,我们需要使用threading库中的Event对象,让线程之间可以相互影响和控制相互通信;

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
logging.debug('Waiting for redis ready...')
event.wait()                     #线程遇到 envent.wait()等待。。。
logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
time.sleep(1)

def main():
readis_ready = threading.Event() #创建event对象
t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
t1.start()

t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
t2.start()

logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
time.sleep(3) # simulate the check progress
readis_ready.set()            #唤醒 envent.wait的线程

if __name__=="__main__":
main()

执行结果(类似三个人一起去野外,两人站在原地不动,一个人出前面探路,然后唤醒后面的两人,再走)

(t1        ) Waiting for redis ready...   #线程1执行,遇到event.wait()等待 唤醒
(t2        ) Waiting for redis ready...   #线程2执行,遇到event.wair()等待 唤醒
(MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event #主线程唤醒 线程1 和线程2 开始支持
(t1        ) redis ready, and connect to redis server and do some work [Tue May  9 19:03:19 2017]
(t2        ) redis ready, and connect to redis server and do some work [Tue May  9 19:03:19 2017]

 

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

9、Semaphore(线程连接池)

Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

import threading
import time

semaphore = threading.Semaphore(5)  #创建一个连接池对象,限制这个池子里面只能出现 5个线程

def func():
if semaphore.acquire():  #
print (threading.currentThread().getName() + ' get semaphore')
time.sleep(2)
semaphore.release()

for i in range(20):
t1 = threading.Thread(target=func)  #
t1.start()

执行结果
hread-1 get semaphore  #每5个线程为一组执行
Thread-2 get semaphore
Thread-3 get semaphore
Thread-4 get semaphore
Thread-5 get semaphore

Thread-6 get semaphore
Thread-8 get semaphore
Thread-9 get semaphore
Thread-10 get semaphore

Thread-7 get semaphore
Thread-11 get semaphore
Thread-13 get semaphore
Thread-14 get semaphore
Thread-12 get semaphore
Thread-15 get semaphore

Thread-17 get semaphore
Thread-16 get semaphore
Thread-19 get semaphore
Thread-20 get semaphore
Thread-18 get semaphore

协程:

协程:由程序员在 用户态内 做上下文切换实现并发,而不是由操作系统;

优点:(1)由于单线程,无需操作系统做上下文切换 (2)单线程容易管理不需要对公用 数据集加锁;

 

 

 通过yield和迭代器的send方法实现程序间的切换,在用户空间制造了并发。

 


def foo():
r=''
print("开始")
while True:
n=yield r #
print("我拿到%s啦!"%n)


a=foo() # next(a) #触发生成器函数的运行,直到碰到n=yield r停止 # a.send(1) #send 在上次next的基础上 触发生成器函数的运行,会传值给yield next(a) a.send(1) a.send(2) a.send(3) a.send(4)

 

 

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: