2021-07-03

Netty 框架学习 —— 预置的 ChannelHandler 和编解码器


Netty 为许多提供了许多预置的编解码器和处理器,几乎可以开箱即用,减少了在烦琐事务上话费的时间和精力


空闲的连接和超时

检测空闲连接以及超时对于释放资源来说至关重要,Netty 特地为它提供了几个 ChannelHandler 实现

名称描述
IdleStateHandler当连接空闲时间太长时,将会触发一个 IdleStateEvent 事件,然后,你可以通过在 ChannelInboundHandler 重写 userEventTriggered() 方法来处理该 IdleStateEvent 事件
ReadTimeoutHandler如果在指定的时间间隔内没有收到入站数据,则抛出一个 ReadTimeoutException 并关闭对应的 Channel。可以通过重写你的 ChannelHandler 中的 exceptionCaught() 方法来检测该 ReadTimeoutException
WriteTimeoutHandler如果在指定的时间间隔内没有出站数据写入,则抛出一个 WriteTimeoutException 并关闭对应的 Channel。可以通过重写你的 ChannelHandler 中的 exceptionCaught() 方法来检测该 WriteTimeoutException

下述代码展示了当使用通常的发送心跳消息到远程节点的方法时,如果 60 秒内没有接收或者发送任何数据,我们将得到通知,如果没有响应,则连接会关闭

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception {  ChannelPipeline pipeline = ch.pipeline();  // IdleStateHandler 将在被触发时发送一个 IdleStateEvent 事件  pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));  // 将一个 HeartbeatHandler 添加到 ChannelPipeline  pipeline.addLast(new HeartbeatHandler()); } public static final class HeartbeatHandler extends SimpleChannelInboundHandler {  // 发送到远程节点的心跳消息  private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled    .unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));  @Override  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {   if (evt instanceof IdleStateEvent) {    // 发送心跳消息,并在发送失败时关闭该连接    ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())      .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);   } else {    super.userEventTriggered(ctx, evt);   }  }  @Override  protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {     } }}

解码基于分隔符的协议

基于分隔符的消息协议使用定义的字符来标记消息或者消息段的开头或者结尾,下表列出的解码器能帮助你定义可以提取由任意标记序列分隔的帧的自定义解码器

名称描述
DelimiterBasedFrameDecoder使用由用户提供的分隔符来提取帧
LineBasedFrameDecoder由行尾符(\n 或者 \r\n)分隔帧

下述代码展示了如何使用 LineBasedFrameDecoder 处理由行尾符分隔的帧

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception {  ChannelPipeline pipeline = ch.pipeline();  // 该 LineBasedFrameDecoder 将提取的帧转发给下一个 ChannelInboundHandler  pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));  // 添加 FrameHandler 以接收帧  pipeline.addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {  @Override  protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {   // do something  } }}

如果你还使用除了行尾符之外的分隔符来分隔帧,那么你还可以使用 DelimiterBasedFrameDecoder,只需要将特定的分隔符序列指定到其构造函数即可

作为示例,我们将使用下面的协议规范:

  • 传入数据流是一系列的帧,每个帧都由换行符 \n 来分隔
  • 每个帧都由一系列元素组成,每个元素由单个空格字符分隔
  • 一个帧的内容代表一个命令,定义为一个命令名称后跟着数目可变的参数

基于这个协议,我们的自定义解码器将定义以下类:

  • Cmd —— 将帧的命令存储在 ByteBuf 中,一个 ByteBuf 用于名称,另一个用于参数
  • CmdDecoder —— 从被重写了的 decode() 方法中获取一行字符串,从它的内容构建一个 Cmd 实例
  • CmdHandler —— 从 CmdDecoder 获取解码的 Cmd 对象,并对它进行一些处理
public class CmdHandlerInitializer extends ChannelInitializer<Channel> { static final byte SPACE = (byte) ' '; @Override protected void initChannel(Channel ch) throws Exception {  ChannelPipeline pipeline = ch.pipeline();  pipeline.addLast(new CmdDecoder(64 * 1024));  pipeline.addLast(new CmdHandler()); } /**  * Cmd POJO  */ public static final class Cmd {  private final ByteBuf name;  private final ByteBuf args;  public Cmd(ByteBuf name, ByteBuf args) {   this.name = name;   this.args = args;  }  public ByteBuf getArgs() {   return args;  }  public ByteBuf getName() {   return name;  } } public static final class CmdDecoder extends LineBasedFrameDecoder {  public CmdDecoder(int maxLength) {   super(maxLength);  }  @Override  protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {   // 从 ByteBuf 中提取由行尾符序列分隔的帧   ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);   // 如果输入中没有帧,则返回 null   if (frame == null) {    return null;   }   // 查找第一个空格字符的索引,前面是命令名称,后面是参数   int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), SPACE);   // 使用包含命令名称和参数的切片创建新的 Cmd 对象   return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex()));  } } public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {  @Override  protected void messageReceived(ChannelHandlerContext ctx, Cmd msg) throws Exception {   // 处理传经 ChannelPipeline 的 Cmd 对象  } }}

基于长度的协议

基于长度的协议通过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记它的结束,下表列出 Netty 提供的用于处理这种类型的协议的两种解码器

名称描述
FixedLengthFrameDecoder提取在调用构造函数时指定的定长帧
LengthFieldBasedFrameDecoder根据帧头部中的长度值来提取帧:该字段的偏移量以及长度在构造函数中指定

你经常会遇到被编码到消息头部的帧大小不是固定值的协议,为了处理这种变长帧,你可以使用 LengthFieldBasedFrameDecoder,它将从头部字段确定帧长,然后从数据流中提取指定的字节数

下图展示了一个示例,其中长度字段在帧中的偏移量为 0,并且长度为 2 字节

下述代码展示了如何使用其 3 个构造函数分别为 maxFrameLength、lengthFieldOffser 和 lengthFieldLength 的构造函数。在这个场景下,帧的长度被编码到了帧起始的前 8 个字节中

public class LengthBasedInitializer extends ChannelInitializer<Channel> { /**  * 使用 LengthFieldBasedFrameDecoder 解码将帧长度编码到帧起始的前 8 个字节中的消息  */ @Override protected void initChannel(Channel ch) throws Exception {  ChannelPipeline pipeline = ch.pipeline();  pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));  pipeline.addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {  @Override  protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {   // do something  } }}

写大型数据

因为网络饱和的可能性,如何在异步框架中高效地写大块的数据是一个特殊的问题。由于写操作是非阻塞的,所以即时没有写出所有的数据,写操作也会在完成时返回并通知 ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大型数据时,需要考虑处理远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。让我们考虑下将一个文件内容写出到网络的情况

NIO 的零拷贝特性,这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。所有这一切都发生在 Netty 的核心中,所以应用程序需要做的就是使用一个 FileRegion 接口的实现

下述代码展示了如何通过从 FileInputStream 创建一个 DefaultFileRegion,并将其写入 Channel

// 创建一个 FileInputStreamleInputStream in = new FileInputStream(file);// 以该文件的的完整长度创建一个新的 DefaultFileRegionFileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());// 发送该 DefaultFileRegion,并注册一个 ChannelFutureListenerchannel.writeAndFlush(region).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception {  // 处理失败  if(!future.isSuccess()) {   Throwable cause = future.cause();   // do something  } }});

这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用 ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗

interface ChunkedInput<B> 中的类型参数 B 是 readChunk() 方法返回的类型。Netty 预置了该接口的四个实现,如表所示,每个都代表了一个将由 ChunkedWriteHandler 处理的不定长度的数据流

名称描述
ChunkedFile从文件中逐块获取数据,当你的平台不支持零拷贝或者你需要转换数据时使用
ChunkedNioFile和 ChunkedFile 类似,只是它使用了 FileChannel
ChunkedStream从 InputStream 中逐块传输内容
ChunkedNioStream从 ReadableByteChannel 中逐步传输内容

下述代码说明了 ChunkedStream 的用法,它是实践中最常用的实现。所示的类使用了一个 File 以及一个 SSLContext 进行实例化,当 ......

原文转载:http://www.shaoqun.com/a/845150.html

跨境电商:https://www.ikjzd.com/

一淘网:https://www.ikjzd.com/w/1698

克雷格:https://www.ikjzd.com/w/194

四海商舟:https://www.ikjzd.com/w/1516


Netty为许多提供了许多预置的编解码器和处理器,几乎可以开箱即用,减少了在烦琐事务上话费的时间和精力空闲的连接和超时检测空闲连接以及超时对于释放资源来说至关重要,Netty特地为它提供了几个ChannelHandler实现名称描述IdleStateHandler当连接空闲时间太长时,将会触发一个IdleStateEvent事件,然后,你可以通过在ChannelInboundHandler重写us
hemingway:https://www.ikjzd.com/w/2344
纯干货分享:外媒力荐12个有效实用的亚马逊FBA卖家小工具!:https://www.ikjzd.com/articles/11955
一份来自eBay卖家的控诉:eBay竟然隐藏我的拍卖清单!!:https://www.ikjzd.com/articles/11956
曝!又一品牌起诉侵权,多位卖家惨遭巨额罚款!:https://www.ikjzd.com/articles/11962
西班牙人爱上阿里速卖通,双11、黑五包裹收不停!:https://www.ikjzd.com/articles/11964
撞击旗袍丝袜老师 小坏蛋你太厉害了我受不了了:http://lady.shaoqun.com/m/a/247953.html
护士脱了裙子坐了上去 护士被医生办公室狂玩:http://lady.shaoqun.com/m/a/247595.html
口述公么的粗大满足了我 睡着了被偷偷滑进去了:http://www.30bags.com/m/a/249933.html
视频直击|老人公园突然被陌生男子按摩,结果差点赔了5万...:http://lady.shaoqun.com/a/400662.html
白河峡谷漂流怎么样,好玩吗?:http://www.30bags.com/a/470636.html
晚上下班婆婆把我锁在外面,半夜在公园长椅上被陌生人侵犯:http://lady.shaoqun.com/a/400663.html
女生在公园锻炼,却在大白天被陌生男人猥亵!:http://lady.shaoqun.com/a/400664.html

No comments:

Post a Comment