  • Setting Up a Storm Cluster on CentOS 7, with Basic Operations

    Prerequisites

    Both Storm and the Kafka used in the example below depend on a ZooKeeper cluster, which must be installed first; for the detailed installation steps, see my other article.

    Storm Installation

    Download
     wget https://mirrors.tuna.tsinghua.edu.cn/apache/storm/apache-storm-1.1.0/apache-storm-1.1.0.tar.gz
    Extract
    tar -zxvf apache-storm-1.1.0.tar.gz
    Move the directory
    mv apache-storm-1.1.0 /usr/local/hadoop/
    Edit the configuration file
    vim /usr/local/hadoop/apache-storm-1.1.0/conf/storm.yaml
    storm.zookeeper.servers:
         - "192.168.174.200"
         - "192.168.174.201"
    
    nimbus.seeds: ["192.168.174.200"]
    
    storm.local.dir: "/usr/local/hadoop/apache-storm-1.1.0/data"

    • storm.zookeeper.servers: the addresses of the ZooKeeper cluster; if the ZooKeeper cluster does not use the default port, you must also set storm.zookeeper.port (see the snippet after this list)
    • storm.zookeeper.port: the port number of the ZooKeeper cluster
    • storm.local.dir: the directory where Storm stores a small amount of local state
    • nimbus.seeds: the address of the master (Nimbus) node; more than one can be configured
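
    If the ZooKeeper cluster listens on a non-default port, add the port to the same storm.yaml; a minimal sketch (2182 is only an illustrative value, the default of 2181 needs no explicit entry):

    storm.zookeeper.port: 2182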
    Copy the files to the remaining worker nodes
    scp -r apache-storm-1.1.0 salver1:/usr/local/hadoop/
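
    With several workers, a small loop saves repetition; the hostnames below are placeholders for your own nodes (the article's cluster uses salver1), and passwordless SSH between the nodes is assumed:

    # Copy the Storm directory to every worker; adjust hostnames to your cluster
    for node in salver1 salver2; do
        scp -r apache-storm-1.1.0 "$node":/usr/local/hadoop/
    done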

    Storm Operations

    Start the master node (Nimbus); the commands below are run from Storm's bin directory
    ./storm nimbus 1>/dev/null 2>&1 &
    Start the web UI on the master node
    ./storm ui 1>/dev/null 2>&1 &  
    Start a worker node (Supervisor)
    ./storm supervisor 1>/dev/null 2>&1 &
    Access the UI at

    http://127.0.0.1:8080
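
    To confirm the daemons actually came up, jps on each node should list the corresponding Java processes; a rough sketch (the exact process names can vary between Storm versions):

    jps
    # on the master: expect nimbus and core (the UI process)
    # on a worker: expect Supervisor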

    Run a topology
    ./storm jar storm-book.jar com.TopologyMain /usr/words.txt
    Kill a topology
    ./storm kill Getting-Started-Toplogie
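
    The name passed to storm kill is the name the topology was submitted under; storm list shows everything currently running, and -w sets how many seconds Storm waits (with the spouts deactivated) before tearing the topology down:

    ./storm list
    ./storm kill Getting-Started-Toplogie -w 30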

    Complete Example

    package com;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.redis.bolt.RedisStoreBolt;
    import org.apache.storm.redis.common.config.JedisPoolConfig;
    import org.apache.storm.redis.common.mapper.RedisStoreMapper;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    public class MykafkaSpout {
        /**
         * @param args
         * @throws AuthorizationException
         */
        public static void main(String[] args) throws AuthorizationException {

            // Redis connection settings used by the RedisStoreBolt below
            String host = "127.0.0.1";
            int port = 6385;

            // Kafka topic to consume, and the ZooKeeper ensemble Kafka registers with
            String topic = "test";
            ZkHosts zkHosts = new ZkHosts("192.168.174.200:2181,192.168.174.201:2181");
            SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic,
                    "",
                    UUID.randomUUID().toString());
            List<String> zkServers = new ArrayList<String>();
            zkServers.add("192.168.174.200");
            zkServers.add("192.168.174.201");

            spoutConfig.zkServers = zkServers;
            spoutConfig.zkPort = 2181;
            spoutConfig.socketTimeoutMs = 60 * 1000;
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            // Start from the latest offset instead of replaying the whole topic
            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);
            builder.setBolt("bolt1", new MyKafkaBolt(), 2).shuffleGrouping("spout");
            builder.setBolt("MyCountBolt", new MyCountBolt(), 2).fieldsGrouping("bolt1", new Fields("type"));
            // Aggregate all words and their counts and report the result
            builder.setBolt("MyReportBolt", new MyReportBolt(), 2).globalGrouping("MyCountBolt");

            JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).setPassword("Apple05101314").build();
            RedisStoreMapper storeMapper = new MyCountStoreMapper();
            RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
            // Persist the aggregated counts to Redis
            builder.setBolt("redis-store-bolt", storeBolt).globalGrouping("MyReportBolt");

            Config conf = new Config();
            conf.setDebug(false);
    
            if (args.length > 0) {
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                }
            }else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("mytopology", conf, builder.createTopology());
            }
        }
    }
    
    package com;
    
    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class MyKafkaBolt extends BaseRichBolt {

        private OutputCollector outputCollector;

        public void prepare(Map map, TopologyContext context,
                OutputCollector collector) {
            this.outputCollector = collector;
        }

        public void execute(Tuple input) {
            String kafkaMsg = input.getString(0);
            if (kafkaMsg != null) {
                // Forward each raw Kafka message downstream as the single field "type"
                this.outputCollector.emit(new Values(kafkaMsg));
            }
            // Ack unconditionally so that null messages are not replayed forever
            this.outputCollector.ack(input);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("type"));
        }
    }

    package com;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class MyCountBolt extends BaseRichBolt {

        private OutputCollector outputCollector;
        private HashMap<String, Integer> count;

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.outputCollector = collector;
            this.count = new HashMap<String, Integer>();
        }

        public void execute(Tuple input) {
            // Count occurrences per "type"; fieldsGrouping on "type" guarantees that
            // the same type always reaches the same task, so a local map is sufficient
            String type = input.getStringByField("type");
            int cnt = 1;
            if (count.containsKey(type)) {
                cnt = count.get(type) + 1;
            }
            count.put(type, cnt);
            this.outputCollector.emit(new Values(type, cnt));
            this.outputCollector.ack(input);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("type", "cnt"));
        }
    }

    package com;
    
    import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
    import org.apache.storm.redis.common.mapper.RedisStoreMapper;
    import org.apache.storm.tuple.ITuple;
    
    public class MyCountStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "myCount";
    
        public MyCountStoreMapper() {
            description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }
    
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }
    
        public String getKeyFromTuple(ITuple tuple) {
            // The Redis hash field name is taken from the tuple's "zs" field
            return tuple.getStringByField("zs");
        }

        public String getValueFromTuple(ITuple tuple) {
            // The count is stored as its string representation
            return tuple.getIntegerByField("cnt") + "";
        }
    }
    
    package com;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.log4j.Logger;
    
    /**
     * Created by gzx on 17-2-6.
     */
    public class MyReportBolt extends BaseRichBolt {
    
    	private static Logger logger = Logger.getLogger(MyReportBolt.class);
    	private OutputCollector outputCollector;
    	private HashMap<String, Integer> count;
    
    	public void prepare(Map map, TopologyContext topologyContext,
    			OutputCollector collector) {
    		this.count = new HashMap<String, Integer>();
    		this.outputCollector = collector;
    	}
    
        /**
         * Prints each word together with its occurrence count, and emits the
         * difference between the "join" and "out" counts once both are known.
         *
         * @param tuple
         */
        public void execute(Tuple tuple) {
            String type = tuple.getStringByField("type");
            int cnt = tuple.getIntegerByField("cnt");

            count.put(type, cnt);
            if (count.containsKey("join") && count.containsKey("out")) {
                int join = count.get("join");
                int out = count.get("out");
                int sy = join - out;
                System.out.println("join=" + join);
                System.out.println("out=" + out);
                //System.out.printf("=== current remaining total === " + sy + "\n");
                logger.debug("=== current remaining total === " + sy);
                this.outputCollector.emit(new Values("zs", sy));
            }
            // Ack every tuple, not only those that trigger an emit; otherwise
            // unmatched tuples would time out and be replayed
            this.outputCollector.ack(tuple);
        }
    
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("zs", "cnt"));
    	}
    }
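
    To try the example end to end, one plausible sequence is sketched below. The jar name is hypothetical, and the Kafka broker address (192.168.174.200:9092), Redis port, and password are taken from this article's setup and will differ on other clusters:

    # Package the topology (e.g. with Maven), then submit it; args[0] becomes the topology name
    ./storm jar mytopology.jar com.MykafkaSpout mytopology

    # Feed some test messages (e.g. "join", "out") into the "test" topic
    kafka-console-producer.sh --broker-list 192.168.174.200:9092 --topic test

    # Inspect the counts that RedisStoreBolt wrote into the "myCount" hash
    redis-cli -h 127.0.0.1 -p 6385 -a Apple05101314 HGETALL myCount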




