zoukankan      html  css  js  c++  java
  • Akka框架使用注意点

    1.mailbox

    Akka的每个actor默认有一个mailbox,按照FIFO顺序单线程处理。在抛出异常导致父actor根据设置的监管策略执行重启或恢复操作时,会从触发异常的消息的后续消息开始处理,邮箱并不会被清空。如果你想重新处理那个触发异常的消息,可以通过重写preRestart方法来访问该消息,java 中的preRestart参数为(Throwable reason, Option<Object> message),message.get()可以获得该消息(因为是从Option对象中get,所以可能为空),可以将该消息再次发给自己或做其它处理。

    默认邮箱的大小没有限制,也就是内存的上限。可以设置bounded邮箱来限定大小,还可以设置邮箱以文件形式持久存储。

    2.监管策略设置

      1)在actor类中重写supervisorStrategy()

      2)创建父actor时在Props参数中使用FromConfig.getInstance().withSupervisorStrategy(strategy).props(XXX)

    可以使用下面的类来方便设置:

    import akka.actor.AllForOneStrategy;
    import akka.actor.OneForOneStrategy;
    import akka.actor.SupervisorStrategy;
    import akka.japi.Function;
    import scala.concurrent.duration.Duration;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    import static akka.actor.SupervisorStrategy.escalate;
    
    /**
     * Created by fyk on 16-4-2.
     */
    public class StrategySetter {
        private Map<Class<? extends Throwable>, SupervisorStrategy.Directive> map;
        private boolean oneForOne;
        private int maxNrOfRetries=5;
        private Duration withinTimeRange=Duration.create(1, TimeUnit.MINUTES);//Duration.create("1 minute")
        public StrategySetter(boolean oneForOne) {
            this.oneForOne=oneForOne;
            map=new HashMap<Class<? extends Throwable>, SupervisorStrategy.Directive>();
        }
        public void setOptParam(int maxNrOfRetries,Duration withinTimeRange){
            this.maxNrOfRetries=maxNrOfRetries;
            this.withinTimeRange=withinTimeRange;
        }
        public void put(Class<? extends Throwable> t, SupervisorStrategy.Directive action){
            map.put(t,action);
        }
        /**
         * 设定监管策略并返回
         * cls.isInstance(yourObject)
         * instead of using the instanceof operator, which can only be used if you know the class at compile time.
         */
        public SupervisorStrategy getSupervisorStrategy(){
            SupervisorStrategy strategy=null;
            if(oneForOne){
                strategy=new OneForOneStrategy(maxNrOfRetries, withinTimeRange,
                        new Function<Throwable, SupervisorStrategy.Directive>() {
                            @Override
                            public SupervisorStrategy.Directive apply(Throwable t) {
                                for(Class c:map.keySet()){
                                    if(c.isInstance(t)) return map.get(c);
                                }
                                return escalate();//提交给上一级监管
                            }
                        });
            }else{
                strategy=new AllForOneStrategy(maxNrOfRetries, withinTimeRange,
                        new Function<Throwable, SupervisorStrategy.Directive>() {
                            @Override
                            public SupervisorStrategy.Directive apply(Throwable t) {
                                for(Class c:map.keySet()){
                                    if(c.isInstance(t)) return map.get(c);
                                }
                                return escalate();//提交给上一级监管
                            }
                        });
            }
    
            return strategy;
        }
    }
    View Code

     注意在进行某一个actor的重启时会调用postStop、构造函数与preStart、preRestart等,在重写父类的方法时记得在第一句调用父类的方法(会对子actor进行一些操作)。如果你在actor中创建了子actor,重启时也会重启子actor,如果在重写preStart中没有调用父类的preStart会导致子actor重复创建,由于akka不能创建同名的actor,会抛出name not unique的异常信息。

    3.actor Monitor

    监管策略中指定了时间区间内重启或恢复等操作的上限,达到指定出错频率后actor被停止,以后再也不运转了。

    也许你想要监视actor的生命状态,当它发现有actor停止时进行一些操作,如发邮件通知你,或简单粗暴的重新创建运行。

    生命周期monitor:

    /**
     * Created on 16-4-10.
     * 此生命周期monitor与设置了监管策略的supervisor都可以对远程actor进行监督
     */
    public class MonitorActor extends UntypedActor {
        Logger log = LoggerFactory.getLogger(MonitorActor.class);
        Map<ActorRef, ActorRef> monitoredActors =//<worker,supervisor>
                new HashMap<ActorRef, ActorRef>();
        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof Terminated) {
                final Terminated t = (Terminated) message;
                if (monitoredActors.containsKey(t.getActor())) {
                    ActorPath path=t.getActor().path();
                    log.info("Received Worker Actor Termination Message ->{}", path);
                    log.info("Sending message to Supervisor");
                    monitoredActors.get(t.getActor()).tell(
                            new DeadWorker(path),self());
                }
            } else if (message instanceof RegisterWorker) {
                RegisterWorker msg = (RegisterWorker) message;
                //下面这句是关键的注册语句,当被观察的actor结束时,本actor会收到akka.actor.Terminated消息
                getContext().watch(msg.getWorker());
                monitoredActors.put(msg.getWorker(), msg.getSupervisor());
            } else {
                unhandled(message);
            }
        }
    }

    使用:在worker actor初始化时向monitor发送RegisterWorker消息,包含对actor本身以及supervisor(一般为父actor)的引用。

    4.默认管理策略

    • ActorInitializationException will stop the failing child actor
    • ActorKilledException will stop the failing child actor
    • Exception will restart the failing child actor
    • Other types of Throwable will be escalated to parent actor

    如果想将自己的部分管理策略与默认管理策略结合,可以在一些异常处理中使用super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)

    注意,当actor被重启时并不会发送Terminated消息给monitor,context.stop(actor)才会

     5.Akka-remote注意事项

    Akka在接收到消息时根据邮箱地址也就是actor的地址来递送消息,本地的地址形式如/user/parent/childActor,远程的akka.tcp://RemoteSystem@host:port/user/xxx。寻址按照字符串匹配的形式,所以hostname与ip地址不能乱写,即使是指向相同的ip地址。

    在向远程actor主动发送消息时需要使用ActorSelection的tell方法。而在接收到远程actor发来的消息后回复时使用sender.tell不一定能够发送成功,并且也没有发送到DeadLetter。最好使用ActorSelection(sender.path())来发送。

    x.continue...

  • 相关阅读:
    c++关于析构的那点小事(个人吐槽向
    我分析内存泄漏的一道作业题,已解决
    大一上学期的一点小疑惑,代码验证ok
    C++类型转换
    c++形参改变实参(对指针的理解
    c++整型->字符型转换
    [转]二重积分换元法的一种简单证明 (ps:里面的符号有点小错误,理解就好。。。
    c++实现矩阵类矩阵行列式,伴随矩阵,逆矩阵
    假期周计划2.0
    大道至简
  • 原文地址:https://www.cnblogs.com/makefile/p/5348600.html
Copyright © 2011-2022 走看看