进程通信
"""进程在内存级别是隔离的,但是文件在磁盘上,1.基于文件通信利用抢票系统讲解2.基于队列通信3.基于管道的通信"""
文件通信
# 抢票系统# 1.先可以查票,查询余票数 并发# 2.进行购买,向服务端发送请求,服务端接收请求,在后端将票数-1,返回到前端 串行from multiprocessing import Processfrom multiprocessing import Lockimport jsonimport timeimport osimport random# task一个总进程def search(): """ 查看余票 :return: """ time.sleep(random.randint(1,3)) with open('ticket.json',encoding='utf-8') as f1: dic = json.load(f1) print(f'{os.getpid()}查看了票数,剩余{dic["count"]}')def paid(): """ 支付 打开文件写入文件转成json模式 :return: """ with open('ticket.json', encoding='utf-8') as f1: dic = json.load(f1) if dic['count'] > 0: dic['count'] -=1 time.sleep(random.randint(1,3)) with open('ticket.json',encoding='utf-8',mode='w') as f1: json.dump(dic,f1) print(f'{os.getpid()}购买成功')def task(lock): # task进程 search() lock.acquire() paid() lock.release()if __name__ == '__main__': mutex = Lock() for i in range(6): p = Process(target=task,args=(mutex,)) p.start() # 启动# 当很多进程共抢一个资源数据时,要保证顺序数据安全,一定要串行# 互斥锁:可以公平性的保证顺序以及数据的安全# 基于文件的进程之间的通信# 缺点:效率低,自己加锁会麻烦,容易出现死锁# 优点:能通信了"""几乎同时6个进程开启,都进入了一个查票环节,task1进入search,睡了3s,其他的进程都进入查票,查票此时是并发当task2进程进入买票环节,读取文件票数,将文件票数-1,进入阻塞状态,其他的进程陆续执行同样的操作,这样就造成了数据不安全"""
队列通信
"""""""""队列:一个容器,这个容器可以承载一些数据 Queue队列的特性:先进先出永远保持这个顺序 FIFO first in first out"""from multiprocessing import Queueq = Queue()def func(): print('in func') return '函数'q.put(1)q.put('alex')q.put([1,2,3])q.put(func())print(q.get())print(q.get())print(q.get())print(q.get())# 设置大小from multiprocessing import Queueq = Queue(3) # maxsize的最大值def func(): print('in func')q.put(1)q.put('alex')q.put([1,2,3])q.put(func) # 若put超出最大值则队列满了 进程put会阻塞 无法执行下面的代码print(q.get())print(q.get())print(q.get())print(q.get()) # 若get满了,当数据取完时,在进程get数据也会出现阻塞,直到再put一个数据# 一个进程,几个put就几个get## block = False 只要遇到阻塞就会报错from multiprocessing import Queueq = Queue(3) # maxsizedef func(): print('in func')q.put(1)q.put('alex')q.put([1,2,3])q.put(555,block=False)print(q.get())print(q.get())print(q.get())print(q.get())# timeout = 3 延时报错 超过3秒还阻塞,抛出异常from multiprocessing import Queueq = Queue(3) # maxsizeq.put(1)q.put('alex')q.put([1,2,3])print(q.get())print(q.get())print(q.get())print(q.get(timeout=3))
join
# join让主进程等待子进程的结束,再执行主进程from multiprocessing import Processimport timedef task(name): print(f'{name} is running') time.sleep(2) print(f'{name} is gone') # 主进程if __name__ == '__main__': p = Process(target=task,args=('zs',))# 创建一个进程对象 p.start() # 启动p子进程同时启动主进程 p.join() # 必须等待p执行完再执行主 print('主进程') # 输出 # zs is running # zs is gone # 主进程# join开启多个子进程# 未开启时:先执行主进程,再一个一个执行子进程(先执行时间短的)from multiprocessing import Processimport timedef task(name,sec): print(f'{name} is running') time.sleep(sec) print(f'{name} is gone') # 主进程if __name__ == '__main__': start_time = time.time() # 同一个时刻开启4个进程,并发或者并行,按照最大的时间走 p = Process(target=task,args=('zs',2)) p2 = Process(target=task,args=('zs2',7)) p3 = Process(target=task,args=('zs3',5)) p.start() p2.start() p3.start() print(f'主进程消耗的时间{time.time() - start_time}') # 主进程消耗的时间与其他进程无关 0.000123秒 # 输出: # 主进程消耗的时间0.0050699710845947266 # zs is running # zs2 is running # zs3 is running # zs is gone # zs3 is gone # zs2 is gone# 验证1: # join只针对主进程,如果join下面多次join是不阻塞的 # 不会按照一行一行输出,同时开始执行,最后执行主进程 # 必须等待最长的1个p执行完再执行主进程 # p,p1,p2 同时进行,p执行速度快,先执行完,执行时阻塞不执行'2秒'主进程2秒后执行主进程'2秒' # 执行p的时候同时执行p1和p2 p2执行速度比p3慢 p3先执行完,执行完后join无法阻塞主进程 # p3执行完,p2再过2秒也执行完,p2执行完后,直接执行'7'秒和'5'秒的主进程 p.join() print('2秒') p2.join() print('7秒') p3.join() print('5秒') print(f'主进程消耗的时间{time.time() - start_time}') # 并发执行主进程消耗的时间 3.0387587秒 # p1和p2和p3 同时运行 按照最长的时长打印 # 输出 # 主进程消耗的时间0.005324602127075195 # zs is running # zs2 is running # zs3 is running # zs is gone # 2秒 # zs3 is gone # zs2 is gone # 7秒 # 5秒 # 主进程消耗的时间7.008730888366699# 验证2# 对验证1进行优化代码 循环打印# 正确示范from multiprocessing import Processimport timedef task(sec): print(f'is running') time.sleep(sec) print(f'is gone')if __name__ == '__main__': start_time = time.time() l1 = [] for i in range(1,4): p = Process(target=task,args=(i,)) l1.append(p) p.start() for i in l1: i.join() print(f'主进程{time.time()-start_time}')# join就是阻塞,主进程有join主进程下面的代码一律不执行
互斥锁
# 互斥锁:# 多个任务共抢占一个资源时,想要顺序优先保障数据安全,一定要让其串行# 现在的进程都并发的抢占输出# 并发是以效率优先的,但是目前的需求是:顺序优先# 多个进程共枪一个资源时,要保证顺畅优先:串行,一个一个来# 并行或者并发from multiprocessing import Processimport timeimport randomimport osdef task1(): print(f'{os.getpid()}开始') time.sleep(random.randint(1,3)) print(f'{os.getpid()}打印结束了')def task2(): print(f'{os.getpid()}开始') time.sleep(random.randint(1,3)) print(f'{os.getpid()}打印结束了')def task3(): print(f'{os.getpid()}开始') time.sleep(random.randint(1,3)) print(f'{os.getpid()}打印结束了')if __name__ == '__main__': # 开启3个任务 p1 = Process(target=task1) p2 = Process(target=task2) p3 = Process(target=task3) p1.start() p2.start() p3.start()# 串行# 利用join 解决了并行的问题,保证了顺序优先,但是这个谁先谁后是固定的,# 这样不合理,争抢同一个资源的时候,应该是先到先得,保证公平from multiprocessing import Processimport timeimport randomimport osdef task1(): print(f'{os.getpid()}开始') time.sleep(random.randint(1,3)) print(f'{os.getpid()}打印结束了')def task2(): print(f'{os.getpid()}开始') time.sleep(random.randint(1,3)) print(f'{os.getpid()}打印结束了')def task3(): print(f'{os.getpid()}开始') time.sleep(random.randint(1,3)) print(f'{os.getpid()}打印结束了')if __name__ == '__main__': # 开启3个任务 p1 = Process(target=task1) p2 = Process(target=task2) p3 = Process(target=task3) p1.start() p1.join() p2.start() p2.join() p3.start() p3.join()from multiprocessing import Processimport timeimport randomimport osdef task1(p): print(f'{p}开始') time.sleep(random.randint(1,3)) print(f'{p}打印结束了')def task2(p): print(f'{p}开始') time.sleep(random.randint(1,3)) print(f'{p}打印结束了')def task3(p): print(f'{p}开始') time.sleep(random.randint(1,3)) print(f'{p}打印结束了')if __name__ == '__main__': # 开启3个任务 p1 = Process(target=task1,args=('p1',)) p2 = Process(target=task2,args=('p2',)) p3 = Process(target=task3,args=('p3',)) p1.start() p1.join() p2.start() p2.join() p3.start() p3.join()from multiprocessing import Processfrom multiprocessing import Lock #(锁)import timeimport randomimport osdef task1(p,lock): # 上锁 lock.acquire() # lock.acquire() 互斥锁上多个无法解开 print(f'{p}开始') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') # 解锁 释放掉 lock.release()def task2(p,lock): # 上锁 lock.acquire() print(f'{p}开始') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') # 解锁 释放掉 lock.release()def task3(p,lock): # 上锁 lock.acquire() print(f'{p}开始') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') # 解锁 释放掉,再往下执行,开启强锁 lock.release()if __name__ == '__main__': # 开启3个任务 # 开始上锁,添加参数lock mutex = Lock() p1 = Process(target=task1,args=('p1',mutex)) p2 = Process(target=task2,args=('p2',mutex)) p3 = Process(target=task3,args=('p3',mutex)) p1.start() p1.join() p2.start() p2.join() p3.start() p3.join()"""三个任务同时抢占一把锁,先到先得,假如task2 先得到然后执行下面的程序,此时的task1和task3也会抢锁,但是抢到后,发现已经上锁了,只能阻塞等待遇到阻塞后,操作系统会强行将CPU切换到其他任务,其他任务发现锁还没有被打开,继续阻塞直到task2 将锁释放掉,task1和task3继续争抢这把锁如果一个子进程里面有多个锁,会变成死锁,无法继续执行""""""join和互斥锁的区别共同点:都能将并行变成串行,保证了顺序不同点:join人为的设定顺序,lock让其争抢顺序,保证了公平性"""from multiprocessing import Processfrom multiprocessing import Lockimport timeimport randomdef task1(p,lock): lock.acquire() print(f'{p}开始打印了') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') lock.release()def task2(p,lock): lock.acquire() print(f'{p}开始打印了') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') lock.release()def task3(p,lock): lock.acquire() print(f'{p}开始打印了') time.sleep(random.randint(1,3)) print(f'{p}打印结束了') lock.release()if __name__ == "__main__": mutex = Lock() p1 = Process(target=task1,args=('p1',mutex)) p2 = Process(target=task2,args=('p2',mutex)) p3 = Process(target=task3,args=('p3',mutex)) p2.start() p1.start() p3.start()