/** * @file XNThread.cpp * @author jinchao * @brief 线程类源文件 * @version 1.0 * @date 2024-11-07 * * @copyright Copyright (c) 2024 XN * */ #include "XNLogger.h" #include "XNThread.h" #include "XNThreadManager.h" #include "XNDDSManager.h" #include "XNIDL/XNSimStatusPubSubTypes.hpp" /** * @brief 线程类私有结构体 */ class XNThreadPrivate { public: /** * @brief 宏定义,用于声明公有数据成员 */ Q_DECLARE_PUBLIC(XNThread) /** * @brief 构造函数 * @param q 线程类指针 */ explicit XNThreadPrivate(XNThread *q) : q_ptr(q) {} private: /** * @brief 线程类指针 */ XNThread *q_ptr; /** * @brief 线程运行频率组 */ FreqLevel _eRunFreq = FreqLevel::BaseFreq; /** * @brief 线程运行优先级 */ quint32 _uPriority = 0; /** * @brief 线程CPU亲和性掩码 * @details 按位表示某CPU核是否使用,从低到高,0表示不使用,1表示使用。例如:0x00000003表示使用0,1号CPU */ quint32 _uAffinity = 0; /** * @brief 线程运行频率 */ double _setFreq = BASE_RUN_FREQ; /** * @brief 线程调度任务表 */ QVector > > _funVec; /** * @brief pthread线程调度参数 */ sched_param param; /** * @brief pthread线程属性 */ pthread_attr_t attr; /** * @brief pthread线程 */ pthread_t thread; /** * @brief 线程睡眠时间控制 */ PERIOD_INFO pinfo; /** * @brief 线程控制锁 */ pthread_mutex_t _mtx = PTHREAD_MUTEX_INITIALIZER; /** * @brief 线程控制条件变量 */ pthread_cond_t _cond = PTHREAD_COND_INITIALIZER; /** * @brief 线程运行状态 */ RunStatus _eRunStatus = RunStatus::NotStart; /** * @brief 线程执行进度 */ quint32 _RunPosition = 0; /** * @brief 线程执行计数 */ int count = 0; /** * @brief 线程执行时间统计 */ timespec _lastRunTime; /** * @brief 线程运行状态主题写入器 */ FAST_DDS_MACRO::DataWriter *writer; /** * @brief 线程ID */ quint32 _threadID = 0; }; // 默认构造函数 XNThread::XNThread(QObject *parent) : QObject(parent), d_ptr(new XNThreadPrivate(this)) { } XNThread::XNThread(QObject *parent, QString name, FreqLevel freq, quint32 priority, quint32 CPUAff, double RunInter) : QObject(parent), d_ptr(new XNThreadPrivate(this)) { Q_D(XNThread); setObjectName(name); d->_eRunFreq = freq; d->_uPriority = priority; d->_uAffinity = CPUAff; d->pinfo.period_ns = RunInter; d->_setFreq = 1.0E9 / RunInter; InitialFunPool(); } // 默认析构函数 XNThread::~XNThread(void) { Q_D(XNThread); delete d; } // 初始化函数 bool XNThread::Initialize() { Q_D(XNThread); int ret; // 初始化线程参数 ret = pthread_attr_init(&(d->attr)); if (ret) { LOG_ERROR("0x2210 Thread: %1 Initialize Attribute Failed!", objectName()); return false; } // 设置线程栈空间大小 ret = pthread_attr_setstacksize(&(d->attr), PTHREAD_STACK_MIN); if (ret) { LOG_ERROR("0x2211 Thread: %1 Set Stack Space Failed!", objectName()); return false; } // 设置线程调度策略 ret = pthread_attr_setschedpolicy(&(d->attr), SCHED_FIFO); if (ret) { LOG_ERROR("0x2212 Thread: %1 Set Scheduling Policy Failed!", objectName()); return false; } // 设置线程优先级 d->param.sched_priority = d->_uPriority; ret = pthread_attr_setschedparam(&(d->attr), &d->param); if (ret) { LOG_ERROR("0x2213 Thread: %1 Set Priority Failed!", objectName()); return false; } // 设置调度器继承 ret = pthread_attr_setinheritsched(&(d->attr), PTHREAD_EXPLICIT_SCHED); if (ret) { LOG_ERROR("0x2214 Thread: %1 Set Scheduler Inheritance Failed!", objectName()); return false; } // 线程创建 ret = pthread_create(&d->thread, &d->attr, ThreadFunction, this); if (ret) { LOG_ERROR("0x2215 Thread: %1 Create Failed!", objectName()); return false; } // 设置亲和性 if (!OnSetCPUAffinity()) { return false; } // if (objectName() == "TimeManagerThread") // return true; XNThreadManager *threadManager = qobject_cast(parent()); if (threadManager == nullptr) { LOG_WARNING("0x2216 Thread: %1 get ThreadManager Failed!", objectName()); return true; } XNDDSManager *ddsManager = threadManager->parent()->findChild(); if (ddsManager == nullptr) { LOG_WARNING("0x2216 Thread: %1 get DDSManager Failed!", objectName()); return true; } d->writer = ddsManager->RegisterPublisher( "XNSim::XNSimStatus::XNThreadStatus", d->_threadID); if (d->writer == nullptr) { LOG_WARNING("0x2217 Thread: %1 get DDS Writer Failed!", objectName()); return true; } LOG_INFO("Thread: %1 is prepared!", objectName()); return true; } // 仿真控制 void XNThread::OnSimControl(quint32 objectId, SimControlCmd cmd) { Q_D(XNThread); if (objectId == 0) { switch (cmd) { case SimControlCmd::Start: Start(); break; case SimControlCmd::Suspend: Pause(); break; case SimControlCmd::Continue: Continue(); break; case SimControlCmd::Abort: Stop(true); break; } } } // 开始执行 void XNThread::Start() { Q_D(XNThread); pthread_mutex_lock(&d->_mtx); // 设置运行状态 if (d->_eRunStatus == RunStatus::NotStart || d->_eRunStatus == RunStatus::Suspend) { d->_eRunStatus = RunStatus::Runing; } pthread_cond_signal(&d->_cond); pthread_mutex_unlock(&d->_mtx); LOG_INFO("Thread: %1 Start!", objectName()); } // 暂停执行 void XNThread::Pause() { Q_D(XNThread); pthread_mutex_lock(&d->_mtx); // 设置运行状态 if (d->_eRunStatus == RunStatus::Runing) { d->_eRunStatus = RunStatus::Suspend; } pthread_mutex_unlock(&d->_mtx); LOG_INFO("Thread: %1 Pause!", objectName()); } // 继续执行 void XNThread::Continue() { Q_D(XNThread); pthread_mutex_lock(&d->_mtx); // 设置运行状态 if (d->_eRunStatus == RunStatus::Suspend) { d->_eRunStatus = RunStatus::Runing; } pthread_cond_signal(&d->_cond); pthread_mutex_unlock(&d->_mtx); } // 停止执行 void XNThread::Stop(bool force) { Q_D(XNThread); if (force) { pthread_mutex_lock(&d->_mtx); // 设置运行状态 d->_eRunStatus = RunStatus::Aborted; pthread_cond_signal(&d->_cond); pthread_mutex_unlock(&d->_mtx); Join(); } else { pthread_mutex_lock(&d->_mtx); // 设置运行状态 d->_eRunStatus = RunStatus::Finished; pthread_cond_signal(&d->_cond); pthread_mutex_unlock(&d->_mtx); Join(); } LOG_INFO("Thread: %1 Stop!", objectName()); } // 加入线程 void XNThread::Join() { Q_D(XNThread); pthread_join(d->thread, NULL); } // 分离线程 void XNThread::Detach() { Q_D(XNThread); pthread_detach(d->thread); } // 获取线程运行状态 RunStatus XNThread::GetRunStatus() { Q_D(XNThread); return d->_eRunStatus; } // 向线程添加周期性函数 void XNThread::AddFunction(XNCallBack fun, FreqLevel freq, quint32 pos, quint32 priorty) { Q_D(XNThread); for (int i = 0; i < d->_funVec.size();) { if (i + pos >= d->_funVec.size()) break; d->_funVec[i + pos][priorty].push_back(fun); switch (freq) { case FreqLevel::BaseFreq: i++; break; case FreqLevel::HalfFreq: i += 2; break; case FreqLevel::QuarterFreq: i += 4; break; case FreqLevel::EighthFreq: i += 8; break; case FreqLevel::SixteenthFreq: i += 16; break; case FreqLevel::ThirtyTwothFreq: i += 32; break; default: i += 32; break; } } } // 获取线程运行频率 const FreqLevel &XNThread::GetRunFrequecy() { Q_D(XNThread); return d->_eRunFreq; } // 设置线程运行频率 void XNThread::SetRunFrequecy(const FreqLevel &eRunFrequecy) { Q_D(XNThread); d->_eRunFreq = eRunFrequecy; InitialFunPool(); } // 获取线程运行优先级 const quint32 &XNThread::GetRunPriority() { Q_D(XNThread); return d->_uPriority; } // 设置线程运行优先级 void XNThread::SetRunPriority(const quint32 &uRunPriority) { Q_D(XNThread); d->_uPriority = uRunPriority; } // 获取线程CPU亲和性掩码 const quint32 &XNThread::GetCPUAffinity() { Q_D(XNThread); return d->_uAffinity; } // 设置线程CPU亲和性掩码 void XNThread::SetCPUAffinity(const quint32 &uCPUAffinity) { Q_D(XNThread); d->_uAffinity = uCPUAffinity; } // 设置线程运行间隔 void XNThread::SetRunInter(const double &dRunInter) { Q_D(XNThread); d->pinfo.period_ns = dRunInter; d->_setFreq = 1.0E9 / dRunInter; } // 获取线程运行间隔 void XNThread::OnSetStartTime(const timespec &startTime) { Q_D(XNThread); d->pinfo.next_period = startTime; d->_lastRunTime = startTime; } // 执行线程CPU亲和性设置 bool XNThread::OnSetCPUAffinity() { Q_D(XNThread); cpu_set_t mask; CPU_ZERO(&mask); int cpuNum = sysconf(_SC_NPROCESSORS_CONF); for (int i = 0; i < cpuNum; i++) { if (((d->_uAffinity >> i) & 1) == 1) CPU_SET(i, &mask); } if (pthread_setaffinity_np(d->thread, sizeof(mask), &mask) == -1) { LOG_WARNING("0x2216 线程: %1 设置CPU亲和性失败!", objectName()); return false; } return true; } // 线程主执行函数 void *XNThread::ThreadFunction(void *args) { // 获取创建线程类的私有变量结构体指针 XNThreadPrivate *temp = ((XNThread *)args)->d_ptr; // 获取当前时间 // clock_gettime(CLOCK_MONOTONIC, &(temp->pinfo.next_period)); while (1) { // 加锁保护线程状态 pthread_mutex_lock(&temp->_mtx); // 当处于notstart或suspend时准备挂起 while (temp->_eRunStatus == RunStatus::NotStart || temp->_eRunStatus == RunStatus::Suspend) { // 如果挂起时切换为终止,则直接跳出 if (temp->_eRunStatus == RunStatus::Aborted || temp->_eRunStatus == RunStatus::Finished) { pthread_mutex_unlock(&temp->_mtx); break; } // 挂起线程 pthread_cond_wait(&temp->_cond, &temp->_mtx); } // 如果为finished直接结束线程 if (temp->_eRunStatus == RunStatus::Finished) { pthread_mutex_unlock(&temp->_mtx); break; } // 解锁 pthread_mutex_unlock(&temp->_mtx); // 任务执行 auto &funMap = temp->_funVec[temp->_RunPosition++]; for (auto &funv : funMap) { for (auto &fun : funv) { fun(); } } temp->_RunPosition %= temp->_funVec.size(); // 如果为abort等待任务执行完成再结束 pthread_mutex_lock(&temp->_mtx); if (temp->_eRunStatus == RunStatus::Aborted) { pthread_mutex_unlock(&temp->_mtx); break; } pthread_mutex_unlock(&temp->_mtx); // 填写DDS主题数据 quint32 setFreq = (1.0E9 / temp->pinfo.period_ns) < 1.0 ? 1 : (quint32)temp->_setFreq; if (temp->writer != nullptr && temp->count > 0 && temp->count % setFreq == 0) { timespec now; clock_gettime(CLOCK_MONOTONIC, &now); double seconds = (double)(now.tv_sec - temp->_lastRunTime.tv_sec) + (double)(now.tv_nsec - temp->_lastRunTime.tv_nsec) / 1.0E9; XNSim::XNSimStatus::XNThreadStatus threadStatus; threadStatus.XNThreadName(temp->q_ptr->objectName().toStdString()); threadStatus.XNThreadID(pthread_self()); threadStatus.XNThreadSt((quint32)temp->_eRunStatus); threadStatus.XNThreadAff(temp->_uAffinity); threadStatus.XNThreadPro(temp->_uPriority); threadStatus.XNThRunCnt(temp->count); threadStatus.XNThSetFreq(temp->_setFreq); threadStatus.XNThCurFreq(temp->_setFreq / seconds); temp->writer->write(&threadStatus); temp->_lastRunTime = now; LOG_DEBUG("Thread: %1 Write DDS! SetFreq: %2 Hz, CurFreq: %3 Hz", temp->q_ptr->objectName(), temp->_setFreq, temp->_setFreq / seconds); } temp->count++; // 睡眠时间步进 temp->pinfo.next_period.tv_nsec += temp->pinfo.period_ns; // 睡眠时间整理 while (temp->pinfo.next_period.tv_nsec >= 1000000000) { temp->pinfo.next_period.tv_sec++; temp->pinfo.next_period.tv_nsec -= 1000000000; } // 执行纳秒睡眠 clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &temp->pinfo.next_period, NULL); } return nullptr; } // 初始化线程调度表 void XNThread::InitialFunPool() { Q_D(XNThread); // 设置循环表长度 switch (d->_eRunFreq) { case FreqLevel::BaseFreq: d->_funVec.resize(32); break; case FreqLevel::HalfFreq: d->_funVec.resize(16); break; case FreqLevel::QuarterFreq: d->_funVec.resize(8); break; case FreqLevel::EighthFreq: d->_funVec.resize(4); break; case FreqLevel::SixteenthFreq: d->_funVec.resize(2); break; default: d->_funVec.resize(1); break; } } // 获取线程ID const quint32 &XNThread::GetThreadID() { Q_D(XNThread); return d->_threadID; } // 设置线程ID void XNThread::SetThreadID(const quint32 &threadID) { Q_D(XNThread); d->_threadID = threadID; }