    Spring Cloud (6): Aggregating Hystrix Monitoring Data with Turbine [Finchley Edition]

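    In the previous post we used Hystrix Dashboard to display the metrics that Hystrix collects for circuit breaking. With Hystrix Dashboard we can conveniently view the overall state of a service instance, such as the number of calls and the call latency. However, Hystrix Dashboard alone can only show the data of a single service instance. In production our services have to be highly available, so once there are multiple instances we need to aggregate these metrics. This post introduces another tool for that purpose: Turbine.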

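    Preparation

    Before using Turbine, let's review the architecture built in the previous post, shown in the figure below:

    It consists of:

    • eureka-server: the service registry
    • eureka-producer: the service provider
    • eureka-consumer-hystrix: a service consumer built with Feign and Hystrix
    • hystrix-dashboard: displays the Hystrix data of the eureka-consumer-hystrix service

    Creating Turbine

    Next, we add Turbine to this architecture to aggregate and display the services' Hystrix data. We cover two aggregation approaches in turn.

    Aggregation over HTTP

    Create a standard Spring Boot project named turbine.

    POM configuration

    Add the following dependency to pom.xml: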

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-turbine</artifactId>
    </dependency>

    Main class

    Enable Turbine by adding the @EnableTurbine annotation to the main class:

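    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.turbine.EnableTurbine;

    // @EnableTurbine turns this application into a Turbine aggregation server
    @EnableTurbine
    @SpringBootApplication
    public class TurbineApplication {

        public static void main(String[] args) {
            SpringApplication.run(TurbineApplication.class, args);
        }
    }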

    Configuration file

    Add the Eureka and Turbine settings to application.yml:

    spring:
      application:
        name: turbine
    server:
      port: 12010
    management:
      server:
        # Spring Boot 2.x (Finchley) renamed management.port to management.server.port
        port: 12011
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:10010/eureka/
    turbine:
      app-config: eureka-consumer-hystrix
      cluster-name-expression: new String("default")
      combine-host-port: true

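    Parameter notes

    • turbine.app-config specifies the names of the services whose monitoring data should be collected;
    • turbine.cluster-name-expression sets the cluster name to default. When there are a great many services, you can start several Turbine services to build different aggregation clusters, and this parameter distinguishes between them. Its value can also be used in Hystrix Dashboard to select a particular aggregation cluster: just pass it as the cluster parameter in the Hystrix Stream URL;
    • turbine.combine-host-port set to true makes services on the same host distinguishable by the combination of host name and port. By default services are distinguished by host only, which means that during local debugging the different services on your machine would be aggregated and counted as a single service.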
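The effect of combine-host-port during local debugging can be illustrated with a small plain-Java sketch. This is not Turbine's own code: the Instance class, the groupKey and distinctInstances helpers, and the two port numbers are hypothetical and exist only to show the difference between keying instances by host and keying them by host:port.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class CombineHostPortSketch {

    /** Hypothetical stand-in for a registered service instance. */
    static final class Instance {
        final String host;
        final int port;

        Instance(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Key used to tell instances apart: host only, or host:port. */
    static String groupKey(Instance instance, boolean combineHostPort) {
        return combineHostPort ? instance.host + ":" + instance.port : instance.host;
    }

    /** Number of distinct instances seen under the chosen keying. */
    static int distinctInstances(List<Instance> instances, boolean combineHostPort) {
        Set<String> keys = new LinkedHashSet<>();
        for (Instance instance : instances) {
            keys.add(groupKey(instance, combineHostPort));
        }
        return keys.size();
    }

    public static void main(String[] args) {
        // Two local instances on different ports (the port numbers are made up).
        List<Instance> local = Arrays.asList(
                new Instance("localhost", 9013),
                new Instance("localhost", 9014));

        System.out.println(distinctInstances(local, false)); // 1: both collapse into one host
        System.out.println(distinctInstances(local, true));  // 2: told apart by host:port
    }
}
```

With host-only keys the two local instances look like one, which is the situation the parameter is meant to avoid.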

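    Note: default must be wrapped as new String("default"). The value is evaluated as a SpEL expression against the instance, so a bare default is looked up as a property and startup fails with: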

    org.springframework.expression.spel.SpelEvaluationException: EL1008E: Property or field 'default' cannot be found on object of type 'com.netflix.appinfo.InstanceInfo' - maybe not public or not valid?

    Testing

    With everything above in place, let's try out Turbine's cluster monitoring. Start the following in turn:

    • eureka-server
    • eureka-producer
    • eureka-consumer-hystrix
    • turbine
    • hystrix-dashboard

    Open Hystrix Dashboard and start monitoring http://localhost:12010/turbine.stream. You will then see the aggregated monitoring data for the eureka-consumer-hystrix service.
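As a small illustration of how the stream address relates to the cluster name, the sketch below builds the URL to paste into Hystrix Dashboard. The streamUrl helper and the cluster name ORDERS are hypothetical; the sketch assumes that the cluster query parameter can be left out when the cluster is named default, as in the setup above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class TurbineStreamUrl {

    /** Builds the Turbine stream URL, adding ?cluster=... only for non-default clusters. */
    static String streamUrl(String baseUrl, String cluster) {
        String url = baseUrl + "/turbine.stream";
        if (cluster == null || cluster.isEmpty() || "default".equals(cluster)) {
            return url;
        }
        try {
            return url + "?cluster=" + URLEncoder.encode(cluster, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to exist on every JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(streamUrl("http://localhost:12010", "default"));
        System.out.println(streamUrl("http://localhost:12010", "ORDERS")); // hypothetical cluster
    }
}
```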

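    The architecture now looks like the figure below:

    Aggregation through a message broker

    When wrapping Turbine, Spring Cloud also implemented collection through a message broker. We can therefore have every service publish its monitoring data to the broker; the Turbine service then consumes that data asynchronously from the broker, aggregates it, and exposes it to Hystrix Dashboard. With a message broker in place, the monitoring architecture built from Turbine and Hystrix Dashboard changes to the structure shown in the figure below:

    The figure shows one important new element: RabbitMQ. For installing RabbitMQ and its basic usage, see the earlier MQ series; we won't go into detail here. Next, we build a new application that implements Turbine aggregation through the message broker.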
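The publish-then-aggregate flow can be sketched in plain Java before bringing in the real components. The sketch below is only an in-memory analogy: a BlockingQueue stands in for RabbitMQ, and the Sample class and aggregate method are hypothetical names. It sums one sample per instance, which is a simplification of what Turbine does with rolling metrics.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BrokerAggregationSketch {

    /** One metrics sample published by a service instance (hypothetical shape). */
    static final class Sample {
        final String instance;
        final String command;
        final long requestCount;

        Sample(String instance, String command, long requestCount) {
            this.instance = instance;
            this.command = command;
            this.requestCount = requestCount;
        }
    }

    /** Drains the queue and sums request counts per command across instances. */
    static Map<String, Long> aggregate(BlockingQueue<Sample> broker) {
        Map<String, Long> totals = new TreeMap<>();
        Sample sample;
        while ((sample = broker.poll()) != null) {
            totals.merge(sample.command, sample.requestCount, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        BlockingQueue<Sample> broker = new LinkedBlockingQueue<>();
        // Two instances of the same consumer publish their own samples.
        broker.add(new Sample("consumer-1", "HelloRemote#hello(String)", 3));
        broker.add(new Sample("consumer-2", "HelloRemote#hello(String)", 5));

        System.out.println(aggregate(broker)); // {HelloRemote#hello(String)=8}
    }
}
```

The point of the broker is that the aggregator never needs to know where the instances live; it only reads what they publish.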

    Aggregation through MQ has quite a few pitfalls under Finchley.RC1, but in the end it did run correctly.

    Turbine Stream

    Create a standard Spring Boot project named turbine-amqp.

    POM

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    Configuration file

    spring:
      application:
        name: turbine-amqp
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:10010/eureka/

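    Main class

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.turbine.stream.EnableTurbineStream;
    import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;

    @SpringBootApplication
    @EnableTurbineStream
    public class TurbineAmqpApplication {

        public static void main(String[] args) {
            SpringApplication.run(TurbineAmqpApplication.class, args);
        }

        // Workaround for the JsonParseException described under "Pitfalls encountered"
        @Bean
        public ConfigurableCompositeMessageConverter integrationArgumentResolverMessageConverter(CompositeMessageConverterFactory factory) {
            return new ConfigurableCompositeMessageConverter(factory.getMessageConverterForAllRegistered().getConverters());
        }

    }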

    Modifying the service consumer

    Starting from the earlier eureka-consumer-hystrix project, add the following dependencies to pom.xml:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-netflix-hystrix-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

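    Then add the @EnableHystrix annotation to the main class:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.hystrix.EnableHystrix;
    import org.springframework.cloud.openfeign.EnableFeignClients;

    @EnableHystrix
    @EnableFeignClients
    @SpringBootApplication
    public class EurekaConsumerHystrixApplication {

        public static void main(String[] args) {
            SpringApplication.run(EurekaConsumerHystrixApplication.class, args);
        }

    }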

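    Testing

    Start the eureka-consumer-hystrix and turbine-amqp projects. In the RabbitMQ management console you can then see that an Exchange and a Queue have been created automatically.

    Encouraging so far, but...

    As soon as I called one of the consumer's endpoints, the long climb out of the pits began...

    Pitfalls encountered

    Dependencies

    Turbine Stream was apparently called Turbine AMQP before. Back then there was a spring-cloud-starter-turbine-amqp dependency that bundled the related dependencies, but it is now deprecated in favor of spring-cloud-starter-netflix-turbine-stream, which leaves you to assemble the dependencies yourself. This is where most of the pitfalls come from: which dependencies are required, which ones cause problems once added, and how to resolve dependency conflicts all have to be worked out by hand.

    JsonParseException

    When Turbine Stream reads data from RabbitMQ, it throws the following exception: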

    com.fasterxml.jackson.core.JsonParseException: Unexpected character (',' (code 44)): Expected space separating root-level values
    at [Source: (String)"123,34,111,114,105,103,105,110,34,58,123,34,104,111,115,116,34,58,34,49,55,50,46,49,54,46,49,48,54,46,57,51,34,44,34,112,111,114,116,34,58,57,48,49,51,44,34,115,101,114,118,105,99,101,73,100,34,58,34,101,117,114,101,107,97,45,99,111,110,115,117,109,101,114,45,104,121,115,116,114,105,120,34,44,34,105,100,34,58,34,97,112,112,108,105,99,97,116,105,111,110,45,49,34,125,44,34,101,118,101,110,116,34,58,34,109,101,115,115,97,103,101,34,44,34,100,97,116,97,34,58,123,34,116,121,112,101,34,58,34,72,121,11"[truncated 6105 chars]; line: 1, column: 5]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1798) ~[jackson-core-2.9.3.jar:2.9.3]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:663) ~[jackson-core-2.9.3.jar:2.9.3]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:561) ~[jackson-core-2.9.3.jar:2.9.3]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:608) ~[jackson-core-2.9.3.jar:2.9.3]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._verifyRootSpace(ReaderBasedJsonParser.java:1654) ~[jackson-core-2.9.3.jar:2.9.3]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._parsePosNumber(ReaderBasedJsonParser.java:1297) ~[jackson-core-2.9.3.jar:2.9.3]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:744) ~[jackson-core-2.9.3.jar:2.9.3]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4129) ~[jackson-databind-2.9.3.jar:2.9.3]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3988) ~[jackson-databind-2.9.3.jar:2.9.3]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2992) ~[jackson-databind-2.9.3.jar:2.9.3]
    at org.springframework.cloud.netflix.turbine.stream.HystrixStreamAggregator.sendToSubject(HystrixStreamAggregator.java:73) ~[spring-cloud-netflix-turbine-stream-2.0.0.M8.jar:2.0.0.M8]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161]
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:120) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:111) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:54) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:384) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:89) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:116) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:371) [spring-expression-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:157) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:614) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:605) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:468) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:312) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:104) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) [spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) [spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) [spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) [spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) [spring-integration-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:59) [spring-integration-amqp-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:217) [spring-integration-amqp-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:214) [spring-integration-amqp-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:785) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:769) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1010) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]

    The actual JSON string looks like this:

    {"origin":{"host":"172.16.106.93","port":9013,"serviceId":"eureka-consumer-hystrix","id":"application-1"},"event":"message","data":{"type":"HystrixCommand","name":"eureka-consumer-hystrix.HelloRemote#hello(String)","group":"eureka-producer","currentTime":1523938311789,"isCircuitBreakerOpen":false,"errorPercentage":0,"errorCount":0,"requestCount":0,"rollingCountCollapsedRequests":0,"rollingCountExceptionsThrown":0,"rollingCountFailure":0,"rollingCountFallbackFailure":0,"rollingCountFallbackRejection":0,"rollingCountFallbackSuccess":0,"rollingCountResponsesFromCache":0,"rollingCountSemaphoreRejected":0,"rollingCountShortCircuited":0,"rollingCountSuccess":0,"rollingCountThreadPoolRejected":0,"rollingCountTimeout":0,"currentConcurrentExecutionCount":1,"latencyExecute_mean":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"latencyTotal_mean":0,"latencyTotal":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerForceClosed":false,"propertyValue_circuitBreakerEnabled":true,"propertyValue_executionIsolationStrategy":"THREAD","propertyValue_executionIsolationThreadTimeoutInMilliseconds":1000,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_executionIsolationThreadPoolKeyOverride":null,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_requestCacheEnabled":true,"propertyValue_requestLogEnabled":true,"reportingHosts":1}}

    Converted to byte[], the string above becomes:

    123,34,111,114,105,103,105,110,34,58,123,34,104,111,115,116,34,58,34,49,55,50,46,49,54,46,49,48,54,46,57,51,34,44,34,112,111,114,116,34,58,57,48,49,51,44,34,115,101,114,118,105,99,101,73,100,34,58,34,101,117,114,101,107,97,45,99,111,110,115,117,109,101,114,45,104,121,115,116,114,105,120,34,44,34,105,100,34,58,34,97,112,112,108,105,99,97,116,105,111,110,45,49,34,125,44,34,101,118,101,110,116,34,58,34,109,101,115,115,97,103,101,34,44,34,100,97,116,97,34,58,123,34,116,121,112,101,34,58,34,72,121,115,116,114,105,120,67,111,109,109,97,110,100,34,44,34,110,97,109,101,34,58,34,101,117,114,101,107,97,45,99,111,110,115,117,109,101,114,45,104,121,115,116,114,105,120,46,72,101,108,108,111,82,101,109,111,116,101,35,104,101,108,108,111,40,83,116,114,105,110,103,41,34,44,34,103,114,111,117,112,34,58,34,101,117,114,101,107,97,45,112,114,111,100,117,99,101,114,34,44,34,99,117,114,114,101,110,116,84,105,109,101,34,58,49,53,50,51,57,51,56,56,57,49,55,53,48,44,34,105,115,67,105,114,99,117,105,116,66,114,101,97,107,101,114,79,112,101,110,34,58,102,97,108,115,101,44,34,101,114,114,111,114,80,101,114,99,101,110,116,97,103,101,34,58,48,44,34,101,114,114,111,114,67,111,117,110,116,34,58,48,44,34,114,101,113,117,101,115,116,67,111,117,110,116,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,67,111,108,108,97,112,115,101,100,82,101,113,117,101,115,116,115,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,69,120,99,101,112,116,105,111,110,115,84,104,114,111,119,110,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,70,97,105,108,117,114,101,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,70,97,108,108,98,97,99,107,70,97,105,108,117,114,101,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,70,97,108,108,98,97,99,107,82,101,106,101,99,116,105,111,110,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,70,97,108,108,98,97,99,107,83,117,99,99,101,115,115,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,82,101
,115,112,111,110,115,101,115,70,114,111,109,67,97,99,104,101,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,83,101,109,97,112,104,111,114,101,82,101,106,101,99,116,101,100,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,83,104,111,114,116,67,105,114,99,117,105,116,101,100,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,83,117,99,99,101,115,115,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,84,104,114,101,97,100,80,111,111,108,82,101,106,101,99,116,101,100,34,58,48,44,34,114,111,108,108,105,110,103,67,111,117,110,116,84,105,109,101,111,117,116,34,58,48,44,34,99,117,114,114,101,110,116,67,111,110,99,117,114,114,101,110,116,69,120,101,99,117,116,105,111,110,67,111,117,110,116,34,58,49,44,34,108,97,116,101,110,99,121,69,120,101,99,117,116,101,95,109,101,97,110,34,58,48,44,34,108,97,116,101,110,99,121,69,120,101,99,117,116,101,34,58,123,34,48,34,58,48,44,34,50,53,34,58,48,44,34,53,48,34,58,48,44,34,55,53,34,58,48,44,34,57,48,34,58,48,44,34,57,53,34,58,48,44,34,57,57,34,58,48,44,34,57,57,46,53,34,58,48,44,34,49,48,48,34,58,48,125,44,34,108,97,116,101,110,99,121,84,111,116,97,108,95,109,101,97,110,34,58,48,44,34,108,97,116,101,110,99,121,84,111,116,97,108,34,58,123,34,48,34,58,48,44,34,50,53,34,58,48,44,34,53,48,34,58,48,44,34,55,53,34,58,48,44,34,57,48,34,58,48,44,34,57,53,34,58,48,44,34,57,57,34,58,48,44,34,57,57,46,53,34,58,48,44,34,49,48,48,34,58,48,125,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,82,101,113,117,101,115,116,86,111,108,117,109,101,84,104,114,101,115,104,111,108,100,34,58,50,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,83,108,101,101,112,87,105,110,100,111,119,73,110,77,105,108,108,105,115,101,99,111,110,100,115,34,58,53,48,48,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,69,114,114,111,114,84,104,
114,101,115,104,111,108,100,80,101,114,99,101,110,116,97,103,101,34,58,53,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,70,111,114,99,101,79,112,101,110,34,58,102,97,108,115,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,70,111,114,99,101,67,108,111,115,101,100,34,58,102,97,108,115,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,99,105,114,99,117,105,116,66,114,101,97,107,101,114,69,110,97,98,108,101,100,34,58,116,114,117,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,83,116,114,97,116,101,103,121,34,58,34,84,72,82,69,65,68,34,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,84,104,114,101,97,100,84,105,109,101,111,117,116,73,110,77,105,108,108,105,115,101,99,111,110,100,115,34,58,49,48,48,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,84,104,114,101,97,100,73,110,116,101,114,114,117,112,116,79,110,84,105,109,101,111,117,116,34,58,116,114,117,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,84,104,114,101,97,100,80,111,111,108,75,101,121,79,118,101,114,114,105,100,101,34,58,110,117,108,108,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,101,120,101,99,117,116,105,111,110,73,115,111,108,97,116,105,111,110,83,101,109,97,112,104,111,114,101,77,97,120,67,111,110,99,117,114,114,101,110,116,82,101,113,117,101,115,116,115,34,58,49,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,102,97,108,108,98,97,99,107,73,115,111,108,97,116,105,111,110,83,101,109,97,112,104,111,114,101,77,97,120,67,111,110,99,117,114,114,101,110,116,82,101,113,117,101,115,116,115,34,58,49,48,44,34,112,114,
111,112,101,114,116,121,86,97,108,117,101,95,109,101,116,114,105,99,115,82,111,108,108,105,110,103,83,116,97,116,105,115,116,105,99,97,108,87,105,110,100,111,119,73,110,77,105,108,108,105,115,101,99,111,110,100,115,34,58,49,48,48,48,48,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,114,101,113,117,101,115,116,67,97,99,104,101,69,110,97,98,108,101,100,34,58,116,114,117,101,44,34,112,114,111,112,101,114,116,121,86,97,108,117,101,95,114,101,113,117,101,115,116,76,111,103,69,110,97,98,108,101,100,34,58,116,114,117,101,44,34,114,101,112,111,114,116,105,110,103,72,111,115,116,115,34,58,49,125,125

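    The exception shows that the failure actually happens when this byte[] is converted to a String: Jackson is handed the comma-separated byte values instead of the decoded text, reads 123 as a number, and stops at the first comma.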
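The difference between the two renderings is easy to reproduce with the first nine bytes of the payload above. The sketch below is plain Java with hypothetical helper names (asDecimalList, asText); it only shows what the bytes look like in each form and makes no claim about where in the framework the decimal form was produced.

```java
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;

class BytePayloadSketch {

    /** Renders bytes as comma-separated decimals, the form seen in the exception message. */
    static String asDecimalList(byte[] bytes) {
        StringJoiner joiner = new StringJoiner(",");
        for (byte b : bytes) {
            joiner.add(Integer.toString(b));
        }
        return joiner.toString();
    }

    /** Decodes the bytes as UTF-8 text, which is what the JSON parser should receive. */
    static String asText(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // First nine bytes of the payload shown above.
        byte[] payload = {123, 34, 111, 114, 105, 103, 105, 110, 34};

        System.out.println(asDecimalList(payload)); // 123,34,111,114,105,103,105,110,34
        System.out.println(asText(payload));        // {"origin"
    }
}
```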

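    After a long search through the source, I found that a ConfigurableCompositeMessageConverter is initialized at startup, but by default this class only provides the following 4 MessageConverters:

    • MappingJackson2MessageConverter
    • ByteArrayMessageConverter
    • ObjectStringMessageConverter
    • GenericMessageConverter

    All 4 converters run into trouble with byte[] -> String (the exception above is thrown by MappingJackson2MessageConverter). You may ask: is converting byte[] to String really that hard? Isn't a single new String(bytes) enough? I thought so too, and was about to write it myself.

    Then I found CompositeMessageConverterFactory. As the name suggests, it is a factory for MessageConverters, and it looked like a lifeline. It provides 7 converters by default, the first being ApplicationJsonMessageMarshallingConverter. Looking at its implementation, it is exactly what I needed: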

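    // Excerpt from ApplicationJsonMessageMarshallingConverter
    if (message.getPayload() instanceof byte[] && targetClass.isAssignableFrom(String.class)) {
        // Raw bytes requested as a String: decode them as UTF-8 text
        result = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
    }
    else {
        // Anything else falls through to the parent converter
        result = super.convertFromInternal(message, targetClass, conversionHint);
    }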
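Why the set of registered converters matters can be sketched as a simple chain in which the first converter that returns a result wins. This is a simplified model rather than the Spring classes themselves: the Converter interface, BYTES_TO_STRING, PASS_THROUGH and the convert method are hypothetical names, with BYTES_TO_STRING mirroring the branch in the excerpt above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ConverterChainSketch {

    /** Hypothetical converter contract: return null when the converter does not apply. */
    interface Converter {
        Object convert(Object payload, Class<?> targetClass);
    }

    /** Mirrors the byte[] to String branch from the excerpt above. */
    static final Converter BYTES_TO_STRING = (payload, targetClass) ->
            (payload instanceof byte[] && targetClass.isAssignableFrom(String.class))
                    ? new String((byte[]) payload, StandardCharsets.UTF_8)
                    : null;

    /** Returns the payload unchanged if it already has the requested type. */
    static final Converter PASS_THROUGH = (payload, targetClass) ->
            targetClass.isInstance(payload) ? payload : null;

    /** Tries each converter in order; the first non-null result wins. */
    static Object convert(List<Converter> chain, Object payload, Class<?> targetClass) {
        for (Converter converter : chain) {
            Object result = converter.convert(payload, targetClass);
            if (result != null) {
                return result;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        byte[] payload = "{\"event\":\"message\"}".getBytes(StandardCharsets.UTF_8);

        // Without a byte[] to String converter, nothing in the chain can handle the payload.
        System.out.println(convert(Collections.singletonList(PASS_THROUGH), payload, String.class));

        // With it registered first, the payload is decoded to JSON text.
        System.out.println(convert(Arrays.asList(BYTES_TO_STRING, PASS_THROUGH), payload, String.class));
    }
}
```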

    So I registered this bean manually:

    @Bean
    public ConfigurableCompositeMessageConverter integrationArgumentResolverMessageConverter(CompositeMessageConverterFactory factory) {
        return new ConfigurableCompositeMessageConverter(factory.getMessageConverterForAllRegistered().getConverters());
    }

    This bean is in fact already declared in ContentTypeConfiguration, but it had no effect there; I simply copied it over unchanged:

    @Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
    public ConfigurableCompositeMessageConverter configurableCompositeMessageConverter(CompositeMessageConverterFactory factory) {
        return new ConfigurableCompositeMessageConverter(factory.getMessageConverterForAllRegistered().getConverters());
    }

    Fatal Exception thrown on Scheduler.Worker thread

    The exception is as follows:

    Exception in thread "RxComputationScheduler-1" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.AbstractMethodError
    at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:77)
    at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:704)
    at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1056)
    at io.netty.channel.AbstractChannel.write(AbstractChannel.java:290)
    at io.reactivex.netty.channel.DefaultChannelWriter.writeOnChannel(DefaultChannelWriter.java:165)
    at io.reactivex.netty.protocol.http.server.HttpServerResponse.writeOnChannel(HttpServerResponse.java:195)
    at io.reactivex.netty.channel.DefaultChannelWriter.write(DefaultChannelWriter.java:83)
    at io.reactivex.netty.channel.DefaultChannelWriter.writeAndFlush(DefaultChannelWriter.java:65)
    at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
    at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
    at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:120)
    at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublish.java:585)
    at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.java:283)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
    at rx.internal.schedulers.SchedulePeriodicHelper$1.call(SchedulePeriodicHelper.java:72)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    ... 7 more

    At the same time, monitoring the corresponding address in Hystrix Dashboard shows "Unable to connect to Command Metric Stream.", and the browser console reports:

    EventSource's response has a MIME type ("text/plain") that is not "text/event-stream". Aborting the connection.

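    I really had no idea how to solve this one, so I gave up on it for the time being and planned to look into it later.

    Update 2018-05-06:
    For details, see this issue on GitHub. Thanks to @MadeInChina.

    For completeness, I'll also describe how the problem was tracked down and fixed.

    The key line in the exception is line 22 of the stack trace:
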
    at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)

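    Let's open TurbineStreamConfiguration and see what line 106 actually does.

    If you now open the ServerSentEvent class, the problem becomes clear (at the time I was stuck on the response.writeAndFlush method and never looked at ServerSentEvent).

    The class is flagged with an error because it does not fully implement the methods of ByteBufHolder, which already points to a dependency problem. Note the JAR that contains this class, io.reactivex:rxnetty:0.4.9, and then look at the interface it implements.

    The change history of this interface can be seen here. The last modification, on 2016.5.17, added several methods. In other words, ByteBufHolder had already changed by io.netty:netty-buffer:4.1.0.Final (2016.5.25), let alone io.netty:netty-buffer:4.1.23.Final (2018.4.4), whereas io.reactivex:rxnetty:0.4.9 is a JAR released on 2015.5.6.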
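A mismatch of this kind, where a class was compiled against an older version of an interface, can also be spotted with reflection instead of waiting for an AbstractMethodError at runtime. The sketch below is a hypothetical diagnostic, not part of any library: Holder stands in for ByteBufHolder, OldEvent for a class built before touch() existed, and unimplemented is a made-up helper name. An abstract class is used so that the example compiles in a single file.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class AbstractMethodCheck {

    /** Stand-in for the newer interface version, which gained touch(). */
    interface Holder {
        Object content();

        Holder touch();
    }

    /** Stand-in for a class written against the older interface (no touch()). */
    abstract static class OldEvent implements Holder {
        @Override
        public Object content() {
            return "data";
        }
    }

    /** Lists interface methods for which the class has no concrete implementation. */
    static List<String> unimplemented(Class<?> impl, Class<?> iface) {
        List<String> missing = new ArrayList<>();
        for (Method declared : iface.getMethods()) {
            try {
                Method resolved = impl.getMethod(declared.getName(), declared.getParameterTypes());
                // If lookup lands on an abstract method, nothing in the class hierarchy implements it.
                if (Modifier.isAbstract(resolved.getModifiers())) {
                    missing.add(declared.getName());
                }
            } catch (NoSuchMethodException e) {
                missing.add(declared.getName());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(unimplemented(OldEvent.class, Holder.class)); // [touch]
    }
}
```

Calling any method reported by such a check on a real instance is what produces the AbstractMethodError seen in the stack trace.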

    So let's try the latest version, io.reactivex:rxnetty:0.4.20, by adding the following to pom.xml:

    <dependency>
        <groupId>io.reactivex</groupId>
        <artifactId>rxnetty</artifactId>
        <version>0.4.20</version>
        <scope>runtime</scope>
    </dependency>

    Start it again and test:

    $ curl http://localhost:8989
    curl: (52) Empty reply from server

    It still fails, and the console shows the following error:

    2018-05-06 23:29:37.056  WARN 52477 --- [o-eventloop-3-2] i.n.c.AbstractChannelHandlerContext      : An exception '{}' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:

    java.lang.NoSuchMethodError: rx.internal.operators.NotificationLite.instance()Lrx/internal/operators/NotificationLite;
    at io.reactivex.netty.protocol.http.UnicastContentSubject$State$BufferedObserver.<init>(UnicastContentSubject.java:243) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject$State$BufferedObserver.<init>(UnicastContentSubject.java:241) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject$State.<init>(UnicastContentSubject.java:197) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:132) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:122) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:117) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter$RequestState.createRxRequest(ServerRequestResponseConverter.java:176) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter$RequestState.access$100(ServerRequestResponseConverter.java:168) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.channelRead(ServerRequestResponseConverter.java:87) ~[rx-netty-0.3.18.jar:na]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [netty-codec-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) [netty-codec-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241) [netty-handler-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59) [rx-netty-0.3.18.jar:na]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.23.Final.jar:4.1.23.Final]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

    2018-05-06 23:29:37.057 ERROR 52477 --- [o-eventloop-3-2] i.r.netty.server.DefaultErrorHandler : Unexpected error in RxNetty.

    java.lang.NullPointerException: null
    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.invokeContentOnNext(ServerRequestResponseConverter.java:160) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.channelRead(ServerRequestResponseConverter.java:96) ~[rx-netty-0.3.18.jar:na]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [netty-codec-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284) [netty-codec-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241) [netty-handler-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59) [rx-netty-0.3.18.jar:na]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) [netty-transport-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.23.Final.jar:4.1.23.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.23.Final.jar:4.1.23.Final]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

    Looking only at the first line of the stack trace, you will find that there are three UnicastContentSubject classes in the current environment.

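    Open all three and you will see that the two from io.reactivex:rxnetty look mostly fine, while the one from com.netflix.rxnetty:rx-netty again shows errors outright, still because it does not fully implement an interface.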
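    Duplicate copies of a class like this can also be listed at runtime. The sketch below uses only JDK APIs and assumes it runs on the same classpath as the Turbine Stream application. ClassOriginFinder and its method names are hypothetical. The default class name is taken from the stack trace above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper, not part of Spring Cloud or RxNetty.
class ClassOriginFinder {

    /** Converts a binary class name such as a.b.C into the resource path a/b/C.class. */
    static String toResourcePath(String className) {
        return className.replace('.', '/') + ".class";
    }

    static ClassLoader loader() {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        return loader != null ? loader : ClassLoader.getSystemClassLoader();
    }

    /** Lists every location on the classpath that provides the given class. */
    static List<URL> findAllCopies(String className) {
        try {
            return Collections.list(loader().getResources(toResourcePath(className)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the JAR or directory a loaded class came from, or null if unknown. */
    static URL loadedFrom(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? null : source.getLocation();
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "io.reactivex.netty.protocol.http.UnicastContentSubject";
        for (URL url : findAllCopies(name)) {
            System.out.println("found: " + url);
        }
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class<?> type = Class.forName(name, false, loader());
            System.out.println("loaded from: " + loadedFrom(type));
        } catch (ClassNotFoundException | LinkageError e) {
            System.out.println("could not load " + name + ": " + e);
        }
    }
}
```

    More than one "found" line means several JARs ship the class. The "loaded from" line shows which copy the class loader actually picked.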

    According to the DEBUG output, the copy actually in use is the one inside com.netflix.rxnetty:rx-netty:0.3.18. So let's analyse the dependencies and exclude this broken old relic (2014.11.6):

    $ mvn dependency:tree -Dverbose -Dincludes=*:*netty*
    [INFO] Scanning for projects...
    [INFO]
    [INFO] ----------< com.windmt:spring-cloud-turbine-stream-rabbitmq >-----------
    [INFO] Building turbine-stream-rabbitmq 0.0.1-SNAPSHOT
    [INFO] --------------------------------[ jar ]---------------------------------
    [INFO]
    [INFO] --- maven-dependency-plugin:3.0.2:tree (default-cli) @ spring-cloud-turbine-stream-rabbitmq ---
    [INFO] Verbose not supported since maven-dependency-plugin 3.0
    [INFO] com.windmt:spring-cloud-turbine-stream-rabbitmq:jar:0.0.1-SNAPSHOT
    [INFO] +- org.springframework.cloud:spring-cloud-starter-netflix-turbine-stream:jar:2.0.0.RC1:compile
    [INFO] | +- org.springframework.cloud:spring-cloud-starter-netflix-eureka-client:jar:2.0.0.RC1:compile
    [INFO] | | - org.springframework.cloud:spring-cloud-starter-netflix-ribbon:jar:2.0.0.RC1:compile
    [INFO] | | - com.netflix.ribbon:ribbon:jar:2.2.5:compile
    [INFO] | | +- com.netflix.ribbon:ribbon-transport:jar:2.2.5:runtime
    [INFO] | | | +- io.reactivex:rxnetty-contexts:jar:0.4.9:runtime
    [INFO] | | | - io.reactivex:rxnetty-servo:jar:0.4.9:runtime
    [INFO] | | - io.reactivex:rxnetty:jar:0.4.9:runtime
    [INFO] | - com.netflix.turbine:turbine-core:jar:2.0.0-DP.2:compile
    [INFO] | - com.netflix.rxnetty:rx-netty:jar:0.3.18:compile
    [INFO] | +- io.netty:netty-codec-http:jar:4.1.23.Final:compile
    [INFO] | | - io.netty:netty-codec:jar:4.1.23.Final:compile
    [INFO] | - io.netty:netty-transport-native-epoll:jar:4.1.23.Final:compile
    [INFO] | +- io.netty:netty-common:jar:4.1.23.Final:compile
    [INFO] | +- io.netty:netty-buffer:jar:4.1.23.Final:compile
    [INFO] | +- io.netty:netty-transport-native-unix-common:jar:4.1.23.Final:compile
    [INFO] | - io.netty:netty-transport:jar:4.1.23.Final:compile
    [INFO] | - io.netty:netty-resolver:jar:4.1.23.Final:compile
    [INFO] - org.springframework.cloud:spring-cloud-stream-binder-rabbit:jar:2.0.0.RELEASE:compile
    [INFO] - org.springframework.boot:spring-boot-starter-amqp:jar:2.0.1.RELEASE:compile
    [INFO] - org.springframework.amqp:spring-rabbit:jar:2.0.3.RELEASE:compile
    [INFO] - com.rabbitmq:http-client:jar:2.0.1.RELEASE:compile
    [INFO] - io.projectreactor.ipc:reactor-netty:jar:0.7.6.RELEASE:compile
    [INFO] +- io.netty:netty-handler:jar:4.1.23.Final:compile
    [INFO] - io.netty:netty-handler-proxy:jar:4.1.23.Final:compile
    [INFO] - io.netty:netty-codec-socks:jar:4.1.23.Final:compile
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------

    The final dependency coordinates in pom.xml are as follows (this is also the minimal configuration that starts Turbine Stream normally; the Spring Cloud version is Finchley.RC1):

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
        <exclusions>
            <exclusion>
                <groupId>com.netflix.rxnetty</groupId>
                <artifactId>rx-netty</artifactId>
            </exclusion>
            <exclusion>
                <groupId>io.reactivex</groupId>
                <artifactId>rxnetty</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>io.reactivex</groupId>
        <artifactId>rxnetty</artifactId>
        <version>0.4.20</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>

    Testing again, the SSE stream now arrives normally:

    $ curl http://localhost:8989
    event: message
    data: {"rollingCountFallbackFailure":0,"rollingCountFallbackSuccess":0,"propertyValue_circuitBreakerRequestVolumeThreshold":"20","propertyValue_circuitBreakerForceOpen":false,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":"10000","latencyTotal_mean":0,"type":"HystrixCommand","rollingCountResponsesFromCache":0,"TypeAndName":"TypeAndName=>HystrixCommand_consumer-feign-hystrix-stream.HelloRemote#hello(String)","rollingCountTimeout":0,"propertyValue_executionIsolationStrategy":"THREAD","instanceId":"application-1","rollingCountFailure":0,"rollingCountExceptionsThrown":0,"latencyExecute_mean":0,"isCircuitBreakerOpen":false,"errorCount":0,"group":"producer","rollingCountSemaphoreRejected":0,"latencyTotal":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"requestCount":0,"rollingCountCollapsedRequests":0,"rollingCountShortCircuited":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":0,"90":0,"95":0,"99":0,"99.5":0,"100":0},"propertyValue_circuitBreakerSleepWindowInMilliseconds":"5000","currentConcurrentExecutionCount":0,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":"10","errorPercentage":0,"rollingCountThreadPoolRejected":0,"propertyValue_circuitBreakerEnabled":true,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_requestCacheEnabled":true,"rollingCountFallbackRejection":0,"propertyValue_requestLogEnabled":true,"rollingCountSuccess":0,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":"10","InstanceKey":"InstanceKey=>application-1","propertyValue_circuitBreakerErrorThresholdPercentage":"50","propertyValue_circuitBreakerForceClosed":false,"name":"consumer-feign-hystrix-stream.HelloRemote#hello(String)","reportingHosts":1,"propertyValue_executionIsolationThreadPoolKeyOverride":"null","propertyValue_executionIsolationThreadTimeoutInMilliseconds":"1000"}

    event: message
    data: {"currentCorePoolSize":10,"currentLargestPoolSize":8,"currentActiveCount":0,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":"10000","currentMaximumPoolSize":10,"currentQueueSize":0,"type":"HystrixThreadPool","currentTaskCount":8,"TypeAndName":"TypeAndName=>HystrixThreadPool_producer","currentCompletedTaskCount":8,"rollingMaxActiveThreads":0,"instanceId":"application-1","InstanceKey":"InstanceKey=>application-1","name":"producer","reportingHosts":1,"currentPoolSize":8,"propertyValue_queueSizeRejectionThreshold":"5","rollingCountThreadsExecuted":0}
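    The curl output above is a plain Server-Sent Events stream: each event is a block of "event:" and "data:" lines terminated by a blank line. As a rough illustration of how a client could read it, here is a minimal sketch using only the JDK. SseReader, Event and parse are hypothetical names, only the "event" and "data" fields are handled, and the URL is the one used with curl above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal SSE reader, not an API of Turbine or Hystrix.
class SseReader {

    static final class Event {
        final String type;
        final String data;

        Event(String type, String data) {
            this.type = type;
            this.data = data;
        }
    }

    /** Reads at most maxEvents complete events; a blank line ends each event. */
    static List<Event> parse(Reader source, int maxEvents) {
        List<Event> events = new ArrayList<>();
        String type = "message"; // default event type when no "event:" line is sent
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        try {
            BufferedReader reader = new BufferedReader(source);
            String line;
            while (events.size() < maxEvents && (line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    if (hasData) {
                        events.add(new Event(type, data.toString()));
                    }
                    type = "message";
                    data.setLength(0);
                    hasData = false;
                } else if (!line.startsWith(":")) { // lines starting with ':' are comments
                    int colon = line.indexOf(':');
                    String field = colon < 0 ? line : line.substring(0, colon);
                    String value = colon < 0 ? "" : line.substring(colon + 1);
                    if (value.startsWith(" ")) {
                        value = value.substring(1); // one leading space is not part of the value
                    }
                    if (field.equals("event")) {
                        type = value;
                    } else if (field.equals("data")) {
                        if (hasData) {
                            data.append('\n'); // multiple data lines are joined with newlines
                        }
                        data.append(value);
                        hasData = true;
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        String address = args.length > 0 ? args[0] : "http://localhost:8989";
        try (Reader in = new InputStreamReader(
                URI.create(address).toURL().openStream(), StandardCharsets.UTF_8)) {
            for (Event event : parse(in, 2)) {
                System.out.println(event.type + ": " + event.data.length() + " characters of JSON");
            }
        }
    }
}
```

    The stream never ends on its own, so the sketch stops after a fixed number of events instead of waiting for end of input.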

    The console no longer reports errors either:

    2018-05-06 23:57:25.833  INFO 53309 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration   : SSE Request Received
    2018-05-06 23:57:25.843 INFO 53309 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration : Starting aggregation
    2018-05-06 23:57:32.115 INFO 53309 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration : Unsubscribing RxNetty server connection
    2018-05-06 23:57:32.116 INFO 53309 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration : Unsubscribing aggregation.

    Then enter http://localhost:8989 in the Hystrix Dashboard address bar to see the data. (Appending /turbine.stream is optional. To change the port, set turbine.stream.port in the Spring Boot configuration file.)

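    Related reading

    Spring Cloud (1): Service Governance Technology Overview
    Spring Cloud (2): Service Registration and Discovery with Eureka
    Spring Cloud (3): Service Provision and Invocation with Eureka
    Spring Cloud (4): Service Fault Tolerance with Hystrix
    Spring Cloud (5): Hystrix Dashboard
    Spring Cloud (6): Aggregating Hystrix Monitoring Data with Turbine
    Spring Cloud (7): Config Center (Git Backend and Dynamic Refresh)
    Spring Cloud (8): Config Center (Service Registration and High Availability)
    Spring Cloud (9): Config Center (Message Bus)
    Spring Cloud (10): Service Gateway Zuul (Routing)
    Spring Cloud (11): Service Gateway Zuul (Filters)
    Spring Cloud (12): Distributed Tracing (Sleuth and Zipkin)

    Sample code: GitHub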


  • Original article: https://www.cnblogs.com/chenweida/p/9025583.html