zoukankan      html  css  js  c++  java
  • pulsar学习笔记1:helloworld

    pulsar号称是下一代的消息系统,这二年风光无限,大有干掉kafka的势头,如果想快速体验下,可以按以下步骤在本地搭建一个单机版本:(mac环境+jdk8)

    一、 下载

    wget https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.3.2/apache-pulsar-2.3.2-bin.tar.gz
    

    目前最新版本是2.3.2 

    二、解压

    tar -zxvf apache-pulsar-2.3.2-bin.tar.gz
    

      

    三、单机模式启动

    cd apache-pulsar-2.3.2
    bin/pulsar standalone
    

      

    四、 测试收发消息

    pulsar自带的client工具,可以直接测试收发消息。

    收消息命令如下:

    bin/pulsar-client consume my-topic -s "first-subscription"
    

    表示从“my-topic”这个topic上消费消息,并且指定订阅名称为“first-subscription”

    发消息命令如下:

    bin/pulsar-client produce my-topic --messages "hello pulsar"
    

    表示发送消息到my-topic这个topic上。

    注:测试时,可以单独开2个terminal,先在其中1个运行consumer,然后在另1个运行produce

    附:如果希望写java代码实现收发消息,可参考https://github.com/yjmyzz/pulsar-sample

    五、 Function测试

    function是一个极有前途的功能,可以把一个topic中喷出的消息,实时接收并处理后,再把处理结果发到另一个topic,相当于轻量级的流式计算。

    ./examples目录下有一个api-examples.jar包,里面自带了一些Function示例。

    5.1 部署Function

    部署的过程,其实就是把带处理逻辑的jar包,放到集群上,命令如下:

    bin/pulsar-admin functions create 
    --jar examples/api-examples.jar 
    --className org.apache.pulsar.functions.api.examples.ExclamationFunction 
    --inputs persistent://public/default/exclamation-input 
    --output persistent://public/default/exclamation-output 
    --name exclamation
    

    大致是创建一个function,来源是examples/api-examples.jar这个文件,并指定了具体的类名(因为一个jar包中,可以写多个function,必须指定具体的className), 然后这个function的入参是exclamation-input这个topic,处理完的结果,将输出到exclamation-output,最后这个function在pulsar中的名字是exclamation - 注:如果上述命令执行失败,可以尝试把className,换成classname. (不同版本的pulsar这个参数的大小写略有不同)

    附:ExclamationFunction的java源码如下,逻辑很简单,只是在输入参数后加一个!

    package org.apache.pulsar.functions.api.examples;
    
    import java.util.function.Function;
    
    public class ExclamationFunction implements Function<String, String> {
        @Override
        public String apply(String input) {
            return String.format("%s!", input);
        }
    }
    

      

    5.2 查看已部署的function列表

    bin/pulsar-admin functions list 
    --tenant public 
    --namespace default
    

    5.3 启动消费者,查看实时处理结果   

    bin/pulsar-client consume persistent://public/default/exclamation-output 
    --subscription-name my-subscription 
    --num-messages 0
    

      

    5.4 启动生产者,产生实时处理所需的素材

    bin/pulsar-client produce persistent://public/default/exclamation-input 
    --num-produce 1 
    --messages "Hello world"
    

      

    参考文章:

    http://pulsar.apache.org/docs/en/functions-quickstart/

  • 相关阅读:
    聊聊算法——回溯算法
    Redis高级用法
    聊聊算法——BFS和DFS
    这就是Java代码生成器的制作流程
    Spring Boot 2 实战:常用读取配置的方式
    Spring Security 实战干货:图解Spring Security中的Servlet过滤器体系
    想做时间管理大师?你可以试试Mybatis Plus代码生成器
    Maven中央仓库正式成为Oracle官方JDBC驱动程序组件分发中心
    作为一个Java开发你用过Jib吗
    使用反应式关系数据库连接规范R2DBC操作MySQL数据库
  • 原文地址:https://www.cnblogs.com/yjmyzz/p/pulsar-helloworld.html
Copyright © 2011-2022 走看看