通过kafka客户端发送数据,由KafkaSpout进行接收消息,传输到ConsumerBolt进行实时数据处理。
maven依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
驱动类
/**
* @description: description: kafka 输出数据 到 Strom进行分析 实时分析
* <p>
* @author: upuptop
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @cnblogs: http://www.cnblogs.com/upuptop
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/06/2019/6/5
* <p>
*/
public class Main {
public static void main(String[] args) {
TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("KafkaSpout", new KafkaSpout(KafkaSpoutConfig.builder("hadoop137:9092", "first").build()), 1);
tp.setBolt("ConsumerBolt", new ConsumerBolt()).shuffleGrouping("KafkaSpout");
Config cfg = new Config();
cfg.setNumWorkers(1);//指定工作进程数 (jvm数量,分布式环境下可用,本地模式设置无意义)
if (args.length != 0) {
// 集群提交
try {
StormSubmitter.submitTopology(args[0], cfg, tp.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
} else {
// 本地提交
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("KafkaToppology", cfg, tp.createTopology());
}
}
}
Bolt类(处理数据类)
/**
* @description: description: 处理kafka传递过来的数据
* <p>
* @author: upuptop
* <p>
* @qq: 337081267
* <p>
* @CSDN: http://blog.csdn.net/pyfysf
* <p>
* @cnblogs: http://www.cnblogs.com/upuptop
* <p>
* @blog: http://wintp.top
* <p>
* @email: pyfysf@163.com
* <p>
* @time: 2019/06/2019/6/5
* <p>
*/
public class ConsumerBolt extends BaseBasicBolt {
private static final Logger logger = LoggerFactory.getLogger(ConsumerBolt.class);
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//处理数据
String topic = tuple.getStringByField("topic");
logger.info("ConsumerBolt execute() 订阅的Kafka主题: " + topic);
String message = tuple.getStringByField("value");
logger.info("ConsumerBolt execute() 接受到的消息 " + message);
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}