zoukankan      html  css  js  c++  java
  • flume自定义sink之mysql

    package me;
    
    import static org.mockito.Matchers.booleanThat;
    
    import java.nio.charset.StandardCharsets;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    
    import com.google.common.base.Preconditions;
    
    /**
     * Flume sink that stores every event as one row of a MySQL table.
     *
     * <p>The event body is interpreted as a comma-separated list of field values,
     * one per column listed in the {@code column_name} setting. Values are bound
     * through a {@link PreparedStatement}, so event content is never spliced into
     * the SQL text. A field wrapped in single quotes (the form required when the
     * body used to be pasted into the statement verbatim) has that one pair of
     * quotes removed before it is bound; all fields are bound as strings, so SQL
     * keywords or expressions inside a field are stored literally, not evaluated.
     *
     * <p>Required settings (read in {@link #configure(Context)}): {@code url},
     * {@code user}, {@code password}, {@code tableName}, {@code column_name}.
     */
    public class MySink extends AbstractSink implements Configurable {
        private static final Logger LOG = Logger.getLogger(MySink.class.getName());

        private Connection connect;
        private PreparedStatement stmt;
        private String columnName;
        private String url;
        private String user;
        private String password;
        private String tableName;
        // Number of comma-separated entries in columnName; computed in start().
        private int columnCount;

        /**
         * Runs once when the sink is shut down: releases the JDBC statement and
         * connection, then lets the base class record the stopped state.
         */
        @Override
        public synchronized void stop() {
            closeJdbcResources();
            super.stop();
        }

        /**
         * Runs once when the sink is started: opens the connection and prepares
         * the insert statement.
         *
         * <p>The connection URL has the form
         * {@code jdbc:mysql://<host>[:<port>]/<database>}; the other two arguments
         * are the login user name and password.
         *
         * @throws IllegalStateException if the connection cannot be opened or the
         *         statement cannot be prepared; the sink is then not marked as
         *         started, instead of running with a null statement
         */
        @Override
        public synchronized void start() {
            columnCount = columnName.split(",").length;
            try {
                connect = DriverManager.getConnection(url, user, password);
                stmt = connect.prepareStatement(buildInsertSql(columnCount));
            } catch (SQLException e) {
                closeJdbcResources();
                throw new IllegalStateException(
                        "Unable to open the MySQL connection or prepare the insert for table "
                                + tableName, e);
            }
            // Only report the sink as started once it can actually deliver events.
            super.start();
        }

        /**
         * Called repeatedly by the sink runner; takes at most one event from the
         * channel and inserts it as a row.
         *
         * @return {@code Status.BACKOFF} when the channel had no event,
         *         {@code Status.READY} after an event was inserted or after a
         *         malformed event was discarded
         * @throws EventDeliveryException if the sink was not started, or if taking
         *         or inserting the event failed (the transaction is rolled back)
         */
        @Override
        public Status process() throws EventDeliveryException {
            if (stmt == null) {
                throw new EventDeliveryException("MySink has no open statement; was start() successful?");
            }

            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            try {
                txn.begin();
                Event event = ch.take();
                if (event == null) {
                    // Empty channel: end the transaction and back off rather than
                    // spinning on take() inside an open transaction.
                    txn.commit();
                    return Status.BACKOFF;
                }

                String body = new String(event.getBody(), StandardCharsets.UTF_8);
                // limit -1 keeps trailing empty fields so the count check is exact.
                String[] fields = body.split(",", -1);
                if (fields.length != columnCount) {
                    // Rolling back would put the same unusable event back on the
                    // channel and block the sink forever, so it is consumed and
                    // dropped. The body is not logged: it may hold sensitive values.
                    LOG.log(Level.WARNING,
                            "Discarding event with {0} field(s); expected {1}",
                            new Object[] {fields.length, columnCount});
                    txn.commit();
                    return Status.READY;
                }

                for (int i = 0; i < fields.length; i++) {
                    stmt.setString(i + 1, stripEnclosingQuotes(fields[i].trim()));
                }
                stmt.executeUpdate();
                txn.commit();
                return Status.READY;
            } catch (Throwable th) {
                txn.rollback();

                if (th instanceof Error) {
                    throw (Error) th;
                } else {
                    throw new EventDeliveryException(th);
                }
            } finally {
                txn.close();
            }
        }

        /**
         * Reads the sink settings from the agent configuration.
         *
         * @param arg0 the configuration context of this sink
         * @throws NullPointerException if any required setting is missing
         */
        @Override
        public void configure(Context arg0) {
            columnName = arg0.getString("column_name");
            Preconditions.checkNotNull(columnName, "column_name must be set!!");
            url = arg0.getString("url");
            Preconditions.checkNotNull(url, "url must be set!!");
            user = arg0.getString("user");
            Preconditions.checkNotNull(user, "user must be set!!");
            password = arg0.getString("password");
            Preconditions.checkNotNull(password, "password must be set!!");
            tableName = arg0.getString("tableName");
            Preconditions.checkNotNull(tableName, "tableName must be set!!");
        }

        /** Builds the parameterized insert with one {@code ?} per configured column. */
        private String buildInsertSql(int columns) {
            StringBuilder placeholders = new StringBuilder();
            for (int i = 0; i < columns; i++) {
                if (i > 0) {
                    placeholders.append(',');
                }
                placeholders.append('?');
            }
            // Table and column names come from the agent configuration, not from events.
            return "insert into " + tableName + "(" + columnName + ") values(" + placeholders + ")";
        }

        /** Removes one pair of enclosing single quotes, if present. */
        private static String stripEnclosingQuotes(String value) {
            int length = value.length();
            if (length >= 2 && value.charAt(0) == '\'' && value.charAt(length - 1) == '\'') {
                return value.substring(1, length - 1);
            }
            return value;
        }

        /** Closes the statement and the connection, logging (not propagating) failures. */
        private void closeJdbcResources() {
            if (stmt != null) {
                try {
                    stmt.close();
                } catch (SQLException e) {
                    LOG.log(Level.WARNING, "Failed to close the insert statement", e);
                }
                stmt = null;
            }
            if (connect != null) {
                try {
                    connect.close();
                } catch (SQLException e) {
                    LOG.log(Level.WARNING, "Failed to close the MySQL connection", e);
                }
                connect = null;
            }
        }

    }
    
    # Flume agent named "agent": declares one source (s1), one channel (c1) and one sink (sk1).
    agent.sources = s1    
    agent.channels = c1  
    agent.sinks = sk1  
    
    # Source s1: type netcat, bound to localhost port 5678, writing into channel c1.
    agent.sources.s1.type = netcat  
    agent.sources.s1.bind = localhost  
    agent.sources.s1.port = 5678  
    agent.sources.s1.channels = c1  
    
    # Sink sk1: the custom me.MySink class. The keys url, tableName, user, password and
    # column_name are the ones MySink.configure() reads; all five are required.
    # column_name is a comma-separated column list; MySink compares its entry count
    # with the number of comma-separated fields in each event body.
    agent.sinks.sk1.type = me.MySink
    agent.sinks.sk1.url=jdbc:mysql://192.168.16.33:3306/test
    agent.sinks.sk1.tableName= test.user
    agent.sinks.sk1.user=root
    agent.sinks.sk1.password=WoChu@123
    agent.sinks.sk1.column_name=id, username, password
    agent.sinks.sk1.channel = c1  
    
    # Channel c1: type memory. NOTE(review): capacity / transactionCapacity are presumably
    # the maximum events held and the maximum events per transaction - confirm in the Flume docs.
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000     
    agent.channels.c1.transactionCapacity = 100
    
    lihudeMacBook-Pro:~ SunAndLi$ cd hadoop-2.7.2/flume/
    lihudeMacBook-Pro:flume SunAndLi$  bin/flume-ng agent -c conf -f conf/sink-mysql --name agent -Dflume.root.logger=INFO,console 
    

     

  • 相关阅读:
    thinkphp3.2 无法加载模块
    php 使用 wangeditor3 图片上传
    nginx 配置 server
    oracle练手(一)
    Oracle练习(一)
    java运算符优先级
    数据库(mysql和oracle)
    java实现4种内部排序
    mysql-----分库分表
    NIO总结-----Buffer
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6645415.html
Copyright © 2011-2022 走看看