zoukankan      html  css  js  c++  java
  • Spring与Akka的集成

    概述

           近年来随着Spark的火热,Spark本身使用的开发语言Scala、用到的分布式内存文件系统Tachyon(现已更名为Alluxio)以及基于Actor并发编程模型的Akka都引起了大家的注意。了解过Akka或者Actor的人应该知道,这的确是一个很不错的框架,按照Akka官网的描述——使用Akka使得构建强有力的并发与分布式应用将更加容易。由于历史原因,很多Web系统在开发分布式服务时首先会选择RMI(Remote Method Invoke ,远程方法调用)、RPC(Remote Procedure Call Protocol,远程过程调用)或者使用JMS(Java Messaging Service,Java消息服务)。

           但是使用RMI只能使用java语言,而且开发、执行效率都不高;RPC框架虽然可以通过匹配方法签名的方式比RMI更灵活,但是其存在调用超时、调用丢失等缺点;JMS方式虽然可以通过At Least Delivery Once、消息持久化等机制保证消息不会丢失,但是只能作为一种跨服务的生产者、消费者编程模型使用。Akka不但处理了以上问题,而且还可以使用Actor作为并发编程模型,减少java多线程编程的阻塞、调度、上下文开销甚至死锁等问题。此外,Akka还提供了集群Sharding、流处理等功能的支持,更易于实现有限状态自动机等功能。所以有心的开发者势必会关心如何在最常见的Java系统中使用它,如何与Spring集成?

           本文参考Akka官方使用文档,根据自身的经验和理解,提供Akka与Spring集成的方案。本文不说明Spring框架的具体使用,并从Spring已经配置完备的情况开始叙述。

    Actor系统——ActorSystem

           什么是ActorSystem?根据Akka官网的描述——ActorSystem是一个重量级的结构体,可以用于分配1到N个线程,所以每个应用都需要创建一个ActorSystem。通常而言,使用以下代码来创建ActorSystem。

    ActorSystem system = ActorSystem.create("Hello");

    不过对于接入Spring而言,由IOC(Inversion of Control,控制反转)方式会更接地气,你可以这样:

        <!-- AKKA System Setup -->
        <bean id="actorSystem" class="akka.actor.ActorSystem" factory-method="create" destroy-method="shutdown" scope="singleton">
            <constructor-arg value="helloAkkaSystem"/>
        </bean>

    然后在你需要的地方依赖注入即可。

    Actor编程模型

           有关Actor编程模型的具体介绍可以看我的另一篇博文——《Spark如何使用Akka实现进程、节点通信的简明介绍》,里面有更多的介绍。需要补充的是,在最新的Scala官方网站上已经决定废弃Scala自身的Actor编程模型,转而全面拥抱Akka提供的Actor编程模型。

           我们可以通过以下代码(代码片段借用了Akka官网的例子)创建一个简单的Actor例子。

           Greeter是代表问候者的Actor:

    public class Greeter extends UntypedActor {
    
      public static enum Msg {
        GREET, DONE;
      }
    
      @Override
      public void onReceive(Object msg) {
        if (msg == Msg.GREET) {
          System.out.println("Hello World!");
          getSender().tell(Msg.DONE, getSelf());
        } else
          unhandled(msg);
      }
    
    }

    一般情况下我们的Actor都需要继承自UntypedActor,并实现其onReceive方法。onReceive用于接收消息,你可以在其中实现对消息的匹配并做不同的处理。

    HelloWorld是用于向Greeter发送问候消息的访客:

    public class HelloWorld extends UntypedActor {
    
      @Override
      public void preStart() {
        // create the greeter actor
        final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class), "greeter");
        // tell it to perform the greeting
        greeter.tell(Greeter.Msg.GREET, getSelf());
      }
    
      @Override
      public void onReceive(Object msg) {
        if (msg == Greeter.Msg.DONE) {
          // when the greeter is done, stop this actor and with it the application
          getContext().stop(getSelf());
        } else
          unhandled(msg);
      }
    }

    有了Actor之后,我们可以这样使用它:

    ActorRef a = system.actorOf(Props.create(HelloWorld.class), "helloWorld");

           在HelloWorld的preStart实现中,获取了Greeter的ActorRef(Actor的引用)并向Greeter发送了问候的消息,Greeter收到问候消息后,会先打印Hello World!,然后向HelloWorld回复完成的消息,HelloWorld得知Greeter完成了向世界问好这个伟大的任务后,就结束了自己的生命。HelloWorld的例子用编程API的方式告诉了我们如何使用Actor及发送、接收消息。为了便于描述与Spring的集成,下面再介绍一个例子。

           CountingActor(代码主体借用自Akka官网)是用于计数的Actor,见代码清单1所示。

    代码清单1

    @Named("CountingActor")
    @Scope("prototype")
    public class CountingActor extends UntypedActor {
    
        public static class Count {
        }
    
        public static class Get {
        }
    
        // the service that will be automatically injected
        @Resource
        private CountingService countingService;
    
        private int count = 0;
    
        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof Count) {
                count = countingService.increment(count);
            } else if (message instanceof Get) {
                getSender().tell(count, getSelf());
            } else {
                unhandled(message);
            }
        }
    }
    CountingActor用于接收Count消息进行计数,接收Get消息回复给发送者当前的计数值。CountingService是用于计数的接口,其定义如下:
    public interface CountingService {
        
        /**
         * 计数
         * @param count
         * @return
         */
        int increment(int count);
    
    }
    CountingService的具体实现是CountingServiceImpl,其实现如下:
    @Service("countingService")
    public class CountingServiceImpl implements CountingService {
    
        private static Logger logger = LoggerFactory.getLogger(CountingServiceImpl.class);
    
        /*
         * (non-Javadoc)
         * 
         * @see com.elong.sentosa.metadata.service.CountingService#increment(int)
         */
        @Override
        public int increment(int count) {
            logger.info("increase " + count + "by 1.");
            return count + 1;
        }
    
    }
    CountingActor通过注解方式注入了CountingService,CountingActor的计数实际是由CountingService完成。
            细心的同学可能发现了CountingActor使用了注解Named,这里为什么没有使用@Service或者@Component等注解呢?由于Akka的Actor在初始化的时候必须使用System或者Context的工厂方法actorOf创建新的Actor实例,不能使用构造器来初始化,而使用Spring的Service或者Component注解,会导致使用构造器初始化Actor,所以会抛出以下异常:
    akka.actor.ActorInitializationException: You cannot create an instance of [com.elong.metadata.akka.actor.CountingActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.

    如果我们不能使用@Service或者@Component,也不能使用XML配置的方式使用(与注解一个道理),那么我们如何使用CountingActor提供的服务呢?

    IndirectActorProducer接口

            IndirectActorProducer是Akka提供的Actor生成接口,从其名字我们知道Akka给我们指出了另一条道路——石头大了绕着走!通过实现IndirectActorProducer接口我们可以定制一些Actor的生成方式,与Spring集成可以这样实现它,见代码清单2所示。

    代码清单2

    public class SpringActorProducer implements IndirectActorProducer {
        private final ApplicationContext applicationContext;
        private final String actorBeanName;
        private final Object[] args;
    
        public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object ... args) {
            this.applicationContext = applicationContext;
            this.actorBeanName = actorBeanName;
            this.args = args;
        }
    
        public Actor produce() {
            return (Actor) applicationContext.getBean(actorBeanName, args);
        }
    
        public Class<? extends Actor> actorClass() {
            return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
        }
    }

    SpringActorProducer的实现主要借鉴了Akka官方文档,我这里对其作了一些扩展以便于支持构造器带有多个参数的情况。从其实现看到实际是利用了ApplicationContext提供的getBean方式实例化Actor。
           这里还有两个问题:一、ApplicationContext如何获取和设置?二、如何使用SpringActorProducer生成Spring需要的Actor实例?

           对于第一个问题,我们可以通过封装SpringActorProducer并实现ApplicationContextAware接口的方式获取ApplicationContext;对于第二个问题,我们知道Akka中的所有Actor实例都是以Props作为配置参数开始的,这里以SpringActorProducer为代理生成我们需要的Actor的Props。

           SpringExt实现了以上思路,见代码清单3所示。

    代码清单3

    @Component("springExt")
    public class SpringExt implements Extension, ApplicationContextAware {
    
        private ApplicationContext applicationContext;
    
        /**
         * Create a Props for the specified actorBeanName using the
         * SpringActorProducer class.
         *
         * @param actorBeanName
         *            The name of the actor bean to create Props for
         * @return a Props that will create the named actor bean using Spring
         */
        public Props props(String actorBeanName, Object ... args) {
            return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args);
        }
    
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

    应用例子

            经过了以上的铺垫,现在你可以使用创建好的CountingActor了,首先你需要在你的业务类中注入ActorSystem和SpringExt。

            @Autowired
        private ActorSystem actorSystem;
    
        @Autowired
        private SpringExt springExt;

    然后我们使用CountingActor进行计数,代码如下:

        ActorRef counter = actorSystem.actorOf(springExt.props("CountingActor"), "counter");
    
        // Create the "actor-in-a-box"
            final Inbox inbox = Inbox.create(system);
            
        // tell it to count three times
            inbox.send(counter, new Count());
            inbox.send(counter, new Count());
            inbox.send(counter, new Count());
    
        // print the result
        FiniteDuration duration = FiniteDuration.create(3, TimeUnit.SECONDS);
        Future<Object> result = ask(counter, new Get(), Timeout.durationToTimeout(duration));
        try {
            System.out.println("Got back " + Await.result(result, duration));
        } catch (Exception e) {
            System.err.println("Failed getting result: " + e.getMessage());
            throw e;
        }

    输出结果为:

    Got back 3

    总结

           本文只是最简单的Akka集成Spring的例子,Akka的remote、cluster、persistence、router等机制都可以应用。

    后记:经过近一年的准备,《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
     
    售卖链接如下:
  • 相关阅读:
    cmanformat
    mysql-sql语言参考
    jQuery 判断多个 input checkbox 中至少有一个勾选
    Java实现 蓝桥杯 算法提高 计算行列式
    Java实现 蓝桥杯 数独游戏
    Java实现 蓝桥杯 数独游戏
    Java实现 蓝桥杯 数独游戏
    Java实现 蓝桥杯 算法提高 成绩排序2
    Java实现 蓝桥杯 算法提高 成绩排序2
    Java实现 蓝桥杯 算法提高 成绩排序2
  • 原文地址:https://www.cnblogs.com/jiaan-geng/p/8581312.html
Copyright © 2011-2022 走看看