  • As of Flume 1.4.0, Avro is the default RPC protocol.

    Flume 1.8.0 Developer Guide — Apache Flume http://flume.apache.org/FlumeDeveloperGuide.html

    The remote Flume agent needs to have an AvroSource (or a ThriftSource if you are using a Thrift client) listening on some port. Below is an example Flume agent configuration that’s waiting for a connection from MyApp:

    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    
    a1.channels.c1.type = memory
    
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    # To use a Thrift source, set the following instead of the line above.
    # a1.sources.r1.type = thrift
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
    
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
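
A misspelled key prefix in this file (for example `a1.source` instead of `a1.sources`) is easy to miss. The sketch below is one way to sanity-check the wiring before starting the agent, using only the Java standard library. `AgentConfigCheck` and its methods are hypothetical helpers written for this illustration, not part of Flume; the only rules they assume are the ones visible in the configuration above (a source lists its `channels`, a sink names a single `channel`).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper (not a Flume API): checks that every source and sink
// of one agent has a type where expected and only uses declared channels.
class AgentConfigCheck {

  // Splits a whitespace-separated component list such as "c1 c2".
  static Set<String> names(Properties p, String key) {
    String value = p.getProperty(key, "").trim();
    if (value.isEmpty()) {
      return new LinkedHashSet<>();
    }
    return new LinkedHashSet<>(Arrays.asList(value.split("\\s+")));
  }

  // Returns a list of problems; an empty list means the wiring looks consistent.
  static List<String> check(String agent, String configText) {
    Properties p = new Properties();
    try {
      p.load(new StringReader(configText));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    List<String> problems = new ArrayList<>();
    Set<String> channels = names(p, agent + ".channels");

    for (String source : names(p, agent + ".sources")) {
      String prefix = agent + ".sources." + source;
      if (p.getProperty(prefix + ".type") == null) {
        problems.add(prefix + ".type is not set");
      }
      // A source writes to one or more channels ("channels", plural).
      Set<String> used = names(p, prefix + ".channels");
      if (used.isEmpty()) {
        problems.add(prefix + ".channels is not set");
      }
      for (String c : used) {
        if (!channels.contains(c)) {
          problems.add(prefix + " uses undeclared channel " + c);
        }
      }
    }
    for (String sink : names(p, agent + ".sinks")) {
      String prefix = agent + ".sinks." + sink;
      // A sink reads from exactly one channel ("channel", singular).
      String c = p.getProperty(prefix + ".channel");
      if (c == null || !channels.contains(c.trim())) {
        problems.add(prefix + " has a missing or undeclared channel");
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    String config = String.join("\n",
        "a1.channels = c1",
        "a1.sources = r1",
        "a1.sinks = k1",
        "a1.channels.c1.type = memory",
        "a1.sources.r1.channels = c1",
        "a1.sources.r1.type = avro",
        "a1.sources.r1.bind = 0.0.0.0",
        "a1.sources.r1.port = 41414",
        "a1.sinks.k1.channel = c1",
        "a1.sinks.k1.type = logger");
    // Prints the list of detected problems for agent "a1".
    System.out.println(check("a1", config));
  }
}
```

This only looks at naming and wiring; it does not verify that the port is free or that the chosen source type matches the client protocol.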
    

     

    RPC clients - Avro and Thrift

    As of Flume 1.4.0, Avro is the default RPC protocol. The NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. The client needs to create this object with the host and port of the target Flume agent, and can then use the RpcClient to send data into the agent. The following example shows how to use the Flume Client SDK API within a user’s data-generating application:

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;
    
    public class MyApp {
      public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("host.example.org", 41414);
    
        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
          client.sendDataToFlume(sampleData);
        }
    
        client.cleanUp();
      }
    }
    
    class MyRpcClientFacade {
      private RpcClient client;
      private String hostname;
      private int port;
    
      public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
      }
    
      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
    
        // Send the event
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          // Clean up and recreate the client so later calls can succeed.
          // Note: the event that failed is not resent.
          client.close();
          client = null;
          client = RpcClientFactory.getDefaultInstance(hostname, port);
          // Use the following method to create a thrift client (instead of the above line):
          // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
      }
    
      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }
    
    }
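
In `sendDataToFlume` above, a failed `append` closes and recreates the client, but the event that triggered the failure is not sent again. If that matters for an application, one option is a bounded retry around the same close-and-recreate step. The following is a minimal sketch of that idea, not the guide's method: `RetryingFacade` and its nested `Sender` interface are hypothetical stand-ins for `MyRpcClientFacade` and `RpcClient`, a `Supplier` plays the role of `RpcClientFactory.getDefaultInstance(hostname, port)`, and a plain `Exception` stands in for `EventDeliveryException` so the example runs without Flume on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: retry an event after reconnecting, up to maxAttempts times.
class RetryingFacade {

  // Stand-in for the two RpcClient operations this sketch needs.
  interface Sender {
    void append(String data) throws Exception;
    void close();
  }

  private final Supplier<Sender> factory;
  private final int maxAttempts;
  private Sender sender;

  RetryingFacade(Supplier<Sender> factory, int maxAttempts) {
    this.factory = factory;
    this.maxAttempts = maxAttempts;
    this.sender = factory.get();
  }

  // Returns true once the data is delivered, false if every attempt failed.
  boolean send(String data) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        sender.append(data);
        return true;
      } catch (Exception e) {
        // Same recovery as the original facade: drop the connection, open a new one.
        sender.close();
        sender = factory.get();
      }
    }
    return false;
  }

  void cleanUp() {
    sender.close();
  }

  public static void main(String[] args) {
    List<String> delivered = new ArrayList<>();
    int[] created = {0};
    // Fake factory: the first connection always fails, later ones succeed.
    Supplier<Sender> flaky = () -> {
      boolean broken = created[0]++ == 0;
      return new Sender() {
        public void append(String data) throws Exception {
          if (broken) {
            throw new Exception("simulated delivery failure");
          }
          delivered.add(data);
        }
        public void close() {}
      };
    };

    RetryingFacade facade = new RetryingFacade(flaky, 3);
    boolean sent = facade.send("Hello Flume!");
    System.out.println("sent=" + sent + ", connections=" + created[0]);
    facade.cleanUp();
  }
}
```

A cap on attempts keeps a permanently unreachable agent from blocking the caller forever; whether to drop, buffer, or log the event after the last attempt is left to the application.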
  • Original article: https://www.cnblogs.com/rsapaper/p/9875211.html