zoukankan      html  css  js  c++  java
  • Thrift学习

    Thrift学习

    一:thrift介绍

      Thrift是facebook开发的用来处理各不同系统之间数据通讯的rpc服务框架,后来成为apche的开源项目。thrift支持多种程序语言,包括Java,Python,Ruby,JavaScript,Node.js,Go,C,C++,C#,Erlang,Delphi,Perl,Php,SmallTalk,OCaml,Haxe,Haskell,D语言。Thrift采用IDL(Interface Defination Language)描述性语言来定义数据结构和接口。Thrift模型如下所示:
     
                                                      图 thrift模型图

     

    二 thrift数据传输协议

    TBinaryProtocol                 二进制传输协议
    TCompactProtocol                使用VLQ编码进行压缩的数据传输协议
    TJSONProtocol                   JSON格式的数据传输协议
    TSimpleJSONProtocol             简单的JSON格式数据传输协议
    TDebugProtocol                  调试时使用的文本传输协议

     

    三 thrift传输层

    TFramedTransport               按块的大小进行传输
    TFileTransport                 按照文件的方式进行传输
    TMemoryTransport               使用内存IO方式进行传输
    TZlibTransport                 执行zlib压缩方式传输

    四 thrift服务器端

    TSimpleServer                  简单的单线程标准阻塞式服务器
    TThreadPoolServer              多线程阻塞式服务器
    TNonblockingServer             多线程非阻塞式服务器
    THsHaServer                    半同步半异步服务器
    其实传输层的传输只有阻塞和非阻塞,再加上具体的工作方式 单线程 多线程

    五 thrift客户端

    TClient                    简单单线程阻塞式客户端
    TAsynClient                异步客户端(多线程)

    六 thrift开发步骤

    1服务器端

    实现服务处理接口impl
    创建TProcessor
    创建TServerTransport(TServerSocket)   创建阻塞通信的还是非阻塞通信
    创建TProtocol                                      数据传输协议
    创建TServer                                       服务器类型 单工(单线程)  双工(多线程)  半单工半双工(多线程)
    启动Server

    2客户端

    创建Transport(TSocket)               创建阻塞通信(客户端只有阻塞)
    创建TProtocol                        数据传输协议
    基于TTransport和TProtocol创建Client
    调用Client的相应方法

    七 thrift数据类型

    1基本类型

    bool:布尔值,true 或 false

    byte:8 位有符号整数

    i16:16 位有符号整数

    i32:32 位有符号整数

    i64:64 位有符号整数

    double:64 位浮点数

    string:utf-8编码的字符串

    2结构体类型

    struct:定义公共的对象

    enum: 枚举类型

    3容器类型

    list:对应 Java 的 ArrayList

    set:对应 Java 的 HashSet

    map:对应 Java 的 HashMap

    4异常类型

    exception:对应 Java 的 Exception

    5服务类型

    service:对应服务的类  提供接口

    八 thrift例子

    enum 类型

    复制代码
    struct Student{
         1: required i32 id
         2: required string username
         3: required string password
         4: requried string number
         5: optional double age
    }
    复制代码

    struct 类型

    复制代码
    struct School{
         1: required i32 id
         2: required string name
         3: required set<Student> students
         4: required list<Student> rank
         5: required map<string, string> number_name
    }
    复制代码

    service 类型

    service ThriftMysqlService{
         void addUser(1:Student user)
         list<Student> queryAllUser()
         Student queryOneUser(1:i32 id)
         map<string, string> queryOneArticle(1:i32 id)
    }

    具体代码

    thrift.thrift 定义数据类型和接口的文件

    复制代码
    namespace java org.seava.thrift_example.thrift
    
    struct User{
        1: required i32 userId
        2: required string username
        3: required string password
    }
    
    service ThriftService{
        void addUser(1:User user)
        User queryUser(1:i32 id)
        list<User> queryUserList()
        map<string, string> queryUserNamePass()
        map<i32, User> queryUserMap()
    }
    复制代码

    到apache的官网下载thrift.exe程序, 下载地址 http://thrift.apache.org/ ,下下来通过cmd命令窗口去运行如下命令

    thrift  -gen java xxx.thrift

    接口实现 ThriftServiceImpl.java

    复制代码
     1 package org.seava.thrift_example.thrift;
     2 
     3 import java.util.ArrayList;
     4 import java.util.HashMap;
     5 import java.util.List;
     6 import java.util.Map;
     7 
     8 
     9 public class ThriftServiceImpl implements ThriftService.Iface {
    10 
    11       public void addUser(User user) throws org.apache.thrift.TException{
    12           System.out.println(user.userId + "  " + user.username + "  " + user.password);
    13       }
    14 
    15       public User queryUser(int id) throws org.apache.thrift.TException{
    16           System.out.println(id);
    17           User user = new User();
    18           user.userId = 100;
    19           user.username = "FFF";
    20           user.password = "NNN";
    21           return user;
    22       }
    23 
    24       public List<User> queryUserList() throws org.apache.thrift.TException{
    25           User user = new User();
    26           user.userId = 100;
    27           user.username = "FFF";
    28           user.password = "NNN";
    29           User user2 = new User();
    30           user2.userId = 102;
    31           user2.username = "FFF2";
    32           user2.password = "NNN2";
    33           List<User> list = new ArrayList<User>();
    34           list.add(user2);
    35           list.add(user);
    36           return list;
    37       }
    38 
    39       public Map<String,String> queryUserNamePass() throws org.apache.thrift.TException{
    40           User user = new User();
    41           user.userId = 100;
    42           user.username = "FFF";
    43           user.password = "NNN";
    44           Map<String, String> map = new HashMap<String, String>();
    45           map.put("password", user.password);
    46           map.put("useranme", user.username);
    47           return map;
    48       }
    49 
    50       public Map<Integer,User> queryUserMap() throws org.apache.thrift.TException{
    51           User user = new User();
    52           user.userId = 100;
    53           user.username = "FFF";
    54           user.password = "NNN";
    55           User user2 = new User();
    56           user2.userId = 102;
    57           user2.username = "FFF2";
    58           user2.password = "NNN2";
    59           Map<Integer, User> map = new HashMap<Integer, User>();
    60           map.put(user.userId, user);
    61           map.put(user2.userId, user2);
    62           return map;
    63       }
    64 
    65 }
    复制代码

    服务器 Server.java 

    复制代码
      1 package org.seava.thrift_example.thrift;
      2 
      3 import org.apache.thrift.TProcessor;
      4 import org.apache.thrift.protocol.TBinaryProtocol;
      5 import org.apache.thrift.protocol.TCompactProtocol;
      6 import org.apache.thrift.server.THsHaServer;
      7 import org.apache.thrift.server.TNonblockingServer;
      8 import org.apache.thrift.server.TServer;
      9 import org.apache.thrift.server.TSimpleServer;
     10 import org.apache.thrift.server.TThreadPoolServer;
     11 import org.apache.thrift.transport.TFramedTransport;
     12 import org.apache.thrift.transport.TNonblockingServerSocket;
     13 import org.apache.thrift.transport.TNonblockingServerTransport;
     14 import org.apache.thrift.transport.TServerSocket;
     15 import org.apache.thrift.transport.TTransportException;
     16 
     17 
     18 public class Server {
     19     
     20     public static int port = 8090;
     21     
     22     /**
     23      * 简单服务器类型  阻塞单线程
     24      * 步骤
     25      * 创建TProcessor
     26      * 创建TServerTransport
     27      * 创建TProtocol
     28      * 创建TServer
     29      * 启动Server
     30      */
     31     public static void startSimpleServer(){
     32         //创建processor
     33         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
     34         try {
     35             //创建transport 阻塞通信
     36             TServerSocket serverTransport = new TServerSocket(port);
     37             //创建protocol
     38             TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
     39             //将processor transport protocol设入到服务器server中
     40             TServer.Args args = new TServer.Args(serverTransport);
     41             args.processor(tprocessor);
     42             args.protocolFactory(protocol);
     43             //定义服务器类型 设定参数
     44             TServer server = new TSimpleServer(args);
     45             //开启服务
     46             server.serve();
     47         } catch (TTransportException e) {
     48             e.printStackTrace();
     49         }
     50     }
     51     
     52     /**
     53      * 多线程服务器   阻塞多线程
     54      */
     55     public static void startThreadPoolServer(){
     56         //创建processor
     57         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
     58         try{
     59             //创建transport 阻塞通信
     60             TServerSocket serverTransport = new TServerSocket(port);
     61             //创建protocol  数据传输协议
     62             TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
     63             TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
     64             args.processor(tprocessor);
     65             args.protocolFactory(protocol);
     66             //创建服务器类型  多线程
     67             TServer server = new TThreadPoolServer(args);
     68             //开启服务
     69             server.serve();
     70         }catch(Exception e){
     71             e.printStackTrace();
     72         }
     73     }
     74     
     75     /**
     76      * 非阻塞I/O
     77      */
     78     public static void startTNonblockingServer(){
     79         //创建processor
     80         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
     81         try{
     82             //创建transport 非阻塞 nonblocking
     83             TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
     84             //创建protocol 数据传输协议
     85             TCompactProtocol.Factory protocol = new TCompactProtocol.Factory();
     86             //创建transport 数据传输方式  非阻塞需要用这种方式传输
     87             TFramedTransport.Factory transport = new TFramedTransport.Factory();
     88             TNonblockingServer.Args args = new TNonblockingServer.Args(serverTransport);
     89             args.processor(tprocessor);
     90             args.transportFactory(transport);
     91             args.protocolFactory(protocol);
     92             //创建服务器 类型是非阻塞
     93             TServer server = new TNonblockingServer(args);
     94             //开启服务
     95             server.serve();
     96         }catch(Exception e){
     97             e.printStackTrace();
     98         }
     99     }
    100     
    101     /**
    102      * 半同步半异步的非阻塞I/O
    103      */
    104     public static void startTHsHaServer(){
    105         //创建processor
    106         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
    107         try{
    108             //创建transport  非阻塞
    109             TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
    110             //非阻塞需要的传输方式
    111             TFramedTransport.Factory transport = new TFramedTransport.Factory();
    112             //数据传输协议
    113             TCompactProtocol.Factory protocol = new TCompactProtocol.Factory();
    114             //创建半同步半异步服务
    115             THsHaServer.Args args = new THsHaServer.Args(serverTransport);
    116             args.processor(tprocessor);
    117             args.transportFactory(transport);
    118             args.protocolFactory(protocol);
    119             //创建 服务类型
    120             TServer server = new THsHaServer(args);
    121             //开启服务
    122             server.serve();
    123         }catch(Exception e){
    124             e.printStackTrace();
    125         }
    126     }
    127     
    128     public static void main(String args[]){
    129         //开启简单服务器
    130 //        Server.startSimpleServer();
    131         //开启多线程服务器
    132 //        Server.startThreadPoolServer();
    133 //        Server.startTNonblockingServer();
    134 //        Server.startTHsHaServer();
    135         Server.startTNonblockingServer();
    136     }
    137 }
    复制代码

    Server.java实现了简单服务器(阻塞单线程)   阻塞多线程   非阻塞   半同步半异步非阻塞

    注意: 非阻塞时传输层需要选择TFramedTransport           

    客户端 Client.java

    复制代码
      1 package org.seava.thrift_example.thrift;
      2 
      3 import java.util.List;
      4 import java.util.Map;
      5 import java.util.concurrent.CountDownLatch;
      6 import java.util.concurrent.TimeUnit;
      7 
      8 import org.apache.thrift.async.TAsyncClientManager;
      9 import org.apache.thrift.protocol.TBinaryProtocol;
     10 import org.apache.thrift.protocol.TCompactProtocol;
     11 import org.apache.thrift.protocol.TProtocol;
     12 import org.apache.thrift.protocol.TProtocolFactory;
     13 import org.apache.thrift.transport.TFramedTransport;
     14 import org.apache.thrift.transport.TNonblockingSocket;
     15 import org.apache.thrift.transport.TNonblockingTransport;
     16 import org.apache.thrift.transport.TSocket;
     17 import org.apache.thrift.transport.TTransport;
     18 
     19 
     20 public class Client implements Runnable {
     21 
     22     public static String ip = "localhost";
     23     public static int port = 8090;
     24     public static int time_out = 30000;
     25     
     26     /**
     27      * 客户端设置
     28      * 创建Transport
     29      * 创建TProtocol
     30      * 基于TTransport和TProtocol创建Client
     31      * 调用Client的相应方法
     32      */
     33     public static void startSimpleClient(){
     34         TTransport transport = null;
     35         try{
     36             //创建Transport
     37             transport = new TSocket(ip, port, time_out);
     38             //创建TProtocol
     39             TProtocol protocol = new TBinaryProtocol(transport);
     40             //基于TTransport和TProtocol创建Client
     41             ThriftService.Client client = new ThriftService.Client(protocol);
     42             transport.open();
     43             //调用client方法
     44             List<User> list = client.queryUserList();
     45             for(User user : list){
     46                 System.out.println(user.userId + " " + user.username + " " + user.password);
     47             }
     48             Map<String, String> map = client.queryUserNamePass();
     49             System.out.println(map);
     50             User user = client.queryUser(10);
     51             System.out.println(user.userId + " " + user.username + " " + user.password);
     52             Map<Integer, User> map_u = client.queryUserMap();
     53             System.out.println(map_u);
     54             User uu = new User();
     55             uu.userId = 1111;
     56             uu.username = "mmbbmmbb";
     57             uu.password = "ppbbppbb";
     58             client.addUser(uu);
     59         }catch(Exception e){
     60             e.printStackTrace();
     61         }
     62     }
     63     
     64     /**
     65      * 调用阻塞服务器的客户端
     66      */
     67     public static void startNonblockingClient(){
     68         TTransport transport = null;
     69         try{
     70             transport = new TFramedTransport(new TSocket(ip, port));
     71             TCompactProtocol protocol = new TCompactProtocol(transport);
     72             ThriftService.Client client = new ThriftService.Client(protocol);
     73             transport.open();
     74             //调用client方法
     75             List<User> list = client.queryUserList();
     76             for(User user : list){
     77                 System.out.println(user.userId + " " + user.username + " " + user.password);
     78             }
     79             Map<String, String> map = client.queryUserNamePass();
     80             System.out.println(map);
     81             User user = client.queryUser(10);
     82             System.out.println(user.userId + " " + user.username + " " + user.password);
     83             Map<Integer, User> map_u = client.queryUserMap();
     84             System.out.println(map_u);
     85             User uu = new User();
     86             uu.userId = 1111;
     87             uu.username = "mmbbmmbb";
     88             uu.password = "ppbbppbb";
     89             client.addUser(uu);
     90         }catch(Exception e){
     91             e.printStackTrace();
     92         }
     93     }
     94     
     95     public static void startAsynClient(){
     96         try{
     97             TAsyncClientManager clientManager = new TAsyncClientManager();
     98             TNonblockingTransport transport = new TNonblockingSocket(ip, port, time_out);
     99             TProtocolFactory tprotocol = new TCompactProtocol.Factory();
    100             ThriftService.AsyncClient asyncClient = new ThriftService.AsyncClient(tprotocol, clientManager, transport);
    101             System.out.println("Client start ...");
    102             CountDownLatch latch = new CountDownLatch(1);
    103             AsynCallback callBack = new AsynCallback(latch);
    104             System.out.println("call method queryUser start ...");
    105             asyncClient.queryUser(100, callBack);
    106             System.out.println("call method queryUser end");
    107             boolean wait = latch.await(30, TimeUnit.SECONDS);
    108             System.out.println("latch.await =:" + wait);
    109         }catch(Exception e){
    110             e.printStackTrace();
    111         }
    112     }
    113     
    114     public void run(){
    115         Client.startSimpleClient();
    116     }
    117     
    118     public static void main(String args[]){
    119         //调用简单服务器 
    120 //        Client.startSimpleClient();
    121         /*Client c1 = new Client();
    122         Client c2 = new Client();
    123         
    124         new Thread(c1).start();
    125         new Thread(c2).start();*/
    126         
    127 //        Client.startNonblockingClient();
    128 //        Client.startNonblockingClient();
    129         Client.startAsynClient();
    130     }
    131 }
    复制代码

    客户端实现了 阻塞单线程  和 异步客户端

    具体代码在github上: https://github.com/WaterHsu/thrift-example.git

     
     
    标签: thriftjava
  • 相关阅读:
    Kettle 实现mysql数据库不同表之间数据同步——实验过程
    Kettle ETL 来进行mysql 数据同步——试验环境搭建(表中无索引,无约束,无外键连接的情况)
    并查集知识总结
    c# 线程同步问题(about volatile)
    c# 线程的等待(堵塞)
    net中多线程返回值
    c# 中的 lock monitor mutex Semaphore 的比较
    c#两种同步结构
    links-some-blog
    T-SQL中的APPLY用法
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/4103492.html
Copyright © 2011-2022 走看看