logplus/Workflow/WFEngine/Module/include/Buffer.h
2026-01-16 17:18:41 +08:00

338 lines
13 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file Buffer.h
* @brief 文件主要定义了工作流Buffer、工作流进度、以及与Buffer相关的操作
* @author Dev
* @date 2011-5-25
*/
#ifndef PAI_FRAME_MODULEAPI_BUFFER_H
#define PAI_FRAME_MODULEAPI_BUFFER_H
// #include "BufferElement.h"
// #include "ModuleContext.h"
// #include "WorkflowContext.h"
#include <time.h>
#include <iostream>
#include <map>
#include <vector>
class BufferTest;
namespace pai {
namespace module {
/**
* Buffer可能会被模块用来作为输入和输出
*/
const int BufferAsInput = 0; /**< 输入Buffer标识*/
const int BufferAsOutput = 1; /**< 输出Buffer标识*/
const unsigned int MaxBufferSize = 2000; /**< 默认Buffer的大小*/
/**
* @var typedef void* ModuleId
* @brief 定义一个任意类型的指针
*/
typedef void* ModuleId;
/**
* @brief 模块进度信息
*/
struct SProcessInfo {
std::string module_runtime_id; /**< 处理的模块运行时ID */
int port; /**< 模块链接端口 */
int tracenum; /**< 处理道数 */
int iotype; /**< 输入和输出类型 */
int time; /**< 两次操作间到时间差*/
SProcessInfo():module_runtime_id(""),port(0),tracenum(0),iotype(0),time(0){}
};
/**
* @brief Buffer管理的的目标
*/
enum BUFFER_MANAGER_TARGET {
BMT_MODULE, /**< 模块使用的Buffer */
BMT_OTHER /**< 非模块使用的Buffer */
};
/**
* @var typedef std::map<const ModuleId, unsigned long> ModuleProcessIndex
* @brief 读数据模块,当前读取的计数信息
*/
typedef std::map<const ModuleId, unsigned long> ModuleProcessIndex;
/**
* @var typedef std::map<const ModuleId, SProcessInfo> ModuleProcess
* @brief 当前Buffer关联的所有读写模块
*/
typedef std::map<const ModuleId, SProcessInfo> ModuleProcess;
/**
* @var typedef void (*Callback)(const SProcessInfo info)
* @brief 定义一个回调函数指针
*/
typedef void (*Callback)(const SProcessInfo info);
class CModule;
/**
* @class CBuffer
* @brief 模块间通讯,存储模块处理的数据
*/
class PAI_MODULE_EXPORT CBuffer {
private:
CBuffer(const CBuffer& buffer);
CBuffer& operator=(const CBuffer& buffer);
public:
/**
* @brief 带有回调函数到构造方法.构造方法用来记录每个模块到处理进度信息。
* @param[in] size 缓冲区大小
* @param[in] target Buffer管理的目标类型
* @param[in] cbUpdateModuleProgress 回调函数
*/
CBuffer(int size, BUFFER_MANAGER_TARGET target, Callback cbUpdateModuleProgress);
/**
* @brief 带有回调函数到构造方法.构造方法用来记录每个模块到处理进度信息。
* @param[in] size 缓冲区大小
* @param[in] cbUpdateModuleProgress 回调函数
*/
CBuffer(int size, Callback cbUpdateModuleProgress);
/**
* @brief 构造方法.
* @param[in] size 缓冲区大小
*/
CBuffer(int size);
/**
* @brief 虚析构函数
*/
virtual ~CBuffer();
/**
* @deprecated
* @see void SetNextElement(CBufferElement* element,ModuleId moduleId,ElementType type);
* @brief 存buffer元素到buffer该元素中可能存放了道数据也可能是各种标志元素目前这些标志元素的类型请参考ElementType。
* @param[in] moduleID 模块id默认请使用当前模块指针即this
*/
// void SetNextElement(CBufferElement* element, ModuleId moduleId);
/**
* @deprecated
* @see CBufferElement* GetNextElement(ModuleId moduleID, ElementType &type);
* @brief 获取一个buffer元素即Trace该元素中可能存放了道数据也可能是各种标志元素目前这些标志元素的类型请参考ElementType。
* 使用本接口获得数据,必须对数据的类型进行判断
* @param[in] moduleID 模块id默认请使用当前模块指针即this
* @return 返回下个元素的指针
*/
// CBufferElement* GetNextElement(ModuleId moduleID);
/**
* @brief 获取一个当前(第N道)的buffer元素如果第N道是Any Marker就一直Skip到TRACE(或者Skip到DATA_END_MARKER)结束;
* 获取到TRACE后(需要判断当前位置是否是DATA_END_MARKER),预读后面数据的数据类型:
* a.如果第N+1道数据类型是TRACE,返回类型是TRACE
* b.如果第N+1道数据类型是DATA_END_MARKER,返回类型是DATA_END_MARKER并Skip这个Element
* c.如果第N+1道数据类型是GATHER_END_MARKER/LINE_END_MARKER,首先Skip这个Element
* 然后在读第N+2道数据是否TRACE如果是返回N+1道Marker的类型
* 如果N+2道数据类型依然Any MARKER一直Skip(N+2,N+3...)到TRACE为止或者到DATA_END_MARKER
* 对于中间多个MARKER这种情况返回最大MARKER(GATHER_END_MARKER < LINE_END_MARKER < DATA_END_MARKER)
* 获取buffer元素之后并用输出参数type返回道集、线、数据是否结束。
* @remark 对于目前Module实现的逻辑来说认为Get一定会有数据认为返回返回NULL是异常情况直接OVER
* 但是这样是不严谨的如果Buffer里没有输入任何数据/就只有一个DATA_END_MARKER那么Get只能返回NULL,Type=DATA_END_MARKER
* @param[in] moduleID 当前模块id
* @param[in/out] type 下面数据的数据类型
* type为TRACE正常道数据
* type为GATHER_ENDMARKER道集结束该道数据为道集的最后一道
* type为LINE_ENDMARKER线结束该道数据为线的最后一道
* type为DATA_ENDMARKER数据结束该道数据为数据的最后一道
* @return 返回下一个道数据,如果获取到数据结尾(DATA_END_MARKER)依然没有返回NULL
*/
// CBufferElement* GetNextElement(ModuleId moduleID, ElementType &type);
/**
* @brief 保存一个buffer元素即Trace保存完成buffer元素之后根据模块开发者指定的type类型保存不同的分隔符。type 当前道下一道数据类型
* type为TRACE则不保存分隔符
* type为GATHER_ENDMARKER保存道结束分隔符
* type为LINE_ENDMARKER保存线结束分隔符
* type为DATA_ENDMARKER保存数据流结束分割符
*
* @param[in] element 要保存的元素
* @param[in] moduleID 当前模块id
* @param[in] type 当前道下一道数据类型 TRACE-->正常道数据 GATHER_ENDMARKER-->道结束 LINE_ENDMARKER-->线结束 DATA_ENDMARKER-->数据流结束
*/
// void SetNextElement(CBufferElement* element,ModuleId moduleId,ElementType type);
////////////////////////////////////////////////////////////////////////
//FIXME:GetNextElements/SetNextElements 目前没有人调用,而且这个两个方法目前
//不太严谨当一个Gather大于BufferSize的时候就会出现deadlock,建议不要继续使用
////////////////////////////////////////////////////////////////////////
/**
* @deprecated
* @brief 存入一个道集
* 1. 从目前的processIndex到m_size之间有gatherSplitMarker存在
* @param[in] gather 道集
* @param[in] moduleId 模块号默认请使用当前模块指针即this
* @exception pai::error::invalid_argument 道集参数为空,道集长度大于缓冲数组长度时抛出异常
*/
// void SetNextElements(std::vector<CBufferElement*> gather, ModuleId moduleId );
/**
* @deprecated
* @brief 读取下一个道集
* @param[in] moduleID 模块号默认请使用当前模块指针即this
* @return 下一个道集
*/
// std::vector<CBufferElement*> GetNextElements(ModuleId moduleID);
/**
* @brief 获得buffer的长度
* @return buffer的长度
*/
int Size() const {
return static_cast<int>(m_size);
}
/**
* @brief 判断Buffer是否为空
* @return 若buffer中没有元素则返回true否则返回false
*/
bool IsEmpty() const {
return m_size == 0;
}
/**
* @brief 仅供workflow调用模块开发人员请不要调用用于添加后面的使用模块用来关联模块之间的关系
* @param[in] moduleID 待关联模块,一般为后续模块的指针
*/
// void AddReferenceModule(ModuleId module);
/***
* @brief 仅供workflow调用模块开发人员请不要调用添加进度处理上下文信息
* @param[in] moduleID 待关联模块,一般为后续模块的指针
* @param[in] info 进度信息
*/
void AddProcessContext(ModuleId moudle, const SProcessInfo& info) {
this->m_moduleProcess[moudle] = info;
}
/**
* @brief 设置Buffer的元数据信息
* @param[in] key 元数据关键字
* @param[in] anything 元数据内容
*/
// void SetMetaData(const std::string& key, Any anything);
/**
* @brief 获取Buffer元数据信息
* @param[in] key 元数据关键字
* @return 获取到的元数据信息
*/
template<typename T>
T GetMetaData(const std::string& key)
{
return this->m_context.Get<T>(key);
};
/**
* @brief 是否有对应的元数据信息
* @param[in] key 元信息的key
* @return 若BufferContext中含有指定key的元信息则返回true否则返回false
*/
bool HasMetaData(const std::string& key);
private:
/**
* @brief 预读下一个Element
*/
// CBufferElement* PreReadyNextElement(ModuleId moduleID);
/**
* @brief skip连续的Marker(上一个element是marker)
*/
// void SkipContinuousMarkerElement(ModuleId moduleID, ElementType &type);
/**
* @brief Skip&free 当前的这个Element.
*/
// void SkipCurrentElement(CBufferElement* element,ModuleId moduleID);
/**
* @brief 预读下一个Element的数据类型
*/
// void PreReadyNextElementType(ModuleId moduleID, ElementType &type);
/**
* @brief 获取下一个Trace如果获取到DATA_END_MARKER都没有获取到Trace返回NULL
*/
// CBufferElement* GetTraceNextElement(ModuleId moduleID);
// CopyType CalculateCopyType(const std::string& caller, ModuleId);
/**
* Readable & Writable become inline as they both are invoked frequently in
* limited place. although they are not small method, they will not increase
* too much size in final executable.
*/
/**
* Module is allow to read from the buffer, the conditions must satisfy are:
* 1. module.current - inputIndex < 0
*/
bool Readable(const std::string& caller, ModuleId moduleId);
/**
* InputMoudle is allow to write to the buffer, the conditions must satisfy are:
* 1. inputIndex - MAX(processIndex) >=0
* 2. inputIndex - MIN(processIndex) < size //buffer size
*/
bool Writable(const std::string& caller);
/***
* @brief 在buffer不能满足读取或者写入条件的时候阻塞当前线程
*/
inline void Block();
/***
* @brief 缓存初始化函数,用来初始化部分变量
*/
void Init();
/**
* @brief 判断下一个道集是否就绪 在下列情况下,道集就绪
* 从目前的processIndex到m_size之间有gatherSplitMarker存在
* @param[in] moduleID 模块号,默认请使用当前模块指针即this
* @param[out] nextGatherEndIndex 下一个道集的最后一道的索引
* @return bool 下一个道集是否就绪
*/
bool NextElementsIsReady(ModuleId moduleID,unsigned long & nextGatherEndIndex);
/**
* @brief 根据模块参数获得相应的分量道数据
* @param[in] moduleID 模块指针
* @param[out] element buffer指针
*/
// void GetMultiTraceToElement(ModuleId moduleID,CBufferElement* element);
/**
* @brief 根据模块参数设置相应的分量
* @param[in] moduleID 模块指针
* @param[out] element buffer指针
*/
// void SetMultiTraceToElement(ModuleId moduleID,CBufferElement* element);
/**
* @brief 获取多波中被选中的分量
* @param[in] moduleID 模块指针
* @param[in] componentMap 所有分量ID及分量对应的数据类型值
* @param[out] dataType 返回被选中的分量
* @return bool 如果存在被选中的分量则返回true否则返回false
*/
bool GetSelectedMultiwaveComponent(ModuleId moduleID ,std::map< std::string ,int > & componentMap , int & dataType );
private:
//TODO remove this after finding a better solution
friend class ::BufferTest;
// CBufferElement** m_elements;
unsigned m_size;
volatile unsigned m_elementCount;
/**
* 用于做进度统计的回调函数
*/
Callback m_cbUpdateModuleProgress;
/**
* the processIndex must be initialized for each module using it when the Buffer is created
*/
ModuleProcessIndex m_processIndex;
/**
* 模块进度信息
*/
ModuleProcess m_moduleProcess;
/**
* elementCount is the number of elements has been written to the buffer by previous modules
*/
time_t m_prevGetTime;
time_t m_prevSetTime;
BUFFER_MANAGER_TARGET m_target;
// BufferContext m_context;
};
}
}
#endif