zoukankan      html  css  js  c++  java
  • Avro-RPC client in Flume

    Avro used in Flume

    Define the interface of RpcClient 

    public interface RpcClient {
      public int getBatchSize();
      public void append(Event event) throws EventDeliveryException;
      public void appendBatch(List<Event> events) throws
          EventDeliveryException;
      public void close() throws FlumeException;
    }
    

    Define Avro RpcClient extending from RpcClient, using Netty as socket communication.

    public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
      private Transceiver transceiver;
      private AvroSourceProtocol.Callback avroClient;
     
      private void connect(long timeout, TimeUnit tu) throws FlumeException {
        callTimeoutPool = Executors.newCachedThreadPool(
            new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
        NioClientSocketChannelFactory socketChannelFactory = null;
    
        try {
          ExecutorService bossExecutor =
              Executors.newCachedThreadPool(new TransceiverThreadFactory(
                  "Avro " + NettyTransceiver.class.getSimpleName() + " Boss"));
          ExecutorService workerExecutor =
              Executors.newCachedThreadPool(new TransceiverThreadFactory(
                  "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"));
    
          if (enableDeflateCompression || enableSsl) {
            if (maxIoWorkers >= 1) {
              socketChannelFactory = new SSLCompressionChannelFactory(
                bossExecutor, workerExecutor,
                enableDeflateCompression, enableSsl, trustAllCerts,
                compressionLevel, truststore, truststorePassword, truststoreType,
                excludeProtocols, maxIoWorkers);
            } else {
              socketChannelFactory = new SSLCompressionChannelFactory(
                bossExecutor, workerExecutor,
                enableDeflateCompression, enableSsl, trustAllCerts,
                compressionLevel, truststore, truststorePassword, truststoreType,
                excludeProtocols);
            }
          } else {
            if (maxIoWorkers >= 1) {
              socketChannelFactory = new NioClientSocketChannelFactory(
                  bossExecutor, workerExecutor, maxIoWorkers);
            } else {
              socketChannelFactory = new NioClientSocketChannelFactory(
                  bossExecutor, workerExecutor);
            }
          }
    
          transceiver = new NettyTransceiver(this.address,
              socketChannelFactory,
              tu.toMillis(timeout));
          avroClient =
              SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
              transceiver);
        } catch (Throwable t) {
          if (callTimeoutPool != null) {
            callTimeoutPool.shutdownNow();
          }
          if (socketChannelFactory != null) {
            socketChannelFactory.releaseExternalResources();
          }
          if (t instanceof IOException) {
            throw new FlumeException(this + ": RPC connection error", t);
          } else if (t instanceof FlumeException) {
            throw (FlumeException) t;
          } else if (t instanceof Error) {
            throw (Error) t;
          } else {
            throw new FlumeException(this + ": Unexpected exception", t);
          }
        }
    
        setState(ConnState.READY);
      }
    
    }
    
    Define NettyTransceiver to read and write with socket programming
    /**
     * A Netty-based {@link Transceiver} implementation.
     */
    public class NettyTransceiver extends Transceiver {
      /** If not specified, the default connection timeout will be used (60 sec). */
      public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60 * 1000L;
      public static final String NETTY_CONNECT_TIMEOUT_OPTION = 
          "connectTimeoutMillis";
      public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
      public static final boolean DEFAULT_TCP_NODELAY_VALUE = true;
      
      private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
          .getName());
    
      private final AtomicInteger serialGenerator = new AtomicInteger(0);
      private final Map<Integer, Callback<List<ByteBuffer>>> requests = 
        new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
      
      private final ChannelFactory channelFactory;
      private final long connectTimeoutMillis;
      private final ClientBootstrap bootstrap;
      private final InetSocketAddress remoteAddr;
      
      volatile ChannelFuture channelFuture;
      volatile boolean stopping;
      private final Object channelFutureLock = new Object();
    
      /**
       * Read lock must be acquired whenever using non-final state.
       * Write lock must be acquired whenever modifying state.
       */
      private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
      private Channel channel;       // Synchronized on stateLock
      private Protocol remote;       // Synchronized on stateLock
    
      NettyTransceiver() {
        channelFactory = null;
        connectTimeoutMillis = 0L;
        bootstrap = null;
        remoteAddr = null;
        channelFuture = null;
      }
    
      /**
       * Creates a NettyTransceiver, and attempts to connect to the given address.
       * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection 
       * timeout.
       * @param addr the address to connect to.
       * @throws IOException if an error occurs connecting to the given address.
       */
      public NettyTransceiver(InetSocketAddress addr) throws IOException {
        this(addr, DEFAULT_CONNECTION_TIMEOUT_MILLIS);
      }
      
      /**
       * Creates a NettyTransceiver, and attempts to connect to the given address.
       * @param addr the address to connect to.
       * @param connectTimeoutMillis maximum amount of time to wait for connection 
       * establishment in milliseconds, or null to use 
       * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}.
       * @throws IOException if an error occurs connecting to the given address.
       */
      public NettyTransceiver(InetSocketAddress addr, 
          Long connectTimeoutMillis) throws IOException {
        this(addr, new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(new NettyTransceiverThreadFactory(
                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")), 
            Executors.newCachedThreadPool(new NettyTransceiverThreadFactory(
                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))), 
            connectTimeoutMillis);
      }
    
      /**
       * Creates a NettyTransceiver, and attempts to connect to the given address.
       * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection 
       * timeout.
       * @param addr the address to connect to.
       * @param channelFactory the factory to use to create a new Netty Channel.
       * @throws IOException if an error occurs connecting to the given address.
       */
      public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) 
        throws IOException {
        this(addr, channelFactory, buildDefaultBootstrapOptions(null));
      }
      
      /**
       * Creates a NettyTransceiver, and attempts to connect to the given address.
       * @param addr the address to connect to.
       * @param channelFactory the factory to use to create a new Netty Channel.
       * @param connectTimeoutMillis maximum amount of time to wait for connection 
       * establishment in milliseconds, or null to use 
       * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}.
       * @throws IOException if an error occurs connecting to the given address.
       */
      public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, 
          Long connectTimeoutMillis) throws IOException {
        this(addr, channelFactory, 
            buildDefaultBootstrapOptions(connectTimeoutMillis));
      }
      
      /**
       * Creates a NettyTransceiver, and attempts to connect to the given address.
       * It is strongly recommended that the {@link #NETTY_CONNECT_TIMEOUT_OPTION} 
       * option be set to a reasonable timeout value (a Long value in milliseconds) 
       * to prevent connect/disconnect attempts from hanging indefinitely.  It is 
       * also recommended that the {@link #NETTY_TCP_NODELAY_OPTION} option be set 
       * to true to minimize RPC latency.
       * @param addr the address to connect to.
       * @param channelFactory the factory to use to create a new Netty Channel.
       * @param nettyClientBootstrapOptions map of Netty ClientBootstrap options 
       * to use.
       * @throws IOException if an error occurs connecting to the given address.
       */
      public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, 
          Map<String, Object> nettyClientBootstrapOptions) throws IOException {
        if (channelFactory == null) {
          throw new NullPointerException("channelFactory is null");
        }
        
        // Set up.
        this.channelFactory = channelFactory;
        this.connectTimeoutMillis = (Long) 
            nettyClientBootstrapOptions.get(NETTY_CONNECT_TIMEOUT_OPTION);
        bootstrap = new ClientBootstrap(channelFactory);
        remoteAddr = addr;
    
        // Configure the event pipeline factory.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
          @Override
          public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline p = Channels.pipeline();
            p.addLast("frameDecoder", new NettyFrameDecoder());
            p.addLast("frameEncoder", new NettyFrameEncoder());
            p.addLast("handler", new NettyClientAvroHandler());
            return p;
          }
        });
    
        if (nettyClientBootstrapOptions != null) {
          LOG.debug("Using Netty bootstrap options: " + 
              nettyClientBootstrapOptions);
          bootstrap.setOptions(nettyClientBootstrapOptions);
        }
    
        // Make a new connection.
        stateLock.readLock().lock();
        try {
          getChannel();
        } finally {
          stateLock.readLock().unlock();
        }
      }
      
      /**
       * Creates the default options map for the Netty ClientBootstrap.
       * @param connectTimeoutMillis connection timeout in milliseconds, or null 
       * if no timeout is desired.
       * @return the map of Netty bootstrap options.
       */
      private static Map<String, Object> buildDefaultBootstrapOptions(
          Long connectTimeoutMillis) {
        Map<String, Object> options = new HashMap<String, Object>(2);
        options.put(NETTY_TCP_NODELAY_OPTION, DEFAULT_TCP_NODELAY_VALUE);
        options.put(NETTY_CONNECT_TIMEOUT_OPTION, 
            connectTimeoutMillis == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : 
              connectTimeoutMillis);
        return options;
      }
      
      /**
       * Tests whether the given channel is ready for writing.
       * @return true if the channel is open and ready; false otherwise.
       */
      private static boolean isChannelReady(Channel channel) {
        return (channel != null) && 
          channel.isOpen() && channel.isBound() && channel.isConnected();
      }
      
      /**
       * Gets the Netty channel.  If the channel is not connected, first attempts 
       * to connect.
       * NOTE: The stateLock read lock *must* be acquired before calling this 
       * method.
       * @return the Netty channel
       * @throws IOException if an error occurs connecting the channel.
       */
      private Channel getChannel() throws IOException {
        if (!isChannelReady(channel)) {
          // Need to reconnect
          // Upgrade to write lock
          stateLock.readLock().unlock();
          stateLock.writeLock().lock();
          try {
            if (!isChannelReady(channel)) {
              synchronized(channelFutureLock) {
                if (!stopping) {
              LOG.debug("Connecting to " + remoteAddr);
                  channelFuture = bootstrap.connect(remoteAddr);
                }
              }
              if (channelFuture != null) {
              channelFuture.awaitUninterruptibly(connectTimeoutMillis);
    
                synchronized(channelFutureLock) {
              if (!channelFuture.isSuccess()) {
                throw new IOException("Error connecting to " + remoteAddr, 
                    channelFuture.getCause());
              }
              channel = channelFuture.getChannel();
                  channelFuture = null;
                }
              }
            }
          } finally {
            // Downgrade to read lock:
            stateLock.readLock().lock();
            stateLock.writeLock().unlock();
          }
        }
        return channel;
      }
      
      /**
       * Closes the connection to the remote peer if connected.
       */
      private void disconnect() {
        disconnect(false, false, null);
      }
      
      /**
       * Closes the connection to the remote peer if connected.
       * @param awaitCompletion if true, will block until the close has completed.
       * @param cancelPendingRequests if true, will drain the requests map and 
       * send an IOException to all Callbacks.
       * @param cause if non-null and cancelPendingRequests is true, this Throwable 
       * will be passed to all Callbacks.
       */
      private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests,
          Throwable cause) {
        Channel channelToClose = null;
        Map<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null;
        boolean stateReadLockHeld = stateLock.getReadHoldCount() != 0;
    
        synchronized(channelFutureLock) {
            if (stopping && channelFuture != null) {
               channelFuture.cancel();
            }
        }
        if (stateReadLockHeld) {
          stateLock.readLock().unlock();
        }
        stateLock.writeLock().lock();
        try {
          if (channel != null) {
            if (cause != null) {
              LOG.debug("Disconnecting from " + remoteAddr, cause);
            }
            else {
              LOG.debug("Disconnecting from " + remoteAddr);
            }
            channelToClose = channel;
            channel = null;
            remote = null;
            if (cancelPendingRequests) {
              // Remove all pending requests (will be canceled after relinquishing 
              // write lock).
              requestsToCancel = 
                new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>(requests);
              requests.clear();
            }
          }
        } finally {
          if (stateReadLockHeld) {
            stateLock.readLock().lock();
          }
          stateLock.writeLock().unlock();
        }
        
        // Cancel any pending requests by sending errors to the callbacks:
        if ((requestsToCancel != null) && !requestsToCancel.isEmpty()) {
          LOG.debug("Removing " + requestsToCancel.size() + " pending request(s).");
          for (Callback<List<ByteBuffer>> request : requestsToCancel.values()) {
            request.handleError(
                cause != null ? cause : 
                  new IOException(getClass().getSimpleName() + " closed"));
          }
        }
        
        // Close the channel:
        if (channelToClose != null) {
          ChannelFuture closeFuture = channelToClose.close();
          if (awaitCompletion && (closeFuture != null)) {
            closeFuture.awaitUninterruptibly(connectTimeoutMillis);
          }
        }
      }
      
      /**
       * Netty channels are thread-safe, so there is no need to acquire locks.
       * This method is a no-op.
       */
      @Override
      public void lockChannel() {
        
      }
      
      /**
       * Netty channels are thread-safe, so there is no need to acquire locks.
       * This method is a no-op.
       */
      @Override
      public void unlockChannel() {
        
      }
    
      public void close() {
        try {
          // Close the connection:
          stopping = true;
          disconnect(true, true, null);
        } finally {
          // Shut down all thread pools to exit.
          channelFactory.releaseExternalResources();
        }
      }
    
      @Override
      public String getRemoteName() throws IOException {
        stateLock.readLock().lock();
        try {
          return getChannel().getRemoteAddress().toString();
        } finally {
          stateLock.readLock().unlock();
        }
      }
    
      /**
       * Override as non-synchronized method because the method is thread safe.
       */
      @Override
      public List<ByteBuffer> transceive(List<ByteBuffer> request) 
        throws IOException {
        try {
          CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>();
          transceive(request, transceiverFuture);
          return transceiverFuture.get();
        } catch (InterruptedException e) {
          LOG.debug("failed to get the response", e);
          return null;
        } catch (ExecutionException e) {
          LOG.debug("failed to get the response", e);
          return null;
        }
      }
      
      @Override
      public void transceive(List<ByteBuffer> request, 
          Callback<List<ByteBuffer>> callback) throws IOException {
        stateLock.readLock().lock();
        try {
          int serial = serialGenerator.incrementAndGet();
          NettyDataPack dataPack = new NettyDataPack(serial, request);
          requests.put(serial, callback);
          writeDataPack(dataPack);
        } finally {
          stateLock.readLock().unlock();
        }
      }
      
      @Override
      public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
        stateLock.readLock().lock();
        try {
          writeDataPack(
              new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
        } finally {
          stateLock.readLock().unlock();
        }
      }
      
      /**
       * Writes a NettyDataPack, reconnecting to the remote peer if necessary.
       * NOTE: The stateLock read lock *must* be acquired before calling this 
       * method.
       * @param dataPack the data pack to write.
       * @throws IOException if an error occurs connecting to the remote peer.
       */
      private void writeDataPack(NettyDataPack dataPack) throws IOException {
        getChannel().write(dataPack);
      }
    
      @Override
      public List<ByteBuffer> readBuffers() throws IOException {
        throw new UnsupportedOperationException();  
      }
      
      @Override
      public Protocol getRemote() {
        stateLock.readLock().lock();
        try {
          return remote;
        } finally {
          stateLock.readLock().unlock();
        }
      }
    
      @Override
      public boolean isConnected() {
        stateLock.readLock().lock();
        try {
          return remote!=null;
        } finally {
          stateLock.readLock().unlock();
        }
      }
    
      @Override
      public void setRemote(Protocol protocol) {
        stateLock.writeLock().lock();
        try {
          this.remote = protocol;
        } finally {
          stateLock.writeLock().unlock();
        }
      }
    
      /**
       * Avro client handler for the Netty transport 
       */
      class NettyClientAvroHandler extends SimpleChannelUpstreamHandler {
    
        @Override
        public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
            throws Exception {
          if (e instanceof ChannelStateEvent) {
            LOG.debug(e.toString());
            ChannelStateEvent cse = (ChannelStateEvent)e;
            if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
              // Server closed connection; disconnect client side
              LOG.debug("Remote peer " + remoteAddr + " closed connection.");
              disconnect(false, true, null);
            }
          }
          super.handleUpstream(ctx, e);
        }
    
        @Override
        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
          // channel = e.getChannel();
          super.channelOpen(ctx, e);
        }
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
          NettyDataPack dataPack = (NettyDataPack)e.getMessage();
          Callback<List<ByteBuffer>> callback = requests.get(dataPack.getSerial());
          if (callback==null) {
            throw new RuntimeException("Missing previous call info");
          }
          try {
            callback.handleResult(dataPack.getDatas());
          } finally {
            requests.remove(dataPack.getSerial());
          }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
          disconnect(false, true, e.getCause());      
        }
    
      }
    
      /**
       * Creates threads with unique names based on a specified name prefix.
       */
      private static class NettyTransceiverThreadFactory implements ThreadFactory {
        private final AtomicInteger threadId = new AtomicInteger(0);
        private final String prefix;
        
        /**
         * Creates a NettyTransceiverThreadFactory that creates threads with the 
         * specified name.
         * @param prefix the name prefix to use for all threads created by this 
         * ThreadFactory.  A unique ID will be appended to this prefix to form the 
         * final thread name.
         */
        public NettyTransceiverThreadFactory(String prefix) {
          this.prefix = prefix;
        }
        
        @Override
        public Thread newThread(Runnable r) {
          Thread thread = new Thread(r);
          thread.setName(prefix + " " + threadId.incrementAndGet());
          return thread;
        }
      }
    }
    View Code

    Use proxy as RpcClient 

    SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,transceiver)

    public class SpecificRequestor extends Requestor implements InvocationHandler {
        /** Create a proxy instance whose methods invoke RPCs. */
      @SuppressWarnings("unchecked")
      public static  <T> T getClient(Class<T> iface, Transceiver transciever,
                                     SpecificData data)
        throws IOException {
        Protocol protocol = data.getProtocol(iface);
        return (T)Proxy.newProxyInstance
          (data.getClassLoader(),
           new Class[] { iface },
           new SpecificRequestor(protocol, transciever, data));
      }
    
      /** Create a proxy instance whose methods invoke RPCs. */
      @SuppressWarnings("unchecked")
      public static <T> T getClient(Class<T> iface, SpecificRequestor requestor)
        throws IOException {
        return (T)Proxy.newProxyInstance(requestor.data.getClassLoader(),
                                      new Class[] { iface }, requestor);
      }
    /** Create a proxy instance whose methods invoke RPCs. */
      public static  <T> T getClient(Class<T> iface, Transceiver transciever)
        throws IOException {
        return getClient(iface, transciever,
                         new SpecificData(iface.getClassLoader()));
      }
       @Override
      public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
        try {
          // Check if this is a callback-based RPC:
          Type[] parameterTypes = method.getParameterTypes();
          if ((parameterTypes.length > 0) &&
              (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
              Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) {
            // Extract the Callback from the end of of the argument list
            Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
            Callback<?> callback = (Callback<?>)args[args.length - 1];
            request(method.getName(), finalArgs, callback);
            return null;
          }
          else {
            return request(method.getName(), args);
          }
        } catch (Exception e) {
          // Check if this is a declared Exception:
          for (Class<?> exceptionClass : method.getExceptionTypes()) {
            if (exceptionClass.isAssignableFrom(e.getClass())) {
              throw e;
            }
          }
          
          // Next, check for RuntimeExceptions:
          if (e instanceof RuntimeException) {
            throw e;
          }
          
          // Not an expected Exception, so wrap it in AvroRemoteException:
          throw new AvroRemoteException(e);
        }
      }
    
    }

    Request method to remote

     /** Writes a request message and returns the result through a Callback. */
      <T> void request(Request request, Callback<T> callback)
        throws Exception {
        Transceiver t = getTransceiver();
        if (!t.isConnected()) {
          // Acquire handshake lock so that only one thread is performing the
          // handshake and other threads block until the handshake is completed
          handshakeLock.lock();
          try {
            if (t.isConnected()) {
              // Another thread already completed the handshake; no need to hold
              // the write lock
              handshakeLock.unlock();
            } else {
              CallFuture<T> callFuture = new CallFuture<T>(callback);
              t.transceive(request.getBytes(),
                           new TransceiverCallback<T>(request, callFuture));
              // Block until handshake complete
              callFuture.await();
              if (request.getMessage().isOneWay()) {
                Throwable error = callFuture.getError();
                if (error != null) {
                  if (error instanceof Exception) {
                    throw (Exception) error;
                  } else {
                    throw new AvroRemoteException(error);
                  }
                }
              }
              return;
            }
          } finally{
            if (handshakeLock.isHeldByCurrentThread()) {
              handshakeLock.unlock();
            }
          }
        }
        
        if (request.getMessage().isOneWay()) {
          t.lockChannel();
          try {
            t.writeBuffers(request.getBytes());
            if (callback != null) {
              callback.handleResult(null);
            }
          } finally {
            t.unlockChannel();
          }
        } else {
          t.transceive(request.getBytes(),
                       new TransceiverCallback<T>(request, callback));
        }
        
      }

    Now look at avro protocl

    @org.apache.avro.specific.AvroGenerated
    public interface AvroSourceProtocol {
      public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{"protocol":"AvroSourceProtocol","namespace":"org.apache.flume.source.avro","doc":"* Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements.  See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership.  The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License.  You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing,\n * software distributed under the License is distributed on an\n * \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n * KIND, either express or implied.  See the License for the\n * specific language governing permissions and limitations\n * under the License.","types":[{"type":"enum","name":"Status","symbols":["OK","FAILED","UNKNOWN"]},{"type":"record","name":"AvroFlumeEvent","fields":[{"name":"headers","type":{"type":"map","values":"string"}},{"name":"body","type":"bytes"}]}],"messages":{"append":{"request":[{"name":"event","type":"AvroFlumeEvent"}],"response":"Status"},"appendBatch":{"request":[{"name":"events","type":{"type":"array","items":"AvroFlumeEvent"}}],"response":"Status"}}}");
      org.apache.flume.source.avro.Status append(org.apache.flume.source.avro.AvroFlumeEvent event) throws org.apache.avro.AvroRemoteException;
      org.apache.flume.source.avro.Status appendBatch(java.util.List<org.apache.flume.source.avro.AvroFlumeEvent> events) throws org.apache.avro.AvroRemoteException;
    
      @SuppressWarnings("all")
      public interface Callback extends AvroSourceProtocol {
        public static final org.apache.avro.Protocol PROTOCOL = org.apache.flume.source.avro.AvroSourceProtocol.PROTOCOL;
        void append(org.apache.flume.source.avro.AvroFlumeEvent event, org.apache.avro.ipc.Callback<org.apache.flume.source.avro.Status> callback) throws java.io.IOException;
        void appendBatch(java.util.List<org.apache.flume.source.avro.AvroFlumeEvent> events, org.apache.avro.ipc.Callback<org.apache.flume.source.avro.Status> callback) throws java.io.IOException;
      }
    }
  • 相关阅读:
    [二叉树算法]关于层次遍历二叉树的一些算法总结
    数据库事务并发访问产生的问题及四种事务隔离级别
    当relative遇上z-index,阻断事件捕获
    关于CAS操作
    LRU算法与LRUCache
    Hadoop2.x 关于日志文件位置
    推荐系统架构图——我的软件工程概论课设
    文件上传+解析漏洞
    命令执行漏洞
    SSRF漏洞
  • 原文地址:https://www.cnblogs.com/iiiDragon/p/9776684.html
Copyright © 2011-2022 走看看