XNSim/XNCore/XNThread.cpp

461 lines
10 KiB
C++
Executable File

/**
* @file XNThread.cpp
* @author jinchao
* @brief 线程类源文件
* @version 1.0
* @date 2024-11-07
*
* @copyright Copyright (c) 2024 XN
*
*/
#include "XNLogger.h"
#include "XNThread.h"
#include "XNThread_p.h"
#include "XNFramework.h"
#include "XNThreadManager.h"
#include "XNDDSManager.h"
#include "XNIDL/XNSimStatusPubSubTypes.hpp"
/**
 * @brief Construct a thread object and record its timing and scheduling parameters.
 *
 * No operating-system thread is created here; that happens in Initialize().
 *
 * @param name     Object name, forwarded to SetObjectName().
 * @param freq     Requested run frequency in Hz. The period is stored as 1e9 / freq nanoseconds.
 * @param priority Priority value stored for later use by Initialize().
 * @param CPUAff   CPU affinity bit mask stored for later use by OnSetCPUAffinity().
 */
XNThread::XNThread(std::string name, double freq, uint32_t priority, uint32_t CPUAff)
    : XNObject(new XNThreadPrivate())
{
    SetObjectName(name);
    T_D();

    // Timing: keep the requested frequency and the period derived from it.
    d->_setFreq = freq;
    d->pinfo.period_ns = 1.0E9 / freq;

    // Scheduling parameters.
    d->_uAffinity = CPUAff;
    d->_uPriority = priority;

    // The dispatch table is created with 32 slots.
    d->_funVec.resize(32);
}
/**
 * @brief Destructor. The body performs no work of its own.
 */
XNThread::~XNThread()
{
}
/**
 * @brief Return the framework handle stored in the private data.
 * @return The value last stored by SetFramework().
 */
XNFrameworkPtr XNThread::GetFramework()
{
    T_D();
    XNFrameworkPtr owner = d->_framework;
    return owner;
}
/**
 * @brief Store the framework handle that GetFramework() returns.
 * @param framework Handle to store.
 */
void XNThread::SetFramework(XNFrameworkPtr framework)
{
    T_D();
    auto &slot = d->_framework;
    slot = framework;
}
// 初始化函数
bool XNThread::Initialize()
{
T_D();
int ret;
// 初始化线程参数
ret = pthread_attr_init(&(d->attr));
if (ret) {
LOG_ERROR("0x2210 Thread: %1 Initialize Attribute Failed!", GetObjectName());
return false;
}
// 设置线程栈空间大小
ret = pthread_attr_setstacksize(&(d->attr), PTHREAD_STACK_MIN);
if (ret) {
LOG_ERROR("0x2211 Thread: %1 Set Stack Space Failed!", GetObjectName());
return false;
}
// 设置线程调度策略
ret = pthread_attr_setschedpolicy(&(d->attr), SCHED_FIFO);
if (ret) {
LOG_ERROR("0x2212 Thread: %1 Set Scheduling Policy Failed!", GetObjectName());
return false;
}
// 设置线程优先级
d->param.sched_priority = d->_uPriority;
ret = pthread_attr_setschedparam(&(d->attr), &d->param);
if (ret) {
LOG_ERROR("0x2213 Thread: %1 Set Priority Failed!", GetObjectName());
return false;
}
// 设置调度器继承
ret = pthread_attr_setinheritsched(&(d->attr), PTHREAD_EXPLICIT_SCHED);
if (ret) {
LOG_ERROR("0x2214 Thread: %1 Set Scheduler Inheritance Failed!", GetObjectName());
return false;
}
// 线程创建
ret = pthread_create(&d->thread, &d->attr, ThreadFunction, this);
if (ret) {
LOG_ERROR("0x2215 Thread: %1 Create Failed!", GetObjectName());
return false;
}
// 设置亲和性
if (!OnSetCPUAffinity()) {
return false;
}
XNFrameworkPtr framework = GetFramework();
if (!framework) {
LOG_WARNING("0x2216 Thread: %1 get Framework Failed!", GetObjectName());
return true;
}
XNDDSManagerPtr ddsManager = framework->GetDDSManager();
if (!ddsManager) {
LOG_WARNING("0x2216 Thread: %1 get DDSManager Failed!", GetObjectName());
return true;
}
d->writer = ddsManager->RegisterPublisher<XNSim::XNSimStatus::XNThreadStatusPubSubType>(
"XNSim::XNSimStatus::XNThreadStatus", d->_threadID);
if (d->writer == nullptr) {
LOG_WARNING("0x2217 Thread: %1 get DDS Writer Failed!", GetObjectName());
return true;
}
LOG_INFO("Thread: %1 is prepared!", GetObjectName());
return true;
}
/**
 * @brief Dispatch a simulation control command to this thread.
 *
 * Only commands whose objectId is 0 are acted on; any other id is ignored.
 * Command values other than Start, Suspend, Continue and Abort do nothing.
 *
 * @param objectId Target id of the command.
 * @param cmd      Control command to execute.
 */
void XNThread::SimControl(uint32_t objectId, SimControlCmd cmd)
{
    T_D();
    if (objectId != 0) {
        return;
    }

    if (cmd == SimControlCmd::Start) {
        Start();
    } else if (cmd == SimControlCmd::Suspend) {
        Pause();
    } else if (cmd == SimControlCmd::Continue) {
        Continue();
    } else if (cmd == SimControlCmd::Abort) {
        // Abort maps to a forced stop.
        Stop(true);
    }
}
/**
 * @brief Switch the thread to the running state and wake the worker.
 *
 * The status changes only when it is currently NotStart or Suspend. The
 * condition variable is signalled and the log line is written in every case.
 */
void XNThread::Start()
{
    T_D();
    pthread_mutex_lock(&d->_mtx);
    const bool canRun =
        d->_eRunStatus == RunStatus::NotStart || d->_eRunStatus == RunStatus::Suspend;
    if (canRun) {
        d->_eRunStatus = RunStatus::Runing;
    }
    // Wake the worker that may be parked in ThreadFunction.
    pthread_cond_signal(&d->_cond);
    pthread_mutex_unlock(&d->_mtx);
    LOG_INFO("Thread: %1 Start!", GetObjectName());
}
/**
 * @brief Request suspension of a running thread.
 *
 * The status changes to Suspend only when it is currently Runing. The log
 * line is written whether or not the status changed.
 */
void XNThread::Pause()
{
    T_D();
    pthread_mutex_lock(&d->_mtx);
    const bool isRunning = (d->_eRunStatus == RunStatus::Runing);
    if (isRunning) {
        d->_eRunStatus = RunStatus::Suspend;
    }
    pthread_mutex_unlock(&d->_mtx);
    LOG_INFO("Thread: %1 Pause!", GetObjectName());
}
/**
 * @brief Resume a suspended thread.
 *
 * The status changes to Runing only when it is currently Suspend. The
 * condition variable is signalled in every case.
 */
void XNThread::Continue()
{
    T_D();
    pthread_mutex_lock(&d->_mtx);
    const bool isSuspended = (d->_eRunStatus == RunStatus::Suspend);
    if (isSuspended) {
        d->_eRunStatus = RunStatus::Runing;
    }
    // Wake the worker that may be parked in ThreadFunction.
    pthread_cond_signal(&d->_cond);
    pthread_mutex_unlock(&d->_mtx);
}
// 停止执行
void XNThread::Stop(bool force)
{
T_D();
if (force) {
pthread_mutex_lock(&d->_mtx);
// 设置运行状态
d->_eRunStatus = RunStatus::Aborted;
pthread_cond_signal(&d->_cond);
pthread_mutex_unlock(&d->_mtx);
Join();
} else {
pthread_mutex_lock(&d->_mtx);
// 设置运行状态
d->_eRunStatus = RunStatus::Finished;
pthread_cond_signal(&d->_cond);
pthread_mutex_unlock(&d->_mtx);
Join();
}
LOG_INFO("Thread: %1 Stop!", GetObjectName());
}
/**
 * @brief Block until the worker thread has terminated.
 *
 * The thread's return value is discarded, and so is the result of
 * pthread_join() itself.
 */
void XNThread::Join()
{
    T_D();
    pthread_join(d->thread, nullptr);
}
/**
 * @brief Detach the worker thread.
 *
 * The result of pthread_detach() is not checked.
 */
void XNThread::Detach()
{
    T_D();
    auto &worker = d->thread;
    pthread_detach(worker);
}
/**
 * @brief Return the current run status of the thread.
 *
 * The status is read while holding the same mutex that Start(), Pause(),
 * Continue() and Stop() hold when they modify it, so a concurrent state
 * change cannot race with this read.
 *
 * @return Snapshot of the run status at the time of the call.
 */
RunStatus XNThread::GetRunStatus()
{
    T_D();
    pthread_mutex_lock(&d->_mtx);
    const RunStatus status = d->_eRunStatus;
    pthread_mutex_unlock(&d->_mtx);
    return status;
}
/**
 * @brief Register a periodic callback in the dispatch table.
 *
 * The callback is appended to slot @p pos and then to every following slot
 * at a distance given by @p freq (1, 2, 4, 8, 16 or 32 slots), up to the end
 * of the table. Nothing is registered when @p pos lies outside the table.
 *
 * @param fun     Callback to register.
 * @param freq    Frequency level selecting the slot stride; unknown values
 *                are treated like ThirtyTwothFreq.
 * @param pos     Index of the first slot to use.
 * @param priorty Key under which the callback is stored within each slot.
 */
void XNThread::AddFunction(XNCallBack fun, FreqLevel freq, uint32_t pos, uint32_t priorty)
{
    T_D();

    // The stride depends only on the frequency level, so work it out once.
    std::size_t stride = 32;
    switch (freq) {
    case FreqLevel::BaseFreq:
        stride = 1;
        break;
    case FreqLevel::HalfFreq:
        stride = 2;
        break;
    case FreqLevel::QuarterFreq:
        stride = 4;
        break;
    case FreqLevel::EighthFreq:
        stride = 8;
        break;
    case FreqLevel::SixteenthFreq:
        stride = 16;
        break;
    case FreqLevel::ThirtyTwothFreq:
    default:
        stride = 32;
        break;
    }

    const std::size_t slotCount = d->_funVec.size();
    for (std::size_t slot = pos; slot < slotCount; slot += stride) {
        d->_funVec[slot][priorty].push_back(fun);
    }
}
/**
 * @brief Return the requested run frequency.
 * @return Reference to the stored frequency value (Hz).
 */
const double &XNThread::GetRunFrequecy()
{
    T_D();
    const auto &requestedFreq = d->_setFreq;
    return requestedFreq;
}
// 设置线程运行频率
void XNThread::SetRunFrequecy(const double &dRunFrequecy)
{
T_D();
d->_setFreq = dRunFrequecy;
d->pinfo.period_ns = 1.0E9 / dRunFrequecy;
}
/**
 * @brief Return the stored run priority.
 * @return Reference to the stored priority value.
 */
const uint32_t &XNThread::GetRunPriority()
{
    T_D();
    const auto &storedPriority = d->_uPriority;
    return storedPriority;
}
/**
 * @brief Store a new run priority.
 *
 * Only the stored value changes; no pthread call is made here.
 *
 * @param uRunPriority Priority value to store.
 */
void XNThread::SetRunPriority(const uint32_t &uRunPriority)
{
    T_D();
    auto &storedPriority = d->_uPriority;
    storedPriority = uRunPriority;
}
/**
 * @brief Return the stored CPU affinity mask.
 * @return Reference to the stored mask; OnSetCPUAffinity() maps bit i to CPU i.
 */
const uint32_t &XNThread::GetCPUAffinity()
{
    T_D();
    const auto &storedMask = d->_uAffinity;
    return storedMask;
}
/**
 * @brief Store a new CPU affinity mask.
 *
 * Only the stored value changes; the mask is applied to the thread by
 * OnSetCPUAffinity(), which is not called here.
 *
 * @param uCPUAffinity Affinity bit mask to store.
 */
void XNThread::SetCPUAffinity(const uint32_t &uCPUAffinity)
{
    T_D();
    auto &storedMask = d->_uAffinity;
    storedMask = uCPUAffinity;
}
/**
 * @brief Set the reference time of the periodic schedule.
 *
 * The given time becomes both the base from which ThreadFunction advances
 * its next wake-up time and the reference used for the first measured
 * frequency report.
 *
 * @param startTime Start time; ThreadFunction uses it with CLOCK_MONOTONIC.
 */
void XNThread::SetStartTime(const timespec &startTime)
{
    T_D();
    d->_lastRunTime = startTime;
    d->pinfo.next_period = startTime;
}
/**
 * @brief Apply the stored CPU affinity mask to the worker thread.
 *
 * Bit i of the stored mask selects CPU i. Bits at or above the number of
 * configured CPUs are ignored. If no CPU ends up selected, the thread keeps
 * its current affinity and the function reports success.
 *
 * @return true on success or when no CPU is selected; false when
 *         pthread_setaffinity_np() reports an error.
 */
bool XNThread::OnSetCPUAffinity()
{
    T_D();
    cpu_set_t mask;
    CPU_ZERO(&mask);

    // The stored mask is 32 bits wide. Shifting it by 32 or more is undefined
    // behaviour, so never look past bit 31 even on machines with more CPUs.
    const int maskBits = 32;
    const long configuredCpus = sysconf(_SC_NPROCESSORS_CONF);
    const int cpuNum = configuredCpus < maskBits ? static_cast<int>(configuredCpus) : maskBits;

    bool anySelected = false;
    for (int i = 0; i < cpuNum; i++) {
        if (((d->_uAffinity >> i) & 1u) == 1u) {
            CPU_SET(i, &mask);
            anySelected = true;
        }
    }

    // An empty CPU set is rejected by pthread_setaffinity_np(). The previous
    // implementation never noticed that rejection, so keep the same outcome:
    // leave the affinity untouched and report success.
    if (!anySelected) {
        return true;
    }

    // pthread_setaffinity_np() returns 0 on success and a positive error
    // number on failure; it never returns -1.
    if (pthread_setaffinity_np(d->thread, sizeof(mask), &mask) != 0) {
        LOG_WARNING("0x2216 线程: %1 设置CPU亲和性失败!", GetObjectName());
        return false;
    }
    return true;
}
// 线程主执行函数
void *XNThread::ThreadFunction(void *args)
{
// 获取创建线程类的私有变量结构体指针
ThisType *thisPtr = (ThisType *)args;
PrivateType *temp = thisPtr->GetPP();
// 获取当前时间
// clock_gettime(CLOCK_MONOTONIC, &(temp->pinfo.next_period));
while (1) {
// 加锁保护线程状态
pthread_mutex_lock(&temp->_mtx);
// 当处于notstart或suspend时准备挂起
while (temp->_eRunStatus == RunStatus::NotStart
|| temp->_eRunStatus == RunStatus::Suspend) {
// 如果挂起时切换为终止,则直接跳出
if (temp->_eRunStatus == RunStatus::Aborted
|| temp->_eRunStatus == RunStatus::Finished) {
pthread_mutex_unlock(&temp->_mtx);
break;
}
// 挂起线程
pthread_cond_wait(&temp->_cond, &temp->_mtx);
}
// 如果为finished直接结束线程
if (temp->_eRunStatus == RunStatus::Finished) {
pthread_mutex_unlock(&temp->_mtx);
break;
}
// 解锁
pthread_mutex_unlock(&temp->_mtx);
// 任务执行
auto &funMap = temp->_funVec[temp->_RunPosition++];
for (auto &funv : funMap) {
for (auto &fun : funv.second) {
fun();
}
}
temp->_RunPosition %= temp->_funVec.size();
// 如果为abort等待任务执行完成再结束
pthread_mutex_lock(&temp->_mtx);
if (temp->_eRunStatus == RunStatus::Aborted) {
pthread_mutex_unlock(&temp->_mtx);
break;
}
pthread_mutex_unlock(&temp->_mtx);
// 填写DDS主题数据
uint32_t setFreq = (1.0E9 / temp->pinfo.period_ns) < 1.0 ? 1 : (uint32_t)temp->_setFreq;
if (temp->writer != nullptr && temp->count > 0 && temp->count % setFreq == 0) {
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
double seconds = (double)(now.tv_sec - temp->_lastRunTime.tv_sec)
+ (double)(now.tv_nsec - temp->_lastRunTime.tv_nsec) / 1.0E9;
XNSim::XNSimStatus::XNThreadStatus threadStatus;
threadStatus.XNThreadName(thisPtr->GetObjectName());
threadStatus.XNThreadID(pthread_self());
threadStatus.XNThreadSt((uint32_t)temp->_eRunStatus);
threadStatus.XNThreadAff(temp->_uAffinity);
threadStatus.XNThreadPro(temp->_uPriority);
threadStatus.XNThRunCnt(temp->count);
threadStatus.XNThSetFreq(temp->_setFreq);
threadStatus.XNThCurFreq(temp->_setFreq / seconds);
temp->writer->write(&threadStatus);
temp->_lastRunTime = now;
LOG_DEBUG("Thread: %1 Write DDS! SetFreq: %2 Hz, CurFreq: %3 Hz",
thisPtr->GetObjectName(), temp->_setFreq, temp->_setFreq / seconds);
}
temp->count++;
// 睡眠时间步进
temp->pinfo.next_period.tv_nsec += temp->pinfo.period_ns;
// 睡眠时间整理
while (temp->pinfo.next_period.tv_nsec >= 1000000000) {
temp->pinfo.next_period.tv_sec++;
temp->pinfo.next_period.tv_nsec -= 1000000000;
}
// 执行纳秒睡眠
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &temp->pinfo.next_period, NULL);
}
return nullptr;
}
// // 初始化线程调度表
// void XNThread::InitialFunPool()
// {
// T_D();
// // 设置循环表长度
// switch (d->_eRunFreq) {
// case FreqLevel::BaseFreq:
// d->_funVec.resize(32);
// break;
// case FreqLevel::HalfFreq:
// d->_funVec.resize(16);
// break;
// case FreqLevel::QuarterFreq:
// d->_funVec.resize(8);
// break;
// case FreqLevel::EighthFreq:
// d->_funVec.resize(4);
// break;
// case FreqLevel::SixteenthFreq:
// d->_funVec.resize(2);
// break;
// default:
// d->_funVec.resize(1);
// break;
// }
// }
/**
 * @brief Return the stored thread id.
 * @return Reference to the id last stored by SetThreadID(); Initialize()
 *         passes the same value to the DDS publisher registration.
 */
const uint32_t &XNThread::GetThreadID()
{
    T_D();
    const auto &storedId = d->_threadID;
    return storedId;
}
/**
 * @brief Store the thread id.
 * @param threadID Id value to store.
 */
void XNThread::SetThreadID(const uint32_t &threadID)
{
    T_D();
    auto &storedId = d->_threadID;
    storedId = threadID;
}