社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
srs在完成rtmp信令交互协议后,客户端就会将视频及音频数据打包成RTMP流,发送给服务端。在srs中,这些数据最终是在stream_service_cycle中处理的,不过代码写得比较隐蔽:在stream_service_cycle中,先判断客户端是推流还是拉流,如果是推流,则会调用publishing处理:
// Excerpt of the per-stream service routine (parts are elided with "...").
// It first asks the RTMP layer to identify the client, then dispatches on the
// resulting connection type; only the FMLE publish branch is shown here.
// NOTE(review): `ret` and `source` are declared in the elided parts of the function.
int SrsRtmpConn::stream_service_cycle() {
SrsRtmpConnType type;
// Identify the client request; on failure, log unless the client closed gracefully.
if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("identify client failed. ret=%d", ret);
}
return ret;
}
...
// some validation work (elided)
switch (type) {
...
// FMLE publish: complete the publish handshake, then hand over to publishing().
case SrsRtmpConnFMLEPublish: {
srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to publish stream failed. ret=%d", ret);
return ret;
}
return publishing(source);
}
...
}
}
而publishing中,会创建一个SrsPublishRecvThread:
// Fragment of publishing(): the receive thread is only created when
// acquire_publish() succeeds.
if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) {
// use an isolated thread to receive messages,
// @see: https://github.com/ossrs/srs/issues/237
// `trd` is a stack object scoped to this branch; the 7th argument is true
// for every client type except SrsRtmpConnFlashPublish.
SrsPublishRecvThread trd(rtmp, req,
st_netfd_fileno(stfd), 0, this, source,
client_type != SrsRtmpConnFlashPublish,
vhost_is_edge);
srs_info("start to publish stream %s success", req->stream.c_str());
// run the publish loop, passing it the receive thread.
ret = do_publishing(source, &trd);
// stop the isolated recv thread, whatever do_publishing() returned.
trd.stop();
}
在do_publishing中会启动接收协程;SrsPublishRecvThread内部带了一个SrsRecvThread,SrsRecvThread的协程专门负责接收消息,并传递给注册的handler处理:
/**
 * Receive loop: pull messages from the RTMP layer and pass each one to the
 * registered handler until the thread is interrupted or an error occurs.
 *
 * @return ERROR_SUCCESS when the loop ends because the thread was interrupted,
 *         otherwise the error code from recv_message() or handler->handle().
 */
int SrsRecvThread::cycle()
{
    int ret = ERROR_SUCCESS;

    while (!trd->interrupted()) {
        // The handler cannot take a message right now: sleep, then poll again.
        // NOTE(review): `timeout * 1000` suggests timeout is in ms — confirm.
        if (!handler->can_handle()) {
            st_usleep(timeout * 1000);
            continue;
        }

        // Receive one message and, if that worked, let the handler consume it.
        SrsCommonMessage* msg = NULL;
        ret = rtmp->recv_message(&msg);
        if (ret == ERROR_SUCCESS) {
            ret = handler->handle(msg);
        }

        // Happy path: keep looping.
        if (ret == ERROR_SUCCESS) {
            srs_verbose("thread loop recv message. ret=%d", ret);
            continue;
        }

        // Failure path: graceful closes and control codes are not logged as errors.
        if (!(srs_is_client_gracefully_close(ret) || srs_is_system_control_error(ret))) {
            srs_error("thread process message failed. ret=%d", ret);
        }

        // Receiving uses no timeout, so any error here is final: stop the thread
        // first, then let the handler know what went wrong.
        trd->interrupt();
        handler->on_recv_error(ret);

        return ret;
    }

    return ret;
}
这里的handler就是SrsPublishRecvThread:
/**
 * Handle one message delivered by the receive thread.
 *
 * Updates the context id when it changed, counts the message (and video
 * frames), forwards the message to the RTMP connection and finally frees it.
 *
 * @param msg the received message; it is always freed before returning.
 * @return the result of SrsRtmpConn::handle_publish_message().
 */
int SrsPublishRecvThread::handle(SrsCommonMessage* msg)
{
    int ret = ERROR_SUCCESS;

    // when cid changed, change it.
    if (ncid != cid) {
        _srs_context->set_id(ncid);
        cid = ncid;
    }

    // statistics: total messages and video frames received.
    _nb_msgs++;

    if (msg->header.is_video()) {
        video_frames++;
    }

    // log to show the time of recv thread.
    // The spaces around PRId64 are required: since C++11 a string literal
    // immediately followed by an identifier is parsed as a user-defined
    // literal suffix, which fails to compile. The formatted text is unchanged.
    // NOTE(review): the label says "us" but the value comes from
    // srs_update_system_time_ms() — confirm the intended unit.
    srs_verbose("recv thread now=%" PRId64 "us, got msg time=%" PRId64 "ms, size=%d",
        srs_update_system_time_ms(), msg->header.timestamp, msg->size);

    // the rtmp connection will handle this message
    ret = _conn->handle_publish_message(_source, msg, _is_fmle, _is_edge);

    // must always free it,
    // the source will copy it if need to use.
    srs_freep(msg);

    return ret;
}
其处理方式是交由SrsRtmpConn对象处理:
/**
 * Dispatch one message received from a publishing client.
 *
 * AMF0/AMF3 command messages are decoded and treated as publish-control
 * events; every other message goes to process_publish_message().
 *
 * @param source        the source passed on to process_publish_message().
 * @param msg           the received message (not freed here).
 * @param is_fmle       false for flash publishers, where any command packet
 *                      ends the publish.
 * @param vhost_is_edge passed on to process_publish_message().
 * @return ERROR_SUCCESS, ERROR_CONTROL_REPUBLISH when the publisher stopped
 *         publishing, or the error code of the failing call.
 */
int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge)
{
    int ret = ERROR_SUCCESS;

    const bool is_command = msg->header.is_amf0_command() || msg->header.is_amf3_command();

    // video, audio, data message
    if (!is_command) {
        ret = process_publish_message(source, msg, vhost_is_edge);
        if (ret != ERROR_SUCCESS) {
            srs_error("fmle process publish message failed. ret=%d", ret);
        }
        return ret;
    }

    // process publish event: decode the command packet first.
    SrsPacket* pkt = NULL;
    ret = rtmp->decode_message(msg, &pkt);
    if (ret != ERROR_SUCCESS) {
        srs_error("fmle decode unpublish message failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsPacket, pkt);

    // for flash, any packet is republish.
    // TODO: maybe need to support republish.
    if (!is_fmle) {
        srs_trace("flash flash publish finished.");
        return ERROR_CONTROL_REPUBLISH;
    }

    // for fmle, drop others except the fmle start packet.
    SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
    if (unpublish == NULL) {
        srs_trace("fmle ignore AMF0/AMF3 command message.");
        return ret;
    }

    ret = rtmp->fmle_unpublish(res->stream_id, unpublish->transaction_id);
    if (ret != ERROR_SUCCESS) {
        return ret;
    }

    return ERROR_CONTROL_REPUBLISH;
}
这里我们就可以看到,处理音频,视频,数据的函数了:
/**
 * Route one published media/data message to the source.
 *
 * On an edge vhost the message is proxied through on_edge_proxy_publish().
 * Otherwise it goes to the source callback matching its type, checked in this
 * order: audio, video, aggregate, AMF0/AMF3 data (onMetaData only). Messages
 * of any other type are ignored.
 *
 * @param source        the source that receives the message.
 * @param msg           the received message (not freed here).
 * @param vhost_is_edge true to proxy the message instead of processing it.
 * @return ERROR_SUCCESS or the error code of the failing call.
 */
int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge)
{
    int ret = ERROR_SUCCESS;

    // for edge, directly proxy message to origin.
    if (vhost_is_edge) {
        ret = source->on_edge_proxy_publish(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("edge publish proxy msg failed. ret=%d", ret);
        }
        return ret;
    }

    // audio packet
    if (msg->header.is_audio()) {
        ret = source->on_audio(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process audio message failed. ret=%d", ret);
        }
        return ret;
    }

    // video packet
    if (msg->header.is_video()) {
        ret = source->on_video(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process video message failed. ret=%d", ret);
        }
        return ret;
    }

    // aggregate packet
    if (msg->header.is_aggregate()) {
        ret = source->on_aggregate(msg);
        if (ret != ERROR_SUCCESS) {
            srs_error("source process aggregate message failed. ret=%d", ret);
        }
        return ret;
    }

    // anything that is not an AMF0/AMF3 data message is silently ignored.
    const bool is_data = msg->header.is_amf0_data() || msg->header.is_amf3_data();
    if (!is_data) {
        return ret;
    }

    // onMetaData: decode the data packet, the decoded packet is auto-freed.
    SrsPacket* pkt = NULL;
    ret = rtmp->decode_message(msg, &pkt);
    if (ret != ERROR_SUCCESS) {
        srs_error("decode onMetaData message failed. ret=%d", ret);
        return ret;
    }
    SrsAutoFree(SrsPacket, pkt);

    SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
    if (metadata == NULL) {
        srs_info("ignore AMF0/AMF3 data message.");
        return ret;
    }

    ret = source->on_meta_data(msg, metadata);
    if (ret != ERROR_SUCCESS) {
        srs_error("source process onMetaData message failed. ret=%d", ret);
        return ret;
    }

    srs_info("process onMetaData message success.");
    return ret;
}
可以看到,这里主要就是把消息送到对应的source中处理,其他流程后续再分析。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!