zoukankan      html  css  js  c++  java
  • storm-redis 详解

                                        多的不说,先来代码分析,再贴我自己写的代码。如果代码有错误,求更正。

    导入两个关键包,其他项目需要的包,大家自己导入了,我pom下的包太多,不好一下扔上来。

    <!-- Storm's Redis integration; the version follows the project's storm.version property. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!-- Jedis client, used directly by the standalone connection test. -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>

    我是连接的linux上的redis,所以要对redis进行配置,不然会出现拒绝连接的错误。

    redis部署在linux时,java远程连接需要修改配置:
    修改redis.conf文件
    1.将bind 127.0.0.1加上注释,(#bind 127.0.0.1),允许除本机外的IP访问redis
    2.将protected-mode yes,修改为protected-mode no;关闭redis的保护模式(仅建议在测试环境或受信任的内网中使用,生产环境请设置密码)
    3.将daemonize no,修改为daemonize yes;允许redis服务后台运行
    修改防火墙端口号
    1.将redis默认的6379注册到防火墙中 
    /sbin/iptables -I INPUT -p tcp --dport 6379 -j ACCEPT
    2.保存防火墙端口号表 
    /etc/rc.d/init.d/iptables save
    3.重启防火墙 
    /etc/rc.d/init.d/iptables restart
    4.查看防火墙状态 
    /etc/rc.d/init.d/iptables status

    使用测试类连接一下,看能不能连通:

    import java.util.Iterator;
    import java.util.Set;
    import redis.clients.jedis.Jedis;
    /**
     * Standalone connectivity check: pings a Redis server and prints every key it holds.
     *
     * @author cwc
     * @date 2018-05-30
     * @version 1.0.0
     */
    public class RedisTest {

        /**
         * Connects to the Redis server, pings it and prints all keys to stdout.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // The host is a placeholder for the remote Redis server; replace it with a real address.
            // try-with-resources closes the connection even when a command throws.
            try (Jedis jedis = new Jedis("xxx.xx.xxx.xx")) {
                System.out.println("连接成功");
                // PING needs a reply from the server, so this line is the actual liveness check.
                System.out.println("服务正在运行: " + jedis.ping());
                // Fetch every key and print one per line.
                Set<String> keys = jedis.keys("*");
                for (String key : keys) {
                    System.out.println(key);
                }
            }
        }
    }

    准备就绪,先说说storm向redis写入:

            官方给的写入API:

    /**
     * Store mapper for the Redis hash "wordCount": each tuple's "word" field supplies
     * the key and its "count" field supplies the value handed to the store bolt.
     */
    class WordCountStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "wordCount";

        /** Builds the HASH data-type description bound to {@code hashKey}. */
        public WordCountStoreMapper() {
            this.description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH,
                    this.hashKey);
        }

        /** @return the tuple's "word" field */
        @Override
        public String getKeyFromTuple(ITuple tuple) {
            final String word = tuple.getStringByField("word");
            return word;
        }

        /** @return the tuple's "count" field */
        @Override
        public String getValueFromTuple(ITuple tuple) {
            final String count = tuple.getStringByField("count");
            return count;
        }

        /** @return the description created in the constructor */
        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return this.description;
        }
    }
    // Creates the store bolt; register it on the TopologyBuilder when wiring the topology.
    // NOTE: `host` and `port` are not defined in this snippet; supply your Redis address.
    JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                    .setHost(host).setPort(port).build();
    RedisStoreMapper storeMapper = new WordCountStoreMapper();
    RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

        我反正刚刚看的时候一脸懵逼,之后研究了很久才明白,下面贴我自己的代码:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    /**
     * @author cwc
     * @date 2018年5月29日  
     * @description:这是给的假的数据源
     * @version 1.0.0 
     */
    public class RedisWriteSpout extends BaseRichSpout {
    	private static final long serialVersionUID = 1L;
    	private SpoutOutputCollector spoutOutputCollector;  
        
        /**
         * 作为字段word输出
         */
        private static final Map<Integer, String> LASTNAME = new HashMap<Integer, String>();  
        static {  
            LASTNAME.put(0, "anderson");  
            LASTNAME.put(1, "watson");  
            LASTNAME.put(2, "ponting");  
            LASTNAME.put(3, "dravid");  
            LASTNAME.put(4, "lara");  
        }  
        /**
         * 作为字段myValues输出
         */
        private static final Map<Integer, String> COMPANYNAME = new HashMap<Integer, String>();  
        static {  
            COMPANYNAME.put(0, "abc");  
            COMPANYNAME.put(1, "dfg");  
            COMPANYNAME.put(2, "pqr");  
            COMPANYNAME.put(3, "ecd");  
            COMPANYNAME.put(4, "awe");  
        }  
      
        public void open(Map conf, TopologyContext context,  
                SpoutOutputCollector spoutOutputCollector) {  
            this.spoutOutputCollector = spoutOutputCollector;  
        }  
      
        public void nextTuple() {  
            final Random rand = new Random();  
            int randomNumber = rand.nextInt(5); 
            try {
    			Thread.sleep(100);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
            spoutOutputCollector.emit (new Values(LASTNAME.get(randomNumber),COMPANYNAME.get(randomNumber)));  
            System.out.println("数据来袭!!!!!!");
        }  
      
        public void declareOutputFields(OutputFieldsDeclarer declarer) {  
            // emit the field site.  
            declarer.declare(new Fields("word","myValues"));  
        }  
    }
    
    import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
    import org.apache.storm.redis.common.mapper.RedisStoreMapper;
    import org.apache.storm.tuple.ITuple;
    
    /**
     * Store mapper that writes tuples into the Redis hash "mykey": the tuple's "word"
     * field is used as the hash field name and its "myValues" field as that field's value.
     *
     * @author cwc
     * @date 2018-05-30
     * @version 1.0.0
     */
    public class RedisWriteMapper implements RedisStoreMapper {
        private static final long serialVersionUID = 1L;
        private RedisDataTypeDescription description;
        // Top-level key in Redis under which the hash is stored.
        private final String hashKey = "mykey";

        /** Builds the HASH data-type description bound to {@code hashKey}. */
        public RedisWriteMapper() {
            this.description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH,
                    this.hashKey);
        }

        /** @return the description created in the constructor */
        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return this.description;
        }

        /** @return the tuple's "word" field, i.e. the field name inside the hash */
        @Override
        public String getKeyFromTuple(ITuple ituple) {
            final String fieldName = ituple.getStringByField("word");
            return fieldName;
        }

        /** @return the tuple's "myValues" field, i.e. the value stored for that hash field */
        @Override
        public String getValueFromTuple(ITuple ituple) {
            final String fieldValue = ituple.getStringByField("myValues");
            return fieldValue;
        }
    }

    storm读取redis数据:

            官方给的API:

    /**
     * Lookup mapper for the Redis hash "wordCount": the incoming tuple's "word" field is
     * the lookup key, and each result is emitted as a ("wordName", "count") tuple.
     */
    class WordCountRedisLookupMapper implements RedisLookupMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "wordCount";

        /** Builds the HASH data-type description bound to {@code hashKey}. */
        public WordCountRedisLookupMapper() {
            this.description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH,
                    this.hashKey);
        }

        /** @return the description created in the constructor */
        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return this.description;
        }

        /** @return the tuple's "word" field, used as the lookup key */
        @Override
        public String getKeyFromTuple(ITuple tuple) {
            final String word = tuple.getStringByField("word");
            return word;
        }

        /** @return always {@code null}; this mapper only supplies a key */
        @Override
        public String getValueFromTuple(ITuple tuple) {
            return null;
        }

        /** Declares the two output fields: "wordName" and "count". */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            final Fields outputFields = new Fields("wordName", "count");
            declarer.declare(outputFields);
        }

        /**
         * Pairs the lookup key with the value that was looked up.
         *
         * @param input the tuple that triggered the lookup
         * @param value the value found for that tuple's key
         * @return a single-element list holding (key, value)
         */
        @Override
        public List<Values> toTuple(ITuple input, Object value) {
            List<Values> result = Lists.newArrayList();
            result.add(new Values(getKeyFromTuple(input), value));
            return result;
        }
    }
    // Creates the lookup bolt from a pool config and the mapper defined above.
    // NOTE: `host` and `port` are not defined in this snippet; supply your Redis address.
    JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
            .setHost(host).setPort(port).build();
    RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
    RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);

        自己代码:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    /**
     * @author cwc
     * @date 2018年5月30日  
     * @description:
     * @version 1.0.0 
     */
    public class RedisReadSpout extends BaseRichSpout {
    	private static final long serialVersionUID = 1L;
    	private SpoutOutputCollector spoutOutputCollector;  
        
        /**
         * 这是刚刚作为word写入的数据,要通过他获取我们存的值
         */
        private static final Map<Integer, String> LASTNAME = new HashMap<Integer, String>();  
        static {  
            LASTNAME.put(0, "anderson");  
            LASTNAME.put(1, "watson");  
            LASTNAME.put(2, "ponting");  
            LASTNAME.put(3, "dravid");  
            LASTNAME.put(4, "lara");  
        }  
          
        public void open(Map conf, TopologyContext context,  
                SpoutOutputCollector spoutOutputCollector) {  
            this.spoutOutputCollector = spoutOutputCollector;  
        }  
      
        public void nextTuple() {  
            final Random rand = new Random();  
            int randomNumber = rand.nextInt(5); 
            try {
    			Thread.sleep(100);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
            spoutOutputCollector.emit (new Values(LASTNAME.get(randomNumber)));  
            System.out.println("读数据来袭!!!!!!");
        }  
      
        public void declareOutputFields(OutputFieldsDeclarer declarer) {  
            // emit the field site.  
            declarer.declare(new Fields("word"));  
        }  
    }
    import java.util.List;
    
    import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
    import org.apache.storm.redis.common.mapper.RedisLookupMapper;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.ITuple;
    import org.apache.storm.tuple.Values;
    import com.google.common.collect.Lists;
    
    /**
     * Lookup mapper for the Redis hash "mykey": the incoming tuple's "word" field selects
     * a field inside the hash, and each result is emitted as a ("word", "values") tuple.
     *
     * @author cwc
     * @date 2018-05-30
     * @version 1.0.0
     */
    public class RedisReadMapper implements RedisLookupMapper {
        private static final long serialVersionUID = 1L;
        // Describes the Redis data type (and top-level key) this mapper targets.
        private RedisDataTypeDescription description;
        // Top-level Redis key of the hash to read; the same key the write side stored under.
        private final String hashKey = "mykey";

        /**
         * The data is stored as a hash: hashKey is the root key, and the key returned by
         * getKeyFromTuple selects one field under it (root key -> field -> value).
         * Author's note on the lookup performed by the bolt:
         * lookupValue = jedisCommand.hget(additionalKey, key);
         */
        public RedisReadMapper() {
            this.description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH,
                    this.hashKey);
        }

        /** @return the description created in the constructor */
        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return this.description;
        }

        /** @return the incoming tuple's "word" field, used as the field name to look up */
        @Override
        public String getKeyFromTuple(ITuple tuple) {
            final String fieldName = tuple.getStringByField("word");
            return fieldName;
        }

        /** @return always {@code null}; this mapper only supplies a key */
        @Override
        public String getValueFromTuple(ITuple tuple) {
            return null;
        }

        /**
         * Declares the two output fields, "word" and "values". Two names are needed
         * because {@link #toTuple} emits the key together with the looked-up value.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            final Fields outputFields = new Fields("word", "values");
            declarer.declare(outputFields);
        }

        /**
         * Wraps the lookup key and the value that was found into a list and returns it.
         *
         * @param input the tuple that triggered the lookup
         * @param value the value found for that tuple's key
         * @return a single-element list holding (key, value)
         */
        @Override
        public List<Values> toTuple(ITuple input, Object value) {
            List<Values> result = Lists.newArrayList();
            result.add(new Values(getKeyFromTuple(input), value));
            return result;
        }
    }
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
    /**
     * Terminal bolt that prints the looked-up value of every incoming tuple to stdout
     * and then acknowledges the tuple.
     *
     * @author cwc
     * @date 2018-05-30
     * @version 1.0.0
     */
    public class RedisOutBolt extends BaseRichBolt {
        private static final long serialVersionUID = 1L;

        private OutputCollector collector;

        /**
         * Stores the collector so {@link #execute(Tuple)} can ack tuples.
         *
         * @param stormConf topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to acknowledge tuples
         */
        @Override
        @SuppressWarnings("rawtypes")
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Prints the second field of the tuple (index 1) and acks it.
         *
         * @param tuple incoming tuple; index 0 is not used here
         */
        @Override
        public void execute(Tuple tuple) {
            String strs = tuple.getString(1);
            System.out.println(strs);
            // BaseRichBolt does not ack automatically; without this, anchored tuples
            // would stay pending until they time out.
            collector.ack(tuple);
        }

        /** Declares a single output field named "RedisOutBolt"; this bolt never emits on it. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("RedisOutBolt"));
        }
    }
    

      接下来是  RedisMain,测试读写方法:

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.redis.bolt.RedisLookupBolt;
    import org.apache.storm.redis.bolt.RedisStoreBolt;
    import org.apache.storm.redis.common.config.JedisPoolConfig;
    import org.apache.storm.redis.common.mapper.RedisLookupMapper;
    import org.apache.storm.redis.common.mapper.RedisStoreMapper;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * Entry point that runs the demo topologies on an in-process {@link LocalCluster}:
     * {@link #writeRedis()} stores tuples into Redis, {@link #readRedis()} looks them up.
     */
    public class RedisMain {
        /** Name under which both demo topologies are submitted. */
        private static final String TOPOLOGY_NAME = "test";

        /**
         * Runs the read topology. Call {@link #writeRedis()} here instead (or first)
         * to populate the hash that the read topology looks up.
         *
         * @param args unused
         * @throws Exception declared for convenience; nothing is thrown explicitly here
         */
        public static void main(String[] args) throws Exception {
            readRedis();
        }

        /**
         * Write path: RedisWriteSpout -> RedisStoreBolt. The topology runs for
         * 10 seconds, then it is killed and the local cluster is shut down.
         */
        public static void writeRedis() {
            JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                    .setHost("xxx.xx.xx.xx").setPort(6379).build();
            System.out.println("连接成功!!!");
            RedisStoreMapper storeMapper = new RedisWriteMapper();
            RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("RedisWriteSpout", new RedisWriteSpout(), 2);
            builder.setBolt("to-save", storeBolt, 1).shuffleGrouping("RedisWriteSpout");

            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
            System.err.println("写入完成!!!!!");
            runThenShutdown(cluster, 10000L);
        }

        /**
         * Read path: RedisReadSpout -> RedisLookupBolt -> RedisOutBolt. The topology
         * runs for 100 seconds, then it is killed and the local cluster is shut down.
         */
        public static void readRedis() {
            JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                    .setHost("xxx.xx.xxx.xx").setPort(6379).build();
            RedisLookupMapper lookupMapper = new RedisReadMapper();
            RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("RedisReadSpout-reader", new RedisReadSpout(), 2);
            builder.setBolt("to-lookupBolt", lookupBolt, 1).shuffleGrouping("RedisReadSpout-reader");
            builder.setBolt("to-out", new RedisOutBolt(), 1).shuffleGrouping("to-lookupBolt");

            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
            runThenShutdown(cluster, 100000L);
        }

        /**
         * Lets the submitted topology run for {@code runMillis}, kills it, and always
         * shuts the cluster down, including when the wait is interrupted.
         *
         * @param cluster   local cluster the topology was submitted to
         * @param runMillis how long to let the topology run, in milliseconds
         */
        private static void runThenShutdown(LocalCluster cluster, long runMillis) {
            boolean interrupted = false;
            try {
                Thread.sleep(runMillis);
                cluster.killTopology(TOPOLOGY_NAME);
            } catch (InterruptedException e) {
                // Remember the interruption; the flag is restored after shutdown so the
                // shutdown itself is not disturbed by a pending interrupt.
                interrupted = true;
            } finally {
                cluster.shutdown();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
        很多解释都写在了代码注释中,其中遇到的很多问题,也都在代码注释的地方说明了,认真看下代码,祝大家零BUG哦~~












    深夜码文不易,若对看官有帮助,望看官可以在右侧打赏。
  • 相关阅读:
    TransGAN
    Paper-About-GAN (3)
    VAE的Pytorch实现
    Lua_元表
    Kong网关1_service与route
    Win系统Notepad++中使用Json、xml格式化插件
    Linux下Konga的安装
    Linux下设置pgsql远程访问
    Linux中查看端口占用情况
    Mysql使用存储过程插入十万条数据
  • 原文地址:https://www.cnblogs.com/wanchen-chen/p/12934130.html
Copyright © 2011-2022 走看看