zoukankan      html  css  js  c++  java
  • Kafka系列四 flume-kafka-storm整合

    flume-kafka-storm

    flume读取日志数据,然后发送至kafka。

    1、flume配置文件

    agent.sources = kafkaSource
    agent.channels = kafkaChannel
    agent.sinks = kafkaSink
    
    agent.sources.kafkaSource.type = exec
    agent.sources.kafkaSource.command = tail -F /home/hadoop/kafkaData/kafka.log
    agent.sources.kafkaSource.channels = kafkaChannel
    
    agent.sinks.kafkaSink.channel = kafkaChannel
    agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.kafkaSink.topic = stormTopic
    agent.sinks.kafkaSink.brokerList = 192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092
    agent.sinks.kafkaSink.kafka.flumeBatchSize = 20
    agent.sinks.kafkaSink.kafka.producer.acks = 1
    agent.sinks.kafkaSink.kafka.producer.linger.ms = 1
    agent.sinks.kafkaSink.kafka.producer.compression.type = snappy
    agent.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder 
    
    agent.channels.kafkaChannel.type=memory                                                                                                       
    agent.channels.kafkaChannel.capacity=10000                                                                                                    
    agent.channels.kafkaChannel.transactionCapacity=100

    2、启动flume

    bin/flume-ng agent --conf-file  conf/flume-kafka.conf -c conf/ --name agent -Dflume.root.logger=DEBUG,console

    3、需要在flume机器上修改hosts文件,添加上kafka的主机名和ip的映射。

    4、在kafka上创建主题

    bin/kafka-topics.sh --create --zookeeper hadoop2:2181 --replication-factor 1 --partitions 3 --topic stormTopic

    5、模拟生成日志脚本

    # Append 10001 lines (kafka_test-0 .. kafka_test-10000) to the log file that Flume tails;
    # redirecting the whole loop opens the file once instead of once per line.
    for ((i = 0; i <= 10000; i++)); do
        echo "kafka_test-${i}"
    done >> /home/hadoop/kafkaData/kafka.log

    6、在kafka上开启消费者

    bin/kafka-console-consumer.sh --zookeeper hadoop2:2181 --from-beginning --topic stormTopic

    至此,flume->kafka的数据流已走通。

    7、整合Storm:将Kafka作为Storm的spout(数据源),使用KafkaSpout读取数据。

     1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     3     <modelVersion>4.0.0</modelVersion>
     4     <groupId>cn.itcast</groupId>
     5     <artifactId>kafkaStorm</artifactId>
     6     <version>0.0.1-SNAPSHOT</version>
     7     <packaging>jar</packaging>
     8     <dependencies>
     9         <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
    10         <dependency>
    11             <groupId>org.apache.storm</groupId>
    12             <artifactId>storm-core</artifactId>
    13             <version>1.2.1</version>
    14             <scope>provided</scope>
    15         </dependency>
    16         <dependency>
    17             <groupId>org.apache.storm</groupId>
    18             <artifactId>storm-kafka</artifactId>
    19             <version>1.2.1</version>
    20         </dependency>
    21         <dependency>
    22             <groupId>org.apache.kafka</groupId>
    23             <artifactId>kafka_2.12</artifactId>
    24             <version>1.0.0</version>
    25             <exclusions>
    26                 <exclusion>
    27                     <groupId>org.slf4j</groupId>
    28                     <artifactId>slf4j-log4j12</artifactId>
    29                 </exclusion>
    30             </exclusions>
    31         </dependency>
    32         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    33         <dependency>
    34             <groupId>org.apache.kafka</groupId>
    35             <artifactId>kafka-clients</artifactId>
    36             <version>1.0.0</version>
    37         </dependency>
    38 
    39 
    40     </dependencies>
    41     <build>
    42         <plugins>
    43             <!-- 资源文件拷贝插件 -->
    44             <plugin>
    45                 <groupId>org.apache.maven.plugins</groupId>
    46                 <artifactId>maven-resources-plugin</artifactId>
    47                 <version>2.7</version>
    48                 <configuration>
    49                     <encoding>UTF-8</encoding>
    50                 </configuration>
    51             </plugin>
    52             <!-- java编译插件 -->
    53             <plugin>
    54                 <groupId>org.apache.maven.plugins</groupId>
    55                 <artifactId>maven-compiler-plugin</artifactId>
    56                 <version>3.2</version>
    57                 <configuration>
    58                     <source>1.8</source>
    59                     <target>1.8</target>
    60                     <encoding>UTF-8</encoding>
    61                 </configuration>
    62             </plugin>
    63             <plugin>
    64                 <groupId>org.apache.maven.plugins</groupId>
    65                 <artifactId>maven-jar-plugin</artifactId>
    66                 <version>2.4</version>
    67             </plugin>
    68             <plugin>
    69                 <groupId>org.apache.maven.plugins</groupId>
    70                 <artifactId>maven-assembly-plugin</artifactId>
    71                 <version>2.4</version>
    72                 <configuration>
    73                     <descriptorRefs>
    74                         <descriptorRef>jar-with-dependencies</descriptorRef>
    75                     </descriptorRefs>
    76                     <archive>
    77                         <manifest>
    78                             <mainClass>cn.itcast.kafka.Kafka2Storm</mainClass>
    79                         </manifest>
    80                     </archive>
    81                 </configuration>
    82                 <executions>
    83                     <execution>
    84                         <id>make-assembly</id>
    85                         <phase>package</phase>
    86                         <goals>
    87                             <goal>single</goal>
    88                         </goals>
    89                     </execution>
    90                 </executions>
    91             </plugin>
    92         </plugins>
    93     </build>
    94 </project>
    pom.xml
     1 package cn.itcast.kafka;
     2 
     3 import org.apache.storm.Config;
     4 import org.apache.storm.LocalCluster;
     5 import org.apache.storm.generated.AlreadyAliveException;
     6 import org.apache.storm.generated.AuthorizationException;
     7 import org.apache.storm.generated.InvalidTopologyException;
     8 import org.apache.storm.kafka.BrokerHosts;
     9 import org.apache.storm.kafka.KafkaSpout;
    10 import org.apache.storm.kafka.SpoutConfig;
    11 import org.apache.storm.kafka.ZkHosts;
    12 import org.apache.storm.topology.TopologyBuilder;
    13 
    14 public class Kafka2Storm {
    15     public static void main(String[] args)
    16             throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
    17         TopologyBuilder topologyBuilder = new TopologyBuilder();
    18         BrokerHosts hosts = new ZkHosts("192.168.25.142:2181,192.168.25.143:2181,192.168.25.144:2181");
    19         /**
    20          * hosts:用以获取Kafka broker和partition的信息,在zk上获取,此处填写zk的地址
    21          * topic:从哪个topic读取消息 zkRoot:进度信息记录于zookeeper的哪个路径下
    22          * id:进度记录的id,想要一个新的Spout读取之前的记录,应把它的id设为跟之前的一样
    23          */
    24         SpoutConfig spoutConfig = new SpoutConfig(hosts, "stormTopic", "/mykafka", "kafkaSpout");
    25         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    26         topologyBuilder.setSpout("kafkaSpout", kafkaSpout);
    27         // 将一行行的文本切分成单词
    28         topologyBuilder.setBolt("valueBolt", new ValueBolt(), 1).shuffleGrouping("kafkaSpout");
    29         // 启动topology的配置信息
    30         Config config = new Config();
    31         // 定义集群分配多少个工作进程来执行这个topology
    32         config.setNumWorkers(3);
    33         
    34          LocalCluster localCluster = new LocalCluster();
    35          localCluster.submitTopology("kafkaStomrTopology", config,
    36          topologyBuilder.createTopology());
    37         // 集群模式提交topology
    38 //        StormSubmitter.submitTopologyWithProgressBar("kafkaStomrTopology", config, topologyBuilder.createTopology());
    39     }
    40 }
    Kafka2Storm.java
     1 package cn.itcast.kafka;
     2 
     3 import java.util.Map;
     4 
     5 import org.apache.storm.task.OutputCollector;
     6 import org.apache.storm.task.TopologyContext;
     7 import org.apache.storm.topology.OutputFieldsDeclarer;
     8 import org.apache.storm.topology.base.BaseRichBolt;
     9 import org.apache.storm.tuple.Tuple;
    10 import org.slf4j.Logger;
    11 import org.slf4j.LoggerFactory;
    12 
    13 public class ValueBolt extends BaseRichBolt {
    14     Logger logger = LoggerFactory.getLogger(ValueBolt.class);
    15 
    16     /**
    17      * 
    18      */
    19     private static final long serialVersionUID = 1L;
    20 
    21     @Override
    22     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    23         // TODO Auto-generated method stub
    24 
    25     }
    26 
    27     @Override
    28     public void execute(Tuple input) {
    29         logger.info(new String((byte[]) input.getValue(0)));
    30     }
    31 
    32     @Override
    33     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    34         // TODO Auto-generated method stub
    35 
    36     }
    37 
    38 }
    ValueBolt.java
  • 相关阅读:
    python编程学习进度七
    python编程学习进度六
    SOA——2020.5.15
    代码大全001/
    Refined Architecture阶段——细化架构
    架构即未来003(摘自网络)
    我对外包公司的小小看法
    架构即未来002
    每日日报
    架构即未来阅读笔记001
  • 原文地址:https://www.cnblogs.com/zhaobingqing/p/8609336.html
Copyright © 2011-2022 走看看