  • 日志流量管理





    zkServer.sh start    (三台机器)
    start-all.sh 第一台机器


    flume-ng agent --conf conf/ --conf-file /export/lastwork/flume/Taildir-web.conf --name a1 -Dflume.root.logger=INFO,console
    hdfs dfs -rm -r /flume


     1 systemctl start mysqld.service  


     1 hive 

     1 drop database weblog; 

     1 drop database weblog cascade; 


     # Flume agent "a1": a TAILDIR source (r1) feeds a memory channel (c1),
     # which is drained by an HDFS sink (k1).

     # Sources of this agent.
     a1.sources=r1
     # Sinks of this agent (several may be listed, separated by spaces).
     a1.sinks=k1
     # Channels of this agent (normally just one).
     a1.channels=c1

     # Source type.
     a1.sources.r1.type=TAILDIR
     # Path of the position (checkpoint) file.
     a1.sources.r1.positionFile=/var/log/flume/taildir_position.json
     # File groups to monitor: f1 is a single file, f2 is a regex over file names.
     a1.sources.r1.filegroups=f1 f2
     a1.sources.r1.filegroups.f1=/var/log/test1/example.log
     a1.sources.r1.filegroups.f2=/var/log/test2/.*log.*

     # Channel configuration.
     a1.channels.c1.type=memory
     a1.channels.c1.capacity=100
     a1.channels.c1.transactionCapacity=100
     a1.channels.c1.keep-alive=10
     a1.channels.c1.byteCapacity=0

     # Sink configuration.
     a1.sinks.k1.type=hdfs
     # rollInterval / rollCount / rollSize: roll to a new HDFS file every N
     # seconds, after N events written, or once the file reaches N bytes --
     # whichever condition is met first.
     # These are HDFS-sink properties, so they must carry the
     # "a1.sinks.k1.hdfs." prefix; without it they are not applied to k1.
     a1.sinks.k1.hdfs.rollInterval=60
     a1.sinks.k1.hdfs.rollCount=30000
     a1.sinks.k1.hdfs.rollSize=10240
     a1.sinks.k1.hdfs.idleTimeout=60
     # Round the event timestamp down to 10-minute buckets.
     a1.sinks.k1.hdfs.round=true
     a1.sinks.k1.hdfs.roundValue=10
     a1.sinks.k1.hdfs.roundUnit=minute
     a1.sinks.k1.hdfs.useLocalTimeStamp=true
     # The sink could also just print to the console; here it uploads to HDFS.
     # NOTE(review): the path below is truncated in these notes (a line is
     # missing after it as well). Other commands in this file use /flume on
     # HDFS -- fill in the full hdfs://<namenode>/<dir> URI before use.
     a1.sinks.k1.hdfs.path = hdfs://

     # Wire the source and the sink to the channel.
     a1.sources.r1.channels=c1
     a1.sinks.k1.channel=c1


     1 hadoop fs -get /flume/testLog.log /export/lastwork/flumedata/ 



     1 create database weblog; 

     -- ODS table holding the pre-processed web-log records exactly as loaded.
     -- Every field is kept as a string; rows are partitioned by datestr.
     -- Fields are separated by the \001 (Ctrl-A) control character; the
     -- backslash is required, a bare '01' is not that delimiter.
     create table ods_weblog_origin (
         valid           string,
         remote_addr     string,
         remote_user     string,
         time_local      string,
         request         string,
         status          string,
         body_bytes_sent string,
         http_referer    string,
         http_user_agent string
     )
     partitioned by (datestr string)
     row format delimited
     fields terminated by '\001';

     1 hadoop fs -mkdir -p /weblog/preprocessed; 

     1 hadoop fs -put part-m-00000 /weblog/preprocessed; 


     1 load data inpath '/weblog/preprocessed/' overwrite into table ods_weblog_origin partition(datestr='20130918'); 


     -- Detail (widened) version of the web-log table: the original log fields
     -- plus the time parts split out of time_local and the referer URL split
     -- into host / path / query / query id. Partitioned by datestr.
     create table ods_weblog_detail (
         valid           string,
         remote_addr     string,
         remote_user     string,
         time_local      string,
         daystr          string,
         timestr         string,
         month           string,
         day             string,
         hour            string,
         request         string,
         status          string,
         body_bytes_sent string,
         http_referer    string,
         ref_host        string,
         ref_path        string,
         ref_query       string,
         ref_query_id    string,
         http_user_agent string
     )
     partitioned by (datestr string);


    -- Intermediate table: every column of ods_weblog_origin plus the referer
    -- URL broken into host, path, query and the "id" query parameter.
    -- Double quotes are stripped from http_referer before parsing; the quote
    -- inside the pattern literal must be escaped as "\"".
    create table t_ods_tmp_referurl as
    select
        a.*,
        b.*
    from ods_weblog_origin a
    lateral view parse_url_tuple(
        regexp_replace(http_referer, "\"", ""),
        'HOST', 'PATH', 'QUERY', 'QUERY:id'
    ) b as host, path, query, query_id;


    -- Intermediate table: t_ods_tmp_referurl plus the pieces of time_local.
    -- The offsets below treat time_local as a 10-character date, one
    -- separator character, then the time:
    --   1-10 date, 6-7 month, 9-10 day, 12.. time, 12-13 hour.
    -- hour starts at 12 (not 11) so it does not include the separator.
    create table t_ods_tmp_detail as
    select
        b.*,
        substring(time_local, 1, 10) as daystr,
        substring(time_local, 12)    as tmstr,
        substring(time_local, 6, 2)  as month,
        substring(time_local, 9, 2)  as day,
        substring(time_local, 12, 2) as hour
    from t_ods_tmp_referurl b;

    -- Turn on dynamic partitioning in non-strict mode so the datestr partition
    -- can be taken from the last column of the select list.
    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nonstrict;

    -- Populate ods_weblog_detail from the two intermediate tables.
    -- Time-derived columns come from t_ods_tmp_detail (otd); request and
    -- referer columns come from t_ods_tmp_referurl (otr). Rows are matched on
    -- remote_addr, time_local, body_bytes_sent and request; distinct removes
    -- duplicate output rows. The trailing otd.daystr is the partition value.
    insert overwrite table ods_weblog_detail partition (datestr)
    select distinct
        otd.valid,
        otd.remote_addr,
        otd.remote_user,
        otd.time_local,
        otd.daystr,
        otd.tmstr,
        otd.month,
        otd.day,
        otd.hour,
        otr.request,
        otr.status,
        otr.body_bytes_sent,
        otr.http_referer,
        otr.host,
        otr.path,
        otr.query,
        otr.query_id,
        otr.http_user_agent,
        otd.daystr
    from t_ods_tmp_detail as otd
    inner join t_ods_tmp_referurl as otr
        on  otd.remote_addr     = otr.remote_addr
        and otd.time_local      = otr.time_local
        and otd.body_bytes_sent = otr.body_bytes_sent
        and otd.request         = otr.request;

     1 create table dw_pvs_everyday(pvs bigint,month string,day string); 

    1 insert into table dw_pvs_everyday
    2 select count(*) as pvs,owd.month as month,owd.day as day
    3 from ods_weblog_detail owd
    4 group by owd.month,owd.day;
    -- Average page views per visitor for a day.
    -- The column definitions must be comma-separated.
    create table dw_avgpv_user_everyday (
        day   string,
        avgpv string
    );

    -- For partition 2013-09-18: count records per remote_addr, then divide
    -- the total record count by the number of remote_addr groups.
    insert into table dw_avgpv_user_everyday
    select
        '2013-09-18',
        sum(b.pvs) / count(b.remote_addr)
    from (
        select
            remote_addr,
            count(1) as pvs
        from ods_weblog_detail
        where datestr = '2013-09-18'
        group by remote_addr
    ) b;


     1 create DATABASE sqoopdb; 


    # Export the Hive table dw_avgpv_user_everyday (read from its warehouse
    # directory) into MySQL table sqoopdb.t_avgpv_num on node01.
    # - Each line ends with "\" so the shell treats this as ONE command.
    # - The JDBC URL is quoted because it contains "?".
    # - '\001' is the field delimiter of the exported files; the Hive table
    #   above is created without an explicit row format.
    # NOTE(review): assumes t_avgpv_num(dateStr, avgPvNum) already exists in
    # sqoopdb -- confirm. The password is passed in clear text on the command
    # line; consider an interactive prompt or a password file instead.
    bin/sqoop export \
      --connect "jdbc:mysql://node01:3306/sqoopdb?useSSL=false" \
      --username root \
      --password 123456 \
      --table t_avgpv_num \
      --columns "dateStr,avgPvNum" \
      --fields-terminated-by '\001' \
      --export-dir /user/hive/warehouse/weblog.db/dw_avgpv_user_everyday
