zoukankan      html  css  js  c++  java
  • As of Flume 1.4.0, Avro is the default RPC protocol.

    Flume 1.8.0 Developer Guide — Apache Flume http://flume.apache.org/FlumeDeveloperGuide.html

    The remote Flume agent needs to have an AvroSource (or a ThriftSource if you are using a Thrift client) listening on some port. Below is an example Flume agent configuration that’s waiting for a connection from MyApp:

    a1.channels = c1
    a1.sources = r1
    a1.sinks = k1
    
    a1.channels.c1.type = memory
    
    a1.sources.r1.channels = c1
    a1.sources.r1.type = avro
    # For using a thrift source set the following instead of the above line.
    # a1.source.r1.type = thrift
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
    
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = logger
    

     

    RPC clients - Avro and Thrift

    As of Flume 1.4.0, Avro is the default RPC protocol. The NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. The client needs to create this object with the host and port of the target Flume agent, and can then use the RpcClient to send data into the agent. The following example shows how to use the Flume Client SDK API within a user’s data-generating application:

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;
    
    public class MyApp {
      public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("host.example.org", 41414);
    
        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
          client.sendDataToFlume(sampleData);
        }
    
        client.cleanUp();
      }
    }
    
    class MyRpcClientFacade {
      private RpcClient client;
      private String hostname;
      private int port;
    
      public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
      }
    
      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
    
        // Send the event
        try {
          client.append(event);
        } catch (EventDeliveryException e) {
          // clean up and recreate the client
          client.close();
          client = null;
          client = RpcClientFactory.getDefaultInstance(hostname, port);
          // Use the following method to create a thrift client (instead of the above line):
          // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
      }
    
      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }
    
    }
  • 相关阅读:
    redis几种模式的部署(Windows下实现)
    servlet容器:jetty,Tomcat,JBoss
    redis三种模式对比
    Linux expect详解
    expect学习笔记及实例详解
    shell expect的简单实用
    【Spring Boot】内嵌容器
    java 怎么让打印信息换行?
    Java中的静态方法是什么?
    java的接口为什么不能实例化
  • 原文地址:https://www.cnblogs.com/rsapaper/p/9875211.html
Copyright © 2011-2022 走看看