  • Testing the Flume Hive Sink

    To test Flume delivering data into a Hive table, first create the table.

    create table order_flume(
    order_id string,
    user_id string,
    eval_set string,
    order_number string,
    order_dow string,
    order_hour_of_day string,
    days_since_prior_order string)
    clustered by (order_id) into 5 buckets
    stored as orc; -- remember "stored as orc", otherwise later steps will fail
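The Hive sink writes through the Hive Streaming API, which only accepts bucketed ORC tables with transactions enabled. Depending on the Hive version, the transactional table property may also need to be set explicitly; a hedged variant of the DDL above:

```sql
-- Same table as above, with the transactional property set explicitly.
-- Whether this property is strictly required depends on the Hive version;
-- treat this as a sketch, not the only valid form.
create table order_flume(
    order_id string,
    user_id string,
    eval_set string,
    order_number string,
    order_dow string,
    order_hour_of_day string,
    days_since_prior_order string)
clustered by (order_id) into 5 buckets
stored as orc
tblproperties ('transactional' = 'true');
```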
    

      The Flume agent configuration (flume conf) is as follows:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -f /root/code/flume_exec_test.txt
    
    # Describe the sink
    a1.sinks.k1.type = hive
    a1.sinks.k1.hive.metastore = thrift://master:9083
    a1.sinks.k1.hive.database = hw
    a1.sinks.k1.hive.table = order_flume
    a1.sinks.k1.serializer = DELIMITED
    a1.sinks.k1.serializer.delimiter = "," 
    a1.sinks.k1.serializer.serdeSeparator = ','
    a1.sinks.k1.serializer.fieldnames = order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    # Raise these from the defaults (capacity 1000, transactionCapacity 100).
    # Inline comments after a value are not valid in a properties file, so
    # keep comments on their own lines.
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
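The exec source tails /root/code/flume_exec_test.txt, so every line of that file must contain exactly the seven comma-separated fields declared in serializer.fieldnames. A small sketch for producing valid sample rows (the /tmp path and the sample values are illustrative assumptions, not the real dataset):

```shell
# Write a few sample rows matching the seven fields declared in
# serializer.fieldnames (order_id,user_id,eval_set,order_number,
# order_dow,order_hour_of_day,days_since_prior_order).
# /tmp is used here for illustration; the source file in the config
# above is /root/code/flume_exec_test.txt.
OUT=/tmp/flume_exec_test.txt
: > "$OUT"
for i in 1 2 3; do
    echo "${i},$((100 + i)),prior,${i},2,09,7" >> "$OUT"
done
# Sanity check: every row should have exactly 7 comma-separated fields.
awk -F',' '{ print NF }' "$OUT"
```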
    

      If you start Flume at this point it will fail, because the Hive jars need to be copied into Flume's lib directory:

    cp /usr/local/src/apache-hive-1.2.2-bin/hcatalog/share/hcatalog/* $FLUME_HOME/lib
    
    cp /usr/local/src/apache-hive-1.2.2-bin/lib/* $FLUME_HOME/lib
    

      Next, edit hive-site.xml and set the following properties (they enable Hive transactions, which the Hive sink requires).

        <property>
            <name>hive.support.concurrency</name>
            <value>true</value>
        </property>
        <property>
            <name>hive.exec.dynamic.partition.mode</name>
            <value>nonstrict</value>
        </property>
        <property>
            <name>hive.txn.manager</name>
            <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
        </property>
        <property>
            <name>hive.compactor.initiator.on</name>
            <value>true</value>
        </property>
        <property>
            <name>hive.compactor.worker.threads</name>
            <value>1</value>
        </property>
    
    

      With the configuration above in place, start the Hive metastore first, then start Flume.

    # Be sure to restart everything, in order:
    # 1. restart MySQL (the metastore backing database)
    # 2. restart Hadoop
    # 3. start the Hive metastore
    # 4. start Flume
    
    service mysql restart
    
    start-dfs.sh
    start-yarn.sh
    
    hive --service metastore
    
    flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console
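Before starting the Flume agent it is worth verifying that the metastore's Thrift port is actually accepting connections (9083, as used in the sink config). A small probe sketch; the hostname master comes from the config above:

```shell
# Probe a TCP port using bash's /dev/tcp pseudo-device; prints "open"
# or "closed". Useful for checking thrift://master:9083 before
# starting the Flume agent.
probe_port() {
    local host=$1 port=$2
    if timeout 2 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null; then
        echo open
    else
        echo closed
    fi
}

probe_port master 9083
```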
    

      During initial debugging, even though the configuration was correct, every run failed to connect:

    org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
    	at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
    	at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
    	at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:295)
    	at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:253)
    	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flume.sink.hive.HiveWriter$TxnBatchException: Failed acquiring Transaction Batch from EndPoint: {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
    	at org.apache.flume.sink.hive.HiveWriter.nextTxnBatch(HiveWriter.java:400)
    	at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:90)
    	... 6 more
    Caused by: org.apache.hive.hcatalog.streaming.TransactionBatchUnAvailable: Unable to acquire transaction batch on end point: {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
    	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:514)
    	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:464)
    	at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatchImpl(HiveEndPoint.java:351)
    	at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatch(HiveEndPoint.java:331)
    	at org.apache.flume.sink.hive.HiveWriter$9.call(HiveWriter.java:395)
    	at org.apache.flume.sink.hive.HiveWriter$9.call(HiveWriter.java:392)
    	at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	... 1 more
    Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns
    	at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
    	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
    	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_open_txns(ThriftHiveMetastore.java:4195)
    	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.open_txns(ThriftHiveMetastore.java:4182)
    	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.openTxns(HiveMetaStoreClient.java:1988)
    	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.openTxnImpl(HiveEndPoint.java:523)
    	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:507)
    	... 10 more
    

      After several restarts the same error persisted. I then tried starting the Hive CLI to inspect the table, and found that Hive itself failed to start:

    FAILED: LockException [Error 10280]: Error communicating with the metastore
    

      I restarted the metastore again and retried Hive. The first two attempts hit the same error, but after a few minutes and one more restart it started working on its own. I never figured out exactly why; possibly an earlier restart had not fully taken effect. Re-editing hive-site.xml, saving it again, and restarting may resolve it.

    Flume finally connected to Hive successfully, with the following log output.

    2019-07-20 12:26:28,968 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:342)] k1: Creating Writer to Hive end point : {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
    2019-07-20 12:26:30,747 (hive-k1-call-runner-0) [INFO - org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:377)] Trying to connect to metastore with URI thrift://master:9083
    2019-07-20 12:26:30,865 (hive-k1-call-runner-0) [INFO - org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:473)] Connected to metastore.
    2019-07-20 12:26:31,494 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:377)] Trying to connect to metastore with URI thrift://master:9083
    2019-07-20 12:26:31,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:473)] Connected to metastore.
    2019-07-20 12:26:35,677 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hive.HiveWriter.nextTxnBatch(HiveWriter.java:335)] Acquired Txn Batch TxnIds=[1...100] on endPoint = {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }. Switching to first txn
    2019-07-20 12:26:38,808 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hive.HiveWriter.commitTxn(HiveWriter.java:275)] Committing Txn 1 on EndPoint: {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
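Once the sink starts committing transactions, the delivered rows can be checked from the Hive CLI. A quick verification query (the hw database name comes from the sink config above):

```sql
-- Run in the Hive CLI; counts rows delivered by the Flume Hive sink.
use hw;
select count(*) from order_flume;
```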
    

      

  • Original post: https://www.cnblogs.com/hanwen1014/p/11217385.html