修改通信为长连接
This commit is contained in:
parent
871de1e005
commit
ef452e95e3
34
src/MeasureServiceProtocol.h
Normal file
34
src/MeasureServiceProtocol.h
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
#ifndef MEASURESERVICEPROTOCOL_H
|
||||
#define MEASURESERVICEPROTOCOL_H
|
||||
|
||||
#include <QByteArray>
|
||||
|
||||
namespace Protocol {
|
||||
// 固定协议头长度:8字节
|
||||
const int HEAD_SIZE = 8;
|
||||
|
||||
// 打包数据:添加长度头
|
||||
inline QByteArray PackData(const QByteArray &data) {
|
||||
QByteArray buffer;
|
||||
buffer.resize(HEAD_SIZE);
|
||||
// 前4字节 = 数据总长度
|
||||
qint32 data_len = data.size();
|
||||
memcpy(buffer.data(), &data_len, 4);
|
||||
// 后4字节保留
|
||||
qint32 reserve = 0xFFFFFFFF;
|
||||
memcpy(buffer.data() + 4, &reserve, 4);
|
||||
buffer.append(data);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
// 解包:从头部获取数据长度
|
||||
inline qint32 UnpackDataLen(const QByteArray &head) {
|
||||
if (head.size() < HEAD_SIZE)
|
||||
return 0;
|
||||
qint32 len = 0;
|
||||
memcpy(&len, head.constData(), 4);
|
||||
return len;
|
||||
}
|
||||
}
|
||||
|
||||
#endif // MEASURESERVICEPROTOCOL_H
|
||||
|
|
@ -1,88 +1,120 @@
|
|||
#include "MeasureTask.h"
|
||||
#include <QDataStream>
|
||||
#include "MeasureDeviceController.h"
|
||||
#include "MeasureServiceProtocol.h"
|
||||
#include "QsLogManage.h"
|
||||
#include <QDataStream>
|
||||
#include <QFileInfo>
|
||||
#include <QJsonDocument>
|
||||
#include <QJsonObject>
|
||||
#include <QSqlDatabase>
|
||||
#include <QFileInfo>
|
||||
#include "QsLogManage.h"
|
||||
#include <QString>
|
||||
#include <QThread>
|
||||
#include <QHostAddress>
|
||||
#include <QDebug>
|
||||
|
||||
MeasureTask::MeasureTask(int socketDescriptor, QObject *parent)
|
||||
: QThread(parent), socketDescriptor(socketDescriptor)
|
||||
MeasureTask::MeasureTask(int socketDescriptor, QObject* parent)
|
||||
: QThread(parent)
|
||||
, _socket_descriptor(socketDescriptor)
|
||||
, _tcp_socket(nullptr)
|
||||
, _requst_buffer(new QByteArray)
|
||||
, _requst_data_len(0)
|
||||
{
|
||||
}
|
||||
|
||||
void MeasureTask::run()
|
||||
{
|
||||
QTcpSocket tcp_socket;
|
||||
if (!tcp_socket.setSocketDescriptor(socketDescriptor)) {
|
||||
emit errorOccurred(tcp_socket.errorString());
|
||||
QLOG_DEBUG() << tcp_socket.errorString();
|
||||
// 在子线程中创建套接字(关键:必须在当前线程创建)
|
||||
_tcp_socket = new QTcpSocket;
|
||||
// 设置客户端套接字描述符(绑定连接)
|
||||
if (!_tcp_socket->setSocketDescriptor(_socket_descriptor)) {
|
||||
QLOG_ERROR() << QStringLiteral(u"套接字初始化失败:") << _tcp_socket->errorString();
|
||||
return;
|
||||
}
|
||||
tcp_socket.setReadBufferSize(1024 * 1024);
|
||||
QByteArray buffer;
|
||||
if ( tcp_socket.waitForReadyRead(500) ) {
|
||||
while (tcp_socket.bytesAvailable() < 2 ) {
|
||||
if ( tcp_socket.waitForReadyRead(100) ) {
|
||||
QByteArray len_buf = tcp_socket.read(2);
|
||||
quint16 total_len = (uchar)len_buf[0] | ((uchar)len_buf[1] << 8);
|
||||
while (buffer.size() < total_len) {
|
||||
if (tcp_socket.waitForReadyRead(500)) {
|
||||
buffer.append(tcp_socket.readAll());
|
||||
}
|
||||
}
|
||||
}
|
||||
QLOG_INFO() << QStringLiteral(u"新客户端接入:") << _tcp_socket->peerAddress().toString()
|
||||
<< QStringLiteral(u"端口:") << _tcp_socket->peerPort()
|
||||
<< QStringLiteral(u"线程ID:") << QThread::currentThreadId();
|
||||
// 绑定信号槽(长连接核心:持续监听数据/断开)
|
||||
connect(_tcp_socket, &QTcpSocket::readyRead, this, &MeasureTask::onClientRequstData);
|
||||
connect(_tcp_socket, &QTcpSocket::disconnected, this, &MeasureTask::onClientDisconnected);
|
||||
connect(_tcp_socket, QOverload<QAbstractSocket::SocketError>::of(&QTcpSocket::error), this, &MeasureTask::onSocketError);
|
||||
// 开启线程事件循环(保持长连接,不退出线程)
|
||||
exec();
|
||||
// 线程退出后清理资源
|
||||
_tcp_socket->close();
|
||||
_tcp_socket->deleteLater();
|
||||
}
|
||||
|
||||
void MeasureTask::onClientRequstData()
|
||||
{
|
||||
_requst_buffer->append(_tcp_socket->readAll());
|
||||
while (true) {
|
||||
// 读头
|
||||
if (_requst_data_len == 0) {
|
||||
if (_requst_buffer->size() < Protocol::HEAD_SIZE)
|
||||
break;
|
||||
QByteArray head = _requst_buffer->left(Protocol::HEAD_SIZE);
|
||||
_requst_data_len = Protocol::UnpackDataLen(head);
|
||||
_requst_buffer->remove(0, Protocol::HEAD_SIZE);
|
||||
}
|
||||
}
|
||||
if ( buffer.size() ) {
|
||||
QLOG_INFO() << QStringLiteral(u"请求数据长度:") << buffer.size();
|
||||
QDataStream requst_data_stream(buffer);
|
||||
QString cmd_type, device_guid, cmd_data;
|
||||
requst_data_stream >> cmd_type >> device_guid;
|
||||
if ( cmd_type == "START" ) {
|
||||
requst_data_stream >> cmd_data;
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid << cmd_data;
|
||||
processStartMeasureCmd(&tcp_socket, device_guid, cmd_data);
|
||||
} else if (cmd_type == "STOP") {
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid;
|
||||
processStopMeasureCmd(&tcp_socket, device_guid);
|
||||
} else if (cmd_type == "SET") {
|
||||
requst_data_stream >> cmd_data;
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid << cmd_data;
|
||||
processSetDeviceMeasureConfigParamsCmd(&tcp_socket, device_guid, cmd_data);
|
||||
} else if (cmd_type == "CLEAR") {
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid;
|
||||
processClearDataCmd(&tcp_socket, device_guid);
|
||||
} else if (cmd_type == "DEVICE") {
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid;
|
||||
processGetMeasureDeviceListCmd(&tcp_socket);
|
||||
// 读满完整数据
|
||||
if (_requst_buffer->size() >= _requst_data_len) {
|
||||
QByteArray requst_data = _requst_buffer->left(_requst_data_len);
|
||||
_requst_buffer->remove(0, _requst_data_len);
|
||||
QLOG_INFO() << QStringLiteral(u"接收请求数据长度%1").arg(requst_data.size());
|
||||
QThread* process_thread = QThread::create(&MeasureTask::processRequstData, this, requst_data);
|
||||
connect(process_thread, &QThread::finished, process_thread, &QThread::deleteLater);
|
||||
process_thread->start();
|
||||
_requst_data_len = 0;
|
||||
} else {
|
||||
QByteArray replay_data;
|
||||
QDataStream replay_data_stream(&replay_data, QIODevice::Append);
|
||||
replay_data_stream << QString("UNKNOW") << false << QStringLiteral(u"未知请求");
|
||||
tcp_socket.write(replay_data);
|
||||
tcp_socket.flush();
|
||||
QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
QByteArray replay_data;
|
||||
QDataStream replay_data_stream(&replay_data, QIODevice::Append);
|
||||
replay_data_stream << QString("UNKNOW") << false << QStringLiteral(u"请求信息空异常");
|
||||
tcp_socket.write(replay_data);
|
||||
tcp_socket.flush();
|
||||
QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data);
|
||||
}
|
||||
tcp_socket.disconnectFromHost();
|
||||
if (tcp_socket.state() != QTcpSocket::UnconnectedState) {
|
||||
tcp_socket.waitForDisconnected(1000);
|
||||
}
|
||||
}
|
||||
|
||||
void MeasureTask::processStartMeasureCmd(QTcpSocket* socket, const QString &device_guid, const QString &cmd_data)
|
||||
void MeasureTask::onClientDisconnected()
|
||||
{
|
||||
QLOG_INFO() << QStringLiteral(u"客户端%1断开连接").arg(_tcp_socket->peerAddress().toString());
|
||||
quit();
|
||||
}
|
||||
|
||||
void MeasureTask::onSocketError(QAbstractSocket::SocketError error)
|
||||
{
|
||||
Q_UNUSED(error);
|
||||
QLOG_INFO() << QStringLiteral(u"套接字错误:").arg(_tcp_socket->errorString());
|
||||
quit();
|
||||
}
|
||||
|
||||
void MeasureTask::processRequstData(const QByteArray &requst_data)
|
||||
{
|
||||
QDataStream requst_data_stream(requst_data);
|
||||
QString cmd_type, device_guid, cmd_data;
|
||||
requst_data_stream >> cmd_type >> device_guid;
|
||||
if (cmd_type == "START") {
|
||||
requst_data_stream >> cmd_data;
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid << cmd_data;
|
||||
processStartMeasureCmd(device_guid, cmd_data);
|
||||
} else if (cmd_type == "STOP") {
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid;
|
||||
processStopMeasureCmd(device_guid);
|
||||
} else if (cmd_type == "SET") {
|
||||
requst_data_stream >> cmd_data;
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid << cmd_data;
|
||||
processSetDeviceMeasureConfigParamsCmd(device_guid, cmd_data);
|
||||
} else if (cmd_type == "CLEAR") {
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid;
|
||||
processClearDataCmd(device_guid);
|
||||
} else if (cmd_type == "DEVICE") {
|
||||
QLOG_INFO() << QStringLiteral(u"处理请求:") << cmd_type << device_guid;
|
||||
processGetMeasureDeviceListCmd();
|
||||
} else {
|
||||
QByteArray replay_data;
|
||||
QDataStream replay_data_stream(&replay_data, QIODevice::Append);
|
||||
replay_data_stream << QString("UNKNOW") << false << QStringLiteral(u"未知请求");
|
||||
this->replayClient(replay_data);
|
||||
}
|
||||
}
|
||||
|
||||
void MeasureTask::processStartMeasureCmd(const QString& device_guid, const QString& cmd_data)
|
||||
{
|
||||
QByteArray json_data = cmd_data.toUtf8();
|
||||
QJsonDocument json_doc = QJsonDocument::fromJson(json_data);
|
||||
|
|
@ -133,9 +165,7 @@ void MeasureTask::processStartMeasureCmd(QTcpSocket* socket, const QString &devi
|
|||
QDataStream replay_data_stream(&replay_data, QIODevice::Append);
|
||||
if (!errors.isEmpty()) {
|
||||
replay_data_stream << QString("START") << false << errors.join("\n");
|
||||
socket->write(replay_data);
|
||||
socket->flush();
|
||||
QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data);
|
||||
this->replayClient(replay_data);
|
||||
} else {
|
||||
const QString& measure_data_gvf_filename = MeasureDeviceController::Instance()->GetMeasureGvfDataFilename();
|
||||
QString replay_info;
|
||||
|
|
@ -148,24 +178,20 @@ void MeasureTask::processStartMeasureCmd(QTcpSocket* socket, const QString &devi
|
|||
QByteArray replay_data;
|
||||
QDataStream replay_data_stream(&replay_data, QIODevice::Append);
|
||||
replay_data_stream << QString("START") << ok << replay_info;
|
||||
socket->write(replay_data);
|
||||
socket->flush();
|
||||
QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data);
|
||||
this->replayClient(replay_data);
|
||||
}
|
||||
}
|
||||
|
||||
void MeasureTask::processStopMeasureCmd(QTcpSocket* socket, const QString &device_guid)
|
||||
void MeasureTask::processStopMeasureCmd(const QString& device_guid)
|
||||
{
|
||||
MeasureDeviceController::Instance()->StopMeasure(device_guid);
|
||||
QByteArray replay_data;
|
||||
QDataStream replay_data_stream(&replay_data, QIODevice::Append);
|
||||
replay_data_stream << QString("STOP") << true << QStringLiteral(u"停止测量完成");
|
||||
socket->write(replay_data);
|
||||
socket->flush();
|
||||
QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data);
|
||||
this->replayClient(replay_data);
|
||||
}
|
||||
|
||||
void MeasureTask::processSetDeviceMeasureConfigParamsCmd(QTcpSocket* socket, const QString &device_guid, const QString &cmd_data)
|
||||
void MeasureTask::processSetDeviceMeasureConfigParamsCmd(const QString& device_guid, const QString& cmd_data)
|
||||
{
|
||||
QByteArray json_data = cmd_data.toUtf8();
|
||||
QJsonDocument json_doc = QJsonDocument::fromJson(json_data);
|
||||
|
|
@ -206,23 +232,19 @@ void MeasureTask::processSetDeviceMeasureConfigParamsCmd(QTcpSocket* socket, con
|
|||
} else {
|
||||
replay_data_stream << QString("SET") << ok << QStringLiteral(u"设置测量参数完成");
|
||||
}
|
||||
socket->write(replay_data);
|
||||
socket->flush();
|
||||
QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data);
|
||||
this->replayClient(replay_data);
|
||||
}
|
||||
|
||||
void MeasureTask::processClearDataCmd(QTcpSocket* socket, const QString &device_guid)
|
||||
void MeasureTask::processClearDataCmd(const QString& device_guid)
|
||||
{
|
||||
MeasureDeviceController::Instance()->ClearData(device_guid);
|
||||
QByteArray replay_data;
|
||||
QDataStream replay_data_stream(&replay_data, QIODevice::Append);
|
||||
replay_data_stream << QString("CLEAR") << true << QStringLiteral(u"清除数据完成");
|
||||
socket->write(replay_data);
|
||||
socket->flush();
|
||||
QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data);
|
||||
this->replayClient(replay_data);
|
||||
}
|
||||
|
||||
void MeasureTask::processGetMeasureDeviceListCmd(QTcpSocket* socket)
|
||||
void MeasureTask::processGetMeasureDeviceListCmd()
|
||||
{
|
||||
QStringList device_list = MeasureDeviceController::Instance()->GetMeasureDeviceList();
|
||||
bool ok = !device_list.isEmpty();
|
||||
|
|
@ -230,7 +252,7 @@ void MeasureTask::processGetMeasureDeviceListCmd(QTcpSocket* socket)
|
|||
QByteArray replay_data;
|
||||
QDataStream replay_data_stream(&replay_data, QIODevice::Append);
|
||||
replay_data_stream << QString("DEVICE");
|
||||
if ( ok ) {
|
||||
if (ok) {
|
||||
replay_data_stream << ok << device_list.size();
|
||||
foreach (const QString& device_id, device_list) {
|
||||
replay_data_stream << device_id;
|
||||
|
|
@ -238,7 +260,17 @@ void MeasureTask::processGetMeasureDeviceListCmd(QTcpSocket* socket)
|
|||
} else {
|
||||
replay_data_stream << ok << QStringLiteral(u"测量设备未找到");
|
||||
}
|
||||
socket->write(replay_data);
|
||||
socket->flush();
|
||||
QLOG_INFO() << QStringLiteral(u"请求反馈:") << QString::fromUtf8(replay_data);
|
||||
this->replayClient(replay_data);
|
||||
}
|
||||
|
||||
void MeasureTask::replayClient(const QByteArray &replay_data)
|
||||
{
|
||||
if (!_tcp_socket->isOpen()) {
|
||||
QLOG_WARN() << QStringLiteral(u"未连接,发送失败");
|
||||
return;
|
||||
}
|
||||
QByteArray replay_buf = Protocol::PackData(replay_data);
|
||||
_tcp_socket->write(replay_buf);
|
||||
_tcp_socket->flush();
|
||||
QLOG_DEBUG() << QStringLiteral(u"发送数据大小: %1 字节").arg(replay_buf.size());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,18 +13,25 @@ public:
|
|||
|
||||
void run() override;
|
||||
|
||||
private:
|
||||
void processStartMeasureCmd(QTcpSocket* socket, const QString& device_guid, const QString& cmd_data);
|
||||
void processStopMeasureCmd(QTcpSocket* socket, const QString& device_guid);
|
||||
void processSetDeviceMeasureConfigParamsCmd(QTcpSocket* socket, const QString& device_guid, const QString& cmd_data);
|
||||
void processClearDataCmd(QTcpSocket* socket, const QString& device_guid);
|
||||
void processGetMeasureDeviceListCmd(QTcpSocket* socket);
|
||||
|
||||
signals:
|
||||
void errorOccurred(const QString &errorString);
|
||||
private slots:
|
||||
void onClientRequstData();
|
||||
void onClientDisconnected();
|
||||
void onSocketError(QAbstractSocket::SocketError error);
|
||||
|
||||
private:
|
||||
int socketDescriptor;
|
||||
void processRequstData(const QByteArray& requst_data);
|
||||
void processStartMeasureCmd(const QString& device_guid, const QString& cmd_data);
|
||||
void processStopMeasureCmd(const QString& device_guid);
|
||||
void processSetDeviceMeasureConfigParamsCmd(const QString& device_guid, const QString& cmd_data);
|
||||
void processClearDataCmd( const QString& device_guid);
|
||||
void processGetMeasureDeviceListCmd();
|
||||
void replayClient(const QByteArray& replay_data);
|
||||
|
||||
private:
|
||||
int _socket_descriptor;
|
||||
QTcpSocket * _tcp_socket;
|
||||
QByteArray * _requst_buffer;
|
||||
qint32 _requst_data_len;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -28,7 +28,8 @@ SOURCES += \
|
|||
HEADERS += \
|
||||
$${PWD}/MeasureServer.h \
|
||||
$${PWD}/MeasureTask.h \
|
||||
MeasureDeviceController.h
|
||||
MeasureDeviceController.h \
|
||||
MeasureServiceProtocol.h
|
||||
|
||||
|
||||
DEFINES += ENABLE_DEBUG
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user