zoukankan      html  css  js  c++  java
  • Kafka简介及使用PHP处理Kafka消息

    Kafka简介及使用PHP处理Kafka消息

    Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。

    Kafka的特点:

    • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
    • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。【据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)】
    • 支持Kafka Server间的消息分区,同时保证每个Partition内的消息顺序传输。
    • 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
    • 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
    • 同时支持离线数据处理和实时数据处理。

    Kafka的架构:

    Kafka简介及使用PHP处理Kafka消息-kafka架构图

    Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。

    Kafka基本概念:

    • Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
    • Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
    • Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
    • Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
    • Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
    • Broker:缓存代理,Kafa集群中的一台或多台服务器统称为broker。

    Kafka消息发送的流程:

    Kafka简介及使用PHP处理Kafka消息-Kafka消息发送

    下面是PHP生产、消费Kafka消息的例子(假设已经配置好Kafka):

    1.从zookeeper源码src/c/src安装zookeeper c client

    1.  
      cd zookeeper-3.4.8/src/c
    2.  
      ./configure
    3.  
      make && make install


    2.编译php libzookper扩展

    1.  
      git clone https://github.com/Timandes/libzookeeper.git
    2.  
      cd libzookeeper
    3.  
      phpize
    4.  
      ./configure--with-libzookeeper=/usr/local/bin/cli_mt
    5.  
      make && makeinstall

    3.编译php zookeeper扩展

    1.  
      git clone https://github.com/andreiz/php-zookeeper.git
    2.  
      cd php-zookeeper
    3.  
      phpize
    4.  
      ./configure
    5.  
      make && make install

    4.修改php.ini配置,添加libzookeeper和php-zookeeper扩展

    1.  
      extension=libzookeeper.so
    2.  
      extension=zookeeper.so

    PHP处理Kafka消息:

    1.启动zookeeper和kafka

    1.  
      ./bin/zookeeper-server-start.sh config/zookeeper.properties
    2.  
       
    3.  
      ./bin/kafka-server-start.sh config/server.properties

    2.创建由2个partition组成的、名为testtopic的topic

    kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create--zookeeper localhost:2181 --replication-factor --partitions --topic testtopic


    3.composer安装nmred/kafka-php

    1 composer require "nmred/kafka-php"

    4.producer.php代码

    1.  
      <php 
    2.  
      require_once('./vendor/autoload.php'); 
    3.  
      $produce=/Kafka/Produce::getInstance('localhost:2181',3000); 
    4.  
      $produce->setRequireAck(-1); $topicName='testtopic';
    5.  
      //获取到topic下可用的partitions
    6.  
      $partitions=$produce->getAvailablePartitions($topicName);
    7.  
      $partitionCount=count($partitions); 
    8.  
      $count=1;//可以处理的消费者数量(可以理解为server数量)
    9.  
      while(true){    $message=json_encode(array('uid'=>$count,'age'=>$count%100,'datetime'=>date('Y-m-d H:i:s')));     
    10.  
      //发送消息到不同的partition   
    11.  
       $partitionId=$count%$partitionCount;    
    12.  
      $produce->setMessages('testtopic',$partitionId,array($message));   
    13.  
       $result=$produce->send();    
    14.  
      var_dump($result);     
    15.  
      $count++;   
    16.  
       echo"producer sleeping/n";   
    17.  
       sleep(1);
    18.  
      }

    5、consumer.php代码

    1.  
      <?php 
    2.  
      require_once('./vendor/autoload.php'); 
    3.  
      //获取需要处理的partitionId
    4.  
      $partitionId = isset($argv[1]) ? intval($argv[1]) :0; 
    5.  
      $consumer =/Kafka/Consumer::getInstance('localhost:2181'); 
    6.  
      $consumer->setGroup('test-consumer-group');
    7.  
      $consumer->setPartition('testtopic', $partitionId);
    8.  
      $consumer->setFromOffset(true);
    9.  
      $consumer->setMaxBytes(102400); 
    10.  
      while(true){    
    11.  
      $topic = $consumer->fetch();     
    12.  
      foreach ($topic as $topicName => $partition{        
    13.  
      foreach ($partition as $partId => $messageSet{            
    14.  
      foreach ($messageSet as $message){                
    15.  
      var_dump($message);           
    16.  
      }        
    17.  
      }    
    18.  
      }    
    19.  
      echo"consumer sleeping/n";   
    20.  
      sleep(1);
    21.  
      }

    6、在3个终端界面分别运行

    1.  
      php producer.php
    2.  
      php consumer.php
    3.  
      php consumer.php


    7、两个consumer脚本依次收到producer发送的消息

    php-kafka-consumer-output

  • 相关阅读:
    mongo admin 客户端管理工具安装
    kong API gateway
    安装 docker管理 工具 页面 portainer
    elcipse 安装lombok插件解决 @Slf4j 等找不到log变量问题
    cqrs案例
    你还不知道这四点Mysql调优的话,那就赶快学起来
    python中的类型提示(type hint)
    大厂面试最常被问的Redis问题合集
    nginx gzip json [2]
    nginx gzip json 配置「1」
  • 原文地址:https://www.cnblogs.com/brady-wang/p/10790303.html
Copyright © 2011-2022 走看看