zoukankan      html  css  js  c++  java
  • Kafka+Storm写入Hbase和HDFS

    1.Storm整合Kafka

    使用Kafka作为数据源,起到缓冲的作用

     1  // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
     2 String zks = KafkaProperties.Connect;
     3 BrokerHosts brokerHosts = new ZkHosts(zks);
     4 String topic = KafkaProperties.topic;
     5 String group = KafkaProperties.groupId;
     6 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, "/storm", group);
     7 spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
     8 spoutConfig.zkServers = Arrays.asList(new String[] {"192.168.211.1","192.168.211.2","192.168.211.3"});
     9 spoutConfig.zkPort = 2181;
    10 spoutConfig.ignoreZkOffsets = true;
    11 spoutConfig.startOffsetTime=-2L;
    12 
    13 KafkaSpout receiver = new KafkaSpout(spoutConfig);
    14 topologyBuilder.setSpout("kafka-spout", receiver);

    KafkaProperties:

    /**
     * 配置一些Storm从kafka取数据时,一些关于数据源的配置信息
     * @author kongc
     *
     */
    public interface KafkaProperties {
       final static String Connect = "192.168.211.1:2181,192.168.211.2:2181,192.168.211.3:2181";
       final static String groupId = "kafka";
       final static String topic = "test_topic";
    }

    2.Storm整合HDFS

    我们希望按照日期,创建文件,将Storm计算后的数据写入HDFS

    采取的策略是通过获取系统当前时间,然后格式化成所要命名的字符串作为path,然后判断这个路径是否存在,存在则追加写入,不存在则创建。

    /***************将数据存入HDFS**********************/
    Path path = new Path("hdfs://192.168.1.170:8020/user/hive/warehouse/test_oee/" + format + "oee.txt");
    synchronized (path) {
       try {
          if(KafkaTopology.fileSystem.exists(path)!=true){
             System.out.println("*************create*************");
             KafkaTopology.FDoutputStream = KafkaTopology.fileSystem.create(path, true);
          }else{
             if(KafkaTopology.FDoutputStream ==null){
                System.out.println("**************append*************");
                KafkaTopology.FDoutputStream = KafkaTopology.fileSystem.append(path);
             }
          }
          String data = mesg.getEquipment_name()+","+mesg.getDown_time()+","+mesg.getQualified_count()+","+mesg.getQualified_count()+","+mesg.getAll_count()+","+mesg.getPlan_time()+","+mesg.getProduce_time()+"\n";
          KafkaTopology.FDoutputStream.write(data.getBytes());
          KafkaTopology.FDoutputStream.close();
          KafkaTopology.FDoutputStream = null;
       } catch (IOException e) {
          e.printStackTrace();
       }
    
    }

    Storm整合Hbase

    Storm写入Hbase

     /****************存入Hbase*****************/
    String[] value = {
          mesg.getEquipment_name(),
          mesg.getDown_time(),
          mesg.getQualified_count(),
          mesg.getAll_count(),
          mesg.getPlan_time(),
          mesg.getProduce_time()
    };
    //System.out.println("hbase==>:"+value.toString());
    HbaseHelper.insertData(
          KafkaTopology.tableName, 
          mesg.getEquipment_name()+Math.random()*1000000000, 
          KafkaTopology.family,value
    );
    this.collector.ack(input);

    在调试Storm的过程中遇到一些问题。

    错误信息:

    NIOServerCnxn - caught end of stream exception
    ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x15cf25cbf2d000d, likely client has closed socket
    Caused by: java.lang.NullPointerException
    ERROR o.a.s.util - Halting process: ("Worker died")

    错误原因:

    追踪源码找到打印此语句的位置

    /** Read the request payload (everything following the length prefix) */
        private void readPayload() throws IOException, InterruptedException {
            if (incomingBuffer.remaining() != 0) { // have we read length bytes?
                    //尝试一次读进来
                int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
                if (rc < 0) {
                    throw new EndOfStreamException(
                            "Unable to read additional data from client sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely client has closed socket");
                }
            }
         //一次读完
            if (incomingBuffer.remaining() == 0) { // have we read length bytes?
                    //server的packet统计
                packetReceived();
                    //准备使用这个buffer了
                incomingBuffer.flip();
                    //如果CoonectRequst还没来,那第一个packet肯定是他了
                if (!initialized) {
                    readConnectRequest();
                } 
                    //处理请他请求
                else {
                    readRequest();
                }
                    //清理现场,为下一个packet读做准备
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
  • 相关阅读:
    彻底解决SQL SERVER 2008无法远程连接的问题
    将ReportingService 2008配置为匿名访问
    将低版本的数据库迁移到sqlserver 2008
    Oracle 11G R2
    Reporting Services 安装的备份和还原操作
    DefaultValue
    用户 'IIS APPPOOL\DefaultAppPool' 登录失败。
    在IIS中为SQL Server 2008配置报表服务
    数据库日志维护方式
    如何卸载的 SQL Server 2008 实例
  • 原文地址:https://www.cnblogs.com/kongcong/p/7112029.html
Copyright © 2011-2022 走看看