EnergySpectrumAnalyer/src/DataProcessWorkPool.cpp

534 lines
20 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "DataProcessWorkPool.h"
#include "MeasureAnalysisProjectModel.h"
#include "GlobalDefine.h"
#include "csv.h"
#include <QDir>
#include <QThreadPool>
#include <algorithm>
#include <cstdio>
#include <fstream>
#include <memory>
#include <queue>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
using namespace DataProcessWorkPool;
using namespace io;
void ParticleDataTask::SetAllChannelParticleDataFilename(const QString& all_channel_particle_data_filename)
{
this->_all_channel_particle_data_filename = all_channel_particle_data_filename;
}
// Stores the completion callback target for this task:
//   finished_notifier - object whose method run() invokes after successful processing
//   finished_process  - name of the method to invoke on that object
//   project_name      - project name, passed to the method as its QString argument
void ParticleDataTask::SetFinishedNotifier(QObject* finished_notifier, const char* finished_process, const QString& project_name)
{
    _project_name = project_name;
    _finished_notifier_process = finished_process;
    _finished_notifier = finished_notifier;
}
// Returns the configured path of the all-channel particle data file.
const QString& ParticleDataTask::GetAllChannelParticleDataFilename() const
{
    return _all_channel_particle_data_filename;
}
// Returns the project name this task was configured with.
const QString& ParticleDataTask::GetProjectName() const
{
    return _project_name;
}
const char* ParticleDataTask::GetFinishedNotifierProcess() const
{
return this->_finished_notifier_process;
}
// Returns the object that is notified when the task finishes (may be nullptr).
QObject* ParticleDataTask::GetFinishedNotifier() const
{
    return _finished_notifier;
}
// Returns true only when both the input data filename and the project name
// have been set to non-empty values.
bool ParticleDataTask::IsValidSetWorkParameters() const
{
    const bool something_missing = GetAllChannelParticleDataFilename().isEmpty() || GetProjectName().isEmpty();
    return !something_missing;
}
void ParticleDataTask::StartTask()
{
QThreadPool::globalInstance()->start(this);
}
// Task entry point: validates the parameters, runs the processing step and,
// only if both succeed and a notifier plus method name are set, queues a call
// to that method with the project name as its QString argument.
void ParticleDataTask::run()
{
    // Processing is attempted only when the parameters are valid.
    const bool processed = IsValidSetWorkParameters() && processEveryChannelParticleData();
    if (!processed) {
        return;
    }
    // Nothing to notify unless both the target object and the method name exist.
    if ((GetFinishedNotifier() == nullptr) || (GetFinishedNotifierProcess() == nullptr)) {
        return;
    }
    QMetaObject::invokeMethod(_finished_notifier, _finished_notifier_process, Qt::QueuedConnection, Q_ARG(QString, _project_name));
}
void EveryChannelParticleDataSeparateTask::SetResultDataDir(const QString& result_data_dir)
{
this->_result_data_dir = result_data_dir;
}
// Returns the directory configured for the per-channel particle data files.
const QString& EveryChannelParticleDataSeparateTask::GetResultDataDir() const
{
    return _result_data_dir;
}
// Valid when the result directory is non-empty and the base-class parameters
// (input filename, project name) are valid as well.
bool EveryChannelParticleDataSeparateTask::IsValidSetWorkParameters() const
{
    if (GetResultDataDir().isEmpty()) {
        return false;
    }
    return ParticleDataTask::IsValidSetWorkParameters();
}
bool EveryChannelParticleDataSeparateTask::processEveryChannelParticleData()
{
bool ret_ok = true;
const QString& result_data_output_dir_path = GetResultDataDir();
QDir result_data_output_dir(result_data_output_dir_path);
result_data_output_dir.mkpath(result_data_output_dir_path);
const QString& all_channel_particle_data_filename = GetAllChannelParticleDataFilename();
QMap<uint, QString> particle_data_filename_list;
try {
QMap<uint, std::shared_ptr<std::ofstream>> ch_particle_data_of_list;
std::string board_id_str = QString(QStringLiteral(u"板卡号")).toStdString();
std::string channel_id_str = QString(QStringLiteral(u"通道号")).toStdString();
std::string address_str = QString(QStringLiteral(u"道址")).toStdString();
std::string time_str = QString(QStringLiteral(u"时间计数")).toStdString();
// 使用更灵活的方式处理CSV文件忽略额外列
io::CSVReader<
4,
io::trim_chars<' ', '\t'>,
io::double_quote_escape<',', '"'>,
io::throw_on_overflow,
io::empty_line_comment>
reader(QStrToSysPath(all_channel_particle_data_filename));
reader.read_header(io::ignore_extra_column, board_id_str, channel_id_str, address_str, time_str);
uint board_id;
uint channel_id;
uint address;
unsigned long long time;
while (reader.read_row(board_id, channel_id, address, time)) {
// 板卡和通道号计算,通道号 = 板卡号 * 4 + 通道号
int channel_num = (board_id) * 4 + (channel_id + 1);
QString particle_data_filename = result_data_output_dir.filePath(QString("ParticleData_ch_%1.csv").arg(channel_num));
if (!particle_data_filename_list.contains(channel_num)) {
particle_data_filename_list.insert(channel_num, particle_data_filename);
}
if (!ch_particle_data_of_list.contains(channel_num)) {
std::shared_ptr<std::ofstream> out(
new std::ofstream(QStrToSysPath(particle_data_filename), std::ios::out | std::ios::app),
[](std::ofstream* p) { p->close(); });
*out << QString(QStringLiteral(u"板卡号,通道号,道址,时间计数")).toStdString() << std::endl;
ch_particle_data_of_list.insert(channel_num, out);
}
auto ch_particle_data_of = ch_particle_data_of_list.value(channel_num);
*ch_particle_data_of << board_id << "," << channel_id << "," << address << "," << time << std::endl;
}
} catch (const std::runtime_error& e) {
QString error = QString(QStringLiteral(u"处理%1发生运行时异常:%2")).arg(all_channel_particle_data_filename).arg(e.what());
LOG_ERROR(error)
ret_ok = false;
} catch (const std::exception& e) {
QString error = QString(QStringLiteral(u"处理%1异常:%2")).arg(all_channel_particle_data_filename).arg(e.what());
LOG_ERROR(error)
ret_ok = false;
} catch (...) {
QString error = QString(QStringLiteral(u"处理%1未知异常.")).arg(all_channel_particle_data_filename);
LOG_ERROR(error)
ret_ok = false;
}
const QString& project_name = GetProjectName();
MeasureAnalysisProjectModel* project_model = MeasureAnalysisProjectModelList::GetProjectModel(project_name);
if (project_model == nullptr) {
ret_ok = false;
}
for (auto it = particle_data_filename_list.begin(); it != particle_data_filename_list.end(); ++it) {
project_model->SetChannelParticleDataFilename(it.key(), it.value());
}
return ret_ok;
}
void EveryChannelParticleCountDataTask::SetAllChannelCountResultDir(const QString& dir_path)
{
this->_all_ch_count_dir = dir_path;
}
// Returns the directory configured for the all-channel total count file.
const QString& EveryChannelParticleCountDataTask::GetAllChannelCountResultDir() const
{
    return _all_ch_count_dir;
}
void EveryChannelParticleCountDataTask::SetEveryChannelCountResultDir(const QString& dir_path)
{
this->_every_ch_count_dir = dir_path;
}
// Returns the directory configured for the per-channel count files.
const QString& EveryChannelParticleCountDataTask::GetEveryChannelCountResultDir() const
{
    return _every_ch_count_dir;
}
// Valid when both count result directories are non-empty and the base-class
// parameters (input filename, project name) are valid as well.
bool EveryChannelParticleCountDataTask::IsValidSetWorkParameters() const
{
    if (GetAllChannelCountResultDir().isEmpty()) {
        return false;
    }
    if (GetEveryChannelCountResultDir().isEmpty()) {
        return false;
    }
    return ParticleDataTask::IsValidSetWorkParameters();
}
bool EveryChannelParticleCountDataTask::processEveryChannelParticleData()
{
bool ret_ok = true;
const QString& all_ch_count_dir = GetAllChannelCountResultDir();
const QString& every_ch_count_dir = GetEveryChannelCountResultDir();
QDir all_ch_count_output_dir(all_ch_count_dir);
all_ch_count_output_dir.mkpath(all_ch_count_dir);
QDir every_ch_count_output_dir(every_ch_count_dir);
every_ch_count_output_dir.mkpath(every_ch_count_dir);
const QString& all_channel_particle_data_filename = GetAllChannelParticleDataFilename();
QMap<uint, QString> particle_count_filename_list;
QString all_channel_total_count_filename;
try {
// 统计每个通道的粒子计数(相同板卡号通道号相同道址)
QMap<uint, QMap<uint, uint>> channel_address_counts; // 通道号 -> 地址 -> 计数
// 统计所有通道的粒子计数(不同板卡号通道号相同道址)
QMap<uint, uint> all_channel_address_counts; // 地址 -> 计数
std::string board_id_str = QString(QStringLiteral(u"板卡号")).toStdString();
std::string channel_id_str = QString(QStringLiteral(u"通道号")).toStdString();
std::string address_str = QString(QStringLiteral(u"道址")).toStdString();
std::string time_str = QString(QStringLiteral(u"时间计数")).toStdString();
// 使用更灵活的方式处理CSV文件忽略额外列
io::CSVReader<
4,
io::trim_chars<' ', '\t'>,
io::double_quote_escape<',', '"'>,
io::throw_on_overflow,
io::empty_line_comment>
reader(QStrToSysPath(all_channel_particle_data_filename));
reader.read_header(io::ignore_extra_column, board_id_str, channel_id_str, address_str, time_str);
uint board_id;
uint channel_id;
uint address;
unsigned long long time;
while (reader.read_row(board_id, channel_id, address, time)) {
// 板卡和通道号计算,通道号 = 板卡号 * 4 + 通道号
int channel_num = (board_id) * 4 + (channel_id + 1);
// 统计每个通道的粒子计数
if (!channel_address_counts.contains(channel_num)) {
channel_address_counts[channel_num] = QMap<uint, uint>();
}
channel_address_counts[channel_num][address]++;
// 统计所有通道的粒子计数
all_channel_address_counts[address]++;
}
// 写入每个通道的粒子计数数据(优化:使用一次打开文件,批量写入)
QMap<uint, std::shared_ptr<std::ofstream>> channel_file_streams;
// 预创建所有通道的文件流
for (auto channel_it = channel_address_counts.begin(); channel_it != channel_address_counts.end(); ++channel_it) {
uint channel_num = channel_it.key();
QString count_data_filename = every_ch_count_output_dir.filePath(QString("ParticleCountData_ch_%1.csv").arg(channel_num));
particle_count_filename_list.insert(channel_num, count_data_filename);
// 创建文件流
std::shared_ptr<std::ofstream> out(new std::ofstream( QStrToSysPath(count_data_filename)));
channel_file_streams[channel_num] = out;
*out << QString(QStringLiteral(u"道址")).toStdString() << "," << QString(QStringLiteral(u"计数")).toStdString() << std::endl;
}
// 批量写入数据
for (auto channel_it = channel_address_counts.begin(); channel_it != channel_address_counts.end(); ++channel_it) {
uint channel_num = channel_it.key();
const QMap<uint, uint>& address_counts = channel_it.value();
auto out_stream = channel_file_streams[channel_num];
for (auto address_it = address_counts.begin(); address_it != address_counts.end(); ++address_it) {
uint address = address_it.key();
uint count = address_it.value();
*out_stream << address << "," << count << std::endl;
}
}
// 文件流会在shared_ptr析构时自动关闭
channel_file_streams.clear();
// 写入所有通道的粒子计数数据
all_channel_total_count_filename = all_ch_count_output_dir.filePath("AllChannelParticleTotalCountData.csv");
std::ofstream all_channel_out(QStrToSysPath(all_channel_total_count_filename));
all_channel_out << QString(QStringLiteral(u"道址")).toStdString() << "," << QString(QStringLiteral(u"计数")).toStdString() << std::endl;
for (auto address_it = all_channel_address_counts.begin(); address_it != all_channel_address_counts.end(); ++address_it) {
uint address = address_it.key();
uint count = address_it.value();
all_channel_out << address << "," << count << std::endl;
}
all_channel_out.close();
} catch (const std::runtime_error& e) {
QString error = QString(QStringLiteral(u"处理%1发生运行时异常:%2")).arg(all_channel_particle_data_filename).arg(e.what());
LOG_ERROR(error)
ret_ok = false;
} catch (const std::exception& e) {
QString error = QString(QStringLiteral(u"处理%1异常:%2")).arg(all_channel_particle_data_filename).arg(e.what());
LOG_ERROR(error)
ret_ok = false;
} catch (...) {
QString error = QString(QStringLiteral(u"处理%1未知异常.").arg(all_channel_particle_data_filename));
LOG_ERROR(error)
ret_ok = false;
}
const QString& project_name = GetProjectName();
MeasureAnalysisProjectModel* project_model = MeasureAnalysisProjectModelList::GetProjectModel(project_name);
if (project_model == nullptr) {
ret_ok = false;
} else {
// 更新项目模型中的通道粒子计数数据文件名
for (auto it = particle_count_filename_list.begin(); it != particle_count_filename_list.end(); ++it) {
project_model->SetChannelParticleCountDataFilename(it.key(), it.value());
}
// 更新项目模型中的所有通道粒子总计数数据文件名
project_model->SetAllChannelParticleTotalCountDataFilename(all_channel_total_count_filename);
}
return ret_ok;
}
////////////////////////////////////////////////////////////////////////////////////
void ParticleDataSortTask::SetSortedResultDir(const QString& sorted_result_dir)
{
this->_sorted_result_dir = sorted_result_dir;
}
// Returns the directory configured for the sorted particle data file.
const QString& ParticleDataSortTask::GetSortedResultDir() const
{
    return _sorted_result_dir;
}
// Valid when the sorted-result directory is non-empty and the base-class
// parameters (input filename, project name) are valid as well.
bool ParticleDataSortTask::IsValidSetWorkParameters() const
{
    if (GetSortedResultDir().isEmpty()) {
        return false;
    }
    return ParticleDataTask::IsValidSetWorkParameters();
}
// One particle record from the CSV input, plus the bookkeeping needed by the
// chunk merge. All members are zero-initialised so a default-constructed row
// never carries indeterminate values when it is copied.
struct CsvRow {
    uint board_id = 0;
    uint channel_id = 0;
    uint address = 0;
    unsigned long long time = 0;
    // Index of the chunk file the row was read from; only meaningful during merging.
    size_t chunk_index = 0;
    // Deliberately inverted: std::priority_queue is a max-heap, so ordering rows
    // by "later time is smaller" makes top() return the row with the smallest time.
    bool operator<(const CsvRow& other) const
    {
        return time > other.time;
    }
};
std::vector<std::string> splitFile(const std::string& input_file, size_t chunk_size)
{
std::vector<std::string> chunks;
try {
std::string board_id_str = QString(QStringLiteral(u"板卡号")).toStdString();
std::string channel_id_str = QString(QStringLiteral(u"通道号")).toStdString();
std::string address_str = QString(QStringLiteral(u"道址")).toStdString();
std::string time_str = QString(QStringLiteral(u"时间计数")).toStdString();
io::CSVReader<
4,
io::trim_chars<' ', '\t'>,
io::double_quote_escape<',', '"'>,
io::throw_on_overflow,
io::empty_line_comment
> reader(input_file);
reader.read_header(io::ignore_extra_column, board_id_str, channel_id_str, address_str, time_str);
int chunkIndex = 0;
while (true) {
std::vector<CsvRow> rows;
size_t currentSize = 0;
uint board_id;
uint channel_id;
uint address;
unsigned long long time;
while (reader.read_row(board_id, channel_id, address, time)) {
CsvRow row;
row.board_id = board_id;
row.channel_id = channel_id;
row.address = address;
row.time = time;
// Estimate row size
currentSize += std::to_string(board_id).size() + std::to_string(channel_id).size() + std::to_string(address).size() + std::to_string(time).size() + 4; // +4 for commas
if (currentSize > chunk_size && !rows.empty()) {
break;
}
rows.push_back(row);
}
if (rows.empty())
break;
std::sort(rows.begin(), rows.end(), [](const CsvRow& a, const CsvRow& b) {
return a.time < b.time;
});
std::string chunkFile = input_file + ".chunk" + std::to_string(chunkIndex);
std::ofstream outFile(chunkFile);
for (const auto& row : rows) {
outFile << row.board_id << "," << row.channel_id << "," << row.address << "," << row.time << "\n";
}
outFile.close();
chunks.push_back(chunkFile);
chunkIndex++;
}
} catch (const std::exception& e) {
// Handle exception
}
return chunks;
}
void mergeChunks(const std::vector<std::string>& chunks, const std::string& output_file)
{
std::vector<std::unique_ptr<io::CSVReader<4>>> chunkReaders;
std::priority_queue<CsvRow> minHeap;
for (const auto& chunk : chunks) {
auto reader = std::make_unique<io::CSVReader<4>>(chunk);
chunkReaders.push_back(std::move(reader));
}
for (size_t i = 0; i < chunkReaders.size(); i++) {
uint board_id;
uint channel_id;
uint address;
unsigned long long time;
if (chunkReaders[i]->read_row(board_id, channel_id, address, time)) {
CsvRow row;
row.board_id = board_id;
row.channel_id = channel_id;
row.address = address;
row.time = time;
row.chunk_index = i;
minHeap.push(row);
}
}
std::string board_id_str = QString(QStringLiteral(u"板卡号")).toStdString();
std::string channel_id_str = QString(QStringLiteral(u"通道号")).toStdString();
std::string address_str = QString(QStringLiteral(u"道址")).toStdString();
std::string time_str = QString(QStringLiteral(u"时间计数")).toStdString();
std::ofstream outFile(output_file);
outFile << board_id_str << "," << channel_id_str << "," << address_str << "," << time_str << "\n";
while (!minHeap.empty()) {
CsvRow current = minHeap.top();
minHeap.pop();
outFile << current.board_id << "," << current.channel_id << "," << current.address << "," << current.time << "\n";
size_t chunk_index = current.chunk_index;
if (chunkReaders[chunk_index]) {
uint board_id;
uint channel_id;
uint address;
unsigned long long time;
if (chunkReaders[chunk_index]->read_row(board_id, channel_id, address, time)) {
CsvRow row;
row.board_id = board_id;
row.channel_id = channel_id;
row.address = address;
row.time = time;
row.chunk_index = chunk_index;
minHeap.push(row);
} else {
chunkReaders[chunk_index].reset();
}
}
}
outFile.close();
for (const auto& chunk : chunks) {
std::remove(chunk.c_str());
}
}
bool ParticleDataSortTask::processEveryChannelParticleData()
{
bool ret_ok = true;
const QString& sorted_result_dir = GetSortedResultDir();
QDir sorted_result_output_dir(sorted_result_dir);
sorted_result_output_dir.mkpath(sorted_result_dir);
const QString& all_channel_particle_data_filename = GetAllChannelParticleDataFilename();
QString sorted_output_filename = sorted_result_output_dir.filePath("SortedParticleData.csv");
try {
const size_t CHUNK_SIZE = 100 * 1024 * 1024; // 100MB chunks
std::vector<std::string> chunks = splitFile(QStrToSysPath(all_channel_particle_data_filename), CHUNK_SIZE);
if (chunks.empty()) {
std::ifstream in_file(QStrToSysPath(all_channel_particle_data_filename));
std::ofstream out_file(QStrToSysPath(sorted_output_filename));
std::string line;
while (std::getline(in_file, line)) {
out_file << line << "\n";
}
in_file.close();
out_file.close();
} else {
mergeChunks(chunks, QStrToSysPath(sorted_output_filename));
}
} catch (const std::exception& e) {
QString error = QString(QStringLiteral(u"处理%1异常:%2")).arg(all_channel_particle_data_filename).arg(e.what());
LOG_ERROR(error)
ret_ok = false;
} catch (...) {
QString error = QString(QStringLiteral(u"处理%1未知异常.")).arg(all_channel_particle_data_filename);
LOG_ERROR(error)
ret_ok = false;
}
const QString& project_name = GetProjectName();
MeasureAnalysisProjectModel* project_model = MeasureAnalysisProjectModelList::GetProjectModel(project_name);
if (project_model == nullptr) {
ret_ok = false;
} else {
project_model->SetSortedParticleDataFilename(sorted_output_filename);
}
return ret_ok;
}