Python: Python网络协议和编程
- TAGS: Python
Socket 编程基础
主要内容:
- 七层网络模型
- TCP/IP协议
- TCP 协议、UDP 协议对比
- TCP 协议三次握手、四次断开
- IP 地址分类与用途
- Port 端口的用途
- Socket 编程模式
- Socket TCP 编程模式
- Socket UDP 编程模式
- IO 多路复用模型
网络概念
每一个开发工程师,对编程方面的每一个知识都应该做一定的了解。
什么是网络
网络是由节点和连线构成的图,表示诸多对象及其关系。
什么是计算机网络
计算机网络指的是将地理位置不同的具有独立功能的多台计算机及其外部设备, 通过通信线路物理连接(包括有线、无线连接),并在网络操作系统、网络管理 软件和网络通信协议的管理和协调下,实现 资源共享 和 信息传递 的计算机系统。
带宽
在数字设备中,指的是单位时间数据的传输量。
网络传输习惯上使用比特率,即bps每秒传输的二进制位数。
常见的100M网络,实际上指的是理论上的下行速度为100Mbps,换算得12.5MBps。
拓扑
总线型

所有设备都连接到公共总线上,结点间使用广播通信方式。一个结点发出的信息,总线上所有其他结点都可以接收到。一段时间只允许一个结点独占总线。
常见使用同轴电缆连接,总线两端需要终结器。
优点
- 结构简单、易于实现
- 易于扩充,增加或者移除结点比较灵活
- 可靠性较高,个别结点发生故障时,不影响网络中其他结点的正常工作
缺点
- 网络传输能力低,安全性低,总线发生故障时,会导致全网瘫痪
- 所有数据都需要经过总线传输,总线是整个网络的瓶颈。结点数量的增多会影响网络性能
环形结构

环形结构是将联网的计算机由通信线路连接成一个闭合的环,在环形结构网络中信息按照固定方向流动,或顺时针方向,或逆时针方向。
优点
- 令牌控制,没有线路竞争,实时性强,传输控制容易
缺点
- 维护困难,可靠性不高。一个结点发生故障时,可能导致全网瘫痪。可以使用双环拓扑结构,但是复杂性提升
星型拓扑

每个结点都由一条单独的通信线路与中心结点连结。其他各结点都与该中心结点 有着物理链路的直接互连,其他结点直接不能直接通信,其他结点直接的通信需 要该中心结点进行转发。因此中心结点必须有着较强的功能和较高的可靠性。需 要中心设备,例如hub、switch、router
优点
- 可靠性高,方便管理,易于扩展,传输效率高
缺点
- 线路利用率低,中心节点需要很高的可靠性和冗余度
注意, hub工作在一层,这种星型实际上就是芯片化的总线网络。只是物理拓扑结构上感觉像是星型。
OSI参考模型
OSI是Open System Interconnection的缩写,意为开放式系统互联。国际标准化 组织(ISO)制定了OSI模型,该模型定义了不同计算机互联的标准,是设计和描 述计算机网络通信的基本框架。
OSI模型把网络通信的工作分为7层,分别是物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。


- 物理层: 定义了电气规范,设备规范、物理接口等,电信号的变化,或数字信号变化,比特。
- 链路层: 二层。将比特组织成帧,即对字节进行定义,支持错误检查。使用物理地址、MAC地址。MAC有48位,前24位厂商编号由IEEE分配,后24位设备序号。
- 网络层: 三层。将帧组织成包,包传递的路径选择(路由),将包传输到目标地址。使用逻辑地址、IP地址。
- 传输层: 四层。解决传输的问题,确保数据传输的可靠性;建立、维护、终止虚拟电路;错误检测和恢复。
- 会话层: 建立、管理、终止应用程序间的逻辑通路,即会话。
- 表示层: 对应用数据格式化、加密解密等。将上层数据转换为适合网络传输的格式,或将下层数据转化为上。
- 应用层: 七层。为应用程序提供网络服务接口,用户使用的时候并不关心会话如何建立保持,也不关心协议的协商是否加密等。
数据传输
数据很大,在应用层切分,每一份数据都会在下一层被封装 在数据链路层会增加tail即校验位,最后在物理层上都是电平信号0、1发送出去 到了对端设备,由下至上逐层解包组合。直到合成并还原应用层的一份数据
通讯的三种模式
单播 包在计算机网络传输中,目的地址为单一目标的传输方式。每次都是点对点的2个实体间相互通信
广播 数据在网络中传输,目标地址是网络中所有设备的传输方式。所有设备是有范围的,这个范围称为广播域 IP v6不支持广播,由组播替代
多播、组播: 把数据同时传递给一组目的地址。数据源只发出一份数据,会在尽可能远的的设备上复制和分发
冲突域、广播域 参看https://baike.baidu.com/item/%E5%B9%BF%E6%92%AD%E5%9F%9F
冲突域 网络中设备A发送数据时,设备B也发送数据,数据碰撞,发生了冲突,这两个设备就属于同一个冲突域 交换机可以隔离冲突域 路由器可以隔离广播域
局域网LAN
局域网Local Area Network,指的是某一个区域内,多台计算机互联的计算机组 常见组网设备 网线、有线网卡、无线网卡、集线器、交换机、路由器等
网络设备
网络线缆 有线连接,需要使用网线,最早使用同轴电缆,后来使用双绞线,现在高速网络布线可以采用光纤 常用的双绞线使用RJ45水晶头 直通采用两端T568B,互连使用一端T568A一端T568B的交叉线,不过目前新型网卡可以自适应,都使用直通线连接即可
集线器hub 工作在一层。使用HUB连接的设备看似是星型,实际是总线型 它是物理层设备,只认识电信号,所以根本不认识什么MAC地址之类的信息。早期用来多机互连,信号中继的作用户 连入设备越多,广播信号,在一个冲突域,网络效率很低 使用HUB连接的所有设备,都在同一个冲突域 交换机switch 工作在二层。内部记录着MAC表,通过自学习,建立交换机端口和MAC地址对应表。内部有电路交换数据,如同信号立交桥。网桥也工作这一层 路由器Router 工作在三层。内部记录路由表,记录着路由器的端口到 网络 对应关系。这个表可以静态配置,也可以动态更新 功能:分割广播域;选择路由;维护和检查路由信息;连接广域网
广域网WAN
广域网,又称外网、公网。连接不同局域网或城域网的计算机通讯网络
互联网Internet
互联网Internet,也称因特网。前身是美国军用ARPA网,后来连入了很多的科研院校,并逐步商业化走向全球 它连接了覆盖全球的网络,是众多的广域网互联的大型网络 互联网使用了TCP/IP协议
TCP/IP协议栈
TCP/IP,Transmission Control Protocol/Internet Protocol ,传输控制协议/因特网互联协议 它是一个包含很多工作在不同层的协议的协议族,其中最著名的2个协议分别是TCP和IP协议 它最早起源于美国国防部(缩写为DoD)的ARPA网项目,共定义了四层:网络访问层、Internet层、传输层、应用层
TCP/IP协议是事实标准。目前局域网和广域网基本上也都用该协议
传输层协议
TCP UDP 连续类型 面向连接 无连接 可靠性 可靠 不可靠 有序 数据包有序号 没有包序 使用场景 大多数场合,数据不能出任何问题 视频、音频 连接 TCP需要通讯双方预先建立连接,需要三次握手、四次断开 UDP不需要预先建立连接 可靠性 TCP需要确定每一个包是否收到,丢包重发,效率低一些 UDP尽最大努力交付数据,不要要确认数据包,丢包无法知道,也不重复,效率高一些 有序 TCP包有序号,可以进行顺序控制。第一个包序号随机生成,之后的序号都和它有关 UDP包无序,无法纠正,只能在应用层进行验证
TCP协议三次握手/四次断开
三次握手建立连接 Three-way Handshake Client端首先发送一个SYN包告诉Server端我的初始序列号是X Server端收到SYN包后回复给Client一个ACK确认包,告诉Client说我收到了。Server端也需要告诉Client端自己的初始序列号,于是Server也发送一个SYN包告诉Client我的初始序列号是Y。这两个包一起发送 Client收到后,回复Server一个ACK确认包说我知道了
四次断开 Client发送一个FIN包来告诉Server需要断开 Server收到后回复一个ACK确认FIN包收到 Server在自己也没数据发送给Client后,Server也发送一个FIN包给Client,表示也无数据发送 Client收到后,就会回复一个ACK确认Server的FIN包 主动发出Fin包就是主动关闭方,就会进入TIME_WAIT,原因是被动关闭方发来的FIN包需要确认,万一此包丢失,被动关闭方未收到确认会超时重发FIN包,主动关闭方还在,可以重发ACK。
IP地址
IP地址是IP协议提供的一种同一个地址格式,它为互联网上的网络设备分配一个用来通信的逻辑地址 目前分为IP v4和IP v6
IP v4
IP v4 是一个32位二进制数,不便记忆,为了使用方便,使用“点分十进制”表示法,将这个二进制数每8位断开,每8位是一个字节,一个字节表示的十进制正整数范围是0~255 IP v4地址早期比较充足,随着全球连入互联网,在2011年IP v4地址分配完毕 IP地址的分类 公有地址: 需向因特网信息中心申请,在互联网上可以直接使用的IP地址
私有地址: 不需要注册,可以组织内部网络使用的IP地址
IP地址这个数被分成2部分,即网络位 + 主机位
网络位表示设备同属一个网络;主机位表示网络中不同的设备的唯一ID
子网掩码 子网掩码将IP地址划分为网络ID和主机ID IP地址 位与 子网掩码就是网络ID IP v4地址被分为A、B、C、D、E五类
A类 最高位是 0 第一字节(最高字节)为网络位。第一个字节变化为 0000 0001 到 0111 1111,共127,减去回环地址,剩余126个网络 A类IP地址范围1.0.0.0到127.255.255.255。二进制表示为: 00000001 00000000 00000000 00000000 至 01111111 11111111 11111111 11111111。最后一个是广播地址。 子网掩码 255.0.0.0 每一个网络中主机个数等于 256³ - 2 = 1677716 -2 = 1677714 B类 最高位是10 前2个字节为网络位,其变化为128.0~191.255,相当于 1000 0000 0000 0000 到 1011 1111 1111 1111,实际上就是后14位变化,共2^14 = 16384个网络 B类IP地址范围128.0.0.0-191.255.255.255 。二进制表示为: 10000000 00000000 00000000 00000000 至 10111111 11111111 11111111 11111111 最后一个是广播地址 子网掩码255.255.0.0 每一个网络中主机个数等于 256² - 2 = 65535 -2 = 65534 C类 最高位是110 前3个字节为网络位,其变化为192.0.0~223.255.255,相当于 1100 0000 0000 0000 0000 0000 到 1101 1111 1111 1111 1111 1111 ,实际上就是后21位变化,共2^21 = 2097152个网络。
C类IP地址地址范围192.0.0.0-223.255.255.255 。二进制表示为: 11000000 00000000 00000000 00000000 至 11011111 11111111 11111111 11111111。 最后一个是广播地址 子网掩码255.255.255.0 每一个网络中主机个数等于 256 -2 = 254 D类 多播地址,或组播地址 多播地址最高4位必须是1110,那么地址范围就是224.0.0.0到239.255.255.255 224.0.0.1特指所有主机
E类 实验用地址 特殊IP地址 0.0.0.0表示当前主机 255.255.255.255 限制广播地址。路由器不会转发这个受限广播地址的数据报文,此地址只能用于本网广播 IP地址中以127开头的地址称为Loopback回环地址 169.254.x.x,windows主机使用了DHCP获取IP,但没有获得地址,windows会临时获得这样的地址 网关GATEWAY 网关(Gateway)又称网间连接器、协议转换器。网关在网络层以上实现网络互连,用于网络间互联 举例 IP地址192.168.3.200,要配合子网掩码使用,假设子网掩码为255.255.255.0,说明它是C类地址 网络ID为192.168.3.0,广播地址为192.168.3.255 剩余192.168.3.1~192.168.3.254能够分配给网络中其他设备 网关地址配置一般习惯使用1、100、254等。本例使用192.168.3.1 其作用是连接不同的网络,也称为处在不同的网段
又有一个IP地址为192.168.100.10/24,它也是C类地址,/16指B类地址,/8指A类地址。网络ID是192.168.100.0 和上面的IP处在不同的网络,这两个地址的主机通信,就需要使用网关,由网关将数据包转发到另一个网络
IP v6
互联网上的公有地址在2011年分配完,而随着互联网的发展,接入设备越来越多,尤其是物联网的到来,此问题必须解决。由此,提出了IP v6 IP v6采用128位二进制数表示,基本解决IP地址短缺现象,同时,该协议还解决原有协议的诸多问题
路由Routing
跨网络通信就需要使用路由,通过路由器将数据包从一个网络发往另一个网络 路由器上维护着路由表,它知道如何将数据包发往另外的网络 windows使用 route print ,Linux使用 route -n 可以查看路由表 路由器所有端口都有自己的IP地址,这些IP地址往往处在不同的网络,所以,路由器连接了不同了网络 路由表中记录着路由设备所有端口对应的网络,分为静态、动态配置 静态路由:由管理员手动配置的固定的路由信息 动态路由:网络中的路由器,根据实时网络拓扑变化,相互通信传递路由信息,利用这些路由信息通过路由选择协议动态计算,并更新路由表。常见的协议有RIP、OSPF等等
网关:下一跳地址,就是到下一个网络,从哪个网关出去 到192.168.0.0/24和10.0.0.0/8网络,R1本身就直接连接着这些网络,所以网关为空,不需要 到172.16.0.0/16网络需要找到R2,所以写R2的接口1地址即可
DHCP
动态主机设置协议(Dynamic Host Configuration Protocol,DHCP)是一个局域网的网络协议,基于UDP协议工作 主要用途就是用于内部网或网络服务供应商自动给网络中的主机分配IP地址
网络编程
Socket介绍
Socket套接字 Python中提供socket.py标准库,非常底层的接口库 Socket是一种通用的网络编程接口,和网络层次没有一一对应的关系 协议族 AF表示Address Family,用于socket()第一个参数 名称 含义 AF_INET IPV4 AF_INET6 IPV6 AF_UNIX Unix Domain Socket, windows没有 Socket类型 名称 含义 SOCK_STREAM 面向连接的流套接字。默认值,TCP协议 SOCK_DGRAM 无连接的数据文套戒子。UDP协议
TCP编程
Socket编程,需要两端,一般来说需要一个服务端、一个客户端,服务端称为Server,客户端称为Client 这种编程模式也称为 CS编程
TCP服务端编程
服务端编程步骤
创建Socket对象
绑定IP地址Address和端口Port。使用bind()方法 IPv4地址为一个二元组('IP地址字符串‘, Port)
开始监听,将再指定的IP端口上监听。使用listen()方法
获取用于传送数据的socket对象 socket.accrpt() -> (socket object, address info) accept方法阻塞等待客户端创立连接,返回一个新的Socket对象和客户端地址的二元组 地址是远程客户端的地址,IPv4中它是一个二元组(clientaddr, port) 接受数据
recv(bufsize[, flags]) 使用缓冲区接受数据 发送数据
send(bytes) 发送数据,必须传入一个bytes
问题 两次绑定同一个监听端口会怎么样?
import socket
s = socket.socket() # 创建socket对象,TCP连接 IPv4 ip = '127.0.0.1' # 本机回环地址 port = 9999 # TCP 65536 s.bind((ip, port)) # 一个地址和端口二元组 s.listen() # 开始监听,等待客户端连接到来
s1, info = s.accept() # 阻塞直到和客户端成功建立连接,返回一个新的socket对象和客户端地址 print(s1, info)
data = s1.recv(1024) # 阻塞 print(info, data) s1.send(b'hello world') s1.close()
s2, info = s.accept() data = s2.recv(1024) print(info, data) s2.send(b'hello python') s2.close()
s.close()
上例accept和recv是阻塞的,主线程经常被阻塞住而不能工作
客户端操作 使用测试工具模拟客户端进行测试
灰色,需点连接才可成功连接
查看监听端口 windoes 命令 netstat -anp tcp | findstr 9999 linux 命令 netstat -tanl | grep 9999 或 ss -tanl | grep 9999 ss命令没有可使用pip install ss自行安装 注意 IP 127.0.0.1 指本机回环地址,永远指向本机 port 端口 linux 用1000以上,这是一个两字节数,范围[0,65535] ,共65536种状态,当前此协议的此端口不能被别人占用 server.bild() 绑定一个二元组,不可多次绑定同一个端口 listen() 监听,不可多次监听,真正的显示出端口 打开的资源必须关闭,会占用文件描述符 fd 实现服务端主动断开连接 收发各一次
import socket
server = socket.socket() # TCP 连接 IPv4 ip = '127.0.0.1' port = 9999 # TCP 65536 server.bind((ip, port)) # address
server.listen() # 真正的显示出端口 print(server)
newsock, clientinfo = server.accept() # 默认阻塞
data = newsock.recv(1024) # 阻塞,buffer空 print(data) newsock.send('server acl. data={}'.format(data).encode()) # buffer 满 newsock.close()
server.close()
执行结果
服务端控制台结果 <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)> b'hello world'
Process finished with exit code 0
客户端结果
服务端循环接受客户端信息
import socket
server = socket.socket() # TCP 连接 IPv4 ip = '127.0.0.1' port = 9999 # TCP 65536 server.bind((ip, port)) # address
server.listen() # 真正的显示出端口 print(server)
newsock, clientinfo = server.accept() # 默认阻塞 print(newsock) print('new1', clientinfo) while True: data = newsock.recv(1024) # 阻塞,buffer空 print(data) newsock.send('server acl. data={}'.format(data).encode()) # buffer 满 newsock.close()
server.close()
执行结果
服务端控制台结果 <socket.socket fd=552, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)> <socket.socket fd=556, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 54481)> new1 ('127.0.0.1', 54481) b'hello world' b'hello python' b'' Traceback (most recent call last): File "C:/Users/mayn/PycharmProjects/mypython/m/t1.py", line 16, in <module> data = newsock.recv(1024) # 阻塞,buffer空 ConnectionAbortedError: [WinError 10053] 你的主机中的软件中止了一个已建立的连接。
Process finished with exit code 1
客户端结果
因上例代码中未设置循环退出条件,故手动中断连接会抛异常
实战 – 写一个群聊程序
- 需求分析
聊天工具是CS程序,C是每一个客户端client,S是服务器端server。 服务器应该具有的功能:
启动服务,包括绑定地址和端口,并监听 建立连接,能和多个客户端建立连接 接收不同用户的信息 分发,将接收的某个用户的信息转发到已连接的所有客户端 停止服务 记录连接的客户端
- 代码实现
服务端应该对应一个类 class ChatServer: def __init__(self, ip, port): # 启动服务 self.sock = socket.socket() self.addr = (ip, port)
def start(self): # 启动监听 pass
def accept(self): # 多人连接 pass
def recv(self): # 接受客户端数据 pass
def stop(self): # 停止服务 pass
在此基础上,扩展完成 import logging import socket import threading import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer: def __init__(self, ip='127.0.0.1', port=9999): # 启动服务 self.sock = socket.socket() self.addr = (ip, port) self.clients = {} # 客户端
def start(self): # 启动监听 self.sock.bind(self.addr) # 绑定 self.sock.listen() # 监听
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接 while True: sock, client = self.sock.accept() # 阻塞 self.clients[client] = sock # 添加到客户端字典
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock:socket.socket, client): # 接受客户端数据 while True: data = sock.recv(1024) # 阻塞到数据到来 msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format( datetime.datetime.now(), *client, data.decode()) logging.info(msg) msg = msg.encode() for s in self.clients.values(): s.send(msg)
def stop(self): # 停止服务 for s in self.clients.values(): s.close() self.sock.close()
cs = ChatServer() cs.start()
基本功能完成,但是有问题。使用Event改进 import logging import socket import threading import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer: def __init__(self, ip='127.0.0.1', port=9999): # 启动服务 self.sock = socket.socket() self.addr = (ip, port) self.clients = {} # 客户端 self.event = threading.Event()
def start(self): # 启动监听 self.sock.bind(self.addr) # 绑定 self.sock.listen() # 监听
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接 while not self.event.is_set(): sock, client = self.sock.accept() # 阻塞 self.clients[client] = sock # 添加到客户端字典
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock:socket.socket, client): # 接受客户端数据 while not self.event.is_set(): data = sock.recv(1024) # 阻塞到数据到来 msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format( datetime.datetime.now(), *client, data.decode()) logging.info(msg) msg = msg.encode() for s in self.clients.values(): s.send(msg)
def stop(self): # 停止服务 self.event.set() for s in self.clients.values(): s.close() self.sock.close()
cs = ChatServer() cs.start()
while True: cmd = input('>>').strip() if cmd =='quit': cs.stop() threading.Event().wait(3) break
这一版基本能用了,测试通过。但是还有要完善的地方 例如各种异常的判断,客户端断开连接后字典中的移除客户端数据等
- 客户端主动断开带来的问题
服务端知道自己何时断开,如果客户端断开,服务器不知道。(客户端主动断开,服务端recv会得到一个空串) 所以,好的做法是,客户端断开发出特殊消息通知服务器端断开连接。但是,如果客户端主动断开,服务端主动发送一个空消息,超时返回异常,捕获异常并清理连接 即使为客户端提供了断开命令,也不能保证客户端会使用它断开连接。但是还是要增加这个退出功能 增加客户端退出命令
import logging import socket import threading import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer: def __init__(self, ip='127.0.0.1', port=9999): # 启动服务 self.sock = socket.socket() self.addr = (ip, port) self.clients = {} # 客户端 self.event = threading.Event()
def start(self): # 启动监听 self.sock.bind(self.addr) # 绑定 self.sock.listen() # 监听
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接 while not self.event.is_set(): sock, client = self.sock.accept() # 阻塞 self.clients[client] = sock # 添加到客户端字典
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock:socket.socket, client): # 接受客户端数据 while not self.event.is_set(): data = sock.recv(1024) # 阻塞到数据到来 msg = data.decode().strip()
if msg
= 'quit' or msg =
'': # 主动断开得到空串 self.clients.pop(client) sock.close() logging.info('{} quits'.format(client)) breakmsg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format( datetime.datetime.now(), *client, data.decode()) logging.info(msg) msg = msg.encode() for s in self.clients.values(): s.send(msg)
def stop(self): # 停止服务 self.event.set() for s in self.clients.values(): s.close() self.sock.close()
cs = ChatServer() cs.start()
while True: cmd = input('>>').strip() if cmd =='quit': cs.stop() threading.Event().wait(3) break logging.info(threading.enumerate()) # 用来观察断开后线程的变化
程序还是有瑕疵,但是业务基本功能完成了 注意:
由于GIL和内置数据结构的读写原子性,单独操作字典的某一项item是安全的。但是遍历过程是线程不安全的,遍历中有可能被打断,其他线程如果对字典元素进行增加、弹出,都会影响字典的size,就会抛出异常。所以还是要加锁Lock 加锁后代码如下 import logging import socket import threading import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer: def __init__(self, ip='127.0.0.1', port=9999): # 启动服务 self.sock = socket.socket() self.addr = (ip, port) self.clients = {} # 客户端 self.event = threading.Event() self.lock = threading.Lock()
def start(self): # 启动监听 self.sock.bind(self.addr) # 绑定 self.sock.listen() # 监听
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接 while not self.event.is_set(): sock, client = self.sock.accept() # 阻塞 with self.lock: self.clients[client] = sock # 添加到客户端字典
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock:socket.socket, client): # 接受客户端数据 while not self.event.is_set(): data = sock.recv(1024) # 阻塞到数据到来 msg = data.decode().strip()
if msg
= 'quit' or msg =
'': # 主动断开得到空串 with self.lock: self.clients.pop(client) sock.close() logging.info('{} quits'.format(client)) breakmsg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format( datetime.datetime.now(), *client, data.decode()) logging.info(msg) msg = msg.encode()
with self.lock: for s in self.clients.values(): s.send(msg)
def stop(self): # 停止服务 self.event.set() with self.lock: for s in self.clients.values(): s.close() self.sock.close()
cs = ChatServer() cs.start()
while True: cmd = input('>>').strip() if cmd =='quit': cs.stop() threading.Event().wait(3) break logging.info(threading.enumerate()) # 用来观察断开后线程的变化 logging.info(cs.clients)
socket常用方法
名称 含义 socket.recv(bufsize[, flags]) 获取数据。默认是阻塞的方式 socket.recvfrom(bufsize[, flags]) 获取数据,返回一个二元组(bytes, address) socket.recv_into(buffer[, nbytes[, flags]]) 获取到nbytes的数据后,存储到buffer中。如果nbytes没有指定或0,将buffer大小的数据存入buffer中。返回接收的字节数 socket.recvfrom_into(buffer[, nbytes[, flags]]) 获取数据,返回一个二元组(bytes, address)到buffer中 socket.send(bytes[, flags]) TCP发送数据 socket.sendall(bytes[, flags]) TCP发送全部数据,成功返回None socket.sendto(string[,flag],address) UDP发送数据 socket.sendfile(file, offset=0, count=None) 发送一个文件直到EOF,使用高性能的os.sendfile机制,返回发送的字节数。如果win下不支持sendfile,或者不是普通文件,使用send()发送文件。offset告诉起始位置。3.5版本开始 名称 含义 socket.getpeername() 返回连接套接字的远程地址。返回值通常是元组(ipaddr,port) socket.getsockname() 返回套接字自己的地址。通常是一个元组(ipaddr,port) socket.setblocking(flag) 如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值) 非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常 socket.settimeout(value) 设置套接字操作的超时期,timeout是一个浮点数,单位是秒 值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect() socket.setsockopt(level,optname,value) 设置套接字选项的值。比如缓冲区大小。太多了,去看文档 不同系统,不同版本都不尽相同
MakeFile
socket.makefile(mode='r', buffering=None, *, encoding=None, errors=None, newline=None) 创建一个与该套接字相关连的文件对象,将recv方法看做读方法,将send方法看做写方法
import socket
server = socket.socket() server.bind(('127.0.0.1', 9999)) server.listen()
s, _ server.accept()
f = s.makefile(mode
'rw')
print(s.getpeername()) print(s.getsockname())
f = s.makefile('rw') data = f.read(10) # 按行读取要使用readline方法 print(data) f.write('return your msg: {}'.format(data)) f.flush()
f.close() print(f.closed, s._closed) s.close() print(f.closed, s._closed)
server.close()
服务端控制台执行结果 ('127.0.0.1', 62161) ('127.0.0.1', 9999) 1234567890 True False True True
Process finished with exit code 0
客户端执行结果
- makefile练习
使用makefile改写群聊类 import logging import socket import threading import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer: def __init__(self, ip='127.0.0.1', port=9999): # 启动服务 self.sock = socket.socket() self.addr = (ip, port) self.clients = {} # 客户端 self.event = threading.Event() self.lock = threading.Lock()
def start(self): # 启动监听 self.sock.bind(self.addr) # 绑定 self.sock.listen() # 监听
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接 while not self.event.is_set(): sock, client = self.sock.accept() # 阻塞 f = sock.makefile('rw') # 支持读写 with self.lock: self.clients[client] = f, socket # 添加到客户端字典
threading.Thread(target=self.recv, args=(f, client)).start()
def recv(self, f, client): # 接受客户端数据 while not self.event.is_set(): data = f.readline() # 阻塞等一行来,即换行符 msg = data.strip() print(msg, '
~~~~~~~~~
')if msg
= 'quit' or msg =
'': # 主动断开得到空串 with self.lock: _, sock = self.clients.pop(client) f.close() sock.close() logging.info('{} quits'.format(client)) break msg = "{:%Y/%m/%d %H:%M:%S {}:{}\n{}\n}".format( datetime.datetime.now(), *client, data) logging.info(msg)with self.lock: for ff,_ in self.clients.values(): ff.write(msg) ff.flush()
def stop(self): # 停止服务 self.event.set() with self.lock: for f, s in self.clients.values(): f.close() s.close() self.sock.close()
cs = ChatServer() cs.start()
while True: cmd = input('>>').strip() if cmd == 'quit': cs.stop() threading.Event().wait(3) break logging.info(threading.enumerate()) # 用来观察断开后线程的变化 logging.info(cs.clients)
上例完成了基本功能,但是,如果客户端主动断开,或者readline出现异常,就不会从clients中移除作废的socket。可以使用异常处理解决这个问题
ChatServer实验用完整代码
注意,这个代码为实验用,代码中瑕疵还有很多。Socket太底层了,实际开发中很少使用这么底层的接口。 增加一些异常处理。
import logging import socket import threading import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer: def __init__(self, ip='127.0.0.1', port=9999): # 启动服务 self.sock = socket.socket() self.addr = (ip, port) self.clients = {} # 客户端 self.event = threading.Event() self.lock = threading.Lock()
def start(self): # 启动监听 self.sock.bind(self.addr) # 绑定 self.sock.listen() # 监听
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接 while not self.event.is_set(): sock, client = self.sock.accept() # 阻塞 f = sock.makefile('rw') # 支持读写 with self.lock: self.clients[client] = f, sock # 添加到客户端字典
threading.Thread(target=self.recv, args=(f, client)).start()
def recv(self, f, client): # 接收客户端数据 while not self.event.is_set(): try: # 异常处理 data = f.readline() # 阻塞等一行来,换行符 except Exception as e: logging.error(e) data = 'quit'
msg = data.strip()
if msg = 'quit' or msg =
'': # 主动断开得到空船
with self.lock:
_, sock = self.clients.pop(client)
f.close()
sock.close()
logging.info('{} quits'.format(client))
break
msg = '{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n'.format(
datetime.datetime.now(), *client, data)
logging.info(msg)
with self.lock: for ff,_ in self.clients.values(): ff.write(msg) ff.flush()
def stop(self): # 停止服务 self.event.set() with self.lock: for f, s in self.clients.values(): f.close() s.close() self.sock.close()
def main(): cs = ChatServer() cs.start()
while True: cmd = input('>>').strip() if cmd == 'quit': cs.stop() threading.Event().wait(3) break logging.info(threading.enumerate()) # 用来观察断开后线程的变化 logging.info(cs.clients)
if name == 'main': main()
TCP客户端编程
客户端编程步骤
创建Socket对象 连接到远端服务端的ip和port,connect()方法 传输数据 使用send、recv方法发送、接收数据 关闭连接,释放资源 import socket
client = socket.socket() ipaddr = ('127.0.0.1', 9999) client.connect(ipaddr) # 直接连接服务器
client.send(b'abcd\n') data = client.recv(1024) # 阻塞等待 print(data)
client.close()
开始编写客户端类 import socket import threading import datetime import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatClient: def __init__(self, ip='127.0.0.1', port=9999): self.sock = socket.socket() self.addr = (ip, port) self.event = threading.Event()
def start(self): # 启动对远端服务器的链接 self.sock.connect(self.addr) self.send("I'm ready.")
threading.Thread(target=self.recv, name="recv").start()
def recv(self): # 接收客户端的数据 while not self.event.is_set(): try: data = self.sock.recv(1024) # 阻塞 except Exception as e: logging.error(e) break msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format( datetime.datetime.now(), *self.addr, data.strip()) logging.info(msg)
def send(self, msg:str): data = "{}\n".format(msg.strip().encode()) # 服务器需要一个换行符 self.sock.send(data)
def stop(self): self.sock.close() self.event.wait(3) self.event.set() logging.info('Client stops.')
def main(): cc = ChatClient() cc.start() while True: cmd = input('>>') if cmd.strip() == 'quit': cc.stop() break cc.send(cmd) # 发送消息
if name == 'main': main()
同样,这样的客户端还是有些问题的,仅用于测试
UDP编程
UDP服务端编程流程
创建socket对象。type=socket.Sock_DGRAM 绑定IP和Port,bing()方法 传输数据 接受数据,socket.recvform(bufsize[flags]),获得一个二元组(string,address) 发送数据,socket.sendto(string, address)发给某地址某信息 释放资源 import socket
server = socket.socket(type=socket.SOCK_DGRAM)
server.bing(('0.0.0.0', 9999)) # 立即绑定一个udp端口 data = server.recv(1024) # 阻塞等待数据 data = server.recvfrom(1024) # 阻塞等待数据(value, (ip, port)) server.sendto(b'hello', ('192.168.142.1', 10000))
server.close()
UDP客户端编程流程 创建socket对象。type=socket.Sock_DGRAM 发送数据,socket.sendto(string, address)发给某地址某信息 接受数据,socket.recvform(bufsize[flags]),获得一个二元组(string,address) 释放资源 import socket client = socket.socket(type=socket.SOCK_DGRAM) raddr = ('192.168.142.1', 9999)
client.connect(raddr) client.sendto(b'hello', raddr) client.send(b'hello') data = client.recvfrom(1024) # 阻塞等待数据(value, (ip, port)) data = client.recv(1024) # 阻塞等待数据
client.close()
注意:UDP是无连接协议,所以可以只有任何一端 例如客户端数据发往服务端,服务端存在与否无所谓 UDP编程中bind、connect、send、sendto、recv、recvfrom方法使用 UDP的socket对象创建后,是没有占用本地地址和端口的 方法 说明 bind方法 可以指定本地地址和端口laddr,会立即占用 connect方法 可以立即占用本地地址和端口laddr,填充远程地址和端口raddr sendto方法 可以立即占用本地地址和端口laddr,并把数据发往指定远端。 只有有了本地绑定端口,sendto就可以向任何远端发送数据 send方法 需要和connect方法配合, 可以使用已经从本地端口把数据发往raddr指定的远端 recv方法 要求一定要在占用了本地端口后,返回接受的数据 recvfrom方法 要求一定要占用了本地端口后,返回接受的数据和对端地址的二元祖
练习 –UDP版群聊
UDP版群聊服务端代码
服务端类的基本架构 class ChatUDPServer: def __init__(self, ip='127.0.0.1', port=9999): self.addr = (ip, port) self.sock = socket.socket(type=socket.SOCK_DGRAM)
def start(self): self.sock.bind(self.addr) # 立即绑定 self.sock.recvfrom(1024) # 阻塞接受数据
def stop(self): self.sock.close()
完整版版代码 import socket import threading import datetime import logging
FORMAT = "%(asctime)s %(threadName) %(thread) %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUDPServer: def __init__(self, ip='127.0.0.1', port=9999): self.addr = (ip, port) self.sock = socket.socket(type=socket.SOCK_DGRAM) self.clients = set() # 记录客户端 self.event = threading.Event()
def start(self): self.sock.bind(self.addr) # 立即绑定
threading.Thread(target=self.recv, name='recv').start()
def recv(self): while not self.event.is_set(): data , raddr = self.sock.recvfrom(1024) # 阻塞接受数据
if data.strip() == b'quit':
if raddr in self.clients: self.clients.remove(raddr) logging.info('{} leaving'.format(raddr)) continue
self.clients.add(raddr)
msg = '{}. from {}:{}'.format(data.decode(), *raddr) logging.info(msg) msg = msg.encode()
for c in self.clients: self.sock.sendto(msg, c) # 不保证对方能够收到
def stop(self): for c in self.clients: self.sock.sendto(b'bye', c) self.sock.close() self.event.set()
def main(): cs = ChatUDPServer() cs.start()
while True: cmd = input('>>>') if cmd.strip() == 'quit': cs.stop() break logging.info(threading.enumerate()) logging.info(cs.clients)
if name == 'main': main()
UDP版群聊客户端代码
import socket import threading import datetime import logging
FORMAT = "%(asctime)s %(threadName) %(thread) %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUdpClient: def __init__(self, rip='127.0.0.1', rport=9999): self.sock = socket.socket(type=socket.SOCK_DGRAM) self.raddr = (rip, rport) self.event = threading.Event()
def start(self): self.sock.connect(self.raddr) # 占用本地地址和端口,设置远端地址和端口 threading.Thread(target=self.recv, name='recv').start()
def recv(self): while not self.event.is_set(): data , raddr = self.sock.recvfrom(1024)
msg = '{}. from {}:{}'.format(data.decode(), *raddr) logging.info(msg)
def send(self, msg:str): self.sock.sendto(msg.encode(), self.raddr)
def stop(self): self.event.set() self.send('quit') # 通知服务端退出 self.sock.close()
def main(): cc1 = ChatUdpClient() cc2 = ChatUdpClient() cc1.start() cc2.start() print(cc1.sock) print(cc2.sock)
while True: cmd = input('Input your words >>') if cmd.strip() == 'quit': cc1.stop() cc2.stop() break cc1.send(cmd) cc2.send(cmd)
if name == 'main': main()
上面的例子并不完善,如果客户端断开了,服务端不知道。每一个服务端还需要对所有客户端发送数据,包括已经断开的客户端。 问题:服务端如何知道客户端断开了呢?
代码改进 服务端代码改进 增加心跳heartbeat机制或ack机制。这些机制同样可以用在TCP通信的时候 心跳,就是一端定时发往另一端的信息,一般每次数据越少越好。心跳时间间隔约定好就行 ack即响应,一端收到另一端的消息后返回的确认信息
心跳机制
一般来说是客户端定时发往服务端的,服务端并不需要ack回复客户端,只需要记录该客户端还活着就行了 如果是服务端定时发往客户端的,一般需要客户端ack响应来表示活着,如果没有收到ack的客户端,服务端移除其信息。这种实现较为复杂,用的较少 也可以双向都发心跳的,用的更少 在服务器端代码中使用第一种心跳机制改进
import socket import threading import datetime import logging
FORMAT = "%(asctime)s %(threadName) %(thread) %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUDPServer: def __init__(self, ip='127.0.0.1', port=9999, intreval=10): self.addr = (ip, port) self.sock = socket.socket(type=socket.SOCK_DGRAM) self.clients = {} # 记录客户端, 改为字典 self.event = threading.Event() self.interval = intreval # 默认10秒,超时就要移除对应的客户端
def start(self): self.sock.bind(self.addr) # 立即绑定
threading.Thread(target=self.recv, name='recv').start()
def recv(self): while not self.event.is_set(): localset = set() # 清理超市 data , raddr = self.sock.recvfrom(1024) # 阻塞接受数据
current = datetime.datetime.now().timestamp() # float
if data.strip() = b'^hb^': # 心跳信息
self.clients[raddr] = current
continue
elif data.strip() =
b'quit':
self.clients.pop(raddr, None) logging.info('{} leaving'.format(raddr)) continue self.clients[raddr] = current
msg = '{}. from {}:{}'.format(data.decode(), *raddr) logging.info(msg) msg = msg.encode()
for c , stamp in self.clients.items(): if current - stamp > self.interval: localset.add(c) else: self.sock.sendto(msg, c) # 不保证对方能够收到
for c in localset: self.clients.pop(c)
def stop(self): self.event.set() self.clients.clear() self.sock.close()
def main(): cs = ChatUDPServer() cs.start()
while True: cmd = input('>>>') if cmd.strip() == 'quit': cs.stop() break logging.info(threading.enumerate()) logging.info(cs.clients)
if name == 'main': main()
- 客户端代码改进
增加定时发送心跳代码
import socket import threading import datetime import logging
FORMAT = "%(asctime)s %(threadName) %(thread) %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUdpClient: def __init__(self, rip='127.0.0.1', rport=9999): self.sock = socket.socket(type=socket.SOCK_DGRAM) self.raddr = (rip, rport) self.event = threading.Event()
def start(self): self.sock.connect(self.raddr) # 占用本地地址和端口,设置远端地址和端口
threading.Thread(target=self._sendhb, name='heartbeat', daemon=True).start() threading.Thread(target=self.recv, name='recv').start()
def _sendhb(self): # 心跳 while not self.event.wait(5): self.send('^hb^')
def recv(self): while not self.event.is_set(): data, raddr = self.sock.recvfrom(1024)
msg = '{}. from {}:{}'.format(data.decode(), *raddr) logging.info(msg)
def send(self, msg: str): self.sock.sendto(msg.encode(), self.raddr)
def stop(self): self.event.set() self.send('quit') # 通知服务端退出 self.sock.close()
def main(): cc1 = ChatUdpClient() cc2 = ChatUdpClient() cc1.start() cc2.start() print(cc1.sock) print(cc2.sock)
while True: cmd = input('Input your words >>') if cmd.strip() == 'quit': cc1.stop() cc2.stop() break cc1.send(cmd) cc2.send(cmd)
if name == 'main': main()
UDP协议应用
UDP是无连接协议,它基于以下假设:
网络足够好 消息不会丢包 包不会乱序 但是,即使再局域网,也不能保证不丢包,而且包的达到不一定有序
应用场景 视频、音频传输,一般来说,丢些包,问题不大,最多丢些图像、听不清话语,可以重新发话语来解决 海量采集数据,例如传感器发来的数据,丢几十、几百条数据也没有关系 DNS协议,数据内容小,一个包就能查询到结果,不存在乱序,丢包,重新请求解析 一般来说,UDP性能优于TCP,但是可靠性要求高的场合的还是要选择TCP协议
SocketServer
概述 socket编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对socket底层API进行封装 Python的封装就是——socketserver模块。它是网络服务编程框架,便于企业级快速开发
类的继承关系
------------
BaseServer |
------------
v
----------- ------------------
TCPServer | --–—> | UnixStreamServer |
----------- ------------------
v
----------- --------------------
UDPServer | --–—> | UnixDatagramServer |
----------- --------------------
SocketServer简化了网络服务器的编写 它有4个同步类:
TCPServer UDPServer UnixStreamServer UnixDatagramServer 2个Mixin类:ForkingMixIn 和 ThreadingMixIn 类,用来支持异步。由此得到
class ForkingUDPServer(ForkingMixIn, UDPServer): pass class ForkingTCPServer(ForkingMixIn, TCPServer): pass class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass fork是创建多进程,thread是创建多线程 fork需要操作系统支持,Windows不支持
编程接口
socketserver.BaseServer(server_address, RequestHandlerClass) 需要提供服务器绑定的地址信息,和用于处理请求的RequestHandlerClass类 RequestHandlerClass类必须是BaseRequestHandler类的子类,在BaseServer中代码如下:
class BaseServer: def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address self.RequestHandlerClass = RequestHandlerClass self.__is_shut_down = threading.Event() self.__shutdown_request = False def finish_request(self, request, client_address): # 处理请求的方法 """Finish one request by instantiating RequestHandlerClass.""" self.RequestHandlerClass(request, client_address, self) # RequestHandlerClass构造
BaseRequestHandler类 它是和用户连接的用户请求处理类的基类,定义为 BaseRequestHandler(request, client_address, server) 服务端Server实例接收用户请求后,最后会实例化这个类 它被初始化时,送入3个构造参数:request, client_address, server自身 以后就可以在BaseRequestHandler类的实例上使用以下属性:
self.request是和客户端的连接的socket对象 self.server是TCPServer实例本身 self.client_address是客户端地址 这个类在初始化的时候,它会依次调用3个方法。子类可以覆盖这些方法
class BaseRequestHandler: def __init__(self, request, client_address, server): self.request = request self.client_address = client_address self.server = server self.setup() try: self.handle() finally: self.finish()
def setup(self): # 每一个连接初始化 pass
def handle(self): # 每一次请求处理 pass
def finish(self): # 每一个连接清理 pass
测试代码 import threading import socketserver
class Myhandler(socketserver.BaseRequestHandler): def handle(self):
print('-' * 30) print(self.server) # 服务 print(self.request) # 服务端负责客户端连接请求的socket对象 print(self.client_address) # 客户端地址 print(self.__dict__) print(self.server.__dict__) # 能看到负责accept的socket
print(threading.enumerate()) print(threading.current_thread()) print('-' * 30)
addr = ('192.168.142.1', 9999) server = socketserver.ThreadingTCPServer(addr, Myhandler)
server.serve_forever() # 永久循环执行
<socketserver.ThreadingTCPServer object at 0x000001A8055D80F0>
<socket.socket fd=492, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, \ laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 50280)>
('127.0.0.1', 50280)
{'request': <socket.socket fd=492, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, \ proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 50280)>, 'client_address': ('127.0.0.1', 50280),\ 'server': <socketserver.ThreadingTCPServer object at 0x000001A8055D80F0>}
{'server_address': ('127.0.0.1', 9999), 'RequestHandlerClass': <class 'main.Myhandler'>, \ '_BaseServer__is_shut_down': <threading.Event object at 0x000001A8055D85F8>, '_BaseServer__shutdown_request': False,\ 'socket': <socket.socket fd=540, family=AddressFamily.AF_INET, \ type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999)>}
[<_MainThread(MainThread, started 53996)>, <Thread(Thread-1, started 53580)>]
<Thread(Thread-1, started 53580)>
测试结果说明,handle方法相当于socket的recv方法 每个不同的连接上的请求过来后,生成这个连接的socket对象即self.request,客户端地址是self.client_address
测试过程中,上面代码,连接后立即断开了,怎样才能客户端和服务器端长时间连接 import threading import socketserver import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO)
class Myhandler(socketserver.BaseRequestHandler): def handle(self):
print('-' * 30) print(self.server) # 服务 print(self.request) # 服务端负责客户端连接请求的socket对象 print(self.client_address) # 客户端地址 print(self.__dict__) print(self.server.__dict__) # 能看到负责accept的socket
print(threading.enumerate())
print(threading.current_thread())
print('-' * 30)
for i in range(3): # 三次后客户端断开连接
data = self.request.recv(1024)
logging.info(data)
logging.info('==end==
')
addr = ('127.0.0.1', 9999) server = socketserver.ThreadingTCPServer(addr, Myhandler)
server.serve_forever() # 永久循环执行
将ThreadingTCPServer换成TCPServer,同时连接2个客户端观察效果
ThreadingTCPServer是异步的,可以同时处理多个连接 TCPServer是同步的,一个连接处理完了,即一个连接的handle方法执行完了,才能处理另一个连接,且只有主线程
总结 创建服务器需要几个步骤:
从BaseRequestHandler类派生出子类,并覆盖其handle()方法来创建请求处理程序类,此方法将处理传入请求 实例化一个服务器类,传参服务器的地址和请求处理类 调用服务器实例的handle_request()执行一次或serve_forever()永久执行方法 调用server_close()关闭套接字
实现EchoServer
顾名思义,Echo,来什么消息回显什么消息 客户端发来什么信息,返回什么信息
import threading import socketserver
class Handler(socketserver.BaseRequestHandler): def setup(self): super().setup() self.event = threading.Event()
def finish(self): super().finish() self.event.set()
def handle(self): super().handle() while not self.event.is_set(): data = self.request.recv(1024).decode() msg = '{} {}'.format(self.client_address, data).encode() self.request.send(msg)
server = socketserver.ThreadingTCPServer(('127.0.0.1', 9999), Handler) threading.Thread(target=server.serve_forever, name='EchoServer', daemon=True).start()
while True: cmd = input('>>').strip() if cmd == 'quit': server.server_close() break print(threading.enumerate())
实战——改写ChatServer
使用ThreadingTCPServer改写ChatServer
import datetime import threading from socketserver import ThreadingTCPServer, BaseRequestHandler, StreamRequestHandler import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatHandler(StreamRequestHandler): clients = {}
def setup(self): super().setup() self.event = threading.Event() self.clients[self.client_address] = self.wfile
def handle(self): super().handle()
while not self.event.is_set(): data = self.rfile.read1(1024) # 可以读取到数据 data = data.decode().rstrip()
if data ='quit' or data =
'': # 主动退出和断开
break
msg = '{} {}:{} {}'.format(datetime.datetime.now(), *self.client_address, data)
for f in self.clients.values(): f.write(msg.encode()) f.flush()
def finish(self): super().finish() self.clients.pop(self.client_address) self.event.set()
server = ThreadingTCPServer(('127.0.0.1', 9999), ChatHandler) server.daemon_threads = True # 让所有启动线程都为daemon
threading.Thread(target=server.serve_forever, name='chatserver', daemon=True).start()
while True: cmd = input('>>') if cmd.strip() == 'quit': server.server_close() break print(threading.enumerate())
问题
上例 self.clients.pop(self.client_address) 能执行到吗? 如果连接的线程中handle方法中抛出异常,例如客户端主动断开导致的异常,线程崩溃,self.clients的pop方法还能执行吗? 当然能执行,基类源码保证了即使异常,也能执行finish方法。但不代表不应该不捕获客户端各种异常 注意:此程序线程不安全,需要加锁来解决
总结
为每一个连接提供RequestHandlerClass类实例,依次调用setup、handle、finish方法,且使用了try…finally结构保证finish方法一定能被调用。这些方法依次执行完成,如果想维持这个连接和客户端通信,就需要在handle函数中使用循环。 socketserver模块提供的不同的类,但是编程接口是一样的,即使是多进程、多线程的类也是一样,大大减少了编程的难度 将socket编程简化,只需要程序员关注数据处理本身,实现Handler类就行了。 这种风格在Python十分常见
IO多种概念及多路复用
重要概念
同步、异步
函数或方法被调用的时候,调用者是否得到最终结果的
直接得到最终结果的,就是同步调用 不直接得到最终结果的,就是异步调用
阻塞、非阻塞
函数或方法调用的时候,是否立刻返回
立即返回就是非阻塞调用 不立即返回就是阻塞调用
区别
同步、异步,与阻塞、非阻塞不相关 同步、异步强调的是,是否得到(最终的)结果 阻塞、非阻塞强调是时间,是否等待
同步与异步区别在于:调用者是否得到了想要的最终结果 同步就是一直要执行到返回最终结果 异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得到结果,以后可以通过被调用者提供的某种方式(被调用着通知调用者、调用者反复查询、回调),来取回最终结果
阻塞与非阻塞的区别在于,调用者是否还能干其他事 阻塞,调用者就只能干等 非阻塞,调用者可以先去忙会别的,不用一直等
联系
同步阻塞,我啥事不干,就等你打饭打给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞 同步非阻塞,我等着你打饭给我,但我可以玩会手机、看看电视。打饭是结果,但是我不一直等
异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。例如,取了号什么不干就等叫自己的号 异步非阻塞,我要打饭,你给我号,你说等叫号,并没有返回饭给我,我在旁边看电视、玩手机,饭打好了叫我
操作系统知识
在386之前,CPU工作在实模式下,之后,开始支持保护模式,对内存进行了划分 X86 CPU有4种工作级别:
Ring0级,可以执行特权指令,可以访问所有级别数据,可以访问IO设备等 Ring3级,级别最低,只能访问本级别数据 内核代码运行在Ring0,用户代码运行在Ring3
现代操作系统采用虚拟存储器,理论上,对于32位系统来说,进程对虚拟内存地址的内存寻址空间为4G(2^32) 64位操作系统理论上最大内存寻址空间(2^64) 操作系统中,内核程序独立且运行在较高的特权级别上,它们驻留在被保护的内存空间上,拥有访问硬件设备的所有权限,这部分内存称为内核空间(内核态,最高地址1G)
普通应用程序运行在用户空间(用户态)
应用程序想访问某些硬件资源就需要通过操作系统提供的系统调用,系统调用可以使用特权指令运行在内核空间,此时进程陷入内核态运行。系统调用完成,进程将回到用户态执行用户空间代码
同步IO、异步IO、IO多路复用
IO两个阶段
IO过程分两阶段:
数据准备阶段 内核空间复制回用户空间进程缓冲区阶段 发生IO的时候:
内核从IO设备读、写数据(淘米,把米放饭锅里煮饭) 进程从内核复制数据(盛饭,从内核这个饭锅里面把饭装到碗里来) 系统调用——read函数
IO模型
同步IO
同步IO模型包括 阻塞IO、非阻塞IO、IO多路复用、信号驱动IO
阻塞IO
进程等待(阻塞),直到读写完成。(全程等待)
非阻塞IO
进程调用read操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间
第一阶段数据没有准备好,就先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的 第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的 例:淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭
IO多路复用
所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了开始处理,提高了同时处理IO的能力
select几乎所有操作系统平台都支持,poll是对的select的升级 epoll,Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、Mac平台有kqueue,Windows有iocp 以select为例
将关注的IO操作告诉select函数并调用,进程阻塞,内核“监视”select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程 select举例
食堂供应很多菜(众多的IO),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等,你只好等待大师傅叫。其中一样菜好了,大师傅叫你,说你点的菜有好的了,你得自己遍历找找看哪一样才好了,请服务员把做好的菜打给你 epoll是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了 一般情况下,select最多能监听1024个fd(可以修改,但不建议改),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下 epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高
信号驱动IO
进程在IO访问时,先通过sigaction系统调用,提交一个信号处理函数,立即返回。进程不阻塞 当内核准备好数据后,产生一个SIGIO信号并投递给信号处理函数。可以在此函数中调用recvfrom函数操作数据从内核空间复制到用户空间,这段过程进程阻塞
异步IO
进程发起异步IO请求,立即返回。内核完成IO的两个阶段,内核给进程发一个信号。 举例
来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都是异步的。在整个过程中,进程都可以忙别的,等好了才过来。 今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。 Linux的aio的系统调用,内核从版本2.6开始支持
前4个都是同步IO,因为核心操作recv函数调用时,进程阻塞直到拿到最终结果为止。 而异步IO进程全程不阻塞
Python中IO多路复用
IO多路复用 大多数操作系统都支持select和poll Linux 2.5+ 支持epoll BSD、Mac支持kqueue Solaris实现了/dev/poll Windows的IOCP Python的select库实现了select、poll系统调用,这个基本上操作系统都支持。部分实现了epoll。它是底层的IO多路复用模块
开发中的选择 完全跨平台,使用select、poll。但是性能较差 针对不同操作系统自行选择支持的技术,这样做会提高IO处理的性能 select维护一个文件描述符数据结构,单个进程使用有上限,通常是1024,线性扫描这个数据结构。效率低。 pool和select的区别是内部数据结构使用链表,没有这个最大限制,但是依然是线性遍历才知道哪个设备就绪了 epool使用事件通知机制,使用回调机制提高效率。
select/pool还要从内核空间复制消息到用户空间,而epoll通过内核空间和用户空间共享一块内存来减少复制
selectors库
3.4版本提供selectors库,高级IO复用库
类层次结构︰ BaseSelector +– SelectSelector 实现select +– PollSelector 实现poll +– EpollSelector 实现epoll +– DevpollSelector 实现devpoll +– KqueueSelector 实现kqueue
selectors.DefaultSelector返回当前平台最有效、性能最高的实现。 但是,由于没有实现Windows下的IOCP,所以,Windows下只能退化为select
在selects模块源码最下面有如下代码
if 'KqueueSelector' in globals(): DefaultSelector = KqueueSelector elif 'EpollSelector' in globals(): DefaultSelector = EpollSelector elif 'DevpollSelector' in globals(): DefaultSelector = DevpollSelector elif 'PollSelector' in globals(): DefaultSelector = PollSelector else: DefaultSelector = SelectSelector
事件注册 class SelectSelector(_BaseSelectorImpl): """Select-based selector.""" def register(fileobj, events, data=None) -> SelectorKey: pass
为selector注册一个文件对象,监视它的IO事件。返回SelectKey对象。
fileobj 被监视文件对象,例如socket对象 events 事件,该文件对象必须等待的事件 data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话ID,关联方法。通过这个参数在关注的事件产生后让selector干什么事 Event常量 含义 EVENT_READ 可读 0b01,内核已经准备好输入输出设备,可以开始读了 EVENT_WRITE 可写 0b10,内核准备好了,可以往里写了 selectors.SelectorKey 有4个属性:
fileobj 注册的文件对象 fd 文件描述符 events 等待上面的文件描述符的文件对象的事件 data 注册时关联的数据
练习:IO多路复用TCP Server
完成一个TCP Server,能够接受客户端请求并回应客户端消息
import selectors import socket
s = selectors.DefaultSelector() # 拿到selector
server = socket.socket() server.bind(('127.0.0.1', 9999)) server.listen()
server.setblocking(False)
def accept(sock:socket.socket, mask:int): """mask:事件的掩码""" conn, raddr = sock.accept() conn.setblocking(False) # 非阻塞 key = s.register(conn, selectors.EVENT_READ, recv) # 读和写可同时监控, selectors.EVENT_READ | selectors.EVENT_WRITE print(key)
def recv(conn:socket.socket, mast:int): data = conn.recv(1024) print(data) msg = 'Your msg = {} from {}'.format(data.decode(), conn.getpeername()).encode() conn.send(msg)
key = s.register(server, selectors.EVENT_READ, accept) #socket fileobject print(key) print(key.__class__.mro())
while True:
events = s.select() # epoll select, 默认是阻塞的
print(events) # [(key, mask)]
for key, mask in events:
print(type(key), type(mask)) print(key.data) key.data(key.fileobj, mask)
实战:IO多路复用群聊软件
将ChatServer改写成IO多路复用的方式 不需要启动多线程来执行socket的accept、recv方法了
import socket import threading import selectors import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s" logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatServer: def __init__(self, ip='127.0.0.1', port=9999): self.sock = socket.socket() self.addr = ip, port self.event = threading.Event()
self.selector = selectors.DefaultSelector()
def start(self): self.sock.bind(self.addr) self.sock.listen() self.sock.setblocking(False)
key = self.selector.register(self.sock, selectors.EVENT_READ, self.accept) logging.info(key) # 只有一个
threading.Thread(target=self.select, name='select', daemon=True).start()
def select(self): while not self.event.is_set():
events = self.selector.select() # 阻塞 print(events) # [(key, mask)] for key, mask in events:
key.data(key.fileobj, mask) # select线程
def accept(self, sock:socket.socket, mask): """mask:事件的掩码""" conn, raddr = self.sock.accept() conn.setblocking(False) # 非阻塞
key = self.selector.register(conn, selectors.EVENT_READ, self.recv) logging.info(key) #n个
def recv(self, conn:socket.socket, mask): data = conn.recv(1024)
if data.strip() = b'quit' or data.strip() =
b'':
self.selector.unregister(conn) conn.close() return
for key in self.selector.get_map().values():
if key.data == self.recv: # key.data 注册时注入的绑定的对象 s = key.fileobj msg = 'Your msg = {} from {}'.format(data.decode(), conn.getpeername()).encode() s.send(msg)
def stop(self): self.event.set()
fs = [] for key in self.selector.get_map().values(): fs.append(key.fileobj) for f in fs: self.selector.unregister(f) f.close()
self.selector.close()
if name = '__main__':
cc = ChatServer()
cc.start()
while True:
cmd
input('>>').strip()
if cmd == 'quit':
cc.stop()
break
logging.info(threading.enumerate())
本例只完成基本功能,其他功能如有需要,请自行完成。 注意使用IO多路复用,使用了几个线程? 特别注意key.data == self.recv
总结
使用 IO多路复用 +(select、epoll) 并不一定比 多线程 + 同步阻塞IO 性能好,其最大优势可以处理更多的连接
多线程 + 同步阻塞IO模式 开辟太多线程,线程开辟、销毁开销还是较大,倒是可以使用线程池;线程多,线程自己使用的内存也很可观;多线程切换时要保护现场和恢复现场,线程过多,切换会占用大量的时间
连接较少,多线程 + 同步阻塞IO模式比较适合,效率也不低
如果连接非常多,对服务端程序来说,IO并发还是比较高的,这时候,开辟太多线程其实也不是很划算,这时候IO多路复用或许是更好的选择,使用epoll