zoukankan      html  css  js  c++  java
  • Thrift学习

    Thrift学习

    一:thrift介绍

      Thrift是facebook开发的用来处理各不同系统之间数据通讯的rpc服务框架,后来成为apche的开源项目。thrift支持多种程序语言,包括Java,Python,Ruby,JavaScript,Node.js,Go,C,C++,C#,Erlang,Delphi,Perl,Php,SmallTalk,OCaml,Haxe,Haskell,D语言。Thrift采用IDL(Interface Defination Language)描述性语言来定义数据结构和接口。Thrift模型如下所示:
     
                                                      图 thrift模型图

     

    二 thrift数据传输协议

    TBinaryProtocol                 二进制传输协议
    TCompactProtocol                使用VLQ编码进行压缩的数据传输协议
    TJSONProtocol                   JSON格式的数据传输协议
    TSimpleJSONProtocol             简单的JSON格式数据传输协议
    TDebugProtocol                  调试时使用的文本传输协议

     

    三 thrift传输层

    TFramedTransport               按块的大小进行传输
    TFileTransport                 按照文件的方式进行传输
    TMemoryTransport               使用内存IO方式进行传输
    TZlibTransport                 执行zlib压缩方式传输

    四 thrift服务器端

    TSimpleServer                  简单的单线程标准阻塞式服务器
    TThreadPoolServer              多线程阻塞式服务器
    TNonblockingServer             多线程非阻塞式服务器
    THsHaServer                    半同步半异步服务器
    其实传输层的传输只有阻塞和非阻塞,再加上具体的工作方式 单线程 多线程

    五 thrift客户端

    TClient                    简单单线程阻塞式客户端
    TAsynClient                异步客户端(多线程)

    六 thrift开发步骤

    1服务器端

    实现服务处理接口impl
    创建TProcessor
    创建TServerTransport(TServerSocket)   创建阻塞通信的还是非阻塞通信
    创建TProtocol                                      数据传输协议
    创建TServer                                       服务器类型 单工(单线程)  双工(多线程)  半单工半双工(多线程)
    启动Server

    2客户端

    创建Transport(TSocket)               创建阻塞通信(客户端只有阻塞)
    创建TProtocol                        数据传输协议
    基于TTransport和TProtocol创建Client
    调用Client的相应方法

    七 thrift数据类型

    1基本类型

    bool:布尔值,true 或 false

    byte:8 位有符号整数

    i16:16 位有符号整数

    i32:32 位有符号整数

    i64:64 位有符号整数

    double:64 位浮点数

    string:utf-8编码的字符串

    2结构体类型

    struct:定义公共的对象

    enum: 枚举类型

    3容器类型

    list:对应 Java 的 ArrayList

    set:对应 Java 的 HashSet

    map:对应 Java 的 HashMap

    4异常类型

    exception:对应 Java 的 Exception

    5服务类型

    service:对应服务的类  提供接口

    八 thrift例子

    enum 类型

    复制代码
    struct Student{
         1: required i32 id
         2: required string username
         3: required string password
         4: requried string number
         5: optional double age
    }
    复制代码

    struct 类型

    复制代码
    struct School{
         1: required i32 id
         2: required string name
         3: required set<Student> students
         4: required list<Student> rank
         5: required map<string, string> number_name
    }
    复制代码

    service 类型

    service ThriftMysqlService{
         void addUser(1:Student user)
         list<Student> queryAllUser()
         Student queryOneUser(1:i32 id)
         map<string, string> queryOneArticle(1:i32 id)
    }

    具体代码

    thrift.thrift 定义数据类型和接口的文件

    复制代码
    namespace java org.seava.thrift_example.thrift
    
    struct User{
        1: required i32 userId
        2: required string username
        3: required string password
    }
    
    service ThriftService{
        void addUser(1:User user)
        User queryUser(1:i32 id)
        list<User> queryUserList()
        map<string, string> queryUserNamePass()
        map<i32, User> queryUserMap()
    }
    复制代码

    到apache的官网下载thrift.exe程序, 下载地址 http://thrift.apache.org/ ,下下来通过cmd命令窗口去运行如下命令

    thrift  -gen java xxx.thrift

    接口实现 ThriftServiceImpl.java

    复制代码
     1 package org.seava.thrift_example.thrift;
     2 
     3 import java.util.ArrayList;
     4 import java.util.HashMap;
     5 import java.util.List;
     6 import java.util.Map;
     7 
     8 
     9 public class ThriftServiceImpl implements ThriftService.Iface {
    10 
    11       public void addUser(User user) throws org.apache.thrift.TException{
    12           System.out.println(user.userId + "  " + user.username + "  " + user.password);
    13       }
    14 
    15       public User queryUser(int id) throws org.apache.thrift.TException{
    16           System.out.println(id);
    17           User user = new User();
    18           user.userId = 100;
    19           user.username = "FFF";
    20           user.password = "NNN";
    21           return user;
    22       }
    23 
    24       public List<User> queryUserList() throws org.apache.thrift.TException{
    25           User user = new User();
    26           user.userId = 100;
    27           user.username = "FFF";
    28           user.password = "NNN";
    29           User user2 = new User();
    30           user2.userId = 102;
    31           user2.username = "FFF2";
    32           user2.password = "NNN2";
    33           List<User> list = new ArrayList<User>();
    34           list.add(user2);
    35           list.add(user);
    36           return list;
    37       }
    38 
    39       public Map<String,String> queryUserNamePass() throws org.apache.thrift.TException{
    40           User user = new User();
    41           user.userId = 100;
    42           user.username = "FFF";
    43           user.password = "NNN";
    44           Map<String, String> map = new HashMap<String, String>();
    45           map.put("password", user.password);
    46           map.put("useranme", user.username);
    47           return map;
    48       }
    49 
    50       public Map<Integer,User> queryUserMap() throws org.apache.thrift.TException{
    51           User user = new User();
    52           user.userId = 100;
    53           user.username = "FFF";
    54           user.password = "NNN";
    55           User user2 = new User();
    56           user2.userId = 102;
    57           user2.username = "FFF2";
    58           user2.password = "NNN2";
    59           Map<Integer, User> map = new HashMap<Integer, User>();
    60           map.put(user.userId, user);
    61           map.put(user2.userId, user2);
    62           return map;
    63       }
    64 
    65 }
    复制代码

    服务器 Server.java 

    复制代码
      1 package org.seava.thrift_example.thrift;
      2 
      3 import org.apache.thrift.TProcessor;
      4 import org.apache.thrift.protocol.TBinaryProtocol;
      5 import org.apache.thrift.protocol.TCompactProtocol;
      6 import org.apache.thrift.server.THsHaServer;
      7 import org.apache.thrift.server.TNonblockingServer;
      8 import org.apache.thrift.server.TServer;
      9 import org.apache.thrift.server.TSimpleServer;
     10 import org.apache.thrift.server.TThreadPoolServer;
     11 import org.apache.thrift.transport.TFramedTransport;
     12 import org.apache.thrift.transport.TNonblockingServerSocket;
     13 import org.apache.thrift.transport.TNonblockingServerTransport;
     14 import org.apache.thrift.transport.TServerSocket;
     15 import org.apache.thrift.transport.TTransportException;
     16 
     17 
     18 public class Server {
     19     
     20     public static int port = 8090;
     21     
     22     /**
     23      * 简单服务器类型  阻塞单线程
     24      * 步骤
     25      * 创建TProcessor
     26      * 创建TServerTransport
     27      * 创建TProtocol
     28      * 创建TServer
     29      * 启动Server
     30      */
     31     public static void startSimpleServer(){
     32         //创建processor
     33         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
     34         try {
     35             //创建transport 阻塞通信
     36             TServerSocket serverTransport = new TServerSocket(port);
     37             //创建protocol
     38             TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
     39             //将processor transport protocol设入到服务器server中
     40             TServer.Args args = new TServer.Args(serverTransport);
     41             args.processor(tprocessor);
     42             args.protocolFactory(protocol);
     43             //定义服务器类型 设定参数
     44             TServer server = new TSimpleServer(args);
     45             //开启服务
     46             server.serve();
     47         } catch (TTransportException e) {
     48             e.printStackTrace();
     49         }
     50     }
     51     
     52     /**
     53      * 多线程服务器   阻塞多线程
     54      */
     55     public static void startThreadPoolServer(){
     56         //创建processor
     57         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
     58         try{
     59             //创建transport 阻塞通信
     60             TServerSocket serverTransport = new TServerSocket(port);
     61             //创建protocol  数据传输协议
     62             TBinaryProtocol.Factory protocol = new TBinaryProtocol.Factory();
     63             TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
     64             args.processor(tprocessor);
     65             args.protocolFactory(protocol);
     66             //创建服务器类型  多线程
     67             TServer server = new TThreadPoolServer(args);
     68             //开启服务
     69             server.serve();
     70         }catch(Exception e){
     71             e.printStackTrace();
     72         }
     73     }
     74     
     75     /**
     76      * 非阻塞I/O
     77      */
     78     public static void startTNonblockingServer(){
     79         //创建processor
     80         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
     81         try{
     82             //创建transport 非阻塞 nonblocking
     83             TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
     84             //创建protocol 数据传输协议
     85             TCompactProtocol.Factory protocol = new TCompactProtocol.Factory();
     86             //创建transport 数据传输方式  非阻塞需要用这种方式传输
     87             TFramedTransport.Factory transport = new TFramedTransport.Factory();
     88             TNonblockingServer.Args args = new TNonblockingServer.Args(serverTransport);
     89             args.processor(tprocessor);
     90             args.transportFactory(transport);
     91             args.protocolFactory(protocol);
     92             //创建服务器 类型是非阻塞
     93             TServer server = new TNonblockingServer(args);
     94             //开启服务
     95             server.serve();
     96         }catch(Exception e){
     97             e.printStackTrace();
     98         }
     99     }
    100     
    101     /**
    102      * 半同步半异步的非阻塞I/O
    103      */
    104     public static void startTHsHaServer(){
    105         //创建processor
    106         TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());
    107         try{
    108             //创建transport  非阻塞
    109             TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
    110             //非阻塞需要的传输方式
    111             TFramedTransport.Factory transport = new TFramedTransport.Factory();
    112             //数据传输协议
    113             TCompactProtocol.Factory protocol = new TCompactProtocol.Factory();
    114             //创建半同步半异步服务
    115             THsHaServer.Args args = new THsHaServer.Args(serverTransport);
    116             args.processor(tprocessor);
    117             args.transportFactory(transport);
    118             args.protocolFactory(protocol);
    119             //创建 服务类型
    120             TServer server = new THsHaServer(args);
    121             //开启服务
    122             server.serve();
    123         }catch(Exception e){
    124             e.printStackTrace();
    125         }
    126     }
    127     
    128     public static void main(String args[]){
    129         //开启简单服务器
    130 //        Server.startSimpleServer();
    131         //开启多线程服务器
    132 //        Server.startThreadPoolServer();
    133 //        Server.startTNonblockingServer();
    134 //        Server.startTHsHaServer();
    135         Server.startTNonblockingServer();
    136     }
    137 }
    复制代码

    Server.java实现了简单服务器(阻塞单线程)   阻塞多线程   非阻塞   半同步半异步非阻塞

    注意: 非阻塞时传输层需要选择TFramedTransport           

    客户端 Client.java

    复制代码
      1 package org.seava.thrift_example.thrift;
      2 
      3 import java.util.List;
      4 import java.util.Map;
      5 import java.util.concurrent.CountDownLatch;
      6 import java.util.concurrent.TimeUnit;
      7 
      8 import org.apache.thrift.async.TAsyncClientManager;
      9 import org.apache.thrift.protocol.TBinaryProtocol;
     10 import org.apache.thrift.protocol.TCompactProtocol;
     11 import org.apache.thrift.protocol.TProtocol;
     12 import org.apache.thrift.protocol.TProtocolFactory;
     13 import org.apache.thrift.transport.TFramedTransport;
     14 import org.apache.thrift.transport.TNonblockingSocket;
     15 import org.apache.thrift.transport.TNonblockingTransport;
     16 import org.apache.thrift.transport.TSocket;
     17 import org.apache.thrift.transport.TTransport;
     18 
     19 
     20 public class Client implements Runnable {
     21 
     22     public static String ip = "localhost";
     23     public static int port = 8090;
     24     public static int time_out = 30000;
     25     
     26     /**
     27      * 客户端设置
     28      * 创建Transport
     29      * 创建TProtocol
     30      * 基于TTransport和TProtocol创建Client
     31      * 调用Client的相应方法
     32      */
     33     public static void startSimpleClient(){
     34         TTransport transport = null;
     35         try{
     36             //创建Transport
     37             transport = new TSocket(ip, port, time_out);
     38             //创建TProtocol
     39             TProtocol protocol = new TBinaryProtocol(transport);
     40             //基于TTransport和TProtocol创建Client
     41             ThriftService.Client client = new ThriftService.Client(protocol);
     42             transport.open();
     43             //调用client方法
     44             List<User> list = client.queryUserList();
     45             for(User user : list){
     46                 System.out.println(user.userId + " " + user.username + " " + user.password);
     47             }
     48             Map<String, String> map = client.queryUserNamePass();
     49             System.out.println(map);
     50             User user = client.queryUser(10);
     51             System.out.println(user.userId + " " + user.username + " " + user.password);
     52             Map<Integer, User> map_u = client.queryUserMap();
     53             System.out.println(map_u);
     54             User uu = new User();
     55             uu.userId = 1111;
     56             uu.username = "mmbbmmbb";
     57             uu.password = "ppbbppbb";
     58             client.addUser(uu);
     59         }catch(Exception e){
     60             e.printStackTrace();
     61         }
     62     }
     63     
     64     /**
     65      * 调用阻塞服务器的客户端
     66      */
     67     public static void startNonblockingClient(){
     68         TTransport transport = null;
     69         try{
     70             transport = new TFramedTransport(new TSocket(ip, port));
     71             TCompactProtocol protocol = new TCompactProtocol(transport);
     72             ThriftService.Client client = new ThriftService.Client(protocol);
     73             transport.open();
     74             //调用client方法
     75             List<User> list = client.queryUserList();
     76             for(User user : list){
     77                 System.out.println(user.userId + " " + user.username + " " + user.password);
     78             }
     79             Map<String, String> map = client.queryUserNamePass();
     80             System.out.println(map);
     81             User user = client.queryUser(10);
     82             System.out.println(user.userId + " " + user.username + " " + user.password);
     83             Map<Integer, User> map_u = client.queryUserMap();
     84             System.out.println(map_u);
     85             User uu = new User();
     86             uu.userId = 1111;
     87             uu.username = "mmbbmmbb";
     88             uu.password = "ppbbppbb";
     89             client.addUser(uu);
     90         }catch(Exception e){
     91             e.printStackTrace();
     92         }
     93     }
     94     
     95     public static void startAsynClient(){
     96         try{
     97             TAsyncClientManager clientManager = new TAsyncClientManager();
     98             TNonblockingTransport transport = new TNonblockingSocket(ip, port, time_out);
     99             TProtocolFactory tprotocol = new TCompactProtocol.Factory();
    100             ThriftService.AsyncClient asyncClient = new ThriftService.AsyncClient(tprotocol, clientManager, transport);
    101             System.out.println("Client start ...");
    102             CountDownLatch latch = new CountDownLatch(1);
    103             AsynCallback callBack = new AsynCallback(latch);
    104             System.out.println("call method queryUser start ...");
    105             asyncClient.queryUser(100, callBack);
    106             System.out.println("call method queryUser end");
    107             boolean wait = latch.await(30, TimeUnit.SECONDS);
    108             System.out.println("latch.await =:" + wait);
    109         }catch(Exception e){
    110             e.printStackTrace();
    111         }
    112     }
    113     
    114     public void run(){
    115         Client.startSimpleClient();
    116     }
    117     
    118     public static void main(String args[]){
    119         //调用简单服务器 
    120 //        Client.startSimpleClient();
    121         /*Client c1 = new Client();
    122         Client c2 = new Client();
    123         
    124         new Thread(c1).start();
    125         new Thread(c2).start();*/
    126         
    127 //        Client.startNonblockingClient();
    128 //        Client.startNonblockingClient();
    129         Client.startAsynClient();
    130     }
    131 }
    复制代码

    客户端实现了 阻塞单线程  和 异步客户端

    具体代码在github上: https://github.com/WaterHsu/thrift-example.git

     
     
    标签: thriftjava
  • 相关阅读:
    204. Count Primes (Integer)
    203. Remove Linked List Elements (List)
    202. Happy Number (INT)
    201. Bitwise AND of Numbers Range (Bit)
    200. Number of Islands (Graph)
    199. Binary Tree Right Side View (Tree, Stack)
    198. House Robber(Array; DP)
    191. Number of 1 Bits (Int; Bit)
    190. Reverse Bits (Int; Bit)
    189. Rotate Array(Array)
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/4103492.html
Copyright © 2011-2022 走看看