基于netty的websocket服务优化

发布时间:2026/6/26 6:21:57
基于netty的websocket服务优化 本文默认读者对netty有一定的使用经验对于一些netty的基础概念不会说明本文主要探讨对于netty实现的websocket服务端推送数据的一些优化点如果存在错误的地方欢迎指出并且文中有说明不清晰的地方也欢迎留言讨论开始喽通常我们使用如下的方式创建一个基于netty的websocket服务然后再此基础上进行业务开发。// ai生成的demo public class WebSocketServer { private final int port; public WebSocketServer(int port) { this.port port; } public void run() throws Exception { EventLoopGroup bossGroup new NioEventLoopGroup(); EventLoopGroup workerGroup new NioEventLoopGroup(); try { ServerBootstrap b new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline ch.pipeline(); // HTTP 编解码器 pipeline.addLast(new HttpServerCodec()); // HTTP 消息聚合器 pipeline.addLast(new HttpObjectAggregator(65536)); // WebSocket 协议处理器 pipeline.addLast(new WebSocketServerProtocolHandler(/ws)); // 自定义 WebSocket 处理器 pipeline.addLast(new WebSocketFrameHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); // 绑定端口并启动服务器 ChannelFuture f b.bind(port).sync(); System.out.println(WebSocket 服务器启动监听端口 port); // 等待服务器 socket 关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port 8080; new WebSocketServer(port).run(); } }此时的websocket服务仅仅是最基础的版本虽然可以用但是在成本性能健壮性上仍存在一些优化空间以下是一些优化点的原理介绍快速订阅优化如下图是客户端正常订阅websocket流程不包括TLS在这个流程中客户端从建立连接到收到消息一共要经过3个rtt时间如果我们将协议升级和发送订阅请求合并则可以减少1个rtt事件快速订阅流程将协议升级和订阅请求合并后只需要2个rtt事件即可协议升级本质上是一次get请求所以get请求中就可以携带参数握手成功后netty会触发一个userEventTriggered方法传递HandshakeComplete事件事件中会包含请求的uriheader协议等数据所以我们只需要实现一个userEventTriggered方法来处理这个事件中的数据将订阅参数处理即可在网络延迟较大的客户端中节省一个rtt的时间可能就有几十或者上百的毫秒节省下面这个代码是netty中WebSocketServerProtocolHandshakeHandler读接口实现包含了上面提到的发送HandshakeComplete事件的逻辑感兴趣的可以自己debug一下public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { final HttpObject httpObject (HttpObject) msg; if (httpObject instanceof HttpRequest) { final HttpRequest req (HttpRequest) httpObject; isWebSocketPath isWebSocketPath(req); if (!isWebSocketPath) { ctx.fireChannelRead(msg); return; } try { final WebSocketServerHandshakerFactory wsFactory new WebSocketServerHandshakerFactory( getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()), serverConfig.subprotocols(), serverConfig.decoderConfig()); final WebSocketServerHandshaker handshaker wsFactory.newHandshaker(req); final ChannelPromise localHandshakePromise handshakePromise; if (handshaker null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker); ctx.pipeline().remove(this); final ChannelFuture handshakeFuture handshaker.handshake(ctx.channel(), req); handshakeFuture.addListener(new ChannelFutureListener() { Override public void operationComplete(ChannelFuture future) { if (!future.isSuccess()) { localHandshakePromise.tryFailure(future.cause()); ctx.fireExceptionCaught(future.cause()); } else { localHandshakePromise.trySuccess(); ctx.fireUserEventTriggered( WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); // 发送握手成功事件 ctx.fireUserEventTriggered( new WebSocketServerProtocolHandler.HandshakeComplete( req.uri(), req.headers(), handshaker.selectedSubprotocol())); } } }); applyHandshakeTimeout(); } } finally { ReferenceCountUtil.release(req); } } else if (!isWebSocketPath) { ctx.fireChannelRead(msg); } else { ReferenceCountUtil.release(msg); } }除了上面这个快速订阅的过程可以节省1个rtt事件外还可以和TCP Fast Open优化同时使用可以再次节省一个rtt时间。这样可以将3个rtt压缩为1个rtt时间。不过这个优化暂时并没有应用到生产上因此不过多介绍感兴趣的可以自行查阅实现数据压缩优化压缩扩展如果推送数据较多时通过压缩数据可以节省大量的网络流量。那么如何压缩数据就有多种选择首先可以在业务层手动将数据压缩后发送出去。但是这种方式导致数据在业务层不可读比如浏览器中会发现消息是乱码无法查看。因此我们更推荐在传输层进行压缩websocket协议中本身支持permessage-deflate扩展permessage-deflate扩展主要用于将推送数据进行压缩处理。通过此扩展即可在发送后由框架自动将数据压缩具体实现只需要将WebSocketServerProtocolHandler处理器前面加上WebSocketServerCompressionHandler处理器并且将WebSocketServerProtocolHandler处理器中的支持扩展参数配置为true即可支持压缩功能下面是WebSocketServerProtocolHandler的其中一个构造函数其中第3个参数allowExtensions配置为true则代表开启扩展功能public WebSocketServerProtocolHandler(String websocketPath, String subprotocols, boolean allowExtensions, int maxFrameSize, boolean allowMaskMismatch, boolean checkStartsWith, boolean dropPongFrames) { this(websocketPath, subprotocols, allowExtensions, maxFrameSize, allowMaskMismatch, checkStartsWith, dropPongFrames, DEFAULT_HANDSHAKE_TIMEOUT_MILLIS); }选择性压缩在开了压缩功能后netty会默认将所有的BinaryWebSocketFrame和TextWebSocketFrame消息进行压缩但是如果消息体较小则可能额外消耗了cpu但无法有比较好的压缩率因此可以选择将超过一定大小的消息进行压缩WebSocketExtensionFilter接口决定是否跳过扩展功能只要实现这个接口并配置到netty中即可完成选择性压缩功能Component public class CustomWebSocketExtensionFilter implements WebSocketExtensionFilter { Resource private WebSocketConfig webSocketConfig; Override public boolean mustSkip(WebSocketFrame frame) { if(frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) { // 小于一定长度的消息则跳过压缩返回值代表是否跳过压缩 if(frame.content().readableBytes() webSocketConfig.getMinCompressionLength()) { return true; } } return false; } }构造一个新的WebSocketServerExtensionHandler替换掉WebSocketServerCompressionHandler即可代码如下Component public class WebSocketServerExtensionHandlerFactory { Resource private CustomWebSocketExtensionFilter customWebSocketExtensionFilter; private CustomWebSocketExtensionFilterProvider customWebSocketExtensionFilterProvider new CustomWebSocketExtensionFilterProvider(); public WebSocketServerExtensionHandler newInstance() { return new WebSocketServerExtensionHandler(new PerMessageDeflateServerExtensionHandshaker(6, ZlibCodecFactory.isSupportingWindowSizeAndMemLevel(), PerMessageDeflateServerExtensionHandshaker.MAX_WINDOW_SIZE, false, false, customWebSocketExtensionFilterProvider )); } private class CustomWebSocketExtensionFilterProvider implements WebSocketExtensionFilterProvider { Override public WebSocketExtensionFilter encoderFilter() { return customWebSocketExtensionFilter; } Override public WebSocketExtensionFilter decoderFilter() { return WebSocketExtensionFilter.NEVER_SKIP; } } }需要额外关注的一点是写入数据时如果我们手动将一条消息依次发送给多个channel则每个channel的管道都会将消息压缩一次但是如果通过channelGroup将消息发送给多个channel则消息只会压缩一次缓冲区缓冲区打满tcp连接中每个连接都有自己的读写缓冲区如果接收方接受数据过慢那么最终会阻塞到发送方的写缓冲区但是由于netty是事件驱动的因此当写缓冲区打满时消息会堆积到内存中直到把内存打满引发fullgc问题因此推送数据时必须处理写缓存区打满的问题这个问题处理方式比较简单netty中提供了是否可写的方法当不可写入的时候丢弃掉消息就不会将消息堆积