/* * WorkFlowDecompose.h * * Created on: Feb 27, 2015 * Author: dev */ #ifndef PAI_FRAME_WORKFLOWENGINE_WORKFLOWDECOMPOSE_H #define PAI_FRAME_WORKFLOWENGINE_WORKFLOWDECOMPOSE_H #include "Utils.h" #include "WorkFlowFileWrapper.h" #include "WorkFlowFile.h" #include "PISortWorkFlowFile.h" #include "StatApplyWorkFlowFile.h" #include "OffsetWorkFlowFile.h" #include "TempDataFormat.h" #include #include #include #include class ConcatenateDecomposeTest; class SpecialModuleDecomposeTest; class SortServiceDecomposeTest; using namespace pai::utils; namespace pai{ namespace workflow{ struct InputOutputModulePair { CModuleInformation* input; CModuleInformation* output; }; class CWorkFlowDecompose { public: CWorkFlowDecompose(const int &startModuleId); virtual ~CWorkFlowDecompose(); /** * @brief 拆分工作流的入口函数,其他类需要实现此函数进行分类处理 * @param[in] workflowFileWrapper 原始的 CWorkFlowFileWrapper */ virtual void DecomposeWorkFlowFile(CWorkFlowFileWrapper* workflowFileWrapper); /** * @brief 获取新增模块的stepid * @return 新的id */ int GetNewStepModuleID(); /** * @brief 获取新增工作流的优先级 * @return 数字,越小代表工作流越先执行 */ int GetWorkflowFilePriority(); /** * @brief 优先级清零 */ void ClearWorkflowFilePriority(); /** * @brief 获取上游的所有模块组成的工作流列表 * @param[in] port2UpStreamModuleIDs 流入这个端口的上游模块 * @param[in] conflictPortsList 有交集端口 * @param[in/out] workflowList 存储结果 */ void GetUpStreamWorkflowList(std::map >& port2UpStreamModuleIDs, std::vector >& conflictPortsList, std::vector >& workflowList); /** * @brief 判断key2UpStreamModuleIDs value中是否有交集,并记录下这些key * @param[in] key2UpStreamModuleIDs key为模块端口或moduleid,value为流入这个端口或模块的上游模块列表 * @param[in/out] conflictKeyList 保存上游模块集合有交集key的集合 */ void GetConflictList(std::map >& key2UpStreamModuleIDs, std::vector >& conflictKeyList); /** * @brief 返回原始工作流中id最大值 * @param[in] workflowFile 原始workflowFile对象 * @return 返回原始工作流中id最大值 */ static int GetMaxStepIdInWorkFlowFile(CWorkFlowFile* workflowFile); /** * @brief 复制一份workflowFile中指定moduleID的moduleinformation * @param[in] workflowFile * @param[in] moduleID */ static CModuleInformation* CloneModuleInformationByModuleID(CWorkFlowFile* workflowFile, const int moduleID); /** * @brief 删除workflowFile中指定moduleID的moduleinformation * @param[in] workflowFile * @param[in] moduleID */ static bool RemoveModuleInformationByModuleID(CWorkFlowFile* workflowFile, const int moduleID); /** * @brief 复制工作流对象 * @param[in] workflow 复制源 * @return 工作流对象副本 */ static CWorkFlowFile* CloneWorkFlowFile(CWorkFlowFile *workflow); /** * @brief 创建新的CWorkFlowFile对象 * @param[in] workflowFile * @return */ static CWorkFlowFile* GetNewWorkFlowFile(CWorkFlowFile* workflowFile); /** * @brief 创建offset的workflowfile对象 * @param[in] workflowFile * @return */ static COffsetWorkFlowFile* GetNewOffsetWorkFlowFile(CWorkFlowFile* workflowFile); /** * @brief 创建sort的workflowfile对象 * @param[in] workflowFile * @return */ static CPISortWorkFlowFile* GetNewSortWorkFlowFile(CWorkFlowFile* workflowFile, PISortType type); /** * @brief 创建sort的workflowfile对象 * @param[in] workflowFile * @return */ static CStatApplyWorkFlowFile* GetNewStatisApplyWorkFlowFile(CWorkFlowFile* workflowFile); /** * @brief 复制一个相同source的CModuleConnection * @param[in] source 原CModuleConnection信息 */ static CModuleConnection* CloneModuleConnection(CModuleConnection* source); /** * @brief 获取指定模块的指定端口的所有上游的connection,保存到upStreamConnList * @param[in] connections * @param[in] moduleID 指定模块的id * @param[in] ports 模块上的端口号列表 * @param[in/out] upStreamConnList */ void GetUpStreamConnectionByModuleIDAndPorts(std::vector* connections, const int moduleID, std::set& ports, std::vector& upStreamConnList); /** * @brief 返回一对input和output模块 * @param[in/out] pair 存储创建的input和output的结构提 * @return 返回临时数据逻辑文件名 */ std::string GetInputOutputModulePair(InputOutputModulePair& pair); /** * @brief 编辑指定conn的输出信息 * @param[in] conn * @param[in] destModuleID 输出模块的id * @param[in] inputPort 输出模块的端口号 */ void EditConnectionOuputInfo(CModuleConnection* conn, int destModuleID, int inputPort); /** * @brief 编辑指定conn的输入信息 * @param[in] conn * @param[in] sourceModuleID 输入模块的id * @param[in] inputPort 输入模块的端口号 */ void EditConnectionInputInfo(CModuleConnection* conn, int sourceModuleID, int outputPort); /** * @brief 创建信息的CModuleConnection对象,返回 * @param[in] destModuleID * @param[in] inputPort * @param[in] sourceModuleID * @param[in] outputPort * @return */ CModuleConnection* GetNewConnection(int destModuleID, int inputPort, int sourceModuleID, int outputPort); /** * @brief 检查set集合是否有交集 * @param[in] set1 存储moduleid * @param[in] set2 存储moduleid * @return 如果两个set有相同元素,返回true;否则返回false */ static bool CheckSetIntersection(const std::set& set1, const std::set& set2); /** * @brief 保存workflowfile到指定的优先级上,存储到priority2WkfWrapper * @param[in] priority * @param[in] wkfWrapper 需要保存的CWorkFlowFileWrapper对象 */ void InsertPriority2WorkflowFileWrapper(const int priority, CWorkFlowFileWrapper* wkfWrapper); /** * @brief 清除priority2WkfWrapper的内容 */ void CleanPriority2WorkflowFileWrapper(); /** * @brief 清除outputID2InputID的内容 */ void CleanOutputID2InputID(); /** * @brief 获取priority2WkfWrapper */ std::map >& GetPriority2WorkflowFileWrapper(); /** * @brief 获取assignModuleList中的模块在workflowfile中深度,保存到depth2ModuleID * @param[in] assignModuleList * @param[in] connections workflowfile对象的连接信息 * @param[in/out] depth2ModuleID key为深度,value为当前深度上的模块id */ void GetModuleDepthByModuleIDs(std::vector& assignModuleList, std::vector* connections, std::map >& depth2ModuleID); /** * @brief 获取输出前缀 * @return */ static std::string GetPreOutputPath(); /** * @brief 获取一个新路径,使用preOutputPath做前缀 * @return 新路经string */ std::string GetNewPath(); /** * @brief 获取指定concatenate模块的所有端口的上游模块,保存到port2UpStreamModuleIDs中 * @param[in] connections workflowFile的连线信息 * @param[in] moduleID concatenate模块的id * @param[in/out] port2UpStreamModuleIDs key为concatenate模块的输入端口;value为上游模块集合 */ void GetInputPortUpStreamModuleIDs(std::vector* connections, int moduleID, std::map >& port2UpStreamModuleIDs); /** * @brief 从source中复制值到target * @param[in] target * @param[in] source */ void CloneSet(std::set& target, const std::set& source); /** *@brief 记录当前拆分的出来的模块依赖 * @param[in] outputID 输出数据的模块id * @param[in] inputID 输入数据的模块id */ void InsertOutputID2InputID(const int outputID, const int inputID) { outputID2InputID.insert(make_pair(outputID, inputID)); } map GetOutputID2InputID() { return outputID2InputID; } protected: /** * @brief 创建一个输出模块 * @return 输出模块 */ CModuleInformation* CreateOutputModule(); /** * @brief 创建一个输入模块 * @return 输入模块 */ CModuleInformation* CreateInputModule(); /** * @brief 创建一个PISort模块 * @return PISort模块 */ CModuleInformation* CreatePiSortModule(); /** * @brief 将输入、输出map转换成拆分切面 * @return 拆分切面 */ vector ConvertIOPair2DecomposeAspects(map &ioPairs, bool insertMark=false); private: /** * @brief 递归获取指定模块的指定端口的所有上游的connection,保存到upStreamConnList * @param[in] connections * @param[in] moduleID * @param[in/out] upStreamConnList */ void InternalGetUpStreamConnectionByModuleIDAndPorts(std::vector* connections, const int moduleID, std::vector& upStreamConnList); /** * @brief 递归获取有交集上游模块 * @param[in] connections workflowFile的连线信息 * @param[in] moduleID 从concatenate模块的一个输入端口到跟节点之间的一个moduleid * @param[in/out] upStreamModuleIDs 保存遍历到的上游模块 */ void InternalGetInputPortUpStreamModuleIDs(std::vector* connections, const int moduleID, std::set& upStreamModuleIDs); /** * @brief 将source的基本信息复制到dest中 * @param[in] dest * @param[in] source */ static void SetWorkFlowInfo(CWorkFlowFile* dest, CWorkFlowFile* source); private: static long long currTime; static std::string preOutputPath; int workflowFilePriority;//工作流优先级 int nextModuleId; std::map outputID2InputID;//存储新创建的output和input的id的映射 std::map > priority2WkfWrapper;//存储拆分后的工作流 // std::vector workflowDecomposes;// // CWorkFlowFile* tempWorkflowFile;//用于拆分的工作流对象 // friend class ::WorkFlowDecomposeTest;//友元类,单元测试使用 friend class ::ConcatenateDecomposeTest; friend class ::SpecialModuleDecomposeTest; friend class ::SortServiceDecomposeTest; }; } } #endif /* WORKFLOWDECOMPOSE_H_ */