Avro used in Flume
Define the RpcClient interface:
/**
 * Contract for a client that hands events to a remote endpoint over RPC.
 */
public interface RpcClient {

  /**
   * @return the batch size configured for this client (presumably the largest
   *         number of events sent per RPC by {@link #appendBatch(List)};
   *         confirm against the implementation).
   */
  int getBatchSize();

  /**
   * Sends a single event.
   *
   * @param event the event to send.
   * @throws EventDeliveryException if the event could not be delivered.
   */
  void append(Event event) throws EventDeliveryException;

  /**
   * Sends a list of events.
   *
   * @param events the events to send.
   * @throws EventDeliveryException if the events could not be delivered.
   */
  void appendBatch(List<Event> events) throws EventDeliveryException;

  /**
   * Closes this client.
   *
   * @throws FlumeException if closing fails.
   */
  void close() throws FlumeException;
}
Define NettyAvroRpcClient, an Avro-based RpcClient implementation that uses Netty for socket communication:
/**
 * Avro-based {@link RpcClient} whose transport is a {@link NettyTransceiver}.
 */
public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {

  private Transceiver transceiver;
  private AvroSourceProtocol.Callback avroClient;

  /**
   * Builds the Netty channel factory, opens the transceiver and creates the
   * Avro proxy client, then marks this client as {@code READY}.
   *
   * <p>On any failure the call-timeout pool is shut down and the channel
   * factory (if one was created) releases its external resources before the
   * failure is rethrown.
   *
   * @param timeout connection timeout, expressed in {@code tu} units.
   * @param tu unit of {@code timeout}.
   * @throws FlumeException if the connection cannot be established.
   */
  private void connect(long timeout, TimeUnit tu) throws FlumeException {
    callTimeoutPool = Executors.newCachedThreadPool(
        new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
    NioClientSocketChannelFactory socketChannelFactory = null;

    try {
      // Both Netty pools share the same thread-name prefix.
      final String threadNamePrefix = "Avro " + NettyTransceiver.class.getSimpleName();
      ExecutorService bossExecutor = Executors.newCachedThreadPool(
          new TransceiverThreadFactory(threadNamePrefix + " Boss"));
      ExecutorService workerExecutor = Executors.newCachedThreadPool(
          new TransceiverThreadFactory(threadNamePrefix + " I/O Worker"));

      // The worker count is only passed along when it is a positive number.
      final boolean workerCountGiven = maxIoWorkers >= 1;

      if (!enableDeflateCompression && !enableSsl) {
        // Plain NIO transport.
        if (workerCountGiven) {
          socketChannelFactory = new NioClientSocketChannelFactory(
              bossExecutor, workerExecutor, maxIoWorkers);
        } else {
          socketChannelFactory = new NioClientSocketChannelFactory(
              bossExecutor, workerExecutor);
        }
      } else if (workerCountGiven) {
        // Compression and/or SSL, with an explicit worker count.
        socketChannelFactory = new SSLCompressionChannelFactory(
            bossExecutor, workerExecutor,
            enableDeflateCompression, enableSsl, trustAllCerts,
            compressionLevel, truststore, truststorePassword,
            truststoreType, excludeProtocols, maxIoWorkers);
      } else {
        // Compression and/or SSL, without an explicit worker count.
        socketChannelFactory = new SSLCompressionChannelFactory(
            bossExecutor, workerExecutor,
            enableDeflateCompression, enableSsl, trustAllCerts,
            compressionLevel, truststore, truststorePassword,
            truststoreType, excludeProtocols);
      }

      transceiver = new NettyTransceiver(this.address, socketChannelFactory,
          tu.toMillis(timeout));
      avroClient = SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
          transceiver);
    } catch (Throwable t) {
      // Tear down whatever was created before reporting the failure.
      if (callTimeoutPool != null) {
        callTimeoutPool.shutdownNow();
      }
      if (socketChannelFactory != null) {
        socketChannelFactory.releaseExternalResources();
      }

      if (t instanceof IOException) {
        throw new FlumeException(this + ": RPC connection error", t);
      }
      if (t instanceof FlumeException) {
        throw (FlumeException) t;
      }
      if (t instanceof Error) {
        throw (Error) t;
      }
      throw new FlumeException(this + ": Unexpected exception", t);
    }

    setState(ConnState.READY);
  }
}
Define NettyTransceiver, which reads and writes Avro messages over a Netty socket channel:
/** * A Netty-based {@link Transceiver} implementation. */ public class NettyTransceiver extends Transceiver { /** If not specified, the default connection timeout will be used (60 sec). */ public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60 * 1000L; public static final String NETTY_CONNECT_TIMEOUT_OPTION = "connectTimeoutMillis"; public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay"; public static final boolean DEFAULT_TCP_NODELAY_VALUE = true; private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class .getName()); private final AtomicInteger serialGenerator = new AtomicInteger(0); private final Map<Integer, Callback<List<ByteBuffer>>> requests = new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>(); private final ChannelFactory channelFactory; private final long connectTimeoutMillis; private final ClientBootstrap bootstrap; private final InetSocketAddress remoteAddr; volatile ChannelFuture channelFuture; volatile boolean stopping; private final Object channelFutureLock = new Object(); /** * Read lock must be acquired whenever using non-final state. * Write lock must be acquired whenever modifying state. */ private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock(); private Channel channel; // Synchronized on stateLock private Protocol remote; // Synchronized on stateLock NettyTransceiver() { channelFactory = null; connectTimeoutMillis = 0L; bootstrap = null; remoteAddr = null; channelFuture = null; } /** * Creates a NettyTransceiver, and attempts to connect to the given address. * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection * timeout. * @param addr the address to connect to. * @throws IOException if an error occurs connecting to the given address. */ public NettyTransceiver(InetSocketAddress addr) throws IOException { this(addr, DEFAULT_CONNECTION_TIMEOUT_MILLIS); } /** * Creates a NettyTransceiver, and attempts to connect to the given address. 
* @param addr the address to connect to. * @param connectTimeoutMillis maximum amount of time to wait for connection * establishment in milliseconds, or null to use * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}. * @throws IOException if an error occurs connecting to the given address. */ public NettyTransceiver(InetSocketAddress addr, Long connectTimeoutMillis) throws IOException { this(addr, new NioClientSocketChannelFactory( Executors.newCachedThreadPool(new NettyTransceiverThreadFactory( "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")), Executors.newCachedThreadPool(new NettyTransceiverThreadFactory( "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))), connectTimeoutMillis); } /** * Creates a NettyTransceiver, and attempts to connect to the given address. * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS} is used for the connection * timeout. * @param addr the address to connect to. * @param channelFactory the factory to use to create a new Netty Channel. * @throws IOException if an error occurs connecting to the given address. */ public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) throws IOException { this(addr, channelFactory, buildDefaultBootstrapOptions(null)); } /** * Creates a NettyTransceiver, and attempts to connect to the given address. * @param addr the address to connect to. * @param channelFactory the factory to use to create a new Netty Channel. * @param connectTimeoutMillis maximum amount of time to wait for connection * establishment in milliseconds, or null to use * {@link #DEFAULT_CONNECTION_TIMEOUT_MILLIS}. * @throws IOException if an error occurs connecting to the given address. */ public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, Long connectTimeoutMillis) throws IOException { this(addr, channelFactory, buildDefaultBootstrapOptions(connectTimeoutMillis)); } /** * Creates a NettyTransceiver, and attempts to connect to the given address. 
* It is strongly recommended that the {@link #NETTY_CONNECT_TIMEOUT_OPTION} * option be set to a reasonable timeout value (a Long value in milliseconds) * to prevent connect/disconnect attempts from hanging indefinitely. It is * also recommended that the {@link #NETTY_TCP_NODELAY_OPTION} option be set * to true to minimize RPC latency. * @param addr the address to connect to. * @param channelFactory the factory to use to create a new Netty Channel. * @param nettyClientBootstrapOptions map of Netty ClientBootstrap options * to use. * @throws IOException if an error occurs connecting to the given address. */ public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory, Map<String, Object> nettyClientBootstrapOptions) throws IOException { if (channelFactory == null) { throw new NullPointerException("channelFactory is null"); } // Set up. this.channelFactory = channelFactory; this.connectTimeoutMillis = (Long) nettyClientBootstrapOptions.get(NETTY_CONNECT_TIMEOUT_OPTION); bootstrap = new ClientBootstrap(channelFactory); remoteAddr = addr; // Configure the event pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline p = Channels.pipeline(); p.addLast("frameDecoder", new NettyFrameDecoder()); p.addLast("frameEncoder", new NettyFrameEncoder()); p.addLast("handler", new NettyClientAvroHandler()); return p; } }); if (nettyClientBootstrapOptions != null) { LOG.debug("Using Netty bootstrap options: " + nettyClientBootstrapOptions); bootstrap.setOptions(nettyClientBootstrapOptions); } // Make a new connection. stateLock.readLock().lock(); try { getChannel(); } finally { stateLock.readLock().unlock(); } } /** * Creates the default options map for the Netty ClientBootstrap. * @param connectTimeoutMillis connection timeout in milliseconds, or null * if no timeout is desired. * @return the map of Netty bootstrap options. 
*/ private static Map<String, Object> buildDefaultBootstrapOptions( Long connectTimeoutMillis) { Map<String, Object> options = new HashMap<String, Object>(2); options.put(NETTY_TCP_NODELAY_OPTION, DEFAULT_TCP_NODELAY_VALUE); options.put(NETTY_CONNECT_TIMEOUT_OPTION, connectTimeoutMillis == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : connectTimeoutMillis); return options; } /** * Tests whether the given channel is ready for writing. * @return true if the channel is open and ready; false otherwise. */ private static boolean isChannelReady(Channel channel) { return (channel != null) && channel.isOpen() && channel.isBound() && channel.isConnected(); } /** * Gets the Netty channel. If the channel is not connected, first attempts * to connect. * NOTE: The stateLock read lock *must* be acquired before calling this * method. * @return the Netty channel * @throws IOException if an error occurs connecting the channel. */ private Channel getChannel() throws IOException { if (!isChannelReady(channel)) { // Need to reconnect // Upgrade to write lock stateLock.readLock().unlock(); stateLock.writeLock().lock(); try { if (!isChannelReady(channel)) { synchronized(channelFutureLock) { if (!stopping) { LOG.debug("Connecting to " + remoteAddr); channelFuture = bootstrap.connect(remoteAddr); } } if (channelFuture != null) { channelFuture.awaitUninterruptibly(connectTimeoutMillis); synchronized(channelFutureLock) { if (!channelFuture.isSuccess()) { throw new IOException("Error connecting to " + remoteAddr, channelFuture.getCause()); } channel = channelFuture.getChannel(); channelFuture = null; } } } } finally { // Downgrade to read lock: stateLock.readLock().lock(); stateLock.writeLock().unlock(); } } return channel; } /** * Closes the connection to the remote peer if connected. */ private void disconnect() { disconnect(false, false, null); } /** * Closes the connection to the remote peer if connected. * @param awaitCompletion if true, will block until the close has completed. 
* @param cancelPendingRequests if true, will drain the requests map and * send an IOException to all Callbacks. * @param cause if non-null and cancelPendingRequests is true, this Throwable * will be passed to all Callbacks. */ private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests, Throwable cause) { Channel channelToClose = null; Map<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null; boolean stateReadLockHeld = stateLock.getReadHoldCount() != 0; synchronized(channelFutureLock) { if (stopping && channelFuture != null) { channelFuture.cancel(); } } if (stateReadLockHeld) { stateLock.readLock().unlock(); } stateLock.writeLock().lock(); try { if (channel != null) { if (cause != null) { LOG.debug("Disconnecting from " + remoteAddr, cause); } else { LOG.debug("Disconnecting from " + remoteAddr); } channelToClose = channel; channel = null; remote = null; if (cancelPendingRequests) { // Remove all pending requests (will be canceled after relinquishing // write lock). requestsToCancel = new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>(requests); requests.clear(); } } } finally { if (stateReadLockHeld) { stateLock.readLock().lock(); } stateLock.writeLock().unlock(); } // Cancel any pending requests by sending errors to the callbacks: if ((requestsToCancel != null) && !requestsToCancel.isEmpty()) { LOG.debug("Removing " + requestsToCancel.size() + " pending request(s)."); for (Callback<List<ByteBuffer>> request : requestsToCancel.values()) { request.handleError( cause != null ? cause : new IOException(getClass().getSimpleName() + " closed")); } } // Close the channel: if (channelToClose != null) { ChannelFuture closeFuture = channelToClose.close(); if (awaitCompletion && (closeFuture != null)) { closeFuture.awaitUninterruptibly(connectTimeoutMillis); } } } /** * Netty channels are thread-safe, so there is no need to acquire locks. * This method is a no-op. 
*/ @Override public void lockChannel() { } /** * Netty channels are thread-safe, so there is no need to acquire locks. * This method is a no-op. */ @Override public void unlockChannel() { } public void close() { try { // Close the connection: stopping = true; disconnect(true, true, null); } finally { // Shut down all thread pools to exit. channelFactory.releaseExternalResources(); } } @Override public String getRemoteName() throws IOException { stateLock.readLock().lock(); try { return getChannel().getRemoteAddress().toString(); } finally { stateLock.readLock().unlock(); } } /** * Override as non-synchronized method because the method is thread safe. */ @Override public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException { try { CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>(); transceive(request, transceiverFuture); return transceiverFuture.get(); } catch (InterruptedException e) { LOG.debug("failed to get the response", e); return null; } catch (ExecutionException e) { LOG.debug("failed to get the response", e); return null; } } @Override public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException { stateLock.readLock().lock(); try { int serial = serialGenerator.incrementAndGet(); NettyDataPack dataPack = new NettyDataPack(serial, request); requests.put(serial, callback); writeDataPack(dataPack); } finally { stateLock.readLock().unlock(); } } @Override public void writeBuffers(List<ByteBuffer> buffers) throws IOException { stateLock.readLock().lock(); try { writeDataPack( new NettyDataPack(serialGenerator.incrementAndGet(), buffers)); } finally { stateLock.readLock().unlock(); } } /** * Writes a NettyDataPack, reconnecting to the remote peer if necessary. * NOTE: The stateLock read lock *must* be acquired before calling this * method. * @param dataPack the data pack to write. * @throws IOException if an error occurs connecting to the remote peer. 
*/ private void writeDataPack(NettyDataPack dataPack) throws IOException { getChannel().write(dataPack); } @Override public List<ByteBuffer> readBuffers() throws IOException { throw new UnsupportedOperationException(); } @Override public Protocol getRemote() { stateLock.readLock().lock(); try { return remote; } finally { stateLock.readLock().unlock(); } } @Override public boolean isConnected() { stateLock.readLock().lock(); try { return remote!=null; } finally { stateLock.readLock().unlock(); } } @Override public void setRemote(Protocol protocol) { stateLock.writeLock().lock(); try { this.remote = protocol; } finally { stateLock.writeLock().unlock(); } } /** * Avro client handler for the Netty transport */ class NettyClientAvroHandler extends SimpleChannelUpstreamHandler { @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { LOG.debug(e.toString()); ChannelStateEvent cse = (ChannelStateEvent)e; if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) { // Server closed connection; disconnect client side LOG.debug("Remote peer " + remoteAddr + " closed connection."); disconnect(false, true, null); } } super.handleUpstream(ctx, e); } @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { // channel = e.getChannel(); super.channelOpen(ctx, e); } @Override public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) { NettyDataPack dataPack = (NettyDataPack)e.getMessage(); Callback<List<ByteBuffer>> callback = requests.get(dataPack.getSerial()); if (callback==null) { throw new RuntimeException("Missing previous call info"); } try { callback.handleResult(dataPack.getDatas()); } finally { requests.remove(dataPack.getSerial()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { disconnect(false, true, e.getCause()); } } /** * Creates threads with unique names 
based on a specified name prefix. */ private static class NettyTransceiverThreadFactory implements ThreadFactory { private final AtomicInteger threadId = new AtomicInteger(0); private final String prefix; /** * Creates a NettyTransceiverThreadFactory that creates threads with the * specified name. * @param prefix the name prefix to use for all threads created by this * ThreadFactory. A unique ID will be appended to this prefix to form the * final thread name. */ public NettyTransceiverThreadFactory(String prefix) { this.prefix = prefix; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName(prefix + " " + threadId.incrementAndGet()); return thread; } } }
Use a dynamic proxy as the Avro RPC client:
SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,transceiver)
public class SpecificRequestor extends Requestor implements InvocationHandler { /** Create a proxy instance whose methods invoke RPCs. */ @SuppressWarnings("unchecked") public static <T> T getClient(Class<T> iface, Transceiver transciever, SpecificData data) throws IOException { Protocol protocol = data.getProtocol(iface); return (T)Proxy.newProxyInstance (data.getClassLoader(), new Class[] { iface }, new SpecificRequestor(protocol, transciever, data)); } /** Create a proxy instance whose methods invoke RPCs. */ @SuppressWarnings("unchecked") public static <T> T getClient(Class<T> iface, SpecificRequestor requestor) throws IOException { return (T)Proxy.newProxyInstance(requestor.data.getClassLoader(), new Class[] { iface }, requestor); } /** Create a proxy instance whose methods invoke RPCs. */ public static <T> T getClient(Class<T> iface, Transceiver transciever) throws IOException { return getClient(iface, transciever, new SpecificData(iface.getClassLoader())); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { try { // Check if this is a callback-based RPC: Type[] parameterTypes = method.getParameterTypes(); if ((parameterTypes.length > 0) && (parameterTypes[parameterTypes.length - 1] instanceof Class) && Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) { // Extract the Callback from the end of of the argument list Object[] finalArgs = Arrays.copyOf(args, args.length - 1); Callback<?> callback = (Callback<?>)args[args.length - 1]; request(method.getName(), finalArgs, callback); return null; } else { return request(method.getName(), args); } } catch (Exception e) { // Check if this is a declared Exception: for (Class<?> exceptionClass : method.getExceptionTypes()) { if (exceptionClass.isAssignableFrom(e.getClass())) { throw e; } } // Next, check for RuntimeExceptions: if (e instanceof RuntimeException) { throw e; } // Not an expected Exception, so wrap it in AvroRemoteException: 
throw new AvroRemoteException(e); } } }
The request method that sends the call to the remote peer:
/** Writes a request message and returns the result through a Callback. */ <T> void request(Request request, Callback<T> callback) throws Exception { Transceiver t = getTransceiver(); if (!t.isConnected()) { // Acquire handshake lock so that only one thread is performing the // handshake and other threads block until the handshake is completed handshakeLock.lock(); try { if (t.isConnected()) { // Another thread already completed the handshake; no need to hold // the write lock handshakeLock.unlock(); } else { CallFuture<T> callFuture = new CallFuture<T>(callback); t.transceive(request.getBytes(), new TransceiverCallback<T>(request, callFuture)); // Block until handshake complete callFuture.await(); if (request.getMessage().isOneWay()) { Throwable error = callFuture.getError(); if (error != null) { if (error instanceof Exception) { throw (Exception) error; } else { throw new AvroRemoteException(error); } } } return; } } finally{ if (handshakeLock.isHeldByCurrentThread()) { handshakeLock.unlock(); } } } if (request.getMessage().isOneWay()) { t.lockChannel(); try { t.writeBuffers(request.getBytes()); if (callback != null) { callback.handleResult(null); } } finally { t.unlockChannel(); } } else { t.transceive(request.getBytes(), new TransceiverCallback<T>(request, callback)); } }
Now look at the Avro protocol:
@org.apache.avro.specific.AvroGenerated public interface AvroSourceProtocol { public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{"protocol":"AvroSourceProtocol","namespace":"org.apache.flume.source.avro","doc":"* Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing,\n * software distributed under the License is distributed on an\n * \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n * KIND, either express or implied. See the License for the\n * specific language governing permissions and limitations\n * under the License.","types":[{"type":"enum","name":"Status","symbols":["OK","FAILED","UNKNOWN"]},{"type":"record","name":"AvroFlumeEvent","fields":[{"name":"headers","type":{"type":"map","values":"string"}},{"name":"body","type":"bytes"}]}],"messages":{"append":{"request":[{"name":"event","type":"AvroFlumeEvent"}],"response":"Status"},"appendBatch":{"request":[{"name":"events","type":{"type":"array","items":"AvroFlumeEvent"}}],"response":"Status"}}}"); org.apache.flume.source.avro.Status append(org.apache.flume.source.avro.AvroFlumeEvent event) throws org.apache.avro.AvroRemoteException; org.apache.flume.source.avro.Status appendBatch(java.util.List<org.apache.flume.source.avro.AvroFlumeEvent> events) throws org.apache.avro.AvroRemoteException; @SuppressWarnings("all") public interface Callback extends AvroSourceProtocol { public static final org.apache.avro.Protocol PROTOCOL = org.apache.flume.source.avro.AvroSourceProtocol.PROTOCOL; void 
append(org.apache.flume.source.avro.AvroFlumeEvent event, org.apache.avro.ipc.Callback<org.apache.flume.source.avro.Status> callback) throws java.io.IOException; void appendBatch(java.util.List<org.apache.flume.source.avro.AvroFlumeEvent> events, org.apache.avro.ipc.Callback<org.apache.flume.source.avro.Status> callback) throws java.io.IOException; } }