  • WebMagic: Writing Crawled Data to HDFS

    • Connecting to HDFS
    Add the following Maven dependencies:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.5</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    Connection code:

    Configuration config = new Configuration();
    // connect to the NameNode at 192.168.146.110:9000 as user "root"
    FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.146.110:9000"), config, "root");
    Note: make sure you import the correct packages!!!

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
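
    Putting the imports and the connection call together, a minimal self-contained sketch might look like this (the NameNode address 192.168.146.110:9000 and the root user are the values from this post; listing / is just an assumed way to verify the connection):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsConnectDemo {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            // connect to the NameNode as user "root"
            FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.146.110:9000"), config, "root");
            // list the root directory to confirm the connection works
            for (FileStatus status : fs.listStatus(new Path("/"))) {
                System.out.println(status.getPath());
            }
            fs.close();
        }
    }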
     

    The following error came up during development:

    maven: java.lang.NoClassDefFoundError: org/apache/commons/io/Charsets
    Cause: commons-io 2.2 does not contain the Charsets class, but commons-io 2.5 does, so upgrade the commons-io version:

    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.5</version>
    </dependency>
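
    To confirm the upgrade took effect, a quick throwaway check (my suggestion, not part of the crawler itself) is to reference the class directly:

    // fails with ClassNotFoundException if an old commons-io is still on the classpath
    Class.forName("org.apache.commons.io.Charsets");
    // Charsets.UTF_8 exists from commons-io 2.3 onward; this prints "UTF-8"
    System.out.println(org.apache.commons.io.Charsets.UTF_8);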
     

    • Writing the crawled data to a TXT file in HDFS
    Wrap the connection in a method:
    public static FileSystem connect() throws IOException, InterruptedException {
        Configuration config = new Configuration();
        // connect to the NameNode as user "root"
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.146.110:9000"), config, "root");
        return fs;
    }
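
    A caller of connect() might look like this; creating /hadoop/Zbgg.txt up front is an assumption on my part, since the append code later in this post requires the file to exist:

    FileSystem fs = connect();
    Path file = new Path("/hadoop/Zbgg.txt");
    // create the target file once so that later append() calls succeed
    if (!fs.exists(file)) {
        fs.create(file).close();
    }
    fs.close();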
    Wrap the append logic in a method. Two problems came up along the way.

    1. Datanode replacement failure
    Cause: appending from the client fails with the following error:

    java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[192.168.146.110:50010,DS-34d5abfb-b9c0-4aa7-aaa3-753010d9b3b6,DISK]], original=[DatanodeInfoWithStorage[192.168.146.110:50010,DS-34d5abfb-b9c0-4aa7-aaa3-753010d9b3b6,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
    Solution:
    • Add the corresponding properties to hdfs-site.xml on the cluster (a sketch follows after this list), or
    • set them programmatically in the Configuration when wrapping the method:

    config.set("dfs.support.append", "true");
    config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
    config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
    2. The file system does not support append

    Cause: ChecksumFileSystem does not support append; the file system HDFS actually uses is DistributedFileSystem. These are two different file systems with different implementations, so DistributedFileSystem supports append() while ChecksumFileSystem does not.
    Solution: cast the FileSystem to DistributedFileSystem.
    The cast then failed. Cause: the FileSystem the program obtained was org.apache.hadoop.fs.LocalFileSystem, not org.apache.hadoop.hdfs.DistributedFileSystem, so the cast throws a ClassCastException. The fs.default.name parameter defaults to file:///, which is why FileSystem fs = FileSystem.get(conf); returns a LocalFileSystem. You need to set fs.default.name in the Configuration.
    Solution: conf.set("fs.default.name", "hdfs://master:9000"); (fs.default.name is the deprecated spelling; on current Hadoop versions the key is fs.defaultFS)
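
    A quick hypothetical debugging line to see which implementation you actually got:

    // prints org.apache.hadoop.fs.LocalFileSystem when fs.default.name is left at file:///
    System.out.println(FileSystem.get(config).getClass().getName());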

    The complete code:

    public static void write(String str) {
        Configuration config = new Configuration();
        config.set("fs.default.name", "hdfs://192.168.146.110:9000");
        config.set("dfs.support.append", "true");
        config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");

        try {
            DistributedFileSystem fs = (DistributedFileSystem) DistributedFileSystem.get(config);
            // the target file must already exist, otherwise append() throws an exception
            FSDataOutputStream outputStream = fs.append(new Path("/hadoop/Zbgg.txt"));

            // System.getProperty("line.separator") yields the platform line break
            outputStream.writeBytes(System.getProperty("line.separator"));
            outputStream.writeBytes(str);
            outputStream.close();
            fs.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
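
    Since fs.append() fails when the target file does not exist, a variant that creates the file on first write may be handy. This is my sketch, not from the original post; the name writeOrCreate is made up, and try-with-resources handles the closing:

    public static void writeOrCreate(String str) {
        Configuration config = new Configuration();
        config.set("fs.default.name", "hdfs://192.168.146.110:9000");
        config.set("dfs.support.append", "true");
        config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");

        Path file = new Path("/hadoop/Zbgg.txt");
        try (FileSystem fs = FileSystem.get(config)) {
            if (!fs.exists(file)) {
                fs.create(file).close(); // first run: create an empty file
            }
            try (FSDataOutputStream out = fs.append(file)) {
                out.writeBytes(System.getProperty("line.separator"));
                out.writeBytes(str);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }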
  • Original post: https://www.cnblogs.com/wf1647790534/p/9802967.html