Impala BE query plan (3): Prepare -> Open -> Close
ControlService
The backend's internal control RPC service: its ExecQueryFInstances() handler, traced in the call stack below, is where plan-fragment execution on a backend begins.
QueryState
Central class for all backend execution state created for a particular query (e.g. the FragmentInstanceStates of the individual fragment instances). This class contains, or makes accessible, state that is shared across fragment instances; fragment-instance-specific state is instead collected in FragmentInstanceState. The lifetime of a QueryState is governed by a reference count: any thread that executes on behalf of a query and accesses any of its state must obtain a reference to the corresponding QueryState and hold it for at least the duration of that access. References are obtained and released via QueryExecMgr::Get-/ReleaseQueryState() or via QueryState::ScopedRef (the latter for references limited to the scope of a single function or block). As long as the reference count is greater than 0, all of the query's control structures (contained in, or reachable through, this class, such as the FragmentInstanceStates) are guaranteed to be alive.
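As a hedged sketch of that contract (not verbatim Impala code; the exact ScopedRef signature may vary across versions), a thread whose access is limited to one function scope would pin the QueryState like this; InspectQuery is a hypothetical caller:

    // QueryState::ScopedRef looks the query up by id and takes a reference in
    // its constructor, releasing it (via QueryExecMgr) in its destructor.
    void InspectQuery(const TUniqueId& query_id) {
      QueryState::ScopedRef qs_ref(query_id);
      QueryState* qs = qs_ref.get();
      if (qs == nullptr) return;  // query already finished; its state was torn down
      // While qs_ref is in scope, qs and its FragmentInstanceStates are
      // guaranteed to stay alive.
      // ... read query state here ...
    }  // reference released; the QueryState may now be destroyed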
FragmentInstanceState
FragmentInstanceState handles all aspects of the execution of a single plan fragment instance, including setup and finalization in both the success and the error case. Close() happens automatically at the end of Exec(); it frees all memory allocated for this fragment instance and closes all data streams.
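That lifecycle can be condensed into a skeleton of Exec() (a simplified sketch, not the verbatim implementation; the real method also reports status and pumps row batches between Open() and Close(), see the sketch at the end of the call stack below):

    Status FragmentInstanceState::Exec() {
      Status status = Prepare();                 // build RuntimeState, ExecNode tree, sink
      if (status.ok()) status = Open();          // open the exec tree and the sink
      if (status.ok()) status = ExecInternal();  // drive row batches into the sink
      Close();  // always runs, on the success path and the error path alike
      return status;
    }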
Call stack
ControlService::ControlService(MetricGroup* metric_group)
  this->ExecQueryFInstances(static_cast<const ExecQueryFInstancesRequestPB*>(req), ...)

void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request,
    ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context)
  const Status& fragment_info_sidecar_status =
      GetSidecar(request->plan_fragment_info_sidecar_idx(), rpc_context, &fragment_info);
  Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
      request, query_ctx, fragment_info);
    QueryState* qs = GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
    Status status = qs->Init(request, fragment_info);
        // mainly initializes TExecPlanFragmentInfo from fragment_info
    unique_ptr<Thread> t;
    status = Thread::Create("query-exec-mgr", Substitute("query-state-$0", PrintId(query_id)),
        &QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);

bool QueryState::StartFInstances()
  start_finstances_status = FragmentState::CreateFragmentStateMap(
      fragment_info_, exec_rpc_params_, this, fragment_state_map_);
    for (fragment_size)
      // create the distributed fragments based on the instance count:
      // a single query is split into multiple fragments
      FragmentState* fragment_state = state->obj_pool()->Add(new FragmentState(state, frag, frag_ctx));
    for (fragment_size)
      fragment_state->Init();
        Status PlanNode::CreateTree(FragmentState* state, const TPlan& plan, PlanNode** root)
          Status status = CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root);  // recursive creation
        Status PlanNode::CreateTreeHelper(FragmentState* state, const std::vector<TPlanNode>& tnodes,
            PlanNode* parent, int* node_idx, PlanNode** root)
          const TPlanNode& tnode = tnodes[*node_idx];
          int num_children = tnode.num_children;
          RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node));  // create the PlanNode
            *node = pool->Add(new ScanPlanNode());  // or PartitionedHashJoinPlanNode, etc.
          for (num_children)
            CreateTreeHelper(state, tnodes, node, node_idx, nullptr);  // recurse into the children
          RETURN_IF_ERROR(node->Init(tnode, state));
            Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* state)
            // a good place to look at Impala's SharedState concept: the scan ranges of the
            // scan-node instances on the same BE node are put into one shared queue for processing
  for (auto& fragment : fragment_state_map_) {
    FragmentState* fragment_state = fragment.second;
    for (int i = 0; i < fragment_state->instance_ctxs().size(); ++i) {
      // create one FragmentInstanceState per instance
      FragmentInstanceState* fis = obj_pool_.Add(new FragmentInstanceState(
          this, fragment_state, *instance_ctx, *instance_ctx_pb));
      fis_map_.emplace(fis->instance_id(), fis);
      unique_ptr<Thread> t;
      // run each FragmentInstanceState in its own thread via ExecFInstance
      Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
          [this, fis]() { this->ExecFInstance(fis); }, &t, true);
    }
  }

void QueryState::ExecFInstance(FragmentInstanceState* fis)
  Status status = fis->Exec();  // Status FragmentInstanceState::Exec()
    Status status = Prepare();
    status = Open();
    Close();

Status FragmentInstanceState::Prepare()
  runtime_state_ = obj_pool()->Add(new RuntimeState(query_state_, fragment_, instance_ctx_,
      fragment_ctx_, instance_ctx_pb_, ExecEnv::GetInstance()));
    Init();
      resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
      instance_mem_tracker_ = obj_pool()->Add(new MemTracker(
          runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
  runtime_state_->resource_pool()->AcquireThreadToken();  // acquire one thread token
  const PlanNode* plan_tree = fragment_state_->plan_tree();
  // ExecNode is the execution node; it is created from the corresponding PlanNode
  RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_, *plan_tree, query_state_->desc_tbl(), &exec_tree_));
    RETURN_IF_ERROR(plan_node.CreateExecNode(state, root));
      // taking PartitionedHashJoinNode as an example:
      ObjectPool* pool = state->obj_pool();
      *node = pool->Add(new PartitionedHashJoinNode(state, *this, state->desc_tbl()));
      // Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node)
      // works the same way for scan nodes
    for (auto& child : plan_node.children_) {  // recursively create the children's ExecNodes
      ExecNode* child_node;
      RETURN_IF_ERROR(CreateTree(state, *child, descs, &child_node));
      DCHECK(child_node != nullptr);
      (*root)->children_.push_back(child_node);
    }
  // the ExecNode tree of the current fragment instance is now complete
  // 1. exchange nodes: set #senders of exchange nodes before calling Prepare()
  vector<ExecNode*> exch_nodes;
  exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
  // 2. scan nodes: hand each one its scan ranges
  vector<ExecNode*> scan_nodes;
  ScanRangesPB no_scan_ranges;
  exec_tree_->CollectScanNodes(&scan_nodes);
  static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges.scan_ranges());
  // 3. prepare the exec tree
  RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_));
    // Status ExecNode::Prepare(RuntimeState* state)
    mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(), ...));
    for (int i = 0; i < children_.size(); ++i) {
      RETURN_IF_ERROR(children_[i]->Prepare(state));
    }
    // e.g. Status HdfsScanNodeMt::Prepare(RuntimeState* state)
  // 4. prepare sink_
  const DataSinkConfig* sink_config = fragment_state_->sink_config();
  DCHECK(sink_config != nullptr);
  sink_ = sink_config->CreateSink(runtime_state_);
  RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
  // 5. the row batch that carries data from the exec tree to the sink
  row_batch_.reset(new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),
      runtime_state_->instance_mem_tracker()));

Status FragmentInstanceState::Open()
  RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
  return sink_->Open(runtime_state_);

void FragmentInstanceState::Close()
  for (int i = 0; i < children_.size(); ++i) {
    children_[i]->Close(state);
  }
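One step the stack above skips is what happens between Open() and Close(): the instance thread pumps row batches from the root of the exec tree into the sink. Roughly (a sketch of FragmentInstanceState::ExecInternal() with the control flow simplified; the member names are the ones set up in Prepare() above):

    Status FragmentInstanceState::ExecInternal() {
      bool exec_tree_complete = false;
      do {
        // pull one batch from the root ExecNode of this instance...
        RETURN_IF_ERROR(exec_tree_->GetNext(
            runtime_state_, row_batch_.get(), &exec_tree_complete));
        // ...and push it into the sink (e.g. a KRPC exchange sender, or the
        // result sink of the coordinator fragment)
        RETURN_IF_ERROR(sink_->Send(runtime_state_, row_batch_.get()));
        row_batch_->Reset();  // reuse the batch's memory for the next iteration
      } while (!exec_tree_complete);
      // flush buffered data and finalize the sink on success
      RETURN_IF_ERROR(sink_->FlushFinal(runtime_state_));
      return Status::OK();
    }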