zoukankan      html  css  js  c++  java
  • As of Flume 1.4.0, Avro is the default RPC protocol.

    Flume 1.8.0 Developer Guide — Apache Flume http://flume.apache.org/FlumeDeveloperGuide.html

    The remote Flume agent needs to have an AvroSource (or a ThriftSource if you are using a Thrift client) listening on some port. Below is an example Flume agent configuration that’s waiting for a connection from MyApp:

    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    
    a1.channels.c1.type = memory
    
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    # For using a thrift source set the following instead of the above line.
    # a1.source.r1.type = thrift
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
    
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
    

     

    RPC clients - Avro and Thrift

    As of Flume 1.4.0, Avro is the default RPC protocol. The NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. The client needs to create this object with the host and port of the target Flume agent, and can then use the RpcClient to send data into the agent. The following example shows how to use the Flume Client SDK API within a user’s data-generating application:

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;
    
    public class MyApp {
      public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("host.example.org", 41414);
    
        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
          client.sendDataToFlume(sampleData);
        }
    
        client.cleanUp();
      }
    }
    
    class MyRpcClientFacade {
      private RpcClient client;
      private String hostname;
      private int port;
    
      public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
      }
    
      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
    
        // Send the event
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          // clean up and recreate the client
          client.close();
          client = null;
          client = RpcClientFactory.getDefaultInstance(hostname, port);
          // Use the following method to create a thrift client (instead of the above line):
          // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
      }
    
      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }
    
    }
  • 相关阅读:
    C++虚函数表解析(转)
    学习网址
    css 段落文字换行问题
    移动端fixed兼容问题
    半数集1
    汇编寄存器
    设计模式概述
    Vector用法介绍
    汇编PC硬件基本特征
    android 反编译总结
  • 原文地址:https://www.cnblogs.com/rsapaper/p/9875211.html
Copyright © 2011-2022 走看看