-module(lib_chan_cs). %% 实现服务器端结构和机制的模块 -export([start_raw_server/4, start_raw_client/3]). -export([stop/1]). -export([children/1]). %% 客户端调用,用来连接服务器 start_raw_client(Host, Port, PacketLength) -> gen_tcp:connect(Host, Port, [binary, {active, true}, {packet, PacketLength}]). %%启动服务器 %%以给定端口创建名字,如果该端口已经注册则服务器已经启动 %%如果端口未注册,则新建进程启动服务器,如果成功启动,则注册端口为新建进程号 %%调用cold_start新建进程来启动服务器,需要传入当前进程以便能获取新建进程的消息 %%新建进程传回进程id以确保接收到的信息是由新建进程传回的。 start_raw_server(Port, Fun, Max, PacketLength) -> Name = port_name(Port), case whereis(Name) of undefined -> Self = self(), Pid = spawn_link(fun() -> cold_start(Self,Port,Fun,Max,PacketLength) end), receive {Pid, ok} -> register(Name, Pid), {ok, self()}; {Pid, Error} -> Error end; _Pid -> {error, already_started} end. stop(Port) when integer(Port) -> Name = port_name(Port), case whereis(Name) of undefined -> not_started; Pid -> exit(Pid, kill), (catch unregister(Name)), stopped end. %%获取连接到某端口的socket children(Port) when integer(Port) -> port_name(Port) ! {children, self()}, receive {session_server, Reply} -> Reply end. port_name(Port) when integer(Port) -> list_to_atom("portServer" ++ integer_to_list(Port)). %%监听端口 %%开始新建进程接受客户端连接,如果接收到连接,则调用Fun(Socket) %%如果一个进程已经接收了一个连接,则需要再次新建进程接收客户端连接 %%最多只能接受Max个连接,超过了后将不再新建进程接收客户端连接,直到有其他的连接结束 cold_start(Master, Port, Fun, Max, PacketLength) -> process_flag(trap_exit, true), %% io:format("Starting a port server on ~p...~n",[Port]), case gen_tcp:listen(Port, [binary, %% {dontroute, true}, {nodelay,true}, {packet, PacketLength}, {reuseaddr, true}, {active, true}]) of {ok, Listen} -> %% io:format("Listening to:~p~n",[Listen]), Master ! {self(), ok}, New = start_accept(Listen, Fun), %% Now we're ready to run socket_loop(Listen, New, [], Fun, Max); Error -> Master ! {self(), Error} end. socket_loop(Listen, New, Active, Fun, Max) -> receive {istarted, New} -> Active1 = [New|Active], possibly_start_another(false,Listen,Active1,Fun,Max); {'EXIT', New, _Why} -> %% io:format("Child exit=~p~n",[Why]), possibly_start_another(false,Listen,Active,Fun,Max); {'EXIT', Pid, _Why} -> %% io:format("Child exit=~p~n",[Why]), Active1 = lists:delete(Pid, Active), possibly_start_another(New,Listen,Active1,Fun,Max); {children, From} -> From ! {session_server, Active}, socket_loop(Listen,New,Active,Fun,Max); _Other -> socket_loop(Listen,New,Active,Fun,Max) end. possibly_start_another(New, Listen, Active, Fun, Max) when pid(New) -> socket_loop(Listen, New, Active, Fun, Max); possibly_start_another(false, Listen, Active, Fun, Max) -> case length(Active) of N when N < Max -> New = start_accept(Listen, Fun), socket_loop(Listen, New, Active, Fun,Max); _ -> socket_loop(Listen, false, Active, Fun, Max) end. start_accept(Listen, Fun) -> S = self(), spawn_link(fun() -> start_child(S, Listen, Fun) end). start_child(Parent, Listen, Fun) -> case gen_tcp:accept(Listen) of {ok, Socket} -> Parent ! {istarted,self()}, % tell the controller inet:setopts(Socket, [{packet,4}, binary, {nodelay,true}, {active, true}]), %% before we activate socket %% io:format("running the child:~p Fun=~p~n", [Socket, Fun]), process_flag(trap_exit, true), case (catch Fun(Socket)) of {'EXIT', normal} -> true; {'EXIT', Why} -> io:format("Port process dies with exit:~p~n",[Why]), true; _ -> %% not an exit so everything's ok true end end.
%%验证模块 -module(lib_chan_auth). -export([make_challenge/0, make_response/2, is_response_correct/3]). make_challenge() -> random_string(25). make_response(Challenge, Secret) -> lib_md5:string(Challenge ++ Secret). is_response_correct(Challenge, Response, Secret) -> case lib_md5:string(Challenge ++ Secret) of Response -> true; _ -> false end. %% random_string(N) -> a random string with N characters. random_string(N) -> random_seed(), random_string(N, []). random_string(0, D) -> D; random_string(N, D) -> random_string(N-1, [random:uniform(26)-1+$a|D]). random_seed() -> {_,_,X} = erlang:now(), {H,M,S} = time(), H1 = H * X rem 32767, M1 = M * X rem 32767, S1 = S * X rem 32767, put(random_seed, {H1,M1,S1}).
-module(lib_chan_mm). %% TCP中转 %% 导入该模块的模块可以应用send发送信息,并接收{chan, self(), Term}形式的信息 -export([loop/2, send/2, close/1, controller/2, set_trace/2, trace_with_tag/2]). send(Pid, Term) -> Pid ! {send, Term}. close(Pid) -> Pid ! close. controller(Pid, Pid1) -> Pid ! {setController, Pid1}. set_trace(Pid, X) -> Pid ! {trace, X}. trace_with_tag(Pid, Tag) -> set_trace(Pid, {true, fun(Msg) -> io:format("MM:~p ~p~n",[Tag, Msg]) end}). loop(Socket, Pid) -> %% trace_with_tag(self(), trace), process_flag(trap_exit, true), loop1(Socket, Pid, false). %%实现对TCP发送信息,接收信息的封装 loop1(Socket, Pid, Trace) -> receive {tcp, Socket, Bin} -> Term = binary_to_term(Bin), trace_it(Trace,{socketReceived, Term}), Pid ! {chan, self(), Term}, loop1(Socket, Pid, Trace); {tcp_closed, Socket} -> trace_it(Trace, socketClosed), Pid ! {chan_closed, self()}; {'EXIT', Pid, Why} -> trace_it(Trace,{controllingProcessExit, Why}), gen_tcp:close(Socket); {setController, Pid1} -> trace_it(Trace, {changedController, Pid}), loop1(Socket, Pid1, Trace); {trace, Trace1} -> trace_it(Trace, {setTrace, Trace1}), loop1(Socket, Pid, Trace1); close -> trace_it(Trace, closedByClient), gen_tcp:close(Socket); {send, Term} -> trace_it(Trace, {sendingMessage, Term}), gen_tcp:send(Socket, term_to_binary(Term)), loop1(Socket, Pid, Trace); UUg -> io:format("lib_chan_mm: protocol error:~p~n",[UUg]), loop1(Socket, Pid, Trace) end. trace_it(false, _) -> void; trace_it({true, F}, M) -> F(M).
%%主要模块 -module(lib_chan). -export([cast/2, start_server/0, start_server/1, connect/5, disconnect/1, rpc/2]). -import(lists, [map/2, member/2, foreach/2]). -import(lib_chan_mm, [send/2, close/1]). start_server() -> case os:getenv("HOME") of false -> exit({ebadEnv, "HOME"}); Home -> start_server(Home ++ "/.erlang_config/lib_chan.conf") end. %%根据配置文件启动服务器 %%配置文件形如 %%{port, 2223}. %%{service, chat, password,"AsDT67aQ",mfa,mod_chat_controller,start,[]}. %% file:consult(ConfigFile)读取config文件 start_server(ConfigFile) -> io:format("lib_chan starting:~p~n",[ConfigFile]), case file:consult(ConfigFile) of {ok, ConfigData} -> io:format("ConfigData=~p~n",[ConfigData]), case check_terms(ConfigData) of [] -> start_server1(ConfigData); Errors -> exit({eDaemonConfig, Errors}) end; {error, Why} -> exit({eDaemonConfig, Why}) end. check_terms(ConfigData) -> L = map(fun check_term/1, ConfigData), [X || {error, X} <- L]. check_term({port, P}) when is_integer(P) -> ok; check_term({service,_,password,_,mfa,_,_,_}) -> ok; check_term(X) -> {error, {badTerm, X}}. %%新建服务器进程并注册为lib_chan start_server1(ConfigData) -> register(lib_chan, spawn(fun() -> start_server2(ConfigData) end)). start_server2(ConfigData) -> %%从ConfigData中提取Port [Port] = [ P || {port,P} <- ConfigData], start_port_server(Port, ConfigData). %%启动服务器,对每一个连接进入服务器的Socket调用start_port_instance(Socket,ConfigData) start_port_server(Port, ConfigData) -> lib_chan_cs:start_raw_server(Port, fun(Socket) -> start_port_instance(Socket, ConfigData) end, 100, 4). start_port_instance(Socket, ConfigData) -> %% This is where the low-level connection is handled %% We must become a middle man %% But first we spawn a connection handler S = self(), Controller = spawn_link(fun() -> start_erl_port_server(S, ConfigData) end), lib_chan_mm:loop(Socket, Controller). start_erl_port_server(MM, ConfigData) -> receive {chan, MM, {startService, Mod, ArgC}} -> case get_service_definition(Mod, ConfigData) of {yes, Pwd, MFA} -> case Pwd of none -> send(MM, ack), really_start(MM, ArgC, MFA); _ -> do_authentication(Pwd, MM, ArgC, MFA) end; no -> io:format("sending bad service~n"), send(MM, badService), close(MM) end; Any -> io:format("*** ErL port server got:~p ~p~n",[MM, Any]), exit({protocolViolation, Any}) end. %%服务器用do_authentication进行验证,验证成功才对socket真正启动服务器的服务 do_authentication(Pwd, MM, ArgC, MFA) -> %%生成25位随机码 C = lib_chan_auth:make_challenge(), send(MM, {challenge, C}), receive {chan, MM, {response, R}} -> case lib_chan_auth:is_response_correct(C, R, Pwd) of true -> send(MM, ack), really_start(MM, ArgC, MFA); false -> send(MM, authFail), close(MM) end end. %% MM is the middle man %% Mod is the Module we want to execute ArgC and ArgS come from the client and %% server respectively really_start(MM, ArgC, {Mod, Func, ArgS}) -> %% authentication worked so now we're off case (catch apply(Mod,Func,[MM,ArgC,ArgS])) of {'EXIT', normal} -> true; {'EXIT', Why} -> io:format("server error:~p~n",[Why]); Why -> io:format("server error should die with exit(normal) was:~p~n", [Why]) end. %% get_service_definition(Name, ConfigData) %%获取服务的配置信息 get_service_definition(Mod, [{service, Mod, password, Pwd, mfa, M, F, A}|_]) -> {yes, Pwd, {M, F, A}}; get_service_definition(Name, [_|T]) -> get_service_definition(Name, T); get_service_definition(_, []) -> no. %%客户端连接服务器,先连接再进行验证 connect(Host, Port, Service, Secret, ArgC) -> S = self(), MM = spawn(fun() -> connect(S, Host, Port) end), receive {MM, ok} -> case authenticate(MM, Service, Secret, ArgC) of ok -> {ok, MM}; Error -> Error end; {MM, Error} -> Error end. connect(Parent, Host, Port) -> case lib_chan_cs:start_raw_client(Host, Port, 4) of {ok, Socket} -> Parent ! {self(), ok}, lib_chan_mm:loop(Socket, Parent); Error -> Parent ! {self(), Error} end. authenticate(MM, Service, Secret, ArgC) -> send(MM, {startService, Service, ArgC}), %% we should get back a challenge or a ack or closed socket receive {chan, MM, ack} -> ok; {chan, MM, {challenge, C}} -> R = lib_chan_auth:make_response(C, Secret), send(MM, {response, R}), receive {chan, MM, ack} -> ok; {chan, MM, authFail} -> wait_close(MM), {error, authFail}; Other -> {error, Other} end; {chan, MM, badService} -> wait_close(MM), {error, badService}; Other -> {error, Other} end. wait_close(MM) -> receive {chan_closed, MM} -> true after 5000 -> io:format("**error lib_chan~n"), true end. disconnect(MM) -> close(MM). rpc(MM, Q) -> send(MM, Q), receive {chan, MM, Reply} -> Reply end. cast(MM, Q) -> send(MM, Q).
该功能的使用方法为:
服务器端:
-import(lib_chan_mm, [send/2, controller/2]).
启动服务器
lib_chan:start_server("D:\code\erlcode\socket_dist\test.conf").
chat.conf为服务器的配置,如
{port, 2223}.
{service, test, password,"AsDT67aQ",mfa,mod_test_controller,start,[]}.
新启进程来接收消息
客户端:
调用lib_chan:connect(Host, Port, chat, Pwd, []) 连接服务器
将会调用配置文件中的mod_test_controller:start
连接好后用lib_chan_mm:send(MM, Msg).来给服务器发消息
使用方法如下例
-module(test_server). -import(lib_chan_mm, [send/2, controller/2]). -compile(export_all). start() -> start_server(), lib_chan:start_server("D:\code\erlcode\socket_dist\test.conf"). start_server() -> register(test_server, spawn(fun() -> process_flag(trap_exit, true), Val= (catch server_loop([])), io:format("Server terminated with:~p~n",[Val]) end)). server_loop(L) -> receive {mm, Channel, Msg} -> io:format("test_server received Msg=~p~n",[{Channel,Msg}]); Msg -> io:format("Server received Msg=~p~n", [Msg]), server_loop(L) end.
-module(mod_test_controller). -export([start/3]). -import(lib_chan_mm, [send/2]). start(MM, _, _) -> process_flag(trap_exit, true), io:format("mod_test_controller start...~p~n",[MM]), loop(MM). loop(MM) -> receive {chan, MM, Msg} -> test_server ! {mm, MM, Msg}, loop(MM); Other -> io:format("mod_test_controller unexpected message =~p (MM=~p)~n", [Other, MM]), loop(MM) end.
-module(test_client). -export([start/0, connect/4]). start() -> connect("localhost", 2223, "AsDT67aQ", "a test msg"). connect(Host, Port, Pwd, Msg) -> io:format("test_client connect host:~p~n",[Msg]), case lib_chan:connect(Host, Port, test, Pwd, []) of {error, _Why} -> io:format("test_client can not connected:~p~n",[Msg]); {ok, MM} -> io:format("test_client connected:~p~n",[Msg]), lib_chan_mm:send(MM, Msg) end.