做转码服务的原型时,看了看MCU的实现,考虑到如果不做转码,可以将多路rtp流直接合成为一路rtmp流输出,这样就相当于实现了多人连麦,并将多人连麦的视频转发直播了,所以做了这个简单的原型实现!
DEMO只实现了接收一路rtp流,输出一路rtmp流!
同转码服务的类图设计:
基础库是ZLMediaKit,确实很方便!
直接上代码:
TranscoderTaskManager.h
可以使用linux的nc?127.0.0.1?3500?进行客户端测试!
然后使用ffmpeg对接收到的端口进行rtp包的推流:
?
ffmpeg?-re?-i?tuiliu_mp4.mp4?-vcodec?libx264?-b:v?600k?-s?480x320?-profile?baseline??-maxrate?600k?-minrate?600k?-g?20?-keyint_min?20??-sc_threshold?0?-an?-f?rtp?rtp://11.12.112.42:52458
在ZLMediaKit的ZLMediaKit\server\main.cpp中启动TCP?3500端口的监听:
//启动转码服务
TranscoderTaskManager::getInstance().startTranscoderServer();
此结构体用来接收命令
/*
{“dest_ip”:11.12.112.10,
“dest_port”:9000,
“socket_protocol”:”udp”,
“transport_protocol”:”rtp”,
“source_width”:1080,
“source_height”:1920,
"source_sps":"";
"source_pps":"";
“source_samplerate”:2000,//kbps
“source_video_payloadtype”:”rtp”,
“source_video_codec”:”h264”,
“source_audio_codec”:”aac”,
“dest_video_codec”:”h264”,
“dest_audio_codec”:”aac”,
“dest_width”:640,
“dest_height”:480,
“dest_samplerate”:800?}
*/
class?InputTaskInfo?:?public?std::enable_shared_from_this<InputTaskInfo>?{
friend?class?TranscoderTaskManager;
friend?class?TranscoderSession;
public:
typedef?std::shared_ptr<InputTaskInfo>?Ptr;
protected:
string?dest_ip;
string?transactcode;
string?protocol;
int?dest_port;//for?tcp
int?dest_audio_port;
int?dest_video_port;
int?socket_protocol;//0:udp,?1:tcp
string?transport_protocol;
int?source_width;
int?source_height;
int?source_samplerate;
string?source_video_payloadtype;
string?source_audio_payloadtype;
string?source_video_codec;
string?source_audio_codec;
string?source_sps;
string?source_pps;
string?dest_video_codec;
string?dest_audio_codec;
int?dest_width;
int?dest_height;
int?dest_samplerate;?
bool?needTranscode;
bool?outputUseRTMP;
bool?outputNoAudio;
bool?bSrtp;
string?output_rtmp_live_name;
int?proxy_recv_audio_port;
int?proxy_recv_video_port;
RcvUDPDataTask::Ptr?rcvVideoUDPTask;
RcvUDPDataTask::Ptr?rcvAudioUDPTask;
Timer::Ptr?_muteAudiotimer;
unsigned?long?lastTimeStamp;
unsigned?long?lastVideoTimeStamp;
unsigned?long?lastAudioTimeStamp;
Timer::Ptr?_timer;
MuteAudioMaker::Ptr?_audioMaker;
MultiMediaSourceMuxer::Ptr?_mediaMuxer?=?NULL;
std::shared_ptr<FrameMerger>?_merger;
AudioTrack::Ptr?_audioTrack?=?NULL;
VideoTrack::Ptr?_videoTrack?=?NULL;
void?*_rtp_decoder?=?nullptr;
BufferRaw::Ptr?_buffer;
};
class?TranscoderTaskManager?:?public?std::enable_shared_from_this<TranscoderTaskManager>
{
public:
typedef?std::shared_ptr<TranscoderTaskManager>?Ptr;
static?TranscoderTaskManager&?getInstance()?{
static?TranscoderTaskManager?taskManager;
return?taskManager;
}
void?startTranscoderServer();
void?addTask(const?string?&transcode,?const?InputTaskInfo::Ptr?&inputInfo);
void?removeTask(const?string?&transcode)?{
lock_guard<mutex>?lck(_mtxTranscodeClient);
_userTranscoderClientInfoMap.erase(transcode);
}
InputTaskInfo::Ptr?getTask(string?&transcode);
void?removeTask(string?&transcode);
protected:
TranscoderTaskManager();
~TranscoderTaskManager();
?
private:
TcpServer::Ptr?_transcoderSrv;
unordered_map<string,?InputTaskInfo::Ptr>?_userTranscoderClientInfoMap;
mutex?_mtxTranscodeClient;?
};
////////////TRANSCODER?配置///////////
namespace?Transcoder?{
#define?TRANSCODER_FIELD?"transcoder."
const?string?kPort?=?TRANSCODER_FIELD"port";
onceToken?token1([]()?{
mINI::Instance()[kPort]?=?3500;
},?nullptr);
}?//namespace?Shell
TranscoderTaskManager::TranscoderTaskManager():_transcoderSrv(new?TcpServer())
{?
}
TranscoderTaskManager::~TranscoderTaskManager()
{
}
void?TranscoderTaskManager::startTranscoderServer()?{
uint16_t?transcoderPort?=?mINI::Instance()[Transcoder::kPort];
_transcoderSrv->start<TranscoderSession>(transcoderPort);
}
void?TranscoderTaskManager::addTask(const?string?&transcode,?const?InputTaskInfo::Ptr?&inputInfo)?{
//创建转码对象TranscoderTask
//创建接收socket
//开始监听接收任务和转码任务
lock_guard<mutex>?lck(_mtxTranscodeClient);
_userTranscoderClientInfoMap[transcode]?=?inputInfo;
}
InputTaskInfo::Ptr?TranscoderTaskManager::getTask(string?&transcode)?{
if?(_userTranscoderClientInfoMap.find(transcode)?!=?_userTranscoderClientInfoMap.end())?{
return?_userTranscoderClientInfoMap[transcode];
}
return?NULL;
}
void?TranscoderTaskManager::removeTask(string?&transcode)?{
_userTranscoderClientInfoMap->erase(transcode);
}
TranscoderSession.h
class?TranscoderSession?:
public?TcpSession
{
public:
TranscoderSession(const?Socket::Ptr?&pSock);
virtual?~TranscoderSession();
////TcpSession?override////
void?onRecv(const?Buffer::Ptr?&pBuf)?override;
void?onError(const?SockException?&err)?override;
void?onManager()?override;
private:
string?_transcoder;
string?_strRecvBuf;
Ticker?_beatTicker;
string?_strUserName;
//消耗的总流量
uint64_t?_ui64TotalBytes?=?0;
};
TranscoderSession.cpp
/**?常量定义?**/
#define?START_TRANSCODE_CMD?"1001"
#define?STOP_TRANSCODE_CMD?"1002"
#define?START_PROXY_CMD?"2001"
#define?STOP_PROXY_CMD?"2002"
/**?函数?**/
TranscoderSession::TranscoderSession(const?Socket::Ptr?&pSock)?:?TcpSession(pSock)?{
DebugP(this);
//send("hello.");
}
void?TranscoderSession::onRecv(const?Buffer::Ptr&buf)?{
//DebugL?<<?hexdump(buf->data(),?buf->size());??
_beatTicker.resetTime();
//所有3500的输入消息会回调到这个方法:
//使用json解析出命令START_PROXY_CMD,?然后启动一个UDP的接收任务:
_strRecvBuf.append(buf->data(),?buf->size());
Json::Reader?reader;
Json::Value?root;
if?(reader.parse(strValue,?root))
{
????????//..此处省略解析json字符串的代码
????????
if?(value.compare(START_PROXY_CMD)?==?0)?{
const?weak_ptr<TcpSession>?weakSelf?=?shared_from_this();
auto?&weak1?=?inputInfo;
if?(inputInfo->_mediaMuxer?==?NULL)?{
????//使用rtmp://127.0.0.1/live/chn_00?点播就可以了
????????inputInfo->_mediaMuxer.reset(new?MultiMediaSourceMuxer(DEFAULT_VHOST,?"live",?"chn_00",?0,?true,?true,?false,?false));
}?
inputInfo->rcvVideoUDPTask?=?make_shared<RcvUDPDataTask>();
inputInfo->proxy_recv_video_port?=?inputInfo->rcvVideoUDPTask->startListener([weakSelf,?weak1](const?Buffer::Ptr?&buf,?struct?sockaddr?*addr,?int?len)?{
????????????
????uint8_t?*?data?=?(uint8_t?*)buf->data();
????uint8_t?rtp_type?=?0x7F?&?data[1];
????uint8_t?rtp_mark?=?0x1?&?data[2];
????uint32_t?timestamp?=?(((uint32_t)data[4])?<<?24)?|?(((uint32_t)data[5])?<<?16)?|?(((uint32_t)data[6])?<<?8)?|?data[7];
????????auto?frame?=?std::make_shared<H264FrameNoCacheAble>((char?*)(buf->data()?+?12),?buf->size()?-?12,?timestamp,?timestamp,?0);
????????//这里就是把收到的rtp流转发给mediamuxer,用于混合成rtmp流
????????weak1->_videoTrack->inputFrame(frame);
});
inputInfo->_videoTrack?=?std::make_shared<H264Track>();
//添加视频
inputInfo->_mediaMuxer->addTrack(inputInfo->_videoTrack);
//视频数据写入_mediaMuxer
inputInfo->_videoTrack->addDelegate(inputInfo->_mediaMuxer);
//用来合并rtp包
inputInfo->_merger?=?std::make_shared<FrameMerger>();
inputInfo->rcvAudioUDPTask?=?make_shared<RcvUDPDataTask>();
inputInfo->proxy_recv_audio_port?=?inputInfo->rcvAudioUDPTask->startListener([weakSelf,?weak1](const?Buffer::Ptr?&buf,?struct?sockaddr?*addr,?int?len)?{
//??
uint8_t?*?data?=?(uint8_t?*)buf->data();
uint8_t?rtp_type?=?0x7F?&?data[1];
uint8_t?rtp_mark?=?0x1?&?data[2];
uint32_t?timestamp?=?(((uint32_t)data[4])?<<?24)?|?(((uint32_t)data[5])?<<?16)?|?(((uint32_t)data[6])?<<?8)?|?data[7];
auto?frame?=?std::make_shared<AACFrameNoCacheAble>((char?*)(buf->data()?+?12),?buf->size()?-?12,?timestamp,?timestamp);?
weak1->_audioTrack->inputFrame(frame);
weak1->_timer.reset();
}
);
inputInfo->_audioTrack?=?std::make_shared<AACTrack>();
//添加音频
inputInfo->_mediaMuxer->addTrack(inputInfo->_audioTrack);
inputInfo->_audioTrack->addDelegate(inputInfo->_mediaMuxer);?
retJson["proxy_recv_video_port"]?=?inputInfo->proxy_recv_video_port;
retJson["proxy_recv_audio_port"]?=?inputInfo->proxy_recv_audio_port;
TranscoderTaskManager::getInstance().addTask(inputInfo->transactcode,?inputInfo);
//将接收video和audio的端口返回给客户端
std::string?out?=?retJson.toStyledString();
send(out);
}
}
}
TranscoderSession::~TranscoderSession()
{
DebugP(this);
TranscoderTaskManager::getInstance().removeTask(_transactcode);
}
void?TranscoderSession::onError(const?SockException?&err)?{
WarnP(this)?<<?err.what();
}
void?TranscoderSession::onManager()?{
//session?超时管理
}
class?RcvUDPDataTask?:?public?std::enable_shared_from_this<RcvUDPDataTask>
{
public:
//接收数据回调
typedef?function<void(const?Buffer::Ptr?&buf,?struct?sockaddr?*addr,?int?addr_len)>?onReadCB;
enum?MediaType?{VIDEO?=?0,?AUDIO};
typedef?std::shared_ptr<RcvUDPDataTask>?Ptr;
public:
RcvUDPDataTask();
virtual?~RcvUDPDataTask();
int?startListener(string?peerAddr,?int?peerPort);
int?startListener(onReadCB?cb);
int?stopListener();
private:
//RTP端口,trackid?idx?为数组下标
Socket::Ptr?_rcvSock;
Socket::Ptr?_sendSock;
uint64_t?_ui64TotalBytes?=?0;
MediaType?_mediaType;
};
RcvUDPDataTask::RcvUDPDataTask()
{?
_sendSock.reset(new?Socket(nullptr,?false));
_rcvSock.reset(new?Socket(nullptr,?false));
}?
int?RcvUDPDataTask::stopListener()
{?
_rcvSock->closeSock();
return?0;
}
int?RcvUDPDataTask::startListener(onReadCB?cb)?{
//设置接收socket
onceToken?token(nullptr,?[&]()?{
SockUtil::setRecvBuf(_rcvSock->rawFD(),?4?*?1024?*?1024);
//SockUtil::setSendBuf(_sendSock->rawFD(),?4?*?1024?*?1024);
});
?
????????//所有收到的包直接回调到cb方法
_rcvSock->setOnRead(cb);
_rcvSock->setOnErr([this](const?SockException?&err)?{?});
if?(!_rcvSock->bindUdpSock(0,?"0.0.0.0"))?{
return?-1;
}
return?_rcvSock->get_local_port();
}