后台技术发展史:
1)阻塞IO,Socket通信
2)线程,进程阻塞IO,CGI时代, apache
3)事件回调
1. 多路IO复用处理,Reactor(同步非阻塞),C10K时代。Nginx、Flask
2. 协程,Proactor(异步非阻塞技术),10M时代。Sanic
一、同步阻塞式IO
import socket
BUF_SIZE = 512  # maximum number of bytes read from a client per recv() call


class EchoServer:
    """Blocking TCP echo server that serves one client at a time.

    ``run()`` accepts a connection and echoes it to completion before it
    accepts the next one, so other clients wait in the listen backlog.
    """

    def __init__(self, host, port):
        """Create a listening TCP socket bound to ``(host, port)``."""
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((host, port))
        self.listen_socket.listen(128)

    def handle_echo(self, conn):
        """Echo everything received on ``conn`` until the peer disconnects.

        ``conn`` is always closed on return.  A peer that vanishes abruptly
        (reset, broken pipe) is treated like an orderly close so that one
        misbehaving client cannot take the accept loop down.
        """
        with conn:
            try:
                while True:
                    data = conn.recv(BUF_SIZE)
                    if not data:
                        # recv() returning b"" means the peer closed its side.
                        break
                    conn.sendall(data)
            except ConnectionError:
                # Nothing left to echo to; the ``with`` block closes conn.
                pass

    def run(self):
        """Accept clients forever, serving each one to completion in turn."""
        print("Server Start!!!")
        while True:
            conn, address = self.listen_socket.accept()
            print(address)
            self.handle_echo(conn)
# Listen on the loopback interface; run() loops forever, so this call never returns.
server = EchoServer(host="127.0.0.1", port=22857)
server.run()

优点:简单
缺点:并发只有一个,会阻塞服务端
二、 进程+ 阻塞式同步IO
1)动态创建进程
import socket
BUF_SIZE = 512  # maximum number of bytes read from a client per recv() call


class EchoServer:
    """Blocking TCP echo server that forks one child process per connection.

    The parent process only accepts connections; each accepted connection
    is served by a freshly forked child that exits when the client leaves.
    Requires a platform that provides ``os.fork``.
    """

    def __init__(self, host, port):
        """Create a listening TCP socket bound to ``(host, port)``."""
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((host, port))
        self.listen_socket.listen(128)

    def handle_echo(self, conn):
        """Echo everything received on ``conn`` until the peer disconnects.

        ``conn`` is always closed on return; an abrupt disconnect (reset,
        broken pipe) is treated like an orderly close.
        """
        with conn:
            try:
                while True:
                    data = conn.recv(BUF_SIZE)
                    if not data:
                        # recv() returning b"" means the peer closed its side.
                        break
                    conn.sendall(data)
            except ConnectionError:
                pass

    def run(self):
        """Accept clients forever, forking a child process for each one."""
        # Imported here so the snippet's import line does not have to change.
        import os
        import signal

        print("Server Start!!!")
        # Ignoring SIGCHLD asks the kernel to reap finished children, so
        # they do not accumulate as zombies.  Must run in the main thread.
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)
        while True:
            conn, address = self.listen_socket.accept()
            print(address)
            if os.fork() == 0:
                # Child: it only needs the client socket.
                self.listen_socket.close()
                try:
                    self.handle_echo(conn)
                finally:
                    # _exit() skips the parent's inherited cleanup handlers
                    # and unflushed stdio buffers.
                    os._exit(0)
            # Parent: the child owns the connection now; drop our copy so
            # the client sees EOF as soon as the child closes its end.
            conn.close()
# Bind to 127.0.0.1:22857 and hand control to the accept loop (never returns).
server = EchoServer(host="127.0.0.1", port=22857)
server.run()

htop查看进程

优点:简单。
缺点:进程上下文切换开销大,进程内存资源占用多。
2)进程池 + 阻塞式同步IO
import socket,os
import concurrent.futures
BUF_SIZE = 512  # maximum number of bytes read from a client per recv() call


class EchoServer:
    """Blocking TCP echo server backed by a fixed-size process pool.

    The main process accepts connections and submits each one to a
    ``ProcessPoolExecutor``; at most ``self.processes`` clients are served
    concurrently, the rest wait in the executor's queue.
    """

    def __init__(self, host, port):
        """Create a listening TCP socket bound to ``(host, port)``."""
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((host, port))
        self.listen_socket.listen(128)
        self.processes = 3  # number of worker processes in the pool

    def handle_echo(self, conn, address):
        """Echo everything received on ``conn`` until the peer disconnects.

        Submitted to the pool by ``run()``.  ``address`` is only used for
        logging.  ``conn`` is always closed on return, and the method
        returns as soon as the peer is gone instead of touching the closed
        socket again.
        """
        print("get ", address)
        with conn:
            try:
                while True:
                    data = conn.recv(BUF_SIZE)
                    if not data:
                        # recv() returning b"" means the peer closed its side.
                        break
                    conn.sendall(data)
            except ConnectionError:
                # Abrupt disconnect: treat like an orderly close.
                pass

    def run(self):
        """Accept clients forever and dispatch each one to the process pool."""
        with concurrent.futures.ProcessPoolExecutor(max_workers=self.processes) as executor:
            while True:
                conn, address = self.listen_socket.accept()
                print(address)
                # NOTE(review): submitting a bound method pickles ``self``
                # and ``conn`` for the worker - confirm socket pickling works
                # with the multiprocessing start method in use.
                executor.submit(self.handle_echo, conn, address)
# Start the pool-backed server on the loopback interface; run() blocks forever.
server = EchoServer(host="127.0.0.1", port=22857)
server.run()


优点:固定进程数,减少了进程上下文切换以及进程创建、销毁的开销。
缺点:进程内存资源占用多,并发代价大。
三、多线程
1)动态创建线程 + 阻塞式同步IO
import _thread
import socket
BUF_SIZE = 512  # maximum number of bytes read from a client per recv() call


class EchoServer:
    """Blocking TCP echo server that starts one thread per connection."""

    def __init__(self, host, port):
        """Create a listening TCP socket bound to ``(host, port)``."""
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((host, port))
        self.listen_socket.listen(128)

    def handle_echo(self, conn):
        """Echo everything received on ``conn`` until the peer disconnects.

        Started in its own thread by ``run()``.  ``conn`` is always closed
        on return; an abrupt disconnect (reset, broken pipe) ends the
        thread quietly instead of raising out of it.
        """
        with conn:
            try:
                while True:
                    data = conn.recv(BUF_SIZE)
                    if not data:
                        # recv() returning b"" means the peer closed its side.
                        break
                    conn.sendall(data)
            except ConnectionError:
                pass

    def run(self):
        """Accept clients forever, starting a new thread for each one."""
        while True:
            conn, address = self.listen_socket.accept()
            print(address)
            _thread.start_new_thread(self.handle_echo, (conn,))
# Serve on 127.0.0.1:22857; the accept loop in run() never returns.
server = EchoServer(host="127.0.0.1", port=22857)
server.run()


优点:构造、析构线程占用资源比进程小、线程间可以共享进程资源。
缺点:虽然线程占用资源少,但仍不可小觑,且访问竞态资源时需要注意加锁。
此外在Python的主流实现CPython中,由于GIL(Global interpreter lock)的关系,同一时间只会有一个获得了GIL的线程在跑,使得程序无法利用物理多核的性能加速运算。
2)线程池 + 阻塞式同步IO
import socket,os
import concurrent.futures
BUF_SIZE = 512  # maximum number of bytes read from a client per recv() call


class EchoServer:
    """Blocking TCP echo server backed by a fixed-size thread pool.

    The main thread accepts connections and submits each one to a
    ``ThreadPoolExecutor``; at most ``self.threads`` clients are served
    concurrently, the rest wait in the executor's queue.
    """

    def __init__(self, host, port):
        """Create a listening TCP socket bound to ``(host, port)``."""
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((host, port))
        self.listen_socket.listen(128)
        self.threads = 3  # number of worker threads in the pool

    def handle_echo(self, conn, address):
        """Echo everything received on ``conn`` until the peer disconnects.

        Submitted to the pool by ``run()``.  ``address`` is only used for
        logging.  ``conn`` is always closed on return, and the method
        returns as soon as the peer is gone instead of touching the closed
        socket again.
        """
        print("get ", address)
        with conn:
            try:
                while True:
                    data = conn.recv(BUF_SIZE)
                    if not data:
                        # recv() returning b"" means the peer closed its side.
                        break
                    conn.sendall(data)
            except ConnectionError:
                # Abrupt disconnect: treat like an orderly close.
                pass

    def run(self):
        """Accept clients forever and dispatch each one to the thread pool."""
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
            while True:
                conn, address = self.listen_socket.accept()
                print(address)
                executor.submit(self.handle_echo, conn, address)
# Launch the thread-pool server on the loopback interface; run() blocks forever.
server = EchoServer(host="127.0.0.1", port=22857)
server.run()


对比进程池与线程池资源占用。同样3个worker,进程资源占用是线程的三倍。
四、事件驱动模型
观察进程与线程模型,阻塞操作(如recv、accept)在等待期间虽然不消耗CPU,却始终独占一个进程或线程及其内存、调度资源;事件驱动模型只有在操作的就绪事件发生时才进行相应资源的分配。
1)Reactor(非阻塞式同步IO) 来了事件我通知你,你来处理
import socket,os
import select
BUF_SIZE = 512  # maximum number of bytes read from a client per recv() call


class EchoServer:
    """Single-threaded TCP echo server driven by ``select.epoll``.

    Every socket is non-blocking and registered for ``EPOLLIN``.
    ``_fd_map`` maps a file descriptor to ``(socket, callback)``; ``run()``
    polls for ready descriptors and ``handler()`` invokes the callback.
    """

    def __init__(self, host, port):
        """Create the non-blocking listening socket and register it."""
        listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_socket.setblocking(False)
        listen_socket.bind((host, port))
        listen_socket.listen(128)
        # fd -> (socket object, callback to run when that fd is ready)
        self._fd_map = {
            listen_socket.fileno(): (listen_socket, self.handle_conn),
        }
        self._epoll = select.epoll()
        self._epoll.register(listen_socket.fileno(), select.EPOLLIN)

    def _drop(self, conn):
        """Forget ``conn``: remove it from the fd map and epoll, then close it."""
        fd = conn.fileno()
        # Remove the map entry too, otherwise a stale (closed) socket would
        # stay reachable under a descriptor number the OS may reuse.
        self._fd_map.pop(fd, None)
        self._epoll.unregister(fd)
        conn.close()

    def handle_echo(self, conn):
        """Read one chunk from ``conn`` and echo it back.

        The connection is dropped when the peer has closed it, when it
        fails with a connection error, or when the echo cannot be written
        in full without blocking.
        """
        try:
            data = conn.recv(BUF_SIZE)
        except BlockingIOError:
            # Nothing to read after all; wait for the next readiness event.
            return
        except ConnectionError:
            data = b""
        if data:
            try:
                conn.sendall(data)
            except (BlockingIOError, ConnectionError):
                # NOTE: there is no write buffering / EPOLLOUT handling, so
                # a client that does not drain its socket is disconnected
                # rather than left with a partial echo.
                pass
            else:
                return
        self._drop(conn)

    def handle_conn(self, listen_socket):
        """Accept a pending connection and register it for read events."""
        try:
            conn, address = listen_socket.accept()
            print("get ", address)
        except socket.error as e:
            print(e)
            return
        conn.setblocking(False)
        self._fd_map[conn.fileno()] = (conn, self.handle_echo)
        self._epoll.register(conn.fileno(), select.EPOLLIN)

    def handler(self, fd, events):
        """Dispatch a ready file descriptor to its registered callback.

        Unknown descriptors are ignored.  Error and hang-up events are
        dispatched as well: epoll reports them even when only ``EPOLLIN``
        was requested, and with level-triggered polling an unhandled one
        would be reported again on every poll.
        """
        if not events & (select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR):
            return
        entry = self._fd_map.get(fd)
        if entry is None:
            return
        sock, callback = entry
        callback(sock)

    def run(self):
        """Poll for events forever and dispatch each one via ``handler()``."""
        # Imported here so the snippet's import lines do not have to change.
        import errno

        while True:
            try:
                events = self._epoll.poll()
            except OSError as e:
                # EPIPE: happens when the client closes the connection.
                # EINTR: happens when a signal was received.
                if e.errno not in (errno.EPIPE, errno.EINTR):
                    print("Poll Exception:", e)
                continue
            for fd, event in events:
                self.handler(fd, event)
# Start the epoll-driven server on 127.0.0.1:22857; the event loop in run() never returns.
server = EchoServer(host="127.0.0.1", port=22857)
server.run()


优点:单线程IO复用,优化资源。
缺点:不是完全异步。
2)Proactor(非阻塞式异步IO) 来了事件我来处理,处理完了我通知你。
Reactor:同时接收多个服务请求,并且依次同步的处理它们的事件驱动程序;
Proactor:异步接收和同时处理多个服务请求的事件驱动程序。
linux内核暂时不支持Proactor,只能用主线程io模拟proactor
高并发测试。
客户端并发程序
import asyncio
import socket
import sys
async def new_socket(host="127.0.0.1", port=22857):
    """Open one connection to the echo server and ping-pong over it.

    Repeatedly sends ``b"hello"`` and waits for a reply.  The coroutine
    returns once the server closes the connection (EOF or a connection
    error); the stream is always closed on the way out.

    Args:
        host: Server address to connect to.
        port: Server TCP port to connect to.
    """
    reader, writer = await asyncio.open_connection(host, port)
    try:
        while True:
            writer.write("hello".encode('utf-8'))
            await writer.drain()
            msg = await reader.read(1024)
            if not msg:
                # read() returning b"" means EOF: the server is gone, so
                # stop instead of spinning on a dead connection.
                break
    except ConnectionError:
        # Reset / broken pipe: same outcome as EOF.
        pass
    finally:
        writer.close()
tasklist = []  # every client task created by main()


async def main(batch_size=100, interval=1, max_connections=None):
    """Ramp up echo clients in batches.

    Every ``interval`` seconds another ``batch_size`` ``new_socket()``
    tasks are started and recorded in ``tasklist``.

    Args:
        batch_size: Number of connections opened per batch.
        interval: Seconds to sleep between batches.
        max_connections: Total number of connections to open.  ``None``
            (the default) keeps adding batches forever; with a number, the
            ramp-up stops at that total and the coroutine then waits for
            every client task to finish.
    """
    while max_connections is None or len(tasklist) < max_connections:
        quota = batch_size
        if max_connections is not None:
            # Do not overshoot the requested total in the last batch.
            quota = min(batch_size, max_connections - len(tasklist))
        for _ in range(quota):
            tasklist.append(asyncio.create_task(new_socket()))
        await asyncio.sleep(interval)
    for task in tasklist:
        await task
# Script entry point: start an event loop and run the main() coroutine in it.
asyncio.run(main())
启动前
1)套接字占用

2)系统资源占用

启动后
1)套接字占用

2)系统资源占用

C10K 内存占用只有几十兆,远小于线程和进程模型。
不过服务器资源占用不只是socket资源,还有数据传输和db资源占用与消耗,视具体情况具体分析。
网友评论