/** * @file Buffer.h * @brief 文件主要定义了工作流Buffer、工作流进度、以及与Buffer相关的操作 * @author Dev * @date 2011-5-25 */ #ifndef PAI_FRAME_MODULEAPI_BUFFER_H #define PAI_FRAME_MODULEAPI_BUFFER_H // #include "BufferElement.h" // #include "ModuleContext.h" // #include "WorkflowContext.h" #include #include #include #include class BufferTest; namespace pai { namespace module { /** * Buffer可能会被模块用来作为输入和输出 */ const int BufferAsInput = 0; /**< 输入Buffer标识*/ const int BufferAsOutput = 1; /**< 输出Buffer标识*/ const unsigned int MaxBufferSize = 2000; /**< 默认Buffer的大小*/ /** * @var typedef void* ModuleId * @brief 定义一个任意类型的指针 */ typedef void* ModuleId; /** * @brief 模块进度信息 */ struct SProcessInfo { std::string module_runtime_id; /**< 处理的模块运行时ID */ int port; /**< 模块链接端口 */ int tracenum; /**< 处理道数 */ int iotype; /**< 输入和输出类型 */ int time; /**< 两次操作间到时间差*/ SProcessInfo():module_runtime_id(""),port(0),tracenum(0),iotype(0),time(0){} }; /** * @brief Buffer管理的的目标 */ enum BUFFER_MANAGER_TARGET { BMT_MODULE, /**< 模块使用的Buffer */ BMT_OTHER /**< 非模块使用的Buffer */ }; /** * @var typedef std::map ModuleProcessIndex * @brief 读数据模块,当前读取的计数信息 */ typedef std::map ModuleProcessIndex; /** * @var typedef std::map ModuleProcess * @brief 当前Buffer关联的所有读写模块 */ typedef std::map ModuleProcess; /** * @var typedef void (*Callback)(const SProcessInfo info) * @brief 定义一个回调函数指针 */ typedef void (*Callback)(const SProcessInfo info); class CModule; /** * @class CBuffer * @brief 模块间通讯,存储模块处理的数据 */ class PAI_MODULE_EXPORT CBuffer { private: CBuffer(const CBuffer& buffer); CBuffer& operator=(const CBuffer& buffer); public: /** * @brief 带有回调函数到构造方法.构造方法用来记录每个模块到处理进度信息。 * @param[in] size 缓冲区大小 * @param[in] target Buffer管理的目标类型 * @param[in] cbUpdateModuleProgress 回调函数 */ CBuffer(int size, BUFFER_MANAGER_TARGET target, Callback cbUpdateModuleProgress); /** * @brief 带有回调函数到构造方法.构造方法用来记录每个模块到处理进度信息。 * @param[in] size 缓冲区大小 * @param[in] cbUpdateModuleProgress 回调函数 */ CBuffer(int size, Callback cbUpdateModuleProgress); /** * @brief 构造方法. * @param[in] size 缓冲区大小 */ CBuffer(int size); /** * @brief 虚析构函数 */ virtual ~CBuffer(); /** * @deprecated * @see void SetNextElement(CBufferElement* element,ModuleId moduleId,ElementType type); * @brief 存buffer元素到buffer,该元素中可能存放了道数据,也可能是各种标志元素,目前这些标志元素的类型请参考ElementType。 * @param[in] moduleID 模块id,,默认请使用当前模块指针,即this */ // void SetNextElement(CBufferElement* element, ModuleId moduleId); /** * @deprecated * @see CBufferElement* GetNextElement(ModuleId moduleID, ElementType &type); * @brief 获取一个buffer元素(即Trace),该元素中可能存放了道数据,也可能是各种标志元素,目前这些标志元素的类型请参考ElementType。 * 使用本接口获得数据,必须对数据的类型进行判断 * @param[in] moduleID 模块id,,默认请使用当前模块指针,即this * @return 返回下个元素的指针 */ // CBufferElement* GetNextElement(ModuleId moduleID); /** * @brief 获取一个当前(第N道)的buffer元素,如果第N道是Any Marker就一直Skip到TRACE(或者Skip到DATA_END_MARKER)结束; * 获取到TRACE后(需要判断当前位置是否是DATA_END_MARKER),预读后面数据的数据类型: * a.如果第N+1道数据类型是TRACE,返回类型是TRACE * b.如果第N+1道数据类型是DATA_END_MARKER,返回类型是DATA_END_MARKER,并Skip这个Element * c.如果第N+1道数据类型是GATHER_END_MARKER/LINE_END_MARKER,首先Skip这个Element; * 然后在读第N+2道数据是否TRACE,如果是返回N+1道Marker的类型; * 如果N+2道数据类型依然Any MARKER,一直Skip(N+2,N+3...)到TRACE为止或者到DATA_END_MARKER; * 对于中间多个MARKER这种情况,返回最大MARKER(GATHER_END_MARKER < LINE_END_MARKER < DATA_END_MARKER) * 获取buffer元素之后,并用输出参数type返回道集、线、数据是否结束。 * @remark 对于目前Module实现的逻辑来说,认为Get一定会有数据,认为返回返回NULL是异常情况,直接OVER; * 但是这样是不严谨的,如果Buffer里没有输入任何数据/就只有一个DATA_END_MARKER,那么Get只能返回NULL,Type=DATA_END_MARKER * @param[in] moduleID 当前模块id * @param[in/out] type 下面数据的数据类型 * type为TRACE,正常道数据; * type为GATHER_ENDMARKER,道集结束,该道数据为道集的最后一道; * type为LINE_ENDMARKER,线结束,该道数据为线的最后一道; * type为DATA_ENDMARKER,数据结束,该道数据为数据的最后一道 * @return 返回下一个道数据,如果获取到数据结尾(DATA_END_MARKER)依然没有,返回NULL */ // CBufferElement* GetNextElement(ModuleId moduleID, ElementType &type); /** * @brief 保存一个buffer元素(即Trace),保存完成buffer元素之后,根据模块开发者指定的type类型保存不同的分隔符。type 当前道下一道数据类型 * type为TRACE,则不保存分隔符; * type为GATHER_ENDMARKER,保存道结束分隔符; * type为LINE_ENDMARKER,保存线结束分隔符; * type为DATA_ENDMARKER,保存数据流结束分割符 * * @param[in] element 要保存的元素 * @param[in] moduleID 当前模块id * @param[in] type 当前道下一道数据类型 TRACE-->正常道数据 GATHER_ENDMARKER-->道结束 LINE_ENDMARKER-->线结束 DATA_ENDMARKER-->数据流结束 */ // void SetNextElement(CBufferElement* element,ModuleId moduleId,ElementType type); //////////////////////////////////////////////////////////////////////// //FIXME:GetNextElements/SetNextElements 目前没有人调用,而且这个两个方法目前 //不太严谨,当一个Gather大于BufferSize的时候,就会出现deadlock,建议不要继续使用 //////////////////////////////////////////////////////////////////////// /** * @deprecated * @brief 存入一个道集 * 1. 从目前的processIndex到m_size之间,有gatherSplitMarker存在 * @param[in] gather 道集 * @param[in] moduleId 模块号,默认请使用当前模块指针,即this * @exception pai::error::invalid_argument 道集参数为空,道集长度大于缓冲数组长度时抛出异常 */ // void SetNextElements(std::vector gather, ModuleId moduleId ); /** * @deprecated * @brief 读取下一个道集 * @param[in] moduleID 模块号,默认请使用当前模块指针,即this * @return 下一个道集 */ // std::vector GetNextElements(ModuleId moduleID); /** * @brief 获得buffer的长度 * @return buffer的长度 */ int Size() const { return static_cast(m_size); } /** * @brief 判断Buffer是否为空 * @return 若buffer中没有元素,则返回true;否则返回false */ bool IsEmpty() const { return m_size == 0; } /** * @brief 仅供workflow调用,模块开发人员请不要调用,用于添加后面的使用模块,用来关联模块之间的关系 * @param[in] moduleID 待关联模块,一般为后续模块的指针 */ // void AddReferenceModule(ModuleId module); /*** * @brief 仅供workflow调用,模块开发人员请不要调用,添加进度处理上下文信息 * @param[in] moduleID 待关联模块,一般为后续模块的指针 * @param[in] info 进度信息 */ void AddProcessContext(ModuleId moudle, const SProcessInfo& info) { this->m_moduleProcess[moudle] = info; } /** * @brief 设置Buffer的元数据信息 * @param[in] key 元数据关键字 * @param[in] anything 元数据内容 */ // void SetMetaData(const std::string& key, Any anything); /** * @brief 获取Buffer元数据信息 * @param[in] key 元数据关键字 * @return 获取到的元数据信息 */ template T GetMetaData(const std::string& key) { return this->m_context.Get(key); }; /** * @brief 是否有对应的元数据信息 * @param[in] key 元信息的key * @return 若BufferContext中含有指定key的元信息,则返回true;否则返回false */ bool HasMetaData(const std::string& key); private: /** * @brief 预读下一个Element */ // CBufferElement* PreReadyNextElement(ModuleId moduleID); /** * @brief skip连续的Marker(上一个element是marker) */ // void SkipContinuousMarkerElement(ModuleId moduleID, ElementType &type); /** * @brief Skip&free 当前的这个Element. */ // void SkipCurrentElement(CBufferElement* element,ModuleId moduleID); /** * @brief 预读下一个Element的数据类型 */ // void PreReadyNextElementType(ModuleId moduleID, ElementType &type); /** * @brief 获取下一个Trace,如果获取到DATA_END_MARKER都没有获取到Trace,返回NULL */ // CBufferElement* GetTraceNextElement(ModuleId moduleID); // CopyType CalculateCopyType(const std::string& caller, ModuleId); /** * Readable & Writable become inline as they both are invoked frequently in * limited place. although they are not small method, they will not increase * too much size in final executable. */ /** * Module is allow to read from the buffer, the conditions must satisfy are: * 1. module.current - inputIndex < 0 */ bool Readable(const std::string& caller, ModuleId moduleId); /** * InputMoudle is allow to write to the buffer, the conditions must satisfy are: * 1. inputIndex - MAX(processIndex) >=0 * 2. inputIndex - MIN(processIndex) < size //buffer size */ bool Writable(const std::string& caller); /*** * @brief 在buffer不能满足读取或者写入条件的时候,阻塞当前线程 */ inline void Block(); /*** * @brief 缓存初始化函数,用来初始化部分变量 */ void Init(); /** * @brief 判断下一个道集是否就绪 在下列情况下,道集就绪 * 从目前的processIndex到m_size之间,有gatherSplitMarker存在 * @param[in] moduleID 模块号,,默认请使用当前模块指针,即this * @param[out] nextGatherEndIndex 下一个道集的最后一道的索引 * @return bool 下一个道集是否就绪 */ bool NextElementsIsReady(ModuleId moduleID,unsigned long & nextGatherEndIndex); /** * @brief 根据模块参数获得相应的分量道数据 * @param[in] moduleID 模块指针 * @param[out] element buffer指针 */ // void GetMultiTraceToElement(ModuleId moduleID,CBufferElement* element); /** * @brief 根据模块参数设置相应的分量 * @param[in] moduleID 模块指针 * @param[out] element buffer指针 */ // void SetMultiTraceToElement(ModuleId moduleID,CBufferElement* element); /** * @brief 获取多波中被选中的分量 * @param[in] moduleID 模块指针 * @param[in] componentMap 所有分量ID及分量对应的数据类型值 * @param[out] dataType 返回被选中的分量 * @return bool 如果存在被选中的分量则返回true,否则返回false */ bool GetSelectedMultiwaveComponent(ModuleId moduleID ,std::map< std::string ,int > & componentMap , int & dataType ); private: //TODO remove this after finding a better solution friend class ::BufferTest; // CBufferElement** m_elements; unsigned m_size; volatile unsigned m_elementCount; /** * 用于做进度统计的回调函数 */ Callback m_cbUpdateModuleProgress; /** * the processIndex must be initialized for each module using it when the Buffer is created */ ModuleProcessIndex m_processIndex; /** * 模块进度信息 */ ModuleProcess m_moduleProcess; /** * elementCount is the number of elements has been written to the buffer by previous modules */ time_t m_prevGetTime; time_t m_prevSetTime; BUFFER_MANAGER_TARGET m_target; // BufferContext m_context; }; } } #endif