Merge pull request #307 from krishnevsky/patch-1

Update architecture.md
alexey-milovidov 2017-01-09 20:39:22 +04:00 committed by GitHub
commit a775438d74

@@ -1,214 +1,214 @@
# ClickHouse quick architecture overview
# Overview of ClickHouse architecture
> Optional side notes are in grey.
ClickHouse is a true column oriented DBMS. Data is stored by columns, and furthermore, during query execution data is processed by arrays (vectors, chunks of columns). Whenever possible, operations are dispatched not on individual values but on arrays. It is called "vectorized query execution", and it helps lower dispatch cost relative to the cost of actual data processing.
ClickHouse is a true column-oriented DBMS. Data is stored by columns, and during query execution data is processed by arrays (vectors or chunks of columns). Whenever possible, operations are dispatched on arrays, rather than on individual values. This is called "vectorized query execution," and it helps lower dispatch cost relative to the cost of actual data processing.
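
The dispatch-cost argument can be illustrated with a minimal sketch. This is not ClickHouse code: `IBinaryOpSketch`, `PlusOpSketch`, `addRowByRow`, and `addVectorized` are hypothetical names invented for the example, and both inputs are assumed to have the same length. The row-by-row variant pays one virtual call per value, while the vectorized variant pays one virtual call per chunk.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical operation interface, for illustration only.
struct IBinaryOpSketch
{
    virtual ~IBinaryOpSketch() = default;

    // Per-value dispatch: one virtual call for every pair of values.
    virtual uint64_t applyOne(uint64_t a, uint64_t b) const = 0;

    // Vectorized dispatch: one virtual call for a whole chunk of a column.
    // Assumes a.size() == b.size().
    virtual void applyVector(const std::vector<uint64_t> & a,
                             const std::vector<uint64_t> & b,
                             std::vector<uint64_t> & result) const = 0;
};

struct PlusOpSketch : IBinaryOpSketch
{
    uint64_t applyOne(uint64_t a, uint64_t b) const override { return a + b; }

    void applyVector(const std::vector<uint64_t> & a,
                     const std::vector<uint64_t> & b,
                     std::vector<uint64_t> & result) const override
    {
        result.resize(a.size());
        // Tight loop with no dispatch inside; a compiler may auto-vectorize it.
        for (size_t i = 0; i < a.size(); ++i)
            result[i] = a[i] + b[i];
    }
};

// Dispatch cost is paid a.size() times.
std::vector<uint64_t> addRowByRow(const IBinaryOpSketch & op,
                                  const std::vector<uint64_t> & a,
                                  const std::vector<uint64_t> & b)
{
    std::vector<uint64_t> result(a.size());
    for (size_t i = 0; i < a.size(); ++i)
        result[i] = op.applyOne(a[i], b[i]);
    return result;
}

// Dispatch cost is paid once per chunk.
std::vector<uint64_t> addVectorized(const IBinaryOpSketch & op,
                                    const std::vector<uint64_t> & a,
                                    const std::vector<uint64_t> & b)
{
    std::vector<uint64_t> result;
    op.applyVector(a, b, result);
    return result;
}
```

Both functions produce the same result; the difference is only in how often the indirection is paid relative to the useful work.
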
>This idea is nothing new. It dates back to the `APL` programming language and its descendants: `A+`, `J`, `K`, `Q`. Array programming is widely used in scientific data processing. Neither is this idea something new in relational databases: for example, it is used in the `Vectorwise` system.
>This idea is nothing new. It dates back to the `APL` programming language and its descendants: `A+`, `J`, `K`, and `Q`. Array programming is widely used in scientific data processing. Neither is this idea something new in relational databases: for example, it is used in the `Vectorwise` system.
>There are two different approaches for speeding up query processing: vectorized query execution and runtime code generation. In the latter, the code is generated for every kind of query on the fly, removing all indirection and dynamic dispatch. None of these approaches is strictly better than the other. Runtime code generation can be better when fuses many operations together, thus fully utilizing CPU execution units and pipeline. Vectorized query execution can be worse, because it must deal with temporary vectors that must be written to cache and read back. If the temporary data does not fit in L2 cache, this becomes an issue. But vectorized query execution more easily utilizes SIMD capabilities of CPU. A [research paper](http://15721.courses.cs.cmu.edu/spring2016/papers/p5-sompolski.pdf) written by our friends shows that it is better to combine both approaches. ClickHouse mostly uses vectorized query execution and has limited initial support for runtime code generation (only the inner loop of first stage of GROUP BY can be compiled).
>There are two different approaches for speeding up query processing: vectorized query execution and runtime code generation. In the latter, the code is generated for every kind of query on the fly, removing all indirection and dynamic dispatch. Neither of these approaches is strictly better than the other. Runtime code generation can be better when it fuses many operations together, thus fully utilizing CPU execution units and the pipeline. Vectorized query execution can be less practical, because it involves temporary vectors that must be written to cache and read back. If the temporary data does not fit in the L2 cache, this becomes an issue. But vectorized query execution more easily utilizes the SIMD capabilities of the CPU. A [research paper](http://15721.courses.cs.cmu.edu/spring2016/papers/p5-sompolski.pdf) written by our friends shows that it is better to combine both approaches. ClickHouse mainly uses vectorized query execution and has limited initial support for runtime code generation (only the inner loop of the first stage of GROUP BY can be compiled).
## Columns
To represent columns in memory (in fact, chunks of columns), `IColumn` interface is used. This interface provide helper methods for implementation of various relational operators. Almost all operations are immutable: they are not modifies original column, but create new, modified one. For example, there is `IColumn::filter` method that accept filter byte mask and create new, filtered column. It is used for `WHERE` and `HAVING` relational operators. Another examples: `IColumn::permute` method to support `ORDER BY`, `IColumn::cut` method to support `LIMIT` and so on.
To represent columns in memory (actually, chunks of columns), the `IColumn` interface is used. This interface provides helper methods for implementation of various relational operators. Almost all operations are immutable: they do not modify the original column, but create a new modified one. For example, the `IColumn::filter` method accepts a filter byte mask and creates a new filtered column. It is used for the `WHERE` and `HAVING` relational operators. Additional examples: the `IColumn::permute` method to support `ORDER BY`, the `IColumn::cut` method to support `LIMIT`, and so on.
Various `IColumn` implementations (`ColumnUInt8`, `ColumnString` and so on) is responsible for memory layout of columns. Memory layout is usually contiguous array. For integer type of columns it is just one contiguous array, like `std::vector`. For `String` and `Array` columns, it is two vectors: one for all array elements, placed contiguously and second is for offsets to beginning of each array. There is also `ColumnConst`, that store just one value in memory, but looks like column.
Various `IColumn` implementations (`ColumnUInt8`, `ColumnString` and so on) are responsible for the memory layout of columns. Memory layout is usually a contiguous array. For the integer type of columns it is just one contiguous array, like `std::vector`. For `String` and `Array` columns, it is two vectors: one for all array elements, placed contiguously, and a second one for offsets to the beginning of each array. There is also `ColumnConst` that stores just one value in memory, but looks like a column.
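
The layouts described above can be sketched roughly as follows. `SimpleColumnUInt64` and `SimpleColumnString` are hypothetical stand-ins, not the real classes. The string column stores offsets to the beginning of each string, following the wording of this section; the actual implementation may organize its offsets differently.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Simplified stand-in for a numeric column: one contiguous array.
struct SimpleColumnUInt64
{
    std::vector<uint64_t> data;

    // Returns a new column with the rows whose mask byte is non-zero.
    // The original column is left untouched (the operation is immutable).
    SimpleColumnUInt64 filter(const std::vector<uint8_t> & mask) const
    {
        SimpleColumnUInt64 result;
        for (size_t i = 0; i < data.size() && i < mask.size(); ++i)
            if (mask[i])
                result.data.push_back(data[i]);
        return result;
    }
};

// Simplified stand-in for a string column: two vectors.
struct SimpleColumnString
{
    std::vector<char> chars;      // all characters of all strings, placed contiguously
    std::vector<size_t> offsets;  // offsets[i] is where string i begins in `chars`

    void insert(const std::string & s)
    {
        offsets.push_back(chars.size());
        chars.insert(chars.end(), s.begin(), s.end());
    }

    std::string getString(size_t i) const
    {
        const size_t begin = offsets[i];
        const size_t end = (i + 1 < offsets.size()) ? offsets[i + 1] : chars.size();
        return std::string(chars.begin() + static_cast<std::ptrdiff_t>(begin),
                           chars.begin() + static_cast<std::ptrdiff_t>(end));
    }

    size_t size() const { return offsets.size(); }
};
```

Filtering `{10, 20, 30, 40}` with the mask `{1, 0, 1, 0}` yields a new column `{10, 30}` and leaves the source column as it was.
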
## Field
Nevertheless, it is possible to work with individual values too. To represent individual value, `Field` is used. `Field` is just a discriminated union of `UInt64`, `Int64`, `Float64`, `String` and `Array`. `IColumn` has method `operator[]` to get n-th value as a `Field`, and `insert` method to append a `Field` to end of a column. These methods are not very efficient, because they require to deal with temporary `Field` object, representing individual value. There are more efficient methods, for example: `insertFrom`, `insertRangeFrom` and so on.
Nevertheless, it is possible to work with individual values as well. To represent an individual value, `Field` is used. `Field` is just a discriminated union of `UInt64`, `Int64`, `Float64`, `String` and `Array`. `IColumn` has the `operator[]` method to get the n-th value as a `Field`, and the `insert` method to append a `Field` to the end of a column. These methods are not very efficient, because they require dealing with temporary `Field` objects representing an individual value. There are more efficient methods, such as `insertFrom`, `insertRangeFrom`, and so on.
`Field` has not enough information about specific data type for table. For example, `UInt8`, `UInt16`, `UInt32`, `UInt64` all represented as `UInt64` in a `Field`.
`Field` doesn't have enough information about a specific data type for a table. For example, `UInt8`, `UInt16`, `UInt32`, and `UInt64` are all represented as `UInt64` in a `Field`.
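
A discriminated union of this kind can be sketched with `std::variant` (C++17). `SimpleField` and `toField` are hypothetical names, and `Array` is omitted for brevity; the point is only that every unsigned width collapses into the same `UInt64` alternative, so the original width cannot be recovered from the value alone.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <variant>

// Hypothetical simplified Field: a tagged union over a few wide types.
// The Array alternative from the text is left out to keep the sketch short.
using SimpleField = std::variant<uint64_t, int64_t, double, std::string>;

// Every unsigned integer width is widened to UInt64 when stored,
// so the information about the original data type is lost.
inline SimpleField toField(uint8_t x)  { return SimpleField(std::in_place_type<uint64_t>, x); }
inline SimpleField toField(uint16_t x) { return SimpleField(std::in_place_type<uint64_t>, x); }
inline SimpleField toField(uint32_t x) { return SimpleField(std::in_place_type<uint64_t>, x); }
inline SimpleField toField(uint64_t x) { return SimpleField(std::in_place_type<uint64_t>, x); }
```

With this sketch, `toField(uint8_t{200})` and `toField(uint32_t{200})` compare equal, which is why the data type has to be tracked separately from the value.
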
### Leaky abstractions
`IColumn` has methods for common relational transformations of data. But not for all needs. For example, `ColumnUInt64` has no method to calculate sum of two columns, or `ColumnString` has no method to run substring search. These countless routines are implemented outside of `IColumn`.
`IColumn` has methods for common relational transformations of data, but they don't meet all needs. For example, `ColumnUInt64` doesn't have a method to calculate the sum of two columns, and `ColumnString` doesn't have a method to run a substring search. These countless routines are implemented outside of `IColumn`.
Various functions on columns could be implemented in generic, non-efficient way, using methods of `IColumn` to extract `Field` values, or in specialized way, using knowledge of inner memory layout of data in concrete `IColumn` implementation. To do this, functions are casting to concrete `IColumn` type and deal with internal representation directly. For example, `ColumnUInt64` has `getData` method that return reference to internal array; and then separate routine could read or fill that array directly. In fact, we have "leaky abstractions" to allow efficient specializations of various routines.
Various functions on columns can be implemented in a generic, inefficient way using `IColumn` methods to extract `Field` values, or in a specialized way using knowledge of the inner memory layout of data in a specific `IColumn` implementation. To do this, functions cast the column to a specific `IColumn` type and deal with the internal representation directly. For example, `ColumnUInt64` has the `getData` method that returns a reference to an internal array; a separate routine can then read or fill that array directly. In fact, we have "leaky abstractions" to allow efficient specializations of various routines.
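
The two implementation styles can be sketched side by side. The types below (`IColumnSketch`, `ColumnUInt64Sketch`) and the `plusColumns` routine are hypothetical simplifications; `getUInt` stands in for the slow per-value `Field` access, and `getData` plays the role of the "leak".

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <vector>

// Hypothetical, heavily simplified column interface.
struct IColumnSketch
{
    virtual ~IColumnSketch() = default;
    virtual size_t size() const = 0;
    // Generic access: one virtual call per value (stands in for extracting a Field).
    virtual uint64_t getUInt(size_t n) const = 0;
};

struct ColumnUInt64Sketch : IColumnSketch
{
    std::vector<uint64_t> data;

    size_t size() const override { return data.size(); }
    uint64_t getUInt(size_t n) const override { return data[n]; }

    // The "leak": direct access to the internal array.
    std::vector<uint64_t> & getData() { return data; }
    const std::vector<uint64_t> & getData() const { return data; }
};

// Sum of two columns, implemented outside of the column interface.
std::unique_ptr<ColumnUInt64Sketch> plusColumns(const IColumnSketch & a, const IColumnSketch & b)
{
    if (a.size() != b.size())
        throw std::invalid_argument("column sizes differ");

    auto result = std::make_unique<ColumnUInt64Sketch>();
    auto & out = result->getData();
    out.resize(a.size());

    const auto * ca = dynamic_cast<const ColumnUInt64Sketch *>(&a);
    const auto * cb = dynamic_cast<const ColumnUInt64Sketch *>(&b);

    if (ca && cb)
    {
        // Specialized path: work on the internal arrays directly.
        const auto & x = ca->getData();
        const auto & y = cb->getData();
        for (size_t i = 0; i < out.size(); ++i)
            out[i] = x[i] + y[i];
    }
    else
    {
        // Generic fallback: one virtual call per value on each side.
        for (size_t i = 0; i < out.size(); ++i)
            out[i] = a.getUInt(i) + b.getUInt(i);
    }
    return result;
}
```

The cast happens once per chunk, so its cost is negligible compared with the loop that follows.
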
## Data Types
## Data types
`IDataType` is responsible for serialization and deserialization: for reading and writing chunks of columns or individual values in binary or text form.
`IDataType` is directly corresponding to data types in tables. For example, there are `DataTypeUInt32`, `DataTypeDateTime`, `DataTypeString` and so on.
`IDataType` directly corresponds to data types in tables. For example, there are `DataTypeUInt32`, `DataTypeDateTime`, `DataTypeString` and so on.
`IDataType` and `IColumn` are only loosely related to each other. Different data types could be represented in memory by same `IColumn` implementations. For example, `DataTypeUInt32` and `DataTypeDateTime` are represented both with `ColumnUInt32` or `ColumnConstUInt32`. Also, same data type could be represented by different `IColumn` implementations. For example. `DataTypeUInt8` could be represented with `ColumnUInt8` or `ColumnConstUInt8`.
`IDataType` and `IColumn` are only loosely related to each other. Different data types can be represented in memory by the same `IColumn` implementations. For example, `DataTypeUInt32` and `DataTypeDateTime` are both represented by `ColumnUInt32` or `ColumnConstUInt32`. In addition, the same data type can be represented by different `IColumn` implementations. For example, `DataTypeUInt8` can be represented by `ColumnUInt8` or `ColumnConstUInt8`.
`IDataType` only store metadata. For example, `DataTypeUInt8` doesn't store anything at all (except vptr) and `DataTypeFixedString` stores just `N` - size of fixed-size strings.
`IDataType` only stores metadata. For instance, `DataTypeUInt8` doesn't store anything at all (except vptr) and `DataTypeFixedString` stores just `N` (the size of fixed-size strings).
`IDataType` has helper methods for various data formats. For example, it has methods to serialize value with possible quoting; to serialize value suitable to be placed in JSON; to serialize value as part of XML format. There is no direct correspondence with data formats. For example, different data formats: `Pretty`, `TabSeparated` could use one helper method `serializeTextEscaped` from `IDataType` interface.
`IDataType` has helper methods for various data formats. Examples are methods to serialize a value with possible quoting, to serialize a value for JSON, and to serialize a value as part of XML format. There is no direct correspondence to data formats. For example, the different data formats `Pretty` and `TabSeparated` can use the same `serializeTextEscaped` helper method from the `IDataType` interface.
## Block
`Block` is a container to represent subset (chunk) of table in memory. It is just set of triples: `(IColumn, IDataType, column name)`. During query execution, data is processed by `Block`s. Having `Block`, we have data (in `IColumn` object), we have information about its type (in `IDataType`) - how to deal with that column, and we have column name (some name, either original name of column from table, or some artificial name, assigned for temporary results of calculations).
A `Block` is a container that represents a subset (chunk) of a table in memory. It is just a set of triples: `(IColumn, IDataType, column name)`. During query execution, data is processed by `Block`s. If we have a `Block`, we have data (in the `IColumn` object), we have information about its type (in `IDataType`) that tells us how to deal with that column, and we have the column name (either the original column name from the table, or some artificial name assigned to temporary results of calculations).
When we calculate some function over columns in a block, we add another column with its result to a block, and don't touch columns for arguments of function: operations are immutable. Later, unneeded columns could be removed from block, but not modified. It is convenient to allow common subexpressions elimination.
When we calculate some function over columns in a block, we add another column with its result to the block, and we don't touch columns for arguments of the function because operations are immutable. Later, unneeded columns can be removed from the block, but not modified. This is convenient for elimination of common subexpressions.
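
A minimal sketch of this behavior, under simplifying assumptions: a column is a plain vector, the data type is reduced to a string label, and `BlockSketch`, `ColumnWithTypeAndNameSketch`, and `executePlus` are hypothetical names. The function appends its result as a new column and skips the work if a column with the same name is already present, which is how immutability helps with common subexpressions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using ColumnSketch = std::vector<uint64_t>;  // stand-in for IColumn

// One triple: (column, type, name).
struct ColumnWithTypeAndNameSketch
{
    std::shared_ptr<const ColumnSketch> column;  // never modified once it is in a block
    std::string type;                            // stand-in for IDataType
    std::string name;
};

struct BlockSketch
{
    std::vector<ColumnWithTypeAndNameSketch> columns;

    const ColumnWithTypeAndNameSketch * find(const std::string & name) const
    {
        for (const auto & elem : columns)
            if (elem.name == name)
                return &elem;
        return nullptr;
    }

    void insert(ColumnWithTypeAndNameSketch elem) { columns.push_back(std::move(elem)); }
};

// Computes a + b and appends the result as a new column.
// Argument columns are not touched. If the result already exists, nothing is recomputed.
void executePlus(BlockSketch & block, const std::string & a, const std::string & b)
{
    const std::string result_name = "plus(" + a + ", " + b + ")";
    if (block.find(result_name))
        return;

    const auto * left = block.find(a);
    const auto * right = block.find(b);
    if (!left || !right || left->column->size() != right->column->size())
        throw std::invalid_argument("missing argument column or size mismatch");

    auto result = std::make_shared<ColumnSketch>(left->column->size());
    for (size_t i = 0; i < result->size(); ++i)
        (*result)[i] = (*left->column)[i] + (*right->column)[i];

    // `left` and `right` are not used after this point, so reallocation is safe.
    block.insert({std::move(result), "UInt64", result_name});
}
```

Calling `executePlus(block, "a", "b")` twice leaves the block with exactly one `plus(a, b)` column.
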
>Blocks are created for every processed chunk of data. Worth to note that in one type of calculation, column names and types remaining the same for different blocks, only column data changes. It will be better to split block data and block header, because for small block sizes, we will have high overhead of temporary string for column names and shared_ptrs copying.
>Blocks are created for every processed chunk of data. Note that for the same type of calculation, the column names and types remain the same for different blocks, and only column data changes. It would be better to split block data from the block header, because for small block sizes there is a high overhead from temporary strings for column names and from copying shared_ptrs.
## Block Streams
To process data: to read data from somewhere, to do transformations on data, or to write data somewhere, we have streams of blocks. `IBlockInputStream` has method `read`: to fetch next block while available. `IBlockOutputStream` has method `write`: to push block somewhere.
Block streams are for processing data. We use streams of blocks to read data from somewhere, perform data transformations, or write data to somewhere. `IBlockInputStream` has the `read` method to fetch the next block while available. `IBlockOutputStream` has the `write` method to push the block somewhere.
Streams are responsible for various things:
Streams are responsible for:
1. To read or write to a table. Table just returns a stream, from where you could read or where to write blocks.
1. Reading or writing to a table. The table just returns a stream for reading or writing blocks.
2. To implement data formats. For example, if you want to output data to a terminal in `Pretty` format, you create block output stream, where you push blocks, and it will format them.
2. Implementing data formats. For example, if you want to output data to a terminal in `Pretty` format, you create a block output stream where you push blocks, and it formats them.
3. To do transformations on data. Let you have a `IBlockInputStream` and want to create filtered stream. You create `FilterBlockInputStream` and initialize it with your stream. Then when you pull a block from `FilterBlockInputStream`, it will pull a block from your stream, do filtering and return filtered block to you. Query execution pipelines are represented that way.
3. Performing data transformations. Let's say you have `IBlockInputStream` and want to create a filtered stream. You create `FilterBlockInputStream` and initialize it with your stream. Then when you pull a block from `FilterBlockInputStream`, it pulls a block from your stream, filters it, and returns the filtered block to you. Query execution pipelines are represented this way.
There are more sophisticated transformations. For example, when you pull from `AggregatingBlockInputStream`, it will read all data from its source, aggregate it, and then return stream of aggregated data for you. Another example: `UnionBlockInputStream` will accept many input sources in constructor and also a number of threads. It will launch many threads and read from many sources in parallel.
There are more sophisticated transformations. For example, when you pull from `AggregatingBlockInputStream`, it reads all data from its source, aggregates it, and then returns a stream of aggregated data for you. Another example: `UnionBlockInputStream` accepts many input sources in the constructor and also a number of threads. It launches multiple threads and reads from multiple sources in parallel.
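
The pull model can be sketched as a chain of objects that each wrap their source. Everything here is hypothetical and simplified: a block is a single vector of integers, an empty block is assumed to signal the end of the stream, and `VectorSourceSketch` and `FilterStreamSketch` only imitate the roles of a table source and `FilterBlockInputStream`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

using BlockSketch = std::vector<int64_t>;  // a "block" is one column chunk in this sketch

struct IBlockInputStreamSketch
{
    virtual ~IBlockInputStreamSketch() = default;
    // Returns the next block; an empty block means the stream is exhausted (assumption).
    virtual BlockSketch read() = 0;
};

// Source stream: hands out prepared blocks one by one.
class VectorSourceSketch : public IBlockInputStreamSketch
{
public:
    explicit VectorSourceSketch(std::vector<BlockSketch> blocks_) : blocks(std::move(blocks_)) {}

    BlockSketch read() override
    {
        if (pos >= blocks.size())
            return {};
        return blocks[pos++];
    }

private:
    std::vector<BlockSketch> blocks;
    size_t pos = 0;
};

// Transforming stream: pulling from it pulls from the nested stream.
class FilterStreamSketch : public IBlockInputStreamSketch
{
public:
    FilterStreamSketch(std::shared_ptr<IBlockInputStreamSketch> input_,
                       std::function<bool(int64_t)> predicate_)
        : input(std::move(input_)), predicate(std::move(predicate_)) {}

    BlockSketch read() override
    {
        while (true)
        {
            BlockSketch block = input->read();
            if (block.empty())
                return block;  // source exhausted

            BlockSketch filtered;
            for (int64_t value : block)
                if (predicate(value))
                    filtered.push_back(value);

            // If every row was filtered out, pull the next block instead of
            // returning an empty one, which would look like end of stream.
            if (!filtered.empty())
                return filtered;
        }
    }

private:
    std::shared_ptr<IBlockInputStreamSketch> input;
    std::function<bool(int64_t)> predicate;
};
```

With source blocks `{1, 2, 3}`, `{5, 7}`, `{4, 6}` and a predicate that keeps even numbers, successive reads from the filter return `{2}`, then `{4, 6}`, then an empty block.
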
> Block streams are using "pull" approach to control flow: when you pull a block from first stream, it will consequently pull required blocks from nested streams, and all execution pipeline will work. Neither "pull" nor "push" is the best solution, because control flow is implicit, and that limits implementation of various features like simultaneous execution of multiple queries (merging many pipelines together). That limitation could be overcomed with coroutines or just running extra threads that wait for each other. We may have more possibilities if we made control flow explicit: if we locate logic for passing data from one calculation unit to another outside of that calculation units. Read [nice article](http://journal.stuffwithstuff.com/2013/01/13/iteration-inside-and-out/) for more thoughts.
> Block streams use the "pull" approach to control flow: when you pull a block from the first stream, it consequently pulls the required blocks from nested streams, and the entire execution pipeline will work. Neither "pull" nor "push" is the best solution, because control flow is implicit, and that limits implementation of various features like simultaneous execution of multiple queries (merging many pipelines together). This limitation could be overcome with coroutines or just running extra threads that wait for each other. We may have more possibilities if we make control flow explicit: if we locate the logic for passing data from one calculation unit to another outside of those calculation units. Read this [nice article](http://journal.stuffwithstuff.com/2013/01/13/iteration-inside-and-out/) for more thoughts.
> Worth to note that query execution pipeline creates temporary data on each step. We are trying to keep block size small enough for that temporary data to fit in CPU cache. With that assumption, writing and reading temporary data is almost free comparable to other calculations. We could consider an alternative: to fuse many operations in pipeline together, to make pipeline as short as possible and remove much of temporary data. It could be an advantage, but there are also drawbacks. For example, in splitted pipeline it is easy to implement caching of intermediate data, stealing of intermediate data from similar queries running at the same time, and merging pipelines for similar queries.
> We should note that the query execution pipeline creates temporary data at each step. We try to keep block size small enough so that temporary data fits in the CPU cache. With that assumption, writing and reading temporary data is almost free in comparison with other calculations. We could consider an alternative, which is to fuse many operations in the pipeline together, to make the pipeline as short as possible and remove much of the temporary data. This could be an advantage, but it also has drawbacks. For example, a split pipeline makes it easy to implement caching intermediate data, stealing intermediate data from similar queries running at the same time, and merging pipelines for similar queries.
## Formats
Data formats are implemented with block streams. There are "presentational" formats, only suitable for output of data to the client: for example, `Pretty` format: it provides only `IBlockOutputStream`. And there are input/output formats, example: `TabSeparated` or `JSONEachRow`.
Data formats are implemented with block streams. There are "presentational" formats only suitable for output of data to the client, such as `Pretty` format, which provides only `IBlockOutputStream`. And there are input/output formats, such as `TabSeparated` or `JSONEachRow`.
There are also row streams: `IRowInputStream`, `IRowOutputStream`. They allow to pull/push data by individual rows, not by blocks. And they are only needed to simplify implementation of row oriented formats. There are wrappers `BlockInputStreamFromRowInputStream`, `BlockOutputStreamFromRowOutputStream` to convert row oriented streams to usual block oriented streams.
There are also row streams: `IRowInputStream` and `IRowOutputStream`. They allow you to pull/push data by individual rows, not by blocks. And they are only needed to simplify implementation of row-oriented formats. The wrappers `BlockInputStreamFromRowInputStream` and `BlockOutputStreamFromRowOutputStream` allow you to convert row-oriented streams to regular block-oriented streams.
## IO
For byte oriented input/output, there are `ReadBuffer` and `WriteBuffer` abstract classes. They are used instead of C++ `iostream`s. Don't worry: every mature C++ project is using something instead of `iostream`s for good reasons.
For byte-oriented input/output, there are `ReadBuffer` and `WriteBuffer` abstract classes. They are used instead of C++ `iostream`s. Don't worry: every mature C++ project is using something other than `iostream`s for good reasons.
`ReadBuffer` and `WriteBuffer` is just a contiguous buffer and a cursor pointing to position in that buffer. Implementations could do or do not own the memory for buffer. There is a virtual method to fill buffer with next data (in case or `ReadBuffer`) or to flush buffer somewhere (in case of `WriteBuffer`). Virtual methods are only rarely called.
`ReadBuffer` and `WriteBuffer` are each just a contiguous buffer and a cursor pointing to a position in that buffer. Implementations may or may not own the memory for the buffer. There is a virtual method to fill the buffer with the next portion of data (for `ReadBuffer`) or to flush the buffer somewhere (for `WriteBuffer`). The virtual methods are rarely called.
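
Why the virtual method is rarely called can be seen in a small sketch of the write side. `WriteBufferSketch` and `WriteBufferToStringSketch` are hypothetical classes written for this example, not the real ones: the hot `write` path is non-virtual and only copies bytes, and the virtual `flushImpl` runs once per full buffer.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical simplified write buffer: a contiguous buffer plus a cursor.
class WriteBufferSketch
{
public:
    explicit WriteBufferSketch(size_t capacity) : memory(capacity > 0 ? capacity : 1) {}
    virtual ~WriteBufferSketch() = default;

    // Hot path: non-virtual. Copies bytes and advances the cursor.
    void write(const char * from, size_t n)
    {
        while (n > 0)
        {
            if (pos == memory.size())
                next();  // buffer is full: the only place the virtual call happens here
            const size_t chunk = std::min(n, memory.size() - pos);
            std::memcpy(memory.data() + pos, from, chunk);
            pos += chunk;
            from += chunk;
            n -= chunk;
        }
    }

    // Flushes buffered bytes to the destination. Must be called explicitly at the end.
    void next()
    {
        flushImpl(memory.data(), pos);
        pos = 0;
    }

protected:
    virtual void flushImpl(const char * data, size_t size) = 0;

private:
    std::vector<char> memory;
    size_t pos = 0;
};

// Example destination: appends flushed bytes to a string and counts the flushes.
class WriteBufferToStringSketch : public WriteBufferSketch
{
public:
    WriteBufferToStringSketch(std::string & target_, size_t capacity)
        : WriteBufferSketch(capacity), target(target_) {}

    size_t flush_calls = 0;

protected:
    void flushImpl(const char * data, size_t size) override
    {
        target.append(data, size);
        ++flush_calls;
    }

private:
    std::string & target;
};
```

Writing 11 bytes through a 4-byte buffer and then calling `next()` results in three flushes in total; with a realistically large buffer, the ratio of virtual calls to bytes written becomes very small.
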
Implementations of `ReadBuffer`/`WriteBuffer` are used to work with files and file descriptors, network sockets; to implement compression (`CompressedWriteBuffer` is initialized with another WriteBuffer and do compression before writing data to it); names `ConcatReadBuffer`, `LimitReadBuffer`, `HashingWriteBuffer` speak for themself.
Implementations of `ReadBuffer`/`WriteBuffer` are used for working with files, file descriptors, and network sockets, for implementing compression (`CompressedWriteBuffer` is initialized with another `WriteBuffer` and performs compression before writing data to it), and for other purposes; the names `ConcatReadBuffer`, `LimitReadBuffer`, and `HashingWriteBuffer` speak for themselves.
Read/WriteBuffers only deal with bytes. To help with formatted input/output (example: write a number in decimal format), there are functions from `ReadHelpers` and `WriteHelpers` header files.
Read/WriteBuffers only deal with bytes. To help with formatted input/output (for instance, to write a number in decimal format), there are functions from `ReadHelpers` and `WriteHelpers` header files.
Lets look what happens when you want to write resultset in `JSON` format to stdout. You have result set, ready to be fetched from `IBlockInputStream`. You create `WriteBufferFromFileDescriptor(STDOUT_FILENO)` to write bytes to stdout. You create `JSONRowOutputStream`, initialized with that `WriteBuffer`, to write rows in `JSON` to stdout. You create `BlockOutputStreamFromRowOutputStream` on top of it, to represent it as `IBlockOutputStream`. Then you call `copyData` to transfer data from `IBlockInputStream` to `IBlockOutputStream`, and all works. Internally, `JSONRowOutputStream` will write various JSON delimiters and call method `IDataType::serializeTextJSON` with reference to `IColumn` and row number as arguments. Consequently, `IDataType::serializeTextJSON` will call a method from `WriteHelpers.h`: for example, `writeText` for numeric types and `writeJSONString` for `DataTypeString`.
Let's look at what happens when you want to write a result set in `JSON` format to stdout. You have a result set ready to be fetched from `IBlockInputStream`. You create `WriteBufferFromFileDescriptor(STDOUT_FILENO)` to write bytes to stdout. You create `JSONRowOutputStream`, initialized with that `WriteBuffer`, to write rows in `JSON` to stdout. You create `BlockOutputStreamFromRowOutputStream` on top of it, to represent it as `IBlockOutputStream`. Then you call `copyData` to transfer data from `IBlockInputStream` to `IBlockOutputStream`, and everything works. Internally, `JSONRowOutputStream` will write various JSON delimiters and call the `IDataType::serializeTextJSON` method with a reference to `IColumn` and the row number as arguments. Consequently, `IDataType::serializeTextJSON` will call a method from `WriteHelpers.h`: for example, `writeText` for numeric types and `writeJSONString` for `DataTypeString`.
## Tables
Tables are represented with `IStorage` interface. Different implementations of that interface are different table engines. Example: `StorageMergeTree`, `StorageMemory` and so on. Instances of that classes are just tables.
Tables are represented by the `IStorage` interface. Different implementations of that interface are different table engines. Examples are `StorageMergeTree`, `StorageMemory`, and so on. Instances of these classes are just tables.
Most important methods of `IStorage` are `read` and `write`. There are also `alter`, `rename`, `drop` and so on. `read` method accepts following arguments: set of columns to read from a table, query `AST` to consider, desired number of streams to return. And it returns: one or multiple of `IBlockInputStream` objects and information about to what stage data was already processed inside a table engine during query execution.
The most important `IStorage` methods are `read` and `write`. There are also `alter`, `rename`, `drop`, and so on. The `read` method accepts the following arguments: the set of columns to read from a table, the query `AST` to consider, and the desired number of streams to return. It returns one or multiple `IBlockInputStream` objects and information about the stage of data processing that was completed inside a table engine during query execution.
In most cases, method read is responsible only for reading specified columns from a table. Not for any further data processing. All further data processing are done by query interpreter - outside of responsibility of `IStorage`.
In most cases, the read method is only responsible for reading the specified columns from a table, not for any further data processing. All further data processing is done by the query interpreter and is outside the responsibility of `IStorage`.
But there are notable exceptions:
- query AST is passed to `read` method and table engine could consider it to derive index usage and to read fewer data from a table;
- sometimes, table engine could process data itself to specific stage: for example, `StorageDistributed` will send query to remote servers, ask them to process data to a stage, where data from different remote servers could be merged, and return that preprocessed data.
Data will be completely processed to final result using query interpreter.
- The query AST is passed to the `read` method and the table engine can use it to derive index usage and to read less data from a table.
- Sometimes the table engine can process data itself to a specific stage. For example, `StorageDistributed` can send a query to remote servers, ask them to process data to a stage where data from different remote servers can be merged, and return that preprocessed data.
The query interpreter then finishes processing the data.
`read` method of table could return many `IBlockInputStream` objects - to allow parallel data processing. That multiple block input streams could read from a table in parallel. Then you could wrap that streams with various transformations (expression evaluation, filtering) that could be calculated independently and create `UnionBlockInputStream` on top of them, to read from multiple streams in parallel.
The table's `read` method can return multiple `IBlockInputStream` objects to allow parallel data processing. These multiple block input streams can read from a table in parallel. Then you can wrap these streams with various transformations (such as expression evaluation or filtering) that can be calculated independently and create a `UnionBlockInputStream` on top of them, to read from multiple streams in parallel.
There are also `TableFunction`s. That are functions that returns temporary `IStorage` object, to use in `FROM` clause of a query.
There are also `TableFunction`s. These are functions that return a temporary `IStorage` object to use in the `FROM` clause of a query.
To get quick understanding, how to implement your own table engine - look at something simple, like `StorageMemory` or `StorageTinyLog`.
To get a quick idea of how to implement your own table engine, look at something simple, like `StorageMemory` or `StorageTinyLog`.
> As result of `read` method, `IStorage` will return `QueryProcessingStage` - information about what parts of query was already calculated inside a storage. Currently we have only very coarse granularity of that information. There is no way for storage to say "I have already processed that part of expression in WHERE, for that ranges of data". Need to work on that.
> As the result of the `read` method, `IStorage` returns `QueryProcessingStage` information about what parts of the query were already calculated inside storage. Currently we have only very coarse granularity for that information. There is no way for the storage to say "I have already processed this part of the expression in WHERE, for this range of data". We need to work on that.
## Parsers
Query is parsed by hand-written recursive descent parser. For example, `ParserSelectQuery` just recursively calls underlying parsers for various parts of query. Parsers create an `AST`. `AST` is represented by nodes - instances of `IAST`.
A query is parsed by a hand-written recursive descent parser. For example, `ParserSelectQuery` just recursively calls the underlying parsers for various parts of the query. Parsers create an `AST`. The `AST` is represented by nodes, which are instances of `IAST`.
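
The general technique can be shown on a toy grammar. This is only a sketch of recursive descent, not of the ClickHouse parsers: the grammar (numbers, `+`, `*`), the `ASTNodeSketch` node type, and the `ExpressionParserSketch` class are all invented for the example, and trailing input after a valid expression is not checked.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical AST node: a name (function name or literal text) and child nodes.
struct ASTNodeSketch
{
    std::string name;
    std::vector<std::unique_ptr<ASTNodeSketch>> children;
};
using ASTPtr = std::unique_ptr<ASTNodeSketch>;

class ExpressionParserSketch
{
public:
    explicit ExpressionParserSketch(std::string text_) : text(std::move(text_)) {}

    // expression := term ('+' term)*
    ASTPtr parseExpression()
    {
        ASTPtr left = parseTerm();
        while (left && consume('+'))
        {
            ASTPtr right = parseTerm();
            if (!right)
                return nullptr;
            left = makeFunction("plus", std::move(left), std::move(right));
        }
        return left;
    }

private:
    // term := number ('*' number)*
    ASTPtr parseTerm()
    {
        ASTPtr left = parseNumber();
        while (left && consume('*'))
        {
            ASTPtr right = parseNumber();
            if (!right)
                return nullptr;
            left = makeFunction("multiply", std::move(left), std::move(right));
        }
        return left;
    }

    // number := digit+
    ASTPtr parseNumber()
    {
        skipSpaces();
        const size_t begin = pos;
        while (pos < text.size() && std::isdigit(static_cast<unsigned char>(text[pos])))
            ++pos;
        if (pos == begin)
            return nullptr;
        auto node = std::make_unique<ASTNodeSketch>();
        node->name = text.substr(begin, pos - begin);
        return node;
    }

    bool consume(char c)
    {
        skipSpaces();
        if (pos < text.size() && text[pos] == c)
        {
            ++pos;
            return true;
        }
        return false;
    }

    void skipSpaces()
    {
        while (pos < text.size() && text[pos] == ' ')
            ++pos;
    }

    static ASTPtr makeFunction(const std::string & name, ASTPtr left, ASTPtr right)
    {
        auto node = std::make_unique<ASTNodeSketch>();
        node->name = name;
        node->children.push_back(std::move(left));
        node->children.push_back(std::move(right));
        return node;
    }

    std::string text;
    size_t pos = 0;
};

// Prints the tree in prefix form, for example plus(1, multiply(2, 3)).
std::string dumpAST(const ASTNodeSketch & node)
{
    if (node.children.empty())
        return node.name;
    std::string res = node.name + "(";
    for (size_t i = 0; i < node.children.size(); ++i)
    {
        if (i > 0)
            res += ", ";
        res += dumpAST(*node.children[i]);
    }
    return res + ")";
}
```

Each grammar rule maps to one method that calls the methods for its sub-rules, which mirrors how a parser for a whole query delegates to parsers for its parts.
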
> Parser generators are not used for historical reason.
> Parser generators are not used for historical reasons.
## Interpreters
Interpreters are responsible for creating query execution pipeline from an `AST`. There are simple interpreters, for example, `InterpreterExistsQuery`, `InterpreterDropQuery` or more sophisticated, `InterpreterSelectQuery`. Query execution pipeline is a composition of block input or output streams. For example, result of interpretation of `SELECT` query is `IBlockInputStream` from where to read resultset; result of INSERT query is `IBlockOutputStream` where to write a data for insertion; and result of interpretation of `INSERT SELECT` query is an `IBlockInputStream`, that returns empty resultset on first read, but copy data from `SELECT` to `INSERT` in same time.
Interpreters are responsible for creating the query execution pipeline from an `AST`. There are simple interpreters, such as `InterpreterExistsQuery` and `InterpreterDropQuery`, or the more sophisticated `InterpreterSelectQuery`. The query execution pipeline is a combination of block input or output streams. For example, the result of interpreting the `SELECT` query is the `IBlockInputStream` to read the result set from; the result of the `INSERT` query is the `IBlockOutputStream` to write data for insertion to; and the result of interpreting the `INSERT SELECT` query is the `IBlockInputStream` that returns an empty result set on the first read, but that copies data from `SELECT` to `INSERT` at the same time.
`InterpreterSelectQuery` uses `ExpressionAnalyzer` and `ExpressionActions` machinery to do analyze and transformations of query. Most of rule based query optimizations are done in that place. `ExpressionAnalyzer` is quite messy and should be rewritten: various query transformations and optimizations should be extracted to separate classes to allow modular transformations or query.
`InterpreterSelectQuery` uses `ExpressionAnalyzer` and `ExpressionActions` machinery for query analysis and transformations. This is where most rule-based query optimizations are done. `ExpressionAnalyzer` is quite messy and should be rewritten: various query transformations and optimizations should be extracted to separate classes to allow modular transformations of the query.
## Functions
There are ordinary functions and aggregate functions. For aggregate functions, see next section.
There are ordinary functions and aggregate functions. For aggregate functions, see the next section.
Ordinary functions doesn't change number of rows - they work as if they processing each row independently of each other. In fact, functions are called not for individual rows, but for `Block`s of data, to implement vectorized query execution.
Ordinary functions don't change the number of rows: they work as if they are processing each row independently. In fact, functions are not called for individual rows, but for `Block`s of data to implement vectorized query execution.
There are some miscellaneous functions, like `blockSize`, `rowNumberInBlock`, `runningAccumulate`, that exploit block processing and violates independence of rows.
There are some miscellaneous functions, like `blockSize`, `rowNumberInBlock`, and `runningAccumulate`, that exploit block processing and violate the independence of rows.
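To see why such functions break row independence, consider the simplified Python model below. The helper names (`split_into_blocks`, `apply_blockwise`, and the toy versions of `row_number_in_block` and `block_size`) are assumptions for illustration and only mimic the behavior described above.

```python
def split_into_blocks(values, size):
    """Cut a column into consecutive blocks of at most `size` rows."""
    return [values[i:i + size] for i in range(0, len(values), size)]


def row_number_in_block(block):
    # Depends on the position inside the block, not on the row's value.
    return list(range(len(block)))


def block_size(block):
    # Every row of a block reports the size of that block.
    return [len(block)] * len(block)


def upper(block):
    # An ordinary function: each output row depends only on its input row.
    return [value.upper() for value in block]


def apply_blockwise(values, size, function):
    """Call `function` once per block and concatenate the results."""
    result = []
    for block in split_into_blocks(values, size):
        result.extend(function(block))
    return result
```

With the same five input rows, `upper` gives identical output for any block size, while `row_number_in_block` and `block_size` change their output when the data is split differently.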
ClickHouse has strong typing: no implicit type conversion happens. If a function doesn't support specific combination of types, an exception will be thrown. But functions could work (be overloaded) for many different combinations of types. For example, function `plus` (to implement operator `+`) works for any combination of numeric types: `UInt8` + `Float32`, `UInt16` + `Int8` and so on. Also, some functions could accept variadic number of arguments, for example, function `concat`.
ClickHouse has strong typing, so implicit type conversion doesn't occur. If a function doesn't support a specific combination of types, an exception will be thrown. But functions can work (be overloaded) for many different combinations of types. For example, the `plus` function (to implement the `+` operator) works for any combination of numeric types: `UInt8` + `Float32`, `UInt16` + `Int8`, and so on. Also, some variadic functions can accept any number of arguments, such as the `concat` function.
Implementing a function could be slightly unconvenient, because a function is doing explicit dispatching on supported data types, and on supported `IColumns`. For example, function `plus` has code, generated by instantination of C++ template, for each combination of numeric types, and for constant or non-constant left and right arguments.
Implementing a function may be slightly inconvenient because a function explicitly dispatches supported data types and supported `IColumns`. For example, the `plus` function has code generated by instantiation of a C++ template for each combination of numeric types, and for constant or non-constant left and right arguments.
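The dispatch on constant versus non-constant arguments can be sketched as follows. The classes `ColumnVector` and `ColumnConst` here are simplified, hypothetical stand-ins written for this example (they are not the real `IColumn` implementations), and type dispatch over numeric types is left out entirely.

```python
from dataclasses import dataclass


@dataclass
class ColumnVector:
    """Hypothetical full column: one stored value per row."""
    data: list

    def __len__(self):
        return len(self.data)


@dataclass
class ColumnConst:
    """Hypothetical constant column: one value logically repeated `size` times."""
    value: float
    size: int

    def __len__(self):
        return self.size


def plus(left, right):
    """Add two columns, dispatching on whether each side is constant."""
    for column in (left, right):
        if not isinstance(column, (ColumnVector, ColumnConst)):
            raise TypeError(f"unsupported column type: {type(column).__name__}")
    if len(left) != len(right):
        raise ValueError("columns must have the same number of rows")

    left_const = isinstance(left, ColumnConst)
    right_const = isinstance(right, ColumnConst)
    if left_const and right_const:
        # Constant + constant stays constant: computed once, not per row.
        return ColumnConst(left.value + right.value, left.size)
    if left_const:
        return ColumnVector([left.value + b for b in right.data])
    if right_const:
        return ColumnVector([a + right.value for a in left.data])
    return ColumnVector([a + b for a, b in zip(left.data, right.data)])
```

Each of the four branches corresponds to one combination of constant and non-constant arguments; in a templated C++ implementation, every such branch would additionally be instantiated per pair of numeric types.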
> Nice place to implement runtime code generation to avoid template code bloat. Also it will make possible to add fused functions, like fused multiply-add or to do few comparisons in one loop iteration.
> This is a nice place to implement runtime code generation to avoid template code bloat. Also, it will make it possible to add fused functions like fused multiply-add, or to make multiple comparisons in one loop iteration.
> Due to vectorized query execution, functions are not short-curcuit. For example, if you write `WHERE f(x) AND g(y)`, both sides will be calculated, even for rows, when `f(x)` is zero (except the case when `f(x)` is zero constant expression). But if selectivity of `f(x)` condition is high, and calculation of `f(x)` is much cheaper than `g(y)`, it will be better to implement multi-pass calculation: first calculate `f(x)`, then filter columns by its result, and then calculate `g(y)` only for smaller, filtered chunks of data.
> Due to vectorized query execution, functions are not short-circuited. For example, if you write `WHERE f(x) AND g(y)`, both sides are calculated, even for rows where `f(x)` is zero (except when `f(x)` is a zero constant expression). But if selectivity of the `f(x)` condition is high, and calculation of `f(x)` is much cheaper than `g(y)`, it's better to implement multi-pass calculation: first calculate `f(x)`, then filter columns by the result, and then calculate `g(y)` only for smaller, filtered chunks of data.
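The difference between the two evaluation strategies can be sketched in Python. Both functions below are illustrative assumptions, not ClickHouse code; columns are plain lists and `f` and `g` are ordinary Python callables applied per value.

```python
def and_single_pass(xs, ys, f, g):
    """Evaluate both conditions for every row, then combine the masks."""
    mask_f = [bool(f(x)) for x in xs]
    mask_g = [bool(g(y)) for y in ys]  # g runs on all rows, even filtered ones
    return [a and b for a, b in zip(mask_f, mask_g)]


def and_multi_pass(xs, ys, f, g):
    """Evaluate f first, filter the y column by its result, then evaluate g."""
    mask = [bool(f(x)) for x in xs]
    # Positions of rows that passed the first (cheap) condition.
    selected = [i for i, keep in enumerate(mask) if keep]
    # Filter the second column so g sees only the surviving rows.
    filtered_ys = [ys[i] for i in selected]
    g_results = [bool(g(y)) for y in filtered_ys]
    # Scatter the results back to the original row positions.
    for i, passed in zip(selected, g_results):
        mask[i] = passed
    return mask
```

Both functions return the same mask; the multi-pass version calls `g` only for rows that passed `f`, which pays off when `g` is expensive and `f` removes most rows.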
## Aggregate Functions
Aggregate functions is a stateful functions. They accumulate passed values into some state, and finally allow to get result from that state. They are managed with `IAggregateFunction` interface. States could be rather simple (for example, state for `AggregateFunctionCount` is just single `UInt64` value) or quite complex (example: state of `AggregateFunctionUniqCombined` is a combination of linear array, hash table and `HyperLogLog` probabilistic data structure).
Aggregate functions are stateful functions. They accumulate passed values into some state, and allow you to get results from that state. They are managed with the `IAggregateFunction` interface. States can be rather simple (the state for `AggregateFunctionCount` is just a single `UInt64` value) or quite complex (the state of `AggregateFunctionUniqCombined` is a combination of a linear array, a hash table and a `HyperLogLog` probabilistic data structure).
To deal with many states while executing high-cardinality `GROUP BY` query, states are allocated in `Arena` - a memory pool, or they could be allocated in any suitable piece of memory. States could have non-trivial constructor and destructor: for example, complex aggregation states could allocate additional memory by themself. It requires some attention to create and destroy states and to proper pass their ownership: to mind, who and when will destroy states.
To deal with multiple states while executing a high-cardinality `GROUP BY` query, states are allocated in `Arena` (a memory pool), or they could be allocated in any suitable piece of memory. States can have a non-trivial constructor and destructor: for example, complex aggregation states can allocate additional memory themselves. This requires some attention to creating and destroying states and properly passing their ownership, to keep track of who will destroy states and when.
Aggregation states could be serialized and deserialized to pass it over network during distributed query execution or to write them on disk where there is not enough RAM. Or even, they could be stored in a table, with `DataTypeAggregateFunction`, to allow incremental aggregation of data.
Aggregation states can be serialized and deserialized to pass them over the network during distributed query execution or to write them to disk when there is not enough RAM. They can even be stored in a table with the `DataTypeAggregateFunction` to allow incremental aggregation of data.
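A minimal sketch of a stateful aggregate function with mergeable, serializable state is shown below. The class `AvgState`, its method names, and the 16-byte binary layout are all assumptions invented for this example; they do not describe the real `IAggregateFunction` interface or ClickHouse's serialization format.

```python
import struct


class AvgState:
    """Hypothetical aggregation state for an average: a running sum and count."""

    _FORMAT = "<dQ"  # assumed layout: little-endian float64 sum, uint64 count

    def __init__(self, total=0.0, count=0):
        self.total = total
        self.count = count

    def add(self, value):
        """Accumulate one value into the state."""
        self.total += value
        self.count += 1

    def merge(self, other):
        """Combine another state (for example, from a remote server) into this one."""
        self.total += other.total
        self.count += other.count

    def serialize(self):
        """Encode the state as bytes so it can be sent or stored."""
        return struct.pack(self._FORMAT, self.total, self.count)

    @classmethod
    def deserialize(cls, payload):
        total, count = struct.unpack(cls._FORMAT, payload)
        return cls(total, count)

    def result(self):
        """Produce the final value from the accumulated state."""
        return self.total / self.count if self.count else float("nan")
```

The point of the design is that `merge` works on states rather than on final results: two averages cannot be combined correctly, but two (sum, count) pairs can.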
> Serialized data format for aggregate functions states is not versioned right now. It is Ok if aggregate states are only stored temporarily. But we have `AggregatingMergeTree` table engine for incremental aggregation, and people already using it in production. That's why we should add support for backward compatibility at next change of serialized format for any aggregate function.
> The serialized data format for aggregate function states is not versioned right now. This is OK if aggregate states are only stored temporarily. But we have the `AggregatingMergeTree` table engine for incremental aggregation, and people are already using it in production. This is why we should add support for backward compatibility when changing the serialized format for any aggregate function in the future.
## Server
Server implements few different interfaces:
- HTTP interface for any foreign clients;
- TCP interface for native clickhouse-client and for cross-server communication during distributed query execution;
- interface for transferring data for replication.
The server implements several different interfaces:
- An HTTP interface for any foreign clients.
- A TCP interface for the native ClickHouse client and for cross-server communication during distributed query execution.
- An interface for transferring data for replication.
Internally it is just simple multithreaded server. No coroutines, fibers, etc. Because server is not intended to process high rate of simple queries, but to process relatively low rate of hard queries, each of them could process vast amount of data for analytics.
Internally, it is just a basic multithreaded server without coroutines, fibers, etc. This is because the server is not designed to process a high rate of simple queries, but rather a relatively low rate of complex queries, each of which can process a vast amount of data for analytics.
Server initializes `Context` class with necessary environment for query execution: list of available databases, users and access rights, settings, clusters, process list, query log and so on. That environment is used by interpreters.
The server initializes the `Context` class with the necessary environment for query execution: the list of available databases, users and access rights, settings, clusters, the process list, the query log, and so on. This environment is used by interpreters.
We maintain full backward and forward compatibility for server TCP protocol: old clients could talk to new servers and new clients could talk to old servers. But we don't want to maintain it eternally and we are removing support for old versions after about one year.
We maintain full backward and forward compatibility for the server TCP protocol: old clients can talk to new servers and new clients can talk to old servers. But we don't want to maintain it eternally, and we are removing support for old versions after about one year.
> For all external applications, it is recommended to use HTTP interface, because it is simple and easy to use. TCP protocol is more tight to internal data structures: it uses internal format for passing blocks of data and it uses custom framing for compressed data. We don't release C library for that protocol, because it requires linking most of ClickHouse codebase, which is not practical.
> For all external applications, we recommend using the HTTP interface because it is simple and easy to use. The TCP protocol is more tightly linked to internal data structures: it uses an internal format for passing blocks of data and it uses custom framing for compressed data. We haven't released a C library for that protocol because it requires linking most of the ClickHouse codebase, which is not practical.
## Distributed query execution
Servers in a cluster setup are mostly independent. You could create a `Distributed` table on one or all servers in a cluster. `Distributed` table does not store data itself, it only provides a "view" to all local tables on multiple nodes of a cluster. When you SELECT from a `Distributed` table, it will rewrite that query, choose remote nodes according to load balancing settings and send query to them. `Distributed` table requests for remote servers to process query not up to final result, but to a stage where intermediate results from different servers could be merged. Then it receives intermediate results and merge them. Distributed table is trying to distribute as much work as possible to remote servers, and do not send much intermediate data over network.
Servers in a cluster setup are mostly independent. You can create a `Distributed` table on one or all servers in a cluster. The `Distributed` table does not store data itself; it only provides a "view" to all local tables on multiple nodes of a cluster. When you SELECT from a `Distributed` table, it rewrites that query, chooses remote nodes according to load balancing settings, and sends the query to them. The `Distributed` table requests remote servers to process a query just up to a stage where intermediate results from different servers can be merged. Then it receives the intermediate results and merges them. The `Distributed` table tries to distribute as much work as possible to remote servers, and does not send much intermediate data over the network.
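The "process up to a mergeable stage, then merge" idea can be sketched for a query of the form `SELECT key, count() ... GROUP BY key`. Everything below is a simplified assumption: shards are plain Python lists, `process_on_shard` and `distributed_select` are hypothetical names, and no networking or load balancing is modeled.

```python
from collections import Counter


def process_on_shard(rows):
    """Run the local part of the GROUP BY on one shard.

    Returns an intermediate result (per-key partial counts), not raw rows,
    so only a small amount of data has to travel back to the initiator.
    """
    return Counter(key for key, _value in rows)


def distributed_select(shards):
    """Ask every shard for its intermediate result and merge them."""
    merged = Counter()
    for rows in shards:  # in a real cluster these would be remote calls
        merged.update(process_on_shard(rows))
    return dict(merged)
```

For example, `distributed_select([[("a", 1), ("b", 2)], [("a", 3)]])` merges the partial counts from two shards into one result per key.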
> Things become more complicated when you have subqueries in IN or JOIN clause, each of them uses `Distributed` table. We have different strategies for execution of that queries.
> Things become more complicated when you have subqueries in IN or JOIN clauses and each of them uses a `Distributed` table. We have different strategies for execution of these queries.
> There is no global query plan for distributed query execution. Each node has its own local query plan for its part of job. We have only simple one-pass distributed query execution: we send queries for remote nodes and then merge its results. But it is not feasible for difficult queries with high cardinality GROUP BYs or with large amount of temporary data for JOIN: in that cases we need to do "reshuffle" data between servers, that requires additional coordination. ClickHouse does not support that kind of query execution and we need to work on it.
> There is no global query plan for distributed query execution. Each node has its own local query plan for its part of the job. We only have simple one-pass distributed query execution: we send queries to remote nodes and then merge the results. But this is not feasible for difficult queries with high cardinality GROUP BYs or with a large amount of temporary data for JOIN: in such cases, we need to "reshuffle" data between servers, which requires additional coordination. ClickHouse does not support that kind of query execution, and we need to work on it.
## Merge Tree
`MergeTree` is a family of storage engines, that support index by primary key. Primary key could be arbitary tuple of columns or expressions. Data in `MergeTree` table is stored in "parts". Each part stores data in primary key order (data is ordered lexicographically by primary key tuple). All columns of table are stored in separate `column.bin` files in that parts. That files consists of compressed blocks. Each block is usually from 64 KB to 1 MB of uncompressed data, depending of average size of value. That blocks consists of column values, placed contiguously one after the other. Column values are in same order (defined by primary key) for each column - so, when you iterate by many columns, you get values for corresponding rows.
`MergeTree` is a family of storage engines that support indexing by primary key. The primary key can be an arbitrary tuple of columns or expressions. Data in a `MergeTree` table is stored in "parts". Each part stores data in the primary key order (data is ordered lexicographically by the primary key tuple). All the table columns are stored in separate `column.bin` files in these parts. The files consist of compressed blocks. Each block is usually from 64 KB to 1 MB of uncompressed data, depending on the average value size. The blocks consist of column values placed contiguously one after the other. Column values are in the same order for each column (the order is defined by the primary key), so when you iterate by many columns, you get values for the corresponding rows.
Primary key itself is "sparse". It could address not each single row, but only some ranges of data. In separate `primary.idx` file we have value of primary key for each N-th row, where N is called `index_granularity`. Usually N = 8192. Also, for each column, we have `column.mrk` files with "marks": offsets to each N-th row in data file. Each mark is a pair: offset in file to begin of compressed block and offset in decompressed block to begin of data. Usually compressed blocks are aligned by marks, and offset in decompressed block is zero. Data for `primary.idx` is always reside in memory and data for `column.mrk` files are cached.
The primary key itself is "sparse". It doesn't address each single row, but only some ranges of data. A separate `primary.idx` file has the value of the primary key for each N-th row, where N is called `index_granularity` (usually, N = 8192). Also, for each column, we have `column.mrk` files with "marks," which are offsets to each N-th row in the data file. Each mark is a pair: the offset in the file to the beginning of the compressed block, and the offset in the decompressed block to the beginning of data. Usually compressed blocks are aligned by marks, and the offset in the decompressed block is zero. Data for `primary.idx` always resides in memory and data for `column.mrk` files is cached.
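The structure of marks can be sketched as follows. This is a toy model under several assumptions: values are unsigned 64-bit integers, `zlib` stands in for the real compression, each compressed block is prefixed by a 4-byte length (an invented framing), and `rows_per_block` is a multiple of `index_granularity` so that no granule spans two blocks. The function names are hypothetical.

```python
import struct
import zlib


def write_column(values, index_granularity, rows_per_block):
    """Return (file_bytes, marks) for a column of unsigned 64-bit values.

    Each mark is a pair: (offset of a compressed block in the file,
                          offset of the granule inside the decompressed block).
    """
    data = bytearray()
    marks = []
    for block_start in range(0, len(values), rows_per_block):
        block_values = values[block_start:block_start + rows_per_block]
        raw = struct.pack(f"<{len(block_values)}Q", *block_values)
        block_offset = len(data)
        # Record a mark for every N-th row that falls into this block.
        for row in range(block_start, block_start + len(block_values)):
            if row % index_granularity == 0:
                marks.append((block_offset, (row - block_start) * 8))
        compressed = zlib.compress(raw)
        # Toy framing: 4-byte compressed length, then the compressed bytes.
        data += struct.pack("<I", len(compressed)) + compressed
    return bytes(data), marks


def read_from_mark(file_bytes, mark, row_count):
    """Decompress the block a mark points to and read up to row_count values."""
    block_offset, offset_in_block = mark
    (size,) = struct.unpack_from("<I", file_bytes, block_offset)
    start = block_offset + 4
    raw = zlib.decompress(file_bytes[start:start + size])
    available = (len(raw) - offset_in_block) // 8
    count = min(row_count, available)
    return list(struct.unpack_from(f"<{count}Q", raw, offset_in_block))
```

Note that reading a single granule still requires decompressing the whole block that contains it, which is one reason point lookups are comparatively expensive.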
When we are going to read something from part in `MergeTree`, we look at `primary.idx` data, locate ranges, that could possibly contain requested data, then look at `column.mrk` data and calculate offsets, from where to start read that ranges. Because of sparseness, excess data could be read. ClickHouse is not suitable for high load of simple point queries, because for each key, whole range with `index_granularity` rows must be read, and whole compressed block must be decompressed, for each column. We made index sparse, because we must be able to maintain trillions of rows per single server without noticeably memory consumption for index. Also, because primary key is sparse, it is not unique: it cannot check existence of key in table at INSERT time. You could have many rows with same key in a table.
When we are going to read something from a part in `MergeTree`, we look at `primary.idx` data and locate ranges that could possibly contain requested data, then look at `column.mrk` data and calculate offsets for where to start reading those ranges. Because of sparseness, excess data may be read. ClickHouse is not suitable for a high load of simple point queries, because the entire range with `index_granularity` rows must be read for each key, and the entire compressed block must be decompressed for each column. We made the index sparse because we must be able to maintain trillions of rows per single server without noticeable memory consumption for the index. Also, because the primary key is sparse, it is not unique: it cannot check the existence of the key in the table at INSERT time. You could have many rows with the same key in a table.
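How a sparse index narrows a point lookup to a row range can be sketched with binary search. The functions `build_sparse_index` and `rows_to_read` are hypothetical names for this example; keys are single integers rather than tuples, and the real range selection logic is more general.

```python
from bisect import bisect_left, bisect_right


def build_sparse_index(sorted_keys, index_granularity):
    """Keep the key of every N-th row, like the sparse primary index described above."""
    return sorted_keys[::index_granularity]


def rows_to_read(index, index_granularity, total_rows, key):
    """Return the half-open row range [begin, end) that may contain `key`.

    Granule i covers rows [i * N, (i + 1) * N) and starts with index[i].
    It can contain `key` only if index[i] <= key and, when a next granule
    exists, key <= index[i + 1].
    """
    first_granule = max(bisect_left(index, key) - 1, 0)
    end_granule = bisect_right(index, key)  # exclusive
    if end_granule <= first_granule:
        return (0, 0)  # key is smaller than every indexed key
    begin = first_granule * index_granularity
    end = min(end_granule * index_granularity, total_rows)
    return (begin, end)
```

Because only every N-th key is known, the returned range is a superset of the matching rows: whole granules are read even when a single row is requested, and a key equal to a granule's first key forces the previous granule to be read as well.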
When you `INSERT` bunch of data into `MergeTree`, that bunch is sorted by primary key order and forms a new part. To keep number of parts relatively low, there is background threads, that periodically select some parts and merge it to single sorted part. That's why it is called `MergeTree`. Of course, merging leads to "write amplification". All parts are immutable: they are only created and deleted, but not modified. When SELECT is run, it holds a snapshot (set of parts) of table. Also, after merging, we keep old parts for some time to make recovery after failure easier: if we see that some merged part is probably broken, we could replace it with its source parts.
When you `INSERT` a bunch of data into `MergeTree`, that bunch is sorted by primary key order and forms a new part. To keep the number of parts relatively low, there are background threads that periodically select some parts and merge them to a single sorted part. That's why it is called `MergeTree`. Of course, merging leads to "write amplification". All parts are immutable: they are only created and deleted, but not modified. When SELECT is run, it holds a snapshot of the table (a set of parts). After merging, we also keep old parts for some time to make recovery after failure easier, so if we see that some merged part is probably broken, we can replace it with its source parts.
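The insert-then-merge cycle can be sketched with immutable tuples standing in for parts. The helpers `insert_part` and `merge_parts` are hypothetical, rows are `(key, value)` tuples sorted by key, and the choice of which parts to merge is left to the caller.

```python
import heapq


def insert_part(parts, rows):
    """An INSERT sorts its batch by primary key and adds it as a new immutable part."""
    return parts + [tuple(sorted(rows))]


def merge_parts(parts, selected):
    """Merge the parts at the `selected` positions into a single sorted part.

    The source parts are never modified; a new list of parts is returned,
    so a reader holding the old list keeps a consistent snapshot.
    """
    merged = tuple(heapq.merge(*(parts[i] for i in selected)))
    remaining = [part for i, part in enumerate(parts) if i not in selected]
    return remaining + [merged]
```

`heapq.merge` performs a streaming k-way merge of already sorted inputs, which mirrors why merging sorted parts is cheap compared with re-sorting all the data.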
`MergeTree` is not a LSM-Tree because it doesn't contain "memtable" and "log": inserted data are written directly to filesystem. That makes it suitable only to INSERT data in batches, not by one row and not very frequently: about each second is Ok, but thousand times a second is not. We done it that way for simplicity reasons and because we are already inserting data in batches in our applications.
`MergeTree` is not an LSM tree because it doesn't contain "memtable" and "log": inserted data is written directly to the filesystem. This makes it suitable only to INSERT data in batches, not by individual row and not very frequently: about once per second is OK, but a thousand times a second is not. We did it this way for simplicity's sake, and because we are already inserting data in batches in our applications.
> MergeTree tables could only have one (primary) index: there is no secondary indices. It would be nice to allow multiple of physical representations under one logical table: for example, to store data in more than one physical order or even to allow representations with pre-aggregated data along with original data.
> MergeTree tables can only have one (primary) index: there aren't any secondary indices. It would be nice to allow multiple physical representations under one logical table, for example, to store data in more than one physical order or even to allow representations with pre-aggregated data along with original data.
> There is MergeTree engines, that doing additional work while background merges. Examples are `CollapsingMergeTree`, `AggregatingMergeTree`. This could be treat as special support for updates. Remind that it is not real updates because user usually have no control over moment of time when background merges will be executed, and data in `MergeTree` table is almost always stored in more than one part - not in completely merged form.
> There are MergeTree engines that are doing additional work during background merges. Examples are `CollapsingMergeTree` and `AggregatingMergeTree`. This could be treated as special support for updates. Keep in mind that these are not real updates because users usually have no control over the time when background merges will be executed, and data in a `MergeTree` table is almost always stored in more than one part, not in completely merged form.
## Replication
Replication in ClickHouse is implemented on per-table basis. You could have some replicated and some non-replicated tables on single server. And you could have tables replicated in different ways: for example, one table with two-factor replication and another with three-factor.
Replication in ClickHouse is implemented on a per-table basis. You could have some replicated and some non-replicated tables on the same server. You could also have tables replicated in different ways, such as one table with two-factor replication and another with three-factor.
Replication is implemented in `ReplicatedMergeTree` storage engine. As a parameter for storage engine, path in `ZooKeeper` is specified. All tables with same path in `ZooKeeper` becomes replicas of each other: they will synchronise its data and maintain consistency. Replicas could be added and removed dynamically: simply by creation or dropping a table.
Replication is implemented in the `ReplicatedMergeTree` storage engine. The path in `ZooKeeper` is specified as a parameter for the storage engine. All tables with the same path in `ZooKeeper` become replicas of each other: they synchronise their data and maintain consistency. Replicas can be added and removed dynamically, simply by creating or dropping a table.
Replication works in asynchronous multi-master scheme. You could insert data into any replica which have a session with `ZooKeeper`, and data will be replicated to all other replicas asynchronously. Because ClickHouse doesn't support UPDATEs, replication is conflict-free. As there is no quorum acknowledgment of inserts, just-inserted data could be lost in case of one node failure.
Replication uses an asynchronous multi-master scheme. You can insert data into any replica that has a session with `ZooKeeper`, and data is replicated to all other replicas asynchronously. Because ClickHouse doesn't support UPDATEs, replication is conflict-free. As there is no quorum acknowledgment of inserts, just-inserted data might be lost if one node fails.
Metadata for replication is stored in ZooKeeper. There is replication log, that consists of what actions to do. Actions are: get part; merge parts; drop partition, etc. All replicas copy replication log to its queue and then execute actions from queue. For example, on insertion, "get part" action is created in log, and every replica will download that part. Merges are coordinated between replicas to get byte-identical result. All parts are merged in same way on all replicas. To achieve that, one replica is elected as leader, and that replica will initiate merges and write "merge parts" actions to log.
Metadata for replication is stored in ZooKeeper. There is a replication log that lists what actions to do. Actions are: get part; merge parts; drop partition, etc. Each replica copies the replication log to its queue and then executes the actions from the queue. For example, on insertion, the "get part" action is created in the log, and every replica downloads that part. Merges are coordinated between replicas to get byte-identical results. All parts are merged in the same way on all replicas. To achieve this, one replica is elected as the leader, and that replica initiates merges and writes "merge parts" actions to the log.
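The log-and-queue mechanism can be sketched as below. This is a deliberately simplified model: the shared log is a Python list instead of ZooKeeper nodes, parts are just names, and the `Replica` class with its `pull_log` and `execute_queue` methods is a hypothetical construction for this example.

```python
from collections import deque


class Replica:
    """Hypothetical replica that copies a shared log into its own queue."""

    def __init__(self, name):
        self.name = name
        self.log_pointer = 0  # index of the next shared log entry to copy
        self.queue = deque()
        self.parts = set()

    def pull_log(self, shared_log):
        """Copy new entries from the shared replication log to the local queue."""
        self.queue.extend(shared_log[self.log_pointer:])
        self.log_pointer = len(shared_log)

    def execute_queue(self):
        """Execute queued actions in order."""
        while self.queue:
            action, payload = self.queue.popleft()
            if action == "get_part":
                self.parts.add(payload)  # stands in for downloading the part
            elif action == "merge_parts":
                sources, result = payload
                # Every replica performs the same merge locally.
                self.parts -= set(sources)
                self.parts.add(result)
            else:
                raise ValueError(f"unknown action: {action}")
```

Because all replicas replay the same sequence of actions, they converge to the same set of parts even if they pull the log at different times.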
Replication is physical: only compressed parts are transferred between nodes, not queries. To lower network cost (to avoid network amplification), merges are processed on each replica independently in most cases. Large merged parts are sent over network only in case of significant replication lag.
Replication is physical: only compressed parts are transferred between nodes, not queries. To lower the network cost (to avoid network amplification), merges are processed on each replica independently in most cases. Large merged parts are sent over the network only in cases of significant replication lag.
Also each replica keeps their state in ZooKeeper: set of parts and its checksums. When state on local filesystem is diverged from reference state in ZooKeeper, replica will restore its consistency by downloading missing and broken parts from other replicas. When there is some unexpected or broken data in local filesystem, ClickHouse will not remove it, but move to separate directory and forget.
In addition, each replica stores its state in ZooKeeper as the set of parts and their checksums. When the state on the local filesystem diverges from the reference state in ZooKeeper, the replica restores its consistency by downloading missing and broken parts from other replicas. When there is some unexpected or broken data in the local filesystem, ClickHouse does not remove it, but moves it to a separate directory and forgets it.
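The consistency check can be sketched as a comparison between a reference mapping and the local parts. The use of SHA-256, the function names, and the in-memory dictionaries are assumptions for this example; the source does not say which checksum ClickHouse uses or how the comparison is implemented.

```python
import hashlib


def checksum(data):
    # SHA-256 is an arbitrary choice for this sketch.
    return hashlib.sha256(data).hexdigest()


def plan_recovery(reference, local_parts):
    """Compare local parts with the reference state and decide what to do.

    reference:   dict of part name -> expected checksum (the shared state)
    local_parts: dict of part name -> bytes found on the local filesystem
    Returns (to_download, to_detach) as sorted lists of part names.
    """
    to_download = []
    to_detach = []
    for name, expected in reference.items():
        if name not in local_parts:
            to_download.append(name)  # missing part
        elif checksum(local_parts[name]) != expected:
            to_download.append(name)  # broken part: fetch a good copy
            to_detach.append(name)    # and move the bad copy aside
    for name in local_parts:
        if name not in reference:
            to_detach.append(name)    # unexpected part: set aside, not deleted
    return sorted(to_download), sorted(to_detach)
```

Parts listed in `to_detach` are moved aside rather than deleted, matching the behavior described above for unexpected or broken data.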
> ClickHouse cluster consists of independent shards and each shard consists of replicas. Cluster is not elastic: after adding new shard, data is not rebalanced between shards automatically. Instead, cluster load will be uneven. This implementation gives you more control and it is fine for relatively small clusters: tens of nodes. But for hundreds of nodes clusters, that we are using in production, this approach becomes significant drawback. We should implement table engine that will span its data across cluster with dynamically replicated regions, that could be splitted and balanced between cluster automatically.
> A ClickHouse cluster consists of independent shards, and each shard consists of replicas. The cluster is not elastic, so after adding a new shard, data is not rebalanced between shards automatically. Instead, the cluster load will be uneven. This implementation gives you more control, and it is fine for relatively small clusters (tens of nodes). But for clusters with hundreds of nodes that we are using in production, this approach becomes a significant drawback. We should implement a table engine that will span its data across the cluster with dynamically replicated regions that could be split and balanced across the cluster automatically.