zoukankan      html  css  js  c++  java
  • Akka-Cluster(6)- Cluster-Sharding:集群分片,分布式交互程序核心方式

    在前面几篇讨论里我们介绍了在集群环境里的一些编程模式、分布式数据结构及具体实现方式。到目前为止,我们已经实现了把程序任务分配给处于很多服务器上的actor,能够最大程度的利用整体系统的硬件资源。这是因为通过akka-cluster能够把很多服务器组合成一个虚拟的整体系统,编程人员不需要知道负责运算的actor具体在那台服务器上运行。当然,我所指的整体系统是一种分布式的系统,实质底层还是各集群节点作为完整个体独立运行的,所以核心理念还是需要将程序分割成能独立运算的任务,然后分派给可能分布在很多服务器上的actor去运算。在上一篇的cluster-load-balance里我们采用了一种fire-and-forget模式把多项独立任务分配给集群节点上的actor,然后任由它们各自完成运算,中途不做任何交互、控制。这也是一种典型的无内部状态的运算模式。对外界来讲就是开始、完成,中间没有关于运算进展或当前状态的交流需要。但在现实里,很多任务是无法完全进行独立细分的,或者再细分会影响系统效率。比如网上购物网站每个客户的购物车:它记录了客户在网上的所有商品拣选过程,每一个拣选动作都代表更新的购物车状态,直到完成结算。那么在一个可能有几十万用户同时在线购物的网站,保留在内存的购物车状态应该是任何机器都无法容纳的,只有回到传统的数据库模式了,还是要面对无法解决的多并发系统效率问题。这么分析,集群分片技术可能是最好的解决方法了。

    简单讲:集群分片技术就是把一堆带唯一标识identifier的actor,即entity分布到集群节点上去。控制程序可以通过唯一ID与entityr进行交互,控制整个运算过程。这样,我们可以把程序分成相对合理的包含多个过程状态的细分任务。这些细分任务是由分布在集群节点上的entity来运算的,产生的状态当然也使用的是各集群节点上的资源,如此解决上面所提到的内存容量问题。akka-cluster提供的actor位置透明化机制能在系统崩溃、增减集群节点时自动重新部署所有的actor以达到负责均衡。而用户通过固定的ID就能联络目标entity,无论它被转移到任何集群节点上。

    集群分片由分片管理ShardRegion和分片定位ShardCoordinator共同协作实现,目标是把消息正确传递给指定ID的entity。分片定位负责确定分片所在集群节点,分片管理则对每个集群节点上分片内的entity进行定位。ShardCoordinator是个cluster-singleton,而ShardRegion则必须部署在每个集群节点上。每个分片内的entity必须是一个类型的actor。发给entity的消息内部必须包含分片编号和entity ID。通过从消息中解析位置信息后由ShardCoordinator确定负责传递消息的ShardRegion,相关的ShardRegion按ID把消息发送至目标entity。

    每个节点上的ShardRegion是通过下面这个start函数构建的:

      /**
       * Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor
       * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
       * for this type can later be retrieved with the [[#shardRegion]] method.
       *
       * Some settings can be configured as described in the `akka.cluster.sharding` section
       * of the `reference.conf`.
       *
       * @param typeName the name of the entity type
       * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion`
       * @param settings configuration settings, see [[ClusterShardingSettings]]
       * @param extractEntityId partial function to extract the entity id and the message to send to the
       *   entity from the incoming message, if the partial function does not match the message will
       *   be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
       * @param extractShardId function to determine the shard id for an incoming message, only messages
       *   that passed the `extractEntityId` will be used
       * @param allocationStrategy possibility to use a custom shard allocation and
       *   rebalancing logic
       * @param handOffStopMessage the message that will be sent to entities when they are to be stopped
       *   for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
       * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
       */
      def start(
        typeName:           String,
        entityProps:        Props,
        settings:           ClusterShardingSettings,
        extractEntityId:    ShardRegion.ExtractEntityId,
        extractShardId:     ShardRegion.ExtractShardId,
        allocationStrategy: ShardAllocationStrategy,
        handOffStopMessage: Any): ActorRef = {...}

    这个函数登记了名称为typeName类型entity的分片。函数返回ActorRef,说明ShardRegion是在本节点上的一个actor。下面是调用示范:

         ClusterSharding(system).start(
            typeName = Counter.shardName,
            entityProps = Counter.props(),
            settings = ClusterShardingSettings(system),
            extractEntityId = Counter.idExtractor,
            extractShardId = Counter.shardResolver)
    ...
    
    object Counter {
    
      trait Command
      case object Increment extends Command
      case object Decrement extends Command
      case object Get extends Command
      case object Stop extends Command
    
      trait Event
      case class CounterChanged(delta: Int) extends Event
    
      // Sharding Name
      val shardName: String = "Counter"
    
      // outside world if he want to send message to sharding should use this message
      case class CounterMessage(id: Long, cmd: Command)
    
      // id extrator
      val idExtractor: ShardRegion.ExtractEntityId = {
        case CounterMessage(id, msg) => (id.toString, msg)
      }
     
      // shard resolver
      val shardResolver: ShardRegion.ExtractShardId = {
        case CounterMessage(id, msg) => (id % 12).toString
      }
    
      def props() = Props[Counter]
    
    }

    entityProps是ShardRegion用来重构entity的。typeName是用来查找ShardRegion的,如下:

    val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
    counterRegion ! Get(123)

    用"Counter"获得ShardRegion的ActorRef后所有本节点的消息都是通过这个ShardRegion actor来定位,转达。所以每个ShardRegion都必须具备消息目的地entity的分片编号及entityID的解析方法:extractShardId和extractEntityId。在有些情况下由于节点角色的关系在某个节点不部署任何entity,但本节点需要向其它节点的entity发送消息,这时需要构建一个中介ProxyOnlyShardRegion:

      /**
       * Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
       * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
       * entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the
       * [[#shardRegion]] method.
       *
       * Some settings can be configured as described in the `akka.cluster.sharding` section
       * of the `reference.conf`.
       *
       * @param typeName the name of the entity type
       * @param role specifies that this entity type is located on cluster nodes with a specific role.
       *   If the role is not specified all nodes in the cluster are used.
       * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
       *   entity from the incoming message
       * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
       */
      def startProxy(
        typeName:         String,
        role:             Optional[String],
        messageExtractor: ShardRegion.MessageExtractor): ActorRef = {...}

    还有一个重要问题是如何弃用passivate entity,以释放占用资源。akka-cluster提供的方法是通过定义一个空转时间值idle-timeout,如果空转超出此时间段则可以进行passivate。下面是一段应用示范:两分钟空转就passivate entity

    class ABC extends Actor {
    ...
     // passivate the entity when no activity
      context.setReceiveTimeout(2.minutes)
    
    ...
    
    override def receive .....
    
      override def receiveCommand: Receive = {
        case Increment      ⇒ persist(CounterChanged(+1))(updateState)
        case Decrement      ⇒ persist(CounterChanged(-1))(updateState)
        case Get(_)         ⇒ sender() ! count
        case ReceiveTimeout ⇒ context.parent ! Passivate(stopMessage = Stop)
        case Stop           ⇒ context.stop(self)
      }
    /* 或者
      override def unhandled(msg: Any): Unit = msg match {
        case ReceiveTimeout => context.parent ! Passivate(stopMessage = PoisonPill)
        case _              => super.unhandled(msg)
      }
    */
    }

    又或者通过设定配置来实现自动的passivation:

    在配置文件中设定:akka.cluster.sharding.passivate-idle-entity-after = 120 s   // off to disable

    下面是官网提供的一个说明passivation-stop-message的示范代码:

    trait CounterCommand
    case object Increment extends CounterCommand
    final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
    
    case object Idle extends CounterCommand
    case object GoodByeCounter extends CounterCommand
    
    def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = {
      Behaviors.setup { ctx ⇒
    
        def become(value: Int): Behavior[CounterCommand] =
          Behaviors.receiveMessage[CounterCommand] {
            case Increment ⇒
              become(value + 1)
            case GetValue(replyTo) ⇒
              replyTo ! value
              Behaviors.same
            case Idle ⇒
              // after receive timeout
              shard ! ClusterSharding.Passivate(ctx.self)
              Behaviors.same
            case GoodByeCounter ⇒
              // the stopMessage, used for rebalance and passivate
              Behaviors.stopped
          }
    
        ctx.setReceiveTimeout(30.seconds, Idle)
        become(0)
      }
    }
    
    sharding.init(Entity(
      typeKey = TypeKey,
      createBehavior = ctx ⇒ counter2(ctx.shard, ctx.entityId))
      .withStopMessage(GoodByeCounter))

    实际上是向主管ShardRegion发送Passivation消息,并指定停止方式。

    还有必须注意的是如果使用BackoffSupervisor监控entity:必须使用Backoff.OnStop,因为persist异常会直接停掉entity。Backoff.OnStop策略会重构entity(BackoffSupervisedEntity),再启动。那么如果实施passivation时真的需要停止entity呢?我们可以如下操作:

        case "stop" =>
          context.stop(self)
          context.parent ! PoisonPill

    context.parent是BackoffSupervisor,需要同时停掉。

    下面我们就设计一个例子来示范集群分片应用。为了更贴近现实,在例子使用了event-sourcing,persistentActor等尚未完整介绍的技术和工具。我会在接着的讨论里介绍它们的原理和使用方式。这个例子模仿一个水果店收银业务:有三台pos机,顾客到任何pos机前录入商品、数量,然后结账。这个示范的主要目的是任何时间如果后端服务器出现故障,正在录入过程中的销售单状态都能得到完整恢复。

    我们先看看这个pos前端的源代码:

    import akka.actor._
    import akka.cluster._
    import akka.persistence._
    import akka.pattern._
    import scala.concurrent.duration._
    
    
    object POSTerminal {
      case class Fruit(code: String, name: String, price: Double)
      case class Item(fruit: Fruit, qty: Int)
    
      sealed trait Command {
      }
      case class Checkout(fruit: Fruit, qty: Int) extends Command
      case object ShowTotol extends Command
      case class PayCash(amount: Double) extends Command
      case object Shutdown extends Command
    
      sealed trait Event {}
      case class ItemScanned(fruit: Fruit, qty: Int) extends Event
      case object Paid extends Event
    
      case class Items(items: List[Item] = Nil) {
        def itemAdded(evt: Event): Items = evt match {
          case ItemScanned(fruit,qty) =>
            copy( Item(fruit,qty) :: items )   //append item
    
          case _ => this     //nothing happens
        }
        def billPaid = copy(Nil)     //clear all items
        override def toString = items.reverse.toString()
      }
    
      def termProps = Props(new POSTerminal())
    
      //backoff suppervisor  must use onStop mode
      def POSProps: Props = {
        val options = Backoff.onStop(
          childProps = termProps,
          childName = "posterm",
          minBackoff = 1 second,
          maxBackoff = 5 seconds,
          randomFactor = 0.20
        )
        BackoffSupervisor.props(options)
      }
    
    }
    
    class POSTerminal extends PersistentActor with ActorLogging {
      import POSTerminal._
      val cluster = Cluster(context.system)
    
      // self.path.parent.name is the type name (utf-8 URL-encoded)
      // self.path.name is the entry identifier (utf-8 URL-encoded)  but entity has a supervisor
      override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name
    
      var currentItems = Items()
    
    
      override def receiveRecover: Receive = {
        case evt: Event => currentItems = currentItems.itemAdded(evt)
          log.info(s"*****  ${persistenceId} recovering events ...  ********")
        case SnapshotOffer(_,loggedItems: Items) =>
          log.info(s"*****  ${persistenceId} recovering snapshot ...  ********")
          currentItems = loggedItems
      }
    
      override def receiveCommand: Receive = {
        case Checkout(fruit,qty) =>
          log.info(s"*********${persistenceId} is scanning item: $fruit, qty: $qty *********")
          persist(ItemScanned(fruit,qty))(evt =>  currentItems = currentItems.itemAdded(evt))
    
        case ShowTotol =>
          log.info(s"*********${persistenceId} on ${cluster.selfAddress} has current scanned items: *********")
          if (currentItems.items == Nil)
            log.info(s"**********${persistenceId} None transaction found! *********")
          else
            currentItems.items.reverse.foreach (item =>
              log.info(s"*********${persistenceId}: ${item.fruit.name} ${item.fruit.price} X ${item.qty} = ${item.fruit.price * item.qty} *********"))
    
        case PayCash(amt) =>
          log.info(s"**********${persistenceId} paying $amt to settle ***********")
          persist(Paid) { _ =>
            currentItems = currentItems.billPaid
            saveSnapshot(currentItems)     //no recovery
          }
    
        //shutdown this node to validate entity relocation and proper state recovery
        case Shutdown =>
          log.info(s"******** node ${cluster.selfAddress} is leaving cluster ... *******")
          cluster.leave(cluster.selfAddress)
      }
    }

    我用下面几项来总结一下:

    1、POSTerminal是具体的业务运算前端,包裹在BackoffSupervisor里。能保证这个entity在因异常如持久化失败造成停顿时能进行重试。所以,使用了Backoff.onStop方式。

    2、persistenceId=self.path.parent.parent.name+"-"+self.path.parent.name 代表: 店号-机号 如: 1-1021。actor.path.name的产生是由ShardRegion具体操作的,其实就是ExtactShardId-ExtractEntityId。

    3、注意这个状态类型Item,它的方法itemAdded(evt): Item 即返回新状态。所以必须谨记用currentItems=itemAdded(evt)这样的语法。

    下面是构建和启动ClusterSharding的源代码:

    object POSShard {
     import POSTerminal._
    
     val shardName = "POSManager"
     case class POSCommand(id: Long, cmd: Command) {
       def shopId = id.toString.head.toString
       def posId = id.toString
     }
    
     val getPOSId: ShardRegion.ExtractEntityId =  {
       case posCommand: POSCommand => (posCommand.posId,posCommand.cmd)
     }
     val getShopId: ShardRegion.ExtractShardId = {
       case posCommand: POSCommand => posCommand.shopId
     }
    
    
     def create(port: Int) = {
       val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
         .withFallback(ConfigFactory.load())
       val system = ActorSystem("posSystem",config)
    
       ClusterSharding(system).start(
         typeName = shardName,
         entityProps = POSProps,
         settings = ClusterShardingSettings(system),
         extractEntityId = getPOSId,
         extractShardId = getShopId
       )
     }
    
    }

    用下面的代码来测试:

    object POSDemo extends App {
      POSShard.create(2551)
      Thread.sleep(1000)
      POSShard.create(2552)
      POSShard.create(2553)
      val posref = POSShard.create(2554)
      scala.io.StdIn.readLine()
    
      val apple = Fruit("0001","high grade apple",10.5)
      val orange = Fruit("0002","sunkist orage",12.0)
      val grape = Fruit("0003","xinjiang red grape",15.8)
    
    
      posref ! POSCommand(1021, Checkout(apple,2))
      posref ! POSCommand(1021,Checkout(grape,1))
    
      posref ! POSCommand(1021,ShowTotol)
      scala.io.StdIn.readLine()
    
      posref ! POSCommand(1021,Shutdown)
      scala.io.StdIn.readLine()
    
      posref ! POSCommand(1021,Checkout(orange,10))
    
    
      posref ! POSCommand(1021,ShowTotol)
      scala.io.StdIn.readLine()
    
      posref ! POSCommand(1028,Checkout(orange,10))
    
      posref ! POSCommand(1028,ShowTotol)
      scala.io.StdIn.readLine()
    
    
    }

    运算结果如下:

    [akka.tcp://posSystem@127.0.0.1:2551*********1-1021 is scanning item: Fruit(0001,high grade apple,10.5), qty: 2 *********
    [akka.tcp://posSystem@127.0.0.1:2551*********1-1021 is scanning item: Fruit(0003,xinjiang red grape,15.8), qty: 1 *********
    [akka.tcp://posSystem@127.0.0.1:2551*********1-1021 on akka.tcp://posSystem@127.0.0.1:2551 has current scanned items: *********
    [akka.tcp://posSystem@127.0.0.1:2551*********1-1021: high grade apple 10.5 X 2 = 21.0 *********
    [akka.tcp://posSystem@127.0.0.1:2551*********1-1021: xinjiang red grape 15.8 X 1 = 15.8 *********
    
    [akka.tcp://posSystem@127.0.0.1:2551******** node akka.tcp://posSystem@127.0.0.1:2551 is leaving cluster ... *******
    [akka.tcp://posSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.
    
    [akka.tcp://posSystem@127.0.0.1:2552*****  1-1021 recovering events ...  ********
    [akka.tcp://posSystem@127.0.0.1:2552*****  1-1021 recovering events ...  ********
    [akka.tcp://posSystem@127.0.0.1:2552********1-1021 is scanning item: Fruit(0002,sunkist orage,12.0), qty: 10 *********
    [akka.tcp://posSystem@127.0.0.1:2552*********1-1021 on akka.tcp://posSystem@127.0.0.1:2552 has current scanned items: *********
    [akka.tcp://posSystem@127.0.0.1:2552*********1-1021: high grade apple 10.5 X 2 = 21.0 *********
    [akka.tcp://posSystem@127.0.0.1:2552*********1-1021: xinjiang red grape 15.8 X 1 = 15.8 *********
    [akka.tcp://posSystem@127.0.0.1:2552*********1-1021: sunkist orage 12.0 X 10 = 120.0 *********

    从结果显示看到:一开始1-1021是在2551节点上运行的。我们用Shutdown关停2551后ClusterSharding立即在2552上重构了1-1021并且恢复了之前的状态。能够在系统出现故障无法使用的情况下自动对运行中的actor进行迁移、状态恢复,正是我们这次讨论的核心内容。

    下面是本次示范的源代码:

    build.sbt

    name := "akka-cluster-sharding"
    
    version := "0.2"
    
    scalaVersion := "2.12.8"
    
    libraryDependencies := Seq(
      "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
      "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
      "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.92",
      "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.92" % Test
    )

    resources/application.conf

    akka.actor.warn-about-java-serializer-usage = off
    akka.log-dead-letters-during-shutdown = off
    akka.log-dead-letters = off
    
    akka {
      loglevel = INFO
      actor {
        provider = "cluster"
      }
    
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://posSystem@127.0.0.1:2551"]
        log-info = off
      }
    
      persistence {
        journal.plugin = "cassandra-journal"
        snapshot-store.plugin = "cassandra-snapshot-store"
      }
    
    }

    Entities.scala

    import akka.actor._
    import akka.cluster._
    import akka.persistence._
    import akka.pattern._
    import scala.concurrent.duration._
    
    
    object POSTerminal {
      case class Fruit(code: String, name: String, price: Double)
      case class Item(fruit: Fruit, qty: Int)
    
      sealed trait Command {
      }
      case class Checkout(fruit: Fruit, qty: Int) extends Command
      case object ShowTotol extends Command
      case class PayCash(amount: Double) extends Command
      case object Shutdown extends Command
    
      sealed trait Event {}
      case class ItemScanned(fruit: Fruit, qty: Int) extends Event
      case object Paid extends Event
    
      case class Items(items: List[Item] = Nil) {
        def itemAdded(evt: Event): Items = evt match {
          case ItemScanned(fruit,qty) =>
            copy( Item(fruit,qty) :: items )   //append item
    
          case _ => this     //nothing happens
        }
        def billPaid = copy(Nil)     //clear all items
        override def toString = items.reverse.toString()
      }
    
      def termProps = Props(new POSTerminal())
    
      //backoff suppervisor  must use onStop mode
      def POSProps: Props = {
        val options = Backoff.onStop(
          childProps = termProps,
          childName = "posterm",
          minBackoff = 1 second,
          maxBackoff = 5 seconds,
          randomFactor = 0.20
        )
        BackoffSupervisor.props(options)
      }
    
    }
    
    class POSTerminal extends PersistentActor with ActorLogging {
      import POSTerminal._
      val cluster = Cluster(context.system)
    
      // self.path.parent.name is the type name (utf-8 URL-encoded)
      // self.path.name is the entry identifier (utf-8 URL-encoded)  but entity has a supervisor
      override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name
    
      var currentItems = Items()
    
    
      override def receiveRecover: Receive = {
        case evt: Event => currentItems = currentItems.itemAdded(evt)
          log.info(s"*****  ${persistenceId} recovering events ...  ********")
        case SnapshotOffer(_,loggedItems: Items) =>
          log.info(s"*****  ${persistenceId} recovering snapshot ...  ********")
          currentItems = loggedItems
      }
    
      override def receiveCommand: Receive = {
        case Checkout(fruit,qty) =>
          log.info(s"*********${persistenceId} is scanning item: $fruit, qty: $qty *********")
          persist(ItemScanned(fruit,qty))(evt =>  currentItems = currentItems.itemAdded(evt))
    
        case ShowTotol =>
          log.info(s"*********${persistenceId} on ${cluster.selfAddress} has current scanned items: *********")
          if (currentItems.items == Nil)
            log.info(s"**********${persistenceId} None transaction found! *********")
          else
            currentItems.items.reverse.foreach (item =>
              log.info(s"*********${persistenceId}: ${item.fruit.name} ${item.fruit.price} X ${item.qty} = ${item.fruit.price * item.qty} *********"))
    
        case PayCash(amt) =>
          log.info(s"**********${persistenceId} paying $amt to settle ***********")
          persist(Paid) { _ =>
            currentItems = currentItems.billPaid
            saveSnapshot(currentItems)     //no recovery
          }
    
        //shutdown this node to validate entity relocation and proper state recovery
        case Shutdown =>
          log.info(s"******** node ${cluster.selfAddress} is leaving cluster ... *******")
          cluster.leave(cluster.selfAddress)
      }
    }

    Shards.scala

    import akka.actor._
    import akka.cluster.sharding._
    import com.typesafe.config.ConfigFactory
    
    object POSShard {
     import POSTerminal._
    
     val shardName = "POSManager"
     case class POSCommand(id: Long, cmd: Command) {
       def shopId = id.toString.head.toString
       def posId = id.toString
     }
    
     val getPOSId: ShardRegion.ExtractEntityId =  {
       case posCommand: POSCommand => (posCommand.posId,posCommand.cmd)
     }
     val getShopId: ShardRegion.ExtractShardId = {
       case posCommand: POSCommand => posCommand.shopId
     }
    
    
     def create(port: Int) = {
       val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
         .withFallback(ConfigFactory.load())
       val system = ActorSystem("posSystem",config)
    
       ClusterSharding(system).start(
         typeName = shardName,
         entityProps = POSProps,
         settings = ClusterShardingSettings(system),
         extractEntityId = getPOSId,
         extractShardId = getShopId
       )
     }
    
    }

    POSDemo.scala

    import POSTerminal._
    import POSShard._
    
    object POSDemo extends App {
      POSShard.create(2551)
      Thread.sleep(1000)
      POSShard.create(2552)
      POSShard.create(2553)
      val posref = POSShard.create(2554)
      scala.io.StdIn.readLine()
    
      val apple = Fruit("0001","high grade apple",10.5)
      val orange = Fruit("0002","sunkist orage",12.0)
      val grape = Fruit("0003","xinjiang red grape",15.8)
    
    
      posref ! POSCommand(1021, Checkout(apple,2))
      posref ! POSCommand(1021,Checkout(grape,1))
    
      posref ! POSCommand(1021,ShowTotol)
      scala.io.StdIn.readLine()
    
      posref ! POSCommand(1021,Shutdown)
      scala.io.StdIn.readLine()
    
      posref ! POSCommand(1021,Checkout(orange,10))
    
    
      posref ! POSCommand(1021,ShowTotol)
      scala.io.StdIn.readLine()
    
      posref ! POSCommand(1028,Checkout(orange,10))
    
      posref ! POSCommand(1028,ShowTotol)
      scala.io.StdIn.readLine()
    
    
    }
  • 相关阅读:
    vue学习(十四) 条件搜索框动态查询表中数据 数组的新方法
    vue学习(十三) 删除对象数组中的某个元素
    数据库管理
    PHP基础
    PHP基础之常量与变量
    Cobalt Strike简单使用
    phpstudy后门利用复现
    DNS劫持
    远程控制(远控Bin)
    php基础
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/10280399.html
Copyright © 2011-2022 走看看