454 lines
12 KiB
C++
454 lines
12 KiB
C++
/**
|
||
* @file WorkflowContext.h
|
||
* @brief 文件定义了与WorkflowContext相关的类
|
||
* @author Ben
|
||
* @date 2012-10-25
|
||
*/
|
||
|
||
#ifndef PAI_FRAME_MODULEAPI_WORKFLOW_CONTEXT_H
|
||
#define PAI_FRAME_MODULEAPI_WORKFLOW_CONTEXT_H
|
||
|
||
// #include "error.h"
#include "Turtle.h"

#include <algorithm>
#include <map>
#include <stdexcept>
#include <string>
#include <typeinfo>
#include <utility>
#include <vector>

using std::string;
using std::map;
|
||
|
||
// Forward declaration: pai::workflow::ModuleContainer is befriended by
// pai::module::WorkflowContext, so only the name is needed here.
namespace pai { namespace workflow { class ModuleContainer; } }

// Forward declaration of the unit-test class befriended by WorkflowContext (test use only).
class WorkflowContextTest;
|
||
namespace pai
|
||
{
|
||
namespace module
|
||
{
|
||
/**
|
||
* WORKFLOW CONTEXT KEY- DATASETINFO
|
||
*/
|
||
const std::string WORKLFOWCONTEXT_KEY_DATASETINFO = "DataSetInfo"; /**< WorkflowContext中文件头的key常量 */
|
||
const std::string KEY_DATABASE_ID = "database_id"; /**< WorkflowContext中database_id的key常量*/
|
||
const std::string KEY_CHAIN_ID = "submitJob.info.chainId"; /**< WorkflowContext中作业链id的key常量*/
|
||
const std::string KEY_HADOOP_JOB_ID = "pai.workflow.job.hadoop.id"; /**< WorkflowContext中hadoopId的key常量*/
|
||
const std::string KEY_JOB_DB_ID = "pai.workflow.job.db.id"; /**< WorkflowContext中工作流id的key常量*/
|
||
const std::string INPUT_GATHER_ORDER = "INPUT_GATHER_ORDER"; /**< WorkflowContext中输入道集顺序的key常量*/
|
||
const std::string STACK_FLAG="STACK_FLAG"; /**< WorkflowContext中Stack_flag的key常量*/
|
||
const std::string KEY_SURCON_MODULE_ISAPPLY = "pai.workflow.surcon.isapply"; /**< WorkflowContext中isapply的key常量*/
|
||
const std::string HADOOP_CONF_KEY_PRE_HADOOP_JOB_ID = "pai.workflow.pre.job.hadoop.id"; /**< WorkflowContext中上一个作业的hadoopId的key常量*/
|
||
|
||
const std::string RESETDATASETID = "resetDataSetID"; /**< WorkflowContext中resetDataSetID的key常量*/
|
||
const std::string CONTEXT_IDXSORTINGCODE="CONTEXT_IDXSORTINGCODE"; /**< WorkflowContext中idxSortingCode的key常量*/
|
||
const std::string KEY_MAPRED_SPLIT="split"; /**< WorkflowContext中切片的key常量*/
|
||
/**
|
||
* 用于多波模块中
|
||
*/
|
||
const std::string MC_FILEPATH_X="mc_filepath_x"; /**< WorkflowContext中三分量中x分量的文件*/
|
||
const std::string MC_FILEPATH_Y="mc_filepath_y"; /**< WorkflowContext中三分量中y分量的文件*/
|
||
const std::string MC_FILEPATH_Z="mc_filepath_z"; /**< WorkflowContext中三分量中z分量的文件*/
|
||
/**
 * @class Placeholder
 * @brief Abstract base of the type-erased storage used by Any.
 *
 * A concrete subclass (Holder<T>) stores one value and can produce a copy of
 * itself through Clone().
 */
class Placeholder
{
public:
    /** @brief Virtual so that subclasses can be deleted through a Placeholder pointer. */
    virtual ~Placeholder() {}

    /**
     * @brief Produces a heap-allocated copy of this object.
     * @return the new copy; Holder allocates it with new and Any deletes it
     */
    virtual Placeholder * Clone() const = 0;
};
|
||
/**
|
||
* @class Holder
|
||
* @brief 占位符实现类,使用模板方式实现,具体存储类型由T决定
|
||
*/
|
||
template<typename T> class Holder: public Placeholder
|
||
{
|
||
public:
|
||
/**
|
||
* @brief 构造函数
|
||
*/
|
||
Holder(const T & t) :
|
||
m_held(t)
|
||
{
|
||
}
|
||
/**
|
||
* @brief 克隆函数
|
||
*/
|
||
virtual Placeholder * Clone() const
|
||
{
|
||
return new Holder(m_held);
|
||
}
|
||
/**
|
||
* @brief 获取<code>Holder</code>中的值
|
||
* @return <code>Holder</code>中的值
|
||
*/
|
||
T Get()
|
||
{
|
||
return m_held;
|
||
}
|
||
private:
|
||
Holder & operator=(const Holder&);//unimplemented
|
||
T m_held;
|
||
};
|
||
/**
|
||
* @class Any
|
||
* @brief 代表Key-Value中的Value类型,在创建和获取value时,使用模板方法获得具体类型
|
||
*/
|
||
class Any
|
||
{
|
||
public:
|
||
/**
|
||
* @brief 构造函数
|
||
*/
|
||
Any() :
|
||
m_content(NULL)
|
||
{
|
||
}
|
||
/**
|
||
* @brief 构造函数
|
||
*/
|
||
template<typename T> Any(const T & t) :
|
||
m_content(new Holder<T> (t))
|
||
{
|
||
}
|
||
/**
|
||
* @brief 拷贝构造函数
|
||
*/
|
||
Any(const Any& other) :
|
||
m_content(other.m_content ? other.m_content->Clone() : NULL)
|
||
{
|
||
}
|
||
/**
|
||
* @brief 析构函数
|
||
*/
|
||
~Any()
|
||
{
|
||
if (m_content != NULL)
|
||
{
|
||
delete m_content;
|
||
m_content = NULL;
|
||
}
|
||
}
|
||
/**
|
||
* @brief 用参数中的<code>Any</code>的值交换
|
||
* @return 交换后的对象
|
||
*/
|
||
Any & swap(Any & rhs)
|
||
{
|
||
std::swap(m_content, rhs.m_content);
|
||
return *this;
|
||
}
|
||
/**
|
||
* @brief 重载=运算符
|
||
*/
|
||
template<typename T> Any& operator=(const T & rhs)
|
||
{
|
||
anything(rhs).swap(*this);
|
||
return *this;
|
||
}
|
||
/**
|
||
* @brief 重载=运算符
|
||
*/
|
||
Any & operator=(Any rhs)
|
||
{
|
||
rhs.swap(*this);
|
||
return *this;
|
||
}
|
||
/**
|
||
* @brief 判断值是否为空
|
||
* @return 为空返回true,否则返回false
|
||
*/
|
||
bool empty() const
|
||
{
|
||
return !m_content;
|
||
}
|
||
/**
|
||
* @brief 获取<code>Any</code>中的值
|
||
*/
|
||
template<typename T> T Get()
|
||
{
|
||
return (dynamic_cast<Holder<T>*> (m_content))->Get();
|
||
}
|
||
private:
|
||
Placeholder * m_content;
|
||
};
|
||
|
||
/**
|
||
* @class Context
|
||
* @brief 基于Key-Value的上线文容器
|
||
* <p>注意为了实现简单,易于维护,没有释放内部的资源,所以不要在一个进程中频繁申请此Context,否则会存在大量的内存泄漏
|
||
*/
|
||
class PAI_MODULE_EXPORT Context
|
||
{
|
||
|
||
public:
|
||
/**
|
||
* @brief 构造函数
|
||
*/
|
||
Context() : m_container(new map<string, Any>())
|
||
{
|
||
init ();
|
||
}
|
||
/**
|
||
* 拷贝构造函数
|
||
*/
|
||
Context(const Context& another) : m_container(another.m_container)
|
||
{
|
||
init ();
|
||
}
|
||
/**
|
||
* @brief 重载=运算符
|
||
*/
|
||
Context& operator=(const Context& rhs)
|
||
{
|
||
if (this == &rhs)
|
||
return *this;
|
||
pthread_mutex_lock(&m_Locker);
|
||
if (m_container != NULL)
|
||
{
|
||
delete m_container;
|
||
m_container = NULL;
|
||
}
|
||
m_container = rhs.m_container;
|
||
pthread_mutex_unlock(&m_Locker);
|
||
return *this;
|
||
}
|
||
/**
|
||
* TODO 存在内存泄漏
|
||
* @brief 析构方法
|
||
* @remark 本方法中没有释放内部的资源,这样做的原因是
|
||
* - 需要释放的时机通常为工作流结束后,此时操作系统会自动释放资源
|
||
* - 实现简单,易于维护
|
||
*/
|
||
virtual ~Context(){
|
||
pthread_mutex_destroy(&m_Locker);
|
||
}
|
||
/**
|
||
* @brief 向<code>Context</code>中放入键值对
|
||
* @param[in] key 键
|
||
* @param[in] anything 值
|
||
*/
|
||
void Put(const string& key, Any anything)
|
||
{
|
||
pthread_mutex_lock(&m_Locker);
|
||
(*m_container)[key] = anything;
|
||
pthread_mutex_unlock(&m_Locker);
|
||
}
|
||
/**
|
||
* @brief 从<code>Context</code>中获取指定key的值
|
||
* @param[in] key 键
|
||
* @return 键值为key的值
|
||
*/
|
||
template<typename T>
|
||
T Get(const string& key)
|
||
{
|
||
pthread_mutex_lock(&m_Locker);
|
||
if (m_container->count(key) == 0)
|
||
{
|
||
pthread_mutex_unlock(&m_Locker);
|
||
// throw pai::error::key_not_found_error("Key [" + key + "] not found.");
|
||
}
|
||
T rest =(*m_container)[key].Get<T> ();
|
||
pthread_mutex_unlock(&m_Locker);
|
||
return rest;
|
||
}
|
||
/**
|
||
* @brief 判断<code>Context</code>中是否包含指定key的键值对
|
||
* @param[in] key 键
|
||
* @return 包含则返回true,否则返回false
|
||
*/
|
||
bool ContainsKey(const string& key)
|
||
{
|
||
pthread_mutex_lock(&m_Locker);
|
||
bool rest = false;
|
||
if (m_container->count(key) == 0)
|
||
rest = false;
|
||
else
|
||
rest = true;
|
||
pthread_mutex_unlock(&m_Locker);
|
||
return rest;
|
||
}
|
||
|
||
private:
|
||
void init ()
|
||
{
|
||
pthread_mutex_init(&m_Locker,NULL);
|
||
}
|
||
map<string, Any> * m_container;
|
||
pthread_mutex_t m_Locker;
|
||
};
|
||
|
||
/**
|
||
* @class BufferContext
|
||
* @brief 模块之间的Buffer上下文信息
|
||
*/
|
||
class BufferContext:public Context
|
||
{
|
||
public:
|
||
/**
|
||
* @brief 构造函数
|
||
*/
|
||
BufferContext() : Context()
|
||
{
|
||
}
|
||
/**
|
||
* @brief 拷贝构造函数
|
||
*/
|
||
BufferContext(const BufferContext& another) : Context(another)
|
||
{
|
||
}
|
||
/**
|
||
* @brief 重载=运算符
|
||
*/
|
||
BufferContext& operator=(const BufferContext& rhs)
|
||
{
|
||
if (this == &rhs)
|
||
return *this;
|
||
Context::operator=(rhs);
|
||
return *this;
|
||
}
|
||
/**
|
||
* @brief 析构函数
|
||
* @remark 本方法中没有释放内部的资源,这样做的原因是
|
||
* - 需要释放的时机通常为工作流结束后,此时操作系统会自动释放资源
|
||
* - 实现简单,易于维护
|
||
*/
|
||
virtual ~BufferContext(){}
|
||
};
|
||
|
||
/**
|
||
* @class WorkflowContext
|
||
* @brief 工作流全局上下文,整个工作流只有一份
|
||
* <p>工作流中的模块通过WorkflowContext获取全局性的信息以及共享信息.
|
||
*/
|
||
class WorkflowContext:public Context
|
||
{
|
||
private:
|
||
friend class ::WorkflowContextTest;//for test
|
||
friend class pai::workflow::ModuleContainer;
|
||
//module 所在的分支
|
||
std::vector<std::string> branchs;
|
||
/**
|
||
* need_branch == true,模块所在的分支都存储一份,模块中使用put操作期望这样的结果
|
||
* 在内存中以(branch-id)-key 》 anything保存多分
|
||
*
|
||
* need_branch == false,整个工作流中保存一份,非模块中使用期望这样的操作
|
||
* 在内存中以key 》 anything保存多分
|
||
*/
|
||
void Put(const string& key, Any anything, bool need_branch)
|
||
{
|
||
if(need_branch)
|
||
{
|
||
if(branchs.size() == 0){
|
||
throw /*pai::error::runtime_error*/(" Illegal use of context or program error!");
|
||
}
|
||
for(std::vector<std::string>::iterator it = branchs.begin(); it != branchs.end(); it++)
|
||
{
|
||
std::string key_tmp = (*it) + "-" + key;
|
||
Context::Put(key_tmp, anything);
|
||
}
|
||
}
|
||
else
|
||
{
|
||
Context::Put(key, anything);
|
||
}
|
||
}
|
||
public:
|
||
/**
|
||
* @brief 构造函数
|
||
*/
|
||
WorkflowContext() : Context(), branchs()
|
||
{
|
||
}
|
||
/**
|
||
* @brief 拷贝构造函数
|
||
*/
|
||
WorkflowContext(const WorkflowContext& another) : Context(another), branchs(another.branchs)
|
||
{
|
||
}
|
||
/**
|
||
* @brief 向<code>WorkflowContext</code>中存放键值对
|
||
* @param[in] key 键
|
||
* @param[in] anything 值
|
||
*/
|
||
void Put(const string& key, Any anything)
|
||
{
|
||
Put(key, anything, true);
|
||
}
|
||
/**
|
||
* @brief 获取<code>WorkflowContext</code>中指定键的值
|
||
* <p>做get操作时,首先通过常规方式获取,如果没有则从分支中获取,如果没有,抛出异常
|
||
* 如果一个模块存在在多个分支中,选取第一个找到的
|
||
* @param[in] key 键
|
||
* @return 值
|
||
*/
|
||
// template<typename T>
|
||
// T Get(const string& key)
|
||
// {
|
||
// try{
|
||
// return Context::Get<T>(key);
|
||
// }catch(pai::error::key_not_found_error& e){
|
||
// for(std::vector<std::string>::iterator it = branchs.begin(); it != branchs.end(); it++)
|
||
// {
|
||
// std::string key_tmp = (*it) + "-" + key;
|
||
// try{
|
||
// return Context::Get<T>(key_tmp);
|
||
// }catch(pai::error::key_not_found_error& e){
|
||
// continue;
|
||
// }
|
||
// }
|
||
// }
|
||
// throw pai::error::key_not_found_error("Key [" + key + "] not found.");
|
||
// }
|
||
/**
|
||
* @brief 判断<code>WorkflowContext</code>中是否包含指定key的键值对
|
||
* @param[in] key 键
|
||
* @return 包含则返回true,否则返回false
|
||
*/
|
||
bool ContainsKey(const string& key)
|
||
{
|
||
bool res = Context::ContainsKey(key);
|
||
for(std::vector<std::string>::iterator it = branchs.begin(); it != branchs.end(); it++)
|
||
{
|
||
const std::string key_tmp = (*it) + "-" + key;
|
||
res = (res || Context::ContainsKey(key_tmp));
|
||
}
|
||
return res;
|
||
}
|
||
/**
|
||
* @brief 重载=运算符
|
||
*/
|
||
WorkflowContext& operator=(const WorkflowContext& rhs)
|
||
{
|
||
if (this == &rhs)
|
||
return *this;
|
||
branchs = rhs.branchs;
|
||
Context::operator=(rhs);
|
||
return *this;
|
||
}
|
||
/**
|
||
* @brief 析构方法
|
||
* @remark 本方法中没有释放内部的资源,这样做的原因是
|
||
* - 需要释放的时机通常为工作流结束后,此时操作系统会自动释放资源
|
||
* - 实现简单,易于维护
|
||
*/
|
||
virtual ~WorkflowContext(){}
|
||
};
|
||
}
|
||
}
|
||
|
||
#endif
|