Thrift 客户端调用RPC的Demo
public static void main(String[] args) throws Exception { TTransport transport = new TSocket("127.0.0.1", 7912); TProtocol protocol = new TBinaryProtocol(transport); // 创建client com.gxf.thrift.HelloWordService.Client client = new com.gxf.thrift.HelloWordService.Client(protocol); transport.open(); // 建立连接 // 第一种请求类型 com.gxf.thrift.Request request = new com.gxf.thrift.Request() .setType(com.gxf.thrift.RequestType.SAY_HELLO).setName("guanxiangfei").setAge(24); System.out.println(client.doAction(request)); // 第二种请求类型 request.setType(com.gxf.thrift.RequestType.QUERY_TIME).setName("guanxiangfei"); System.out.println(client.doAction(request)); transport.close(); // 请求结束,断开连接 }
这里可以很清楚看到分层设计,以及层层封装。transport, proctol这里网上有人说是用了装饰模式
跟进transport.open()代码
/** * Opens the transport for reading/writing. * * @throws TTransportException if the transport could not be opened */ public abstract void open() throws TTransportException;
这里是个抽象类TTransport的抽象方法,看下实现类TSocekt实现方法
/** * Connects the socket, creating a new socket object if necessary. */ public void open() throws TTransportException { if (isOpen()) { throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected."); } if (host_ == null || host_.length() == 0) { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host."); } if (port_ <= 0 || port_ > 65535) { throw new TTransportException(TTransportException.NOT_OPEN, "Invalid port " + port_); } if (socket_ == null) { initSocket(); } try { socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_); inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024); outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); } catch (IOException iox) { close(); throw new TTransportException(TTransportException.NOT_OPEN, iox); } }
这里可以看出,用socket建立了连接,为发送序列化数据做准备
继续跟进代码
client.doAction(request)
继续
public String doAction(Request request) throws RequestException, org.apache.thrift.TException { send_doAction(request); return recv_doAction(); }
这里是thrift编译器或者说代码生成器,生成的框架代码
第一个应该是发送序列化数据,第二个方法应该是接收服务端数据。这里分析第一个方法,发送序列化数据。继续跟进代码
public void send_doAction(Request request) throws org.apache.thrift.TException { doAction_args args = new doAction_args(); args.setRequest(request); sendBase("doAction", args); }
这里request实体被设置到了args里面,继续跟进sendBase
protected void sendBase(String methodName, TBase<?,?> args) throws TException { sendBase(methodName, args, TMessageType.CALL); }
继续跟进
private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException { oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_)); args.write(oprot_); oprot_.writeMessageEnd(); oprot_.getTransport().flush(); }
这是TServerClient.java不是生成代码,是thrift框架代码。这里可以看到RPC方法,args,以及请求类型应该需要被发送给服务端
这里可以看出,先会写一个消息开始的TMessage开始符号, 然后写args,最后写TMessage结束标识,刷新缓存。这应该是thrift发送消息的协议,先发送什么,后发送什么
。
继续跟进writeMessageBegin()方法
这里跟进的是TBinaryProtocol的实现
public void writeMessageBegin(TMessage message) throws TException { if (strictWrite_) { int version = VERSION_1 | message.type; writeI32(version); writeString(message.name); writeI32(message.seqid); } else { writeString(message.name); writeByte(message.type); writeI32(message.seqid); } }
这里可以看出写入消息Begin的内容,version, name, seqid等
继续跟进writeI32(version)看下 一个整型传输的协议
public void writeI32(int i32) throws TException { inoutTemp[0] = (byte)(0xff & (i32 >> 24)); inoutTemp[1] = (byte)(0xff & (i32 >> 16)); inoutTemp[2] = (byte)(0xff & (i32 >> 8)); inoutTemp[3] = (byte)(0xff & (i32)); trans_.write(inoutTemp, 0, 4); }
这里可以看出,用了4个byte来存放一个int。然后调用tranport发送。这里Demo里用的应该是TSocket发送。int用4个byte发送应该也是正常的,一个int占4个字节
接着跟进tran_.write()方法。TSocket的父类是TIOStreamTransport。进入TIOStreamTransport
/** * Writes to the underlying output stream if not null. */ public void write(byte[] buf, int off, int len) throws TTransportException { if (outputStream_ == null) { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream"); } try { outputStream_.write(buf, off, len); } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } }
这里的outputStream在new TSocket()的时候初始化了,这里也可以看出Demo用的是socket这种阻塞io发送的。当然thrift也支持非阻塞的ChannelSocekt,文件等。
回到写args这一块我还要在看看,好像要跟到生成的代码中。
总结:
1. 底层发送序列化文件用的是socket, channelsocket等
2. thrift有一套自己的协议,如先发送messagebegin标识, int占几个字节等