  • Why SkyWalking needs to introduce a Kafka message queue

    I recently started looking into SkyWalking, planning to roll it out in our system. While testing it in the test environment I hit the following phenomenon: right after the OAP server started, trace data was uploaded normally, but after a few minutes no more SegmentTrace data came in at all.

    ES: a single instance, with 8 GB of memory allocated
    Number of service instances: 30+

    Looking at the oapserver logs:
    Right after the OAP server started it logged the error below; after reading the code, my guess is that it was caused by Elasticsearch not being connected yet.

    2019-07-25 15:34:51,516 - org.apache.skywalking.oap.server.core.register.worker.RegisterRemoteWorker - 49 [DataCarrier.REGISTER_L1.BulkConsumePool.0.Thread] ERROR [] - Index: 0, Size: 0
    java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
            at java.util.LinkedList.checkElementIndex(LinkedList.java:555) ~[?:1.8.0_111]
            at java.util.LinkedList.get(LinkedList.java:476) ~[?:1.8.0_111]
            at org.apache.skywalking.oap.server.core.remote.selector.ForeverFirstSelector.select(ForeverFirstSelector.java:37) ~[server-core-6.2.0.jar:6.2.0]
            at org.apache.skywalking.oap.server.core.remote.RemoteSenderService.send(RemoteSenderService.java:58) ~[server-core-6.2.0.jar:6.2.0]
            at org.apache.skywalking.oap.server.core.register.worker.RegisterRemoteWorker.in(RegisterRemoteWorker.java:47) ~[server-core-6.2.0.jar:6.2.0]
            at org.apache.skywalking.oap.server.core.register.worker.RegisterRemoteWorker.in(RegisterRemoteWorker.java:32) ~[server-core-6.2.0.jar:6.2.0]
            at java.util.HashMap$Values.forEach(HashMap.java:980) [?:1.8.0_111]
            at org.apache.skywalking.oap.server.core.register.worker.RegisterDistinctWorker.onWork(RegisterDistinctWorker.java:77) [server-core-6.2.0.jar:6.2.0]
            at org.apache.skywalking.oap.server.core.register.worker.RegisterDistinctWorker.access$100(RegisterDistinctWorker.java:34) [server-core-6.2.0.jar:6.2.0]
            at org.apache.skywalking.oap.server.core.register.worker.RegisterDistinctWorker$AggregatorConsumer.consume(RegisterDistinctWorker.java:104) [server-core-6.2.0.jar:6.2.0]
            at org.apache.skywalking.apm.commons.datacarrier.consumer.MultipleChannelsConsumer.consume(MultipleChannelsConsumer.java:80) [apm-datacarrier-6.2.0.jar:6.2.0]
            at org.apache.skywalking.apm.commons.datacarrier.consumer.MultipleChannelsConsumer.run(MultipleChannelsConsumer.java:49) [apm-datacarrier-6.2.0.jar:6.2.0]
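    The stack trace itself points at the cause: judging from the frames, the selector returns the first entry of the current list of remote OAP instances, and right after startup that list is still empty (the register/inventory data has not been loaded from Elasticsearch yet), so get(0) throws. A minimal sketch of that behavior (my own reconstruction based only on the stack trace above, not the actual SkyWalking source):

    import java.util.LinkedList;
    import java.util.List;

    // Reconstruction of what ForeverFirstSelector.select appears to do, judging by the
    // stack trace above; not the real SkyWalking code.
    class ForeverFirstSelectorSketch {
        // Remote OAP instances known to this node; empty right after startup while the
        // register/inventory data has not been loaded from Elasticsearch yet.
        private final List<String> remoteInstances = new LinkedList<>();

        String select() {
            // get(0) on an empty LinkedList throws "IndexOutOfBoundsException: Index: 0, Size: 0",
            // which is exactly the error logged by RegisterRemoteWorker at startup.
            return remoteInstances.get(0);
        }
    }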
    

    Before SegmentTrace uploads stopped, a few errors from calls to ES were logged, as shown below:

    2019-07-25 15:37:48,084 - org.apache.skywalking.oap.server.core.register.worker.RegisterPersistentWorker - 113 [DataCarrier.REGISTER_L2.BulkConsumePool.0.Thread] ERROR [] - Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of processing of [356918][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[lpt_skywalking__service_instance_inventory][0]] containing [index {[lpt_skywalking__service_instance_inventory][type][5_e6372b0d8acc461886f43d36e9f10dbf_0_0], source[{"sequence":12,"heartbeat_time":1564040275951,"service_id":5,"address_id":0,"name":"enrollment-service-pid:27836@spring-boot-servicei","is_address":0,"instance_uuid":"e6372b0d8acc461886f43d36e9f10dbf","register_time":1564040123195,"properties":"{"os_name":"Linux","host_name":"spring-boot-servicei","process_no":"27836","language":"java","ipv4s":"[\"10.10.10.111\"]"}"}]}] and a refresh, target allocation id: uNxjird6TG6JolbwhVpYHQ, primary term: 1 on EsThreadPoolExecutor[name = LgVAr7j/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@76ce58ba[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 19571]]]
    org.elasticsearch.ElasticsearchStatusException: Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of processing of [356918][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[lpt_skywalking__service_instance_inventory][0]] containing [index {[lpt_skywalking__service_instance_inventory][type][5_e6372b0d8acc461886f43d36e9f10dbf_0_0], source[{"sequence":12,"heartbeat_time":1564040275951,"service_id":5,"address_id":0,"name":"enrollment-service-pid:27836@spring-boot-servicei","is_address":0,"instance_uuid":"e6372b0d8acc461886f43d36e9f10dbf","register_time":1564040123195,"properties":"{"os_name":"Linux","host_name":"spring-boot-servicei","process_no":"27836","language":"java","ipv4s":"[\"10.10.10.111\"]"}"}]}] and a refresh, target allocation id: uNxjird6TG6JolbwhVpYHQ, primary term: 1 on EsThreadPoolExecutor[name = LgVAr7j/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@76ce58ba[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 19571]]]
            at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:177) ~[elasticsearch-6.3.2.jar:6.3.2]
            at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
            at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:628) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
            at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:535) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
            at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:508) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
            at org.elasticsearch.client.RestHighLevelClient.update(RestHighLevelClient.java:366) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
            at org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient.forceUpdate(ElasticSearchClient.java:262) ~[library-client-6.2.0.jar:6.2.0]
            at org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.RegisterEsDAO.forceUpdate(RegisterEsDAO.java:56) ~[storage-elasticsearch-plugin-6.2.0.jar:6.2.0]
            at org.apache.skywalking.oap.server.core.register.worker.RegisterPersistentWorker.lambda$onWork$0(RegisterPersistentWorker.java:90) ~[server-core-6.2.0.jar:6.2.0]
            at java.util.HashMap$Values.forEach(HashMap.java:980) [?:1.8.0_111]
            at org.apache.skywalking.oap.server.core.register.worker.RegisterPersistentWorker.onWork(RegisterPersistentWorker.java:85) [server-core-6.2.0.jar:6.2.0]
            at org.apache.skywalking.oap.server.core.register.worker.RegisterPersistentWorker.access$100(RegisterPersistentWorker.java:36) [server-core-6.2.0.jar:6.2.0]
            at org.apache.skywalking.oap.server.core.register.worker.RegisterPersistentWorker$PersistentConsumer.consume(RegisterPersistentWorker.java:142) [server-core-6.2.0.jar:6.2.0]
            at org.apache.skywalking.apm.commons.datacarrier.consumer.MultipleChannelsConsumer.consume(MultipleChannelsConsumer.java:80) [apm-datacarrier-6.2.0.jar:6.2.0]
            at org.apache.skywalking.apm.commons.datacarrier.consumer.MultipleChannelsConsumer.run(MultipleChannelsConsumer.java:49) [apm-datacarrier-6.2.0.jar:6.2.0]
            Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://10.10.10.251:3012], URI [/lpt_skywalking__service_instance_inventory/type/5_e6372b0d8acc461886f43d36e9f10dbf_0_0/_update?refresh=true&timeout=1m], status line [HTTP/1.1 429 Too Many Requests]
    {"error":{"root_cause":[{"type":"remote_transport_exception","reason":"[LgVAr7j][10.0.23.34:9300][indices:data/write/update[s]]"}],"type":"es_rejected_execution_exception","reason":"rejected execution of processing of [356918][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[lpt_skywalking__service_instance_inventory][0]] containing [index {[lpt_skywalking__service_instance_inventory][type][5_e6372b0d8acc461886f43d36e9f10dbf_0_0], source[{"sequence":12,"heartbeat_time":1564040275951,"service_id":5,"address_id":0,"name":"enrollment-service-pid:27836@spring-boot-servicei","is_address":0,"instance_uuid":"e6372b0d8acc461886f43d36e9f10dbf","register_time":1564040123195,"properties":"{\"os_name\":\"Linux\",\"host_name\":\"spring-boot-servicei\",\"process_no\":\"27836\",\"language\":\"java\",\"ipv4s\":\"[\\\"10.10.10.111\\\"]\"}"}]}] and a refresh, target allocation id: uNxjird6TG6JolbwhVpYHQ, primary term: 1 on EsThreadPoolExecutor[name = LgVAr7j/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@76ce58ba[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 19571]]"},"status":429}
                    at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:705) ~[elasticsearch-rest-client-6.3.2.jar:6.3.2]
                    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) ~[elasticsearch-rest-client-6.3.2.jar:6.3.2]
                    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:198) ~[elasticsearch-rest-client-6.3.2.jar:6.3.2]
                    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:522) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
                    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:508) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
                    at org.elasticsearch.client.RestHighLevelClient.update(RestHighLevelClient.java:366) ~[elasticsearch-rest-high-level-client-6.3.2.jar:6.3.2]
                    at org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient.forceUpdate(ElasticSearchClient.java:262) ~[library-client-6.2.0.jar:6.2.0]
                    at org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.RegisterEsDAO.forceUpdate(RegisterEsDAO.java:56) ~[storage-elasticsearch-plugin-6.2.0.jar:6.2.0]
                    at org.apache.skywalking.oap.server.core.register.worker.RegisterPersistentWorker.lambda$onWork$0(RegisterPersistentWorker.java:90) ~[server-core-6.2.0.jar:6.2.0]
                    at java.util.HashMap$Values.forEach(HashMap.java:980) [?:1.8.0_111]
                    at org.apache.skywalking.oap.server.core.register.worker.RegisterPersistentWorker.onWork(RegisterPersistentWorker.java:85) [server-core-6.2.0.jar:6.2.0]
                    at org.apache.skywalking.oap.server.core.register.worker.RegisterPersistentWorker.access$100(RegisterPersistentWorker.java:36) [server-core-6.2.0.jar:6.2.0]
                    at org.apache.skywalking.oap.server.core.register.worker.RegisterPersistentWorker$PersistentConsumer.consume(RegisterPersistentWorker.java:142) [server-core-6.2.0.jar:6.2.0]
                    at org.apache.skywalking.apm.commons.datacarrier.consumer.MultipleChannelsConsumer.consume(MultipleChannelsConsumer.java:80) [apm-datacarrier-6.2.0.jar:6.2.0]
                    at org.apache.skywalking.apm.commons.datacarrier.consumer.MultipleChannelsConsumer.run(MultipleChannelsConsumer.java:49) [apm-datacarrier-6.2.0.jar:6.2.0]
            Caused by: org.elasticsearch.client.ResponseException: method [POST], host [http://10.10.10.251:3012], URI [/lpt_skywalking__service_instance_inventory/type/5_e6372b0d8acc461886f43d36e9f10dbf_0_0/_update?refresh=true&timeout=1m], status line 
    

    From the logs alone there is no fatal problem; at worst ES is under some pressure, which should not prevent all later SegmentTrace uploads.
    Looking at the SegmentTrace volume, roughly 2,000+ records were uploaded in about two minutes. The volume is that high because of the large number of Consul health-check HTTP requests (apparently one service heartbeat per second, and one of our application instances may register several services).
    At this point the resource usage of Elasticsearch, the OAP server and the agents all looked normal. Since the SkyWalking agent talks to the server over gRPC, reading and debugging the agent code showed that when the agent calls the server through a gRPC stream it blocks there waiting for the server to respond; from the source it is clear that if the server never responds, the client will not upload any more data.
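    To illustrate the pattern described above, here is a minimal sketch (the type and method names SegmentReportStub, Segment, Ack and the timeout are placeholders of mine, not the real agent API): the agent streams segments to the server and then blocks on a latch until the server completes the stream, so if the server never answers, the sending thread stays parked and the local buffer is never released.

    import io.grpc.stub.StreamObserver;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Sketch of the agent-side send-and-wait pattern; not the real SkyWalking agent code.
    class SegmentSenderSketch {

        interface SegmentReportStub {
            // Client-streaming call: the agent streams segments, the server answers once.
            StreamObserver<Segment> collect(StreamObserver<Ack> responseObserver);
        }
        static class Segment {}
        static class Ack {}

        void send(List<Segment> segments, SegmentReportStub stub) throws InterruptedException {
            CountDownLatch finished = new CountDownLatch(1);

            StreamObserver<Segment> request = stub.collect(new StreamObserver<Ack>() {
                @Override public void onNext(Ack ack) { /* server acknowledgement */ }
                @Override public void onError(Throwable t) { finished.countDown(); }
                @Override public void onCompleted() { finished.countDown(); }
            });

            for (Segment segment : segments) {
                request.onNext(segment);
            }
            request.onCompleted();

            // The sending thread blocks here until the server finishes the stream. If the
            // OAP server never answers, this wait never completes, the agent's DataCarrier
            // buffer is never cleared, and no further data is uploaded.
            finished.await(30, TimeUnit.SECONDS);
        }
    }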
    Taking a jstack of the server showed a large number of gRPC threads blocked inside Buffer.save:

    "grpc-default-executor-207" #279 daemon prio=5 os_prio=0 tid=0x00007f0174156000 nid=0x916 sleeping[0x00007f0138760000]
       java.lang.Thread.State: TIMED_WAITING (sleeping)
            at java.lang.Thread.sleep(Native Method)
            at org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer.save(Buffer.java:63)
            at org.apache.skywalking.apm.commons.datacarrier.buffer.Channels.save(Channels.java:54)
            at org.apache.skywalking.apm.commons.datacarrier.DataCarrier.produce(DataCarrier.java:90)
            at org.apache.skywalking.oap.server.core.analysis.worker.RecordPersistentWorker.in(RecordPersistentWorker.java:64)
            at org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor.in(RecordStreamProcessor.java:47)
            at org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher.dispatch(SegmentDispatcher.java:46)
            at org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher.dispatch(SegmentDispatcher.java:28)
            at org.apache.skywalking.oap.server.core.analysis.DispatcherManager.forward(DispatcherManager.java:57)
            at org.apache.skywalking.oap.server.core.source.SourceReceiverImpl.receive(SourceReceiverImpl.java:36)
            at org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener.build(SegmentSpanListener.java:135)
            at org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2$$Lambda$317/386780536.accept(Unknown Source)
            at java.lang.Iterable.forEach(Iterable.java:75)
            at org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2.notifyListenerToBuild(SegmentParseV2.java:207)
            at org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2.parse(SegmentParseV2.java:109)
            at org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2$Producer.send(SegmentParseV2.java:270)
            at org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler$1.onNext(TraceSegmentReportServiceHandler.java:58)
            at org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler$1.onNext(TraceSegmentReportServiceHandler.java:50)
            at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
            at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
            at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
            at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
            at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    
       Locked ownable synchronizers:
            - <0x00000000e69a2d90> (a java.util.concurrent.ThreadPoolExecutor$Worker)
    

    Reading the source explains why: after the server receives a TraceSegment from a client it puts it into a Buffer; if the Buffer is full it loops with sleep(1 ms), staying blocked there and retrying the put (the default Buffer strategy is BLOCKING).
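    A minimal sketch of that BLOCKING strategy (my own reconstruction of the behavior, with simplified index handling; not the real DataCarrier Buffer source): the producer keeps retrying the put with a 1 ms sleep between attempts, which is exactly the Thread.sleep visible in the Buffer.save frame of the jstack above.

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Reconstruction of the blocking Buffer.save behavior described above; not the real
    // org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer source.
    class BlockingBufferSketch<T> {
        private final AtomicReferenceArray<T> slots;

        BlockingBufferSketch(int size) {
            this.slots = new AtomicReferenceArray<>(size);
        }

        // Called by the gRPC handler thread for every received TraceSegment.
        void save(int index, T data) {
            // Keep retrying until the consumer frees the slot; the handler thread can stay
            // stuck in this loop indefinitely while the buffer remains full.
            while (!slots.compareAndSet(index, null, data)) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        // Called by the DataCarrier consumer thread; returns null if the slot is empty.
        T poll(int index) {
            return slots.getAndSet(index, null);
        }
    }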
    The Consumer behind the TraceSegment DataCarrier continuously pulls data from the Buffer and writes it to ES, so if the Buffer stays full the Consumer is most likely where the problem is. Looking further at the Consumer frames in the jstack, the Consumer writes to ES in batches through ES's BulkProcessor, and all the Consumer threads are waiting for the lock taken in the internalAdd method:

    "DataCarrier.METRICS_L2_AGGREGATION.BulkConsumePool.0.Thread" #14 daemon prio=5 os_prio=0 tid=0x00007f01e1abd000 nid=0x7bd2 waiting for monitor entry [0x00007f01c2762000]
       java.lang.Thread.State: BLOCKED (on object monitor)
            at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:286)
            - waiting to lock <0x00000000e17b6908> (a org.elasticsearch.action.bulk.BulkProcessor)
            at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
            at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
            at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
            at org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO.lambda$batchPersistence$0(BatchProcessEsDAO.java:64)
            at org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO$$Lambda$325/1751934213.accept(Unknown Source)
            at java.lang.Iterable.forEach(Iterable.java:75)
            at org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO.batchPersistence(BatchProcessEsDAO.java:62)
            at org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker.onWork(PersistenceWorker.java:51)
            at org.apache.skywalking.oap.server.core.analysis.worker.MetricsPersistentWorker.onWork(MetricsPersistentWorker.java:77)
            at org.apache.skywalking.oap.server.core.analysis.worker.MetricsPersistentWorker$PersistentConsumer.consume(MetricsPersistentWorker.java:176)
            at org.apache.skywalking.apm.commons.datacarrier.consumer.MultipleChannelsConsumer.consume(MultipleChannelsConsumer.java:80)
            at org.apache.skywalking.apm.commons.datacarrier.consumer.MultipleChannelsConsumer.run(MultipleChannelsConsumer.java:49)
    
       Locked ownable synchronizers:
            - None
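    All the consumer threads funnel their writes through one shared BulkProcessor, and add()/internalAdd() is synchronized on that instance, which is why they all queue up on the same monitor. For context, this is roughly how such a processor is built with the 6.x high-level client (the listener body and the numeric settings below are illustrative, not SkyWalking's actual configuration):

    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.unit.TimeValue;

    // Sketch of a shared BulkProcessor as used by the storage consumers; the settings are
    // illustrative, not the values SkyWalking ships with.
    class BulkProcessorSketch {
        static BulkProcessor build(RestHighLevelClient client) {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override public void beforeBulk(long executionId, BulkRequest request) { }
                @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }
                @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
            };
            // Every index/update request added by a consumer thread goes through the
            // synchronized add()/internalAdd() of this single instance.
            return BulkProcessor.builder(
                    (request, bulkListener) -> client.bulkAsync(request, bulkListener), listener)
                .setBulkActions(2000)                              // flush after this many actions
                .setFlushInterval(TimeValue.timeValueSeconds(10))  // or after this interval
                .setConcurrentRequests(2)                          // permits of the internal semaphore
                .build();
        }
    }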
    

    That lock is held by another thread, which in turn is waiting on a semaphore, and no thread appears to hold that semaphore:

    "pool-12-thread-1" #57 prio=5 os_prio=0 tid=0x00007f01e1534000 nid=0x7c01 waiting on condition [0x00007f01a48c6000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x00000000e17b6a80> (a java.util.concurrent.Semaphore$NonfairSync)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
            at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
            at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:60)
            at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
            at org.elasticsearch.action.bulk.BulkProcessor.flush(BulkProcessor.java:358)
            - locked <0x00000000e17b6908> (a org.elasticsearch.action.bulk.BulkProcessor)
            at org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO.batchPersistence(BatchProcessEsDAO.java:72)
            at org.apache.skywalking.oap.server.core.storage.PersistenceTimer.extractDataAndSave(PersistenceTimer.java:113)
            at org.apache.skywalking.oap.server.core.storage.PersistenceTimer.lambda$start$0(PersistenceTimer.java:65)
            at org.apache.skywalking.oap.server.core.storage.PersistenceTimer$$Lambda$226/640294829.run(Unknown Source)
            at org.apache.skywalking.apm.util.RunnableWithExceptionProtection.run(RunnableWithExceptionProtection.java:36)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    

    Reading the Elasticsearch client source shows that when the client executes a bulk operation it acquires a semaphore and uses a CountDownLatch to turn the asynchronous call into a synchronous one.
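    Roughly, the handler behaves like the sketch below (a simplified reconstruction of the pattern, not the exact Elasticsearch source). Note that a java.util.concurrent.Semaphore has no owning thread, so jstack cannot attribute its permits to anyone; if the permits are exhausted by bulk calls whose response listeners never run (for example against an overloaded ES), flush() blocks inside acquire() while still holding the BulkProcessor monitor, and every add() queued behind it stays blocked as well, which matches the thread dumps above.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Semaphore;
    import java.util.function.BiConsumer;

    // Simplified reconstruction of the async-to-sync bulk execution pattern; not the actual
    // org.elasticsearch.action.bulk.BulkRequestHandler source.
    class BulkRequestHandlerSketch<REQ> {
        private final Semaphore semaphore;
        private final BiConsumer<REQ, Runnable> asyncBulkCall; // invokes the callback when the bulk finishes
        private final boolean syncMode;                        // corresponds to concurrentRequests == 0

        BulkRequestHandlerSketch(int concurrentRequests, BiConsumer<REQ, Runnable> asyncBulkCall) {
            this.semaphore = new Semaphore(concurrentRequests == 0 ? 1 : concurrentRequests);
            this.asyncBulkCall = asyncBulkCall;
            this.syncMode = concurrentRequests == 0;
        }

        void execute(REQ bulkRequest) throws InterruptedException {
            // flush() reaches this acquire while already holding the BulkProcessor monitor,
            // which is the state visible in the "pool-12-thread-1" stack above.
            semaphore.acquire();
            CountDownLatch latch = new CountDownLatch(1);
            asyncBulkCall.accept(bulkRequest, () -> {
                // Run from the response/failure callback: return the permit and wake the caller.
                semaphore.release();
                latch.countDown();
            });
            if (syncMode) {
                latch.await();   // the asynchronous call is turned into a synchronous one here
            }
            // If the callback is never delivered, the permit is never returned: the next
            // execute() blocks in acquire() forever, and with it everything waiting on add().
        }
    }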
    Another look at the jstack did not show any thread holding that semaphore, so the investigation stops here for now; my guess is that an ES bug is involved, to be looked into in detail when there is time. The direct cause is still that the volume of uploaded data is too large. Temporary workarounds:

    1. Remove the okhttp plugin from skywalking-agent so the large number of Consul heartbeat requests is no longer reported
    2. Change the sampling rate on the skywalking-server side to 10% to reduce the pressure on ES
    3. Add more ES instances

    A quick flow chart of the processing logic analyzed above:
    (figure: how segment data flows through SkyWalking processing)

    In conclusion:

    The SkyWalking agent sends the TraceSegments held in its DataCarrier buffer to the backend over gRPC. When the backend receives a TraceSegment it saves it to ES, and only after the save succeeds does it notify the agent's sender that the data was processed; at that point the agent clears the DataCarrier buffer, and only then can newly collected data be stored in the buffer again.

    Right now ES is the bottleneck and is having problems: the backend keeps receiving data it cannot persist to ES, so it never replies to the agent that processing succeeded. As a result a large number of sending threads on the agent side stay blocked and data can no longer be sent. This is exactly why a Kafka queue needs to be introduced to decouple the two sides.
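    A minimal sketch of the decoupling idea (my own illustration, not a feature of SkyWalking 6.x; the topic name, servers and serialization are placeholders): the receiver hands each segment to Kafka and can acknowledge the agent immediately, while an independent consumer group drains the topic into Elasticsearch at whatever rate ES can sustain, so a slow or failing ES no longer blocks the agents.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;

    // Illustration of the decoupling idea only; topic, servers and serialization are placeholders.
    class SegmentKafkaPublisher implements AutoCloseable {
        private static final String TOPIC = "skywalking-segments";   // hypothetical topic name
        private final KafkaProducer<String, byte[]> producer;

        SegmentKafkaPublisher(String bootstrapServers) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.producer = new KafkaProducer<>(props);
        }

        // Called by the receiver instead of writing to ES directly: the agent can be answered
        // as soon as the segment is handed to Kafka, and a separate consumer group persists
        // the segments to Elasticsearch at its own pace.
        void publish(String traceSegmentId, byte[] serializedSegment) {
            producer.send(new ProducerRecord<>(TOPIC, traceSegmentId, serializedSegment));
        }

        @Override public void close() {
            producer.close();
        }
    }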

  • Original post: https://www.cnblogs.com/kebibuluan/p/14042229.html