整合 Kafka + Storm：消息可以通过各种方式进入 Kafka 消息中间件，例如使用 Flume 收集的日志数据会先写入 Kafka 的 topic 中暂存，再由实时计算程序 Storm 做实时分析。这时我们需要在 Storm 的 Spout 中读取 Kafka 中的消息，然后交由具体的 Bolt 组件分析处理。
1、配置Maven依赖包
<!-- Unit testing -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
</dependency>
<!-- Storm runtime; "provided" because the Storm cluster supplies it at run time -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>commons-collections</groupId>
    <artifactId>commons-collections</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>13.0</version>
</dependency>
<!-- Kafka (Scala 2.10 build). NOTE(review): zookeeper/log4j/slf4j-log4j12 are excluded,
     presumably to avoid clashing with the versions Storm ships; confirm. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Storm's Kafka spout (KafkaSpout / SpoutConfig / ZkHosts); same version as storm-core -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
<!-- MySQL-related dependencies -->
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.6</version>
</dependency>
<!-- Storm external modules are released in lockstep with storm-core; keep the same version -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- Druid data source -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.0.27</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.29</version>
</dependency>
<dependency>
    <groupId>c3p0</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.1.2</version>
</dependency>
<!-- Unit testing (the opening <dependency> tag was missing here, leaving the XML malformed) -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
</dependency>
<!-- Storm runtime; "provided" because the Storm cluster supplies it at run time -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>commons-collections</groupId>
    <artifactId>commons-collections</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>13.0</version>
</dependency>
<!-- Kafka (Scala 2.10 build). NOTE(review): zookeeper/log4j/slf4j-log4j12 are excluded,
     presumably to avoid clashing with the versions Storm ships; confirm. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Storm's Kafka spout (KafkaSpout / SpoutConfig / ZkHosts); same version as storm-core -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
<!-- MySQL-related dependencies -->
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.6</version>
</dependency>
<!-- Storm external modules are released in lockstep with storm-core; keep the same version -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>
<!-- Druid data source -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.0.27</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.29</version>
</dependency>
<dependency>
    <groupId>c3p0</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.1.2</version>
</dependency>
2、编写Storm程序：单词切分 Bolt（WordSplitBolt）。注意：后面的拓扑中还用到了 WordCountBolt，本文未给出它的代码，需要自行实现。
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class WordSplitBolt implements IRichBolt {
/**
*
*/
private static final long serialVersionUID = 1L;
private OutputCollector collector;
public void prepare(Map mapConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
String line = tuple.getString(0);
String[] arr = line.split(" ");
for (String s : arr) {
collector.emit(new Values(s,1));
}
collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
public void cleanup() {
// TODO Auto-generated method stub
}
}
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class WordSplitBolt implements IRichBolt {
/**
*
*/
private static final long serialVersionUID = 1L;
private OutputCollector collector;
public void prepare(Map mapConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
String line = tuple.getString(0);
String[] arr = line.split(" ");
for (String s : arr) {
collector.emit(new Values(s,1));
}
collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
public void cleanup() {
// TODO Auto-generated method stub
}
}
3、编写MyDbUtils工具类
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.dbutils.BasicRowProcessor;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ArrayListHandler;
/**
 * Minimal JDBC helper built on Commons DbUtils' {@link QueryRunner}.
 *
 * <p>Connection settings are read from the static getters of {@code JdbcUtils} each time a
 * connection is opened. Every call opens a fresh connection and closes it before returning,
 * including when the statement fails.
 */
public class MyDbUtils {

    /** MySQL Connector/J driver class, loaded once when this class is initialised. */
    private static String className = "com.mysql.jdbc.Driver";

    private static QueryRunner queryRunner = new QueryRunner();

    /**
     * Parameterised INSERT of one {@code (word, count)} row.
     *
     * <p>NOTE(review): the table name is read from {@code JdbcUtils} only once, when this class
     * is first loaded, and is concatenated into the SQL because identifiers cannot be bound as
     * {@code ?}. {@code JdbcUtils} must therefore be configured before {@code MyDbUtils} is first
     * used; otherwise the statement becomes {@code "INSERT INTO null(word,count) ..."}. The table
     * name must come from a trusted source.
     */
    public static final String INSERT_LOG = "INSERT INTO "+JdbcUtils.getTable_name()+"(word,count) VALUES (?,?)";

    static {
        try {
            Class.forName(className);
        } catch (Exception e) {
            // Best effort: the failure is printed and class initialisation continues.
            e.printStackTrace();
        }
    }

    /**
     * Executes an INSERT/UPDATE/DELETE statement on a freshly opened connection.
     *
     * @param sql    SQL with {@code ?} placeholders
     * @param params values bound to the placeholders, in order
     * @throws SQLException if connecting or executing the statement fails
     */
    static void update(String sql, Object... params) throws SQLException {
        Connection connection = getConnection();
        try {
            queryRunner.update(connection, sql, params);
        } finally {
            // QueryRunner does not close a caller-supplied connection; close it even on failure.
            connection.close();
        }
    }

    /**
     * Runs a query and returns the first column of every row as a string.
     *
     * @param sql the SELECT to execute (no parameters are bound)
     * @return first-column values in row order, with {@code null} for SQL NULL; an empty list if a
     *         {@link SQLException} occurs (the error is printed, not thrown)
     */
    public static List<String> executeQuerySql(String sql) {
        List<String> result = new ArrayList<String>();
        Connection connection = null;
        try {
            connection = getConnection();
            List<Object[]> rows = queryRunner.query(connection, sql, new ArrayListHandler());
            for (Object[] row : rows) {
                Object first = row[0];
                // Avoid an NPE on SQL NULL values.
                result.add(first == null ? null : first.toString());
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // Previously this connection was never closed.
            closeQuietly(connection);
        }
        return result;
    }

    /** Closes {@code connection} if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Opens a new MySQL connection using the URL and credentials held by {@code JdbcUtils}. */
    private static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(JdbcUtils.getUrl(), JdbcUtils.getUser(), JdbcUtils.getPassword());
    }
}
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.dbutils.BasicRowProcessor;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ArrayListHandler;
/**
 * Minimal JDBC helper built on Commons DbUtils' {@link QueryRunner}.
 *
 * <p>Connection settings are read from the static getters of {@code JdbcUtils} each time a
 * connection is opened. Every call opens a fresh connection and closes it before returning,
 * including when the statement fails.
 *
 * <p>{@code java.sql.Connection} is fully qualified because this copy has no
 * {@code import java.sql.Connection}.
 */
public class MyDbUtils {

    /** MySQL Connector/J driver class, loaded once when this class is initialised. */
    private static String className = "com.mysql.jdbc.Driver";

    private static QueryRunner queryRunner = new QueryRunner();

    /**
     * Parameterised INSERT of one {@code (word, count)} row.
     *
     * <p>NOTE(review): the table name is read from {@code JdbcUtils} only once, when this class
     * is first loaded, and is concatenated into the SQL because identifiers cannot be bound as
     * {@code ?}. {@code JdbcUtils} must therefore be configured before {@code MyDbUtils} is first
     * used; otherwise the statement becomes {@code "INSERT INTO null(word,count) ..."}. The table
     * name must come from a trusted source.
     */
    public static final String INSERT_LOG = "INSERT INTO "+JdbcUtils.getTable_name()+"(word,count) VALUES (?,?)";

    static {
        try {
            Class.forName(className);
        } catch (Exception e) {
            // Best effort: the failure is printed and class initialisation continues.
            e.printStackTrace();
        }
    }

    /**
     * Executes an INSERT/UPDATE/DELETE statement on a freshly opened connection.
     *
     * @param sql    SQL with {@code ?} placeholders
     * @param params values bound to the placeholders, in order
     * @throws SQLException if connecting or executing the statement fails
     */
    static void update(String sql, Object... params) throws SQLException {
        java.sql.Connection connection = getConnection();
        try {
            queryRunner.update(connection, sql, params);
        } finally {
            // QueryRunner does not close a caller-supplied connection; close it even on failure.
            connection.close();
        }
    }

    /**
     * Runs a query and returns the first column of every row as a string.
     *
     * @param sql the SELECT to execute (no parameters are bound)
     * @return first-column values in row order, with {@code null} for SQL NULL; an empty list if a
     *         {@link SQLException} occurs (the error is printed, not thrown)
     */
    public static List<String> executeQuerySql(String sql) {
        List<String> result = new ArrayList<String>();
        java.sql.Connection connection = null;
        try {
            connection = getConnection();
            List<Object[]> rows = queryRunner.query(connection, sql, new ArrayListHandler());
            for (Object[] row : rows) {
                Object first = row[0];
                // Avoid an NPE on SQL NULL values.
                result.add(first == null ? null : first.toString());
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // Previously this connection was never closed.
            closeQuietly(connection);
        }
        return result;
    }

    /** Closes {@code connection} if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(java.sql.Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Opens a new MySQL connection using the URL and credentials held by {@code JdbcUtils}. */
    private static java.sql.Connection getConnection() throws SQLException {
        return DriverManager.getConnection(JdbcUtils.getUrl(), JdbcUtils.getUser(), JdbcUtils.getPassword());
    }
}
4、编写JdbcUtils工具类
/**
 * Holds the JDBC connection settings and target table name shared by the database helpers.
 *
 * <p>All state is static: constructing an instance, or calling a setter, overwrites the
 * process-wide values returned by the static getters.
 *
 * <p>NOTE(review): static fields exist once per JVM. That is sufficient when the topology runs
 * in a {@code LocalCluster} inside the submitting JVM. On a real cluster, worker JVMs would not
 * see values set by the submitter; confirm before deploying.
 */
public class JdbcUtils {
    private static String url;
    private static String user;
    private static String password;
    private static String table_name;

    /** @return the target table name, or {@code null} if not yet configured */
    public static String getTable_name() {
        return table_name;
    }

    /** @param table_name the target table name */
    public static void setTable_name(String table_name) {
        JdbcUtils.table_name = table_name;
    }

    /** @return the JDBC URL, or {@code null} if not yet configured */
    public static String getUrl() {
        return url;
    }

    /** @param url the JDBC URL */
    public static void setUrl(String url) {
        JdbcUtils.url = url;
    }

    /** @return the database user, or {@code null} if not yet configured */
    public static String getUser() {
        return user;
    }

    /** @param user the database user */
    public static void setUser(String user) {
        JdbcUtils.user = user;
    }

    /** @return the database password, or {@code null} if not yet configured */
    public static String getPassword() {
        return password;
    }

    /** @param password the database password */
    public static void setPassword(String password) {
        JdbcUtils.password = password;
    }

    /**
     * Publishes all four settings to the shared static fields.
     *
     * @param url        JDBC URL
     * @param user       database user
     * @param password   database password
     * @param table_name target table name
     */
    public JdbcUtils(String url, String user, String password, String table_name) {
        // The fields are static: assign through the class, not through this instance.
        JdbcUtils.url = url;
        JdbcUtils.user = user;
        JdbcUtils.password = password;
        JdbcUtils.table_name = table_name;
    }
}
/**
 * Holds the JDBC connection settings and target table name shared by the database helpers.
 * (The class header was missing from this copy and has been restored.)
 *
 * <p>All state is static: constructing an instance, or calling a setter, overwrites the
 * process-wide values returned by the static getters.
 *
 * <p>NOTE(review): static fields exist once per JVM. That is sufficient when the topology runs
 * in a {@code LocalCluster} inside the submitting JVM. On a real cluster, worker JVMs would not
 * see values set by the submitter; confirm before deploying.
 */
public class JdbcUtils {
    private static String url;
    private static String user;
    private static String password;
    private static String table_name;

    /** @return the target table name, or {@code null} if not yet configured */
    public static String getTable_name() {
        return table_name;
    }

    /** @param table_name the target table name */
    public static void setTable_name(String table_name) {
        JdbcUtils.table_name = table_name;
    }

    /** @return the JDBC URL, or {@code null} if not yet configured */
    public static String getUrl() {
        return url;
    }

    /** @param url the JDBC URL */
    public static void setUrl(String url) {
        JdbcUtils.url = url;
    }

    /** @return the database user, or {@code null} if not yet configured */
    public static String getUser() {
        return user;
    }

    /** @param user the database user */
    public static void setUser(String user) {
        JdbcUtils.user = user;
    }

    /** @return the database password, or {@code null} if not yet configured */
    public static String getPassword() {
        return password;
    }

    /** @param password the database password */
    public static void setPassword(String password) {
        JdbcUtils.password = password;
    }

    /**
     * Publishes all four settings to the shared static fields.
     *
     * @param url        JDBC URL
     * @param user       database user
     * @param password   database password
     * @param table_name target table name
     */
    public JdbcUtils(String url, String user, String password, String table_name) {
        // The fields are static: assign through the class, not through this instance.
        JdbcUtils.url = url;
        JdbcUtils.user = user;
        JdbcUtils.password = password;
        JdbcUtils.table_name = table_name;
    }
}
5、编写Mytopology拓扑程序
import java.util.UUID;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * Wires a Kafka spout to the word-split and word-count bolts and runs the topology for a fixed
 * period in an in-process {@link LocalCluster}.
 */
public class Mytopology {

    /** How long the local topology is left running before the cluster is shut down, in ms. */
    private static final long LOCAL_RUN_MILLIS = 15000L;

    /**
     * Configures the database settings, builds the topology, runs it locally for
     * {@link #LOCAL_RUN_MILLIS} ms and then shuts the local cluster down.
     *
     * @param zkConnString ZooKeeper connection string passed to {@link ZkHosts}
     * @param topic        Kafka topic to consume
     * @param url          JDBC URL, stored in {@code JdbcUtils}
     * @param user         database user, stored in {@code JdbcUtils}
     * @param password     database password, stored in {@code JdbcUtils}
     * @param table_name   target table, stored in {@code JdbcUtils}
     * @throws InterruptedException if interrupted while the topology runs; the local cluster
     *                              is still shut down
     */
    public void Mytopologys(String zkConnString, String topic, String url, String user,
            String password, String table_name) throws InterruptedException {
        // JdbcUtils keeps its settings in static fields; set them before any bolt touches the DB.
        JdbcUtils.setUrl(url);
        JdbcUtils.setUser(user);
        JdbcUtils.setPassword(password);
        JdbcUtils.setTable_name(table_name);

        Config config = new Config();
        config.setDebug(true);
        // Allow at most one pending (un-acked) spout tuple at a time.
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        BrokerHosts hosts = new ZkHosts(zkConnString);
        // zkRoot "/<topic>" with a random consumer id: offsets are stored under a new path on
        // every run, so offsets committed by earlier runs are not reused.
        SpoutConfig kafkaSpoutConfig =
                new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("word-spitter", new WordSplitBolt()).shuffleGrouping("kafka-spout");
        // Group by "word" so every occurrence of a word reaches the same counter task.
        builder.setBolt("word-counter", new WordCountBolt())
                .fieldsGrouping("word-spitter", new Fields("word"));

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
            Thread.sleep(LOCAL_RUN_MILLIS);
        } finally {
            // Always tear down the in-process cluster, even if submission fails or the sleep
            // is interrupted.
            cluster.shutdown();
        }
    }
}
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/**
 * Wires a Kafka spout to the word-split and word-count bolts and runs the topology for a fixed
 * period in an in-process {@link LocalCluster}.
 *
 * <p>{@code java.util.UUID} is fully qualified because this copy has no
 * {@code import java.util.UUID}.
 */
public class Mytopology {

    /** How long the local topology is left running before the cluster is shut down, in ms. */
    private static final long LOCAL_RUN_MILLIS = 15000L;

    /**
     * Configures the database settings, builds the topology, runs it locally for
     * {@link #LOCAL_RUN_MILLIS} ms and then shuts the local cluster down.
     *
     * @param zkConnString ZooKeeper connection string passed to {@link ZkHosts}
     * @param topic        Kafka topic to consume
     * @param url          JDBC URL, stored in {@code JdbcUtils}
     * @param user         database user, stored in {@code JdbcUtils}
     * @param password     database password, stored in {@code JdbcUtils}
     * @param table_name   target table, stored in {@code JdbcUtils}
     * @throws InterruptedException if interrupted while the topology runs; the local cluster
     *                              is still shut down
     */
    public void Mytopologys(String zkConnString, String topic, String url, String user,
            String password, String table_name) throws InterruptedException {
        // JdbcUtils keeps its settings in static fields; set them before any bolt touches the DB.
        JdbcUtils.setUrl(url);
        JdbcUtils.setUser(user);
        JdbcUtils.setPassword(password);
        JdbcUtils.setTable_name(table_name);

        Config config = new Config();
        config.setDebug(true);
        // Allow at most one pending (un-acked) spout tuple at a time.
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        BrokerHosts hosts = new ZkHosts(zkConnString);
        // zkRoot "/<topic>" with a random consumer id: offsets are stored under a new path on
        // every run, so offsets committed by earlier runs are not reused.
        SpoutConfig kafkaSpoutConfig =
                new SpoutConfig(hosts, topic, "/" + topic, java.util.UUID.randomUUID().toString());
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("word-spitter", new WordSplitBolt()).shuffleGrouping("kafka-spout");
        // Group by "word" so every occurrence of a word reaches the same counter task.
        builder.setBolt("word-counter", new WordCountBolt())
                .fieldsGrouping("word-spitter", new Fields("word"));

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
            Thread.sleep(LOCAL_RUN_MILLIS);
        } finally {
            // Always tear down the in-process cluster, even if submission fails or the sleep
            // is interrupted.
            cluster.shutdown();
        }
    }
}
6、最后测试程序能否正常运行
注意：运行此程序前，需先配置并启动 Kafka（及其依赖的 ZooKeeper），再启动 Flume 向 Kafka 写入数据；同时要把 Test1 中的占位参数替换为实际的 ZooKeeper 地址、topic 名称以及数据库连接信息。
/**
 * Local smoke test: starts the Kafka-to-MySQL word-count topology with placeholder settings.
 * Replace each placeholder with a real value before running.
 */
public class Test1 {
    public static void main(String[] args) throws Exception {
        String zkConnString = "zkConnString";
        String topic = "topic";
        String dbUrl = "数据库的url";
        String dbUser = "数据库user";
        String dbPassword = "数据库password";
        String tableName = "tableName";

        Mytopology topology = new Mytopology();
        topology.Mytopologys(zkConnString, topic, dbUrl, dbUser, dbPassword, tableName);
    }
}
public static void main(String[] args) throws Exception {
Mytopology mp = new Mytopology();
mp.Mytopologys("zkConnString", "topic","数据库的url","数据库user","数据库password","tableName");
}
}