zoukankan      html  css  js  c++  java
  • flume收集日志直接sink到oracle数据库

    因为项目需求,需要保存项目日志。项目的并发量不大,所以这里直接通过flume保存到oracle

    源码地址: https://github.com/jaxlove/fks/tree/master/src/main/java/com

    日志系统设置

      url:以select、save、update、remove开头。

      通过filter记录请求成功的url。格式为json格式,字段包括channel(来源渠道web、wap、app等)、operate_type(操作类型)、first_model(菜单第一模块)、second_model(菜单第二模块)、data(url传递的参数)、ip(请求者ip)、account_id(用户账号id)、time(时间,由系统自动生成)、url(请求的url地址)、remark(自定义备注)

      表结构相同。

    flume配置:

      由于flume没有现成的可直接sink到oracle的jar包,这里自定义了一个sink;为了省事,直接通过mybatis保存到数据库。

      flume在conf里配置设置

    a1.sinks.k1.type = com.myflume.OracleSink
    a1.sinks.k1.jdbc_url = jdbc:oracle:thin:@ip:port:实例名
    a1.sinks.k1.jdbc_username = username
    a1.sinks.k1.jdbc_password = password
    #设置多少条数据提交一次。数据量大、对数据精度要求不高时可以设置得高一点
    a1.sinks.k1.jdbc_batchsize = 5
    #需要保存的表名
    a1.sinks.k1.jdbc_tablename = tablename

    java代码的实现说明:

    1、获取日志的 { 与 } 之间的数据,将其转为json。

    2、json的key必须和表的字段名相同(不区分大小写,代码会先把key转为大写),只有这样才能保存,否则该字段不会入库。

    3、由于代码无法识别日志中种类繁多的数据格式,所以只支持保存数字与字符串类型。数据库中对应字段也必须设置为相同的类型,否则会报错。

    以下是代码:

    com.myflume.OracleSink
    package com.myflume;
    
    import com.common.SpringContextHolder;
    import com.service.LogInfoService;
    import net.sf.json.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.apache.tomcat.jdbc.pool.DataSource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /**
     * 自定义sink
     *
     * @author wdj on 2018/6/8
     */
    public class OracleSink extends AbstractSink implements Configurable{
    
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        private Integer tryCount = 0;
    
        //MAX_TRY_COUNT 次尝试提交之后若数据个数还未达到batchSize,则试着提交
        private final Integer MAX_TRY_COUNT = 2;
    
        private String jdbcurl;
        private String username;
        private String password;
        private Integer batchSize;
        private String tablename;
        private DataSource dataSource;
        LogInfoService logInfoService;
        private List<Map<String,Object>> datas = new ArrayList<>();
    
        // 获取flume的配置参数
        @Override
        public void configure(Context context) {
            ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                    new String[] { "classpath:spring-context.xml" });
            applicationContext.start();
        //通过spring管理bean logInfoService = SpringContextHolder.getBean("logInfoService"); dataSource = SpringContextHolder.getBean("dataSource"); jdbcurl=context.getString("jdbc_url"); username=context.getString("jdbc_username"); password=context.getString("jdbc_password"); batchSize = context.getInteger("jdbc_batchsize",10); tablename = context.getString("jdbc_tablename"); logger.info("初始化数据 ==== tablename:"+tablename+";jdbcurl:"+jdbcurl+";username:"+username+";batchSize"+batchSize); } // Initialize the connection to the external repository (e.g. HDFS) that // this Sink will forward Events to @Override public synchronized void start() { if(!StringUtils.isBlank(jdbcurl) && !StringUtils.isBlank(username) && !StringUtils.isBlank(password)){ dataSource = new DataSource(); dataSource.setUrl(jdbcurl); dataSource.setUsername(username); dataSource.setPassword(password); dataSource.setInitialSize(5); dataSource.setMaxActive(20); dataSource.setMinIdle(5); dataSource.setMaxIdle(20); dataSource.setMaxWait(30000); } } // Disconnect from the external respository and do any // additional cleanup @Override public synchronized void stop() { logger.info("sink关闭。。。。。。。。保存缓存中的剩余数据"); if(datas != null && !datas.isEmpty()){ logInfoService.save(tablename,datas); logger.info("提交"+datas.size()+"条数据"); } dataSource.close(); super.stop(); } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { if(StringUtils.isBlank(tablename)){ throw new Exception("tablename不能为空!"); } // This try clause includes whatever Channel operations you want to do long processedEvent = 0; for (; processedEvent < batchSize; processedEvent++) { Event event = ch.take(); byte[] eventBody; if(event != null){ eventBody = event.getBody(); String line= new String(eventBody,"UTF-8"); if (line.length() > 0 ){ int start = line.indexOf('{'); int end = line.lastIndexOf('}'); if(start != -1 && 
end!= -1){ String dataStr = line.substring(start,end+1); Map<String,Object> map = JSONObject.fromObject(dataStr); datas.add(map); } } }else{ logger.info("even为空,回退。。。"); status = Status.BACKOFF; break; } } boolean canCommit = (status != Status.BACKOFF && datas!=null && !datas.isEmpty()) || (tryCount >= MAX_TRY_COUNT && datas!=null && !datas.isEmpty()); // 将数据复制到临时变量,将data去空,当时若flume在datas浮空后未保存数据就关闭,则还是会丢失一部分数据 List<Map<String,Object>> tem = new ArrayList<>(); tem.addAll(datas); datas = new ArrayList<>(); if(canCommit){ logInfoService.save(tablename,tem); logger.info("提交"+datas.size()+"条数据"); status = Status.READY; tryCount=0; txn.commit(); }else if(status == Status.BACKOFF){ txn.rollback(); tryCount++; }else{ logger.info("数据为空!"); status = Status.BACKOFF; txn.rollback(); tryCount=0; } } catch (Exception e) { txn.rollback(); // Log exception, handle individual exceptions as needed logger.error("保存数据出错:",e); status = Status.BACKOFF; } txn.close(); return status; } public static void main(String[] args){ OracleSink oracleSink = new OracleSink(); oracleSink.configure(null); oracleSink.start(); try { oracleSink.process(); } catch (EventDeliveryException e) { e.printStackTrace(); } } }
    com.service.LogInfoService
    package com.service;
    
    import com.dao.LogInfoDao;
    import com.entity.ColumnDataBean;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.*;
    
    /**
     * Persists parsed log rows into an Oracle table through {@link LogInfoDao}.
     *
     * @author wdj on 2018/6/9
     */
    @Service
    public class LogInfoService {

        @Resource
        LogInfoDao logInfoDao;

        /**
         * Inserts {@code datas} into {@code tablename}.
         *
         * <p>The column names and Oracle data types come from {@code logInfoDao.getColumn} (every
         * column except ID, per the original design). Row keys are upper-cased before matching, so
         * matching is case-insensitive. Keys with no matching column are ignored; columns with no
         * matching key are written as null.
         *
         * @param tablename target table; upper-cased (locale-independently) for the column lookup
         * @param datas rows to insert; a null or empty list is a no-op
         * @throws IllegalStateException if no insertable columns are found for the table
         */
        public void save(String tablename, List<Map<String, Object>> datas) {
            if (datas == null || datas.isEmpty()) {
                // The multi-row insert needs at least one row; an empty batch would render invalid SQL.
                return;
            }
            // All columns except ID.
            List<Map<String, String>> columnList =
                    logInfoDao.getColumn(tablename.toUpperCase(Locale.ROOT));
            // LinkedHashMap keeps the column order returned by the query.
            Map<String, String> columns = new LinkedHashMap<>();
            for (Map<String, String> column : columnList) {
                columns.put(column.get("COLUMN_NAME"), getJdbcType(column.get("DATA_TYPE")));
            }
            if (columns.isEmpty()) {
                // Otherwise the mapper would render "select sys_guid(), from dual".
                throw new IllegalStateException("No insertable columns found for table " + tablename);
            }
            // Raw element type kept for compatibility with LogInfoDao.save's declared parameter.
            List<Map> dataMap = new ArrayList<>(datas.size());
            for (Map<String, Object> data : datas) {
                Map<String, Object> upperCased = transformUpperCase(data);
                Map<String, ColumnDataBean> row = new LinkedHashMap<>();
                for (Map.Entry<String, String> column : columns.entrySet()) {
                    ColumnDataBean dataBean = new ColumnDataBean();
                    // Save both the value and its JDBC type (needed for binding nulls).
                    dataBean.setValue(upperCased.get(column.getKey()));
                    dataBean.setType(column.getValue());
                    row.put(column.getKey(), dataBean);
                }
                dataMap.add(row);
            }
            logInfoDao.save(tablename, dataMap);
        }

        /**
         * Returns a copy of {@code orgMap} whose keys are upper-cased using {@link Locale#ROOT}.
         *
         * <p>If two keys differ only in case, one value overwrites the other.
         *
         * @param orgMap source map; may be null
         * @return a new map (empty when {@code orgMap} is null or empty)
         */
        public Map<String, Object> transformUpperCase(Map<String, Object> orgMap) {
            Map<String, Object> resultMap = new HashMap<>();
            if (orgMap == null || orgMap.isEmpty()) {
                return resultMap;
            }
            for (Map.Entry<String, Object> entry : orgMap.entrySet()) {
                resultMap.put(entry.getKey().toUpperCase(Locale.ROOT), entry.getValue());
            }
            return resultMap;
        }

        /**
         * Maps an Oracle {@code DATA_TYPE} to a MyBatis jdbcType name (rough mapping).
         *
         * @param dataSourceType Oracle type name, e.g. {@code VARCHAR2}, {@code NUMBER},
         *     {@code TIMESTAMP(6)}
         * @return {@code "TIMESTAMP"} for timestamp types, {@code "NUMERIC"} for NUMBER, otherwise
         *     {@code "VARCHAR"} (also for blank input)
         */
        public String getJdbcType(String dataSourceType) {
            if (StringUtils.isBlank(dataSourceType)) {
                return "VARCHAR"; // default: string
            }
            if (dataSourceType.contains("TIMESTAMP")) {
                return "TIMESTAMP";
            }
            if (dataSourceType.contains("NUMBER")) {
                return "NUMERIC";
            }
            // CHAR/VARCHAR2/NVARCHAR2 and everything else are bound as strings.
            return "VARCHAR";
        }

    }
    
    ColumnDataBean只有两个属性:private Object value;private String type;这里就不贴代码了。(PS:本来打算直接用map的,但是在dao的save方法里,通过c[VALUE]和c[KEY]只能获取map中固定的一个值,不知道是为什么)

    dao实现的xml

    <mapper namespace="com.dao.LogInfoDao">

        <!-- Columns of a table except ID, with their Oracle data types.
             ORDER BY COLUMN_ID makes the order deterministic (table definition order);
             without it USER_TAB_COLUMNS rows come back in no guaranteed order. -->
        <select id="getColumn" resultType="map">
            select COLUMN_NAME,DATA_TYPE from USER_TAB_COLUMNS WHERE TABLE_NAME=#{tablename} and  COLUMN_NAME !='ID'
            order by COLUMN_ID
        </select>

        <!-- Multi-row insert: one "select ... from dual" per row, joined with UNION ALL;
             sys_guid() supplies the ID.
             ${tablename} and ${k} are spliced in as raw SQL text, so they must come from trusted
             configuration or the data dictionary, never from log content.
             The explicit column list (ID, then the keys of the first row) removes the dependency on
             ID being the first physical column; every row map is assumed to share the same key
             order (LogInfoService.save builds them all from one ordered column map). -->
        <insert id="save">
            insert into ${tablename}
            (ID,
            <foreach collection="data[0]" index="k" separator=",">${k}</foreach>)
            select * from
            <foreach collection="data" item="d" open="(" close=")" separator="union all">
                select sys_guid(),
                <foreach collection="d" index="k" item="c" separator=",">
                    #{c.value,jdbcType=${c.type}} as ${k}
                </foreach>
                from dual
            </foreach>
        </insert>

    </mapper>

    over!byebye,继续努力!



     
  • 相关阅读:
    WordPress搭建的新博客 www.douzujun.club
    调用weka模拟实现 “主动学习“ 算法
    危险!80% 用户正在考虑放弃 Oracle JDK…
    最新!Dubbo 远程代码执行漏洞通告,速度升级
    Tomcat 又爆出高危漏洞!!Tomcat 8.5 ~10 中招…
    Spring Boot 启动,1 秒搞定!
    为什么要重写 hashcode 和 equals 方法?
    详解 Java 中 4 种 IO 模型
    详解GaussDB bufferpool缓存策略,这次彻底懂了!
    【API进阶之路6】一个技术盲点,差点让整个项目翻车
  • 原文地址:https://www.cnblogs.com/jaxlove-it/p/9174638.html
Copyright © 2011-2022 走看看