本博客介绍一种AOP、无侵入的akka监控方案,方便大家在生产使用akka的过程中对akka进行监控。
对于自身javaer来说,AOP三个字母基本就解释清楚了akka监控框架的原理。哈哈哈,不过我这里还是啰嗦一点,把相关的方案和框架介绍一下。
无侵入监控方案,不管是对akka还是其他java应用来说,经常听到动态代理、静态代理、AOP这些词汇。AOP为Aspect Oriented Programming的缩写,意为:面向切面编程。使用Spring的童鞋一定听过这个词。虽然AOP一般用在spring上,但用在我们的akka监控来说也是非常合适的。静态代理、动态代理只不过是AOP的一种实现方式,就是用统一的代理类来拦截特定对象的执行过程,并加以监控。但这有一个比较严重的缺陷,至少在akka的监控中是这样的,那就是代理类会影响目标类的类型,比如通过classOf获取到的是代理类的类型。字节码注入是另一种AOP的实现方式,它是通过编译期或加载期修改目标类的字节码,来进行监控的。简单来说就是修改了Java的class文件,从而在不修改源码的情况下,修改编译结果。字节码注入有很多框架,但我们这里选择了ASPECTJ,这个工具可以通过注解的形式定义注入点,非常方便。
好了,基本的知识点就讲解清楚了,下面直接上代码及其框架。
监控框架工程命名为akkamon,意为akka的监控,哈哈。分为四个模块:
- akkamon-core。核心模块,定义基本类型,主要用来定义注入点函数,就是aspectJ中的pointCut。
- akkamon-injector。注入器。定义并实现ActorSystem的一个AkkaInjectorExtension,将注入点拦截到的信息以消息的形式发送给特定的actor
- akkamon-java-shell。pointCut实现模块。因为aspectj-maven-plugin对scala支持不太友好,此处用Java实现对pointCut的具体实现。
- akkamon-target。测试模块
下面我们分模块详细讲解各个类的定义及其意义。首先是akkamon-core
trait Instrumentation
Instrumentation意为仪器、仪表,代表一组注入函数集合。
trait ActorCellInstrumentation extends Instrumentation { def beforeActorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef ):Unit def beforeActorTerminate(cell:ActorCell):Unit }
比如ActorCellInstrumentation代表ActorCell的监控仪表,它定义了两个函数,分别是ActorCell创建前和销毁前。
trait ActorSystemInjectPoint extends ListenerInstrumentation with ActorCellInstrumentation with ActorSystemInstrumentation with MessageInstrumentation
ActorSystemInjectPoint代表ActorSystem的所有注入点,它主要是所有Instrumentation的一个汇总。
class ActorSystemInjectPointImpl extends ActorSystemInjectPoint
ActorSystemInjectPoint有一个实现类,它来实现ActorSystemInjectPoint,主要是把拦截到的消息通知给listener,我们来看看其中一个函数的实现。
override def beforeActorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = { val message = notifyMessage(ActorCellCreation(system.name,ref,props,parent)) log.debug(s"beforeActorCellCreation $message") }
很明显在拦截到ActorCell创建的时候,会构建ActorCellCreation这个case class,然后通过notifyMessage函数把消息通知给listener。
notifyMessage是ActorSystemInstrumentationListeners的一个方法,ActorSystemInstrumentationListeners维护了ActorSystemInstrumentationListener列表,在notifyMessae内部,依次调用所有listener的onMessage函数。
abstract class ActorSystemInstrumentationListener{ def onMessage(message:InjectMessage):Unit def close():Unit }
那么到这里,可能会有读者问,ActorSystemInstrumentationListener是什么时候注册到ActorSystemInstrumentationListeners里面的呢?答案就是监控ActorSystemInstrumentationListener构造函数的调用,然后把它register到listeners列表的。那Instrumentation定义的那些函数又是啥时候调用的呢?答案就在akkamon-java-shell模块。
@Aspect public class InstrumentationJavaShell implements ActorSystemInjectPoint
这个模块只有一个InstrumentationJavaShell类,它继承了ActorSystemInjectPoint,所以它会实现你所有定义的注入点,并把相应的函数调用转发给ActorSystemInjectPointImpl
private static ActorSystemInjectPointImpl actorSystemInstrumentation = new ActorSystemInjectPointImpl();
ActorSystemInjectPointImpl是InstrumentationJavaShell里面的一个静态类。
@Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, *, parent)") public void actorCellCreation(ActorSystem system , ActorRef ref,Props props , ActorRef parent){} @Override @Before("actorCellCreation(system,ref,props,parent)") public void beforeActorCellCreation(ActorSystem system , ActorRef ref, Props props , ActorRef parent) { actorSystemInstrumentation.beforeActorCellCreation(system,ref,props,parent); }
上面两个函数分别定义了一个Pointcut和一个advice,Pointcut是对akka.actor.ActorCell.new的注入,Before这个advice是在akka.actor.ActorCell.new执行之前调用,可以看到beforeActorCellCreation就是简单的把相关的参数传给actorSystemInstrumentation.beforeActorCellCreation,而beforeActorCellCreation里面是通知各个listener。
下面是listener的注册过程。
@Pointcut("execution(com.gabry.akkamon.listener.ActorSystemInstrumentationListener.new(..)) && this(listener)") public void listenerCreation(ActorSystemInstrumentationListener listener) { } @Override @After("listenerCreation(listener)") public void afterListenerCreation(ActorSystemInstrumentationListener listener) { actorSystemInstrumentation.afterListenerCreation(listener); }
listener的注册过程其实也是通过AOP来实现的。
class ActorListener(actorRef:ActorRef) extends ActorSystemInstrumentationListener{ override def onMessage(message: InjectMessage): Unit = { actorRef ! message } override def close(): Unit = { } }
上面是一个listener的实现,其实就是简单的把消息通知给了相关的actor。
至此,一个akka的监控框架就实现了,是不是很简单呢?框架很简单,麻烦的是Instrumentation的定义和实现啊。
读者问怎么执行?那就是java命令行的-javaagent参数啊。