onceToken
利用变量生命周期执行传入的构造和析构,保证代码执行的先后顺序
class onceToken {
public:
using task = std::function<void(void)>;
template<typename FUNC>
onceToken(const FUNC &onConstructed, task onDestructed = nullptr) {
onConstructed();
_onDestructed = std::move(onDestructed);
}
onceToken(std::nullptr_t, task onDestructed = nullptr) {
_onDestructed = std::move(onDestructed);
}
~onceToken() {
if (_onDestructed) {
_onDestructed();
}
}
private:
onceToken() = delete;
onceToken(const onceToken &) = delete;
onceToken(onceToken &&) = delete;
onceToken &operator=(const onceToken &) = delete;
onceToken &operator=(onceToken &&) = delete;
private:
task _onDestructed;
};
应用
socket->setOnErr([weak_self, weak_session, id](const SockException &err) {
// 在本函数作用域结束时移除会话对象
// 目的是确保移除会话前执行其 onError 函数
// 同时避免其 onError 函数抛异常时没有移除会话对象
onceToken token(nullptr, [&]() {
// 移除掉会话
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
//从共享map中移除本session对象
lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
strong_self->_session_map->erase(id);
});
// 获取会话强应用
if (auto strong_session = weak_session.lock()) {
// 触发 onError 事件回调
strong_session->onError(err);
}
});
RingBuffer
RingDelegate
继承的子类需要实现onWrite函数
virtual void onWrite(T in, bool is_key = true) = 0;
_RingStorage
数据
using GopType = List< List<std::pair<bool, T> > >;//链表中装着链表,次级链表是装着<bool, T>
bool _have_idr;
size_t _size; //当前帧的数量 SUM(ervey gop size)
size_t _max_size; //最大记录帧数量 不能小于32。即最少最少也能存32帧
size_t _max_gop_size; //最大gop个数(Group of Picture IBPBP)
GopType _data_cache; //链表套链表
构造
设置 _max_size _max_gop_size
clearCache清除主list,插入一个空的次级list
popFrontGop 弹出最先插入的GOP
void clearCache() {
_size = 0;
_have_idr = false;
_data_cache.clear();
_data_cache.emplace_back();
}
void popFrontGop()
write
传过来的是I帧,就往新的GOP组里添加,
传过来的是PB帧,
之前的没有I帧则PB帧丢弃;
之前已经有I帧到来,插入到对应的GOP组
如果帧总数超过 _max_size,先尝试清除老的GOP缓存,还是大于最大缓冲限制,那么清空所有GOP
/**
* 写入环形缓存数据
* @param in 数据
* @param is_key 是否为关键帧
* @return 是否触发重置环形缓存大小
*/
void write(T in, bool is_key = true) {
if (is_key) {
_have_idr = true;
if (!_data_cache.back().empty()) {
//当前gop列队还没收到任意缓存
_data_cache.emplace_back();
}
if (_data_cache.size() > _max_gop_size) {
// GOP个数超过限制,那么移除最早的GOP
popFrontGop();
}
}
if (!_have_idr) {
//缓存中没有关键帧,那么gop缓存无效
return;
}
_data_cache.back().emplace_back(std::make_pair(is_key, std::move(in)));
if (++_size > _max_size) {
//GOP缓存溢出
while (_data_cache.size() > 1) {
//先尝试清除老的GOP缓存
popFrontGop();
}
if (_size > _max_size) {
//还是大于最大缓冲限制,那么清空所有GOP
clearCache();
}
}
}
List
继承了list,增加了append/for_each函数
template<typename T>
class List : public std::list<T> {
public:
template<typename ... ARGS>
List(ARGS &&...args) : std::list<T>(std::forward<ARGS>(args)...) {};
~List() = default;
void append(List<T> &other) {
if (other.empty()) {
return;
}
this->insert(this->end(), other.begin(), other.end());
other.clear();
}
template<typename FUNC>
void for_each(FUNC &&func) {
for (auto &t : *this) {
func(t);
}
}
template<typename FUNC>
void for_each(FUNC &&func) const {
for (auto &t : *this) {
func(t);
}
}
};
_RingReader
环形缓存读取器,构造时即绑定了对应的_RingStorage。
自己注册读取和detach函数,flushGop一次性读取所有帧
std::function<void(void)> _detach_cb = []() {};
std::function<void(const T &)> _read_cb = [](const T &) {};
void flushGop() {
if (!_storage) {
return;
}
_storage->getCache().for_each([this](const List<std::pair<bool, T > > &lst) {
lst.for_each([this](const std::pair<bool, T> &pr) { onRead(pr.second, pr.first); });
});
}
_RingReaderDispatcher
环形事件派发器
std::atomic_int _reader_size; //?
std::function<void(int, bool)> _on_size_changed; //?
typename RingStorage::Ptr _storage; //环形存储
std::unordered_map<void *, std::weak_ptr<RingReader> > _reader_map;//本线程中,读取相同源的player的集合
析构:_reader_map中的所有RingReader调用onDetach
构造:private,初始化_storage,_reader_size,_on_size_changed
write
,调用_reader_map中的每个RingReader的onRead,然后往环形存储中写
rtp包来时,给每个拉流源调用onRead,同时往自己的_storage里写入缓存,当新的拉流源来时,根据useCache判断是否使用_storage
void write(T in, bool is_key = true) {
for (auto it = _reader_map.begin(); it != _reader_map.end();) {
auto reader = it->second.lock();
if (!reader) {
it = _reader_map.erase(it);
--_reader_size;
onSizeChanged(false);
continue;
}
reader->onRead(in, is_key);
++it;
}
_storage->write(std::move(in), is_key);
}
attach
附加到poller上,构造RingReader,_reader_map++。可以选择是否使用存储的cache。
std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache) {
if (!poller->isCurrentThread()) {
throw std::runtime_error("必须在绑定的poller线程中执行attach操作");
}
std::weak_ptr<_RingReaderDispatcher> weakSelf = this->shared_from_this();
auto on_dealloc = [weakSelf, poller](RingReader *ptr) {
poller->async([weakSelf, ptr]() {
auto strongSelf = weakSelf.lock();
if (strongSelf && strongSelf->_reader_map.erase(ptr)) {
--strongSelf->_reader_size;
strongSelf->onSizeChanged(false);
}
delete ptr;
});
};
std::shared_ptr<RingReader> reader(new RingReader(use_cache ? _storage : nullptr), on_dealloc);
_reader_map[reader.get()] = reader;
++_reader_size;
onSizeChanged(true);
return reader;
}
RingBuffer
using Ptr = std::shared_ptr<RingBuffer>;
using RingReader = _RingReader<T>;
using RingStorage = _RingStorage<T>;
using RingReaderDispatcher = _RingReaderDispatcher<T>;
using onReaderChanged = std::function<void(int size)>; //
std::mutex _mtx_map; //?
std::atomic_int _total_count { 0 }; //?_total_count是正在拉流的客户端的个数
typename RingStorage::Ptr _storage; //环形缓存
typename RingDelegate<T>::Ptr _delegate; //子类需要实现onWrite
onReaderChanged _on_reader_changed; //读取器变化的函数
std::unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map;
//不同的poller对应的不同的RingReaderDispatcher
构造
初始化RingStorage
attach
如果**_dispatcher_map中没有对应的poller键,new一个RingReaderDispatcher**,使用clone的RingStorage
write
有_delegate,_delegate->onWrite
否则 调用每个环形事件分发器中(其他线程)的write,往自己线程的缓存写。往自己的线程中写
本线程的对应的pusher数据来了,往storage中写,同时调用各个线程的读poller去读这数据
void write(T in, bool is_key = true) {
if (_delegate) {
_delegate->onWrite(std::move(in), is_key);
return;
}
LOCK_GUARD(_mtx_map);
for (auto &pr : _dispatcher_map) {
auto &second = pr.second;
//切换线程后触发onRead事件
pr.first->async([second, in, is_key]() { second->write(std::move(const_cast<T &>(in)), is_key); }, false);
}
_storage->write(std::move(in), is_key);
}
发送包到player的调用栈
EventPoller::runLoop//
addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) //
pr.first->async([second, in, is_key]() { second->write(std::move(const_cast<T &>(in)), is_key); }, false);//
_RingReaderDispatcher::write --->reader->onRead(in, is_key);
_RingReader::onRead--->_read_cb(data);
_play_reader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->sendRtpPacket(pack);//发送rtp包
});
StrPrinter
一个可以支持<<的字符串
class _StrPrinter : public std::string {
public:
_StrPrinter() {}
template<typename T>
_StrPrinter& operator <<(T && data) {
_stream << std::forward<T>(data);
this->std::string::operator=(_stream.str());
return *this;
}
std::string operator <<(std::ostream&(*f)(std::ostream&)) const {
return *this;
}
private:
std::stringstream _stream;
};
EventDispatcher/NoticeCenter
NoticeCenter
结构
std::recursive_mutex _mtxListener;
std::unordered_map<std::string, EventDispatcher::Ptr> _mapListener;
private 获取和删除
EventDispatcher::Ptr getDispatcher(const std::string &event, bool create = false) {
std::lock_guard<std::recursive_mutex> lck(_mtxListener);
auto it = _mapListener.find(event);
if (it != _mapListener.end()) {
return it->second;
}
if (create) {
//如果为空则创建一个
EventDispatcher::Ptr dispatcher(new EventDispatcher());
_mapListener.emplace(event, dispatcher);
return dispatcher;
}
return nullptr;
}
void delDispatcher(const std::string &event, const EventDispatcher::Ptr &dispatcher) {
std::lock_guard<std::recursive_mutex> lck(_mtxListener);
auto it = _mapListener.find(event);
if (it != _mapListener.end() && dispatcher == it->second) {
//两者相同则删除
_mapListener.erase(it);
}
}
EventDispatcher
任意类型的FUNC
template<typename FUNC>
void addListener(void *tag, FUNC &&func) {
using funType = typename function_traits<typename std::remove_reference::type>::stl_function_type;
std::shared_ptr<void> pListener(new funType(std::forward(func)), [](void *ptr) {
funType *obj = (funType *) ptr;
delete obj;
});
多参数
template<typename …ArgsType>
int emitEvent(ArgsType &&…args) {
using funType = std::function<void(decltype(std::forward<ArgsType>(args))…)>;
funType *obj = (funType *) (pr.second.get());
(*obj)(std::forward<ArgsType>(args)…);
class EventDispatcher {
public:
friend class NoticeCenter;
using Ptr = std::shared_ptr<EventDispatcher>;
~EventDispatcher() = default;
private:
using MapType = std::unordered_multimap<void *, std::shared_ptr<void> >;
EventDispatcher() = default;
class InterruptException : public std::runtime_error {
public:
InterruptException() : std::runtime_error("InterruptException") {}
~InterruptException() {}
};
template<typename ...ArgsType>
int emitEvent(ArgsType &&...args) {
using funType = std::function<void(decltype(std::forward<ArgsType>(args))...)>;
decltype(_mapListener) copy;
{
//先拷贝(开销比较小),目的是防止在触发回调时还是上锁状态从而导致交叉互锁
std::lock_guard<std::recursive_mutex> lck(_mtxListener);
copy = _mapListener;
}
int ret = 0;
for (auto &pr : copy) {
funType *obj = (funType *) (pr.second.get());
try {
(*obj)(std::forward<ArgsType>(args)...);
++ret;
} catch (InterruptException &) {
++ret;
break;
}
}
return ret;
}
template<typename FUNC>
void addListener(void *tag, FUNC &&func) {
using funType = typename function_traits<typename std::remove_reference<FUNC>::type>::stl_function_type;
std::shared_ptr<void> pListener(new funType(std::forward<FUNC>(func)), [](void *ptr) {
funType *obj = (funType *) ptr;
delete obj;
});
std::lock_guard<std::recursive_mutex> lck(_mtxListener);
_mapListener.emplace(tag, pListener);
}
void delListener(void *tag, bool &empty) {
std::lock_guard<std::recursive_mutex> lck(_mtxListener);
_mapListener.erase(tag);
empty = _mapListener.empty();
}
private:
std::recursive_mutex _mtxListener;
MapType _mapListener;
};
如果用C
完美转发用void*代替
锁+map用静态的map和vector代替(非多线程)
typedef int (*BG_RPC_MSG_HANDLER_FUNC)(void *rpc_msg);
std::map<int, std::vector<BG_RPC_MSG_HANDLER_FUNC>> event_handles;
void event_register(int id, BG_RPC_MSG_HANDLER_FUNC func)
{
auto& event_funcs = event_handles[id];
for (const auto& each_func : event_funcs)
{
if (each_func == func)
{
error_log("id:%d already register!", id);
return;
}
}
event_handles[id].push_back(func);
}
void event_call(int id, void *msg)
{
auto it = event_handles.find(id);
if (it == event_handles.end()) return;
for (const auto &h : it->second)
{
h(msg);
}
}
总结
-
onceToken
-
EventDispatcher 模板编程,std::unordered_multimap<**void ***, std::shared_ptr<void> >的使用; std::forward, ArgsType &&…args
-
如何多线程处理一个rtp数据包,且不涉及到拷贝
每个RingBuffer有对应的map [EventPoller::Ptr, typename RingReaderDispatcher] 加锁,给对应的RingReaderDispatcher发rtp包指针,RingReaderDispatcher再给_reader_map中的不同客户端(但是是相同的poller管理)发rtp包。