zlMediaKit 7 utils模块--ringbuffer发布订阅

news/2024/7/23 11:21:48 标签: c++

onceToken

利用变量生命周期执行传入的构造和析构,保证代码执行的先后顺序

class onceToken {
public:
    using task = std::function<void(void)>;

    template<typename FUNC>
    onceToken(const FUNC &onConstructed, task onDestructed = nullptr) {
        onConstructed();
        _onDestructed = std::move(onDestructed);
    }

    onceToken(std::nullptr_t, task onDestructed = nullptr) {
        _onDestructed = std::move(onDestructed);
    }

    ~onceToken() {
        if (_onDestructed) {
            _onDestructed();
        }
    }

private:
    onceToken() = delete;
    onceToken(const onceToken &) = delete;
    onceToken(onceToken &&) = delete;
    onceToken &operator=(const onceToken &) = delete;
    onceToken &operator=(onceToken &&) = delete;

private:
    task _onDestructed;
};

应用

socket->setOnErr([weak_self, weak_session, id](const SockException &err) {
            // 在本函数作用域结束时移除会话对象
            // 目的是确保移除会话前执行其 onError 函数
            // 同时避免其 onError 函数抛异常时没有移除会话对象
            onceToken token(nullptr, [&]() {
                // 移除掉会话
                auto strong_self = weak_self.lock();
                if (!strong_self) {
                    return;
                }
                //从共享map中移除本session对象
                lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
                strong_self->_session_map->erase(id);
            });

            // 获取会话强应用
            if (auto strong_session = weak_session.lock()) {
                // 触发 onError 事件回调
                strong_session->onError(err);
            }
        });

RingBuffer

RingDelegate

继承的子类需要实现onWrite函数

virtual void onWrite(T in, bool is_key = true) = 0;

_RingStorage

在这里插入图片描述

数据

using GopType = List< List<std::pair<bool, T> > >;//链表中装着链表,次级链表是装着<bool, T>

    bool _have_idr;
    size_t _size;         //当前帧的数量 SUM(ervey gop size)
    size_t _max_size;     //最大记录帧数量 不能小于32。即最少最少也能存32帧
    size_t _max_gop_size; //最大gop个数(Group of Picture IBPBP)
    GopType _data_cache;  //链表套链表

构造

设置 _max_size _max_gop_size

clearCache清除主list,插入一个空的次级list

popFrontGop 弹出最先插入的GOP

    void clearCache() {
        _size = 0;
        _have_idr = false;
        _data_cache.clear();
        _data_cache.emplace_back();
    }

	void popFrontGop()

write

传过来的是I帧,就往新的GOP组里添加,

​ 传过来的是PB帧,

​ 之前的没有I帧则PB帧丢弃;

​ 之前已经有I帧到来,插入到对应的GOP组

如果帧总数超过 _max_size,先尝试清除老的GOP缓存,还是大于最大缓冲限制,那么清空所有GOP

/**
     * 写入环形缓存数据
     * @param in 数据
     * @param is_key 是否为关键帧
     * @return 是否触发重置环形缓存大小
     */
    void write(T in, bool is_key = true) {
        if (is_key) {
            _have_idr = true;
            if (!_data_cache.back().empty()) {
                //当前gop列队还没收到任意缓存
                _data_cache.emplace_back();
            }
            if (_data_cache.size() > _max_gop_size) {
                // GOP个数超过限制,那么移除最早的GOP
                popFrontGop();
            }
        }

        if (!_have_idr) {
            //缓存中没有关键帧,那么gop缓存无效
            return;
        }
        _data_cache.back().emplace_back(std::make_pair(is_key, std::move(in)));
        if (++_size > _max_size) {
            //GOP缓存溢出
            while (_data_cache.size() > 1) {
                //先尝试清除老的GOP缓存
                popFrontGop();
            }
            if (_size > _max_size) {
                //还是大于最大缓冲限制,那么清空所有GOP
                clearCache();
            }
        }
    }

List

继承了list,增加了append/for_each函数

template<typename T>
class List : public std::list<T> {
public:
    template<typename ... ARGS>
    List(ARGS &&...args) : std::list<T>(std::forward<ARGS>(args)...) {};

    ~List() = default;

    void append(List<T> &other) {
        if (other.empty()) {
            return;
        }
        this->insert(this->end(), other.begin(), other.end());
        other.clear();
    }

    template<typename FUNC>
    void for_each(FUNC &&func) {
        for (auto &t : *this) {
            func(t);
        }
    }

    template<typename FUNC>
    void for_each(FUNC &&func) const {
        for (auto &t : *this) {
            func(t);
        }
    }
};

_RingReader

环形缓存读取器,构造时即绑定了对应的_RingStorage

自己注册读取和detach函数,flushGop一次性读取所有帧

	std::function<void(void)> _detach_cb = []() {};
    std::function<void(const T &)> _read_cb = [](const T &) {};
	
    void flushGop() {
        if (!_storage) {
            return;
        }
        _storage->getCache().for_each([this](const List<std::pair<bool, T > > &lst) {
            lst.for_each([this](const std::pair<bool, T> &pr) { onRead(pr.second, pr.first); });
        });
    }

_RingReaderDispatcher

环形事件派发器

	std::atomic_int _reader_size;                         //?
    std::function<void(int, bool)> _on_size_changed;      //?
    typename RingStorage::Ptr _storage;                   //环形存储
    std::unordered_map<void *, std::weak_ptr<RingReader> > _reader_map;//本线程中,读取相同源的player的集合

析构:_reader_map中的所有RingReader调用onDetach

构造:private,初始化_storage,_reader_size,_on_size_changed

write

,调用_reader_map中的每个RingReader的onRead,然后往环形存储中写

rtp包来时,给每个拉流源调用onRead,同时往自己的_storage里写入缓存,当新的拉流源来时,根据useCache判断是否使用_storage

void write(T in, bool is_key = true) {
    for (auto it = _reader_map.begin(); it != _reader_map.end();) { 
        auto reader = it->second.lock();
        if (!reader) {
            it = _reader_map.erase(it);
            --_reader_size;
            onSizeChanged(false);
            continue;
        }
        reader->onRead(in, is_key);
        ++it;
    }
    _storage->write(std::move(in), is_key);
}

attach

附加到poller上,构造RingReader,_reader_map++。可以选择是否使用存储的cache。

std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache) {
    if (!poller->isCurrentThread()) {
        throw std::runtime_error("必须在绑定的poller线程中执行attach操作");
    }

    std::weak_ptr<_RingReaderDispatcher> weakSelf = this->shared_from_this();
    auto on_dealloc = [weakSelf, poller](RingReader *ptr) {
        poller->async([weakSelf, ptr]() {
            auto strongSelf = weakSelf.lock();
            if (strongSelf && strongSelf->_reader_map.erase(ptr)) {
                --strongSelf->_reader_size;
                strongSelf->onSizeChanged(false);
            }
            delete ptr;
        });
    };

    std::shared_ptr<RingReader> reader(new RingReader(use_cache ? _storage : nullptr), on_dealloc);
    _reader_map[reader.get()] = reader;
    ++_reader_size;
    onSizeChanged(true);
    return reader;
}

RingBuffer

    using Ptr = std::shared_ptr<RingBuffer>;
    using RingReader = _RingReader<T>;                     
    using RingStorage = _RingStorage<T>;
    using RingReaderDispatcher = _RingReaderDispatcher<T>;
    using onReaderChanged = std::function<void(int size)>; //

	std::mutex _mtx_map;                               //?
    std::atomic_int _total_count { 0 };                //?_total_count是正在拉流的客户端的个数
    typename RingStorage::Ptr _storage;                //环形缓存
    typename RingDelegate<T>::Ptr _delegate;           //子类需要实现onWrite
    onReaderChanged _on_reader_changed;                //读取器变化的函数
    std::unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map;
    //不同的poller对应的不同的RingReaderDispatcher

构造

初始化RingStorage

attach

如果**_dispatcher_map中没有对应的poller键,new一个RingReaderDispatcher**,使用clone的RingStorage

write

有_delegate,_delegate->onWrite

否则 调用每个环形事件分发器中(其他线程)的write往自己线程的缓存写。往自己的线程中写

本线程的对应的pusher数据来了,往storage中写,同时调用各个线程的读poller去读这数据

void write(T in, bool is_key = true) {
        if (_delegate) {
            _delegate->onWrite(std::move(in), is_key);
            return;
        }

        LOCK_GUARD(_mtx_map);
        for (auto &pr : _dispatcher_map) {
            auto &second = pr.second;
            //切换线程后触发onRead事件
            pr.first->async([second, in, is_key]() { second->write(std::move(const_cast<T &>(in)), is_key); }, false);
        }
        _storage->write(std::move(in), is_key);
    }

发送包到player的调用栈

EventPoller::runLoop//
addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) //
pr.first->async([second, in, is_key]() { second->write(std::move(const_cast<T &>(in)), is_key); }, false);//
_RingReaderDispatcher::write --->reader->onRead(in, is_key);
_RingReader::onRead--->_read_cb(data);

_play_reader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) {
            auto strongSelf = weakSelf.lock();
            if (!strongSelf) {
                return;
            }
            strongSelf->sendRtpPacket(pack);//发送rtp包
        });

StrPrinter

一个可以支持<<的字符串

class _StrPrinter : public std::string {
public:
    _StrPrinter() {}

    template<typename T>
    _StrPrinter& operator <<(T && data) {
        _stream << std::forward<T>(data);
        this->std::string::operator=(_stream.str());
        return *this;
    }

    std::string operator <<(std::ostream&(*f)(std::ostream&)) const {
        return *this;
    }

private:
    std::stringstream _stream;
};

EventDispatcher/NoticeCenter

NoticeCenter

结构

std::recursive_mutex _mtxListener;
std::unordered_map<std::string, EventDispatcher::Ptr> _mapListener;

private 获取和删除

    EventDispatcher::Ptr getDispatcher(const std::string &event, bool create = false) {
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        auto it = _mapListener.find(event);
        if (it != _mapListener.end()) {
            return it->second;
        }
        if (create) {
            //如果为空则创建一个
            EventDispatcher::Ptr dispatcher(new EventDispatcher());
            _mapListener.emplace(event, dispatcher);
            return dispatcher;
        }
        return nullptr;
    }

	void delDispatcher(const std::string &event, const EventDispatcher::Ptr &dispatcher) {
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        auto it = _mapListener.find(event);
        if (it != _mapListener.end() && dispatcher == it->second) {
            //两者相同则删除
            _mapListener.erase(it);
        }
    }

EventDispatcher

在这里插入图片描述

任意类型的FUNC

template<typename FUNC>

void addListener(void *tag, FUNC &&func) {

​ using funType = typename function_traits<typename std::remove_reference::type>::stl_function_type;

​ std::shared_ptr<void> pListener(new funType(std::forward(func)), [](void *ptr) {

​ funType *obj = (funType *) ptr;

​ delete obj;

​ });

多参数

template<typename …ArgsType>

int emitEvent(ArgsType &&…args) {

​ using funType = std::function<void(decltype(std::forward<ArgsType>(args))…)>;

​ funType *obj = (funType *) (pr.second.get());

​ (*obj)(std::forward<ArgsType>(args)…);

class EventDispatcher {
public:
    friend class NoticeCenter;
    using Ptr = std::shared_ptr<EventDispatcher>;

    ~EventDispatcher() = default;

private:
    using MapType = std::unordered_multimap<void *, std::shared_ptr<void> >;

    EventDispatcher() = default;

    class InterruptException : public std::runtime_error {
    public:
        InterruptException() : std::runtime_error("InterruptException") {}

        ~InterruptException() {}
    };

    template<typename ...ArgsType>
    int emitEvent(ArgsType &&...args) {
        using funType = std::function<void(decltype(std::forward<ArgsType>(args))...)>;
        decltype(_mapListener) copy;
        {
            //先拷贝(开销比较小),目的是防止在触发回调时还是上锁状态从而导致交叉互锁
            std::lock_guard<std::recursive_mutex> lck(_mtxListener);
            copy = _mapListener;
        }

        int ret = 0;
        for (auto &pr : copy) {
            funType *obj = (funType *) (pr.second.get());
            try {
                (*obj)(std::forward<ArgsType>(args)...);
                ++ret;
            } catch (InterruptException &) {
                ++ret;
                break;
            }
        }
        return ret;
    }

    template<typename FUNC>
    void addListener(void *tag, FUNC &&func) {
        using funType = typename function_traits<typename std::remove_reference<FUNC>::type>::stl_function_type;
        std::shared_ptr<void> pListener(new funType(std::forward<FUNC>(func)), [](void *ptr) {
            funType *obj = (funType *) ptr;
            delete obj;
        });
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        _mapListener.emplace(tag, pListener);
    }

    void delListener(void *tag, bool &empty) {
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        _mapListener.erase(tag);
        empty = _mapListener.empty();
    }

private:
    std::recursive_mutex _mtxListener;
    MapType _mapListener;
};

如果用C

完美转发void*代替

锁+map用静态的map和vector代替(非多线程)

typedef int (*BG_RPC_MSG_HANDLER_FUNC)(void *rpc_msg);
std::map<int, std::vector<BG_RPC_MSG_HANDLER_FUNC>> event_handles;

void event_register(int id, BG_RPC_MSG_HANDLER_FUNC func)
{
    auto& event_funcs = event_handles[id];
    for (const auto& each_func : event_funcs)
    {
        if (each_func == func)
        {
            error_log("id:%d already register!", id);
            return;
        }
    }
    event_handles[id].push_back(func);
}

void event_call(int id, void *msg)
{
    auto it = event_handles.find(id);
    if (it == event_handles.end()) return;
    
    for (const auto &h : it->second)
    {
        h(msg);
    }
}

总结

  • onceToken

  • EventDispatcher 模板编程,std::unordered_multimap<**void ***, std::shared_ptr<void> >的使用; std::forward, ArgsType &&…args

  • 如何多线程处理一个rtp数据包,且不涉及到拷贝

    每个RingBuffer有对应的map [EventPoller::Ptr, typename RingReaderDispatcher] 加锁,给对应的RingReaderDispatcher发rtp包指针,RingReaderDispatcher再给_reader_map中的不同客户端(但是是相同的poller管理)发rtp包。


http://www.niftyadmin.cn/n/3888.html

相关文章

基于光栅波导结构的 R ARMR 系统的 建模

增强现实和混合现实(AR&MR)作为全新的头戴式显示概念&#xff0c;作为 5G 时代的一个核心应用&#xff0c; 具有巨大的市场需求和潜力。其中一种典型的 AR&MR 设备是基于光栅波导结构。而正是因为 光学光栅这种微纳元件的使用&#xff0c;我们不能简单地使用基于几何光…

较低成本的ISO7637-2 5A 5B抛负载保护方案

科普下什么是抛负载&#xff0c;抛负载测试方案以及后端电路参数的选择。 在众多汽车电子电磁兼容测试中&#xff0c;最具破坏性的就是ISO7637-2的5A 5B测试了&#xff0c;当然也有的测试项目放在ISO16750标准中&#xff0c;但测试波形大体相同。上海雷卯有专门的文章描述这2个…

基于JAVA仁爱公益网站计算机毕业设计源码+系统+mysql数据库+lw文档+部署

基于JAVA仁爱公益网站计算机毕业设计源码系统mysql数据库lw文档部署 基于JAVA仁爱公益网站计算机毕业设计源码系统mysql数据库lw文档部署本源码技术栈&#xff1a; 项目架构&#xff1a;B/S架构 开发语言&#xff1a;Java语言 开发软件&#xff1a;idea eclipse 前端技术&a…

《进程状态》

【一】看看Linux内核源代码怎么说 为了弄明白正在运行的进程是什么意思&#xff0c;我们需要知道进程的不同状态。一个进程可以有几个状态&#xff08;再Linux内核里&#xff0c;进程有时候也叫做任务&#xff09;。下面的状态再kernel源代码里的定义&#xff1a; /* * The t…

农产品果蔬商城交易系统(Java+Web+MySQL)

目录 摘要 I Abstract II 前言 1 1 课题简介 2 1.1 选题背景 2 1.2 课题的意义 2 1.3 系统目标 3 2. 可行性研究 5 2.1 技术可行性 5 2.2 经济可行性 5 2.3 操作可行性 5 2.4 法律可行性 6 3. 需求分析 7 3.1 系统需要解决的主要问题 7 3.2 系统具备的基本功能 7 3.3 数据流图…

HICP之BGP基础

目录 HICP之BGP基础 自治系统 --- AS 定义 形成原因 AS号 BGP BGP之间传递路由信息的方式 BGP --- 无类别的路径矢量型协议 BGP --- 特点 在BGP中&#xff0c;我们将邻居关系称为对等体关系&#xff1a; BGP的数据包 open包 Keeplive包 update包 notification包 …

Bypass WAF常规绕过思路

BYPASS WAFBYPASS WAF概念WAF的分类BYPASS WAF的各种绕过姿势Web架构层bypassWEB Server层bypassApache1.畸形method2.phpapache畸形的boundaryIIS1.%特性2.%u特性3.另类%u特性Web应用层bypass1.双重url编码2.变更请求方式3.HPP数据库层bypassMysql数据库1.参数和union之间的WA…

【Linux】第十章 进程间通信(管道+system V共享内存)

&#x1f3c6;个人主页&#xff1a;企鹅不叫的博客 ​ &#x1f308;专栏 C语言初阶和进阶C项目Leetcode刷题初阶数据结构与算法C初阶和进阶《深入理解计算机操作系统》《高质量C/C编程》Linux ⭐️ 博主码云gitee链接&#xff1a;代码仓库地址 ⚡若有帮助可以【关注点赞收藏】…