442 lines
15 KiB
C++
442 lines
15 KiB
C++
/**
|
||
* @file Module.h
|
||
* @brief 文件定义了与模块相关的类及接口
|
||
* @author dev
|
||
* @date 2011-4-28
|
||
*/
|
||
|
||
#ifndef PAI_FRAME_MODULEAPI_MODULE_H
|
||
#define PAI_FRAME_MODULEAPI_MODULE_H
|
||
#include "Turtle.h"
|
||
// #include "Log.h"
|
||
#include "Buffer.h"
|
||
#include "ModuleCheckResult.h"
|
||
#include "ModuleFactory.h"
|
||
#include "ModuleMetaData.h"
|
||
#include "ModuleParameter.h"
|
||
// #include "WorkflowContext.h"
|
||
// #include "DataSetInfo.h"
|
||
|
||
#include <string>
|
||
#include <vector>
|
||
|
||
#include <assert.h>
|
||
namespace pai
|
||
{
|
||
namespace module
|
||
{
|
||
|
||
// using namespace pai::ios::seis;
|
||
|
||
/**
|
||
* @brief 模块运行状态
|
||
*/
|
||
enum STATUS
|
||
{
|
||
WAIT_INPUT, /**< 等待buffer中的有可读的数据 */
|
||
OVER /**< 模块运行完毕 */
|
||
};
|
||
|
||
/**
|
||
* @brief 模块并行的阶段
|
||
*/
|
||
enum Phase{
|
||
Map, /**< 模块在hadoop的map阶段运行 */
|
||
Reduce /**< 模块在hadoop的reduce阶段运行 */
|
||
};
|
||
|
||
const std::string CONST_PROP_WORKFLOW_CLUSTER_TMP = "WORKFLOW_CLUSTER_TMP"; /**< 集群临时目录常量 */
|
||
const std::string CONST_PROP_ATTEMPTASK_DIR = "mapred.local.dir"; /**< hadoop Task的本地临时目录常量 */
|
||
const std::string CONST_FILEPATH_SEPARATOR = "/"; /**< 文件分隔符常量 */
|
||
|
||
/**
|
||
* @class CModule
|
||
* @brief 此类是模块开发规范的基类。所有的算法模块都必须继承此类。
|
||
* 目前,此类所定义的接口随着验证的进展,将会不断地进行完善和重构。<br>
|
||
* 在开发具体的子模块时,需要做如下的几件事情:
|
||
* <li>1. 设计并实现子模块对应的类,它继承<code>CModule</code>类</li>
|
||
* <li>2. 实现Run()和validate()两个纯虚函数</li>
|
||
* <li>3. 实现子模块对应的参数类</li>
|
||
* <li>4. 编写模块配置文件,其中包括模块相关的元信息、参数信息</li>
|
||
* 模块开发结束后,需要将此模块打包为动态链接库
|
||
*
|
||
* <p>模块测试:
|
||
* UT中创建Module的时候,最好使用ModuleManager中的工厂方法(CreateModuleByClassName)
|
||
* 否则创建的模块没有模块元数据,以及对应的输入、输出缓存
|
||
*/
|
||
class PAI_MODULE_EXPORT CModule
|
||
{
|
||
private:
|
||
CModule& operator=(const CModule& moudle);
|
||
public:
|
||
/**
|
||
* @brief 构造函数
|
||
*/
|
||
CModule();
|
||
/**
|
||
* @brief 拷贝构造函数
|
||
*/
|
||
CModule(const CModule& srcModule);
|
||
//welllog 特殊模块使用
|
||
virtual CModule* Clone(){return NULL;};
|
||
/**
|
||
* @brief 虚析构函数
|
||
*/
|
||
virtual ~CModule();
|
||
/**
|
||
* @brief 获取模块的输入缓冲buffer
|
||
* @param[in] port 该模块输入端口标号。第一个端口号为0,第二个端口号为1,以此类推
|
||
* @return 该模块输入缓冲<code>CBuffer</code>的指针
|
||
*/
|
||
virtual CBuffer* GetInputBuffer(int port)
|
||
{
|
||
return inputBuffers[port];
|
||
}
|
||
/**
|
||
* @brief 获取模块的输出缓冲
|
||
* @param[in] port 该模块输出端口标号。第一个端口号为0,第二个端口号为1,以此类推
|
||
* @return 该模块输入缓冲<code>CBuffer</code>的指针
|
||
*/
|
||
virtual CBuffer* GetOutputBuffer(int port)
|
||
{
|
||
return outputBuffers[port];
|
||
}
|
||
|
||
/**
|
||
* @brief 设置模块对应输入端口的buffer
|
||
* @param[in] port 模块输入缓冲端口号
|
||
* @param[in] buffer 该输入端口对应的缓冲区<code>CBuffer</code>指针
|
||
*/
|
||
// void SetInputBuffer(int port, CBuffer* buffer);
|
||
|
||
/**
|
||
* @brief 设置模块对应输出端口
|
||
* @param[in] port 模块输出缓冲端口号
|
||
* @param[in] buffer 该输出端口对应的缓冲区<code>CBuffer</code>指针
|
||
*/
|
||
void SetOutputBuffer(int port, CBuffer* buffer);
|
||
// /**
|
||
// * 设置模块的上下文信息
|
||
// * @param context 模块的上下文信息
|
||
// */
|
||
// void SetModuleContext(SModuleContext& context);
|
||
//
|
||
// WorkflowContext& SetWorkflowContext() const;
|
||
|
||
/**
|
||
* @brief 模块的核心计算函数。在作业的map阶段执行
|
||
* <p>此函数应该实现如下内容:
|
||
* <li>1. 从输入缓冲中获取数据</li>
|
||
* <li>2. 判断数据是否满足要求,如果满足,则进行计算,计算技术后返回WAIT_INPUT</li>
|
||
* <li>3. 如果不满足,则直接返回WAIT_INPUT</li>
|
||
* <li>4. 如果缓冲区的数据已经到达数据尾部,则返回OVER</li>
|
||
* <li>5. 计算结果数据要存放到输出缓冲中</li>
|
||
* @return 返回模块的状态
|
||
*/
|
||
virtual STATUS Run() = 0;
|
||
/**
|
||
* @brief 模块在作业运行之前执行的方法,整个作业就执行一次,目前仅供inputmodule/outputmodule来override,其他模块不得调用,不得override
|
||
*/
|
||
virtual void RunBeforeJob()
|
||
{
|
||
}
|
||
/**
|
||
* @brief 模块在作业运行之后执行的方法,整个作业就执行一次,目前仅供inputmodule/outputmodule来override,其他模块不得调用,不得override
|
||
*/
|
||
virtual void RunAfterJob()
|
||
{
|
||
}
|
||
/**
|
||
* @brief 模块在作业失败或者killed之后执行的方法,整个作业就执行一次
|
||
* @param[isFailed] 作业是否失败,true为失败,false为被杀死
|
||
*/
|
||
virtual void RunAfterJobAbort(bool /*isFailed*/)
|
||
{
|
||
}
|
||
/**
|
||
* @brief 模块在task失败或者killed之后执行的方法,每個失敗或者killed的task都會执行一次
|
||
*/
|
||
virtual void RunAfterTaskAbort()
|
||
{
|
||
}
|
||
/**
|
||
* @brief 模块在作業提交前運行一次的方法,整个作业就执行一次, 暂时留做扩展用,请勿使用
|
||
*/
|
||
virtual void RunBeforSubmit()
|
||
{
|
||
}
|
||
/**
|
||
* @brief 设置模块参数
|
||
* @param[in] _parammmmeter 模块所对应的参数<code>CModuleParameter</code>对象指针
|
||
*/
|
||
virtual void SetParameters(CModuleParameter* _parameter)
|
||
{
|
||
parameter = _parameter;
|
||
}
|
||
/**
|
||
* @brief 获取模块参数
|
||
* @return 模块所对应的参数<code>CModuleParameter</code>指针
|
||
*/
|
||
virtual CModuleParameter* GetModuleParameter()
|
||
{
|
||
return parameter;
|
||
}
|
||
/**
|
||
* @brief 校验模块参数是否正确
|
||
* <p>此函数为纯虚函数,在封装模块时必须实现。
|
||
* @param[in] moduleCheckResult 错误信息对象,此对象中的错误信息会显示在界面
|
||
* @return 如果参数合法,返回真,否则返回假
|
||
*/
|
||
virtual bool validate(CModuleCheckResult& moduleCheckResult) = 0;
|
||
/**
|
||
* @brief 对此模块的数据进行归并操作
|
||
* <p>如果模块需要进行归并操作,则必须实现此函数
|
||
*/
|
||
virtual void Reduce()
|
||
{
|
||
}
|
||
|
||
/**
|
||
* @brief 进行初始化操作,在Run和Reduce方法之前自动被调用
|
||
*/
|
||
virtual void SetUp();
|
||
|
||
/**
|
||
* @brief 进行后续处理,在Run和Reduce方法结束后自动被调用
|
||
*/
|
||
virtual void CleanUp();
|
||
|
||
/**
|
||
* @brief 进行数据历史注册
|
||
*/
|
||
virtual void WriteHistory() = 0;
|
||
|
||
/**
|
||
* @brief 设置模块运行时工作流的上下文.
|
||
*/
|
||
// void SetWorkflowContext(const WorkflowContext& context)
|
||
// {
|
||
// m_context = context;
|
||
// }
|
||
|
||
/**
|
||
* @brief 获取模块运行时工作流的上下文.
|
||
* @WARN 该上下文不允许被使用者释放,否则系统会崩溃
|
||
*/
|
||
// WorkflowContext GetWorkflowContext() const
|
||
// {
|
||
// return m_context;
|
||
// }
|
||
|
||
/**
|
||
* @brief 设置模块上下文信息
|
||
*/
|
||
// void SetModuleContext(const SModuleContext& context)
|
||
// {
|
||
// m_modulecontext = context;
|
||
// }
|
||
/**
|
||
* @brief 获取模块上下文信息, 不推荐模块开发人员使用此接口
|
||
*/
|
||
// SModuleContext GetModuleContext() const
|
||
// {
|
||
// return m_modulecontext;
|
||
// }
|
||
/**
|
||
* @brief 获取模块的元信息
|
||
* @return 模块的元信息<code>CModuleMetaData</code>指针
|
||
*/
|
||
CModuleMetaData* GetMetaData();
|
||
|
||
/**
|
||
* @brief 获取模块的类名
|
||
* @return 模块的类名
|
||
*/
|
||
virtual std::string GetClassName()
|
||
{
|
||
return std::string("CModule");
|
||
}
|
||
|
||
/**
|
||
*@brief 写日志
|
||
*@param logPriority 日志级别
|
||
*@param logmsg 日志内容
|
||
*/
|
||
// void WriteLog(pai::log::Priority logPriority, std::string logmsg);
|
||
/**
|
||
* @brief 该模块是否产出了项目数据
|
||
* @return 若该模块有生成数据则返回true,否则返回false
|
||
*/
|
||
// bool HasProjectDataGenerated() const;
|
||
|
||
/**
|
||
* @brief 主要用于需要metaData信息的初始化,模块开发请勿调用此方法
|
||
* 在ModuleManager创建模块的时候会自动被调用
|
||
*/
|
||
void Initialize();
|
||
/**
|
||
* @deprecated
|
||
* @brief 设置文件头到下游模块, 不推荐使用此方法,推荐将文件头放到WorkflowContext中
|
||
* @param[in] fileHeader 文件头
|
||
*/
|
||
// void SetFileHeader(Any anything);
|
||
/**
|
||
* @brief 获取第一个输入口传递过来的文件头, 不推荐使用此方法,推荐将文件头放到WorkflowContext中
|
||
* @return 第一个输入口传递过来的文件头
|
||
*/
|
||
template<typename T>
|
||
T GetFileHeader()
|
||
{
|
||
return this->GetFileHeader<T>(0);
|
||
};
|
||
/**
|
||
* @deprecated
|
||
* @brief 获取指定输入口传递过来的文件头,不推荐使用此方法,推荐将文件头放到WorkflowContext中
|
||
* @param[in] port 输入口
|
||
* @return 指定输入口传递过来的文件头
|
||
*/
|
||
template<typename T>
|
||
T GetFileHeader(const int& port)
|
||
{
|
||
return this->GetInputMetaData<T>(port,WORKLFOWCONTEXT_KEY_DATASETINFO);
|
||
};
|
||
/**
|
||
* @brief 设置元数据信息到下游模块
|
||
* @param[in] key 元数据关键字
|
||
* @param[in] anything 元数据内容
|
||
*/
|
||
// void SetOutputMetaData(const string& key, Any anything);
|
||
/**
|
||
* @brief 获取第一个输入口传递过来的元数据信息
|
||
* @param[in] key 元数据关键字
|
||
* @return 获取的元数据内容
|
||
*/
|
||
// template<typename T>
|
||
// T GetInputMetaData(const string& key)
|
||
// {
|
||
// return this->GetInputMetaData<T>(0,key);
|
||
// };
|
||
|
||
/**
|
||
* @brief 获取指定输入口传递过来的元数据信息
|
||
* @param port[in] 输入端口
|
||
* @param key [in] 元数据关键字
|
||
* @return 获取的元数据内容
|
||
*/
|
||
// template<typename T>
|
||
// T GetInputMetaData(const int& port,const string& key)
|
||
// {
|
||
// assert(port < m_nInputPortCount);
|
||
// CBuffer* buffer = this->GetInputBuffer(port);
|
||
// if(buffer==NULL)
|
||
// {
|
||
// std::stringstream ss;
|
||
// ss << this->m_modulecontext.module_runtime_name
|
||
// << " input port:" << port << "doesn't exist!"
|
||
// << std::endl;
|
||
// throw pai::error::invalid_argument(ss.str());
|
||
// }
|
||
// return buffer->GetMetaData<T>(key);
|
||
// };
|
||
/**
|
||
* @brief 是否有对应的输入元数据信息
|
||
* @param port[in] 输入端口
|
||
* @param key [in] 元数据关键字
|
||
* @exception pai::error::invalid_argument 端口的缓冲数组为空时抛出异常
|
||
* @return 是否有对应的输入元数据信息
|
||
*/
|
||
// bool HasInputMetaData(const int& port,const std::string& key);
|
||
/**
|
||
* @brief 第一个输入端口,是否有对应的输入元数据信息
|
||
* @param key [in] 元数据关键字
|
||
* @return 是否有对应的输入元数据信息
|
||
* @exception pai::error::invalid_argument 默认端口的缓冲数组为空时抛出异常
|
||
*/
|
||
// bool HasInputMetaData(const std::string& key);
|
||
|
||
|
||
/**
|
||
* @brief 在单作业多子作业的情况下,获取前一个作业的HadoopJobId,如果前面没有作业获取为空
|
||
* @return 单作业多子作业情况下,当前作业的前一个作业
|
||
*/
|
||
std::string GetPreJobId()const{
|
||
return this->preJobId;
|
||
};
|
||
/**
|
||
* @brief 在单作业多子作业的情况下,设置前一个作业的HadoopJobId,如果前面没有作业则不用设置
|
||
* @param[in] pJobId 作业的HadoopJobId
|
||
*/
|
||
void SetPreJobId(const std::string& pJobId){
|
||
this->preJobId = pJobId;
|
||
};
|
||
|
||
/**
|
||
* @biref 目前只能计算浅层的内存统计,主要用于GUI排查内存增长问题
|
||
* @return 对象占用内存的大小
|
||
*/
|
||
unsigned long GetMemorySize();
|
||
|
||
protected:
|
||
|
||
/**
|
||
* @brief 获取hadoop计算的本地目录(在map-site.xml中通过mapred.local.dir配置)
|
||
*/
|
||
std::string GetLocalAttemptDir();
|
||
/**
|
||
*@brief 获取集群上当前作业的临时目录,主要用于存放模块生成的临时文件。形式:/workflow_tmp/jobId
|
||
*@param[in] jobid 作业的jobId
|
||
*@return 集群上作业的临时目录
|
||
*/
|
||
std::string GetClusterJobTempDir(const std::string& jobid);
|
||
/**
|
||
* @brief 获取集群上当前Task的临时目录路径,主要用于存放模块生成的临时文件。形式:/workflow_tmp/jobId/attempt_id
|
||
* @return 集群上当前作业的临时目录路径
|
||
*/
|
||
// std::string GetClusterCurrentJobAttemptDir();
|
||
/**
|
||
* @brief 在单作业多子作业的情况下,获取前一个作业在集群上的临时目录。形式为:/workflow_tmp/hadoopJobId
|
||
* @return 集群上前一个作业的临时目录路径,如果没有前一个作业则返回:/workflow_tmp
|
||
*/
|
||
std::string GetClusterPreJobAttemptDir();
|
||
/**
|
||
* @brief 清除作业的集群临时目录, 模块生成的临时文件在使用完后需要调用此接口清理
|
||
* @param[in] path 临时目录路径
|
||
* @return 临时目录清除成功则返回true,否则返回false
|
||
*/
|
||
// bool CleanClusterJobTempDir(const std::string& path);
|
||
|
||
protected:
|
||
int m_nInputPortCount; /**< 模块输入端口数 */
|
||
int m_nOutputPortCount; /**< 模块输出端口数 */
|
||
CBuffer** inputBuffers; /**< 模块输入Buffer */
|
||
CBuffer** outputBuffers; /**< 模块输出Buffer */
|
||
|
||
int* m_usedInputPort; /**< 模块已经使用的输入端口 */
|
||
int* m_usedOutputPort; /**< 模块已经使用的输出端口 */
|
||
int m_usedInputPortCount; /**< 模块已经使用的输入端口数 */
|
||
int m_usedOutputPortCount; /**< 模块已经使用的输出端口数 */
|
||
|
||
CModuleParameter* parameter; /**< 模块参数 */
|
||
std::string m_strID; /**< 模块id */
|
||
|
||
// SModuleContext m_modulecontext; /**< 模块上下文,存储模块的运行时信息 */
|
||
CModuleMetaData m_metaData; /**< 模块的元数据信息 */
|
||
std::stringstream _pai_log_common_ss; /**< 日志信息,模块开发人员勿使用此变量 */
|
||
private:
|
||
// WorkflowContext m_context;
|
||
// pai::log::LoggerPtr m_logger;
|
||
|
||
std::string m_workflowClusterTmp;
|
||
std::string preJobId;
|
||
};
|
||
|
||
/*
|
||
* @def WRITE_MODULE_LOG(PRIORITY,MSG,VALUE)
|
||
* @brief 模块中写日志的宏
|
||
*/
|
||
#define WRITE_MODULE_LOG(PRIORITY,MSG,VALUE) _pai_log_common_ss << __FILE__ <<":"<<__LINE__ << ":" << __FUNCTION__ << "\t" << (MSG) << (VALUE) << std::endl;WriteLog((PRIORITY),_pai_log_common_ss.str());_pai_log_common_ss.str("");;
|
||
|
||
}
|
||
}
|
||
#endif
|