python中的多进程的几种方式以及进程间的通信

  • baagee 发布于 2017-07-30 08:44:46
  • 分类:Python
  • 2874 人围观
  • 2 人喜欢

Python中进程的实现由好几种方式。

1,Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程

import os

rpid = os.fork()
if rpid<0:
    print("fork调用失败。")
elif rpid == 0:
    print("我是子进程(%s),我的父进程是(%s)"%(os.getpid(),os.getppid()))
    x+=1
else:
    print("我是父进程(%s),我的子进程是(%s)"%(os.getpid(),rpid))

print("父子进程都可以执行这里的代码")

结果是:

我是父进程(19360),我的子进程是(19361)
父子进程都可以执行这里的代码
我是子进程(19361),我的父进程是(19360)
父子进程都可以执行这里的代码

程序执行到os.fork()时,操作系统会创建一个新的进程(子进程),然后复制父进程的所有信息到子进程中。

然后父进程和子进程都会从fork()函数中得到一个返回值,在子进程中这个值一定是0,而父进程中是子进程的 id号

在Unix/Linux操作系统中,提供了一个fork()系统函数,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的ID。

这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

1.1,多进程修改全局变量

import os
import time

num = 0

# 注意,fork函数,只在Unix/Linux/Mac上运行,windows不可以
pid = os.fork()

if pid == 0:
    num+=1
    print('哈哈1---num=%d'%num)
else:
    time.sleep(1)
    num+=1
    print('哈哈2---num=%d'%num)

结果是:

哈哈1---num=1
哈哈2---num=1
[Finished in 1.1s]

父进程、子进程执行顺序没有规律,完全取决于操作系统的调度算法,并且每个进程间资源互不影响,独立。

2,multiprocessing实现多进程

如果你打算编写多进程的服务程序,Unix/Linux无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?

由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os
import time

def info(name,age=9,sex='man',**kwargs):
	for x in range(5):
		print('name=%s,age=%d,sex=%s'%(name,age,sex))
		print(kwargs)
		time.sleep(1)

p=Process(target=info,args=('p1',22,'man'),kwargs={'sss':111,'aaa':999,'asd':123},name='p1')

print('父进程 %d.' % os.getpid())

print('子进程将要执行')
p.start()
print(p.is_alive())
p.join()
# p.terminate()
print('子进程已结束')
print('子进程.name=%s'%p.name)
print('子进程.pid=%d'%p.pid)
print(p.is_alive())

结果是:

父进程 7822.
子进程将要执行
True
name=p1,age=22,sex=man
{'aaa': 999, 'asd': 123, 'sss': 111}
name=p1,age=22,sex=man
{'aaa': 999, 'asd': 123, 'sss': 111}
name=p1,age=22,sex=man
{'aaa': 999, 'asd': 123, 'sss': 111}
name=p1,age=22,sex=man
{'aaa': 999, 'asd': 123, 'sss': 111}
name=p1,age=22,sex=man
{'aaa': 999, 'asd': 123, 'sss': 111}
子进程已结束
子进程.name=p1
子进程.pid=7825
False
[Finished in 5.1s]

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

Process语法结构如下:

Process([group [, target [, name [, args [, kwargs]]]]])

target:表示这个进程实例所调用对象;

args:表示调用对象的位置参数元组;

kwargs:表示调用对象的关键字参数字典;

name:为当前进程实例的别名;

group:大多数情况下用不到;

Process类常用方法:

is_alive():判断进程实例是否还在执行

join([timeout]):是否等待进程实例执行结束,或等待多少秒;

start():启动进程实例(创建子进程);

run():如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法;

terminate():不管任务是否完成,立即终止;

Process类常用属性:

name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数;

pid:当前进程实例的PID值;   

例2

from multiprocessing import Process
import time
import os

#两个子进程将会调用的两个方法
def  worker_1(interval):
    print("worker_1,父进程(%s),当前进程(%s)"%(os.getppid(),os.getpid()))
    t_start = time.time()
    time.sleep(interval) #程序将会被挂起interval秒
    t_end = time.time()
    print("worker_1,执行时间为'%0.2f'秒"%(t_end - t_start))

def  worker_2(interval):
    print("worker_2,父进程(%s),当前进程(%s)"%(os.getppid(),os.getpid()))
    t_start = time.time()
    time.sleep(interval)
    t_end = time.time()
    print("worker_2,执行时间为'%0.2f'秒"%(t_end - t_start))

#输出当前程序的ID
print("进程ID:%s"%os.getpid())

#创建两个进程对象,target指向这个进程对象要执行的对象名称,
#args后面的元组中,是要传递给worker_1方法的参数,
#因为worker_1方法就一个interval参数,这里传递一个整数2给它,
#如果不指定name参数,默认的进程对象名称为Process-N,N为一个递增的整数
p1=Process(target=worker_1,args=(2,))
p2=Process(target=worker_2,name="BaAGee",args=(1,))

#使用"进程对象名称.start()"来创建并执行一个子进程,
#这两个进程对象在start后,就会分别去执行worker_1和worker_2方法中的内容
p1.start()
p2.start()

#同时父进程仍然往下执行,如果p2进程还在执行,将会返回True
print("p2.is_alive=%s"%p2.is_alive())

#输出p1和p2进程的别名和pid
print("p1.name=%s"%p1.name)
print("p1.pid=%s"%p1.pid)
print("p2.name=%s"%p2.name)
print("p2.pid=%s"%p2.pid)

#join括号中不携带参数,表示父进程在这个位置要等待p1进程执行完成后,
#再继续执行下面的语句,一般用于进程间的数据同步,如果不写这一句,
#下面的is_alive判断将会是True,在shell(cmd)里面调用这个程序时
#可以完整的看到这个过程,大家可以尝试着将下面的这条语句改成p1.join(1),
#因为p2需要2秒以上才可能执行完成,父进程等待1秒很可能不能让p1完全执行完成,
#所以下面的print会输出True,即p1仍然在执行
p1.join()
print("p1.is_alive=%s"%p1.is_alive())

结果是:

进程ID:7910
p2.is_alive=True
p1.name=Process-1
p1.pid=7913
p2.name=BaAGee
p2.pid=7914
worker_1,父进程(7910),当前进程(7913)
worker_2,父进程(7910),当前进程(7914)
worker_2,执行时间为'1.00'秒
worker_1,执行时间为'2.00'秒
p1.is_alive=False
[Finished in 2.1s]

2.1,继承Process类来实现多进程

创建新的进程还能够使用类的方式,可以自定义一个类,继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象,请看下面的实例:

import os
from multiprocessing import Process
import time

class myProcess(Process):

    def __init__(self):
        Process.__init__(self)

    def run(self):
        print('子进程pid(%s) 开始执行,父进程为pid(%s)'%(os.getpid(),os.getppid()))
        x=0
        st=time.time()
        for i in range(100000000):
            x+=i
        print('子进程pid(%s)执行结束,耗时%0.3f秒,x=%d'%(os.getpid(),time.time()-st,x))


if __name__ == '__main__':
    t1=time.time()
    mp=myProcess()
    mp1=myProcess()
    mp1.start()
    mp.start()
    mp1.join()
    mp.join()
    print('over,time=%0.3f'%(time.time()-t1))

结果是:

子进程pid(7988) 开始执行,父进程为pid(7985)
子进程pid(7989) 开始执行,父进程为pid(7985)
子进程pid(7988)执行结束,耗时8.463秒,x=4999999950000000
子进程pid(7989)执行结束,耗时8.540秒,x=4999999950000000
over,time=8.545
[Finished in 8.6s]

3,进程池中的多进程

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行,请看下面的实例:

from multiprocessing import Pool
import os
import time

def work(i):
	print("%d->开始执行,子进程pid=(%s),父进程为(%s)"%(i,os.getpid(),os.getppid()))
	t_start=time.time()
	x=0
	for j in range(i):
		x+=j
	print("%d->执行完毕,子进程pid=(%s)执行结束,x=%d,耗时%0.3f秒"%(i,os.getpid(),x,time.time()-t_start))

#定义一个进程池,最大进程数3
po=Pool(3)

for i in range(10000000,10000012):
	#Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
    #每次循环将会用空闲出来的子进程去调用目标
	po.apply_async(work,(i,))

print("----start----")
po.close() #关闭进程池,关闭后po不能够在添加新的任务
po.join() #等待po中所有子进程执行完成,必须放在close语句之后,
# 主进程 创建/添加任务后,主进程默认不会等待进程池的任务执行完毕之后才结束,
# 而是当主进程任务做完后立马结束,如果没有join会导致进程池的任务不会被执行
print("-----end-----")

结果是:

----start----
10000000->开始执行,子进程pid=(8854),父进程为(884410000001->开始执行,子进程pid=(8855),父进程为(884410000002->开始执行,子进程pid=(8856),父进程为(884410000000->执行完毕,子进程pid=(8854)执行结束,x=49999995000000,耗时0.99110000003->开始执行,子进程pid=(8854),父进程为(884410000002->执行完毕,子进程pid=(8856)执行结束,x=50000015000001,耗时1.05410000004->开始执行,子进程pid=(8856),父进程为(884410000001->执行完毕,子进程pid=(8855)执行结束,x=50000005000000,耗时1.10810000005->开始执行,子进程pid=(8855),父进程为(884410000004->执行完毕,子进程pid=(8856)执行结束,x=50000035000006,耗时0.96010000006->开始执行,子进程pid=(8856),父进程为(884410000003->执行完毕,子进程pid=(8854)执行结束,x=50000025000003,耗时1.04810000007->开始执行,子进程pid=(8854),父进程为(884410000005->执行完毕,子进程pid=(8855)执行结束,x=50000045000010,耗时1.06210000008->开始执行,子进程pid=(8855),父进程为(884410000006->执行完毕,子进程pid=(8856)执行结束,x=50000055000015,耗时0.95410000009->开始执行,子进程pid=(8856),父进程为(884410000007->执行完毕,子进程pid=(8854)执行结束,x=50000065000021,耗时1.04110000010->开始执行,子进程pid=(8854),父进程为(884410000008->执行完毕,子进程pid=(8855)执行结束,x=50000075000028,耗时1.03110000011->开始执行,子进程pid=(8855),父进程为(884410000009->执行完毕,子进程pid=(8856)执行结束,x=50000085000036,耗时0.98010000010->执行完毕,子进程pid=(8854)执行结束,x=50000095000045,耗时1.04410000011->执行完毕,子进程pid=(8855)执行结束,x=50000105000055,耗时0.991秒
-----end-----
[Finished in 4.4s]

multiprocessing.Pool常用函数解析:

apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;

apply(func[, args[, kwds]]):使用阻塞方式调用func

close():关闭Pool,使其不再接受新的任务;

terminate():不管任务是否完成,立即终止;

join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

4,进程间通信-Queue:

Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序

初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);

Queue.qsize():返回当前队列包含的消息数量;

Queue.empty():如果队列为空,返回True,反之False ;

Queue.full():如果队列满了,返回True,反之False;

Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;

1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;

2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;

Queue.get_nowait():相当Queue.get(False);

Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;

1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;

2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;

Queue.put_nowait(item):相当Queue.put(item, False);

4.1,Process多进程的进程间通信

我们以Process为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据

from multiprocessing import Queue,Process
import time,os

def write(q):
	print("write启动(%s),父进程为(%s)"%(os.getpid(),os.getppid()))
	infos=['河南','山东','山西','陕西','北京','湖南','海南']
	for info in infos:
		print('Put %s to queue...' %info)
		q.put(info)
		time.sleep(1)

def read(q):
	print("read启动(%s),父进程为(%s)"%(os.getpid(),os.getppid()))
	while True:
		if not q.empty():
			info=q.get(True)
			print('Get %s from queue.' %info)
		else:
			break
		time.sleep(1)

if __name__ == '__main__':
	# 父进程创建Queue,并传给各个子进程:
	q=Queue()
	pw=Process(target=write,args=(q,))
	pr=Process(target=read,args=(q,))

	# 启动子进程pw,写入:
	pw.start()    
	# 等待pw结束:
	pw.join()

	# 启动子进程pr,读取:
	pr.start()
	pr.join()

	print('-------over---------')

结果是:

baagee@baagee-virtual-machine:~/桌面/python/day05$ python3 进程间通信Queue实例.py 
write启动(10188),父进程为(10187)
Put 河南 to queue...
Put 山东 to queue...
Put 山西 to queue...
Put 陕西 to queue...
Put 北京 to queue...
Put 湖南 to queue...
Put 海南 to queue...
read启动(10193),父进程为(10187)
Get 河南 from queue.
Get 山东 from queue.
Get 山西 from queue.
Get 陕西 from queue.
Get 北京 from queue.
Get 湖南 from queue.
Get 海南 from queue.
-------over---------

4.2,进程池中多进程间使用Queue通信

# 修改import中的Queue为Manager
from multiprocessing import Manager,Pool
import os,time

def read(q):
    print("read启动(%s),父进程为(%s)"%(os.getpid(),os.getppid()))
    for i in range(q.qsize()):
        print("read从Queue获取到消息:%s"%q.get(True))
        time.sleep(1)

def write(q):
    print("write启动(%s),父进程为(%s)"%(os.getpid(),os.getppid()))
    for i in "BaAGee":
        print('Put %s to queue...' %i)
        q.put(i)
        time.sleep(1)

if __name__=="__main__":
    print("(%s) start"%os.getpid())
    q=Manager().Queue() #使用Manager中的Queue来初始化
    po=Pool()
    #使用阻塞模式创建进程,这样就不需要在read中使用死循环了,可以让write完全执行完成后,再用reader去读取
    po.apply(write,(q,))
    po.apply(read,(q,))
    po.close()
    po.join()
    print("(%s) End"%os.getpid())

结果是:

baagee@baagee-virtual-machine:~/桌面/python/day05$ python3 进程池中的Queue.py 
(10561) start
write启动(10569),父进程为(10561)
Put B to queue...
Put a to queue...
Put A to queue...
Put G to queue...
Put e to queue...
Put e to queue...
read启动(10567),父进程为(10561)
read从Queue获取到消息:B
read从Queue获取到消息:a
read从Queue获取到消息:A
read从Queue获取到消息:G
read从Queue获取到消息:e
read从Queue获取到消息:e
(10561) End

5,进程间的异步

from multiprocessing import Pool
import time
import os

def test():
    print("---进程池中的进程---pid=%d,ppid=%d--"%(os.getpid(),os.getppid()))
    for i in range(3):
        print("----%d---"%i)
        time.sleep(1)
    return ['sdgfsd','sgfd']

def test2(args):
    print("---callback func--pid=%d"%os.getpid())
    print("---callback func--args=%s"%args)

pool = Pool(1)
pool.apply_async(func=test,callback=test2)
pool.close()

while True:
    print("----主进程-pid=%d----"%os.getpid())
    time.sleep(1)

结果是:

baagee@baagee-virtual-machine:~/桌面/python/day06$ python3 异步.py 
----主进程-pid=6756----
---进程池中的进程---pid=6757,ppid=6756--
----0---
----1---
----主进程-pid=6756----
----2---
----主进程-pid=6756----
----主进程-pid=6756----
---callback func--pid=6756
---callback func--args=['sdgfsd', 'sgfd']
----主进程-pid=6756----
----主进程-pid=6756----
----主进程-pid=6756----
----主进程-pid=6756----

说明:当程序执行时,主进程也在继续往下执行,当子进程完成后叫主进程去执行callback函数,子进程test函数返回值会传递到callbakc函数里,callback函数执行完毕主进程继续执行自己的任务。


标签: Python 进程 fork Queue

评论

点击图片切换
还没有评论,快来抢沙发吧!