/**
 * @file XNThreadObject.cpp
 * @author jinchao
 * @brief Thread class source file
 * @version 1.0
 * @date 2024-11-07
 *
 * @copyright Copyright (c) 2024 XN
 *
 */
#include "XNThreadObject.h"
#include "XNDDSManager/XNDDSManager.h"
#include "XNFramework/XNFramework.h"
#include "XNIDL/XNSimStatusPubSubTypes.hpp"
#include "XNLogger/XNLogger.h"
#include "XNThreadManager/XNThreadManager.h"
#include "XNThreadObject_p.h"

namespace XNSim {

/**
 * @brief Construct a thread object.
 * @param name     Object name, forwarded to SetObjectName().
 * @param freq     Requested run frequency in Hz; the period is stored as 1e9 / freq nanoseconds.
 * @param priority Scheduling priority stored for later use by Initialize().
 * @param CPUAff   CPU affinity bit mask stored for later use by OnSetCPUAffinity().
 */
XNThreadObject::XNThreadObject(XN_STRING name, XN_DOUBLE freq, XN_UINT32 priority,
                               XN_UINT32 CPUAff)
    : XNObject(new XNThreadObjectPrivate()) {
  SetObjectName(name);
  T_D();
  d->_uPriority = priority;
  d->_uAffinity = CPUAff;
  d->_pinfo.period_ns = 1.0E9 / freq;
  d->_setFreq = freq;
  // The function table has 32 slots; AddFunction() fills them and ThreadFunction() walks them.
  d->_funVec.resize(32);
  // InitialFunPool();
}

/// @brief Default destructor.
XNThreadObject::~XNThreadObject() {}

/// @brief Return the framework pointer previously stored with SetFramework().
XNFrameworkPtr XNThreadObject::GetFramework() {
  T_D();
  return d->_framework;
}

/// @brief Store the framework pointer used by Initialize() to reach the DDS manager.
void XNThreadObject::SetFramework(XNFrameworkPtr framework) {
  T_D();
  d->_framework = framework;
}

/**
 * @brief Create the worker thread and register the thread-status DDS publisher.
 *
 * On Windows a std::thread is started and raised to THREAD_PRIORITY_TIME_CRITICAL.
 * On Linux a pthread is created with an explicit SCHED_FIFO policy, the stored
 * priority and a stack of PTHREAD_STACK_MIN * 20 bytes, then the CPU affinity is applied.
 *
 * @return false when thread creation or configuration fails; true otherwise.
 *         A missing framework, DDS manager or writer only logs a warning and
 *         still returns true.
 */
XN_BOOL XNThreadObject::Initialize() {
  T_D();
#ifdef XN_WINDOWS
  d->_thread = std::thread(&XNThreadObject::ThreadFunction, this);
  HANDLE h_thread = d->_thread.native_handle();
  if (h_thread != nullptr) {
    bool bRet = SetThreadPriority(h_thread, THREAD_PRIORITY_TIME_CRITICAL);
    if (!bRet) {
      LOG_ERROR("0x2210 Thread: %1 Set Thread Priority Failed!", GetObjectName());
      return false;
    }
  }
#endif
#ifdef XN_LINUX
  // Initialize the thread attributes.
  XN_INT32 ret = pthread_attr_init(&(d->_attr));
  if (ret) {
    LOG_ERROR("0x2210 Thread: %1 Initialize Attribute Failed!", GetObjectName());
    return false;
  }
  // Set the thread stack size.
  ret = pthread_attr_setstacksize(&(d->_attr), PTHREAD_STACK_MIN * 20);
  if (ret) {
    LOG_ERROR("0x2211 Thread: %1 Set Stack Space Failed!", GetObjectName());
    return false;
  }
  // Set the scheduling policy.
  ret = pthread_attr_setschedpolicy(&(d->_attr), SCHED_FIFO);
  if (ret) {
    LOG_ERROR("0x2212 Thread: %1 Set Scheduling Policy Failed!", GetObjectName());
    return false;
  }
  // Set the thread priority.
  d->_param.sched_priority = d->_uPriority;
  ret = pthread_attr_setschedparam(&(d->_attr), &d->_param);
  if (ret) {
    LOG_ERROR("0x2213 Thread: %1 Set Priority Failed!", GetObjectName());
    return false;
  }
  // Use the attributes set above instead of inheriting the creator's scheduling.
  ret = pthread_attr_setinheritsched(&(d->_attr), PTHREAD_EXPLICIT_SCHED);
  if (ret) {
    LOG_ERROR("0x2214 Thread: %1 Set Scheduler Inheritance Failed!", GetObjectName());
    return false;
  }
  // Create the thread.
  ret = pthread_create(&d->_thread, &d->_attr, ThreadFunction, this);
  if (ret) {
    LOG_ERROR("0x2215 Thread: %1 Create Failed!", GetObjectName());
    return false;
  }
  // Apply the CPU affinity.
  if (!OnSetCPUAffinity()) {
    return false;
  }
#endif
  // From here on failures are non-fatal: the thread runs without publishing its status.
  XNFrameworkPtr framework = GetFramework();
  if (!framework) {
    LOG_WARNING("0x2216 Thread: %1 get Framework Failed!", GetObjectName());
    return true;
  }
  XNDDSManagerPtr ddsManager = framework->GetDDSManager();
  if (!ddsManager) {
    LOG_WARNING("0x2216 Thread: %1 get DDSManager Failed!", GetObjectName());
    return true;
  }
  d->_writer =
      ddsManager->RegisterPublisher("XNSim::XNSimStatus::XNThreadStatus", d->_threadID);
  if (d->_writer == nullptr) {
    LOG_WARNING("0x2217 Thread: %1 get DDS Writer Failed!", GetObjectName());
    return true;
  }
  LOG_INFO("Thread: %1 is prepared!", GetObjectName());
  return true;
}

/**
 * @brief Dispatch a simulation control command to this thread.
 * @param objectId Target id; only commands with objectId == 0 are acted upon.
 * @param cmd      Start / Suspend / Continue / Abort. Abort maps to Stop(true).
 */
void XNThreadObject::SimControl(XN_UINT32 objectId, SimControlCmd cmd) {
  if (objectId != 0) {
    return;
  }
  switch (cmd) {
  case SimControlCmd::Start:
    Start();
    break;
  case SimControlCmd::Suspend:
    Pause();
    break;
  case SimControlCmd::Continue:
    Continue();
    break;
  case SimControlCmd::Abort:
    Stop(true);
    break;
  }
}

/// @brief Switch a NotStart or Suspend thread to Runing and wake the worker.
void XNThreadObject::Start() {
  T_D();
  XNThreadMutexLock(&d->_mtx);
  // Update the run status.
  if (d->_eRunStatus == RunStatus::NotStart || d->_eRunStatus == RunStatus::Suspend) {
    d->_eRunStatus = RunStatus::Runing;
  }
  XNThreadCVNotifyOne(&d->_cond);
  XNThreadMutexUnlock(&d->_mtx);
  LOG_INFO("Thread: %1 Start!", GetObjectName());
}

/// @brief Switch a Runing thread to Suspend; the worker parks at its next cycle start.
void XNThreadObject::Pause() {
  T_D();
  XNThreadMutexLock(&d->_mtx);
  // Update the run status.
  if (d->_eRunStatus == RunStatus::Runing) {
    d->_eRunStatus = RunStatus::Suspend;
  }
  XNThreadMutexUnlock(&d->_mtx);
  LOG_INFO("Thread: %1 Pause!", GetObjectName());
}

/// @brief Switch a Suspend thread back to Runing and wake the worker.
void XNThreadObject::Continue() {
  T_D();
  XNThreadMutexLock(&d->_mtx);
  // Update the run status.
  if (d->_eRunStatus == RunStatus::Suspend) {
    d->_eRunStatus = RunStatus::Runing;
  }
  XNThreadCVNotifyOne(&d->_cond);
  XNThreadMutexUnlock(&d->_mtx);
}

/**
 * @brief Request the worker to terminate and join it.
 * @param force true sets the status to Aborted, false sets it to Finished.
 *              Both paths wake the worker and then block in Join().
 */
void XNThreadObject::Stop(bool force) {
  T_D();
  XNThreadMutexLock(&d->_mtx);
  // Update the run status; the two modes differ only in the state they publish.
  d->_eRunStatus = force ? RunStatus::Aborted : RunStatus::Finished;
  XNThreadCVNotifyOne(&d->_cond);
  XNThreadMutexUnlock(&d->_mtx);
  Join();
  LOG_INFO("Thread: %1 Stop!", GetObjectName());
}

/// @brief Join the worker thread.
void XNThreadObject::Join() {
  T_D();
  XNThreadJoin(d->_thread);
}

/// @brief Detach the worker thread.
void XNThreadObject::Detach() {
  T_D();
  XNThreadDetach(d->_thread);
}

/// @brief Return the current run status (read without taking the state mutex).
RunStatus XNThreadObject::GetRunStatus() {
  T_D();
  return d->_eRunStatus;
}

/**
 * @brief Register a periodic callback in the function table.
 *
 * The callback is appended to every slot `pos, pos + stride, pos + 2 * stride, ...`
 * that lies inside the table, where the stride is derived from @p freq
 * (1 for BaseFreq up to 32 for ThirtyTwothFreq and any unknown level).
 *
 * @param fun     Callback to run.
 * @param freq    Frequency level selecting the slot stride.
 * @param pos     First slot to use; nothing is registered when it is outside the table.
 * @param priorty Key of the per-slot bucket the callback is appended to.
 */
void XNThreadObject::AddFunction(XNCallBack fun, FreqLevel freq, XN_UINT32 pos,
                                 XN_UINT32 priorty) {
  T_D();
  std::size_t stride = 32;
  switch (freq) {
  case FreqLevel::BaseFreq:
    stride = 1;
    break;
  case FreqLevel::HalfFreq:
    stride = 2;
    break;
  case FreqLevel::QuarterFreq:
    stride = 4;
    break;
  case FreqLevel::EighthFreq:
    stride = 8;
    break;
  case FreqLevel::SixteenthFreq:
    stride = 16;
    break;
  case FreqLevel::ThirtyTwothFreq:
    stride = 32;
    break;
  default:
    stride = 32;
    break;
  }
  // Index with std::size_t so a large `pos` cannot wrap around in 32-bit arithmetic
  // and alias a low slot.
  const std::size_t slotCount = d->_funVec.size();
  for (std::size_t slot = pos; slot < slotCount; slot += stride) {
    d->_funVec[slot][priorty].push_back(fun);
  }
}

/// @brief Return the configured run frequency in Hz.
const XN_DOUBLE &XNThreadObject::GetRunFrequecy() {
  T_D();
  return d->_setFreq;
}

/// @brief Set the run frequency in Hz and update the period (1e9 / frequency, in ns).
void XNThreadObject::SetRunFrequecy(const XN_DOUBLE &dRunFrequecy) {
  T_D();
  d->_setFreq = dRunFrequecy;
  d->_pinfo.period_ns = 1.0E9 / dRunFrequecy;
}

/// @brief Return the stored run priority.
const XN_UINT32 &XNThreadObject::GetRunPriority() {
  T_D();
  return d->_uPriority;
}

/// @brief Store the run priority; it is only read when Initialize() configures the thread.
void XNThreadObject::SetRunPriority(const XN_UINT32 &uRunPriority) {
  T_D();
  d->_uPriority = uRunPriority;
}

/// @brief Return the stored CPU affinity mask.
const XN_UINT32 &XNThreadObject::GetCPUAffinity() {
  T_D();
  return d->_uAffinity;
}

/// @brief Store the CPU affinity mask; it is applied by OnSetCPUAffinity().
void XNThreadObject::SetCPUAffinity(const XN_UINT32 &uCPUAffinity) {
  T_D();
  d->_uAffinity = uCPUAffinity;
}

/**
 * @brief Set the start time of the thread.
 * @param startTime Stored as the last-run timestamp; on Linux it also becomes
 *                  the next period deadline.
 */
void XNThreadObject::SetStartTime(const XN_TIMESPEC &startTime) {
  T_D();
#ifdef XN_WINDOWS
  d->_lastRunTime = startTime;
#endif
#ifdef XN_LINUX
  d->_pinfo.next_period = startTime;
  d->_lastRunTime = startTime;
#endif
}

/**
 * @brief Apply the stored CPU affinity mask to the worker thread (Linux only).
 *
 * Bit i of the mask selects CPU i. Only CPUs below both the configured CPU
 * count and the bit width of the mask are considered.
 *
 * @return false when pthread_setaffinity_np() reports an error; true otherwise,
 *         including when the mask is zero or the platform is not Linux.
 */
bool XNThreadObject::OnSetCPUAffinity() {
  T_D();
#ifdef XN_LINUX
  // A zero mask selects no CPU and would be rejected by the kernel; treat it
  // as "no affinity requested" and leave the thread's affinity untouched.
  if (d->_uAffinity == 0) {
    return true;
  }
  cpu_set_t mask;
  CPU_ZERO(&mask);
  const XN_INT32 cpuNum = static_cast<XN_INT32>(sysconf(_SC_NPROCESSORS_CONF));
  // Never shift by the full width of the mask or more: that is undefined
  // behaviour and would alias low bits onto high CPUs on large machines.
  const XN_INT32 maskBits = static_cast<XN_INT32>(sizeof(d->_uAffinity) * 8);
  for (XN_INT32 cpu = 0; cpu < cpuNum && cpu < maskBits; ++cpu) {
    if (((d->_uAffinity >> cpu) & 1) == 1) {
      CPU_SET(cpu, &mask);
    }
  }
  // pthread_setaffinity_np() returns 0 on success and a positive error number
  // on failure (it never returns -1).
  if (pthread_setaffinity_np(d->_thread, sizeof(mask), &mask) != 0) {
    LOG_WARNING("0x2216 线程: %1 设置CPU亲和性失败!", GetObjectName());
    return false;
  }
#endif
  return true;
}

/**
 * @brief Worker thread main loop.
 *
 * Each cycle: park while the status is NotStart or Suspend, leave when it is
 * Finished, run every callback of the current function-table slot, leave when
 * the status is Aborted, publish a status sample when due, then sleep until
 * the next period.
 */
#ifdef XN_WINDOWS
void XNThreadObject::ThreadFunction() {
  T_D();
  XN_STRING name = GetObjectName();
#endif
#ifdef XN_LINUX
void *XNThreadObject::ThreadFunction(void *args) {
  // Recover the owning object and its private data from the thread argument.
  ThisType *thisPtr = static_cast<ThisType *>(args);
  auto d = thisPtr->GetPP();
  XN_STRING name = thisPtr->GetObjectName();
#endif
  while (1) {
    // Lock to protect the run status.
    XNThreadMutexLock(&d->_mtx);
    // Park while not started or suspended; any other status leaves the loop.
    while (d->_eRunStatus == RunStatus::NotStart || d->_eRunStatus == RunStatus::Suspend) {
      XNThreadCVWait(&d->_cond, &d->_mtx);
    }
    // Finished terminates the thread immediately.
    if (d->_eRunStatus == RunStatus::Finished) {
      XNThreadMutexUnlock(&d->_mtx);
      break;
    }
    XNThreadMutexUnlock(&d->_mtx);

    // Run the callbacks of the current slot, bucket by bucket.
    auto &funMap = d->_funVec[d->_RunPosition++];
    for (auto &funv : funMap) {
      for (auto &fun : funv.second) {
        fun();
      }
    }
    d->_RunPosition %= d->_funVec.size();

    // Aborted terminates the thread only after the current slot has completed.
    XNThreadMutexLock(&d->_mtx);
    if (d->_eRunStatus == RunStatus::Aborted) {
      XNThreadMutexUnlock(&d->_mtx);
      break;
    }
    XNThreadMutexUnlock(&d->_mtx);

    // Publish a status sample every `setFreq` cycles.
    XN_UINT32 setFreq =
        (1.0E9 / d->_pinfo.period_ns) < 1.0 ? 1 : static_cast<XN_UINT32>(d->_setFreq);
    if (setFreq == 0) {
      // Guard the modulo below against a frequency that truncates to zero.
      setFreq = 1;
    }
    if (d->_writer != nullptr && d->_count > 0 && d->_count % setFreq == 0) {
      XN_TIMESPEC now;
      getCurrentRTTime(&now);
      double seconds = calculateRTTime(&now, &d->_lastRunTime);
      XNSim::XNSimStatus::XNThreadStatus threadStatus;
      threadStatus.XNThreadName(name);
      threadStatus.XNThreadID(XNThreadGetID());
      // The status is read here without holding the state mutex.
      threadStatus.XNThreadSt(static_cast<uint32_t>(d->_eRunStatus));
      threadStatus.XNThreadAff(d->_uAffinity);
      threadStatus.XNThreadPro(d->_uPriority);
      threadStatus.XNThRunCnt(d->_count);
      threadStatus.XNThSetFreq(d->_setFreq);
      threadStatus.XNThCurFreq(d->_setFreq / seconds);
      d->_writer->write(&threadStatus);
      d->_lastRunTime = now;
      LOG_DEBUG("Thread: %1 Write DDS! SetFreq: %2 Hz, CurFreq: %3 Hz", name, d->_setFreq,
                d->_setFreq / seconds);
    }
    d->_count++;
    sleepToNextRTTime(&d->_pinfo);
  }
#ifdef XN_WINDOWS
}
#endif
#ifdef XN_LINUX
  return nullptr;
}
#endif

/// @brief Return the thread id stored with SetThreadID().
const XN_UINT32 &XNThreadObject::GetThreadID() {
  T_D();
  return d->_threadID;
}

/// @brief Store the thread id; Initialize() passes it to RegisterPublisher().
void XNThreadObject::SetThreadID(const XN_UINT32 &threadID) {
  T_D();
  d->_threadID = threadID;
}

} // namespace XNSim