  • storm-hdfs and SequenceFile in Detail

    This post walks through storm-hdfs, SequenceFile, and their Trident APIs. Without further ado, on to the code:

    pom.xml

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.3</version>
            <exclusions>
                <exclusion>
                    <groupId>asm</groupId>
                    <artifactId>asm</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.2</version>
        </dependency>
            

    My pom.xml pulls in quite a few dependencies; the entries above are just the relevant ones I copied out, so something may be missing. Don't worry, though: every class below lists its imports. In particular, the org.apache.storm.hdfs.* classes used throughout come from the org.apache.storm:storm-hdfs artifact (use the version matching storm-core), which also needs to be on the classpath.

    HDFS:

    HdfsReadSpout: a custom spout that reads data from HDFS

    
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    /**
     * @author cwc
     * @date 2018-05-25
     * @description: spout that reads lines from HDFS
     * @version 1.0.0
     */
    public class HdfsReadSpout extends BaseRichSpout {
        private static final long serialVersionUID = 1L;

        private SpoutOutputCollector collector;
        private String url = "/testData/mytest/app_hdfs-bolt-2-0-1528096963805.log";
        private boolean finished = false; // guard so the file is only read once

        @Override
        public void nextTuple() {
            // nextTuple() is called in a loop; without this guard the file
            // would be re-read and re-emitted endlessly
            if (!finished) {
                hdfsData(url);
                finished = true;
            }
        }

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("lines"));
        }

        private void hdfsData(String fileName) {
            System.out.println("start reading data from HDFS");
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://172.18.130.100:8020");
            BufferedReader in = null;
            FSDataInputStream dis;
            String line;
            try {
                FileSystem hdfs = FileSystem.get(conf);
                dis = hdfs.open(new Path(fileName));
                in = new BufferedReader(new InputStreamReader(dis, "UTF-8"));
                while ((line = in.readLine()) != null) {
                    // System.out.println("line read: " + line);
                    this.collector.emit(new Values(line));
                }
            } catch (IllegalArgumentException | IOException e) {
                e.printStackTrace();
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    
    

    The main storm-hdfs demo class:

    
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.hdfs.bolt.format.RecordFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
    import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    import org.apache.storm.hdfs.spout.HdfsSpout;
    import org.apache.storm.hdfs.spout.TextFileReader;
    import org.apache.storm.topology.TopologyBuilder;
    import com.sunsheen.jfids.bigdata.storm.demo.count.TestSpout;
    /**
     * @ClassName: HDFSMain
     * @Description: writing data into HDFS from Storm and reading it back
     * @author cwc
     * @date 2018-06-04 15:28:12
     * @version 2.0.0 (second, improved revision)
     */
    public class HDFSMain {

        public static void main(String[] args) {
            writeHdfs(args);
    //        readHdfs(args);
    //        selectHdfs(args);
        }

        /**
         * Write data into HDFS from Storm.
         * @param args command-line arguments; the first one, if present, is the nimbus host
         */
        public static void writeHdfs(String[] args) {
            // Configure the HDFS bolt
            RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("\t"); // field delimiter for output records
            SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync to HDFS every 1000 tuples
            FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files every minute
            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPath("/testData/mytest/").withPrefix("app_").withExtension(".log"); // target directory, file prefix and extension
            HdfsBolt hdfsBolt = new HdfsBolt()
                    .withFsUrl("hdfs://172.18.130.100:8020")
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(format)
                    .withRotationPolicy(rotationPolicy)
                    .withSyncPolicy(syncPolicy);
            // configure & build topology
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-write", new TestSpout(), 5);
            builder.setBolt("hdfs-write", hdfsBolt, 2).shuffleGrouping("kafka-write");
            // submit topology
            Config conf = new Config();
            String name = HDFSMain.class.getSimpleName();
            if (args != null && args.length > 0) {
                String nimbus = args[0];
                conf.put(Config.NIMBUS_HOST, nimbus);
                conf.setNumWorkers(3);
                try {
                    StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(name, conf, builder.createTopology());
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                cluster.shutdown();
            }
        }

        /**
         * Read data from HDFS with the custom HdfsReadSpout.
         * @param args command-line arguments; the first one, if present, is the nimbus host
         */
        public static void readHdfs(String[] args) {
            // configure & build topology
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("hdfs-reader", new HdfsReadSpout(), 3);
            builder.setBolt("hdfs-read", new HDFSReadBolt(), 2).shuffleGrouping("hdfs-reader");
            // submit topology
            Config conf = new Config();
            String name = HDFSMain.class.getSimpleName();
            if (args != null && args.length > 0) {
                String nimbus = args[0];
                conf.put(Config.NIMBUS_HOST, nimbus);
                conf.setNumWorkers(3);
                try {
                    StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(name, conf, builder.createTopology());
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                cluster.shutdown();
            }
        }

        /**
         * The higher-level API: HdfsSpout reads the files of an entire HDFS directory in batch.
         * @param args command-line arguments; the first one, if present, is the nimbus host
         */
        public static void selectHdfs(String[] args) {
            // Instantiate a spout that reads text files
            HdfsSpout textReaderSpout = new HdfsSpout().setReaderType("text")
                    .withOutputFields(TextFileReader.defaultFields)
                    .setHdfsUri("hdfs://172.18.130.100:8020")     // HDFS URI
                    .setSourceDir("/testData/mytest")             // directory to read from
                    .setArchiveDir("/testData/mytest/done")       // fully processed files are moved here
                    .setBadFilesDir("/testData/mytest/badfiles"); // files that fail to parse are moved here
            // Create topology
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("hdfsspout", textReaderSpout, 2);
            builder.setBolt("hdfs-read", new HDFSReadBolt(), 2).shuffleGrouping("hdfsspout");
            // submit topology
            Config conf = new Config();
            String name = HDFSMain.class.getSimpleName();
            if (args != null && args.length > 0) {
                String nimbus = args[0];
                conf.put(Config.NIMBUS_HOST, nimbus);
                conf.setNumWorkers(3);
                try {
                    StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(name, conf, builder.createTopology());
                try {
                    Thread.sleep(100000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                cluster.shutdown();
            }
        }
    }
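
    The readHdfs and selectHdfs topologies above hand their tuples to an HDFSReadBolt that the post never shows. A minimal sketch of such a bolt is below; the class name comes from the topology code, but its behaviour (just print each incoming tuple and ack it) is my assumption, not the author's original:

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    /**
     * Hypothetical stand-in for the HDFSReadBolt referenced above:
     * it prints every line it receives and acks the tuple.
     */
    public class HDFSReadBolt extends BaseRichBolt {
        private static final long serialVersionUID = 1L;

        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // HdfsReadSpout declares the field "lines", while HdfsSpout's TextFileReader uses its own
            // default fields, so read the first value positionally to cover both cases
            System.out.println("read from hdfs: " + tuple.getValue(0));
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: nothing is emitted downstream
        }
    }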
    

    The Trident high-level API: before reading this part, look into how Trident differs from the plain spout/bolt approach (it processes tuples in micro-batches and manages state transactionally) and why you would want to use it. Keep thinking as you go.

    package com.sunsheen.jfids.bigdata.storm.demo.hdfs.trident;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.hdfs.trident.HdfsState;
    import org.apache.storm.hdfs.trident.HdfsStateFactory;
    import org.apache.storm.hdfs.trident.HdfsUpdater;
    import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.trident.format.FileNameFormat;
    import org.apache.storm.hdfs.trident.format.RecordFormat;
    import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentState;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.state.StateFactory;
    import org.apache.storm.tuple.Fields;
    
    import com.sunsheen.jfids.bigdata.storm.demo.count.TestSpout;
    import com.sunsheen.jfids.bigdata.storm.demo.hdfs.HDFSMain;
    
    /**
     * @author cwc
     * @date 2018-06-04
     * @description: writing to HDFS through the Trident API
     * @version 1.0.0
     */
    public class HdfsTridents {

        public static void main(String[] args) {

            Config conf = new Config();
            conf.setMaxSpoutPending(3);
            String name = HDFSMain.class.getSimpleName();
            if (args != null && args.length > 0) {
                // on a cluster
                try {
                    StormSubmitter.submitTopology(args[1], conf, buildTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                // local mode
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", conf, buildTopology());
                try {
                    Thread.sleep(100000);
                    cluster.shutdown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Build the StormTopology.
         * @return the topology
         */
        public static StormTopology buildTopology() {
            // Note: an earlier run produced empty files; the likely cause is that the spout's
            // declared fields or the field delimiter did not match what is configured below.
            TestSpout spout = new TestSpout();
            TridentTopology topology = new TridentTopology();
            Stream stream = topology.newStream("stream", spout); // create the stream
            Fields hdfsFields = new Fields("line", "values");    // field names expected from the spout

            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPrefix("trident")                  // file prefix
                    .withExtension(".txt")                  // file extension
                    .withPath("/testData/mytest/trident/"); // target directory

            RecordFormat recordFormat = new DelimitedRecordFormat()
                    .withFieldDelimiter("\t")               // field delimiter (the original "/t" was a typo)
                    .withFields(hdfsFields);

            FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB); // rotate once the file reaches this size

            HdfsState.Options options = new HdfsState.HdfsFileOptions()
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(recordFormat)
                    .withRotationPolicy(rotationPolicy)
                    .withFsUrl("hdfs://172.18.130.100:8020"); // HDFS namenode URL

            StateFactory factory = new HdfsStateFactory().withOptions(options); // create the state factory
            TridentState state = stream
                    .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields()); // this is the step that actually writes
            return topology.build();
        }
    }
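
    TestSpout (imported from com.sunsheen.jfids.bigdata.storm.demo.count) is not shown in the post. For the Trident write above to produce non-empty files, its declared fields must line up with the hdfsFields ("line", "values") passed to partitionPersist. A hypothetical stand-in that satisfies that contract could look like this; the emitted values are invented for illustration:

    import java.util.Map;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    /**
     * Hypothetical stand-in for the TestSpout used above: emits a counter as "line"
     * and a short text as "values", matching the hdfsFields of the Trident topology.
     */
    public class TestSpout extends BaseRichSpout {
        private static final long serialVersionUID = 1L;

        private SpoutOutputCollector collector;
        private long counter = 0;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            // emit one tuple per call; throttle a little so local runs stay readable
            this.collector.emit(new Values("line-" + counter, "value-" + counter));
            counter++;
            Utils.sleep(100);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // these names must match the Fields("line", "values") used in buildTopology()
            declarer.declare(new Fields("line", "values"));
        }
    }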
    

    The SequenceFile module:

    The spout class:

    
    
    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.Reader;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    /**
     * @author cwc
     * @date 2018-07-24
     * @version 1.0.0
     * @description: reads SequenceFile data
     */
    public class SquenceSpout extends BaseRichSpout {
        private static final long serialVersionUID = 1L;
        private SpoutOutputCollector collector;
        private String fileUrl = "hdfs://172.18.130.100:8020/testData/mytest/trident/Topology-1-0-1532415869703.seq";
        private boolean finished = false; // guard so the file is only read once

        /**
         * Constructor taking the file URL, so callers can pass it in directly.
         * @param fileUrl path of the SequenceFile to read
         */
        public SquenceSpout(String fileUrl) {
            this.fileUrl = fileUrl;
        }

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            // nextTuple() is called repeatedly; only read the file the first time
            if (!finished) {
                getSquence();
                finished = true;
            }
        }

        /**
         * Read the SequenceFile record by record and emit each entry.
         */
        public void getSquence() {
            Configuration conf = new Configuration();
            Path path = new Path(this.fileUrl);
            SequenceFile.Reader.Option option1 = Reader.file(path);
    //        SequenceFile.Reader.Option option2 = Reader.length(30); // limits how many bytes are read; leave it out unless you know the length, otherwise it throws
            SequenceFile.Reader reader = null;
            try {
                reader = new SequenceFile.Reader(conf, option1);
                Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
                long position = reader.getPosition();
                while (reader.next(key, value)) {
                    String syncSeen = reader.syncSeen() ? "*" : ""; // "*" if a sync point was just passed; for my data this was always empty
                    System.out.printf("[%s%s]\t%s\t%s%n", position, syncSeen, key, value);
                    this.collector.emit(new Values(position, syncSeen, key, value)); // emit downstream; a single composed value might be nicer
                    position = reader.getPosition(); // remember the position of the next record
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                IOUtils.closeStream(reader);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // four arbitrary field names matching the four values emitted above
            declarer.declare(new Fields("he", "llo", "wo", "rd"));
        }
    }
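
    The post never wires SquenceSpout into a topology. A minimal local-mode sketch is given below; the SquenceReadTopology and PrintBolt names and behaviour are my own assumptions, while the field names and the .seq path are the ones used above:

    import java.util.Map;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    /**
     * Hypothetical demo topology: read a SequenceFile with SquenceSpout and print each record.
     */
    public class SquenceReadTopology {

        /** Simple bolt that prints every tuple it receives and acks it. */
        public static class PrintBolt extends BaseRichBolt {
            private static final long serialVersionUID = 1L;
            private OutputCollector collector;

            @Override
            public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
                this.collector = collector;
            }

            @Override
            public void execute(Tuple tuple) {
                // fields declared by SquenceSpout: "he", "llo", "wo", "rd" = position, syncSeen, key, value
                System.out.println("position=" + tuple.getValue(0)
                        + " key=" + tuple.getValue(2) + " value=" + tuple.getValue(3));
                collector.ack(tuple);
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                // terminal bolt, emits nothing
            }
        }

        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("seq-spout",
                    new SquenceSpout("hdfs://172.18.130.100:8020/testData/mytest/trident/Topology-1-0-1532415869703.seq"), 1);
            builder.setBolt("print-bolt", new PrintBolt(), 1).shuffleGrouping("seq-spout");

            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("squence-read", conf, builder.createTopology());
            Thread.sleep(100000);
            cluster.shutdown();
        }
    }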
    

    The bolt class:

    
    
    import java.io.IOException;
    import java.net.URI;
    import java.util.Map;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.storm.hdfs.bolt.AbstractHdfsBolt;
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.hdfs.bolt.format.SequenceFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    import org.apache.storm.hdfs.common.AbstractHDFSWriter;
    import org.apache.storm.hdfs.common.Partitioner;
    import org.apache.storm.hdfs.common.SequenceFileWriter;
    import org.apache.storm.hdfs.common.rotation.RotationAction;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.tuple.Tuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * @author cwc
     * @date 2018-07-24
     * @version 1.0.0
     * @description: included for readers whose storm-hdfs dependency does not expose this class;
     *               it is essentially the SequenceFileBolt that ships with storm-hdfs
     */
    public class SequenceFileBolt extends AbstractHdfsBolt {
        private static final long serialVersionUID = 1L;

        private static final Logger LOG = LoggerFactory.getLogger(SequenceFileBolt.class); // logger
    
        private SequenceFormat format;
        private SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
        private transient SequenceFile.Writer writer;
    
        private String compressionCodec = "default";
        private transient CompressionCodecFactory codecFactory;
    
        public SequenceFileBolt() {
        }
    
        public SequenceFileBolt withCompressionCodec(String codec) {
            this.compressionCodec = codec;
            return this;
        }
    
        public SequenceFileBolt withFsUrl(String fsUrl) {
            this.fsUrl = fsUrl;
            return this;
        }
    
        public SequenceFileBolt withConfigKey(String configKey) {
            this.configKey = configKey;
            return this;
        }
    
        public SequenceFileBolt withFileNameFormat(FileNameFormat fileNameFormat) {
            this.fileNameFormat = fileNameFormat;
            return this;
        }
    
        public SequenceFileBolt withSequenceFormat(SequenceFormat format) {
            this.format = format;
            return this;
        }
    
        public SequenceFileBolt withSyncPolicy(SyncPolicy syncPolicy) {
            this.syncPolicy = syncPolicy;
            return this;
        }
    
        public SequenceFileBolt withRotationPolicy(FileRotationPolicy rotationPolicy) {
            this.rotationPolicy = rotationPolicy;
            return this;
        }
    
        public SequenceFileBolt withCompressionType(SequenceFile.CompressionType compressionType) {
            this.compressionType = compressionType;
            return this;
        }
    
        public SequenceFileBolt withTickTupleIntervalSeconds(int interval) {
            this.tickTupleInterval = interval;
            return this;
        }
    
        public SequenceFileBolt addRotationAction(RotationAction action) {
            this.rotationActions.add(action);
            return this;
        }
    
        public SequenceFileBolt withRetryCount(int fileRetryCount) {
            this.fileRetryCount = fileRetryCount;
            return this;
        }
    
        public SequenceFileBolt withPartitioner(Partitioner partitioner) {
            this.partitioner = partitioner;
            return this;
        }
    
        public SequenceFileBolt withMaxOpenFiles(int maxOpenFiles) {
            this.maxOpenFiles = maxOpenFiles;
            return this;
        }
    
        @Override
        public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
            LOG.info("Preparing Sequence File Bolt...");
            if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
    
            this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
            this.codecFactory = new CompressionCodecFactory(hdfsConfig);
        }
    
        @Override
        protected String getWriterKey(Tuple tuple) {
            return "CONSTANT";
        }
    
        @Override
        protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException {
            SequenceFile.Writer writer = SequenceFile.createWriter(
                this.hdfsConfig,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(this.format.keyClass()),
                SequenceFile.Writer.valueClass(this.format.valueClass()),
                SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec))
            );
    
            return new SequenceFileWriter(this.rotationPolicy, path, writer, this.format);
        }
    }

    SequenceFileTopology:

    
    
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat;
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
    import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    import org.apache.storm.hdfs.common.rotation.MoveFileAction;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.yaml.snakeyaml.Yaml;
    /**
     * @author cwc
     * @date 2018-07-24
     * @version 1.0.0
     * @description: writing SequenceFiles from a Storm topology
     */
    public class SequenceFileTopology {
        static final String SENTENCE_SPOUT_ID = "sentence-spout";
        static final String BOLT_ID = "Topology"; // the bolt id also becomes the prefix of the generated file names
        static final String TOPOLOGY_NAME = "test-topology";
    
        public static void main(String[] args) throws Exception {
            Config config = new Config();
            config.setNumWorkers(1);
    
            SentenceSpout spout = new SentenceSpout();
    
            // sync the filesystem after every 1000 tuples
            SyncPolicy syncPolicy = new CountSyncPolicy(1000);

            // rotate the file once it reaches 5 MB
            FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPath("/testData/mytest/trident/") // target directory
                    .withExtension(".seq");                // file extension

            // create the sequence format instance: key and value taken from the spout's tuple fields
            DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");

    //        // In the original storm-hdfs example these lines load extra HDFS settings from a local
    //        // YAML file passed as a program argument (see the usage string below), not from an HDFS URL:
    //        Yaml yaml = new Yaml();
    //        InputStream in = new FileInputStream(args[1]);
    //        Map<String, Object> yamlConf = (Map<String, Object>) yaml.load(in);
    //        in.close();
            Map<String, Object> yamlConf = new HashMap<String, Object>(); // empty custom HDFS settings
            config.put("hdfs.config", yamlConf); // made available to the bolt via withConfigKey below

            SequenceFileBolt bolt = new SequenceFileBolt()
                .withFsUrl("hdfs://172.18.130.100:8020")   // HDFS namenode URL
                .withConfigKey("hdfs.config")
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy)
                .withCompressionType(SequenceFile.CompressionType.RECORD)
                .withCompressionCodec("deflate")           // compression codec name
                .addRotationAction(new MoveFileAction().toDestination("/testData/mytest/seqOne/")); // after each rotation, move the finished file into this directory
    
            TopologyBuilder builder = new TopologyBuilder();
    
            builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
            // SentenceSpout --> SequenceFileBolt
            builder.setBolt(BOLT_ID, bolt, 4)
                   .shuffleGrouping(SENTENCE_SPOUT_ID);
    
            String topoName = TOPOLOGY_NAME;
            // cluster submission
    //        if (args.length == 3) {
    //            topoName = args[2];
    //        } else if (args.length > 3) {
    //            System.out.println("Usage: SequenceFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
    //            return;
    //        }
    //        StormSubmitter.submitTopology(topoName, config, builder.createTopology());
            // local test run
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topoName, config, builder.createTopology());
            try {
                Thread.sleep(100000);
                cluster.shutdown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void waitForSeconds(int seconds) {
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
            }
        }
    
    
        public static class SentenceSpout extends BaseRichSpout {
    
    		private static final long serialVersionUID = 1L;
    		
    		private ConcurrentHashMap<UUID, Values> pending;
            private SpoutOutputCollector collector;
            private String[] sentences = {
                "my dog has fleas",
                "i like cold beverages",
                "the dog ate my homework",
                "don't have a cow man",
                "i don't think i like fleas"
            };
            private int index = 0;
            private int count = 0;
            private long total = 0L;
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("sentence", "timestamp"));
            }
    
            public void open(Map config, TopologyContext context,
                             SpoutOutputCollector collector) {
                this.collector = collector;
                this.pending = new ConcurrentHashMap<UUID, Values>();
            }
    
            public void nextTuple() {
                Values values = new Values(sentences[index], System.currentTimeMillis());
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId);
                index++;
                if (index >= sentences.length) {
                    index = 0;
                }
                count++;
                total++;
                if (count > 20000) {
                    count = 0;
                    System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
                }
                Thread.yield();
            }
    
            public void ack(Object msgId) {
                //            System.out.println("ACK");
                this.pending.remove(msgId);
            }
    
            public void fail(Object msgId) {
                System.out.println("**** RESENDING FAILED TUPLE");
                this.collector.emit(this.pending.get(msgId), msgId);
            }
        }
    
        public static class MyBolt extends BaseRichBolt {
    
    		private static final long serialVersionUID = 1L;
    		
    //		private HashMap<String, Long> counts = null;
    		private HashMap<String, String> counts = null;
            private OutputCollector collector;
    
            public void prepare(Map config, TopologyContext context, OutputCollector collector) {
    //            this.counts = new HashMap<String, Long>();
            	this.counts = new HashMap<String,String>();
                this.collector = collector;
            }
    
            public void execute(Tuple tuple) {
                collector.ack(tuple);
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                // this bolt does not emit anything
            }
    
            @Override
            public void cleanup() {
            }
        }
    }

    TridentSequenceTopology:

    
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.hdfs.common.rotation.MoveFileAction;
    import org.apache.storm.hdfs.trident.HdfsState;
    import org.apache.storm.hdfs.trident.HdfsStateFactory;
    import org.apache.storm.hdfs.trident.HdfsUpdater;
    import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.trident.format.DefaultSequenceFormat;
    import org.apache.storm.hdfs.trident.format.FileNameFormat;
    import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentState;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.state.StateFactory;
    import org.apache.storm.tuple.Fields;
    import com.sunsheen.jfids.bigdata.storm.demo.hdfs.SequenceFileTopology.SentenceSpout;
    
    /**
     * @author cwc
     * @date 2018-06-05
     * @description: writing SequenceFiles through the Trident high-level API
     * @version 1.0.0
     */
    public class TridentSequenceTopology {

        /**
         * A Trident topology differs from a plain one mainly in its return type and in the use of a state factory.
         * @param hdfsUrl HDFS namenode URL
         * @return the topology
         */
        public static StormTopology buildTopology(String hdfsUrl) {

            SentenceSpout spout = new SentenceSpout();
            TridentTopology topology = new TridentTopology();
            Stream stream = topology.newStream("spout1", spout);

            Fields hdfsFields = new Fields("sentence", "timestamp");

            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPath("/testData/mytest/trident/")
                    .withPrefix("trident")
                    .withExtension(".seq");
            // rotation policy: rotate once the file reaches 5 MB
            FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

            HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
                    .withFileNameFormat(fileNameFormat)
                    .withSequenceFormat(new DefaultSequenceFormat("timestamp", "sentence"))
                    .withRotationPolicy(rotationPolicy)
                    .withFsUrl(hdfsUrl)
    //                .withConfigKey("hdfs.config")
                    .addRotationAction(new MoveFileAction().toDestination("/testData/mytest/trident111/"));

            StateFactory factory = new HdfsStateFactory().withOptions(seqOpts);
            TridentState state = stream
                    .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());

            return topology.build();
        }

        public static void main(String[] args) {
            String hdfsUrl = "hdfs://172.18.130.100:8020";
            Config conf = new Config();
            conf.setMaxSpoutPending(5);
            if (args != null && args.length > 0) {
                // on a cluster
                try {
                    StormSubmitter.submitTopology(args[1], conf, buildTopology(hdfsUrl));
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                // local mode
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", conf, buildTopology(hdfsUrl));
                try {
                    Thread.sleep(100000);
                    cluster.shutdown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    If anything is unclear, read the code again and keep thinking. Good luck, and keep at it.
