CentOS 7: Setting Up a Storm Cluster and Basic Operations

    Prerequisites

    Storm (like the Kafka used in the example below) requires a running ZooKeeper cluster; for the detailed installation steps, see my other post.

    Installing Storm

    Download
    wget https://mirrors.tuna.tsinghua.edu.cn/apache/storm/apache-storm-1.1.0/apache-storm-1.1.0.tar.gz
    Unpack
    tar -zxvf apache-storm-1.1.0.tar.gz
    Move the directory into place
    mv apache-storm-1.1.0 /usr/local/hadoop/
    Edit the configuration file (it lives under conf/ in the installation directory)
    vim /usr/local/hadoop/apache-storm-1.1.0/conf/storm.yaml
    storm.zookeeper.servers:
         - "192.168.174.200"
         - "192.168.174.201"
    
    nimbus.seeds: ["192.168.174.200"]
    
    storm.local.dir: "/usr/local/hadoop/apache-storm-1.1.0/data"

    • storm.zookeeper.servers: the addresses of the ZooKeeper ensemble; if the ensemble does not use the default port, storm.zookeeper.port must be set as well
    • storm.zookeeper.port: the port of the ZooKeeper ensemble
    • storm.local.dir: the directory in which Storm stores a small amount of local state
    • nimbus.seeds: the addresses of the master (Nimbus) nodes; more than one may be listed (a programmatic equivalent is sketched below)
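
    The same keys can also be set from Java when submitting a topology. A minimal sketch (not from the original post), using the constants in org.apache.storm.Config that mirror the storm.yaml entries above:

    import java.util.Arrays;

    import org.apache.storm.Config;

    public class StormConfigSketch {
        // Build a Config mirroring the storm.yaml entries above.
        public static Config clusterConfig() {
            Config conf = new Config();
            conf.put(Config.STORM_ZOOKEEPER_SERVERS,
                    Arrays.asList("192.168.174.200", "192.168.174.201"));
            // only needed when ZooKeeper listens on a non-default port
            conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
            conf.put(Config.NIMBUS_SEEDS, Arrays.asList("192.168.174.200"));
            return conf;
        }
    }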
    Copy the directory to the remaining worker nodes (note the -r flag, since it is a directory)
    scp -r apache-storm-1.1.0 salver1:/usr/local/hadoop/

    Storm Operations

    Start the master node (Nimbus); the storm commands below are run from the bin/ directory of the installation
    ./storm nimbus 1>/dev/null 2>&1 &
    Start the web UI for the master node
    ./storm ui 1>/dev/null 2>&1 &  
    Start a worker node (Supervisor)
    ./storm supervisor 1>/dev/null 2>&1 &
    The UI is then reachable at

    http://127.0.0.1:8080
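
    The UI daemon also serves a REST API (in Storm 1.x, for example, /api/v1/cluster/summary returns a JSON cluster summary). A small polling sketch, assuming the UI runs on its default port 8080:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class UiSummarySketch {
        public static void main(String[] args) throws Exception {
            // cluster-summary endpoint of the Storm UI REST API
            URL url = new URL("http://127.0.0.1:8080/api/v1/cluster/summary");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // raw JSON; parse with any JSON library
            }
            in.close();
            conn.disconnect();
        }
    }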

    Run a topology (the example jar, main class, and argument here come from the Getting Started with Storm book)
    ./storm jar storm-book.jar com.TopologyMain /usr/words.txt
    Kill a topology (the argument is the name the topology was submitted under; a programmatic equivalent is sketched below)
    ./storm kill Getting-Started-Toplogie
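
    The same kill can be issued from Java through the Nimbus Thrift client. A hedged sketch (the 10-second wait is an arbitrary choice, not from the original post):

    import java.util.Map;

    import org.apache.storm.generated.KillOptions;
    import org.apache.storm.generated.Nimbus;
    import org.apache.storm.utils.NimbusClient;
    import org.apache.storm.utils.Utils;

    public class KillTopologySketch {
        public static void main(String[] args) throws Exception {
            // reads the local storm.yaml, so nimbus.seeds must point at the cluster
            Map conf = Utils.readStormConfig();
            Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();
            KillOptions opts = new KillOptions();
            opts.set_wait_secs(10); // let in-flight tuples drain before workers stop
            client.killTopologyWithOpts("Getting-Started-Toplogie", opts);
        }
    }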

    Complete Example

    package com;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.redis.bolt.RedisStoreBolt;
    import org.apache.storm.redis.common.config.JedisPoolConfig;
    import org.apache.storm.redis.common.mapper.RedisStoreMapper;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    public class MykafkaSpout {
    	/**
         * @param args
         * @throws AuthorizationException 
         */
        public static void main(String[] args) throws AuthorizationException {
            // Redis connection settings used by the RedisStoreBolt below
            String host = "127.0.0.1";
            int port = 6385;
            // Kafka topic the spout consumes from
            String topic = "test";
            ZkHosts zkHosts = new ZkHosts("192.168.174.200:2181,192.168.174.201:2181");
            // zkRoot is left empty; the consumer id is random, so offsets are not reused across runs
            SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic,
                    "",
                    UUID.randomUUID().toString());
            List<String> zkServers = new ArrayList<String>();
            zkServers.add("192.168.174.200");
            zkServers.add("192.168.174.201");

            spoutConfig.zkServers = zkServers;
            spoutConfig.zkPort = 2181;
            spoutConfig.socketTimeoutMs = 60 * 1000;
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            // start reading from the latest Kafka offset
            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);
            builder.setBolt("bolt1", new MyKafkaBolt(), 2).shuffleGrouping("spout");
            // fieldsGrouping: tuples with the same "type" always reach the same task,
            // so each task's in-memory count stays consistent
            builder.setBolt("MyCountBolt", new MyCountBolt(), 2).fieldsGrouping("bolt1", new Fields("type"));
            // aggregate every type and its count in a single task and report the result
            builder.setBolt("MyReportBolt", new MyReportBolt(), 2).globalGrouping("MyCountBolt");
            
            JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).setPassword("Apple05101314").build();
            RedisStoreMapper storeMapper = new MyCountStoreMapper();
            RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
            // persist the result to Redis
            builder.setBolt("redis-store-bolt", storeBolt).globalGrouping("MyReportBolt");
            
            Config conf = new Config();
            conf.setDebug(false);

            // with an argument: submit to the cluster under that name; otherwise run in local mode
            if (args.length > 0) {
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                }
            }else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("mytopology", conf, builder.createTopology());
            }
        }
    }
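
    The report bolt below only produces output once it has seen both a "join" and an "out" message, so the test topic is expected to carry those two strings as plain messages. A hypothetical producer for feeding it (the broker address 192.168.174.200:9092 is an assumption; the post never lists the Kafka brokers):

    package com;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class JoinOutProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.174.200:9092"); // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            // MyReportBolt reports join - out once both types have been counted
            producer.send(new ProducerRecord<String, String>("test", "join"));
            producer.send(new ProducerRecord<String, String>("test", "out"));
            producer.close();
        }
    }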
    
    package com;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class MyKafkaBolt extends BaseRichBolt {
    
        private OutputCollector outputCollector;

        // messageId -> payload; reserved for manual ack bookkeeping (currently unused)
        private HashMap<String, String> waitAck = new HashMap<String, String>();
        
        public void prepare(Map map, TopologyContext context,
                OutputCollector collector) {
            this.outputCollector = collector;
        }
    
        public void execute(Tuple input) {
            String kafkaMsg = input.getString(0);
            if (kafkaMsg != null) {
                // forward the raw Kafka message as the "type" field
                this.outputCollector.emit(new Values(kafkaMsg));
            }
            // ack unconditionally so that null messages do not time out and replay
            this.outputCollector.ack(input);
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("type"));
        }
    
    }
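
    Note that the emit in MyKafkaBolt is unanchored, so a downstream failure never triggers a replay from the KafkaSpout. A variant (not in the original post) that anchors the emitted tuple to its input for at-least-once processing:

    package com;

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class MyAnchoredKafkaBolt extends BaseRichBolt {

        private OutputCollector outputCollector;

        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.outputCollector = collector;
        }

        public void execute(Tuple input) {
            String kafkaMsg = input.getString(0);
            if (kafkaMsg != null) {
                // anchored emit: the new tuple joins the input's tuple tree, so a
                // downstream fail() causes the KafkaSpout to replay the message
                this.outputCollector.emit(input, new Values(kafkaMsg));
            }
            this.outputCollector.ack(input);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("type"));
        }
    }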
    package com;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class MyCountBolt extends BaseRichBolt {
    
        private OutputCollector outputCollector;
        // running count per message type; held in memory for the worker's lifetime
        private HashMap<String, Integer> count;

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.outputCollector = collector;
            this.count = new HashMap<String, Integer>();
        }
    
        public void execute(Tuple input) {
            String type = input.getStringByField("type");
            int cnt = 1;
            if (count.containsKey(type)) {
                cnt = count.get(type) + 1;
            }
            count.put(type, cnt);
            // emit the running total for this type
            this.outputCollector.emit(new Values(type, cnt));
            this.outputCollector.ack(input);
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("type", "cnt"));
        }
    
    }
    package com;
    
    import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
    import org.apache.storm.redis.common.mapper.RedisStoreMapper;
    import org.apache.storm.tuple.ITuple;
    
    public class MyCountStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "myCount";
    
        public MyCountStoreMapper() {
            description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }
    
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }
    
        public String getKeyFromTuple(ITuple tuple) {
            // the hash field name; "zs" is the field MyReportBolt declares
            return tuple.getStringByField("zs");
        }

        public String getValueFromTuple(ITuple tuple) {
            // Redis stores strings, so render the count as text
            return tuple.getIntegerByField("cnt") + "";
        }
    }
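
    With RedisDataType.HASH and hash key "myCount", the store bolt effectively performs HSET myCount zs <cnt>. A hedged sketch for reading the value back with the Jedis client that storm-redis already pulls in (host, port, and password taken from the topology above):

    package com;

    import redis.clients.jedis.Jedis;

    public class ReadCountSketch {
        public static void main(String[] args) {
            Jedis jedis = new Jedis("127.0.0.1", 6385);
            jedis.auth("Apple05101314"); // password set in the topology's JedisPoolConfig
            // the RedisStoreBolt writes the count under hash "myCount", field "zs"
            String remaining = jedis.hget("myCount", "zs");
            System.out.println("remaining = " + remaining);
            jedis.close();
        }
    }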
    
    package com;
    
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.log4j.Logger;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    /**
     * Created by gzx on 17-2-6.
     */
    public class MyReportBolt extends BaseRichBolt {
    
    	private static Logger logger = Logger.getLogger(MyReportBolt.class);
    	private OutputCollector outputCollector;
    	private HashMap<String, Integer> count;
    
        public void prepare(Map map, TopologyContext topologyContext,
                OutputCollector collector) {
            this.count = new HashMap<String, Integer>();
            this.outputCollector = collector;
        }
    
        /**
         * Report the counts: once both "join" and "out" have been seen,
         * print them and emit the remaining total (join - out).
         *
         * @param tuple
         */
        public void execute(Tuple tuple) {
            String type = tuple.getStringByField("type");
            int cnt = tuple.getIntegerByField("cnt");

            count.put(type, cnt);
            if (count.containsKey("join") && count.containsKey("out")) {
                int join = count.get("join");
                int out = count.get("out");
                int sy = join - out;
                System.out.println("join=" + join);
                System.out.println("out=" + out);
                // System.out.printf("=== current remaining total ===" + sy + "\n");
                logger.debug("=== current remaining total ===" + sy);
                this.outputCollector.emit(new Values("zs", sy));
            }
            // ack unconditionally so tuples seen before both types arrive do not replay
            this.outputCollector.ack(tuple);
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("zs", "cnt"));
        }
    }




