zoukankan      html  css  js  c++  java
  • Golang&Python测试thrift

    接上篇,安装好之后,就开始编写IDL生成然后测试。

    一、生成运行

    参考 http://www.aboutyun.com/thread-8916-1-1.html 来个添加,查询。

    namespace go  my.test.demo
    namespace py  my.test.demo
     
    struct Student{
     1: i32 sid, 
     2: string sname,
     3: bool ssex=0,
     4: i16 sage,
    }
     
    const map<string,string> MAPCONSTANT = {'hello':'world', 'goodnight':'moon'}
         
    service ClassMember {        
        list<Student> List(1:i64 callTime),
        void Add(1: Student s),
        bool IsNameExist(1:i64 callTime, 2:string name),
    }

    通过以下命令来生成代码:

     thrift -r --gen go my.test.thrift 
     thrift -r --gen py my.test.thrift
    ├── client
    ├── client.go
    ├── gen-py
    │   ├── __init__.py
    │   ├── client.py
    │   └── my
    │       ├── __init__.py
    │       ├── __init__.pyc
    │       └── test
    │           ├── __init__.py
    │           ├── __init__.pyc
    │           └── demo
    │               ├── ClassMember-remote
    │               ├── ClassMember.py
    │               ├── ClassMember.pyc
    │               ├── __init__.py
    │               ├── __init__.pyc
    │               ├── constants.py
    │               ├── ttypes.py
    │               └── ttypes.pyc
    ├── my
    │   └── test
    │       └── demo
    │           ├── class_member-remote
    │           │   └── class_member-remote.go
    │           ├── classmember.go
    │           ├── constants.go
    │           └── ttypes.go
    ├── my.test.thrift
    ├── server
    └── server.go

    thrift帮忙生成代码部分:网络连接、数据序列化、RPC调用函数映射、数据发送等。

    采用什么传输类型,采用什么服务类型,具体函数还要是自己写的。

    参考:

    类似Thrift的工具,还有Avro、protocol buffer,但相对于Thrift来讲,都没有Thrift支持全面和使用广泛。
    
    1) thrift内部框架一瞥
      按照官方文档给出的整体框架,Thrift自下到上可以分为4层:
    +-------------------------------------------+
    | Server                                    |  -- 服务器进程调度
    | (single-threaded, event-driven etc) |
    +-------------------------------------------+
    | Processor                                 |  -- RPC接口处理函数分发,IDL定义接口的实现将挂接到这里面
    | (compiler generated)                      |
    +-------------------------------------------+
    | Protocol                                  |  -- 协议
    | (JSON, compact etc)                       |
    +-------------------------------------------+
    | Transport                                 |  -- 网络传输
    | (raw TCP, HTTP etc)                       |
    +-------------------------------------------+
    
      Thrift实际上是实现了C/S模式,通过代码生成工具将接口定义文件生成服务器端和客户端代码(可以为不同语言),从而实现服务端和客户端跨语言的支持。用户在Thirft描述文件中声明自己的服务,这些服务经过编译后会生成相应语言的代码文件,然后用户实现服务(客户端调用服务,服务器端提服务)便可以了。其中protocol(协议层, 定义数据传输格式,可以为二进制或者XML等)和transport(传输层,定义数据传输方式,可以为TCP/IP传输,内存共享或者文件共享等)被用作运行时库。
    
    2)支持的数据传输格式、数据传输方式和服务模型
        (a)支持的传输格式
          TBinaryProtocol – 二进制格式.
          TCompactProtocol – 压缩格式
          TJSONProtocol – JSON格式
          TSimpleJSONProtocol –提供JSON只写协议, 生成的文件很容易通过脚本语言解析。
          TDebugProtocol – 使用易懂的可读的文本格式,以便于debug
        (b) 支持的数据传输方式
          TSocket -阻塞式socker
          TFramedTransport – 以frame为单位进行传输,非阻塞式服务中使用。
          TFileTransport – 以文件形式进行传输。
          TMemoryTransport – 将内存用于I/O. java实现时内部实际使用了简单的ByteArrayOutputStream。
          TZlibTransport – 使用zlib进行压缩, 与其他传输方式联合使用。当前无java实现。
        (c)支持的服务模型
          TSimpleServer – 简单的单线程服务模型,常用于测试
          TThreadPoolServer – 多线程服务模型,使用标准的阻塞式IO。
          TNonblockingServer – 多线程服务模型,使用非阻塞式IO(需使用TFramedTransport数据传输方式)
    
        3) Thrift IDL
      Thrift定义一套IDL(Interface Definition Language)用于描述接口,通常后缀名为.thrift,通过thrift程序把.thrift文件导出成各种不一样的代码的协议定义。IDL支持的类型可以参考这里:http://thrift.apache.org/docs/types
    //client.go
    
    package main
    
    import (
        "./my/test/demo"
        "fmt"
        "git.apache.org/thrift.git/lib/go/thrift"
        "net"
        "os"
        "time"
    )
    
    const (
        HOST = "127.0.0.1"
        PORT = "10086"
    )
    
    func main() {
        startTime := currentTimeMillis()
        transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
        protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
        transport, err := thrift.NewTSocket(net.JoinHostPort(HOST, PORT))
        if err != nil {
            fmt.Fprintln(os.Stderr, "error resolving address:", err)
            os.Exit(1)
        }
    
        useTransport := transportFactory.GetTransport(transport)
        client := demo.NewClassMemberClientFactory(useTransport, protocolFactory)
        if err := transport.Open(); err != nil {
            fmt.Fprintln(os.Stderr, "Error opening socket to "+HOST+":"+PORT, " ", err)
            os.Exit(1)
        }
        defer transport.Close()
        var i int32
        for i = 0; i < 5; i++ {
            var s demo.Student
            s.Sid = i
            s.Sname = fmt.Sprintf("name_%d", i)
            err := client.Add(&s)
            time.Sleep(time.Second * 3)
            fmt.Println("add", i, "student", err)
        }
    
        sList, err := client.List(currentTimeMillis())
        fmt.Println(err)
        for _, s := range sList {
            fmt.Println(s)
        }
    
        endTime := currentTimeMillis()
        fmt.Printf("calltime:%d-%d=%dms
    ", endTime, startTime, (endTime - startTime))
    
    }
    
    func currentTimeMillis() int64 {
        return time.Now().UnixNano() / 1000000
    }
    View Code
    //server.go
    
    package main
    
    import (
        "./my/test/demo"
        "fmt"
        "git.apache.org/thrift.git/lib/go/thrift"
        "os"
    )
    
    const (
        NetworkAddr = "0.0.0.0:10086"
    )
    
    type ClassMemberImpl struct {
    }
    
    func (c *ClassMemberImpl) Add(s *demo.Student) (err error) {
        fmt.Println(s)
        students[s.Sid] = s
        return nil
    }
    
    func (c *ClassMemberImpl) List(callTime int64) (r []*demo.Student, err error) {
        for _, s := range students {
            r = append(r, s)
        }
        return r, nil
    }
    
    func (c *ClassMemberImpl) IsNameExist(callTime int64, name string) (r bool, err error) {
        for _, s := range students {
            if s.Sname == name {
                return true, nil
            }
        }
        return false, nil
    }
    
    var students map[int32]*demo.Student
    
    func main() {
    
        students = make(map[int32]*demo.Student, 5)
    
        transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
        protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
        serverTransport, err := thrift.NewTServerSocket(NetworkAddr)
        if err != nil {
            fmt.Println("Error!", err)
            os.Exit(1)
        }
    
        handler := &ClassMemberImpl{}
        processor := demo.NewClassMemberProcessor(handler)
        server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory)
        fmt.Println("thrift server in", NetworkAddr)
        server.Serve()
    }
    View Code
    #client.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import sys, glob, time,datetime
    sys.path.append('gen-py')
    #print glob.glob('libpy/lib.*')
    #sys.path.insert(0, glob.glob('libpy/lib.*')[0])
    
    from my.test.demo import ClassMember
    from my.test.demo.ttypes import *
    
    from thrift import Thrift
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol
    
    try:
      startTime = time.time()*1000
      # Make socket
      transport = TSocket.TSocket('127.0.0.1', 10086)
    
      # Framed is critical. Raw sockets are very slow
      transport = TTransport.TFramedTransport(transport)
    
      # Wrap in a protocol
      protocol = TBinaryProtocol.TBinaryProtocol(transport)
    
      # Create a client to use the protocol encoder
      client = ClassMember.Client(protocol)
    
      # Connect!
      transport.open()
    
      for i in range(1,6):
        i+=1
        s = Student()  
        s.sid = i
        s.sname = "name_{}".format(i)
        r = client.Add(s)
        print(s)
      
      endTime = time.time()*1000
    
      print client.List(time.time()*1000)
    
      print "use:%d-%d=%dms" %(endTime,startTime, (endTime - startTime))
      # Close!
      transport.close()
    
    except Thrift.TException, tx:
      print 'ERROR:%s' % (tx.message)

    编写代码之后,编译和运行可能需要安装对应的语言包:

    #python的thrift package安装
    sudo easy_install thrift==0.9.0
    
    #golang的package安装
    go get git.apache.org/thrift.git/lib/go/thrift

    编译之后运行:

    qpzhang@qpzhangdeMac-mini:~/project/thrift-0.9.2/mytest $./client 
    add 0 student <nil>
    add 1 student <nil>
    add 2 student <nil>
    add 3 student <nil>
    add 4 student <nil>
    <nil>
    Student({Sid:0 Sname:name_0 Ssex:false Sage:0})
    Student({Sid:1 Sname:name_1 Ssex:false Sage:0})
    Student({Sid:2 Sname:name_2 Ssex:false Sage:0})
    Student({Sid:3 Sname:name_3 Ssex:false Sage:0})
    Student({Sid:4 Sname:name_4 Ssex:false Sage:0})
    calltime:1429856944847-1429856929841=15006ms
    qpzhang@qpzhangdeMac-mini:~/project/thrift-0.9.2/mytest/gen-py $python client.py 
    Student(sname='name_2', sage=None, ssex=False, sid=2)
    Student(sname='name_3', sage=None, ssex=False, sid=3)
    Student(sname='name_4', sage=None, ssex=False, sid=4)
    Student(sname='name_5', sage=None, ssex=False, sid=5)
    Student(sname='name_6', sage=None, ssex=False, sid=6)
    [Student(sname='name_2', sage=0, ssex=False, sid=2), Student(sname='name_3', sage=0, ssex=False, sid=3), Student(sname='name_4', sage=0, ssex=False, sid=4), Student(sname='name_5', sage=0, ssex=False, sid=5), Student(sname='name_6', sage=0, ssex=False, sid=6)]
    use:1429858724602-1429858724600=1ms

    跨语言OK。

    二、thrift go package 

    qpzhang@qpzhangdeMac-mini:~/project/thrift-0.9.2/mytest $./client 
    add 0 student <nil>
    add 1 student <nil>
    add 2 student <nil>
    add 3 student EOF
    add 4 student write tcp 127.0.0.1:10086: broken pipe
    write tcp 127.0.0.1:10086: broken pipe
    calltime:1429856896694-1429856881690=15004ms

    在client运行过程中把server干掉,可以看到返回错误。

    这里超时、心跳,重连都要自己来搞,不像ZeroMQ,全部都帮你搞掂。

    另外,Golang里面没有下面两种实现:

    TThreadPoolServer – 多线程服务模型,使用标准的阻塞式IO。
    TNonblockingServer – 多线程服务模型,使用非阻塞式IO(需使用TFramedTransport数据传输方式)

    查看源码:

    func (p *TSimpleServer) AcceptLoop() error {
        for {
            client, err := p.serverTransport.Accept()
            if err != nil {
                select {
                case <-p.quit:
                    return nil
                default:
                }
                return err
            }
            if client != nil {
                go func() {
                    if err := p.processRequests(client); err != nil {
                        log.Println("error processing request:", err)
                    }
                }()
            }
        }
    }
    
    func (p *TSimpleServer) Serve() error {
        err := p.Listen()
        if err != nil {
            return err
        }
        p.AcceptLoop()
        return nil
    }

    可以看到simple server本身就是异步非阻塞的。

    func (p *TSimpleServer) processRequests(client TTransport) error {
        processor := p.processorFactory.GetProcessor(client)
        inputTransport := p.inputTransportFactory.GetTransport(client)
        outputTransport := p.outputTransportFactory.GetTransport(client)
        inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
        outputProtocol := p.outputProtocolFactory.GetProtocol(outputTransport)
        defer func() {
            if e := recover(); e != nil {
                log.Printf("panic in processor: %s: %s", e, debug.Stack())
            }
        }()
        if inputTransport != nil {
            defer inputTransport.Close()
        }
        if outputTransport != nil {
            defer outputTransport.Close()
        }
        for {
            ok, err := processor.Process(inputProtocol, outputProtocol)
            if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
                return nil
            } else if err != nil {
                log.Printf("error processing request: %s", err)
                return err
            }
            if !ok {
                break
            }
        }
        return nil
    }
     
    可以看到实现是在生成的代码里面的:
     
    func NewClassMemberProcessor(handler ClassMember) *ClassMemberProcessor {
    
        self6 := &ClassMemberProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)}
        self6.processorMap["List"] = &classMemberProcessorList{handler: handler}
        self6.processorMap["Add"] = &classMemberProcessorAdd{handler: handler}
        self6.processorMap["IsNameExist"] = &classMemberProcessorIsNameExist{handler: handler}
        return self6
    }
    
    func (p *ClassMemberProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
        name, _, seqId, err := iprot.ReadMessageBegin()
        if err != nil {
            return false, err
        }
        if processor, ok := p.GetProcessorFunction(name); ok {
            return processor.Process(seqId, iprot, oprot)
        }
        iprot.Skip(thrift.STRUCT)
        iprot.ReadMessageEnd()
        x7 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name)
        oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId)
        x7.Write(oprot)
        oprot.WriteMessageEnd()
        oprot.Flush()
        return false, x7
    
    }
    
    type classMemberProcessorList struct {
        handler ClassMember
    }
    
    func (p *classMemberProcessorList) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
        args := ListArgs{}
        if err = args.Read(iprot); err != nil {
            iprot.ReadMessageEnd()
            x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error())
            oprot.WriteMessageBegin("List", thrift.EXCEPTION, seqId)
            x.Write(oprot)
            oprot.WriteMessageEnd()
            oprot.Flush()
            return false, err
        }
    
        iprot.ReadMessageEnd()
        result := ListResult{}
        var retval []*Student
        var err2 error
        if retval, err2 = p.handler.List(args.CallTime); err2 != nil {
            x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing List: "+err2.Error())
            oprot.WriteMessageBegin("List", thrift.EXCEPTION, seqId)
            x.Write(oprot)
            oprot.WriteMessageEnd()
            oprot.Flush()
            return true, err2
        } else {
            result.Success = retval
        }
        if err2 = oprot.WriteMessageBegin("List", thrift.REPLY, seqId); err2 != nil {
            err = err2
        }
        if err2 = result.Write(oprot); err == nil && err2 != nil {
            err = err2
        }
        if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil {
            err = err2
        }
        if err2 = oprot.Flush(); err == nil && err2 != nil {
            err = err2
        }
        if err != nil {
            return
        }
        return true, err
    }

    如果是多语言协作,另外一种方式json+http,实现成本感觉没有thrift这么大。

    另外,性能可以参考, http://www.tuicool.com/articles/rEj6fie 

  • 相关阅读:
    ETL Pentaho Data Integration (Kettle) 插入/更新 问题 etl
    Value Investment
    sqlserver 2008r2 表分区拆分问题
    HTTP与HTTPS的区别与联系
    别人分享的面经
    饥人谷开放接口(教程)
    java内存泄漏
    单例模式
    Maven项目上有小红叉咋办
    Socket通信1.0
  • 原文地址:https://www.cnblogs.com/zhangqingping/p/4453756.html
Copyright © 2011-2022 走看看