Update rus development documentation about data streams

This commit is contained in:
vdimir 2023-11-06 18:43:17 +00:00
parent c583b992ba
commit 3236f269b5
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
3 changed files with 39 additions and 40 deletions

View File

@ -49,21 +49,9 @@ ClickHouse — полноценная столбцовая СУБД. Данны
Блоки создаются для всех обработанных фрагментов данных. Напоминаем, что одни и те же типы вычислений, имена столбцов и типы переиспользуются в разных блоках и только данные колонок изменяются. Лучше разделить данные и заголовок блока потому, что в блоках маленького размера мы имеем большой оверхэд по временным строкам при копировании умных указателей (`shared_ptrs`) и имен столбцов.
## Потоки блоков (Block Streams) {#block-streams}
## Процессоры
Потоки блоков обрабатывают данные. Мы используем потоки блоков для чтения данных, трансформации или записи данных куда-либо. `IBlockInputStream` предоставляет метод `read` для получения следующего блока, пока это возможно, и метод `write`, чтобы продвигать (push) блок куда-либо.
Потоки отвечают за:
1. Чтение и запись в таблицу. Таблица лишь возвращает поток для чтения или записи блоков.
2. Реализацию форматов данных. Например, при выводе данных в терминал в формате `Pretty`, вы создаете выходной поток блоков, который форматирует поступающие в него блоки.
3. Трансформацию данных. Допустим, у вас есть `IBlockInputStream` и вы хотите создать отфильтрованный поток. Вы создаете `FilterBlockInputStream` и инициализируете его вашим потоком. Затем вы тянете (pull) блоки из `FilterBlockInputStream`, а он тянет блоки исходного потока, фильтрует их и возвращает отфильтрованные блоки вам. Таким образом построены конвейеры выполнения запросов.
Имеются и более сложные трансформации. Например, когда вы тянете блоки из `AggregatingBlockInputStream`, он считывает все данные из своего источника, агрегирует их, и возвращает поток агрегированных данных вам. Другой пример: конструктор `UnionBlockInputStream` принимает множество источников входных данных и число потоков. Такой `Stream` работает в несколько потоков и читает данные источников параллельно.
> Потоки блоков используют «втягивающий» (pull) подход к управлению потоком выполнения: когда вы вытягиваете блок из первого потока, он, следовательно, вытягивает необходимые блоки из вложенных потоков, так и работает весь конвейер выполнения. Ни «pull» ни «push» не имеют явного преимущества, потому что поток управления неявный, и это ограничивает в реализации различных функций, таких как одновременное выполнение нескольких запросов (слияние нескольких конвейеров вместе). Это ограничение можно преодолеть с помощью сопрограмм (coroutines) или просто запуском дополнительных потоков, которые ждут друг друга. У нас может быть больше возможностей, если мы сделаем поток управления явным: если мы локализуем логику для передачи данных из одной расчетной единицы в другую вне этих расчетных единиц. Читайте эту [статью](http://journal.stuffwithstuff.com/2013/01/13/iteration-inside-and-out/) для углубленного изучения.
Следует отметить, что конвейер выполнения запроса создает временные данные на каждом шаге. Мы стараемся сохранить размер блока достаточно маленьким, чтобы временные данные помещались в кэш процессора. При таком допущении запись и чтение временных данных практически бесплатны по сравнению с другими расчетами. Мы могли бы рассмотреть альтернативу, которая заключается в том, чтобы объединить многие операции в конвейере вместе. Это может сделать конвейер как можно короче и удалить большую часть временных данных, что может быть преимуществом, но у такого подхода также есть недостатки. Например, разделенный конвейер позволяет легко реализовать кэширование промежуточных данных, использование промежуточных данных из аналогичных запросов, выполняемых одновременно, и объединение конвейеров для аналогичных запросов.
Смотрите описание в файле [src/Processors/IProcessor.h](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/IProcessor.h) исходного кода.
## Форматы {#formats}
@ -81,13 +69,16 @@ ClickHouse — полноценная столбцовая СУБД. Данны
Буферы чтения-записи имеют дело только с байтами. В заголовочных файлах `ReadHelpers` и `WriteHelpers` объявлены некоторые функции, чтобы помочь с форматированием ввода-вывода. Например, есть помощники для записи числа в десятичном формате.
Давайте посмотрим, что происходит, когда вы хотите вывести результат в `JSON` формате в стандартный вывод (stdout). У вас есть результирующий набор данных, готовый к извлечению из `IBlockInputStream`. Вы создаете `WriteBufferFromFileDescriptor(STDOUT_FILENO)` чтобы записать байты в stdout. Вы создаете `JSONRowOutputStream`, инициализируете с этим `WriteBuffer`'ом, чтобы записать строки `JSON` в stdout. Кроме того вы создаете `BlockOutputStreamFromRowOutputStream`, реализуя `IBlockOutputStream`. Затем вызывается `copyData` для передачи данных из `IBlockInputStream` в `IBlockOutputStream` и все работает. Внутренний `JSONRowOutputStream` будет писать в формате `JSON` различные разделители и вызвать `IDataType::serializeTextJSON` метод со ссылкой на `IColumn` и номер строки в качестве аргументов. Следовательно, `IDataType::serializeTextJSON` вызовет метод из `WriteHelpers.h`: например, `writeText` для числовых типов и `writeJSONString` для `DataTypeString`.
Давайте посмотрим, что происходит, когда вы хотите вывести результат в `JSON` формате в стандартный вывод (stdout). У вас есть результирующий набор данных, готовый к извлечению из `QueryPipeline`. Вы создаете `WriteBufferFromFileDescriptor(STDOUT_FILENO)` чтобы записать байты в stdout. Вы создаете `JSONRowOutputFormat`, инициализируете с этим `WriteBuffer`'ом, чтобы записать строки `JSON` в stdout.
Чтобы соеденить выход `QueryPipeline` с форматом, можно использовать метод `complete`, который превращает `QueryPipeline` в завершенный `QueryPipeline`.
Внутренний `JSONRowOutputStream` будет писать в формате `JSON` различные разделители и вызвать `IDataType::serializeTextJSON` метод со ссылкой на `IColumn` и номер строки в качестве аргументов. Следовательно, `IDataType::serializeTextJSON` вызовет метод из `WriteHelpers.h`: например, `writeText` для числовых типов и `writeJSONString` для `DataTypeString`.
## Таблицы {#tables}
Интерфейс `IStorage` служит для отображения таблицы. Различные движки таблиц являются реализациями этого интерфейса. Примеры `StorageMergeTree`, `StorageMemory` и так далее. Экземпляры этих классов являются просто таблицами.
Ключевые методы `IStorage` это `read` и `write`. Есть и другие варианты — `alter`, `rename`, `drop` и так далее. Метод `read` принимает следующие аргументы: набор столбцов для чтения из таблицы, `AST` запрос и желаемое количество потоков для вывода. Он возвращает один или несколько объектов `IBlockInputStream` и информацию о стадии обработки данных, которая была завершена внутри табличного движка во время выполнения запроса.
Ключевые методы `IStorage` это `read` и `write`. Есть и другие варианты — `alter`, `rename`, `drop` и так далее.
Метод `read` принимает следующие аргументы: набор столбцов для чтения из таблицы, `AST` запрос и желаемое количество потоков для вывода и возвращает `Pipe`.
В большинстве случаев метод read отвечает только за чтение указанных столбцов из таблицы, а не за дальнейшую обработку данных. Вся дальнейшая обработка данных осуществляется интерпретатором запросов и не входит в сферу ответственности `IStorage`.
@ -96,7 +87,9 @@ ClickHouse — полноценная столбцовая СУБД. Данны
- AST-запрос, передающийся в метод `read`, может использоваться движком таблицы для получения информации о возможности использования индекса и считывания меньшего количества данных из таблицы.
- Иногда движок таблиц может сам обрабатывать данные до определенного этапа. Например, `StorageDistributed` можно отправить запрос на удаленные серверы, попросить их обработать данные до этапа, когда данные с разных удаленных серверов могут быть объединены, и вернуть эти предварительно обработанные данные. Затем интерпретатор запросов завершает обработку данных.
Метод `read` может возвращать несколько объектов `IBlockInputStream`, позволяя осуществлять параллельную обработку данных. Эти несколько блочных входных потоков могут считываться из таблицы параллельно. Затем вы можете обернуть эти потоки различными преобразованиями (такими как вычисление выражений или фильтрация), которые могут быть вычислены независимо, и создать `UnionBlockInputStream` поверх них, чтобы читать из нескольких потоков параллельно.
Метод `read` может возвращать `Pipe`, состоящий из нескольких процессоров. Каждый их этих процессоров может читать данные параллельно.
Затем, вы можете соеденить эти просессоры с другими преобразованиями (такими как вычисление выражений или фильтрация), которые могут быть вычислены независимо.
Далее, создан `QueryPipeline` поверх них, можно выполнить пайплайн с помощью `PipelineExecutor`.
Есть и другие варианты. Например, `TableFunction` возвращает временный объект `IStorage`, который можно подставить во `FROM`.
@ -112,10 +105,18 @@ ClickHouse — полноценная столбцовая СУБД. Данны
## Интерпретаторы {#interpreters}
Интерпретаторы отвечают за создание конвейера выполнения запроса из `AST`. Есть простые интерпретаторы, такие как `InterpreterExistsQuery` и `InterpreterDropQuery` или более сложный `InterpreterSelectQuery`. Конвейер выполнения запроса представляет собой комбинацию входных и выходных потоков блоков. Например, результатом интерпретации `SELECT` запроса является `IBlockInputStream` для чтения результирующего набора данных; результат интерпретации `INSERT` запроса — это `IBlockOutputStream`, для записи данных, предназначенных для вставки; результат интерпретации `INSERT SELECT` запроса — это `IBlockInputStream`, который возвращает пустой результирующий набор при первом чтении, но копирует данные из `SELECT` к `INSERT`.
Интерпретаторы отвечают за создание конвейера выполнения запроса из `AST`. Есть простые интерпретаторы, такие как `InterpreterExistsQuery` и `InterpreterDropQuery` или более сложный `InterpreterSelectQuery`.
Конвейер выполнения запроса представляет собой комбинацию процессоров, которые могут принимать на вход и также возвращать чанки (набор колонок с их типами)
Процессоры обмениваются данными через порты и могут иметь несколько входных и выходных портов.
Более подробное описание можно найти в файле [src/Processors/IProcessor.h](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/IProcessor.h).
Например, результатом интерпретации `SELECT` запроса является `QueryPipeline`, который имеет специальный выходной порт для чтения результирующего набора данных. Результатом интерпретации `INSERT` запроса является `QueryPipeline` с входным портом для записи данных для вставки. Результатом интерпретации `INSERT SELECT` запроса является завершенный `QueryPipeline`, который не имеет входов или выходов, но копирует данные из `SELECT` в `INSERT` одновременно.
`InterpreterSelectQuery` использует `ExpressionAnalyzer` и `ExpressionActions` механизмы для анализа запросов и преобразований. Именно здесь выполняется большинство оптимизаций запросов на основе правил. `ExpressionAnalyzer` написан довольно грязно и должен быть переписан: различные преобразования запросов и оптимизации должны быть извлечены в отдельные классы, чтобы позволить модульные преобразования или запросы.
Для решения текущих проблем, существующих в интерпретаторах, разрабатывается новый `InterpreterSelectQueryAnalyzer`. Это новая версия `InterpreterSelectQuery`, которая не использует `ExpressionAnalyzer` и вводит дополнительный уровень абстракции между `AST` и `QueryPipeline`, называемый `QueryTree`. Он еще не готов к использованию в продакшене, но его можно протестировать с помощью флага `allow_experimental_analyzer`.
## Функции {#functions}
Существуют обычные функции и агрегатные функции. Агрегатные функции смотрите в следующем разделе.

View File

@ -345,7 +345,7 @@ struct ExtractDomain
**7.** Для абстрактных классов (интерфейсов) можно добавить в начало имени букву `I`.
``` cpp
class IBlockInputStream
class IProcessor
```
**8.** Если переменная используется достаточно локально, то можно использовать короткое имя.

View File

@ -31,27 +31,25 @@ WITH arrayMap(x -> demangle(addressToSymbol(x)), trace) AS all SELECT thread_nam
``` text
Row 1:
──────
thread_name: clickhouse-serv
thread_id: 686
query_id: 1a11f70b-626d-47c1-b948-f9c7b206395d
res: sigqueue
DB::StorageSystemStackTrace::fillData(std::__1::vector<COW<DB::IColumn>::mutable_ptr<DB::IColumn>, std::__1::allocator<COW<DB::IColumn>::mutable_ptr<DB::IColumn> > >&, DB::Context const&, DB::SelectQueryInfo const&) const
DB::IStorageSystemOneBlock<DB::StorageSystemStackTrace>::read(std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&, DB::SelectQueryInfo const&, DB::Context const&, DB::QueryProcessingStage::Enum, unsigned long, unsigned int)
DB::InterpreterSelectQuery::executeFetchColumns(DB::QueryProcessingStage::Enum, DB::QueryPipeline&, std::__1::shared_ptr<DB::PrewhereInfo> const&, std::__1::vector<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::allocator<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > > > const&)
DB::InterpreterSelectQuery::executeImpl(DB::QueryPipeline&, std::__1::shared_ptr<DB::IBlockInputStream> const&, std::__1::optional<DB::Pipe>)
DB::InterpreterSelectQuery::execute()
DB::InterpreterSelectWithUnionQuery::execute()
DB::executeQueryImpl(char const*, char const*, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool, DB::ReadBuffer*)
DB::executeQuery(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, DB::Context&, bool, DB::QueryProcessingStage::Enum, bool)
DB::TCPHandler::runImpl()
DB::TCPHandler::run()
Poco::Net::TCPServerConnection::start()
Poco::Net::TCPServerDispatcher::run()
Poco::PooledThread::run()
Poco::ThreadImpl::runnableEntry(void*)
start_thread
__clone
thread_name: QueryPipelineEx
thread_id: 743490
query_id: dc55a564-febb-4e37-95bb-090ef182c6f1
res: memcpy
large_ralloc
arena_ralloc
do_rallocx
Allocator<true, true>::realloc(void*, unsigned long, unsigned long, unsigned long)
HashTable<unsigned long, HashMapCell<unsigned long, char*, HashCRC32<unsigned long>, HashTableNoState, PairNoInit<unsigned long, char*>>, HashCRC32<unsigned long>, HashTableGrowerWithPrecalculation<8ul>, Allocator<true, true>>::resize(unsigned long, unsigned long)
void DB::Aggregator::executeImplBatch<false, false, true, DB::AggregationMethodOneNumber<unsigned long, HashMapTable<unsigned long, HashMapCell<unsigned long, char*, HashCRC32<unsigned long>, HashTableNoState, PairNoInit<unsigned long, char*>>, HashCRC32<unsigned long>, HashTableGrowerWithPrecalculation<8ul>, Allocator<true, true>>, true, false>>(DB::AggregationMethodOneNumber<unsigned long, HashMapTable<unsigned long, HashMapCell<unsigned long, char*, HashCRC32<unsigned long>, HashTableNoState, PairNoInit<unsigned long, char*>>, HashCRC32<unsigned long>, HashTableGrowerWithPrecalculation<8ul>, Allocator<true, true>>, true, false>&, DB::AggregationMethodOneNumber<unsigned long, HashMapTable<unsigned long, HashMapCell<unsigned long, char*, HashCRC32<unsigned long>, HashTableNoState, PairNoInit<unsigned long, char*>>, HashCRC32<unsigned long>, HashTableGrowerWithPrecalculation<8ul>, Allocator<true, true>>, true, false>::State&, DB::Arena*, unsigned long, unsigned long, DB::Aggregator::AggregateFunctionInstruction*, bool, char*) const
DB::Aggregator::executeImpl(DB::AggregatedDataVariants&, unsigned long, unsigned long, std::__1::vector<DB::IColumn const*, std::__1::allocator<DB::IColumn const*>>&, DB::Aggregator::AggregateFunctionInstruction*, bool, bool, char*) const
DB::Aggregator::executeOnBlock(std::__1::vector<COW<DB::IColumn>::immutable_ptr<DB::IColumn>, std::__1::allocator<COW<DB::IColumn>::immutable_ptr<DB::IColumn>>>, unsigned long, unsigned long, DB::AggregatedDataVariants&, std::__1::vector<DB::IColumn const*, std::__1::allocator<DB::IColumn const*>>&, std::__1::vector<std::__1::vector<DB::IColumn const*, std::__1::allocator<DB::IColumn const*>>, std::__1::allocator<std::__1::vector<DB::IColumn const*, std::__1::allocator<DB::IColumn const*>>>>&, bool&) const
DB::AggregatingTransform::work()
DB::ExecutionThreadContext::executeTask()
DB::PipelineExecutor::executeStepImpl(unsigned long, std::__1::atomic<bool>*)
void std::__1::__function::__policy_invoker<void ()>::__call_impl<std::__1::__function::__default_alloc_func<DB::PipelineExecutor::spawnThreads()::$_0, void ()>>(std::__1::__function::__policy_storage const*)
ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>::worker(std::__1::__list_iterator<ThreadFromGlobalPoolImpl<false>, void*>)
void std::__1::__function::__policy_invoker<void ()>::__call_impl<std::__1::__function::__default_alloc_func<ThreadFromGlobalPoolImpl<false>::ThreadFromGlobalPoolImpl<void ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>::scheduleImpl<void>(std::__1::function<void ()>, Priority, std::__1::optional<unsigned long>, bool)::'lambda0'()>(void&&)::'lambda'(), void ()>>(std::__1::__function::__policy_storage const*)
void* std::__1::__thread_proxy[abi:v15000]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, Priority, std::__1::optional<unsigned long>, bool)::'lambda0'()>>(void*)
```
Получение имен файлов и номеров строк в исходном коде ClickHouse: