diff --git a/src/MeasureServiceProtocol.h b/src/MeasureServiceProtocol.h new file mode 100644 index 0000000..ecfb544 --- /dev/null +++ b/src/MeasureServiceProtocol.h @@ -0,0 +1,34 @@ +#ifndef MEASURESERVICEPROTOCOL_H +#define MEASURESERVICEPROTOCOL_H + +#include + +namespace Protocol { +// 固定协议头长度:8字节 +const int HEAD_SIZE = 8; + +// 打包数据:添加长度头 +inline QByteArray PackData(const QByteArray &data) { + QByteArray buffer; + buffer.resize(HEAD_SIZE); + // 前4字节 = 数据总长度 + qint32 data_len = data.size(); + memcpy(buffer.data(), &data_len, 4); + // 后4字节保留 + qint32 reserve = 0xFFFFFFFF; + memcpy(buffer.data() + 4, &reserve, 4); + buffer.append(data); + return buffer; +} + +// 解包:从头部获取数据长度 +inline qint32 UnpackDataLen(const QByteArray &head) { + if (head.size() < HEAD_SIZE) + return 0; + qint32 len = 0; + memcpy(&len, head.constData(), 4); + return len; +} +} + +#endif // MEASURESERVICEPROTOCOL_H diff --git a/src/MeasureTask.cpp b/src/MeasureTask.cpp index c6fcb67..84e2480 100644 --- a/src/MeasureTask.cpp +++ b/src/MeasureTask.cpp @@ -1,88 +1,120 @@ #include "MeasureTask.h" -#include #include "MeasureDeviceController.h" +#include "MeasureServiceProtocol.h" +#include "QsLogManage.h" +#include +#include #include #include #include -#include -#include "QsLogManage.h" #include #include +#include #include -MeasureTask::MeasureTask(int socketDescriptor, QObject *parent) - : QThread(parent), socketDescriptor(socketDescriptor) +MeasureTask::MeasureTask(int socketDescriptor, QObject* parent) + : QThread(parent) + , _socket_descriptor(socketDescriptor) + , _tcp_socket(nullptr) + , _requst_buffer(new QByteArray) + , _requst_data_len(0) { } void MeasureTask::run() { - QTcpSocket tcp_socket; - if (!tcp_socket.setSocketDescriptor(socketDescriptor)) { - emit errorOccurred(tcp_socket.errorString()); - QLOG_DEBUG() << tcp_socket.errorString(); + // 在子线程中创建套接字(关键:必须在当前线程创建) + _tcp_socket = new QTcpSocket; + // 设置客户端套接字描述符(绑定连接) + if (!_tcp_socket->setSocketDescriptor(_socket_descriptor)) { + QLOG_ERROR() << QStringLiteral(u"套接字初始化失败:") << _tcp_socket->errorString(); return; } - tcp_socket.setReadBufferSize(1024 * 1024); - QByteArray buffer; - if ( tcp_socket.waitForReadyRead(500) ) { - while (tcp_socket.bytesAvailable() < 2 ) { - if ( tcp_socket.waitForReadyRead(100) ) { - QByteArray len_buf = tcp_socket.read(2); - quint16 total_len = (uchar)len_buf[0] | ((uchar)len_buf[1] << 8); - while (buffer.size() < total_len) { - if (tcp_socket.waitForReadyRead(500)) { - buffer.append(tcp_socket.readAll()); - } - } - } + QLOG_INFO() << QStringLiteral(u"新客户端接入:") << _tcp_socket->peerAddress().toString() + << QStringLiteral(u"端口:") << _tcp_socket->peerPort() + << QStringLiteral(u"线程ID:") << QThread::currentThreadId(); + // 绑定信号槽(长连接核心:持续监听数据/断开) + connect(_tcp_socket, &QTcpSocket::readyRead, this, &MeasureTask::onClientRequstData); + connect(_tcp_socket, &QTcpSocket::disconnected, this, &MeasureTask::onClientDisconnected); + connect(_tcp_socket, QOverload::of(&QTcpSocket::error), this, &MeasureTask::onSocketError); + // 开启线程事件循环(保持长连接,不退出线程) + exec(); + // 线程退出后清理资源 + _tcp_socket->close(); + _tcp_socket->deleteLater(); +} + +void MeasureTask::onClientRequstData() +{ + _requst_buffer->append(_tcp_socket->readAll()); + while (true) { + // 读头 + if (_requst_data_len == 0) { + if (_requst_buffer->size() < Protocol::HEAD_SIZE) + break; + QByteArray head = _requst_buffer->left(Protocol::HEAD_SIZE); + _requst_data_len = Protocol::UnpackDataLen(head); + _requst_buffer->remove(0, Protocol::HEAD_SIZE); } - } - if ( buffer.size() ) { - QLOG_INFO() << QStringLiteral(u"请求数据长度:") << buffer.size(); - QDataStream requst_data_stream(buffer); - QString cmd_type, device_guid, cmd_data; - requst_data_stream >> cmd_type >> device_guid; - if ( cmd_type == "START" ) { - requst_data_stream >> cmd_data; - QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid << cmd_data; - processStartMeasureCmd(&tcp_socket, device_guid, cmd_data); - } else if (cmd_type == "STOP") { - QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid; - processStopMeasureCmd(&tcp_socket, device_guid); - } else if (cmd_type == "SET") { - requst_data_stream >> cmd_data; - QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid << cmd_data; - processSetDeviceMeasureConfigParamsCmd(&tcp_socket, device_guid, cmd_data); - } else if (cmd_type == "CLEAR") { - QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid; - processClearDataCmd(&tcp_socket, device_guid); - } else if (cmd_type == "DEVICE") { - QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid; - processGetMeasureDeviceListCmd(&tcp_socket); + // 读满完整数据 + if (_requst_buffer->size() >= _requst_data_len) { + QByteArray requst_data = _requst_buffer->left(_requst_data_len); + _requst_buffer->remove(0, _requst_data_len); + QLOG_INFO() << QStringLiteral(u"接收请求数据长度%1").arg(requst_data.size()); + QThread* process_thread = QThread::create(&MeasureTask::processRequstData, this, requst_data); + connect(process_thread, &QThread::finished, process_thread, &QThread::deleteLater); + process_thread->start(); + _requst_data_len = 0; } else { - QByteArray replay_data; - QDataStream replay_data_stream(&replay_data, QIODevice::Append); - replay_data_stream << QString("UNKNOW") << false << QStringLiteral(u"未知请求"); - tcp_socket.write(replay_data); - tcp_socket.flush(); - QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data); + break; } - } else { - QByteArray replay_data; - QDataStream replay_data_stream(&replay_data, QIODevice::Append); - replay_data_stream << QString("UNKNOW") << false << QStringLiteral(u"请求信息空异常"); - tcp_socket.write(replay_data); - tcp_socket.flush(); - QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data); - } - tcp_socket.disconnectFromHost(); - if (tcp_socket.state() != QTcpSocket::UnconnectedState) { - tcp_socket.waitForDisconnected(1000); } } -void MeasureTask::processStartMeasureCmd(QTcpSocket* socket, const QString &device_guid, const QString &cmd_data) +void MeasureTask::onClientDisconnected() +{ + QLOG_INFO() << QStringLiteral(u"客户端%1断开连接").arg(_tcp_socket->peerAddress().toString()); + quit(); +} + +void MeasureTask::onSocketError(QAbstractSocket::SocketError error) +{ + Q_UNUSED(error); + QLOG_INFO() << QStringLiteral(u"套接字错误:").arg(_tcp_socket->errorString()); + quit(); +} + +void MeasureTask::processRequstData(const QByteArray &requst_data) +{ + QDataStream requst_data_stream(requst_data); + QString cmd_type, device_guid, cmd_data; + requst_data_stream >> cmd_type >> device_guid; + if (cmd_type == "START") { + requst_data_stream >> cmd_data; + QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid << cmd_data; + processStartMeasureCmd(device_guid, cmd_data); + } else if (cmd_type == "STOP") { + QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid; + processStopMeasureCmd(device_guid); + } else if (cmd_type == "SET") { + requst_data_stream >> cmd_data; + QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid << cmd_data; + processSetDeviceMeasureConfigParamsCmd(device_guid, cmd_data); + } else if (cmd_type == "CLEAR") { + QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid; + processClearDataCmd(device_guid); + } else if (cmd_type == "DEVICE") { + QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid; + processGetMeasureDeviceListCmd(); + } else { + QByteArray replay_data; + QDataStream replay_data_stream(&replay_data, QIODevice::Append); + replay_data_stream << QString("UNKNOW") << false << QStringLiteral(u"未知请求"); + this->replayClient(replay_data); + } +} + +void MeasureTask::processStartMeasureCmd(const QString& device_guid, const QString& cmd_data) { QByteArray json_data = cmd_data.toUtf8(); QJsonDocument json_doc = QJsonDocument::fromJson(json_data); @@ -133,9 +165,7 @@ void MeasureTask::processStartMeasureCmd(QTcpSocket* socket, const QString &devi QDataStream replay_data_stream(&replay_data, QIODevice::Append); if (!errors.isEmpty()) { replay_data_stream << QString("START") << false << errors.join("\n"); - socket->write(replay_data); - socket->flush(); - QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data); + this->replayClient(replay_data); } else { const QString& measure_data_gvf_filename = MeasureDeviceController::Instance()->GetMeasureGvfDataFilename(); QString replay_info; @@ -148,24 +178,20 @@ void MeasureTask::processStartMeasureCmd(QTcpSocket* socket, const QString &devi QByteArray replay_data; QDataStream replay_data_stream(&replay_data, QIODevice::Append); replay_data_stream << QString("START") << ok << replay_info; - socket->write(replay_data); - socket->flush(); - QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data); + this->replayClient(replay_data); } } -void MeasureTask::processStopMeasureCmd(QTcpSocket* socket, const QString &device_guid) +void MeasureTask::processStopMeasureCmd(const QString& device_guid) { MeasureDeviceController::Instance()->StopMeasure(device_guid); QByteArray replay_data; QDataStream replay_data_stream(&replay_data, QIODevice::Append); replay_data_stream << QString("STOP") << true << QStringLiteral(u"停止测量完成"); - socket->write(replay_data); - socket->flush(); - QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data); + this->replayClient(replay_data); } -void MeasureTask::processSetDeviceMeasureConfigParamsCmd(QTcpSocket* socket, const QString &device_guid, const QString &cmd_data) +void MeasureTask::processSetDeviceMeasureConfigParamsCmd(const QString& device_guid, const QString& cmd_data) { QByteArray json_data = cmd_data.toUtf8(); QJsonDocument json_doc = QJsonDocument::fromJson(json_data); @@ -206,23 +232,19 @@ void MeasureTask::processSetDeviceMeasureConfigParamsCmd(QTcpSocket* socket, con } else { replay_data_stream << QString("SET") << ok << QStringLiteral(u"设置测量参数完成"); } - socket->write(replay_data); - socket->flush(); - QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data); + this->replayClient(replay_data); } -void MeasureTask::processClearDataCmd(QTcpSocket* socket, const QString &device_guid) +void MeasureTask::processClearDataCmd(const QString& device_guid) { MeasureDeviceController::Instance()->ClearData(device_guid); QByteArray replay_data; QDataStream replay_data_stream(&replay_data, QIODevice::Append); replay_data_stream << QString("CLEAR") << true << QStringLiteral(u"清除数据完成"); - socket->write(replay_data); - socket->flush(); - QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data); + this->replayClient(replay_data); } -void MeasureTask::processGetMeasureDeviceListCmd(QTcpSocket* socket) +void MeasureTask::processGetMeasureDeviceListCmd() { QStringList device_list = MeasureDeviceController::Instance()->GetMeasureDeviceList(); bool ok = !device_list.isEmpty(); @@ -230,7 +252,7 @@ void MeasureTask::processGetMeasureDeviceListCmd(QTcpSocket* socket) QByteArray replay_data; QDataStream replay_data_stream(&replay_data, QIODevice::Append); replay_data_stream << QString("DEVICE"); - if ( ok ) { + if (ok) { replay_data_stream << ok << device_list.size(); foreach (const QString& device_id, device_list) { replay_data_stream << device_id; @@ -238,7 +260,17 @@ void MeasureTask::processGetMeasureDeviceListCmd(QTcpSocket* socket) } else { replay_data_stream << ok << QStringLiteral(u"测量设备未找到"); } - socket->write(replay_data); - socket->flush(); - QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data); + this->replayClient(replay_data); +} + +void MeasureTask::replayClient(const QByteArray &replay_data) +{ + if (!_tcp_socket->isOpen()) { + QLOG_WARN() << QStringLiteral(u"未连接,发送失败"); + return; + } + QByteArray replay_buf = Protocol::PackData(replay_data); + _tcp_socket->write(replay_buf); + _tcp_socket->flush(); + QLOG_DEBUG() << QStringLiteral(u"发送数据大小: %1 字节").arg(replay_buf.size()); } diff --git a/src/MeasureTask.h b/src/MeasureTask.h index fe83839..71792a1 100644 --- a/src/MeasureTask.h +++ b/src/MeasureTask.h @@ -13,18 +13,25 @@ public: void run() override; -private: - void processStartMeasureCmd(QTcpSocket* socket, const QString& device_guid, const QString& cmd_data); - void processStopMeasureCmd(QTcpSocket* socket, const QString& device_guid); - void processSetDeviceMeasureConfigParamsCmd(QTcpSocket* socket, const QString& device_guid, const QString& cmd_data); - void processClearDataCmd(QTcpSocket* socket, const QString& device_guid); - void processGetMeasureDeviceListCmd(QTcpSocket* socket); - -signals: - void errorOccurred(const QString &errorString); +private slots: + void onClientRequstData(); + void onClientDisconnected(); + void onSocketError(QAbstractSocket::SocketError error); private: - int socketDescriptor; + void processRequstData(const QByteArray& requst_data); + void processStartMeasureCmd(const QString& device_guid, const QString& cmd_data); + void processStopMeasureCmd(const QString& device_guid); + void processSetDeviceMeasureConfigParamsCmd(const QString& device_guid, const QString& cmd_data); + void processClearDataCmd( const QString& device_guid); + void processGetMeasureDeviceListCmd(); + void replayClient(const QByteArray& replay_data); + +private: + int _socket_descriptor; + QTcpSocket * _tcp_socket; + QByteArray * _requst_buffer; + qint32 _requst_data_len; }; #endif diff --git a/src/src.pro b/src/src.pro index ab3b1b4..9b7250d 100644 --- a/src/src.pro +++ b/src/src.pro @@ -28,7 +28,8 @@ SOURCES += \ HEADERS += \ $${PWD}/MeasureServer.h \ $${PWD}/MeasureTask.h \ - MeasureDeviceController.h + MeasureDeviceController.h \ + MeasureServiceProtocol.h DEFINES += ENABLE_DEBUG