一篇文章彻底搞懂Netty编解码器和Handler的调用机制
ztj100 2024-12-14 16:12 17 浏览 0 评论
基本说明
1、netty的组件设计:Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等;
2、ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。例如:实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站的数据。
3、ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务器端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的。
编码解码器
1、当Netty发送或者接收一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另外一种格式(比如Java对象);如果是出站消息,它会被编码成字节。
2、Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHandler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了,以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler.
解码器-ByteToMessageDecoder
1、由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理,它的类继承关系如下:
2、一个关于ByteToMessageDecoder实例分析
public class ToIntegerDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx,ByteBuf in,List<Object> out) throws Exception{
if(in.readableBytes()>=4){
out.add(in.readInt());
}
}
}
说明:
① 这个例子,每次入站从ByteBuf中读取4个字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据;
Netty的handler链的调用机制
实例要求
1、使用自定义的编码器和解码器来说明Netty的handler调用机制;
2、客户端发送long->服务端,服务端发送long->客户端;
代码演示
服务端代码,绑定7000端口,并定义一个初始化类MyServerInitializer来设置服务端的解码器、编码器、和它的业务逻辑处理器
/**
* 服务端
*/
public class MyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
//自定义一个初始化类
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyServerInitializer自定义类的代码
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//入站的handler进行解码MybateToLongDecoder
pipeline.addLast(new MyByteToLongDecoder());
//这是出站的handler进行编码
pipeline.addLast(new MyLongToByteEncoder());
//自定义的handler处理业务逻辑
pipeline.addLast(new MyServerHandler());
}
}
自定义的解码器MyByteToLongDecoder,对于服务端来说,当消息入站时,会调用该解码器对消息进行解码,前提条件是入站的消息类型和解码器的处理消息类型一致解码器才会被调用,注意我们这里演示的是long类型的消息,它的字节长度为8个字节,所以在解码的时候,我们要进行判断一下,有8个字节才进去读取
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
*decode 会根据接收的数据,被调用多次,直到确定没有新的元素被添加到list,或者
* 是ByteBuf没有更多的可读字节为止,如果list out 不为空,就会将list的内容传递给下一个
* channelinboundHandler处理器,该处理器的方法也会被调用多次
* @param ctx 上下文对象
* @param in 入站的ByteBuf
* @param out List集合,将解码后的数据传给下一个handler
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder 被调用");
//因为Long 为8个字节,这里需要判断有8个字节,才能读取一个long
if(in.readableBytes()>=8){
out.add(in.readLong());
}
}
}
自定义编码器MyLongToByteEncoder,对于服务端来说,当数据出站时,它需要将数据进行编码再发送,它的原理其实和解码器是一样的道理,只有发送的消息类型和编码器处理的数据类型一致,该编码器才会在消息发出之前被调用,不然,它就不会经过该编码器,消息就会直接发送出去
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
/**
* 编码方法,这个编码方法会根据接收到的数据类型,如果是Long类型就被被调用,
* 如果不是Long类型就不会调用该编码器,直接就会将数据发送出去不会经过该编码器,判断的逻辑在父类MessageToByteEncoder的write方法里面的this.acceptOutboundMessage(msg)
* 因此我们在编写Encoder要注意传入的数据类型和处理的数据类型要一致
* @param ctx 上下文
* @param msg 发送的消息
* @param out
*/
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encode 被调用,msg="+msg);
//因为msg已经是long类型了,所以这里不需要进行编码,这里只是讲解一下编码的逻辑
out.writeLong(msg);
}
}
服务端的业务逻辑处理器MyServerHandler,这里我们将客户端发送给服务端的消息直接打印出来,并且给服务端发送一个long类型的消息
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端"+ctx.channel().remoteAddress()+" 读取到long "+msg);
//给客户端发送一个long
ctx.writeAndFlush(98979L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端代码,定义一个初始化类MyClientInitializer来设置它的解码器、编码器、和它的业务逻辑处理器
public class MyClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
//自定义一个初始化类
.handler(new MyClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
客户端的初始化类MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个出站的handler 对数据进行编码
pipeline.addLast(new MyLongToByteEncoder());
//这是一个入站的解码器(入站handler)
pipeline.addLast(new MyByteToLongDecoder());
//加入一个自定义的handler,处理业务
pipeline.addLast(new MyClientHandler());
}
}
然后它也是需要有编码器、解码器,和服务端的原理是一样的,它的业务处理器用来处理业务,也就是它可以只关注业务MyClientHandler,这里客户端的业务逻辑只打印出服务端发送的消息,及一连接上服务端就给服务端发送一条消息
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器的ip="+ctx.channel().remoteAddress()+",收到消息="+msg);
}
/**
* 一连上服务端就给服务端发送消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
//发送一个Long类型的数据
ctx.writeAndFlush(1234567L);
}
}
演示代码,先启动服务端,然后再启动客户端,先观察服务端的输出,根据输出可以看到当数据入站时,解码器先被调用,然后再调用业务逻辑处理器,业务逻辑处理器发送消息给客户端的时候,会调用编码器
MyByteToLongDecoder 被调用
从客户端/127.0.0.1:53398 读取到long 1234567
MyLongToByteEncoder encode 被调用,msg=98979
观察客户端的输出,可以看到发送消息给服务端时,编码器被调用,当服务端回送消息回来的时候,解码器被调用然后再到业务逻辑处理器
MyClientHandler 发送数据
MyLongToByteEncoder encode 被调用,msg=1234567
MyByteToLongDecoder 被调用
服务器的ip=localhost/127.0.0.1:7000,收到消息=98979
3、结论
① 不论解码器handler还是编码器handler即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行;
② 在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果会和期望的可能不一致;
解码器-ReplayingDecoder
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
1、ReplayingDecoder 扩展了ByteToMessageDecoder类,使用这个类我们不必调用readableBytes()方法,参数S指定了用户状态管理的类型,其中void代表不需要状态管理;
2、应用实例:使用ReplayingDecoder编写解码器,对前面的案例进行简化,它可以替代上面的解码器,这样子在服务端读取消息的时候,就不用判断接收到的消息的字节数是否够8个字节,因为它底层已经做了处理,它直接读取消息便可
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext cx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被调用");
//在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
out.add(in.readLong());
}
}
3、ReplayingDecoder使用方便,但它也有一些局限性
① 并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException;
② ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢;
其他编解码器
1、LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
2、DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
3、HttpObjectDecoder:一个HTTP数据的解码器。
4、LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理粘包和半包消息。
......
相关推荐
- 从IDEA开始,迈进GO语言之门(idea got)
-
前言笔者在学习GO语言编程的时候,GO语言在国内还没有像JAVA/Php/Python那样普及,绕了不少的弯路,要开始入门学习一门编程语言,最好就先从选择一个好的编程语言的开发环境开始,有了这个开发环...
- 基于SpringBoot+MyBatis的私人影院java网上购票jsp源代码Mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于SpringBoot...
- 基于springboot的个人服装管理系统java网上商城jsp源代码mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...
- 基于springboot的美食网站Java食品销售jsp源代码Mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...
- 贸易管理进销存springboot云管货管账分析java jsp源代码mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目描述贸易管理进销存spring...
- SpringBoot+VUE员工信息管理系统Java人员管理jsp源代码Mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍SpringBoot+V...
- 目前见过最牛的一个SpringBoot商城项目(附源码)还有人没用过吗
-
帮粉丝找了一个基于SpringBoot的天猫商城项目,快速部署运行,所用技术:MySQL,Druid,Log4j2,Maven,Echarts,Bootstrap...免费给大家分享出来前台演示...
- SpringBoot+Mysql实现的手机商城附带源码演示导入视频
-
今天为大家带来的是基于SpringBoot+JPA+Thymeleaf框架的手机商城管理系统,商城系统分为前台和后台、前台用的是Bootstrap框架后台用的是SpringBoot+JPA都是现在主...
- 全网首发!马士兵内部共享—1658页《Java面试突击核心讲》
-
又是一年一度的“金九银十”秋招大热门,为助力广大程序员朋友“面试造火箭”,小编今天给大家分享的便是这份马士兵内部的面试神技——1658页《Java面试突击核心讲》!...
- SpringBoot数据库操作的应用(springboot与数据库交互)
-
1.JDBC+HikariDataSource...
- SpringBoot 整合 Flink 实时同步 MySQL
-
1、需求在Flink发布SpringBoot打包的jar包能够实时同步MySQL表,做到原表进行新增、修改、删除的时候目标表都能对应同步。...
- SpringBoot + Mybatis + Shiro + mysql + redis智能平台源码分享
-
后端技术栈基于SpringBoot+Mybatis+Shiro+mysql+redis构建的智慧云智能教育平台基于数据驱动视图的理念封装element-ui,即使没有vue的使...
- Springboot+Mysql舞蹈课程在线预约系统源码附带视频运行教程
-
今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的Springboot+Mysql舞蹈课程在线预约系统,系统项目源代码在【猿来入此】获取!https://www.yuan...
- SpringBoot+Mysql在线众筹系统源码+讲解视频+开发文档(参考论文
-
今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的在线众筹管理系统,主要实现了普通用户在线参与众筹基本操作流程的全部功能,系统分普通用户、超级管理员等角色,除基础脚手架外...
- Docker一键部署 SpringBoot 应用的方法,贼快贼好用
-
这两天发现个Gradle插件,支持一键打包、推送Docker镜像。今天我们来讲讲这个插件,希望对大家有所帮助!GradleDockerPlugin简介...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- 从IDEA开始,迈进GO语言之门(idea got)
- 基于SpringBoot+MyBatis的私人影院java网上购票jsp源代码Mysql
- 基于springboot的个人服装管理系统java网上商城jsp源代码mysql
- 基于springboot的美食网站Java食品销售jsp源代码Mysql
- 贸易管理进销存springboot云管货管账分析java jsp源代码mysql
- SpringBoot+VUE员工信息管理系统Java人员管理jsp源代码Mysql
- 目前见过最牛的一个SpringBoot商城项目(附源码)还有人没用过吗
- SpringBoot+Mysql实现的手机商城附带源码演示导入视频
- 全网首发!马士兵内部共享—1658页《Java面试突击核心讲》
- SpringBoot数据库操作的应用(springboot与数据库交互)
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- node卸载 (33)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- exceptionininitializererror (33)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)