社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
import os
import time
#创建子线程
pid = os.fork()
print('Hello World')
#判断是子线程
if pid == 0:
print('s_fork:{},f_fork:{}'.format(os.getpid(),os.getppid()))
else:
print('f_fork:{}'.format(os.getpid()))
进一步解释:
主进程os.fork()
创建一个子进程,此时先执行主进程,pid不为0,执行else后的语句;
再执行创建的子进程,pid返回值为0,所以执行if后的语句。
关于os.fork()
的详细解释:os.fork()
函数是用来新建进程。但它只在POSIX系统上可用,在windows版的Python中,os模块没有定义os.fork()
函数。os.fork()
函数创建进程的过程是这样的:
程序每次执行时,操作系统都会创建一个新进程来运行程序指令。进程还可调用os.fork()
,要求操作系统新建一个进程。父进程是调用os.fork()
函数的进程,父进程所创建的进程叫子进程。每个进程都有一个不重复的进程ID号称为pid,它对进程进行标识。子进程与父进程完全相同,子进程从父进程继承了多个值的拷贝,如全局变量和环境变量。两个进程的唯一区别是fork的返回值。子进程接收返回值0,而父进程接收子进程的pid作为返回值。一个现有进程可以调用fork()
函数创建一个新进程,由fork创建的新进程被称为子进程(child process)。fork()
函数被调用一次但返回两次。两次返回的唯一区别是子进程中返回0值而父进程中返回子进程ID。对于程序,只要判断fork的返回值,就知道自己是处于父进程还是子进程中。
更进一步的解释可参考https://www.jianshu.com/p/e8f5b828cce0。
(1)运行原理对比:
p.start()
时,主进程复制一份代码到子进程,执行多任务;t.start()
时,多个子线程共同执行一份代码;(2)根本区别:
进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位。
(3)开销区别:
进程之间切换开销很大,线程之间切换开销很小。
线程可以看作一个轻量级的进程,同一类线程共享代码和数据空间。
进程间不能直接通信,要想实现两个进程间的通信,可以增加一个中间变量,如通过文件,一个进程写,另一个读,但是这种方式效率很低,一般通过队列Queue进行。
队列的特点是先进先出。
from multiprocessing import Queue
# 创建队列,最多存放3条数据
q = Queue(3)
# 存数据
q.put(1)
q.put('Corley')
q.put([11,22])
print('finished')
打印
finished
再尝试
from multiprocessing import Queue
# 创建队列,最多存放3条数据
q = Queue(3)
# 存数据
q.put(1)
q.put('Corley')
q.put([11,22])
q.put({'name':'Corley'})
print('finished')
显示
显然,当超过3条数据时会发生堵塞,只能强制停止。
改进代码:
from multiprocessing import Queue
# 创建队列,最多存放3条数据
q = Queue(3)
# 存数据
q.put(1)
q.put('Corley')
q.put([11,22])
# 直接抛出异常
q.put_nowait({'name':'Corley'})
print('finished')
打印
Traceback (most recent call last):
File "xxx/demo.py", line 11, in <module>
q.put_nowait({'name':'Corley'})
File "xxxxPythonPython37libmultiprocessingqueues.py", line 129, in put_nowait
return self.put(obj, False)
File "xxxxPythonPython37libmultiprocessingqueues.py", line 83, in put
raise Full
queue.Full
显示队列已满,不能再添加元素。
取数据:
from multiprocessing import Queue
# 创建队列,最多存放3条数据
q = Queue(3)
# 存数据
q.put(1)
q.put('Corley')
q.put([11,22])
print(q.get())
print(q.get())
print(q.get())
print('finished')
打印
1
Corley
[11, 22]
finished
按照先进先出的顺序取数据。
再尝试:
from multiprocessing import Queue
# 创建队列,最多存放3条数据
q = Queue(3)
# 存数据
q.put(1)
q.put('Corley')
q.put([11,22])
print(q.get())
print(q.get())
print(q.get())
# 堵塞
print(q.get())
print('finished')
显示:
显然,数据取完后再取也会发生堵塞。
改进代码:
from multiprocessing import Queue
# 创建队列,最多存放3条数据
q = Queue(3)
# 存数据
q.put(1)
q.put('Corley')
q.put([11,22])
print(q.get())
print(q.get())
print(q.get())
# 堵塞,直接抛出异常
print(q.get_nowait())
print('finished')
打印
Traceback (most recent call last):
1
Corley
[11, 22]
File "xxx/demo.py", line 14, in <module>
print(q.get_nowait())
File "xxxxPythonPython37libmultiprocessingqueues.py", line 126, in get_nowait
return self.get(False)
File "xxxxPythonPython37libmultiprocessingqueues.py", line 107, in get
raise Empty
_queue.Empty
显示队列为空。
其他方法使用:
from multiprocessing import Queue
# 创建队列,最多存放3条数据
q = Queue(3)
# 存数据
q.put(1)
q.put('Corley')
# 判断队列是否为满
print(q.full())
q.put([11,22])
print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
print('finished')
打印
False
3
1
Corley
[11, 22]
True
finished
队列在进程通信方面的实现原理如下:
下载和处理数据实现了分离,在一定程度上实现了解耦。
队列用于进程通信尝试:
import multiprocessing
def download(q):
'''下载数据'''
lis = [11, 22, 33]
for item in lis:
q.put(item)
print('Download complete and save to queue')
def analyse(q):
'''分析数据'''
analyse_data = list()
while True:
data = q.get()
analyse_data.append(data)
if q.empty():
break
print(analyse_data)
def main():
# 创建队列
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=download, args=(q,))
p2 = multiprocessing.Process(target=analyse, args=(q,))
p1.start()
p2.start()
if __name__ == '__main__':
main()
打印
Download complete and save to queue
[11, 22, 33]
进程之间共享变量尝试:
import multiprocessing
a = 1
def demo1():
global a
a += 1
def demo2():
# 打印出的值为2,则是共享的
print(a)
if __name__ == '__main__':
p1 = multiprocessing.Process(target=demo1)
p2 = multiprocessing.Process(target=demo2)
p1.start()
p2.start()
打印
1
打印出的结果为1,说明进程之间全局变量是不共享的,与线程不同。
用普通队列尝试:
import multiprocessing
from queue import Queue
def demo1(q):
q.put('a')
def demo2(q):
data = q.get()
print(data)
if __name__ == '__main__':
q = Queue()
p1 = multiprocessing.Process(target=demo1, args=(q,))
p2 = multiprocessing.Process(target=demo2, args=(q,))
p1.start()
p2.start()
打印
Traceback (most recent call last):
File "xxx/demo.py", line 93, in <module>
p1.start()
File "xxxxPythonPython37libmultiprocessingprocess.py", line 112, in start
self._popen = self._Popen(self)
File "xxxxPythonPython37libmultiprocessingcontext.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "xxxxPythonPython37libmultiprocessingcontext.py", line 322, in _Popen
return Popen(process_obj)
File "xxxxPythonPython37libmultiprocessingpopen_spawn_win32.py", line 89, in __init__
reduction.dump(process_obj, to_child)
File "xxxxPythonPython37libmultiprocessingreduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
报错,属于类型错误。
改进–线程调用run()
方法即可达到效果:
import multiprocessing
from queue import Queue
def demo1(q):
q.put('a')
def demo2(q):
data = q.get()
print(data)
if __name__ == '__main__':
q = Queue()
p1 = multiprocessing.Process(target=demo1, args=(q,))
p2 = multiprocessing.Process(target=demo2, args=(q,))
p1.run()
p2.run()
打印
a
此时正常运行得到结果,但是已经不属于多进程了,因为调用的不是run()
方法,所以普通队列不能实现真正的线程间通信,要实现跨线程通信,需要使用multiprocessing.Queue
。
缺失参数异常处理尝试:
import multiprocessing
def demo1(q):
try:
q.put('a')
except Exception as e:
print(e.args[0])
def demo2(q):
try:
data = q.get()
print(data)
except Exception as e:
print(e.args[0])
if __name__ == '__main__':
q = multiprocessing.Queue()
try:
p1 = multiprocessing.Process(target=demo1)
p2 = multiprocessing.Process(target=demo2)
p1.start()
p2.start()
except Exception as e:
print(e.args[0])
打印
Process Process-1:
Traceback (most recent call last):
File "xxxxPythonPython37libmultiprocessingprocess.py", line 297, in _bootstrap
self.run()
File "xxxxPythonPython37libmultiprocessingprocess.py", line 99, in run
self._target(*self._args, **self._kwargs)
TypeError: demo1() missing 1 required positional argument: 'q'
Process Process-2:
Traceback (most recent call last):
File "xxxxPythonPython37libmultiprocessingprocess.py", line 297, in _bootstrap
self.run()
File "xxxxPythonPython37libmultiprocessingprocess.py", line 99, in run
self._target(*self._args, **self._kwargs)
TypeError: demo2() missing 1 required positional argument: 'q'
显然,此时报错的位置不是在自己写的代码中,而是在Python自己的libmultiprocessingprocess.py中报错,属于Process类的参数错误。
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但是如果是上百甚至上千个目标,手动的去创建的进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool类。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但是如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。
原理如下图所示:
尝试:
from multiprocessing import Pool
import os, time, random
def work(msg):
t_start = time.time()
print('%s开始执行,进程号为%d' % (msg, os.getpid()))
time.sleep(random.random() * 2)
t_stop = time.time()
print(msg, "执行完成,耗时%0.2f seconds" % (t_stop - t_start))
def demo():
pass
if __name__ == '__main__':
po = Pool(3) # 定义一个进程池,包含3个进程
for i in range(0, 10):
# 添加任务,有10个任务
po.apply_async(work, (i,))
print("--start--")
# 关闭进程池,不再接收新的任务请求
po.close()
# 等待子进程执行完成,主进程再结束
po.join()
print("--end--")
显示:
易知,最开始有0、1、2三个进程执行,0执行完成后3开始执行,1执行完成后4开始执行,3执行完成后5开始执行,一直循环,直到所有进程完成。
修改进程池参数–包含4个进程:
from multiprocessing import Pool
import os, time, random
def work(msg):
t_start = time.time()
print('%s开始执行,进程号为%d' % (msg, os.getpid()))
time.sleep(random.random() * 2)
t_stop = time.time()
print(msg, "执行完成,耗时%0.2f seconds" % (t_stop - t_start))
def demo():
pass
if __name__ == '__main__':
po = Pool(4) # 定义一个进程池,包含3个进程
for i in range(0, 10):
# 添加任务,有10个任务
po.apply_async(work, (i,))
print("--start--")
# 关闭进程池,不再接收新的任务请求
po.close()
# 等待子进程执行完成,主进程再结束
po.join()
print("--end--")
显示:
显然,进程池有4个进程时,执行效率更高,因为同时有4个进程在执行。po.close()
执行后不能再添加任务:
from multiprocessing import Pool
import os, time, random
def work(msg):
t_start = time.time()
print('%s开始执行,进程号为%d' % (msg, os.getpid()))
time.sleep(random.random() * 2)
t_stop = time.time()
print(msg, "执行完成,耗时%0.2f seconds" % (t_stop - t_start))
def demo():
pass
if __name__ == '__main__':
po = Pool(4) # 定义一个进程池,包含3个进程
for i in range(0, 10):
# 添加任务,有10个任务
po.apply_async(work, (i,))
print("--start--")
# 关闭进程池,不再接收新的任务请求
po.close()
po.apply_async(demo)
# 等待子进程执行完成,主进程再结束
po.join()
print("--end--")
打印
--start--
Traceback (most recent call last):
File "xxx/demo.py", line 151, in <module>
po.apply_async(demo)
File "xxxxPythonPython37libmultiprocessingpool.py", line 362, in apply_async
raise ValueError("Pool not running")
ValueError: Pool not running
要添加任务,必须要在po.close()
之前添加。
用multiprocessing.Queue
进行尝试:
import multiprocessing
def demo1(q):
q.put('a')
def demo2
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/CUFEECR/article/details/104181863
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!