最近在着手一个项目,其中有一个模块功能需要使用数据的实时展示。我在页面上要等待工单数据的三个值更新了,才能进行下一步工单的操作。
当时做的时候,采用的是简单粗暴的。让用户在首页刷新下数据等有数据了再进入工单操作页面。结果不到一天时间就被客户吐槽了(偷懒的想法未能实现)。在调整的过程中也了解了这种实时数据推送的。一般想到的就是轮询和webscoket方式。
了解了下在项目中实现数据实时推送通常涉及使用一些实时通信技术或者消息队列系统。以下是几种常见的实现方式:
- WebSocket: WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,可以实现客户端和服务器之间的实时数据传输。在项目中可以使用 WebSocket 技术来建立持久连接,并实时推送数据给客户端。
- 消息队列系统: 使用消息队列系统如 Apache Kafka、RabbitMQ、ActiveMQ 等,将数据生产者产生的数据发布到消息队列中,然后订阅者可以实时地从队列中消费数据,实现数据的实时推送。
- Server-Sent Events (SSE): SSE 是一种用于通过 HTTP 将服务器推送事件发送到浏览器的技术。服务器端可以周期性地发送更新给客户端,客户端通过 EventSource API 接收这些更新。
- HTTP 长轮询(Long Polling): 客户端发起一个 HTTP 请求到服务器,服务器一直保持连接打开,直到有新的数据可供发送,然后服务器响应并关闭连接,客户端收到响应后立即发起下一个请求。
- 使用第三方实时通信平台: 有一些第三方实时通信平台(如Firebase、Pusher等)提供了易于集成的实时数据推送服务,可以考虑使用它们来实现实时推送功能。
选择哪种方式取决于项目的具体需求、技术栈和可扩展性需求。最后经过对比了各自的优缺点和根据项目的实际情况。我这边还是选择了SSE。一种单向的持久连接的推送的方式。由于我的数据只需要前端知道变更了就行,不需要往后台发送信息。相比websocket更加的轻量。
具体的项目中的使用方式,我前端使用的是VUE3 +TS
// 主要的初始化代码
const initEventSource = (dataId?: any) => {
if (typeof EventSource !== 'undefined') {
const evtSource = new EventSource('http://***:7011/sync/see/subscribe?id=' + dataId, { withCredentials: true }) // 后端接口,要配置允许跨域属性
// 与事件源的连接刚打开时触发
evtSource.onopen = function (e) {
console.log(e)
}
// 当从事件源接收到数据时触发
evtSource.onmessage = function (e) {
console.log('服务端发送消息:' + e.data)
}
// 与事件源的连接无法打开时触发
evtSource.onerror = function (e) {
console.log(e)
evtSource.close() // 关闭连接
}
// 也可以侦听命名事件,即自定义的事件
evtSource.addEventListener('msgtz', function (e) {
console.log('msgtz:' + e.data)
const newdata = JSON.parse(e.data)
// 需要动态刷新 tableData 的数据 未实现 ,这种方式无法动态刷新 tableData 的数据
for (let i = 0; i < tableData.value.length; i++) {
let item = tableData.value[i]
if (item.id === newdata.id) {
item.softNumber = newdata.softNumber
item.checkCode = newdata.checkCode
item.protectId = newdata.protectId
}
}
})
} else {
console.log('当前浏览器不支持使用EventSource接收服务器推送事件!')
}
}
后台代码:
@RestController
@RequestMapping(path = "/sync/see")
@Slf4j(topic = "SseController")
public class SseController {
public static Map sseCache = new ConcurrentHashMap<>();
public static String sseKey(String id) {
return "sse_" + id;
}
/**
* 前端传递标识,生成唯一的消息通道
* 使用的时候需要注意这个MediaType
*/
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(String id) throws IOException {
String sseKey = sseKey(id);
// 超时时间设置为3s,用于演示客户端自动重连
SseEmitter sseEmitter = new SseEmitter(0L);
// 设置前端的重试时间为1s
// sseEmitter.send(SseEmitter.event().reconnectTime(600000).data("连接成功"));
sseCache.put(sseKey, sseEmitter);
log.info("add :" + sseKey);
// 链接断开时
sseEmitter.onCompletion(() -> {
sseCache.get(sseKey).complete();
sseCache.remove(sseKey);
});
sseEmitter.onTimeout(() -> {
log.info(sseKey + " :超时");
sseCache.get(sseKey).complete();
sseCache.remove(sseKey);
});
// 连接报错
sseEmitter.onError((throwable) -> {
sseCache.get(sseKey).complete();
sseCache.remove(sseKey);
});
return sseEmitter;
}
}
// 具体的业务中使用
private Boolean pushSse(Order order) {
try {
String sseKey = SseController.sseKey(order.getId());
SseEmitter sseEmitter = SseController.sseCache.get(sseKey);
if (sseEmitter != null) {
log.info("sse 推送数据:{}", hgLedgerEntity);
sseEmitter.send(SseEmitter.event().name("msgtz").data(order));
return true;
}
} catch (IOException e) {
log.error("sse 推送失败:{}", e.getLocalizedMessage(), e);
return false;
}
return false;
}
使用起来的确是比之前的websocket要简单和更加轻便。目前这个上线了也有1-2个月目前在线上使用,还算比较稳定(没用做过大批量的压测,这个可以自行根据项目要求来实现)。