源码分析mycat1.6之mysql通信协议篇之COM_QUERY(SELECT语句报文解析) - Go语言中文社区

源码分析mycat1.6之mysql通信协议篇之COM_QUERY(SELECT语句报文解析)


本文重点分析 COM_QUERY 命令,关注select命令的发送与结果集响应协议解析。

相关协议定义请参考:https://dev.mysql.com/doc/internals/en/com-query.html

请求响应报文:https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-COM_QUERY_Response

源码解析示例代码:https://git.oschina.net/zhcsoft/StudyDemo 

所在源码包路径:persistent.prestige.console.mysql,启动类:MysqlClient。

1、客户端命令协议

COM_QUERY:

COM_QUERY is used to send the server a text-based query that is executed immediately.

The server replies to a COM_QUERY packet with a COM_QUERY Response.

The length of the query-string is a taken from the packet length - 1.

Payload

1              [03] COM_QUERY
string[EOF]    the query the server shall execute

Fields

  • command_id (1) -- 0x03 COM_QUERY

  • query (string.EOF) -- query_text

其中query(string.EOF),不包括命令结尾的;分号。

2、服务端查询命令的响应

COM_QUERY_Response:

响应包的4种情况:

  1. 如果执行错误,比如SQL语法错误,返回ERR包
  2. 如果执行成功,但没有查到任何数据,返回OK包
  3. 如果客户端执行load data local infile 'filename' into table <table>,则返回LOCAL_INFILE_REQUEST。

  1. 如果返回结果集,则下发包为Resultset,本文的重点,解析ResultSet报文。

 

2.1、ProtocolText::ResultSet 文本协议包解析

It is made up of two parts:

  • the column definitions

  • the rows

which consist of a sequence of packets.

The column definitions part starts with a packet containing the column-count, 

followed by as many Column Definition packets as there are columns and terminated by an EOF_Packet

packet if the CLIENT_DEPRECATE_EOF capability flag is not set.

Each row is a packet, too. The rows are terminated by another EOF_Packet. In case the query could generate the column-definition,

 but generating the rows afterwards failed, a ERR_Packet may be sent instead of the last EOF_Packet.

ResultSet响应包结构如下:

  • 第一个包:列长度包,使用一个LengthEncodedInteger类型来存储列的长度;包括报文头(PacketLenth + 序列号 4字节) + LengthEncodedInteger。
  • 紧跟着就是n个字段描述包,每一个字段描述就是一个包(Protocol::ColumnDefinition
    所有字段信息描述后,会发送一个EOF包或OK包作为字段定义与数据(Row)的分隔符号。取决于是否设置了CLIENT_DEPRECATE_EOF标记,如果设置了,返回的是OK包,5.7后都是返回OK包。
  • 接下来是行数据包,每一行一个数据包,也包括包头与报文体。
  • 最后的结束包,同样可能是EOF或OK包

一个TCP响应包,里面可以包含多个mysql协议包。

 

2.1.1 字段描述

1、字段类型

Table 14.4 Column Types:

Table Column Type Hex Value Notes
Protocol::MYSQL_TYPE_DECIMAL 0x00 Implemented by ProtocolBinary::MYSQL_TYPE_DECIMAL
Protocol::MYSQL_TYPE_TINY 0x01 Implemented by ProtocolBinary::MYSQL_TYPE_TINY
Protocol::MYSQL_TYPE_SHORT 0x02 Implemented by ProtocolBinary::MYSQL_TYPE_SHORT
Protocol::MYSQL_TYPE_LONG 0x03 Implemented by ProtocolBinary::MYSQL_TYPE_LONG
Protocol::MYSQL_TYPE_FLOAT 0x04 Implemented by ProtocolBinary::MYSQL_TYPE_FLOAT
Protocol::MYSQL_TYPE_DOUBLE 0x05 Implemented by ProtocolBinary::MYSQL_TYPE_DOUBLE
Protocol::MYSQL_TYPE_NULL 0x06 Implemented by ProtocolBinary::MYSQL_TYPE_NULL
Protocol::MYSQL_TYPE_TIMESTAMP 0x07 Implemented by ProtocolBinary::MYSQL_TYPE_TIMESTAMP
Protocol::MYSQL_TYPE_LONGLONG 0x08 Implemented by ProtocolBinary::MYSQL_TYPE_LONGLONG
Protocol::MYSQL_TYPE_INT24 0x09 Implemented by ProtocolBinary::MYSQL_TYPE_INT24
Protocol::MYSQL_TYPE_DATE 0x0a Implemented by ProtocolBinary::MYSQL_TYPE_DATE
Protocol::MYSQL_TYPE_TIME 0x0b Implemented by ProtocolBinary::MYSQL_TYPE_TIME
Protocol::MYSQL_TYPE_DATETIME 0x0c Implemented by ProtocolBinary::MYSQL_TYPE_DATETIME
Protocol::MYSQL_TYPE_YEAR 0x0d Implemented by ProtocolBinary::MYSQL_TYPE_YEAR
Protocol::MYSQL_TYPE_NEWDATE [a] 0x0e see Protocol::MYSQL_TYPE_DATE
Protocol::MYSQL_TYPE_VARCHAR 0x0f Implemented by ProtocolBinary::MYSQL_TYPE_VARCHAR
Protocol::MYSQL_TYPE_BIT 0x10 Implemented by ProtocolBinary::MYSQL_TYPE_BIT
Protocol::MYSQL_TYPE_TIMESTAMP2 [a] 0x11 see Protocol::MYSQL_TYPE_TIMESTAMP
Protocol::MYSQL_TYPE_DATETIME2 [a] 0x12 see Protocol::MYSQL_TYPE_DATETIME
Protocol::MYSQL_TYPE_TIME2 [a] 0x13 see Protocol::MYSQL_TYPE_TIME
Protocol::MYSQL_TYPE_NEWDECIMAL 0xf6 Implemented by ProtocolBinary::MYSQL_TYPE_NEWDECIMAL
Protocol::MYSQL_TYPE_ENUM 0xf7 Implemented by ProtocolBinary::MYSQL_TYPE_ENUM
Protocol::MYSQL_TYPE_SET 0xf8 Implemented by ProtocolBinary::MYSQL_TYPE_SET
Protocol::MYSQL_TYPE_TINY_BLOB 0xf9 Implemented by ProtocolBinary::MYSQL_TYPE_TINY_BLOB
Protocol::MYSQL_TYPE_MEDIUM_BLOB 0xfa Implemented by ProtocolBinary::MYSQL_TYPE_MEDIUM_BLOB
Protocol::MYSQL_TYPE_LONG_BLOB 0xfb Implemented by ProtocolBinary::MYSQL_TYPE_LONG_BLOB
Protocol::MYSQL_TYPE_BLOB 0xfc Implemented by ProtocolBinary::MYSQL_TYPE_BLOB
Protocol::MYSQL_TYPE_VAR_STRING 0xfd Implemented by ProtocolBinary::MYSQL_TYPE_VAR_STRING
Protocol::MYSQL_TYPE_STRING 0xfe Implemented by ProtocolBinary::MYSQL_TYPE_STRING
Protocol::MYSQL_TYPE_GEOMETRY 0xff

 

[a] Internal to MySQL Server. Not used in ProtocolBinary::* nor ProtocolText::*.

2、字段描述协议

Protocol::ColumnDefinition:

if CLIENT_PROTOCOL_41 is set Protocol::ColumnDefinition41 is used, Protocol::ColumnDefinition320 otherwise

Protocol::ColumnDefinition41:

Column Definition

Payload

lenenc_str     catalog
lenenc_str     schema
lenenc_str     table
lenenc_str     org_table
lenenc_str     name
lenenc_str     org_name
lenenc_int     length of fixed-length fields [0c]
2              character set
4              column length
1              type
2              flags
1              decimals
2              filler [00] [00]
  if command was COM_FIELD_LIST {
lenenc_int     length of default-values
string[$len]   default values
  }

Implemented By

Protocol::send_result_set_metadata()

Fields

  • catalog (lenenc_str) -- catalog (always "def")        

  • schema (lenenc_str) -- schema-name                                 

  • table (lenenc_str) -- virtual table-name,  别名

  • org_table (lenenc_str) -- physical table-name ,数据库中的表名

  • name (lenenc_str) -- virtual column name,   字段别名

  • org_name (lenenc_str) -- physical column name  数据库字段名

  • next_length (lenenc_int) -- length of the following fields (always 0x0c)

  • character_set (2) -- is the column character set and is defined in Protocol::CharacterSet.

  • column_length (4) -- maximum length of the field

  • column_type (1) -- type of the column as defined in Column Type

  • flags (2) -- flags

  • decimals (1) -- max shown decimal digits

    • 0x00 for integers and static strings

    • 0x1f for dynamic strings, double, float

    • 0x00 to 0x51 for decimals

Note

decimals and column_length can be used for text-output formatting.

 

2.1.2 Text Resultset Row

A row with the data for each column.

1、每一行的数据,全部转换为字符串进行传输,使用LengthEncodedString进行编码,每一行的数据个数有column count决定。

2、特别注意,如果字段值为空(null)的话,那么该值就是0xfb,并不是一个LengthEncodedString。

3、动手编写解析程序

关于COM_QUERY请求及响应协议就学习到这了,暂未讲解 LOAD_FILE_REQUEST,接下来实战解析 COM_QUERY包 。在尝试解析协议之前,建议用抓包工具,与mysql客户端先发送一条select查询语句,看一下服务端与客户端的交互数据报文,截图如下:

核心源码解析如下:

3.1 ResultSetPacket ResultSet 协议包解析

package persistent.prestige.console.mysql.protocol;

import java.util.ArrayList;
import java.util.List;

import persistent.prestige.console.mysql.connection.Connection;

/**
 * 注:本次解析,重在将select 查询出来的数据,使用List<Object[]>返回,甚至转换为List<Bean>
 * 
 * @author dingwei2
 *
 */
@SuppressWarnings("serial")
public class ResultSetPacket extends Packet {

    private static final int STATUS_NONE = 0; //未开始解析
    private static final int STATUS_COLUMN = 1;//列信息解析中

    private static final int STATUS_COLUMN_END = 2;//列信息解析完成


    private static final int STATUS_ROWDATA = 4;//数据解析中

    private static final int STATUS_END = 8;    //包解析结束


    private Connection conn;

    /** 列的长度 */
    private int columnCount;
    private List<ColumnDefinition41Packet> columnDefinition41Packets;
    private List<Object[]> rowDatas;

    private int status; // 0:未开始;1: 解析column definition;2:解析rowdata中 ,3:结束

    /** 响应包类型   1:OK包;2:Error包;3:LoadDataFile包;4:ResultSetData包*/
    private int responseType;

    public ResultSetPacket(Connection conn) {
        this.conn = conn;
        this.rowDatas = new ArrayList<Object[]>();
//        this.columnCount = columnCount;
//        columnDefinition41Packets = new ArrayList<ColumnDefinition41Packet>(columnCount);
    }

    /**
     * 由于是演示代码,内存使用的是堆内存,故内存的管理交给了垃圾回收器
     * @param msg
     */
    public void read(MysqlMessage msg) {
        if(responseType < STATUS_COLUMN ) {//说明该包还是第一次解析,需要判断响应包的类型
            int packetLen = msg.getPacketLength();
            byte packetSeq = msg.getPacketSeq();
            short pType = msg.getPTypeByFrom1Byte();
            System.out.println("数据包类型:" + pType + ",数据实体长度:" + packetLen);

            if(pType == 0xFf) { // Error Packet
                ErrorPacket errorPacket = ErrorPacket.newInstance(msg, packetSeq, packetLen);
                System.out.println(errorPacket);
                conn.endCmd();

                this.responseType = 2;
                this.status = STATUS_END; //包解析结束
                return;
            } else if(pType == 0) { //OK Packet,,目前这里发的是EOF包
                OkPacket ok = OkPacket.newInstance(msg, packetSeq, packetLen);
                System.err.println(ok); 

                conn.endCmd();
                this.responseType = 1;
                this.status = STATUS_END; //包解析结束
                return;
            } else if(pType == 0xFB) { // load_data_request 包
                conn.endCmd();
                this.responseType = 3;
                this.status = STATUS_END; //包解析结束
                return;
            } else {

                this.responseType = 4;

                //判断是否是LengthCodeInt类型
                try {
                    long columnCount = msg.getBinaryLengthCode();
                    System.out.println("字段长度:" + columnCount);
                    this.columnCount = (int) columnCount;
                    this.columnDefinition41Packets = new ArrayList<ColumnDefinition41Packet>(this.columnCount);
                    this.status = STATUS_COLUMN; //column definition 解析中

                } catch (UnsupportedOperationException e) {
                    System.out.println("不是一个合法的LengthCodeBinary包");
                    conn.endCmd();
                    this.responseType = 4;
                    this.status = STATUS_END; //包解析结束
                    return;
                }

            }
        }


        //开始包的解析
        if(status == STATUS_COLUMN) { //列信息解析
            int i = 0;

            while (msg.hasRemaining() && i++ < this.columnCount) {
                System.out.println("正在解析第" + (this.columnDefinition41Packets.size() + 1 ) + "列");
                this.columnDefinition41Packets.add( ColumnDefinition41Packet.newInstance(msg, false) );    
            }

            if( this.columnDefinition41Packets.size() < this.columnCount) {  //列描述包未全部解析完,待下次数据的到来
                return;
            }

            //列信息解析完,进入到 ResultData解析
            this.status = STATUS_COLUMN_END;//列信息解析完成后,会发送一个新的mysql数据包,故本方法就会结束,因为上层调用方只会传入一个完整的数据包

        } else if(status == STATUS_COLUMN_END ) { //这是一个OK包或EOF包,在这里,只需忽略掉这个包即可
//            while(msg.hasRemaining()) {
//                System.out.print(msg.byte2hex(msg.get()));
//            }
            this.status = STATUS_ROWDATA;
        } else if( status == STATUS_ROWDATA) {
            //需要判断该包是结束包,还是ResultData包
//            while(msg.hasRemaining()) {
//                System.out.print(msg.byte2hex(msg.get()));
//            }

            int packetLen = msg.getPacketLength();
            byte packetSeq = msg.getPacketSeq();
            short pType = msg.getPTypeByFrom1Byte();
            System.out.println("数据包类型:" + pType);

            if(pType == 0xFE) { //EOF 包
                msg.skipReadBytes(packetLen); //跳过协议头部和整个EOF包
                //整个解析结束
                this.status = STATUS_END;
            } else {
                System.out.println(msg.remaining());
                while(msg.hasRemaining()) {
                    rowDatas.add( ResultSetDataPacket.newInstance(columnDefinition41Packets, msg).values()  );
                }
            }


        }



    }


    public boolean isEnd() {
        return this.status == STATUS_END;
    }


    public int getColumnCount() {
        return columnCount;
    }

    public void setColumnCount(int columnCount) {
        this.columnCount = columnCount;
    }

    public List<ColumnDefinition41Packet> getColumnDefinition41Packets() {
        return columnDefinition41Packets;
    }

    public void setColumnDefinition41Packets(List<ColumnDefinition41Packet> columnDefinition41Packets) {
        this.columnDefinition41Packets = columnDefinition41Packets;
    }

    public List<Object[]> getRowDatas() {
        return rowDatas;
    }

    public void setRowDatas(List<Object[]> rowDatas) {
        this.rowDatas = rowDatas;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }



}

3.2 ColumnDefinition41Packet  列信息协议包解析

package persistent.prestige.console.mysql.protocol;

/**
 * 
 * @author dingwei2
 * lenenc_str     catalog
 * lenenc_str     schema
 * lenenc_str     table
 * lenenc_str     org_table
 * lenenc_str     name
 * lenenc_str     org_name
 * lenenc_int     length of fixed-length fields [0c]
 * 2              character set
 * 4              column length
 * 1              type
 * 2              flags
 * 1              decimals
 * 2              filler [00] [00]
 * if command was COM_FIELD_LIST {
 *   lenenc_int     length of default-values
 *   string[$len]   default values
 * }
 *
 */
public class ColumnDefinition41Packet extends Packet {
    /** */
    private String catalog;

    private String schema;

    /** 逻辑表名,别名,,也就是sql 语句中 as 后面的名称*/
    private String table;

    /** 数据库中的表名,俗称物理表名*/
    private String orgTable;

    private String name;

    private String orgName;

    private long fieldsLen; // lenenc_int     length of fixed-length fields [0c]

    private int characterSet; // 协议中 占用2字节

    private int columnLen;// 协议中 占用4字节

    private byte type; // 协议中占1字节,,如果是varchar类型的,话,为显示-3,因为byte的返回为-128 - 127;最高位8位为符合位

    private int flags; // 协议中占2字节

    private byte decimals; //协议中占1字节

    // 接下来两个字节的填充  0x00;


    //默认值长度
    //private long defaultValueLen;

    private String defaultValue;


    private ColumnDefinition41Packet() {}

    /**
     * 
     * @param msg
     * @param seq
     * @param packetLent
     * @return
     */
    public static final ColumnDefinition41Packet newInstance(MysqlMessage msg, boolean comFieldList) {
        msg.skipReadBytes(Packet.HEAD_LENGTH);
        ColumnDefinition41Packet packet = new ColumnDefinition41Packet();
        packet.setCatalog(  new String(msg.getStringLengthCode()) );
        packet.setSchema( new String(msg.getStringLengthCode()) );
        packet.setTable(new String(msg.getStringLengthCode()));
        packet.setOrgTable(new String(msg.getStringLengthCode()));
        packet.setName(new String(msg.getStringLengthCode()));
        packet.setOrgName(new String(msg.getStringLengthCode()));
        packet.setFieldsLen(msg.getBinaryLengthCode());
        packet.setCharacterSet(msg.getUB2());
        packet.setColumnLen(msg.getInt());
        packet.setType(msg.get());
        packet.setFlags(msg.getUB2());
        packet.setDecimals(msg.get());
        msg.skipReadBytes(2);//2个填充值 0x00;

        if(comFieldList) {
            packet.setDefaultValue(new String(msg.getStringLengthCode()));
        }

        System.out.println(packet);
        return packet;
    }


    public String getCatalog() {
        return catalog;
    }

    public void setCatalog(String catalog) {
        this.catalog = catalog;
    }

    public String getSchema() {
        return schema;
    }

    public void setSchema(String schema) {
        this.schema = schema;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public String getOrgTable() {
        return orgTable;
    }

    public void setOrgTable(String orgTable) {
        this.orgTable = orgTable;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getOrgName() {
        return orgName;
    }

    public void setOrgName(String orgName) {
        this.orgName = orgName;
    }

    public long getFieldsLen() {
        return fieldsLen;
    }

    public void setFieldsLen(long fieldsLen) {
        this.fieldsLen = fieldsLen;
    }

    public int getCharacterSet() {
        return characterSet;
    }

    public void setCharacterSet(int characterSet) {
        this.characterSet = characterSet;
    }

    public int getColumnLen() {
        return columnLen;
    }

    public void setColumnLen(int columnLen) {
        this.columnLen = columnLen;
    }

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }

    public int getFlags() {
        return flags;
    }

    public void setFlags(int flags) {
        this.flags = flags;
    }

    public byte getDecimals() {
        return decimals;
    }

    public void setDecimals(byte decimals) {
        this.decimals = decimals;
    }

//    public long getDefaultValueLen() {
//        return defaultValueLen;
//    }
//
//    public void setDefaultValueLen(long defaultValueLen) {
//        this.defaultValueLen = defaultValueLen;
//    }

    public String getDefaultValue() {
        return defaultValue;
    }

    public void setDefaultValue(String defaultValue) {
        this.defaultValue = defaultValue;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("n====Column Definition41 Info ====n")
          .append("catalog:").append(catalog).append("n")
          .append("schema:").append(schema).append("n")
          .append("table:").append(table).append("n")
          .append("orgTable:").append(orgTable).append("n")
          .append("name:").append(name).append("n")
          .append("orgName:").append(orgName).append("n")
          .append("fieldsLen:").append(fieldsLen).append("n")
          .append("characterSet:").append(characterSet).append("n")
          .append("columnLen:").append(columnLen).append("n")
          .append("type:").append(type).append("n")
          .append("flags:").append(Integer.toHexString(flags)).append("n")
          .append("decimals:").append(decimals);

        return sb.toString();
    }




}

3.3 Connection 客户端连接类

package persistent.prestige.console.mysql.connection;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;


public class Connection implements Runnable {

    private Integer connectionId;

    private String host;
    private int port;

    /** 底层socket通道 */
    private SocketChannel channel;

    /** 该连接是否通过认证 */
    private boolean auth = false;

    /** 是否解析了握手验证包 */
    private boolean handshake = false;

    private AtomicInteger seq = new AtomicInteger(0);

    //读缓存区,(累积缓存区),如果数据包不是一个完整的包,需要等待更多的数据到达。
    private ByteBuffer readerBuffer;

    /** 当前执行的命令*/
    private int cmd = 0;

    /** 命令执行时解析的包 */
    private Object cmdData;

    private boolean running = true;

    private IOHandler ioHandler;

    public Connection(String host, int port) {
        // TODO Auto-generated constructor stub
        this.host = host;
        this.port = port;
        ioHandler = new CmdHandler();
    }

    public void close() {
        this.running = false;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        Selector selector = null;
        try {
            selector = Selector.open();
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_CONNECT);

            channel.connect(new InetSocketAddress(host, port));

            Set<SelectionKey> selOps = null;

            while (running) {
                int n = selector.select();
                selOps = selector.selectedKeys();
                if (selOps == null || selOps.isEmpty()) {
                    continue;
                }

                try {
                    for (Iterator<SelectionKey> it = selOps.iterator(); it.hasNext();) {
                        SelectionKey key = it.next();
                        if (!key.isValid()) {
                            key.cancel();
                        }

                        if (key.isReadable()) { // 可读
                            System.out.println("读事件触发");
                            SocketWR.doRead(key, this);

                        } else if (key.isWritable()) { // 可写
                            System.out.println("写事件触发");
                            SocketWR.doWrite(key, this);


                        } else if (key.isConnectable()) {
                            if (channel.isConnectionPending()) {
                                channel.finishConnect();
                                System.out.println("完成tcp连接");
                            }
                            channel.register(selector, SelectionKey.OP_READ);
                        }

                        it.remove();

                    }
                } catch (Throwable e) {
                    e.printStackTrace();
                }

            }

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();

        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            System.out.println("链路关闭");
        }
    }

    public void endCmd() {
        this.cmd = 0;
        //this.cmdData = null;
    }

    public final byte getSeq() {
        int s = seq.getAndAdd(1);
        if (s >= 255) {
            synchronized (this) {
                if (s >= 255) {
                    seq.set(1);
                }
            }
        }
        return (byte) s;
    }

    public SelectableChannel channel() {
        return this.channel;
    }

    public boolean isAuth() {
        return this.auth;
    }

    public boolean isHandshake() {
        return handshake;
    }

    public void setHandshake(boolean handshake) {
        this.handshake = handshake;
    }

    public void setAuth(boolean auth) {
        this.auth = auth;
    }

    public ByteBuffer getReaderBuffer() {
        return readerBuffer;
    }

    public void setReaderBuffer(ByteBuffer readerBuffer) {
        this.readerBuffer = readerBuffer;
    }

    public int getCmd() {
        return cmd;
    }

    public void setCmd(int cmd) { //设置一个新的命令,同时清空上次命令的结果信息
        this.cmd = cmd;
        this.cmdData = null;
    }

    public Object getCmdData() {
        return cmdData;
    }

    public void setCmdData(Object cmdData) {
        this.cmdData = cmdData;
    }

    public IOHandler getIoHandler() {
        return ioHandler;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    public Integer getConnectionId() {
        return connectionId;
    }

    public void setConnectionId(Integer connectionId) {
        this.connectionId = connectionId;
    }

}

代码的组织思路如下:

mysql连接基于请求-应答模型,同一时刻要么是客户端发送命令,服务端收到命令后,发送响应报文。故Connection可以看出是有状态的,比如COM_QUERY命令执行过程,由于服务端返回的响应包ResultSet数据量一般比较大,会超过MTU(1500字节),故一个响应,服务端会发送多个数据包给客户端,而且在解析ResultSet过程中,可以看出有如下几个步骤,解析列长度信息、列详细信息解析中,列详细信息解析完成,结果集(row)数据解析中,数据包解析完成。Connection维护目前解析的命令,已经解析结果数据包,协议的解析职责交给具体的协议数据包(ResultSetPacket),其内部维护解析步骤状态,体现单一职责设计原则。

源码解析示例代码:https://git.oschina.net/zhcsoft/StudyDemo 

包路径:persistent.prestige.console.mysql,启动类:MysqlClient。


本文详细分析了mysql通信协议ResultSet的报文结构,并实例模拟mysql客户端解析COM_QUERY,select命令的发送与结果集的解析。

 

 

 

 

 

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/prestigeding/article/details/70198164
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-07 15:48:47
  • 阅读 ( 959 )
  • 分类:数据库

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢