要求:
在不使用 nginx+fastcgi 的情况下,分布式系统之间如何通讯,并且做到不阻塞、能并发处理请求
环境:
lumen/laravel:5.5
php:7.2
thrift -version :Thrift version 0.11.0
thrift文件模板:testServer.thrift
// PHP code generated from this file is placed under the Rpc\Test namespace.
namespace php Rpc.Test

// Echo service: a single method that takes a string and returns a string.
service Echop {
    string Echop(1: string str),
}
生成RPC文件:
thrift -r --out ./app --gen php:server ./ThriftSource/testServer.thrift
服务端的实现:
安装第三方扩展包:
composer require sunlong/thrift
或者
https://github.com/sunlongv5/thrift.git
composer文件修改:
"autoload": { "classmap": [ "app/Rpc" ], "psr-4": { "Rpc\\": "app/Rpc", "Services\\": "app/services", "Thrift\\": "vendor/sunlong/thrift/lib/php/lib/Thrift/" } },
新建 app/services 文件夹,并在其中创建文件 EchopServie.php,实现 thrift 的 Echop 方法
<?php

namespace Services;

use Illuminate\Support\Facades\Log;
use Rpc\Test\EchopIf;

/**
 * Handler for the Thrift `Echop` service.
 *
 * Implements the interface generated by the Thrift compiler (Rpc\Test\EchopIf).
 * NOTE(review): the Log facade is imported explicitly because an unqualified
 * `Log` inside this namespace would resolve to Services\Log; in Lumen the
 * facade also requires `$app->withFacades()` to be enabled — confirm in bootstrap.
 */
class EchopServie implements EchopIf
{
    /**
     * Log the received string, wait five seconds, and echo it back.
     *
     * @param string $str Text sent by the RPC client.
     * @return string The input prefixed with "RPC:".
     */
    public function Echop($str)
    {
        Log::info($str);

        // Artificial delay so that concurrent request handling can be observed.
        sleep(5);

        return "RPC:" . $str;
    }
}
创建文件 app/Console/Commands/RpcServer.php
此代码为thrift的server实现

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Thrift\Exception\TException;
use Thrift\Factory\TBinaryProtocolFactory;
use Thrift\Factory\TTransportFactory;
use Thrift\Server\TServerSocket;
use Thrift\Server\TSimpleServer;
use Thrift\Server\TForkingServer;
use Thrift\TMultiplexedProcessor;
use Rpc\Test\EchopClient;
use Rpc\Test\EchopProcessor;
use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TBufferedTransport;

/**
 * Artisan command that starts the Thrift RPC server for the Echop service.
 *
 * Run with: php artisan server:rpc
 */
class RpcServer extends Command
{
    /**
     * Name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'server:rpc';

    /**
     * Console command description.
     *
     * @var string
     */
    protected $description = 'rpc 服务';

    protected static $socketController;

    /**
     * Build the Echop processor and serve it on 0.0.0.0:9998 with a forking server.
     *
     * @return void
     * @throws \Exception When the Thrift layer raises a TException.
     */
    public function handle()
    {
        try {
            $handler = new \Services\EchopServie();
            $processor = new EchopProcessor($handler);

            // The processor is served directly (no TMultiplexedProcessor):
            // the clients in this walkthrough speak plain TBinaryProtocol, and
            // the multiplexed processor built previously was never handed to
            // the server.
            $tFactory = new TTransportFactory();
            $pFactory = new TBinaryProtocolFactory(true, true);

            // Start listening.
            $transport = new TServerSocket('0.0.0.0', '9998');
            $server = new TForkingServer($processor, $transport, $tFactory, $tFactory, $pFactory, $pFactory);
            $server->serve();
        } catch (TException $te) {
            // Fully qualified: an unqualified `Exception` would resolve to
            // App\Console\Commands\Exception, which does not exist.
            // The original exception is chained to keep its stack trace.
            throw new \Exception($te->getMessage(), 0, $te);
        }
    }
}
在 app/Console/Kernel.php 文件中添加
/**
 * Artisan commands registered by the application.
 *
 * The class is referenced by its fully qualified name so that it resolves
 * correctly regardless of the `use` statements present in the Kernel file.
 *
 * @var array
 */
protected $commands = [
    \App\Console\Commands\RpcServer::class,
];
客户端实现:
添加路由:
// GET /rpc/test is dispatched to the test() method of Controller.
$router->get('/rpc/test', 'Controller@test');
Controller文件新增test方法
/**
 * Call the remote Echop service once and print the reply.
 *
 * Requires the following imports in the controller file:
 *   use Thrift\Exception\TException;
 *   use Thrift\Protocol\TBinaryProtocol;
 *   use Thrift\Transport\TBufferedTransport;
 *   use Thrift\Transport\TSocket;
 *
 * @return void
 */
public function test()
{
    $transport = null;

    try {
        ini_set('memory_limit', '1024M');

        // Alternative HTTP transport, kept for reference:
        // $socket = new THttpClient('http://192.168.1.188', 5201, '/rpc/test2');
        $socket = new TSocket('192.168.1.188', '9998');
        $socket->setRecvTimeout(50000);
        $socket->setDebug(true);

        $transport = new TBufferedTransport($socket, 1024, 1024);
        $protocol = new TBinaryProtocol($transport);
        $client = new \Rpc\Test\EchopClient($protocol);

        $transport->open();
        $result = $client->Echop('hello world !');
        print_r($result);
    } catch (TException $tx) {
        print_r($tx->getMessage());
    } finally {
        // Always release the connection, including when the call throws.
        if ($transport !== null && $transport->isOpen()) {
            $transport->close();
        }
    }
}
启动服务端:
/application/php7/bin/php artisan server:rpc
模拟请求:
7878端口为nginx启动的客户端地址
同时刷新三个页面,发现每个页面都是5秒返回的结果,没有阻塞
采用python客户端测试:
/application/python3.6.4/bin/pip3 install thrift
生成 python 客户端 thrift 文件
thrift -out .. --gen py ./ThriftSource/testPyServer.thrift
编写python的客户端:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Concurrency smoke test for the Thrift ``Echop`` service.

Starts three threads that each open their own connection and call
``Echop`` once, so the replies show whether the server handles the
requests in parallel.
"""
import threading

from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket
from thrift.transport import TTransport

from Rpc.Python.Echop import Client

HOST = '192.168.1.188'
PORT = 9998


def send(data):
    """Call ``Echop`` with *data* over a fresh connection and print the reply."""
    tsocket = TSocket.TSocket(HOST, PORT)
    transport = TTransport.TBufferedTransport(tsocket)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = Client(protocol)

    transport.open()
    try:
        print(client.Echop(data))
    finally:
        # Release the socket even when the RPC call raises.
        transport.close()


if __name__ == '__main__':
    payloads = ('python001', 'python002', 'python003')
    workers = [
        threading.Thread(target=send, args=(payload,))
        for payload in payloads
    ]
    for worker in workers:
        worker.start()
    # Wait for every call to finish before the script exits.
    for worker in workers:
        worker.join()
执行客户端
python3 ./client.py
结果同一时刻返回三条记录