进程相关 (一)

一、基本概念

  • 定义:运行中的程序

  • 程序和进程之间的区别:

    • 程序只是一个文件

    • 进程是这个文件被 CPU 运行起来了

  • 进程是计算机中最小的资源分配单位

  • 在操作系统中的唯一标识符 :pid

  • 进程调度:

    • 操作系统调度进程的算法
      • 短作业优先算法

      • 先来先服务算法

      • 时间片轮转算法

      • 多级反馈算法

  • 并行与并发

    • 并行

      • 两个程序 两个 CPU 每个程序分别占用一个 CPU 自己执行自己的

      • 看起来是同时执行,实际在每一个时间点上都在各自执行着

    • 并发

      • 两个程序 一个 cpu 每个程序交替的在一个 cpu 上执行

      • 看起来在同时执行,但是实际上仍然是串行

  • 同步 / 异步 / 阻塞 / 非阻塞

    • 同步:一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列,要么成功都成功,失败都失败,两个任务的状态可以保持一致

    • 异步:不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了,至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列

    • 阻塞:cpu 不工作

    • 非阻塞:cpu 工作

    • 同步阻塞

      • conn.recv

      • socket 阻塞的 tcp 协议的时候

    • 同步非阻塞

      • func() 没有 io 操作

      • socket 非阻塞的 tcp 协议的时候

      • 调用函数(这个函数内部不存在 io 操作)

    • 异步非阻塞

      • 把 func 扔到其他任务里去执行了

      • 我本身的任务和 func 任务各自执行各自的 没有 io 操作

    • 异步阻塞

  • 进程的三状态

    • 就绪 (Ready) 状态

      • 当进程已分配到除 CPU 以外的所有必要的资源,只要获得处理机便立即执行,这时的进程状态称为就绪状态
    • 执行 / 运行(Running)

      • 状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态
    • 阻塞 (Blocked) 状态

      • 正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待 I/O 完成、申请缓冲区不能满足、等待信件 (信号) 等
  • 子进程 / 父进程

    • 在父进程中创建子进程

      • 在 pycharm 中启动的所有 py 程序都是 pycharm 的子进程
    • os 模块

      • os.getpid():获取子进程的 pid

      • os.getppid():获取父进程的 pid

二、进程模块

开启进程

# 面向函数方式:
import os
import time
from multiprocessing import Process
​
def eat():
    print('start eating',os.getpid())
    time.sleep(1)
    print('end eating',os.getpid())
​
def sleep():
    print('start sleeping',os.getpid())
    time.sleep(1)
    print('end sleeping',os.getpid())
​
if __name__ == '__main__':
    p1 = Process(target=eat)    # 创建一个即将要执行eat函数的进程对象
    p1.start()                  # 异步 调用开启进程的方法 但是并不等待这个进程真的开启
    p2 = Process(target=sleep)  # 创建一个即将要执行sleep函数的进程对象
    p2.start()                  # 异步 调用开启进程的方法 但是并不等待这个进程真的开启
    print('main :',os.getpid())
# 面向对象方式:
import os
import time
from multiprocessing import Process
​
class MyProcecss1(Process):
    def __init__(self,x,y):
        self.x = x
        self.y = y
        super().__init__()
    def run(self):
        print(self.x,self.y,os.getpid())
        for i in range(5):
            print('in son2')
​
if __name__ == '__main__':
    mp = MyProcecss1(1,2)
    mp.daemon = True
    mp.start()
    print(mp.is_alive())
    time.sleep(1)
    mp.terminate()
	
  • 操作系统创建进程的方式不同

    • windows 操作系统执行开启进程的代码

      • 实际上新的子进程需要通过 import 父进程的代码来完成数据的导入工作

      • 所以有一些内容我们只希望在父进程中完成,就写在 if name == ‘main’: 下面

    • ios linux 操作系统创建进程 fork

  • 主进程和子进程之间的关系

    • 主进程的结束逻辑
      • 主进程的代码结束

      • 所有的子进程结束

      • 给子进程回收资源

      • 主进程结束

import os
import time
from multiprocessing import Process
def func():
    print('start',os.getpid())
    time.sleep(10)
    print('end',os.getpid())
​
if __name__ == '__main__':
    p = Process(target=func)
    p.start()   # 异步 调用开启进程的方法 但是并不等待这个进程真的开启
    print('main :',os.getpid())
    # 主进程没结束 :等待子进程结束
    # 主进程负责回收子进程的资源
    # 如果子进程执行结束,父进程没有回收资源,那么这个子进程会变成一个僵尸进程
  • 主进程怎么知道子进程结束了的呢?
    • 基于网络、文件
    • join 方法 :阻塞,直到子进程结束就结束
 # 示例一:
import time
from multiprocessing import Process
def send_mail():
   time.sleep(3)
   print('发送了一封邮件')
if __name__ == '__main__':
   p = Process(target=send_mail)
   p.start()   # 异步 非阻塞
   # time.sleep(5)
   print('join start')
   p.join()    # 同步 阻塞 直到p对应的进程结束之后才结束阻塞
   print('5000封邮件已发送完毕')
# 示例二:
# 开启10个进程,给公司的5000个人发邮件,发送完邮件之后,打印一个消息“5000封邮件已发送完毕”
import time
import random
from multiprocessing import Process
def send_mail(a):
   time.sleep(random.random())
   print('发送了一封邮件',a)
​
if __name__ == '__main__':
   l = []
   for i in range(10):
       p = Process(target=send_mail,args=(i,))
       p.start()
       l.append(p)
   print(l)
   for p in l:p.join()
   # 阻塞 直到上面的十个进程都结束
   print('5000封邮件已发送完毕')

总结:

  • 如何创建一个进程对象

    • 对象和进程之间的关系
      • 进程对象和进程并没有直接的关系

      • 只是存储了一些和进程相关的内容

      • 此时此刻,操作系统还没有接到创建进程的指令

  • 如何开启一个进程

    • 通过 p.start() 开启了一个进程–这个方法相当于给了操作系统一条指令

    • start 方法的非阻塞和异步的特点

      • 在执行开启进程这个方法的时候

      • 我们既不等待这个进程开启,也不等待操作系统给我们的响应

      • 这里只是负责通知操作系统去开启一个进程

      • 开启了一个子进程之后,主进程的代码和子进程的代码完全异步

  • 父进程和子进程

    • 为了回收子进程的资源,父进程会等待着所有的子进程结束之后才结束
  • 进程开启的过程中 windows 和 linux / ios 之间的区别

    • windows 通过(模块导入)再一次执行父进程文件中的代码来获取父进程中的数据

      • 所以只要是不希望被子进程执行的代码,就写在 if name == ’main’下

      • 因为在进行导入的时候父进程文件中的name != ‘main

    • linux/ios

      • 正常的写就可以,没有 if name == ’main’这件事情了
    • 补充

	if __name__ == '__main__':
		# 控制当这个py文件被当作脚本直接执行的时候,就执行这里面的代码
		# 当这个py文件被当作模块导入的时候,就不执行这里面的代码
		print('hello hello')
	__name__ == '__main__'  # 执行的文件就是__name__所在的文件
	__name__ == '文件名'     # __name__所在的文件被导入执行的时候
  • join 方法
    • 把一个进程的结束事件封装成一个 join 方法

    • 执行 join 方法的效果就是阻塞,直到这个子进程执行结束就结束阻塞

	# 在多个子进程中使用join
	p_l= []
	for i in range(10):
		p = Process(target=函数名,args=(参数1,参数2))
		p.start()
		p_l.append(p)
	for p in p_l:p.join()
	# 所有的子进程都结束之后要执行的代码写在这里

三、守护进程

  • 方式:有一个参数可以把一个子进程设置为一个守护进程
p.daemon = True

import time
from multiprocessing import Process
​
def son1(a,b):
    while True:
        print('is alive')
        time.sleep(0.5)
​
def son2():
    for i in range(5):
        print('in son2')
        time.sleep(1)
​
if __name__ == '__main__':
    p = Process(target=son1,args=(1,2))
    p.daemon = True
    p.start()      # 把p子进程设置成了一个守护进程
    p2 = Process(target=son2)
    p2.start()
    time.sleep(2)
	
  • 结束:守护进程是随着主进程的代码结束而结束的

    • 所有的子进程都必须在主进程结束之前结束,由主进程来负责回收资源
  • 应用场景:

    • 生产者消费者模型

    • 和守护线程做对比

四、生产者消费者模型

  • 解耦:把写在一起的大的功能分开成多个小的功能处理

    • 优点:修改 复用 可读性
  • 组成:

    • producer:生产者,生产数据

    • consumer:消费者,处理数据

    • 生产者和消费者之间的容器就是队列

  • 什么是生产者消费者模型?

    • 把一个产生数据并且处理数据的过程解耦,让生产的数据的过程和处理数据的过程达到一个工作效率上的平衡,

    • 中间的容器,在多进程中我们使用队列或者可被 join 的队列,做到控制数据的量

      • 当数据过剩的时候,队列的大小会控制这生产者的行为

      • 当数据严重不足的时候,队列会控制消费者的行为

      • 并且我们还可以通过定期检查队列中元素的个数来调节生产者消费者的个数

    • 比如说:一个爬虫,或者一个 web 程序的 server 端

      • 爬虫:请求网页的平均时间是 0.3s,处理网页代码的时候是 0.003s

        • 100 倍,每启动 100 个线程生产数据,启动一个线程来完成处理数据
      • web 程序的 server 端:每秒钟有 6w 条请求,一个服务每 s 中只能处理 2000 条

        • 先写一个 web 程序,只负责一件事情,就是接收请求,然后把请求放到队列中

        • 再写很多个 server 端,从队列中获取请求,然后处理,然后返回结果

  • 生产者消费者模型

#  生产者消费者模型
import time
import random
from multiprocessing import Process,Queue
​
def producer(q,name,food):
   for i in range(10):
       time.sleep(random.random())
       fd = '%s%s'%(food,i)
       q.put(fd)
       print('%s生产了一个%s'%(name,food))
​
def consumer(q,name):
   while True:
       food = q.get()
       if not food:break
       time.sleep(random.randint(1,3))
       print('%s吃了%s'%(name,food))
​
def cp(c_count,p_count):
   q = Queue(10)
   for i in range(c_count):
       Process(target=consumer, args=(q, 'alex')).start()
   p_l = []
   for i in range(p_count):
       p1 = Process(target=producer, args=(q, 'wusir', '泔水'))
       p1.start()
       p_l.append(p1)
   for p in p_l:p.join()
   for i in range(c_count):
       q.put(None)
if __name__ == '__main__':
   cp(2,3)
	
# 生产者消费者模型实现的爬虫示例
import re
import requests
from multiprocessing import Process,Queue
​
def producer(q,url):
    response = requests.get(url)
    q.put(response.text)
​
def consumer(q):
    while True:
        s = q.get()
        if not s:break
        com = re.compile(
            '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>\d+).*?<span class="title">(?P<title>.*?)</span>'
            '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)评价</span>', re.S)
        ret = com.finditer(s)
        for i in ret:
            print({
                "id": i.group("id"),
                "title": i.group("title"),
                "rating_num": i.group("rating_num"),
                "comment_num": i.group("comment_num")}
            )
​
if __name__ == '__main__':
    count = 0
    q = Queue(3)
    p_l = []
    for i in range(10):
        url = 'https://movie.douban.com/top250?start=%s&filter='%count
        count += 25
        p = Process(target=producer,args=(q,url,))
        p.start()
        p_l.append(p)
    Process(target=consumer, args=(q,)).start()
    for p in p_l:p.join()
    q.put(None)
  • joinablequeue
# 示例
import time
import random
from  multiprocessing import JoinableQueue,Process
​
def producer(q,name,food):
    for i in range(10):
        time.sleep(random.random())
        fd = '%s%s'%(food,i)
        q.put(fd)
        print('%s生产了一个%s'%(name,food))
    q.join()
​
def consumer(q,name):
    while True:
        food = q.get()
        time.sleep(random.random())
        print('%s吃了%s'%(name,food))
        q.task_done()
​
if __name__ == '__main__':
    jq = JoinableQueue()
    p =Process(target=producer,args=(jq,'wusir','泔水'))
    p.start()
    c = Process(target=consumer,args=(jq,'alex'))
    c.daemon = True
    c.start()
    p.join()