您的位置:首页 > 其它

不同行情分发节点间同步合约

2014-05-30 16:57 225 查看
h2. 需求

* 选择一个行情分发节点,并点击"同步到其他节点",并选择对应节点(可多选),并点击“确定”

h2. 方案

* market_auth节点

* http_instrument 进程将同步合约到其他节点的请求消息转发到进程 instrument_handler(gen_server)

* instrument_handler 进程从源行情分发节点获得合约数据,通过TCP连接到MarketDispatcher逐个更新到目标行情分发节点(更新合约数据包)

* instrument_handler 同时处理market_dispatcher发送过来的响应

* http_md_stream 将请求结果通过websocket返回到浏览器

* market_dispatcher节点

* subscribe_handler 进程将更新合约请求发送到market_hub对应进程

* market_hub节点

* instrument_handler 进程将要更新的合约信息写到数据库,写入数据库成功后更新对应context

h2. 实现

* market_auth节点

* http请求 PUT /md/:id/sync_instruments params: [2, 4, ..., MdDest] 目标行情分发节点

* 响应(成功): [?SYNC_INSTRUMENTS, {ok : MdDest}}]

* 响应(失败): [?SYNC_INSTRUMENTS, {error : [MdDest, Reason]}}]

* market_auth/http_instrument.erl 将同步合约到其他节点的请求消息转发到instrument_handler(gen_server)

* instrument_handler:sync(MdSrc, MdDestList)

* market_auth/instrument_handler(gen_server)

* instrument_handler:sync 从源行情分发节点获得合约数据,通过TCP连接到MarketDispatcher逐个更新到目标行情分发节点(更新合约数据包)

<pre>

sync(MdSrc, MdDestList) ->

case get_instrument(MdSrc) of

{ok, Instruments} ->

lists:foreach(fun(MdDest) ->

copy_to(Instruments, MdDest)

end, MdDestList);

{error, Reason} ->

{error, Reason}

end.

copy_to(Instruments, MdDes) -> %通过TCP连接到MarketDispatcher,

根据MdDes获取host和port

ReqId = list_to_binary(uuid:to_string(uuid:random()))

Socket = gen_tcp:connect(host,port)

Packet = packet:encode(?SYNC_INSTRUMENTS, ?PLAIN, [ReqId, [0, 0, 0], [MdDes, Instruments]]),

gen_tcp:send(Socket, Packet)

</pre>

* handle_info 处理market_dispatcher发送过来的TCP响应(发送给websocket)

解包

根据ReqId找到CallbackId,通过CallbackId找到websocket的Pid,将响应发送到浏览器.

* market_dispatcher节点

* subscribe_handler 进程将更新合约请求发送到market_hub对应进程

<pre>

Reply = gen_util:rpc_call(MarketHub, instrument_handler, sync, [MdDes, Instruments])

Packet = packet:encode([?SYNC_INSTRUMENTS, ?PLAIN, Reply]),

gen_tcp:send(Socket, Packet)

</pre>

* market_hub节点

* instrument_handler 进程将要更新的合约信息写到数据库,写入数据库成功后更新对应context

返回值为[?SYNC_INSTRUMENTS, {ok : MdDest}}]

或[?SYNC_INSTRUMENTS, {error : [MdDest, Reason]}}]

<pre>

sync([MdDes, Instruments]) ->

Fun = fun() ->

lists:foreach(fun(Instrument) ->

[Code, Id, Status, Trading] = Instrument,

case mnesia:read({instrument, Code}) of

[] ->

mnesia:write(#instrument{code = Code, id = Id, status = Status, is_trading = IsTrading});

[Instrument] ->

mnesia:write(Instrument#instrument{status = Status, is_trading = IsTrading})

end

end, Instruments)

end,

Reply = case mnesia:transaction(Fun) of

{atomic, _} ->

update_subscribe_context(Subs, UnSubs),%Subs:status为sub的code列表,UnSubs:status为unsub的code列表

instrument_subscriber:subscribe_instruments(Subs),

lists:foreach(fun(#instrument{code=Code}=Instrument) ->

market_hub_context:update(instrument, {Code, Instrument})

end, Instruments)

[?RESP_OK, MdDes];

{aborted, Reason} ->

[?RESP_FAILED, [MdDes,Reason]]

end.

</pre>

* 同步合约响应的处理

* 发起同步合约请求时, 遍历目标节点,将同步请求发送到Market_dispatcher节点.同时将所有ReqId记录下来.存在两个orddict中

<pre>

req_holder = orddict:new() % key:ReqId, value:Pid

inverted_req_holder = orddict:new() % key:Pid, value:[ReqId1, ReqId2, ReqId3, ...]

</pre>

* 若目标节点可以连接上,则向目标节点发送请求,请求的响应回来后,删除两个orddict中的记录,并将响应发送到浏览器

* 若目标节点连接不上,则直接给浏览器发送 {?SYNC_INSTRUMENTS, {error, [MdDest, Reason]},并删除两个orddict中的记录

<pre>

同步源节点到目标节点

copy_to(Instruments, MdDest, ReqId, Pid) ->

根据MdDest获取Host和Port,

case gen_tcp:connect(binary_to_list(Host), Port, [binary, {packet, 4}]) of

{ok, Socket} ->

连接上,发送请求,等待响应,

gen_tcp:send(Socket, Packet);

{error, Reason} ->

没有连接上目标节点,直接向浏览器发送消息

Pid ! {?SYNC_INSTRUMENTS, {error, [MdDest, Reason]}},

gen_server:cast(?SERVER, {sync_error, ReqId})

end.

handle_info({tcp, Socket, Bin}, #state{} = State) ->

关闭socket,

解包,

{ok, [?SYNC_INSTRUMENTS, ReqId, Reply], []} ->

向浏览器发送消息,并删除两个orddict中的记录,

根据inverted_req_holder中Pid对应value是否为空,为空则将websocket关闭;不为空则不关闭

handle_cast({sync_error, ReqId}, #state{} = State) ->

删除两个orddict中的记录,

根据inverted_req_holder中Pid对应value是否为空,为空则将websocket关闭;不为空则不关闭

</pre>

* 最后判断inverted_req_holder中Pid对应value是否为空,为空则将websocket关闭;不为空则不关闭

h2. 问题

* 打开两个或多个合约管理页面只有一个收到websocket信息

h2. 原因

* websocket进程将模块名http_md_stream注册为进程名

* 向websocket发送消息时用whereis(http_md_stream)!Msg. 所以多个页面只有一个能收到websocket信息.

h2. 解决方案

* 浏览器发出请求时,带上CallbackId(通过uuid生成)

* 服务器接受请求时, 通过uuid生成ReqId,ReqId和CallbackId映射,CallbackId和Pid映射.然后对请求进行处理.

* 服务器处理完请求后,通过以上映射找到对应Pid.通过Pid ! Msg将响应发送回去(带上CallbackId)

h2. 详细实现

* 浏览器发出订阅websocket请求: CallbackId 由 Ext.data.UuidGenerator生成, url: ws://localhost/md/stream/:callbackid

浏览器发出合约操作请求: HTTP Get /md/:id/instruments.json?callbackid=CallbackId

* market_auth/http_md_stream (创建websocket)

* 在websocket_init中调用instrument_handler:reg_ws/2, 将CallbackId和websocket的Pid映射起来

<pre>

{CallbackId, Req0} = cowboy_req:binding(callbackid, Req),

instrument_handler:reg_callback(CallbackId, self()).

</pre>

* market_auth/http_instruments:接受http请求,调用instrument_handler中的接口

<pre>

get_json(Req, State) ->

{PathInfo, Req0} = cowboy_req:path_info(Req),

Resp = case PathInfo of

[Md, <<"instruments.json">>, CallbackId] ->

ReqId = list_to_binary(uuid:to_string(uuid:srandom())),

instrument_handler:instruments(Md, ReqId, CallbackId).

end.

</pre>

* market_auth/instrument_handler(gen_server):

* init中创建callbacks和reqids两个空表.

<pre>

init([]) ->

{ok, #state{callbacks = orddict:new(), % {CallbackId, Pid}

reqids = orddict:new()} }. % {ReqId, CallbackId}

</pre>

* 新增reg_callback和unreg_callback接口

<pre>

reg_callback(CallbackId, Pid) ->

gen_server:cast(?SERVER, {reg_callback, CallbackId, Pid}).

unreg_callback(CallbackId) ->

gen_server:cast(?SERVER, {unreg_callback, CallbackId}).

handle_cast({reg_callback, CallbackId, Pid}, #state{callbacks = Callbacks} = State) ->

NCallbacks = orddict:store(CallbackId, Pid, Callbacks),

{noreply, State#state{callbacks= Callbacks}}.

</pre>

* 在发送TCP请求前将ReqId和CallbackId映射起来.

<pre>

instruments(Md, ReqId, CallbackId) ->

gen_server:cast(?SERVER, {instruments, Md, ReqId, CallbackId}),

{ok, ok}.

handle_cast({instruments, Md, ReqId, CallbackId}, #state{reqids = ReqIds} = State) -> ?

NReqIds = orddict:store(ReqId, CallbackId, ReqIds),

发送TCP请求

{noreply, State#state{reqids = NReqIds}}

</pre>

* 接收TCP响应后,通过响应中的ReqId获取Pid,将响应发送回去

<pre>

handle_info({tcp, Socket, Bin}, #state{reqids = ReqIds} = State) ->

解析包

{ok, [?INSTRUMENTS, [ReqId, [?RESP_OK, InstrumentList]]], []} ->

根据ReqId获取Pid

Pid ! InstrumentList

NReqIds = orddict:erase(ReqId, ReqIds),

{noreply, State#state{reqids = NReqIds}}

</pre>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐