Python网络编程深度实践:TCP/IP至WebSocket协议演进解析

# Python网络编程深度实践:TCP/IP至WebSocket协议演进解析


Python网络编程涵盖了从底层套接字到高级应用协议的全栈实现。本文将深入探讨网络协议栈的演进路径及其在Python中的实践。


## TCP/IP协议栈:网络通信的基石


TCP/IP协议提供了可靠的数据传输基础,Python的socket模块提供了底层访问能力。


```python

# TCP服务器与客户端实现

import socket

import threading

import select

from typing import Optional


class TCPServer:

    """TCP服务器实现"""

    

    def __init__(self, host: str = 'localhost', port: int = 8080):

        self.host = host

        self.port = port

        self.server_socket: Optional[socket.socket] = None

        self.clients = {}

        self.running = False

        

    def start(self):

        """启动TCP服务器"""

        # 创建TCP套接字

        self.server_socket = socket.socket(

            socket.AF_INET,      # IPv4

            socket.SOCK_STREAM,  # TCP

            socket.IPPROTO_TCP   # TCP协议

        )

        

        # 设置套接字选项

        self.server_socket.setsockopt(

            socket.SOL_SOCKET, 

            socket.SO_REUSEADDR, 

            1

        )

        self.server_socket.setsockopt(

            socket.IPPROTO_TCP,

            socket.TCP_NODELAY,

            1  # 禁用Nagle算法,降低延迟

        )

        

        # 绑定地址和端口

        self.server_socket.bind((self.host, self.port))

        

        # 开始监听

        self.server_socket.listen(5)

        print(f"TCP服务器启动于 {self.host}:{self.port}")

        

        self.running = True

        self.accept_connections()

    

    def accept_connections(self):

        """接受客户端连接"""

        while self.running:

            try:

                # 使用select处理多个连接

                readable, _, _ = select.select(

                    [self.server_socket] + list(self.clients.keys()),

                    [], [], 1.0

                )

                

                for sock in readable:

                    if sock is self.server_socket:

                        # 接受新连接

                        client_socket, client_address = self.server_socket.accept()

                        

                        # 设置客户端套接字选项

                        client_socket.setsockopt(

                            socket.IPPROTO_TCP,

                            socket.TCP_KEEPIDLE, 

                            60  # 保活探测开始时间

                        )

                        client_socket.setsockopt(

                            socket.IPPROTO_TCP,

                            socket.TCP_KEEPINTVL, 

                            10  # 保活探测间隔

                        )

                        client_socket.setsockopt(

                            socket.IPPROTO_TCP,

                            socket.TCP_KEEPCNT, 

                            5   # 最大探测次数

                        )

                        

                        print(f"新连接: {client_address}")

                        self.clients[client_socket] = client_address

                        

                        # 启动客户端处理线程

                        thread = threading.Thread(

                            target=self.handle_client,

                            args=(client_socket, client_address)

                        )

                        thread.daemon = True

                        thread.start()

                    else:

                        # 处理现有连接的数据

                        self.handle_client_data(sock)

            

            except OSError as e:

                print(f"连接错误: {e}")

                break

    

    def handle_client(self, client_socket: socket.socket, address: tuple):

        """处理单个客户端连接"""

        try:

            # 设置接收缓冲区大小

            client_socket.setsockopt(

                socket.SOL_SOCKET,

                socket.SO_RCVBUF,

                65536  # 64KB接收缓冲区

            )

            

            while self.running:

                # 接收数据

                data = client_socket.recv(4096)

                if not data:

                    break

                

                # 处理请求

                response = self.process_request(data)

                

                # 发送响应

                client_socket.sendall(response)

        

        except ConnectionError as e:

            print(f"连接中断: {address} - {e}")

        finally:

            client_socket.close()

            if client_socket in self.clients:

                del self.clients[client_socket]

            print(f"连接关闭: {address}")

    

    def process_request(self, data: bytes) -> bytes:

        """处理请求并生成响应"""

        # 实现具体的业务逻辑

        request_text = data.decode('utf-8', errors='ignore')

        

        # 解析HTTP-like请求

        if request_text.startswith(('GET', 'POST', 'PUT', 'DELETE')):

            response = self.handle_http_request(request_text)

        else:

            response = f"ECHO: {request_text}".encode('utf-8')

        

        return response

    

    def handle_http_request(self, request: str) -> bytes:

        """处理HTTP请求"""

        lines = request.split('\r\n')

        request_line = lines[0]

        

        method, path, version = request_line.split()

        

        # 构建HTTP响应

        response_body = f"Method: {method}, Path: {path}"

        response = (

            f"HTTP/1.1 200 OK\r\n"

            f"Content-Type: text/plain\r\n"

            f"Content-Length: {len(response_body)}\r\n"

            f"Connection: keep-alive\r\n"

            f"\r\n"

            f"{response_body}"

        )

        

        return response.encode('utf-8')


class TCPClient:

    """TCP客户端实现"""

    

    def __init__(self, host: str = 'localhost', port: int = 8080):

        self.host = host

        self.port = port

        self.socket: Optional[socket.socket] = None

        self.buffer_size = 4096

    

    def connect(self) -> bool:

        """连接到服务器"""

        try:

            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            self.socket.settimeout(10.0)  # 连接超时

            

            # 启用TCP keepalive

            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

            

            self.socket.connect((self.host, self.port))

            print(f"已连接到 {self.host}:{self.port}")

            return True

        except (socket.timeout, ConnectionRefusedError) as e:

            print(f"连接失败: {e}")

            return False

    

    def send_request(self, data: bytes) -> bytes:

        """发送请求并接收响应"""

        if not self.socket:

            raise ConnectionError("未建立连接")

        

        try:

            # 发送数据

            self.socket.sendall(data)

            

            # 接收响应

            response = bytearray()

            while True:

                chunk = self.socket.recv(self.buffer_size)

                if not chunk:

                    break

                response.extend(chunk)

                

                # 检查是否接收完整

                if len(chunk) < self.buffer_size:

                    break

            

            return bytes(response)

        

        except socket.timeout:

            raise TimeoutError("接收响应超时")

    

    def close(self):

        """关闭连接"""

        if self.socket:

            try:

                self.socket.shutdown(socket.SHUT_RDWR)

            except:

                pass

            self.socket.close()

            self.socket = None

```


## UDP协议:无连接传输实现


UDP提供了无连接的轻量级传输能力,适用于实时性要求高的场景。


```python

# UDP服务器与客户端实现

import socket

import struct

from typing import Tuple


class UDPServer:

    """UDP服务器实现"""

    

    def __init__(self, host: str = 'localhost', port: int = 9090):

        self.host = host

        self.port = port

        self.server_socket = None

        self.running = False

    

    def start(self):

        """启动UDP服务器"""

        self.server_socket = socket.socket(

            socket.AF_INET,

            socket.SOCK_DGRAM,  # UDP

            socket.IPPROTO_UDP

        )

        

        # 设置广播权限

        self.server_socket.setsockopt(

            socket.SOL_SOCKET,

            socket.SO_BROADCAST,

            1

        )

        

        # 绑定地址

        self.server_socket.bind((self.host, self.port))

        

        # 设置接收缓冲区

        self.server_socket.setsockopt(

            socket.SOL_SOCKET,

            socket.SO_RCVBUF,

            65536

        )

        

        print(f"UDP服务器启动于 {self.host}:{self.port}")

        self.running = True

        self.handle_requests()

    

    def handle_requests(self):

        """处理UDP请求"""

        while self.running:

            try:

                # 接收数据报

                data, client_address = self.server_socket.recvfrom(65535)

                

                # 处理请求

                response = self.process_datagram(data, client_address)

                

                if response:

                    # 发送响应

                    self.server_socket.sendto(response, client_address)

            

            except KeyboardInterrupt:

                break

            except Exception as e:

                print(f"处理请求时出错: {e}")

    

    def process_datagram(self, data: bytes, address: Tuple[str, int]) -> bytes:

        """处理数据报"""

        # 实现自定义协议处理

        try:

            # 示例:解析自定义二进制协议

            if len(data) >= 4:

                # 解析头部

                packet_type, payload_length = struct.unpack('!HH', data[:4])

                

                if packet_type == 1:  # Echo请求

                    payload = data[4:4+payload_length]

                    return self.build_echo_response(payload)

                elif packet_type == 2:  # 时间请求

                    return self.build_time_response()

            

            return b'INVALID_REQUEST'

        

        except struct.error as e:

            return f'PROTOCOL_ERROR: {e}'.encode()

    

    def build_echo_response(self, payload: bytes) -> bytes:

        """构建Echo响应"""

        header = struct.pack('!HH', 1, len(payload))

        return header + payload

    

    def build_time_response(self) -> bytes:

        """构建时间响应"""

        import time

        timestamp = int(time.time())

        header = struct.pack('!HH', 2, 4)

        data = struct.pack('!I', timestamp)

        return header + data


class ReliableUDP:

    """可靠UDP协议实现"""

    

    def __init__(self, host: str, port: int):

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        self.address = (host, port)

        self.sequence_number = 0

        self.acknowledged_packets = set()

        self.pending_packets = {}

        self.timeout = 2.0

        

    def send_reliable(self, data: bytes) -> bool:

        """可靠发送数据"""

        packet_id = self.sequence_number

        self.sequence_number += 1

        

        # 构建数据包

        packet = self.build_packet(packet_id, data)

        

        # 发送并等待确认

        attempts = 0

        while attempts < 3:

            self.socket.sendto(packet, self.address)

            self.pending_packets[packet_id] = packet

            

            # 等待确认

            if self.wait_for_ack(packet_id):

                del self.pending_packets[packet_id]

                return True

            

            attempts += 1

        

        return False

    

    def build_packet(self, packet_id: int, data: bytes) -> bytes:

        """构建数据包"""

        header = struct.pack('!IH', packet_id, len(data))

        checksum = self.calculate_checksum(header + data)

        return header + struct.pack('!I', checksum) + data

    

    def calculate_checksum(self, data: bytes) -> int:

        """计算校验和"""

        if len(data) % 2 == 1:

            data += b'\x00'

        

        total = 0

        for i in range(0, len(data), 2):

            word = (data[i] << 8) + data[i+1]

            total += word

            total = (total & 0xffff) + (total >> 16)

        

        return ~total & 0xffff

    

    def wait_for_ack(self, packet_id: int) -> bool:

        """等待确认"""

        self.socket.settimeout(self.timeout)

        

        try:

            while True:

                data, _ = self.socket.recvfrom(1024)

                

                if len(data) >= 6:

                    ack_id, ack_type = struct.unpack('!IH', data[:6])

                    

                    if ack_type == 1 and ack_id == packet_id:

                        return True

        

        except socket.timeout:

            return False

```


## HTTP/1.1到HTTP/2的演进


Python对HTTP协议的支持从基础的socket实现到现代异步框架。


```python

# HTTP/1.1和HTTP/2实现对比

import http.client

import ssl

from http.server import HTTPServer, BaseHTTPRequestHandler

import asyncio

import h2.config

import h2.connection


class HTTP11Handler(BaseHTTPRequestHandler):

    """HTTP/1.1请求处理器"""

    

    protocol_version = 'HTTP/1.1'

    

    def do_GET(self):

        """处理GET请求"""

        # 解析请求头

        content_length = self.headers.get('Content-Length')

        keep_alive = self.headers.get('Connection') == 'keep-alive'

        

        # 构建响应

        self.send_response(200)

        self.send_header('Content-Type', 'text/html')

        <"gmb.s6k3.org.cn"><"dbrf.s6k3.org.cn"><"poj.s6k3.org.cn">

        if keep_alive:

            self.send_header('Connection', 'keep-alive')

            self.send_header('Keep-Alive', 'timeout=5, max=100')

        else:

            self.send_header('Connection', 'close')

        

        response_body = self.generate_response()

        self.send_header('Content-Length', str(len(response_body)))

        self.end_headers()

        

        self.wfile.write(response_body)

    

    def do_POST(self):

        """处理POST请求"""

        content_length = int(self.headers.get('Content-Length', 0))

        post_data = self.rfile.read(content_length)

        

        # 处理POST数据

        result = self.process_post_data(post_data)

        

        self.send_response(200)

        self.send_header('Content-Type', 'application/json')

        self.end_headers()

        

        self.wfile.write(result.encode())

    

    def generate_response(self) -> bytes:

        """生成响应内容"""

        html = """

       

       

       

            HTTP/1.1 Server

       

       

           

HTTP/1.1 Protocol

           

Current time: {time}

       

       

        """.format(time=self.date_time_string())

        

        return html.encode()

    

    def process_post_data(self, data: bytes) -> str:

        """处理POST数据"""

        import json

        try:

            parsed = json.loads(data.decode())

            return json.dumps({'status': 'success', 'data': parsed})

        except:

            return json.dumps({'status': 'error', 'message': 'Invalid JSON'})


class HTTP2Server:

    """HTTP/2服务器实现"""

    

    def __init__(self, host='localhost', port=8443):

        self.host = host

        self.port = port

        self.ssl_context = self.create_ssl_context()

    

    def create_ssl_context(self) -> ssl.SSLContext:

        """创建SSL上下文"""

        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

        context.load_cert_chain('server.crt', 'server.key')

        

        # 启用ALPN以支持HTTP/2

        context.set_alpn_protocols(['h2', 'http/1.1'])

        

        return context

    

    async def handle_client(self, reader, writer):

        """处理客户端连接"""

        try:

            # 协商HTTP/2

            negotiated = writer.get_extra_info('ssl_object').selected_alpn_protocol()

            

            if negotiated == 'h2':

                await self.handle_http2(reader, writer)

            else:

                await self.handle_http11(reader, writer)

        

        except Exception as e:

            print(f"处理客户端时出错: {e}")

        finally:

            writer.close()

            await writer.wait_closed()

    

    async def handle_http2(self, reader, writer):

        """处理HTTP/2连接"""

        config = h2.config.H2Configuration(client_side=False)

        conn = h2.connection.H2Connection(config=config)

        

        conn.initiate_connection()

        writer.write(conn.data_to_send())

        await writer.drain()

        

        while True:

            data = await reader.read(65535)

            if not data:

                break

            

            events = conn.receive_data(data)

            for event in events:

                await self.handle_h2_event(event, conn, writer)

            

            writer.write(conn.data_to_send())

            await writer.drain()

    

    async def handle_h2_event(self, event, conn, writer):

        """处理HTTP/2事件"""

        if isinstance(event, h2.events.RequestReceived):

            await self.handle_h2_request(event, conn, writer)

        elif isinstance(event, h2.events.DataReceived):

            await self.handle_h2_data(event, conn, writer)

    

    async def handle_h2_request(self, event, conn, writer):

        """处理HTTP/2请求"""

        stream_id = event.stream_id

        headers = dict(event.headers)

        

        # 构建响应

        response_headers = [

            (':status', '200'),

            ('content-type', 'text/html'),

            ('server', 'python-http2')

        ]

        

        conn.send_headers(stream_id, response_headers)

        

        # 发送响应体

        response_body = self.generate_h2_response(headers)

        conn.send_data(stream_id, response_body, end_stream=True)

        

        writer.write(conn.data_to_send())

        await writer.drain()

```


## WebSocket:双向实时通信


WebSocket协议在HTTP基础上实现了全双工通信,Python提供了多种实现方式。


```python

# WebSocket协议实现

import asyncio

import websockets

import struct

from typing import Optional


class WebSocketProtocol:

    """WebSocket协议解析器"""

    

    OPCODE_CONTINUATION = 0x0

    OPCODE_TEXT = 0x1

    OPCODE_BINARY = 0x2

    OPCODE_CLOSE = 0x8

    OPCODE_PING = 0x9

    OPCODE_PONG = 0xA

    

    def __init__(self):

        self.buffer = bytearray()

    

    def parse_frame(self, data: bytes):

        """解析WebSocket帧"""

        if len(data) < 2:

            return None

        

        byte1, byte2 = data[0], data[1]

        

        fin = (byte1 & 0x80) != 0

        rsv1 = (byte1 & 0x40) != 0

        rsv2 = (byte1 & 0x20) != 0

        rsv3 = (byte1 & 0x10) != 0

        opcode = byte1 & 0x0F

        

        masked = (byte2 & 0x80) != 0

        payload_len = byte2 & 0x7F

        

        offset = 2

        

        # 处理扩展长度

        if payload_len == 126:

            if len(data) < offset + 2:

                return None

            payload_len = struct.unpack('!H', data[offset:offset+2])[0]

            offset += 2

        elif payload_len == 127:

            if len(data) < offset + 8:

                return None

            payload_len = struct.unpack('!Q', data[offset:offset+8])[0]

            offset += 8

        

        # 处理掩码

        masking_key = None

        if masked:

            if len(data) < offset + 4:

                return None

            masking_key = data[offset:offset+4]

            offset += 4

        <"sde.s6k3.org.cn"><"svx.s6k3.org.cn"><"m.r1k3.org.cn">

        # 检查数据是否完整

        if len(data) < offset + payload_len:

            return None

        

        # 提取载荷数据

        payload = data[offset:offset + payload_len]

        

        # 如果使用了掩码,解码数据

        if masked and masking_key:

            payload = self.unmask_payload(payload, masking_key)

        

        return {

            'fin': fin,

            'opcode': opcode,

            'payload': payload,

            'masked': masked

        }

    

    def unmask_payload(self, payload: bytes, masking_key: bytes) -> bytes:

        """解码掩码数据"""

        result = bytearray(payload)

        for i in range(len(result)):

            result[i] ^= masking_key[i % 4]

        return bytes(result)

    

    def build_frame(self, payload: bytes, opcode: int, fin: bool = True) -> bytes:

        """构建WebSocket帧"""

        frame = bytearray()

        

        # 第一个字节

        first_byte = 0

        if fin:

            first_byte |= 0x80

        first_byte |= opcode & 0x0F

        frame.append(first_byte)

        

        # 第二个字节和长度

        payload_len = len(payload)

        if payload_len <= 125:

            frame.append(payload_len)

        elif payload_len <= 65535:

            frame.append(126)

            frame.extend(struct.pack('!H', payload_len))

        else:

            frame.append(127)

            frame.extend(struct.pack('!Q', payload_len))

        

        # 添加载荷

        frame.extend(payload)

        

        return bytes(frame)


class WebSocketServer:

    """WebSocket服务器"""

    

    def __init__(self, host='localhost', port=8765):

        self.host = host

        self.port = port

        self.clients = set()

        self.handlers = {}

    

    async def handler(self, websocket, path):

        """WebSocket连接处理器"""

        self.clients.add(websocket)

        client_id = id(websocket)

        

        print(f"客户端连接: {client_id}")

        

        try:

            # 握手过程已完成(websockets库处理)

            

            # 处理消息

            async for message in websocket:

                await self.handle_message(websocket, message)

        

        except websockets.exceptions.ConnectionClosed:

            print(f"客户端断开: {client_id}")

        finally:

            self.clients.remove(websocket)

    

    async def handle_message(self, websocket, message):

        """处理WebSocket消息"""

        if isinstance(message, str):

            # 文本消息

            await self.handle_text_message(websocket, message)

        elif isinstance(message, bytes):

            # 二进制消息

            await self.handle_binary_message(websocket, message)

    

    async def handle_text_message(self, websocket, message: str):

        """处理文本消息"""

        # 解析JSON格式消息

        import json

        try:

            data = json.loads(message)

            action = data.get('action')

            

            if action in self.handlers:

                response = await self.handlers[action](websocket, data)

                if response:

                    await websocket.send(json.dumps(response))

            else:

                # 默认回声处理

                await websocket.send(json.dumps({

                    'action': 'echo',

                    'data': data,

                    'timestamp': self.get_timestamp()

                }))

        

        except json.JSONDecodeError:

            await websocket.send(json.dumps({

                'error': 'Invalid JSON',

                'received': message

            }))

    

    async def handle_binary_message(self, websocket, message: bytes):

        """处理二进制消息"""

        # 解析二进制协议

        if len(message) >= 4:

            # 示例:解析简单的二进制协议

            protocol_version = message[0]

            message_type = message[1]

            data_length = struct.unpack('!H', message[2:4])[0]

            

            if len(message) >= 4 + data_length:

                payload = message[4:4+data_length]

                

                # 处理二进制数据

                response = self.process_binary_data(message_type, payload)

                

                if response:

                    await websocket.send(response)

    

    def process_binary_data(self, message_type: int, payload: bytes) -> Optional[bytes]:

        """处理二进制数据"""

        if message_type == 1:  # Echo二进制数据

            return payload

        elif message_type == 2:  # 处理图像数据

            # 这里可以添加图像处理逻辑

            return payload[::-1]  # 简单反转作为示例

        

        return None

    

    def register_handler(self, action: str, handler):

        """注册消息处理器"""

        self.handlers[action] = handler

    

    async def broadcast(self, message):

        """广播消息给所有客户端"""

        if not self.clients:

            return

        

        if isinstance(message, dict):

            import json

            message = json.dumps(message)

        

        tasks = [client.send(message) for client in self.clients]

        await asyncio.gather(*tasks, return_exceptions=True)

    

    def get_timestamp(self):

        """获取时间戳"""

        import time

        return int(time.time() * 1000)

    

    async def start(self):

        """启动WebSocket服务器"""

        async with websockets.serve(self.handler, self.host, self.port):

            print(f"WebSocket服务器启动于 ws://{self.host}:{self.port}")

            await asyncio.Future()  # 永 久运行


# WebSocket客户端实现

class WebSocketClient:

    """WebSocket客户端"""

    

    def __init__(self, uri: str):

        self.uri = uri

        self.websocket = None

        self.connected = False

    

    async def connect(self):

        """连接到WebSocket服务器"""

        try:

            self.websocket = await websockets.connect(

                self.uri,

                ping_interval=20,

                ping_timeout=10,

                close_timeout=5

            )

            self.connected = True

            print(f"已连接到 {self.uri}")

            return True

        except Exception as e:

            print(f"连接失败: {e}")

            return False

    

    async def send_message(self, message):

        """发送消息"""

        if not self.connected or not self.websocket:

            raise ConnectionError("未连接")

        

        try:

            await self.websocket.send(message)

        except websockets.exceptions.ConnectionClosed:

            self.connected = False

            raise

    

    async def receive_message(self):

        """接收消息"""

        if not self.connected or not self.websocket:

            raise ConnectionError("未连接")

        

        try:

            return await self.websocket.recv()

        except websockets.exceptions.ConnectionClosed:

            self.connected = False

            raise

    

    async def close(self):

        """关闭连接"""

        if self.websocket:

            await self.websocket.close()

            self.connected = False

```


## 总结


Python网络编程涵盖了从底层TCP/IP协议到高层WebSocket的全栈实现。每种协议都有其适用的场景:TCP/IP提供可靠传输,UDP适用于实时应用,HTTP支撑Web通信,而WebSocket实现了真正的全双工通信。理解这些协议的核心机制对于构建高性能网络应用至关重要。在实际开发中,需要根据具体需求选择合适的协议和实现方式,并结合Python的异步特性优化性能。


请使用浏览器的分享功能分享到微信等