zoukankan      html  css  js  c++  java
  • 大数据综合项目DocCloud之需求分析与功能实现详细(续更)

    DocCloud项目需求

    项目背景:

    在一些大型企事业单位,工作中存在各种各样的工作文档,技术文档,规范等等。这些文档以word,xls,ppt,wps,pdf,txt存在。在此项目之前,文档的分享主要靠单位内部人员的互相发送。没有一个统一的平台对企业现存的各种文档进行统一管理。DocCloud项目提供了统一的文档管理平台。用户可以将文档上传至平台,所有其他用户可以在线查看此文档。同时满足搜索文档,分享,收藏等等一系列需求。在实践中,有百度文库,doc88,豆丁等公网项目。但是没有一个专门为企业用户服务的一个文档管理平台。

    项目需求:

    1.文档的统一存储

    2.文档的检索

    3.文档的在线预览

    4.文档分享

    5.文档推荐

    6.文档上传下载

    7.用户的注册,登录

    8.文档权限管理

    项目架构:

    HDFS+LibreOffice6.0+solr+nginx+flume+hive+springboot+jpa+js+html+css

    文档存储: HDFS

    文件存储:1.本地(linux)-web服务器 优缺点:内存小,但是存储方便

    2.ftp服务器(搭建一个存储文件的集群)优缺点:文档存储内存够,但是不能容错

    3.hdfs (hadoop集群)优缺点:可扩展、容错、分布式存储

    文档格式转换: LibreOffice6.0

    因为存储的内容不是纯文本,就是传统的io流不能用、需要变成纯文本文件(txt)

    doc、docx、ppt---→html---→txt

    使用LibreOffice6.0,不适用word的原因:1.没有接口(不开源)2.不能再linux上运行

    进程间通信:hadoop ipc

    全文检索: solr

    日志记录服务器:ngnix

    web日志采集:flume

    日志分析:hive

    webMvc:springboot

    持久层框架:jpa

    单元测试:junit4

    前端:css+html+js+jquery+bootstrap

    版本管理:svn

    依赖管理:maven

    开发环境:idea

    部署环境:linux

    数据库:mysql


     

    项目具体设计:

    1.文档的上传下载

    a.用户在前端点击上传按钮

    b.在本地选择上传文档

    c.开始上传

    b.服务端校验文件后缀是否符合文档格式。

    允许格式:doc,docx,ppt,pptx,xls,xlsx,pdf,txt

    目的:避免上传不能转码的文档如:exe,zip,….

    e.校验文档大小,允许128兆以下的文档上传。

    128M:为了使文档在hdfs是一个块的形式保存。

    f.计算文档的md5值,判断文档是否在文库中已经存在,如果存在,告知用户已经存在。

    g.不存在,则上传至hdfs,同时数据库中保存用户上传文档信息。

    数据保存在hdfs上,元数据保存在数据库mysql

    2.上传成功以后需要提交文档转换任务(主要功能如下)

          1>转换成html

           2>转换成pdf提取缩略图,页数

           3>提取文本 建立索引


    以下代码没有涉及前端、只是后台测试(使用Postman测试),部分参数都没有从session中获取,都是随机生成的

    没有软件的附上资源(下载双击安装就可以):链接:https://pan.baidu.com/s/1SibrDOB4GwkX4L0iw3nYTA 
    提取码:bisn

    一、功能一:上传文件/2018.10.29

    日志配置:

             在类名上添加注解

             @Slf4j

             直接在类中使用log记录日志

    文件上传:

             关键注解:

             @RequestParam("file") MultipartFile file

             获取上传文件名:

             file.getOriginalFilename()

     

    创建一个java项目DocCloud

    在DocCloud中创建model---->doccloudweb(模块名要小写)

    在module中选择Spring Initializr

    在创建module时,选择pom中的依赖如下,选完之后一路下一步

    core下选择DevTools、LomBok

    web下选择web

    sql下选择JPA、mysql

    NoSQL下选solr

    <dependencies>
        <!--数据相关依赖-持久层-->
        <!--<dependency>-->
            <!--<groupId>org.springframework.boot</groupId>-->
            <!--<artifactId>spring-boot-starter-data-jpa</artifactId>-->
        <!--</dependency>-->
        <!--全文检索-->
        <!--<dependency>-->
            <!--<groupId>org.springframework.boot</groupId>-->
            <!--<artifactId>spring-boot-starter-data-solr</artifactId>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--热部署-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!--数据库连接-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>                   
    </dependencies>

    <!--编译、打jar包-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    以上是创建项目选择的依赖,请添加以下外加依赖

    <!--上传文件到hdfs上-->

            <dependency>

                <groupId>org.apache.hadoop</groupId>

                <artifactId>hadoop-common</artifactId>

                <version>2.7.5</version>

            </dependency>

            <dependency>

                <groupId>org.apache.hadoop</groupId>

                <artifactId>hadoop-hdfs</artifactId>

                <version>2.7.5</version>

            </dependency>

    数据库创建Doc_cloud

    Doc:记录文件属性

    使用jpa---->java+persistence+api

    JPA是Java Persistence API的简称,中文名Java持久层API,是JDK 5.0注解或XML描述对象-关系表的映射关系,并将运行期的实体对象持久化到数据库中

    1.配置数据源application.properties

    #数据源配置
    spring.datasource.name=root
    spring.datasource.password=123
    #true:表示展示sql语句
    spring.jpa.show-sql=true
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.url=jdbc:mysql://localhost:3306/doc_cloud

    2..上传文件到hdfs上用到的core-site.xml

    <?xml version="1.0" encoding="UTF-8"?>

    <configuration>

        <property>

            <name>fs.defaultFS</name>

            <value>hdfs://master2:9000</value>

        </property>

        <property>

            <name>fs.hdfs.impl</name>

            <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>

        </property>

    </configuration>

    3.编写controller层-- DocController

    import com.zhiyou100.doccloudweb.service.DocService;

    import com.zhiyou100.doccloudweb.util.HdfsUtil;

    import com.zhiyou100.doccloudweb.util.MD5Util;

    import com.zhiyou100.doccloudweb.entity.Doc;

    import lombok.extern.slf4j.Slf4j;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Controller;

    import org.springframework.web.bind.annotation.RequestMapping;

    import org.springframework.web.bind.annotation.RequestParam;

    import org.springframework.web.bind.annotation.ResponseBody;

    import org.springframework.web.multipart.MultipartFile;

    import java.io.IOException;

    import java.text.SimpleDateFormat;

    import java.util.Date;

    import java.util.Optional;

    import java.util.Random;

    @Controller  //表示是controller层--业务层

    @RequestMapping("/doc")

    @Slf4j

    public class DocController {

        @Autowired

        private DocService docService;

        //定义合法的文件后缀类型

        public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};

        //定义文件最大大小

        public static final int DOC_MAX_SIZE = 128*1024*1024;

        //定义文件保存到hdfs上的根目录

        public static final String HOME="hdfs://192.168.228.13:9000/doccloud";

        @RequestMapping("/upload")

        @ResponseBody

        public String upload(@RequestParam("file") MultipartFile file){

            //判断是否是文件

            if (file.isEmpty()){

                return "file is empty";

            }

            //获取文件名

            String filename = file.getOriginalFilename();

            //以点分割-获取文件后缀

            String[] strings = filename.split("\.");

            if (strings.length==1){

                return "file does not has suffix";

            }

            String suffix = strings[1];

            log.info("doc suffix is {}",suffix);

            //1.判断文件后缀是否合法

            boolean flag = isSuffixLegal(suffix);

            if (!flag){

                return "file is illegal";

            }

            try {

                //2.判断文件大小是否合法

                byte[] bytes = file.getBytes();

                log.info("file size is {}",bytes.length);

                if (bytes.length>DOC_MAX_SIZE){

                    return "file is large,file Max size:"+DOC_MAX_SIZE;

                }

                //3.计算文档的MD5值

                String md5 = getMD5(bytes);

                log.info("file is md5 {} ",md5);

                //用户上传文件,保存到数据库

                //1.校验数据库中的md5值,判断数据库中是否存在

                Optional<Doc> doc = docService.findByMd5(md5);

                if (doc.isPresent()){

                    //2.如果存在,更新

                    // 2.1获取文件对象

                    Doc docEntity = doc.get();

                    //2.2设置文件更新的人

                    docEntity.setUserId(new Random().nextInt());

                    //2.3保存到数据库

                    docService.save(docEntity);

                }else {

                    //3.如果不存在,将文件元数据保存到数据库,将数据保存到hdfs

                    //3.1保存数据到hdfs

                    //3.1.1生成文件保存路径:HOME+当前时间

                    String date = getDate();

                    String dst = HOME+"/"+date+"/"+file.getOriginalFilename()+"/";

                    log.info("file dst {}",dst);

                    //3.1.2上传文件

                    HdfsUtil.upload(bytes,file.getOriginalFilename(),dst);

                    //3.2将元数据保存到数据库

                    //3.2.1创建一个文件对象

                    Doc docEntity = new Doc();

                    //3.2.2设置作者

                    docEntity.setUserId(new Random().nextInt());

                    //3.2.3设置备注

                    docEntity.setDocComment("hadoop");

                    //3.2.4设置文件路径

                    docEntity.setDocDir(dst);

                    //3.2.5设置文件名

                    docEntity.setDocName(filename);

                    //3.2.6设置文件大小

                    docEntity.setDocSize(bytes.length);

                    //3.2.7设置文件权限

                    docEntity.setDocPermission("1");

                    //3.2.8设置文件类型(后缀)

                    docEntity.setDocType(suffix);

                    //3.2.9设置文件状态

                    docEntity.setDocStatus("upload");

                    //3.2.10设置文件的md5值--保证文件的唯一性

                    docEntity.setMd5(md5);

                    //3.2.11设置文件创作时间

                    docEntity.setDocCreateTime(new Date());

                    //3.2.12保存元数据

                    docService.save(docEntity);

                }

            } catch (IOException e) {

                e.printStackTrace();

            }

            return "upload success";

        }

        /**

         * 获取当前是时间,用于文件的保存路径

         * @return

         */

        private String getDate() {

            Date date = new Date();

            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");

            return simpleDateFormat.format(date);

        }

        /**

         * 计算字节数组的MD5值

         * @param bytes

         * @return

         */

        private String getMD5(byte[] bytes) {

            return MD5Util.getMD5String(bytes);

        }

        /**

         * 判断文件后缀是否合法

         * @param suffix

         * @return

         */

        private boolean isSuffixLegal(String suffix) {

            for (String docsuffix :

                    DOC_SUFFIXS) {

                if (suffix.equals(docsuffix)){

                    return true;

                }

            }

            return false;

        }

    }

     

    4.编写业务层service层-- DocService

    import com.zhiyou100.doccloudweb.dao.DocRepository;

    import com.zhiyou100.doccloudweb.entity.Doc;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Service;

    import java.util.Optional;

    @Service

    public class DocService {

        @Autowired

        private DocRepository docRepository;

        //通过id获取文件对象

        public Optional<Doc> findById(int id) {

            return docRepository.findById(id);

        }

        //通过MD5获取文件对象

        public Optional<Doc> findByMd5(String md5) {

            return docRepository.findByMd5(md5);

        }

        //保存文件对象到数据库

        public void save(Doc docEntity) {

            docRepository.save(docEntity);

        }

    }

        //定义合法的文件后缀类型

        public static final String[] DOC_SUFFIXS= new String[]{"doc", "docx", "ppt", "pptx", "txt", "xls", "xlsx", "pdf"};

        //定义文件最大大小

        public static final int DOC_MAX_SIZE = 128*1024*1024;

        @RequestMapping("/doclist")

        @ResponseBody

        Doc doList(){

            Optional<Doc> id = docService.findById(1);

            return null;

        }

     

    5.dao层—持久层--DocRepository

    import com.zhiyou100.doccloudweb.entity.Doc;

    import org.springframework.data.jpa.repository.JpaRepository;

    import org.springframework.stereotype.Repository;

    import java.util.Optional;

    @Repository

    //Doc:表示定义的实体类,Integer:表示主键类型

    public interface DocRepository extends JpaRepository<Doc,Integer> {

        //利用反射机制自动识别

        Optional<Doc> findByMd5(String md5);

    }

     

    6.实体层—Doc

    import lombok.Data;

    import org.springframework.web.bind.annotation.ResponseBody;

    import javax.persistence.*;

    import java.util.Date;

    /**

     * 文件属性

     */

    @Entity

    @Table(name = "doc") //映射到数据库中的表

    @Data //get/set

    public class Doc {

     

        @Id //主键

        //告诉框架id生成策略(怎么生成)GenerationType.IDENTITY:表示自动生成

        @GeneratedValue(strategy = GenerationType.IDENTITY)

        private int id;

        @Column(name = "md5")//如果数据库字段与entity中字段名一样,则不用加此注解

        private String md5;

        @Column(name = "doc_name")

        private String docName;

        @Column(name = "doc_type")

        private String docType;

        @Column(name = "doc_status")

        private String docStatus;

        @Column(name = "doc_size")

        private int docSize;

        @Column(name = "doc_dir")

        private String docDir;

        @Column(name = "user_id")

        private int userId;

        @Column(name = "doc_create_time")

        private Date docCreateTime;

        @Column(name = "doc_comment")

        private String docComment;

        @Column(name = "doc_permission")

        private String docPermission;

    }

    7.工具类

    HdfsUtil 

    import com.google.common.io.Resources;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    import java.io.IOException;
    
    /*
    *@ClassName:HdfsUtil
     @Description:TODO
     @Author:
     @Date:2018/10/29 17:17 
     @Version:v1.0
    */
    public class HdfsUtil {
        //文档上传工具类
        public static void upload(byte[] src, String docName, String dst) throws IOException {
            //加载配置文件
            Configuration coreSiteConf = new Configuration();
            coreSiteConf.addResource(Resources.getResource("core-site.xml"));
            //获取文件系统客户端对象
            FileSystem fileSystem = FileSystem.get(coreSiteConf);
    
            FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(dst + "/" + docName));
    
            fsDataOutputStream.write(src);
            fsDataOutputStream.close();
            fileSystem.close();
        }
    }
    

    MD5Util 

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    
    public class MD5Util {
        protected static char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
        protected static MessageDigest messagedigest = null;
    
        /**
         * MessageDigest初始化
         *
         * @author
         */
        static {
            try {
                messagedigest = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                System.err.println("MD5FileUtil messagedigest初始化失败");
                e.printStackTrace();
            }
        }
    
        /**
         * 对文件进行MD5加密
         *
         * @author
         */
        public static String getFileMD5String(File file) throws IOException {
            FileInputStream in = new FileInputStream(file);
            FileChannel ch = in.getChannel();
            MappedByteBuffer byteBuffer = ch.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
            messagedigest.update(byteBuffer);
            return bufferToHex(messagedigest.digest());
        }
    
        /**
         * 对字符串进行MD5加密
         *
         * @author
         */
        public static String getMD5String(String s) {
            return getMD5String(s.getBytes());
        }
    
        /**
         * 对byte类型的数组进行MD5加密
         *
         * @author
         */
        public static String getMD5String(byte[] bytes) {
            messagedigest.update(bytes);
            return bufferToHex(messagedigest.digest());
        }
    
        private static String bufferToHex(byte bytes[]) {
            return bufferToHex(bytes, 0, bytes.length);
        }
    
        private static String bufferToHex(byte bytes[], int m, int n) {
            StringBuffer stringbuffer = new StringBuffer(2 * n);
            int k = m + n;
            for (int l = m; l < k; l++) {
                char c0 = hexDigits[(bytes[l] & 0xf0) >> 4];
                char c1 = hexDigits[bytes[l] & 0xf];
                stringbuffer.append(c0);
                stringbuffer.append(c1);
            }
            return stringbuffer.toString();
        }
    }
    

    二、功能:文档转换

    上传成功以后需要提交文档转换任务(主要功能如下)

          1>转换成html

           2>转换成pdf提取缩略图,页数

           3>提取文本 建立索引

    1.定义一个docjob对象,用于封装任务信息

    2.实现writable接口。因为要通过hadoop ipc序列化实现文档转换守护进程
    该进程的作用是完成存放在本节点文档的转换,索引的任务。

    1.文档转换成htm!通过runtime.exec执行命令来实现

    2.通过hadoop ipc来接受任务

    hadoop ipc
    hadoop ipc是一套hadoop自带的成熟的rpc框架,性能高,稳定性性强。

    server:
    a.服务端定义接口b.定义按口的实现类
    c用hadoop ipc暴露服务

    client;
    通过rpc.geproxy来调用服务岗的接口。


    下面创建的是服务端的代码: 

    1.新建模块--docservicedeamon文件转换守护进程

    2.pom文件中的依赖

    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.5</version>
    </dependency>
    <!--ipc通信模块-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.5</version>
    </dependency>
    <!--注解、-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
    <!--berklydb数据库依赖-->
    <!-- https://mvnrepository.com/artifact/com.sleepycat/je -->
    <dependency>
        <groupId>com.sleepycat</groupId>
        <artifactId>je</artifactId>
        <version>5.0.73</version>
    </dependency>
    <!--hdfs文件上传与下载-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.5</version>
    </dependency>

    3.配置core-site.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property>
            <name>fs.defaultFS</name>
            <value>hdfs://master2:9000</value>
        </property>
        <property>
            <name>fs.hdfs.impl</name>
            <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
        </property>
    </configuration>
    

    4.类一---DocJob

    import lombok.Data;
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.io.Serializable;
    
    /**
     *此方法用于封装任务信息
     */
    @Data
    public class DocJob implements Writable,Serializable {
        private static final long serialVersionUID = 12345678L;
        //任务id
        private int id;
        //任务名
        private String name;
        //任务类型
        private DocJobType jobType;
        //提交者
        private int userId;
        //提交时间
        private long submitTime;
        //完成时间
        private long finishTime;
        //任务状态
        private JobStatus jobStatus;
        //任务重试次数
        private int retryTime;
        //文档输入路径
        private String input;
        //任务输出路径
        private String output;
        //文件名
        private String fileName;
        public void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
            out.writeUTF(jobType.name());
            out.writeInt(userId);
            out.writeLong(finishTime);
            out.writeLong(submitTime);
            out.writeUTF(jobStatus.name());
            out.writeInt(retryTime);
            out.writeUTF(input);
            out.writeUTF(output);
            out.writeUTF(fileName);
        }
    
        public void readFields(DataInput in) throws IOException {
            id= in.readInt();
            name=in.readUTF();
            jobType=DocJobType.valueOf(in.readUTF());
            userId=in.readInt();
            finishTime=in.readLong();
            submitTime=in.readLong();
            jobStatus=JobStatus.valueOf(in.readUTF());
            retryTime=in.readInt();
            input=in.readUTF();
            output=in.readUTF();
            fileName=in.readUTF();
        }
    }
    

    5.类二、DocJobType

    /**
     * 项目的类型:文档转换、定义索引...
     */
    public enum DocJobType {
        DOC_JOB_CONVERT,DOC_JOB_CREATE_INDEX,DOC_JOB_UPDATE_INDEX
    }

    6.类三、JobStatus

    /**
     * 文档状态:准备、提交、运行、失败、完成
     */
    public enum JobStatus {
        PREPARE,SUBMIT,RUNNING,FAILED,SUCCEED,
    }
    

    7类四、.JobDaemonService

    /**
     * 服务端
     * 1.定义接口继承VersionedProtocol
     */
    public interface JobDaemonService extends VersionedProtocol {
        //定义通信间的暗号
        long versionID=1L;
        //定义提交方法
        void submitDocJob(DocJob job);
    
    }

    8.类五、JobDaemonServiceImpl

    import com.zhiyou100.doccloud.utils.BdbPersistentQueue;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.hadoop.ipc.ProtocolSignature;
    
    
    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 2.定义接口的实现类
     * 实现Runnable接口:是为了使用多线程处理
     */
    @Slf4j
    public class JobDaemonServiceImpl implements JobDaemonService,Runnable{
        //定义将hdfs下载到本地的目录的根路径
        private static final String WORK_DIR="/tmp/docjobdaemon/";
        //定义持久化对象
        public  BdbPersistentQueue<DocJob> queue;
        //定义线程池--多线程并行处理
        private ExecutorService pool = Executors.newFixedThreadPool(4);
        //定义一个标准-让线程运行
        private boolean flag = true;
    
        //构造方法:用于创建berkly数据库目录,并初始化持久化队列
        public JobDaemonServiceImpl(){
            //创建工作目录--本地保存路径
            File workDir = new File(WORK_DIR + "/" + "bdb/");
            if (!workDir.exists()){
                //如果不存在将创建
                workDir.mkdirs();
                System.out.println(workDir.getAbsolutePath());
            }
            //初始化持久化队列
            queue = new BdbPersistentQueue<DocJob>(WORK_DIR+"/"+"bdb/", "docjob", DocJob.class);
        }
    
        public void submitDocJob(DocJob job) {
            System.out.println(job);
            //将任务保存在序列化队列中,1.保证任务不丢失   2.并发控制,内存溢出
            log.info("receive job {}",job);
            queue.offer(job);
        }
    
        public long getProtocolVersion(String s, long l) throws IOException {
            return versionID;
        }
    
        public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
            return null;
        }
    
        @Override
        public void run() {
            while (flag){
                //将任务从序列化队列中取出任务,poll:每取出一个就从磁盘中移除一个
                DocJob docJob = queue.poll();
                //判断docjob中否为空
                if (docJob==null){
                    //为空,等待5000毫秒
                    try {
                        Thread.sleep(5000);
                        System.out.println("waiting for docjob");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }else {
                    pool.submit(new DocJobHandler(docJob));
                }
            }
        }
    }

    9.类六、Main

    import com.zhiyou100.doccloud.job.JobDaemonService;
    import com.zhiyou100.doccloud.job.JobDaemonServiceImpl;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    
    import java.io.IOException;
    
    /**
     * 守护进程--项目的入口类
     * 3.服务端:暴露端口
     */
    public class Main {
        public static void main(String[] args) throws IOException {
            //创建服务端接口实现类对象
            JobDaemonServiceImpl instance = new JobDaemonServiceImpl();
            //开启线程
            new Thread(instance).start();
    
            // 创建一个RPC builder
            RPC.Builder builder = new RPC.Builder(new Configuration());
    
            //指定RPC Server的参数
            builder.setBindAddress("localhost");
            builder.setPort(7788);
    
            //将自己的程序部署到server上
            builder.setProtocol(JobDaemonService.class);
            builder.setInstance(instance);
    
            //创建Server
            RPC.Server server = builder.build();
    
            //启动服务
            server.start();
        }
    }

    10.类七--DocJobHandler

    import com.zhiyou100.doccloud.utils.HdfsUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.io.IOUtils;
    
    import java.io.File;
    import java.io.IOException;
    import java.util.UUID;
    
    @Slf4j
    public class DocJobHandler implements Runnable {
        private DocJob docJob;
    
        public DocJobHandler(DocJob docJob) {
            this.docJob = docJob;
            log.info("start to deal job {}",docJob);
        }
    
        /**
         *将文件冲hdfs上下载到本地,再将文件格式转化成HTML,最终上传到hdfs上
         */
        @Override
        public void run() {
            //1.将hdfs上的文件下载到本地
            //1.1获取文件的下载路径(在hdfs上的位置)
            String input = docJob.getInput();
            //1.2创建目标路径(下载到本地的路径)
            String tmpWorkDirPath = "/tmp/docjobdaemon/" + UUID.randomUUID().toString() + "/";
            File tmpWorkDir = new File(tmpWorkDirPath);
            tmpWorkDir.mkdirs();
            System.out.println("tmpWorkDirPath: "+tmpWorkDirPath);
            //1.3下载文件到临时目录
            try {
                HdfsUtil.copyToLocal(input,tmpWorkDirPath);
                log.info("download file to {}",tmpWorkDirPath);
                //step1:将下载到本地的文件格式转化成HTML
                String command = "D:\soft\LibreOffice_6.0.6\program\soffice --headless --invisible --convert-to html " + docJob.getFileName();
                Process process = Runtime.getRuntime().exec(command, null, tmpWorkDir);
                //结果信息
                System.out.println(IOUtils.toString(process.getInputStream()));
                //错误信息
                System.out.println(IOUtils.toString(process.getErrorStream()));
                //step2 转换成pdf
                //step3 提取页码
                //step4 提取首页缩略图
                //step5 利用solr建立索引
                //step6 上传结果
                //step7 清理临时目录
                //step8 任务成功回调
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    }

    工具类、

    将hdfs上的文件下载到本地--HdfsUtil

    import com.google.common.io.Resources;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.net.URI;

    public class HdfsUtil {
        public static final String HOME="hdfs://192.168.228.13:9000/";
        //文档上传工具类
        public static void upload(byte[] src, String docName, String dst) throws IOException {
            //加载配置文件
            Configuration coreSiteConf = new Configuration();
            coreSiteConf.addResource(Resources.getResource("core-site.xml"));
            //获取文件系统客户端对象
            FileSystem fileSystem = FileSystem.get(coreSiteConf);

            FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(dst + "/" + docName));

            fsDataOutputStream.write(src);
            fsDataOutputStream.close();
            fileSystem.close();
        }

        /**
         * 将集群的问价下载到本地
         * @param dst
         * @param localPath
         * @throws IOException
         */
        public static void copyToLocal(String dst,String localPath) throws IOException {
            Configuration conf = new Configuration();
            conf.addResource(Resources.getResource("core-site.xml"));
            FileSystem fs = FileSystem.get(URI.create(dst),conf);
            fs.copyToLocalFile(new Path(dst),new Path(localPath));
            fs.close();
        }
    }

    持久化队列,基于BDB实现--BdbPersistentQueue&&BdbEnvironment

    
    import java.io.File;
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.AbstractQueue;
    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.apache.commons.io.FileUtils;
    
    import com.sleepycat.bind.EntryBinding;
    import com.sleepycat.bind.serial.SerialBinding;
    import com.sleepycat.bind.serial.StoredClassCatalog;
    import com.sleepycat.bind.tuple.TupleBinding;
    import com.sleepycat.collections.StoredMap;
    import com.sleepycat.collections.StoredSortedMap;
    import com.sleepycat.je.Database;
    import com.sleepycat.je.DatabaseConfig;
    import com.sleepycat.je.DatabaseException;
    import com.sleepycat.je.DatabaseExistsException;
    import com.sleepycat.je.DatabaseNotFoundException;
    import com.sleepycat.je.EnvironmentConfig;
    /**
     * 持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭
     * 相比一般的内存Queue,插入和获取值需要多消耗一定的时间
     * 这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可,
     * 其他如remove,addAll,AbstractQueue会基于这几个方法去实现
     *
     * @contributor 
     * @param <E>
     */
    public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements
            Serializable {
        private static final long serialVersionUID = 3427799316155220967L;
        private transient BdbEnvironment dbEnv;            // 数据库环境,无需序列化
        private transient Database queueDb;             // 数据库,用于保存值,使得支持队列持久化,无需序列化
        private transient StoredMap<Long,E> queueMap;   // 持久化Map,Key为指针位置,Value为值,无需序列化
        private transient String dbDir;                 // 数据库所在目录
        private transient String dbName;                // 数据库名字
        //AtomicLong:元子类型,线程安全
        //i++线程不安全
        private AtomicLong headIndex;                   // 头部指针
        private AtomicLong tailIndex;                   // 尾部指针
        private transient E peekItem=null;              // 当前获取的值
    
        /**
         * 构造函数,传入BDB数据库
         *
         * @param db
         * @param valueClass
         * @param classCatalog
         */
        public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){
            this.queueDb=db;
            this.dbName=db.getDatabaseName();
            headIndex=new AtomicLong(0);
            tailIndex=new AtomicLong(0);
            bindDatabase(queueDb,valueClass,classCatalog);
        }
        /**
         * 构造函数,传入BDB数据库位置和名字,自己创建数据库
         *
         * @param dbDir
         * @param dbName
         * @param valueClass
         */
        public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){
            //headIndex=new AtomicLong(0);
            //tailIndex=new AtomicLong(0);
            this.dbDir=dbDir;
            this.dbName=dbName;
            createAndBindDatabase(dbDir,dbName,valueClass);
        }
        /**
         * 绑定数据库
         *
         * @param db
         * @param valueClass
         * @param classCatalog
         */
        public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){
            EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
            if(valueBinding == null) {
                valueBinding = new SerialBinding<E>(classCatalog, valueClass);   // 序列化绑定
            }
            queueDb = db;
            queueMap = new StoredSortedMap<Long,E>(
                    db,                                             // db
                    TupleBinding.getPrimitiveBinding(Long.class),   //Key 序列化类型
                    valueBinding,                                   // Value
                    true);                                          // allow write
            //todo
            Long firstKey = ((StoredSortedMap<Long, E>) queueMap).firstKey();
            Long lastKey = ((StoredSortedMap<Long, E>) queueMap).lastKey();
    
            headIndex=new AtomicLong(firstKey == null ? 0 : firstKey);
            tailIndex=new AtomicLong(lastKey==null?0:lastKey+1);
        }
        /**
         * 创建以及绑定数据库
         *
         * @param dbDir
         * @param dbName
         * @param valueClass
         * @throws DatabaseNotFoundException
         * @throws DatabaseExistsException
         * @throws DatabaseException
         * @throws IllegalArgumentException
         */
        private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException,
                DatabaseExistsException,DatabaseException,IllegalArgumentException{
            File envFile = null;
            EnvironmentConfig envConfig = null;
            DatabaseConfig dbConfig = null;
            Database db=null;
    
            try {
                // 数据库位置
                envFile = new File(dbDir);
    
                // 数据库环境配置
                envConfig = new EnvironmentConfig();
                envConfig.setAllowCreate(true);
                //不支持事务
                envConfig.setTransactional(false);
    
                // 数据库配置
                dbConfig = new DatabaseConfig();
                dbConfig.setAllowCreate(true);
                dbConfig.setTransactional(false);
                //是否要延迟写
                dbConfig.setDeferredWrite(true);
    
                // 创建环境
                dbEnv = new BdbEnvironment(envFile, envConfig);
                // 打开数据库
                db = dbEnv.openDatabase(null, dbName, dbConfig);
                // 绑定数据库
                bindDatabase(db,valueClass,dbEnv.getClassCatalog());
    
            } catch (DatabaseNotFoundException e) {
                throw e;
            } catch (DatabaseExistsException e) {
                throw e;
            } catch (DatabaseException e) {
                throw e;
            } catch (IllegalArgumentException e) {
                throw e;
            }
    
    
        }
    
        /**
         * 值遍历器
         */
        @Override
        public Iterator<E> iterator() {
            return queueMap.values().iterator();
        }
        /**
         * 大小
         */
        @Override
        public int size() {
            synchronized(tailIndex){
                synchronized(headIndex){
                    return (int)(tailIndex.get()-headIndex.get());
                }
            }
        }
    
        /**
         * 插入值
         */
        @Override
        public boolean offer(E e) {
            synchronized(tailIndex){
                queueMap.put(tailIndex.getAndIncrement(), e);// 从尾部插入
                //将数据不保存在缓冲区,直接存入磁盘
                dbEnv.sync();
            }
            return true;
        }
    
        /**
         * 获取值,从头部获取
         */
        @Override
        public E peek() {
            synchronized(headIndex){
                if(peekItem!=null){
                    return peekItem;
                }
                E headItem=null;
                while(headItem==null&&headIndex.get()<tailIndex.get()){ // 没有超出范围
                    headItem=queueMap.get(headIndex.get());
                    if(headItem!=null){
                        peekItem=headItem;
                        continue;
                    }
                    headIndex.incrementAndGet();    // 头部指针后移
                }
                return headItem;
            }
        }
    
        /**
         * 移出元素,移出头部元素
         */
        @Override
        public E poll() {
            synchronized(headIndex){
                E headItem=peek();
                if(headItem!=null){
                    queueMap.remove(headIndex.getAndIncrement());
                    //从磁盘上移除
                    dbEnv.sync();
                    peekItem=null;
                    return headItem;
                }
            }
            return null;
        }
    
        /**
         * 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境
         */
        public void close(){
            try {
                if(queueDb!=null){
                    queueDb.sync();
                    queueDb.close();
                }
            } catch (DatabaseException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (UnsupportedOperationException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        /**
         * 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close()
         */
        @Override
        public void clear() {
            try {
                close();
                if(dbEnv!=null&&queueDb!=null){
                    dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName);
                    dbEnv.close();
                }
            } catch (DatabaseNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (DatabaseException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally{
                try {
                    if(this.dbDir!=null){
                        FileUtils.deleteDirectory(new File(this.dbDir));
                    }
    
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    
    import java.io.File;
    
    import com.sleepycat.bind.serial.StoredClassCatalog;
    import com.sleepycat.je.Database;
    import com.sleepycat.je.DatabaseConfig;
    import com.sleepycat.je.DatabaseException;
    import com.sleepycat.je.Environment;
    import com.sleepycat.je.EnvironmentConfig;
    /**
     * BDB数据库环境,可以缓存StoredClassCatalog并共享
     *
     * @contributor 
     */
    public class BdbEnvironment extends Environment {
        StoredClassCatalog classCatalog;
        Database classCatalogDB;
    
        /**
         * Constructor
         *
         * @param envHome 数据库环境目录
         * @param envConfig config options  数据库换纪念馆配置
         * @throws DatabaseException
         */
        public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
            super(envHome, envConfig);
        }
    
        /**
         * 返回StoredClassCatalog
         * @return the cached class catalog
         */
        public StoredClassCatalog getClassCatalog() {
            if(classCatalog == null) {
                DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setAllowCreate(true);
                try {
                    //事务、数据库名、配置项
                    classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
                    classCatalog = new StoredClassCatalog(classCatalogDB);
                } catch (DatabaseException e) {
                    // TODO Auto-generated catch block
                    throw new RuntimeException(e);
                }
            }
            return classCatalog;
        }
    
        @Override
        public synchronized void close() throws DatabaseException {
            if(classCatalogDB!=null) {
                classCatalogDB.close();
            }
            super.close();
        }
    
    }

    客户端--doccloudweb

    1.将上面客户端的DocJob、DocJobType、JobDeamonService、JobStatus类复制到客户端

    2.将DocController中接着添加

    //上传成功以后需要提交文档转换任务
    //转换成html,
    submitDocJob(docEntity,new Random().nextInt());
    /**
     * 提交任务到集群上运行--文档转换任务
     * @param docEntity
     * @param userId
     */
    private void submitDocJob(Doc docEntity, int userId) throws IOException {
        //创建一个文档转换任务对象
        DocJob docJob = new DocJob();
        //1.设置提交者
        docJob.setUserId(userId);
        //2.设置任务名
        docJob.setName("doc convent");
        //3.任务的状态
        docJob.setJobStatus(JobStatus.SUBMIT);
        //4.设置任务类型
        docJob.setJobType(DocJobType.DOC_JOB_CONVERT);
        //5.设置提交时间
        docJob.setSubmitTime(System.nanoTime());
        //6.设置输入路径
        docJob.setInput(docEntity.getDocDir()+"/"+docEntity.getDocName());
        //7.设置输出路径
        docJob.setOutput(docEntity.getDocDir());
        //8.设置重试次数
        docJob.setRetryTime(4);
        //9.设置文件名
        docJob.setFileName(docEntity.getDocName());
        //todo 将job元数据保存到数据库
        //获取动态代理对象
        JobDaemonService jobDaemonService = RPC.getProxy(JobDaemonService.class, 1L, new InetSocketAddress("localhost", 7788), new Configuration());
        //提交任务到服务器(hdfs上)
        log.info("submit job:{}",docJob);
        jobDaemonService.submitDocJob(docJob);
    
    }

    将上传到hdfs上的文件下载到本地,将下载的文件转化为HTML(通过runtime调用exec来执行命令)并保存到本地(客户端提交任务到服务器)通过hadoop IPC来接受任务。将任务保存在序列化队列中,1.保证任务不丢失 2.并发控制,内存溢出

    ---------------------------<待更>-------------------------


     


     


     


     


     


     


     


     


     


     

     

  • 相关阅读:
    堆和栈的区别
    熟悉熟悉常用的几个算法用JS的实现
    JS设置CSS样式的几种方式
    javascript的基本语法、数据结构
    DOM的概念及子节点类型
    三列自适应布局
    Javascript中括号“[]”的多义性
    Javascript中大括号“{}”的多义性
    Javascript小括号“()”的多义性
    JavaScript 开发者经常忽略或误用的七个基础知识点
  • 原文地址:https://www.cnblogs.com/pigdata/p/10305583.html
Copyright © 2011-2022 走看看