zoukankan      html  css  js  c++  java
  • 进程之间究竟有哪些通信方式

     需要特殊指出的是消息队列、信号量和共享内存这三种IPC同属于XSI IPC(XSI可以认为是POSIX标准的超集,简单粗暴理解为C++之于C)。这三种IPC在*NIX中一般都有两个“名字”来为其命名,一个叫做标志符,一个叫做键(key)。标志符是一个非负整数,每当一个IPC结构被创建然后又被销毁后,标志符便会+1一直加到整数的最大整数数值,而后又从0开始重新计算。既然是为了多进程通信使用,那么多进程在使用XSI IPC的时候就需要使用一个名字来找到相应的IPC,然后才能对其进行读写(术语叫做多个进程在同一个IPC结构上汇聚),所以POSIX建议是无论何时创建一个IPC结构,都应指定一个键(key)与之关联。一句话总结就是:标志符是XSI IPC的内部名称,键(key)是XSI IPC的外部名称。

    使多个进程在XSI IPC上汇聚的方法大概有如下三种:

    • 使用指定键IPC_PRIVATE来创建一个IPC结构,然后将返回的标志符保存到一个文件中,然后进程之间通过读取这个文件中的标志符进行通信。使用公共的头文件。这么做的缺点是多了IO操作。
    • 将共同认同的键写入到公共头文件中。这么做的缺点这个键可能已经与一个IPCi结构关联,这样在使用这个键创建结构的时候就可能会出错,然后必须删除已有的IPC结构再重新创建。
    • 认同一个文件路径名和项目ID,然后使用ftok将这两个参数转换成一个键。这将是我们使用的方式。

    XSI IPC结构有一个与之对应的权限结构,叫做ipc_perm,这个结构中定义了IPC结构的创建者、拥有者等

    1、管道

    我们来看一条 Linux 的语句

     netstat -tulnp | grep 8080

    学过 Linux 命名的估计都懂这条语句的含义,其中”|“是管道的意思,它的作用就是把前一条命令的输出作为后一条命令的输入。在这里就是把 netstat -tulnp 的输出结果作为 grep 8080 这条命令的输入。如果两个进程要进行通信的话,就可以用这种管道来进行通信了,并且我们可以知道这条竖线是没有名字的,所以我们把这种通信方式称之为匿名管道。(只能用于父进程和和其子进程之间的通信)

    并且这种通信方式是单向的,只能把第一个命令的输出作为第二个命令的输入,如果进程之间想要互相通信的话,那么需要创建两个管道。

    居然有匿名管道,那也意味着有命名管道,下面我们来创建一个命名管道

     mkfifo  test

    这条命令创建了一个名字为 test 的命名管道。

    接下来我们用一个进程向这个管道里面写数据,然后有另外一个进程把里面的数据读出来。

     echo "this is a pipe" > test   // 写数据

    这个时候管道的内容没有被读出的话,那么这个命令就会一直停在这里,只有当另外一个进程把 test 里面的内容读出来的时候这条命令才会结束。接下来我们用另外一个进程来读取

     cat < test  // 读数据

    我们可以看到,test 里面的数据被读取出来了。上一条命令也执行结束了。

    从上面的例子可以看出,管道的通知机制类似于缓存,就像一个进程把数据放在某个缓存区域,然后等着另外一个进程去拿,并且是管道是单向传输的。

    这种通信方式有什么缺点呢?显然,这种通信方式效率低下,你看,a 进程给 b 进程传输数据,只能等待 b 进程取了数据之后 a 进程才能返回。

    所以管道不适合频繁通信的进程。当然,他也有它的优点,例如比较简单,能够保证我们的数据已经真的被其他进程拿走了。我们平时用 Linux 的时候,也算是经常用。

    管道是*NIX上常见的一个东西,大家平时使用linux的时候也都在用,简单理解就是|,比如ps -aux|grep php这就是管道,大概意思类似于ps进程和grep进程两个进程之间用|完成了通信。管道是一种半双工(现在也有系统已经支持全双工的管道)的工作方式,也就是说数据只能沿着管道的一个方向进行传递,不可以在同一个管道上反向传数据。管道分为两种,一种叫做未命名的管道,另一种叫做命名管道,未命名管道只能在拥有公共祖先的两个进程之间使用,简单理解就是只能用于父进程和和其子进程之间的通信,但是命名管道则可以用于任何两个毫无关连的进程之间的通信(一会儿将要演示的将是这种命名管道)。

    示例1:

    <?php
    // 管道文件绝对路径
    $pipe_file = __DIR__.DIRECTORY_SEPARATOR.'test.pipe';
    // 如果这个文件存在,那么使用posix_mkfifo()的时候是返回false,否则,成功返回true
    if( !file_exists( $pipe_file ) ){
        if( !posix_mkfifo( $pipe_file, 0666 ) ){
            exit( 'create pipe error.'.PHP_EOL );
        }
    }
    // fork出一个子进程
    $pid = pcntl_fork();
    if( $pid < 0 ){
        exit( 'fork error'.PHP_EOL );
    } else if( 0 == $pid ) {
        // 在子进程中
        // 打开命名管道,并写入一段文本
        $file = fopen( $pipe_file, "w" );
        fwrite( $file, "helo world." );
        exit;
    } else if( $pid > 0 ) {
        // 在父进程中
        // 打开命名管道,然后读取文本
        $file = fopen( $pipe_file, "r" );
        // 注意此处fread会被阻塞
        $content = fread( $file, 1024 );
        echo $content.PHP_EOL;
        // 注意此处再次阻塞,等待回收子进程,避免僵尸进程
        pcntl_wait( $status );
    }

    运行结果:helo world.

    示例2:

    <?php
    class Pipe
    {
        public  $fifoPath;
        private $w_pipe;
        private $r_pipe;
     
        /**
         * 自动创建一个管道
         *
         * @param string $name 管道名字
         * @param int $mode  管道的权限,默认任何用户组可以读写
         */
        function __construct($name = 'pipe', $mode = 0666)
        {
            $fifoPath = "/tmp/$name." . posix_getpid();
            if (!file_exists($fifoPath)) {
                if (!posix_mkfifo($fifoPath, $mode)) {
                    error("create new pipe ($name) error.");
                    return false;
                }
            } else {
                error( "pipe ($name) has exit.");
                return false;
            }
            $this->fifoPath = $fifoPath;
        }
     
    ///////////////////////////////////////////////////
    //  写管道函数开始
    ///////////////////////////////////////////////////
        function open_write()
        {
            $this->w_pipe = fopen($this->fifoPath, 'w');
            if ($this->w_pipe == NULL) {
                error("open pipe {$this->fifoPath} for write error.");
                return false;
            }
            return true;
        }
     
        function write($data)
        {
            return fwrite($this->w_pipe, $data);
        }
     
        function write_all($data)
        {
            $w_pipe = fopen($this->fifoPath, 'w');
            fwrite($w_pipe, $data);
            fclose($w_pipe);
        }
     
        function close_write()
        {
            return fclose($this->w_pipe);
        }
    /////////////////////////////////////////////////////////
    /// 读管道相关函数开始
    ////////////////////////////////////////////////////////
        function open_read()
        {
            $this->r_pipe = fopen($this->fifoPath, 'r');
            if ($this->r_pipe == NULL) {
                error("open pipe {$this->fifoPath} for read error.");
                return false;
            }
            return true;
        }
     
        function read($byte = 1024)
        {
            return fread($this->r_pipe, $byte);
        }
     
        function read_all()
        {
            $r_pipe = fopen($this->fifoPath, 'r');
            $data = '';
            while (!feof($r_pipe)) {
                //echo "read one K
    ";
                $data .= fread($r_pipe, 1024);
            }
            fclose($r_pipe);
            return $data;
        }
     
        function close_read()
        {
            return fclose($this->r_pipe);
        }
    ////////////////////////////////////////////////////
        /**
         * 删除管道
         *
         * @return boolean is success
         */
        function rm_pipe()
        {
            return unlink($this->fifoPath);
        }
    }
    ?>

    2、消息队列

    那我们能不能把进程的数据放在某个内存之后就马上让进程返回呢?无需等待其他进程来取就返回呢?

    答是可以的,我们可以用消息队列的通信模式来解决这个问题,例如 a 进程要给 b 进程发送消息,只需要把消息放在对应的消息队列里就行了,b 进程需要的时候再去对应的
    消息队列里取出来。同理,b 进程要个 a 进程发送消息也是一样。这种通信方式也类似于缓存吧。

    这种通信方式有缺点吗?答是有的,如果 a 进程发送的数据占的内存比较大,并且两个进程之间的通信特别频繁的话,消息队列模型就不大适合了。因为 a 发送的数据很大的话,意味发送消息(拷贝)这个过程需要花很多时间来读内存。

    哪有没有什么解决方案呢?答是有的,请继续往下看。

    示例:1

    <?php
    /**
     * 这段代码模拟了一个日常的任务。
     * 第一个父进程产生了一个子进程。子进程又作为父进程,产生10个子进程。
     * 可以简化为A -> B -> c,d,e... 等进程。
     * 作为A来说,只需要生产任务,然后交给B 来处理。B 则会将任务分配给10个子进程来进行处理。
     */
    //设定脚本永不超时
    set_time_limit(0);
    $ftok = ftok(__FILE__, 'a');
    $msg_queue = msg_get_queue($ftok);
    $pidarr = [];
    //产生子进程
    $pid = pcntl_fork();
    if ($pid) {
        //父进程模拟生成一个特大的数组。
        $arr = range(1,100000);
        //将任务放进队里,让多个子进程并行处理
        foreach ($arr as $val) {
            $status = msg_send($msg_queue,1, $val);
            usleep(1000);
        }
        $pidarr[] = $pid;
        msg_remove_queue($msg_queue);
    } else {
        //子进程收到任务后,fork10个子进程来处理任务。
        for ($i =0; $i<10; $i++) {
            $childpid = pcntl_fork();
            if ($childpid) {
                $pidarr[] = $childpid; //收集子进程processid
            } else {
                while (true) {
                    msg_receive($msg_queue, 0, $msg_type, 1024, $message);
                    if (!$message) exit(0);
                    echo $message.PHP_EOL;
                    usleep(1000);
                }
            }
        }
    }
    //防止主进程先于子进程退出,形成僵尸进程
    while (count($pidarr) > 0) {
        foreach ($pidarr as $key => $pid) {
            $status = pcntl_waitpid($pid, $status);
            if ($status == -1 || $status > 0) {
                unset($pidarr[$key]);
            }
        }
        sleep(1);
    }

     示例2:

    <?php
    // 使用ftok创建一个键名,注意这个函数的第二个参数“需要一个字符的字符串”
    $key = ftok( __DIR__, 'a' );
    // 然后使用msg_get_queue创建一个消息队列
    $queue = msg_get_queue( $key, 0666 );
    // 使用msg_stat_queue函数可以查看这个消息队列的信息,而使用msg_set_queue函数则可以修改这些信息
    //var_dump( msg_stat_queue( $queue ) );
    // fork进程
    $pid = pcntl_fork();
    if( $pid < 0 ){
        exit( 'fork error'.PHP_EOL );
    } else if( $pid > 0 ) {
        // 在父进程中
        // 使用msg_receive()函数获取消息
        msg_receive( $queue, 0, $msgtype, 1024, $message );
        echo $message.PHP_EOL;
        // 用完了记得清理删除消息队列
        msg_remove_queue( $queue );
        pcntl_wait($status);
    } else if( 0 == $pid ) {
        // 在子进程中
        // 向消息队列中写入消息
        // 使用msg_send()向消息队列中写入消息,具体可以参考文档内容
        msg_send( $queue, 1, "helloword" );
        exit;
    }

     示例3:http://www.php20.cn/article/137

    3、共享内存

    共享内存这个通信方式就可以很好着解决拷贝所消耗的时间了。

    这个可能有人会问了,每个进程不是有自己的独立内存吗?两个进程怎么就可以共享一块内存了?

    我们都知道,系统加载一个进程的时候,分配给进程的内存并不是实际物理内存,而是虚拟内存空间。那么我们可以让两个进程各自拿出一块虚拟地址空间来,然后映射到相同的物理内存中,这样,两个进程虽然有着独立的虚拟内存空间,但有一部分却是映射到相同的物理内存,这就完成了内存共享机制了。

    4、信号量

    共享内存最大的问题是什么?没错,就是多进程竞争内存的问题,就像类似于我们平时说的线程安全问题。如何解决这个问题?这个时候我们的信号量就上场了。

    信号量的本质就是一个计数器,用来实现进程之间的互斥与同步。例如信号量的初始值是 1,然后 a 进程来访问内存1的时候,我们就把信号量的值设为 0,然后进程b 也要来访问内存1的时候,看到信号量的值为 0 就知道已经有进程在访问内存1了,这个时候进程 b 就会访问不了内存1。所以说,信号量也是进程之间的一种通信方式。

     示例:http://www.php20.cn/article/134

    共享内存 和 信号量

    信号量与共享内存。共享内存是最快是进程间通信方式,因为n个进程之间并不需要数据复制,而是直接操控同一份数据。实际上信号量和共享内存是分不开的,要用也是搭配着用。*NIX的一些书籍中甚至不建议新手轻易使用这种进程间通信的方式,因为这是一种极易产生死锁的解决方案。共享内存顾名思义,就是一坨内存中的区域,可以让多个进程进行读写。这里最大的问题就在于数据同步的问题,比如一个在更改数据的时候,另一个进程不可以读,不然就会产生问题。所以为了解决这个问题才引入了信号量,信号量是一个计数器,是配合共享内存使用的,一般情况下流程如下:

    • 当前进程获取将使用的共享内存的信号量
    • 如果信号量大于0,那么就表示这块儿共享资源可以使用,然后进程将信号量减1
    • 如果信号量为0,则进程进入休眠状态一直到信号量大于0,进程唤醒开始从1

    一个进程不再使用当前共享资源情况下,就会将信号量减1。这个地方,信号量的检测并且减1是原子性的,也就说两个操作必须一起成功,这是由系统内核来实现的。

    在php中,信号量和共享内存先后一共也就这几个函数:

    shm 系列函数

        // 申请共享内存,返回共享内存资源号
        $shm_id = shm_attach($key, 1024, 066);
        // 设置共享内存的值,可以理解为:为某块共享内存设置一个 key - value 的键值对值
        $sha_key = 1;
        shm_put_var($shm_id, $sha_key, $var);
        // 获取共享内存值
        shm_get_var($shm_id, $sha_key);
        // 检测某个 key 是否存在
        shm_has_var($shm_id, $sha_key);
        // 删除一个 key
        shm_remove_var($shm_id, $sha_key);
        // 移除共享内存块
        shm_remove($shm_id);
        // 断开共享内存的链接
        shm_detach($shm_id)

    shmop系列函数

    /**
     * shmop_open(int $key , string $flags , int $mode , int $size)
     * $key 共享内存的key
     * $flags 的值有以下几种
     * a :  创建一个只读的共享内存区。
     * c :  如果共享内存区已存在,则打开该共享内存区,并尝试读写。否则新建共享内存区
     * w : 创建一个读写共享内存区
     * n :  创建一个共享内存区,如果已存在,则返回失败
     *
     * $mode 读写权限。如0755 0644 等
     * $size 申请共享内存区的大小
     */
    
    /**
     * shmop_read( resource $shmid , int $start , int $count)
     * 将从共享内存块中读取数据
     * $shmid 共享内存id,资源类型
     * $start 从共享内存的那个字节开始读起
     * $count 一次读取多少个字节。
     * 如果count值小于发送的信息长度,则信息会被截断。
     */
    
    /**
     * shmop_write(resource $shmid , string $data , int $offset)
     * 将数据写入共享内存块
     * $data 将要写入的数据
     * $offset 从共享内存块的那个位置开始写入。
     * 该函数的返回值是写入数据的长度。
     */
    
    /**
     * shmop_size(resource $shmid);
     * 返回当前共享内存块,已经使用的大小
     */
    
    
    /**
     * shmop_delete ( resource $shmid )
     * 删除一个共享内存块的,删除引用关系
     */
    
    /**
     * shmop_close ( resource $shmid )
     * 关闭共享内存块
     * 要先使用shmop_delete 之后才能继续使用shmop_close
     */

    示例1:

    <?php
    // sem key
    $sem_key = ftok( __FILE__, 'b' );
    $sem_id = sem_get( $sem_key );
    // shm key
    $shm_key = ftok( __FILE__, 'm' );
    $shm_id = shm_attach( $shm_key, 1024, 0666 );
    const SHM_VAR = 1;
    $child_pid = [];
    // fork 2 child process
    for( $i = 1; $i <= 2; $i++ ){
      $pid = pcntl_fork();
      if( $pid < 0 ){
        exit();
      } else if( 0 == $pid ) {
        // 获取锁
        sem_acquire( $sem_id );
        if( shm_has_var( $shm_id, SHM_VAR ) ){
          $counter = shm_get_var( $shm_id, SHM_VAR );
          $counter += 1;
          shm_put_var( $shm_id, SHM_VAR, $counter );
        } else {
          $counter = 1;
          shm_put_var( $shm_id, SHM_VAR, $counter );
        }
        // 释放锁,一定要记得释放,不然就一直会被阻锁死
        sem_release( $sem_id );
        exit;
      } else if( $pid > 0 ) {
        $child_pid[] = $pid;
      }
    }
    while( !empty( $child_pid ) ){
      foreach( $child_pid as $pid_key => $pid_item ){
        pcntl_waitpid( $pid_item, $status, WNOHANG );
        unset( $child_pid[ $pid_key ] );
      }
    }
    // 休眠2秒钟,2个子进程都执行完毕了
    sleep( 2 );
    echo '最终结果'.shm_get_var( $shm_id, SHM_VAR ).PHP_EOL;
    // 记得删除共享内存数据,删除共享内存是有顺序的,先remove后detach,顺序反过来php可能会报错
    shm_remove( $shm_id );
    shm_detach( $shm_id );

    5、Socket

    上面我们说的共享内存、管道、信号量、消息队列,他们都是多个进程在一台主机之间的通信,那两个相隔几千里的进程能够进行通信吗?

    答是必须的,这个时候 Socket 这家伙就派上用场了,例如我们平时通过浏览器发起一个 http 请求,然后服务器给你返回对应的数据,这种就是采用 Socket 的通信方式了。

    示例1:

    socket_server.php

    <?php
    $socket = socket_create(AF_INET,SOCK_STREAM,SOL_TCP) or die("error:".socket_strerror(socket_last_error()));
    socket_bind($socket,"192.168.1.104","8000") or die("error:".socket_strerror(socket_last_error()));
    socket_set_option($socket, SOL_SOCKET , SO_REUSEADDR , 1);#可以重复使用端口号
    socket_listen($socket,5);
    while(true){
        print("******等待新客户端的到来*******
    ");
        $clien = socket_accept($socket) or die("error:".socket_strerror(socket_last_error()));
        $pid = pcntl_fork();
        #posix_setsid();
        if ($pid == -1) {
            die('fork failed');
        } else if ($pid == 0) {
            hanld_seesion($clien);
        } else {
            #pcntl_wait($status);
        }
    }
    function hanld_seesion($clien){
        socket_getpeername($clien,$ip,$port);
        $id = posix_getpid();
        print("进程ID:$id == 客户端:".$ip.":".$port."已连接
    ");
        while(true){
            $data = socket_read($clien,1024);
            if(mb_strlen($data) == 0)
            {
                print("进程ID:$id == 客户端:".$ip.":".$port."断开连接
    ");
                socket_close($clien);
                exit();
            }
            print($ip.":".$port.">>:".$data."
    ");
        }
    }

    socket_client.php

    <?php
    $socket = socket_create(AF_INET,SOCK_STREAM,SOL_TCP) or die("error:".socket_strerror(socket_last_error()));
    $msg = "Ping!";
    $len = strlen($msg);
    $server = socket_connect($socket,'192.168.1.104', 8000);
    socket_write($socket, $msg);
    sleep(30);
    socket_close($socket);

    示例2:

    socket_server.php

    <?php
    class SphpSocket
    {
        private static $_instance;
        public $connect_list = array();//客户端列表
        public $connect_callback, $receive_callback, $close_callback;//回调函数
        public $server;//socket服务
        public $is_run=true;//是否运行
        public $config = array(//各种配置
            'debug'=>true,
            'host' => '0.0.0.0',
            'port' => '9501',
            'domain'   => AF_INET,
            'type'     => SOCK_STREAM,
            'protocol' => SOL_TCP,
            'accept' => 511,
            'option_level' => SOL_SOCKET,
            'optname'      => SO_REUSEADDR,
            'optval'       => 1,
            'read_length'=>1024,
            'read_type'=>PHP_NORMAL_READ
        );
        public $error_log=array();
        public static function getInstance($host, $port)
        {
            if (!(self::$_instance instanceof self)) {
                self::$_instance = new static($host, $port);
            }
            return self::$_instance;
        }
    
        public function __construct($host, $port)
        {
            $this->config['host'] = $host;
            $this->config['port'] = $port;
        }
        /**
         * 绑定事件
         * @param $type connect|receive|close
         * @param callable $function
         */
        public function on($type, callable $function)
        {
            switch (strtolower($type)) {
                case 'connect':
                    $this->connect_callback = $function;
                    break;
                case 'receive':
                    $this->receive_callback = $function;
                    break;
                case 'close':
                    $this->close_callback = $function;
                    break;
            }
            return $this;
        }
    
        public function onConnect($connection){
            if (is_callable($this->connect_callback)) {
                call_user_func($this->connect_callback,$connection);
            }
        }
        public function onReceive($connection,$data){
    
            if (is_callable($this->receive_callback)) {
                call_user_func($this->receive_callback,$connection,$data);
            }
        }
        public function onClose($connection){
            if (is_callable($this->close_callback)) {
                call_user_func($this->close_callback,$connection);
            }
        }
    
        /**
         *
         */
        public function start()
        {
            $this->createSocket();
            echo '创建socket成功!'.PHP_EOL;
            $this->bindSocket();
            echo '绑定端口成功!'.PHP_EOL;
            $this->listenSocket();
            echo '监听端口成功!'.PHP_EOL;
            $this->setOptionSocket();
            $this->acceptSocket();
            return $this;
        }
    
        /**
         * 创建socket
         * @return $this
         * @throws Exception
         */
        protected function createSocket()
        {
            $this->server = socket_create($this->config['domain'], $this->config['type'], $this->config['protocol']);
            if ($this->server === false) {
                throw new Exception('创建socket失败!');
            }
            return $this;
        }
    
        /**
         * 绑定端口
         * @return $this
         * @throws Exception
         */
        protected function bindSocket()
        {
            $this->server === false && $this->createSocket();
    
            $result = socket_bind($this->server, $this->config['host'], $this->config['port']);
            if ($result === false) {
                throw new Exception('绑定端口失败!');
            }
            return $this;
        }
    
        /**
         * 监听端口
         * @param null $accept
         * @return $this
         * @throws Exception
         */
        protected function listenSocket($accept = null)
        {
            $this->server === false && $this->createSocket();
            $accept || $accept = $this->config['accept'];
            $result = socket_listen($this->server, $accept);
            if ($result === false) {
                throw new Exception('监听端口失败!');
            }
            return $this;
        }
    
        /**
         * 配置socket
         * @return $this
         * @throws Exception
         */
        protected function setOptionSocket()
        {
            $this->server === false && $this->createSocket();
            $result = socket_set_option($this->server, $this->config['option_level'], $this->config['optname'], $this->config['optval']);
            if ($result === false) {
                throw new Exception('配置socket失败!');
            }
            return $this;
        }
    
        /**
         * 接收socket连接
         */
        protected function acceptSocket(){
            $this->server === false && $this->createSocket();
            while(true&&$this->is_run===true){
                $connection = socket_accept($this->server);
                if($connection===false){
    
                }else{
                    $this->addConnectionList($connection);
                    $this->onConnect($connection);
                    $this->forkProcess($connection);
                }
    
            }
        }
    
        /**
         * 写入客户端信息
         * @param $connection
         * @return $this
         */
        protected function addConnectionList($connection){
    //        $fd =
            $this->connect_list[(string)$connection]['fd']=$connection;
            return $this;
        }
    
        /**
         * 写入客户端进程id
         * @param $connection
         * @param $pid
         * @return $this
         */
        protected function addConnectionListProcess($connection,$pid){
            $this->connect_list[(string)$connection]['pid']=$pid;
            return $this;
        }
    
        /**
         * 派生进程处理
         * @param $connection
         */
        protected function forkProcess($connection){
    
            $pid = pcntl_fork();
            if($pid>0){//使用主进程处理客户端其他请求,子进程继续监听连接请求
                $this->addConnectionListProcess($connection,$pid);
                $this->readSocket($connection);
            }else{
            }
        }
    
        /**
         * 读取socket信息
         * @param $connection
         */
        protected function readSocket($connection){
            while(true&&isset($this->connect_list[(string)$connection])&&$this->is_run){
                $data = @socket_read($connection,$this->config['read_length'],$this->config['read_type']);
                if($data===false){
                    $this->close($connection);
                }else{
                    $this->onReceive($connection,$data);
                }
            }
        }
    
        /**
         * 发送消息给客户端
         * @param $connection
         * @param $msg
         * @return int
         */
        public function send($connection,$msg){
            $result = socket_write($connection, $msg,strlen($msg));
            return $result;
        }
    
        /**
         * 主动关闭客户端
         * @param $connection
         */
        public function close($connection){
            $this->onClose($connection);
            //先关掉子进程
            posix_kill($this->connect_list[(string)$connection]['pid'],SIGTERM);
            $result = socket_close($connection);
            unset($this->connect_list[(string)$connection]);
            return $result;
        }
    }

    socket_client.php

    <?php
    include 'server.php'; $socket
    = SphpSocket::getInstance('0.0.0.0',9501); $socket->on('connect',function ($connection)use($socket){ $socket->send($connection,'恭喜您连接成功!'); }); $socket->on('receive',function ($connection,$data)use($socket){ $result = $socket->send($connection,'您发送的消息是:'.$data); var_dump($data); if(trim($data)=='关闭'){ $socket->close($connection); } echo "发送消息成功"; }); $socket->on('close',function ($connection)use($socket){ var_dump($connection.'已经关闭连接'); }); $socket->start();

    命令行启动 socket_client.php  则启动了一个socket服务器

    telnet 127.0.0.1 9502 链接

     

    总结

    所以,进程之间的通信方式有:

    1、管道

    2、消息队列

    3、共享内存

    4、信号量

    5、Socket

    讲到这里也就完结了,之前我看进程之间的通信方式的时候,也算是死记硬背,并没有去理解他们之间的关系,优缺点,为什么会有这种通信方式。所以最近花点时间去研究了一下,

  • 相关阅读:
    网页美学设计原则(上)
    WF/WCF/WCS/WPF/MVC/AJAX
    CSS框架 960 grid system最新优化版使用指南
    自定义AuthorizeAttribute实现MVC权限设计
    企业网站,大气怎么设计
    博客网页配色表+流行色系
    iepngfix.htc和png8让IE6支持png背景透明
    网页美学设计原则(下)
    java中的环境变量设置及tomcat的设置
    注册时间
  • 原文地址:https://www.cnblogs.com/xiangshihua/p/13233649.html
Copyright © 2011-2022 走看看