这周在项目中遇到了一个错误,就是Circuit Breaker time out。以前没有接触过,因此学习了下akka的断路器。
一、为什么使用Circuit Breaker
断路器是为了防止分布式系统中的级联故障,从而保障其稳定性。其应该与远程系统之间接口的明智超时结合使用,以防止单个组件故障导致所有组件关闭。
例如,我们有一个与第三方远程Web服务交互的应用程序。假设请求超出了第三方的容量,他们的数据库在负载下不能正常工作了。假设数据库以这样的方式失败,即将错误交还给第三方Web服务需要很长时间。这反过来使得请求在很长一段时间后失败。回到我们的Web应用程序,用户已经注意到他们的表单提交需要更长时间才能挂起。而用户一般会不断的使用刷新按钮,为他们已经运行的请求添加更多请求。这最终导致由于资源耗尽而导致Web应用程序失败。这将影响所有用户,即使那些未依赖于此第三方Web服务功能的用户也是如此。
因此,在Web服务调用中引入断路器将导致请求开始快速失败,让用户知道出现问题并且他们不需要刷新他们的请求。这也将故障行为限制为仅使用依赖于第三方功能的用户,其他用户不再受资源耗尽而受影响。
我们都知道在springcloud中hystrix。同样的在akka中也提供了自己的熔断措施,即akka.pattern.CircuitBreaker
二、断路器的几种状态
在正常操作期间,断路器处于闭合状态:
(1)超出配置的callTimeout的异常或调用会增加失败计数器
(2)成功将故障计数重置为零
(3)当故障计数器达到maxFailures计数时,断路器跳闸到打开状态
处于打开状态时:
(1)所有呼叫都是快速失败的 CircuitBreakerOpenException
(2)在配置的resetTimeout之后,断路器进入半开状态
在半开状态时:
(1)允许第一次尝试通过而不会快速失败
(2)所有其他调用失败快速,异常与打开状态一样
(3)如果第一次调用成功,则断路器将重置为Closed状态并重置resetTimeout
(4)如果第一次呼叫失败,则断路器再次跳闸到开路状态(对于指数退避断路器,resetTimeout乘以指数退避因子)
其各状态之间之间的转换如下图所示:
三、例子
class DangerousActor extends Actor with ActorLogging {
import context.dispatcher
val breaker =
new CircuitBreaker(
context.system.scheduler,
//最大失败次数设置为5
maxFailures = 5,
//超时时间设置为5s
callTimeout = 10.seconds,
//重置超时为1分钟
resetTimeout = 1.minute).onOpen(notifyMeOnOpen())
def notifyMeOnOpen(): Unit =
log.warning("My CircuitBreaker is now open, and will not close for one minute")
//#circuit-breaker-initialization
//#circuit-breaker-usage
def dangerousCall: String = "This really isn't that dangerous of a call after all"
def receive = {
case "is my middle name" ⇒
breaker.withCircuitBreaker(Future(dangerousCall)) pipeTo sender()
case "block for me" ⇒
sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
}
//#circuit-breaker-usage
}
在该示例中,我们使用了withCircuitBreaker
一个异步方法(该方法返回一个Future),例如从数据库中检索数据的调用,然后将结果传回给发送方。如果由于某种原因,此示例中的数据库没有响应,或者存在另一个问题,则断路器将打开并停止尝试一次又一次地击中数据库,直到超时结束。