DuckDB--Query Execution
This article gives an overview of query execution in DuckDB.
Push-based vs Pull-based Model
Push-based versus pull-based execution has been discussed before. DuckDB originally used a pull-based model, but after weighing the trade-offs it switched to a push-based model; see https://github.com/duckdb/duckdb/issues/1583 and https://github.com/duckdb/duckdb/pull/2393 for the details.
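To make the contrast concrete, here is a minimal, self-contained sketch (all names are hypothetical; this is not DuckDB code) of the two control-flow styles: in the pull model the consumer repeatedly asks its child for the next chunk, while in the push model the producer drives each chunk into a Sink.
#include <iostream>
#include <optional>
#include <vector>

// Pull model: the parent operator repeatedly asks its child for the next chunk.
struct PullScan {
    std::vector<int> data{1, 2, 3};
    size_t pos = 0;
    std::optional<int> GetChunk() {                   // returns nothing once exhausted
        if (pos >= data.size()) return std::nullopt;
        return data[pos++];
    }
};

// Push model: the source drives each chunk downstream into a sink.
struct PushSink {
    long long sum = 0;
    void Sink(int chunk) { sum += chunk; }            // consumes whatever is pushed in
};

int main() {
    PullScan scan;                                    // pull: consumer-driven loop
    long long pull_sum = 0;
    while (auto chunk = scan.GetChunk()) pull_sum += *chunk;

    PushSink sink;                                    // push: producer-driven loop
    for (int chunk : {1, 2, 3}) sink.Sink(chunk);
    std::cout << pull_sum << " " << sink.sum << "\n"; // both print 6
    return 0;
}
With push, control flow stays with the code that drives the pipeline rather than being nested inside recursive GetChunk() calls, which fits the pipeline/task scheduling described below.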
Query Execution Design
DuckDB's query execution design involves three key concepts: Pipeline, Task, and Event.
- Pipeline: As introduced in an earlier post, a query's physical plan is split into multiple pipelines at pipeline breakers (the pipelines form a DAG). Query execution thus shifts from being operator-based to being pipeline-based, and the pipeline DAG can be viewed as the query execution plan from another perspective. Each pipeline covers a contiguous segment of physical operators in the plan and logically consists of a Source, Operators, and a Sink: the Source is the pipeline's data source, the Sink is where the pipeline ends and its data is gathered, and the Operators are the intermediate compute nodes between them. Pipelines have dependencies: a pipeline runs only after all pipelines it depends on have finished. Within a pipeline, partitioning the data source allows as much multi-threaded parallelism as possible.
- Task: The Task is the smallest unit of parallel execution in DuckDB. Within a pipeline, the data source is partitioned and multiple tasks (concretely, ExecutorTask instances) are created, so one pipeline can be executed concurrently by several threads; the pipeline's Source and Sink therefore need to be thread-safe.
- Event: The Event is the scheduling counterpart of a pipeline. After building the pipeline DAG, DuckDB builds a corresponding event DAG, and pipelines schedule and execute their tasks through events. Each pipeline's event records the total number of tasks and the number of finished tasks; whenever one task of the pipeline finishes, the event's finished count is incremented. When all parallel tasks of the pipeline are done, the finished count equals the total count and the event is complete; it then notifies its parent events, and once a parent event sees that all of its dependency events have completed, it schedules its own tasks, driving the subsequent pipelines (a minimal sketch of this counting mechanism follows this list).
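As promised in the Event bullet, here is a minimal sketch of the counting mechanism (hypothetical ToyEvent type, not DuckDB's Event class): an event tracks total and finished task counts, and completing the last task notifies parent events, which schedule themselves once all of their dependencies are done.
#include <atomic>
#include <cstdio>
#include <vector>

// Hypothetical sketch of dependency-counted event scheduling (not DuckDB's Event).
struct ToyEvent {
    std::atomic<int> total_tasks{0};
    std::atomic<int> finished_tasks{0};
    std::atomic<int> unfinished_dependencies{0};
    std::vector<ToyEvent *> parents;          // events that depend on this event

    void Schedule(int num_tasks) {
        total_tasks = num_tasks;              // in DuckDB this enqueues Tasks instead
        std::printf("scheduled %d tasks\n", num_tasks);
    }
    void FinishTask() {
        // when the last task of this event completes, the event itself completes
        if (++finished_tasks == total_tasks.load()) {
            for (auto *parent : parents) parent->DependencyFinished();
        }
    }
    void DependencyFinished() {
        // a dependent event runs only after all of its dependency events completed
        if (--unfinished_dependencies == 0) Schedule(1);
    }
};

int main() {
    ToyEvent build, probe;
    probe.unfinished_dependencies = 1;        // the probe pipeline depends on the build
    build.parents.push_back(&probe);
    build.Schedule(4);                        // e.g. 4 parallel build tasks
    for (int i = 0; i < 4; i++) build.FinishTask(); // the last one triggers the probe
    return 0;
}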
Query Execution Implementation
The overall flow of DuckDB's query execution implementation is: build the pipelines, then pick a pipeline without any dependencies and start processing it by scheduling its event, which launches one or more concurrent tasks. Once the current pipeline is marked finished (for example, its concurrent tasks have processed all of the data), the last task of that pipeline calls Schedule() to schedule the event of the next pipeline that has no remaining dependencies, which again drives one or more tasks. This repeats until all pipelines have finished, at which point the root pipeline pulls the intermediate results and produces the final result.
Pipeline Initialization & Build
Pipeline construction starts in Executor::InitializeInternal(), which builds each pipeline by recursively calling every node's PhysicalOperator::BuildPipelines() method, starting from the root node.
void Executor::InitializeInternal(PhysicalOperator *plan) {
...
// build and ready the pipelines
PipelineBuildState state;
auto root_pipeline = make_shared<MetaPipeline>(*this, state, nullptr);
root_pipeline->Build(physical_plan);
root_pipeline->Ready();
...
// set root pipelines, i.e., all pipelines that end in the final sink
root_pipeline->GetPipelines(root_pipelines, false);
root_pipeline_idx = 0;
// collect all meta-pipelines from the root pipeline
vector<shared_ptr<MetaPipeline>> to_schedule;
root_pipeline->GetMetaPipelines(to_schedule, true, true);
...
// finally, verify and schedule
VerifyPipelines();
ScheduleEvents(to_schedule);
}
Here root_pipeline->Build(physical_plan); starts pipeline construction from the root pipeline. Note that the pipeline DAG built this way starts from the root pipeline but does not include it: in DuckDB's implementation, the root pipeline has a separate code path that executes it and fetches the final result.
void MetaPipeline::Build(PhysicalOperator *op) {
...
op->BuildPipelines(*pipelines.back(), *this);
}
void PhysicalOperator::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
op_state.reset();
auto &state = meta_pipeline.GetState();
if (IsSink()) {
// operator is a sink, build a pipeline
sink_state.reset();
D_ASSERT(children.size() == 1);
// the operator becomes the data source of the current pipeline
state.SetPipelineSource(current, this);
// we create a new pipeline starting from the child
auto child_meta_pipeline =
meta_pipeline.CreateChildMetaPipeline(current, this);
child_meta_pipeline->Build(children[0].get());
} else {
// operator is not a sink! recurse in children
if (children.empty()) {
// source
state.SetPipelineSource(current, this);
} else {
...
state.AddPipelineOperator(current, this);
children[0]->BuildPipelines(current, meta_pipeline);
}
}
}
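To illustrate the recursion above with something runnable, here is a toy sketch (hypothetical ToyOp/ToyPipeline types, not DuckDB classes) that applies the same rule to a PROJECTION -> ORDER_BY -> SEQ_SCAN plan: the ORDER_BY sink becomes the source of the current pipeline and the sink of a newly created child pipeline.
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Toy version of the generic BuildPipelines() rule: a sink operator ends the
// current pipeline (becoming its source) and starts a child pipeline that it sinks.
struct ToyOp {
    std::string name;
    bool is_sink = false;
    std::vector<std::shared_ptr<ToyOp>> children;
};

struct ToyPipeline {
    std::string source, sink;
    std::vector<std::string> operators;
};

void BuildPipelines(const std::shared_ptr<ToyOp> &op, ToyPipeline &current,
                    std::vector<ToyPipeline> &pipelines) {
    if (op->is_sink) {
        current.source = op->name;                 // the sink feeds the current pipeline
        ToyPipeline child{"", op->name, {}};       // new child pipeline ending in this sink
        BuildPipelines(op->children[0], child, pipelines);
        pipelines.push_back(std::move(child));
    } else if (op->children.empty()) {
        current.source = op->name;                 // leaf operator: the pipeline's data source
    } else {
        current.operators.push_back(op->name);     // ordinary streaming operator
        BuildPipelines(op->children[0], current, pipelines);
    }
}

int main() {
    auto scan = std::make_shared<ToyOp>(ToyOp{"SEQ_SCAN", false, {}});
    auto order = std::make_shared<ToyOp>(ToyOp{"ORDER_BY", true, {scan}});
    auto proj = std::make_shared<ToyOp>(ToyOp{"PROJECTION", false, {order}});

    ToyPipeline root{"", "RESULT_SINK", {}};
    std::vector<ToyPipeline> pipelines;
    BuildPipelines(proj, root, pipelines);

    std::cout << "outer pipeline: " << root.source << " -> ... -> " << root.sink << "\n";
    std::cout << "child pipeline: " << pipelines[0].source << " -> "
              << pipelines[0].sink << "\n";        // SEQ_SCAN -> ORDER_BY
    return 0;
}
In DuckDB's actual hash join, the build side likewise becomes a child pipeline with the join as its sink, while the probe side continues in the current pipeline as an ordinary operator.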
Event Schedule
Once the pipelines are built, ScheduleEvents(to_schedule) constructs the event DAG from all pipelines other than the root pipeline and performs the initial event scheduling.
void Executor::ScheduleEvents(const vector<shared_ptr<MetaPipeline>> &meta_pipelines) {
ScheduleEventData event_data(meta_pipelines, events, true);
ScheduleEventsInternal(event_data);
}
void Executor::ScheduleEventsInternal(ScheduleEventData &event_data) {
...
// create all the required pipeline events
for (auto &pipeline : event_data.meta_pipelines) {
SchedulePipeline(pipeline, event_data);
}
// set up the dependencies across MetaPipelines
auto &event_map = event_data.event_map;
for (auto &entry : event_map) {
auto pipeline = entry.first;
for (auto &dependency : pipeline->dependencies) {
auto dep = dependency.lock();
D_ASSERT(dep);
auto event_map_entry = event_map.find(dep.get());
D_ASSERT(event_map_entry != event_map.end());
auto &dep_entry = event_map_entry->second;
D_ASSERT(dep_entry.pipeline_complete_event);
entry.second.pipeline_event->AddDependency(
*dep_entry.pipeline_complete_event
);
}
}
...
// schedule the pipelines that do not have dependencies
for (auto &event : events) {
if (!event->HasDependencies()) {
event->Schedule();
}
}
}
In general, a pipeline or operator implements a Schedule() function that defines how its tasks are executed, so event->Schedule() is what drives task execution. For example, the generic Pipeline::Schedule(), which creates PipelineTasks, is defined as follows:
class PipelineTask : public ExecutorTask {
...
};
void Pipeline::Schedule(shared_ptr<Event> &event) {
...
if (!ScheduleParallel(event)) {
// could not parallelize this pipeline: push a sequential task instead
ScheduleSequentialTask(event);
}
}
Here ScheduleParallel and ScheduleSequentialTask determine how the tasks are executed in parallel or sequentially, respectively.
bool Pipeline::ScheduleParallel(shared_ptr<Event> &event) {
// check if the sink, source and all intermediate operators support parallelism
if (!sink->ParallelSink()) {
return false;
}
if (!source->ParallelSource()) {
return false;
}
for (auto &op : operators) {
if (!op->ParallelOperator()) {
return false;
}
}
if (sink->RequiresBatchIndex()) {
if (!source->SupportsBatchIndex()) {...}
}
idx_t max_threads = source_state->MaxThreads();
return LaunchScanTasks(event, max_threads);
}
void Pipeline::ScheduleSequentialTask(shared_ptr<Event> &event) {
vector<unique_ptr<Task>> tasks;
tasks.push_back(make_unique<PipelineTask>(*this, event));
event->SetTasks(std::move(tasks));
}
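LaunchScanTasks is not shown above; plausibly (details may differ from the actual DuckDB implementation), it caps the requested parallelism at the scheduler's thread count, falls back to sequential execution when only one thread is useful, and otherwise creates one PipelineTask per thread:
// Sketch of LaunchScanTasks (hedged; the real DuckDB code may differ in details)
bool Pipeline::LaunchScanTasks(shared_ptr<Event> &event, idx_t max_threads) {
    auto &scheduler = TaskScheduler::GetScheduler(executor.context);
    idx_t active_threads = scheduler.NumberOfThreads();
    if (max_threads > active_threads) {
        max_threads = active_threads;        // never launch more tasks than worker threads
    }
    if (max_threads <= 1) {
        return false;                        // caller schedules a single sequential task
    }
    vector<unique_ptr<Task>> tasks;
    for (idx_t i = 0; i < max_threads; i++) {
        tasks.push_back(make_unique<PipelineTask>(*this, event));
    }
    event->SetTasks(std::move(tasks));       // enqueue all tasks on the scheduler
    return true;
}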
In the hash join operator, the Schedule() of HashJoinFinalizeEvent is defined as follows:
class HashJoinFinalizeEvent : public BasePipelineEvent {
...
void Schedule() override {
auto &context = pipeline->GetClientContext();
vector<unique_ptr<Task>> finalize_tasks;
auto &ht = *sink.hash_table;
const auto &block_collection = ht.GetBlockCollection();
const auto &blocks = block_collection.blocks;
const auto num_blocks = blocks.size();
if (block_collection.count < PARALLEL_CONSTRUCT_THRESHOLD
&& !context.config.verify_parallelism) {
// Single-threaded finalize
finalize_tasks.push_back(
make_unique<HashJoinFinalizeTask>(shared_from_this(),
context, sink, 0,
num_blocks, false)
);
} else {
// Parallel finalize
idx_t num_threads =
TaskScheduler::GetScheduler(context).NumberOfThreads();
auto blocks_per_thread =
MaxValue<idx_t>((num_blocks + num_threads - 1) / num_threads, 1);
idx_t block_idx = 0;
for (idx_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
auto block_idx_start = block_idx;
auto block_idx_end =
MinValue<idx_t>(block_idx_start + blocks_per_thread, num_blocks);
finalize_tasks.push_back(
make_unique<HashJoinFinalizeTask>(shared_from_this(),
context, sink,
block_idx_start,
block_idx_end, true)
);
block_idx = block_idx_end;
if (block_idx == num_blocks) {
break;
}
}
}
SetTasks(std::move(finalize_tasks));
}
};
Task Execution
Tracing the Schedule() methods above, they all eventually call SetTasks() to enqueue their tasks for execution.
void Event::SetTasks(vector<unique_ptr<Task>> tasks) {
auto &ts = TaskScheduler::GetScheduler(executor.context);
D_ASSERT(total_tasks == 0);
D_ASSERT(!tasks.empty());
this->total_tasks = tasks.size();
for (auto &task : tasks) {
ts.ScheduleTask(executor.GetToken(), std::move(task));
}
}
void TaskScheduler::ScheduleTask(ProducerToken &token, unique_ptr<Task> task) {
// Enqueue a task for the given producer token and signal any sleeping threads
queue->Enqueue(token, std::move(task));
}
The queue here is a concurrent queue: DuckDB uses moodycamel::ConcurrentQueue, a lock-free multi-producer multi-consumer queue, to schedule tasks. Its implementation lives in third_party/concurrentqueue/concurrentqueue.h and runs to more than 3,000 lines of code. The header opens with:
Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue. An overview, including benchmark results, is provided here: http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++. The full design is also described in excruciating detail at: http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
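As a standalone illustration of the third-party queue itself (independent of DuckDB's wrapper types), a minimal multi-threaded producer/consumer example using moodycamel::ConcurrentQueue with a producer token could look like this:
#include <cstdio>
#include <thread>
#include "concurrentqueue.h"   // i.e. third_party/concurrentqueue/concurrentqueue.h

int main() {
    moodycamel::ConcurrentQueue<int> queue;
    moodycamel::ProducerToken token(queue);   // per-producer token, analogous to DuckDB's ProducerToken

    std::thread producer([&] {                // producer: enqueue items under its token
        for (int i = 0; i < 100; i++) {
            queue.enqueue(token, i);
        }
    });

    std::thread consumer([&] {                // consumer: spin until all items are drained
        int item, consumed = 0;
        while (consumed < 100) {
            if (queue.try_dequeue(item)) {
                consumed++;
            }
        }
        std::printf("consumed %d items\n", consumed);
    });

    producer.join();
    consumer.join();
    return 0;
}
DuckDB additionally pairs the queue with a lightweight semaphore (the queue->semaphore.wait() seen below) so idle workers can sleep instead of spinning.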
How, then, are these tasks dequeued and executed concurrently by different threads? There are two parts to this. The first is the set of background threads started when DuckDB initializes, whose main loop is defined by ExecuteForever() (the second is the main thread itself, covered below under Root Pipeline & Final Result):
void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
unique_ptr<Task> task;
// loop until the marker is set to false
while (*marker) {
// wait for a signal with a timeout
queue->semaphore.wait();
if (queue->q.try_dequeue(task)) {
task->Execute(TaskExecutionMode::PROCESS_ALL);
task.reset();
}
}
}
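These worker threads are created at startup; each one owns an atomic marker, and shutdown simply clears the marker so that the loop in ExecuteForever() can exit and the thread can be joined. A rough, self-contained sketch of that lifecycle pattern (hypothetical names, not DuckDB's TaskScheduler code):
#include <atomic>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical sketch of the worker lifecycle around ExecuteForever(): each
// background thread owns an atomic flag; clearing the flag lets its loop exit.
struct ToyWorker {
    std::unique_ptr<std::atomic<bool>> marker;
    std::thread thread;
};

static void ExecuteForeverToy(std::atomic<bool> *marker) {
    while (*marker) {
        // in DuckDB: wait on the queue's semaphore, then try_dequeue and run a task
        std::this_thread::yield();
    }
}

int main() {
    std::vector<ToyWorker> workers;
    for (int i = 0; i < 4; i++) {                        // e.g. 4 background threads
        ToyWorker w;
        w.marker = std::make_unique<std::atomic<bool>>(true);
        w.thread = std::thread(ExecuteForeverToy, w.marker.get());
        workers.push_back(std::move(w));
    }
    for (auto &w : workers) {                            // shutdown: clear markers, then join
        *w.marker = false;
        w.thread.join();
    }
    return 0;
}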
Execute() ultimately dispatches to ExecuteTask(), a virtual method of the ExecutorTask base class; every task runs through ExecutorTask, and pipelines and operators provide the concrete implementations. Because different tasks cover different scenarios, for example one task may execute an entire pipeline while another performs a single operator's work, each defines its own ExecuteTask() logic.
class PipelineTask : public ExecutorTask {
TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
if (!pipeline_executor) {
pipeline_executor =
make_unique<PipelineExecutor>(pipeline.GetClientContext(), pipeline);
}
if (mode == TaskExecutionMode::PROCESS_PARTIAL) {
bool finished = pipeline_executor->Execute(PARTIAL_CHUNK_COUNT);
if (!finished) {
return TaskExecutionResult::TASK_NOT_FINISHED;
}
} else {
pipeline_executor->Execute();
}
event->FinishTask();
pipeline_executor.reset();
return TaskExecutionResult::TASK_FINISHED;
}
};
class HashJoinFinalizeTask : public ExecutorTask {
TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
sink.hash_table->Finalize(block_idx_start, block_idx_end, parallel);
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}
};
When a task finishes executing, it calls event->FinishTask(), which drives the next event and thereby the execution of new tasks.
Another important question in DuckDB's parallel query execution is how the results produced by different threads are synchronized. This involves two concepts, GlobalState and LocalState, with concrete implementations such as GlobalSourceState, GlobalSinkState, LocalSourceState, and LocalSinkState.
Each thread executing a task has its own LocalState (LocalStates are created via unique_ptr and are clearly not meant to be shared across threads) and uses it to store intermediate results. Every time a thread finishes processing a data chunk, it calls the Sink() method to merge the result into its LocalSinkState. For example, a PipelineTask calls pipeline_executor->Execute, which in turn calls ExecutePushInternal; whenever a data chunk has been processed, the pipeline's sink operator's Sink method is invoked via pipeline.sink->Sink.
bool PipelineExecutor::Execute(idx_t max_chunks) {
D_ASSERT(pipeline.sink);
bool exhausted_source = false;
auto &source_chunk =
pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];
for (idx_t i = 0; i < max_chunks; i++) {
if (IsFinished()) {
break;
}
source_chunk.Reset();
FetchFromSource(source_chunk);
if (source_chunk.size() == 0) {
exhausted_source = true;
break;
}
auto result = ExecutePushInternal(source_chunk);
if (result == OperatorResultType::FINISHED) {
D_ASSERT(IsFinished());
break;
}
}
if (!exhausted_source && !IsFinished()) {
return false;
}
PushFinalize();
return true;
}
OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, idx_t initial_idx) {
D_ASSERT(pipeline.sink);
if (input.size() == 0) { // LCOV_EXCL_START
return OperatorResultType::NEED_MORE_INPUT;
} // LCOV_EXCL_STOP
while (true) {
OperatorResultType result;
// Note: if input is the final_chunk, we don't do any executing.
// Then the chunk just needs to be sinked
if (&input != &final_chunk) {
final_chunk.Reset();
result = Execute(input, final_chunk, initial_idx);
if (result == OperatorResultType::FINISHED) {
return OperatorResultType::FINISHED;
}
} else {
result = OperatorResultType::NEED_MORE_INPUT;
}
auto &sink_chunk = final_chunk;
if (sink_chunk.size() > 0) {
StartOperator(pipeline.sink);
D_ASSERT(pipeline.sink);
D_ASSERT(pipeline.sink->sink_state);
auto sink_result = pipeline.sink->Sink(context,
*pipeline.sink->sink_state,
*local_sink_state,
sink_chunk);
EndOperator(pipeline.sink, nullptr);
if (sink_result == SinkResultType::FINISHED) {
FinishProcessing();
return OperatorResultType::FINISHED;
}
}
if (result == OperatorResultType::NEED_MORE_INPUT) {
return OperatorResultType::NEED_MORE_INPUT;
}
}
}
Finally, PushFinalize is called, which invokes the sink operator's Combine() method: the intermediate results held in the LocalState are merged into the operator's GlobalState, and this merge naturally requires locking. DuckDB thus uses LocalState and GlobalState to split private memory from shared memory; parallelism rests on computing in the private space and synchronizing in the shared space.
void PipelineExecutor::PushFinalize() {
...
finalized = true;
...
// run the combine for the sink
pipeline.sink->Combine(context, *pipeline.sink->sink_state, *local_sink_state);
// flush all query profiler info
for (idx_t i = 0; i < intermediate_states.size(); i++) {
intermediate_states[i]->Finalize(pipeline.operators[i], context);
}
pipeline.executor.Flush(thread);
local_sink_state.reset();
}
void PhysicalHashJoin::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const {
auto &gstate = (HashJoinGlobalSinkState &)gstate_p;
auto &lstate = (HashJoinLocalSinkState &)lstate_p;
if (lstate.hash_table) {
lock_guard<mutex> local_ht_lock(gstate.lock);
gstate.local_hash_tables.push_back(std::move(lstate.hash_table));
}
auto &client_profiler = QueryProfiler::Get(context.client);
context.thread.profiler.Flush(this, &lstate.build_executor, "build_executor", 1);
client_profiler.Flush(context.thread.profiler);
}
Because every thread executes Execute() and Combine(), there is multi-threaded contention inside pipeline.sink->Combine(). This contention occurs only at the sink, i.e. the tail end of a pipeline, and the logic for handling parallelism likewise has to be implemented on the sink side; non-sink operators (e.g. hash join probe, projection, filter) never need to be aware of multi-threaded synchronization.
When a pipeline has finished executing, PipelineFinishEvent::FinishEvent() is scheduled, which in turn calls the Finalize method of that pipeline's sink.
void PipelineFinishEvent::FinishEvent() {
pipeline->Finalize(*this);
}
void Pipeline::Finalize(Event &event) {
...
try {
auto sink_state = sink->Finalize(*this, event,
executor.context,
*sink->sink_state);
sink->sink_state->state = sink_state;
} catch (...) {
... // error handling elided
}
}
For example, a hash-table build pipeline calls PhysicalHashJoin::Finalize, which schedules a HashJoinFinalizeEvent via ScheduleFinalize(). The tasks of the HashJoinFinalizeEvent then call Finalize on the sink's hash table; JoinHashTable::Finalize constructs the actual hash table. To guarantee consistency, each pipeline calls the sink's Finalize() only once, and only after all of its tasks have completed.
//! The finalize is called when ALL threads are finished execution. It is called only once per pipeline, and is
//! entirely single threaded.
//! If Finalize returns SinkResultType::FINISHED, the sink is marked as finished
virtual SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context, GlobalSinkState &gstate) const;
SinkFinalizeType PhysicalHashJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, GlobalSinkState &gstate) const {
auto &sink = (HashJoinGlobalSinkState &)gstate;
if (sink.external) {
D_ASSERT(can_go_external);
// External join - partition HT
sink.perfect_join_executor.reset();
sink.hash_table->ComputePartitionSizes(context.config,
sink.local_hash_tables,
sink.max_ht_size);
auto new_event = make_shared<HashJoinPartitionEvent>(pipeline, sink,
sink.local_hash_tables);
event.InsertEvent(std::move(new_event));
sink.finalized = true;
return SinkFinalizeType::READY;
} else {
for (auto &local_ht : sink.local_hash_tables) {
sink.hash_table->Merge(*local_ht);
}
sink.local_hash_tables.clear();
}
// check for possible perfect hash table
auto use_perfect_hash =
sink.perfect_join_executor->CanDoPerfectHashJoin();
if (use_perfect_hash) {
D_ASSERT(sink.hash_table->equality_types.size() == 1);
auto key_type = sink.hash_table->equality_types[0];
use_perfect_hash = sink.perfect_join_executor->BuildPerfectHashTable(key_type);
}
// In case of a large build side or duplicates, use regular hash join
if (!use_perfect_hash) {
sink.perfect_join_executor.reset();
sink.ScheduleFinalize(pipeline, event);
}
sink.finalized = true;
if (sink.hash_table->Count() == 0 && EmptyResultIfRHSIsEmpty()) {
return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
}
return SinkFinalizeType::READY;
}
void HashJoinGlobalSinkState::ScheduleFinalize(Pipeline &pipeline, Event &event) {
if (hash_table->Count() == 0) {
hash_table->finalized = true;
return;
}
hash_table->InitializePointerTable();
auto new_event = make_shared<HashJoinFinalizeEvent>(pipeline, *this);
event.InsertEvent(std::move(new_event));
}
class HashJoinFinalizeTask : public ExecutorTask {
public:
HashJoinFinalizeTask(shared_ptr<Event> event_p, ClientContext &context, HashJoinGlobalSinkState &sink, idx_t block_idx_start, idx_t block_idx_end, bool parallel)
: ExecutorTask(context), event(std::move(event_p)), sink(sink), block_idx_start(block_idx_start), block_idx_end(block_idx_end), parallel(parallel) {
}
TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
sink.hash_table->Finalize(block_idx_start, block_idx_end, parallel);
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}
};
//! Finalize the build of the HT, constructing the actual hash table and making the HT ready for probing.
//! Finalize must be called before any call to Probe,
//! And after Finalize is called Build should no longer be ever called.
void JoinHashTable::Finalize(idx_t block_idx_start, idx_t block_idx_end, bool parallel) {
...
Vector hashes(LogicalType::HASH);
auto hash_data = FlatVector::GetData<hash_t>(hashes);
data_ptr_t key_locations[STANDARD_VECTOR_SIZE];
// now construct the actual hash table; scan the nodes
// as we scan the nodes we pin all the blocks of the HT and keep them pinned until the HT is destroyed
// this is so that we can keep pointers around to the blocks
for (idx_t block_idx = block_idx_start; block_idx < block_idx_end; block_idx++) {
auto &block = block_collection->blocks[block_idx];
auto handle = buffer_manager.Pin(block->block);
data_ptr_t dataptr = handle.Ptr();
data_ptr_t heap_ptr = nullptr;
if (unswizzle) {
auto &heap_block = string_heap->blocks[block_idx];
auto heap_handle = buffer_manager.Pin(heap_block->block);
heap_ptr = heap_handle.Ptr();
local_pinned_handles.push_back(std::move(heap_handle));
}
idx_t entry = 0;
while (entry < block->count) {
idx_t next = MinValue<idx_t>(STANDARD_VECTOR_SIZE, block->count - entry);
if (unswizzle) {
RowOperations::UnswizzlePointers(layout, dataptr, heap_ptr, next);
}
// fetch the next vector of entries from the blocks
for (idx_t i = 0; i < next; i++) {
hash_data[i] = Load<hash_t>((data_ptr_t)(dataptr + pointer_offset));
key_locations[i] = dataptr;
dataptr += entry_size;
}
// now insert into the hash table
InsertHashes(hashes, next, key_locations, parallel);
entry += next;
}
local_pinned_handles.push_back(std::move(handle));
}
lock_guard<mutex> lock(pinned_handles_lock);
for (auto &local_pinned_handle : local_pinned_handles) {
pinned_handles.push_back(std::move(local_pinned_handle));
}
}
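The parallel flag passed to InsertHashes() hints at how several finalize tasks can insert into the shared pointer table concurrently. As a general illustration of the technique (not DuckDB's actual implementation), entries of a chained hash table can be prepended to their bucket's linked list with a compare-and-swap, so no locks are needed during the parallel build:
#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

// Illustrative lock-free insertion into a chained hash table (not DuckDB's code):
// every entry is prepended to its bucket's list with compare_exchange, so several
// threads can build the table concurrently, as the parallel finalize tasks do.
struct Entry {
    int key;
    Entry *next = nullptr;
};

struct ToyHashTable {
    std::vector<std::atomic<Entry *>> buckets;
    explicit ToyHashTable(size_t n) : buckets(n) {
        for (auto &b : buckets) b.store(nullptr);
    }
    void Insert(Entry *entry) {
        auto &bucket = buckets[static_cast<size_t>(entry->key) % buckets.size()];
        Entry *head = bucket.load();
        do {
            entry->next = head;                               // link to the current head
        } while (!bucket.compare_exchange_weak(head, entry)); // retry if the head changed
    }
};

int main() {
    ToyHashTable ht(64);
    std::vector<Entry> entries(1000);
    for (int i = 0; i < 1000; i++) entries[i].key = i;

    std::vector<std::thread> threads;
    for (int t = 0; t < 4; t++) {                 // 4 "finalize tasks" over disjoint ranges
        threads.emplace_back([&, t] {
            for (int i = t * 250; i < (t + 1) * 250; i++) ht.Insert(&entries[i]);
        });
    }
    for (auto &th : threads) th.join();

    size_t count = 0;                             // verify all entries are reachable
    for (auto &b : ht.buckets) {
        for (Entry *e = b.load(); e; e = e->next) count++;
    }
    std::printf("inserted %zu entries\n", count); // prints 1000
    return 0;
}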
Root Pipeline & Final Result
A root pipeline is, roughly, a pipeline whose sink is the root node of the original physical plan. These pipelines are not included when the event DAG is built; they are executed by the main thread rather than asynchronously by the background threads started by the TaskScheduler, and they cannot run until all intermediate pipelines have finished.
The root pipeline is executed by the main thread through PipelineExecutor's Execute functions. After the main thread finishes the initial scheduling of the intermediate pipelines, the root pipeline still cannot compute anything until the intermediate results are ready, so the best way to speed up the query is for the main thread to join in executing the intermediate pipelines as well. The main thread therefore sits in a loop that repeatedly grabs a small amount of work and processes it; in the DuckDB Python client, for example, CompletePendingQuery loops while pending_query.ExecuteTask() keeps calling task->Execute(TaskExecutionMode::PROCESS_PARTIAL), so that the main thread only processes a small batch of data at a time.
unique_ptr<QueryResult> DuckDBPyConnection::CompletePendingQuery(PendingQueryResult &pending_query) {
PendingExecutionResult execution_result;
do {
execution_result = pending_query.ExecuteTask();
{
py::gil_scoped_acquire gil;
if (PyErr_CheckSignals() != 0) {
throw std::runtime_error("Query interrupted");
}
}
} while (execution_result == PendingExecutionResult::RESULT_NOT_READY);
...
return pending_query.Execute();
}
Once execution_result becomes RESULT_READY, all intermediate pipelines have finished, the outer do-while loop exits, and the final return pending_query.Execute() executes the root pipeline. After a chain of calls this reaches FetchChunk() and the ExecutePull() it invokes. Despite the name, ExecutePull() is still push-style internally: it fetches a batch of data from the source and then runs it through all the operators in turn to produce the final result, no different from any other pipeline.
unique_ptr<DataChunk> Executor::FetchChunk() {
auto chunk = make_unique<DataChunk>();
root_executor->InitializeChunk(*chunk);
while (true) {
root_executor->ExecutePull(*chunk);
if (chunk->size() == 0) {
root_executor->PullFinalize();
if (NextExecutor()) {
continue;
}
break;
} else {
break;
}
}
return chunk;
}
void PipelineExecutor::ExecutePull(DataChunk &result) {
if (IsFinished()) {
return;
}
auto &executor = pipeline.executor;
try {
D_ASSERT(!pipeline.sink);
auto &source_chunk =
pipeline.operators.empty() ? result : *intermediate_chunks[0];
while (result.size() == 0) {
if (in_process_operators.empty()) {
source_chunk.Reset();
FetchFromSource(source_chunk);
if (source_chunk.size() == 0) {
break;
}
}
if (!pipeline.operators.empty()) {
auto state = Execute(source_chunk, result);
if (state == OperatorResultType::FINISHED) {
break;
}
}
}
} catch (const Exception &ex) { // LCOV_EXCL_START
...
} // LCOV_EXCL_STOP
}
References
- Notes on Duckdb: Build Pipelines, https://zhuanlan.zhihu.com/p/609337363
- [DuckDB] Push-Based Execution Model, https://zhuanlan.zhihu.com/p/402355976
- Viktor Leis, Peter Boncz, Alfons Kemper, and Thomas Neumann. “Morsel-driven parallelism: a NUMA-aware query evaluation framework for the many-core age.” ACM International Conference on Management of Data (SIGMOD), pp. 743-754. 2014.
- Thomas Neumann. “Efficiently compiling efficient query plans for modern hardware.” VLDB Endowment 4, no. 9 (2011): 539-550.