阅读 Erlang lib_chan 源码
2015-11-19 22:42
459 查看
lib_chan 是《Programming Erlang》 作者写的一种构建分布式系统的组件,思想很值得学习。
刚开始学习 Erlang,感觉编写代码的风格不是很适应,但感觉思想非常 nice。
看到书的中间部分用到了它,不想用的不明不白,所以打算阅读下,并做了简单地注释。若有错误还望指正。
我觉得最好的一点就是中间人思想。
![](https://img-blog.csdn.net/20151119222417016)
(图片来源《Programming Erlang》)
假设 P1 是客户端, P2 是服务器,MM1 和 MM2 是中间人。客户端和服务器与中间人通过 Pid ! Message 进行通信,中间人与中间人之间通过 Socket 进行通信。中间人负责编码/解码,认证等操作。它对应用程序隐藏了套接字相关内容,客户端和服务端获得的信息都是 Term 而不是 Binary。
![](https://img-blog.csdn.net/20151122134444660)
另一点好处如上图,中间人和服务器可以并行的运行,中间负责编码/解码,服务器只负责结果,相比仅有服务器进行编码/解码以及响应请求的工作,添加中间人可以增加并发性。
![](https://img-blog.csdn.net/20151122134502051)
其次,我们可以添加多个中间人,因为不同的请求请求的内容可能相同,添加不同的中间人我们可以解析不同的请求,解析后转化成相同的请求给服务器即可。
刚开始学习 Erlang,感觉编写代码的风格不是很适应,但感觉思想非常 nice。
看到书的中间部分用到了它,不想用的不明不白,所以打算阅读下,并做了简单地注释。若有错误还望指正。
我的一点看法:
我们如果要使用 lib_chan 构建一个分布式系统非常简单,只需要写上服务器的功能代码即可(注意编写一定要符合lib_chan的规范),任意客户端都可以在远程调用服务器的功能函数并取得结果。感觉类似 RPC,书上有例子,这里就不举例了(注意:lib_chan_auth 中的 lib_md5:string 函数不存在,替换为erlang:md5 即可)。我觉得最好的一点就是中间人思想。
(图片来源《Programming Erlang》)
假设 P1 是客户端, P2 是服务器,MM1 和 MM2 是中间人。客户端和服务器与中间人通过 Pid ! Message 进行通信,中间人与中间人之间通过 Socket 进行通信。中间人负责编码/解码,认证等操作。它对应用程序隐藏了套接字相关内容,客户端和服务端获得的信息都是 Term 而不是 Binary。
另一点好处如上图,中间人和服务器可以并行的运行,中间负责编码/解码,服务器只负责结果,相比仅有服务器进行编码/解码以及响应请求的工作,添加中间人可以增加并发性。
其次,我们可以添加多个中间人,因为不同的请求请求的内容可能相同,添加不同的中间人我们可以解析不同的请求,解析后转化成相同的请求给服务器即可。
lib_chan
%% ---------------------------------------------------------------------- %% 主模块 %% ---------------------------------------------------------------------- -module(lib_chan). -export([cast/2, start_server/0, start_server/1, connect/5, disconnect/1, rpc/2]). -import(lists, [map/2, member/2, foreach/2]). -import(lib_chan_mm, [send/2, close/1]). %%----------------------------------------------------------------------- %% 服务器代码 %% os:getenv 获得环境变量,组装成配置文件的默认路径 start_server() -> case os:getenv("HOME") of false -> exit({ebanEnv, "HOME"}); Home -> start_server(Home ++ "/.erlang_config/lib_chan.conf") end. %% 若直接指明配置文件的路径,file:consult 读取配置文件 %% 接着提取出需要的信息 ConfigData,类型为 list %% check_term 过滤并检测 ConfigData,若正确则传递给 start_server1 %% 错误则退出 start_server(ConfigFile) -> io:format("lib_chan starting:~p~n", [ConfigFile]), case file:consult(ConfigFile) of {ok, ConfigData} -> io:format("ConfigData=~p~n", [ConfigData]), case check_terms(ConfigData) of [] -> start_server1(ConfigData); Errors -> exit({eDaemonConfig, Errors}) end; {error, Why} -> exit({eDaemonConfig, Why}) end. %% 过滤并检测配置文件 %% check_terms() -> [Error] check_terms(ConfigData) -> L = map(fun check_term/1, ConfigData), [X || {error, X} <- L]. check_term({port, P}) when is_integer(P) -> ok; check_term({service, _, password, _, mfa, _, _, _}) ->ok; check_term(X) -> {error, {badTerm, X}}. %% start_server1 启动一个新的进程执行 start_server2,并且注册名字为 lib_chan start_server1(ConfigData) -> register(lib_chan, spawn(fun() -> start_server2(ConfigData) end)). %% 从 ConfigData 里提取出 Port端口,传递给 start_port_server start_server2(ConfigData) -> [Port] = [P || {port, P} <- ConfigData], start_port_server(Port, ConfigData). %% 启动 start_raw_server,start_raw_server 启动一个监视器来监听 Port 上的连接 %% fun(Socket) 会在连接开始时执行 start_port_server(Port, ConfigData) -> lib_chan_cs:start_raw_server(Port, fun(Socket) -> start_port_instance(Socket, ConfigData) end, 100, 4). start_port_instance(Socket, ConfigData) -> %% 这是处理底层连接的位置 %% 但首先要分裂出一个连接处理进程,必须成为中间人 % 获得当前进程的 Pid S = self(), % 成为中间人 Pid = S,并和新创建的进程连接 % 创建进程处理连接 Pid = Controller。 Controller = spawn_link(fun() -> start_erl_port_server(S, ConfigData) end), % loop 负责编码和解码,接受 Controller 发来的消息。 % 注意 Erlang 的思想是这样的,如果我要给你发消息,消息内容中就会包含我的信息(Pid), % 否则没办法知道消息是谁发送的,并且回复给谁。 lib_chan_mm:loop(Socket, Controller). %% MM 就是上面的 S 中间人 %% 接受 MM 发来的请求,并且获取服务器 Mod,ArgC 等信息。 %% 获取信息后,回复 MM ok 表示收到,配合 config 文件里的内容执行函数调用,提供服务。 %% really_start 函数是真正执行功能调用的函数。也就是服务器提供给客户端的服务 start_erl_port_server(MM, ConfigData) -> receive {chan, MM, {startService, Mod, ArgC}} -> % 接受到中间人 MM 发来的消息后判断是否认证 case get_service_definition(Mod, ConfigData) of {yes, Pwd, MFA} -> case Pwd of none -> send(MM, ack), % 提取配置信息,通过 MFA 提供服务(执行其中的函数)。 really_start(MM, ArgC, MFA); _ -> % 若未认证,则执行认证 do_authentication(Pwd, MM, ArgC, MFA) end; no -> io:format("sending bad service~n"), send(MM, badService), close(MM) end; Any -> io:format("*** Erl port server got:~p ~p~n", [MM, Any]), exit({protocolViolation, Any}) end. %% 进行身份认证,认证成功进行调用,失败则退出关闭 do_authentication(Pwd, MM, ArgC, MFA) -> C = lib_chan_auth:make_challenge(), send(MM, {challenge, C}), receive {chan, MM, {response, R}} -> case lib_chan_auth:is_response_correct(C, R, Pwd) of true -> send(MM, ack), really_start(MM, ArgC, MFA); false -> send(MM, authFail), close(MM) end end. %% really_start 是真正的执行服务器上的功能函数 %% MM是中间人,功能执行完毕后得到结果然后发送给中间人 %% Mod是我们想要执行的模块。ArgC和ArgS分别来自客户端和服务器 really_start(MM, ArgC, {Mod, Func, ArgS}) -> %% 认证成功,现在开始工作 case (catch apply(Mod, Func, [MM, ArgC, ArgS])) of {'EXIT', normal} -> true; {'EXIT', Why} -> io:format("server error:~p~n", [Why]); Why -> io:format("server error should dir with exit(normal) was:~p~n", [Why]) end. %% get_service_definition(Name, ConfigData) %% 获得服务器的配置,包括模块,调用函数及参数。 get_service_definition(Mod, [{service, Mod, password, Pwd, mfa, M, F, A}|_]) -> {yes, Pwd, {M, F, A}}; get_service_definition(Name, [_|T]) -> get_service_definition(Name, T); get_service_definition(_, []) -> no. %% -------------------------------------- %% 客户端连接代码 %% connect(...) -> [ok, MM] | Error %% 客户端连接代码。 %% 连接后获得 Socket,接着进行认证 %% 从下面 rpc 函数代码可以看出客户端没有直接进行 Socket 通信,而是将消息发送给了中间人 MM,中间人 MM 将消息进行编码等,发送给服务器。服务器那边也通过中间人进行接收,接收后进行解码然后发送给服务器。 connect(Host, Port, Service, Secret, ArgC) -> S = self(), MM = spawn(fun() -> connect(S, Host, Port) end), receive {MM, ok} -> case authenticate(MM, Service, Secret, ArgC) of ok -> {ok, MM}; Error -> Error end; {MM, Error} -> Error end. %% 客户端连接,连接成功进入 loop 循环,等待消息。 connect(Parent, Host, Port) -> case lib_chan_cs:start_raw_client(Host, Port, 4) of {ok, Socket} -> Parent ! {self(), ok}, lib_chan_mm:loop(Socket, Parent); Error -> Parent ! {self(), Error} end. %% 双方进行认证 authenticate(MM, Service, Secret, ArgC) -> send(MM, {startService, Service, ArgC}), %% 应该会接收到质询、ack或者套接字已关闭的消息 receive {chan, MM, ack} -> ok; {chan, MM, {challenge, C}} -> R = lib_chan_auth:make_response(C, Secret), send(MM, {response, R}), receive {chan, MM, ack} -> ok; {chan, MM, authFail} -> wait_close(MM), {error, authFail}; Other -> {error, Other} end; {chan, MM, badService} -> wait_close(MM), {error, badService}; Other -> {error, Other} end. %% 关闭 wait_close(MM) -> receive {chan_closed, MM} -> true after 5000 -> io:format("**error lib_chan~n"), true end. disconnect(MM) -> close(MM). %% 客户端调用,用来发送请求 rpc(MM, Q) -> send(MM, Q), receive {chan, MM, Reply} -> Reply end. cast(MM, Q) -> send(MM, Q).
lib_chan_cs
-module(lib_chan_cs). %% cs代表client_server -export([start_raw_server/4, start_raw_client/3]). -export([stop/1]). -export([children/1]). %% 建立客户端,发起连接 start_raw_client(Host, Port, PacketLength) -> gen_tcp:connect(Host, Port, [binary, {active, true}, {packet, PacketLength}]). %% 建立服务器,进行配置 start_raw_server(Port, Fun, Max, PacketLength) -> Name = port_name(Port), case whereis(Name) of undefined -> Self = self(), Pid = spawn_link(fun() -> cold_start(Self, Port, Fun, Max, PacketLength) end), receive {Pid, ok} -> register(Name, Pid), {ok, self()}; {Pid, Error} -> Error end; _Pid -> {error, already_started} end. stop(Port) when is_integer(Port) -> Name = port_name(Port), case whereis(Name) of undefined -> not_started; Pid -> exit(Pid, kill), (catch unregister(Name)), stopped end. children(Port) when is_integer(Port) -> port_name(Port) ! {children, self()}, receive {session_server, Reply} ->Reply end. port_name(Port) when is_integer(Port) -> list_to_atom("portServer" ++ integer_to_list(Port)). %% cold_start 做两件事情, %% 一个是 start_accept,另一个是 socket_loop %% start_accept 创建一个新进程去接受连接 cold_start(Master, Port, Fun, Max, PacketLength) -> process_flag(trap_exit, true), %% 现在我们准备好运行了 %% 建立监听套接字 case gen_tcp:listen(Port, [binary, %% {dontroute, true}, {nodelay, true}, {packet, PacketLength}, {reuseaddr, true}, {active, true}]) of {ok, Listen} -> %% io:format("Listening to:~p~n", [Listen]), Master ! {self(), ok}, New = start_accept(Listen, Fun), socket_loop(Listen, New, [], Fun, Max); Error -> Master ! {self(), Error} end. %% socket_loop 是监听套接字的 loop,是服务器运行的 loop %% 用来接受中间人发来的消息 socket_loop(Listen, New, Active, Fun, Max) -> receive {istarted, New} -> Active1 = [New | Active], possibly_start_another(false, Listen, Active1, Fun, Max); {'EXIT', New, _Why} -> possibly_start_another(false, Listen, Active, Fun, Max); {'EXIT', Pid, _Why} -> Active1 = lists:delete(Pid, Active), possibly_start_another(New, Listen, Active1, Fun, Max); {children, From} -> From ! {session_server, Active}, socket_loop(Listen, New, Active, Fun, Max); _Other -> socket_loop(Listen, New, Active, Fun, Max) end. %% 判断连接是否达到最大限制 possibly_start_another(New, Listen, Active, Fun, Max) when is_pid(New) -> socket_loop(Listen, New, Active, Fun, Max); possibly_start_another(false, Listen, Active, Fun, Max) -> case length(Active) of N when N < Max -> New = start_accept(Listen, Fun), socket_loop(Listen, New, Active, Fun, Max); _ -> socket_loop(Listen, false, Active, Fun, Max) end. %% 创建一个新进程去执行接收连接 start_accept(Listen, Fun) -> S = self(), spawn_link(fun() -> start_child(S, Listen, Fun) end). %% 通过 Listen 监听套接字接受一个连接请求 %% Fun 为 lib_chan:start_port_instance %% start_child 也为服务器建立中间人 %% start_child 将 Socket 传递给 lib_chan:start_port_instance 函数后,中间人通过获得的 Socket 进入 lib_chan_mm:loop 循环接受消息。 %% 注意:服务器和客户端只接受中间人的消息。 start_child(Parent, Listen, Fun) -> case gen_tcp:accept(Listen) of {ok, Socket} -> Parent ! {istarted, self()}, inet:setopts(Socket, [{packet, 4}, binary, {nodelay, true}, {active, true}]), process_flag(trap_exit, true), case (catch Fun(Socket)) of {'EXIT', normal} -> true; {'EXIT', Why} -> io:format("Port process dies with exit:~p~n", [Why]), true; _ -> true end end.
lib_chan_mm
%% 负责编码和解码消息 %% 发送和接收消息都要通过中间人进行编码和解码 -module(lib_chan_mm). -export([loop/2, send/2, close/1, controller/2, set_trace/2, trace_with_tag/2]). send(Pid, Term) -> Pid ! {send, Term}. close(Pid) -> Pid ! close. controller(Pid, Pid1) -> Pid ! {setController, Pid1}. set_trace(Pid, X) -> Pid ! {trace, X}. trace_with_tag(Pid, Tag) -> set_trace(Pid, {true, fun(Msg) -> io:format("MM:~p ~p~n", [Tag, Msg]) end}). %% 设置为系统进程后进入 loop()。 loop(Socket, Pid) -> process_flag(trap_exit, true), loop1(Socket, Pid, false). %% loop 是套接字 Socket 和 Erlang 中消息传递的桥梁。 %% 中间人执行 loop 接受消息后进行编码/解码,然后传递给对应的服务器或客户端。 loop1(Socket, Pid, Trace) -> receive {tcp, Socket, Bin} -> Term = binary_to_term(Bin), trace_it(Trace, {socketReceived, Term}), Pid ! {chan, self(), Term}, loop1(Socket, Pid, Trace); {tcp_closed, Socket} -> trace_it(Trace, socketClosed), Pid ! {chan_closed, self()}; {'EXIT', Pid, Why} -> trace_it(Trace, {controllingProcessExit, Why}), gen_tcp:close(Socket); {setController, Pid1} -> trace_it(Trace, {changedController, Pid}), loop1(Socket, Pid1, Trace); {trace, Trace1} -> trace_it(Trace, {setTrace, Trace1}), loop1(Socket, Pid, Trace1); close -> trace_it(Trace, closedByClient), gen_tcp:close(Socket); {send, Term} -> trace_it(Trace, {sendingMessage, Term}), gen_tcp:send(Socket, term_to_binary(Term)), loop1(Socket, Pid, Trace); UUg -> io:format("lib_chan_mm:protocol error:~p~n", [UUg]), loop1(Socket, Pid, Trace) end. trace_it(false, _) -> void; trace_it({true, F}, M) -> F(M).
lib_chan_auth
-module(lib_chan_auth). -export([make_challenge/0, make_response/2, is_response_correct/3]). %% 客户端与服务端进行质询/响应。完成验证才可以通信。 make_challenge() -> random_string(25). make_response(Challenge, Secret) -> % lib_md5:string(Challenge ++ Secret). erlang:md5(Challenge ++ Secret). is_response_correct(Challenge, Response, Secret) -> % case lib_md5:string(Challenge ++ Secret) of case erlang:md5(Challenge ++ Secret) of Response -> true; _ -> false end. random_string(N) -> random_seed(), random_string(N, []). random_string(0, D) -> D; random_string(N, D) -> random_string(N-1, [random:uniform(26)-1 + $a|D]). random_seed() -> {_, _, X} = erlang:now(), {H, M, S} = time(), H1 = H * X rem 32767, M1 = M * X rem 32767, S1 = S * X rem 32767, put(random_seed, {H1,M1,S1}).
相关文章推荐
- [Web前端]梳理-JavaScript.2.变量与量. 3.数据类型
- PAT-PAT (Advanced Level) Practise The Dominant Color (20) (map计数)【二星级】
- iOS开发技巧(autolayout自动布局)
- Entity Framework 学习
- 鼠标事件onmousemove的clientY
- Android内存性能优化
- JavaScript初学笔记
- ssh免密码远程执行sudo命令
- .highcharts 函数
- 鼠标事件onmousemove的clientY
- python array 类型 转换为 list 类型
- 安装linux jdk错误
- Android Studio1.4.x JNI开发基础 - 简单实例
- cmd
- 瞎玩php——初识cookie和session
- 一个i++的教训,多线程问题
- 慕课网3 异常处理 字符串 Math类 Calendar类 data simpleDateFormat类 包装类
- 《Python基础教程第二版》学习笔记(一)第一章 基础知识
- cvCreateFileCapture返回NULL
- java静态和非静态代码块的执行顺序