logplus/Workflow/WFEngine/WorkflowEngine/include/WorkflowRunner.h
2026-01-16 17:18:41 +08:00

470 lines
14 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

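/*
* @file WorkflowRunner.h
* @brief Logger (Log4cxx) cannot be used inside WorkflowRunner for now, because the Logger type has not been set up yet at this stage. Once this code is refactored into a C++ class, the Logger can be used.
* Created on: 2011-6-17
* Author: John.Huang
*/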
#ifndef PAI_FRAME_WORKFLOWENGINE_WORKFLOWRUNNER_H
#define PAI_FRAME_WORKFLOWENGINE_WORKFLOWRUNNER_H
#include "Module.h"
#include "MutiParallModule.h"
#include "ModuleSelfSplitSupport.h"
#include "Buffer.h"
#include "pb_gen/splitInfo.pb.h"
#include <set>
#include <vector>
#include <string>
#include <iostream>
#include <map>
using namespace pai::module;
// C-style self-typedefs; each line also acts as a forward declaration of the
// struct it names. In C++ the typedef itself is redundant because a struct name
// is already usable as a type name.
// Note: workflow_output_info, ReduceInfo and JobInfo (defined further down in
// this header) have no corresponding line here.
typedef struct workflow_modify_input_filter workflow_modify_input_filter;
typedef struct workflow_del_input_filter workflow_del_input_filter;
typedef struct workflow_input_info workflow_input_info;
typedef struct workflow_output_filter workflow_output_filter;
typedef struct workflow_info workflow_info;
/**
* Kind of modification to apply to an input filter.
* The enumerators take the implicit values 0, 1 and 2, in declaration order.
*/
enum ModifyFilterType
{
// There is no filter-parameter collection.
NO_FILTER,
// A filter-parameter collection exists, but it has no sort key.
NO_ORDER_KEY,
// A filter-parameter collection exists and it has a sort key.
// NOTE(review): identifiers containing a double underscore are reserved in C++;
// the name is kept as-is because callers may reference it.
HAS_ORDER__KEY
};
struct workflow_modify_input_filter
{
/*
* ModifyFilter类型
* 0:没有filteritems
* 先判断是否有node(id="files[*].traceselection"),否则需要添加;然后Add新的filteritem
* 1:没有排序关键字的filteritem
* 在node(id="files[*].traceselection")添加filteritem子节点
* 2:有排序关键字的filteritem
* 重新设置设param_item_id(id=“files[*].traceselection[*]”)的CUSTOM节点的子节点(start,end)
*/
ModifyFilterType modify_filter_type;
std::string param_item_id;
std::string key;
std::string start;
std::string end;
//increment default;
//exclude default
//And/Or default
workflow_modify_input_filter(): modify_filter_type(), param_item_id(), key(), start(), end(){}
void Print(int index = 0)
{
std::cout << "\t\tworkflow_modify_input_filter(" << index << "):" << std::endl;
std::cout << "\t\t{" << std::endl;
std::cout << "\t\t\tmodify_filter_type=" << static_cast<int>(modify_filter_type) << std::endl;
std::cout << "\t\t\tparam_item_id=" << param_item_id << std::endl;
std::cout << "\t\t\tkey=" << key << std::endl;
std::cout << "\t\t\tstart=" << start << std::endl;
std::cout << "\t\t\tend=" << end << std::endl;
std::cout << "\t\t}" << std::endl;
};
bool equal(workflow_modify_input_filter& other)
{
if(param_item_id.compare(other.param_item_id) != 0)
return false;
if(key.compare(other.key) != 0)
return false;
if(start.compare(other.start) != 0)
return false;
if(end.compare(other.end) != 0)
return false;
if(modify_filter_type != other.modify_filter_type)
return false;
return true;
}
};
/**
 * @brief Identifies one filter parameter item of an input module that has to be deleted.
 */
struct workflow_del_input_filter
{
    // Id of the parameter item to delete.
    std::string param_item_id;

    /** Creates an entry with an empty param_item_id. */
    workflow_del_input_filter() : param_item_id() {}

    /**
     * Dumps the entry to std::cout.
     * @param index position shown in the header line of the dump.
     */
    void Print(int index = 0) const
    {
        std::cout << "\t\tworkflow_del_input_filter(" << index << "):" << std::endl;
        std::cout << "\t\t{" << std::endl;
        std::cout << "\t\t\tparam_item_id=" << param_item_id << std::endl;
        std::cout << "\t\t}" << std::endl;
    }

    /**
     * Compares two entries.
     * Const-qualified and taking a const reference so that it can be used on
     * const objects; callers passing non-const objects are unaffected.
     * @param other entry to compare with.
     * @return true when both entries carry the same param_item_id.
     */
    bool equal(const workflow_del_input_filter& other) const
    {
        return param_item_id == other.param_item_id;
    }
};
struct workflow_input_info
{
//inputmodule runtime_id
std::string module_context_id;
//inputmodule 需要修改的filter参数项目
std::vector<workflow_modify_input_filter> modify_filters;
//inputmodule 需要删除的filter参数项目
std::vector<workflow_del_input_filter> del_filters;
workflow_input_info():module_context_id(), modify_filters(), del_filters(){}
void Print(int index = 0)
{
std::cout << "\tworkflow_input_info(" << index << "):" << std::endl;
std::cout << "\t{" << std::endl;
std::cout << "\t\tmodule_context_id=" << module_context_id << std::endl;
std::cout << "\t\tmodify_filters:" << std::endl;
std::cout << "\t\t[" << std::endl;
size_t modify_size = modify_filters.size();
for(size_t i = 0;i < modify_size;i++)
{
workflow_modify_input_filter modify = modify_filters.at(i);
modify.Print(static_cast<int>(i));
}
std::cout << "\t\t]" << std::endl;
std::cout << "\t\tdel_filters:" << std::endl;
std::cout << "\t\t[" << std::endl;
size_t del_size = del_filters.size();
for(size_t i = 0;i < del_size;i++)
{
workflow_del_input_filter del = del_filters.at(i);
del.Print(static_cast<int>(i));
}
std::cout << "\t\t]" << std::endl;
std::cout << "\t}" << std::endl;
};
bool equal(workflow_input_info& other)
{
if(module_context_id.compare(other.module_context_id)!=0)
return false;
if(modify_filters.size() != other.modify_filters.size())
return false;
else if(modify_filters.size() > 0)
for(size_t i = 0;i < modify_filters.size();i++)
{
workflow_modify_input_filter modify = modify_filters.at(i);
if(!modify.equal(other.modify_filters.at(i)))
return false;
}
if(del_filters.size() != other.del_filters.size())
return false;
else if(del_filters.size() > 0)
for(size_t i = 0;i < del_filters.size();i++)
{
workflow_del_input_filter del = del_filters.at(i);
if(!del.equal(other.del_filters.at(i)))
return false;
}
return true;
}
};
/**
 * @brief One filter parameter item of an output module together with its file names.
 */
struct workflow_output_filter
{
    std::string param_item_id;
    std::string filenames;

    /** Creates an entry with empty param_item_id and filenames. */
    workflow_output_filter() : param_item_id(), filenames() {}

    /**
     * Dumps both fields to std::cout.
     * @param index position shown in the header line of the dump.
     */
    void Print(int index = 0) const
    {
        std::cout << "\t\tworkflow_output_filter(" << index << "):" << std::endl;
        std::cout << "\t\t{" << std::endl;
        std::cout << "\t\t\tparam_item_id=" << param_item_id << std::endl;
        std::cout << "\t\t\tfilenames=" << filenames << std::endl;
        std::cout << "\t\t}" << std::endl;
    }

    /**
     * Compares two entries field by field.
     * Const-qualified and taking a const reference so that it can be used on
     * const objects; callers passing non-const objects are unaffected.
     * @param other entry to compare with.
     * @return true when param_item_id and filenames are both equal.
     */
    bool equal(const workflow_output_filter& other) const
    {
        return param_item_id == other.param_item_id
            && filenames == other.filenames;
    }
};
struct workflow_output_info
{
//outputmodule runtime_id
std::string module_context_id;
//outputmodule 需要重新设置过滤参数项
std::vector<workflow_output_filter> filters;
workflow_output_info():module_context_id(), filters(){}
void Print(int index = 0)
{
std::cout << "\tworkflow_output_info(" << index << "):" << std::endl;
std::cout << "\t{" << std::endl;
std::cout << "\t\tmodule_context_id=" << module_context_id << std::endl;
std::cout << "\t\tfilters:" << std::endl;
std::cout << "\t\t[" << std::endl;
size_t size = filters.size();
for(size_t i = 0;i < size;i++)
{
workflow_output_filter filter = filters.at(i);
filter.Print(static_cast<int>(i));
}
std::cout << "\t\t]" << std::endl;
std::cout << "\t}" << std::endl;
};
bool equal(workflow_output_info& other)
{
if(module_context_id.compare(other.module_context_id) != 0)
return false;
if(filters.size() != other.filters.size())
return false;
else if(filters.size() > 0)
for(size_t i=0; i<filters.size(); i++)
{
workflow_output_filter filter = filters.at(i);
if(!filter.equal(other.filters.at(i)))
return false;
}
return true;
}
};
struct workflow_info {
std::string id;
TaskInfo taskInfo;
//inputmodule 因为切片重新设置的参数项信息
std::vector<workflow_input_info> inputs;
//outputmodule 因为切片重新设置的参数项信息
std::vector<workflow_output_info> outputs;
//用于统计到回调函数
Callback cb_update_module_progress;
workflow_info():id(), taskInfo(), inputs(), outputs(), cb_update_module_progress(){}
void Print()
{
std::cout << "workflow_info:" << std::endl;
std::cout << "{" << std::endl;
std::cout << "\tid=" << id << std::endl;
std::cout << "\tjob_id=" << taskInfo.job_id << std::endl;
std::cout << "\tattempt_id=" << taskInfo.attempt_id << std::endl;
std::cout << "\thostname=" << taskInfo.hostname << std::endl;
std::cout << "\tinputs:" << std::endl;
std::cout << "\t[" << std::endl;
size_t inputs_size = inputs.size();
for( size_t i = 0;i < inputs_size;i++)
{
workflow_input_info input = inputs.at(i);
input.Print(static_cast<int>(i));
}
std::cout << "\t]" << std::endl;
std::cout << "\toutputs:" << std::endl;
std::cout << "\t[" << std::endl;
size_t outputs_size = outputs.size();
for( size_t i = 0;i < outputs_size;i++)
{
workflow_output_info output = outputs.at(i);
output.Print(static_cast<int>(i));
}
std::cout << "\t]" << std::endl;
std::cout << "}" << std::endl;
};
};
struct ReduceInfo
{
TaskInfo taskInfo;
//outputmodule 因为切片重新设置的参数项信息
std::vector<workflow_output_info> outputs;
//prevision hadoop jobid
std::string preJobId;
std::map<std::string,std::string> parameters;
//用于统计到回调函数
Callback cb_update_module_progress;
ReduceInfo():taskInfo(), outputs(), preJobId(), parameters(), cb_update_module_progress()
{}
void Print()
{
std::cout << "ReduceInfo:" << std::endl;
std::cout << "{" << std::endl;
std::cout << "\tjob_id=" << taskInfo.job_id << std::endl;
std::cout << "\tattempt_id=" << taskInfo.attempt_id << std::endl;
std::cout << "\thostname=" << taskInfo.hostname << std::endl;
std::cout << "\tpreJobId=" << preJobId << std::endl;
std::cout << "\tparameters size=" << parameters.size() << std::endl;
for(std::map<std::string,std::string>::iterator iter = parameters.begin();iter!=parameters.end();++iter){
std::cout << "\toparam[" << iter->first <<"]=" << iter->second << std::endl;
}
std::cout << "\toutputs size=" << outputs.size() << std::endl;
std::cout << "\t{" << std::endl;
size_t outputs_size = outputs.size();
for( size_t i = 0;i < outputs_size;i++)
{
workflow_output_info output = outputs.at(i);
output.Print(static_cast<int>(i));
}
std::cout << "\t}" << std::endl;
std::cout << "}" << std::endl;
};
};
struct JobInfo
{
//outputmodule 因为切片重新设置的参数项信息
std::vector<workflow_output_info> outputs;
bool isReduceWorkflow;
TaskInfo taskInfo;
JobInfo():outputs(), isReduceWorkflow(), taskInfo(){}
void Print()
{
std::cout << "JobInfo:" << std::endl;
std::cout << "{" << std::endl;
std::cout << "\tisReduceWorkflow=" << isReduceWorkflow << std::endl;
std::cout << "\tjob_id=" << taskInfo.job_id << std::endl;
std::cout << "\tattempt_id=" << taskInfo.attempt_id << std::endl;
std::cout << "\thostname=" << taskInfo.hostname << std::endl;
std::cout << "\toutputs size=" << outputs.size() << std::endl;
std::cout << "\t{" << std::endl;
for(size_t i=0; i<outputs.size(); i++)
{
outputs.at(i).Print(static_cast<int>(i));
}
std::cout << "\t}" << std::endl;
std::cout << "}" << std::endl;
};
bool equal(JobInfo& other)
{
if(isReduceWorkflow != other.isReduceWorkflow)
return false;
if(taskInfo.job_id.compare(other.taskInfo.job_id) != 0)
return false;
if(taskInfo.attempt_id.compare(other.taskInfo.attempt_id) != 0)
return false;
if(taskInfo.hostname.compare(other.taskInfo.hostname) != 0)
return false;
//compare outputs
if(outputs.size() != other.outputs.size())
return false;
for(size_t i=0; i<outputs.size(); i++)
{
if(!outputs.at(i).equal(other.outputs.at(i)))
{
return false;
}
}
return true;
}
};
/**
* Executed before all sub-workflows are run.
*/
void BeforeJob(const JobInfo& workflow);
/**
* Executed after all sub-workflows have run.
*/
void AfterJob(const JobInfo& workflow);
/**
* Runs the Reduce stage of a workflow.
*/
void RunReduce(const ReduceInfo& reduce);
/**
* Aborts a job.
* NOTE(review): isFailed presumably distinguishes a failed job from a killed one,
* as documented on WorkflowRunner::AbortJob - confirm against the implementation.
*/
void AbortJob(const JobInfo& job,const bool isFailed);
/**
* Aborts a task.
*/
void AbortTask(const JobInfo& job);
//****************************************************************
// below refactor part of workflowrunner
//****************************************************************
namespace pai {
namespace workflow {
/**
* @brief workflow执行器
* 目前主要对于工作流各种阶段为Job提供执行服务
*/
class WorkflowRunner
{
public:
WorkflowRunner():m_workflow(),m_reduce(),m_config(),m_split(NULL),firstStartedModules(),split_msg_(NULL){
};
void SetWorkflowInfo(const workflow_info& workflow){
this->m_workflow = workflow;
};
workflow_info GetWorkflowInfo() const{
return this->m_workflow;
};
Split* GetModuleSelfSplit() const{
return this->m_split;
}
void SetModuleSelfSplit(Split* split){
m_split = split;
};
void SetReduceInfo(const ReduceInfo& reduce){
this->m_reduce = reduce;
};
ReduceInfo GetReduceInfo() const{
return this->m_reduce;
}
void SetConfiguration(const std::map<std::string,std::string>& config){
this->m_config = config;
};
inline void SetSplitMessage(google::protobuf::Message* s){
split_msg_=s;
}
/**
* 运行Map工作流
*/
void RunWorkflow();
/**
* 运行Reduce工作流
*/
void RunReduce();
void BeforeJob(const JobInfo& job);
void AfterJob(const JobInfo& job);
/**
* abortJob:
* @param[job] the job's info
* @param[isFailed] if the job is failed or killed
*/
void AbortJob(const JobInfo& job,const bool isFailed);
void AbortTask(const JobInfo& job);
private:
bool IsFirstStartModule(CModule* module);
bool IsOutputModule(CModule* module){
if(module->GetClassName() == "COutputModule")
return true;
else
return false;
};
bool IsUseModuleSelfSplit(){
if(m_split!=NULL)
return true;
else
return false;
};
WorkflowRunner(const WorkflowRunner & runner);
WorkflowRunner & operator=(const WorkflowRunner & runner);
private:
workflow_info m_workflow;
ReduceInfo m_reduce;
std::map<std::string,std::string> m_config;
Split* m_split;
std::set<std::string> firstStartedModules;
//it's created by split factory,so need destry it.
google::protobuf::Message* split_msg_;
};
}
}
#endif