  • Writing from Storm to HDFS

    spout

    package com.heibaiying.component;
    
    import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.*;
    
    
    /**
     * Data source (spout) that produces sample lines of words
     */
    public class DataSourceSpout extends BaseRichSpout {
    
        private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
    
        private SpoutOutputCollector spoutOutputCollector;
    
        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }
    
        @Override
        public void nextTuple() {
            // Generate a line of mock data
            String lineData = productData();
            spoutOutputCollector.emit(new Values(lineData));   // emit the line to the downstream bolt
            Utils.sleep(1000);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("line"));                            // name of the single output field consumed downstream
        }
    
    
        /**
         * Generate mock data: a random-length, tab-separated prefix of the shuffled word list
         */
        private String productData() {
            Collections.shuffle(list);                                         // shuffle the list into a random order
            Random random = new Random();                                      // random number generator
            int endIndex = random.nextInt(list.size()) % (list.size()) + 1;    // random end index in [1, list.size()]
            return StringUtils.join(list.toArray(), "\t", 0, endIndex);        // join elements [0, endIndex) with a tab separator
        }
    
    }

    topology (configuring the built-in HdfsBolt)

    package com.heibaiying;
    
    import com.heibaiying.component.DataSourceSpout;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.hdfs.bolt.format.RecordFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
    import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * Stores the sample data in HDFS
     */
    public class DataToHdfsApp {
    
        private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
        private static final String HDFS_BOLT = "hdfsBolt";
    
        public static void main(String[] args) {
    
            // Set the Hadoop user name. Without it, creating directories on HDFS may fail with a permission error (RemoteException: Permission denied)
            System.setProperty("HADOOP_USER_NAME", "root");
    
            // Delimiter between output fields
            RecordFormat format = new DelimitedRecordFormat()
                    .withFieldDelimiter("|");
    
            // Sync policy: flush buffered data to HDFS every 100 tuples
            SyncPolicy syncPolicy = new CountSyncPolicy(100);
    
            // Rotation policy: cap each file at 1 MB; when the limit is reached, roll over to a new file and keep writing
            FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);
    
            // Output path on HDFS
            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPath("/storm-hdfs/");
    
            // Configure the HdfsBolt
            HdfsBolt hdfsBolt = new HdfsBolt()
                    .withFsUrl("hdfs://192.168.199.125:9000")
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(format)
                    .withRotationPolicy(rotationPolicy)
                    .withSyncPolicy(syncPolicy);
    
    
            // Build the topology
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
            // save to HDFS
            builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);
    
    
            // If the first argument is "cluster", submit to the remote cluster; otherwise run in local mode
            if (args.length > 0 && args[0].equals("cluster")) {
                try {
                    StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("LocalDataToHdfsApp",
                        new Config(), builder.createTopology());
            }
        }
    }
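
    To sanity-check the result, the files that the HdfsBolt has written under /storm-hdfs/ can be listed with the Hadoop FileSystem API (hadoop-common is already on the classpath via the pom below). This is a minimal sketch, assuming the same NameNode address hdfs://192.168.199.125:9000 and output path used in DataToHdfsApp; the class name ListHdfsOutput is just for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;

    public class ListHdfsOutput {

        public static void main(String[] args) throws Exception {
            // Same user and NameNode address as DataToHdfsApp (adjust to your cluster)
            System.setProperty("HADOOP_USER_NAME", "root");
            FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.199.125:9000"), new Configuration());

            // List the files written (and rotated) by the HdfsBolt so far
            for (FileStatus status : fs.listStatus(new Path("/storm-hdfs/"))) {
                System.out.println(status.getPath() + "  " + status.getLen() + " bytes");
            }
            fs.close();
        }
    }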

    pom

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.heibaiying</groupId>
        <artifactId>storm-hdfs-integration</artifactId>
        <version>1.0</version>
    
        <properties>
            <storm.version>1.2.2</storm.version>
        </properties>
    
        <repositories>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>${storm.version}</version>
            </dependency>
        <!-- Storm HDFS integration -->
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-hdfs</artifactId>
                <version>${storm.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.6.0-cdh5.15.2</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.0-cdh5.15.2</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.6.0-cdh5.15.2</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>
    
    
        <build>
            <plugins>
            <!-- Compile with Java 8 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
    
            <!-- Package with the maven-shade-plugin -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <configuration>
                        <createDependencyReducedPom>true</createDependencyReducedPom>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.sf</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.dsa</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                    <exclude>META-INF/*.rsa</exclude>
                                    <exclude>META-INF/*.EC</exclude>
                                    <exclude>META-INF/*.ec</exclude>
                                    <exclude>META-INF/MSFTSIG.SF</exclude>
                                    <exclude>META-INF/MSFTSIG.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.storm:storm-core</exclude>
                            </excludes>
                        </artifactSet>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>

    test

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;
    
    /**
     * Created with IntelliJ IDEA.
     * User: @别慌
     * Date: 2019-07-07
     * Time: 22:00
     * Description:
     */
    public class test {
    
        public static void main(String arg[]) {
    
            List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
            Collections.shuffle(list);            // shuffle the list into a random order
            for (int i = 0; i < list.size(); i++) {
                System.out.println(list.get(i));
            }
            System.out.println("+-------------------------------------------------------+");
            System.out.println("+-------------------------------------------------------+");
    
            Random random = new Random();
            int endIndex = random.nextInt(list.size()) % (list.size()) + 1;         // random end index in [1, list.size()]
            System.out.println(endIndex);
    
            System.out.println(3 % 1);   // 3 % 1 is 0: the remainder of 3 divided by 1
        }
    }
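
    As a quick check of the endIndex expression above: Random.nextInt(n) already returns a value in [0, n - 1], so the extra "% list.size()" changes nothing, and the "+ 1" shifts the range to [1, list.size()], guaranteeing that at least one word is joined. A small sketch of that reasoning (assuming list.size() == 6, as in the spout; the class name EndIndexCheck is just for illustration):

    import java.util.Random;

    public class EndIndexCheck {

        public static void main(String[] args) {
            Random random = new Random();
            int n = 6;   // list.size() in DataSourceSpout
            for (int i = 0; i < 10; i++) {
                // nextInt(n) is already in [0, n - 1]; "% n" is a no-op and "+ 1" gives [1, n]
                int endIndex = random.nextInt(n) % n + 1;
                System.out.println(endIndex);
            }
        }
    }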
  • Original post: https://www.cnblogs.com/tangsonghuai/p/11148482.html