python-网络编程-程序员宅基地

技术标签: python  操作系统  json  

目录:1.C/S架构  2.TCP/IP协议  3.socket套接字  4.粘包与解决办法  5.操作系统  6.进程理论  7.开启进程  8.join方法  9.守护进程  10.互斥锁  11.队列Queue  12.生产者消费者模型  13.线程理论  14.开启线程  15.守护线程  16.GIL  17.进程池与线程池  18.死锁与递归锁  19.信号量  20.Event  21.定时器  22.线程queue  23.协程  24.greenlet  25.gevent  26.IO模型  27.IO多路复用与selectors模块  28.socketserver

C/S架构

         C指的是client(客户端软件),S指的是server(服务端软件)

        

         客户端软件想要基于网络发送一条消息给服务端软件,流程如下:

         1.客户端产生数据,存放于客户端软件的内存中,调用接口将内存中的数据发送/拷贝到操作系统内存

         2.客户端操作系统收到数据后,按照客户端软件指定的规则(即协议),调用网卡发送数据

         3.网络传输数据

         4.服务端调用系统接口,想要将数据从操作系统内存拷贝到自己的内存中

         5.服务端操作系统收到4的指令后,使用与客户端相同的规则(即协议)从网卡接收到数据,拷贝给服务端软件

 

TCP/IP协议

         Transmission Control Protocol/Internet Protocol

         传输控制协议/因特网互联协议

         五层模型:

                   1.物理层,光缆、电缆、双绞线、无线电波等,功能:主要是基于电器特性

                   发送高(1)低(0)电压电信号

                  

                   2.数据链路层,功能:规定了电信号的分组方式

                            Ethernet以太网协议,规定了一组电信号构成一个数据包,叫做‘帧’

                            每一个数据帧分为报头(head)和数据(data)

                            报头 固定由6个字节的发送者/原地址+6个字节的接受者/目标地址+6个字节的数据类型组成,固定18个字节

                            数据包含最短46个字节,最长1500个字节

                            所以一个帧最小64字节,最大1518字节,超过限度就分片发送

                           

                            但凡接入互联网的机器就必须有一块网卡,每块网卡都有独一无二的一个mac地址

                           

                            mac地址:每块网卡在出厂时都会烧制上世界上唯一的一个mac地址

                           

                   3.网络层,功能:引入一套新的地址用来区分不同的广播域/子网,这套地址即网络地址

                            IP协议,是规定网络地址的协议,ipv4规定网络地址由32位2进制表示,范围是0.0.0.0到255.255.255.255

                           

                            这一层发出的数据就是:IP头+data,这样一种结构

                           

                            ip地址+mac地址就标示了全世界独一无二的一台机器

                           

                            子网掩码,就是表示子网络特征的一个参数,形式上与IP地址一样,由32位2进制组成

                            其网络部分全部为1,主机部分全部为0,子网掩码是用来标识一个ip地址的哪些位代表网络位,

                            哪些位代表主机位,区分网络为和主机位是为了划分子网,避免广播风暴和地址浪费

                           

                            A类ip地址:1个网络位3个主机位1.0.0.0-126.0.0.0

                            B类ip地址:2个网络位2个主机位128.0.0.0-191.255.255.255

                            C类ip地址:3个网络位1个主机位192.0.0.0-223.255.255.255

                           

                            0.0.0.0对应当前主机,255.255.255.255是当前子网的广播地址

                           

                   4.传输层,功能:建立端口到端口的通信

                            网络层的ip帮我们区分子网,数据链路层的mac地址帮我们找到主机

                            端口即应用程序与网卡关联的编号,端口范围0-65535,其中0-1023为系统占用端口

                            找到端口就找到软件了,IP+端口就能找到全世界唯一一个软件

                           

                            tcp协议,可靠传输,理论上tcp数据包没有长度限制,但为了保证网络效率,通常不超过ip数据包长度

                                     流式协议,它的数据是数据流,该协议没有封包,所以很可能出现粘包,因为消息边界不明确

                           

                                     以太网头-ip头-tcp头-数据

                           

                                     tcp协议是可靠协议,因为在数据传输过程中,只要不得到确认,就重新发送数据,直到得到确认

                                     tcp要先建立双向通路,tcp是流式协议,双向通路就像管道,建好双向通路才可以发数据

                                     tcp建立双向通路与断开双向通路需要三次握手四次挥手

                                     为什么是四次挥手呢,2、3可能不是同时的,2是确认客户端断连接的请求的,这个可以直接回复

                                     但是3是服务端发起断连接的请求,需要服务端到客户端的数据发完才能断连接,所以2、3可能不是同时的,

                                     那么如果这两步合并就会出现数据没传完的情况

                           

                            udp协议,不可靠传输

                                     udp不需要建立通路,直接就发送数据

                           

                                     udp协议自带封包,就算内容是空的,也会在外面包一层,这样的话udp的数据包就是有边界的,不会粘包

                                    

                                     以太网头-ip头-udp头-数据

                           

                            tcp协议虽然安全性高,但是网络开销大,而udp虽然没有提供安全机制,但网络开销小

                            在现在这个网络安全已经相对较高的情况下,为了保证传输速率,优先考虑udp协议

                           

                   5.应用层

                  

                   经过层层包装后的数据结构是:Ethernet头 + ip头 + tcp/udp头 + data

 

socket:就是套接字

         什么是socket?socket是应用层与tcp/ip协议族通信的中间软件抽象层,它是一组接口

         基于tcp的套接字实例:

         server端:

                   import socket

                   phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)#买手机

                   phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)#重用端口,加在bind前面

                   phone.bind(('127.0.0.1',8080)) #绑定手机卡 注意这里,传进去的是一个由ip和端口组成的元组

                            这个地方127.0.0.1就限制了客户端服务端必须都在本机上才能用

                   phone.listen(5) #开机 5是挂起数量,ip、端口和挂起数量都应该写在配置文件中,不应该写死在这里

                   while True: #链接循环,是为了可以循环服务,一个客户端终止了就等着再服务新的客户端

                            conn,addr=phone.accept() #等待电话连接 这里拿到的是一个由两个元素组成的元组,分别赋值给conn,addr

                                     conn就是客户端与服务端之间建立的通路,addr就是客户端的地址

                            print('电话线路是',conn)

                            print('客户端的手机号是',addr)

                            while True: #通信循环,对应客户端的通信循环,发数据收数据

                                     try: #应对windows系统

                                               data=conn.recv(1024) #收消息 从自己的缓存中收最大1024个字节的数据

                                               if not data:break #如果一直收到空的内容,linux系统会进入死循环,这样应对

                                                        具体讲是这样的,正常情况下不会发空过来,如果客户端单方面给终止了才会一直发空过来,

                                                        那么windows系统上会直接报错,同try...except处理

                                                        而linux系统上不会报错会进入死循环,死循环对cpu的占用率极高,所以用if判断这句来处理

                                               print('客户端发来的消息是',data)

                                               conn.send(data.upper())

                                     except Exception:

                                               break

                            conn.close()

                   phone.close()

                  

         client端:

                   import socket

                   phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

                   phone.connect(('127.0.0.1',8080)) 注意这里,传进去的是一个由ip和端口组成的元组

                   while True:#通信循环

                            msg=input('>>:').strip()

                            if not msg:continue #判断msg是否为空,为空的话就continue

                            phone.send(msg.encode('utf-8')) #要指定编码方式,转成bytes格式才能进行网络传输

                            data=phone.recv(1024)

                            print(data.decode('utf-8'))

                   phone.close()

                  

         socket套接字方法

         socket实例类:socket.socket(family=AF_INET,type=SOCK_STREAM,proto=0,fileno=None)

         family(socket家族)                                          这两个内容可以忽略

                   socket.AF_INET 用于网络编程,大部分时候都用这个

                   socket.AF_UNIX 用于本机进程间通讯

                  

         socket type类型

                   socket.SOCK_STREAM 用于tcp

                   socket.SOCK_DGRAM 用于udp

                   socket.SOCK_RAW 原始套接字

                   socket.SOCK_RDM 是一种可靠的udp形式,即保证交付数据报但不保证顺序

        

         服务端套接字函数

         s.bind() 绑定(主机,端口号)到套接字

         s.listen() 开始tcp监听

         s.accept() 被动接受tcp客户端的连接,(阻塞式)等待连接的到来

         s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)#重用端口,加在bind前面

        

         客户端套接字函数

         s.connect() 主动初始化tcp服务器连接

         s.connect_ex() connect()函数的扩展版本,出错时返回出错码,而不是抛出异常

        

         公共用途的套接字函数

         s.recv() 接收数据

         s.send() 发送数据,如果待发送数据量大于己端缓存区剩余空间,数据会丢失

         s.sendall() 发送完整的tcp数据,本质上就是循环调用send(),直到发完为止

         s.recvfrom() 从套接字接收数据,返回值是(bytes,address)

         s.close() 关闭套接字

         socket.setblocking(flag),设置socket为非阻塞模式

        

         基于udp的套接字实例: 了解

         udp服务端

                   import socket

                   udpserver=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)

                   udpserver.bind(('127.0.0.1',8080))

                   while True:#通讯循环 udp协议没有链接,所以没有链接循环

                            data,client_addr=udpserver.recvfrom(1024)

                            print(data.decode('utf-8'))

                            print(client_addr)

                            msg=input('>>:')

                            udpserver.sendto(msg.encode('utf-8'),client_addr)

        

         udp客户端

                   import socket

                   udpclient=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)

                   server_ip_port=('127.0.0.1',8080)

                   while True:#通讯循环 udp协议没有链接,所以没有链接循环

                            inp=input('>>:')

                            udpclient.sendto(inp.encode('utf-8'),server_ip_port)

                            data,server_addr=udpclient.recvfrom(1024)

                            print(data.decode('utf-8'))

                           

send与recv

         1.这两者都不是直接接收对方的数据,而是操作自己的操作系统内存,

                   就是send把内容发给自己的操作系统的内存,由操作系统来发,数据到了后先存在操作系统的内存中,而recv是去操作系统的内存中拿

                   这两者没有一一对应关系,不需要一个send对应一个recv  

         2.recv有两个阶段,wait for data阶段和copy data阶段,时间都耗费在第一阶段

                   而send只有一个copy data阶段

 

粘包现象与解决办法

         tcp协议是面向流的协议,容易出现粘包,而udp是面向消息的协议,每个udp字段都是一条消息

         应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,所以udp不会粘包

         粘包问题的发生主要还是接收方不知道消息之间的界限,不知道一次性提取多少数据

         在数据量比较小并且时间间隔非常短的情况下,或数据量非常大超出了接受范围时才可能发生粘包问题

        

         总结:

         1.tcp(transport control protocol 传输控制协议)

         是面向连接的,面向流的,提供高可靠性服务,收发两端一一成对,为了更有效地收发,

         使用了优化算法(Ngale算法),将多次间隔较短且数据量小的数据合并成一个大的数据块,

         然后封包,这样一来,接收方难以分辨数据边界,必须提供科学的拆包机制,

         即面向流的通信没有消息保护边界

        

         2.udp(user datagram protocol 用户数据包协议)

         是无连接的,面向消息的,提供高效率的服务,不会使用块的合并优化算法

         由于udp支持一对多的模式,所以接收端的skbuff(套接字缓冲区)采用链式结构来记录每个到达的udp包

         每个udp包有消息头,对接收端来说,就容易区分处理了

         即面向消息的通信是有消息保护边界的

        

         3.tcp基于数据流,于是收发的消息不能为空,这就需要在客户端服务端都添加空消息处理机制

           udp基于数据报,即是输入空内容,那也不是空消息,udp协议会封装上消息头

        

         用json和struct解决粘包实例:

         server端

                   import socket,subprocess,struct,json

                   phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

                   phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)#重用端口,加在bind前面

                   ip_port=('127.0.0.1',8080)

                   phone.bind(ip_port)

                   phone.listen(5)

                   while True:

                            conn,client_addr=phone.accept()

                            while True:

                                     try:

                                               # 1.收命令

                                               cmd=conn.recv(1024)

                                               if not cmd:break

                                               # 2. 执行命令,拿到结果,使用os.system是拿不到结果的,需要用subprocess模块,使用管道

                                               res=subprocess.Popen(cmd.decode('utf-8'),

                                                                                     shell=True,

                                                                                     stderr=subprocess.PIPE,

                                                                                     stdout=subprocess.PIPE)

                                               # err=res.stderr.read()

                                               # if err:

                                               #     cmd_res=err

                                               # else:

                                               #     cmd_res=res.stdout.read()

                                               # conn.send(cmd_res)

                                               out_res=res.stdout.read()

                                               err_res=res.stderr.read()

                                               data_size=len(out_res)+len(err_res) #数据的长度

                                               head_dic={'data_size':data_size} #做一个字典,数据长度为value

                                               head_json=json.dumps(head_dic) #把包含数据长度的字典转成字符串格式

                                               head_bytes=head_json.encode('utf-8') #把字符串格式的字典转成二进制格式用于网络传输,这个就是报头

                                               #先发送报头长度

                                               head_len=len(head_bytes) #提取报头长度

                                               conn.send(struct.pack('i',head_len)) 使用struct将报头长度转成固定的4个字节发送给客户端,这个i是整型的意思,后面必须跟数字

                                               #发送报头内容

                                               conn.send(head_bytes) #把报头发过去

                                               #最后发送数据部分

                                               #3.把结果返回给客户端

                                               conn.send(out_res)

                                               conn.send(err_res)

                                     except Exception:

                                               break

                            conn.close()

                   phone.close()

        

         client端

                   import socket,struct,json

                   phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

                   ip_port=('127.0.0.1',8080)

                   phone.connect(ip_port)

                   while True:

                            cmd=input('>>:').strip()

                            if not cmd:continue

                            phone.send(bytes(cmd,encoding='utf-8'))

                            #收报头长度

                            head_struct=phone.recv(4)

                            head_len=struct.unpack('i',head_struct)[0] 提取报头长度,解包出来是一个元组的格式,索引值为0的位置就是要的报头长度

                            #收报头内容

                            head_bytes=phone.recv(head_len) #按照报头长度接收,拿到报头部分,是二进制格式的

                            head_json=head_bytes.decode('utf-8') #把二进制格式的报头解码成json字符串格式的

                            head_dic=json.loads(head_json)提取报头内容,把字符串格式的报头内容反序列化为字典格式

                            data_size=head_dic['data_size'] 获取数据长度

                            #收数据

                            recv_size=0

                            recv_data=b''

                            while recv_size < data_size:

                                     data=phone.recv(1024)

                                     recv_size+=len(data)

                                     recv_data+=data

                            print(recv_data.decode('gbk')) #这个地方是个坑,虽然输入cmd的时候用的utf-8,服务端那边解码也是utf-8

                                                                                                       但是因为pycharm是运行在windows系统上,系统的编码是gbk,所以要看到显示

                                                                                                       需要用gbk解码

                   phone.close()

 

                   是不是可以这样理解,先拿到数据长度,再把数据长度做成字典,再把字典先序列化后编码成二进制格式,这个经过加工处理的‘字典’就是报头,包含数据的长度信息

                            然后,len拿到报头的长度,把这个长度struck.pack转成固定4个字节,然后分三次发送,第一次发4个字节,再发报头内容,也就是那个特殊的‘字典’,

                            最后发送数据部分

                           

                            接收的时候,先接收固定4个字节,把4个字节的内容unpack解包,拿到的就是报头长度的信息,然后根据这个报头长度的信息区接收,

                            接收到的内容是二进制格式,先解码成字符串格式,再反序列化成原本的字典格式,再用key取值拿到字典里面关于数据长度的信息,

                            最后根据这个信息去循环收取数据

                           

                   struck可以有两种模式,i与l,i是整型,l是长整形,l的范围大于i

 

操作系统

         操作系统的作用

                   1.隐藏丑陋复杂的硬件接口,提供良好的抽象接口

                   2.管理、调度进程,并且将多个进程对硬件的竞争变得有序

         多道技术

                   1.产生背景:针对单核,实现并发。现在的主机都是多核,每个核都会利用多道技术

                   2.空间上的多路复用:如内存中同时有多道程序

                   3.时间上的多路复用:复用一个cpu的时间片

                            遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来

                            这样才能保证下次切回来时,能基于上次切走的位置继续运行

                   4.空间上的复用最大的问题是:

                            程序之间的内存必须分割,这种分割需要在硬件层面实现,由操作系统控制。如果内存彼此不分割,则一个程序可以访问另外一个程序的内存,

                            首先丧失的是安全性,比如你的qq程序可以访问操作系统的内存,这意味着你的qq可以拿到操作系统的所有权限。

                            其次丧失的是稳定性,某个程序崩溃时有可能把别的程序的内存也给回收了,比方说把操作系统的内存给回收了,则操作系统崩溃。

 

         第一代计算机:真空管和穿孔卡片

         第二代计算机:晶体管和批处理系统,现代操作系统的前身在这里出现

         第三代计算机:集成电路芯片和多道程序设计,出现了多道技术,但是其操作系统还是批处理系统,并没有使用多道技术,因为还没有解决上面的4中的问题

         第四代计算机:个人计算机

                           

进程理论

         进程是资源单位,线程是执行单位

        

         进程:

                   进程就是正在进行的一个过程或者说一个任务。负责执行任务的是cpu

                   程序仅仅只是一堆代码而已,进程指的是程序的运行过程

                   同一个程序执行两次那也是两个进程

        

         并发与并行

                   无论是并发还是并行,在用户看来都是‘同时’运行的,不管是进程还是线程,

                   都只是一个任务而已,真正干活的是cpu,而一个cpu同一时刻只能执行一个任务

                  

                   并发:伪并行,即看起来是同时运行,单个cpu+多道技术就可以实现

                            即在一个时间段内有很多任务要做,但cpu同一时刻只能做一个任务,那就先做一会1,

                            再做一会2,再做一会3....这就保证了每个人任务都在进行中

                   并行:同时运行,只有具备多个cpu才能实现并行

                  

开启进程

         python中的多线程无法利用多核优势,如果想要充分使用cpu的多核资源python中大部分情况下需要使用多进程

        

         python提供multiprocessing模块,用来开启子进程

        

         与线程不同,进程间没有任何共享状态,进程修改的数据,改动仅限于该进程内,内存空间是隔离的

        

         Process类

                   Process([group[,target[,name[,args[,kwargs]]]]])

                   由该类实例化得到的对象,可用来开启一个子进程

                   需要使用关键字的方式来指定参数

                   args指定的为传给target函数的位置参数,是一个元祖形式,必须有逗号

                   注意:在windows中,Process()必须放在 if __name__ == '__main__'下

                  

                   参数介绍:

                            group 参数未使用,值始终未None

                            target 表示调用对象,即子进程要执行的任务

                            name 为子进程的名称

                            args 表示调用对象的位置参数元组,args=(1,2,'egon')

                            kwargs 表示调用对象的字典,kwargs={'name':'egon','egon':18}

                  

                   方法介绍:

                            p.start() 开启进程,并调用该子进程中的p.run()

                            p.run() 进程启动时运行的方法,正是它去调用target指定的函数,自定义类时必须实现run()

                            p.terminate() 强制终止进程p,不会进行任何清理操作,也是给操作系统发信号,由操作系统进行清理操作

                                     如果p还创建了子进程,那这样操作后其子进程就变成了僵尸进程

                                     如果p还保存了一个锁,那么将不会被释放,进而导致死锁

                            p.is_alive() 如果p仍然运行,返回True

                            p.join([timeout]) 主线程等待p终止,是主线程处于等待状态,p处于运行状态

                                     timeout是可选的超时时间

                  

                   属性介绍:

                            p.daemon 默认值是False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,

                                     p也随之终止,并且设为True后不能创建自己的新进程,必须在p.start()之前设置

                            p.name 进程的名称

                            p.pid 进程的pid

         创建并开启子进程

         方式一:调用Process类

                   import time,random

                   from multiprocessing import Process  # 这个Process一定要是大写开头的才对

                   def piao(name):

                            print('%s is piaoing'%name)

                            time.sleep(random.randrange(1,5))

                            print('%s piao end'%name)

                   if __name__ == '__main__':

                            #实例化得到四个对象

                            p1 = Process(target=piao,args=('egon',)) #注意这里args需要传的值是元组,所以,很重要

                            p2 = Process(target=piao,args=('alex',))  # 也可以用kwargs传字典进去

                            p3 = Process(target=piao,args=('wupeiqi',))

                            p4 = Process(target=piao,args=('yuanhao',))

                            #调用对象下的方法,开启四个进程

                            p1.start()  # 仅仅只是给操作系统发送了一个信号

                            p2.start()  # 并不是先运行p1再运行p2再运行p3最后运行p4这样的顺序

                            p3.start()  # 只是发一个信号,开进程由操作系统来执行

                            p4.start()

                            print('主进程')

        

         方式二:继承Process类,必须自己写run()方法,自己写一个类继承Process类

                   import time,random

                   from multiprocessing import Process  这个Process一定要是大写开头的才对

                   class Piao(Process):

                            def __init__(self,name):

                                     super().__init__() 或 Process.__init__(self) super方法不用写self

                                     self.name=name

                            def run(self):  # 一定要是run()

                                     print('%s is piaoing'%self.name)

                                     time.sleep(random.randrange(1,5))

                                     print('%s piao end'%self.name)

                   if __name__ == '__main__':

                            p1=Piao('egon')

                            p2=Piao('alex')

                            p3=Piao('wupeiqi')

                            p4=Piao('yuanhao')

                            p1.start()

                            p2.start()

                            p3.start()

                            p4.start()

                            print('主进程')

        

         查看pid:

         1.用os模块,os.getpid()查看自己的pid,os.getppid()查看父进程的pid

         2.p.pid

        

         所有的子进程都要经历僵尸进程这个状态,就是子进程执行完毕了后要清理掉的时候,成为僵尸进程,

         保留一点点子进程的消息,以供父进程查看,父进程终结掉时会清理掉僵尸进程,有害

        

         孤儿进程:子进程没终结掉,而父进程终结了,由init进程接收,无害

        

         进程间内存空间是隔离的

        

         基于多进程实现并发的套接字通信,实例:

         server:

                   import socket

                   from multiprocessing import Process

                  

                  

                   def talk(conn):

                            while True:

                                     try:

                                               data = conn.recv(1024)

                                               if not data:

                                                        continue

                                               conn.send(data.upper())

                                     except ConnectionResetError:

                                               break

                            conn.close()

 

 

                   def server(ip, port):

                            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                            server.bind((ip, port))

                            server.listen(5)

                            while True:

                                     conn, addr = server.accept()

                                     p = Process(target=talk, args=(conn,))

                                     p.start()

                            server.close()

 

 

                   if __name__ == '__main__':

                            server('127.0.0.1', 8800)

                           

         client:

                   import socket

                   client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                   client.connect(('127.0.0.1', 8800))

                   while True:

                            msg = input('>>').strip()

                            if not msg:

                                     continue

                            client.send(msg.encode('utf-8'))

                            data = client.recv(1024)

                            print(data.decode('utf-8'))

 

join方法

         在主进程运行过程中,如果想并发的执行其他的任务,我们可以开启子进程,

                   此时主进程任务与子进程任务分两种情况:

                   1.在主进程任务与子进程任务彼此独立的情况下,主进程的任务先执行完毕后,

                            主进程还需要等待子进程执行完毕,然后统一回收资源

                   2.如果主进程任务执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行,

                            此时就需要一种机制能够让主进程检测子进程是否执行完毕,执行完毕就继续执行主进程,

                            未执行完毕,主进程一直阻塞,这种机制就是join方法的作用

        

         join方法实例:

                   import time,random

                   from multiprocessing import Process

                   def task(name):

                            print('%s is piaoing'%name)

                            time.sleep(random.randrange(1,5))

                            print('%s piao end'%name)

                   if __name__ == '__main__':

                            p1 = Process(target=task,args=('egon',))

                            p2 = Process(target=task,args=('alex',))

                            p3 = Process(target=task,args=('wupeiqi',))

                            p4 = Process(target=task,args=('yuanhao',))

                            p1.start() 进程只要start就会开始运行,这里start4个,那么系统中就有4个并发的进程

                            p2.start()

                            p3.start()

                            p4.start()

                            p1.join() join方法是让主进程阻塞等待,p1-p4仍然并发执行

                            p2.join() 所以4个join花费的总时间就是耗费时间最长的那个进程运行的时间

                            p3.join()

                            p4.join()

                            #上面start与join可以简写如下

                            p_l = [p1,p2,p3,p4]

                            for i in p_l:

                                     i.start()

                            for v in p_l:

                                     v.join()

                            print('主进程')

                           

守护进程

         主进程创建子进程,然后将该子进程设置成守护进程

        

         守护进程会在  主进程代码 执行结束后就终止

         守护进程内无法再开启子进程,否则抛异常

        

         如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程就可以了

         如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成

         守护进程,主进程任务执行完,守护进程随即结束

        

         代码实例:

                   import time,random

                   from multiprocessing import Process

                   def task(name):

                            print('%s is piaoing'%name)

                            time.sleep(random.randrange(1,5))

                            print('%s piao end'%name)

                   if __name__ == '__main__':

                            p = Process(target=task,args=('egon',))

                            p.daemon = True 设置守护进程,一定要放在p.start()之前,不然会报错

                            p.start()

                            print('主进程') 这是主进程,只要终端中打印了这一行,守护进程也随之终止

                   这个函数的执行结果是只打印’主进程‘这三个字

 

互斥锁

         针对的是对共享数据的修改,只要对修改共享数据的代码加锁

         原理上就是把并发改成串行,降低了效率,但是保证了数据安全不错乱

         实例:

                   from multiprocessing import Process,Lock

                   import time,os

                   def work(lock):

                            lock.acquire() #加锁

                            print('%s is running'%os.getpid())

                            time.sleep(2)

                            print('%s is done'%os.getpid())

                            lock.release() #释放锁

                   if __name__ == '__main__':

                            lock = Lock()

                            for i in range(3):

                                     p = Process(target=work,args=(lock,))

                                     p.start()

        

         模拟抢票,实例:

         db.txt >>: {"count": 1} 这个count一定要用双引号,不然用json读不出来

        

                   from multiprocessing import Process, Lock

                   import json

                   import time

 

                   def search(name):

                            time.sleep(1)

                            dic = json.load(open('db.txt', 'r', encoding='utf-8'))

                            print('<%s> check number:[%s]' % (name, dic.get('count')))

 

                   def get(name):

                            time.sleep(1)

                            dic = json.load(open('db.txt', 'r', encoding='utf-8'))

                            if dic.get('count') > 0:

                                     dic['count'] -= 1

                                     time.sleep(1)

                                     json.dump(dic, open('db.txt', 'w', encoding='utf-8'))

                                     print('<%s> successful' % name)

                            else:

                                     print('sorry<%s>,no more' % name)

 

                   def task(name, lock):

                            search(name)

                            lock.acquire()

                            get(name)

                            lock.release()

 

                   if __name__ == '__main__':

                            lock = Lock()

                            for i in range(1, 10):

                                     t = Process(target=task, args=('egon %s' % i, lock))

                                     t.start()

 

        

         join是将一个任务整体串行,多个子进程还是并发的,子进程与主进程改成了串行,而互斥锁则是将一个任务中的某一段代码串行

 

队列Queue

         进程彼此之间隔离,要实现进程间通信(IPC),multiprocessing提供两种方式进行消息传递

         队列和管道,都是使用内存空间

        

         创建队列的类(底层就是以管道和锁定的方式实现),队列就是管道+锁

         Queue([maxsize]) 创建共享的进程队列

                   maxsize是队列中允许最大项数,省略则无大小限制

                   队列里面存放的是消息,而非数据,所以不要放大的,要放小的

                   队列占用的是内存空间,所以maxsize即使没有大小限制也受到内存大小的限制

        

         主要方法:

                   q.put() 在队列中插入数据

                   q.get() 可以从队列中读取并删除一个元素

                   q.full() 判断队列满了没有

                   q.empty() 判断队列空了没有

        

         实例:

                   from multiprocessing import Process, Queue

                   q = Queue(3)

                   q.put(1)

                   q.put(2)

                   q.put(3)

                   print(q.full())

                   print(q.get())

                   print(q.get())

                   print(q.get())

                   print(q.empty())

        

 

生产者消费者模型

         生产者指的是生产数据的任务,消费者指的是处理数据的任务

         在并发编程中,可能会出现生产者跟消费者效率不协调的情况,这时候就需要生产者消费者模型

        

         生产者消费者模型:

                   是通过一个容器来解决生产者和消费者的强耦合问题。

                   生产者与消费者之间不直接通讯,而是通过一个阻塞队列进行通讯

                   生产者把数据交给阻塞队列,消费者去阻塞队列那里拿数据

                   阻塞队列相当于一个缓冲区,来平衡生产者与消费者的处理能力,解耦合

                   用队列queue来实现

        

         一种用于生产者消费者模型的队列的机制

         JoinableQueue([maxsize])

         方法与queue类似,q.get() q.put(),其特有的方法:

                   q.task_done() 消费者使用此方法发出信号,表示q.get()的返回项目已经被处理

                   q.join() 生产者调用此方法阻塞,直到队列中所有项目均被处理

 

         实例:

                   from multiprocessing import Process,JoinableQueue

                   import time

 

                   def producer(q,name,food):

                            for i in range(1,10):

                                     time.sleep(2)

                                     res = '%s%s'%(food,i)

                                     q.put(res)

                                     print('----->%s produce %s'%(name,res))

                            q.join() 这里是为了阻塞住子进程,等待q.task_done()发消息

 

                   def consumer(q,name):

                            while True:

                                     res = q.get()

                                     time.sleep(1)

                                     print('--->%s eat %s'%(name,res))

                                     q.task_done()

                                    

                   if __name__ == '__main__':

                            q = JoinableQueue()

                            p1 = Process(target=producer,args=(q,'egon1','baozi'))

                            p2 = Process(target=producer,args=(q,'egon2','gutou'))

                            c1 = Process(target=consumer,args=(q,'alex1',))

                            c2 = Process(target=consumer,args=(q,'alex2',))

                            c1.daemon = True 设置守护进程

                            c2.daemon = True 设置守护进程

                            p1.start()

                            p2.start()

                            c1.start()

                            c2.start()

                            p1.join()       这里是为了阻塞主进程,主进程等待p1 p2结束才会执行 而p1 p2会被q.join阻塞,要等待c1 c2里面的

                            p2.join()       q.task_done()执行,把全部取完的信号发回后才会把p1 p2结束掉,而p1 p2结束了,

                            print('主程序') c1 c2作为消费者也没必要存在了,设置成守护进程

                                                            这样的话,p1,p2执行完了执行主程序,主程序执行完了c1 c2一块跟着结束

 

线程理论

         进程是资源单位,线程是执行单位,线程才是cpu上的执行单位

         进程好比是上海地铁,线程就是2号线,3号线,4号线

         多线程就是一个进程中存在多个线程,多个线程共享该进程的地址空间和所有资源

         创建线程的开销要远小于创建进程的开销

        

开启线程

         方式1:Thread类直接创建

                   import threading

                   def countNum(n):

                            print('running on number:%s'%n)

                   if __name__=='__main__':

                            t1=threading.Thread(target=countNum,args=(23,))

                            t2=threading.Thread(target=countNum,args=(34,))

                            t1.start()

                            t2.start()

                            print('ending')

         或者from threading import Thread,这样的话在创建对象的时候就可以直接Thread(),个人感觉用这个好一点,占内存少,调用也方便

        

         方式2:Thread类继承式创建

                   import threading

                   class MyThread(threading.Thread):

                            def __init__(self,num):

                                     threading.Thread.__init__(self)

                                     self.num=num

                            def run(self):

                                     print('running on number:%s'%self.num)

                   t1=MyThread(56)

                   t2=MyThread(78)

                   t1.start() 调用执行run()方法

                   t2.start()

                   print('ending')

                  

         进程与线程的区别:    

                   1.创建进程的开销大于创建线程的开销

                   2.在主进程下开启多个线程,每个线程的pid都跟主进程一样,而开多个进程,每个进程的pid各不相同

                   3.进程之间的地址空间是隔离的,同一个进程内的多个线程地址空间是共享的,一个线程内的修改会反映到所有的线程中

        

         Thread对象的其他属性或方法,这个是Thread下的方法

                   Thread.isAlive() 返回线程是否活动

                   Thread.getName() 返回线程名

                   Thread.setName() 设置线程名

         threading模块提供的一些方法,这是threading下的方法,与Thread一个级别

                   threading.currentThread() 返回当前的线程变量,可以用这个拿到当前线程变量

                   threading.enumerate() 返回一个包含正在运行的线程的list

                   threading.activeCount() 返回正在运行的线程数量

                                                                 与len(threading.enumerate())结果一样

        

         代码示例:

         join方法阻塞主线程

                   import threading

                   import time

                   def tingge():

                            print('tingge')

                            time.sleep(3)

 

                   def xieboke():

                            print('xieboke')

                            time.sleep(5)

 

                   t1=threading.Thread(target=tingge)

                   t2=threading.Thread(target=xieboke)

                   t1.start()

                   t2.start()

                   t1.join()  #join在子线程完成运行之前,这个子线程的父线程将一直被阻塞

                   t2.join()

                   print('ending')

 

守护线程

         无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后就被销毁

         需要强调的是,运行完毕并非终止运行

                   1.对主进程而言,运行完毕指的是主进程代码运行完毕

                   2.对主线程而言,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕

                            主线程才算运行完毕

        

         代码实例:

                   from threading import Thread

                   import time

                   def sayhi(name):

                            time.sleep(2)

                            print('%s say hi'%name)

                   if __name__ == '__main__':

                            t = Thread(target=sayhi,args=('egon',))

                            t.setDaemon(True) 设置守护线程,放在start之前 或:t.daemon = True

                            t.start()

                            print('主线程')

                   因为sayhi中有sleep(2),所以在主线程print之后,被设置成守护线程的t就终止了

 

基于多线程实现并发的套接字通信,实例:

         server端:

                   import socket

                   from threading import Thread

 

                   def communicate(conn):

                            while True:

                                     try:

                                               data = conn.recv(1024)

                                               if not data:

                                                        break

                                               conn.send(data.upper())

                                     except ConnectionResetError:

                                               break

                            conn.close()

 

                  def server(ip, port):

                            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                            server.bind((ip, port))

                            server.listen(5)

                            while True:

                                     conn, addr = server.accept()

                                     t = Thread(target=communicate, args=(conn,))

                                     t.start()

                            server.close()

                           

                   if __name__ == '__main__':

                            server('127.0.0.1', 8800)

        

         client端:

                   import socket

                   client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                   client.connect(('127.0.0.1', 8800))

                   while True:

                            msg = input('>>').strip()

                            if not msg:

                                     continue

                            client.send(msg.encode('utf-8'))

                            data = client.recv(1024)

                            print(data.decode('utf-8'))

                   client.close()

 

GIL 全局解释器锁

         在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核

         GIL是加在cpython解释器上的一把互斥锁,只是我们常用的是cpython解释器而已,但是GIL并不是python的锁

         GIL并不是python的特性,而是在实现Cpython时所引进的一个概念,python完全可以不依赖于GIL

         在cpython中,因为GIL的存在无法实现多线程的并行,无法利用多核,但是可以实现并发

        

         GIL本质就是一把互斥锁(mutex),所以就是在把并发改串行

        

         GIL与Lock:

                   首先,锁的目的是为了保护共享数据,同一时间只能有一个线程来修改共享的数据

                   然后,保护不同的数据就应该加不同的锁

                   最后,GIL与Lock保护的数据不一样,GIL保护的是解释器级别的,比如垃圾回收的数据

                            Lock保护用户自己开发的应用程序的数据

        

         互斥锁代码示例:

                   import time,threading

                   # from threading import Thread,Lock 这样写比较好

                   def subNum():

                            global num  #100个线程会同时走到这一步

                            r.acquire()  #加上互斥锁之后,acquire与release之间的内容被上锁,

                            temp=num     #待资源访问结束后才会释放,然后让下一个线程进来

                            time.sleep(0.0001)

                            num=temp-1

                            r.release()

                   num=100

                   thread_list=[]

                   r=threading.Lock()

                   for i in range(100):

                            t=threading.Thread(target=subNum) #产生100个线程

                            t.start()

                            thread_list.append(t)

                   for t in thread_list:

                            t.join()

                   print('Result:',num)

 

         对于计算来说,cpu越多越好,但对与I/O来说,再多的cpu也没用

         对于运行一个程序来说,cpu越多执行效率越高,因为一个程序不会是纯计算或纯I/O

 

         对于I/O密集型,无论是多核还是单核,多线程合适

         对于计算密集型,单核时使用多线程,多核时使用多进程

         即,多进程适合多核时进行计算密集型任务,其余情况都是多线程更合适

 

         多线程适合于I/O密集型,比如socket,爬虫,web

         多进程适合于计算密集型,比如金融分析

         现在都是多核了,所以在多核情况下:计算密集型用多进程,这样才能利用多核优势

                                                                      IO密集型用多线程,因为我们现在做的大都是IO密集型的操作,所以大部分情况下用多线程

        

         代码实例:

    对于计算密集型,多核时的多进程:效率高,8秒多

                   from multiprocessing import Process

                   import time

                   def counter():

                            i = 0

                            for i in range(40000000):

                                     i += 1

                            return True

                   def main():

                            l=[]

                            start_time=time.time()

                            for i in range(2):

                                     t=Process(target=counter)

                                     t.start()

                                     l.append(t)

                            for t in l:

                                     t.join()

                            # counter()

                            # counter()

                            end_time=time.time()

                            print('Total time:{}'.format(end_time-start_time))

                   if __name__=='__main__':

                            main()

        

         对于计算密集型,多核时的多线程:12秒多,效率低于多进程

                   from threading import Thread

                   import time

                   def counter():

                            i = 0

                            for i in range(40000000):

                                     i += 1

                            return True

                   def main():

                            l=[]

                            start_time=time.time()

                            for i in range(2):

                                     t=Thread(target=counter)

                                     t.start()

                                     l.append(t)

                                     t.join()

                            end_time=time.time()

                            print('Total time:{}'.format(end_time-start_time))

                   if __name__=='__main__':

                            main()

 

进程池与线程池

         from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

         concurrent.futures模块提供了高度封装的异步调用接口

         ThreadPoolExecutor 线程池,提供异步调用

         ProcessPoolExecutor 进程池,提供异步调用

         进程池与线程池的用法完全一样

        

         提交任务的两种方式:同步调用与异步调用

                   同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果再执行下一行代码

                   异步调用:提交完任务后,不在原地等待任务执行完毕,直接执行下面的代码

                   实例:

                   1.同步调用,不等于阻塞,阻塞是IO阻塞,同步调用是在等待任务执行结果,跟IO没有必然联系

                            import time

                            import random

                            from concurrent.futures import ThreadPoolExecutor

 

 

                            def la(name):

                                     print('%s is laing' % name)

                                     time.sleep(random.randint(1, 3))

                                     res = random.randint(7, 13) * '#'

                                     return {'name': name, 'res': res}

 

 

                            def weight(shit):

                                     name = shit.get('name')

                                     size = len(shit.get('res'))

                                     print('%s lale <%s>kg' % (name, size))

 

 

                            if __name__ == '__main__':

                                     pool = ThreadPoolExecutor(13)

                                     shit1 = pool.submit(la, 'alex').result()  # 等待执行结果,拿到结果后再往下走

                                     weight(shit1)

                                     shit2 = pool.submit(la, 'wusir').result()

                                     weight(shit2)

                  

                   2.异步调用

                            import time

                            import random

                            from concurrent.futures import ThreadPoolExecutor

 

 

                            def la(name):

                                     print('%s is laing' % name)

                                     time.sleep(random.randint(1, 3))

                                     res = random.randint(7, 13) * '#'

                                     weight({'name': name, 'res': res})

 

 

                            def weight(shit):

                                     name = shit.get('name')

                                     size = len(shit.get('res'))

                                     print('%s lale <%s>kg' % (name, size))

 

 

                            if __name__ == '__main__':

                                     pool = ThreadPoolExecutor(13)

                                     pool.submit(la, 'alex')  # 不拿到结果直接往下走

                                     pool.submit(la, 'wusir')

                                     pool.submit(la, 'yuan')

                           

                            异步调用+回调函数:

                                     import time

                                     import random

                                     from concurrent.futures import ThreadPoolExecutor

 

 

                                     def la(name):

                                               print('%s is laing' % name)

                                               time.sleep(random.randint(1, 3))

                                               res = random.randint(7, 13) * '#'

                                               return {'name': name, 'res': res}

 

 

                                     def weight(shit):

                                               shit = shit.result()  使用回调函数后传进来的shit是一个obj对象,要用obj.result()拿到结果

                                               name = shit.get('name')

                                               size = len(shit.get('res'))

                                               print('%s lale <%s>kg' % (name, size))

 

 

                                     if __name__ == '__main__':

                                               pool = ThreadPoolExecutor(13)

                                               pool.submit(la, 'alex').add_done_callback(weight) weight拿到的是一个对象obj,所以需要用obj.result()拿到结果

                                               pool.submit(la, 'wusir').add_done_callback(weight)

                                               pool.submit(la, 'yuan').add_done_callback(weight)

        

         基本方法:

                   submit(func, *args, **kwargs) 异步提交任务

                   map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作

                   shutdown(wait=True) 相当于进程池的pool.close()+pool.join()的操作

                                                                 wait=True 等待池内所有任务执行完毕回收完资源后才继续

                                                                 wait=False 立即返回,并不会等待池内的任务执行完毕

                                                                  但是不管wait的参数是什么,整个程序都会等到所有任务执行完毕

                                                                 submit与map必须在shutdown之前

                   result(timeout=None) 取得结果

                   add_done_callback(func) 回调函数

                                                                           为进程池或线程池内的每一个进程或线程绑定一个函数func,该函数func在进程或线程的任务执行完毕后自动触发

                                                                           并接受任务的返回值作为参数传给func

        

         实例:

                   from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

                   import os

                   import time

                   import random

 

                   def task(name):

                            print('name:%s pid:%s run' % (name, os.getpid()))

                            time.sleep(random.randint(1, 3))

 

 

                   if __name__ == '__main__':

                            pool = ProcessPoolExecutor(4) # 进程池 指定进程池最大数目就是4个,这样的话最多就是开4个进程,多余的任务等着有进程结束才能执行

                            pool = ThreadPoolExecutor(4)  # 线程池,10个线程的pid都是一样的,而上面进程池的时候10个进程的pid是固定的4个pid

                            for i in range(10):

                                     pool.submit(task, 'egon%s' % i)

                            pool.shutdown(wait=True)  # 会等待所有10个进程都走完才走下面的print

                            print('zhu')

                  

         练习:

                   from concurrent.futures import ThreadPoolExecutor

                   import time

                   def get(url):

                            print('GET %s' % url)

                            response = requests.get(url)

                            time.sleep(2)

                            return {'url': url, 'content': response.text}

 

 

                   def parse(res):

                            res = res.result()

                            print('%s res is %s' % (res.get('url'), len(res.get('content'))))

 

 

                   if __name__ == '__main__':

                            urls = [

                                     'http://www.baidu.com',

                                     'http://www.bilibili.com',

                                     'http://www.zhihu.com',

                                     'http://www.acfun.cn',

                            ]

                            pool = ThreadPoolExecutor(3)

                            for url in urls:

                                     pool.submit(get, url).add_done_callback(parse)

 

基于线程池实现并发的套接字通信

         client:

                   import socket

                   client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                   client.connect(('127.0.0.1', 8800))

                   while True:

                            msg = input('>>').strip()

                            if not msg:

                                     continue

                            client.send(msg.encode('utf-8'))

                            data = client.recv(1024)

                            print(data.decode('utf-8'))

                   client.close()

                  

         server:

                   import socket

                   from concurrent.futures import ThreadPoolExecutor

 

                   def communicate(conn):

                            while True:

                                     try:

                                               data = conn.recv(1024)

                                               if not data:

                                                        break

                                               conn.send(data.upper())

                                     except ConnectionResetError:

                                               break

                            conn.close()

 

                   def server(ip, port):

                            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                            server.bind((ip, port))

                            server.listen(5)

                            while True:

                                     conn, addr = server.accept()

                                     pool.submit(communicate, conn)

                            server.close()

                           

                   if __name__ == '__main__':

                            pool = ThreadPoolExecutor(2)

                            server('127.0.0.1', 8800)

 

死锁与递归锁

         死锁:两个及以上的进程或线程在执行过程中,因争夺资源而造成的相互等待的现象

         解决办法就是递归锁,为了支持在同一线程中多次请求同一资源,可重入锁RLock

         递归锁可以acquire多次,而互斥锁只能acquire一次

         递归锁内部有一个count变量,记录acquire的计数,只要count的计数不是0,其他线程就抢不到,直到所有的acquire都被release,count为0时才能被其他线程抢到

        

         代码实例:可以多用RLock

                   import time,threading

                   Rlock=threading.RLock()

                   class MyThread(threading.Thread):

                            def __init__(self):

                                     threading.Thread.__init__(self)

                            def run(self):

                                     self.fun1()

                                     self.fun2()

                            def fun1(self):

                                     Rlock.acquire()

                                     print('i am %s,get res:%s---%s'%(self.name,'ResA',time.time()))

                                     Rlock.acquire()

                                     print('i am %s,get res:%s---%s'%(self.name,'ResB',time.time()))

                                     Rlock.release()

                                     Rlock.release()

                            def fun2(self):

                                     Rlock.acquire()

                                     print('i am %s,get res:%s---%s' % (self.name, 'ResB', time.time()))

                                     time.sleep(0.2)

                                     Rlock.acquire()

                                     print('i am %s,get res:%s---%s' % (self.name, 'ResA', time.time()))

                                     Rlock.release()

                                     Rlock.release()

                   if __name__=='__main__':

                            print('start------------------------%s'%time.time())

                            for i in range(0,10):

                                     my_thread=MyThread()

                                     my_thread.start()

                                    

信号量

         也是一把锁,可以指定信号量为5,互斥锁使得同一时间只能有一个任务抢到锁去执行,

         那么信号量同一时间可以有5个任务拿到锁去执行,同一时间可以有多个任务抢到锁去执行

 

         代码实例:

                   from threading import Thread,Semaphore

                   import threading,time

                   def func():

                            sm.acquire()

                            print('%s get sm'%threading.current_thread().getName())

                            time.sleep(3)

                            sm.release()

                   if __name__ == '__main__':

                            sm = Semaphore(5)

                            for i in range(23):

                                     t = Thread(target=func)

                                     t.start()   

                                    

Event

         线程的一个关键特性是每个线程都是独立运行且状态不可预测的  

         如果程序中的线程需要判断某个线程的状态来确定自己下一步的操作,那就用到Event方法

         from threading import Event

         event.isSet() 返回event的状态值

         event.wait() 如果状态值为False将阻塞线程

         event.set() 设置状态值为Ture,所以阻塞池的线程激活进入就绪状态,等待系统调用

         event.clear() 恢复状态值为False

 

         实例:

         1.

                   from threading import Thread, Event

                   import time

                   event = Event()

 

 

                   def student(name):

                            print('%s 正在听课' % name)

                            event.wait(2)  # 设置超时时间,超过这个时间没有拿到set的True就不等了继续往下执行

                            print('%s 课间活动' % name)

 

 

                   def teacher(name):

                            print('%s 正在授课' % name)

                            time.sleep(7)

                            event.set()

 

 

                   if __name__ == '__main__':

                            stu1 = Thread(target=student, args=('alex',))

                            stu2 = Thread(target=student, args=('wusir',))

                            stu3 = Thread(target=student, args=('yuan',))

                            t1 = Thread(target=teacher, args=('egon',))

                            stu1.start()

                            stu2.start()

                            stu3.start()

                            t1.start()

        

         2.

                   from threading import Thread, Event, currentThread

                   import time

                   event = Event()

 

 

                   def conn():

                            n = 0

                            while not event.is_set():

                                     if n == 3:

                                               print('%s try too many times' % currentThread().getName())

                                               return

                                     print('%s try %s' % (currentThread().getName(), n ))

                                     event.wait(0.5)

                                     n += 1

                            print('%s is connected' % currentThread().getName())

 

 

                   def check():

                            print('%s is checking' % currentThread().getName())

                            time.sleep(5)

                            event.set()

 

 

                   if __name__ == '__main__':

                            for i in range(3):

                                     t = Thread(target=conn)

                                     t.start()

                            t = Thread(target=check)

                            t.start()

 

定时器

         指定n秒后执行某操作

        

         代码实例:

         1.

                   from threading import Timer

                   def hello():

                            print('hello world')

                   t = Timer(1,hello) 一秒后,执行hello函数

                   t.start()

        

         2. 做一个4位数的验证码,有效期是5秒,超过5秒不输入就刷新出新的验证码

                   from threading import Timer

                   import random

 

                   class Code:

                            def __init__(self):

                                     self.make_cache()

 

                            def make_cache(self, interval=5):

                                     self.cache = self.make_code()

                                     print(self.cache)

                                     self.t = Timer(interval, self.make_cache)

                                     self.t.start()

 

                            def make_code(self, n=4):

                                     res = ''

                                     for i in range(n):

                                               s1 = str(random.randint(0, 9))

                                               s2 = chr(random.randint(65, 90))

                                               res += random.choice([s1, s2])

                                     return res

 

                            def check(self):

                                     while True:

                                               code = input('>>:').strip()

                                               if code.upper() == self.cache:

                                                        print('right')

                                                        self.t.cancel()

                                                        break

 

                   obj = Code()

                   obj.check()

 

线程queue

         实例:

                   import queue

                   #最常用

                   q=queue.Queue() #默认先进先出(FIFO)括号里面可以设定最大值

                   q.put(111)

                   q.put('hello')

                   q.put(222)

                   print(q.get())

                   print(q.get())

                   print(q.get())

                   print(q.get()) 这里因为只有三个数据,第四次get没有数据,所以会一直阻塞住

 

                   #堆栈,后进先出

                   q1=queue.LifoQueue() last in first out

                   q1.put(111)

                   q1.put(222)

                   q1.put(333)

                   print(q1.get()) 取到的是333

 

 

                   #优先级队列,按照优先级

                   q2=queue.PriorityQueue()

                   q2.put([4,'hello4']) put进去的元素,我目前知道列表或元组,第一个元素规定优先级

                   q2.put([1,'hello'])  通常是数字,数字越小,优先级越高

                   q2.put([2,'hello2'])

                   print(q2.get()) 取到的是[1,'hello']

                   print(q2.get()) 取到的是[2,'hello2']

        

协程

         并发的本质是切换+保存状态

         协程:是单线程下的并发,又称为微线程、纤程Coroutine

         协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的

        

         python的线程属于内核级别的,即由操作系统控制调度

         单线程内开启协程,一旦遇到I/O就会从应用程序级别控制切换

        

         优点:

                   1.协程的切换开销小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

                   2.单线程内就可以实现并发的效果,最大限度利用cpu

         缺点:

                   1.协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程

                   2.协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

        

         协程必须在只有一个单线程里实现并发

         修改共享数据不需加锁

         用户程序里自己保存多个控制流的上下文栈

         一个协程遇到IO操作会自动切换到其他协程

 

         实例:

                   import time

                   def consumer():

                            r=''

                            while True:

                                     n=yield r

                                     if not n:

                                               return

                                     print('[CONSUMER]<<Consuming%s...'%n)

                                     time.sleep(1)

                                     r='200 OK'

                   def produce(c):

                            next(c)

                            n=0

                            while n<5:

                                     n+=1

                                     print('[PRODUCER]>>Producing %s...'%n)

                                     cr=c.send(n)

                                     print('[PRODUCER] Consumer return:%s'%cr)

                            c.close()

                   if __name__=='__main__':

                            c=consumer()

                            produce(c)

 

greenlet

         在单个线程内有多个任务,需要在多个任务之间切换,yield太麻烦,就用greenlet模块

         代码实例:

                   from greenlet import greenlet

                   def test1():

                            print(12)

                            gr2.switch()

                            print(34)

                            gr2.switch()

                   def test2():

                            print(56)

                            gr1.switch()

                            print(78)

                   gr1=greenlet(test1)

                   gr2=greenlet(test2)

                   gr1.switch()  # 这里可以传参数,只有在第一次switch时可以传参数

        

         但是greenlet不能处理I/O阻塞操作,可以用gevent模块解决

 

gevent

         异步提交任务,就因为是异步提交的,所以才需要用jion方法阻塞住

         协程只有一个单线程,异步提交任务,不join的话,spawn提交了任务就走完了,线程就结束了,可能提交的任务还没开始,就会直接死掉

         所以一定要用join阻塞住主线程,保证线程不死

         用法:

                   g1 = gevent.spawn(func1,1,2,3,4,x=5,y=6) 创建一个协程对象g1,第一个参数放函数名

                            后面的全部都是给该函数名传值的

                   g2 = gevent.spawn(func2)

                   g1.join() 等待g1的结果

                   g2.join()

                   或者上面两个合并:gevent.joinall([gevent.spawn(func1),gevent.spawn(func2)])

                   g1.value 拿到g1的返回值

                  

         代码实例:

                   import gevent,time

                   def foo():

                            print('running in foo')

                            gevent.sleep(2) 用来模拟gevent可以识别的I/O阻塞,time.sleep()不能识别

                            print('switch to foo again')

                   def bar():

                            print('switch to bar')

                            gevent.sleep(5)

                            print('switch to bar again')

                   start=time.time()

                   print(start)

                   #g1 = gevent.spawn(foo)

                   #g2 = gevent.spawn(bar)

                   #g1.join()

                   #g2.join()  # 或用下面的方法

                   gevent.joinall(

                            [gevent.spawn(foo),

                             gevent.spawn(bar)] 里面放列表的形式

                   )

                   print(time.time()-start)

        

         切记:需要加上下面这句代码,gevent才能识别正常的I/O阻塞,不然不能识别time.sleep(1)

         from gevent import monkey;monkey.patch_all()

         干脆就直接记住,要用gevent,就在文件的开头加上这句话!开头!import socket的前面

        

        

         #爬虫

                   import time

                   from gevent import monkey;monkey.patch_all()

                   import gevent

                   from urllib import request

                   def f(url):

                            print('GET:%s'%url)

                            resp=request.urlopen(url)

                            data=resp.read()

                            print('%d bytes received from %s.'%(len(data),url))

                   start=time.time()

                   gevent.joinall([

                            gevent.spawn(f,'https://nba.hupu.com/'),

                            gevent.spawn(f,'https://www.zhihu.com/'),

                            gevent.spawn(f,'https://www.bilibili.com/'),

                   ])

                   print(time.time()-start)

 

基于gevent模块实现并发的套接字通信

         client:

                   import socket

                   from threading import Thread, currentThread

 

                   def client():

                            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                            client.connect(('127.0.0.1', 8800))

                            while True:

                                     client.send(('%s say hello' % currentThread().getName()).encode('utf-8'))

                                     data = client.recv(1024)

                                     print(data.decode('utf-8'))

                            client.close()

 

                   if __name__ == '__main__':

                            for i in range(500):  # 开500个线程,这样的话,就是服务端单线程处理500个并发

                                     t = Thread(target=client)

                                     t.start()

        

         server:

                   from gevent import monkey; monkey.patch_all()

                   import socket

                   import gevent

 

                   def communicate(conn):

                            while True:

                                     try:

                                               data = conn.recv(1024)

                                               if not data:

                                                        break

                                               conn.send(data.upper())

                                     except ConnectionResetError:

                                               break

                            conn.close()

 

                   def server(ip, port):

                            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                            server.bind((ip, port))

                            server.listen(5)

                            while True:

                                     conn, addr = server.accept()

                                     gevent.spawn(communicate, conn)

                            server.close()

 

                   if __name__ == '__main__':

                            g1 = gevent.spawn(server, '127.0.0.1', 8800)

                            g1.join()

 

                  

其实使用yield、greenl和gevent都是在实现协程,单线程下的并发,yield最麻烦,greenlet稍好一点,这两者都不能处理IO操作

         gevent最好,其底层封装的还是greenlet,可以处理IO操作

 

IO模型

         同步:在发出一个功能调用时,在没有得到结果之前,该调用就不会返回,比如打电话

         异步:在发出一个功能调用时,调用者不能立刻得到结果,比如发短信,通常跟回调机制结合到一起

         阻塞:调用结果返回前,当前线程会被挂起,函数只有得到结果之后才会将阻塞的线程激活

         非阻塞:在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程

        

         当一个read操作发生时,该操作会经历两个阶段:

                   等待数据准备 waiting for the data to be ready

                   将数据从内核拷贝到进程中 copying the data from the kernel to the process

        

         阻塞IO(blocking IO)

                   在linux中,默认情况下,所有的socket都是blocking

                   特点是在IO执行的两个阶段都被block

                   可以用多线程模型来解决小规模的服务请求,大规模的要用的非阻塞IO

        

         非阻塞IO(non-blocking IO)

                   多次发送系统调用,wait for data阶段,数据没准备好会返回error,然后不停的发

                   循环往复的进行recvfrom系统调用,这称之为轮询,轮询检查内核数据,直到数据准备好

                   拷贝数据的过程,进程处于阻塞状态

                   优点:wait for data无阻塞 copy data阻塞

                   缺点:系统调用发送太多,数据不能实时接收到

                  

                   但是,非阻塞IO绝不被推荐

        

         异步IO

                   全程无阻塞

                  

         IO多路复用(IO multiplexing)

                   监听多个链接 select/epoll,优势在于处理多个连接

                   特点:全程阻塞,比阻塞IO多一次系统调用

                              能够监听多个文件描述符(套接字对象),进而实现并发

                  

                   代码实例:

                       这是server端

                            import socket,time,select

                            sock=socket.socket()

                            sock.bind(('127.0.0.1',8800))

                            sock.listen(5)

                            sock.setblocking(False)

                            inputs=[sock,] #这个sock是在有一个clent连接的时候才会有变化

                            while 1:

                                     r,w,e=select.select(inputs,[],[]) #监听的是有变化的套接字

                                     for obj in r:

                                               if obj==sock:

                                                        conn,addr=obj.accept()  #这个conn是在clent发送数据的时候发生变化

                                                        print('conn',conn) #conn是链接的客户端的套接字对象

                                                        inputs.append(conn)

                                               else:

                                                        try:

                                                                 data = obj.recv(1024)

                                                                 print(data.decode('utf-8'))

                                                                 send_data = input('>>:')

                                                                 obj.send(send_data.encode('utf-8'))

                                                        except Exception:

                                                                 inputs.remove(obj)

 

                                                        #在linux上这样写,上面的写法是应用于windows上

                                                        # if not data:

                                                        #     inputs.remove(obj)

                                                        #     continue

        

         在所有的IO模型里面,阻塞IO、非阻塞IO、IO多路复用以及没学的驱动信号,都是同步IO

         只有异步IO是异步的。

         关于同步异步的判断:无论是wait for data 还是 copy data 只要出现阻塞,那就是同步的

 

IO多路复用

         对于windows而言,只有select,对于linux有select poll epoll三种方式

         epoll>poll>select

         select缺点:

                   1.每次调用select都需要把所有的文件描述符fd拷贝到内核空间,导致效率下降;

                   2.每次调用,都遍历所有的fd是否有数据访问,这个过程效率太低,这个问题最重要;

                   3.最大连接数有上限(1024)

                  

         poll:跟select基本一致,但是最大连接数没有限制

 

         epoll:

                   第一个函数:创建epoll句柄,将所有的文件描述符fd拷贝到内存空间,但是只需要拷一次

                   第二个函数:回调函数,是某一个函数或动作成功完成后会触发的函数,

                                               为所有的fd绑定一个回调函数,一旦有数据访问,就会触发该回调函数,

                                               回调函数将fd放到一个链表中

                   第三个函数:判断链表是否为空

                   epoll的最大连接数没有上限(相对的)

 

selectors模块

         三种IO多路复用模型在不同平台有不同支持,

         使用seletors帮我们默认选择当前平台下合适的IO多路复用模型

         代码实例:

                   server端

                   import selectors #基于select模块实现的IO多路复用,建议使用

                   import socket

                   sock=socket.socket()

                   sock.bind(('127.0.0.1',8800))

                   sock.listen(5)

                   sock.setblocking(False)

                    =selectors.DefaultSelector() #根据具体平台选择最佳IO多路复用机制,比如linux上选epoll

 

                   def read(conn,mask):

                            try:

                                     data=conn.recv(1024)

                                     print(data.decode('utf-8'))

                                     send_data=input('>>:')

                                     conn.send(send_data.encode('utf-8'))

                            except Exception:

                                     sel.unregister(conn) #解除绑定,避免出现一个客户端认为关闭服务端崩掉的情况

 

                   def accept(sock,mask):

                            conn,addr=sock.accept()

                            #print(conn)

                            sel.register(conn,selectors.EVENT_READ,read)

 

                   sel.register(sock,selectors.EVENT_READ,accept)  #注册事件

 

                   while 1:

                            #print('waiting...')

                            events=sel.select() #监听

                            for key,mask in events:

                                     #print(key.data)  #当前绑定的方法,比如accept,read

                                     #print(key.fileobj) #当前活动的文件描述符,比如sock,conn

                                     func=key.data

                                     obj=key.fileobj

                                      func(obj,mask)

                                    

                                    

         #官方的列子如下:      

                   import selectors

                   import socket

 

                   sel = selectors.DefaultSelector()

 

                   def accept(sock, mask):

                            conn, addr = sock.accept()  # Should be ready

                            print('accepted', conn, 'from', addr)

                            conn.setblocking(False)

                            sel.register(conn, selectors.EVENT_READ, read)

 

                   def read(conn, mask):

                            data = conn.recv(1000)  # Should be ready

                            if data:

                                     print('echoing', repr(data), 'to', conn)

                                     conn.send(data)  # Hope it won't block

                            else:

                                     print('closing', conn)

                                     sel.unregister(conn)

                                     conn.close()

 

                   sock = socket.socket()

                   sock.bind(('localhost', 1234))

                   sock.listen(100)

                   sock.setblocking(False)

                   sel.register(sock, selectors.EVENT_READ, accept)

 

                   while True:

                            events = sel.select()

                            for key, mask in events:

                                     callback = key.data

                                     callback(key.fileobj, mask)

 

socketserver

         服务端的特点:

                   1.一直运行提供服务,即链接循环,通信循环是基于一个链接的

                   2.绑定一个唯一的地址

        

         socketserver模块分为两大类:server类解决链接问题,request解决通信问题

        

         代码实例:

         server端:

                   import socketserver

                   class FTPserver(socketserver.BaseRequestHandler): #通讯

                            def handle(self):  这个是固定死的,必须定义handle方法,上面的继承也是固定的

                                     print(self)

                                     print(self.request) 独有的request方法,其实就是conn,一个套接字对象

                                     while True:

                                               data=self.request.recv(1024)

                                               print(data)

                                               self.request.send(data.upper())

 

 

                   if __name__=='__main__':

                            obj=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FTPserver) 基于多线程实现并发

                            obj.serve_forever() #链接循环

                           

         client端:

                   import socket

                   phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

                   phone.connect(('127.0.0.1',8080))

                   while True:#通信循环

                            msg=input('>>:').strip()

                            if not msg:continue #判断msg是否为空,为空的话就continue

                            phone.send(msg.encode('utf-8'))

                            data=phone.recv(1024)

                            print(data)

                   phone.close()

转载于:https://www.cnblogs.com/Interstellar-cooper/p/9253922.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_34345560/article/details/93279365

智能推荐

LNMP环境搭建——PHP篇-程序员宅基地

文章浏览阅读95次。一、源代码安装1.编译安装./configure --prefix=/usr/local/php\--with-config-file-path=/usr/local/php/etc --with-bz2 --with-curl \--enable-ftp --enable-sockets --disable-ipv6 --with-gd \--with..._/json.so from install

openerp mysql_Odoo字段(Fields)总结-至2020全-程序员宅基地

文章浏览阅读192次。本文涵盖当前已发行版本中,ODOO的全字段属性总结,包含强制字段与选填字段。 Many2one (多对一)定义模型A具有对模型B的单一引用的关系字段。我们用销售订单中的例子来解释: sale.order(销售订单) 中的合作伙伴ID号 partner_id = fields.Many2one,参考于 res.partner 。这意味着每个销售订单都有一个客户/合作伙伴。 One2many(一对多..._odoo(openerp)数据字典

JAVASE-集合_javase集合-程序员宅基地

文章浏览阅读312次。JAVASE集合内容梳理包括 ArrayList HashMap HashSet List Map TreeMap TreeSet LinkedHashSet LinkedHashMap 红黑树 双向链表 集合_javase集合

Python版软著快速生成60页代码_一个软件生成60页软著代码-程序员宅基地

文章浏览阅读2.3k次,点赞3次,收藏15次。1.导入第三方包,以python-docx为例file->settings->project:***->project interpreter->+->搜索python-docx->Install Package2.输入代码from docx import Documentfrom docx.shared import Ptfrom docx.enum.text import WD_LINE_SPACINGimport reimport os_一个软件生成60页软著代码

Ubuntu18.04安装CUDA10.0+cuDNN7.6.5+Tensorflow1.15教程_ubuntu 安装 cuda tensorflow gpu 1.15-程序员宅基地

文章浏览阅读2.1k次,点赞4次,收藏20次。这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入欢迎使用Markdown编辑器你好! 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Mar_ubuntu 安装 cuda tensorflow gpu 1.15

基于AM5728 DSP+ARM+FPGA的实时工业以太网EtherCAT主站实现_dsp ethercat通信-程序员宅基地

文章浏览阅读2.8k次,点赞2次,收藏18次。针对EtherCAT的数控系统的实现,提出了一种基于EtherCAT技术的实时通信及DSP控制的可行系统方案,构建了EtherCAT主站网络结构,并重点分析了EtherCAT主站的状态机、分布时钟、CoE协议。本实验使用AM57x开发板、Linux-4.4.19内核,提供基于EtherCAT协议控制伺服驱动器,驱动伺服电机运转的方法。实验硬件:评估板: TI AM5728 ID..._dsp ethercat通信

随便推点

Cyanogen 宣布年底关停服务, CM fork 为 Lineage-程序员宅基地

文章浏览阅读80次。Cyanogen 公司已宣布于年底关闭与 CyanogenMod 相关的所有服务。由于 Cyanogen 公司拥有有关 Cyanogen 的所有商标,该社区项目不能再继续运行。因此,CyanogenMod 已经划入 LineageOS.该公司在官方微博发表声明称:“作为 Cyanogen 正在进行的重组的一个环节,所有服务以及 Cyanogen 支持..._lineage fork

shardingjdbc 开发问题集_shardingjdbc问题汇总-程序员宅基地

文章浏览阅读1k次。1、分片键日期不能用hutool工具类转化的日期对象去查询,否则会报错报错内容:ShardingvaluemustimplementsComparable解决方法:由于hutool转化的日期对象DateTime没有实现Comparable接口,所以报错,采用jdk的Date类型即可..._shardingjdbc问题汇总

python 教程 w3 school_Python 模块 | w3cschool菜鸟教程-程序员宅基地

文章浏览阅读1.8k次。Python 模块在前面的几个章节中我们脚本上是用python解释器来编程,如果你从Python解释器退出再进入,那么你定义的所有的方法和变量就都消失了。为此 Python 提供了一个办法,把这些定义存放在文件中,为一些脚本或者交互式的解释器实例使用,这个文件被称为模块。模块是一个包含所有你定义的函数和变量的文件,其后缀名是.py。模块可以被别的程序引入,以使用该模块中的函数等功能。这也是使用py..._python school

php 分页读取pdf,使用 PHP 读取文本(TXT)文件 并分页显示-程序员宅基地

文章浏览阅读202次。搜索热词下面是编程之家 jb51.cc 通过网络收集整理的代码片段。编程之家小编现在分享给大家,也给大家做个参考。session_start();if (empty($page)) {$page=1;}if (isset($_GET['page'])==TRUE) {$page=$_GET['page']; }?>Read Resultif($page){$counter=file_get_..._php 读取pdf指定页

大学物理第二章 质点动力学详解-程序员宅基地

本篇文章介绍了大学物理第二章的内容,包括牛顿运动定律和理想流体的伯努利方程。其中涵盖了牛顿第一定律的惯性参考系以及理想流体的不可压缩流体连续性方程和伯努利方程的内容。

武汉市 教师职称计算机考试题库,2017职称计算机考试综合试题及答案-程序员宅基地

文章浏览阅读132次。2017职称计算机考试综合试题及答案一、选择题C(1)计算机的应用领域可大致分为6个方面,下列选项中属于这几项的是A)计算机辅助教学、专家系统、人工智能 B)工程计算、数据结构、文字处理C)实时控制、科学计算、数据处理 D)数值处理、人工智能、操作系统C(2)CAI表示为A)计算机辅助设计 B)计算机辅助制造 C)计算机辅助教学 D)计算机辅助军事B(3)十进制数269转换为十六进制数为A)10E..._教师职称考试计算机题目

推荐文章

热门文章

相关标签