最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

分布式数据库中间件–(3)Cobar对简单select命令的处理过程

来源:动视网 责编:小采 时间:2020-11-09 15:07:19
文档

分布式数据库中间件–(3)Cobar对简单select命令的处理过程

分布式数据库中间件–(3)Cobar对简单select命令的处理过程:在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。 所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的
推荐度:
导读分布式数据库中间件–(3)Cobar对简单select命令的处理过程:在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。 所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的


解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,其实现的query函数如下:

01 public void query(String sql) {
02 //这里就得到了完整的SQL语句,接收自客户端
03 ServerConnection c = this.source;
04 if (LOGGER.isDebugEnabled()) {
05 LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
06 }
07 //该函数对SQL语句的语法和语义进行分析,并返回SQL语句的对于类型,执行相应的操作
08 int rs = ServerParse.parse(sql);
09 switch (rs & 0xff) {
10 .......................
11 case ServerParse.SELECT:
12 //select操作执行
13 SelectHandler.handle(sql, c, rs >>> 8);
14 break;
15 .......................
16 }
17 }

首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。

如果语句没有语法错误,则直接交给SelectHandler进行处理。如果是一般的select语句,则直接调用ServerConnection的execute执行sql

c.execute(stmt, ServerParse.SELECT);

在ServerConnection中的execute函数中需要进行路由检查,因为select的数据不一定在一个数据库中,需要按拆分的规则进行路由的检查。

1 // 路由计算
2 RouteResultset rrs = null;
3 try {
4 rrs = ServerRouter.route(schema, sql, this.charset, this);
5 LOGGER.debug("路由计算结果:"+rrs.toString());
6 }

具体的路由算法也是比较复杂,以后会专门分析。

Cobar的DEBUG控制台输出路由的计算结果如下:

11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={

该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。

经过比较复杂的资源处理最后在每个后端数据库上执行函数execute0。

01 private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) {
02 ServerConnection sc = ss.getSource();
03 .........................
04 try {
05 // 执行并等待返回
06 BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit);
07 // 接收和处理数据,执行到这里就说明上面的执行已经得到执行结果的返回
08 final ReentrantLock lock = MultiNodeExecutor.this.lock;
09 lock.lock();
10 try {
11 switch (bin.data[0]) {
12 case ErrorPacket.FIELD_COUNT:
13 c.setRunning(false);
14 handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn));
15 break;
16 case OkPacket.FIELD_COUNT:
17 OkPacket ok = new OkPacket();
18 ok.read(bin);
19 affectedRows += ok.affectedRows;
20 // set lastInsertId
21 if (ok.insertId > 0) {
22 insertId = (insertId == 0) ? ok.insertId : Math.min(insertId, ok.insertId);
23 }
24 c.setRunning(false);
25 handleSuccessOK(ss, rrn, autocommit, ok);
26 break;
27 default: // HEADER|FIELDS|FIELD_EOF|ROWS|LAST_EOF
28 final MySQLChannel mc = (MySQLChannel) c;
29 if (fieldEOF) {
30 for (;;) {
31 bin = mc.receive();
32 switch (bin.data[0]) {
33 case ErrorPacket.FIELD_COUNT:
34 c.setRunning(false);
35 handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
36 return;
37 case EOFPacket.FIELD_COUNT:
38 handleRowData(rrn, c, ss);
39 return;
40 default:
41 continue;
42 }
43 }
44 } else {
45 bin.packetId = ++packetId;// HEADER
46 List headerList = new LinkedList();
47 headerList.add(bin);
48 for (;;) {
49 bin = mc.receive();
50 switch (bin.data[0]) {
51 case ErrorPacket.FIELD_COUNT:
52 c.setRunning(false);
53 handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
54 return;
55 case EOFPacket.FIELD_COUNT:
56 bin.packetId = ++packetId;// FIELD_EOF
57 for (MySQLPacket packet : headerList) {
58 buffer = packet.write(buffer, sc);
59 }
60 headerList = null;
61 buffer = bin.write(buffer, sc);
62 fieldEOF = true;
63 handleRowData(rrn, c, ss);
64 return;
65 default:
66 bin.packetId = ++packetId;// FIELDS
67 switch (flag) {
68 case RouteResultset.REWRITE_FIELD:
69 StringBuilder fieldName = new StringBuilder();
70 fieldName.append("Tables_in_").append(ss.getSource().getSchema());
71 FieldPacket field = PacketUtil.getField(bin, fieldName.toString());
72 headerList.add(field);
73 break;
74 default:
75 headerList.add(bin);
76 }
77 }
78 }
79 }
80 }
81 } finally {
82 lock.unlock();
83 }
84 }//异常处理....................
85 }

这里真正的执行SQL语句,然后等待后端执行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。

当客户端发起认证请求或命令请求后,服务器会返回相应的执行结果给客户端。客户端在收到响应报文后,需要首先检查第1个字节的值,来区分响应报文的类型。

响应报文类型 第1个字节取值范围
OK 响应报文 0×00
Error 响应报文 0xFF
Result Set 报文 0×01 – 0xFA
Field 报文 0×01 – 0xFA
Row Data 报文 0×01 – 0xFA
EOF 报文 0xFE

注:响应报文的第1个字节在不同类型中含义不同,比如在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。

Result Set 消息分为五部分,结构如下:

结构 说明
[Result Set Header] 列数量
[Field] 列信息(多个)
[EOF] 列结束
[Row Data] 行数据(多个)
[EOF] 数据结束

函数执行完成后,返回的结果都放入LinkedList中,当读取结果完成后放入多节点执行器的缓冲区。如果buffer满了,就通过前端连接写出给客户端。

文档

分布式数据库中间件–(3)Cobar对简单select命令的处理过程

分布式数据库中间件–(3)Cobar对简单select命令的处理过程:在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。 所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的
推荐度:
标签: 过程 简单 数据库
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top