经过研究,决定在 cql3/QueryProcessor.java 里面下手。
这里有两个函数,第一个是
public ResultMessage process(String queryString, QueryState queryState, QueryOptions options, long queryStartNanoTime)
该函数接受到 String 类型的sql语句,进行规整处理(判断是否合法),然后 调用 函数
processStatement(prepared, queryState, options, queryStartNanoTime);
进行具体的处理。
我们在同一个类里面建立bench函数
public void Bench(String queryString, QueryState queryState, QueryOptions options, long queryStartNanoTime)
在 process 函数开始时调用bench函数。
但是注意:如果不加处理,cassandra启动后会陷入某种无限循环导致一直无法进入服务状态(客户端无法操作数据库)
因此,在调用 bench 之前,加入if语句,判断如果 queryString 中含有某字符(bench)再调用bench函数。
注意需要将判断语句放在开始,这样可以使用任意的 query语句触发bench函数。否则如果放在后面,如果query语句非法,规整处理会直接返回。
bench函数被调用后,可以更改传递过来的 sql语句,然后调用
processStatement
完成处理。
测试:在bench函数中使用insert语句,结果插入成功。
------------------------------问题
只能插入一条,如果插入多条的话,会发生错误。
WriteTimeout: Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'ONE'}
goolge该错误,找到该链接,一模一样的错误 http://stackoverflow.com/questions/30575125/coordinator-node-timed-out-waiting-for-replica-nodes-in-cassandra-datastax-while
将配置文件 cassandra.yaml 中的 write_request_timeout_in_ms
=2000 改为
write_request_timeout_in_ms: 20000
重启,多条插入成功
---------------------------------问题2
但是这并不能完成负载,一般插入几十k条记录就结束了。
后来发现 write_request_timeout_in_ms 的设置同时是服务端的运行时间,将这个值在加几个0,就可以bench很长时间了。
-------------------------问题3
当解决了上面问题后,去除打印语句,可以发现log的速度很快,iostat在7到十几MB/s
但是,合并后又出了问题。
决定放弃这一方案,准备在多核机器上使用YCSB客户端测试。
WARN [MutationStage-7] 2017-02-23 05:44:40,240 AbstractLocalAwareExecutorService.java:167 - Uncaught exception on thread Thread[MutationStage-7,10,main]: {}
java.lang.AssertionError: null
at io.netty.util.Recycler$WeakOrderQueue.<init>(Recycler.java:225) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.util.Recycler$DefaultHandle.recycle(Recycler.java:180) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.util.Recycler.recycle(Recycler.java:141) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at org.apache.cassandra.utils.btree.BTree$Builder.recycle(BTree.java:839) ~[main/:na]
at org.apache.cassandra.utils.btree.BTree$Builder.build(BTree.java:1092) ~[main/:na]
at org.apache.cassandra.db.partitions.PartitionUpdate.build(PartitionUpdate.java:584) ~[main/:na]
at org.apache.cassandra.db.partitions.PartitionUpdate.maybeBuild(PartitionUpdate.java:574) ~[main/:na]
at org.apache.cassandra.db.partitions.PartitionUpdate.holder(PartitionUpdate.java:385) ~[main/:na]
at org.apache.cassandra.db.partitions.AbstractBTreePartition.unfilteredIterator(AbstractBTreePartition.java:177) ~[main/:na]
at org.apache.cassandra.db.partitions.AbstractBTreePartition.unfilteredIterator(AbstractBTreePartition.java:172) ~[main/:na]
at org.apache.cassandra.db.partitions.PartitionUpdate$PartitionUpdateSerializer.serialize(PartitionUpdate.java:776) ~[main/:na]
at org.apache.cassandra.db.Mutation$MutationSerializer.serialize(Mutation.java:377) ~[main/:na]
at org.apache.cassandra.db.commitlog.CommitLog.add(CommitLog.java:249) ~[main/:na]
at org.apache.cassandra.db.Keyspace.apply(Keyspace.java:581) ~[main/:na]
at org.apache.cassandra.db.Keyspace.applyNotDeferrable(Keyspace.java:440) ~[main/:na]
at org.apache.cassandra.db.Mutation.apply(Mutation.java:223) ~[main/:na]
at org.apache.cassandra.db.Mutation.apply(Mutation.java:237) ~[main/:na]
at org.apache.cassandra.service.StorageProxy$8.runMayThrow(StorageProxy.java:1374) ~[main/:na]
at org.apache.cassandra.service.StorageProxy$LocalMutationRunnable.run(StorageProxy.java:2596) ~[main/:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_111]
at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run(AbstractLocalAwareExecutorService.java:162) ~[main/:na]
at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$LocalSessionFutureTask.run(AbstractLocalAwareExecutorService.java:134) [main/:na]
at org.apache.cassandra.concurrent.SEPWorker.run(SEPWorker.java:109) [main/:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]