This post gives a high-level overview of DuckDB's query execution.

Push-based vs Pull-based Model

The trade-off between the push-based and the pull-based execution model has been discussed before. DuckDB originally used a pull-based model, but after weighing the trade-offs it switched to a push-based model; see https://github.com/duckdb/duckdb/issues/1583 and https://github.com/duckdb/duckdb/pull/2393 for the details.
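
To make the contrast concrete, below is a minimal, self-contained sketch (not DuckDB code; every class and function name is made up) of the two control flows: in the pull model each operator pulls chunks from its child, while in the push model a driver loop pushes chunks from the source through the operators into the sink.

// Hypothetical sketch contrasting pull-based and push-based execution.
#include <optional>
#include <vector>

using Chunk = std::vector<int>;

// Pull-based (Volcano-style): control flows top-down, every operator
// asks its child for the next chunk.
struct PullOperator {
    virtual ~PullOperator() = default;
    virtual std::optional<Chunk> GetChunk() = 0;  // nullopt == exhausted
};

struct PullFilter : PullOperator {
    PullOperator &child;
    explicit PullFilter(PullOperator &child_p) : child(child_p) {}
    std::optional<Chunk> GetChunk() override {
        while (auto chunk = child.GetChunk()) {   // pull from the child
            Chunk out;
            for (int v : *chunk) {
                if (v > 0) {
                    out.push_back(v);
                }
            }
            if (!out.empty()) {
                return out;
            }
        }
        return std::nullopt;
    }
};

// Push-based: a driver loop reads from the source and pushes each chunk
// through the operators into the sink.
struct PushOperator {
    virtual ~PushOperator() = default;
    virtual Chunk Execute(const Chunk &input) = 0;
};

struct PushSink {
    virtual ~PushSink() = default;
    virtual void Consume(const Chunk &chunk) = 0;  // e.g. build a hash table
};

void RunPipeline(const std::vector<Chunk> &source,
                 const std::vector<PushOperator *> &ops, PushSink &sink) {
    for (const auto &chunk : source) {            // the driver owns the loop
        Chunk current = chunk;
        for (auto *op : ops) {
            current = op->Execute(current);
        }
        sink.Consume(current);
    }
}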

Query Execution Design

DuckDB's query execution design revolves around three core concepts: Pipeline, Task, and Event.

  • Pipeline: As described in the previous post, the physical plan of a query is split into multiple pipelines at the pipeline breakers (the pipelines form a DAG). Execution thus shifts from being operator-driven to being pipeline-driven, and the pipeline DAG can be viewed as the query execution plan seen from another angle. Each pipeline covers a contiguous stretch of physical operators in the plan and logically consists of a Source, Operators, and a Sink: the Source is where the pipeline's data comes from, the Sink is its end point where the data is collected, and the Operators are the intermediate compute nodes between the two. Pipelines depend on one another: a pipeline only starts once all of the pipelines it depends on have finished. Within a pipeline, partitioning the data source allows it to be executed by multiple threads in parallel.

  • Task: A Task is the smallest unit of parallel execution in DuckDB. Within a pipeline, the data source is partitioned and several tasks (concretely, ExecutorTasks) are created from it, so that one pipeline can be executed concurrently by multiple threads. The pipeline's Source and Sink therefore have to be safe for concurrent access.

  • Event: An Event is the scheduling unit that corresponds to a pipeline. After building the pipeline DAG, DuckDB builds a matching event DAG, and a pipeline schedules and runs its tasks through its events. Each pipeline's event records the total number of parallel tasks to run and the number that have finished. Every time a task of the pipeline completes, the event's finished count is incremented; once all parallel tasks are done, the finished count equals the total count and the event is complete. The event then notifies its parent events, and as soon as a parent event sees that all of its dependency events have completed, it schedules its own tasks, which drives the downstream pipelines (a simplified sketch of this bookkeeping follows the list).
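
A simplified, self-contained sketch of this bookkeeping (illustrative only; the names and structure are made up and do not mirror DuckDB's actual Event class) could look as follows:

// Sketch of the "count finished tasks, then notify parents" mechanism.
// Not DuckDB code; all names here are hypothetical.
#include <atomic>
#include <cstddef>
#include <vector>

struct SketchEvent {
    std::atomic<size_t> total_tasks{0};
    std::atomic<size_t> finished_tasks{0};
    std::atomic<size_t> unfinished_dependencies{0};
    std::vector<SketchEvent *> parents;  // events that depend on this one

    void SetTasks(size_t n) { total_tasks = n; }

    // Called by every task of the pipeline when it finishes.
    void FinishTask() {
        if (++finished_tasks == total_tasks) {
            // all parallel tasks are done -> this event is complete
            for (auto *parent : parents) {
                parent->CompleteDependency();
            }
        }
    }

    // Called by a dependency once it completes; when the last dependency
    // completes, this event can schedule its own tasks.
    void CompleteDependency() {
        if (--unfinished_dependencies == 0) {
            Schedule();  // hand the pipeline's tasks to the task scheduler
        }
    }

    void Schedule() { /* enqueue this pipeline's tasks for worker threads */ }
};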

Query Execution Implementation

The overall flow of DuckDB's query execution implementation is: build the pipelines, then start with the pipelines that have no dependencies by scheduling their events, which run one or more concurrent tasks. Once the current pipeline is marked as finished (for example, its concurrent tasks have processed all of the data), the last task of that pipeline calls Schedule() to schedule the event of the next pipeline whose dependencies are all satisfied, which in turn drives one or more tasks. This repeats until every pipeline has finished, at which point the root pipeline pulls all of the intermediate results and produces the final result.

Pipeline Initialization & Build

Pipeline construction starts in Executor::InitializeInternal(), which builds the individual pipelines by recursively calling PhysicalOperator::BuildPipelines() on each node, starting from the root of the physical plan.

void Executor::InitializeInternal(PhysicalOperator *plan) {
    ...
    // build and ready the pipelines
    PipelineBuildState state;
    auto root_pipeline = make_shared<MetaPipeline>(*this, state, nullptr);
    root_pipeline->Build(physical_plan);
    root_pipeline->Ready();
    
    ...
    // set root pipelines, i.e., all pipelines that end in the final sink
    root_pipeline->GetPipelines(root_pipelines, false);
    root_pipeline_idx = 0;
    
    // collect all meta-pipelines from the root pipeline
    vector<shared_ptr<MetaPipeline>> to_schedule;
    root_pipeline->GetMetaPipelines(to_schedule, true, true);
    
    ...
    // finally, verify and schedule
    VerifyPipelines();
    ScheduleEvents(to_schedule);
}

Here root_pipeline->Build(physical_plan); starts creating pipelines from the root pipeline. Note that while the pipeline DAG is built starting from the root pipeline, it does not include the root pipeline itself: in DuckDB, the root pipeline has a separate implementation that executes it and fetches the final result.

void MetaPipeline::Build(PhysicalOperator *op) {
    ...
    op->BuildPipelines(*pipelines.back(), *this);
}
void PhysicalOperator::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
    op_state.reset();
    auto &state = meta_pipeline.GetState();
    if (IsSink()) {
        // operator is a sink, build a pipeline
        sink_state.reset();
        D_ASSERT(children.size() == 1);
        // the operator becomes the data source of the current pipeline
        state.SetPipelineSource(current, this);
        // we create a new pipeline starting from the child
        auto child_meta_pipeline = 
            meta_pipeline.CreateChildMetaPipeline(current, this);
        child_meta_pipeline->Build(children[0].get());
    } else {
        // operator is not a sink! recurse in children
        if (children.empty()) {
            // source
            state.SetPipelineSource(current, this);
        } else {
            ...
            state.AddPipelineOperator(current, this);
            children[0]->BuildPipelines(current, meta_pipeline);
        }
    }
}

Event Schedule

After the pipelines are built, ScheduleEvents(to_schedule) constructs the event DAG from all pipelines except the root pipeline and schedules the initial events.

void Executor::ScheduleEvents(const vector<shared_ptr<MetaPipeline>> &meta_pipelines) {
    ScheduleEventData event_data(meta_pipelines, events, true);
    ScheduleEventsInternal(event_data);
}

void Executor::ScheduleEventsInternal(ScheduleEventData &event_data) {
    ...
    // create all the required pipeline events
    for (auto &pipeline : event_data.meta_pipelines) {
        SchedulePipeline(pipeline, event_data);
    }

    // set up the dependencies across MetaPipelines
    auto &event_map = event_data.event_map;
    for (auto &entry : event_map) {
        auto pipeline = entry.first;
        for (auto &dependency : pipeline->dependencies) {
            auto dep = dependency.lock();
            D_ASSERT(dep);
            auto event_map_entry = event_map.find(dep.get());
            D_ASSERT(event_map_entry != event_map.end());
            auto &dep_entry = event_map_entry->second;
            D_ASSERT(dep_entry.pipeline_complete_event);
            entry.second.pipeline_event->AddDependency(
                *dep_entry.pipeline_complete_event
            );
        }
    }

    ...
    // schedule the pipelines that do not have dependencies
    for (auto &event : events) {
        if (!event->HasDependencies()) {
            event->Schedule();
        }
    }
}

Essentially, each pipeline (or operator-specific event) implements a Schedule() function that defines how its tasks are executed, so event->Schedule() is what drives task execution. For example, the Schedule() of an ordinary pipeline, which creates PipelineTasks, is defined as follows:

class PipelineTask : public ExecutorTask {
    ...
};

void Pipeline::Schedule(shared_ptr<Event> &event) {
    ...
    if (!ScheduleParallel(event)) {
        // could not parallelize this pipeline: push a sequential task instead
        ScheduleSequentialTask(event);
    }
}

ScheduleParallel and ScheduleSequentialTask determine how the tasks are executed in parallel or sequentially, respectively.

bool Pipeline::ScheduleParallel(shared_ptr<Event> &event) {
    // check if the sink, source and all intermediate operators support parallelism
    if (!sink->ParallelSink()) {
        return false;
    }
    if (!source->ParallelSource()) {
        return false;
    }
    for (auto &op : operators) {
        if (!op->ParallelOperator()) {
            return false;
        }
    }
    if (sink->RequiresBatchIndex()) {
        if (!source->SupportsBatchIndex()) {...}
    }
    idx_t max_threads = source_state->MaxThreads();
    return LaunchScanTasks(event, max_threads);
}

void Pipeline::ScheduleSequentialTask(shared_ptr<Event> &event) {
    vector<unique_ptr<Task>> tasks;
    tasks.push_back(make_unique<PipelineTask>(*this, event));
    event->SetTasks(std::move(tasks));
}

In the hash join operator, the Schedule() of HashJoinFinalizeEvent is defined as follows:

class HashJoinFinalizeEvent : public BasePipelineEvent {
    ...
    void Schedule() override {
        auto &context = pipeline->GetClientContext();
        vector<unique_ptr<Task>> finalize_tasks;
        auto &ht = *sink.hash_table;
        const auto &block_collection = ht.GetBlockCollection();
        const auto &blocks = block_collection.blocks;
        const auto num_blocks = blocks.size();
        if (block_collection.count < PARALLEL_CONSTRUCT_THRESHOLD 
            && !context.config.verify_parallelism) {
            // Single-threaded finalize
            finalize_tasks.push_back(
                make_unique<HashJoinFinalizeTask>(shared_from_this(), 
                                                  context, sink, 0, 
                                                  num_blocks, false)
            );
        } else {
            // Parallel finalize
            idx_t num_threads = 
                TaskScheduler::GetScheduler(context).NumberOfThreads();
            auto blocks_per_thread = 
                MaxValue<idx_t>((num_blocks + num_threads - 1) / num_threads, 1);
            
            idx_t block_idx = 0;
            for (idx_t thread_idx = 0; thread_idx < num_threads; thread_idx++) {
                auto block_idx_start = block_idx;
                auto block_idx_end = 
                    MinValue<idx_t>(block_idx_start + blocks_per_thread, num_blocks);
                finalize_tasks.push_back(
                    make_unique<HashJoinFinalizeTask>(shared_from_this(), 
                                                      context, sink, 
                                                      block_idx_start, 
                                                      block_idx_end, true)
                );
                block_idx = block_idx_end;
                if (block_idx == num_blocks) {
                    break;
                }
            }
        }
        SetTasks(std::move(finalize_tasks));
    }
};

Task Execution

Tracing through the Schedule() methods above, they all eventually call SetTasks() to enqueue their tasks, where the tasks wait to be executed.

void Event::SetTasks(vector<unique_ptr<Task>> tasks) {
    auto &ts = TaskScheduler::GetScheduler(executor.context);
    D_ASSERT(total_tasks == 0);
    D_ASSERT(!tasks.empty());
    this->total_tasks = tasks.size();
    for (auto &task : tasks) {
        ts.ScheduleTask(executor.GetToken(), std::move(task));
    }
}

void TaskScheduler::ScheduleTask(ProducerToken &token, unique_ptr<Task> task) {
    // Enqueue a task for the given producer token and signal any sleeping threads
    queue->Enqueue(token, std::move(task));
}

The queue here is a concurrent queue: DuckDB uses moodycamel::ConcurrentQueue, a lock-free concurrent queue, to schedule its tasks. The implementation lives in third_party/concurrentqueue/concurrentqueue.h, more than 3000 lines of code. The header of that file states:

Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue. An overview, including benchmark results, is provided here: http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++. The full design is also described in excruciating detail at: http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
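
For reference, here is a minimal standalone example of the moodycamel::ConcurrentQueue API that DuckDB wraps (a producer token plus the non-blocking try_dequeue), independent of DuckDB's own ProducerToken and TaskScheduler types:

// Minimal usage sketch of moodycamel::ConcurrentQueue, outside of DuckDB.
#include "concurrentqueue.h"  // third_party/concurrentqueue/concurrentqueue.h
#include <cassert>

int main() {
    moodycamel::ConcurrentQueue<int> queue;
    // A producer token groups enqueues from one producer for better throughput;
    // DuckDB's ProducerToken wraps the same idea.
    moodycamel::ProducerToken token(queue);
    queue.enqueue(token, 42);

    int item = 0;
    // try_dequeue is non-blocking and returns false when the queue is empty,
    // which is why DuckDB pairs the queue with a semaphore to park idle threads.
    bool ok = queue.try_dequeue(item);
    assert(ok && item == 42);
    return 0;
}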

So how are these tasks taken off the queue and executed concurrently by different threads?

This happens in two places. The first is the set of background threads started when DuckDB is initialized, whose behavior is defined by ExecuteForever():

void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
    unique_ptr<Task> task;
    // loop until the marker is set to false
    while (*marker) {
        // wait for a signal with a timeout
        queue->semaphore.wait();
        if (queue->q.try_dequeue(task)) {
            task->Execute(TaskExecutionMode::PROCESS_ALL);
            task.reset();
        }
    }
}

Execute() eventually calls ExecuteTask(), a virtual function for which both pipelines and operators provide concrete implementations; every task relies on the ExecutorTask class to run. Because tasks correspond to different scenarios (a task may execute a single operator or an entire pipeline, for example), the various ExecuteTask() implementations define different execution logic.

class PipelineTask : public ExecutorTask {
    TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
        if (!pipeline_executor) {
            pipeline_executor = 
                make_unique<PipelineExecutor>(pipeline.GetClientContext(), pipeline);
        }
        if (mode == TaskExecutionMode::PROCESS_PARTIAL) {
            bool finished = pipeline_executor->Execute(PARTIAL_CHUNK_COUNT);
            if (!finished) {
                return TaskExecutionResult::TASK_NOT_FINISHED;
            }
        } else {
            pipeline_executor->Execute();
        }
        event->FinishTask();
        pipeline_executor.reset();
        return TaskExecutionResult::TASK_FINISHED;
    }
};

class HashJoinFinalizeTask : public ExecutorTask {
    TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
        sink.hash_table->Finalize(block_idx_start, block_idx_end, parallel);
        event->FinishTask();
        return TaskExecutionResult::TASK_FINISHED;
    }
};

When a task finishes, it calls event->FinishTask(), which drives the next event and the execution of new tasks.

Another important question in DuckDB's parallel query execution is how the results produced by different threads are synchronized.

This involves two concepts, GlobalState and LocalState, with concrete implementations such as GlobalSourceState, GlobalSinkState, LocalSourceState, and LocalSinkState.
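
The division of labor can be illustrated with a generic sketch (a simple sum aggregate with hypothetical names, not the actual DuckDB classes): the per-chunk sink call only touches thread-local state, and only the final merge into the shared global state, described below, takes a lock.

// Generic sketch of the LocalState / GlobalState pattern; not DuckDB code.
#include <cstdint>
#include <mutex>
#include <vector>

struct GlobalSinkStateSketch {
    std::mutex lock;
    int64_t total = 0;    // shared result, only touched while holding the lock
};

struct LocalSinkStateSketch {
    int64_t partial = 0;  // private to one thread, no synchronization needed
};

// Called once per data chunk; works purely on thread-local state.
void SinkChunk(LocalSinkStateSketch &lstate, const std::vector<int64_t> &chunk) {
    for (auto v : chunk) {
        lstate.partial += v;
    }
}

// Called once per thread at the end of the pipeline; merges the private
// partial result into the shared global state under the lock.
void CombineLocal(GlobalSinkStateSketch &gstate, LocalSinkStateSketch &lstate) {
    std::lock_guard<std::mutex> guard(gstate.lock);
    gstate.total += lstate.partial;
}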

Each thread executing a task has its own LocalState (the LocalStates are created via unique_ptr, so they are clearly not meant to be shared across threads) and uses it to store intermediate results. Every time a thread finishes processing a data chunk, the Sink() method is called to merge the result into its LocalSinkState. For example, a PipelineTask calls pipeline_executor->Execute, which in turn calls ExecutePushInternal; whenever a data chunk has been processed, the Sink method of the pipeline's sink operator, pipeline.sink->Sink, is invoked:

bool PipelineExecutor::Execute(idx_t max_chunks) {
    D_ASSERT(pipeline.sink);
    bool exhausted_source = false;
    auto &source_chunk = 
        pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];
    for (idx_t i = 0; i < max_chunks; i++) {
        if (IsFinished()) {
            break;
        }
        source_chunk.Reset();
        FetchFromSource(source_chunk);
        if (source_chunk.size() == 0) {
            exhausted_source = true;
            break;
        }
        auto result = ExecutePushInternal(source_chunk);
        if (result == OperatorResultType::FINISHED) {
            D_ASSERT(IsFinished());
            break;
        }
    }
    if (!exhausted_source && !IsFinished()) {
        return false;
    }
    PushFinalize();
    return true;
}

OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, idx_t initial_idx) {
    D_ASSERT(pipeline.sink);
    if (input.size() == 0) { // LCOV_EXCL_START
        return OperatorResultType::NEED_MORE_INPUT;
    } // LCOV_EXCL_STOP
    while (true) {
        OperatorResultType result;
        // Note: if input is the final_chunk, we don't do any executing.
        // Then the chunk just needs to be sinked
        if (&input != &final_chunk) {
            final_chunk.Reset();
            result = Execute(input, final_chunk, initial_idx);
            if (result == OperatorResultType::FINISHED) {
                return OperatorResultType::FINISHED;
            }
        } else {
            result = OperatorResultType::NEED_MORE_INPUT;
        }
        auto &sink_chunk = final_chunk;
        if (sink_chunk.size() > 0) {
            StartOperator(pipeline.sink);
            D_ASSERT(pipeline.sink);
            D_ASSERT(pipeline.sink->sink_state);
            auto sink_result = pipeline.sink->Sink(context, 
                                                   *pipeline.sink->sink_state, 
                                                   *local_sink_state, 
                                                   sink_chunk);
            EndOperator(pipeline.sink, nullptr);
            if (sink_result == SinkResultType::FINISHED) {
                FinishProcessing();
                return OperatorResultType::FINISHED;
            }
        }
        if (result == OperatorResultType::NEED_MORE_INPUT) {
            return OperatorResultType::NEED_MORE_INPUT;
        }
    }
}

Finally, PushFinalize is called, which invokes the sink operator's Combine() method: the intermediate results held in the LocalState are merged into the operator's GlobalState, and this merge naturally has to be protected by a lock. DuckDB thus uses LocalState and GlobalState to split private and shared memory: the basis of the parallelism is to compute on private memory and to synchronize on shared memory.

void PipelineExecutor::PushFinalize() {
    ...
    finalized = true;
    ...
    // run the combine for the sink
    pipeline.sink->Combine(context, *pipeline.sink->sink_state, *local_sink_state);

    // flush all query profiler info
    for (idx_t i = 0; i < intermediate_states.size(); i++) {
        intermediate_states[i]->Finalize(pipeline.operators[i], context);
    }
    pipeline.executor.Flush(thread);
    local_sink_state.reset();
}

void PhysicalHashJoin::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const {
    auto &gstate = (HashJoinGlobalSinkState &)gstate_p;
    auto &lstate = (HashJoinLocalSinkState &)lstate_p;
    if (lstate.hash_table) {
        lock_guard<mutex> local_ht_lock(gstate.lock);
        gstate.local_hash_tables.push_back(std::move(lstate.hash_table));
    }
    auto &client_profiler = QueryProfiler::Get(context.client);
    context.thread.profiler.Flush(this, &lstate.build_executor, "build_executor", 1);
    client_profiler.Flush(context.thread.profiler);
}

Because every thread executes Execute() and Combine(), multiple threads contend inside pipeline.sink->Combine(). This contention only occurs at the Sink, i.e., at the tail of a pipeline, and any logic for handling parallelism also has to be implemented on the Sink side; the non-sink operators (for example hash join probe, projection, and filter) never need to be aware of multi-thread synchronization.

When a pipeline finishes executing, PipelineFinishEvent::FinishEvent() is scheduled, which in turn calls the Finalize method of that pipeline's sink.

void PipelineFinishEvent::FinishEvent() {
    pipeline->Finalize(*this);
}

void Pipeline::Finalize(Event &event) {
    ...
    try {
        auto sink_state = sink->Finalize(*this, event, 
                                         executor.context, 
                                         *sink->sink_state);
        sink->sink_state->state = sink_state;
    } catch (...) {
        ...
    }
}

For example, a hash-table build pipeline calls PhysicalHashJoin::Finalize, which calls ScheduleFinalize() to schedule a HashJoinFinalizeEvent; that event's tasks then call the sink's finalize, e.g. JoinHashTable::Finalize, which constructs the actual hash table. To guarantee consistency, each pipeline calls the sink's Finalize() only once, and only after all of its tasks have completed.

//! The finalize is called when ALL threads are finished execution. It is called only once per pipeline, and is
//! entirely single threaded.
//! If Finalize returns SinkResultType::FINISHED, the sink is marked as finished
virtual SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context, GlobalSinkState &gstate) const;

SinkFinalizeType PhysicalHashJoin::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, GlobalSinkState &gstate) const {
    auto &sink = (HashJoinGlobalSinkState &)gstate;

    if (sink.external) {
        D_ASSERT(can_go_external);
        // External join - partition HT
        sink.perfect_join_executor.reset();
        sink.hash_table->ComputePartitionSizes(context.config, 
                                               sink.local_hash_tables, 
                                               sink.max_ht_size);
        auto new_event = make_shared<HashJoinPartitionEvent>(pipeline, sink, 
                                                             sink.local_hash_tables);
        event.InsertEvent(std::move(new_event));
        sink.finalized = true;
        return SinkFinalizeType::READY;
    } else {
        for (auto &local_ht : sink.local_hash_tables) {
            sink.hash_table->Merge(*local_ht);
        }
        sink.local_hash_tables.clear();
    }

    // check for possible perfect hash table
    auto use_perfect_hash = 
        sink.perfect_join_executor->CanDoPerfectHashJoin();
    if (use_perfect_hash) {
        D_ASSERT(sink.hash_table->equality_types.size() == 1);
        auto key_type = sink.hash_table->equality_types[0];
        use_perfect_hash = sink.perfect_join_executor->BuildPerfectHashTable(key_type);
    }
    // In case of a large build side or duplicates, use regular hash join
    if (!use_perfect_hash) {
        sink.perfect_join_executor.reset();
        sink.ScheduleFinalize(pipeline, event);
    }
    sink.finalized = true;
    if (sink.hash_table->Count() == 0 && EmptyResultIfRHSIsEmpty()) {
        return SinkFinalizeType::NO_OUTPUT_POSSIBLE;
    }
    return SinkFinalizeType::READY;
}

void HashJoinGlobalSinkState::ScheduleFinalize(Pipeline &pipeline, Event &event) {
    if (hash_table->Count() == 0) {
        hash_table->finalized = true;
        return;
    }
    hash_table->InitializePointerTable();
    auto new_event = make_shared<HashJoinFinalizeEvent>(pipeline, *this);
    event.InsertEvent(std::move(new_event));
}

class HashJoinFinalizeTask : public ExecutorTask {
public:
    HashJoinFinalizeTask(shared_ptr<Event> event_p, ClientContext &context, HashJoinGlobalSinkState &sink, idx_t block_idx_start, idx_t block_idx_end, bool parallel)
        : ExecutorTask(context), event(std::move(event_p)), sink(sink), block_idx_start(block_idx_start), block_idx_end(block_idx_end), parallel(parallel) {
    }

    TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
        sink.hash_table->Finalize(block_idx_start, block_idx_end, parallel);
        event->FinishTask();
        return TaskExecutionResult::TASK_FINISHED;
    }
};

//! Finalize the build of the HT, constructing the actual hash table and making the HT ready for probing.
//! Finalize must be called before any call to Probe, 
//! And after Finalize is called Build should no longer be ever called.
void JoinHashTable::Finalize(idx_t block_idx_start, idx_t block_idx_end, bool parallel) {
    ...
    Vector hashes(LogicalType::HASH);
    auto hash_data = FlatVector::GetData<hash_t>(hashes);
    data_ptr_t key_locations[STANDARD_VECTOR_SIZE];
    // now construct the actual hash table; scan the nodes
    // as we scan the nodes we pin all the blocks of the HT and keep them pinned until the HT is destroyed
    // this is so that we can keep pointers around to the blocks
    for (idx_t block_idx = block_idx_start; block_idx < block_idx_end; block_idx++) {
        auto &block = block_collection->blocks[block_idx];
        auto handle = buffer_manager.Pin(block->block);
        data_ptr_t dataptr = handle.Ptr();

        data_ptr_t heap_ptr = nullptr;
        if (unswizzle) {
            auto &heap_block = string_heap->blocks[block_idx];
            auto heap_handle = buffer_manager.Pin(heap_block->block);
            heap_ptr = heap_handle.Ptr();
            local_pinned_handles.push_back(std::move(heap_handle));
        }

        idx_t entry = 0;
        while (entry < block->count) {
            idx_t next = MinValue<idx_t>(STANDARD_VECTOR_SIZE, block->count - entry);

            if (unswizzle) {
                RowOperations::UnswizzlePointers(layout, dataptr, heap_ptr, next);
            }

            // fetch the next vector of entries from the blocks
            for (idx_t i = 0; i < next; i++) {
                hash_data[i] = Load<hash_t>((data_ptr_t)(dataptr + pointer_offset));
                key_locations[i] = dataptr;
                dataptr += entry_size;
            }
            // now insert into the hash table
            InsertHashes(hashes, next, key_locations, parallel);

            entry += next;
        }
        local_pinned_handles.push_back(std::move(handle));
    }

    lock_guard<mutex> lock(pinned_handles_lock);
    for (auto &local_pinned_handle : local_pinned_handles) {
        pinned_handles.push_back(std::move(local_pinned_handle));
    }
}

Root Pipeline & Final Result

A root pipeline is a pipeline whose sink is the root node of the original physical plan. These pipelines are left out when the event DAG is built: they are executed by the main thread rather than asynchronously by the background threads started by the TaskScheduler, and they can only run after all intermediate pipelines have finished.

The root pipeline is executed by the main thread through PipelineExecutor's Execute path. After the main thread has done the initial scheduling of the intermediate pipelines, the root pipeline cannot make progress until the intermediate results are ready, so the best way to speed the query up is to let the main thread join in executing the intermediate pipelines. The main thread therefore sits in a loop that keeps processing small batches of data; for example, CompletePendingQuery in the DuckDB Python client repeatedly calls pending_query.ExecuteTask(), which in turn calls task->Execute(TaskExecutionMode::PROCESS_PARTIAL), so that the main thread only handles a small amount of data at a time.

unique_ptr<QueryResult> DuckDBPyConnection::CompletePendingQuery(PendingQueryResult &pending_query) {
    PendingExecutionResult execution_result;
    do {
        execution_result = pending_query.ExecuteTask();
        {
            py::gil_scoped_acquire gil;
            if (PyErr_CheckSignals() != 0) {
                throw std::runtime_error("Query interrupted");
            }
        }
    } while (execution_result == PendingExecutionResult::RESULT_NOT_READY);
    ...
    return pending_query.Execute();
}

Once execution_result becomes RESULT_READY, all intermediate pipelines have finished and the outer do-while loop exits. The next stage then executes the root pipeline via the final return pending_query.Execute(); after a chain of calls this ends up in FetchChunk() and the ExecutePull() it invokes. Despite the name, ExecutePull() still works in a push fashion internally: it fetches a batch of data from the source and then runs it through all operators in turn to produce the final result, just like any other pipeline.

unique_ptr<DataChunk> Executor::FetchChunk() {
    auto chunk = make_unique<DataChunk>();
    root_executor->InitializeChunk(*chunk);
    while (true) {
        root_executor->ExecutePull(*chunk);
        if (chunk->size() == 0) {
            root_executor->PullFinalize();
            if (NextExecutor()) {
                continue;
            }
            break;
        } else {
            break;
        }
    }
    return chunk;
}

void PipelineExecutor::ExecutePull(DataChunk &result) {
    if (IsFinished()) {
        return;
    }
    auto &executor = pipeline.executor;
    try {
        D_ASSERT(!pipeline.sink);
        auto &source_chunk = 
            pipeline.operators.empty() ? result : *intermediate_chunks[0];
        while (result.size() == 0) {
            if (in_process_operators.empty()) {
                source_chunk.Reset();
                FetchFromSource(source_chunk);
                if (source_chunk.size() == 0) {
                    break;
                }
            }
            if (!pipeline.operators.empty()) {
                auto state = Execute(source_chunk, result);
                if (state == OperatorResultType::FINISHED) {
                    break;
                }
            }
        }
    } catch (const Exception &ex) { // LCOV_EXCL_START
        ...
    } // LCOV_EXCL_STOP
}

Reference

  1. Notes on Duckdb: Build Pipelines, https://zhuanlan.zhihu.com/p/609337363
  2. [DuckDB] Push-Based Execution Model, https://zhuanlan.zhihu.com/p/402355976
  3. Viktor Leis, Peter Boncz, Alfons Kemper, and Thomas Neumann. “Morsel-driven parallelism: a NUMA-aware query evaluation framework for the many-core age.” ACM International Conference on Management of Data (SIGMOD), pp. 743-754. 2014.
  4. Thomas Neumann. “Efficiently compiling efficient query plans for modern hardware.” VLDB Endowment 4, no. 9 (2011): 539-550.