您的位置:首页 > 其它

阅读 Erlang lib_chan 源码

2015-11-19 22:42 459 查看
lib_chan 是《Programming Erlang》 作者写的一种构建分布式系统的组件,思想很值得学习。

刚开始学习 Erlang,感觉编写代码的风格不是很适应,但感觉思想非常 nice。

看到书的中间部分用到了它,不想用的不明不白,所以打算阅读下,并做了简单地注释。若有错误还望指正。

我的一点看法:

我们如果要使用 lib_chan 构建一个分布式系统非常简单,只需要写上服务器的功能代码即可(注意编写一定要符合lib_chan的规范),任意客户端都可以在远程调用服务器的功能函数并取得结果。感觉类似 RPC,书上有例子,这里就不举例了(注意:lib_chan_auth 中的 lib_md5:string 函数不存在,替换为erlang:md5 即可)。

我觉得最好的一点就是中间人思想。



(图片来源《Programming Erlang》)

假设 P1 是客户端, P2 是服务器,MM1 和 MM2 是中间人。客户端和服务器与中间人通过 Pid ! Message 进行通信,中间人与中间人之间通过 Socket 进行通信。中间人负责编码/解码,认证等操作。它对应用程序隐藏了套接字相关内容,客户端和服务端获得的信息都是 Term 而不是 Binary。



另一点好处如上图,中间人和服务器可以并行的运行,中间负责编码/解码,服务器只负责结果,相比仅有服务器进行编码/解码以及响应请求的工作,添加中间人可以增加并发性。



其次,我们可以添加多个中间人,因为不同的请求请求的内容可能相同,添加不同的中间人我们可以解析不同的请求,解析后转化成相同的请求给服务器即可。

lib_chan

%% ----------------------------------------------------------------------
%% 主模块
%% ----------------------------------------------------------------------

-module(lib_chan).
-export([cast/2, start_server/0, start_server/1,
connect/5, disconnect/1, rpc/2]).
-import(lists, [map/2, member/2, foreach/2]).
-import(lib_chan_mm, [send/2, close/1]).

%%-----------------------------------------------------------------------
%% 服务器代码

%% os:getenv 获得环境变量,组装成配置文件的默认路径
start_server() ->
case os:getenv("HOME") of
false ->
exit({ebanEnv, "HOME"});
Home  ->
start_server(Home ++ "/.erlang_config/lib_chan.conf")
end.

%% 若直接指明配置文件的路径,file:consult 读取配置文件
%% 接着提取出需要的信息 ConfigData,类型为 list
%% check_term 过滤并检测 ConfigData,若正确则传递给 start_server1
%% 错误则退出
start_server(ConfigFile) ->
io:format("lib_chan starting:~p~n", [ConfigFile]),
case file:consult(ConfigFile) of
{ok, ConfigData} ->
io:format("ConfigData=~p~n", [ConfigData]),
case check_terms(ConfigData) of
[] ->
start_server1(ConfigData);
Errors ->
exit({eDaemonConfig, Errors})
end;
{error, Why} ->
exit({eDaemonConfig, Why})
end.

%% 过滤并检测配置文件
%% check_terms() -> [Error]
check_terms(ConfigData) ->
L = map(fun check_term/1, ConfigData),
[X || {error, X} <- L].
check_term({port, P}) when is_integer(P)     -> ok;
check_term({service, _, password, _, mfa, _, _, _})   ->ok;
check_term(X) -> {error, {badTerm, X}}.

%% start_server1 启动一个新的进程执行 start_server2,并且注册名字为 lib_chan
start_server1(ConfigData) ->
register(lib_chan, spawn(fun() -> start_server2(ConfigData) end)).

%% 从 ConfigData 里提取出 Port端口,传递给 start_port_server
start_server2(ConfigData) ->
[Port] = [P || {port, P} <- ConfigData],
start_port_server(Port, ConfigData).

%% 启动 start_raw_server,start_raw_server 启动一个监视器来监听 Port 上的连接
%% fun(Socket) 会在连接开始时执行
start_port_server(Port, ConfigData) ->
lib_chan_cs:start_raw_server(Port,
fun(Socket) ->
start_port_instance(Socket, ConfigData) end,
100,
4).

start_port_instance(Socket, ConfigData) ->
%% 这是处理底层连接的位置
%% 但首先要分裂出一个连接处理进程,必须成为中间人

% 获得当前进程的 Pid
S = self(),
% 成为中间人 Pid = S,并和新创建的进程连接
% 创建进程处理连接 Pid = Controller。
Controller = spawn_link(fun() -> start_erl_port_server(S, ConfigData) end),
% loop 负责编码和解码,接受 Controller 发来的消息。
% 注意 Erlang 的思想是这样的,如果我要给你发消息,消息内容中就会包含我的信息(Pid),
% 否则没办法知道消息是谁发送的,并且回复给谁。
lib_chan_mm:loop(Socket, Controller).

%% MM 就是上面的 S 中间人
%% 接受 MM 发来的请求,并且获取服务器 Mod,ArgC 等信息。
%% 获取信息后,回复 MM ok 表示收到,配合 config 文件里的内容执行函数调用,提供服务。
%% really_start 函数是真正执行功能调用的函数。也就是服务器提供给客户端的服务
start_erl_port_server(MM, ConfigData) ->
receive
{chan, MM, {startService, Mod, ArgC}} ->
% 接受到中间人 MM 发来的消息后判断是否认证
case get_service_definition(Mod, ConfigData) of
{yes, Pwd, MFA} ->
case Pwd of
none ->
send(MM, ack),
% 提取配置信息,通过 MFA 提供服务(执行其中的函数)。
really_start(MM, ArgC, MFA);
_ ->
% 若未认证,则执行认证
do_authentication(Pwd, MM, ArgC, MFA)
end;
no ->
io:format("sending bad service~n"),
send(MM, badService),
close(MM)
end;
Any ->
io:format("*** Erl port server got:~p ~p~n", [MM, Any]),
exit({protocolViolation, Any})
end.

%% 进行身份认证,认证成功进行调用,失败则退出关闭
do_authentication(Pwd, MM, ArgC, MFA) ->
C = lib_chan_auth:make_challenge(),
send(MM, {challenge, C}),
receive
{chan, MM, {response, R}} ->
case lib_chan_auth:is_response_correct(C, R, Pwd) of
true ->
send(MM, ack),
really_start(MM, ArgC, MFA);
false ->
send(MM, authFail),
close(MM)
end
end.

%% really_start 是真正的执行服务器上的功能函数
%% MM是中间人,功能执行完毕后得到结果然后发送给中间人
%% Mod是我们想要执行的模块。ArgC和ArgS分别来自客户端和服务器

really_start(MM, ArgC, {Mod, Func, ArgS}) ->
%% 认证成功,现在开始工作
case (catch apply(Mod, Func, [MM, ArgC, ArgS])) of
{'EXIT', normal} ->
true;
{'EXIT', Why} ->
io:format("server error:~p~n", [Why]);
Why ->
io:format("server error should dir with exit(normal) was:~p~n",
[Why])
end.

%% get_service_definition(Name, ConfigData)
%% 获得服务器的配置,包括模块,调用函数及参数。
get_service_definition(Mod, [{service, Mod, password, Pwd, mfa, M, F, A}|_]) ->
{yes, Pwd, {M, F, A}};
get_service_definition(Name, [_|T]) ->
get_service_definition(Name, T);
get_service_definition(_, []) ->
no.

%% --------------------------------------
%% 客户端连接代码
%% connect(...) -> [ok, MM] | Error

%% 客户端连接代码。
%% 连接后获得 Socket,接着进行认证
%% 从下面 rpc 函数代码可以看出客户端没有直接进行 Socket 通信,而是将消息发送给了中间人 MM,中间人 MM 将消息进行编码等,发送给服务器。服务器那边也通过中间人进行接收,接收后进行解码然后发送给服务器。
connect(Host, Port, Service, Secret, ArgC) ->
S = self(),
MM = spawn(fun() -> connect(S, Host, Port) end),
receive
{MM, ok} ->
case authenticate(MM, Service, Secret, ArgC) of
ok    -> {ok, MM};
Error -> Error
end;
{MM, Error} ->
Error
end.

%% 客户端连接,连接成功进入 loop 循环,等待消息。
connect(Parent, Host, Port) ->
case lib_chan_cs:start_raw_client(Host, Port, 4) of
{ok, Socket} ->
Parent ! {self(), ok},
lib_chan_mm:loop(Socket, Parent);
Error ->
Parent ! {self(), Error}
end.

%% 双方进行认证
authenticate(MM, Service, Secret, ArgC) ->
send(MM, {startService, Service, ArgC}),
%% 应该会接收到质询、ack或者套接字已关闭的消息
receive
{chan, MM, ack} ->
ok;
{chan, MM, {challenge, C}} ->
R = lib_chan_auth:make_response(C, Secret),
send(MM, {response, R}),
receive
{chan, MM, ack} ->
ok;
{chan, MM, authFail} ->
wait_close(MM),
{error, authFail};
Other ->
{error, Other}
end;
{chan, MM, badService} ->
wait_close(MM),
{error, badService};
Other ->
{error, Other}
end.

%% 关闭
wait_close(MM) ->
receive
{chan_closed, MM} ->
true
after 5000 ->
io:format("**error lib_chan~n"),
true
end.

disconnect(MM) -> close(MM).

%% 客户端调用,用来发送请求
rpc(MM, Q) ->
send(MM, Q),
receive
{chan, MM, Reply} ->
Reply
end.

cast(MM, Q) ->
send(MM, Q).


lib_chan_cs

-module(lib_chan_cs).
%% cs代表client_server
-export([start_raw_server/4, start_raw_client/3]).
-export([stop/1]).
-export([children/1]).

%% 建立客户端,发起连接
start_raw_client(Host, Port, PacketLength) ->
gen_tcp:connect(Host, Port,
[binary, {active, true}, {packet, PacketLength}]).

%% 建立服务器,进行配置
start_raw_server(Port, Fun, Max, PacketLength) ->
Name = port_name(Port),
case whereis(Name) of
undefined ->
Self = self(),
Pid = spawn_link(fun() ->
cold_start(Self, Port, Fun, Max, PacketLength)
end),
receive
{Pid, ok} ->
register(Name, Pid),
{ok, self()};
{Pid, Error} ->
Error
end;
_Pid ->
{error, already_started}
end.

stop(Port) when is_integer(Port) ->
Name = port_name(Port),
case whereis(Name) of
undefined ->
not_started;
Pid ->
exit(Pid, kill),
(catch unregister(Name)),
stopped
end.

children(Port) when is_integer(Port) ->
port_name(Port) ! {children, self()},
receive
{session_server, Reply} ->Reply
end.

port_name(Port) when is_integer(Port) ->
list_to_atom("portServer" ++ integer_to_list(Port)).

%% cold_start 做两件事情,
%% 一个是 start_accept,另一个是 socket_loop
%% start_accept 创建一个新进程去接受连接
cold_start(Master, Port, Fun, Max, PacketLength) ->
process_flag(trap_exit, true),
%% 现在我们准备好运行了
%% 建立监听套接字
case gen_tcp:listen(Port, [binary,
%% {dontroute, true},
{nodelay, true},
{packet, PacketLength},
{reuseaddr, true},
{active, true}]) of
{ok, Listen} ->
%% io:format("Listening to:~p~n", [Listen]),
Master ! {self(), ok},
New = start_accept(Listen, Fun),
socket_loop(Listen, New, [], Fun, Max);
Error ->
Master ! {self(), Error}
end.

%% socket_loop 是监听套接字的 loop,是服务器运行的 loop
%% 用来接受中间人发来的消息
socket_loop(Listen, New, Active, Fun, Max) ->
receive
{istarted, New} ->
Active1 = [New | Active],
possibly_start_another(false, Listen, Active1, Fun, Max);
{'EXIT', New, _Why} ->
possibly_start_another(false, Listen, Active, Fun, Max);
{'EXIT', Pid, _Why} ->
Active1 = lists:delete(Pid, Active),
possibly_start_another(New, Listen, Active1, Fun, Max);
{children, From} ->
From ! {session_server, Active},
socket_loop(Listen, New, Active, Fun, Max);
_Other ->
socket_loop(Listen, New, Active, Fun, Max)
end.

%% 判断连接是否达到最大限制
possibly_start_another(New, Listen, Active, Fun, Max)
when is_pid(New) ->
socket_loop(Listen, New, Active, Fun, Max);
possibly_start_another(false, Listen, Active, Fun, Max) ->
case length(Active) of
N when N < Max ->
New = start_accept(Listen, Fun),
socket_loop(Listen, New, Active, Fun, Max);
_ ->
socket_loop(Listen, false, Active, Fun, Max)
end.

%% 创建一个新进程去执行接收连接
start_accept(Listen, Fun) ->
S = self(),
spawn_link(fun() -> start_child(S, Listen, Fun) end).

%% 通过 Listen 监听套接字接受一个连接请求
%% Fun 为 lib_chan:start_port_instance
%% start_child 也为服务器建立中间人
%% start_child 将 Socket 传递给 lib_chan:start_port_instance 函数后,中间人通过获得的 Socket 进入 lib_chan_mm:loop 循环接受消息。
%% 注意:服务器和客户端只接受中间人的消息。
start_child(Parent, Listen, Fun) ->
case gen_tcp:accept(Listen) of
{ok, Socket} ->
Parent ! {istarted, self()},
inet:setopts(Socket, [{packet, 4},
binary,
{nodelay, true},
{active, true}]),
process_flag(trap_exit, true),
case (catch Fun(Socket)) of
{'EXIT', normal} ->
true;
{'EXIT', Why} ->
io:format("Port process dies with exit:~p~n", [Why]),
true;
_ ->
true
end
end.


lib_chan_mm

%% 负责编码和解码消息
%% 发送和接收消息都要通过中间人进行编码和解码

-module(lib_chan_mm).

-export([loop/2, send/2, close/1, controller/2, set_trace/2, trace_with_tag/2]).

send(Pid, Term)          -> Pid ! {send, Term}.
close(Pid)               -> Pid ! close.
controller(Pid, Pid1)    -> Pid ! {setController, Pid1}.
set_trace(Pid, X)        -> Pid ! {trace, X}.
trace_with_tag(Pid, Tag) ->
set_trace(Pid, {true,
fun(Msg) ->
io:format("MM:~p ~p~n", [Tag, Msg])
end}).

%% 设置为系统进程后进入 loop()。
loop(Socket, Pid) ->
process_flag(trap_exit, true),
loop1(Socket, Pid, false).

%% loop 是套接字 Socket 和 Erlang 中消息传递的桥梁。
%% 中间人执行 loop 接受消息后进行编码/解码,然后传递给对应的服务器或客户端。
loop1(Socket, Pid, Trace) ->
receive
{tcp, Socket, Bin} ->
Term = binary_to_term(Bin),
trace_it(Trace, {socketReceived, Term}),
Pid ! {chan, self(), Term},
loop1(Socket, Pid, Trace);
{tcp_closed, Socket} ->
trace_it(Trace, socketClosed),
Pid ! {chan_closed, self()};
{'EXIT', Pid, Why} ->
trace_it(Trace, {controllingProcessExit, Why}),
gen_tcp:close(Socket);
{setController, Pid1} ->
trace_it(Trace, {changedController, Pid}),
loop1(Socket, Pid1, Trace);
{trace, Trace1} ->
trace_it(Trace, {setTrace, Trace1}),
loop1(Socket, Pid, Trace1);
close ->
trace_it(Trace, closedByClient),
gen_tcp:close(Socket);
{send, Term} ->
trace_it(Trace, {sendingMessage, Term}),
gen_tcp:send(Socket, term_to_binary(Term)),
loop1(Socket, Pid, Trace);
UUg ->
io:format("lib_chan_mm:protocol error:~p~n", [UUg]),
loop1(Socket, Pid, Trace)
end.

trace_it(false, _)        -> void;
trace_it({true, F}, M) -> F(M).


lib_chan_auth

-module(lib_chan_auth).
-export([make_challenge/0, make_response/2, is_response_correct/3]).

%% 客户端与服务端进行质询/响应。完成验证才可以通信。
make_challenge() ->
random_string(25).
make_response(Challenge, Secret) ->
%   lib_md5:string(Challenge ++ Secret).
erlang:md5(Challenge ++ Secret).
is_response_correct(Challenge, Response, Secret) ->
%    case lib_md5:string(Challenge ++ Secret) of
case erlang:md5(Challenge ++ Secret) of
Response -> true;
_        -> false
end.

random_string(N) -> random_seed(), random_string(N, []).
random_string(0, D) -> D;
random_string(N, D) ->
random_string(N-1, [random:uniform(26)-1 + $a|D]).
random_seed() ->
{_, _, X} = erlang:now(),
{H, M, S} = time(),
H1 = H * X rem 32767,
M1 = M * X rem 32767,
S1 = S * X rem 32767,
put(random_seed, {H1,M1,S1}).
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: