zoukankan      html  css  js  c++  java
  • 部分转 php kafka

    Step 1: 下载Kafka

    (官网地址:http://kafka.apache.org

    Kafka入门经典教程

    http://www.aboutyun.com/thread-12882-1-1.html

    http://czj4451.iteye.com/blog/2041096

    php kafka:

    http://www.alliedjeep.com/18625.htm

    https://github.com/EVODelavega/phpkafka

    php kafka 例子:

    https://github.com/njczy2010/phpkafka

    二、环境搭建


    Step 1: 下载Kafka

    (官网地址:http://kafka.apache.org

    点击下载最新的版本并解压.

    1. > tar -xzf kafka_2.9.2-0.8.1.1.tgz
    2. > cd kafka_2.9.2-0.8.1.1
    复制代码




    Step 2: 启动服务

    Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。

    1. > bin/zookeeper-server-start.sh config/zookeeper.properties &
    2. [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    3. ...
    复制代码



    现在启动Kafka:

    1. > bin/kafka-server-start.sh config/server.properties
    2. [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
    3. [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    4. ...
    复制代码



    Step 3: 创建 topic

    创建一个叫做“test”的topic,它只有一个分区,一个副本。

    1. > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    复制代码



    可以通过list命令查看创建的topic:

    1. > bin/kafka-topics.sh --list --zookeeper localhost:2181
    2. test
    复制代码



    除了手动创建topic,还可以配置broker让它自动创建topic.

    Step 4:发送消息.

    Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

    运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

    1. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    2. This is a messageThis is another message
    复制代码



    ctrl+c可以退出发送。

    Step 5: 启动consumer

    Kafka also has a command line consumer that will dump out messages to standard output.
    Kafka也有一个命令行consumer可以读取消息并输出到标准输出:

    1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    2. This is a message
    3. This is another message
    复制代码



    你在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。
    这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。

    php kafka:

    http://www.alliedjeep.com/18625.htm

    1、安装librdkafka

    cd /usr/local/src #进入安装包存放目录

    wget https://github.com/edenhill/librdkafka/archive/master.zip #下载

    mv master.zip librdkafka-master.zip #修改包名

    unzip librdkafka-master.zip #解压

    cd librdkafka-master #进入安装文件夹

    ./configure #配置

    make #编译

    make install #安装

    2、安装phpkafka

    cd /usr/local/src #进入安装包存放目录

    wget https://github.com/EVODelavega/phpkafka/archive/master.zip #下载

    mv master.zip phpkafka-master.zip #修改包名

    unzip phpkafka-master.zip #解压

    cd phpkafka-master #进入安装文件夹

    /usr/local/php/bin/phpize #加载php扩展模块

    ./configure --enable-kafka --with-php-config=/usr/local/php/bin/php-config #配置

    make #编译

    make install #安装

    3、修改php配置文件

    vi /usr/local/php/etc/php.ini #打开php配置文件,在最后一行添加下面的代码

    extension="kafka.so"

    :wq! #保存退出

    https://github.com/EVODelavega/phpkafka

    Requirements:

    Download and install librdkafka. Run sudo ldconfig to update shared libraries.

    Installing PHP extension:

    1 phpize
    2 ./configure --enable-kafka
    3 make
    4 sudo make install
    5 sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/conf.d/kafka.ini'
    6 #For CLI mode:
    7 sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/cli/conf.d/20-kafka.ini'

    Examples:

     1 // Produce a message
     2 $kafka = new Kafka("localhost:9092");
     3 $kafka->produce("topic_name", "message content");
     4 //get all the available partitions
     5 $partitions = $kafka->getPartitionsForTopic('topic_name');
     6 //use it to OPTIONALLY specify a partition to consume from
     7 //if not, consuming IS slower. To set the partition:
     8 $kafka->setPartition($partitions[0]);//set to first partition
     9 //then consume, for example, starting with the first offset, consume 20 messages
    10 $msg = $kafka->consume("topic_name", Kafka::OFFSET_BEGIN, 20);
    11 var_dump($msg);//dumps array of messages

    php kafka 例子:

    https://github.com/njczy2010/phpkafka

     1 <?php
     2 $root = dirname(__file__);
     3 require_once("$root/config.php");
     4 $kafka = new Kafka("localhost:9092");
     5 function create_message() {    
     6     $num = mt_rand(0,9);
     7     $host = $GLOBALS["G_CONFIG"]["host"]["$num"];
     8     $client_ip = "" . mt_rand(0,255) . "." . mt_rand(0,255) . "." . mt_rand(0,255) . "." . mt_rand(0,255);
     9     $num2 = mt_rand(0,7);
    10     $method = $GLOBALS["G_CONFIG"]["method"]["$num2"];
    11     $ua = $GLOBALS["G_CONFIG"]["ua"]["0"];
    12     $refer = $GLOBALS["G_CONFIG"]["refer"]["0"];
    13     $product = "/" . md5(mt_rand() );
    14     $message = array (
    15         "host" => $host,
    16         "client_ip" => $client_ip,
    17         "method" => $method,
    18         "product" => $product,
    19         "ua" => $ua,
    20         "product" => $product,
    21     );
    22     $json = json_encode($message);
    23     print_r($json);
    24     echo "
    ";
    25     return $json;
    26 }
    27 while(1) {
    28     $json = create_message();
    29     $kafka->produce("test", $json);
    30     //get all the available partitions
    31     $partitions = $kafka->getPartitionsForTopic('test');
    32 //use it to OPTIONALLY specify a partition to consume from
    33 //if not, consuming IS slower. To set the partition:
    34     $kafka->setPartition($partitions[0]);//set to first partition
    35 //then consume, for example, starting with the first offset, consume 20 messages
    36     $msg = $kafka->consume("test", Kafka::OFFSET_BEGIN, 20);
    37     usleep(300);
    38 }
    39 //var_dump($msg);//dumps array of messages
    40 ?>
  • 相关阅读:
    vue证明题二,让vue跑起来
    vue证明题一,vue全家桶的构成
    Vue证明题
    layDay日期格式不合法报错解决
    【ASP.NET Core】运行原理(3):认证
    【ASP.NET Core】运行原理(2):启动WebHost
    【ASP.NET Core】运行原理(1):创建WebHost
    [Open Source] .NET 基于StackExchange.Redis的扩展
    [高并发]EntityFramework之高性能扩展
    [Tool] Git 使用 与 Git Flow
  • 原文地址:https://www.cnblogs.com/njczy2010/p/5711729.html
Copyright © 2011-2022 走看看