开发者

Netty实现自定义协议编解码器

开发者 https://www.devze.com 2023-02-03 10:44 出处:网络 作者: 我是小趴菜
目录为什么要自定义协议自定义协议设计请求头请求体自定义协议实现1:创建一个Maven项目,引入Netty依赖,完整的依赖如下2:实现我们的协议请求头3:实现我们的协议请求体4:实现我们的协议请求类5:实现自定义编码器
目录
  • 为什么要自定义协议
  • 自定义协议设计
    • 请求头
    • 请求体
  • 自定义协议实现
    • 1:创建一个Maven项目,引入Netty依赖,完整的依赖如下
    • 2:实现我们的协议请求头
    • 3:实现我们的协议请求体
    • 4:实现我们的协议请求类
    • 5:实现自定义编码器
    • 6:实现自定义解码器
    • 7:Netty Server端
    • 8:Netty Server端处理器
    • 9:Netty Client端处理器
    • 10:Netty Client端
    • 11:测试
  • 完整代码

    为什么要自定义协议

    Netty自带了一些编解码器没,比如 StringDecode,StringEncoder,在实际业务中,协议往往需要携带一些我们自定义的属性,比如版本号,imei号,appId等,这时候Netty提供的编解码器就无法满足我们的需求,所以我们需要自定义协议和自定义的编解码器

    自定义协议设计

    我们可以仿造HTTP协议,比如 请求头 + 请求体 的格式

    请求头

    HTTP协议的请求头有 请求方法(GET,POST),版本号等,既然是自定义协议,那么肯定是要满足自己实际业务需求的,所以我们的请求头包含以下信息,也可以根据自己的业务去添加一些自定义的属性

     commond: 指令,比如说你发送给Netty的消息是【登录】还是【单聊消息】

              或者是【群发消息】又或者是【踢人下线】的请求.

     version:版本号,在后期如果升级版本的话,要兼容老版本,我们可以做判断,如果是老版本的就走A逻辑分支,新版本就走B逻辑分支

     clientType:客户端访问我们的IM系统是通过WEb端,还是IOS,或者是android端

     messageType:将客户端发送的数据解析成哪种格式,比如jsON,Protobuf,还是XML格式

     imeiLen:imei号的长度(imei号在请求体中)

     appId:我们的IM是以服务的方式提供出去的,我们需要知道这个请求是从哪个服务进来的,每个服务都有一个自定义唯一的appId

     bodyLen:我们的数据长度

    请求体

     imei号:登录设备的唯一标识,虽然有了clientType来判断是从WEB端还是IOS端访问的,

             但是并不知道是从哪台设备登录的,后期我们要做踢人下线,比如一个账号只能一台设备登录,

             或者是一个账号能同时登录WEB,或者是IOS端或者是Android端,我们就需要跟clientType一起判断

             

     data:我们要发送的数据

    自定义协议实现

    1:创建一个Maven项目,引入Netty依赖,完整的依赖如下

    <dependencies>
        <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.1.69.Final</version>
        </dependency>
        <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <version>1.18.24</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.51</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.32</version>
        </dependency>
        <dependency>
          <groupId>cn.hutool</groupId>
          <artifactId>hutool-all</artifactId>
          <version>5.8.0.M2</version>
        </dependency>
    </dependencies>
    

    2:实现我们的协议请求头

    package com.chat.model;
    import lombok.Data;
    import Java.io.Serializable;
    @Data
    public class MessageHead implements Serializable {
        /**
         * 指令
         */
        private Integer commond;
        /**
         * 版本号
         */
        private Integer version;
        /**
         * clientType(WEB,IOS,Android)
         */
        private Integer clientType;
        /**
         * 数据解析类型 和具体业务无关,后续根据解析类型解析data数据 0x0:Json,0x1:ProtoBuf,0x2:Xml,默认:0x0
         */
        private Integer messageType = 0x0;
        /**
         * imei号长度
         */
        private Integer imeiLen;
        /**
         * appId
         */
        private Integer appId;
        /**
         * bodyLen,数据长度
         */
        private Integer bodyLen;
    }
    

    3:实现我们的协议请求体

    package com.chat.model;
    import lombok.Data;
    import java.io.Serializable;
    @Data
    public class MessageBody implements Serializable {
        /**
         * imei号
         */
        private String imei;
        /**
         * 数据
         */
        private Object data;
    }
    

    4:实现我们的协议请求类

    package com.chat.model;
    import lombok.Data;
    import java.io.Serializable;
    // Message就是我们Netty服务接收到的完整的(请求头+请求体)数据包
    @Data
    public class Message implements Serializable {
        private MessageHead messageHead;
        private MessageBody messageBody;
    }
    

    5:实现自定义编码器

    package com.chat.codec;
    import cn.hutool.json.JSONObject;
    import cn.hutool.json.JSONUtil;
    import com.chat.model.Message;
    import com.chat.model.MessageBody;
    import com.chat.model.MessageHead;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import java.util.List;
    /**
     * 自定义编码器
     */
    public class MessageDecoder extends ByteToMessageDecoder {
        /**
         * 协议格式:请求头 +imei号 + 请求体
         * 请求头: 指令(commond) + 版本号 + clientType + 消息解析类型 + imei长度 + appId + bodyLen
         *    指令:这条消息是做什么的,比如是登录,还是群发消息,还是单聊消息,还是踢人下线....
         *    版本号:协议的版本号,对于版本升级有帮助,比如A版本的走A逻辑,B版本的走B逻辑
         *    clientType:web端,IOS,Android
         *    消息解析类型:把这条消息解析成什么样的类型,有JSON,还是String等
         *    imei:虽然有clientType来标识出该用户是从WEB访问的还是IOS或者Android端登录的,但是这时候有二台IOS手机登录你就分辨不了了
         *          所以imei号是设备的唯一标识,这样可以在用户多端登录的时候踢人下线,来实现一个账号只能一台设备登录
         *    appId:如果我们的IM系统是以服务方式提供的,appId表示的是哪个服务来访问的
         *    bodyLen:数据长度
         *  所以请求头的长度是:7 * 4 = 28字节
         */
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
            //我们的请求头有7个属性,每个属性都是int型,所以占4个字节,如果小于28个字节说明这个请求数据是有问题的,
            if(in.readableBytes() < 28) {
                return;
            }
            //拿到指令
            int command = in.readInt();
            //拿到版本号
            int version = in.readInt();
            //拿到clientType
            int clientType = in.readInt();
            //拿到消息解析类型
            int messageType = in.readInt();
            //拿到imei号的长度
            int imeiLen = in.readInt();
            //拿到appId
            int appId = in.readInt();
            //拿到数据内容长度
            int bodyLen = in.readInt();
            //我们的数据是以流的形式读取的,当读取到的数据长度小于 imei号长度+data长度,说明还没有获取到完整的请求数据,需要重新再次读取接下来TCP发送过来的数据,直到等于了就代表
            //我们已经读取到一条完整的数据了,其实这也是一种解决TCP粘包和拆包的问题
            if(in.readableBytes() < (bodyLen + imeiLen)) {
                //表示读取的数据还不够
                in.resetReaderIndex();
                return;
            }
            //通过imei号长度读取imei号
            byte[] imeiData = new byte[imeiLen];
            in.readBytes(imeiData);
            String imei = new String(imeiData);
            //通过bodyLen读取数据内容
            byte[] bodyData = new byte[bodyLen];
            in.readBytes(bodyData);
            /**
             * 设置请求头
             */
            MessageHead messageHead = new MessageHead();
            messageHead.setCommond(command);
            messageHead.setAppId(appId);
            messageHead.setBodyLen(bodyData.length);
            messageHead.setImeiLen(imeiData.length);
            messageHead.setVersion(version);
            messageHead.setClientType(clientType);
            messageHead.setMessageType(messageType);
            /**
             * 设置请求体
             */
            MessageBody messageBody = new MessageBody();
            messageBody.setImei(imei);
            Message message = new Message();
            message.setMessageHead(messageHead);
            /**
             * 根据messageType来封装请求数据
             */
            if(messageType == 0x0) {
                //解析成JSON格式
                String body = new String(bodyData);
                com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject();
                jsonObject.put("body",body);
                messageBody.setData(jsonObject);
            }else if(messageType == 0x1) {
                //解析成Protobuf
            }else if(messageType == 0x2) {
                //解析成Xml
            }
            message.setMessageBody(messageBody);
            //更新读索引
            in.markReaderIndex();
            //最后通过管道写出去
            out.add(message);
        }
    }
    

    6:实现自定义解码器

    package com.chat.codec;
    import com.chat.model.Message;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import java.nio.charset.Charset;
    public class MessageEncoder extends MessageToByteEncoder<Message> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
            out.writeInt(message.getMessageHead().getCommond());
            out.writeInt(message.getMessageHead().getVersion());
            out.writeInt(message.getMessageHead().getClientType());
            out.writeInt(message.getMessageHead().getMessageType());
            out.writeInt(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8")).length);
            out.writeInt(message.getMessageHead().getAppId());
            out.writeInt(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8")).length);
            out.writeBytes(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8")));
            out.writeBytes(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8")));
        }
    }
    

    7:Netty Server端

    package com.chat.server;
    import com.chat.codec.MessageDecoder;
    import com.chat.codec.MessageEncoder;
    import io.netty.bootstrap.SerphpverBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    public class Server {
        public static void main(String[] args) throws Exception{
            // 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
            // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
            EventLoopGroup bossGroup = new NioEventLoopGroup(3);
            EventLoopGroup workerGroup = new NioEventLoopGroup(8);
            try {
                // 创建服务器端的启动对象
                ServerBootstrap bootstrap = new ServerBootstrap();
                // 使用链式编程来配置参数
                bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                        // 使用NioServerSocketChannel作为服务器的通道实现
                        .channel(NioServerSocketChannel.class)
                        // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                        // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new MessageDecoder());
                                ch.pipeline().addLast(new MessageEncoder());
                                ch.pipeline().addLast(new MyServerHandler());
                                //ch.pipeline().addLast(new ServerHandler());
                            }
                        });
                System.out.println("netty server start。。");
                // 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
                // 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
                ChannelFuture cf = bootstrap.bind(9000).sync();
                // 给cf注册监听器,监听我们关心的事件
                /*cf.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (cf.isSuccess()) {
                            System.out.println("监听端口9000成功");
                        } else {
                            System.out.println("监听端口9000失败");
                        }
                    }
                });*/
                // 等待服务端监听端口关闭,closeFuture是异步操作
                // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
                cf.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    

    8:Netty Server端处理器

    package com.chat.server;
    import cn.hutool.json.JSONUtil;
    import com.chat.model.Message;
    import com.chat.model.MessageBody;
    import com.chat.model.MessageHead;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    public class MyServerHandler extends SimpleChannelInboundHandler<Message> {
        private final static Logger logger = LoggerFactory.getLogger(MyServerHandler.class);
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
            System.out.println("这是客户端发送的消息" + JSONUtil.toJsonPrettyStr(message));
            Message messageResponse = new Message();
            MessageHead messageHead = new MessageHead();
            messageHead.setCommond(9988);
      python      messageHead.setMessageType(0xZAFpoyUIg0);
            messageHead.setClientType(1);
            messageHead.setVersion(2);
            messageHead.setAppId(3);
            String msg = "这是服务端发送给你的消息";
            messageHead.setBodyLen(msg.getBytes().length);
            String imei = "12-euri-1234";
            messageHead.setImeiLen(imei.getBytes().length);
            MessageBody messageBody = new MessageBody();
            messageBody.setImei(imei);
        开发者_开发培训    messageBody.setData(msg);
            messageResponse.setMessageHead(messageHead);
            messageResponse.setMessageBody(messageBody);
            ctx.writeAndFlush(messageResponse);
        }
    }
    

    9:Netty Client端处理器

    package com.chat.client;
    import cn.hutool.json.JSONUtil;
    import com.chat.model.Message;
    import com.chat.model.MessageBody;
    import com.chat.model.MessageHead;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    public class ClientHandler extends SimpleChannelInboundHandler<Message> {
        /**
         * 当客户端连接服务器完成就会触发该方法
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            for(int i = 0; i < 20; i ++) {
                Message message = new Message();
                MessageHead messageHead = new MessageHead();
                messageHead.setCommond(9988);
                messageHead.setMessageType(0x0);
                messageHead.setClientType(1);
                messageHead.setVersion(2);
                messageHead.setAppId(3);
                String msg = "hello-" + i;
                messageHead.setBodyLen(msg.getBytes().length);
                String imei = "12-euri";
                messageHead.setImeiLen(imei.getBytes().length);
                MessageBody messageBody = new MessageBody();
                messageBody.setImei(imei);
                messageBody.setData(msg);
                message.setMessageHead(messageHead);
                message.setMessageBody(messageBody);
                ctx.writeAndFlush(message);
            }
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
        //当通道有读取事件时会触发,即服务端发送数据给客户端
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
            System.out.println(JSONUtil.toJsonPrettyStr(message));
        }
    }
    

    10:Netty Client端

    package com.chat.client;
    import com.chat.codec.MessageDecoder;
    import com.chat.codec.MessageEncoder;
    import io.netty.bootstrap.Bootstrap;
    import io.njavascriptetty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    public class Client {
        public static void main(String[] args) throws Exception{
                //客户端需要一个事件循环组
                EventLoopGroup group = new NioEventLoopGroup();
                try {
                    //创建客户端启动对象
                    //注意客户端使用的不是ServerBootstrap而是Bootstrap
                    Bootstrap bootstrap = new Bootstrap();
                    //设置相关参数
                    bootstrap.group(group) //设置线程组
                            .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketCh编程客栈annel ch) throws Exception {
                                    //加入处理器
                                    ch.pipeline().addLast(new MessageDecoder());
                                    ch.pipeline().addLast(new MessageEncoder());
                                    ch.pipeline().addLast(new ClientHandler());
                                }
                            });
                    System.out.println("netty client start。。");
                    //启动客户端去连接服务器端
                    ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
                    //对通道关闭进行监听
                    cf.channel().closeFuture().sync();
                } finally {
                    group.shutdownGracefully();
                }
            }
    }
    

    11:测试

    1: 先启动Server端的main方法

    2:再启动Client端的main方法

    3:查看控制台

    服务端控制台:

    Netty实现自定义协议编解码器

    客户端控制台:

    Netty实现自定义协议编解码器

    完整代码

    全部代码就是下图这几个类,上面已经贴出每个类的全部代码,直接复制就行了

    Netty实现自定义协议编解码器

    以上就是Netty实现自定义协议编解码器的详细内容,更多关于Netty自定义协议编解码器的资料请关注我们其它相关文章!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消