这周一至周四基本都在做mysql的测试和调优工作,包括erlang端对mysql的写入测试,到今天为止暂且告一段落,下周先做下其他的开发。
测试环境
使用的测试环境是aliyun的杭州节点,
CPU:8核
内存:8GB
带宽:5MB
数据盘:100GB
Erlang版本:OTP18
mysql版本:mysql 5.7.11
测试工具:sysbench,mysqlslap
Sysbench部分测试:
prepare的语句:
./sysbench --test=/root/sysbench/tests/db/oltp.lua --oltp_tables_count=10 --oltp-table-size=800000 --db-driver=mysql --mysql-socket=/data/mysql/mysql.sock --mysql-user=自己的用户名 --mysql-password='自己的密码' --mysql-db=test_qps prepare
run的语句:
./sysbench --test=/root/sysbench/tests/db/oltp.lua --oltp_tables_count=10 --oltp-table-size=800000 --oltp-read-only=off --max-requests=0 --num-threads=128 --oltp-dist-type=uniform --max-time=600 --report-interval=10 --db-driver=mysql --mysql-socket=/data/mysql/mysql.sock --mysql-user=自己的用户名 --mysql-password='自己的密码' --mysql-db=test_qps run > test_result.log
mysqlslap的简单测试
测试表的sql:
CREATE TABLE `account` ( `id` int(11) NOT NULL default '0', `name` varchar(250) default NULL, `password` varchar(250) default NULL, `last_login_time` int(11) NOT NULL default '0', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
总感觉我不太会用mysqlslap,这样测试的结果有不少问题。还是倾向于用sysbench进行测试。
Erlang写的测试代码,用于测试emysql效率:
%%%-------------------------------------------------------------------
%%% @author Administrator
%%% @copyright (C) 2016, <COMPANY>
%%% @doc
%%% Concurrent write benchmark for emysql: a coordinator process spawns
%%% worker processes, each worker issues a fixed number of SQL
%%% statements, and the coordinator prints the elapsed time once every
%%% worker has finished.
%%% @end
%%% Created : 14 Apr 2016 16:28
%%%-------------------------------------------------------------------
-module(multi_thread_test).
-author("Administrator").

%% API
-export([start/2, run/3, run_sql/2, recv/1]).
%% Alternative workloads; run_sql/2 uses test2/0, the others are kept so
%% they can be swapped in by hand.
-export([test_prepare/0, test1/0, test2/0]).

%% running                - number of workers that have not finished yet
%% start_time             - value of time_utility:longunixtime() at start
%% total_count_all        - total number of statements requested
%% sql_count_each_process - number of statements per worker
-record(state, {running = 0,
                start_time,
                total_count_all,
                sql_count_each_process}).

%% @doc Empties the `account' table and starts the benchmark.
%% `TotalCount' statements are executed in total, split over workers that
%% each run at most `SqlCountEachProcess' statements.
%% Returns the pid of the (linked) coordinator process.
start(TotalCount, SqlCountEachProcess)
  when is_integer(SqlCountEachProcess), SqlCountEachProcess > 0 ->
    emysql:execute(default, <<"delete from account">>),
    CurrentTime = time_utility:longunixtime(),
    State = #state{start_time = CurrentTime,
                   total_count_all = TotalCount,
                   sql_count_each_process = SqlCountEachProcess},
    spawn_link(?MODULE, run, [TotalCount, SqlCountEachProcess, State]).

%% @doc Waits until every worker has finished, then prints the statistics.
%% A worker counts as finished when it sends `done' or when it terminates
%% abnormally, so one crashed worker cannot block the coordinator forever.
recv(#state{running = 0,
            start_time = StartTime,
            total_count_all = TotalCount,
            sql_count_each_process = SqlCountEachProcess}) ->
    UsedTime = time_utility:longunixtime() - StartTime,
    io:format("total sql count:~p sql count each process:~p used time:~p~n",
              [TotalCount, SqlCountEachProcess, UsedTime]);
recv(#state{running = Running} = State) ->
    receive
        done ->
            recv(State#state{running = Running - 1});
        {'DOWN', _MRef, process, _Pid, normal} ->
            %% The worker already reported `done' before exiting.
            recv(State);
        {'DOWN', _MRef, process, Pid, Reason} ->
            io:format("worker ~p crashed: ~p~n", [Pid, Reason]),
            recv(State#state{running = Running - 1})
    end.

%% @doc Spawns one monitored worker per batch until `TotalCount' statements
%% have been handed out, then waits for the workers in recv/1.
%% The last worker only gets the remainder, so a `TotalCount' that is not a
%% multiple of `SqlCountEachProcess' no longer drives the counter negative.
run(TotalCount, _SqlCountEachProcess, #state{} = State) when TotalCount =< 0 ->
    recv(State);
run(TotalCount, SqlCountEachProcess, #state{running = Running} = State)
  when SqlCountEachProcess > 0 ->
    Parent = self(),
    Batch = min(TotalCount, SqlCountEachProcess),
    {_Pid, _MRef} = spawn_monitor(fun() -> run_sql(Batch, Parent) end),
    run(TotalCount - Batch, SqlCountEachProcess,
        State#state{running = Running + 1}).

%% @doc Worker body: executes the test statement `SqlCountEachProcess'
%% times and then reports `done' to `Parent'.
run_sql(SqlCountEachProcess, Parent) ->
    lists:foreach(fun(_) -> test2() end, lists:seq(1, SqlCountEachProcess)),
    Parent ! done.

%% @doc Workload: prepared statement with a random id.
%% NOTE(review): `account_replace' is not prepared in this module; it is
%% presumably registered with emysql:prepare/2 elsewhere - confirm.
test_prepare() ->
    Rand = util:rand(1, 10000),
    emysql:execute(default, account_replace, [Rand]).

%% @doc Workload: SQL text formatted on the Erlang side with a random id.
%% The statement is now actually executed (it used to be built and dropped).
test1() ->
    Rand = util:rand(1, 10000),
    Sql = io_lib:format("REPLACE INTO account(id) values (~p)", [Rand]),
    emysql:execute(default, iolist_to_binary(Sql)).

%% @doc Workload: constant SQL text, the random id is computed by MySQL.
test2() ->
    emysql:execute(default, <<"REPLACE INTO account(id) values (floor(RAND()*10000));">>).
运行结果:
是mysqlslap效率的二分之一。
本周测试用的部分代码:
game_db_writer.erl
这个文件实现的是一个队列性质的mysql写入器:从redis队列中取出需要写入的sql,然后一条条地写入mysql。
%%%-------------------------------------------------------------------
%%% @author 李世铭
%%% @copyright (C) April 1st,2016, <COMPANY>
%%% @doc
%%% Writer process for the redis -> mysql synchronisation.
%%% Takes one queued message at a time from the redis write list and
%%% executes it against MySQL, retrying a failed message a bounded
%%% number of times.
%%% @end
%%% Created : 01 Apr 2016 15:02
%%%-------------------------------------------------------------------
-module(game_db_writer).
-author("Administrator").

-behaviour(gen_fsm).

-include("db_config.hrl").
-include("error_log.hrl").
-include("config_keys.hrl").

%% API
-export([start_link/0]).
-export([write_sql/0]).

%% gen_fsm callbacks
-export([init/1,
         writing/2,
         writing/3,
         handle_event/3,
         handle_sync_event/4,
         handle_info/3,
         terminate/3,
         code_change/4]).

-define(SERVER, ?MODULE).
-define(MAX_PACKET, 4096).   %% packet size limit assumed for MySQL 5.6 (unused here)
-define(TIMEOUT_SPAN, 1000). %% idle interval (ms) before polling an empty queue again
-define(ZERO_SPAN, 0).       %% continue immediately

-record(state, {try_times = 0}). %% number of failed attempts for the current message

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Prints the current time and sends the asynchronous event
%% `{write_a_sql}' to the registered writer, which triggers a write attempt.
write_sql() ->
    StartTime = time_utility:longunixtime(),
    io:format("Start Writing Time is ~p!~n", [StartTime]),
    gen_fsm:send_event(?MODULE, {write_a_sql}).

%% @doc Starts the writer and registers it locally under the module name.
-spec(start_link() ->
    {ok, pid()} | ignore | {error, Reason :: term()}).
start_link() ->
    gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], []).

%%%===================================================================
%%% gen_fsm callbacks
%%%===================================================================

%% @private
%% @doc Enters the `writing' state with a zero timeout, so the first write
%% attempt happens right after start-up.
-spec(init(Args :: term()) ->
    {ok, StateName :: atom(), StateData :: #state{}, timeout()}).
init([]) ->
    io:format("db_writer is ready!~n"),
    {ok, writing, #state{}, ?ZERO_SPAN}.

%% @private
%% @doc Every asynchronous event, including `timeout', triggers one write
%% attempt.
-spec(writing(Event :: term(), State :: #state{}) ->
    {next_state, NextStateName :: atom(), NextState :: #state{}, timeout()}).
writing(_Event, State) ->
    do_write(State).

%% @private
%% @doc Synchronous events are acknowledged with `ok'.
%% The zero timeout re-arms the polling loop: gen_fsm drops a pending
%% timeout whenever a message arrives, so returning without one would stop
%% the writer until the next explicit event.
-spec(writing(Event :: term(), From :: {pid(), term()}, State :: #state{}) ->
    {reply, Reply :: term(), NextStateName :: atom(), NextState :: #state{}, timeout()}).
writing(_Event, _From, State) ->
    {reply, ok, writing, State, ?ZERO_SPAN}.

%% @private
%% @doc All-state events are ignored; the zero timeout keeps polling alive.
-spec(handle_event(Event :: term(), StateName :: atom(), StateData :: #state{}) ->
    {next_state, NextStateName :: atom(), NewStateData :: #state{}, timeout()}).
handle_event(_Event, StateName, State) ->
    {next_state, StateName, State, ?ZERO_SPAN}.

%% @private
%% @doc Synchronous all-state events are acknowledged with `ok'; the zero
%% timeout keeps polling alive.
-spec(handle_sync_event(Event :: term(), From :: {pid(), Tag :: term()},
                        StateName :: atom(), StateData :: term()) ->
    {reply, Reply :: term(), NextStateName :: atom(), NewStateData :: term(), timeout()}).
handle_sync_event(_Event, _From, StateName, State) ->
    {reply, ok, StateName, State, ?ZERO_SPAN}.

%% @private
%% @doc Unexpected messages are ignored; the zero timeout keeps polling
%% alive.
-spec(handle_info(Info :: term(), StateName :: atom(), StateData :: term()) ->
    {next_state, NextStateName :: atom(), NewStateData :: term(), timeout()}).
handle_info(_Info, StateName, State) ->
    {next_state, StateName, State, ?ZERO_SPAN}.

%% @private
-spec(terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
                StateName :: atom(), StateData :: term()) -> term()).
terminate(_Reason, _StateName, _State) ->
    ok.

%% @private
-spec(code_change(OldVsn :: term() | {down, term()}, StateName :: atom(),
                  StateData :: #state{}, Extra :: term()) ->
    {ok, NextStateName :: atom(), NewStateData :: #state{}}).
code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% One step of the write loop: fetch the next message and execute it.
%% When the queue is empty the state is reset and the next poll is
%% scheduled after ?TIMEOUT_SPAN.
do_write(#state{try_times = TryTimes} = State) ->
    case next_msg(TryTimes) of
        undefined ->
            CurrTime = time_utility:longunixtime(),
            io:format("end writing test time is:~w~n", [{CurrTime}]),
            {next_state, writing, #state{}, ?TIMEOUT_SPAN};
        Msg ->
            do_write(Msg, State)
    end.

%% Returns the message to write, or `undefined' when the queue is empty.
%% After a failed attempt (TryTimes > 0) the message is reloaded from the
%% staging key; if that fails the error is logged and the next queued
%% message is taken instead.
next_msg(TryTimes) when TryTimes > 0 ->
    case redis:get(?CURR_WRITING_MSG) of
        {ok, SzMsg} ->
            db_utility:unpack_data(SzMsg);
        _ ->
            {ok, Msg} = game_db_queue:dequeue(?MYSQL_WRITE_LIST),
            ?LOG_ERROR("REDIS SYSTEM ERROR!!!Cannot load Msg from game_frame:mysql_writing_msg"),
            Msg
    end;
next_msg(_TryTimes) ->
    {ok, Msg} = game_db_queue:dequeue(?MYSQL_WRITE_LIST),
    Msg.

%% Executes one message against MySQL.
%% The message is first saved under the staging key so that a retry can
%% reload it. On success the redis key of the message gets an expiry and
%% the staging key is set to <<"successful">>. On failure the retry counter
%% is incremented until ?MAX_MYSQL_RETRY_TIME is reached; then the message
%% is logged and dropped.
do_write(Msg, State) ->
    redis:set(?CURR_WRITING_MSG, db_utility:pack_data(Msg)),
    case execute_msg(Msg) of
        {ok, _} ->
            %% The configured value is multiplied by 3600 - presumably hours
            %% converted to seconds; confirm against the config schema.
            ExpireTime = game_config:lookup_keys([?CF_DB_QUEUE, <<"redis_expir_time">>]),
            redis:expire(Msg#db_queue_msg.redis_key,
                         integer_to_list(util:floor(3600 * ExpireTime))),
            redis:set(?CURR_WRITING_MSG, <<"successful">>),
            {next_state, writing, #state{}, ?ZERO_SPAN};
        _ ->
            RetryTimes = State#state.try_times,
            case RetryTimes >= ?MAX_MYSQL_RETRY_TIME of
                true ->
                    %% Logged on its own line so the dropped message is easy
                    %% to find in the log.
                    ?LOG_ERROR("Max MySQL retry times reached, Msg is: ~p", [[Msg]]),
                    {next_state, writing, #state{}, ?ZERO_SPAN};
                _ ->
                    {next_state, writing, #state{try_times = RetryTimes + 1}, ?ZERO_SPAN}
            end
    end.

%% Runs the message either as a prepared statement or as plain SQL text,
%% depending on its `prepare' flag.
execute_msg(#db_queue_msg{prepare = true,
                          prepare_atom = SqlId,
                          prepare_param = SqlArgs,
                          poolid = PoolId}) ->
    mysql:run_prepare(PoolId, SqlId, SqlArgs);
execute_msg(#db_queue_msg{poolid = PoolId, sql = Sql}) ->
    mysql:execute(PoolId, Sql).
game_db_writer2:
这个文件是上面那个文件的升级版,区别是一次性取一定数量(宏定义现在是100)的sql语句,自行进行sql拼接一次性写入,写入失败的语句会重新拼接进行写入,效率比上面那种方法能提高不少,但是如果多个mysql节点的话处理起来会比较麻烦
%%%-------------------------------------------------------------------
%%% @author 李世铭
%%% @copyright (C) April 1st,2016, <COMPANY>
%%% @doc
%%% Batching writer process for the redis -> mysql synchronisation.
%%% Takes up to ?MYSQL_MULTI_WRITE_NUM queued messages at once, joins
%%% their SQL into one multi-statement query and retries the statements
%%% that were not written.
%%% @end
%%% Created : 01 Apr 2016 15:02
%%%-------------------------------------------------------------------
-module(game_db_writer2).
-author("Administrator").

-behaviour(gen_fsm).

-include("db_config.hrl").
-include("error_log.hrl").
-include("config_keys.hrl").

%% API
-export([start_link/0]).
-export([write_sql/0]).

%% gen_fsm callbacks
-export([init/1,
         writing/2,
         writing/3,
         handle_event/3,
         handle_sync_event/4,
         handle_info/3,
         terminate/3,
         code_change/4]).

-define(SERVER, ?MODULE).
-define(MAX_PACKET, 4096).   %% packet size limit assumed for MySQL 5.6 (unused here)
-define(TIMEOUT_SPAN, 1000). %% idle interval (ms) before polling an empty queue again
-define(ZERO_SPAN, 0).       %% continue immediately

-record(state, {try_times = 0}). %% number of failed attempts for the current batch

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Prints the current time and sends the asynchronous event
%% `{write_a_sql}' to the registered writer, which triggers a write attempt.
write_sql() ->
    StartTime = time_utility:longunixtime(),
    io:format("Start Writing Time is ~p!~n", [StartTime]),
    gen_fsm:send_event(?MODULE, {write_a_sql}).

%% @doc Starts the writer and registers it locally under the module name.
-spec(start_link() ->
    {ok, pid()} | ignore | {error, Reason :: term()}).
start_link() ->
    gen_fsm:start_link({local, ?SERVER}, ?MODULE, [], []).

%%%===================================================================
%%% gen_fsm callbacks
%%%===================================================================

%% @private
%% @doc Enters the `writing' state with a zero timeout, so the first write
%% attempt happens right after start-up.
-spec(init(Args :: term()) ->
    {ok, StateName :: atom(), StateData :: #state{}, timeout()}).
init([]) ->
    io:format("db_writer is ready!~n"),
    {ok, writing, #state{}, ?ZERO_SPAN}.

%% @private
%% @doc Every asynchronous event, including `timeout', triggers one write
%% attempt.
-spec(writing(Event :: term(), State :: #state{}) ->
    {next_state, NextStateName :: atom(), NextState :: #state{}, timeout()}).
writing(_Event, State) ->
    do_write(State).

%% @private
%% @doc Synchronous events are acknowledged with `ok'.
%% The zero timeout re-arms the polling loop: gen_fsm drops a pending
%% timeout whenever a message arrives, so returning without one would stop
%% the writer until the next explicit event.
-spec(writing(Event :: term(), From :: {pid(), term()}, State :: #state{}) ->
    {reply, Reply :: term(), NextStateName :: atom(), NextState :: #state{}, timeout()}).
writing(_Event, _From, State) ->
    {reply, ok, writing, State, ?ZERO_SPAN}.

%% @private
%% @doc All-state events are ignored; the zero timeout keeps polling alive.
-spec(handle_event(Event :: term(), StateName :: atom(), StateData :: #state{}) ->
    {next_state, NextStateName :: atom(), NewStateData :: #state{}, timeout()}).
handle_event(_Event, StateName, State) ->
    {next_state, StateName, State, ?ZERO_SPAN}.

%% @private
%% @doc Synchronous all-state events are acknowledged with `ok'; the zero
%% timeout keeps polling alive.
-spec(handle_sync_event(Event :: term(), From :: {pid(), Tag :: term()},
                        StateName :: atom(), StateData :: term()) ->
    {reply, Reply :: term(), NextStateName :: atom(), NewStateData :: term(), timeout()}).
handle_sync_event(_Event, _From, StateName, State) ->
    {reply, ok, StateName, State, ?ZERO_SPAN}.

%% @private
%% @doc Unexpected messages are ignored; the zero timeout keeps polling
%% alive.
-spec(handle_info(Info :: term(), StateName :: atom(), StateData :: term()) ->
    {next_state, NextStateName :: atom(), NewStateData :: term(), timeout()}).
handle_info(_Info, StateName, State) ->
    {next_state, StateName, State, ?ZERO_SPAN}.

%% @private
-spec(terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
                StateName :: atom(), StateData :: term()) -> term()).
terminate(_Reason, _StateName, _State) ->
    ok.

%% @private
-spec(code_change(OldVsn :: term() | {down, term()}, StateName :: atom(),
                  StateData :: #state{}, Extra :: term()) ->
    {ok, NextStateName :: atom(), NewStateData :: #state{}}).
code_change(_OldVsn, StateName, State, _Extra) ->
    {ok, StateName, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% One step of the write loop: fetch the next batch and execute it.
%% When the queue is empty the state is reset and the next poll is
%% scheduled after ?TIMEOUT_SPAN.
do_write(#state{try_times = TryTimes} = State) ->
    case next_batch(TryTimes) of
        [] ->
            CurrTime = time_utility:longunixtime(),
            io:format("end writing test time is:~w~n", [{CurrTime}]),
            {next_state, writing, #state{}, ?TIMEOUT_SPAN};
        MsgList ->
            do_write(MsgList, State)
    end.

%% Returns the list of messages to write ([] when the queue is empty).
%% After a failed attempt (TryTimes > 0) the unwritten messages are
%% reloaded from the staging key; if that fails the error is logged and a
%% fresh batch is dequeued instead.
next_batch(TryTimes) when TryTimes > 0 ->
    case redis:get(?CURR_WRITING_MSG_MULT) of
        {ok, SzMsg} ->
            db_utility:unpack_data(SzMsg);
        _ ->
            {ok, MsgList} = game_db_queue:dequeue(?MYSQL_MULTI_WRITE_NUM, ?MYSQL_WRITE_LIST_MULT),
            ?LOG_ERROR("REDIS SYSTEM ERROR!!!Cannot load Msg from game_frame:mysql_writing_msg"),
            MsgList
    end;
next_batch(_TryTimes) ->
    {ok, MsgList} = game_db_queue:dequeue(?MYSQL_MULTI_WRITE_NUM, ?MYSQL_WRITE_LIST_MULT),
    MsgList.

%% Executes a non-empty batch as one multi-statement query.
%% The batch is first saved under the staging key. Every message that was
%% written gets its redis expiry right away, also when other statements of
%% the same batch failed. Unwritten messages are stored (packed, because
%% next_batch/1 unpacks them) and retried until ?MAX_MYSQL_RETRY_TIME is
%% reached; then they are logged and dropped.
do_write(MsgList, State) ->
    redis:set(?CURR_WRITING_MSG_MULT, db_utility:pack_data(MsgList)),
    {PoolId, Sql} = build_batch(MsgList),
    %% A single statement yields one packet instead of a list of packets.
    Packets = case emysql:execute(PoolId, Sql) of
                  List when is_list(List) -> List;
                  Single -> [Single]
              end,
    {Written, LeftMsgList} = split_by_result(MsgList, Packets),
    mark_expiry(Written),
    case LeftMsgList of
        [] ->
            redis:set(?CURR_WRITING_MSG_MULT, <<"successful">>),
            {next_state, writing, #state{}, ?ZERO_SPAN};
        _ ->
            RetryTimes = State#state.try_times,
            case RetryTimes >= ?MAX_MYSQL_RETRY_TIME of
                true ->
                    %% Logged on its own line so the dropped messages are
                    %% easy to find in the log.
                    ?LOG_ERROR("Max MySQL retry times reached, Msg is: ~p", [[LeftMsgList]]),
                    {next_state, writing, #state{}, ?ZERO_SPAN};
                _ ->
                    redis:set(?CURR_WRITING_MSG_MULT, db_utility:pack_data(LeftMsgList)),
                    {next_state, writing, #state{try_times = RetryTimes + 1}, ?ZERO_SPAN}
            end
    end.

%% Joins the SQL of all messages, in list order, each followed by ";".
%% The pool id of the first message is used for the whole batch.
%% NOTE(review): all messages of a batch are assumed to target the same
%% pool - confirm before using several MySQL nodes.
build_batch([#db_queue_msg{poolid = PoolId} | _] = MsgList) ->
    Sql = iolist_to_binary([[Msg#db_queue_msg.sql, $;] || Msg <- MsgList]),
    {PoolId, Sql}.

%% Pairs every message with the packet of its statement and returns
%% {Written, Unwritten}, both in the original order. Messages without a
%% packet (the result list is shorter than the batch) count as unwritten.
split_by_result(MsgList, Packets) ->
    split_by_result(MsgList, Packets, [], []).

split_by_result([], _Packets, Written, Unwritten) ->
    {lists:reverse(Written), lists:reverse(Unwritten)};
split_by_result(Rest, [], Written, Unwritten) ->
    {lists:reverse(Written), lists:reverse(Unwritten, Rest)};
split_by_result([Msg | Msgs], [Packet | Packets], Written, Unwritten) ->
    case is_written(Packet) of
        true -> split_by_result(Msgs, Packets, [Msg | Written], Unwritten);
        false -> split_by_result(Msgs, Packets, Written, [Msg | Unwritten])
    end.

%% Classifies an emysql result packet (matched as a plain tuple, as in the
%% rest of this module, because the emysql header is not included here).
is_written({ok_packet, _, _, _, _, _, _}) -> true;
is_written({result_packet, _, _, _, _}) -> true;
is_written({error_packet, _, _, _, _}) -> false.

%% Sets the redis expiry on the key of every written message.
mark_expiry([]) ->
    ok;
mark_expiry(Written) ->
    %% The configured value is multiplied by 3600 - presumably hours
    %% converted to seconds; confirm against the config schema.
    ExpireTime = game_config:lookup_keys([?CF_DB_QUEUE, <<"redis_expir_time">>]),
    Ttl = integer_to_list(util:floor(3600 * ExpireTime)),
    lists:foreach(fun(Msg) -> redis:expire(Msg#db_queue_msg.redis_key, Ttl) end,
                  Written).
game_db_writer3:
这个放弃了状态机的方式。之前我对前两种的效率很不满意,怀疑是gen_fsm内部的超时机制消耗了大量的时间,才写了这个非otp的写入器;但经测试发现,otp内部消耗的时间基本可以忽略不计。
%%%-------------------------------------------------------------------
%%% @author Administrator
%%% @copyright (C) 2016, <COMPANY>
%%% @doc
%%% Plain (non-OTP) variant of the queue writer: drains the redis write
%%% list from the calling process, one message at a time.
%%% @end
%%% Created : 14 Apr 2016 11:13
%%%-------------------------------------------------------------------
-module(game_db_writer3).
-author("Administrator").

-include("db_config.hrl").
-include("error_log.hrl").
-include("config_keys.hrl").

%% API
-export([write_sql/0]).

%% @doc Prints the start time, writes queued messages until the queue is
%% found empty, and returns `ok'.
write_sql() ->
    StartTime = time_utility:longunixtime(),
    io:format("Start Writing Time is ~p!~n", [StartTime]),
    do_write(0),
    ok.

%% Fetches the next message and dispatches on whether the queue was empty.
do_write(TryTimes) ->
    handle_fetched(fetch_msg(TryTimes), TryTimes).

%% Returns the message to write, or `undefined' when the queue is empty.
%% With TryTimes > 0 the message is reloaded from the staging key; if that
%% fails the next queued message is taken and the error is logged.
fetch_msg(TryTimes) when TryTimes > 0 ->
    case redis:get(?CURR_WRITING_MSG) of
        {ok, SzMsg} ->
            db_utility:unpack_data(SzMsg);
        _ ->
            {ok, Queued} = game_db_queue:dequeue(?MYSQL_WRITE_LIST),
            ?LOG_ERROR("REDIS SYSTEM ERROR!!!Cannot load Msg from game_frame:mysql_writing_msg"),
            Queued
    end;
fetch_msg(_TryTimes) ->
    {ok, Queued} = game_db_queue:dequeue(?MYSQL_WRITE_LIST),
    Queued.

%% Empty queue: print the end time, sleep one second and stop.
%% Otherwise: write the message.
handle_fetched(undefined, _TryTimes) ->
    Now = time_utility:longunixtime(),
    io:format("end writing test time is:~w~n", [{Now}]),
    timer:sleep(1000);
handle_fetched(Msg, TryTimes) ->
    do_write(Msg, TryTimes).

%% Saves the message under the staging key, executes it and then either
%% continues with the next message (success, or retry limit reached) or
%% retries the same message with an incremented counter.
do_write(Msg, TryTimes) ->
    redis:set(?CURR_WRITING_MSG, db_utility:pack_data(Msg)),
    case run_msg(Msg) of
        {ok, _} ->
            %% Written: set the expiry of the message's redis key, then flag
            %% the staging key as <<"successful">>.
            ExpireTime = game_config:lookup_keys([?CF_DB_QUEUE, <<"redis_expir_time">>]),
            Ttl = integer_to_list(util:floor(3600 * ExpireTime)),
            redis:expire(Msg#db_queue_msg.redis_key, Ttl),
            redis:set(?CURR_WRITING_MSG, <<"successful">>),
            do_write(0);
        _ when TryTimes >= ?MAX_MYSQL_RETRY_TIME ->
            %% Retry limit reached: log the message on its own line and move on.
            ?LOG_ERROR("Max MySQL retry times reached, Msg is: ~p", [[Msg]]),
            do_write(0);
        _ ->
            do_write(Msg, TryTimes + 1)
    end.

%% Runs the message as a prepared statement when its `prepare' flag is
%% true, otherwise as plain SQL text.
run_msg(#db_queue_msg{prepare = true} = Msg) ->
    SqlId = Msg#db_queue_msg.prepare_atom,
    SqlArgs = Msg#db_queue_msg.prepare_param,
    mysql:run_prepare(Msg#db_queue_msg.poolid, SqlId, SqlArgs);
run_msg(Msg) ->
    mysql:execute(Msg#db_queue_msg.poolid, Msg#db_queue_msg.sql).
db_test.erl
这个主要是配合进行压测的erl,包括简单的单线程sql效率测试
%%%-------------------------------------------------------------------
%%% @author 李世铭
%%% @copyright (C) 2016, <COMPANY>
%%% @doc
%%% Compares the cost of the different ways of writing to and reading
%%% from the database, and fills the write queues for the writer tests.
%%% @end
%%% Created : 05 Apr 2016 16:48
%%%-------------------------------------------------------------------
-module(db_test).
-author("Administrator").

-include("db_config.hrl").

%% API
-export([test_db_multi_write/1, test_prepare_write/1, test_directly_write/1]).
-export([test_directly_select/1, test_prepare_select/1]).
-export([test_db_write/1, test_db_write_single/1]).
-export([test_eprof_start/0, test_eprof_end/0]).

%% @doc Builds N random inserts, joins them with ";" and executes them as
%% one multi-statement query; prints the elapsed time.
test_db_multi_write(N) ->
    report_cost(fun() ->
        Append = fun(_, Acc) ->
                     Id = util:rand(1, 1000000),
                     Stmt = list_to_binary(mysql:make_insert_sql(account, ["id"], [Id])),
                     join_sql(Acc, Stmt)
                 end,
        Batch = lists:foldl(Append, <<>>, lists:seq(1, N)),
        emysql:execute(default, Batch)
    end).

%% @doc Executes the prepared statement `account_replace' N times with a
%% random id; prints the elapsed time.
test_prepare_write(N) ->
    report_cost(fun() ->
        repeat(N, fun(_) ->
            Id = util:rand(1, 10000),
            emysql:execute(default, account_replace, [Id])
        end)
    end).

%% @doc Builds and executes a REPLACE statement N times with a random id;
%% prints the elapsed time.
test_directly_write(N) ->
    report_cost(fun() ->
        repeat(N, fun(_) ->
            Id = util:rand(1, 10000),
            emysql:execute(default, mysql:make_replace_sql(account, ["id"], [Id]))
        end)
    end).

%% @doc Selects the rows with id 1..N using formatted SQL text; prints the
%% elapsed time.
test_directly_select(N) ->
    report_cost(fun() ->
        repeat(N, fun(Id) ->
            Query = io_lib:format(<<"select * from account where id=~p">>, [Id]),
            emysql:execute(default, Query)
        end)
    end).

%% @doc Prepares `account_select' and then selects the rows with id 1..N
%% through it; the preparation is not part of the printed elapsed time.
test_prepare_select(N) ->
    emysql:prepare(account_select, "select * from account where id=?"),
    report_cost(fun() ->
        repeat(N, fun(Id) -> emysql:execute(default, account_select, [Id]) end)
    end).

%% Enqueues one random REPLACE message on the batch writer's queue.
test_db_write() ->
    enqueue_random_replace(?MYSQL_WRITE_LIST_MULT).

%% @doc Enqueues N random REPLACE messages on the batch writer's queue
%% while profiling the calling process with eprof.
test_db_write(N) ->
    profiled_enqueue(N, fun test_db_write/0).

%% Enqueues one random REPLACE message on the single-message writer's queue.
test_db_write_single() ->
    enqueue_random_replace(?MYSQL_WRITE_LIST).

%% @doc Enqueues N random REPLACE messages on the single-message writer's
%% queue while profiling the calling process with eprof.
test_db_write_single(N) ->
    profiled_enqueue(N, fun test_db_write_single/0).

%% @doc Starts eprof and begins profiling the calling process.
test_eprof_start() ->
    eprof:start(),
    eprof:start_profiling([self()]).

%% @doc Stops profiling, logs to `test_match', prints the analysis and
%% stops eprof.
test_eprof_end() ->
    eprof:stop_profiling(),
    eprof:log(test_match),
    eprof:analyze(),
    eprof:stop().

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Runs Work/0 and prints the difference of time_utility:longunixtime()
%% taken before and after it.
report_cost(Work) ->
    CurrTime = time_utility:longunixtime(),
    Work(),
    EndTime = time_utility:longunixtime(),
    io:format("Cost time is:~w~n", [{EndTime - CurrTime}]).

%% Calls Step/1 once for every integer in 1..N, in ascending order.
repeat(N, Step) ->
    lists:foreach(Step, lists:seq(1, N)).

%% Appends a statement to the batch; the ";" separator is only inserted
%% when the batch is not empty.
join_sql(<<>>, Stmt) ->
    Stmt;
join_sql(Batch, Stmt) ->
    <<Batch/binary, ";", Stmt/binary>>.

%% Builds a REPLACE statement with a random id and enqueues it as a plain
%% SQL message (not a prepared one) on the given queue.
enqueue_random_replace(Queue) ->
    Id = util:rand(1, 100000),
    SzSql = conversion_utility:to_binary(mysql:make_replace_sql(account, ["id"], [Id])),
    QueueMsg = #db_queue_msg{redis_key = <<"TEST_HINCR">>, sql = SzSql},
    game_db_queue:enqueue(QueueMsg, Queue).

%% Calls EnqueueOne/0 N times between test_eprof_start/0 and
%% test_eprof_end/0, printing the time before and after the loop.
profiled_enqueue(N, EnqueueOne) ->
    test_eprof_start(),
    Before = time_utility:longunixtime(),
    io:format("enqueue start time is:~w~n", [{Before}]),
    lists:foreach(fun(_) -> EnqueueOne() end, lists:seq(1, N)),
    After = time_utility:longunixtime(),
    io:format("enqueue end time is:~w~n", [{After}]),
    test_eprof_end(),
    ok.
个人感觉:
1.队列方式进行写入的时候无法充分的利用cpu,因为队列必然是单线程,800%CPU只能使用50%的时候mysql的效率一定不高。
2.批量写入并不见得一定好用,会带来很多其它的问题,如果不是特殊需求,没必要非得批量写入。
3.尽量购买RDS(云数据库)而不是自己搭建mysql服务器,20倍的iops差距真的很坑。
最后附上我自己测试用的my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.6/en/server-configuration-defaults.html
# *** DO NOT EDIT THIS FILE. It's a template which will be copied to the
# *** default location during install, and will be replaced if you
# *** upgrade to a newer version of MySQL.
#
# NOTE(review): benchmark configuration. One directive per line - a "#"
# comments out the rest of its line, so the directives must not share a
# line with the comments. All values are unchanged.

[mysqld]

# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.

# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.

# These are commonly set, remove the # and set as required.
basedir = /usr
datadir = /data/mysql
socket = /data/mysql/mysql.sock
pid-file = /data/mysql/mysql_pidfile.pid
log-error = /data/mysql/mysql_errorlog.err
# port = .....
server_id = 1
# socket = .....

# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M

log_bin = /data/binlog/mysql_binlog
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

max_connections=4000
key_buffer_size=200M
low_priority_updates=1
table_open_cache = 8000
back_log=1500
# NOTE(review): the query cache is switched off (type 0) but still sized at
# 256M; per the MySQL manual the buffer is only released when
# query_cache_size is 0 - confirm whether this memory is intended.
query_cache_type=0
query_cache_limit = 1M
query_cache_size=256M
table_open_cache_instances=16

# files
innodb_file_per_table = ON
innodb_log_file_size=1024M
innodb_log_files_in_group = 3
innodb_open_files=4000

# buffers
innodb_buffer_pool_size=4096M
innodb_buffer_pool_instances=32
innodb_log_buffer_size=64M
join_buffer_size=32K
sort_buffer_size=32K

# innodb
# NOTE(review): checksums off, doublewrite off and
# innodb_flush_log_at_trx_commit=2 trade crash safety for write throughput;
# acceptable for a benchmark, verify before reusing in production.
innodb_checksums=0
innodb_doublewrite=0
innodb_support_xa=0
innodb_thread_concurrency=0
innodb_flush_log_at_trx_commit=2
innodb_max_dirty_pages_pct=50
innodb_use_native_aio=1
innodb_stats_persistent = 1

# perf special
innodb_adaptive_flushing = 1
innodb_flush_neighbors = 0
innodb_read_io_threads = 4
innodb_write_io_threads = 4
innodb_io_capacity = 4000
innodb_purge_threads=1
innodb_adaptive_hash_index=0

# monitoring
innodb_monitor_enable = '%'
performance_schema=OFF