不同行情分发节点间同步合约
2014-05-30 16:57
225 查看
h2. 需求
* 选择一个行情分发节点,并点击"同步到其他节点",并选择对应节点(可多选),并点击“确定”
h2. 方案
* market_auth节点
* http_instrument 进程将同步合约到其他节点的请求消息转发到进程 instrument_handler(gen_server)
* instrument_handler 进程从源行情分发节点获得合约数据,通过TCP连接到MarketDispatcher逐个更新到目标行情分发节点(更新合约数据包)
* instrument_handler 同时处理market_dispatcher发送过来的响应
* http_md_stream 将请求结果通过websocket返回到浏览器
* market_dispatcher节点
* subscribe_handler 进程将更新合约请求发送到market_hub对应进程
* market_hub节点
* instrument_handler 进程将要更新的合约信息写到数据库,写入数据库成功后更新对应context
h2. 实现
* market_auth节点
* http请求 PUT /md/:id/sync_instruments params: [2, 4, ..., MdDest] 目标行情分发节点
* 响应(成功): [?SYNC_INSTRUMENTS, {ok : MdDest}}]
* 响应(失败): [?SYNC_INSTRUMENTS, {error : [MdDest, Reason]}}]
* market_auth/http_instrument.erl 将同步合约到其他节点的请求消息转发到instrument_handler(gen_server)
* instrument_handler:sync(MdSrc, MdDestList)
* market_auth/instrument_handler(gen_server)
* instrument_handler:sync 从源行情分发节点获得合约数据,通过TCP连接到MarketDispatcher逐个更新到目标行情分发节点(更新合约数据包)
<pre>
sync(MdSrc, MdDestList) ->
case get_instrument(MdSrc) of
{ok, Instruments} ->
lists:foreach(fun(MdDest) ->
copy_to(Instruments, MdDest)
end, MdDestList);
{error, Reason} ->
{error, Reason}
end.
copy_to(Instruments, MdDes) -> %通过TCP连接到MarketDispatcher,
根据MdDes获取host和port
ReqId = list_to_binary(uuid:to_string(uuid:random()))
Socket = gen_tcp:connect(host,port)
Packet = packet:encode(?SYNC_INSTRUMENTS, ?PLAIN, [ReqId, [0, 0, 0], [MdDes, Instruments]]),
gen_tcp:send(Socket, Packet)
</pre>
* handle_info 处理market_dispatcher发送过来的TCP响应(发送给websocket)
解包
根据ReqId找到CallbackId,通过CallbackId找到websocket的Pid,将响应发送到浏览器.
* market_dispatcher节点
* subscribe_handler 进程将更新合约请求发送到market_hub对应进程
<pre>
Reply = gen_util:rpc_call(MarketHub, instrument_handler, sync, [MdDes, Instruments])
Packet = packet:encode([?SYNC_INSTRUMENTS, ?PLAIN, Reply]),
gen_tcp:send(Socket, Packet)
</pre>
* market_hub节点
* instrument_handler 进程将要更新的合约信息写到数据库,写入数据库成功后更新对应context
返回值为[?SYNC_INSTRUMENTS, {ok : MdDest}}]
或[?SYNC_INSTRUMENTS, {error : [MdDest, Reason]}}]
<pre>
sync([MdDes, Instruments]) ->
Fun = fun() ->
lists:foreach(fun(Instrument) ->
[Code, Id, Status, Trading] = Instrument,
case mnesia:read({instrument, Code}) of
[] ->
mnesia:write(#instrument{code = Code, id = Id, status = Status, is_trading = IsTrading});
[Instrument] ->
mnesia:write(Instrument#instrument{status = Status, is_trading = IsTrading})
end
end, Instruments)
end,
Reply = case mnesia:transaction(Fun) of
{atomic, _} ->
update_subscribe_context(Subs, UnSubs),%Subs:status为sub的code列表,UnSubs:status为unsub的code列表
instrument_subscriber:subscribe_instruments(Subs),
lists:foreach(fun(#instrument{code=Code}=Instrument) ->
market_hub_context:update(instrument, {Code, Instrument})
end, Instruments)
[?RESP_OK, MdDes];
{aborted, Reason} ->
[?RESP_FAILED, [MdDes,Reason]]
end.
</pre>
* 同步合约响应的处理
* 发起同步合约请求时, 遍历目标节点,将同步请求发送到Market_dispatcher节点.同时将所有ReqId记录下来.存在两个orddict中
<pre>
req_holder = orddict:new() % key:ReqId, value:Pid
inverted_req_holder = orddict:new() % key:Pid, value:[ReqId1, ReqId2, ReqId3, ...]
</pre>
* 若目标节点可以连接上,则向目标节点发送请求,请求的响应回来后,删除两个orddict中的记录,并将响应发送到浏览器
* 若目标节点连接不上,则直接给浏览器发送 {?SYNC_INSTRUMENTS, {error, [MdDest, Reason]},并删除两个orddict中的记录
<pre>
同步源节点到目标节点
copy_to(Instruments, MdDest, ReqId, Pid) ->
根据MdDest获取Host和Port,
case gen_tcp:connect(binary_to_list(Host), Port, [binary, {packet, 4}]) of
{ok, Socket} ->
连接上,发送请求,等待响应,
gen_tcp:send(Socket, Packet);
{error, Reason} ->
没有连接上目标节点,直接向浏览器发送消息
Pid ! {?SYNC_INSTRUMENTS, {error, [MdDest, Reason]}},
gen_server:cast(?SERVER, {sync_error, ReqId})
end.
handle_info({tcp, Socket, Bin}, #state{} = State) ->
关闭socket,
解包,
{ok, [?SYNC_INSTRUMENTS, ReqId, Reply], []} ->
向浏览器发送消息,并删除两个orddict中的记录,
根据inverted_req_holder中Pid对应value是否为空,为空则将websocket关闭;不为空则不关闭
handle_cast({sync_error, ReqId}, #state{} = State) ->
删除两个orddict中的记录,
根据inverted_req_holder中Pid对应value是否为空,为空则将websocket关闭;不为空则不关闭
</pre>
* 最后判断inverted_req_holder中Pid对应value是否为空,为空则将websocket关闭;不为空则不关闭
h2. 问题
* 打开两个或多个合约管理页面只有一个收到websocket信息
h2. 原因
* websocket进程将模块名http_md_stream注册为进程名
* 向websocket发送消息时用whereis(http_md_stream)!Msg. 所以多个页面只有一个能收到websocket信息.
h2. 解决方案
* 浏览器发出请求时,带上CallbackId(通过uuid生成)
* 服务器接受请求时, 通过uuid生成ReqId,ReqId和CallbackId映射,CallbackId和Pid映射.然后对请求进行处理.
* 服务器处理完请求后,通过以上映射找到对应Pid.通过Pid ! Msg将响应发送回去(带上CallbackId)
h2. 详细实现
* 浏览器发出订阅websocket请求: CallbackId 由 Ext.data.UuidGenerator生成, url: ws://localhost/md/stream/:callbackid
浏览器发出合约操作请求: HTTP Get /md/:id/instruments.json?callbackid=CallbackId
* market_auth/http_md_stream (创建websocket)
* 在websocket_init中调用instrument_handler:reg_ws/2, 将CallbackId和websocket的Pid映射起来
<pre>
{CallbackId, Req0} = cowboy_req:binding(callbackid, Req),
instrument_handler:reg_callback(CallbackId, self()).
</pre>
* market_auth/http_instruments:接受http请求,调用instrument_handler中的接口
<pre>
get_json(Req, State) ->
{PathInfo, Req0} = cowboy_req:path_info(Req),
Resp = case PathInfo of
[Md, <<"instruments.json">>, CallbackId] ->
ReqId = list_to_binary(uuid:to_string(uuid:srandom())),
instrument_handler:instruments(Md, ReqId, CallbackId).
end.
</pre>
* market_auth/instrument_handler(gen_server):
* init中创建callbacks和reqids两个空表.
<pre>
init([]) ->
{ok, #state{callbacks = orddict:new(), % {CallbackId, Pid}
reqids = orddict:new()} }. % {ReqId, CallbackId}
</pre>
* 新增reg_callback和unreg_callback接口
<pre>
reg_callback(CallbackId, Pid) ->
gen_server:cast(?SERVER, {reg_callback, CallbackId, Pid}).
unreg_callback(CallbackId) ->
gen_server:cast(?SERVER, {unreg_callback, CallbackId}).
handle_cast({reg_callback, CallbackId, Pid}, #state{callbacks = Callbacks} = State) ->
NCallbacks = orddict:store(CallbackId, Pid, Callbacks),
{noreply, State#state{callbacks= Callbacks}}.
</pre>
* 在发送TCP请求前将ReqId和CallbackId映射起来.
<pre>
instruments(Md, ReqId, CallbackId) ->
gen_server:cast(?SERVER, {instruments, Md, ReqId, CallbackId}),
{ok, ok}.
handle_cast({instruments, Md, ReqId, CallbackId}, #state{reqids = ReqIds} = State) -> ?
NReqIds = orddict:store(ReqId, CallbackId, ReqIds),
发送TCP请求
{noreply, State#state{reqids = NReqIds}}
</pre>
* 接收TCP响应后,通过响应中的ReqId获取Pid,将响应发送回去
<pre>
handle_info({tcp, Socket, Bin}, #state{reqids = ReqIds} = State) ->
解析包
{ok, [?INSTRUMENTS, [ReqId, [?RESP_OK, InstrumentList]]], []} ->
根据ReqId获取Pid
Pid ! InstrumentList
NReqIds = orddict:erase(ReqId, ReqIds),
{noreply, State#state{reqids = NReqIds}}
</pre>
* 选择一个行情分发节点,并点击"同步到其他节点",并选择对应节点(可多选),并点击“确定”
h2. 方案
* market_auth节点
* http_instrument 进程将同步合约到其他节点的请求消息转发到进程 instrument_handler(gen_server)
* instrument_handler 进程从源行情分发节点获得合约数据,通过TCP连接到MarketDispatcher逐个更新到目标行情分发节点(更新合约数据包)
* instrument_handler 同时处理market_dispatcher发送过来的响应
* http_md_stream 将请求结果通过websocket返回到浏览器
* market_dispatcher节点
* subscribe_handler 进程将更新合约请求发送到market_hub对应进程
* market_hub节点
* instrument_handler 进程将要更新的合约信息写到数据库,写入数据库成功后更新对应context
h2. 实现
* market_auth节点
* http请求 PUT /md/:id/sync_instruments params: [2, 4, ..., MdDest] 目标行情分发节点
* 响应(成功): [?SYNC_INSTRUMENTS, {ok : MdDest}}]
* 响应(失败): [?SYNC_INSTRUMENTS, {error : [MdDest, Reason]}}]
* market_auth/http_instrument.erl 将同步合约到其他节点的请求消息转发到instrument_handler(gen_server)
* instrument_handler:sync(MdSrc, MdDestList)
* market_auth/instrument_handler(gen_server)
* instrument_handler:sync 从源行情分发节点获得合约数据,通过TCP连接到MarketDispatcher逐个更新到目标行情分发节点(更新合约数据包)
<pre>
sync(MdSrc, MdDestList) ->
case get_instrument(MdSrc) of
{ok, Instruments} ->
lists:foreach(fun(MdDest) ->
copy_to(Instruments, MdDest)
end, MdDestList);
{error, Reason} ->
{error, Reason}
end.
copy_to(Instruments, MdDes) -> %通过TCP连接到MarketDispatcher,
根据MdDes获取host和port
ReqId = list_to_binary(uuid:to_string(uuid:random()))
Socket = gen_tcp:connect(host,port)
Packet = packet:encode(?SYNC_INSTRUMENTS, ?PLAIN, [ReqId, [0, 0, 0], [MdDes, Instruments]]),
gen_tcp:send(Socket, Packet)
</pre>
* handle_info 处理market_dispatcher发送过来的TCP响应(发送给websocket)
解包
根据ReqId找到CallbackId,通过CallbackId找到websocket的Pid,将响应发送到浏览器.
* market_dispatcher节点
* subscribe_handler 进程将更新合约请求发送到market_hub对应进程
<pre>
Reply = gen_util:rpc_call(MarketHub, instrument_handler, sync, [MdDes, Instruments])
Packet = packet:encode([?SYNC_INSTRUMENTS, ?PLAIN, Reply]),
gen_tcp:send(Socket, Packet)
</pre>
* market_hub节点
* instrument_handler 进程将要更新的合约信息写到数据库,写入数据库成功后更新对应context
返回值为[?SYNC_INSTRUMENTS, {ok : MdDest}}]
或[?SYNC_INSTRUMENTS, {error : [MdDest, Reason]}}]
<pre>
sync([MdDes, Instruments]) ->
Fun = fun() ->
lists:foreach(fun(Instrument) ->
[Code, Id, Status, Trading] = Instrument,
case mnesia:read({instrument, Code}) of
[] ->
mnesia:write(#instrument{code = Code, id = Id, status = Status, is_trading = IsTrading});
[Instrument] ->
mnesia:write(Instrument#instrument{status = Status, is_trading = IsTrading})
end
end, Instruments)
end,
Reply = case mnesia:transaction(Fun) of
{atomic, _} ->
update_subscribe_context(Subs, UnSubs),%Subs:status为sub的code列表,UnSubs:status为unsub的code列表
instrument_subscriber:subscribe_instruments(Subs),
lists:foreach(fun(#instrument{code=Code}=Instrument) ->
market_hub_context:update(instrument, {Code, Instrument})
end, Instruments)
[?RESP_OK, MdDes];
{aborted, Reason} ->
[?RESP_FAILED, [MdDes,Reason]]
end.
</pre>
* 同步合约响应的处理
* 发起同步合约请求时, 遍历目标节点,将同步请求发送到Market_dispatcher节点.同时将所有ReqId记录下来.存在两个orddict中
<pre>
req_holder = orddict:new() % key:ReqId, value:Pid
inverted_req_holder = orddict:new() % key:Pid, value:[ReqId1, ReqId2, ReqId3, ...]
</pre>
* 若目标节点可以连接上,则向目标节点发送请求,请求的响应回来后,删除两个orddict中的记录,并将响应发送到浏览器
* 若目标节点连接不上,则直接给浏览器发送 {?SYNC_INSTRUMENTS, {error, [MdDest, Reason]},并删除两个orddict中的记录
<pre>
同步源节点到目标节点
copy_to(Instruments, MdDest, ReqId, Pid) ->
根据MdDest获取Host和Port,
case gen_tcp:connect(binary_to_list(Host), Port, [binary, {packet, 4}]) of
{ok, Socket} ->
连接上,发送请求,等待响应,
gen_tcp:send(Socket, Packet);
{error, Reason} ->
没有连接上目标节点,直接向浏览器发送消息
Pid ! {?SYNC_INSTRUMENTS, {error, [MdDest, Reason]}},
gen_server:cast(?SERVER, {sync_error, ReqId})
end.
handle_info({tcp, Socket, Bin}, #state{} = State) ->
关闭socket,
解包,
{ok, [?SYNC_INSTRUMENTS, ReqId, Reply], []} ->
向浏览器发送消息,并删除两个orddict中的记录,
根据inverted_req_holder中Pid对应value是否为空,为空则将websocket关闭;不为空则不关闭
handle_cast({sync_error, ReqId}, #state{} = State) ->
删除两个orddict中的记录,
根据inverted_req_holder中Pid对应value是否为空,为空则将websocket关闭;不为空则不关闭
</pre>
* 最后判断inverted_req_holder中Pid对应value是否为空,为空则将websocket关闭;不为空则不关闭
h2. 问题
* 打开两个或多个合约管理页面只有一个收到websocket信息
h2. 原因
* websocket进程将模块名http_md_stream注册为进程名
* 向websocket发送消息时用whereis(http_md_stream)!Msg. 所以多个页面只有一个能收到websocket信息.
h2. 解决方案
* 浏览器发出请求时,带上CallbackId(通过uuid生成)
* 服务器接受请求时, 通过uuid生成ReqId,ReqId和CallbackId映射,CallbackId和Pid映射.然后对请求进行处理.
* 服务器处理完请求后,通过以上映射找到对应Pid.通过Pid ! Msg将响应发送回去(带上CallbackId)
h2. 详细实现
* 浏览器发出订阅websocket请求: CallbackId 由 Ext.data.UuidGenerator生成, url: ws://localhost/md/stream/:callbackid
浏览器发出合约操作请求: HTTP Get /md/:id/instruments.json?callbackid=CallbackId
* market_auth/http_md_stream (创建websocket)
* 在websocket_init中调用instrument_handler:reg_ws/2, 将CallbackId和websocket的Pid映射起来
<pre>
{CallbackId, Req0} = cowboy_req:binding(callbackid, Req),
instrument_handler:reg_callback(CallbackId, self()).
</pre>
* market_auth/http_instruments:接受http请求,调用instrument_handler中的接口
<pre>
get_json(Req, State) ->
{PathInfo, Req0} = cowboy_req:path_info(Req),
Resp = case PathInfo of
[Md, <<"instruments.json">>, CallbackId] ->
ReqId = list_to_binary(uuid:to_string(uuid:srandom())),
instrument_handler:instruments(Md, ReqId, CallbackId).
end.
</pre>
* market_auth/instrument_handler(gen_server):
* init中创建callbacks和reqids两个空表.
<pre>
init([]) ->
{ok, #state{callbacks = orddict:new(), % {CallbackId, Pid}
reqids = orddict:new()} }. % {ReqId, CallbackId}
</pre>
* 新增reg_callback和unreg_callback接口
<pre>
reg_callback(CallbackId, Pid) ->
gen_server:cast(?SERVER, {reg_callback, CallbackId, Pid}).
unreg_callback(CallbackId) ->
gen_server:cast(?SERVER, {unreg_callback, CallbackId}).
handle_cast({reg_callback, CallbackId, Pid}, #state{callbacks = Callbacks} = State) ->
NCallbacks = orddict:store(CallbackId, Pid, Callbacks),
{noreply, State#state{callbacks= Callbacks}}.
</pre>
* 在发送TCP请求前将ReqId和CallbackId映射起来.
<pre>
instruments(Md, ReqId, CallbackId) ->
gen_server:cast(?SERVER, {instruments, Md, ReqId, CallbackId}),
{ok, ok}.
handle_cast({instruments, Md, ReqId, CallbackId}, #state{reqids = ReqIds} = State) -> ?
NReqIds = orddict:store(ReqId, CallbackId, ReqIds),
发送TCP请求
{noreply, State#state{reqids = NReqIds}}
</pre>
* 接收TCP响应后,通过响应中的ReqId获取Pid,将响应发送回去
<pre>
handle_info({tcp, Socket, Bin}, #state{reqids = ReqIds} = State) ->
解析包
{ok, [?INSTRUMENTS, [ReqId, [?RESP_OK, InstrumentList]]], []} ->
根据ReqId获取Pid
Pid ! InstrumentList
NReqIds = orddict:erase(ReqId, ReqIds),
{noreply, State#state{reqids = NReqIds}}
</pre>
相关文章推荐
- 行情分发节点信息统计(登陆人数, 合约数)
- 获得指定行情分发节点的合约列表和订阅情况
- 用 sersync 实现多个不同目录向多个节点实时同步
- 行情分发节点列表
- 通过nginx_lua实现根据请求参数分发道不同后端节点
- 同步选中,展开和闭合具有不同TreeModel的JTree中的节点
- 推荐一个很好的网盘,可以在不同电脑间同步文件!
- windows 2008 R2搭建NLB web负载均衡及使用分布式文件系统(DFS)同步节点文件
- Redis从节点和主节点中key的数量不同是为什么
- 在SWIFT中应该采用什么样的方式来定义这个链表中的节点呢?附Class和Struct的不同
- mongodb 2.4 不同server节点的replica set 搭建过程(一)
- Drupal不同节点类型使用不同模板
- Web时代的服务分发:微信和YunOS选择不同的路径
- Mysql主从节点不同步解决方法
- 树型控件中目录与子节点检查框同步选择
- foxmail或者outlook 不同机器同步接收邮件
- 黑马程序员_学习日记61_707Dom3(节点操作、不同浏览器的差异)
- 前端控制器是整个MVC框架中最为核心的一块,它主要用来拦截符合要求的外部请求,并把请求分发到不同的控制器去处理,根据控制器处理后的结果,生成相应的响应发送到客户端。前端控制器既可以使用Filter实现(Struts2采用这种方式),也可以使用Servlet来实现(spring MVC框架)。
- MySQL的3节点主从同步复制方案
- expect脚本同步文件,构建文件分发系统,批量远程执行命令