zoukankan      html  css  js  c++  java
  • Run a task only once in (akka) cluster

    在stackOverflow网站上看到这一提问,下文是部分摘抄问题简述:

    Java cluster, run task only once

    We have a java process, which listen's to a directory X on the file system using apache commons vfs. Whenever a new file is exported to this directory, our process kicks in. We first rename the file to filename.processing and parse the file name, get some information from the file and insert into tables, before sending this file to a Document management system. This is a single-threaded application per cluster. Now consider this running in a cluster environment, we have 5 server's. So 5 different VM's are trying to get access of the same file. The whole implementation was on the basis that only one process can rename the file to .processing at a given time, as OS will not allow multiple processes modifying the file at the same time. Once a cluster get's holds and renames file to .processing, other cluster's will ignore files which are of format .processing.

    上述问题简而言之,就是假设软件里有一个监听某个文件变化以及命名服务,这在单机部署时一点问题都没有,但是当多机集群时,如果不加额外逻辑限制,那么每个集群节点都会去执行多个监听以及命名服务,对于文件系统,这是非常危险的。
    还有比如,软件里存在一个定时发送email的服务,单机部署,那么接收方最多收到一份email。当多机集群部署时,如果不去额外限制,接收方应当接收到多个email。
    问题是这个额外的逻辑限制应当怎么样去设计呢?
    解决方案当然不止一种。

    Via a shared database

    第一种你可以利用数据库,建立job表。
    找出需要执行的job,然后去执行它,大致逻辑类似如此。

    SELECT *
    FROM jobs
    WHERE state = 'NotRun'
      ORDER BY run_time ASC
    
    UPDATE jobs
    SET state = 'Running'
    WHERE job_id = :id
      AND state = 'NotRun'
    

    Via a cluster singleton

    借助集群单例即可。
    我们来看看akka-cluster中对cluster singleton的描述。

    For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster.

    我们知道akka的架构是树状层级结构。就是说每一个节点都对应着一个path。当akka.actor.providerLocalProvider是,ActorPath是从/user/somePath开始。当集群时,也就是akka.actor.providerClusterProvider或者RemoteProvider时,ActorPath是由akka.tcp://systemName@hots:port/user/somePath组成。

    代码解析:

    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = Props(classOf[Consumer], queue, testActor),
        terminationMessage = End,
        settings = ClusterSingletonManagerSettings(system).withRole("worker")),
      name = "consumer")
      val proxy = system.actorOf(
      ClusterSingletonProxy.props(
        singletonManagerPath = "/user/consumer",
        settings = ClusterSingletonProxySettings(system).withRole("worker")),
      name = "consumerProxy")
    

    上述代码分为两步骤。第一步是创建一个集群单例对象,第二步创建单例对象的代理对象。
    那么为何需要这样设计呢?代理对象是逻辑地址,第一步创建的是单例的实际地址。

    以下代码利用单例实现只执行一次的语义。

    class OnlyExecuteOnce[A](exe: () => A) extends Actor {
      override def preStart(): Unit =
        exe()
    
      override def receive: Receive = {
        case _ => ()
      }
    }
    
    
    object OnlyExecuteOnce {
      // /user/onlyExecuteOnce1234
      // /user/onlyExecuteOnce1234Proxy
      def apply[A](id: String)(exe: () => A)(implicit system: ActorSystem) = {
        system.actorOf(
          ClusterSingletonManager.props(Props(classOf[OnlyExecuteOnce[A]], exe), PoisonPill, ClusterSingletonManagerSettings(system)),
          name = s"onlyExecuteOnce${id}")
        system.actorOf(ClusterSingletonProxy.props(s"/user/onlyExecuteOnce${id}", ClusterSingletonProxySettings(system)),
          name = s"onlyExecuteOnce${id}Proxy")
      }
    }
    

    参数id用于构建单例对象的path。

    知难行易
    原创博文,请勿转载
    我的又一个博客hangscer.win
  • 相关阅读:
    【LeetCode-动态规划】编辑代价
    【C++】使用istringstream分割字符串
    【LeetCode-字符串】简化路径
    【LeetCode-字符串】简化路径
    【LeetCode-并查集】朋友圈
    【LeetCode-背包】目标和
    【LeetCode-动态规划】分割等和子集
    The code of method _jspService(HttpServletRequest, HttpServletResponse) is exceeding the 65535 bytes
    【错误解决】本地计算机上的mysql服务启动停止后,某些服务在未由其他服务或程序使用时将自动停止
    MySQL解压版安装配置详解
  • 原文地址:https://www.cnblogs.com/hangscer/p/8097951.html
Copyright © 2011-2022 走看看