/*
 * @file WorkflowRunner.h
 * @brief Logger (Log4cxx) cannot currently be used inside WorkflowRunner, because the
 *        Logger type has not been set yet at this stage; once this is refactored
 *        into a C++ class it can be used.
 * Created on: 2011-6-17
 * Author: John.Huang
 *
 * NOTE(review): the copy of this file under review is extraction-damaged: every
 * "<...>" span was stripped (include names, template arguments) and three longer
 * spans of code are missing. The damaged fragments are kept verbatim below and each
 * one is marked with a NOTE(review); restore them from version control.
 */
#ifndef PAI_FRAME_WORKFLOWENGINE_WORKFLOWRUNNER_H
#define PAI_FRAME_WORKFLOWENGINE_WORKFLOWRUNNER_H

#include "Module.h"
#include "MutiParallModule.h"
#include "ModuleSelfSplitSupport.h"
#include "Buffer.h"
#include "pb_gen/splitInfo.pb.h"
// NOTE(review): the names of the next five headers were lost. The code below uses
// std::string, std::vector, std::map, std::set and std::cout, so these are presumably
// the corresponding standard headers -- confirm against version control.
#include
#include
#include
#include
#include

// NOTE(review): a using-directive at header scope leaks pai::module into every file
// that includes this header. TaskInfo, Callback, Split and CModule are not declared in
// this file; presumably they come from the project headers above -- confirm before
// changing this line.
using namespace pai::module;

typedef struct workflow_modify_input_filter workflow_modify_input_filter;
typedef struct workflow_del_input_filter workflow_del_input_filter;
typedef struct workflow_input_info workflow_input_info;
typedef struct workflow_output_filter workflow_output_filter;
typedef struct workflow_info workflow_info;

/**
 * Kind of filter modification.
 */
enum ModifyFilterType {
    // No filter-parameter set.
    NO_FILTER,
    // A filter-parameter set exists, but it has no sort key.
    NO_ORDER_KEY,
    // A filter-parameter set exists and it has a sort key.
    // NOTE(review): identifiers containing a double underscore are reserved in C++;
    // renaming would break callers, so it is only flagged here.
    HAS_ORDER__KEY
};

/**
 * One filter modification to apply to an input module: the kind of modification,
 * the id of the parameter item it targets, and the key / start / end values.
 */
struct workflow_modify_input_filter {
    /*
     * ModifyFilter type
     * 0: there are no filter items
     *    First check whether node(id="files[*].traceselection") exists and add it if
     *    it does not; then add the new filter item.
     * 1: filter item without a sort key
     *    Add a filter-item child node under node(id="files[*].traceselection").
     * 2: filter item with a sort key
     *    Reset the child nodes (start, end) of the CUSTOM node of
     *    param_item_id (id="files[*].traceselection[*]").
     */
    ModifyFilterType modify_filter_type;
    std::string param_item_id;
    std::string key;
    std::string start;
    std::string end;
    // increment default;
    // exclude default
    // And/Or default

    /** Value-initializes every member (the strings start empty). */
    workflow_modify_input_filter(): modify_filter_type(), param_item_id(), key(), start(), end(){}

    /**
     * Writes a tab-indented dump of all fields to std::cout.
     * @param index position shown in the "(index)" part of the heading line.
     */
    void Print(int index = 0) {
        std::cout << "\t\tworkflow_modify_input_filter(" << index << "):" << std::endl;
        std::cout << "\t\t{" << std::endl;
        // NOTE(review): the template argument of static_cast was stripped from this copy.
        std::cout << "\t\t\tmodify_filter_type=" << static_cast(modify_filter_type) << std::endl;
        std::cout << "\t\t\tparam_item_id=" << param_item_id << std::endl;
        std::cout << "\t\t\tkey=" << key << std::endl;
        std::cout << "\t\t\tstart=" << start << std::endl;
        std::cout << "\t\t\tend=" << end << std::endl;
        std::cout << "\t\t}" << std::endl;
    };

    /**
     * Field-by-field comparison.
     * @return true when param_item_id, key, start, end and modify_filter_type all match.
     * NOTE(review): `other` is a non-const reference and the member is not const, so
     * const objects and temporaries cannot be compared.
     */
    bool
    equal(workflow_modify_input_filter& other) {
        if(param_item_id.compare(other.param_item_id) != 0) return false;
        if(key.compare(other.key) != 0) return false;
        if(start.compare(other.start) != 0) return false;
        if(end.compare(other.end) != 0) return false;
        if(modify_filter_type != other.modify_filter_type) return false;
        return true;
    }
};

/**
 * Identifies one filter parameter item of an input module that is to be deleted.
 */
struct workflow_del_input_filter {
    std::string param_item_id;

    /** Starts with an empty param_item_id. */
    workflow_del_input_filter():param_item_id(){}

    /**
     * Writes a tab-indented dump of the item to std::cout.
     * @param index position shown in the "(index)" part of the heading line.
     */
    void Print(int index = 0) {
        std::cout << "\t\tworkflow_del_input_filter(" << index << "):" << std::endl;
        std::cout << "\t\t{" << std::endl;
        std::cout << "\t\t\tparam_item_id=" << param_item_id << std::endl;
        std::cout << "\t\t}" << std::endl;
    };

    /** @return true when both objects carry the same param_item_id. */
    bool equal(workflow_del_input_filter& other) {
        if(param_item_id.compare(other.param_item_id)!=0) return false;
        return true;
    };
};

/**
 * Filter changes for one input module: the module's context id plus the lists of
 * filters to modify and filters to delete.
 */
struct workflow_input_info {
    // input module runtime_id
    std::string module_context_id;
    // input module: filter parameter items that need to be modified
    // NOTE(review): the vector's template argument was stripped from this copy.
    std::vector modify_filters;
    // input module: filter parameter items that need to be deleted
    // NOTE(review): the vector's template argument was stripped from this copy.
    std::vector del_filters;

    /** Starts with an empty id and empty lists. */
    workflow_input_info():module_context_id(), modify_filters(), del_filters(){}

    /**
     * Writes a tab-indented dump of the id and of both filter lists to std::cout.
     * @param index position shown in the "(index)" part of the heading line.
     */
    void Print(int index = 0) {
        std::cout << "\tworkflow_input_info(" << index << "):" << std::endl;
        std::cout << "\t{" << std::endl;
        std::cout << "\t\tmodule_context_id=" << module_context_id << std::endl;
        std::cout << "\t\tmodify_filters:" << std::endl;
        std::cout << "\t\t[" << std::endl;
        size_t modify_size = modify_filters.size();
        for(size_t i = 0;i < modify_size;i++) {
            // Each element is copied into a local before it is printed.
            workflow_modify_input_filter modify = modify_filters.at(i);
            modify.Print(static_cast(i));
        }
        std::cout << "\t\t]" << std::endl;
        std::cout << "\t\tdel_filters:" << std::endl;
        std::cout << "\t\t[" << std::endl;
        size_t del_size = del_filters.size();
        for(size_t i = 0;i < del_size;i++) {
            workflow_del_input_filter del = del_filters.at(i);
            del.Print(static_cast(i));
        }
        std::cout << "\t\t]" << std::endl;
        std::cout << "\t}" << std::endl;
    };

    /**
     * Compares the context id, then both filter lists: sizes first, then the elements
     * position by position through their own equal().
     * @return true when everything matches.
     */
    bool equal(workflow_input_info& other) {
        if(module_context_id.compare(other.module_context_id)!=0)
            return false;
        if(modify_filters.size() != other.modify_filters.size())
            return false;
        else if(modify_filters.size() > 0)
            for(size_t i = 0;i < modify_filters.size();i++) {
                workflow_modify_input_filter modify = modify_filters.at(i);
                if(!modify.equal(other.modify_filters.at(i))) return false;
            }
        if(del_filters.size() != other.del_filters.size())
            return false;
        else if(del_filters.size() > 0)
            for(size_t i = 0;i < del_filters.size();i++) {
                workflow_del_input_filter del = del_filters.at(i);
                if(!del.equal(other.del_filters.at(i))) return false;
            }
        return true;
    }
};

/**
 * One output-module filter entry: a parameter item id and its file names string.
 */
struct workflow_output_filter {
    std::string param_item_id;
    std::string filenames;

    /** Starts with both strings empty. */
    workflow_output_filter():param_item_id(), filenames(){}

    /**
     * Writes a tab-indented dump of both fields to std::cout.
     * @param index position shown in the "(index)" part of the heading line.
     */
    void Print(int index = 0) {
        std::cout << "\t\tworkflow_output_filter(" << index << "):" << std::endl;
        std::cout << "\t\t{" << std::endl;
        std::cout << "\t\t\tparam_item_id=" << param_item_id << std::endl;
        std::cout << "\t\t\tfilenames=" << filenames << std::endl;
        std::cout << "\t\t}" << std::endl;
    };

    /** @return true when param_item_id and filenames both match. */
    bool equal(workflow_output_filter& other) {
        if(param_item_id.compare(other.param_item_id)!=0) return false;
        if(filenames.compare(other.filenames)!=0) return false;
        return true;
    }
};

/**
 * Filter settings for one output module: the module's context id and its filters.
 */
struct workflow_output_info {
    // output module runtime_id
    std::string module_context_id;
    // output module: filter parameter items that need to be reset
    // NOTE(review): the vector's template argument was stripped from this copy.
    std::vector filters;

    /** Starts with an empty id and an empty filter list. */
    workflow_output_info():module_context_id(), filters(){}

    /**
     * Writes a tab-indented dump of the id and of every filter to std::cout.
     * @param index position shown in the "(index)" part of the heading line.
     */
    void Print(int index = 0) {
        std::cout << "\tworkflow_output_info(" << index << "):" << std::endl;
        std::cout << "\t{" << std::endl;
        std::cout << "\t\tmodule_context_id=" << module_context_id << std::endl;
        std::cout << "\t\tfilters:" << std::endl;
        std::cout << "\t\t[" << std::endl;
        size_t size = filters.size();
        for(size_t i = 0;i < size;i++) {
            workflow_output_filter filter = filters.at(i);
            filter.Print(static_cast(i));
        }
        std::cout << "\t\t]" << std::endl;
        std::cout << "\t}" << std::endl;
    };

    /**
     * Compares the context id and the filter list sizes; the rest of the body is
     * missing from this copy (see the note below).
     */
    bool equal(workflow_output_info& other) {
        if(module_context_id.compare(other.module_context_id) != 0) return false;
        if(filters.size() != other.filters.size())
            return false;
        else if(filters.size() > 0)
            // NOTE(review): DAMAGED SPAN. Everything from this loop condition up to the
            // declaration of workflow_info::inputs is missing: the remainder of this
            // function, the end of workflow_output_info, the head of workflow_info and
            // the declarations of its `id` and `taskInfo` members (both are referenced by
            // the constructor and Print() below). The fragment is kept verbatim.
            for(size_t i=0; i inputs;

    // ---- from here on the members belong to workflow_info (its head is lost, see above) ----

    // output module: parameter item information that was reset because of splitting
    // NOTE(review): the vector's template argument was stripped from this copy.
    std::vector outputs;
    // callback function used for statistics
    Callback cb_update_module_progress;

    /** Value-initializes every member. */
    workflow_info():id(), taskInfo(), inputs(), outputs(), cb_update_module_progress(){}

    /**
     * Writes the id, the job_id / attempt_id / hostname fields of taskInfo, and every
     * input and output entry to std::cout.
     */
    void Print() {
        std::cout << "workflow_info:" << std::endl;
        std::cout << "{" << std::endl;
        std::cout << "\tid=" << id << std::endl;
        std::cout << "\tjob_id=" << taskInfo.job_id << std::endl;
        std::cout << "\tattempt_id=" << taskInfo.attempt_id << std::endl;
        std::cout << "\thostname=" << taskInfo.hostname << std::endl;
        std::cout << "\tinputs:" << std::endl;
        std::cout << "\t[" << std::endl;
        size_t inputs_size = inputs.size();
        for( size_t i = 0;i < inputs_size;i++) {
            workflow_input_info input = inputs.at(i);
            input.Print(static_cast(i));
        }
        std::cout << "\t]" << std::endl;
        std::cout << "\toutputs:" << std::endl;
        std::cout << "\t[" << std::endl;
        size_t outputs_size = outputs.size();
        for( size_t i = 0;i < outputs_size;i++) {
            workflow_output_info output = outputs.at(i);
            output.Print(static_cast(i));
        }
        std::cout << "\t]" << std::endl;
        std::cout << "}" << std::endl;
    };
};

/**
 * Data handed to a reduce run: task info, output settings, the previous job id,
 * a parameter map and a progress callback.
 */
struct ReduceInfo {
    TaskInfo taskInfo;
    // output module: parameter item information that was reset because of splitting
    // NOTE(review): the vector's template argument was stripped from this copy.
    std::vector outputs;
    // previous Hadoop job id
    std::string preJobId;
    // NOTE(review): the map's template arguments were stripped from this copy.
    std::map parameters;
    // callback function used for statistics
    Callback cb_update_module_progress;

    /** Value-initializes every member. */
    ReduceInfo():taskInfo(), outputs(), preJobId(), parameters(), cb_update_module_progress() {}

    /**
     * Writes the task fields, preJobId, every key/value pair of `parameters` and every
     * output entry to std::cout.
     */
    void Print() {
        std::cout << "ReduceInfo:" << std::endl;
        std::cout << "{" << std::endl;
        std::cout << "\tjob_id=" << taskInfo.job_id << std::endl;
        std::cout << "\tattempt_id=" << taskInfo.attempt_id << std::endl;
        std::cout << "\thostname=" << taskInfo.hostname << std::endl;
        std::cout << "\tpreJobId=" << preJobId << std::endl;
        std::cout << "\tparameters size=" << parameters.size() << std::endl;
        // NOTE(review): the iterator's template arguments were stripped from this copy.
        for(std::map::iterator iter = parameters.begin();iter!=parameters.end();++iter){
            std::cout << "\toparam[" << iter->first <<"]=" <<
                iter->second << std::endl;
        }
        std::cout << "\toutputs size=" << outputs.size() << std::endl;
        std::cout << "\t{" << std::endl;
        size_t outputs_size = outputs.size();
        for( size_t i = 0;i < outputs_size;i++) {
            workflow_output_info output = outputs.at(i);
            output.Print(static_cast(i));
        }
        std::cout << "\t}" << std::endl;
        std::cout << "}" << std::endl;
    };
};

/**
 * Job-level data passed to WorkflowRunner's BeforeJob / AfterJob / AbortJob /
 * AbortTask: output settings, a reduce-workflow flag and the task info.
 */
struct JobInfo {
    // output module: parameter item information that was reset because of splitting
    // NOTE(review): the vector's template argument was stripped from this copy.
    std::vector outputs;
    bool isReduceWorkflow;
    TaskInfo taskInfo;

    /** Value-initializes every member, so isReduceWorkflow starts as false. */
    JobInfo():outputs(), isReduceWorkflow(), taskInfo(){}

    /** Writes the flag, the task fields and the outputs to std::cout. */
    void Print() {
        std::cout << "JobInfo:" << std::endl;
        std::cout << "{" << std::endl;
        std::cout << "\tisReduceWorkflow=" << isReduceWorkflow << std::endl;
        std::cout << "\tjob_id=" << taskInfo.job_id << std::endl;
        std::cout << "\tattempt_id=" << taskInfo.attempt_id << std::endl;
        std::cout << "\thostname=" << taskInfo.hostname << std::endl;
        std::cout << "\toutputs size=" << outputs.size() << std::endl;
        std::cout << "\t{" << std::endl;
        // NOTE(review): DAMAGED SPAN. The loop condition and the start of the loop body
        // are missing; the fragment is kept verbatim.
        for(size_t i=0; i(i));
        }
        std::cout << "\t}" << std::endl;
        std::cout << "}" << std::endl;
    };

    /**
     * Compares the flag, the job_id / attempt_id / hostname fields of taskInfo and the
     * size of `outputs`; the element-wise part is missing from this copy (see below).
     */
    bool equal(JobInfo& other) {
        if(isReduceWorkflow != other.isReduceWorkflow) return false;
        if(taskInfo.job_id.compare(other.taskInfo.job_id) != 0) return false;
        if(taskInfo.attempt_id.compare(other.taskInfo.attempt_id) != 0) return false;
        if(taskInfo.hostname.compare(other.taskInfo.hostname) != 0) return false;
        // compare outputs
        if(outputs.size() != other.outputs.size()) return false;
        // NOTE(review): DAMAGED SPAN. Everything from this loop condition up to the
        // assignment inside the first WorkflowRunner setter is missing: the rest of
        // JobInfo::equal, the end of JobInfo, the two namespace openers (two closing
        // braces remain at the end of the file), the head of class WorkflowRunner with
        // whatever it declared first, and the head of the setter that stores
        // m_workflow. The fragment is kept verbatim.
        for(size_t i=0; im_workflow = workflow;
    };

    // ---- from here on the members belong to class WorkflowRunner (its head is lost, see above) ----

    /** @return a copy of the stored workflow_info. */
    workflow_info GetWorkflowInfo() const{
        return this->m_workflow;
    };

    /** @return the stored Split pointer; IsUseModuleSelfSplit() tests it against NULL. */
    Split* GetModuleSelfSplit() const{
        return this->m_split;
    }

    /** Stores the given pointer in m_split; nothing else is done with it here. */
    void SetModuleSelfSplit(Split* split){
        m_split = split;
    };

    /** Stores a copy of `reduce`. */
    void SetReduceInfo(const ReduceInfo& reduce){
        this->m_reduce = reduce;
    };

    /** @return a copy of the stored ReduceInfo. */
    ReduceInfo GetReduceInfo() const{
        return this->m_reduce;
    }

    /**
     * Stores a copy of `config`.
     * NOTE(review): the map's template arguments were stripped from this copy.
     */
    void SetConfiguration(const std::map& config){
        this->m_config = config;
    };

    /** Stores the given protobuf message pointer in split_msg_. */
    inline void SetSplitMessage(google::protobuf::Message* s){
        split_msg_=s;
    }

    /**
     * Run the Map workflow.
     */
    void RunWorkflow();

    /**
     * Run the Reduce workflow.
     */
    void RunReduce();

    void BeforeJob(const JobInfo& job);
    void AfterJob(const JobInfo& job);

    /**
     * abortJob:
     * @param[job] the job's info
     * @param[isFailed] if the job is failed or killed
     */
    void AbortJob(const JobInfo& job,const bool isFailed);
    void AbortTask(const JobInfo& job);

private:
    bool IsFirstStartModule(CModule* module);

    /** @return true when module->GetClassName() equals "COutputModule". */
    bool IsOutputModule(CModule* module){
        if(module->GetClassName() == "COutputModule")
            return true;
        else
            return false;
    };

    /** @return true when a Split pointer has been set, i.e. m_split is not NULL. */
    bool IsUseModuleSelfSplit(){
        if(m_split!=NULL)
            return true;
        else
            return false;
    };

    // Copy constructor and copy assignment are declared in the private section and no
    // definition is visible in this file -- presumably to forbid copying; confirm.
    WorkflowRunner(const WorkflowRunner & runner);
    WorkflowRunner & operator=(const WorkflowRunner & runner);

private:
    workflow_info m_workflow;
    ReduceInfo m_reduce;
    // NOTE(review): the map's template arguments were stripped from this copy.
    std::map m_config;
    Split* m_split;
    // NOTE(review): the set's template argument was stripped from this copy.
    std::set firstStartedModules;
    // It is created by the split factory, so it needs to be destroyed.
    // NOTE(review): the code that destroys it is not visible in this file.
    google::protobuf::Message* split_msg_;
};

}
}

#endif