/* * Buffer.cpp * * Created on: May 25, 2011 * Author: dev */ #include "Buffer.h" #include "Module.h" #include "error.h" #include "Utils.h" #include "Configure.h" #include "Turtle.h" #include #include #include #include "MultiwaveTrace.h" using namespace std; namespace pai { namespace module { CBuffer::CBuffer(int size) : m_elements(NULL), m_size(unsigned(size)), m_elementCount(0), m_cbUpdateModuleProgress(NULL), m_processIndex(), m_moduleProcess(), m_prevGetTime(), m_prevSetTime(), m_target(BMT_MODULE), m_context(){ Init(); } CBuffer::CBuffer(int size, Callback cbUpdateModuleProgress) : m_elements(NULL), m_size(unsigned(size)), m_elementCount(0), m_cbUpdateModuleProgress(cbUpdateModuleProgress), m_processIndex(),m_moduleProcess(),m_prevGetTime(),m_prevSetTime(),m_target(BMT_MODULE),m_context(){ Init(); } CBuffer::CBuffer(int size, BUFFER_MANAGER_TARGET target, Callback cbUpdateModuleProgress) : m_elements(NULL), m_size(unsigned(size)), m_elementCount(0), m_cbUpdateModuleProgress(cbUpdateModuleProgress), m_processIndex(), m_moduleProcess(), m_prevGetTime(), m_prevSetTime(), m_target(target), m_context(){ Init(); } void CBuffer::Block() { timespec tm; tm.tv_sec = 0; tm.tv_nsec = 1000 * 1000 * 50; //clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &tm, NULL); GetTurtleAPI()->Sleep(tm.tv_nsec); } void CBuffer::Init() { if (m_size <= 0 || m_size > MaxBufferSize) { m_size = MaxBufferSize; } pai::conf::CConfigure conf; const string confSize = conf.GetValueByKey("MODULE_BUFFER_SIZE"); //TODO:LOGGER CONNECT HDFS ERROR,SO COMMENTS BELOW LINE(IN BUILDENGINE) //pai::log::Info(_FLF("PAICONF MODULE_BUFFER_SIZE=" + confSize)); if(confSize!="") m_size = unsigned(pai::utils::CUtils::StrToInt(confSize)); m_elements = new CBufferElement*[m_size]; for (unsigned i = 0; i < m_size; i++) { m_elements[i] = NULL; } m_elementCount = 0; time(&m_prevGetTime); time(&m_prevSetTime); } CBuffer::~CBuffer() { if (!m_elements) { return; } //在Buffer的数据流转中,由使用module去统一清理CBufferElement(Trace),CBuffer不在负责析够BufferElement for (unsigned int i = 0; i < m_size; ++i) { if (m_elements[i]) { m_elements[i] = NULL; } } //m_elements = (CBufferElement**) NULL; m_elements = reinterpret_cast(NULL) ; } void CBuffer::SetNextElement(CBufferElement* element, ModuleId moduleId) { string caller = "..."; if (moduleId != NULL && this->m_target == BMT_MODULE) { CModule* module = reinterpret_cast(moduleId); caller = module->GetModuleContext().module_runtime_name; if (element == NULL) { cerr << caller << " SetNextElement element is null" << endl; } } while (!Writable(caller)) { //cout << caller << " waiting to write\n"; Block(); } //多波数据处理逻辑,如果不是多波数据此方法内部不做任何处理 SetMultiTraceToElement(moduleId,element); unsigned long idx = m_elementCount % m_size; //cerr << caller << " idx= " << idx << " size: " << size << " elementCount:" << elementCount << endl; if (m_elements[idx]) { delete m_elements[idx]; m_elements[idx] = NULL; } m_elements[idx] = element; // cout << "set m_elements[" << idx << "]=" << m_elements[idx] << ", addr(element)=" << element << "\n"; ++m_elementCount; //should only increment when the data is already in the buffer //cerr << caller <<" is writed one buffer blocked"<GetElementType() == TRACE) { /** * 记录本次操作与上次操作到时间间隔 */ time_t now; time(&now); m_moduleProcess[moduleId].tracenum = 1; m_moduleProcess[moduleId].time = int(now - m_prevSetTime); m_cbUpdateModuleProgress(m_moduleProcess[moduleId]); m_prevSetTime = now; } } void CBuffer::SetNextElements(std::vector gather, ModuleId moduleId) { if (gather.size() <= 0) throw pai::error::invalid_argument("Invalid Gather.Gather is empty!"); if (!gather[gather.size() - 1]->IsGatherSplitMarker()&&!gather[gather.size() - 1]->IsEndMarker()) throw pai::error::invalid_argument("Invalid Gather.Gather must end with a GatherSplitMarker,Last Gather must end with a EndMarker!"); if (gather.size() > m_size) throw pai::error::invalid_argument("Invalid Gather.Gather length exceed max buffer size!"); unsigned long current = m_processIndex[moduleId]; if (current + gather.size() > m_size) throw pai::error::invalid_argument("Invalid Gather.Gather length exceed buffer left size!"); for (unsigned int i = 0; i < gather.size(); i++) { SetNextElement(gather[i], moduleId); } } /** * @brief 保存一个buffer元素(即Trace),保存完成buffer元素之后,根据模块开发者指定的type类型保存不同的分隔符。 * type为TRACE,则不保存分隔符; * type为GATHER_ENDMARKER,保存道结束分隔符; * type为LINE_ENDMARKER,保存线结束分隔符; * type为DATA_ENDMARKER,保存数据流结束分割符 * * @param[in] element 要保存的元素 * @param[in] moduleID 当前模块id * @param[in] type TRACE-->正常道数据 GATHER_ENDMARKER-->道结束 LINE_ENDMARKER-->线结束 DATA_ENDMARKER-->数据流结束 */ void CBuffer::SetNextElement(CBufferElement* element,ModuleId moduleId,ElementType type) { SetNextElement(element,moduleId); if(type!=TRACE) SetNextElement(CBufferElement::CreateElement(type),moduleId); } std::vector CBuffer::GetNextElements(ModuleId moduleID) { unsigned long gatherend; while (!NextElementsIsReady(moduleID, gatherend)) { Block(); } unsigned long current = m_processIndex[moduleID]; vector gather; gather.reserve(gatherend - current + 1); for (unsigned long i = current; i <= gatherend; i++) { gather.push_back(GetNextElement(moduleID)); } return gather; } //要求Inputmodule必须传递道集分割的标志位下来 bool CBuffer::NextElementsIsReady(ModuleId moduleID, unsigned long & nextGatherEndIndex) { unsigned long current = m_processIndex[moduleID]; for (unsigned long i = current; i < m_size; i++) { CBufferElement* currentelement = m_elements[i]; if (currentelement->IsGatherSplitMarker()||currentelement->isEnd) { nextGatherEndIndex = i; return true; } } return false; } void CBuffer::AddReferenceModule(ModuleId module) { //cout << module << " module" << endl; if (module == NULL) { cerr << " AddReferenceModule moduleId is null!" << endl; } if (m_processIndex.count(module) > 0) { cerr << "moduleId[" << module << "] AddReferenceModule is exist!" << endl; } m_processIndex[module] = 0; //the value of the index indicates the element to be process NOW } /** * This method should check if an element is allowed to be returned before * returning the BufferElement, the thread will be blocked until the element * is allowed to be returned, e.g. all modules need the BufferElement has * processed the data in the element */ CBufferElement* CBuffer::GetNextElement(ModuleId moduleId) { //sleep(1); string caller = "..."; if (moduleId != NULL && this->m_target == BMT_MODULE) { CModule* module = reinterpret_cast(moduleId); caller = module->GetModuleContext().module_runtime_name; } while (!Readable(caller, moduleId)) { //cout << caller << " Waiting to read\n"; Block(); } unsigned long current = m_processIndex[moduleId]; //cout << caller << " current=" << current << ", element count=" << m_elementCount << "\n"; unsigned long idx = current % m_size; CBufferElement* element = m_elements[idx]; //cout << caller << " element 1= " << element << "\n"; //cout << "length = " << element->GetDataLength() << "\n"; if (element->IsEndMarker()) { //cerr << caller <<" is read finished"<Reset(); delete element; m_elements[idx] = NULL; } //cout << caller << " new element = " << newElement << "\n"; element = newElement; m_processIndex[moduleId] = current + 1; //should only increase when the element is read //cout << caller << " new process index = " << m_processIndex[moduleId] << "\n"; //cerr << caller <<" is readed one buffer blocked"<GetElementType() == TRACE) { /** * 记录本次操作与上次操作到时间间隔 */ time_t now; time(&now); m_moduleProcess[moduleId].tracenum = 1; m_moduleProcess[moduleId].time = int(now - m_prevGetTime); m_cbUpdateModuleProgress(m_moduleProcess[moduleId]); m_prevGetTime = now; } //cout << caller << " safter update progress\n"; return element; } CBufferElement* CBuffer::GetTraceNextElement(ModuleId moduleID) { CBufferElement* element = GetNextElement(moduleID); // std::cout << "GetTraceNextElement first element address:" << element << ",type:" << element->GetElementType() << std::endl; while(!element->IsTraceElement() && !element->IsEndMarker()) { element = GetNextElement(moduleID); // std::cout << "GetTraceNextElement skip_marker element address:" << element << ",type:" << element->GetElementType() << std::endl; } if(element->IsTraceElement()) { // std::cout << "GetTraceNextElement find trace element address:" << element << ",type:" << element->GetElementType() << std::endl; return element; }else if(element->IsEndMarker()){ // std::cout << "GetTraceNextElement find->end endelement address:" << element << ",type:" << element->GetElementType() << std::endl; return NULL; } return NULL; } CBufferElement* CBuffer::PreReadyNextElement(ModuleId moduleID) { string caller = "..."; if (moduleID != NULL && this->m_target == BMT_MODULE) { CModule* module = reinterpret_cast(moduleID); caller = module->GetModuleContext().module_runtime_name; } while (!Readable(caller, moduleID)) { //cout << caller << " Waiting to read\n"; Block(); } unsigned long current = m_processIndex[moduleID]; unsigned long idx = current % m_size; // std::cout << "PreReadyNextElement index=" << idx << ",element address:" << m_elements[idx] << ",type:" << m_elements[idx]->GetElementType() << std::endl; return m_elements[idx]; } void CBuffer::SkipContinuousMarkerElement(ModuleId moduleID, ElementType &type) { CBufferElement* element = PreReadyNextElement(moduleID); // std::cout << "SkipContinuousMarkerElement PreReadyNextElement element address:" << element << ",type:" << element->GetElementType() << std::endl; if(element->IsTraceElement()){ return; }else if(element->IsEndMarker()){ type = DATA_ENDMARKER; SkipCurrentElement(element,moduleID); }else{ // std::cout << "SkipContinuousMarkerElement more marker,preMarker:" << type << ",currentMarker:" << element->GetElementType() << std::endl; if(element->GetElementType() > type) type = element->GetElementType(); // std::cout << "SkipContinuousMarkerElement changed marker:" << type << std::endl; SkipCurrentElement(element,moduleID); SkipContinuousMarkerElement(moduleID,type); } } void CBuffer::SkipCurrentElement(CBufferElement* /*element*/,ModuleId moduleID) { m_processIndex[moduleID]++; } void CBuffer::PreReadyNextElementType(ModuleId moduleID, ElementType &type) { CBufferElement* element = PreReadyNextElement(moduleID); // std::cout << "PreReadyNextElementType element address:" << element << ",type:" << element->GetElementType() << std::endl; if(element->IsTraceElement()){ type = TRACE; }else if(element->IsEndMarker()){ type = DATA_ENDMARKER; SkipCurrentElement(element,moduleID); }else{ type = element->GetElementType(); SkipCurrentElement(element,moduleID); SkipContinuousMarkerElement(moduleID,type); } } CBufferElement* CBuffer::GetNextElement(ModuleId moduleID, ElementType &type) { //step1:读取Trace,如果读到DATA_END_MARKER还读取不到数据,返回NULL CBufferElement* element = GetTraceNextElement(moduleID); if(element==NULL) { // std::cout << "GetNextElement don't trace element." << std::endl; type = DATA_ENDMARKER; return NULL; } // std::cout << "GetNextElement find element address:" << element << ",type:" << element->GetElementType() << std::endl; //step2:读取下一个Element的类型 PreReadyNextElementType(moduleID,type); // std::cout << "GetNextElement find nextelement type:" << type << std::endl; //多波数据处理逻辑,如果不是多波数据此方法内部不做任何处理 GetMultiTraceToElement(moduleID,element); return element; } bool CBuffer::Writable(const string& /*caller*/) { unsigned long max = 0; //set to max value unsigned long min = ULONG_MAX; //set to max value ModuleProcessIndex::iterator end = m_processIndex.end(); for (ModuleProcessIndex::iterator it = m_processIndex.begin(); it != end; it++) { if (it->second < min) min = it->second; if (it->second > max) max = it->second; } //cerr << caller << " " << __FUNCTION__ << " min:" << min << ",max:" << max << ",element count:" << elementCount << endl; if (m_processIndex.empty()) { //there is no module using the buffer as input return true; } return (m_elementCount >= max) && (m_elementCount - min < m_size); } //huangjun modify 2011-10-08 // 目前算法:如果当前模块要读取的Buff块index是所有模块中最小的index,那么判断是它最后一个读取此Buff块的Module(ShallowCopy). // 目前问题:但是如果有两个模块都在读取最小index的Buff块,那么将出现错误。 // 正确处理:需要判断当前模块是[唯一]读取最小index的Buff块才ShallCopy. CopyType CBuffer::CalculateCopyType(const string& /*caller*/, ModuleId moduleId) { unsigned long max = 0; unsigned long min = ULONG_MAX; ModuleProcessIndex::iterator end = m_processIndex.end(); for (ModuleProcessIndex::iterator it = m_processIndex.begin(); it != end; it++) { if (it->second < min) min = it->second; if (it->second > max) max = it->second; } //读取最小内存块的模块个数 int min_count = 0; for (ModuleProcessIndex::iterator it = m_processIndex.begin(); it != end; it++) { if (it->second == min) min_count++; } //cout << caller << " " << __FUNCTION__ << " min = " << min << "max = " << max << endl; if (m_processIndex.size() == 1) { //there is only one module using the buffer as input return ShallowCopy; } return (m_processIndex[moduleId] == min && (min_count < 2) && min != max) ? ShallowCopy : DeepCopy; } /** * this method must be mutex between all of the threads using this buffer * Data can only be read if the data has been written into the buffer */ bool CBuffer::Readable(const string& /*caller*/, ModuleId moduleId) { // cerr << ", m_processIndex " << m_processIndex[moduleId] << ", elemenmt count:" << m_elementCount << endl; return m_processIndex[moduleId] < m_elementCount; } void CBuffer::SetMetaData(const string& key, Any anything) { //设置当前Buffer的MetaData this->m_context.Put(key,anything); if(this->m_target == BMT_MODULE){ //递归设置下游模块MetaData ModuleProcessIndex::iterator end = m_processIndex.end(); for (ModuleProcessIndex::iterator it = m_processIndex.begin(); it != end; it++) { CModule* module = reinterpret_cast(it->first); module->SetOutputMetaData(key,anything); } } } bool CBuffer::HasMetaData(const std::string& key) { return this->m_context.ContainsKey(key); } void CBuffer::GetMultiTraceToElement(ModuleId moduleID,CBufferElement* element){ if((element->elementType==TRACE)&&(NULL!=element->GetData())){ //设置多波分量中的分量ID(josn文件中的ID)和此分量对应的数据类型 std::map componentMap; componentMap.insert(std::make_pair("x-component",CONST_THREE_COMPONENT_DATATYPE_X_CODE)); componentMap.insert(std::make_pair("y-component",CONST_THREE_COMPONENT_DATATYPE_Y_CODE)); componentMap.insert(std::make_pair("z-component",CONST_THREE_COMPONENT_DATATYPE_Z_CODE)); int m_dataType =1; //查找需要处理的多波分量,如何未找到则不进行多波处理 bool isMultiwave=false; isMultiwave=GetSelectedMultiwaveComponent(moduleID,componentMap,m_dataType); if(isMultiwave){ ::pai::ios::seis::MultiwaveTrace* multiwaveTrace =(::pai::ios::seis::MultiwaveTrace*)(element->GetData()); if(CONST_THREE_COMPONENT_DATATYPE_X_CODE==m_dataType){ multiwaveTrace->m_traceData=multiwaveTrace->m_traceX_SR->m_traceData; multiwaveTrace->m_traceHeader=multiwaveTrace->m_traceX_SR->m_traceHeader; }else if(CONST_THREE_COMPONENT_DATATYPE_Y_CODE==m_dataType){ multiwaveTrace->m_traceData=multiwaveTrace->m_traceY_SV->m_traceData; multiwaveTrace->m_traceHeader=multiwaveTrace->m_traceY_SV->m_traceHeader; }else if(CONST_THREE_COMPONENT_DATATYPE_Z_CODE==m_dataType){ multiwaveTrace->m_traceData=multiwaveTrace->m_traceZ->m_traceData; multiwaveTrace->m_traceHeader=multiwaveTrace->m_traceZ->m_traceHeader; }else{ std::cerr << __FILE__ << __LINE__ <<" m_dataType [ "<elementType==TRACE)&&(NULL!=element->GetData())){ //设置多波分量中的分量ID(josn文件中的ID)和此分量对应的数据类型 std::map componentMap; componentMap.insert(std::make_pair("x-component",CONST_THREE_COMPONENT_DATATYPE_X_CODE)); componentMap.insert(std::make_pair("y-component",CONST_THREE_COMPONENT_DATATYPE_Y_CODE)); componentMap.insert(std::make_pair("z-component",CONST_THREE_COMPONENT_DATATYPE_Z_CODE)); int m_dataType =1; //查找需要处理的多波分量,如何未找到则不进行多波处理 bool isMultiwave=false; isMultiwave=GetSelectedMultiwaveComponent(moduleID,componentMap,m_dataType); if(isMultiwave){ ::pai::ios::seis::MultiwaveTrace* multiwaveTrace =(::pai::ios::seis::MultiwaveTrace*)(element->GetData()); if(CONST_THREE_COMPONENT_DATATYPE_X_CODE==m_dataType){ multiwaveTrace->m_traceX_SR->m_traceData=multiwaveTrace->m_traceData; multiwaveTrace->m_traceX_SR->m_traceHeader=multiwaveTrace->m_traceHeader; }else if(CONST_THREE_COMPONENT_DATATYPE_Y_CODE==m_dataType){ multiwaveTrace->m_traceY_SV->m_traceData=multiwaveTrace->m_traceData; multiwaveTrace->m_traceY_SV->m_traceHeader=multiwaveTrace->m_traceHeader; }else if(CONST_THREE_COMPONENT_DATATYPE_Z_CODE==m_dataType){ multiwaveTrace->m_traceZ->m_traceData=multiwaveTrace->m_traceData; multiwaveTrace->m_traceZ->m_traceHeader=multiwaveTrace->m_traceHeader; }else{ std::cerr << __FILE__ << __LINE__ <<" m_dataType [ "< & componentMap , int & dataType ){ bool isSelected=false; //模块指针不为空 if(moduleID != NULL && this->m_target == BMT_MODULE){ CModule* module = reinterpret_cast(moduleID); CModuleParameter* paramter= module->GetModuleParameter(); for(std::map::iterator it =componentMap.begin(); it!=componentMap.end();++it){ CParameterItem *pItem = paramter->GetParameterItem(it->first); if(NULL!=pItem){ std::string s_flag =pItem->ValueToString(); //是否选中 bool b_select =atoi(s_flag.c_str()) ; //判断此分量是否被选中,如果选中则赋值datatype //(目前多波和常规模块窜接时,经工作流分解后,每个常规模块处理一个分量,所以找到第一个被选中的分量后,直接退出循环) if(b_select){ dataType=it->second; isSelected=true; break; } } } } return isSelected; } } }