logplus/Workflow/WFEngine/Module/include/Module.h
2026-01-16 17:18:41 +08:00

442 lines
15 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file Module.h
* @brief 文件定义了与模块相关的类及接口
* @author dev
* @date 2011-4-28
*/
#ifndef PAI_FRAME_MODULEAPI_MODULE_H
#define PAI_FRAME_MODULEAPI_MODULE_H
#include "Turtle.h"
// #include "Log.h"
#include "Buffer.h"
#include "ModuleCheckResult.h"
#include "ModuleFactory.h"
#include "ModuleMetaData.h"
#include "ModuleParameter.h"
// #include "WorkflowContext.h"
// #include "DataSetInfo.h"
#include <string>
#include <vector>
#include <assert.h>
namespace pai
{
namespace module
{
// using namespace pai::ios::seis;
/**
* @brief 模块运行状态
*/
enum STATUS
{
WAIT_INPUT, /**< 等待buffer中的有可读的数据 */
OVER /**< 模块运行完毕 */
};
/**
* @brief 模块并行的阶段
*/
enum Phase{
Map, /**< 模块在hadoop的map阶段运行 */
Reduce /**< 模块在hadoop的reduce阶段运行 */
};
const std::string CONST_PROP_WORKFLOW_CLUSTER_TMP = "WORKFLOW_CLUSTER_TMP"; /**< 集群临时目录常量 */
const std::string CONST_PROP_ATTEMPTASK_DIR = "mapred.local.dir"; /**< hadoop Task的本地临时目录常量 */
const std::string CONST_FILEPATH_SEPARATOR = "/"; /**< 文件分隔符常量 */
/**
* @class CModule
* @brief 此类是模块开发规范的基类。所有的算法模块都必须继承此类。
* 目前,此类所定义的接口随着验证的进展,将会不断地进行完善和重构。<br>
* 在开发具体的子模块时,需要做如下的几件事情:
* <li>1. 设计并实现子模块对应的类,它继承<code>CModule</code>类</li>
* <li>2. 实现Run()和validate()两个纯虚函数</li>
* <li>3. 实现子模块对应的参数类</li>
* <li>4. 编写模块配置文件,其中包括模块相关的元信息、参数信息</li>
* 模块开发结束后,需要将此模块打包为动态链接库
*
* <p>模块测试:
* UT中创建Module的时候最好使用ModuleManager中的工厂方法(CreateModuleByClassName)
* 否则创建的模块没有模块元数据,以及对应的输入、输出缓存
*/
class PAI_MODULE_EXPORT CModule
{
private:
CModule& operator=(const CModule& moudle);
public:
/**
* @brief 构造函数
*/
CModule();
/**
* @brief 拷贝构造函数
*/
CModule(const CModule& srcModule);
//welllog 特殊模块使用
virtual CModule* Clone(){return NULL;};
/**
* @brief 虚析构函数
*/
virtual ~CModule();
/**
* @brief 获取模块的输入缓冲buffer
* @param[in] port 该模块输入端口标号。第一个端口号为0第二个端口号为1以此类推
* @return 该模块输入缓冲<code>CBuffer</code>的指针
*/
virtual CBuffer* GetInputBuffer(int port)
{
return inputBuffers[port];
}
/**
* @brief 获取模块的输出缓冲
* @param[in] port 该模块输出端口标号。第一个端口号为0第二个端口号为1以此类推
* @return 该模块输入缓冲<code>CBuffer</code>的指针
*/
virtual CBuffer* GetOutputBuffer(int port)
{
return outputBuffers[port];
}
/**
* @brief 设置模块对应输入端口的buffer
* @param[in] port 模块输入缓冲端口号
* @param[in] buffer 该输入端口对应的缓冲区<code>CBuffer</code>指针
*/
// void SetInputBuffer(int port, CBuffer* buffer);
/**
* @brief 设置模块对应输出端口
* @param[in] port 模块输出缓冲端口号
* @param[in] buffer 该输出端口对应的缓冲区<code>CBuffer</code>指针
*/
void SetOutputBuffer(int port, CBuffer* buffer);
// /**
// * 设置模块的上下文信息
// * @param context 模块的上下文信息
// */
// void SetModuleContext(SModuleContext& context);
//
// WorkflowContext& SetWorkflowContext() const;
/**
* @brief 模块的核心计算函数。在作业的map阶段执行
* <p>此函数应该实现如下内容:
* <li>1. 从输入缓冲中获取数据</li>
* <li>2. 判断数据是否满足要求如果满足则进行计算计算技术后返回WAIT_INPUT</li>
* <li>3. 如果不满足则直接返回WAIT_INPUT</li>
* <li>4. 如果缓冲区的数据已经到达数据尾部则返回OVER</li>
* <li>5. 计算结果数据要存放到输出缓冲中</li>
* @return 返回模块的状态
*/
virtual STATUS Run() = 0;
/**
* @brief 模块在作业运行之前执行的方法整个作业就执行一次目前仅供inputmodule/outputmodule来override其他模块不得调用不得override
*/
virtual void RunBeforeJob()
{
}
/**
* @brief 模块在作业运行之后执行的方法整个作业就执行一次目前仅供inputmodule/outputmodule来override其他模块不得调用不得override
*/
virtual void RunAfterJob()
{
}
/**
* @brief 模块在作业失败或者killed之后执行的方法整个作业就执行一次
* @param[isFailed] 作业是否失败true为失败false为被杀死
*/
virtual void RunAfterJobAbort(bool /*isFailed*/)
{
}
/**
* @brief 模块在task失败或者killed之后执行的方法每個失敗或者killed的task都會执行一次
*/
virtual void RunAfterTaskAbort()
{
}
/**
* @brief 模块在作業提交前運行一次的方法,整个作业就执行一次, 暂时留做扩展用,请勿使用
*/
virtual void RunBeforSubmit()
{
}
/**
* @brief 设置模块参数
* @param[in] _parammmmeter 模块所对应的参数<code>CModuleParameter</code>对象指针
*/
virtual void SetParameters(CModuleParameter* _parameter)
{
parameter = _parameter;
}
/**
* @brief 获取模块参数
* @return 模块所对应的参数<code>CModuleParameter</code>指针
*/
virtual CModuleParameter* GetModuleParameter()
{
return parameter;
}
/**
* @brief 校验模块参数是否正确
* <p>此函数为纯虚函数,在封装模块时必须实现。
* @param[in] moduleCheckResult 错误信息对象,此对象中的错误信息会显示在界面
* @return 如果参数合法,返回真,否则返回假
*/
virtual bool validate(CModuleCheckResult& moduleCheckResult) = 0;
/**
* @brief 对此模块的数据进行归并操作
* <p>如果模块需要进行归并操作,则必须实现此函数
*/
virtual void Reduce()
{
}
/**
* @brief 进行初始化操作,在Run和Reduce方法之前自动被调用
*/
virtual void SetUp();
/**
* @brief 进行后续处理在Run和Reduce方法结束后自动被调用
*/
virtual void CleanUp();
/**
* @brief 进行数据历史注册
*/
virtual void WriteHistory() = 0;
/**
* @brief 设置模块运行时工作流的上下文.
*/
// void SetWorkflowContext(const WorkflowContext& context)
// {
// m_context = context;
// }
/**
* @brief 获取模块运行时工作流的上下文.
* @WARN 该上下文不允许被使用者释放,否则系统会崩溃
*/
// WorkflowContext GetWorkflowContext() const
// {
// return m_context;
// }
/**
* @brief 设置模块上下文信息
*/
// void SetModuleContext(const SModuleContext& context)
// {
// m_modulecontext = context;
// }
/**
* @brief 获取模块上下文信息, 不推荐模块开发人员使用此接口
*/
// SModuleContext GetModuleContext() const
// {
// return m_modulecontext;
// }
/**
* @brief 获取模块的元信息
* @return 模块的元信息<code>CModuleMetaData</code>指针
*/
CModuleMetaData* GetMetaData();
/**
* @brief 获取模块的类名
* @return 模块的类名
*/
virtual std::string GetClassName()
{
return std::string("CModule");
}
/**
*@brief 写日志
*@param logPriority 日志级别
*@param logmsg 日志内容
*/
// void WriteLog(pai::log::Priority logPriority, std::string logmsg);
/**
* @brief 该模块是否产出了项目数据
* @return 若该模块有生成数据则返回true否则返回false
*/
// bool HasProjectDataGenerated() const;
/**
* @brief 主要用于需要metaData信息的初始化模块开发请勿调用此方法
* 在ModuleManager创建模块的时候会自动被调用
*/
void Initialize();
/**
* @deprecated
* @brief 设置文件头到下游模块, 不推荐使用此方法推荐将文件头放到WorkflowContext中
* @param[in] fileHeader 文件头
*/
// void SetFileHeader(Any anything);
/**
* @brief 获取第一个输入口传递过来的文件头, 不推荐使用此方法推荐将文件头放到WorkflowContext中
* @return 第一个输入口传递过来的文件头
*/
template<typename T>
T GetFileHeader()
{
return this->GetFileHeader<T>(0);
};
/**
* @deprecated
* @brief 获取指定输入口传递过来的文件头不推荐使用此方法推荐将文件头放到WorkflowContext中
* @param[in] port 输入口
* @return 指定输入口传递过来的文件头
*/
template<typename T>
T GetFileHeader(const int& port)
{
return this->GetInputMetaData<T>(port,WORKLFOWCONTEXT_KEY_DATASETINFO);
};
/**
* @brief 设置元数据信息到下游模块
* @param[in] key 元数据关键字
* @param[in] anything 元数据内容
*/
// void SetOutputMetaData(const string& key, Any anything);
/**
* @brief 获取第一个输入口传递过来的元数据信息
* @param[in] key 元数据关键字
* @return 获取的元数据内容
*/
// template<typename T>
// T GetInputMetaData(const string& key)
// {
// return this->GetInputMetaData<T>(0,key);
// };
/**
* @brief 获取指定输入口传递过来的元数据信息
* @param port[in] 输入端口
* @param key [in] 元数据关键字
* @return 获取的元数据内容
*/
// template<typename T>
// T GetInputMetaData(const int& port,const string& key)
// {
// assert(port < m_nInputPortCount);
// CBuffer* buffer = this->GetInputBuffer(port);
// if(buffer==NULL)
// {
// std::stringstream ss;
// ss << this->m_modulecontext.module_runtime_name
// << " input port:" << port << "doesn't exist!"
// << std::endl;
// throw pai::error::invalid_argument(ss.str());
// }
// return buffer->GetMetaData<T>(key);
// };
/**
* @brief 是否有对应的输入元数据信息
* @param port[in] 输入端口
* @param key [in] 元数据关键字
* @exception pai::error::invalid_argument 端口的缓冲数组为空时抛出异常
* @return 是否有对应的输入元数据信息
*/
// bool HasInputMetaData(const int& port,const std::string& key);
/**
* @brief 第一个输入端口,是否有对应的输入元数据信息
* @param key [in] 元数据关键字
* @return 是否有对应的输入元数据信息
* @exception pai::error::invalid_argument 默认端口的缓冲数组为空时抛出异常
*/
// bool HasInputMetaData(const std::string& key);
/**
* @brief 在单作业多子作业的情况下获取前一个作业的HadoopJobId如果前面没有作业获取为空
* @return 单作业多子作业情况下,当前作业的前一个作业
*/
std::string GetPreJobId()const{
return this->preJobId;
};
/**
* @brief 在单作业多子作业的情况下设置前一个作业的HadoopJobId如果前面没有作业则不用设置
* @param[in] pJobId 作业的HadoopJobId
*/
void SetPreJobId(const std::string& pJobId){
this->preJobId = pJobId;
};
/**
* @biref 目前只能计算浅层的内存统计主要用于GUI排查内存增长问题
* @return 对象占用内存的大小
*/
unsigned long GetMemorySize();
protected:
/**
* @brief 获取hadoop计算的本地目录在map-site.xml中通过mapred.local.dir配置
*/
std::string GetLocalAttemptDir();
/**
*@brief 获取集群上当前作业的临时目录,主要用于存放模块生成的临时文件。形式:/workflow_tmp/jobId
*@param[in] jobid 作业的jobId
*@return 集群上作业的临时目录
*/
std::string GetClusterJobTempDir(const std::string& jobid);
/**
* @brief 获取集群上当前Task的临时目录路径主要用于存放模块生成的临时文件。形式/workflow_tmp/jobId/attempt_id
* @return 集群上当前作业的临时目录路径
*/
// std::string GetClusterCurrentJobAttemptDir();
/**
* @brief 在单作业多子作业的情况下,获取前一个作业在集群上的临时目录。形式为:/workflow_tmp/hadoopJobId
* @return 集群上前一个作业的临时目录路径,如果没有前一个作业则返回:/workflow_tmp
*/
std::string GetClusterPreJobAttemptDir();
/**
* @brief 清除作业的集群临时目录, 模块生成的临时文件在使用完后需要调用此接口清理
* @param[in] path 临时目录路径
* @return 临时目录清除成功则返回true否则返回false
*/
// bool CleanClusterJobTempDir(const std::string& path);
protected:
int m_nInputPortCount; /**< 模块输入端口数 */
int m_nOutputPortCount; /**< 模块输出端口数 */
CBuffer** inputBuffers; /**< 模块输入Buffer */
CBuffer** outputBuffers; /**< 模块输出Buffer */
int* m_usedInputPort; /**< 模块已经使用的输入端口 */
int* m_usedOutputPort; /**< 模块已经使用的输出端口 */
int m_usedInputPortCount; /**< 模块已经使用的输入端口数 */
int m_usedOutputPortCount; /**< 模块已经使用的输出端口数 */
CModuleParameter* parameter; /**< 模块参数 */
std::string m_strID; /**< 模块id */
// SModuleContext m_modulecontext; /**< 模块上下文,存储模块的运行时信息 */
CModuleMetaData m_metaData; /**< 模块的元数据信息 */
std::stringstream _pai_log_common_ss; /**< 日志信息,模块开发人员勿使用此变量 */
private:
// WorkflowContext m_context;
// pai::log::LoggerPtr m_logger;
std::string m_workflowClusterTmp;
std::string preJobId;
};
/*
* @def WRITE_MODULE_LOG(PRIORITY,MSG,VALUE)
* @brief 模块中写日志的宏
*/
#define WRITE_MODULE_LOG(PRIORITY,MSG,VALUE) _pai_log_common_ss << __FILE__ <<":"<<__LINE__ << ":" << __FUNCTION__ << "\t" << (MSG) << (VALUE) << std::endl;WriteLog((PRIORITY),_pai_log_common_ss.str());_pai_log_common_ss.str("");;
}
}
#endif