建设网站设备预算永州网站开发公司
2026/6/9 19:22:26 网站建设 项目流程
建设网站设备预算,永州网站开发公司,响应式网站模板下载免费,行业应用服务类app作者#xff1a;张铭辉#xff08;希铭#xff09; 前言#xff1a;WebSocket 的技术演进与时代价值 1.1 什么是 WebSocket#xff1f; WebSocket 是一种基于 TCP 协议的全双工通信协议#xff08;RFC 6455 [ 1] #xff09;#xff0c;通过一次 HTTP 握手即可建立持…作者张铭辉希铭前言WebSocket 的技术演进与时代价值1.1 什么是 WebSocketWebSocket 是一种基于 TCP 协议的全双工通信协议RFC 6455[1]通过一次 HTTP 握手即可建立持久化连接实现客户端与服务端的双向数据传输。以下是一次 WebSocket 通信的示意图[2]可以看到和 HTTP 不同Client 会先向 Server 端基于 HTTP 协议发起一次握手请求Server 返回响应握手成功。在这之后已有的 TCP 连接会被升级为 WebSocket 连接Client 和 Server 之间可以进行全双工通信。TCP 连接会一直持续到其中一侧认为需要关闭且对方同意关闭之时。为了更好理解后续 WebSocket 的全链路可观测方案有必要对 WebSocket 的协议细节进行解读本节剩余内容部分翻译 总结自 WebSocket Protocol[3]。1.1.1 URI 格式与语法和 HTTP 协议族非常类似WebSocket 也有普通协议和他的安全版本用 ws 和 wss 来区分wss 的安全也采用 TLS 协议实现。由于 WebSocket 依赖 HTTP 协议进行握手后续复用原 TCP 连接故 WebSocket 默认的端口也是 80ws和 443wss。URI 整体的格式也和 HTTP 非常类似。1.1.2 启动连接握手基于 HTTP/1.1传统的 WebSocket 握手是一次典型的 HTTP 请求/响应。客户端主动发起一个 WebSocket 握手请求一个特殊的 GET如果服务器支持且允许使用 WebSocket 协议通信则会返回一个 WebSocket 握手响应。WebSocket Connection 就建立起来了。握手请求包含以下头如果服务端接受 WebSocket 协议则发送一个 StatusCode 为 101 的响应HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbKxOo响应包括HTTP/1.1 101 Switching Protocols表示成功从 HTTP 升级到 WebSocket。Upgrade: websocket确认协议升级。Connection: Upgrade表示连接已升级。Sec-WebSocket-Accept一个根据客户端的Sec-WebSocket-Key计算出的值用于验证服务器理解了 WebSocket 握手请求。HTTP/2 与 HTTP/3 升级到 WebSocket 的过程有一些不同但不是本文讨论的关键在此不再赘述欢迎阅读 WebSocket Protocol 原文[3]。1.1.3 WebSocket 消息与数据帧在握手完毕后连接会被升级为 WebSocket 连接此时客户端和服务端可以随时双向发送 WebSocket 消息message用来交换数据和指令。WebSocket 中的最小通信单元是数据帧每个消息有可能由一个或者多个数据帧组成。数据帧根据其用途可以分为以下三种类型文本帧载荷为 UTF-8 编码的文本数据二进制帧载荷为二进制数据控制帧用于传递协议信号如 ping、pong、close 帧等一个数据帧的数据组成如下图所示关于数据帧中每段数据的含义如有兴趣欢迎阅读 WebSocket Protocol 原文[3]。1.1.4 关闭连接握手当客户端或服务端某一方认为连接可以关闭时会向对端发送一个关闭帧是控制帧的一种对端收到关闭帧后会尽快发送另一个关闭帧作为响应。发送完关闭帧后该端不应该再发送任何数据帧。双方交换完关闭帧后TCP 连接将关闭。1.2 为什么用 WebSocket不难看出WebSocket 核心特性体现在长连接保持连接建立后持续存在避免重复握手开销双向数据通道客户端与服务端可随时发送数据帧Text/Binary低延迟特性省去 HTTP 轮询的请求头传输成本消息分帧机制支持超大数据量的分片传输单帧最大 2^64 字节与传统 HTTP 协议对比WebSocket 在通信模式上实现了根本性突破这种协议特性使其成为大数据量下实时通信场景的首选方案。1.3 AI 时代 WebSocket 协议的复兴随着大模型技术的爆发越来越多需要实时交互的场景开始出现智能化赋予了 WebSocket 协议新的活力支持实时对话与交互的智能客服或机器人车载 AI 助手与云端模型实时交互自动翻译、智能识图的 AI 智能眼镜除实时性外WebSocket 为有状态的连接多轮对话的记忆保持、即时打断输出等功能也比传统的 HTTP 更加容易实现。到目前为止主流的大模型提供商大多都提供了 WebSocket 的交互 API 及配套的 SDK帮助用户更好地构建后端服务系统例如OpenAI 支持基于 WebSocket 的 Realtime API[4]百炼大模型服务平台发布基于 WebSocket 的实时多模态交互协议[5]Google Gemini 支持基于 WebSocket 的 Live API[6]WebSocket 在赋能 AI 应用实时性的同时也为应用系统的可观测性带来了很大的挑战。WebSocket 协议高度的灵活性与扩展性注定了它不能像 HTTP 和 gRPC 那样非常方便地做到全链路可观测本文接下来将具体分析 WebSocket 场景下全链路可观测的实现痛点与解决方案。WebSocket 全链路可观测痛点分析2.1 协议灵活性带来的链路追踪困境2.1.1 链路信息注入难对于常规的 HTTP 调用为了保证链路的连通性调用方会在 HTTP headers 中额外添加一组用于承载链路上下文的键值对确保被调用方在解析协议时能够正确地还原调用方的链路上下文进而保证上下文可以被继续传递下去。图示是使用 W3C 链路追踪协议[7]时链路上下文的 header 的一个具体示例而在 1.1.3 节我们了解到一个 WebSocket 数据帧其实仅由数字节的控制位和数据载荷构成。除建立连接时握手以外没有其他的机会传输 header 这些元数据。因此传统 OpenTelemetry 的 W3C 链路上下文无法直接植入每个数据帧中。而在实际应用场景中对于一次 WebSocket 连接往往并不代表仅一次 WebSocket 调用仅依赖建立连接时的 HTTP 请求与响应是远远不够的。同时这也牵扯出第二个困难——Span 作用域界定模糊。2.1.2 Span 作用域界定模糊在可观测领域我们一般把调用链路上一次关键的操作称为一条 Span跨度[8]一条调用链一般由一组树状结构的 Span 组成。在可观测前端的帮助下我们可以把同属于一条调用链的 Span 召回并根据父子关系也就是调用关系以及发生时间渲染为下图所示的瀑布图以此来帮助我们了解一条链路发生的所有关键操作以及调用关系。然而在 WebSocket 场景下操作粒度的定义可以非常灵活。如图所示一个 Span 有可能对应一次 WebSocket 连接从开始到结束的全过程也有可能对应每一次消息的收发甚至也可以对应每一次数据帧的传递过程。对 Span 粒度定义的高度灵活也导致了链路上下文在注入与管理上也会有非常大的变化这也增大了业务上落地的难度。2.1.3 链路上下文的反向扩散问题虽然我们根据 WebSocket 连接的发起方与接收方将两端分为了 Client 和 Server但实际业务的处理过程是高度灵活的双向流可能存在由 Server 侧发起请求Client 进行处理的情况。例如允许 Client 主动与 Server 建立连接并将自身服务注册给 Server 端由 Server 发送消息来对 Client 进行回调。对于这种交互方式而言消息生产方调用方是 Server消费方被调用方是 Client因此链路上下文应该由 Server 注入到消息中由 Client 还原并进一步传递。2.2 异步调用引发的断链危机在 WebSocket 应用中为了提高连接利用率两端也常用异步的方式来解耦消息接收过程与处理过程以下是一个典型的异步消息处理架构。在这个过程中消息有可能会直接被提交到线程池也有可能存放在一个进程内的队列甚至直接写入 Redis 等外部存储。这种灵活多变的异步方式也给链路上下文的进程内透传带来了困难非常容易出现断链问题。基于 LoongSuite 的全链路观测最佳实践3.1 方案基本原理通过上两节的讨论我们可以得到两个基本结论WebSocket 的用法相当灵活链路追踪的实现很大程度上取决于业务实现需要开发者自主实现一些扩展来保证链路完整性高频业务场景缺少一些最佳落地范式导致自主实现链路追踪困难此外由于 WebSocket 链路上也难免存在一些 NoSQL、HTTP 等其他类型的调用依然需要无侵入探针来保证各种调用的串联这就要求无侵入探针与自定义扩展产生的链路上下文可以很好地互通。LoongSuite 无侵入探针提供的基于 OpenTelemetry API 的扩展机制就是解决这些问题的最佳手段[9]。3.1.1 OpenTelemetry API 与 LoongSuite 探针工作原理OpenTelemetry API 是 OpenTelemetry 社区定义的可观测数据采集标准的重要组件之一[10]它定义了一整套可观测领域使用的 API 行为标准和功能说明比如可观测数据创建、上下文管理/透传、数据上报等逻辑并为许多语言提供了配套的 SDK 实现。使用者可以基于 API 与 SDK 比较容易地实现上下文的管理与透传。以下是使用 Tracer API 定义 Span 的示意private int doWork() { // 创建 span Span doWorkSpan tracer.spanBuilder(doWork).startSpan(); // 激活 span 所在上下文 try (Scope scope doWorkSpan.makeCurrent()) { int result 0; for (int i 0; i 10; i) { result i; } return result; } finally { // 结束 span doWorkSpan.end(); } }LoongSuite 探针是阿里云可观测团队基于 OpenTelemetry 探针构建的面向 AI 应用的开源的进程内可观测采集组件。对于热门的开源组件例如 LangChain、OpenAI SDK、Tomcat 等LoongSuite 探针提供了丰富的预定义插桩实现。使用者不再需要基于 OpenTelemetry API 进行开发只需要修改编译或运行时命令探针就能把可观测数据创建、上下文管理/透传、数据上报等关键逻辑自动完成从而达成无侵入可观测的目标。LoongSuite 探针可以满足生产应用绝大多数场景下的可观测需求但对于一些高度自定义的场景如消息系统中的复杂消费过程、部分 MQTT 场景以及 WebSocket 通信场景使用 OpenTelemetry API/SDK 添加自定义埋点则是弥补无侵入探针监控盲区的最优方案。3.1.2 LoongSuite 探针与自定义扩展交互示意对于 Java、Golang 这类包管理相对严格需要明确指定版本的语言来说探针与应用可能会存在版本不一致的依赖比如 Jackson、gRPC 和 OpenTelemetry API/SDK 等等。为了避免依赖冲突常采用 shadow 的方式进行依赖隔离。但这也会导致用户在使用 OpenTelemetry API 和 SDK 自主埋点的时候产生的链路上下文并不能与探针内互通进而导致调用链断裂。OpenTelemetry 和 LoongSuite 探针同样采用代码增强机制保证了链路上下文的共享具体整体示意如下探针和应用共用一套 APIAPI 自身保证向前兼容探针初始化时会将初始化好的实例对象注册到 GlobalHolder应用中自定义埋点时直接从 API 中的 GlobalHolder 就可以获取到探针的实例对象对于 SDK 中定义的一些方法和静态的 API如 Context、Baggage 等通过代码增强的方式跳过这些函数原本的调用转而使用探针中对应的实现通过以上机制LoongSuite 探针可以很好地和 OpenTelemetry API/SDK 创建的 Span 串联在一起保证了链路的完整性。3.2 WebSocket 分布式链路追踪最佳实践了解了这几个组件关键的问题是我的应用应该怎么添加这些自定义的埋点呢在 WebSocket 全链路的实现中需要先根据业务诉求明确几个问题会话粒度问题一次 WebSocket 连接对应一条 Trace 还是多条 Trace对应一条 Trace一次 WebSocket 连接是为了完成一系列相关性强的操作且持续时间一般仅在数分钟对应多条 Trace一次 WebSocket 连接会在建立完成后留存下来持续复用持续时间可能持续几小时。调用建模问题WebSocket 内部的数据传输过程能否建模为离散的请求与响应如果连接建立后只用于双方传递数据则不需要为每条消息专门创建 Span一个 Span 的生命周期应该对应双方传递消息的完整过程如果连接建立后一方发送消息另一方处理消息并返回响应则每组这类调用都可以创建一对父子 Span对应的数据结构需要允许承载序列化后的链路上下文。应对以上几个不同场景自定义埋点的实现推荐也会有所差异接下来将分别展开介绍。3.2.1 引入 OpenTelemetry API 依赖探针对 API 的兼容为向前兼容对于最新版本的 API 适配可能比较有限生产环境中 API 包的版本不需要过新基本 API 足够使用即可。对于 Java 语言建议在 pom.xml 中引入。API 文档https://javadoc.io/doc/io.opentelemetry/opentelemetry-api/1.28.0/index.htmldependency groupIdio.opentelemetry/groupId artifactIdopentelemetry-api/artifactId /dependency获取探针注入的全局实例openTelemetry GlobalOpenTelemetry.get(); tracer openTelemetry.getTracer(websocket-example, 1.0.0);对于 Golang 语言可以执行 go get 命令获取包。API 文档https://pkg.go.dev/go.opentelemetry.io/otelv1.28.0go get go.opentelemetry.io/otel获取探针注入的全局实例tracer : otel.GetTracerProvider().Tracer(websocket-example)对于 Python 语言可以通过 pip install 获取。API 文档https://opentelemetry-python.readthedocs.io/en/latest/pip install opentelemetry-api获取探针注入的全局实例from opentelemetry import trace tracer trace.get_tracer(__name__)3.2.2 会话粒度问题——创建 WebSocket 连接维度的 Trace实现建议WebSocket 在建立连接时会基于 HTTP 请求发起握手复用该 Trace 上下文作为整次 WebSocket 连接中子操作的上下文。以下是以一整个 WebSocket 连接为一条 Trace 的实现基本示意图所有的请求与数据传递都作为子 Span 挂靠在一条 Trace 下面。因此这种实现更适合 WebSocket 连接按需连接并会及时关闭的场景。Client 侧代码实现以 Java 原生提供的 WebSocket 库为例public static void main(String[] args) throws Exception { // 1. 创建连接级别的 Trace在连接前创建以便在握手时传递 TraceContext Span connectionSpan tracer.spanBuilder(websocket.connection) .setAttribute(websocket.endpoint, /native/ws) .setAttribute(websocket.destination, ws://localhost:18081) .setAttribute(websocket.connection.type, client) .startSpan(); // 2. 将当前 Span 激活在线程内的上下文中标记 Span 的作用域为从连接开始到连接关闭 try (Scope scope connectionSpan.makeCurrent()) { WebSocketContainer container ContainerProvider.getWebSocketContainer(); // 创建 WebSocket Client NativeWebSocketClient client new NativeWebSocketClient(); // 使用 Endpoint 方式连接 Session session container.connectToServer( new jakarta.websocket.Endpoint() { Override public void onOpen(Session session, EndpointConfig config) { client.onOpen(session); // 注册消息处理器使用匿名内部类而不是 lambda避免泛型类型推断问题 session.addMessageHandler(new MessageHandler.WholeString() { Override public void onMessage(String message) { client.onMessage(message); } }); } Override public void onClose(Session session, CloseReason closeReason) { client.onClose(); } Override public void onError(Session session, Throwable thr) { // 记录错误到当前 Span connectionSpan.recordException(thr); client.onError(thr); } }, // 3. 发起握手时在请求头中携带当前的上下文 createHeaderWithUserProperties(), URI.create(ws://localhost:18081/native/ws)); client.session session; client.sessionId session.getId(); log.info(客户端已启动输入消息发送给服务器输入 exit 退出:); // 从控制台读取输入 BufferedReader reader new BufferedReader(new InputStreamReader(System.in)); String line; while ((line reader.readLine()) ! null !line.equals(exit)) { if (!line.trim().isEmpty()) { // 4. 向 Server 发送消息 client.sendMessage(line); } } // 关闭连接 client.close(); log.info(客户端已退出); } catch (Exception e) { // 如果出现错误记录到 span 中 connectionSpan.recordException(e); log.error(客户端启动失败, e); } finally { // 5. 结束 span connectionSpan.end(); } // 等待 span 异步上报实际业务中无需保留 Thread.sleep(5000L); } private static ClientEndpointConfig createHeaderWithUserProperties() { // 创建 ClientEndpointConfig用于自定义握手请求头 ClientEndpointConfig.Builder configBuilder ClientEndpointConfig.Builder.create(); // 3.1. 获取当前的 TraceContext准备 HTTP headers final MapString, ListString headersMap new HashMap(); Context currentContext Context.current(); // 3.2. 通过全局实例的 ContextPropagators 注入 TraceContext 到 headers openTelemetry.getPropagators().getTextMapPropagator() .inject(currentContext, headersMap, (carrier, key, value) - carrier.put(key, List.of(value))); // 3.3. 设置 Configurator 来在握手时添加 headers configBuilder.configurator(new ClientEndpointConfig.Configurator() { Override public void beforeRequest(MapString, ListString headers) { headers.putAll(headersMap); } }); return configBuilder.build(); }Server 侧代码实现以 Java 原生提供的 WebSocket 库为例ServerEndpoint(value /native/ws, configurator NativeWebSocketServer.TraceContextConfigurator.class) public class NativeWebSocketServer { // 按照 session 维度管理来自 Client 的上下文 private static final MapString, Context connectionTraceContexts new ConcurrentHashMap(); // 1. 定义配置类用于在握手时提取 TraceContext public static class TraceContextConfigurator extends ServerEndpointConfig.Configurator { private static final TextMapGetterMapString, ListString headerGetter new TextMapGetterMapString, ListString() { Override public IterableString keys(MapString, ListString carrier) { return carrier.keySet(); } Override public String get(MapString, ListString carrier, String key) { ListString values carrier.get(key); return values ! null !values.isEmpty() ? values.get(0) : null; } }; Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { // 从 HTTP headers 中提取 TraceContext MapString, ListString headers request.getHeaders(); Context extractedContext openTelemetry.getPropagators() .getTextMapPropagator() .extract(Context.current(), headers, headerGetter); // 2. 将 TraceContext 存储到 userProperties在 onOpen 时提取 sec.getUserProperties().put(traceContext, extractedContext); } } OnOpen public void onOpen(Session session, EndpointConfig config) { String sessionId session.getId(); sessions.put(sessionId, session); // 3. 从 config 的 userProperties 中提取 TraceContext在 Configurator 中设置 Context parentContext Context.current(); Object traceContextObj config.getUserProperties().get(traceContext); if (traceContextObj instanceof Context) { parentContext (Context) traceContextObj; } // 4. 将 Client 链路上下文保存下来在需要创建子 span 时获取即可 connectionTraceContexts.put(sessionId, parentContext); log.info(客户端连接: sessionId{}, 当前连接数{}, 已从 Client TraceContext 创建子 Span, sessionId, sessions.size()); sendMessage(session, 欢迎连接您的会话ID: sessionId); } OnClose public void onClose(Session session) { String sessionId session.getId(); sessions.remove(sessionId); connectionTraceContexts.remove(sessionId); log.info(客户端断开: sessionId{}, 剩余连接数{}, Trace已结束, sessionId, sessions.size()); } }3.2.3 会话粒度问题——使用会话 ID 关联不同的 Trace实现建议复用 WebSocket 的 Session ID 作为每条 Span 的属性在必要时也可以按照属性查询来自于同一个 WebSocket 会话的所有 Trace。以下是使用会话 ID 关联不同 Trace 的实现基本示意图每次 Client 侧或 Server 侧发起的主动请求都是一条单独的 Trace彼此之间并不会在 Trace 瀑布图中呈现关系但可以通过会话 ID 这个属性进行过滤和查询。因此这种实现更适合 WebSocket 连接时间很长且可能存在复用的场景。使用会话 ID 关联不同 Trace 实现方案相对简单大多数框架都能直接获取到当前所在会话的 ID调用 setAttribute API 写入 Span 即可以下是一个基本示例public void sendMessage(Session session, String message) { Span span tracer.spanBuilder(Client send message).startSpan(); // 向 span 中写入 session id span.setAttribute(websocket.session.id, session.getId()); try (Scope scope span.makeCurrent()) { doSendMessage(message); } finally { span.end(); } }3.2.4 调用建模问题——存在明显调用关系实现建议仿照 Messaging 系统的链路追踪逻辑消息的发送者为调用方消息的接受者为被调用方分别创建 Span。调用方 Span 作为被调用方的父级。涉及多轮消息发送只要意图为流式传输视为一次调用行为。链路效果如下图所示这种情形是生产应用中最普遍碰到的情况要保证 Client 链路和 Server 链路的串联需要调用方在发送消息时保证消息中有一个类似 headers 的预留字段用于传递链路上下文该字段需要被 Client 和 Server 同时支持解析。许多生产服务都预留了这类字段例如语音合成CosyVoice WebSocket API#指令客户端→服务端https://help.aliyun.com/zh/model-studio/cosyvoice-websocket-api[#b0100c3591yqq调用方代码实现public void sendMessage(String message) { // 0. 可选如果为一个 Connection 创建了 span需要在此处执行 span.makeCurrent() // 1. 创建 header 字段 HashMapString, String headers new HashMap(); // 2. 创建 span 并写入必要的属性 Span span tracer.spanBuilder(Client send message).startSpan(); span.setAttribute(websocket.session.id, session.getId()); try (Scope scope span.makeCurrent()) { // 3. 调用 OTel API将上下文注入到 header 中 openTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), headers, (headersMap, key, value) - headersMap.put(key, value)); // 4. 发送消息 // 如果是流式发送消息则可以仅在第一条消息中添加 header调用双方需要保证 span 创建的幂等性即整个流式发送期间仅创建一个 span sendMessage(message, headers); } finally { span.end(); } }被调用方代码实现public void onMessage(String message, Session session) { String sessionId session.getId(); try { // 1. 解析消息 MessageWithHeaders msgWithHeaders objectMapper.readValue(message, MessageWithHeaders.class); MapString, String headers msgWithHeaders.getHeaders(); // 2. 从消息中提取链路上下文 Context remoteContext openTelemetry.getPropagators().getTextMapPropagator() .extract(Context.current(), headers, new TextMapGetterMapString, String() { Override public IterableString keys(MapString, String headersMap) { return headersMap.keySet(); } Override public String get(MapString, String headersMap, String key) { return headersMap.getOrDefault(key, null); } }); // 3. 以提取出来的上下文作为父级创建 Server span Span serverSpan tracer.spanBuilder(Server handle message) .setParent(remoteContext).startSpan(); try (Scope scope serverSpan.makeCurrent()) { // 4. 处理消息/流式返回响应 String body msgWithHeaders.getBody(); log.info(收到消息 [{}] [headers{}]: {}, sessionId, headers, body); // 处理消息带 headers handleMessage(session, body, headers); } catch (Exception e) { serverSpan.recordException(e); } finally { serverSpan.end(); } } catch (Exception e) { log.error(消息接收失败 [{}]: {}, sessionId, message, e); } }3.2.5 调用建模问题——无显式调用关系仅传输数据实现建议数据发送方创建 Span作为整个 WebSocket 连接 Span如有的子 Span双方 Span 不维持父子关系。链路效果如下图所示数据发送方代码示例public void streamingSendMessages(Session session) { // 0. 可选如果为一个 Connection 创建了 span需要在此处执行 span.makeCurrent() Context context connectionTraceContexts.containsKey(session.getId()) ? connectionTraceContexts.get(session.getId()) : Context.current(); try (Scope pScope context.makeCurrent()) { // 1. 创建 Span Span span tracer.spanBuilder(Client send message).setParent(context).startSpan(); span.setAttribute(websocket.session.id, session.getId()); try (Scope scope span.makeCurrent()) { // 2. 发送消息 while (messageQueue ! null messageQueue.containsKey(session.getId())) { ListMessage messages messageQueue.get(session.getId()); messages.forEach(message - sendMessage(session, message)); Thread.sleep(200L); } } finally { span.end(); } } }3.2.6 异步透传问题——进程内异步上下文管理一般地在 WebSocket 应用中的异步存在两种实现基于线程池的异步调度每当接收到消息都创建一个 Runnable 或 Callable或者创建一个 Golang/Python 协程基于进程内队列进行异步通信如 Java 的 Deque、Golang 的 Channel、Python 的 Generator 等每当接收到消息都入队由统一的 Worker 进行处理对于第一种情形LoongSuite 探针已经支持上下文的自动透传public void onMessage(String message) { Span messageSpan tracer.spanBuilder(Server handle message).startSpan(); // 把当前 span 激活并放到 ThreadLocal 中 try (Scope scope messageSpan.makeCurrent()) { // 异步调用消息处理流程 // 探针会在 Runnable 任务被创建时将 span 所在上下文自动传递到 doHandleMessage 方法内部 // doHandleMessage 方法实际执行时上下文会被自动复原 workerExecutor.execute(() - doHandleMessage(message)); } }对于第二种情形需要使用者主动进行上下文透传和还原public void onMessage(String message) { Span messageSpan tracer.spanBuilder(Server handle message).startSpan(); // 把当前 span 激活并放到 ThreadLocal 中 try (Scope scope messageSpan.makeCurrent()) { // 手动将 TraceContext 与 Message 关联也可以通过 Map message.setTracingContext(Context.current()); // 消息入队 messageQueue.offer(message); } } public void pollAndHandleMessage() { while (true) { if (!messageQueue.isEmpty) { Message message messageQueue.poll(); // 消息出队后获取 TraceContext 与 Span Context tracingContext message.getTracingContext(); Span span Span.fromContext(tracingContext); // 将 TraceContext 重新激活并放到 ThreadLocal 中 try (Scope scope tracingContext.makeCurrent()) { handleMessage(message); } finally { // 消息处理结束关闭 span span.end(); } } Thread.sleep(100L); } }3.3 WebSocket 中流式传输的关键业务指标在 3.2 节中我们可以看到在流式传输的场景下我们会把一次完整的请求记录为一条 Span以防止过多 Span 导致性能瓶颈。但这也会抹去流式传输中的一些关键性能信息——一次消息处理中某些个别的数据包处理时长过长引发整个响应过程偏慢。实际生产中这些指标也能很大程度上帮我们衡量应用的健康度与评估某些链路的问题所在以下是几个常用的业务指标以下是计算这些指标的一个简单的工具类实现关于详细的使用方式欢迎查看示例代码https://github.com/Cirilla-zmh/asr-demo/blob/main/asr-service/src/main/java/com/example/asr/ws/AsrWebSocketHandler.java工具类定义public class WebSocketPerformanceMeasure { private static final Logger log LoggerFactory.getLogger(WebSocketPerformanceMeasure.class); private static final long UNINITIALIZED -1L; private Long startTime; private Long firstChunkTime; private AtomicInteger chunkCounts; private AtomicLong totalInterval; private Long lastChunkTime; public static WebSocketPerformanceMeasure create() { WebSocketPerformanceMeasure measure new WebSocketPerformanceMeasure(); measure.startTime System.currentTimeMillis(); measure.firstChunkTime UNINITIALIZED; measure.chunkCounts new AtomicInteger(0); measure.totalInterval new AtomicLong(0); measure.lastChunkTime UNINITIALIZED; return measure; } /** * 开始测量如果尚未开始 */ public void start() { if (startTime null) { startTime System.currentTimeMillis(); firstChunkTime UNINITIALIZED; chunkCounts new AtomicInteger(0); totalInterval new AtomicLong(0); lastChunkTime UNINITIALIZED; } } /** * 记录一个 chunk 的到达 * 自动计算 time_to_first_chunk 和更新间隔统计 * * return 如果是第一个 chunk返回 time_to_first_chunk毫秒否则返回 null */ public Long recordChunk() { if (startTime null) { log.warn(Performance measure not started, calling start() automatically); start(); } long currentTime System.currentTimeMillis(); chunkCounts.incrementAndGet(); // 记录第一个 chunk 的时间 Long timeToFirstChunk null; if (firstChunkTime UNINITIALIZED) { timeToFirstChunk currentTime - startTime; firstChunkTime currentTime; log.debug(First chunk recorded, time_to_first_chunk: {}ms, timeToFirstChunk); } // 计算 chunk 间隔从第二个 chunk 开始 if (lastChunkTime ! UNINITIALIZED) { long interval currentTime - lastChunkTime; totalInterval.addAndGet(interval); } lastChunkTime currentTime; return timeToFirstChunk; } /** * 获取 time_to_first_chunk毫秒 * 如果第一个 chunk 尚未到达返回 null */ public Long getTimeToFirstChunk() { if (firstChunkTime UNINITIALIZED || startTime null) { return null; } return firstChunkTime - startTime; } /** * 获取 time_to_last_chunk毫秒 * 需要保证在 chunk 完全到达后调用 * 如果第一个 chunk 尚未到达返回 null */ public Long getTimeToLastChunk() { if (lastChunkTime UNINITIALIZED || startTime null) { return null; } return lastChunkTime - startTime; } /** * 获取平均 chunk 间隔毫秒 * 如果 chunk 数量少于 2返回 null */ public Long getAverageInterval() { int count chunkCounts.get(); if (count 2 || totalInterval null) { return null; } return totalInterval.get() / (count - 1); } /** * 获取 chunk 总数 */ public int getChunkCount() { return chunkCounts ! null ? chunkCounts.get() : 0; } }典型场景实践AI 语音对话系统本节我们将结合一个生产中常见的业务系统来简要介绍本文方案在该场景下的具体实践。相关 demo 代码已开源欢迎移步 https://github.com/Cirilla-zmh/asr-demo 实践。4.1 系统架构解析以下是系统整体架构的简单示意设备端 → WebSocket → ASR → LLM(意图识别) ↓ ├─ 闲聊 → LLM(生成) → TTS → 设备端 └─ 下单 → MCP(下单) → LLM(生成) → TTS → 设备端调用时序图4.2 接入 LoongSuite 探针在本示例项目中预留了探针挂载的环境变量通过挂载 LoongSuite 探针我们可以将 ASR demo 服务的可观测数据接入到 ARMS 控制台。以下是具体步骤下载 LoongSuite 商业化探针并解压为保证 LLM 链路的完整性建议下载 4.6.0 及更高版本探针。wget http://arms-apm-cn-hangzhou.oss-cn-hangzhou.aliyuncs.com/4.6.0/AliyunJavaAgent.zip -O AliyunJavaAgent.zip unzip AliyunJavaAgent.zip参照 README在启动应用时添加探针挂载相关参数相关参数可以参考手动安装探针[1****1]文档获取。export JAVA_AGENT_OPTIONS-javaagent:/path/to/4.6.0/AliyunJavaAgent/aliyun-java-agent.jar -Darms.licenseKey${your_license_key} -Darms.appNamewebsocket-demo -Daliyun.javaagent.regionIdcn-hangzhou -Darms.workspace${your_cms_workspace} ./start.sh你也可以接入 LoongSuite 开源版本探针或者 OpenTelemetry 探针并可观测数据上报到开源的可观测平台受限于篇幅在此不再展开欢迎移步 https://github.com/alibaba/loongsuite-java-agent 获取更多信息。4.3 系统页面与可观测效果示意以下是部署后的应用系统页面类似现在的智能机器人 IM 系统用于替代设备端的左右在发起对话后统计上来的链路如图所示。可以在一条链路中清晰看到每个环节的持续时间在 WebSocket 对应 span 中能够看到统计到的首包延迟与平均输出间隔等指标帮助分析整体业务性能结语未来展望WebSocket 领域的全链路可观测性一直以来都是让许多企业开发和运维人员头痛的问题。可观测性的解决方案并不能一蹴而就需要与用户进行持续深度共建与配合。很兴奋能看到公牛在与可观测团队共同完成了该方案在生产上的实际落地[12]也为我们方案的完善提供了非常宝贵的经验。未来我们将与更多的用户与开源开发者共建持续补充和建设更完善、更易用的 WebSocket 可观测方案。欢迎大家关注 LoongSuite 社区以获取相关方案的最新进展“LoongCollector(原iLogtail)社区”钉钉群号 35576244“LoongSuite Go Agent开源交流群”钉钉群号 102565007776“LoongSuite Python SIG”钉钉群号 101925034286“LoongSuite Python SIG”钉钉群号 101925034286参考文章[1] RFC 6455https://datatracker.ietf.org/doc/html/rfc6455[2] The Road to WebSocketshttps://websocket.org/guides/road-to-websockets/[3] WebSocket Protocolhttps://websocket.org/guides/websocket-protocol/[4] OpenAI Realtime APIhttps://platform.openai.com/docs/guides/realtime-websocket[5] 实时多模态交互协议WebSockethttps://help.aliyun.com/zh/model-studio/multimodal-interaction-protocol[6] Live API - WebSockets API referencehttps://ai.google.dev/api/live[7] Trace Contexthttps://www.w3.org/TR/trace-context/#abstract[8] Distributed Tracing 基本介绍https://observability.cn/project/opentelemetry/rp8k7gzvtys07zsb/[9] 通过OpenTelemetry Java SDK为调用链增加自定义埋点https://help.aliyun.com/zh/arms/application-monitoring/use-cases/use-opentelemetry-sdk-for-java-to-manually-instrument-applications?spma2c4g.11186623.help-menu-search-34364.d_5[10] OpenTelemetry Specification Overviewhttps://opentelemetry.io/docs/specs/otel/overview/[11] 手动安装探针https://help.aliyun.com/zh/arms/application-monitoring/user-guide/manually-install-arms-agent-for-java-applications[12] 《让每次语音唤醒都可靠公牛沐光重构可观测体系》

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询