zoukankan      html  css  js  c++  java
  • kafka安装及Kafka-PHP扩展的使用

    话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。

    实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。

    一. 首先确认下jdk有没有安装

    使用命令

    [root@localhost ~]# java -version
    java version "1.8.0_73"
    Java(TM) SE Runtime Environment (build 1.8.0_73-b02)
    Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)

    如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:

    http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

    到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。

    然后打开/etc/profile文件

    [root@localhost ~]# vim /etc/profile

    把下面这段代码写到文件里

    export JAVA_HOME=/usr/local/jdk/jdk1.8.0_73
    export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
    export PATH=$JAVA_HOME/bin:$PATH

    最后

    [root@localhost ~]# source /etc/profile

    这时jdk就生效了,可以使用 java -version验证下。

    二. 接下来安装Kafka

    1. 下载Kafka

    到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz

    2. 下载完解压到你喜欢的目录

    我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2

    3. 运行默认的Kafka

    启动Zookeeper server

    [root@localhost kafka_2.9.1-0.8.2.2]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &

    启动Kafka server

    [root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-server-start.sh config/server.properties &

    运行生产者producer

    [root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    运行消费者consumer

    [root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    这样,在producer那边输入内容,consumer马上就能接收到。

    4. 当有跨机的producer或consumer连接时

    需要配置config/server.properties的host.name,要不然跨机的连不上。

    三. Kafka-PHP扩展

    使用了一圈,就https://github.com/nmred/kafka-php可以用。

    我是使用composer安装的,以下是示例:

    producer.php

    <?php
    require 'vendor/autoload.php';
    
    while (1) {
        $part = mt_rand(0, 1);
        $produce = KafkaProduce::getInstance('kafka0:2181', 3000);
        // get available partitions
        $partitions = $produce->getAvailablePartitions('topic_name');
        var_dump($partitions);
        // send message
        $produce->setRequireAck(-1);
        $produce->setMessages('topic_name', 0, array(date('Y-m-d H:i:s'));
       
        sleep(3);
    }

    consumer.php

    require 'vendor/autoload.php';
    
    $consumer = KafkaConsumer::getInstance('kafka0:2181');
    $group = 'topic_name';
    $consumer->setGroup($group);
    $consumer->setFromOffset(true);
    $consumer->setTopic('topic_name', 0);
    $consumer->setMaxBytes(102400);
    $result = $consumer->fetch();
    print_r($result);
    foreach ($result as $topicName => $partition) {
        foreach ($partition as $partId => $messageSet) {
        var_dump($partition->getHighOffset());
            foreach ($messageSet as $message) {
                var_dump((string)$message);
            }
        var_dump($partition->getMessageOffset());
        }
    }
  • 相关阅读:
    ural(Timus) 1019 Line Painting
    ACMICPC Live Archive 2031 Dance Dance Revolution
    poj 3321 Apple Tree
    其他OJ 树型DP 选课
    poj 3548 Restoring the digits
    ACMICPC Live Archive 3031 Cable TV Network
    递归循环获取指定节点下面的所有子节点
    手动触发asp.net页面验证控件事件
    子级Repeater获取父级Repeater绑定项的值
    没有列名的数据绑定
  • 原文地址:https://www.cnblogs.com/imarno/p/5198940.html
Copyright © 2011-2022 走看看