zoukankan      html  css  js  c++  java
  • Flume+HBase+Kafka集成与开发

    先把flume1.7的源码包下载

    http://archive.apache.org/dist/flume/1.7.0/

    下载解压后

    我们通过IDEA这个软件来打开这个工程

     

     

    点击ok后我们选择打开一个新的窗口

     不过这个默认方式导入加载时间很长,建议大家用maven方式导入。

    导入之后我们看这个类

     

    看看我们的数据源,就是我们之前下载好的搜狗实验室的数据,之前已经上传到节点1去了

    这个是我们要配置flume的模型

    下面我们来配置节点1的flume

    配置jdk的绝对路径 

     

    下面这个配置暂时先这样配置,往后可能会修改

     

    下面对下载好的数据进行预处理,因为下载下来的数据格式比较混乱

     先是按行来把制表符换成逗号,然后生成weblog2.log

    接下来是按行把空格换成逗号生成weblog3.log

    这样子我们就统一用逗号隔开了

    把没用的文件删除掉

     改下名字

     把预处理完的weblog.log文件分发到节点2 和节点3上去

     

    我们对导入的flume源码进行二次开发

    我们不要动他原来的,我们新建一个类

     然后把这个类的内容拷过来然后修改文件名和类名

     

    package org.apache.flume.sink.hbase;
    
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    
    import com.google.common.base.Charsets;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.FlumeException;
    import org.apache.flume.conf.ComponentConfiguration;
    import org.hbase.async.AtomicIncrementRequest;
    import org.hbase.async.PutRequest;
    
    import java.util.ArrayList;
    import java.util.List;
    //package org.apache.flume.sink.hbase;
    
    import com.google.common.base.Charsets;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.FlumeException;
    import org.apache.flume.conf.ComponentConfiguration;
    import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
    import org.hbase.async.AtomicIncrementRequest;
    import org.hbase.async.PutRequest;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Serializer for the AsyncHBaseSink that stores the body of the current
     * event in one HBase cell and can additionally request an atomic increment
     * of a counter cell. Event headers are not read.
     *
     * <p>Settings read in {@link #configure(Context)}:
     * <ul>
     * <li>{@code payloadColumn} - qualifier that receives the event body
     *     (default {@code pCol}); an empty value disables the put.</li>
     * <li>{@code incrementColumn} - qualifier of the counter cell
     *     (default {@code iCol}); an empty value disables the increment.</li>
     * <li>{@code incrementRow} - row holding the counter (default {@code incRow}).</li>
     * <li>{@code rowPrefix} - prefix of every generated row key (default {@code default}).</li>
     * <li>{@code suffix} - {@code timestamp}, {@code random} or {@code nano};
     *     any other value (default {@code uuid}) selects a UUID suffix.</li>
     * </ul>
     *
     * <p>The table and column family are supplied through
     * {@link #initialize(byte[], byte[])}.
     */
    public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
        private byte[] table;
        private byte[] cf;
        private byte[] payload;
        private byte[] payloadColumn;
        private byte[] incrementColumn;
        private String rowPrefix;
        private byte[] incrementRow;
        private SimpleHbaseEventSerializer.KeyType keyType;

        /** Remembers the target table and column family for later requests. */
        @Override
        public void initialize(byte[] table, byte[] cf) {
            this.table = table;
            this.cf = cf;
        }

        /**
         * Builds the put for the current event.
         *
         * @return a list with one put, or an empty list when no payload column is configured
         * @throws FlumeException if generating the row key or creating the put fails
         */
        @Override
        public List<PutRequest> getActions() {
            List<PutRequest> puts = new ArrayList<PutRequest>();
            if (payloadColumn == null) {
                return puts;
            }
            try {
                byte[] key;
                switch (keyType) {
                    case TS:
                        key = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
                        break;
                    case TSNANO:
                        key = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
                        break;
                    case RANDOM:
                        key = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
                        break;
                    default:
                        key = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
                        break;
                }
                puts.add(new PutRequest(table, key, cf, payloadColumn, payload));
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
            return puts;
        }

        /**
         * Builds the counter increment.
         *
         * @return a list with one increment, or an empty list when no increment column is configured
         */
        public List<AtomicIncrementRequest> getIncrements() {
            List<AtomicIncrementRequest> increments = new ArrayList<AtomicIncrementRequest>();
            if (incrementColumn == null) {
                return increments;
            }
            increments.add(new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn));
            return increments;
        }

        /** Nothing to release. */
        @Override
        public void cleanUp() {
        }

        /** Reads the serializer settings listed in the class documentation. */
        @Override
        public void configure(Context context) {
            String payloadName = context.getString("payloadColumn", "pCol");
            String incrementName = context.getString("incrementColumn", "iCol");
            rowPrefix = context.getString("rowPrefix", "default");
            String suffix = context.getString("suffix", "uuid");

            boolean hasPayloadColumn = !(payloadName == null || payloadName.isEmpty());
            if (hasPayloadColumn) {
                keyType = keyTypeFor(suffix);
                payloadColumn = payloadName.getBytes(Charsets.UTF_8);
            }

            boolean hasIncrementColumn = !(incrementName == null || incrementName.isEmpty());
            if (hasIncrementColumn) {
                incrementColumn = incrementName.getBytes(Charsets.UTF_8);
            }

            String counterRow = context.getString("incrementRow", "incRow");
            incrementRow = counterRow.getBytes(Charsets.UTF_8);
        }

        /** Keeps the body of the event that the next {@link #getActions()} call will write. */
        @Override
        public void setEvent(Event event) {
            this.payload = event.getBody();
        }

        /** Component-level configuration is not used by this serializer. */
        @Override
        public void configure(ComponentConfiguration conf) {
        }

        /** Maps the configured suffix name to a key type; unknown names select UUID. */
        private static SimpleHbaseEventSerializer.KeyType keyTypeFor(String suffix) {
            if (suffix.equals("timestamp")) {
                return SimpleHbaseEventSerializer.KeyType.TS;
            }
            if (suffix.equals("random")) {
                return SimpleHbaseEventSerializer.KeyType.RANDOM;
            }
            if (suffix.equals("nano")) {
                return SimpleHbaseEventSerializer.KeyType.TSNANO;
            }
            return SimpleHbaseEventSerializer.KeyType.UUID;
        }

    }
    在原来基础上稍微做修改

     

    按住Ctrl键单击鼠标进去

    添加以下内容

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    package org.apache.flume.sink.hbase;
    
    import java.io.UnsupportedEncodingException;
    import java.util.Random;
    import java.util.UUID;
    
    /**
     * Static helpers that assemble simple row keys and return them as UTF-8
     * encoded bytes. Callers are free to build keys any other way; these are
     * only convenient defaults.
     */
    public class SimpleRowKeyGenerator {

      /** Returns {@code prefix} followed by a freshly generated random UUID. */
      public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
        String key = prefix + UUID.randomUUID();
        return encode(key);
      }

      /** Returns {@code prefix} followed by a random {@code long} in decimal form. */
      public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
        long suffix = new Random().nextLong();
        return encode(prefix + suffix);
      }

      /** Returns {@code prefix} followed by the current time in milliseconds. */
      public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
        long now = System.currentTimeMillis();
        return encode(prefix + now);
      }

      /** Returns {@code prefix} followed by the current {@link System#nanoTime()} reading. */
      public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
        long ticks = System.nanoTime();
        return encode(prefix + ticks);
      }

      /**
       * Returns {@code userid}, {@code datetime} and the current time in
       * milliseconds joined together without any separator.
       */
      public static byte[] getKfkRowKey(String userid,String datetime) throws UnsupportedEncodingException {
        StringBuilder key = new StringBuilder();
        key.append(userid).append(datetime).append(System.currentTimeMillis());
        return encode(key.toString());
      }

      /** Encodes an assembled key with the "UTF8" charset. */
      private static byte[] encode(String key) throws UnsupportedEncodingException {
        return key.getBytes("UTF8");
      }

    }

     继续修改,修改后的代码是下面的

     KfkAsyncHbaseEventSerializer.java
    package org.apache.flume.sink.hbase;
    
    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    
    import com.google.common.base.Charsets;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.FlumeException;
    import org.apache.flume.conf.ComponentConfiguration;
    import org.hbase.async.AtomicIncrementRequest;
    import org.hbase.async.PutRequest;
    
    import java.util.ArrayList;
    import java.util.List;
    //package org.apache.flume.sink.hbase;
    
    import com.google.common.base.Charsets;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.FlumeException;
    import org.apache.flume.conf.ComponentConfiguration;
    import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
    import org.hbase.async.AtomicIncrementRequest;
    import org.hbase.async.PutRequest;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Serializer for the AsyncHBaseSink that splits a comma-separated event
     * body into fields and writes each field to its own column of a single
     * HBase row. It can additionally request an atomic increment of a counter
     * cell. Event headers are not read.
     *
     * <p>Settings read in {@link #configure(Context)}:
     * <ul>
     * <li>{@code payloadColumn} - comma-separated list of column qualifiers, in
     *     the same order as the fields of the event body (default {@code pCol});
     *     an empty value disables the puts.</li>
     * <li>{@code incrementColumn} - qualifier of the counter cell
     *     (default {@code iCol}); an empty value disables the increment.</li>
     * <li>{@code incrementRow} - row holding the counter (default {@code incRow}).</li>
     * </ul>
     *
     * <p>The row key is built by
     * {@code SimpleRowKeyGenerator.getKfkRowKey(userid, datetime)} from the
     * second field (user id) and the first field (date/time) of the body, so at
     * least two columns must be configured for anything to be written.
     *
     * <p>The table and column family are supplied through
     * {@link #initialize(byte[], byte[])}.
     */
    public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
        /** Separates column names in the configuration and fields in an event body. */
        private static final String FIELD_SEPARATOR = ",";

        private byte[] table;
        private byte[] cf;
        private byte[] payload;
        private byte[] payloadColumn;
        /** Column qualifiers parsed once from {@code payloadColumn}; null when puts are disabled. */
        private byte[][] columnQualifiers;
        private byte[] incrementColumn;
        private byte[] incrementRow;

        /** Remembers the target table and column family for later requests. */
        @Override
        public void initialize(byte[] table, byte[] cf) {
            this.table = table;
            this.cf = cf;
        }

        /**
         * Builds one put per configured column for the current event.
         *
         * <p>An event is skipped (empty list) when no columns are configured, no
         * event has been set, fewer than two fields are present, or the number
         * of fields differs from the number of configured columns.
         *
         * @return the puts for the current event, all sharing one row key
         * @throws FlumeException if generating the row key or creating a put fails
         */
        @Override
        public List<PutRequest> getActions() {
            List<PutRequest> actions = new ArrayList<PutRequest>();
            if (columnQualifiers == null || payload == null) {
                return actions;
            }
            try {
                // Decode the bytes explicitly: String.valueOf(byte[]) would only
                // yield the array's identity string ("[B@..."), not its content.
                // Limit -1 keeps trailing empty fields so the count check is exact.
                String[] values = new String(payload, Charsets.UTF_8).split(FIELD_SEPARATOR, -1);

                // Malformed record: do not guess which field belongs to which column.
                if (values.length < 2 || values.length != columnQualifiers.length) {
                    return actions;
                }

                String datetime = values[0];
                String userid = values[1];
                // Generate the key once; it embeds the current time, so building it
                // per column could scatter one event over several rows.
                byte[] rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);

                // Load every field of the record into its own column of that row.
                for (int i = 0; i < values.length; i++) {
                    byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                    actions.add(new PutRequest(table, rowKey, cf, columnQualifiers[i], colValue));
                }
            } catch (Exception e) {
                // Kept broad on purpose: callers have always seen failures of this
                // method as FlumeException.
                throw new FlumeException("Could not get row key!", e);
            }
            return actions;
        }

        /**
         * Builds the counter increment.
         *
         * @return a list with one increment, or an empty list when no increment column is configured
         */
        @Override
        public List<AtomicIncrementRequest> getIncrements() {
            List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
            if (incrementColumn != null) {
                AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
                        incrementRow, cf, incrementColumn);
                actions.add(inc);
            }
            return actions;
        }

        /** Nothing to release. */
        @Override
        public void cleanUp() {
        }

        /** Reads the serializer settings listed in the class documentation. */
        @Override
        public void configure(Context context) {
            String pCol = context.getString("payloadColumn", "pCol");
            String iCol = context.getString("incrementColumn", "iCol");
            if (pCol != null && !pCol.isEmpty()) {
                payloadColumn = pCol.getBytes(Charsets.UTF_8);
                // Split the column list once here instead of on every event.
                String[] names = pCol.split(FIELD_SEPARATOR);
                byte[][] qualifiers = new byte[names.length][];
                for (int i = 0; i < names.length; i++) {
                    // Tolerate blanks around the separators in the configuration.
                    qualifiers[i] = names[i].trim().getBytes(Charsets.UTF_8);
                }
                columnQualifiers = qualifiers;
            }
            if (iCol != null && !iCol.isEmpty()) {
                incrementColumn = iCol.getBytes(Charsets.UTF_8);
            }
            incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
        }

        /** Keeps the body of the event that the next {@link #getActions()} call will write. */
        @Override
        public void setEvent(Event event) {
            this.payload = event.getBody();
        }

        /** Component-level configuration is not used by this serializer. */
        @Override
        public void configure(ComponentConfiguration conf) {
        }

    }


    现在把代码打包

    我们可以看到有很多相关的依赖包,我们把不需要的删掉

    
    

    
    

     

    直接点击Build就可以了 

    打好的jar包在本地工程路径的这里

    现在把这个jar包上传到flume的lib目录下

    也就是这个目录。

    可以看到上传日期,就是今天上传的

    下面配置flume + kafka

    agent1.sources = r1
    agent1.channels = kafkaC hbaseC
    agent1.sinks=kafkaSink hbaseSink

    #***********flume + hbase************
    agent1.sources.r1.type = avro
    # The source must list every channel it feeds. With only hbaseC here the
    # kafkaC channel, and therefore kafkaSink, would never receive an event.
    agent1.sources.r1.channels = hbaseC kafkaC
    agent1.sources.r1.bind = bigdata-pro01.kfk.com
    agent1.sources.r1.port=5555
    agent1.sources.r1.threads=5

    agent1.channels.hbaseC.type = memory
    agent1.channels.hbaseC.capacity = 100000
    agent1.channels.hbaseC.transactionCapacity = 100000
    agent1.channels.hbaseC.keep-alive=20

    agent1.sinks.hbaseSink.type = asynchbase
    agent1.sinks.hbaseSink.table=weblogs
    agent1.sinks.hbaseSink.columnFamily=info
    agent1.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
    agent1.sinks.hbaseSink.channel = hbaseC
    # Comma-separated column qualifiers, in the same order as the fields of each log line.
    agent1.sinks.hbaseSink.serializer.payloadColumn=datatime,userid,searchname,retorder,cliorder,cliurl

    #**************flume + kafka***************
    agent1.channels.kafkaC.type = memory
    agent1.channels.kafkaC.capacity = 100000
    agent1.channels.kafkaC.transactionCapacity = 100000
    agent1.channels.kafkaC.keep-alive=20

    agent1.sinks.kafkaSink.channel = kafkaC
    agent1.sinks.kafkaSink.type= org.apache.flume.sink.kafka.KafkaSink
    # Flume 1.7 reads the broker list from kafka.bootstrap.servers (or the
    # deprecated plain "brokerList"); "kafka.brokerList" is neither of them.
    agent1.sinks.kafkaSink.kafka.bootstrap.servers=bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092
    # topic, requiredAcks and batchSize are the pre-1.7 property names; Flume 1.7
    # documents them as deprecated in favour of kafka.topic, kafka.producer.acks
    # and flumeBatchSize.
    agent1.sinks.kafkaSink.topic=test
    # NOTE(review): zookeeperConnect and serializer.class are not listed among the
    # Flume 1.7 Kafka sink properties - presumably ignored; confirm before relying on them.
    agent1.sinks.kafkaSink.zookeeperConnect=bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181
    agent1.sinks.kafkaSink.requiredAcks=1
    agent1.sinks.kafkaSink.batchSize=1
    agent1.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder
  • 相关阅读:
    快速构建一个vue项目的开发环境(基础)
    一个vue的简单例子
    webpack基础使用
    程序日志正常,linux进程异常终止,如何查看日志
    mysql多字段排序
    mysql连表
    go网络
    go通道关闭
    【软考】CMMI的5个等级和22个过程域
    maven工程导入时解决Cannot change version of project facet Dynamic Web Module to 2.3
  • 原文地址:https://www.cnblogs.com/braveym/p/8320904.html
Copyright © 2011-2022 走看看