zoukankan      html  css  js  c++  java
  • thrift之php,python使用TServerSocket并发 处理请求

    要求:

    不适用nginx+fastcgi情况下,分布式系统之间如果通讯,如果不阻塞,能并发处理请求

    环境:

    luman/laravel:5.5

    php:7.2

     thrift -version :Thrift version 0.11.0

    thrift文件模板:testServer.thrift

    namespace php Rpc.Test
    
    
    service Echop {
       string Echop(1: string str) ,
    }
    

      

    生成RPC文件:

    thrift -r --out ./app --gen php:server ./ThriftSource/testServer.thrift
    

      

    服务端的实现:

    安装第三方扩展包:

    composer require sunlong/thrift

    或者

    https://github.com/sunlongv5/thrift.git

    composer文件修改:

    "autoload": {
            "classmap": [
                "app/Rpc"
    
            ],
            "psr-4": {
                "Rpc\": "app/Rpc",
                "Services\": "app/services",
                "Thrift\": "vendor/sunlong/thrift/lib/php/lib/Thrift/"
            }
        },
    

      

      

    新建Sevice文件夹创建文件EchopServie.php  实现thrift的Echop方法

    <?php
    namespace Services;
    use RpcTestEchopIf;
    
    class EchopServie implements EchopIf{
        public function Echop($str){
            Log::info($str);
            sleep(5);
            return "RPC:".$str;
        }
    }
    

      

      

    创建文件appConsoleCommandsRpcServer.php

    此代码为thrift的server实现

    <?php
    
    namespace AppConsoleCommands;
    
    use IlluminateConsoleCommand;
    use ThriftExceptionTException;
    use ThriftFactoryTBinaryProtocolFactory;
    use ThriftFactoryTTransportFactory;
    use ThriftServerTServerSocket;
    use ThriftServerTSimpleServer;
    use ThriftServerTForkingServer;
    use ThriftTMultiplexedProcessor;
    use RpcTestEchopClient;
    use RpcTestEchopProcessor;
    use ThriftProtocolTBinaryProtocol;
    use ThriftTransportTBufferedTransport;
    
    
    class RpcServer extends Command{
        protected $signature = 'server:rpc';
    
        /**
         * 控制台命令说明。
         *
         * @var string
         */
        protected $description = 'rpc 服务';
    
        protected static $socketController;
    
    
        public function handle()
        {
            try {
    
                $handler = new ServicesEchopServie();
                $processor = new EchopProcessor($handler);
                // 将服务注册到TMultiplexedProcessor中
                $tFactory = new TTransportFactory();
                $pFactory = new TBinaryProtocolFactory(true, true);
                $multiplexedProcessor = new TMultiplexedProcessor();
                $multiplexedProcessor->registerProcessor("Echop", $processor);
                // 监听开始
                $transport = new TServerSocket('0.0.0.0', '9998');
                $server = new TForkingServer($processor, $transport, $tFactory, $tFactory, $pFactory, $pFactory);
                $server->serve();
            } catch (TException $te) {
                throw new Exception($te->getMessage());
            }
        }
    
    
    
    
    
    }
    RpcServer

    appConsoleKernel.php文件中添加

    protected $commands = [
            RpcServer::class,
        ];
    

      

    客户端实现:

    添加路由:

    $router->get('/rpc/test', ['uses' => 'Controller@test']);
    

      

    Controller文件新增test方法
    public function test(){
            try {
                ini_set('memory_limit', '1024M');
    //            $socket = new THttpClient('http://192.168.1.188', 5201, '/rpc/test2');
                $socket = new TSocket('192.168.1.188', '9998');
                $socket->setRecvTimeout(50000);
                $socket->setDebug(true);
                $transport = new TBufferedTransport($socket, 1024, 1024);
                $protocol = new TBinaryProtocol($transport);
                $client = new RpcTestEchopClient($protocol);
                $transport->open();
                $result = $client->Echop('hello world !');
                print_r($result);
                $transport->close();
            } catch (TException $tx) {
                print_r($tx->getMessage());
            }
        }
    

      

     启动服务端:

     /application/php7/bin/php  artisan server:rpc
    

      

    模拟请求:

    7878端口为nginx启动的客户端地址

    同时刷新三个页面,发现每个页面都是5秒返回的结果,没有阻塞

    采用python客户端测试:

    /application/python3.6.4/bin/pip3  install thrift

    生成pthon客户端thrift文件

     thrift -out .. --gen py ./ThriftSource/testPyServer.thrift

    编写python的客户端:

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol
    from Rpc.Python.Echop import  Client
    import threading
    
    __HOST = '192.168.1.188'
    __PORT = 9998
    
    def send(data):
    	tsocket = TSocket.TSocket(__HOST, __PORT)
    	transport = TTransport.TBufferedTransport(tsocket)
    	protocol = TBinaryProtocol.TBinaryProtocol(transport)
    	client = Client(protocol)
    	transport.open()
    	print(client.Echop(data))
    
    if __name__ == '__main__':
    	threadl = []
    	t1 = threading.Thread(target=send,args=('python001',)) 
    	t2 = threading.Thread(target=send,args=('python002',)) 
    	t3 = threading.Thread(target=send,args=('python003',))
    	threadl.append(t1)
    	threadl.append(t2)
    	threadl.append(t3)
    	for x in threadl:
    	    x.start()
    

      

    执行客户端

    python3 ./client.py
    

      

     结果同一时刻返回三条记录

  • 相关阅读:
    递归程序设计方法
    深入理解 Entity Framework
    面向对象设计的七大原则分析与实践
    JavaScript内置对象与原型继承
    设计模式之创建型(1)-简单工厂
    设计模式之创建型(2)-工厂方法模式
    设计模式之创建型(3)-抽象工厂模式
    设计模式之创建型(4)-建造者模式(Builder)
    设计模式之创建型(5)-单例模式(Singleton)
    设计模式之创建型(6)-原型模式(Prototype)
  • 原文地址:https://www.cnblogs.com/sunlong88/p/10702842.html
Copyright © 2011-2022 走看看