https://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。
SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。
总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。
SSE 的客户端 API 部署在EventSource对象上。下面的代码可以检测浏览器是否支持 SSE。
if ('EventSource' in window) {
// ...
}
上面的url可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开withCredentials属性,表示是否一起发送 Cookie。
var source = new EventSource(url, { withCredentials: true });
EventSource实例的readyState属性,表明连接的当前状态。该属性只读,可以取以下值。
连接一旦建立,就会触发open事件,可以在onopen属性定义回调函数。
source.onopen = function (event) {
// ...
};
// 另一种写法
source.addEventListener('open', function (event) {
// ...
}, false);
客户端收到服务器发来的数据,就会触发message事件,可以在onmessage属性的回调函数。
source.onmessage = function (event) {
var data = event.data;
// handle message
};
// 另一种写法
source.addEventListener('message', function (event) {
var data = event.data;
// handle message
}, false);
上面代码中,事件对象的data属性就是服务器端传回的数据(文本格式)。
如果发生通信错误(比如连接中断),就会触发error事件,可以在onerror属性定义回调函数。
source.onerror = function (event) {
// handle error event
};
// 另一种写法
source.addEventListener('error', function (event) {
// handle error event
}, false);
close方法用于关闭 SSE 连接。
source.close();
默认情况下,服务器发来的数据,总是触发浏览器EventSource实例的message事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发message事件。
source.addEventListener('foo', function (event) {
var data = event.data;
// handle message
}, false);
上面代码中,浏览器对 SSE 的foo事件进行监听。如何实现服务器发送foo事件,请看下文。
服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本,具有如下的 HTTP 头信息。
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
上面三行之中,第一行的Content-Type必须指定 MIME 类型为event-steam
。
每一次发送的信息,由若干个message组成,每个message之间用nn
分隔。每个message内部由若干行组成,每一行都是如下格式,每行是n
结束。
[field]: valuen
上面的field可以取四个值。
此外,还可以有冒号开头的行,表示注释。通常,服务器每隔一段时间就会向浏览器发送一个注释,保持连接不中断。
: This is a comment
下面是一个例子。
: this is a test streamnn
data: some textnn
data: another messagen
data: with two lines nn
数据内容用data字段表示。
data: messagenn
如果数据很长,可以分成多行,最后一行用nn
结尾,前面行都用n
结尾。
data: begin messagen
data: continue messagenn
下面是一个发送 JSON 数据的例子。
data: {n
data: "foo": "bar",n
data: "baz", 555n
data: }nn
数据标识符用id字段表示,相当于每一条数据的编号。
id: msg1n
data: messagenn
浏览器用lastEventId属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。
event字段表示自定义的事件类型,默认是message事件。浏览器可以用addEventListener()监听该事件,下面代码使用了自定义事件foo
event: foon
data: a foo eventnn
data: an unnamed eventnn
event: barn
data: a bar eventnn
上面的代码创造了三条信息。第一条的名字是foo,触发浏览器的foo事件;第二条未取名,表示默认类型,触发浏览器的message事件;第三条是bar,触发浏览器的bar事件。
服务器可以用retry字段,指定浏览器重新发起连接的时间间隔。
retry: 10000n
两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。
SSE 要求服务器与浏览器保持连接。对于不同的服务器软件来说,所消耗的资源是不一样的。Apache 服务器,每个连接就是一个线程,如果要维持大量连接,势必要消耗大量资源。Node 则是所有连接都使用同一个线程,因此消耗的资源会小得多,但是这要求每个连接不能包含很耗时
的操作,比如磁盘的 IO 读写。
下面是 springboot的SseEmitter服务器实例,它使用自定义的event,添加了id字段
@GetMapping(value = "/sse1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter(60_000L); // 超时时间设为 60 秒
// 异步发送数据(模拟实时推送)
new Thread(() -> {
try {
for (int i = 0; i
然后在谷歌浏览器访问http://localhost:8080/sse1,看一下结果如下
在选择用于 Server-Sent Events (SSE) 的开发语言时,性能是一个重要考量因素。以下是这四种语言在 SSE 场景下的性能分析和比较:
语言 | 并发性能 | 内存效率 | 连接开销 | 生态系统 | 适用场景 |
---|---|---|---|---|---|
Go | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 高并发、大规模连接 |
Java | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 企业级应用、复杂业务逻辑 |
C# | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | Windows 环境、.NET 生态系统 |
Python | ⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | 快速原型、中小规模应用 |
优势:
net/http
包对长连接有良好支持示例代码:
package main
import (
"fmt"
"net/http"
"time"
)
func sseHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// 模拟持续发送事件
for {
fmt.Fprintf(w, "data: %snn", time.Now().Format("2006-01-02 15:04:05"))
flusher.Flush()
time.Sleep(1 * time.Second)
}
}
func main() {
http.HandleFunc("/events", sseHandler)
fmt.Println("SSE server running on :8080")
http.ListenAndServe(":8080", nil)
}
优势:
示例代码(使用 Spring Boot):
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
public class SseController {
@GetMapping(path = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSse() {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
emitter.send(SseEmitter.event()
.data("Current time: " + java.time.LocalDateTime.now()));
} catch (IOException e) {
emitter.completeWithError(e);
scheduler.shutdown();
}
}, 0, 1, TimeUnit.SECONDS);
emitter.onCompletion(scheduler::shutdown);
return emitter;
}
}
优势:
示例代码(使用 ASP.NET Core):
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;
public class SseController : Controller
{
[HttpGet("events")]
public async Task GetEvents()
{
Response.Headers.Add("Content-Type", "text/event-stream");
Response.Headers.Add("Cache-Control", "no-cache");
Response.Headers.Add("Connection", "keep-alive");
while (true)
{
await Response.WriteAsync($"data: {DateTime.Now}nn");
await Response.Body.FlushAsync();
await Task.Delay(1000);
}
}
}
优势:
劣势:
示例代码(使用 aiohttp):
from aiohttp import web
import asyncio
import datetime
async def events(request):
response = web.StreamResponse()
response.headers['Content-Type'] = 'text/event-stream'
response.headers['Cache-Control'] = 'no-cache'
response.headers['Connection'] = 'keep-alive'
await response.prepare(request)
while True:
data = f"data: {datetime.datetime.now()}nn"
await response.write(data.encode('utf-8'))
await asyncio.sleep(1)
app = web.Application()
app.router.add_get('/events', events)
if __name__ == '__main__':
web.run_app(app, port=8080)
根据各种性能基准测试(如 TechEmpower Web Framework Benchmarks):
对于 SSE 应用,从纯性能角度考虑,Go 是最佳选择,特别是在需要处理大量并发连接的场景。Java 和 C# 在性能上也非常接近,并且提供了更丰富的企业级功能。Python 虽然性能相对较低,但其开发效率和丰富的库生态系统使其在中小规模应用中仍然是一个可行的选择。
最终的选择应该基于您的具体需求、团队技术栈熟悉度、现有基础设施以及性能要求来综合考虑。
参与评论
手机查看
返回顶部