今天天气不错,但是赶上恶意加班心情就不爽,怀着不爽的心情干活,总能创造出更多的问题,这不,今天就自己挖了一个坑,自己跳进去了,好在上来了
经过是这样的,开始调试canal采集binlog时,由于添加了一个上报数量大小,随手打印了一个日志
logger.info("batchData:{}", batchData); pipeline.put(batchData);
发到sit环境测试一把,卧槽。。。,怎么不同步数据了,开始排查问题,是我改的代码有问题?于是回退了,还是不同步数据,但是发现在停止任务时会中断读线程,这时数据同步过来了,试了几次都是这样,开始怀疑读线程被阻塞了,于是乎祭出许久未用的jstack去看一下读线程状态:jstack pid
"canal-binlog-encrypt-wangbd-test-001,219025,ReaderTask_CanalReader_0_0" #32 prio=5 os_prio=0 tid=0x00007fdfdc010800 nid=0x4199 waiting on condition [0x00007fe044653000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000800ad988> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403) at com.data.integration.engine.stream.core.cache.memory.MemoryDataCache.get(MemoryDataCache.java:135) at com.data.integration.engine.stream.core.cache.DataCache.get(DataCache.java:65) at com.data.integration.engine.stream.core.cache.DataCache.get(DataCache.java:61) at com.data.integration.engine.stream.core.config.pipeline.Pipeline.getBatchData(Pipeline.java:74) at com.alibaba.fastjson.serializer.ASMSerializer_4_Pipeline.write(Unknown Source) at com.alibaba.fastjson.serializer.ASMSerializer_1_DefaultBatchData.write(Unknown Source) at com.alibaba.fastjson.serializer.JSONSerializer.write(JSONSerializer.java:285) at com.alibaba.fastjson.JSON.toJSONString(JSON.java:740) at com.alibaba.fastjson.JSON.toJSONString(JSON.java:678) at com.alibaba.fastjson.JSON.toJSONString(JSON.java:643) at com.data.integration.engine.stream.plugin.reader.canalreader.CanalReader.start(CanalReader.java:127) at com.data.integration.engine.stream.plugin.reader.ReaderTask$ReaderThread.run(ReaderTask.java:138) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
发现居然在使用fastjson转换json时阻塞在缓存队列上了,说一下缓存队列读线程只会向里面写,写现场才会读,于是乎开始怀疑打印的这个日志,顺着思路找到了如下代码:
pipeline的batchData属性居然是从缓存中获取的对象 public BatchData getBatchData(){ return dataCache.get(); }
这就难怪了,由于缓存队列是空的,在toJson时阻塞了,导致整个读线程被阻塞,把这个日志去掉后就正常了,正常的读线程栈是这样的:
"canal-binlog-encrypt-wangbd-test-001,219023,ReaderTask_CanalReader_0_0" #32 prio=5 os_prio=0 tid=0x00007f7c8c00b000 nid=0xefb waiting on condition [0x00007f7d0476d000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000807bf660> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer.get(MemoryEventStoreWithBuffer.java:219) at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.getEvents(CanalServerWithEmbedded.java:538) at com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.getWithoutAck(CanalServerWithEmbedded.java:347) - locked <0x00000000808a3448> (a com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager) at com.data.integration.engine.stream.plugin.reader.canalreader.CanalReader.start(CanalReader.java:101) at com.data.integration.engine.stream.plugin.reader.ReaderTask$ReaderThread.run(ReaderTask.java:138) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)