zoukankan      html  css  js  c++  java
  • Mysql增量写入Hdfs(二) --Storm+hdfs的流式处理

    一. 概述

    上一篇我们介绍了如何将数据从mysql抛到kafka,这次我们就专注于利用storm将数据写入到hdfs的过程,由于storm写入hdfs的可定制东西有些多,我们先不从kafka读取,而先自己定义一个Spout数据充当数据源,下章再进行整合。这里默认你是拥有一定的storm知识的基础,起码知道Spout和bolt是什么。

    写入hdfs可以有以下的定制策略:

    1. 自定义写入文件的名字
    2. 定义写入内容格式
    3. 满足给定条件后更改写入的文件
    4. 更改写入文件时触发的Action

    本篇会先说明如何用storm写入HDFS,写入过程一些API的描述,以及最后给定一个例子:

    storm每接收到10个Tuple后就会改变hdfs写入文件,新文件的名字就是第几次改变。

    ps:storm版本:1.1.1。Hadoop版本:2.7.4。

    接下来我们首先看看Storm如何写入HDFS。

    二.Storm写入HDFS

    Storm官方有提供了相应的API让我们可以使用。可以通过创建HdfsBolt以及定义相应的规则,即可写入HDFS 。

    首先通过maven配置依赖以及插件。

    
        <properties>
            <storm.version>1.1.1</storm.version>
        </properties>
    
        <dependencies>
    
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>${storm.version}</version>
                <!--<scope>provided</scope>-->
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>log4j-over-slf4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>commons-collections</groupId>
                <artifactId>commons-collections</artifactId>
                <version>3.2.1</version>
            </dependency>
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>15.0</version>
            </dependency>
    
            <!--hadoop模块-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.4</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.7.4</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs -->
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-hdfs</artifactId>
                <version>1.1.1</version>
                <!--<scope>test</scope>-->
            </dependency>
    
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>exec-maven-plugin</artifactId>
                    <version>1.2.1</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>exec</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <executable>java</executable>
                        <includeProjectDependencies>true</includeProjectDependencies>
                        <includePluginDependencies>false</includePluginDependencies>
                        <classpathScope>compile</classpathScope>
                        <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass>
                    </configuration>
                </plugin>
       
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>1.7</version>
                    <configuration>
                        <createDependencyReducedPom>true</createDependencyReducedPom>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass></mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    

    这里要提一下,如果要打包部署到集群上的话,打包的插件需要使用maven-shade-plugin这个插件,然后使用maven Lifecycle中的package打包。而不是用Maven-assembly-plugin插件进行打包。

    因为使用Maven-assembly-plugin的时候,会将所有依赖的包unpack,然后在pack,这样就会出现,同样的文件被覆盖的情况。发布到集群上的时候就会报No FileSystem for scheme: hdfs的错。

    然后是使用HdfsBolt写入Hdfs。这里来看看官方文档中的例子吧。

    // 使用 "|" 来替代 ",",来进行字符分割
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("|");
    
    // 每输入 1k 后将内容同步到 Hdfs 中
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    
    // 当文件大小达到 5MB ,转换写入文件,即写入到一个新的文件中
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    
    //当转换写入文件时,生成新文件的名字并使用
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("/foo/");
    
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl("hdfs://localhost:9000")
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);
    
    //生成该 bolt
    topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
            
    

    到这里就结束了。可以将HdfsBolt当作一个Storm中特殊一些的bolt即可。这个bolt的作用即使根据接收信息写入Hdfs。

    而在新建HdfsBolt中,Storm为我们提供了相当强的灵活性,我们可以定义一些策略,比如当达成某个条件的时候转换写入文件,新写入文件的名字,写入时候的分隔符等等。

    如果选择使用的话,Storm有提供部分接口供我们使用,但如果我们觉得不够丰富也可以自定义相应的类。下面我们看看如何控制这些策略吧。

    RecordFormat

    这是一个接口,允许你自由定义接收到内容的格式。

    public interface RecordFormat extends Serializable {
        byte[] format(Tuple tuple);
    }
    

    Storm提供了DelimitedRecordFormat,使用方法在上面已经有了。这个类默认的分割符是逗号",",而你可以通过withFieldDelimiter方法改变分隔符。
    如果你的初始分隔符不是逗号的话,那么也可以重写写一个类实现RecordFormat接口即可。

    FileNameFormat

    同样是一个接口。

    public interface FileNameFormat extends Serializable {
        void prepare(Map conf, TopologyContext topologyContext);
        String getName(long rotation, long timeStamp);
        String getPath();
    }
    

    Storm所提供的默认的是org.apache.storm.hdfs.format.DefaultFileNameFormat。默认人使用的转换文件名有点长,格式是这样的:

    {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}

    例如:

    MyBolt-5-7-1390579837830.txt

    默认情况下,前缀是空的,扩展标识是".txt"。

    SyncPolicy

    同步策略允许你将buffered data缓冲到Hdfs文件中(从而client可以读取数据),通过实现org.apache.storm.hdfs.sync.SyncPolicy接口:

    public interface SyncPolicy extends Serializable {
        boolean mark(Tuple tuple, long offset);
        void reset();
    }
    

    FileRotationPolicy

    这个接口允许你控制什么情况下转换写入文件。

    public interface FileRotationPolicy extends Serializable {
        boolean mark(Tuple tuple, long offset);
        void reset();
    }
    

    Storm有提供三个实现该接口的类:

    • 最简单的就是不进行转换的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy,就是什么也不干。

    • 通过文件大小触发转换的org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy。

    • 通过时间条件来触发转换的org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。

    如果有更加复杂的需求也可以自己定义。

    RotationAction

    这个主要是提供一个或多个hook,可加可不加。主要是在触发写入文件转换的时候会启动。

    public interface RotationAction extends Serializable {
        void execute(FileSystem fileSystem, Path filePath) throws IOException;
    }
    

    三.实现一个例子

    了解了上面的情况后,我们会实现一个例子,根据写入记录的多少来控制写入转换(改变写入的文件),并且转换后文件的名字表示当前是第几次转换。

    首先来看看HdfsBolt的内容:

            RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
            // sync the filesystem after every 1k tuples
            SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    //        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB);
            /** rotate file with Date,every month create a new file
             * format:yyyymm.txt
             */
            FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
            FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
            RotationAction action = new NewFileAction();
            HdfsBolt bolt = new HdfsBolt()
                    .withFsUrl("hdfs://127.0.0.1:9000")
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(format)
                    .withRotationPolicy(rotationPolicy)
                    .withSyncPolicy(syncPolicy)
                    .addRotationAction(action);
    

    然后分别来看各个策略的类。

    FileRotationPolicy

    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.tuple.Tuple;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * 计数以改变Hdfs写入文件的位置,当写入10次的时候,则更改写入文件,更改名字取决于 “TimesFileNameFormat”
     * 这个类是线程安全
     */
    
    public class CountStrRotationPolicy implements FileRotationPolicy {
    
    
        private SimpleDateFormat df = new SimpleDateFormat("yyyyMM");
    
        private String date =  null;
    
        private int count = 0;
    
        public CountStrRotationPolicy(){
            this.date =  df.format(new Date());
    //        this.date = df.format(new Date());
        }
    
    
        /**
         * Called for every tuple the HdfsBolt executes.
         *
         * @param tuple  The tuple executed.
         * @param offset current offset of file being written
         * @return true if a file rotation should be performed
         */
        @Override
        public boolean mark(Tuple tuple, long offset) {
            count ++;
            if(count == 10) {
                System.out.print("num :" +count + "   ");
                count = 0;
                return true;
    
            }
            else {
                return false;
            }
        }
    
        /**
         * Called after the HdfsBolt rotates a file.
         */
        @Override
        public void reset() {
    
        }
    
        @Override
        public FileRotationPolicy copy() {
            return new CountStrRotationPolicy();
        }
    
    
    }
    

    FileNameFormat

    
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.task.TopologyContext;
    
    import java.util.Map;
    
    /**
     * 决定重新写入文件时候的名字
     * 这里会返回是第几次转换写入文件,将这个第几次做为文件名
     */
    public class TimesFileNameFormat implements FileNameFormat {
        //默认路径
        private String path = "/storm";
        //默认后缀
        private String extension = ".txt";
        private Long times = new Long(0);
    
        public TimesFileNameFormat withPath(String path){
            this.path = path;
            return this;
        }
    
        @Override
        public void prepare(Map conf, TopologyContext topologyContext) {
        }
    
    
        @Override
        public String getName(long rotation, long timeStamp) {
            times ++ ;
            //返回文件名,文件名为更换写入文件次数
            return times.toString() + this.extension;
        }
    
        public String getPath(){
            return this.path;
        }
    }
    

    RotationAction

    
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.common.rotation.RotationAction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.net.URI;
    /**
        当转换写入文件时候调用的 hook ,这里仅写入日志。
     */
    public class NewFileAction implements RotationAction {
        private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);
    
    
    
        @Override
        public void execute(FileSystem fileSystem, Path filePath) throws IOException {
            LOG.info("Hdfs change the written file!!");
    
            return;
        }
    }
    

    OK,这样就大功告成了。通过上面的代码,每接收到10个Tuple后就会转换写入文件,新文件的名字就是第几次转换。

    完整代码包括一个随机生成字符串的Spout,可以到我的github上查看。

    StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo


    推荐阅读:
    Mysql 流增量写入 Hdfs(一) --从 mysql 到 kafka
    Spark SQL,如何将 DataFrame 转为 json 格式

  • 相关阅读:
    【算法•日更•第五十五期】知识扫盲:什么是卡常数?
    【算法•日更•第五十四期】知识扫盲:什么是operator?
    【算法•日更•第五十三期】知识扫盲:什么是积性函数?
    【原】无脑操作:Webstorm集成Git/Github
    【原】无脑操作:Markdown可以这样玩
    【原】无脑操作:Eclipse + Maven + jFinal + MariaDB 环境搭建
    【原】无脑操作:Centos 7后台运行及终止jar包程序
    【原】无脑操作:TypeScript环境搭建
    【原】无脑操作:Windows下搭建Kafka运行环境
    【原】无脑操作:Chrome浏览器安装Vue.js devtool
  • 原文地址:https://www.cnblogs.com/listenfwind/p/10111033.html
Copyright © 2011-2022 走看看