您的位置:首页 > 编程语言

srs代码学习(6)--如何实现edge

2016-09-06 10:51 513 查看
sre集群的方式有两种一种是forword类型的。一种是edge-origin类型的。后者比前者要复杂的多。就从比较难的开始分析。

有实现edge,首先得在配置文件中做配置。配置方式如下



这个配置里配置了实时模式,边缘模式(remote)

那么这个配置到了代码中是如何生效的呢?

这里面核心的类还是SrsSource,如果服务设置为边缘模式,这个类的角色,经历了从服务到客户端在到服务的转变,具体来看。

首先,如果一个客户端连接上了一个边缘节点,在走到发布了play命令后,服务器会有进入playing()函数,具体如下

int SrsRtmpConn::playing(SrsSource* source)
{
int ret = ERROR_SUCCESS;

// create consumer of souce.
SrsConsumer* consumer = NULL;
if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) {
srs_error("create consumer failed. ret=%d", ret);
return ret;
}


在create_consumer函数里,除了创建一个consumer外,服务还会做一个判断,判断服务的模式是否是边缘节点模式,如果是,那么调用边缘节点的对应函数启动边缘节点。

int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg)
{
.....

// for edge, when play edge stream, check the state
if (_srs_config->get_vhost_is_edge(_req->vhost)) {
// notice edge to start for the first client.
if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) {
srs_error("notice edge start play stream failed. ret=%d", ret);
return ret;
}
}


代码如上,控制权转给了play_edge。那么这个play_edge是个什么东西?看看其类图



很简单的一个类,看看源代码

int SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req)
{
int ret = ERROR_SUCCESS;

if ((ret = ingester->initialize(source, this, req)) != ERROR_SUCCESS) {
return ret;
}

return ret;
}

int SrsPlayEdge::on_client_play()
{
int ret = ERROR_SUCCESS;

// start ingest when init state.
if (state == SrsEdgeStateInit) {
state = SrsEdgeStatePlay;
return ingester->start();
}

return ret;
}


其他代码基本都是这样,它只是一个封装类,真正干活的是一个叫SrsEdgeIngester* ingester的类。看看这个类的结构图



里面有很多变量。有一部分是其上级设定的,比如 _source _edge,这些基本都是有数据给回调上去的。但有一个关键的变量

 SrsReusableThread2* pthread

有了这个变量。说明这个类是可以自持的,就是可以自己运行。那么它的循环函数是

int SrsEdgeIngester::cycle()
{
int ret = ERROR_SUCCESS;

_source->on_source_id_changed(_srs_context->get_id());

std::string ep_server, ep_port;
if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) {
return ret;
}
srs_assert(client);

client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);

SrsRequest* req = _req;

if ((ret = client->handshake()) != ERROR_SUCCESS) {
srs_error("handshake with server failed. ret=%d", ret);
return ret;
}
if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret);
return ret;
}

if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) {
srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d",
req->stream.c_str(), stream_id, ret);
return ret;
}

if ((ret = _edge->on_ingest_play()) != ERROR_SUCCESS) {
return ret;
}

ret = ingest();
if (srs_is_client_gracefully_close(ret)) {
srs_warn("origin disconnected, retry. ret=%d", ret);
ret = ERROR_SUCCESS;
}

return ret;
}


真相逐渐明白了,看看真正的循环函数ingest()

int SrsEdgeIngester::ingest()
{
int ret = ERROR_SUCCESS;

client->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US);

SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);

while (!pthread->interrupted()) {
pprint->elapse();

// pithy print
if (pprint->can_print()) {
kbps->sample();
srs_trace("<- "SRS_CONSTS_LOG_EDGE_PLAY
" time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d",
pprint->age(),
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
}

// read from client.
SrsCommonMessage* msg = NULL;
if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("pull origin server message failed. ret=%d", ret);
}
return ret;
}
srs_verbose("edge loop recv message. ret=%d", ret);

srs_assert(msg);
SrsAutoFree(SrsCommonMessage, msg);

if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) {
return ret;
}
}

return ret;
}


看到没,但client收到数据后,交给了process_publish_message()函数。在看看这个函数

int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg)
{
int ret = ERROR_SUCCESS;

SrsSource* source = _source;

// process audio packet
if (msg->header.is_audio()) {
if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {
srs_error("source process audio message failed. ret=%d", ret);
return ret;
}
}

// process video packet
if (msg->header.is_video()) {
if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {
srs_error("source process video message failed. ret=%d", ret);
return ret;
}
}

// process aggregate packet
if (msg->header.is_aggregate()) {
if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {
srs_error("source process aggregate message failed. ret=%d", ret);
return ret;
}
return ret;
}

// process onMetaData
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
SrsPacket* pkt = NULL;
if ((ret = client->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
srs_error("decode onMetaData message failed. ret=%d", ret);
return ret;
}
SrsAutoFree(SrsPacket, pkt);

if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {
srs_error("source process onMetaData message failed. ret=%d", ret);
return ret;
}
srs_info("process onMetaData message success.");
return ret;
}

srs_info("ignore AMF0/AMF3 data message.");
return ret;
}

return ret;
}


看到没,这里把数据这就交个了source。我们上一次已经分析过,在souru拿到数据以后,会直接复制给consumer,而consumer是和客户端相连的,这样就可以把数据的转发就做完了。

从上面的流程可以看出,如果多个客户请求同一个数据源,只有一个source会被创建,也就意味这一个play_edeg.,这样就把流量服务转移到了边缘服务器这边。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐