2026/6/10 3:52:04
网站建设
项目流程
百度竞价网站,WordPress数字商城模板,两学一做网站进不去,网站的做网站公司哪家好AI 流式响应实战#xff1a;从同步等待到实时推送
在 IM 系统中集成 AI 时#xff0c;流式响应能显著提升性能。本文介绍 AQChat 如何实现 AI 流式响应#xff0c;从同步等待到实时推送。
一、为什么需要流式响应#xff1f;
同步等待的问题
传统同步方式的问题#xff1a…AI 流式响应实战从同步等待到实时推送在 IM 系统中集成 AI 时流式响应能显著提升性能。本文介绍 AQChat 如何实现 AI 流式响应从同步等待到实时推送。一、为什么需要流式响应同步等待的问题传统同步方式的问题// ❌ 同步方式用户需要等待AI完整响应StringaiResponseaiService.getAnswer(userMessage);// 如果AI响应需要10秒用户就要等待10秒sendMessage(aiResponse);问题等待时间长AI 生成可能需要 5-10 秒用户长时间等待体验差无法看到生成过程感觉卡顿资源占用长时间占用连线和线程流式响应的优势实时反馈逐字显示用户可立即看到内容体验更好类似 ChatGPT 的打字机效果资源利用边生成边推送不阻塞对比方式首字延迟完整响应时间用户体验同步等待10秒10秒差流式响应1-2秒10秒好回调函数模式的设计统一接口设计定义统一的 AI 服务接口publicinterfaceIAiService{/** * 流式调用AI服务 * param userMsg 用户消息 * param consumer 回调函数处理每个数据块 */voidstreamCallWithMessage(StringuserMsg,ConsumerAIResultconsumer);/** * 多轮对话 */defaultvoidchat(Stringmessage,ListMessageRecordmessages,ConsumerAIResultconsumer){}}关键点使用ConsumerAIResult作为回调每个数据块通过回调处理支持多轮对话AIResult 设计publicinterfaceAIResult{StringgetContent();// 当前数据块的内容intgetStatus();// 状态WAIT(0-进行中)、END(1-结束)、FAIL(2-失败)}状态枚举publicenumAIMessageStatusEnum{WAIT(0,wait),// 流式响应进行中END(1,end),// 流式响应结束FAIL(2,fail);// 流式响应失败}三、WebSocket 实时推送的实现整体流程用户发送消息 ↓RocketMQ异步处理 ↓ AI服务流式调用 ↓ 回调函数处理每个数据块 ↓ 封装为 STREAM_MSG_NOTIFY ↓WebSocket实时推送代码实现RocketMQ 消费者接收消息ComponentpublicclassAIHelperReceiverimplementsInitializingBean{ResourceprivateIAiServiceaiService;ResourceprivateGlobalChannelHolderglobalChannelHolder;publicvoidinitConsumer(){defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently)(messageExtList,context)-{for(MessageExtmessageExt:messageExtList){MessageDtomessageDtoJSONObject.parseObject(msgStr,MessageDto.class);// 提交到独立线程池不阻塞MQ消费线程threadPoolUtil.submitTask(()-{StringBuilderfullContentnewStringBuilder();try{// 流式调用AI服务aiService.streamCallWithMessage(messageDto.getMessageContent(),aiResult-{// 回调函数处理每个数据块AIMessageDtoaiMessageDtonewAIMessageDto();aiMessageDto.setMessageId(messageDto.getMessageId());aiMessageDto.setRoomId(messageDto.getRoomId());aiMessageDto.setContent(aiResult.getContent());aiMessageDto.setStatus(aiResult.getStatus());// 实时推送globalChannelHolder.sendBroadcastAIMessage(aiMessageDto,AQBusinessConstant.AI_HELPER_ID);// 累积完整内容fullContent.append(aiResult.getContent());});}catch(Exceptione){// 错误处理LOGGER.error(AI助手处理消息失败,e);AIMessageDtofailMessagenewAIMessageDto();failMessage.setStatus(AIMessageStatusEnum.FAIL.getCode());globalChannelHolder.sendBroadcastAIMessage(failMessage,AQBusinessConstant.AI_HELPER_ID);}finally{// 流式响应结束后持久化完整消息MessageDtostoreMessagebuildStoreMessage(messageDto,fullContent);messageService.saveMessage(storeMessage);}});}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});}}封装流式消息并推送ComponentpublicclassGlobalChannelHolder{publicvoidsendBroadcastAIMessage(AIMessageDtoaiMessageDto,StringaiId){// 1. 获取AI助手信息UserGlobalInfoDtouserInfouserHolder.getUserInfo(aiId);// 2. 构建流式消息AQChatMsgProtocol.StreamMsgNotifystreamMsgNotifyAQChatMsgProtocol.StreamMsgNotify.newBuilder().setUser(userBuilder).setMsgId(aiMessageDto.getMessageId()).setRoomId(aiMessageDto.getRoomId()).setContent(aiMessageDto.getContent()null?:aiMessageDto.getContent()).setStreamType(aiMessageDto.getStatus())// 0-进行中1-结束2-失败.build();// 3. 广播到房间内所有用户messageBroadcaster.broadcast(aiMessageDto.getRoomId(),streamMsgNotify);}}消息广播ComponentpublicclassMessageBroadcaster{privatefinalMapString,ChannelGroupchannelGroupMapnewConcurrentHashMap();publicTextendsGeneratedMessageV3voidbroadcast(StringroomId,Tmsg){ChannelGroupchannelGroupchannelGroupMap.get(roomId);if(channelGroup!null){// 批量发送高效channelGroup.writeAndFlush(msg);}}}四、流式消息的封装STREM_MSG_NOTIFYProtobuf 消息定义// 流式消息通知messageStreamMsgNotify{string roomId1;// 房间IDstring msgId2;// 消息IDUseruser3;// AI助手信息int32 streamType4;// 流类型0-进行中1-结束2-失败string content5;// 当前数据块内容}消息类型enumMsgCommand{// ...STREAM_MSG_NOTIFY32;// 流式消息通知// ...}消息状态流转用户发送消息 ↓ STREAM_MSG_NOTIFY(streamType0,content你)← 第一个数据块 ↓ STREAM_MSG_NOTIFY(streamType0,content好)← 第二个数据块 ↓ STREAM_MSG_NOTIFY(streamType0,content)← 第三个数据块 ↓...↓ STREAM_MSG_NOTIFY(streamType1,content)← 结束标志前端处理示例伪代码websocket.onmessage(event){constmessageJSON.parse(event.data);if(message.commandSTREAM_MSG_NOTIFY){if(message.streamType0){// 进行中追加内容appendContent(message.content);}elseif(message.streamType1){// 结束显示完整消息showCompleteMessage();}elseif(message.streamType2){// 失败显示错误提示showErrorMessage();}}};五、多 AI 平台集成的统一接口设计问题不同 AI 平台的 API 不同阿里百炼使用FlowableGenerationResultGitee AI使用MessageHandlerString其他平台可能有不同的流式接口解决方案统一接口 适配器模式统一接口定义publicinterfaceIAiService{/** * 流式调用统一使用 ConsumerAIResult 回调 */voidstreamCallWithMessage(StringuserMsg,ConsumerAIResultconsumer);}阿里百炼实现ServicePrimarypublicclassQWAiServiceimplementsIAiService{OverridepublicvoidstreamCallWithMessage(StringuserMsg,ConsumerAIResultconsumer){GenerationgennewGeneration();MessagemessageMessage.builder().role(Role.USER.getValue()).content(userMsg).build();// 调用阿里百炼流式APIFlowableGenerationResultresultgen.streamCall(generationParam);// 转换为统一格式result.blockingForEach(r-{Stringcontentr.getOutput().getChoices().get(0).getMessage().getContent();StringfinishReasonr.getOutput().getChoices().get(0).getFinishReason();QWResultqwResultnewQWResult();qwResult.setContent(content);// 判断是否结束qwResult.setStatus(stop.equals(finishReason)?AIMessageStatusEnum.END.getCode():AIMessageStatusEnum.WAIT.getCode());// 调用统一回调consumer.accept(qwResult);});}}Gitee AI 实现ServicepublicclassGiteeAIServiceimplementsIAiService{ResourceprivateGiteeAIClientgiteeAIClient;OverridepublicvoidstreamCallWithMessage(StringuserMsg,ConsumerAIResultconsumer){// 调用Gitee AI流式APIgiteeAIClient.streamChat(message,messageList,data-{JSONObjectparseJSONObject.parseObject(data);JSONArraychoicesparse.getJSONArray(choices);JSONObjectchoicesInchoices.getJSONObject(0);StringfinishReasonchoicesIn.getString(finish_reason);if(finishReason!nullfinishReason.equals(stop)){// 结束GiteeResultgiteeResultnewGiteeResult();giteeResult.setStatus(AIMessageStatusEnum.END.getCode());consumer.accept(giteeResult);return;}// 进行中JSONObjectdeltachoicesIn.getJSONObject(delta);Stringcontentdelta.getString(content);if(content!null!content.isEmpty()){GiteeResultgiteeResultnewGiteeResult();giteeResult.setContent(content);giteeResult.setStatus(AIMessageStatusEnum.WAIT.getCode());consumer.accept(giteeResult);}});}}统一接口的优势业务代码无需关心具体平台易于扩展新平台便于切换平台通过Prime注解使用示例// 业务代码只需要调用统一接口ResourceprivateIAiServiceaiService;// Spring会自动注入Primary的实现aiService.streamCallWithMessage(userMsg,aiResult-{// 处理流式响应不关心是哪个AI平台sendBroadcastAIMessage(aiResult);});六、性能优化独立线程池// AI处理在独立线程池中执行不阻塞MQ消费线程threadPoolUtil.submitTask(()-{aiService.streamCallWithMessage(userMsg,consumer);});优势不阻塞 RocketMQ 消费线程AI 处理失败不影响其他消息可控制并发数异步处理// 消息发送到RocketMQ异步处理mqSendingAgent.aiHelper(messageDto);// 立即返回不等待AI响应优势用户发送消息后立即返回AI 响应通过 WebSocket 实时推送提升响应速度七、总结关键点流式响应使用回调函数模式实时推送每个数据块统一接口IAiservice统一不同 AI 平台的接口WebSocket 推送通过STREAM_MSG_NOTIFY实时推送异步处理使用 RocketMQ 独立线程池不阻塞主流程优化效果指标同步流式响应提升首字延迟10秒1-2秒5-10倍用户体验差好显著提升资源占用高低降低经验总结流式响应能显著提升性能统一接口便于多平台集成异步处理避免阻塞回调函数模式适合流式场景通过以上实现AQChat 实现了类似 ChatGPT 的流式响应效果提升了用户体验。