zoukankan      html  css  js  c++  java
  • Sqoop导入HBase,并借助Coprocessor协处理器同步索引到ES

    1.环境

    • Mysql 5.6
    • Sqoop 1.4.6
    • Hadoop 2.5.2
    • HBase 0.98
    • Elasticsearch 2.3.5

    2.安装(略过)

    3.HBase Coprocessor实现

    HBase Observer

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.elasticsearch.client.Client;
    //import org.elasticsearch.client.transport.TransportClient;
    //import org.elasticsearch.common.settings.ImmutableSettings;
    //import org.elasticsearch.common.settings.Settings;
    //import org.elasticsearch.common.transport.InetSocketTransportAddress;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    //import java.util.NavigableMap;
    
    /**
     * HBase region observer that mirrors every Put and Delete on its table into Elasticsearch.
     *
     * <p>Target settings ({@code es_cluster}, {@code es_host}, {@code es_port}, {@code es_index},
     * {@code es_type}) are read from the coprocessor arguments supplied in the HBase shell
     * {@code alter ... 'coprocessor' => 'jar|class|priority|key=value,...'} command. Requests are
     * not sent one by one; they are queued in {@link ElasticSearchOperator}, which sends them as
     * bulk requests.
     *
     * <p>NOTE(review): the settings are written into static fields of {@code Config}, so every
     * observer instance loaded by the same class loader shares one configuration and the most
     * recently started instance wins.
     */
    public class DataSyncObserver extends BaseRegionObserver {

        private static Client client = null;
        private static final Log LOG = LogFactory.getLog(DataSyncObserver.class);

        /**
         * Reads the Elasticsearch settings passed as coprocessor arguments in the HBase shell.
         *
         * @param env coprocessor environment whose configuration carries the arguments
         */
        private void readConfiguration(CoprocessorEnvironment env) {
            Configuration conf = env.getConfiguration();
            Config.clusterName = conf.get("es_cluster");
            Config.nodeHost = conf.get("es_host");
            // -1 marks "no port configured".
            Config.nodePort = conf.getInt("es_port", -1);
            Config.indexName = conf.get("es_index");
            Config.typeName = conf.get("es_type");

            LOG.info("observer -- started with config: " + Config.getInfo());
        }

        /**
         * Loads the configuration and picks up the shared transport client.
         *
         * @param env coprocessor environment
         * @throws IOException declared by the coprocessor contract
         */
        @Override
        public void start(CoprocessorEnvironment env) throws IOException {
            readConfiguration(env);
            client = MyTransportClient.client;
        }

        /**
         * Queues an upsert of the written row into Elasticsearch.
         *
         * <p>The row key becomes the document id. Each cell becomes a field named after its
         * qualifier, with the value decoded as a UTF-8 string. Column families are flattened, so
         * the same qualifier in two families collapses into a single field (the last one iterated
         * wins).
         */
        @Override
        public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
                Durability durability) throws IOException {
            try {
                // Bytes.toString decodes UTF-8 independently of the JVM's default charset.
                String indexId = Bytes.toString(put.getRow());
                Map<String, Object> json = new HashMap<String, Object>();
                for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap().entrySet()) {
                    for (Cell cell : entry.getValue()) {
                        String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        json.put(key, value);
                    }
                }
                // setDoc + setUpsert: update the document if present, otherwise create it.
                ElasticSearchOperator.addUpdateBuilderToBulk(
                        client.prepareUpdate(Config.indexName, Config.typeName, indexId)
                                .setDoc(json)
                                .setUpsert(json));
                LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName);
            } catch (Exception ex) {
                // Best effort: indexing errors are logged, never propagated to the HBase client.
                LOG.error("observer -- failed to queue index update for put", ex);
            }
        }

        /**
         * Queues a delete of the document whose id is the deleted row key.
         */
        @Override
        public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
                final Delete delete, final WALEdit edit, final Durability durability)
                throws IOException {
            try {
                String indexId = Bytes.toString(delete.getRow());
                ElasticSearchOperator.addDeleteBuilderToBulk(
                        client.prepareDelete(Config.indexName, Config.typeName, indexId));
                LOG.info("observer -- delete a doc: " + indexId);
            } catch (Exception ex) {
                // Best effort: indexing errors are logged, never propagated to the HBase client.
                LOG.error("observer -- failed to queue index delete", ex);
            }
        }

    }

    ES方法

    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.delete.DeleteRequestBuilder;
    import org.elasticsearch.action.update.UpdateRequestBuilder;
    import org.elasticsearch.client.Client;
    //import org.elasticsearch.client.transport.TransportClient;
    //import org.elasticsearch.common.settings.ImmutableSettings;
    //import org.elasticsearch.common.settings.Settings;
    //import org.elasticsearch.common.transport.InetSocketTransportAddress;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Buffers Elasticsearch update/delete requests and sends them as bulk requests.
     *
     * <p>A bulk is sent as soon as more than {@code MAX_BULK_COUNT} actions are pending. A
     * background timer also flushes any pending actions every {@code MAX_COMMIT_INTERVAL}
     * seconds, so data still reaches Elasticsearch when a region server sees few writes. All
     * access to the shared buffer is guarded by {@code commitLock}.
     */
    public class ElasticSearchOperator {

        // Buffer capacity: a bulk is sent once the pending action count exceeds this value.
        private static final int MAX_BULK_COUNT = 10;
        // Maximum interval between timed flushes, in seconds.
        private static final int MAX_COMMIT_INTERVAL = 60 * 5;
        // Delay before the first timed flush, in milliseconds.
        private static final long FIRST_COMMIT_DELAY_MS = 10 * 1000L;

        private static Client client = null;
        // Pending actions; replaced by a fresh builder after every send. Guarded by commitLock.
        private static BulkRequestBuilder bulkRequestBuilder = null;

        private static final Lock commitLock = new ReentrantLock();

        static {
            // Elasticsearch 2.3.5: reuse the shared transport client.
            client = MyTransportClient.client;
            bulkRequestBuilder = newBulkRequestBuilder();

            // Daemon thread, so the flush timer never keeps the JVM alive during shutdown.
            Timer timer = new Timer("es-bulk-commit", true);
            timer.schedule(new CommitTimer(), FIRST_COMMIT_DELAY_MS, MAX_COMMIT_INTERVAL * 1000L);
        }

        /**
         * Creates an empty bulk builder with refresh enabled.
         *
         * <p>Every buffer, not just the first one, must be created here so that all bulks refresh
         * the index after they are applied.
         */
        private static BulkRequestBuilder newBulkRequestBuilder() {
            BulkRequestBuilder builder = client.prepareBulk();
            builder.setRefresh(true);
            return builder;
        }

        /**
         * Sends the buffered actions if there are more than {@code threshold} of them.
         *
         * <p>The caller must hold {@code commitLock}. If {@code execute()} itself throws (for
         * example, when the cluster is unreachable), the buffer is kept and resent on the next
         * call.
         *
         * @param threshold send only when the pending action count is strictly greater than this
         */
        private static void bulkRequest(int threshold) {
            if (bulkRequestBuilder.numberOfActions() <= threshold) {
                return;
            }
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            // Start a new buffer even if some items failed. Keeping the old one would resend every
            // action (including the ones that succeeded) on each call, and a permanently failing
            // item would block the buffer and let it grow without bound.
            bulkRequestBuilder = newBulkRequestBuilder();
            if (bulkResponse.hasFailures()) {
                System.err.println("ElasticSearchOperator -- bulk request had failures: "
                        + bulkResponse.buildFailureMessage());
            }
        }

        /**
         * Adds an index/update request to the buffer and sends the bulk if the buffer is full.
         *
         * @param builder update request to queue
         */
        public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
            commitLock.lock();
            try {
                bulkRequestBuilder.add(builder);
                bulkRequest(MAX_BULK_COUNT);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitLock.unlock();
            }
        }

        /**
         * Adds a delete request to the buffer and sends the bulk if the buffer is full.
         *
         * @param builder delete request to queue
         */
        public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
            commitLock.lock();
            try {
                bulkRequestBuilder.add(builder);
                bulkRequest(MAX_BULK_COUNT);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitLock.unlock();
            }
        }

        /**
         * Periodic flush, so Elasticsearch stays in sync when a region server receives no new
         * writes for a long time.
         */
        static class CommitTimer extends TimerTask {
            @Override
            public void run() {
                commitLock.lock();
                try {
                    // Threshold 0: send whatever is pending; an empty buffer is skipped.
                    bulkRequest(0);
                } catch (Exception ex) {
                    ex.printStackTrace();
                } finally {
                    commitLock.unlock();
                }
            }
        }

    }

    打包并上传到hdfs

    mvn clean compile assembly:single
    mv observer-1.0-SNAPSHOT-jar-with-dependencies.jar observer-hb0.98-es2.3.5.jar
    hdfs dfs -put observer-hb0.98-es2.3.5.jar /hbase/lib/

    4.创建HBase表,并启用Coprocessor

    mysql

    hbase shell
    create 'region','data'
    disable 'region'
    alter 'region', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=mysql_region,es_index=hbase,es_port=9300,es_host=localhost'
    enable 'region'

    oracle

    create 'sp','data'
    disable 'sp'
    alter 'sp', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=oracle_sp,es_index=hbase,es_port=9300,es_host=localhost'
    enable 'sp'

    查看

    hbase(main):007:0* describe 'ora_test'
    Table ora_test is ENABLED                                            
    ora_test, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs:///appdt/hbase
    /lib/observer-hb1.2.2-es2.3.5.jar|com.gavin.observer.DataSyncObserver
    |1001|es_cluster=elas2.3.4,es_type=ora_test,es_index=hbase,es_port=93
    00,es_host=localhost'}                                               
    COLUMN FAMILIES DESCRIPTION                                          
    {NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW',
     REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MI
    N_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', B
    LOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}     
    1 row(s) in 0.0260 seconds

    删除Coprocessor

    disable 'ora_test' 
    alter 'ora_test',METHOD => 'table_att_unset',NAME =>'coprocessor$1' 
    enable 'ora_test'

    查看删除效果

    hbase(main):011:0> describe 'ora_test'
    Table ora_test is ENABLED                                           
    ora_test                                                             
    COLUMN FAMILIES DESCRIPTION                                          
    {NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW',
     REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MI
    N_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', B
    LOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}     
    1 row(s) in 0.0200 seconds

    5.使用sqoop上传数据

    mysql

    bin/sqoop import --connect jdbc:mysql://192.168.1.187:3306/trade_dev --username mysql --password 111111 --table TB_REGION --hbase-table region --hbase-row-key REGION_ID --column-family data

    oracle

    bin/sqoop import --connect jdbc:oracle:thin:@192.168.16.223:1521/orcl --username sitts --password password --table SITTS.ESB_SERVICE_PARAM --split-by PARAM_ID --hbase-table sp --hbase-row-key PARAM_ID --column-family data

    6.校验

    HBase

    scan 'region'

    ES

    curl -XGET 'http://localhost:9200/hbase/mysql_region/_search?pretty'
    7.参考

    HBase Observer同步数据到ElasticSearch

    8.注意

    • 同一个Coprocessor用一个index,不同表可以设置不同type,不然index会乱
    • 修改Java代码后,上传到HDFS的jar包文件必须和之前不一样,否则就算卸载掉原有的coprocessor再重新安装也不能生效
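    • 如果你有多个表对多个索引/类型的映射,每个表所加载Observer对应的jar包路径不能相同,否则ElasticSearch会串数据(原因:Observer把es_index/es_type等参数写入Config类的静态字段,而HBase按jar路径复用协处理器类加载器,同一路径加载的Observer共享这些静态字段,后启动的配置会覆盖先前的配置)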
  • 相关阅读:
    关于virtual、非virtual继承函数的调用
    关于文件操作的文件格式与打开方式
    C++ 文件读写操作
    C++语法题
    检测java string变量是否含有中文
    常用知识库
    WMS仓储管理系统
    运输管理
    ipconfig/flushdns 清除系统DNS缓存
    cmd查看域名账号相关的命令
  • 原文地址:https://www.cnblogs.com/itboys/p/9520389.html
Copyright © 2011-2022 走看看