导航菜单

  • 1.vector
  • 2.milvus
  • 3.pymilvus
  • 4.rag
  • 5.rag_measure
  • ragflow
  • heapq
  • HNSW
  • cosine_similarity
  • math
  • typing
  • etcd
  • minio
  • collections
  • jieba
  • random
  • beautifulsoup4
  • chromadb
  • sentence_transformers
  • numpy
  • lxml
  • openpyxl
  • PyMuPDF
  • python-docx
  • requests
  • python-pptx
  • text_splitter
  • all-MiniLM-L6-v2
  • openai
  • llm
  • BPETokenizer
  • Flask
  • RAGAS
  • BagofWords
  • langchain
  • Pydantic
  • abc
  • faiss
  • MMR
  • scikit-learn
  • Runnable
  • PromptEngineering
  • dataclasses
  • LaTeX
  • rank_bm25
  • TF-IDF
  • asyncio
  • sqlalchemy
  • fastapi
  • Starlette
  • uvicorn
  • argparse
  • Generic
  • ssl
  • urllib
  • python-dotenv
  • RRF
  • CrossEncoder
  • Lost-in-the-middle
  • Jinja2
  • logger
  • io
  • venv
  • concurrent
  • parameter
  • SSE
  • 1. 前置知识
    • 1.1 什么是 HTTP 请求和响应
    • 1.2 什么是流式传输
    • 1.3 为什么需要服务器推送
  • 2. SSE 概述
    • 2.1 SSE 的核心特点
    • 2.2 SSE 的适用场景
    • 2.3 SSE 的工作原理
  • 3. SSE 协议格式
    • 3.1 基本格式
    • 3.2 字段说明
    • 3.3 格式示例
  • 4. Python SSE 服务器实现(Flask)
    • 4.1 最简单的 SSE 服务器
    • 4.2 带 HTML 客户端的完整示例
    • 4.3 发送自定义事件和 JSON 数据
  • 5. Python SSE 客户端
    • 5.1 使用 requests 库的简单客户端
  • 6. 实际应用示例:实时进度更新
  • 7. SSE vs WebSocket
  • 8. 常见问题和注意事项
    • 8.1 连接断开处理
    • 8.2 心跳机制
    • 8.3 数据格式
  • 9. 总结

1. 前置知识 #

SSE (Server-Sent Events) 是一种让服务器能够主动向客户端推送数据的技术。

在开始学习 SSE 之前,我们需要理解几个基础概念。

1.1 什么是 HTTP 请求和响应 #

HTTP 是网页通信的基础协议。通常的流程是:

  1. 客户端发送请求:浏览器向服务器请求一个网页
  2. 服务器返回响应:服务器一次性返回完整的网页内容
  3. 连接关闭:数据传输完成后,连接关闭

这是一次性的交互:请求 → 响应 → 结束。

1.2 什么是流式传输 #

流式传输(Streaming)是指数据不是一次性发送完,而是持续不断地发送。就像打开水龙头,水会持续流出,而不是一次性倒出一桶水。

在 Python 中,我们可以使用生成器(Generator)来实现流式传输:

# 定义一个生成器函数
def count_numbers():
    """
    生成器函数:逐个生成数字
    使用 yield 关键字,每次返回一个值
    """
    # 从 1 开始计数到 5(包含 5)
    for i in range(1, 6):
        # 使用 yield 返回当前数字的字符串,并在末尾加换行
        yield f"数字: {i}\n"

# 遍历生成器产生的每个数字
for number in count_numbers():
    # 每次循环获取一个值,end='' 表示不再额外添加换行
    print(number, end='')

# 输出示例:
# 数字: 1
# 数字: 2
# 数字: 3
# 数字: 4
# 数字: 5

1.3 为什么需要服务器推送 #

传统的 HTTP 请求是客户端主动请求,服务器被动响应。但有些场景需要服务器主动推送数据给客户端:

  • 实时通知:新消息、系统通知
  • 进度更新:文件上传/下载进度
  • 实时数据:股票价格、新闻更新
  • 状态监控:服务器状态、日志输出

在这些场景中,如果让客户端不断轮询(每隔几秒请求一次),会浪费资源。SSE 提供了一种更优雅的解决方案。

2. SSE 概述 #

SSE (Server-Sent Events) 是一种服务器向客户端推送实时数据的技术。

2.1 SSE 的核心特点 #

SSE 有以下重要特点:

  1. 单向通信:只能服务器向客户端推送,客户端不能通过 SSE 向服务器发送数据
  2. 基于 HTTP:使用标准的 HTTP 协议,不需要特殊的协议升级
  3. 自动重连:如果连接断开,浏览器会自动尝试重新连接
  4. 简单易用:比 WebSocket 更简单,适合简单的推送场景
  5. 文本格式:只能传输文本数据(通常是 JSON 格式)

2.2 SSE 的适用场景 #

SSE 适合以下场景:

  • 实时通知:系统通知、消息提醒
  • 进度更新:任务进度、文件处理状态
  • 数据推送:股票行情、新闻更新
  • 日志输出:实时查看服务器日志
  • 双向通信:需要客户端频繁发送数据(应该用 WebSocket)
  • 二进制数据:需要传输图片、视频等(应该用 WebSocket)

2.3 SSE 的工作原理 #

SSE 的工作流程非常简单:

  1. 客户端发起请求:浏览器向服务器请求 SSE 连接
  2. 服务器保持连接:服务器不立即关闭连接,而是保持连接打开
  3. 服务器持续推送:服务器通过这个连接持续发送数据
  4. 客户端接收数据:浏览器自动接收并处理推送的数据
  5. 自动重连:如果连接断开,浏览器自动重新连接

3. SSE 协议格式 #

SSE 使用特定的文本格式来传输数据。理解这个格式很重要。

3.1 基本格式 #

SSE 数据由多行文本组成,每行以 \n 结尾,每个事件以两个换行符 \n\n 分隔。

data: 这是一条消息\n\n

这表示一个事件,内容是"这是一条消息"。

3.2 字段说明 #

SSE 支持以下字段:

  • data:事件的数据内容(必需)
  • event:事件类型(可选,默认为 "message")
  • id:事件 ID(可选,用于断线重连)
  • retry:重连间隔(可选,单位毫秒)
  • ::注释行(可选,以冒号开头)

3.3 格式示例 #

# 最简单的消息
# data: 消息内容\n\n

# 多行数据(会被合并为一行,用 \n 分隔)
# data: 第一行\n
# data: 第二行\n\n

# 自定义事件类型
# event: customEvent\n
# data: 自定义事件数据\n\n

# 带 ID 的消息(用于断线重连时定位)
# id: 123\n
# data: 带ID的消息\n\n

# 注释行(客户端会忽略)
# : 这是一条注释\n
# data: 实际消息\n\n

4. Python SSE 服务器实现(Flask) #

Flask 是一个简单易用的 Python Web 框架,非常适合学习 SSE。

4.1 最简单的 SSE 服务器 #

下面是一个最简单的 SSE 服务器示例:

# 导入 Flask 应用框架和响应对象
from flask import Flask, Response
# 导入 time 模块,用于实现延时
import time

# 创建 Flask 应用实例
app = Flask(__name__)

# 定义 SSE(Server-Sent Events)流端点,路由为 /stream
@app.route('/stream')
def stream():
    """
    SSE 流端点
    返回一个持续发送数据的响应
    """
    # 定义生成器函数,用于持续生成 SSE 事件
    def generate():
        """
        生成器函数:持续生成 SSE 事件
        这个函数会一直运行,直到客户端断开连接
        """
        # 初始化计数器,用于消息编号
        count = 0

        # 无限循环,持续发送数据给客户端
        while True:
            # 计数器自增 1
            count += 1

            # 构造 SSE 格式的数据消息,包含当前计数
            # 格式:data: 消息内容\n\n
            message = f"data: 这是第 {count} 条消息\n\n"

            # 使用 yield 以流式方式返回数据
            yield message

            # 等待 1 秒后发送下一条消息,实现定时推送
            time.sleep(1)

    # 返回一个 Response 对象,设置为 SSE 流式响应
    # mimetype 指定为 text/event-stream 表示 SSE
    # headers 禁用缓存并保持连接不断开
    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',  # 禁止缓存
            'Connection': 'keep-alive'    # 保持连接不断开
        }
    )

# 判断脚本是否为主程序入口
if __name__ == '__main__':
    # 以调试模式运行服务器,支持多线程,监听 5000 端口
    app.run(debug=True, threaded=True, port=5000)

运行方法:

  1. 保存为 sse_server.py
  2. 安装 Flask:pip install flask
  3. 运行:python sse_server.py
  4. 在浏览器访问:http://localhost:5000/stream

4.2 带 HTML 客户端的完整示例 #

下面是一个完整的示例,包含服务器和客户端页面:

# 导入 Flask 所需的模块
from flask import Flask, Response, render_template_string
# 导入 time 模块用于延时
import time
# 导入 json 模块用于格式化数据
import json
# 导入 datetime 获取当前时间
from datetime import datetime

# 创建一个 Flask 应用实例
app = Flask(__name__)

# 定义 HTML 客户端页面,嵌入 Python 代码的字符串中
HTML_PAGE = '''
<!DOCTYPE html>
<html>
<head>
    <title>SSE 演示</title>
    <style>
        /* 页面样式 */
        body {
            font-family: Arial, sans-serif;
            margin: 40px;
            background-color: #f5f5f5;
        }
        .container {
            max-width: 800px;
            margin: 0 auto;
            background: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        h1 {
            color: #333;
        }
        #status {
            padding: 10px;
            margin: 10px 0;
            border-radius: 4px;
            font-weight: bold;
        }
        .connected {
            background-color: #d4edda;
            color: #155724;
        }
        .disconnected {
            background-color: #f8d7da;
            color: #721c24;
        }
        button {
            padding: 10px 20px;
            margin: 5px;
            font-size: 16px;
            cursor: pointer;
            border: none;
            border-radius: 4px;
            background-color: #007bff;
            color: white;
        }
        button:hover {
            background-color: #0056b3;
        }
        #messages {
            border: 1px solid #ddd;
            padding: 20px;
            height: 400px;
            overflow-y: auto;
            margin-top: 20px;
            background-color: #fafafa;
        }
        .message {
            margin: 5px 0;
            padding: 8px;
            border-radius: 4px;
            background-color: #e9ecef;
        }
        .message.info {
            background-color: #d1ecf1;
            color: #0c5460;
        }
        .message.data {
            background-color: #d4edda;
            color: #155724;
        }
    </style>
</head>
<body>
    <div class="container">
        <h1>Server-Sent Events 演示</h1>
        <div id="status" class="disconnected">状态: 未连接</div>
        <button onclick="connectSSE()">连接</button>
        <button onclick="disconnectSSE()">断开</button>
        <div id="messages"></div>
    </div>

    <script>
        // 用于存储 EventSource 对象
        let eventSource;

        // 连接 SSE 服务器
        function connectSSE() {
            // 创建 EventSource 实例并连接到 /stream
            eventSource = new EventSource('/stream');

            // 监听连接成功事件
            eventSource.onopen = function(e) {
                updateStatus('已连接', 'connected');
                addMessage('连接已建立', 'info');
            };

            // 监听收到服务端消息事件
            eventSource.onmessage = function(e) {
                // 显示收到的消息内容
                addMessage('收到消息: ' + e.data, 'data');
            };

            // 监听连接错误事件,断线会自动重连
            eventSource.onerror = function(e) {
                updateStatus('连接错误', 'disconnected');
                addMessage('连接错误,尝试重连...', 'info');
            };
        }

        // 断开 SSE 连接
        function disconnectSSE() {
            if (eventSource) {
                // 关闭 EventSource 连接
                eventSource.close();
                updateStatus('已断开', 'disconnected');
                addMessage('连接已关闭', 'info');
            }
        }

        // 在页面添加一条消息
        function addMessage(text, type) {
            const messagesDiv = document.getElementById('messages');
            const messageDiv = document.createElement('div');
            messageDiv.className = 'message ' + type;
            messageDiv.textContent = new Date().toLocaleTimeString() + ' - ' + text;
            messagesDiv.appendChild(messageDiv);
            // 自动滚动到消息底部
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }

        // 更新界面连接状态显示
        function updateStatus(text, className) {
            const statusDiv = document.getElementById('status');
            statusDiv.textContent = '状态: ' + text;
            statusDiv.className = className;
        }
    </script>
</body>
</html>
'''

# 路由根路径,返回 HTML 客户端页面
@app.route('/')
def index():
    """
    首页:返回包含 SSE 客户端的 HTML 页面
    """
    # 渲染字符串形式的 HTML 页面
    return render_template_string(HTML_PAGE)

# 定义 SSE 流式端点
@app.route('/stream')
def stream():
    """
    SSE 流端点
    持续向客户端推送数据
    """
    def generate():
        """
        生成器函数:生成 SSE 事件流
        """
        # 向客户端推送初始连接成功消息
        yield f"data: 连接成功!欢迎使用 SSE 服务\n\n"

        # 用于计数已发送消息次数
        count = 0

        try:
            # 无限循环,持续向客户端推送数据
            while True:
                # 间隔 1 秒推送一次数据
                time.sleep(1)

                # 消息计数器加一
                count += 1

                # 获取当前服务器时间
                current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                # 组装 SSE 消息格式内容
                # data: 消息内容\n\n
                message = f"data: 服务器时间: {current_time},已发送 {count} 条消息\n\n"
                yield message

        except GeneratorExit:
            # 生成器关闭(客户端断开)时执行
            print("客户端断开连接")
        except Exception as e:
            # 发生异常时输出异常信息
            print(f"生成流时出错: {e}")

    # 返回 SSE 响应,设置必要响应头
    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',      # 禁止缓存
            'Connection': 'keep-alive',       # 保持连接不断开
            'X-Accel-Buffering': 'no'         # 禁用 Nginx 缓冲(如有需要)
        }
    )

# 如果文件作为主程序运行,则启动 Flask 服务器
if __name__ == '__main__':
    # 调试模式启动,开启多线程,监听5000端口
    app.run(debug=True, threaded=True, port=5000)

使用方法:

  1. 保存为 sse_server.py
  2. 安装 Flask:pip install flask
  3. 运行:python sse_server.py
  4. 在浏览器访问:http://localhost:5000
  5. 点击"连接"按钮,开始接收服务器推送的消息

4.3 发送自定义事件和 JSON 数据 #

SSE 支持发送不同类型的事件和 JSON 格式的数据:

# 导入 Flask、Response、render_template_string
from flask import Flask, Response, render_template_string
# 导入 time 标准库
import time
# 导入 json 标准库
import json
# 导入 datetime,用于获取当前时间
from datetime import datetime

# 创建 Flask 应用实例
app = Flask(__name__)

# 定义前端 HTML 页面,用于展示 SSE 功能
HTML_PAGE = '''
<!DOCTYPE html>
<html>
<head>
    <title>SSE 自定义事件演示</title>
    <style>
        body { font-family: Arial; margin: 40px; }
        #messages { border: 1px solid #ccc; padding: 20px; height: 400px; overflow-y: auto; }
        .message { margin: 5px 0; padding: 8px; background: #f5f5f5; }
        .event { background: #e3f2fd; }
        .data { background: #e8f5e9; }
    </style>
</head>
<body>
    <h1>SSE 自定义事件演示</h1>
    <button onclick="connect()">连接</button>
    <button onclick="disconnect()">断开</button>
    <div id="messages"></div>

    <script>
        let eventSource;

        function connect() {
            eventSource = new EventSource('/stream');

            // 监听默认的 message 事件
            eventSource.onmessage = function(e) {
                addMessage('默认消息: ' + e.data, 'data');
            };

            // 监听自定义事件 'update'
            eventSource.addEventListener('update', function(e) {
                const data = JSON.parse(e.data);
                addMessage(`更新事件: ${data.message} (时间: ${data.timestamp})`, 'event');
            });

            // 监听自定义事件 'heartbeat'
            eventSource.addEventListener('heartbeat', function(e) {
                addMessage('心跳: ' + e.data, 'data');
            });
        }

        function disconnect() {
            if (eventSource) {
                eventSource.close();
            }
        }

        function addMessage(text, type) {
            const div = document.getElementById('messages');
            const msg = document.createElement('div');
            msg.className = 'message ' + type;
            msg.textContent = new Date().toLocaleTimeString() + ' - ' + text;
            div.appendChild(msg);
            div.scrollTop = div.scrollHeight;
        }
    </script>
</body>
</html>
'''

# 设置根路由,返回前面定义的 HTML 页面
@app.route('/')
def index():
    # 返回HTML页面
    return render_template_string(HTML_PAGE)

# 定义/stream路由,SSE数据流端点
@app.route('/stream')
def stream():
    # 生成SSE事件流的生成器函数
    def generate():
        # 发送初始消息,通知连接成功
        yield f"data: 连接成功!\n\n"

        # 消息计数器初始化为0
        count = 0

        try:
            # 无限循环,不断推送数据
            while True:
                # 休眠1秒,控制推送频率
                time.sleep(1)
                # 消息计数器加1
                count += 1

                # 每2秒发送一次自定义心跳事件
                if count % 2 == 0:
                    # 格式符合SSE规范:event: 事件类型\ndata: 数据\n\n
                    yield f"event: heartbeat\ndata: 心跳 #{count}\n\n"

                # 每5秒发送一次更新事件,包含JSON数据
                if count % 5 == 0:
                    # 构造事件数据字典
                    event_data = {
                        "event_id": count,
                        "message": f"这是第 {count} 个更新事件",
                        "timestamp": datetime.now().isoformat(),
                        "data": {
                            "value": count * 10,
                            "random": time.time()
                        }
                    }
                    # 使用json.dumps将事件数据转为JSON字符串
                    json_data = json.dumps(event_data, ensure_ascii=False)
                    # 发送'update'自定义事件,内容为JSON字符串
                    yield f"event: update\ndata: {json_data}\n\n"

                # 每10秒发送一次普通消息
                if count % 10 == 0:
                    yield f"data: 服务器运行中,已发送 {count} 条消息\n\n"

        # 生成器关闭时(如客户端断开连接),捕获异常
        except GeneratorExit:
            print("客户端断开连接")
        # 捕获其它异常并打印
        except Exception as e:
            print(f"生成流时出错: {e}")

    # 返回SSE响应,设置Response对象及必要的头部
    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive'
        }
    )

# 主程序入口,启动Flask开发服务器
if __name__ == '__main__':
    # 启动服务,开启调试模式,多线程,端口5000
    app.run(debug=True, threaded=True, port=5000)

5. Python SSE 客户端 #

除了在浏览器中使用,我们也可以用 Python 编写 SSE 客户端。

5.1 使用 requests 库的简单客户端 #

# 导入requests库,用于HTTP请求
import requests
# 导入time库(虽然当前代码未直接用到)
import time

# 定义一个SSE客户端类
class SSEClient:
    """
    简单的 SSE 客户端
    使用 requests 库接收服务器推送的数据
    """

    # 构造函数,初始化SSEClient对象
    def __init__(self, url):
        """
        初始化客户端
        url: SSE 服务器的地址
        """
        # 保存服务器地址
        self.url = url
        # 创建一个requests会话
        self.session = requests.Session()

    # 监听SSE流的主要方法
    def listen(self):
        """
        监听 SSE 流
        持续接收服务器推送的数据
        """
        try:
            # 发送GET请求,开启流式接收(stream=True)
            response = self.session.get(self.url, stream=True)

            # 定义缓冲区存储未处理的文本数据
            buffer = ""

            # 循环遍历服务器传回的数据块
            for chunk in response.iter_content(chunk_size=1024):
                # 如果本次收到非空数据块
                if chunk:
                    # 将字节解码为字符串并追加到缓冲区
                    buffer += chunk.decode('utf-8')

                    # 只要缓冲区中包含完整事件(以\n\n结尾),就处理
                    while '\n\n' in buffer:
                        # 以第一个\n\n为界,分离出完整的事件文本
                        event_text, buffer = buffer.split('\n\n', 1)
                        # 处理当前事件文本
                        self.process_event(event_text)

        # 捕获用户手动中断(Ctrl+C)
        except KeyboardInterrupt:
            # 打印客户端停止信息
            print("\n客户端停止")
        # 捕获其它异常并打印错误信息
        except Exception as e:
            print(f"连接错误: {e}")

    # 处理单个事件的函数
    def process_event(self, event_text):
        """
        处理单个 SSE 事件
        event_text: 事件的文本内容
        """
        # 初始化事件字典,预设默认值
        event = {
            'data': '',           # 事件数据内容
            'event': 'message',   # 事件类型,默认为message
            'id': None,           # 事件ID
            'retry': None         # 重连间隔
        }

        # 按行遍历事件文本内容
        for line in event_text.split('\n'):
            # 解析data字段
            if line.startswith('data:'):
                # 提取data值并去除空白
                event['data'] = line[5:].strip()
            # 解析event字段
            elif line.startswith('event:'):
                event['event'] = line[6:].strip()
            # 解析id字段
            elif line.startswith('id:'):
                event['id'] = line[3:].strip()
            # 解析retry字段
            elif line.startswith('retry:'):
                event['retry'] = line[6:].strip()
            # 忽略注释行(以:开头)
            elif line.startswith(':'):
                continue

        # 打印当前事件的类型
        print(f"\n[事件类型: {event['event']}]")
        # 打印事件数据内容
        print(f"数据: {event['data']}")
        # 如果有ID则打印ID
        if event['id']:
            print(f"ID: {event['id']}")
        # 打印分割线
        print("-" * 50)

# 主程序入口
if __name__ == "__main__":
    # 创建客户端对象,并指定要连接的SSE服务器URL
    client = SSEClient("http://localhost:5000/stream")
    # 打印连接信息
    print("连接 SSE 服务器...")
    # 提示用户如何停止
    print("按 Ctrl+C 停止")
    # 开始监听服务器推送
    client.listen()

使用方法:

  1. 确保 SSE 服务器正在运行(参考前面的示例)
  2. 保存为 sse_client.py
  3. 安装 requests:pip install requests
  4. 运行:python sse_client.py

6. 实际应用示例:实时进度更新 #

下面是一个实际应用示例,展示如何使用 SSE 实现实时进度更新:

# 导入 Flask 框架中的 Flask、Response、render_template_string
from flask import Flask, Response, render_template_string
# 导入时间模块用于模拟耗时操作
import time
# 导入随机数模块
import random

# 创建 Flask 应用对象
app = Flask(__name__)

# 定义网页的 HTML 内容
HTML_PAGE = '''
<!DOCTYPE html>
<html>
<head>
    <title>实时进度更新</title>
    <style>
        body { font-family: Arial; margin: 40px; }
        .progress-container {
            width: 500px;
            margin: 20px 0;
        }
        .progress-bar {
            width: 100%;
            height: 30px;
            background-color: #e0e0e0;
            border-radius: 15px;
            overflow: hidden;
        }
        .progress-fill {
            height: 100%;
            background-color: #4caf50;
            width: 0%;
            transition: width 0.3s;
            display: flex;
            align-items: center;
            justify-content: center;
            color: white;
            font-weight: bold;
        }
        .status {
            margin: 10px 0;
            font-size: 18px;
        }
    </style>
</head>
<body>
    <h1>实时进度更新演示</h1>
    <button onclick="startTask()">开始任务</button>
    <div class="progress-container">
        <div class="progress-bar">
            <div class="progress-fill" id="progress">0%</div>
        </div>
    </div>
    <div class="status" id="status">等待开始...</div>

    <script>
        // 定义 EventSource 变量,用于后续连接
        let eventSource;

        // 定义开始任务函数
        function startTask() {
            // 连接到进度更新 SSE 接口
            eventSource = new EventSource('/progress');

            // 监听自定义 progress 事件
            eventSource.addEventListener('progress', function(e) {
                // 解析数据为 JS 对象
                const data = JSON.parse(e.data);
                // 获取进度条 Dom
                const progressBar = document.getElementById('progress');
                // 设置进度宽度
                progressBar.style.width = data.percent + '%';
                // 显示进度百分比文本
                progressBar.textContent = data.percent + '%';
                // 设置当前状态信息
                document.getElementById('status').textContent = data.message;
            });

            // 监听任务完成事件
            eventSource.addEventListener('complete', function(e) {
                // 解析数据
                const data = JSON.parse(e.data);
                // 显示完成状态
                document.getElementById('status').textContent = data.message;
                // 关闭 SSE 连接
                eventSource.close();
            });
        }
    </script>
</body>
</html>
'''

# 路由 '/' 返回 HTML 页面
@app.route('/')
def index():
    # 返回 HTML_PAGE 的内容到浏览器
    return render_template_string(HTML_PAGE)

# 路由 '/progress' 提供 SSE 实时进度
@app.route('/progress')
def progress():
    # 内部生成器:持续推送进度信息
    def generate():
        # 模拟总的任务步骤数量
        total_steps = 100

        # 从0到100共进行 total_steps+1 次循环
        for step in range(total_steps + 1):
            # 计算当前进度百分比
            percent = int((step / total_steps) * 100)

            # 根据进度设置不同的消息
            if step == 0:
                message = "任务开始..."
            elif step < total_steps:
                message = f"处理中... ({step}/{total_steps})"
            else:
                message = "任务完成!"

            # 组装进度数据字典
            progress_data = {
                "percent": percent,
                "step": step,
                "total": total_steps,
                "message": message
            }

            # 导入 json 模块
            import json
            # 序列化进度数据为 JSON 字符串
            json_data = json.dumps(progress_data)

            # 按 SSE 格式发送进度事件
            yield f"event: progress\ndata: {json_data}\n\n"

            # 模拟每一步的耗时
            time.sleep(0.1)

        # 组装完成后的消息数据
        complete_data = {
            "message": "所有任务已完成!",
            "timestamp": time.time()
        }
        # 再次导入 json 模块(可优化)
        import json
        # 按 SSE 格式发送完成事件
        yield f"event: complete\ndata: {json.dumps(complete_data)}\n\n"

    # 返回使用 text/event-stream 作为 mimetype 的 Response
    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive'
        }
    )

# 如果直接运行本脚本,则启动 Flask WebServer
if __name__ == '__main__':
    # 以 debug 模式、支持多线程、端口5000运行
    app.run(debug=True, threaded=True, port=5000)

7. SSE vs WebSocket #

SSE 和 WebSocket 都可以实现实时通信,但它们有不同的特点:

特性 SSE WebSocket
通信方向 单向(服务器→客户端) 双向
协议 HTTP 独立的 ws 协议
连接管理 自动重连 需要手动处理
数据传输 文本 文本和二进制
复杂度 简单 复杂
浏览器支持 现代浏览器都支持 所有现代浏览器
适用场景 通知、推送、实时更新 聊天、游戏、实时协作

选择建议:

  • 只需要服务器推送数据 → 使用 SSE
  • 需要双向实时通信 → 使用 WebSocket
  • 简单场景 → 使用 SSE
  • 复杂场景 → 使用 WebSocket

8. 常见问题和注意事项 #

8.1 连接断开处理 #

SSE 会自动重连,但有时需要手动处理:

# 定义生成器函数,用于发送 SSE 消息
def generate():
    try:
        # 无限循环,不断产生数据
        while True:
            # 发送一条消息给客户端
            yield f"data: 消息\n\n"
            # 每隔 1 秒发送一次
            time.sleep(1)
    except GeneratorExit:
        # 捕获客户端断开连接的异常
        print("客户端断开")
    except Exception as e:
        # 捕获其它异常并打印错误信息
        print(f"错误: {e}")

8.2 心跳机制 #

长时间连接时,应该定期发送心跳,防止连接被关闭:

# 定义生成器函数,用于发送 SSE 消息
def generate():
    # 初始化一个计数器
    count = 0
    # 无限循环,不断产生消息
    while True:
        # 暂停 1 秒,控制消息发送频率
        time.sleep(1)
        # 计数器加一
        count += 1
        # 每 30 秒发送一次心跳
        if count % 30 == 0:
            # 发送注释作为心跳(客户端会忽略该行)
            yield f": heartbeat {count}\n\n"
        else:
            # 发送实际数据消息给客户端
            yield f"data: 消息 {count}\n\n"

8.3 数据格式 #

SSE 只能传输文本,如果要传输复杂数据,使用 JSON:

# 导入 json 模块,用于处理 JSON 数据
import json

# 定义一个包含用户信息的字典
data = {
    "name": "张三",
    "age": 25,
    "city": "北京"
}

# 将字典 data 转换为 JSON 字符串(确保中文不转义)
json_str = json.dumps(data, ensure_ascii=False)

# 以 SSE 的 data 格式发送 JSON 字符串到客户端
yield f"data: {json_str}\n\n"

9. 总结 #

SSE 是一种简单高效的服务器推送技术,特别适合只需要服务器向客户端推送数据的场景。

主要优点:

  1. 简单易用:基于 HTTP,实现简单
  2. 自动重连:浏览器自动处理连接断开和重连
  3. 轻量级:比 WebSocket 更简单
  4. 适合推送场景:通知、进度更新、实时数据

主要缺点:

  1. 单向通信:只能服务器向客户端推送
  2. 只支持文本:不能传输二进制数据
  3. 连接数限制:浏览器对同一域名的 SSE 连接数有限制

使用建议:

  • 简单推送场景 → 使用 SSE
  • 需要双向通信 → 使用 WebSocket
  • 实时通知、进度更新 → 使用 SSE
  • 聊天、游戏、实时协作 → 使用 WebSocket
← 上一节 sqlalchemy 下一节 ssl →

访问验证

请输入访问令牌

Token不正确,请重新输入