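This article analyzes the COM_QUERY command, focusing on how a SELECT statement is sent and how the result-set response protocol is parsed.
Protocol definition: https://dev.mysql.com/doc/internals/en/com-query.html
Request/response packets: https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-COM_QUERY_Response
Sample source code for this walkthrough: https://git.oschina.net/zhcsoft/StudyDemo
Source package: persistent.prestige.console.mysql; entry class: MysqlClient.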
COM_QUERY:
A COM_QUERY is used to send the server a text-based query that is executed immediately.
The server replies to a COM_QUERY packet with a COM_QUERY Response.
The length of the query-string is taken from the packet length - 1.
Payload
1 [03] COM_QUERY
string[EOF] the query the server shall execute
Fields
command_id (1) -- 0x03 COM_QUERY
query (string.EOF) -- query_text
Note that query (string.EOF) does not include the semicolon (;) that terminates the statement.
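The payload layout above can be sketched in code. The following is a minimal illustration, not the demo project's implementation: the class name ComQueryEncoder is hypothetical, the query text is assumed to be UTF-8 encoded (in practice it follows the connection character set), and the 4-byte packet header (3-byte little-endian payload length plus 1-byte sequence id) is prepended.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of the article's demo project. */
final class ComQueryEncoder {
    static final int COM_QUERY = 0x03;

    /**
     * Builds one MySQL packet: 3-byte little-endian payload length,
     * 1-byte sequence id, then the payload [0x03][query text].
     */
    static byte[] encode(String sql, int sequenceId) {
        // Assumption: UTF-8; a real client uses the negotiated connection charset.
        byte[] query = sql.getBytes(StandardCharsets.UTF_8);
        int payloadLength = 1 + query.length; // command byte + query text
        if (payloadLength >= 0xFFFFFF) {
            throw new IllegalArgumentException("query too large for a single packet");
        }
        byte[] packet = new byte[4 + payloadLength];
        packet[0] = (byte) (payloadLength & 0xFF);
        packet[1] = (byte) ((payloadLength >>> 8) & 0xFF);
        packet[2] = (byte) ((payloadLength >>> 16) & 0xFF);
        packet[3] = (byte) sequenceId;
        packet[4] = (byte) COM_QUERY;
        System.arraycopy(query, 0, packet, 5, query.length);
        return packet;
    }
}
```

Calling ComQueryEncoder.encode("select 1", 0) yields a 13-byte packet: the header, the command byte 0x03, and the 8 bytes of query text. The server derives the query length as the payload length minus 1.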
COM_QUERY_Response:
The response takes one of four forms: an OK packet, an Error packet, a LOAD DATA file request, or a ResultSet.
A ResultSet is made up of two parts:
the column definitions
the rows
which consist of a sequence of packets.
The column definitions part starts with a packet containing the column-count, followed by as many Column Definition packets as there are columns and terminated by an EOF_Packet if the CLIENT_DEPRECATE_EOF capability flag is not set.
Each row is a packet, too. The rows are terminated by another EOF_Packet. In case the query could generate the column-definition, but generating the rows afterwards failed, an ERR_Packet may be sent instead of the last EOF_Packet.
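As a rough sketch of how a client might tell the four response forms apart, the first payload byte can be inspected: 0x00 for OK, 0xFF for an error, 0xFB for a load-data request, and anything else is read as the length-encoded column count of a ResultSet. The class and method names below are hypothetical and are not taken from the demo project.

```java
/** Hypothetical helper for illustration; not part of the article's demo project. */
final class QueryResponseKind {
    enum Kind { OK, ERR, LOCAL_INFILE, RESULT_SET }

    /** Classifies the first packet of a COM_QUERY response by its first payload byte. */
    static Kind classify(byte[] payload) {
        int first = payload[0] & 0xFF; // mask: Java bytes are signed
        switch (first) {
            case 0x00: return Kind.OK;
            case 0xFF: return Kind.ERR;
            case 0xFB: return Kind.LOCAL_INFILE;
            default:   return Kind.RESULT_SET; // payload starts with the column count
        }
    }

    /** Decodes a length-encoded integer starting at the given offset. */
    static long readLengthEncodedInt(byte[] buf, int offset) {
        int first = buf[offset] & 0xFF;
        if (first < 0xFB) {
            return first; // values below 251 are stored in the first byte itself
        }
        int width;
        switch (first) {
            case 0xFC: width = 2; break;
            case 0xFD: width = 3; break;
            case 0xFE: width = 8; break;
            default:
                throw new IllegalArgumentException(
                        "not a length-encoded integer: 0x" + Integer.toHexString(first));
        }
        long value = 0;
        for (int i = 0; i < width; i++) {
            value |= (buf[offset + 1 + i] & 0xFFL) << (8 * i); // little-endian
        }
        return value;
    }
}
```

For a ResultSet, readLengthEncodedInt(payload, 0) gives the number of Column Definition packets to expect. Note that 8-byte values with the top bit set would come out negative in a Java long; that does not matter for a column count.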
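The ResultSet response packets are structured as follows (see Protocol::ColumnDefinition).
Note that a single TCP response packet may contain multiple MySQL protocol packets.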
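Because one TCP read may deliver several MySQL packets, or only part of one, a client has to split the received bytes on packet boundaries before parsing. The sketch below shows one way to do that with a ByteBuffer; the class name PacketSplitter is hypothetical, and it assumes the standard 4-byte header (3-byte little-endian payload length plus 1-byte sequence id).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the article's demo project. */
final class PacketSplitter {
    static final int HEADER_LENGTH = 4;

    /**
     * Extracts the payloads of all complete packets from a buffer in read mode.
     * Bytes of a trailing incomplete packet are left unread in the buffer.
     */
    static List<byte[]> split(ByteBuffer buf) {
        List<byte[]> payloads = new ArrayList<>();
        while (buf.remaining() >= HEADER_LENGTH) {
            int p = buf.position();
            // Absolute gets: peek at the length without moving the position.
            int length = (buf.get(p) & 0xFF)
                    | ((buf.get(p + 1) & 0xFF) << 8)
                    | ((buf.get(p + 2) & 0xFF) << 16);
            if (buf.remaining() < HEADER_LENGTH + length) {
                break; // packet not fully received yet; wait for more data
            }
            buf.position(p + HEADER_LENGTH); // skip length and sequence id
            byte[] payload = new byte[length];
            buf.get(payload);
            payloads.add(payload);
        }
        return payloads;
    }
}
```

After split returns, the caller would typically compact the buffer so that the leftover bytes are kept and completed by the next read.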
1. Column types
Table 14.4 Column Types:
Column Type | Hex Value | Notes |
---|---|---|
Protocol::MYSQL_TYPE_DECIMAL | 0x00 | Implemented by ProtocolBinary::MYSQL_TYPE_DECIMAL |
Protocol::MYSQL_TYPE_TINY | 0x01 | Implemented by ProtocolBinary::MYSQL_TYPE_TINY |
Protocol::MYSQL_TYPE_SHORT | 0x02 | Implemented by ProtocolBinary::MYSQL_TYPE_SHORT |
Protocol::MYSQL_TYPE_LONG | 0x03 | Implemented by ProtocolBinary::MYSQL_TYPE_LONG |
Protocol::MYSQL_TYPE_FLOAT | 0x04 | Implemented by ProtocolBinary::MYSQL_TYPE_FLOAT |
Protocol::MYSQL_TYPE_DOUBLE | 0x05 | Implemented by ProtocolBinary::MYSQL_TYPE_DOUBLE |
Protocol::MYSQL_TYPE_NULL | 0x06 | Implemented by ProtocolBinary::MYSQL_TYPE_NULL |
Protocol::MYSQL_TYPE_TIMESTAMP | 0x07 | Implemented by ProtocolBinary::MYSQL_TYPE_TIMESTAMP |
Protocol::MYSQL_TYPE_LONGLONG | 0x08 | Implemented by ProtocolBinary::MYSQL_TYPE_LONGLONG |
Protocol::MYSQL_TYPE_INT24 | 0x09 | Implemented by ProtocolBinary::MYSQL_TYPE_INT24 |
Protocol::MYSQL_TYPE_DATE | 0x0a | Implemented by ProtocolBinary::MYSQL_TYPE_DATE |
Protocol::MYSQL_TYPE_TIME | 0x0b | Implemented by ProtocolBinary::MYSQL_TYPE_TIME |
Protocol::MYSQL_TYPE_DATETIME | 0x0c | Implemented by ProtocolBinary::MYSQL_TYPE_DATETIME |
Protocol::MYSQL_TYPE_YEAR | 0x0d | Implemented by ProtocolBinary::MYSQL_TYPE_YEAR |
Protocol::MYSQL_TYPE_NEWDATE [a] | 0x0e | see Protocol::MYSQL_TYPE_DATE |
Protocol::MYSQL_TYPE_VARCHAR | 0x0f | Implemented by ProtocolBinary::MYSQL_TYPE_VARCHAR |
Protocol::MYSQL_TYPE_BIT | 0x10 | Implemented by ProtocolBinary::MYSQL_TYPE_BIT |
Protocol::MYSQL_TYPE_TIMESTAMP2 [a] | 0x11 | see Protocol::MYSQL_TYPE_TIMESTAMP |
Protocol::MYSQL_TYPE_DATETIME2 [a] | 0x12 | see Protocol::MYSQL_TYPE_DATETIME |
Protocol::MYSQL_TYPE_TIME2 [a] | 0x13 | see Protocol::MYSQL_TYPE_TIME |
Protocol::MYSQL_TYPE_NEWDECIMAL | 0xf6 | Implemented by ProtocolBinary::MYSQL_TYPE_NEWDECIMAL |
Protocol::MYSQL_TYPE_ENUM | 0xf7 | Implemented by ProtocolBinary::MYSQL_TYPE_ENUM |
Protocol::MYSQL_TYPE_SET | 0xf8 | Implemented by ProtocolBinary::MYSQL_TYPE_SET |
Protocol::MYSQL_TYPE_TINY_BLOB | 0xf9 | Implemented by ProtocolBinary::MYSQL_TYPE_TINY_BLOB |
Protocol::MYSQL_TYPE_MEDIUM_BLOB | 0xfa | Implemented by ProtocolBinary::MYSQL_TYPE_MEDIUM_BLOB |
Protocol::MYSQL_TYPE_LONG_BLOB | 0xfb | Implemented by ProtocolBinary::MYSQL_TYPE_LONG_BLOB |
Protocol::MYSQL_TYPE_BLOB | 0xfc | Implemented by ProtocolBinary::MYSQL_TYPE_BLOB |
Protocol::MYSQL_TYPE_VAR_STRING | 0xfd | Implemented by ProtocolBinary::MYSQL_TYPE_VAR_STRING |
Protocol::MYSQL_TYPE_STRING | 0xfe | Implemented by ProtocolBinary::MYSQL_TYPE_STRING |
Protocol::MYSQL_TYPE_GEOMETRY | 0xff | |
[a] Internal to MySQL Server. Not used in
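One practical detail when using these codes from Java: the type is a single unsigned byte on the wire, but Java's byte is signed, so a code such as 0xfd shows up as -3 unless it is masked with 0xFF. The lookup below is a small sketch of that; the class name ColumnTypes is hypothetical and only a handful of the codes from the table are included.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; covers only a few codes from the table. */
final class ColumnTypes {
    private static final Map<Integer, String> NAMES = new HashMap<>();
    static {
        NAMES.put(0x01, "MYSQL_TYPE_TINY");
        NAMES.put(0x03, "MYSQL_TYPE_LONG");
        NAMES.put(0x08, "MYSQL_TYPE_LONGLONG");
        NAMES.put(0x0c, "MYSQL_TYPE_DATETIME");
        NAMES.put(0x0f, "MYSQL_TYPE_VARCHAR");
        NAMES.put(0xf6, "MYSQL_TYPE_NEWDECIMAL");
        NAMES.put(0xfc, "MYSQL_TYPE_BLOB");
        NAMES.put(0xfd, "MYSQL_TYPE_VAR_STRING");
        NAMES.put(0xfe, "MYSQL_TYPE_STRING");
    }

    /** Maps the raw (signed) type byte read from the packet to a type name. */
    static String nameOf(byte rawType) {
        int code = rawType & 0xFF; // convert the signed byte to its unsigned value
        String name = NAMES.get(code);
        return name != null ? name : String.format("UNKNOWN(0x%02x)", code);
    }
}
```

For example, the byte 0xfd reads as -3 in Java, yet ColumnTypes.nameOf((byte) 0xfd) still resolves to MYSQL_TYPE_VAR_STRING because of the mask.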
2. Column definition protocol
Protocol::ColumnDefinition:
If CLIENT_PROTOCOL_41 is set, Protocol::ColumnDefinition41 is used; otherwise Protocol::ColumnDefinition320 is used.
Protocol::ColumnDefinition41:
Column Definition
Payload
lenenc_str catalog
lenenc_str schema
lenenc_str table
lenenc_str org_table
lenenc_str name
lenenc_str org_name
lenenc_int length of fixed-length fields [0c]
2 character set
4 column length
1 type
2 flags
1 decimals
2 filler [00] [00]
if command was COM_FIELD_LIST {
lenenc_int length of default-values
string[$len] default values
}
Implemented By
Protocol::send_result_set_metadata()
Fields
catalog (lenenc_str) -- catalog (always "def")
schema (lenenc_str) -- schema-name
table (lenenc_str) -- virtual table-name (the alias)
org_table (lenenc_str) -- physical table-name (the table's name in the database)
name (lenenc_str) -- virtual column name (the column alias)
org_name (lenenc_str) -- physical column name (the column's name in the database)
next_length (lenenc_int) -- length of the following fields (always 0x0c)
character_set (2) -- is the column character set and is defined in Protocol::CharacterSet.
column_length (4) -- maximum length of the field
column_type (1) -- type of the column as defined in Column Type
flags (2) -- flags
decimals (1) -- max shown decimal digits
0x00 for integers and static strings
0x1f for dynamic strings, double, float
0x00 to 0x51 for decimals
Note
decimals and column_length can be used for text-output formatting.
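To make the field layout concrete, here is a minimal, self-contained sketch that decodes a ColumnDefinition41 payload (without the 4-byte packet header and without the COM_FIELD_LIST default values). It is independent of the demo project's MysqlMessage class; the class name ColumnDef41Sketch is hypothetical, and the strings are assumed to be UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch for illustration; not the demo project's parser. */
final class ColumnDef41Sketch {
    String catalog, schema, table, orgTable, name, orgName;
    int characterSet;   // 2 bytes on the wire
    long columnLength;  // 4 bytes on the wire, unsigned
    int type;           // 1 byte, unsigned
    int flags;          // 2 bytes
    int decimals;       // 1 byte

    /** Parses the payload of one ColumnDefinition41 packet (header already removed). */
    static ColumnDef41Sketch parse(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload).order(ByteOrder.LITTLE_ENDIAN);
        ColumnDef41Sketch c = new ColumnDef41Sketch();
        c.catalog = readLenEncString(buf);
        c.schema = readLenEncString(buf);
        c.table = readLenEncString(buf);
        c.orgTable = readLenEncString(buf);
        c.name = readLenEncString(buf);
        c.orgName = readLenEncString(buf);
        readLenEncInt(buf); // length of the fixed-length fields, always 0x0c
        c.characterSet = buf.getShort() & 0xFFFF;
        c.columnLength = buf.getInt() & 0xFFFFFFFFL;
        c.type = buf.get() & 0xFF;
        c.flags = buf.getShort() & 0xFFFF;
        c.decimals = buf.get() & 0xFF;
        buf.position(buf.position() + 2); // two filler bytes [00] [00]
        return c;
    }

    static long readLenEncInt(ByteBuffer buf) {
        int first = buf.get() & 0xFF;
        if (first < 0xFB) {
            return first;
        }
        switch (first) {
            case 0xFC: return buf.getShort() & 0xFFFFL;
            case 0xFD: return (buf.get() & 0xFFL) | ((buf.get() & 0xFFL) << 8) | ((buf.get() & 0xFFL) << 16);
            case 0xFE: return buf.getLong();
            default: throw new IllegalArgumentException("not a length-encoded integer");
        }
    }

    static String readLenEncString(ByteBuffer buf) {
        byte[] bytes = new byte[(int) readLenEncInt(buf)];
        buf.get(bytes);
        // Assumption: UTF-8; a real client would honor the connection charset.
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Masking each field after reading keeps the values unsigned, which avoids the negative type codes that appear when the type is stored in a Java byte.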
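A row with the data for each column.
NULL is sent as 0xfb
everything else is converted into a string and is sent as Protocol::LengthEncodedString.
1. Every value in a row is converted to a string for transmission and encoded as a LengthEncodedString; the number of values per row is determined by the column count.
2. Note in particular that a NULL column value is sent as the single byte 0xfb, which is not a LengthEncodedString.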
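The two rules above translate into a short decoding loop. The following sketch parses one text-protocol row payload into a String array, using null for SQL NULL; the class name TextRowSketch is hypothetical, the packet header is assumed to have been stripped already, and values are assumed to be UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch for illustration; not the demo project's ResultSetDataPacket. */
final class TextRowSketch {

    /** Decodes one row payload; columnCount comes from the column-count packet. */
    static String[] parseRow(byte[] payload, int columnCount) {
        ByteBuffer buf = ByteBuffer.wrap(payload).order(ByteOrder.LITTLE_ENDIAN);
        String[] values = new String[columnCount];
        for (int i = 0; i < columnCount; i++) {
            int first = buf.get(buf.position()) & 0xFF; // peek without consuming
            if (first == 0xFB) {
                buf.get();          // consume the NULL marker
                values[i] = null;   // SQL NULL, not an empty string
            } else {
                byte[] bytes = new byte[(int) readLenEncInt(buf)];
                buf.get(bytes);
                // Assumption: UTF-8; a real client would honor the connection charset.
                values[i] = new String(bytes, StandardCharsets.UTF_8);
            }
        }
        return values;
    }

    static long readLenEncInt(ByteBuffer buf) {
        int first = buf.get() & 0xFF;
        if (first < 0xFB) {
            return first;
        }
        switch (first) {
            case 0xFC: return buf.getShort() & 0xFFFFL;
            case 0xFD: return (buf.get() & 0xFFL) | ((buf.get() & 0xFFL) << 8) | ((buf.get() & 0xFFL) << 16);
            case 0xFE: return buf.getLong();
            default: throw new IllegalArgumentException("not a length-encoded integer");
        }
    }
}
```

The NULL check has to come before the length is decoded, because 0xfb is not a valid first byte of a length-encoded string.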
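That concludes the COM_QUERY request and response protocol; LOAD_FILE_REQUEST is not covered here. Next we parse a COM_QUERY exchange in practice. Before trying to parse the protocol, it is worth sending a SELECT statement from a MySQL client while a packet-capture tool is running, and inspecting the packets exchanged between the server and the client.
The core source code is as follows: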
package persistent.prestige.console.mysql.protocol;
import java.util.ArrayList;
import java.util.List;
import persistent.prestige.console.mysql.connection.Connection;
/**
* Note: this parser focuses on returning the rows produced by a SELECT as a List<Object[]>,
* which could in turn be converted to a List<Bean>.
*
* @author dingwei2
*
*/
@SuppressWarnings("serial")
public class ResultSetPacket extends Packet {
private static final int STATUS_NONE = 0; // parsing not started
private static final int STATUS_COLUMN = 1; // parsing column definitions
private static final int STATUS_COLUMN_END = 2; // column definitions fully parsed
private static final int STATUS_ROWDATA = 4; // parsing row data
private static final int STATUS_END = 8; // parsing finished
private Connection conn;
/** Number of columns */
private int columnCount;
private List<ColumnDefinition41Packet> columnDefinition41Packets;
private List<Object[]> rowDatas;
private int status; // current parse state, one of the STATUS_* constants above
/** Response type: 1 = OK packet; 2 = Error packet; 3 = LoadDataFile packet; 4 = ResultSetData packet */
private int responseType;
public ResultSetPacket(Connection conn) {
this.conn = conn;
this.rowDatas = new ArrayList<Object[]>();
// this.columnCount = columnCount;
// columnDefinition41Packets = new ArrayList<ColumnDefinition41Packet>(columnCount);
}
/**
* This is demo code: the buffers live on the heap, so memory management is left to the garbage collector.
* @param msg
*/
public void read(MysqlMessage msg) {
if(status == STATUS_NONE) { // first packet of the response: determine the response type
int packetLen = msg.getPacketLength();
byte packetSeq = msg.getPacketSeq();
short pType = msg.getPTypeByFrom1Byte();
System.out.println("Packet type: " + pType + ", payload length: " + packetLen);
if(pType == 0xFF) { // Error packet
ErrorPacket errorPacket = ErrorPacket.newInstance(msg, packetSeq, packetLen);
System.out.println(errorPacket);
conn.endCmd();
this.responseType = 2;
this.status = STATUS_END; // parsing finished
return;
} else if(pType == 0) { // OK packet (note: at present what is sent here is an EOF packet)
OkPacket ok = OkPacket.newInstance(msg, packetSeq, packetLen);
System.err.println(ok);
conn.endCmd();
this.responseType = 1;
this.status = STATUS_END; // parsing finished
return;
} else if(pType == 0xFB) { // load_data_request packet
conn.endCmd();
this.responseType = 3;
this.status = STATUS_END; // parsing finished
return;
} else {
this.responseType = 4;
// otherwise the payload should start with a length-encoded integer (the column count)
try {
long columnCount = msg.getBinaryLengthCode();
System.out.println("Column count: " + columnCount);
this.columnCount = (int) columnCount;
this.columnDefinition41Packets = new ArrayList<ColumnDefinition41Packet>(this.columnCount);
this.status = STATUS_COLUMN; // now parsing column definitions
} catch (UnsupportedOperationException e) {
System.out.println("Not a valid LengthCodeBinary packet");
conn.endCmd();
this.responseType = 4;
this.status = STATUS_END; // parsing finished
return;
}
}
}
// parse according to the current state
if(status == STATUS_COLUMN) { // parsing column definitions
int i = 0;
while (msg.hasRemaining() && i++ < this.columnCount) {
System.out.println("Parsing column " + (this.columnDefinition41Packets.size() + 1));
this.columnDefinition41Packets.add( ColumnDefinition41Packet.newInstance(msg, false) );
}
if( this.columnDefinition41Packets.size() < this.columnCount) { // not all column definitions have arrived yet; wait for more data
return;
}
// all column definitions parsed; row data comes next
this.status = STATUS_COLUMN_END; // the server then sends a new MySQL packet, and the caller passes in one complete packet per call, so this method returns here
} else if(status == STATUS_COLUMN_END ) { // this is the OK or EOF packet that follows the column definitions; simply ignore it
this.status = STATUS_ROWDATA;
} else if( status == STATUS_ROWDATA) {
// decide whether this packet is the terminating packet or a row-data packet
int packetLen = msg.getPacketLength();
byte packetSeq = msg.getPacketSeq();
short pType = msg.getPTypeByFrom1Byte();
System.out.println("Packet type: " + pType);
if(pType == 0xFE) { // EOF packet; strictly, an EOF packet is 0xFE with a payload shorter than 9 bytes, since a long row value can also start with 0xFE
msg.skipReadBytes(packetLen); // skip the protocol header and the whole EOF packet
// the whole response has been parsed
this.status = STATUS_END;
} else {
System.out.println(msg.remaining());
while(msg.hasRemaining()) {
rowDatas.add( ResultSetDataPacket.newInstance(columnDefinition41Packets, msg).values() );
}
}
}
}
public boolean isEnd() {
return this.status == STATUS_END;
}
public int getColumnCount() {
return columnCount;
}
public void setColumnCount(int columnCount) {
this.columnCount = columnCount;
}
public List<ColumnDefinition41Packet> getColumnDefinition41Packets() {
return columnDefinition41Packets;
}
public void setColumnDefinition41Packets(List<ColumnDefinition41Packet> columnDefinition41Packets) {
this.columnDefinition41Packets = columnDefinition41Packets;
}
public List<Object[]> getRowDatas() {
return rowDatas;
}
public void setRowDatas(List<Object[]> rowDatas) {
this.rowDatas = rowDatas;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
}
package persistent.prestige.console.mysql.protocol;
/**
*
* @author dingwei2
* lenenc_str catalog
* lenenc_str schema
* lenenc_str table
* lenenc_str org_table
* lenenc_str name
* lenenc_str org_name
* lenenc_int length of fixed-length fields [0c]
* 2 character set
* 4 column length
* 1 type
* 2 flags
* 1 decimals
* 2 filler [00] [00]
* if command was COM_FIELD_LIST {
* lenenc_int length of default-values
* string[$len] default values
* }
*
*/
public class ColumnDefinition41Packet extends Packet {
/** Catalog (always "def") */
private String catalog;
private String schema;
/** Logical table name (alias), i.e. the name after AS in the SQL statement */
private String table;
/** Physical table name, i.e. the table's name in the database */
private String orgTable;
private String name;
private String orgName;
private long fieldsLen; // lenenc_int length of fixed-length fields [0c]
private int characterSet; // 2 bytes in the protocol
private int columnLen; // 4 bytes in the protocol
private byte type; // 1 byte in the protocol; a Java byte is signed (-128 to 127, top bit is the sign bit), so a varchar column prints as -3
private int flags; // 2 bytes in the protocol
private byte decimals; // 1 byte in the protocol
// followed by two filler bytes (0x00)
// length of the default value
//private long defaultValueLen;
private String defaultValue;
private ColumnDefinition41Packet() {}
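/**
* Parses one column definition packet, starting at its packet header.
* @param msg the message to read from
* @param comFieldList whether the command was COM_FIELD_LIST, in which case default values follow
* @return the parsed column definition
*/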
public static final ColumnDefinition41Packet newInstance(MysqlMessage msg, boolean comFieldList) {
msg.skipReadBytes(Packet.HEAD_LENGTH);
ColumnDefinition41Packet packet = new ColumnDefinition41Packet();
packet.setCatalog( new String(msg.getStringLengthCode()) );
packet.setSchema( new String(msg.getStringLengthCode()) );
packet.setTable(new String(msg.getStringLengthCode()));
packet.setOrgTable(new String(msg.getStringLengthCode()));
packet.setName(new String(msg.getStringLengthCode()));
packet.setOrgName(new String(msg.getStringLengthCode()));
packet.setFieldsLen(msg.getBinaryLengthCode());
packet.setCharacterSet(msg.getUB2());
packet.setColumnLen(msg.getInt());
packet.setType(msg.get());
packet.setFlags(msg.getUB2());
packet.setDecimals(msg.get());
msg.skipReadBytes(2); // two filler bytes (0x00)
if(comFieldList) {
packet.setDefaultValue(new String(msg.getStringLengthCode()));
}
System.out.println(packet);
return packet;
}
public String getCatalog() {
return catalog;
}
public void setCatalog(String catalog) {
this.catalog = catalog;
}
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public String getOrgTable() {
return orgTable;
}
public void setOrgTable(String orgTable) {
this.orgTable = orgTable;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getOrgName() {
return orgName;
}
public void setOrgName(String orgName) {
this.orgName = orgName;
}
public long getFieldsLen() {
return fieldsLen;
}
public void setFieldsLen(long fieldsLen) {
this.fieldsLen = fieldsLen;
}
public int getCharacterSet() {
return characterSet;
}
public void setCharacterSet(int characterSet) {
this.characterSet = characterSet;
}
public int getColumnLen() {
return columnLen;
}
public void setColumnLen(int columnLen) {
this.columnLen = columnLen;
}
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public int getFlags() {
return flags;
}
public void setFlags(int flags) {
this.flags = flags;
}
public byte getDecimals() {
return decimals;
}
public void setDecimals(byte decimals) {
this.decimals = decimals;
}
// public long getDefaultValueLen() {
// return defaultValueLen;
// }
//
// public void setDefaultValueLen(long defaultValueLen) {
// this.defaultValueLen = defaultValueLen;
// }
public String getDefaultValue() {
return defaultValue;
}
public void setDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("\n====Column Definition41 Info ====\n")
.append("catalog:").append(catalog).append("\n")
.append("schema:").append(schema).append("\n")
.append("table:").append(table).append("\n")
.append("orgTable:").append(orgTable).append("\n")
.append("name:").append(name).append("\n")
.append("orgName:").append(orgName).append("\n")
.append("fieldsLen:").append(fieldsLen).append("\n")
.append("characterSet:").append(characterSet).append("\n")
.append("columnLen:").append(columnLen).append("\n")
.append("type:").append(type).append("\n")
.append("flags:").append(Integer.toHexString(flags)).append("\n")
.append("decimals:").append(decimals);
return sb.toString();
}
}
package persistent.prestige.console.mysql.connection;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
public class Connection implements Runnable {
private Integer connectionId;
private String host;
private int port;
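/** Underlying socket channel */
private SocketChannel channel;
/** Whether this connection has been authenticated */
private boolean auth = false;
/** Whether the handshake packet has been parsed */
private boolean handshake = false;
private AtomicInteger seq = new AtomicInteger(0);
// Read buffer (accumulation buffer): if a packet is incomplete, wait for more data to arrive.
private ByteBuffer readerBuffer;
/** Command currently being executed */
private int cmd = 0;
/** Packet parsed while the command executes */
private Object cmdData;
private volatile boolean running = true; // volatile: close() may be called from another thread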
private IOHandler ioHandler;
public Connection(String host, int port) {
// TODO Auto-generated constructor stub
this.host = host;
this.port = port;
ioHandler = new CmdHandler();
}
public void close() {
this.running = false;
}
@Override
public void run() {
// TODO Auto-generated method stub
Selector selector = null;
try {
selector = Selector.open();
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_CONNECT);
channel.connect(new InetSocketAddress(host, port));
Set<SelectionKey> selOps = null;
while (running) {
int n = selector.select();
selOps = selector.selectedKeys();
if (selOps == null || selOps.isEmpty()) {
continue;
}
try {
for (Iterator<SelectionKey> it = selOps.iterator(); it.hasNext();) {
SelectionKey key = it.next();
it.remove(); // always take the key out of the selected set, even if handling it fails
if (!key.isValid()) {
key.cancel();
continue; // a cancelled key must not be queried for readiness
}
if (key.isReadable()) { // readable
System.out.println("Read event fired");
SocketWR.doRead(key, this);
} else if (key.isWritable()) { // writable
System.out.println("Write event fired");
SocketWR.doWrite(key, this);
} else if (key.isConnectable()) {
if (channel.isConnectionPending()) {
channel.finishConnect();
System.out.println("TCP connection established");
}
channel.register(selector, SelectionKey.OP_READ);
}
}
} catch (Throwable e) {
e.printStackTrace();
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Connection closed");
}
}
public void endCmd() {
this.cmd = 0;
//this.cmdData = null;
}
public final byte getSeq() {
// The sequence id is a single byte on the wire and wraps around from 255 back to 0.
return (byte) (seq.getAndIncrement() & 0xFF);
}
public SelectableChannel channel() {
return this.channel;
}
public boolean isAuth() {
return this.auth;
}
public boolean isHandshake() {
return handshake;
}
public void setHandshake(boolean handshake) {
this.handshake = handshake;
}
public void setAuth(boolean auth) {
this.auth = auth;
}
public ByteBuffer getReaderBuffer() {
return readerBuffer;
}
public void setReaderBuffer(ByteBuffer readerBuffer) {
this.readerBuffer = readerBuffer;
}
public int getCmd() {
return cmd;
}
public void setCmd(int cmd) { // set a new command and clear the result data of the previous command
this.cmd = cmd;
this.cmdData = null;
}
public Object getCmdData() {
return cmdData;
}
public void setCmdData(Object cmdData) {
this.cmdData = cmdData;
}
public IOHandler getIoHandler() {
return ioHandler;
}
public SocketChannel getChannel() {
return channel;
}
public Integer getConnectionId() {
return connectionId;
}
public void setConnectionId(Integer connectionId) {
this.connectionId = connectionId;
}
}
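The code is organized as follows:
A MySQL connection follows a request-response model: at any moment, either the client is sending a command or the server, having received it, is sending back the response. A Connection is therefore stateful. Take COM_QUERY as an example: the ResultSet returned by the server is usually large and exceeds the MTU (1500 bytes), so the server delivers a single response to the client as several packets. Parsing a ResultSet goes through these steps: parsing the column count, parsing the column definitions, column definitions complete, parsing the result-set rows, and parsing complete. The Connection keeps track of the command currently being processed and the result packets parsed so far, while the protocol parsing itself is delegated to the specific protocol packet class (ResultSetPacket), which maintains the parse-step state internally, in keeping with the single-responsibility principle.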
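The parse steps listed above amount to a small state machine. The sketch below is a stripped-down illustration of that idea, not the demo's ResultSetPacket: the class name ResultSetStateMachine is hypothetical, it assumes the caller passes one complete packet payload per call, that CLIENT_DEPRECATE_EOF is not set (so EOF packets delimit the sections), and that there are fewer than 251 columns so the column count fits in one byte.

```java
/** Hypothetical sketch of the parse-step state machine; it only tracks state and counts rows. */
final class ResultSetStateMachine {
    enum State { COLUMN_COUNT, COLUMN_DEFS, COLUMN_DEFS_EOF, ROWS, DONE }

    private State state = State.COLUMN_COUNT;
    private int columnCount;
    private int columnsSeen;
    private int rowsSeen;

    /** Feeds one complete packet payload and returns the state after processing it. */
    State onPacket(byte[] payload) {
        int first = payload[0] & 0xFF;
        switch (state) {
            case COLUMN_COUNT:
                columnCount = first; // assumption: single-byte length-encoded integer
                state = State.COLUMN_DEFS;
                break;
            case COLUMN_DEFS:
                // A real parser would decode the column definition here.
                if (++columnsSeen == columnCount) {
                    state = State.COLUMN_DEFS_EOF;
                }
                break;
            case COLUMN_DEFS_EOF:
                state = State.ROWS; // EOF packet after the column definitions
                break;
            case ROWS:
                if (first == 0xFE && payload.length < 9) {
                    state = State.DONE; // terminating EOF packet
                } else if (first == 0xFF) {
                    state = State.DONE; // ERR packet sent instead of the last EOF
                } else {
                    rowsSeen++;         // a real parser would decode the row here
                }
                break;
            default:
                throw new IllegalStateException("result set already complete");
        }
        return state;
    }

    int rowCount() {
        return rowsSeen;
    }
}
```

Keeping this state inside the packet class means the connection only has to hand over each complete packet and ask whether parsing has finished.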
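This article walked through the packet structure of the ResultSet in the MySQL communication protocol, and simulated a MySQL client that sends a SELECT through COM_QUERY and parses the returned result set.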