Java IO编程由浅入深 - 6 (bio实现C/S聊天室 1 )_小小白鸽的博客-程序员宅基地

技术标签: java  io  bio  socket  网络协议  netty  

目录

  1. Java IO编程由浅入深 - 1 (bio c/s架构实现)

  2. Java IO编程由浅入深 - 2(bio 基于字符串的消息编解码器)

  3. Java IO编程由浅入深 - 3 (bio 基于消息长度的解编码器)

  4. Java IO编程由浅入深 - 4 (bio http协议解编码器,实现http服务器)

  5. Java IO编程由浅入深 - 5 (项目架构重构)

  6. Java IO编程由浅入深 - 6 (bio实现C/S聊天室 1 )

  7. Java IO编程由浅入深 - 7 (bio实现C/S聊天室 2 )

  8. Java IO编程由浅入深 - 8 (bio实现C/S聊天室 3 )

  9. Java IO编程由浅入深 - 9 (bio实现http升级到websocket )

  10. Java IO编程由浅入深 - 10 (bio 基于websocket的心跳检测实现 )

为什么要自定义传输协议

客户端与服务端通讯,要保证数据传输的安全性,同时也要保证过滤垃圾消息,同时开源的协议,基本是通用的,我这次的需求是保证内部通讯,协议是唯一性的,不能让外部客户端随意连接进来进行通讯,因此需要自定义传输协议,保证数据的安全性

定义数据传输协议

/**
 * @author lhstack
 * 消息编解码器,用于客户端与服务端通讯 client / server
 * 自定义协议
 * 第一个字节 0(必须为0) 000(0-7 区间消息类型 ping,pong,binary,text,stream) 0000(扩展类型,自定义,0-15区间)
 * 第二个字节 高两位 消息长度类型 0 (byte) | 1 (short) | 2 (int) | 3(long)
 * 第二个字节 第三位bit 表示是否需要掩码 0 为不需要 1 为需要
 * 第二个字节 低五位 为掩码长度 如果上面设置需要掩码,则掩码长度为2 ^ 5 - 1,同时随机生成2^5-1的掩码数组,数据为0-127之间的随机数
 * 后续读取,如果设置了掩码,则第一次读取为读取掩码数据,其次再根据长度类型,读取出对应的长度,如果没有设置掩码,则直接根据长度类型读取对应长度即可
 * 然后根据读取到的长度,去读取body,如果进行了掩码,则对body中每个字节进行掩码即可 如 body[i] ^= maskArray[i % maskArray.length];
 */

编写我们的解编码器

package com.lhstack.bio.codec.bs;

import com.lhstack.bio.channel.ChannelHandlerContext;
import com.lhstack.bio.codec.ByteToMessageCodec;
import io.netty.buffer.ByteBuf;

import java.util.List;
import java.util.Random;

/**
 * @author lhstack
 * 消息编解码器,用于客户端与服务端通讯 client / server
 * 自定义协议
 * 第一个字节 0(必须为0) 000(0-7 区间消息类型 ping,pong,binary,text,stream) 0000(扩展类型,自定义,0-15区间)
 * 第二个字节 高两位 消息长度类型 0 (byte) | 1 (short) | 2 (int) | 3(long)
 * 第二个字节 第三位bit 表示是否需要掩码 0 为不需要 1 为需要
 * 第二个字节 低五位 为掩码长度 如果上面设置需要掩码,则掩码长度为2 ^ 5 - 1,同时随机生成2^5-1的掩码数组,数据为0-127之间的随机数
 * 后续读取,如果设置了掩码,则第一次读取为读取掩码数据,其次再根据长度类型,读取出对应的长度,如果没有设置掩码,则直接根据长度类型读取对应长度即可
 * 然后根据读取到的长度,去读取body,如果进行了掩码,则对body中每个字节进行掩码即可 如 body[i] ^= maskArray[i % maskArray.length];
 */
public class CsMessageCodec extends ByteToMessageCodec<Message> {
    

    enum State{
    
        //消息类型
        MESSAGE_TYPE,
        //消息长度
        LENGTH,
        //消息掩码
        MASK,
        READ_LENGTH,
        BODY
    }

    private State state = State.MESSAGE_TYPE;

    private int length;

    /**
     * 长度类型 0 byte 1 short 2 int 3 long
     */
    private int lengthType;

    private int maskLength;

    /**
     * 消息类型
     */
    private MessageType messageType;

    /**
     * 扩展类型
     */
    private int extType;

    /**
     * 是否进行了掩码
     */
    private boolean isMask;

    private byte[] maskArray;

    private static final Random RANDOM = new Random();

    @Override
    protected void messageEncoder(ChannelHandlerContext ctx, Message msg, ByteBuf buf) throws Exception {
    
        int messageType = msg.getMessageType().getType();
        byte b0 = 0;
        //0~8 & 0x07 == 0~8 & 0b00000111 << 4 ==> (0~8 << 4 & 0b01110000)
        //b0 | (messageType & 0x07) << 4 = 00000000 | (0~8 << 4 & 0b01110000)
        b0 |= (messageType & 0x07) << 4;
        //补齐后四位
        b0 |= (msg.getExtType() & 0x0f);
        buf.writeByte(b0);
        byte b1 = 0;
        //记录内容长度
        int bodyLength = msg.getBody().length;
        //获取长度类型
        int lengthType = fetchLengthType(bodyLength);
        //写入长度类型
        b1 |= (lengthType & 0x03) << 6;
        //判断是否需要mask
        b1 |= msg.isMask() ? 1 << 5 : 0;
        //定义maskArray
        byte[] maskArray = new byte[0];
        int maskLength = 0;
        if(msg.isMask()){
    
            maskLength = RANDOM.nextInt(30) + 1;
            maskArray = new byte[maskLength];
            for (int i = 0; i < maskArray.length; i++) {
    
                maskArray[i] = (byte) RANDOM.nextInt(127);
            }
            b1 |= maskLength & 0x1f;
        }
        buf.writeByte(b1);
        if(msg.isMask()){
    
            buf.writeBytes(maskArray);
        }
        //根据内容类型,输出对应内容长度
        switch (lengthType){
    
            case 0:{
    
                buf.writeByte(bodyLength);
            }break;
            case 1:{
    
                buf.writeShort(bodyLength);
            }break;
            case 2:{
    
                buf.writeInt(bodyLength);
            }break;
            case 3:{
    
                buf.writeLong(bodyLength);
            }break;
            default:{
    

            }
        }
        byte[] body = msg.getBody();
        //是否进行了mask
        if(msg.isMask()){
    
            for (int i = 0; i < body.length; i++) {
    
                body[i] = (byte) (body[i] ^ maskArray[i % maskLength]);
            }
        }
        buf.writeBytes(body);

    }

    /**
     * 获取消息长度类型
     * @param length
     * @return
     */
    private int fetchLengthType(int length) {
    
        if(length <= Byte.MAX_VALUE){
    
            return 0;
        }else if(length < Short.MAX_VALUE){
    
            return 1;
        }else if(length < Integer.MAX_VALUE){
    
            return 2;
        }
        return 3;
    }

    @Override
    protected void messageDecoder(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
    
        //这里要循环读取,避免沾包,没有完整的读取完数据
        while(buf.isReadable()){
    
            if(this.state == State.MESSAGE_TYPE){
    
                if(!buf.isReadable()){
    
                    return ;
                }
                byte b = buf.readByte();
                if(!messageValid(b)){
    
                    buf.skipBytes(0);
                    return ;
                }
                this.messageType = MessageType.valueOf(touchMessageType(b));
                this.extType = touchMessageExtType(b);
                //设置状态为读取长度
                this.state = State.LENGTH;
            }
            //处理长度
            if(this.state == State.LENGTH){
    
                if(!buf.isReadable()){
    
                    return ;
                }
                byte b = buf.readByte();
                this.lengthType = touchLengthType(b);
                if(isMask(b)){
    
                    this.maskLength = touchMaskLength(b);
                    //设置消息需要mask
                    this.isMask = true;
                    this.state = State.MASK;
                }else{
    
                    this.state = State.READ_LENGTH;
                }
            }
            /**
             * 处理掩码
             */
            if(this.state == State.MASK){
    
                if(buf.isReadable() && buf.readableBytes() >= this.maskLength){
    
                    this.maskArray = new byte[this.maskLength];
                    buf.readBytes(this.maskArray);
                    this.state = State.READ_LENGTH;
                }
            }
            if(this.state == State.READ_LENGTH){
    
                //根据消息长度类型,获取消息的长度
                switch (this.lengthType){
    
                    case 0:{
    
                        if(buf.isReadable()){
    
                            this.length = buf.readByte();
                            this.state = State.BODY;
                        }else{
    
                            return ;
                        }
                    }break;
                    case 1:{
    
                        if(buf.readableBytes() >= 2){
    
                            this.length = buf.readShort();
                            this.state = State.BODY;
                        }else{
    
                            return ;
                        }
                    }break;
                    case 2:{
    
                        if(buf.readableBytes() >= 4){
    
                            this.length = buf.readInt();
                            this.state = State.BODY;
                        }else{
    
                            return ;
                        }
                    }break;
                    case 3:{
    
                        if(buf.readableBytes() >= 8){
    
                            this.length = (int) buf.readLong();
                            this.state = State.BODY;
                        }else{
    
                            return ;
                        }
                    }break;
                    default:{
    

                    }
                }
            }
            if(this.state == State.BODY){
    
                if(buf.readableBytes() >= this.length){
    
                    //读取消息体
                    byte[] body = new byte[this.length];
                    buf.readBytes(body);
                    //是否进行掩码
                    if(this.isMask){
    
                        int maskLength = this.maskArray.length;
                        for (int i = 0; i < body.length; i++) {
    
                            body[i] = (byte) (body[i] ^ this.maskArray[i % maskLength]);
                        }
                    }
                    switch (messageType){
    
                        case TEXT:out.add(new TextMessage(this.extType,new String(body),this.isMask));break;
                        case PING:out.add(new PingMessage(this.extType));break;
                        case PONG:out.add(new PongMessage(this.extType));break;
                        case BINARY:out.add(new BinaryMessage(this.extType,body,this.isMask));break;
                        case STREAM:out.add(new StreamMessage(this.extType,body,this.isMask));break;
                        default:{
    

                        }
                    }
                    this.state = State.MESSAGE_TYPE;
                }
            }
        }
    }

    /**
     * 0x1f = 0b00011111 == 31
     * @param b
     * @return
     */
    private int touchMaskLength(byte b) {
    
        return b & 0x1f;
    }

    /**
     *  0x20 = 0b00100000 = 32
     * @param b
     * @return
     */
    private boolean isMask(byte b) {
    
        return (b & 0x20) == 0x20;
    }

    /**
     * 提取长度类型 0 byte 1 short 2 int 3 long
     * 0b11111111 >>> 6 == 0b00000011
     * @param b
     * @return
     */
    private int touchLengthType(byte b) {
    
        return b >>> 6;
    }

    /**
     * 提取扩展协议 0-15 0x0f == 15 == 0b00001111
     * @param b
     * @return
     */
    private int touchMessageExtType(byte b) {
    
        return b & 0x0f;
    }

    /**
     * 获取消息类型 0 - 7
     * 0x70 = 16 * 7 + 0 = 112 = 0b01110000
     *
     * @param b
     * @return
     */
    private int touchMessageType(byte b) {
    
        return (b & 0x70) >>> 4;
    }

    /**
     * 验证第一位bit是否为0 ,0x80 = 128 & b == 128 || 0
     * @param b
     * @return
     */
    private boolean messageValid(byte b) {
    
        return (b & 0x80) == 0;
    }
}


对应的消息对象

BinaryMessage

package com.lhstack.bio.codec.bs;

/**
 * @author lhstack
 * 二进制
 */
public class BinaryMessage extends Message{
    

    public BinaryMessage(byte[] body) {
    
        this(0, body,false);
    }

    public BinaryMessage(int extType, byte[] body) {
    
        this(extType, body,false);
    }

    public BinaryMessage(byte[] body,boolean isMask) {
    
        this(0, body,isMask);
    }

    public BinaryMessage(int extType, byte[] body, boolean isMask) {
    
        super(MessageType.BINARY, extType, body, isMask);
    }
}

PingMessage

package com.lhstack.bio.codec.bs;

/**
 * @author lhstack
 * ping消息
 */
public class PingMessage extends Message{
    
    public PingMessage(int extType) {
    
        super(MessageType.PING, extType, new byte[0], false);
    }

    public PingMessage() {
    
        this(0);
    }
}

PongMessage

package com.lhstack.bio.codec.bs;

/**
 * @author lhstack
 * pong
 */
public class PongMessage extends Message{
    

    public PongMessage(int extType) {
    
        super(MessageType.PONG, extType, new byte[0], false);
    }

    public PongMessage() {
    
        this(0);
    }
}

StreamMessage

package com.lhstack.bio.codec.bs;

/**
 * @author lhstack
 */
public class StreamMessage extends Message{
    

    public StreamMessage(int extType, byte[] body) {
    
        this(extType, body,false);
    }

    public StreamMessage(byte[] body) {
    
        this(0, body,false);
    }

    public StreamMessage(byte[] body,boolean isMask) {
    
        this(0, body,isMask);
    }

    public StreamMessage(int extType, byte[] body, boolean isMask) {
    
        super(MessageType.STREAM, extType, body, isMask);
    }
}

TextMessage

package com.lhstack.bio.codec.bs;

import java.nio.charset.StandardCharsets;

/**
 * @author lhstack
 * text文本消息
 */
public class TextMessage extends Message{
    

    private final String text;

    public TextMessage(String msg) {
    
        this(msg,false);
    }

    public TextMessage(String msg, boolean isMask) {
    
        this(0, msg, isMask);
    }

    public TextMessage(int extType, String msg) {
    
        this(extType, msg,false);
    }

    public TextMessage(int extType, String msg, boolean isMask) {
    
        super(MessageType.TEXT, extType, msg.getBytes(StandardCharsets.UTF_8), isMask);
        this.text = msg;
    }

    public String text(){
    
        return text;
    }

    @Override
    public String toString() {
    
        return "TextMessage{" +
                "text='" + text + '\'' +
                '}';
    }
}

客户端与服务端进行通讯测试

服务端代码如下

public static void csMessageServer() throws Exception {
    
        ServerBootStrap serverBootStrap = new ServerBootStrap(8080);
        serverBootStrap.group(Executors.newFixedThreadPool(200))
                .handler(new ChannelInitializeHandler() {
    
                    @Override
                    public void initializer(Channel ch) throws Exception {
    
                        ch.pipeline()
                                .addLast(new CsMessageCodec())
                                .addLast(new SimpleChannelAdapterHandler<Message>() {
    
                                    @Override
                                    public void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
    
                                        System.out.println(msg);
                                        System.out.println(new String(msg.getBody()));
                                        ctx.writeAndFlush(new Message(1,0,"hello world".getBytes(StandardCharsets.UTF_8),true));
                                    }

                                    @Override
                                    public void exceptionCatch(ChannelHandlerContext ctx, Throwable throwable) throws Exception {
    
                                        throwable.printStackTrace();
                                    }
                                });
                    }
                });
        serverBootStrap.start();
    }

客户端代码如下

public static void csMessageClient() throws Exception {
    
        BootStrap bootStrap = new BootStrap();
        Channel channel = bootStrap.handler(new ChannelInitializeHandler() {
    
            @Override
            public void initializer(Channel ch) throws Exception {
    
                ch.pipeline()
                        .addLast(new CsMessageCodec())
                        .addLast(new SimpleChannelAdapterHandler<Message>() {
    
                            @Override
                            public void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
    
                                System.out.println(new String(msg.getBody()));
                                ctx.writeAndFlush(new Message(1,1,("hello world,当前时间是: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).getBytes(StandardCharsets.UTF_8),true));
                            }

                            @Override
                            public void exceptionCatch(ChannelHandlerContext ctx, Throwable throwable) throws Exception {
    
                                throwable.printStackTrace();
                            }
                        });
            }
        }).connect(new InetSocketAddress("localhost", 8080));
        channel.writeAndFlush(new Message(1,1,("hello world,当前时间是: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).getBytes(StandardCharsets.UTF_8),true));
    }

运行服务端和客户端,查看效果如下

服务端效果如下
在这里插入图片描述
客户端效果如下
在这里插入图片描述
这一期,我实现了自定义协议,同时通过设置mask的方式保证数据是否是明文还是密文传输,保证数据安全性,下一期,我将实现多个客户端之间互相通讯

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_42413011/article/details/114852932

智能推荐

Spring Data Moore-SR6_咔啡的博客-程序员宅基地

OverviewLearnSpring Data’s mission is to provide a familiar and consistent, Spring-based programming model for data access while still retaining the special traits of the underlying data store.It ...

char类型转int类型_qchar转int_峰子大疯子的博客-程序员宅基地

这里写自定义目录标题char类型转int类型代码示例原理char类型转int类型代码示例char a = '1';int x = a - '0';原理原理是用的ASCILL码。char类型减去一个char类型的’0’就可以转成int类型。比如这里a的值为1,ascill码值是49,减一个’0’,他的ascill值为48,相减得1,正好是1-0的结果,所以可以正好转成int类型。有错望您指出。..._qchar转int

STM32cubeMX硬件IIC问题_huhuolianmeng的博客-程序员宅基地

注意写数据的时候#if 1printf("\r\n***************I2C Example*******************************\r\n");printf("WriteBuffer\r\n");for(i=0; i<256; i++){ WriteBuffer[i]=i; /* WriteBuffer init */ printf("0x%02X ",WriteBuffer[i]);}/* wrinte date to EEPROM *

CSS3动画之平移、旋转、缩放、倾斜、透视_半粒糖_Qy的博客-程序员宅基地

@keyframes XXX {//动画名 from{//动画来的起点 //----------------transform 转换--------------------- //translate 平移 transform: translateY(500px);//Y轴平移 transform: translateX(500px);//Y轴平移 transform: translate(50px,50px);//X,Y轴平移 transform: translate3d(x,y,z

yum安装nginx时报错:No package nginx available. Error: Nothing to do_yum虚拟机nothingtodo_UpUpUpUpUpUpUp的博客-程序员宅基地

在安装nginx时出现以下错误:[root@centos6-1 ~]# yum install nginx -yLoaded plugins: fastestmirror, refresh-packagekit, securitySetting up Install ProcessLoading mirror speeds from cached hostfile * base: ft..._yum虚拟机nothingtodo

nginx配置1:借助Nginx搭建反向代理服务器与缓存静态文件_github.com/starRTC的博客-程序员宅基地

修改配置文件nginx.conf  (1)进程数与每个进程的最大连接数:    •nginx进程数,建议设置为等于CPU总核心数    •单个进程最大连接数,那么该服务器的最大连接数=连接数*进程数  (2)Nginx的基本配置:    •监听端口一般都为http端口:80;    •域名可以有多个,用空格隔开:例如 server_name www.

随便推点

计算机c语言学生成绩管理作业,C语言实现简单的学生成绩管理系统_住范儿石乐天的博客-程序员宅基地

最近在问答上帮提问者回答问题,有遇到求C语言实现学生管理成绩系统,需求非常简单,就顺手码了下代码。我觉得这种比较小的系统,可以收录起来,做一个C语言基础学习目录也是不错的主意。因为当时的问题已经找不到了,我就回想着把问题的大致说一下。一 题目简单实现一个学生成绩系统,一个学生包含学号,姓名,年龄,数学成绩,计算机成绩,总成绩六项信息,学号唯一,姓名可以重名。数据格式如下:学号姓名年龄数学计算机总分..._学生成绩管理系统c语言作业

base64上传oss_阿里云对象存储 OSS-base64 上传图片_沼泽无它的博客-程序员宅基地

public function uploads(Request $request){$pictrue = ''; // 获取图片转base64的字符串$result = [];// 转化base64编码图片 jpeg、jpg、png、bmp、gifif(preg_match('/^(data:\s*image\/(\w+);base64,)/', $pictrue, $result)) {$typ..._阿里云 oss base64上传

BugkuCTF-Reverse题入门逆向多解法_bugkuctf reverse_彬彬有礼am_03的博客-程序员宅基地

补充:IDA里面十六进制转字符的快捷键:R方法一第一步PEID,这里可以看出程序是32位的查壳:(无壳)拖进IDA32分析:先查看伪码:F5找到main函数发现有许多 mov 指令,66H 是 ‘f’,6CH 是 ‘l’,推测这就是 flag 的 ASCII 码值。得到flag:flag{Re_1s_S0_C0OL}方法二也可以用动态调试的方法。动态调试这就需要用到 OD。先下一个断点,然后单步运行,看看栈里的变化。例如,我就找个 0x00401475 的地址。在 OD _bugkuctf reverse

Unity3D 显示FPS的脚本_VR技术小光的博客-程序员宅基地

public class FPS : MonoBehaviour{ public static float f_Fps; public float f_UpdateInterval = 0.5f; //每个0.5秒刷新一次 private float f_LastInterval; //游戏时间 private int i_Frames = 0;//帧数...

centos8 tab自动补全命令_centos8 table补齐_行路默默的博客-程序员宅基地

安装centos8 tab自动补全命令安装 bash-completion如果还没有source /etc/profile.d/bash_completion.sh_centos8 table补齐

dremio整合minio_dremioclientauthmiddlewarefactory_木卫二号Coding的博客-程序员宅基地

Add Data LakeGeneralname xxxAWS Access Key --> 勾选AWS Access Key --> minio 账号AWS Access Secret --> minio 密码Bukets oss-test 指定自己minio的buketsAdvanced OptionsEnable asynchronous access when possible --> 勾选Enable compatibility mo_dremioclientauthmiddlewarefactory

推荐文章

热门文章

相关标签