zoukankan      html  css  js  c++  java
  • 【Samza系列】实时计算Samza中文教程(四)—API概述

        上一篇和大家一起宏观上学习了Samza平台的架构,重点讲了一下数据缓冲层和资源管理层。剩下的一块非常重要的SamzaAPI层本节作为重点为大家展开介绍。

        当你使用Samza来实现一个数据流处理逻辑时。你必须实现一个叫StreamTask的接口,例如以下所看到的:
    public class MyTaskClass implements StreamTask {
    
      public void process(IncomingMessageEnvelope envelope,
                          MessageCollector collector,
                          TaskCoordinator coordinator) {
        // process message
      }
    }
        当你执行你的job时,Samza将为你的class创建一些实例(可能在多台机器上)。这些任务实例会处理输入流里的消息。


        在你的job的配置中你能告诉Samza你想消费哪条数据流。

    举一个较为完整的样例(大家也能够參看http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration.html

    ):
    # This is the class above, which Samza will instantiate when the job is run
    task.class=com.example.samza.MyTaskClass
    
    # Define a system called "kafka" (you can give it any name, and you can define
    # multiple systems if you want to process messages from different sources)
    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
    
    # The job consumes a topic called "PageViewEvent" from the "kafka" system
    task.inputs=kafka.PageViewEvent
    
    # Define a serializer/deserializer called "json" which parses JSON messages
    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
    
    # Use the "json" serializer for messages in the "PageViewEvent" topic
    systems.kafka.streams.PageViewEvent.samza.msg.serde=json
        对于Samza从任务的输入流利接收的每一条消息,处理逻辑都会被调用。它主要包括三个重要的信息:消息、关键词key以及消息来自的数据流:
    /** Every message that is delivered to a StreamTask is wrapped
     * in an IncomingMessageEnvelope, which contains metadata about
     * the origin of the message. */
    public class IncomingMessageEnvelope {
      /** A deserialized message. */
      Object getMessage() { ... }
    
      /** A deserialized key. */
      Object getKey() { ... }
    
      /** The stream and partition that this message came from. */
      SystemStreamPartition getSystemStreamPartition() { ... }
    }
        注意键和值都要被声明为对象,而且须要转化为正确的类型。假设你不配置一个serializer/deserializer。它们就会成为典型的java字节数组。一个deserializer可以转化这些字节到其它随意类型,举个样例来说j一个son deserializer可以将字节数组转化为Map、List以及字符串对象。

        SystemStreamPartition()这种方法会返回一个SystemStreamPartition对象,它会告诉你消息是从哪里来的。它由下面三部分组成:
        1. The system:系统的名字来源于消息。就在你job的配置里定义。你能够有多个用于输入和输出的不同名字的系统;
        2. The stream name: 在原系统里数据流(话题、队列)的名字。相同也是在job的配置里定义;
        3. The partition: 一条数据流一般会被划分到多个分区。而且每个分区会被Samza安排一个StreamTask实例;
        API看起来像是这种:
    /** A triple of system name, stream name and partition. */
    public class SystemStreamPartition extends SystemStream {
    
      /** The name of the system which provides this stream. It is
          defined in the Samza job's configuration. */
      public String getSystem() { ... }
    
      /** The name of the stream/topic/queue within the system. */
      public String getStream() { ... }
    
      /** The partition within the stream. */
      public Partition getPartition() { ... }
    }
        在上面这个job的配置样例里可以看到。这个系统名字叫“Kafka”。数据流的名字叫“PageViewEvent”。(kafka这个名字不是特定的——你能给你的系统取不论什么你想要的名字)。

    假设你有一些输入流向导入你的StreamTask,你可以使用SystemStreamPartition去决定你接受到哪一类消息。


        怎样发送消息呢?假设你看一下StreamTask里的process()方法,你将看到你有一个MessageCollector接口。
    /** When a task wishes to send a message, it uses this interface. */
    public interface MessageCollector {
      void send(OutgoingMessageEnvelope envelope);
    }
        为了发送一个消息, 你会创建一个OutgoingMessageEnvelop对象而且把它传递给消息收集器。它至少会确定你想要发送的消息、系统以及数据流名字再发送出去。你也能够确定分区的key和还有一些參数。详细能够參考javadoc(http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)。

        注意事项:
        请仅仅在process()方法里使用MessageCollector对象。

    假设你保持住一个MessageCollector实例而且之后再次使用它,你的消息可能会错误地发送出去。举一个样例,这儿有一个简单的任务,它把每个输入的消息拆成单词,而且发送每个单词作为一个消息:

    public class SplitStringIntoWords implements StreamTask {
    
      // Send outgoing messages to a stream called "words"
      // in the "kafka" system.
      private final SystemStream OUTPUT_STREAM =
        new SystemStream("kafka", "words");
    
      public void process(IncomingMessageEnvelope envelope,
                          MessageCollector collector,
                          TaskCoordinator coordinator) {
        String message = (String) envelope.getMessage();
    
        for (String word : message.split(" ")) {
          // Use the word as the key, and 1 as the value.
          // A second task can add the 1's to get the word count.
          collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
        }
      }
    }
        Samza的API的概要介绍就到这里吧,非常多细节的API能够參看javadoc文档,这也是官网下一节的内容,因为篇幅有限,大家能够自己针对性的去深入了解了解就能够了。下一篇会讲一下之前在架构篇里多次提到的SamzaContainer。


        

  • 相关阅读:
    13-14学年寒假集训
    kafka 并发数配置过程中踩到的坑 InstanceAlreadyExistsException
    MongoDB运行状态、性能监控,分析
    linux运维相关命令收集
    谷歌浏览器文字显示不正常
    数据库sql优化
    一个字符串在另一个字符串中出现的次数
    多线程下载文件
    internet资源下载的断点续传
    URL如何通过Proxy代理访问Internet资源
  • 原文地址:https://www.cnblogs.com/gcczhongduan/p/5086242.html
Copyright © 2011-2022 走看看