zoukankan      html  css  js  c++  java
  • 简易的分布式文件系统

    简易的分布式文件系统

    本来初期打算用Hadoop 2,可是后来有限的服务器部署了Solr Cloud,各种站点,发现资源不够了,近10T的文件,已经几乎把服务器的磁盘全部用光。想来想去,由于目前架构基于Scala的,所以还是用Scala Akka实现了一个简单版本的分布式文件系统。

     

    Scala版本是2.10.3:http://www.scala-lang.org,Akka版本是2.2.3:http://akka.io。

     

    所有文件随机放在不同的服务器上,在数据库中记录了文件存放的服务器IP地址、文件路径。在服务端部署基于Akka的简单文件服务,接收文件路径,读取并返回文件内容。调用者根据文件地址,去数据库中查找文件的服务IP地址和文件路径,根据得到的服务器IP地址,传入文件路径,调用该服务器的文件服务。

     

    以下是部分实现代码。

     

    1.文件服务参数

     

    复制代码
    1 case class PatentFulltextArgs(
    2   val url: String,
    3   val start: Int,
    4   val size: Int) {
    5 
    6 }
    复制代码

     

    2.文件服务Trait(有点像WCF中的服务契约)

     

    1 trait PatentFulltextService {
    2   def find(args: PatentFulltextArgs): Array[Byte]
    3 }

     

    3.文件服务实现

     

    复制代码
     1 class PatentFulltextServiceImpl extends PatentFulltextService with Disposable {
     2   def find(args: PatentFulltextArgs): Array[Byte] = {
     3     val list = ListBuffer[Byte]()
     4     val file = FileSystems.getDefault().getPath(args.url)
     5 
     6     using(Files.newInputStream(file)) { in =>
     7       {
     8         val bytes = new Array[Byte](args.size + 1)
     9         in.skip(args.start)
    10         in.read(bytes, 0, bytes.length)
    11 
    12         list ++= bytes
    13       }
    14     }
    15 
    16     list.toArray
    17   }
    18 }
    复制代码

     

    4.用户Akka Deploy发布的类

     

    复制代码
    class ServiceApplication extends Bootable {
      val system = ActorSystem("serivce", ConfigFactory.load.getConfig("service"))
      def startup() {
        TypedActor(system).typedActorOf(TypedProps[PatentFulltextServiceImpl], "patentfulltext")
      }
    
      def shutdown() {
        system.shutdown
      }
    }
    复制代码

     

    在这里,我使用的Akka的TypeActor,请参考:http://doc.akka.io/docs/akka/2.2.3/scala/typed-actors.html。

     

    以下是部署过程。

     

    把生成的jar包,发布在Akka的deploy目录下,根据需要修改Akka的配置文件目录config下的application.conf。以下是我配置的内容,仅供参考:

     

    actor {

     

    provider = "akka.remote.RemoteActorRefProvider"

     

     

    typed {

     

    # Default timeout for typed actor methods with non-void return type

     

    timeout = 6000s

     

    }

     

    }

     

    remote {

     

    transport = "akka.remote.netty.NettyRemoteTransport"

     

    netty.tcp {

     

      hostname = "服务端IP"

     

      port = 2552

     

    }

     

    客户端使用时只需要服务契约Trait和相关实体类,以下是我写的一个客户端调用的类,仅供参考:

     

    复制代码
     1 object RemoteService {
     2   val logger = LoggerFactory.getLogger(this.getClass())
     3   private var system: ActorSystem = null
     4 
     5   def apply(configFile: String) = {
     6     system = ActorSystem("RemoteService", ConfigFactory.parseFile(new File(configFile)))
     7   }
     8 
     9   def findPatentFulltext(serverIp: String, patentFulltextArgs: PatentFulltextArgs) = {
    10     TypedActor(system).typedActorOf(TypedProps[com.cloud.akka.service.model.PatentFulltextService], system.actorFor("akka.tcp://serivce@" + serverIp + ":2552/user/patentfulltext")).find(patentFulltextArgs)
    11 
    12   }
    13 
    14   def shutdown = {
    15     if (null != system) system.shutdown()
    16   }
    17 }}
    复制代码

     

    以下问题是我还没找到合适的解决办法:

     

    1.Akka无法传输大文件,即使修改配置,服务器可以返回,但是接收的客户端还会报错。我的解决方案是在客户端分块读取,然后合并。

     

    2.在客户端使用时,TypedActor没有找到使用ActorSelection构建,因为ActorFor是标记为Deprecated。

     

  • 相关阅读:
    beginAppearanceTransition
    runtime基础
    UIStoryboard跳转界面
    xcode所有版本下载地址
    UIImage添加滤镜
    苹果copy等其他的英文改成中文
    UITextField输入中文
    keyboard添加down按钮
    2020-07-08:mysql只有一个表a,什么情况下会造成死锁,解决办法是什么?
    2020-07-05:tcp和udp的区别和应用场景。如何实现断点续传?
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3465269.html
Copyright © 2011-2022 走看看