XNSim/XNCore/XNThread.cpp
2025-04-28 12:25:20 +08:00

557 lines
12 KiB
C++
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file XNThread.cpp
* @author jinchao
* @brief 线程类源文件
* @version 1.0
* @date 2024-11-07
*
* @copyright Copyright (c) 2024 XN
*
*/
#include "XNLogger.h"
#include "XNThread.h"
#include "XNThreadManager.h"
#include "XNDDSManager.h"
#include "XNIDL/XNSimStatusPubSubTypes.hpp"
/**
* @brief 线程类私有结构体
*/
class XNThreadPrivate
{
public:
/**
* @brief 宏定义,用于声明公有数据成员
*/
Q_DECLARE_PUBLIC(XNThread)
/**
* @brief 构造函数
* @param q 线程类指针
*/
explicit XNThreadPrivate(XNThread *q) : q_ptr(q) {}
private:
/**
* @brief 线程类指针
*/
XNThread *q_ptr;
/**
* @brief 线程运行频率组
*/
FreqLevel _eRunFreq = FreqLevel::BaseFreq;
/**
* @brief 线程运行优先级
*/
quint32 _uPriority = 0;
/**
* @brief 线程CPU亲和性掩码
* @details 按位表示某CPU核是否使用从低到高0表示不使用1表示使用。例如0x00000003表示使用0,1号CPU
*/
quint32 _uAffinity = 0;
/**
* @brief 线程运行频率
*/
double _setFreq = BASE_RUN_FREQ;
/**
* @brief 线程调度任务表
*/
QVector<QMap<quint32, QVector<XNCallBack> > > _funVec;
/**
* @brief pthread线程调度参数
*/
sched_param param;
/**
* @brief pthread线程属性
*/
pthread_attr_t attr;
/**
* @brief pthread线程
*/
pthread_t thread;
/**
* @brief 线程睡眠时间控制
*/
PERIOD_INFO pinfo;
/**
* @brief 线程控制锁
*/
pthread_mutex_t _mtx = PTHREAD_MUTEX_INITIALIZER;
/**
* @brief 线程控制条件变量
*/
pthread_cond_t _cond = PTHREAD_COND_INITIALIZER;
/**
* @brief 线程运行状态
*/
RunStatus _eRunStatus = RunStatus::NotStart;
/**
* @brief 线程执行进度
*/
quint32 _RunPosition = 0;
/**
* @brief 线程执行计数
*/
int count = 0;
/**
* @brief 线程执行时间统计
*/
timespec _lastRunTime;
/**
* @brief 线程运行状态主题写入器
*/
FAST_DDS_MACRO::DataWriter *writer;
/**
* @brief 线程ID
*/
quint32 _threadID = 0;
};
// 默认构造函数
XNThread::XNThread(QObject *parent) : QObject(parent), d_ptr(new XNThreadPrivate(this))
{
}
XNThread::XNThread(QObject *parent, QString name, FreqLevel freq, quint32 priority, quint32 CPUAff,
double RunInter)
: QObject(parent), d_ptr(new XNThreadPrivate(this))
{
Q_D(XNThread);
setObjectName(name);
d->_eRunFreq = freq;
d->_uPriority = priority;
d->_uAffinity = CPUAff;
d->pinfo.period_ns = RunInter;
d->_setFreq = 1.0E9 / RunInter;
InitialFunPool();
}
// 默认析构函数
XNThread::~XNThread(void)
{
Q_D(XNThread);
delete d;
}
// 初始化函数
bool XNThread::Initialize()
{
Q_D(XNThread);
int ret;
// 初始化线程参数
ret = pthread_attr_init(&(d->attr));
if (ret) {
LOG_ERROR("0x2210 Thread: %1 Initialize Attribute Failed!", objectName());
return false;
}
// 设置线程栈空间大小
ret = pthread_attr_setstacksize(&(d->attr), PTHREAD_STACK_MIN);
if (ret) {
LOG_ERROR("0x2211 Thread: %1 Set Stack Space Failed!", objectName());
return false;
}
// 设置线程调度策略
ret = pthread_attr_setschedpolicy(&(d->attr), SCHED_FIFO);
if (ret) {
LOG_ERROR("0x2212 Thread: %1 Set Scheduling Policy Failed!", objectName());
return false;
}
// 设置线程优先级
d->param.sched_priority = d->_uPriority;
ret = pthread_attr_setschedparam(&(d->attr), &d->param);
if (ret) {
LOG_ERROR("0x2213 Thread: %1 Set Priority Failed!", objectName());
return false;
}
// 设置调度器继承
ret = pthread_attr_setinheritsched(&(d->attr), PTHREAD_EXPLICIT_SCHED);
if (ret) {
LOG_ERROR("0x2214 Thread: %1 Set Scheduler Inheritance Failed!", objectName());
return false;
}
// 线程创建
ret = pthread_create(&d->thread, &d->attr, ThreadFunction, this);
if (ret) {
LOG_ERROR("0x2215 Thread: %1 Create Failed!", objectName());
return false;
}
// 设置亲和性
if (!OnSetCPUAffinity()) {
return false;
}
// if (objectName() == "TimeManagerThread")
// return true;
XNThreadManager *threadManager = qobject_cast<XNThreadManager *>(parent());
if (threadManager == nullptr) {
LOG_WARNING("0x2216 Thread: %1 get ThreadManager Failed!", objectName());
return true;
}
XNDDSManager *ddsManager = threadManager->parent()->findChild<XNDDSManager *>();
if (ddsManager == nullptr) {
LOG_WARNING("0x2216 Thread: %1 get DDSManager Failed!", objectName());
return true;
}
d->writer = ddsManager->RegisterPublisher<XNSim::XNSimStatus::XNThreadStatusPubSubType>(
"XNSim::XNSimStatus::XNThreadStatus", d->_threadID);
if (d->writer == nullptr) {
LOG_WARNING("0x2217 Thread: %1 get DDS Writer Failed!", objectName());
return true;
}
LOG_INFO("Thread: %1 is prepared!", objectName());
return true;
}
// 仿真控制
void XNThread::OnSimControl(quint32 objectId, SimControlCmd cmd)
{
Q_D(XNThread);
if (objectId == 0) {
switch (cmd) {
case SimControlCmd::Start:
Start();
break;
case SimControlCmd::Suspend:
Pause();
break;
case SimControlCmd::Continue:
Continue();
break;
case SimControlCmd::Abort:
Stop(true);
break;
}
}
}
// 开始执行
void XNThread::Start()
{
Q_D(XNThread);
pthread_mutex_lock(&d->_mtx);
// 设置运行状态
if (d->_eRunStatus == RunStatus::NotStart || d->_eRunStatus == RunStatus::Suspend) {
d->_eRunStatus = RunStatus::Runing;
}
pthread_cond_signal(&d->_cond);
pthread_mutex_unlock(&d->_mtx);
LOG_INFO("Thread: %1 Start!", objectName());
}
// 暂停执行
void XNThread::Pause()
{
Q_D(XNThread);
pthread_mutex_lock(&d->_mtx);
// 设置运行状态
if (d->_eRunStatus == RunStatus::Runing) {
d->_eRunStatus = RunStatus::Suspend;
}
pthread_mutex_unlock(&d->_mtx);
LOG_INFO("Thread: %1 Pause!", objectName());
}
// 继续执行
void XNThread::Continue()
{
Q_D(XNThread);
pthread_mutex_lock(&d->_mtx);
// 设置运行状态
if (d->_eRunStatus == RunStatus::Suspend) {
d->_eRunStatus = RunStatus::Runing;
}
pthread_cond_signal(&d->_cond);
pthread_mutex_unlock(&d->_mtx);
}
// 停止执行
void XNThread::Stop(bool force)
{
Q_D(XNThread);
if (force) {
pthread_mutex_lock(&d->_mtx);
// 设置运行状态
d->_eRunStatus = RunStatus::Aborted;
pthread_cond_signal(&d->_cond);
pthread_mutex_unlock(&d->_mtx);
Join();
} else {
pthread_mutex_lock(&d->_mtx);
// 设置运行状态
d->_eRunStatus = RunStatus::Finished;
pthread_cond_signal(&d->_cond);
pthread_mutex_unlock(&d->_mtx);
Join();
}
LOG_INFO("Thread: %1 Stop!", objectName());
}
// 加入线程
void XNThread::Join()
{
Q_D(XNThread);
pthread_join(d->thread, NULL);
}
// 分离线程
void XNThread::Detach()
{
Q_D(XNThread);
pthread_detach(d->thread);
}
// 获取线程运行状态
RunStatus XNThread::GetRunStatus()
{
Q_D(XNThread);
return d->_eRunStatus;
}
// 向线程添加周期性函数
void XNThread::AddFunction(XNCallBack fun, FreqLevel freq, quint32 pos, quint32 priorty)
{
Q_D(XNThread);
for (int i = 0; i < d->_funVec.size();) {
if (i + pos >= d->_funVec.size())
break;
d->_funVec[i + pos][priorty].push_back(fun);
switch (freq) {
case FreqLevel::BaseFreq:
i++;
break;
case FreqLevel::HalfFreq:
i += 2;
break;
case FreqLevel::QuarterFreq:
i += 4;
break;
case FreqLevel::EighthFreq:
i += 8;
break;
case FreqLevel::SixteenthFreq:
i += 16;
break;
case FreqLevel::ThirtyTwothFreq:
i += 32;
break;
default:
i += 32;
break;
}
}
}
// 获取线程运行频率
const FreqLevel &XNThread::GetRunFrequecy()
{
Q_D(XNThread);
return d->_eRunFreq;
}
// 设置线程运行频率
void XNThread::SetRunFrequecy(const FreqLevel &eRunFrequecy)
{
Q_D(XNThread);
d->_eRunFreq = eRunFrequecy;
InitialFunPool();
}
// 获取线程运行优先级
const quint32 &XNThread::GetRunPriority()
{
Q_D(XNThread);
return d->_uPriority;
}
// 设置线程运行优先级
void XNThread::SetRunPriority(const quint32 &uRunPriority)
{
Q_D(XNThread);
d->_uPriority = uRunPriority;
}
// 获取线程CPU亲和性掩码
const quint32 &XNThread::GetCPUAffinity()
{
Q_D(XNThread);
return d->_uAffinity;
}
// 设置线程CPU亲和性掩码
void XNThread::SetCPUAffinity(const quint32 &uCPUAffinity)
{
Q_D(XNThread);
d->_uAffinity = uCPUAffinity;
}
// 设置线程运行间隔
void XNThread::SetRunInter(const double &dRunInter)
{
Q_D(XNThread);
d->pinfo.period_ns = dRunInter;
d->_setFreq = 1.0E9 / dRunInter;
}
// 获取线程运行间隔
void XNThread::OnSetStartTime(const timespec &startTime)
{
Q_D(XNThread);
d->pinfo.next_period = startTime;
d->_lastRunTime = startTime;
}
// 执行线程CPU亲和性设置
bool XNThread::OnSetCPUAffinity()
{
Q_D(XNThread);
cpu_set_t mask;
CPU_ZERO(&mask);
int cpuNum = sysconf(_SC_NPROCESSORS_CONF);
for (int i = 0; i < cpuNum; i++) {
if (((d->_uAffinity >> i) & 1) == 1)
CPU_SET(i, &mask);
}
if (pthread_setaffinity_np(d->thread, sizeof(mask), &mask) == -1) {
LOG_WARNING("0x2216 线程: %1 设置CPU亲和性失败!", objectName());
return false;
}
return true;
}
// 线程主执行函数
void *XNThread::ThreadFunction(void *args)
{
// 获取创建线程类的私有变量结构体指针
XNThreadPrivate *temp = ((XNThread *)args)->d_ptr;
// 获取当前时间
// clock_gettime(CLOCK_MONOTONIC, &(temp->pinfo.next_period));
while (1) {
// 加锁保护线程状态
pthread_mutex_lock(&temp->_mtx);
// 当处于notstart或suspend时准备挂起
while (temp->_eRunStatus == RunStatus::NotStart
|| temp->_eRunStatus == RunStatus::Suspend) {
// 如果挂起时切换为终止,则直接跳出
if (temp->_eRunStatus == RunStatus::Aborted
|| temp->_eRunStatus == RunStatus::Finished) {
pthread_mutex_unlock(&temp->_mtx);
break;
}
// 挂起线程
pthread_cond_wait(&temp->_cond, &temp->_mtx);
}
// 如果为finished直接结束线程
if (temp->_eRunStatus == RunStatus::Finished) {
pthread_mutex_unlock(&temp->_mtx);
break;
}
// 解锁
pthread_mutex_unlock(&temp->_mtx);
// 任务执行
auto &funMap = temp->_funVec[temp->_RunPosition++];
for (auto &funv : funMap) {
for (auto &fun : funv) {
fun();
}
}
temp->_RunPosition %= temp->_funVec.size();
// 如果为abort等待任务执行完成再结束
pthread_mutex_lock(&temp->_mtx);
if (temp->_eRunStatus == RunStatus::Aborted) {
pthread_mutex_unlock(&temp->_mtx);
break;
}
pthread_mutex_unlock(&temp->_mtx);
// 填写DDS主题数据
quint32 setFreq = (1.0E9 / temp->pinfo.period_ns) < 1.0 ? 1 : (quint32)temp->_setFreq;
if (temp->writer != nullptr && temp->count > 0 && temp->count % setFreq == 0) {
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
double seconds = (double)(now.tv_sec - temp->_lastRunTime.tv_sec)
+ (double)(now.tv_nsec - temp->_lastRunTime.tv_nsec) / 1.0E9;
XNSim::XNSimStatus::XNThreadStatus threadStatus;
threadStatus.XNThreadName(temp->q_ptr->objectName().toStdString());
threadStatus.XNThreadID(pthread_self());
threadStatus.XNThreadSt((quint32)temp->_eRunStatus);
threadStatus.XNThreadAff(temp->_uAffinity);
threadStatus.XNThreadPro(temp->_uPriority);
threadStatus.XNThRunCnt(temp->count);
threadStatus.XNThSetFreq(temp->_setFreq);
threadStatus.XNThCurFreq(temp->_setFreq / seconds);
temp->writer->write(&threadStatus);
temp->_lastRunTime = now;
LOG_DEBUG("Thread: %1 Write DDS! SetFreq: %2 Hz, CurFreq: %3 Hz",
temp->q_ptr->objectName(), temp->_setFreq, temp->_setFreq / seconds);
}
temp->count++;
// 睡眠时间步进
temp->pinfo.next_period.tv_nsec += temp->pinfo.period_ns;
// 睡眠时间整理
while (temp->pinfo.next_period.tv_nsec >= 1000000000) {
temp->pinfo.next_period.tv_sec++;
temp->pinfo.next_period.tv_nsec -= 1000000000;
}
// 执行纳秒睡眠
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &temp->pinfo.next_period, NULL);
}
return nullptr;
}
// 初始化线程调度表
void XNThread::InitialFunPool()
{
Q_D(XNThread);
// 设置循环表长度
switch (d->_eRunFreq) {
case FreqLevel::BaseFreq:
d->_funVec.resize(32);
break;
case FreqLevel::HalfFreq:
d->_funVec.resize(16);
break;
case FreqLevel::QuarterFreq:
d->_funVec.resize(8);
break;
case FreqLevel::EighthFreq:
d->_funVec.resize(4);
break;
case FreqLevel::SixteenthFreq:
d->_funVec.resize(2);
break;
default:
d->_funVec.resize(1);
break;
}
}
// 获取线程ID
const quint32 &XNThread::GetThreadID()
{
Q_D(XNThread);
return d->_threadID;
}
// 设置线程ID
void XNThread::SetThreadID(const quint32 &threadID)
{
Q_D(XNThread);
d->_threadID = threadID;
}