networking module:
rabbit_networking.erl:
%% Adds the TCP client supervisor as a child of rabbit_sup.
%%
%% The child is started by calling rabbit_client_sup:start_link/2 with a local
%% registered name and the MFA {rabbit_connection_sup, start_link, []}.
%% Returns ok; any result other than {ok, _} from start_child fails the match
%% and crashes the caller.
start() ->
{ok,_} = supervisor2:start_child(
rabbit_sup,
%% Child id; the same atom is used as the locally registered name below.
{rabbit_tcp_client_sup,
{rabbit_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
%% NOTE(review): presumably the MFA rabbit_client_sup uses to start one
%% rabbit_connection_sup per connection; rabbit_client_sup is not shown here.
{rabbit_connection_sup,start_link,[]}]},
%% Restart = transient, Shutdown = infinity, Type = supervisor.
transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
====>
rabbit_sup->rabbit_client_sup->rabbit_connection_sup:
%% Starts one connection supervisor and populates it with three children,
%% in this order: the queue collector, the channel supervisor-of-supervisors,
%% and the reader.
%%
%% Returns {ok, SupPid, ReaderPid}: the pid of the new supervisor and the pid
%% of the reader child. Every start step is matched against {ok, _}, so a
%% failure in any step crashes the caller with badmatch.
%%
%% NOTE(review): `intrinsic` is a supervisor2-specific restart type and
%% ?MAX_WAIT is a macro; neither is defined in this excerpt.
start_link() ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
%% 1. Collector worker (rabbit_queue_collector); its pid is handed to the reader.
{ok, Collector} =
supervisor2:start_child(
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
%% 2. Supervisor for per-channel supervisors; its pid is handed to the reader.
{ok, ChannelSupSupPid} =
supervisor2:start_child(
SupPid,
{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
%% 3. Reader worker; started last because it takes the two pids above plus
%%    the value returned by rabbit_heartbeat:start_heartbeat_fun(SupPid).
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
[ChannelSupSupPid, Collector,
rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
rabbit_channel_sup_sup.erl:
%% Starts an unregistered supervisor2 instance whose callback module is this
%% module (?MODULE) and whose init/1 argument is [].
%% Returns whatever supervisor2:start_link/2 returns.
start_link() ->
supervisor2:start_link(?MODULE, []).
%% Supervisor callback: returns the supervisor flags and a single child
%% template.
%%
%% Flags: strategy simple_one_for_one_terminate, max restart intensity 0
%% within a period of 1.
%% Child template: id channel_sup, started via rabbit_channel_sup:start_link
%% with an initially empty argument list, restart = temporary,
%% shutdown = infinity, type = supervisor.
init([]) ->
{ok, {{simple_one_for_one_terminate, 0, 1},
[{channel_sup, {rabbit_channel_sup, start_link, []},
temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
supervisor2.erl:
%% SupName = self
%% SupFlags = {simple_one_for_one_terminate, 0, 1}
%% Mod = rabbit_channel_sup_sup
%% Args = []
case init_state(SupName, SupFlags, Mod, Args) of
{ok, State} when ?is_simple(State) -> %% 这里判断的结果确实是simple
init_dynamic(State, StartSpec);
{ok, State} ->
init_children(State, StartSpec);
Error ->
{stop, {supervisor_data, Error}}
end;
==> {ok, State} =
{ok, #state{name = supname(SupName,Mod),
strategy = Strategy,
intensity = MaxIntensity,
period = Period,
module = Mod,
args = Args}};
==>
到这里为止,rabbit_channel_sup_sup还没有把rabbit_channel_sup启动起来
%% StartSpec: {channel_sup, {rabbit_channel_sup, start_link, []},
%%             temporary, infinity, supervisor, [rabbit_channel_sup]}
init_dynamic(State, [StartSpec]) ->
case check_startspec([StartSpec]) of
{ok, Children} ->
{ok, State#state{children = Children}};
Error ->
{stop, {start_spec, Error}}
end;
==> (由rabbit_reader.erl启动channel):
%% Entry point of the reader process: waits for the socket to be handed over
%% and then starts handling the connection.
%%
%% Parent, ChannelSupSupPid, Collector and StartHeartbeatFun are passed through
%% unchanged to start_connection/7 together with the debug options and the
%% socket data received in the `go` message.
%% The receive has no `after` clause, so the process blocks until a
%% {go, Sock, SockTransform} message arrives.
init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive %% rabbit_networking.erl initiates this by sending Reader ! {go, Sock, SockTransform}
%% The Reader is only notified after boot_tcp (described below) has succeeded
{go, Sock, SockTransform} ->
start_connection(
Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
SockTransform)
end.
==>start_connection:
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
handshake, 8))
==>recvloop:
recvloop(Deb, handle_input(State#v1.callback, Data,
State#v1{buf = [Rest],
buf_len = BufLen - RecvLen})).
==>handle_input:
switch_callback(handle_frame(Type, Channel, Payload, State),
frame_header, 7);
==>handle_frame:
true -> send_to_new_channel(
Channel, AnalyzedFrame, State);
==>send_to_new_channel:
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
由此,这里才开始start rabbit_channel_sup
所有模块(包括networking)都是由rabbit.erl中的-rabbit_boot_step启动步骤加载起来的
rabbit_networking.erl ==> boot_tcp:
start_tcp_listener -> start_listener -> start_listener0:
{ok,_} = supervisor:start_child(
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
[IPAddress, Port, [Family | tcp_opts()],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]}, %% OnConnect=rabbit_networking:start_client
transient, infinity, supervisor, [tcp_listener_sup]}).
[IPAddress, Port, [Family | tcp_opts()],
=> tcp_listener_sup:start_link(Args).
%% Args=
[IPAddress, Port, [Family | tcp_opts()],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]
{ok, {{one_for_all, 10, 10},
[{tcp_acceptor_sup, {tcp_acceptor_sup, start_link,
[Name, AcceptCallback]},
transient, infinity, supervisor, [tcp_acceptor_sup]},
{tcp_listener, {tcp_listener, start_link,
[IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, Name,
OnStartup, OnShutdown, Label]},
transient, 16#ffffffff, worker, [tcp_listener]}]}}.
=>tcp_acceptor_sup.erl:
{ok, {{simple_one_for_one, 10, 10},
[{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},
transient, brutal_kill, worker, [tcp_acceptor]}]}}.
这里tcp_acceptor_sup start_link后并不会start tcp_acceptor。看代码可以知道"simple_one_for_one"调用init_dynamic,只是把child信息记录,不会调用start_link
下一个tcp_listener start的时候
{ok, _APid} = supervisor:start_child(
AcceptorSup, [LSock]) %% AcceptorSup就是上面的tcp_acceptor_sup
在这里再次调用sup的start_child方法。看supervisor的代码可以知道,对于simple_one_for_one,start_child会把这次传入的参数(EArgs)追加到之前保存的child参数(A)之后:
Child = hd(State#state.children),
#child{mfargs = {M, F, A}} = Child,
Args = A ++ EArgs,
case do_start_child_i(M, F, Args) of
故最终调用的是tcp_acceptor:start_link(Callback, LSock):Callback来自tcp_acceptor_sup中保存的child规格,LSock来自tcp_listener调用start_child时传入的参数。这就解释了为什么tcp_acceptor的start_link有两个参数。
rabbit_channel_sup.erl: 通过Reader启动之后
1,先为该Reader和Channel启动一个Writer(rabbit_writer.erl)
[{writer, {rabbit_writer, start_link,
[Sock, Channel, FrameMax, Protocol, ReaderPid]},
2, start channel:
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, Protocol,
User, VHost, Capabilities, Collector,
rabbit_limiter:make_token(LimiterPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]})