  • Setting Up a Storm Cluster on CentOS7 and Basic Operations

    Prerequisites

    Before installing Kafka (and Storm, which also depends on it) you need a running ZooKeeper cluster; for the detailed installation steps, see my other article.
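
    Once the ZooKeeper cluster is up, it is worth verifying it before proceeding. A quick check on each node (assuming zkServer.sh is on your PATH; one node should report Mode: leader and the others Mode: follower):
    zkServer.sh status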

    Storm Installation

    Download
     wget https://mirrors.tuna.tsinghua.edu.cn/apache/storm/apache-storm-1.1.0/apache-storm-1.1.0.tar.gz
    Extract
    tar -zxvf apache-storm-1.1.0.tar.gz
    Move the directory
    mv apache-storm-1.1.0 /usr/local/hadoop/
    Edit the configuration file
    vim /usr/local/hadoop/apache-storm-1.1.0/conf/storm.yaml
    storm.zookeeper.servers:
         - "192.168.174.200"
         - "192.168.174.201"
    
    nimbus.seeds: ["192.168.174.200"]
    
    storm.local.dir: "/usr/local/hadoop/apache-storm-1.1.0/data"

    • storm.zookeeper.servers: the addresses of the ZooKeeper cluster; if ZooKeeper does not use the default port, you must also set storm.zookeeper.port (see the sketch after this list)
    • storm.zookeeper.port: the port number of the ZooKeeper cluster
    • storm.local.dir: the path where Storm stores a small amount of local state
    • nimbus.seeds: the address of the master (Nimbus) node; more than one can be configured
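    For example, if ZooKeeper listened on a non-default port (2182 below is a hypothetical value), the extra entry from the first two bullets would be:

    storm.zookeeper.port: 2182

    It is also common to pin down how many worker slots each Supervisor offers; 6700-6703 are the usual defaults:

    supervisor.slots.ports:
         - 6700
         - 6701
         - 6702
         - 6703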
    Copy the installation to the remaining worker nodes (note the -r flag, since a whole directory is being copied)
    scp -r apache-storm-1.1.0 salver1:/usr/local/hadoop/
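    With several workers, a small shell loop avoids repeating the command (salver2 is a hypothetical second host):
    for h in salver1 salver2; do scp -r apache-storm-1.1.0 $h:/usr/local/hadoop/; done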

    Storm Operations

    Start the master node (Nimbus); run the following commands from the bin directory of the Storm installation
    ./storm nimbus 1>/dev/null 2>&1 &
    Start the management web UI
    ./storm ui 1>/dev/null 2>&1 &  
    Start a worker node (Supervisor)
    ./storm supervisor 1>/dev/null 2>&1 &
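    After the daemons start, jps is a quick sanity check; on Storm 1.x the master typically shows nimbus and core (the UI), and each worker shows Supervisor:
    jps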
    Web UI address

    http://127.0.0.1:8080

    Submit a topology
    ./storm jar storm-book.jar com.TopologyMain /usr/words.txt
    Kill a topology
    ./storm kill Getting-Started-Toplogie
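    storm kill first deactivates the topology's spouts and then waits before tearing the workers down; the wait can be set explicitly with -w (the 10 seconds below is an arbitrary example):
    ./storm kill Getting-Started-Toplogie -w 10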

    Complete Example
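
    The example wires five classes together: a KafkaSpout reads messages from the test topic; MyKafkaBolt forwards each message under the field "type"; MyCountBolt counts occurrences per type; MyReportBolt tracks the "join" and "out" counts and emits their difference; and a RedisStoreBolt persists that difference to a Redis hash through MyCountStoreMapper.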

    package com;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.redis.bolt.RedisStoreBolt;
    import org.apache.storm.redis.common.config.JedisPoolConfig;
    import org.apache.storm.redis.common.mapper.RedisStoreMapper;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    public class MykafkaSpout {
    /**
     * Builds the Kafka -> Storm -> Redis topology and submits it, either to
     * the cluster (when a topology name is passed in args) or to a local
     * cluster for testing.
     * @param args optional topology name
     * @throws AuthorizationException
     */
    public static void main(String[] args) throws AuthorizationException {
        // Redis connection settings
        String host = "127.0.0.1";
        int port = 6385;
        String topic = "test";
        ZkHosts zkHosts = new ZkHosts("192.168.174.200:2181,192.168.174.201:2181");
        // zkRoot is "" and the consumer id is random, so each run starts from
        // fresh offsets rather than resuming a previous run's position
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic,
                "",
                UUID.randomUUID().toString());
            List<String> zkServers = new ArrayList<String>() ;
            zkServers.add("192.168.174.200");
            zkServers.add("192.168.174.201");
            
            spoutConfig.zkServers = zkServers;
            spoutConfig.zkPort = 2181;
            spoutConfig.socketTimeoutMs = 60 * 1000 ;
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()) ;
    
            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
            TopologyBuilder builder = new TopologyBuilder() ;
            builder.setSpout("spout", new KafkaSpout(spoutConfig) ,1) ;
            builder.setBolt("bolt1", new MyKafkaBolt(), 2).shuffleGrouping("spout") ;
            builder.setBolt("MyCountBolt", new MyCountBolt(), 2).fieldsGrouping("bolt1", new Fields("type"));
        // funnel all counts into a single task for aggregate reporting
            builder.setBolt("MyReportBolt", new MyReportBolt(), 2).globalGrouping("MyCountBolt");
            
            JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).setPassword("Apple05101314").build();
            RedisStoreMapper storeMapper = new MyCountStoreMapper();
            RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
        // persist the aggregated result to Redis
            builder.setBolt("redis-store-bolt", storeBolt).globalGrouping("MyReportBolt");
            
            Config conf = new Config ();
            conf.setDebug(false) ;
    
            if (args.length > 0) {
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                }
            }else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("mytopology", conf, builder.createTopology());
            }
        }
    }
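
    To compile this class you need, at minimum, the storm-core, storm-kafka, and storm-redis artifacts (groupId org.apache.storm, version 1.1.0 to match the cluster) plus the Kafka client library on the classpath; when submitting with storm jar, storm-core should be marked provided so it is not bundled into the topology jar.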
    
    package com;
    
    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class MyKafkaBolt extends BaseRichBolt {
    
    private OutputCollector outputCollector;

    public void prepare(Map map, TopologyContext context,
            OutputCollector collector) {
        this.outputCollector = collector;
    }

    public void execute(Tuple input) {
        // forward each raw Kafka message downstream under the field "type"
        String kafkaMsg = input.getString(0);
        if (kafkaMsg != null) {
            this.outputCollector.emit(new Values(kafkaMsg));
        }
        // ack unconditionally so that null messages are not left pending
        this.outputCollector.ack(input);
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("type"));
    }
    
    }
    package com;
    
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class MyCountBolt extends BaseRichBolt {
    
    	private OutputCollector outputCollector;
    	private HashMap<String, Integer> count;
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.outputCollector = collector;
        this.count = new HashMap<String, Integer>();
    }
    
    public void execute(Tuple input) {
        // fieldsGrouping on "type" routes every tuple with the same type to
        // the same task, so this per-task map sees a consistent count
        String type = input.getStringByField("type");
    		int cnt = 1;
            if(count.containsKey(type)){
                cnt = count.get(type) + 1;
            }
            count.put(type, cnt);
            this.outputCollector.emit(new Values(type, cnt));
            this.outputCollector.ack(input); 
    	}
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("type", "cnt"));
    }
    	}
    
    }
    package com;
    
    import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
    import org.apache.storm.redis.common.mapper.RedisStoreMapper;
    import org.apache.storm.tuple.ITuple;
    
    public class MyCountStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "myCount";
    
        public MyCountStoreMapper() {
            description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }
    
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }
    
    public String getKeyFromTuple(ITuple tuple) {
        // "zs" is the field name MyReportBolt emits; it becomes the hash field
        return tuple.getStringByField("zs");
    }
    
        public String getValueFromTuple(ITuple tuple) {
            return tuple.getIntegerByField("cnt")+"";
        }
    }
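
    Given the mapper above, all results end up in a single Redis hash named myCount, so they can be inspected from redis-cli using the same connection settings as the topology:
    redis-cli -h 127.0.0.1 -p 6385 -a Apple05101314 HGETALL myCount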
    
    package com;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.log4j.Logger;
    
    /**
     * Created by gzx on 17-2-6.
     */
    public class MyReportBolt extends BaseRichBolt {
    
    	private static Logger logger = Logger.getLogger(MyReportBolt.class);
    	private OutputCollector outputCollector;
    	private HashMap<String, Integer> count;
    
    	public void prepare(Map map, TopologyContext topologyContext,
    			OutputCollector collector) {
    		this.count = new HashMap<String, Integer>();
    		this.outputCollector = collector;
    	}
    
    /**
     * Remembers the latest count per type and, once both "join" and "out"
     * have been seen, emits the difference between them.
     *
     * @param tuple
     */
    public void execute(Tuple tuple) {
        String type = tuple.getStringByField("type");
        int cnt = tuple.getIntegerByField("cnt");

        count.put(type, cnt);
        if (count.containsKey("join") && count.containsKey("out")) {
            int join = count.get("join");
            int out = count.get("out");
            int sy = join - out;
            System.out.println("join=" + join);
            System.out.println("out=" + out);
            logger.debug("=== current remaining total ===" + sy);
            this.outputCollector.emit(new Values("zs", sy));
        }
        // ack every tuple, even before both counters have been seen,
        // so unmatched tuples are not replayed
        this.outputCollector.ack(tuple);
    }
    
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("zs", "cnt"));
    	}
    }
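
    To deploy, package everything into a jar and pass the topology name as the first argument so that main takes the StormSubmitter branch (the jar name below is hypothetical):
    ./storm jar mytopology.jar com.MykafkaSpout my-topology
    Run the same command without the trailing name and the LocalCluster branch executes instead, which is convenient for testing before touching the real cluster.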





  • Original article: https://www.cnblogs.com/gmhappy/p/9472445.html