说明
netty是java重要的企业级NIO,使用它可以快速实现很多功能通信功能如:http、ftp、socket、websocket、udp等。
本站使用自定义网包实现网络通信。
分享
- 大数据博客列表
- 开发记录汇总
- 个人java工具库 项目https://gitee.com/wangzonghui/object-tool
- 包含json、string、集合、excel、zip压缩、pdf、bytes、http等多种工具,欢迎使用。
内置编码器和解码器
解码器
名称 | 说明 |
---|
ByteToMessageDecoder | 如果想实现自己的半包解码器,实现该类 |
MessageToMessageDecoder | 一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类; |
LineBasedFrameDecoder | 通过在包尾添加回车换行符 \r\n 来区分整包消息; |
StringDecoder | 字符串解码器; |
DelimiterBasedFrameDecoder | 特殊字符作为分隔符来区分整包消息; |
FixedLengthFrameDecoder | 报文大小固定长度,不够空格补全; |
ProtoBufVarint32FrameDecoder | 通过 Protobuf 解码器来区分整包消息; |
ProtobufDecoder | Protobuf 解码器; |
LengthFieldBasedFrameDecoder | 指定长度来标识整包消息,通过在包头指定整包长度来约定包长。 |
编码器
名称 | 说明 |
---|
ProtobufEncoder | Protobuf 编码器; |
MessageToByteEncoder | 将 Java 对象编码成 ByteBuf; |
MessageToMessageEncoder | 如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个; |
LengthFieldPrepender | LengthFieldPrepender 是一个非常实用的工具类,如果我们在发送消息的时候采用的是:消息长度字段+原始消息的形式,那么我们就可以使用 LengthFieldPrepender。这是因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节。 |
代码实现
创建核心类
消息实体类
public class MyMessage {private int len;private byte[] content;public int getLen() {return len;}public void setLen(int len) {this.len = len;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content = content;}
}
自定义编码类
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class MyMessageEncoder extends MessageToByteEncoder<MyMessage> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception {byteBuf.writeInt(myMessage.getLen());byteBuf.writeBytes(myMessage.getContent());}}
自定义解码类
import java.util.List;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;public class MyMessageDecoder extends ByteToMessageDecoder {int length=0;@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {if(byteBuf.readableBytes()>=4){if(length==0){length=byteBuf.readInt();}if(byteBuf.readableBytes()<length){return;}byte[] content=new byte[length];byteBuf.readBytes(content);MyMessage message=new MyMessage();message.setLen(length);message.setContent(content);list.add(message);length=0;}}}
服务端
ServerHandler
import com.netty.cn.rpc.selfmessage.core.MyMessage;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class MyServerHandler extends SimpleChannelInboundHandler<MyMessage> {private int count;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessage myMessage) throws Exception {System.out.println("服务端收到消息:");System.out.println("长度:"+myMessage.getLen());System.out.println("内容: "+new String(myMessage.getContent(),CharsetUtil.UTF_8));System.out.println("收到消息数量:"+(++count));String msg="服务端收到请求";MyMessage message=new MyMessage();message.setContent(msg.getBytes(CharsetUtil.UTF_8));message.setLen(msg.getBytes(CharsetUtil.UTF_8).length);ctx.writeAndFlush(message);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {super.channelReadComplete(ctx);}
}
入口类
import com.netty.cn.rpc.selfmessage.core.MyMessageDecoder;
import com.netty.cn.rpc.selfmessage.core.MyMessageEncoder;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class MyServer {public static void main(String[] args) {int port=8080;EventLoopGroup bossGroup=new NioEventLoopGroup(1);EventLoopGroup workerGroup=new NioEventLoopGroup();try {ServerBootstrap bootstrap=new ServerBootstrap();bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,port).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) {ChannelPipeline channelPipeline=socketChannel.pipeline();channelPipeline.addLast(new MyMessageDecoder());channelPipeline.addLast(new MyMessageEncoder());channelPipeline.addLast(new MyServerHandler());}});System.out.println("netty server start");ChannelFuture cf=bootstrap.bind(port).sync();cf.channel().closeFuture().sync();}catch(Exception e){
System.out.println(e.toString());}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
客户端
ClientHandler
import com.netty.cn.rpc.selfmessage.core.MyMessage;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class MyClientHandler extends SimpleChannelInboundHandler<MyMessage> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("连接服务端 "+ctx.channel().remoteAddress()+" 成功");String msg="你好,我是张asdfasdfsadfwerwerwerwerewrewrewrewr三。";for (int i=0;i<20;i++){MyMessage message=new MyMessage();message.setContent(msg.getBytes(CharsetUtil.UTF_8));message.setLen(msg.getBytes(CharsetUtil.UTF_8).length);ctx.writeAndFlush(message);}}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, MyMessage myMessage) throws Exception {System.out.println("client 接收到信息:"+new String(myMessage.getContent()).toString());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
入口类
import com.netty.cn.rpc.selfmessage.core.MyMessageDecoder;
import com.netty.cn.rpc.selfmessage.core.MyMessageEncoder;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class MyClient {public static void main(String[] args) {int port=8080;EventLoopGroup group=new NioEventLoopGroup();try {Bootstrap bootstrap=new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) {ChannelPipeline channelPipeline=socketChannel.pipeline();channelPipeline.addLast(new MyMessageDecoder());channelPipeline.addLast(new MyMessageEncoder());channelPipeline.addLast(new MyClientHandler());}});ChannelFuture cf =bootstrap.connect("127.0.0.1",port).sync();cf.channel().closeFuture().sync();System.out.println("启动客户端"+port);} catch(Exception e){
System.out.println(e.toString());}finally {group.shutdownGracefully();}}
}
测试
- 先启动
MyServer
,再启动 MyClient
,可以看到控制台打印如下内容: - Server
netty server start
服务端收到消息:
长度:60
内容: 你好,我是张asdfasdfsadfwerwerwerwerewrewrewrewr三。
收到消息数量:1
连接服务端 /127.0.0.1:8080 成功
client 接收到信息:服务端收到请求
参考
总结
- 该方式定义了数据传输结构,传输过程中由编码器
ByteBuf
完成数据处理。 - 由于内容是二进制格式,可以存储数据,如json字符串、protobuf二次处理后数据,提升了数据传输灵活性。