进程相关(二)

接上篇 进程一

五、Process 类的一些方法

  • p.start():开启进程,异步非阻塞

  • p.terminate():结束进程, 异步非阻塞

  • p.is_alive():判断子进程是否还活着

  • p.join():等待子进程的结束,同步阻塞

# 示例
import time
from multiprocessing import Process
​
def son1():
    while True:
        print('is alive')
        time.sleep(0.5)
​
if __name__ == '__main__':
    p = Process(target=son1)
    p.start()      # 异步 非阻塞
    print(p.is_alive())
    time.sleep(1)
    p.terminate()   # 异步的 非阻塞
    print(p.is_alive())   # 进程还活着 因为操作系统还没来得及关闭进程
    time.sleep(0.01)
    print(p.is_alive())   # 操作系统已经响应了我们要关闭进程的需求,再去检测的时候,得到的结果是进程已经结束了

总结:

  • 开启进程的方式
# 面向函数
def 函数名:要在子进程中执行的代码
p = Process(target= 函数名,args=(参数1,))
​
# 面向对象
class 类名(Process):
    def __init__(self,参数1,参数2):   # 如果子进程不需要参数可以不写
        self.a = 参数1
        self.b = 参数2
        super().__init__()
    def run(self):
        # 要在子进程中执行的代码
p = 类名(参数1,参数2)
  • Process 提供的操作进程的方法
    • p.start():开启进程,异步非阻塞
      • p.terminate():结束进程,异步非阻塞

      • p.join():同步阻塞

      • p.isalive():获取当前进程的状态

      • p.daemon = True:设置为守护进程,守护进程永远在主进程的代码结束之后自动结束

六、锁 Lock

  • 如果在一个并发的场景下,涉及到某部分内容是需要修改一些所有进程共享数据资源,需要加锁来维护数据的安全

  • 在数据安全的基础上,才考虑效率问题

  • 同步存在的意义:数据的安全性

  • 方式:

    • 在主进程中实例化 lock = Lock()

    • 把这把锁传递给子进程

    • 在子进程中 对需要加锁的代码 进行 with lock:

      • with lock 相当于 lock.acquire()和 lock.release()
  • 应用场景:(在进程中需要加锁的场景)

    • 共享的数据资源(文件、数据库)

    • 对资源进行修改、删除操作

  • 加锁之后能够保证数据的安全性 但是也降低了程序的执行效率

示例:抢票系统		
import time
import json
from multiprocessing import Process,Lock
​
def search_ticket(user):
    with open('ticket_count') as f:
        dic = json.load(f)
        print('%s查询结果  : %s张余票'%(user,dic['count']))
​
def buy_ticket(user,lock):
    # with lock:
    # lock.acquire()   # 给这段代码加上一把锁
        time.sleep(0.02)
        with open('ticket_count') as f:
            dic = json.load(f)
        if dic['count'] > 0:
            print('%s买到票了'%(user))
            dic['count'] -= 1
        else:
            print('%s没买到票' % (user))
        time.sleep(0.02)
        with open('ticket_count','w') as f:
            json.dump(dic,f)
    # lock.release()   # 给这段代码解锁
​
def task(user, lock):
    search_ticket(user)
    with lock:
        buy_ticket(user, lock)
​
if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=task,args=('user%s'%i,lock))
        p.start()
		

七、进程之间通信

  • 进程之间的数据隔离
from multiprocessing import Process
n = 100
def func():
    global n
    n -= 1
​
if __name__ == '__main__':
    p_l = []
    for i in range(10):
        p = Process(target=func)
        p.start()
        p_l.append(p)
    for p in p_l:p.join()
    print(n)
	
  • 进程之间的通信 - IPC(inter process communication)
from multiprocessing import Queue,Process
# 先进先出
def func(exp,q):
    ret = eval(exp)
    q.put({ret,2,3})
    q.put(ret*2)
    q.put(ret*4)
​
if __name__ == '__main__':
    q = Queue()
    Process(target=func,args=('1+2+3',q)).start()
    print(q.get())
    print(q.get())
    print(q.get())
import queue
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)             # 当队列为满的时候再向队列中放数据 队列会阻塞
print('5555555')
try:
    q.put_nowait(6)  # 当队列为满的时候再向队列中放数据 会报错并且会丢失数据
except queue.Full:
    pass
print('6666666')
​
print(q.get())
print(q.get())
print(q.get())   
print(q.get())   
print(q.get())              # 在队列为空的时候会发生阻塞
try:
    print(q.get_nowait())   # 在队列为空的时候 直接报错
except queue.Empty:pass
  • 内置 IPC 机制:

    • Queue(队列):进程之间数据安全

      • 天生就是数据安全的

      • 基于文件家族的 socket pickle lock

    • pipe(管道):进程之间数据不安全

      • 不安全的

      • 基于文件家族的 socket pickle

      • 队列 = 管道 + 锁

  • 第三方工具(软件)提供的 IPC 机制:

    • redis / memcache / kafka / rabbitmq

    • 优点:

      • 并发需求

      • 高可用,多个消息中间件

      • 断电保存数据

      • 解耦

八、进程之间的数据共享

  • mulprocessing 中有一个 manager 类,封装了所有和进程(数据共享、数据传递)相关的数据类型

  • 但是对于字典、列表这一类的数据操作的时候会产生数据不安全

  • 需要加锁解决问题,并且需要尽量少的使用这种方式

#示例
from multiprocessing import Manager,Process,Lock
​
def func(dic,lock):
    with lock:
        dic['count'] -= 1
​
if __name__ == '__main__':
    # m = Manager()
    with Manager() as m:
        l = Lock()
        dic = m.dict({'count':100})
        p_l = []
        for i in range(100):
            p = Process(target=func,args=(dic,l))
            p.start()
            p_l.append(p)
        for p in p_l:p.join()
        print(dic)