logplus/Workflow/WFEngine/Module/include/WorkflowContext-.h
2026-01-16 17:18:41 +08:00

454 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* @file WorkflowContext.h
* @brief 文件定义了与WorkflowContext相关的类
* @author Ben
* @date 2012-10-25
*/
#ifndef PAI_FRAME_MODULEAPI_WORKFLOW_CONTEXT_H
#define PAI_FRAME_MODULEAPI_WORKFLOW_CONTEXT_H
// #include "error.h"
#include "Turtle.h"
#include <string>
using std::string;
#include <map>
using std::map;
namespace pai
{
namespace workflow
{
class ModuleContainer;
}
}
// for test
class WorkflowContextTest;
namespace pai
{
namespace module
{
/**
* WORKFLOW CONTEXT KEY- DATASETINFO
*/
const std::string WORKLFOWCONTEXT_KEY_DATASETINFO = "DataSetInfo"; /**< WorkflowContext中文件头的key常量 */
const std::string KEY_DATABASE_ID = "database_id"; /**< WorkflowContext中database_id的key常量*/
const std::string KEY_CHAIN_ID = "submitJob.info.chainId"; /**< WorkflowContext中作业链id的key常量*/
const std::string KEY_HADOOP_JOB_ID = "pai.workflow.job.hadoop.id"; /**< WorkflowContext中hadoopId的key常量*/
const std::string KEY_JOB_DB_ID = "pai.workflow.job.db.id"; /**< WorkflowContext中工作流id的key常量*/
const std::string INPUT_GATHER_ORDER = "INPUT_GATHER_ORDER"; /**< WorkflowContext中输入道集顺序的key常量*/
const std::string STACK_FLAG="STACK_FLAG"; /**< WorkflowContext中Stack_flag的key常量*/
const std::string KEY_SURCON_MODULE_ISAPPLY = "pai.workflow.surcon.isapply"; /**< WorkflowContext中isapply的key常量*/
const std::string HADOOP_CONF_KEY_PRE_HADOOP_JOB_ID = "pai.workflow.pre.job.hadoop.id"; /**< WorkflowContext中上一个作业的hadoopId的key常量*/
const std::string RESETDATASETID = "resetDataSetID"; /**< WorkflowContext中resetDataSetID的key常量*/
const std::string CONTEXT_IDXSORTINGCODE="CONTEXT_IDXSORTINGCODE"; /**< WorkflowContext中idxSortingCode的key常量*/
const std::string KEY_MAPRED_SPLIT="split"; /**< WorkflowContext中切片的key常量*/
/**
* 用于多波模块中
*/
const std::string MC_FILEPATH_X="mc_filepath_x"; /**< WorkflowContext中三分量中x分量的文件*/
const std::string MC_FILEPATH_Y="mc_filepath_y"; /**< WorkflowContext中三分量中y分量的文件*/
const std::string MC_FILEPATH_Z="mc_filepath_z"; /**< WorkflowContext中三分量中z分量的文件*/
/**
* @class Placeholder
* @brief 占位符基类
*/
class Placeholder
{
public:
/**
* @brief 虚析构函数
*/
virtual ~Placeholder()
{
}
/**
* @brief 克隆函数
*/
virtual Placeholder * Clone() const = 0;
};
/**
* @class Holder
* @brief 占位符实现类使用模板方式实现具体存储类型由T决定
*/
template<typename T> class Holder: public Placeholder
{
public:
/**
* @brief 构造函数
*/
Holder(const T & t) :
m_held(t)
{
}
/**
* @brief 克隆函数
*/
virtual Placeholder * Clone() const
{
return new Holder(m_held);
}
/**
* @brief 获取<code>Holder</code>中的值
* @return <code>Holder</code>中的值
*/
T Get()
{
return m_held;
}
private:
Holder & operator=(const Holder&);//unimplemented
T m_held;
};
/**
* @class Any
* @brief 代表Key-Value中的Value类型在创建和获取value时使用模板方法获得具体类型
*/
class Any
{
public:
/**
* @brief 构造函数
*/
Any() :
m_content(NULL)
{
}
/**
* @brief 构造函数
*/
template<typename T> Any(const T & t) :
m_content(new Holder<T> (t))
{
}
/**
* @brief 拷贝构造函数
*/
Any(const Any& other) :
m_content(other.m_content ? other.m_content->Clone() : NULL)
{
}
/**
* @brief 析构函数
*/
~Any()
{
if (m_content != NULL)
{
delete m_content;
m_content = NULL;
}
}
/**
* @brief 用参数中的<code>Any</code>的值交换
* @return 交换后的对象
*/
Any & swap(Any & rhs)
{
std::swap(m_content, rhs.m_content);
return *this;
}
/**
* @brief 重载=运算符
*/
template<typename T> Any& operator=(const T & rhs)
{
anything(rhs).swap(*this);
return *this;
}
/**
* @brief 重载=运算符
*/
Any & operator=(Any rhs)
{
rhs.swap(*this);
return *this;
}
/**
* @brief 判断值是否为空
* @return 为空返回true否则返回false
*/
bool empty() const
{
return !m_content;
}
/**
* @brief 获取<code>Any</code>中的值
*/
template<typename T> T Get()
{
return (dynamic_cast<Holder<T>*> (m_content))->Get();
}
private:
Placeholder * m_content;
};
/**
* @class Context
* @brief 基于Key-Value的上线文容器
* <p>注意为了实现简单,易于维护,没有释放内部的资源,所以不要在一个进程中频繁申请此Context,否则会存在大量的内存泄漏
*/
class PAI_MODULE_EXPORT Context
{
public:
/**
* @brief 构造函数
*/
Context() : m_container(new map<string, Any>())
{
init ();
}
/**
* 拷贝构造函数
*/
Context(const Context& another) : m_container(another.m_container)
{
init ();
}
/**
* @brief 重载=运算符
*/
Context& operator=(const Context& rhs)
{
if (this == &rhs)
return *this;
pthread_mutex_lock(&m_Locker);
if (m_container != NULL)
{
delete m_container;
m_container = NULL;
}
m_container = rhs.m_container;
pthread_mutex_unlock(&m_Locker);
return *this;
}
/**
* TODO 存在内存泄漏
* @brief 析构方法
* @remark 本方法中没有释放内部的资源,这样做的原因是
* - 需要释放的时机通常为工作流结束后,此时操作系统会自动释放资源
* - 实现简单,易于维护
*/
virtual ~Context(){
pthread_mutex_destroy(&m_Locker);
}
/**
* @brief 向<code>Context</code>中放入键值对
* @param[in] key 键
* @param[in] anything 值
*/
void Put(const string& key, Any anything)
{
pthread_mutex_lock(&m_Locker);
(*m_container)[key] = anything;
pthread_mutex_unlock(&m_Locker);
}
/**
* @brief 从<code>Context</code>中获取指定key的值
* @param[in] key 键
* @return 键值为key的值
*/
template<typename T>
T Get(const string& key)
{
pthread_mutex_lock(&m_Locker);
if (m_container->count(key) == 0)
{
pthread_mutex_unlock(&m_Locker);
// throw pai::error::key_not_found_error("Key [" + key + "] not found.");
}
T rest =(*m_container)[key].Get<T> ();
pthread_mutex_unlock(&m_Locker);
return rest;
}
/**
* @brief 判断<code>Context</code>中是否包含指定key的键值对
* @param[in] key 键
* @return 包含则返回true否则返回false
*/
bool ContainsKey(const string& key)
{
pthread_mutex_lock(&m_Locker);
bool rest = false;
if (m_container->count(key) == 0)
rest = false;
else
rest = true;
pthread_mutex_unlock(&m_Locker);
return rest;
}
private:
void init ()
{
pthread_mutex_init(&m_Locker,NULL);
}
map<string, Any> * m_container;
pthread_mutex_t m_Locker;
};
/**
* @class BufferContext
* @brief 模块之间的Buffer上下文信息
*/
class BufferContext:public Context
{
public:
/**
* @brief 构造函数
*/
BufferContext() : Context()
{
}
/**
* @brief 拷贝构造函数
*/
BufferContext(const BufferContext& another) : Context(another)
{
}
/**
* @brief 重载=运算符
*/
BufferContext& operator=(const BufferContext& rhs)
{
if (this == &rhs)
return *this;
Context::operator=(rhs);
return *this;
}
/**
* @brief 析构函数
* @remark 本方法中没有释放内部的资源,这样做的原因是
* - 需要释放的时机通常为工作流结束后,此时操作系统会自动释放资源
* - 实现简单,易于维护
*/
virtual ~BufferContext(){}
};
/**
* @class WorkflowContext
* @brief 工作流全局上下文,整个工作流只有一份
* <p>工作流中的模块通过WorkflowContext获取全局性的信息以及共享信息.
*/
class WorkflowContext:public Context
{
private:
friend class ::WorkflowContextTest;//for test
friend class pai::workflow::ModuleContainer;
//module 所在的分支
std::vector<std::string> branchs;
/**
* need_branch == true模块所在的分支都存储一份模块中使用put操作期望这样的结果
* 在内存中以(branch-id)-key 》 anything保存多分
*
* need_branch == false整个工作流中保存一份非模块中使用期望这样的操作
* 在内存中以key 》 anything保存多分
*/
void Put(const string& key, Any anything, bool need_branch)
{
if(need_branch)
{
if(branchs.size() == 0){
throw /*pai::error::runtime_error*/(" Illegal use of context or program error!");
}
for(std::vector<std::string>::iterator it = branchs.begin(); it != branchs.end(); it++)
{
std::string key_tmp = (*it) + "-" + key;
Context::Put(key_tmp, anything);
}
}
else
{
Context::Put(key, anything);
}
}
public:
/**
* @brief 构造函数
*/
WorkflowContext() : Context(), branchs()
{
}
/**
* @brief 拷贝构造函数
*/
WorkflowContext(const WorkflowContext& another) : Context(another), branchs(another.branchs)
{
}
/**
* @brief 向<code>WorkflowContext</code>中存放键值对
* @param[in] key 键
* @param[in] anything 值
*/
void Put(const string& key, Any anything)
{
Put(key, anything, true);
}
/**
* @brief 获取<code>WorkflowContext</code>中指定键的值
* <p>做get操作时首先通过常规方式获取如果没有则从分支中获取如果没有抛出异常
* 如果一个模块存在在多个分支中,选取第一个找到的
* @param[in] key 键
* @return 值
*/
// template<typename T>
// T Get(const string& key)
// {
// try{
// return Context::Get<T>(key);
// }catch(pai::error::key_not_found_error& e){
// for(std::vector<std::string>::iterator it = branchs.begin(); it != branchs.end(); it++)
// {
// std::string key_tmp = (*it) + "-" + key;
// try{
// return Context::Get<T>(key_tmp);
// }catch(pai::error::key_not_found_error& e){
// continue;
// }
// }
// }
// throw pai::error::key_not_found_error("Key [" + key + "] not found.");
// }
/**
* @brief 判断<code>WorkflowContext</code>中是否包含指定key的键值对
* @param[in] key 键
* @return 包含则返回true否则返回false
*/
bool ContainsKey(const string& key)
{
bool res = Context::ContainsKey(key);
for(std::vector<std::string>::iterator it = branchs.begin(); it != branchs.end(); it++)
{
const std::string key_tmp = (*it) + "-" + key;
res = (res || Context::ContainsKey(key_tmp));
}
return res;
}
/**
* @brief 重载=运算符
*/
WorkflowContext& operator=(const WorkflowContext& rhs)
{
if (this == &rhs)
return *this;
branchs = rhs.branchs;
Context::operator=(rhs);
return *this;
}
/**
* @brief 析构方法
* @remark 本方法中没有释放内部的资源,这样做的原因是
* - 需要释放的时机通常为工作流结束后,此时操作系统会自动释放资源
* - 实现简单,易于维护
*/
virtual ~WorkflowContext(){}
};
}
}
#endif