前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >实现一个接收多路RTP流,输出一路RTMP流的简单MCU

实现一个接收多路RTP流,输出一路RTMP流的简单MCU

作者头像
呱牛笔记
发布2023-05-02 15:42:49
8860
发布2023-05-02 15:42:49
举报
文章被收录于专栏:呱牛笔记呱牛笔记

做转码服务的原型时,看了看MCU的实现,考虑到如果不做转码,可以将多路rtp流直接合成为一路rtmp流输出,这样就相当于实现了多人连麦,并将多人连麦的视频转发直播了,所以做了这个简单的原型实现!

DEMO只实现了接收一路rtp流,输出一路rtmp流!

同转码服务的类图设计:

呱牛笔记
呱牛笔记

基础库是ZLMediaKit,确实很方便!

直接上代码:

TranscoderTaskManager.h

代码语言:javascript
复制
可以使用linux的nc?127.0.0.1?3500?进行客户端测试!
然后使用ffmpeg对接收到的端口进行rtp包的推流:
?
ffmpeg?-re?-i?tuiliu_mp4.mp4?-vcodec?libx264?-b:v?600k?-s?480x320?-profile?baseline??-maxrate?600k?-minrate?600k?-g?20?-keyint_min?20??-sc_threshold?0?-an?-f?rtp?rtp://11.12.112.42:52458

在ZLMediaKit的ZLMediaKit\server\main.cpp中启动TCP?3500端口的监听:

//启动转码服务
TranscoderTaskManager::getInstance().startTranscoderServer();

此结构体用来接收命令
/*
{“dest_ip”:11.12.112.10,
“dest_port”:9000,
“socket_protocol”:”udp”,
“transport_protocol”:”rtp”,
“source_width”:1080,
“source_height”:1920,
"source_sps":"";
"source_pps":"";
“source_samplerate”:2000,//kbps
“source_video_payloadtype”:”rtp”,
“source_video_codec”:”h264”,
“source_audio_codec”:”aac”,
“dest_video_codec”:”h264”,
“dest_audio_codec”:”aac”,
“dest_width”:640,
“dest_height”:480,
“dest_samplerate”:800?}
*/
class?InputTaskInfo?:?public?std::enable_shared_from_this<InputTaskInfo>?{
	friend?class?TranscoderTaskManager;
	friend?class?TranscoderSession;
public:
	typedef?std::shared_ptr<InputTaskInfo>?Ptr;
protected:
	string?dest_ip;
	string?transactcode;
	string?protocol;
	int?dest_port;//for?tcp
	int?dest_audio_port;
	int?dest_video_port;
	int?socket_protocol;//0:udp,?1:tcp
	string?transport_protocol;
	int?source_width;
	int?source_height;
	int?source_samplerate;
	string?source_video_payloadtype;
	string?source_audio_payloadtype;
	string?source_video_codec;
	string?source_audio_codec;
	string?source_sps;
	string?source_pps;

	string?dest_video_codec;
	string?dest_audio_codec;
	int?dest_width;
	int?dest_height;
	int?dest_samplerate;?
	bool?needTranscode;
	bool?outputUseRTMP;
	bool?outputNoAudio;
	bool?bSrtp;
	string?output_rtmp_live_name;
	int?proxy_recv_audio_port;
	int?proxy_recv_video_port;
	RcvUDPDataTask::Ptr?rcvVideoUDPTask;
	RcvUDPDataTask::Ptr?rcvAudioUDPTask;

	Timer::Ptr?_muteAudiotimer;
	unsigned?long?lastTimeStamp;
	unsigned?long?lastVideoTimeStamp;
	unsigned?long?lastAudioTimeStamp;
	Timer::Ptr?_timer;
	MuteAudioMaker::Ptr?_audioMaker;
	MultiMediaSourceMuxer::Ptr?_mediaMuxer?=?NULL;
	std::shared_ptr<FrameMerger>?_merger;

	AudioTrack::Ptr?_audioTrack?=?NULL;
	VideoTrack::Ptr?_videoTrack?=?NULL;
	void?*_rtp_decoder?=?nullptr;
	BufferRaw::Ptr?_buffer;
};


class?TranscoderTaskManager?:?public?std::enable_shared_from_this<TranscoderTaskManager>
{
public:
	typedef?std::shared_ptr<TranscoderTaskManager>?Ptr;
	static?TranscoderTaskManager&?getInstance()?{
		static?TranscoderTaskManager?taskManager;
		return?taskManager;
	}

	void?startTranscoderServer();


	void?addTask(const?string?&transcode,?const?InputTaskInfo::Ptr?&inputInfo);
	void?removeTask(const?string?&transcode)?{

		lock_guard<mutex>?lck(_mtxTranscodeClient);
		_userTranscoderClientInfoMap.erase(transcode);
	}

	InputTaskInfo::Ptr?getTask(string?&transcode);
	void?removeTask(string?&transcode);
protected:
	TranscoderTaskManager();
	~TranscoderTaskManager();
	?
private:
	TcpServer::Ptr?_transcoderSrv;
	unordered_map<string,?InputTaskInfo::Ptr>?_userTranscoderClientInfoMap;
	mutex?_mtxTranscodeClient;?
};



////////////TRANSCODER?配置///////////
namespace?Transcoder?{
#define?TRANSCODER_FIELD?"transcoder."
	const?string?kPort?=?TRANSCODER_FIELD"port";
	onceToken?token1([]()?{
		mINI::Instance()[kPort]?=?3500;
	},?nullptr);
}?//namespace?Shell

TranscoderTaskManager::TranscoderTaskManager():_transcoderSrv(new?TcpServer())
{?
}


TranscoderTaskManager::~TranscoderTaskManager()
{
}

void?TranscoderTaskManager::startTranscoderServer()?{

	uint16_t?transcoderPort?=?mINI::Instance()[Transcoder::kPort];
	_transcoderSrv->start<TranscoderSession>(transcoderPort);
}

void?TranscoderTaskManager::addTask(const?string?&transcode,?const?InputTaskInfo::Ptr?&inputInfo)?{
	//创建转码对象TranscoderTask

	//创建接收socket

	//开始监听接收任务和转码任务
	lock_guard<mutex>?lck(_mtxTranscodeClient);
	_userTranscoderClientInfoMap[transcode]?=?inputInfo;

}

InputTaskInfo::Ptr?TranscoderTaskManager::getTask(string?&transcode)?{
	if?(_userTranscoderClientInfoMap.find(transcode)?!=?_userTranscoderClientInfoMap.end())?{
		return?_userTranscoderClientInfoMap[transcode];
	}
	return?NULL;
}

void?TranscoderTaskManager::removeTask(string?&transcode)?{

	_userTranscoderClientInfoMap->erase(transcode);
}

TranscoderSession.h

代码语言:javascript
复制
	class?TranscoderSession?:
		public?TcpSession
	{
	public:
		TranscoderSession(const?Socket::Ptr?&pSock);
		virtual?~TranscoderSession();

		////TcpSession?override////
		void?onRecv(const?Buffer::Ptr?&pBuf)?override;
		void?onError(const?SockException?&err)?override;
		void?onManager()?override;

	private:
		string?_transcoder;
		string?_strRecvBuf;
		Ticker?_beatTicker;
		string?_strUserName;
		//消耗的总流量
		uint64_t?_ui64TotalBytes?=?0;
	};

TranscoderSession.cpp

代码语言:javascript
复制
/**?常量定义?**/
#define?START_TRANSCODE_CMD?"1001"
#define?STOP_TRANSCODE_CMD?"1002"

#define?START_PROXY_CMD?"2001"
#define?STOP_PROXY_CMD?"2002"

/**?函数?**/
	TranscoderSession::TranscoderSession(const?Socket::Ptr?&pSock)?:?TcpSession(pSock)?{
		DebugP(this);
		//send("hello.");
	}

	void?TranscoderSession::onRecv(const?Buffer::Ptr&buf)?{
		//DebugL?<<?hexdump(buf->data(),?buf->size());??
		_beatTicker.resetTime();
		
		//所有3500的输入消息会回调到这个方法:
		//使用json解析出命令START_PROXY_CMD,?然后启动一个UDP的接收任务:
		_strRecvBuf.append(buf->data(),?buf->size());
		
		Json::Reader?reader;
		Json::Value?root;
		if?(reader.parse(strValue,?root))
		{
		????????//..此处省略解析json字符串的代码
		????????
			if?(value.compare(START_PROXY_CMD)?==?0)?{
				const?weak_ptr<TcpSession>?weakSelf?=?shared_from_this();
				auto?&weak1?=?inputInfo;
				if?(inputInfo->_mediaMuxer?==?NULL)?{
				????//使用rtmp://127.0.0.1/live/chn_00?点播就可以了
			????????inputInfo->_mediaMuxer.reset(new?MultiMediaSourceMuxer(DEFAULT_VHOST,?"live",?"chn_00",?0,?true,?true,?false,?false));
				}?
				inputInfo->rcvVideoUDPTask?=?make_shared<RcvUDPDataTask>();

				inputInfo->proxy_recv_video_port?=?inputInfo->rcvVideoUDPTask->startListener([weakSelf,?weak1](const?Buffer::Ptr?&buf,?struct?sockaddr?*addr,?int?len)?{
			????????????
				????uint8_t?*?data?=?(uint8_t?*)buf->data();
				????uint8_t?rtp_type?=?0x7F?&?data[1];
				????uint8_t?rtp_mark?=?0x1?&?data[2];
				????uint32_t?timestamp?=?(((uint32_t)data[4])?<<?24)?|?(((uint32_t)data[5])?<<?16)?|?(((uint32_t)data[6])?<<?8)?|?data[7];
			????????auto?frame?=?std::make_shared<H264FrameNoCacheAble>((char?*)(buf->data()?+?12),?buf->size()?-?12,?timestamp,?timestamp,?0);
			????????//这里就是把收到的rtp流转发给mediamuxer,用于混合成rtmp流
			????????weak1->_videoTrack->inputFrame(frame);	
				});
				
				inputInfo->_videoTrack?=?std::make_shared<H264Track>();
				//添加视频
				inputInfo->_mediaMuxer->addTrack(inputInfo->_videoTrack);
				//视频数据写入_mediaMuxer
				inputInfo->_videoTrack->addDelegate(inputInfo->_mediaMuxer);
				//用来合并rtp包
				inputInfo->_merger?=?std::make_shared<FrameMerger>();
				
				
				inputInfo->rcvAudioUDPTask?=?make_shared<RcvUDPDataTask>();

				inputInfo->proxy_recv_audio_port?=?inputInfo->rcvAudioUDPTask->startListener([weakSelf,?weak1](const?Buffer::Ptr?&buf,?struct?sockaddr?*addr,?int?len)?{
					//??
					uint8_t?*?data?=?(uint8_t?*)buf->data();
					uint8_t?rtp_type?=?0x7F?&?data[1];
					uint8_t?rtp_mark?=?0x1?&?data[2];
					uint32_t?timestamp?=?(((uint32_t)data[4])?<<?24)?|?(((uint32_t)data[5])?<<?16)?|?(((uint32_t)data[6])?<<?8)?|?data[7];
					auto?frame?=?std::make_shared<AACFrameNoCacheAble>((char?*)(buf->data()?+?12),?buf->size()?-?12,?timestamp,?timestamp);?
					weak1->_audioTrack->inputFrame(frame);
					weak1->_timer.reset();
				}
				);

				inputInfo->_audioTrack?=?std::make_shared<AACTrack>();
				//添加音频
				inputInfo->_mediaMuxer->addTrack(inputInfo->_audioTrack);
				inputInfo->_audioTrack->addDelegate(inputInfo->_mediaMuxer);?
				
				retJson["proxy_recv_video_port"]?=?inputInfo->proxy_recv_video_port;
				retJson["proxy_recv_audio_port"]?=?inputInfo->proxy_recv_audio_port;
				
				TranscoderTaskManager::getInstance().addTask(inputInfo->transactcode,?inputInfo);
				//将接收video和audio的端口返回给客户端				
				std::string?out?=?retJson.toStyledString();
				send(out);
			}
					
		}
			
	}	
	
	TranscoderSession::~TranscoderSession()
	{
		DebugP(this);
		TranscoderTaskManager::getInstance().removeTask(_transactcode);
	}

	void?TranscoderSession::onError(const?SockException?&err)?{
		WarnP(this)?<<?err.what();
	}

	void?TranscoderSession::onManager()?{
		//session?超时管理

	}
代码语言:javascript
复制
class?RcvUDPDataTask?:?public?std::enable_shared_from_this<RcvUDPDataTask>
{
public:
	//接收数据回调
	typedef?function<void(const?Buffer::Ptr?&buf,?struct?sockaddr?*addr,?int?addr_len)>?onReadCB;
	enum?MediaType?{VIDEO?=?0,?AUDIO};
	typedef?std::shared_ptr<RcvUDPDataTask>?Ptr;
public:
	RcvUDPDataTask();
	virtual?~RcvUDPDataTask();
	int?startListener(string?peerAddr,?int?peerPort);
	int?startListener(onReadCB?cb);
	int?stopListener();
private:
	//RTP端口,trackid?idx?为数组下标
	Socket::Ptr?_rcvSock;
	Socket::Ptr?_sendSock;
	uint64_t?_ui64TotalBytes?=?0;
	MediaType?_mediaType;
};


RcvUDPDataTask::RcvUDPDataTask()
{?
	_sendSock.reset(new?Socket(nullptr,?false));
	_rcvSock.reset(new?Socket(nullptr,?false));
}?

int?RcvUDPDataTask::stopListener()
{?
	_rcvSock->closeSock();
	return?0;
}
int?RcvUDPDataTask::startListener(onReadCB?cb)?{
	//设置接收socket
	onceToken?token(nullptr,?[&]()?{
		SockUtil::setRecvBuf(_rcvSock->rawFD(),?4?*?1024?*?1024);
		//SockUtil::setSendBuf(_sendSock->rawFD(),?4?*?1024?*?1024);
	});
?
????????//所有收到的包直接回调到cb方法
	_rcvSock->setOnRead(cb);

	_rcvSock->setOnErr([this](const?SockException?&err)?{?});
	if?(!_rcvSock->bindUdpSock(0,?"0.0.0.0"))?{
		return?-1;
	}
	return?_rcvSock->get_local_port();
}
本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-04-28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云直播
云直播(Cloud Streaming Services,CSS)为您提供极速、稳定、专业的云端直播处理服务,根据业务的不同直播场景需求,云直播提供了标准直播、快直播、云导播台三种服务,分别针对大规模实时观看、超低延时直播、便捷云端导播的场景,配合腾讯云视立方·直播 SDK,为您提供一站式的音视频直播解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com