  • Flume: Reading Messages from a JMS Queue and Writing Them to HDFS

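    Apache Flume can read messages from a JMS message queue and write them to HDFS. The agent below, named agentHdfs, connects a JMS source (an ActiveMQ queue) to an HDFS sink through a memory channel. The Flume agent configuration is as follows: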

    flume-agent.conf

    # Name the components on this agent
    agentHdfs.sources = jms_source
    agentHdfs.sinks = hdfs_sink
    agentHdfs.channels = mem_channel


    # Describe/configure the source
    agentHdfs.sources.jms_source.type = jms
    # JNDI settings for the ActiveMQ broker
    agentHdfs.sources.jms_source.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
    agentHdfs.sources.jms_source.connectionFactory = ConnectionFactory
    agentHdfs.sources.jms_source.providerURL = tcp://hadoop-master:61616
    # ActiveMQ queue to consume from. Keep comments on their own line:
    # in a properties file a trailing "#..." becomes part of the value.
    agentHdfs.sources.jms_source.destinationName = BUSINESS_DATA
    agentHdfs.sources.jms_source.destinationType = QUEUE



    # Describe the sink
    agentHdfs.sinks.hdfs_sink.type = hdfs
    agentHdfs.sinks.hdfs_sink.hdfs.path = hdfs://hadoop-master/data/flume/%Y-%m-%d/%H
    # %{hostname} is filled in from an event header named "hostname"
    agentHdfs.sinks.hdfs_sink.hdfs.filePrefix = %{hostname}/events-
    agentHdfs.sinks.hdfs_sink.hdfs.maxOpenFiles = 5000
    agentHdfs.sinks.hdfs_sink.hdfs.batchSize = 500
    agentHdfs.sinks.hdfs_sink.hdfs.fileType = DataStream
    agentHdfs.sinks.hdfs_sink.hdfs.writeFormat = Text
    # Roll files after 1,000,000 events or 600 seconds; rollSize = 0 disables size-based rolling
    agentHdfs.sinks.hdfs_sink.hdfs.rollSize = 0
    agentHdfs.sinks.hdfs_sink.hdfs.rollCount = 1000000
    agentHdfs.sinks.hdfs_sink.hdfs.rollInterval = 600
    # Resolve the %Y-%m-%d/%H escapes from the agent's local time rather than an event timestamp header
    agentHdfs.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true



    # Use a channel which buffers events in memory
    agentHdfs.channels.mem_channel.type = memory
    agentHdfs.channels.mem_channel.capacity = 1000
    agentHdfs.channels.mem_channel.transactionCapacity = 100

    # Bind the source and sink to the channel
    agentHdfs.sources.jms_source.channels = mem_channel
    agentHdfs.sinks.hdfs_sink.channel = mem_channel
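
    A memory channel keeps events only in RAM, so anything still buffered is lost if the agent process stops. Where that matters, one possible variation is to swap in Flume's file channel. The fragment below is a minimal sketch, not part of the original configuration: the channel name file_channel and both directory paths are assumptions chosen for illustration, and it is meant to replace the channel definition and the two binding lines above rather than be appended to them.

```properties
# Assumption: durable alternative to mem_channel; "file_channel" is a hypothetical name
agentHdfs.channels = file_channel
agentHdfs.channels.file_channel.type = file
# Assumption: hypothetical local directories, writable by the user running the agent
agentHdfs.channels.file_channel.checkpointDir = /var/flume/checkpoint
agentHdfs.channels.file_channel.dataDirs = /var/flume/data

# Bind the source and sink to the file channel instead of mem_channel
agentHdfs.sources.jms_source.channels = file_channel
agentHdfs.sinks.hdfs_sink.channel = file_channel
```

    The trade-off is throughput: the file channel writes every event to disk before acknowledging it, so it is usually slower than the memory channel but survives an agent restart.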

  • Original article: https://www.cnblogs.com/blfbuaa/p/6884116.html