From c583b992ba4bbdb7cabfecf04130e3aa66ceb031 Mon Sep 17 00:00:00 2001
From: vdimir
Date: Mon, 6 Nov 2023 18:42:56 +0000
Subject: [PATCH] Update development documentation about data streams

---
 docs/en/development/architecture.md          | 30 +++++++++++---
 docs/en/development/style.md                 |  2 +-
 .../operations/system-tables/stack_trace.md  | 40 +++++++++----------
 3 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/docs/en/development/architecture.md b/docs/en/development/architecture.md
index ba81b31b8ef..cfdd2bbcc41 100644
--- a/docs/en/development/architecture.md
+++ b/docs/en/development/architecture.md
@@ -67,22 +67,30 @@ Implementations of `ReadBuffer`/`WriteBuffer` are used for working with files an
 
 Read/WriteBuffers only deal with bytes. There are functions from `ReadHelpers` and `WriteHelpers` header files to help with formatting input/output. For example, there are helpers to write a number in decimal format.
 
-Let’s look at what happens when you want to write a result set in `JSON` format to stdout. You have a result set ready to be fetched from `IBlockInputStream`. You create `WriteBufferFromFileDescriptor(STDOUT_FILENO)` to write bytes to stdout. You create `JSONRowOutputStream`, initialized with that `WriteBuffer`, to write rows in `JSON` to stdout. You create `BlockOutputStreamFromRowOutputStream` on top of it, to represent it as `IBlockOutputStream`. Then you call `copyData` to transfer data from `IBlockInputStream` to `IBlockOutputStream`, and everything works. Internally, `JSONRowOutputStream` will write various JSON delimiters and call the `IDataType::serializeTextJSON` method with a reference to `IColumn` and the row number as arguments. Consequently, `IDataType::serializeTextJSON` will call a method from `WriteHelpers.h`: for example, `writeText` for numeric types and `writeJSONString` for `DataTypeString`.
+Let's examine what happens when you want to write a result set in `JSON` format to stdout.
+You have a result set ready to be fetched from a pulling `QueryPipeline`.
+First, you create a `WriteBufferFromFileDescriptor(STDOUT_FILENO)` to write bytes to stdout.
+Next, you connect the result from the query pipeline to `JSONRowOutputFormat`, which is initialized with that `WriteBuffer`, to write rows in `JSON` format to stdout.
+This can be done via the `complete` method, which turns a pulling `QueryPipeline` into a completed `QueryPipeline`.
+Internally, `JSONRowOutputFormat` will write various JSON delimiters and call the `IDataType::serializeTextJSON` method with a reference to `IColumn` and the row number as arguments. Consequently, `IDataType::serializeTextJSON` will call a method from `WriteHelpers.h`: for example, `writeText` for numeric types and `writeJSONString` for `DataTypeString`.
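To make the walkthrough above concrete, here is a minimal C++ sketch of the same steps. It assumes a pulling `QueryPipeline` has already been built elsewhere; the include paths, the `JSONRowOutputFormat` constructor arguments, and the default `FormatSettings` are approximations rather than the exact current ClickHouse signatures.

``` cpp
/// Sketch only: class names follow the text above, exact signatures may differ.
#include <memory>
#include <unistd.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>

void writeResultAsJSON(DB::QueryPipeline pipeline)   /// a pulling pipeline, prepared elsewhere
{
    /// Byte sink: everything the output format writes ends up on stdout.
    DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);

    /// Row output format that renders each result row as JSON into `out`.
    auto format = std::make_shared<DB::JSONRowOutputFormat>(out, pipeline.getHeader(), DB::FormatSettings{});

    /// `complete` attaches the format to the pipeline's output port,
    /// turning the pulling pipeline into a completed one.
    pipeline.complete(std::move(format));

    /// Run the completed pipeline; rows are serialized as they arrive.
    DB::CompletedPipelineExecutor executor(pipeline);
    executor.execute();
    out.finalize();
}
```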
 
 ## Tables {#tables}
 
 The `IStorage` interface represents tables. Different implementations of that interface are different table engines. Examples are `StorageMergeTree`, `StorageMemory`, and so on. Instances of these classes are just tables.
 
-The key `IStorage` methods are `read` and `write`. There are also `alter`, `rename`, `drop`, and so on. The `read` method accepts the following arguments: the set of columns to read from a table, the `AST` query to consider, and the desired number of streams to return. It returns one or multiple `IBlockInputStream` objects and information about the stage of data processing that was completed inside a table engine during query execution.
+The key methods in `IStorage` are `read` and `write`, along with others such as `alter`, `rename`, and `drop`. The `read` method accepts the following arguments: a set of columns to read from a table, the `AST` query to consider, and the desired number of streams. It returns a `Pipe`.
 
-In most cases, the read method is only responsible for reading the specified columns from a table, not for any further data processing. All further data processing is done by the query interpreter and is outside the responsibility of `IStorage`.
+In most cases, the read method is responsible only for reading the specified columns from a table, not for any further data processing.
+All subsequent data processing is handled by another part of the pipeline, which falls outside the responsibility of `IStorage`.
 
 But there are notable exceptions:
 
 - The AST query is passed to the `read` method, and the table engine can use it to derive index usage and to read fewer data from a table.
 - Sometimes the table engine can process data itself to a specific stage. For example, `StorageDistributed` can send a query to remote servers, ask them to process data to a stage where data from different remote servers can be merged, and return that preprocessed data. The query interpreter then finishes processing the data.
 
-The table’s `read` method can return multiple `IBlockInputStream` objects to allow parallel data processing. These multiple block input streams can read from a table in parallel. Then you can wrap these streams with various transformations (such as expression evaluation or filtering) that can be calculated independently and create a `UnionBlockInputStream` on top of them, to read from multiple streams in parallel.
+The table’s `read` method can return a `Pipe` consisting of multiple `Processors`. These `Processors` can read from a table in parallel.
+Then, you can connect these processors with various other transformations (such as expression evaluation or filtering), which can be calculated independently.
+Finally, you can create a `QueryPipeline` on top of them and execute it via `PipelineExecutor`.
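A rough sketch of how such a pipe can be assembled follows. `makeSource` and `makeFilterTransform` are hypothetical helpers standing in for real source and transform processors, and the `Pipe` calls shown are close to, but not guaranteed to match, the current ClickHouse API.

``` cpp
/// Illustrative only: makeSource/makeFilterTransform are hypothetical helpers
/// that return processors (ProcessorPtr); signatures are approximate.
DB::Pipe readInParallel(DB::StoragePtr table, const DB::Names & columns, size_t num_streams)
{
    DB::Pipes pipes;
    for (size_t i = 0; i < num_streams; ++i)
        pipes.emplace_back(makeSource(table, columns, i));   /// one source processor per stream

    /// Unite the per-stream pipes into a single Pipe with several output ports.
    auto pipe = DB::Pipe::unitePipes(std::move(pipes));

    /// Attach an independent transformation (e.g. filtering) to every stream.
    pipe.addSimpleTransform([](const DB::Block & header)
    {
        return makeFilterTransform(header);
    });

    return pipe;
}
```

The caller would then wrap the returned pipe into a `QueryPipeline` and drive it with an executor, as the Interpreters section below describes.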
 
 There are also `TableFunction`s. These are functions that return a temporary `IStorage` object to use in the `FROM` clause of a query.
 
@@ -98,9 +106,19 @@ A hand-written recursive descent parser parses a query. For example, `ParserSele
 
 ## Interpreters {#interpreters}
 
-Interpreters are responsible for creating the query execution pipeline from an `AST`. There are simple interpreters, such as `InterpreterExistsQuery` and `InterpreterDropQuery`, or the more sophisticated `InterpreterSelectQuery`. The query execution pipeline is a combination of block input or output streams. For example, the result of interpreting the `SELECT` query is the `IBlockInputStream` to read the result set from; the result of the `INSERT` query is the `IBlockOutputStream` to write data for insertion to, and the result of interpreting the `INSERT SELECT` query is the `IBlockInputStream` that returns an empty result set on the first read, but that copies data from `SELECT` to `INSERT` at the same time.
+Interpreters are responsible for creating the query execution pipeline from an AST. There are simple interpreters, such as `InterpreterExistsQuery` and `InterpreterDropQuery`, as well as the more sophisticated `InterpreterSelectQuery`.
 
-`InterpreterSelectQuery` uses `ExpressionAnalyzer` and `ExpressionActions` machinery for query analysis and transformations. This is where most rule-based query optimizations are done. `ExpressionAnalyzer` is quite messy and should be rewritten: various query transformations and optimizations should be extracted to separate classes to allow modular transformations of query.
+The query execution pipeline is a combination of processors that can consume and produce chunks (sets of columns with specific types).
+A processor communicates via ports and can have multiple input ports and multiple output ports.
+A more detailed description can be found in [src/Processors/IProcessor.h](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/IProcessor.h).
+
+For example, the result of interpreting the `SELECT` query is a "pulling" `QueryPipeline` which has a special output port to read the result set from.
+The result of the `INSERT` query is a "pushing" `QueryPipeline` with an input port to write data for insertion.
+And the result of interpreting the `INSERT SELECT` query is a "completed" `QueryPipeline` that has no inputs or outputs but copies data from `SELECT` to `INSERT` simultaneously.
+
+`InterpreterSelectQuery` uses `ExpressionAnalyzer` and `ExpressionActions` machinery for query analysis and transformations. This is where most rule-based query optimizations are performed. `ExpressionAnalyzer` is quite messy and should be rewritten: various query transformations and optimizations should be extracted into separate classes to allow for modular transformations of the query.
+
+To address the problems that exist in the current interpreters, a new `InterpreterSelectQueryAnalyzer` is being developed. It is a new version of `InterpreterSelectQuery` that does not use `ExpressionAnalyzer` and introduces an additional abstraction level between `AST` and `QueryPipeline` called `QueryTree`. It is not production-ready yet, but it can be tested with the `allow_experimental_analyzer` flag.
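As a hedged illustration of the "pulling" case, here is a sketch of consuming the pipeline produced by `InterpreterSelectQuery`. A prepared `ASTPtr` and context are assumed, and the constructor arguments and executor usage are approximations of the real API rather than a definitive recipe.

``` cpp
/// Sketch: reading the result set of a SELECT through its pulling pipeline.
#include <Interpreters/InterpreterSelectQuery.h>
#include <Processors/Executors/PullingPipelineExecutor.h>

void runSelect(DB::ASTPtr select_ast, DB::ContextMutablePtr context)
{
    DB::InterpreterSelectQuery interpreter(select_ast, context, DB::SelectQueryOptions());
    DB::BlockIO io = interpreter.execute();          /// for SELECT, io.pipeline is "pulling"

    DB::PullingPipelineExecutor executor(io.pipeline);
    DB::Block block;
    while (executor.pull(block))
    {
        /// Each pulled block is the next chunk of the result set.
    }

    /// An INSERT interpreter would instead produce a "pushing" pipeline (fed through
    /// a PushingPipelineExecutor), and INSERT SELECT a "completed" one that is simply executed.
}
```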
 
 ## Functions {#functions}
 
diff --git a/docs/en/development/style.md b/docs/en/development/style.md
index 5b03468623d..0b71a669638 100644
--- a/docs/en/development/style.md
+++ b/docs/en/development/style.md
@@ -345,7 +345,7 @@ struct ExtractDomain
 **7.** For abstract classes (interfaces) you can add the `I` prefix.
 
 ``` cpp
-class IBlockInputStream
+class IProcessor
 ```
 
 **8.** If you use a variable locally, you can use the short name.
diff --git a/docs/en/operations/system-tables/stack_trace.md b/docs/en/operations/system-tables/stack_trace.md
index 52ee7088597..90f1f47e52f 100644
--- a/docs/en/operations/system-tables/stack_trace.md
+++ b/docs/en/operations/system-tables/stack_trace.md
@@ -35,27 +35,25 @@ WITH arrayMap(x -> demangle(addressToSymbol(x)), trace) AS all SELECT thread_nam
 ``` text
 Row 1:
 ──────
-thread_name: clickhouse-serv
-
-thread_id: 686
-query_id: 1a11f70b-626d-47c1-b948-f9c7b206395d
-res: sigqueue
-DB::StorageSystemStackTrace::fillData(std::__1::vector::mutable_ptr, std::__1::allocator::mutable_ptr > >&, DB::Context const&, DB::SelectQueryInfo const&) const
-DB::IStorageSystemOneBlock::read(std::__1::vector, std::__1::allocator >, std::__1::allocator, std::__1::allocator > > > const&, DB::SelectQueryInfo const&, DB::Context const&, DB::QueryProcessingStage::Enum, unsigned long, unsigned int)
-DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPipeline&, std::__1::shared_ptr const&, std::__1::vector, std::__1::allocator >, std::__1::allocator, std::__1::allocator > > > const&)
-DB::InterpreterSelectQuery::executeImpl(DB::QueryPipeline&, std::__1::shared_ptr const&, std::__1::optional)
-DB::InterpreterSelectQuery::execute()
-DB::InterpreterSelectWithUnionQuery::execute()
-DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*)
-DB::executeQuery(std::__1::basic_string, std::__1::allocator > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool)
-DB::TCPHandler::runImpl()
-DB::TCPHandler::run()
-Poco::Net::TCPServerConnection::start()
-Poco::Net::TCPServerDispatcher::run()
-Poco::PooledThread::run()
-Poco::ThreadImpl::runnableEntry(void*)
-start_thread
-__clone
+thread_name: QueryPipelineEx
+thread_id: 743490
+query_id: dc55a564-febb-4e37-95bb-090ef182c6f1
+res: memcpy
+large_ralloc
+arena_ralloc
+do_rallocx
+Allocator::realloc(void*, unsigned long, unsigned long, unsigned long)
+HashTable, HashTableNoState, PairNoInit>, HashCRC32, HashTableGrowerWithPrecalculation<8ul>, Allocator>::resize(unsigned long, unsigned long)
+void DB::Aggregator::executeImplBatch, HashTableNoState, PairNoInit>, HashCRC32, HashTableGrowerWithPrecalculation<8ul>, Allocator>, true, false>>(DB::AggregationMethodOneNumber, HashTableNoState, PairNoInit>, HashCRC32, HashTableGrowerWithPrecalculation<8ul>, Allocator>, true, false>&, DB::AggregationMethodOneNumber, HashTableNoState, PairNoInit>, HashCRC32, HashTableGrowerWithPrecalculation<8ul>, Allocator>, true, false>::State&, DB::Arena*, unsigned long, unsigned long, DB::Aggregator::AggregateFunctionInstruction*, bool, char*) const
+DB::Aggregator::executeImpl(DB::AggregatedDataVariants&, unsigned long, unsigned long, std::__1::vector>&, DB::Aggregator::AggregateFunctionInstruction*, bool, bool, char*) const
+DB::Aggregator::executeOnBlock(std::__1::vector::immutable_ptr, std::__1::allocator::immutable_ptr>>, unsigned long, unsigned long, DB::AggregatedDataVariants&, std::__1::vector>&, std::__1::vector>, std::__1::allocator>>>&, bool&) const
+DB::AggregatingTransform::work()
+DB::ExecutionThreadContext::executeTask()
+DB::PipelineExecutor::executeStepImpl(unsigned long, std::__1::atomic*)
+void std::__1::__function::__policy_invoker::__call_impl>(std::__1::__function::__policy_storage const*)
+ThreadPoolImpl>::worker(std::__1::__list_iterator, void*>)
+void std::__1::__function::__policy_invoker::__call_impl::ThreadFromGlobalPoolImpl>::scheduleImpl(std::__1::function, Priority, std::__1::optional, bool)::'lambda0'()>(void&&)::'lambda'(), void ()>>(std::__1::__function::__policy_storage const*)
+void* std::__1::__thread_proxy[abi:v15000]>, void ThreadPoolImpl::scheduleImpl(std::__1::function, Priority, std::__1::optional, bool)::'lambda0'()>>(void*)
 ```
 
 Getting filenames and line numbers in ClickHouse source code: