srs代码学习(6)--如何实现edge
2016-09-06 10:51
513 查看
sre集群的方式有两种一种是forword类型的。一种是edge-origin类型的。后者比前者要复杂的多。就从比较难的开始分析。
有实现edge,首先得在配置文件中做配置。配置方式如下
这个配置里配置了实时模式,边缘模式(remote)
那么这个配置到了代码中是如何生效的呢?
这里面核心的类还是SrsSource,如果服务设置为边缘模式,这个类的角色,经历了从服务到客户端在到服务的转变,具体来看。
首先,如果一个客户端连接上了一个边缘节点,在走到发布了play命令后,服务器会有进入playing()函数,具体如下
在create_consumer函数里,除了创建一个consumer外,服务还会做一个判断,判断服务的模式是否是边缘节点模式,如果是,那么调用边缘节点的对应函数启动边缘节点。
代码如上,控制权转给了play_edge。那么这个play_edge是个什么东西?看看其类图
很简单的一个类,看看源代码
其他代码基本都是这样,它只是一个封装类,真正干活的是一个叫SrsEdgeIngester* ingester的类。看看这个类的结构图
里面有很多变量。有一部分是其上级设定的,比如 _source _edge,这些基本都是有数据给回调上去的。但有一个关键的变量
SrsReusableThread2* pthread
有了这个变量。说明这个类是可以自持的,就是可以自己运行。那么它的循环函数是
真相逐渐明白了,看看真正的循环函数ingest()
看到没,但client收到数据后,交给了process_publish_message()函数。在看看这个函数
看到没,这里把数据这就交个了source。我们上一次已经分析过,在souru拿到数据以后,会直接复制给consumer,而consumer是和客户端相连的,这样就可以把数据的转发就做完了。
从上面的流程可以看出,如果多个客户请求同一个数据源,只有一个source会被创建,也就意味这一个play_edeg.,这样就把流量服务转移到了边缘服务器这边。
有实现edge,首先得在配置文件中做配置。配置方式如下
这个配置里配置了实时模式,边缘模式(remote)
那么这个配置到了代码中是如何生效的呢?
这里面核心的类还是SrsSource,如果服务设置为边缘模式,这个类的角色,经历了从服务到客户端在到服务的转变,具体来看。
首先,如果一个客户端连接上了一个边缘节点,在走到发布了play命令后,服务器会有进入playing()函数,具体如下
int SrsRtmpConn::playing(SrsSource* source) { int ret = ERROR_SUCCESS; // create consumer of souce. SrsConsumer* consumer = NULL; if ((ret = source->create_consumer(this, consumer)) != ERROR_SUCCESS) { srs_error("create consumer failed. ret=%d", ret); return ret; }
在create_consumer函数里,除了创建一个consumer外,服务还会做一个判断,判断服务的模式是否是边缘节点模式,如果是,那么调用边缘节点的对应函数启动边缘节点。
int SrsSource::create_consumer(SrsConnection* conn, SrsConsumer*& consumer, bool ds, bool dm, bool dg) {.....
// for edge, when play edge stream, check the state if (_srs_config->get_vhost_is_edge(_req->vhost)) { // notice edge to start for the first client. if ((ret = play_edge->on_client_play()) != ERROR_SUCCESS) { srs_error("notice edge start play stream failed. ret=%d", ret); return ret; } }
代码如上,控制权转给了play_edge。那么这个play_edge是个什么东西?看看其类图
很简单的一个类,看看源代码
int SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req) { int ret = ERROR_SUCCESS; if ((ret = ingester->initialize(source, this, req)) != ERROR_SUCCESS) { return ret; } return ret; } int SrsPlayEdge::on_client_play() { int ret = ERROR_SUCCESS; // start ingest when init state. if (state == SrsEdgeStateInit) { state = SrsEdgeStatePlay; return ingester->start(); } return ret; }
其他代码基本都是这样,它只是一个封装类,真正干活的是一个叫SrsEdgeIngester* ingester的类。看看这个类的结构图
里面有很多变量。有一部分是其上级设定的,比如 _source _edge,这些基本都是有数据给回调上去的。但有一个关键的变量
SrsReusableThread2* pthread
有了这个变量。说明这个类是可以自持的,就是可以自己运行。那么它的循环函数是
int SrsEdgeIngester::cycle() { int ret = ERROR_SUCCESS; _source->on_source_id_changed(_srs_context->get_id()); std::string ep_server, ep_port; if ((ret = connect_server(ep_server, ep_port)) != ERROR_SUCCESS) { return ret; } srs_assert(client); client->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); client->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); SrsRequest* req = _req; if ((ret = client->handshake()) != ERROR_SUCCESS) { srs_error("handshake with server failed. ret=%d", ret); return ret; } if ((ret = connect_app(ep_server, ep_port)) != ERROR_SUCCESS) { return ret; } if ((ret = client->create_stream(stream_id)) != ERROR_SUCCESS) { srs_error("connect with server failed, stream_id=%d. ret=%d", stream_id, ret); return ret; } if ((ret = client->play(req->stream, stream_id)) != ERROR_SUCCESS) { srs_error("connect with server failed, stream=%s, stream_id=%d. ret=%d", req->stream.c_str(), stream_id, ret); return ret; } if ((ret = _edge->on_ingest_play()) != ERROR_SUCCESS) { return ret; } ret = ingest(); if (srs_is_client_gracefully_close(ret)) { srs_warn("origin disconnected, retry. ret=%d", ret); ret = ERROR_SUCCESS; } return ret; }
真相逐渐明白了,看看真正的循环函数ingest()
int SrsEdgeIngester::ingest() { int ret = ERROR_SUCCESS; client->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US); SrsPithyPrint* pprint = SrsPithyPrint::create_edge(); SrsAutoFree(SrsPithyPrint, pprint); while (!pthread->interrupted()) { pprint->elapse(); // pithy print if (pprint->can_print()) { kbps->sample(); srs_trace("<- "SRS_CONSTS_LOG_EDGE_PLAY " time=%"PRId64", okbps=%d,%d,%d, ikbps=%d,%d,%d", pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(), kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()); } // read from client. SrsCommonMessage* msg = NULL; if ((ret = client->recv_message(&msg)) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret)) { srs_error("pull origin server message failed. ret=%d", ret); } return ret; } srs_verbose("edge loop recv message. ret=%d", ret); srs_assert(msg); SrsAutoFree(SrsCommonMessage, msg); if ((ret = process_publish_message(msg)) != ERROR_SUCCESS) { return ret; } } return ret; }
看到没,但client收到数据后,交给了process_publish_message()函数。在看看这个函数
int SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg) { int ret = ERROR_SUCCESS; SrsSource* source = _source; // process audio packet if (msg->header.is_audio()) { if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) { srs_error("source process audio message failed. ret=%d", ret); return ret; } } // process video packet if (msg->header.is_video()) { if ((ret = source->on_video(msg)) != ERROR_SUCCESS) { srs_error("source process video message failed. ret=%d", ret); return ret; } } // process aggregate packet if (msg->header.is_aggregate()) { if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) { srs_error("source process aggregate message failed. ret=%d", ret); return ret; } return ret; } // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { SrsPacket* pkt = NULL; if ((ret = client->decode_message(msg, &pkt)) != ERROR_SUCCESS) { srs_error("decode onMetaData message failed. ret=%d", ret); return ret; } SrsAutoFree(SrsPacket, pkt); if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt); if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) { srs_error("source process onMetaData message failed. ret=%d", ret); return ret; } srs_info("process onMetaData message success."); return ret; } srs_info("ignore AMF0/AMF3 data message."); return ret; } return ret; }
看到没,这里把数据这就交个了source。我们上一次已经分析过,在souru拿到数据以后,会直接复制给consumer,而consumer是和客户端相连的,这样就可以把数据的转发就做完了。
从上面的流程可以看出,如果多个客户请求同一个数据源,只有一个source会被创建,也就意味这一个play_edeg.,这样就把流量服务转移到了边缘服务器这边。
相关文章推荐
- GPGPU-Sim学习(代码解读)-如何实现Bypass L1D cache
- 【Java学习之旅】——Java如何实现像c#的#region实现代码折叠
- 一起学习水晶报表之拉模式【如何通过代码实现水晶报表显示数据】(课程2)
- .net c# gif动画如何添加图片水印实现思路及代码学习
- 如何利用JS实现对后台CS代码的调用
- 【导读】本文介绍如何利用带进度条的ASP无组件实现断点续传下载,给出详细代码
- 如何实现自动生成在线播放代码?
- 如何用Javascript代码实现浏览器菜单命令
- SAP ABAP/4学习---如何给写好的一个程序分配事务代码,用户可以直接通过事务码来访问程序.或者加入收藏夹(9)
- 《Asp.Net Forums2.0深入分析》之 Asp.Net Forums是如何实现代码分离和换
- 《Asp.Net Forums2.0深入分析》之 Asp.Net Forums是如何实现代码分离和换皮肤的
- 如何 实现代码编辑状态下 安全过滤html标签
- 如何用代码实现锁定计算机
- 如何编写出高效的数据库连接池(附带完整代码C#和Java实现)
- 《www.uml.org.cn》UML与代码学习--利用有限状态机实现FTP 文件传输
- Community Server 2.0 学习笔记:如何实现在线人数?
- 学习如何在 Java 代码中启用和禁用断言
- 如何找到public extern bool Equals(String value)的实现代码?[C#, C++, BCL, CLR]
- .NET Romoting 学习总结(三)—— Remoting技术之设计模式实现代码安全
- 《Asp.Net Forums2.0深入分析》之 Asp.Net Forums是如何实现代码分离和换皮肤