zoukankan      html  css  js  c++  java
  • storm结合kafka

    pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.hc</groupId>
        <artifactId>mysqldetect</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>mysqldetect</name>
        <url>http://maven.apache.org</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>1.1.1</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>1.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.12</artifactId>
                <version>1.0.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.0.0</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    DetectTopology.java

    package com.hc.mysqldetect;

    import java.util.Properties;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.kafka.bolt.KafkaBolt;
    import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
    import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;

    public class DetectTopology {

     public static void main(String[] args) throws Exception {

      BrokerHosts brokerHosts = new ZkHosts("10.100.66.20:2181");
      SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "mysql_log", "/mysql_log", "kafkaspout");
      spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
      KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

      Properties props = new Properties();
      props.put("bootstrap.servers", "10.100.66.20:9092");
      props.put("acks", "1");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      KafkaBolt kafkaBolt = new KafkaBolt().withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector("mysql_inject"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafkaSpout", kafkaSpout);
      builder.setBolt("detectBolt", new DetectBolt()).shuffleGrouping("kafkaSpout");
      builder.setBolt("kafkaBolt", kafkaBolt).shuffleGrouping("detectBolt");

      Config conf = new Config();

     conf.put(Config.WORKER_HEAP_MEMORY_MB, 1152);//默认768
     conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 180);//默认30


      conf.setDebug(true);
      if (args.length == 0) {
       String topologyName = "detectTopology";
       LocalCluster cluster = new LocalCluster();
       cluster.submitTopology(topologyName, conf, builder.createTopology());
       Utils.sleep(10000);
       cluster.killTopology(topologyName);
       cluster.shutdown();
      } else {
       conf.setNumWorkers(3);
       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
      }

     }

    }

    MessageScheme.java

    package com.hc.mysqldetect;

    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.util.List;

    import org.apache.storm.spout.Scheme;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.slf4j.LoggerFactory;
    import org.slf4j.Logger;

    public class MessageScheme implements Scheme {

     private static final long serialVersionUID = 1L;
     private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

     public Fields getOutputFields() {
      return new Fields("msg");
     }

     public List<Object> deserialize(ByteBuffer buffer) {
      String msg = getString(buffer);
      // logger.info("get one message is {}", msg);
      return new Values(msg);

     }

     private String getString(ByteBuffer buffer) {
      Charset charset = null;
      CharsetDecoder decoder = null;
      CharBuffer charBuffer = null;
      try {
       charset = Charset.forName("UTF-8");
       decoder = charset.newDecoder();
       // charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空
       charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
       return charBuffer.toString();
      } catch (Exception ex) {
       ex.printStackTrace();
       return "";
      }
     }

    }

    DetectBolt.java

    package com.hc.mysqldetect;

    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.slf4j.LoggerFactory;
    import org.slf4j.Logger;

    public class DetectBolt extends BaseBasicBolt {

     private static final long serialVersionUID = 1L;
     private static final Logger logger = LoggerFactory.getLogger(DetectBolt.class);

     public void execute(Tuple tuple, BasicOutputCollector collector) {
      String msg = (String) tuple.getValue(0);
      String out = "Message got is '" + msg + "'!";
      // logger.info("out={}", out);
      collector.emit(new Values(out));
     }

     public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("message"));
     }

    }

  • 相关阅读:
    【转】你刚才在淘宝上买了一件东西【技术普及贴】
    Hibernate使用自定义脚本替换注解或者xml文件中的自动生成表结构
    北京地铁和广州地铁之感想
    使用eclipse开发工具与hibernate开发者为开源一起做贡献
    hdu 1159 Common Subsequence(最长公共子序列LCS)
    题解报告:hdu 2059 龟兔赛跑
    循环顺序队列模拟病人看病程序
    题解报告:hdu 1060 Leftmost Digit
    ACM_求N^N的最高位数
    ACM_Encoding
  • 原文地址:https://www.cnblogs.com/hanfeihan1992/p/8258714.html
Copyright © 2011-2022 走看看