In other words, all the values related to a row are stored next to each other. Examples of a row-oriented DBMS are MySQL, Postgres, MS SQL Server, and others.
The data access scenario refers to what queries are made, how often, and in what proportion; how much data is read for each type of query - rows, columns, and bytes; the relationship between reading and updating data; the working size of the data and how locally it is used; whether transactions are used, and how isolated they are; requirements for data replication and logical integrity; requirements for latency and throughput for each type of query, and so on.
The higher the load on the system, the more important it is to customize the system to the scenario, and the more specific this customization becomes. There is no system that is equally well-suited to significantly different scenarios. If a system is adaptable to a wide set of scenarios, under a high load, the system will handle all the scenarios equally poorly, or will work well for just one of the scenarios.
We'll say that the following is true for the OLAP (online analytical processing) scenario:
It is easy to see that the OLAP scenario is very different from other popular scenarios (such as OLTP or Key-Value access). So it doesn't make sense to try to use OLTP or a Key-Value DB for processing analytical queries if you want to get decent performance. For example, if you try to use MongoDB or Elliptics for analytics, you will get very poor performance compared to OLAP databases.
Columnar-oriented databases are better suited to OLAP scenarios (at least 100 times better in processing speed for most queries), for the following reasons:
1.1. For an analytical query, only a small number of table columns need to be read. In a column-oriented database, you can read just the data you need. For example, if you need 5 columns out of 100, you can expect a 20-fold reduction in I/O.
1.2. Since data is read in packets, it is easier to compress. Data in columns is also easier to compress. This further reduces the I/O volume.
1.3. Due to the reduced I/O, more data fits in the system cache.
For example, the query "count the number of records for each advertising platform" requires reading one "advertising platform ID" column, which takes up 1 byte uncompressed. If most of the traffic was not from advertising platforms, you can expect at least 10-fold compression of this column. When using a quick compression algorithm, data decompression is possible at a speed of at least several gigabytes of uncompressed data per second. In other words, this query can be processed at a speed of approximately several billion rows per second on a single server. This speed is actually achieved in practice.
Since executing a query requires processing a large number of rows, it helps to dispatch all operations for entire vectors instead of for separate rows, or to implement the query engine so that there is almost no dispatching cost. If you don't do this, with any half-decent disk subsystem, the query interpreter inevitably stalls the CPU.
It makes sense to both store data in columns and process it, when possible, by columns.
1. A vector engine. All operations are written for vectors, instead of for separate values. This means you don't need to call operations very often, and dispatching costs are negligible. Operation code contains an optimized internal cycle.
2. Code generation. The code generated for the query has all the indirect calls in it.
This is not done in "normal" databases, because it doesn't make sense when running simple queries. However, there are exceptions. For example, MemSQL uses code generation to reduce latency when processing SQL queries. (For comparison, analytical DBMSs require optimization of throughput, not latency.)
Note that for CPU efficiency, the query language must be declarative (SQL or MDX), or at least a vector (J, K). The query should only contain implicit loops, allowing for optimization.
In a true column-oriented DBMS, there isn't any "garbage" stored with the values. For example, constant-length values must be supported, to avoid storing their length "number" next to the values. As an example, a billion UInt8-type values should actually consume around 1 GB uncompressed, or this will strongly affect the CPU use. It is very important to store data compactly (without any "garbage") even when uncompressed, since the speed of decompression (CPU usage) depends mainly on the volume of uncompressed data.
This is worth noting because there are systems that can store values of separate columns separately, but that can't effectively process analytical queries due to their optimization for other scenarios. Example are HBase, BigTable, Cassandra, and HyperTable. In these systems, you will get throughput around a hundred thousand rows per second, but not hundreds of millions of rows per second.
Also note that ClickHouse is a DBMS, not a single database. ClickHouse allows creating tables and databases in runtime, loading data, and running queries without reconfiguring and restarting the server.
Many column-oriented DBMSs (SAP HANA, and Google PowerDrill) can only work in RAM. But even on thousands of servers, the RAM is too small for storing all the pageviews and sessions in Yandex.Metrica.
Almost none of the columnar DBMSs listed above have support for distributed processing.
In ClickHouse, data can reside on different shards. Each shard can be a group of replicas that are used for fault tolerance. The query is processed on all the shards in parallel. This is transparent for the user.
NULLs are not supported. All the functions have different names. However, this is a declarative query language based on SQL that can't be differentiated from SQL in many instances.
ClickHouse supports primary key tables. In order to quickly perform queries on the range of the primary key, the data is sorted incrementally using the merge tree. Due to this, data can continually be added to the table. There is no locking when adding data.
Having a primary key allows, for example, extracting data for specific clients (Metrica counters) for a specific time range, with low latency less than several dozen milliseconds.
This lets us use the system as the back-end for a web interface. Low latency means queries can be processed without delay, while the Yandex.Metrica interface page is loading (in online mode).
1. The system contains aggregate functions for approximated calculation of the number of various values, medians, and quantiles.
2. Supports running a query based on a part (sample) of data and getting an approximated result. In this case, proportionally less data is retrieved from the disk.
3. Supports running an aggregation for a limited number of random keys, instead of for all keys. Under certain conditions for key distribution in the data, this provides a reasonably accurate result while using fewer resources.
Uses asynchronous multimaster replication. After being written to any available replica, data is distributed to all the remaining replicas. The system maintains identical data on different replicas. Data is restored automatically after a failure, or using a "button" for complex cases.
For more information, see the section "Data replication".
We need to get custom reports based on hits and sessions, with custom segments set by the user. Data for the reports is updated in real-time. Queries must be run immediately (in online mode). We must be able to build reports for any time period. Complex aggregates must be calculated, such as the number of unique visitors.
At this time (April 2014), Yandex.Metrica receives approximately 12 billion events (pageviews and mouse clicks) daily. All these events must be stored in order to build custom reports. A single query may require scanning hundreds of millions of rows over a few seconds, or millions of rows in no more than a few hundred milliseconds.
- You must have a pre-defined list of reports the user will need. The user can't make custom reports.
- When aggregating a large quantity of keys, the volume of data is not reduced, and aggregation is useless.
- For a large number of reports, there are too many aggregation variations (combinatorial explosion).
- When aggregating keys with high cardinality (such as URLs), the volume of data is not reduced by much (less than twofold). For this reason, the volume of data with aggregation might grow instead of shrink.
- Users will not view all the reports we calculate for them. A large portion of calculations are useless.
However, with aggregation, a significant part of the work is taken offline and completed relatively calmly. In contrast, online calculations require calculating as fast as possible, since the user is waiting for the result.
Yandex.Metrica has a specialized system for aggregating data called Metrage, which is used for the majority of reports. Starting in 2009, Yandex.Metrica also used a specialized OLAP database for non-aggregated data called OLAPServer, which was previously used for the report builder. OLAPServer worked well for non-aggregated data, but it had many restrictions that did not allow it to be used for all reports as desired. These included the lack of support for data types (only numbers), and the inability to incrementally update data in real-time (it could only be done by rewriting data daily). OLAPServer is not a DBMS, but a specialized DB.
ClickHouse is used for multiple purposes in Yandex.Metrica. Its main task is to build reports in online mode using non-aggregated data. It uses a cluster of 374 servers, which store over 8 trillion rows (more than a quadrillion values) in the database. The volume of compressed data, without counting duplication and replication, is about 800 TB. The volume of uncompressed data (in TSV format) would be approximately 7 PB.
ClickHouse has at least a dozen installations in other Yandex services: in search verticals, Market, Direct, business analytics, mobile development, AdFox, personal services, and others.
At this time (May 2016), there aren't any available open-source and free systems that have all the features listed above. However, these features are absolutely necessary for Yandex.Metrica.
Regarding this aspect, map-reduce is similar to other systems like YAMR, <ahref="http://hadoop.apache.org/">Hadoop</a>, <ahref="https://yandexdataschool.ru/about/conference/program/babenko">YT</a>.
Distributed sorting is not optimal solution for reduce operations, if the result of the operation and all intermediate results, shall they exist, fit in operational memory of a single server, as usually happens in case of online analytical queries.
In this case the optimal way to perform reduce operations is by using a hash-table. A common optimization method for map-reduce tasks is combine operation (partial reduce) which uses hash-tables in memory. This optimization is done by the user manually.
Systems similar to map-reduce enable running any code on the cluster. But for OLAP use-cases declarative query languages are better suited as they allow to carry out investigations faster. For example, for Hadoop there are <ahref="https://hive.apache.org/">Hive</a> and <ahref="https://pig.apache.org/">Pig</a>. There are others: <ahref="http://impala.io/">Cloudera Impala</a>, <ahref="http://shark.cs.berkeley.edu/">Shark (deprecated)</a> and <ahref="http://spark.apache.org/sql/">Spark SQL</a> for <ahref="http://spark.apache.org/">Spark</a>, <ahref="https://prestodb.io/">Presto</a>, <ahref="https://drill.apache.org/">Apache Drill</a>.
However, performance of such tasks is highly sub-optimal compared to the performance of specialized systems and relatively high latency does not allow the use of these systems as a backend for the web interface.
YT allows you to store separate groups of columns. But YT is not a truly columnar storage system, as the system has no fixed length data types (so you can efficiently store a number without "garbage"), and there is no vector engine. Tasks in YT are performed by arbitrary code in streaming mode, so can not be sufficiently optimized (up to hundreds of millions of lines per second per server). In 2014-2016 YT is to develop "dynamic table sorting" functionality using Merge Tree, strongly typed values and SQL-like language support. Dynamically sorted tables are not suited for OLAP tasks, since the data is stored in rows. Query language development in YT is still in incubating phase, which does not allow it to focus on this functionality. YT developers are considering dynamically sorted tables for use in OLTP and Key-Value scenarios.
According to internal testing results, ClickHouse shows the best performance for comparable operating scenarios among systems of its class that were available for testing. This includes the highest throughput for long queries, and the lowest latency on short queries. Testing results are shown <ahref="benchmark.html">on this page</a>.
Throughput can be measured in rows per second or in megabytes per second. If the data is placed in the page cache, a query that is not too complex is processed on modern hardware at a speed of approximately 2-10 GB/s of uncompressed data on a single server (for the simplest cases, the speed may reach 30 GB/s). If data is not placed in the page cache, the speed depends on the disk subsystem and the data compression rate. For example, if the disk subsystem allows reading data at 400 MB/s, and the data compression rate is 3, the speed will be around 1.2 GB/s. To get the speed in rows per second, divide the speed in bytes per second by the total size of the columns used in the query. For example, if 10 bytes of columns are extracted, the speed will be around 100-200 million rows per second.
The processing speed increases almost linearly for distributed processing, but only if the number of rows resulting from aggregation or sorting is not too large.
If a query uses a primary key and does not select too many rows to process (hundreds of thousands), and does not use too many columns, we can expect less than 50 milliseconds of latency (single digits of milliseconds in the best case) if data is placed in the page cache. Otherwise, latency is calculated from the number of seeks. If you use rotating drives, for a system that is not overloaded, the latency is calculated by this formula: seek time (10 ms) * number of columns queried * number of data parts.
Under the same conditions, ClickHouse can handle several hundred queries per second on a single server (up to several thousand in the best case). Since this scenario is not typical for analytical DBMSs, we recommend expecting a maximum of 100 queries per second.
===Performance on data insertion.===
We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second. When inserting to a MergeTree table from a tab-separated dump, the insertion speed will be from 50 to 200 MB/s. If the inserted rows are around 1 Kb in size, the speed will be from 50,000 to 200,000 rows per second. If the rows are small, the performance will be higher in rows per second (on Yandex Banner System data -> 500,000 rows per second, on Graphite data -> 1,000,000 rows per second). To improve performance, you can make multiple INSERT queries in parallel, and performance will increase linearly.
ClickHouse contains access restriction settings. They are located in the 'users.xml' file (next to 'config.xml').
By default, access is allowed from everywhere for the default user without a password. See 'user/default/networks'. For more information, see the section "Configuration files".
In this case, the log will be printed to the console, which is convenient during development. If the configuration file is in the current directory, you don't need to specify the '--config-file' parameter. By default, it uses './config.xml'.
If you are Yandex employee, you can use Yandex.Metrica test data to explore the system's capabilities. You can find instructions for using the test data <ahref="https://github.yandex-team.ru/Metrika/ClickHouse_private/tree/master/tests">here</a>.
Otherwise, you could use one of available public datasets, described <ahref="https://github.com/yandex/ClickHouse/tree/master/doc/example_datasets">here</a>.
Otherwise, you could ask questions on <ahref="https://stackoverflow.com/">Stackoverflow</a>; discuss in <ahref="https://groups.google.com/group/clickhouse">Google Groups</a>; or send private message to developers to address <ahref="mailto:clickhouse-feedback@yandex-team.com">clickhouse-feedback@yandex-team.com</a>.
The HTTP interface lets you use ClickHouse on any platform from any programming language. We use it for working from Java and Perl, as well as shell scripts. In other departments, the HTTP interface is used from Perl, Python, and Go. The HTTP interface is more limited than the native interface, but it has better compatibility.
By default, clickhouse-server listens for HTTP on port 8123 (this can be changed in the config).
If you make a GET / request without parameters, it returns the string "Ok" (with a line break at the end). You can use this in health-check scripts.
Send the request as a URL 'query' parameter, or as a POST. Or send the beginning of the request in the 'query' parameter, and the rest in the POST (we'll explain later why this is necessary).
When using the GET method, 'readonly' is set. In other words, for queries that modify data, you can only use the POST method. You can send the query itself either in the POST body, or in the URL parameter.
As you can see, curl is not very convenient because spaces have to be URL-escaped. Although wget escapes everything on its own, we don't recommend it because it doesn't work well over HTTP 1.1 when using keep-alive and Transfer-Encoding: chunked.
The POST method of transmitting data is necessary for INSERT queries. In this case, you can write the beginning of the query in the URL parameter, and use POST to pass the data to insert. The data to insert could be, for example, a tab-separated dump from MySQL. In this way, the INSERT query replaces LOAD DATA LOCAL INFILE from MySQL.
You can use compression when transmitting data. The compressed data has a non-standard format, and you will need to use a special compressor program to work with it (%%sudo apt-get install compressor-metrika-yandex%%).
By default, the database that is registered in the server settings is used as the default database. By default, this is the database called 'default'. Alternatively, you can always specify the database using a dot before the table name.
In contrast to the native interface, the HTTP interface does not support the concept of sessions or session settings, does not allow aborting a query (to be exact, it allows this in only a few cases), and does not show the progress of query processing. Parsing and data formatting are performed on the server side, and using the network might be ineffective.
The optional 'query_id' parameter can be passed as the query ID (any string). For more information, see the section "Settings, replace_running_query".
The HTTP interface allows passing external data (external temporary tables) for querying. For more information, see the section "External data for query processing".
The native interface is used in the "clickhouse-client" command-line client for interaction between servers with distributed query processing, and also in C++ programs. We will only cover the command-line client.
--vertical, -E - If specified, use the Vertical format by default to output the result. This is the same as '--format=Vertical'. In this format, each value is printed on a separate line, which is helpful when displaying wide tables.
You can also specify any settings that will be used for processing queries. For example, %%clickhouse-client --max_threads=1%%. For more information, see the section "Settings".
To use batch mode, specify the 'query' parameter, or send data to 'stdin' (it verifies that 'stdin' is not a terminal), or both.
Similar to the HTTP interface, when using the 'query' parameter and sending data to 'stdin', the request is a concatenation of the 'query' parameter, a line break, and the data in 'stdin'. This is convenient for large INSERT queries.
echo -ne "1, 'some text', '2016-08-14 00:00:00'\n2, 'some more text', '2016-08-14 00:00:01'" | clickhouse-client --database=test --query="INSERT INTO test FORMAT CSV";
cat <<_EOF | clickhouse-client --database=test --query="INSERT INTO test FORMAT CSV";
3, 'some text', '2016-08-14 00:00:00'
4, 'some more text', '2016-08-14 00:00:01'
_EOF
cat file.csv | clickhouse-client --database=test --query="INSERT INTO test FORMAT CSV";
By default, you can only process a single query in batch mode. To make multiple queries from a "script," use the 'multiquery' parameter. This works for all queries except INSERT. Query results are output consecutively without additional separators.
Similarly, to process a large number of queries, you can run 'clickhouse-client' for each query. Note that it may take tens of milliseconds to launch the 'clickhouse-client' program.
To run a query, press Enter. The semicolon is not necessary at the end of the query. To enter a multiline query, enter a backslash %%\%% before the line break - after you press Enter, you will be asked to enter the next line of the query.
To run a query, end it with a semicolon and press Enter. If the semicolon was omitted at the end of the entered line, you will be asked to enter the next line of the query.
You can specify %%\G%% instead of or after the semicolon. This indicates using Vertical format. In this format, each value is printed on a separate line, which is convenient for wide tables. This unusual feature was added for compatibility with the MySQL CLI.
The command line is based on 'readline' (and 'history') (or 'libedit', or even nothing, depending on build). In other words, it uses the familiar keyboard shortcuts and keeps a history. The history is written to /.clickhouse-client-history.
By default, the format used is PrettyCompact. You can change the format in the FORMAT clause of the query, or by specifying '\G' at the end of the query, using the '--format' or '--vertical' argument in the command line, or using the client configuration file.
To cancel a lengthy query, press Ctrl+C. However, you will still need to wait a little for the server to abort the request. It is not possible to cancel a query at certain stages. If you don't wait and press Ctrl+C a second time, the client will exit.
The command-line client allows passing external data (external temporary tables) for querying. For more information, see the section "External data for request processing".
There are two types of parsers in the system: a full SQL parser (a recursive descent parser), and a data format parser (a fast stream parser). In all cases except the INSERT query, only the full SQL parser is used.
The %%INSERT INTO t VALUES%% fragment is parsed by the full parser, and the data %%(1, 'Hello, world'), (2, 'abc'), (3, 'def')%% is parsed by the fast stream parser.
Data can have any format. When a query is received, the server calculates no more than 'max_query_size' bytes of the request in RAM (by default, 1 MB), and the rest is stream parsed. This means the system doesn't have problems with large INSERT queries, like MySQL does.
When using the Values format in an INSERT query, it may seem that data is parsed the same as expressions in a SELECT query, but this is not true. The Values format is much more limited.
There may be any number of space symbols between syntactical constructions (including the beginning and end of a query). Space symbols include the space, tab, line break, CR, and form feed.
Keywords (such as SELECT) are not case-sensitive. Everything else (column names, functions, and so on), in contrast to standard SQL, is case-sensitive. Keywords are not reserved (they are just parsed as keywords in the corresponding context).
Identifiers (column names, functions, and data types) can be quoted or non-quoted.
Non-quoted identifiers start with a Latin letter or underscore, and continue with a Latin letter, underscore, or number. In other words, they must match the regex %%^[a-zA-Z_][0-9a-zA-Z_]*$%%. Examples: %%x%%, %%_1%%, %%X_y__Z123_%%.
Quoted identifiers are placed in reversed quotation marks %%`id`%% (the same as in MySQL), and can indicate any set of bytes (non-empty). In addition, symbols (for example, the reverse quotation mark) inside this type of identifier can be backslash-escaped. Escaping rules are the same as for string literals (see below).
We recommend using identifiers that do not need to be quoted.
Only string literals in single quotes are supported. The enclosed characters can be backslash-escaped. The following escape sequences have special meanings: %%\b%%, %%\f%%, %%\r%%, %%\n%%, %%\t%%, %%\0%%, %%\a%%, %%\v%%, <spanclass="inline-example">\x<i>HH</i></span>. In all other cases, escape sequences like <spanclass="inline-example">\<i>c</i></span>, where <i>c</i> is any character, are transformed to <i>c</i>. This means that the sequences %%\'%% and %%\\%% can be used. The value will have the String type.
Minimum set of symbols that must be escaped in string literal is %%'%% and %%\%%.
Actually, these are not literals, but expressions with the array creation operator and the tuple creation operator, respectively. For more information, see the section "Operators2".
An array must consist of at least one item, and a tuple must have at least two items.
Tuples have a special purpose for use in the IN clause of a SELECT query. Tuples can be obtained as the result of a query, but they can't be saved to a database (with the exception of Memory-type tables).
Functions are written like an identifier with a list of arguments (possibly empty) in brackets. In contrast to standard SQL, the brackets are required, even for an empty arguments list. Example: %%now()%%.
There are regular and aggregate functions (see the section "Aggregate functions"). Some aggregate functions can contain two lists of arguments in brackets. Example: %%quantile(0.9)(x)%%. These aggregate functions are called "parametric" functions, and the arguments in the first list are called "parameters". The syntax of aggregate functions without parameters is the same as for regular functions.
Data types and table engines in the CREATE query are written the same way as identifiers or functions. In other words, they may or may not contain an arguments list in brackets. For more information, see the sections "Data types,""Table engines," and "CREATE".
In the SELECT query, expressions can specify synonyms using the AS keyword. Any expression is placed to the left of AS. The identifier name for the synonym is placed to the right of AS. As opposed to standard SQL, synonyms are not only declared on the top level of expressions:
An expression is a function, identifier, literal, application of an operator, expression in brackets, subquery, or asterisk. It can also contain a synonym.
Creates a table named 'name' in the 'db' database or the current database if 'db' is not set, with the structure specified in brackets and the 'engine' engine. The structure of the table is a list of column descriptions. If indexes are supported by the engine, they are indicated as parameters for the table engine.
Creates a table with the same structure as another table. You can specify a different engine for the table. If the engine is not specified, the same engine will be used as for the 'db2.name2' table.
In all cases, if IF NOT EXISTS is specified, the query won't return an error if the table already exists. In this case, the query won't do anything.
If an expression for the default value is not defined, the default values will be set to zeros for numbers, empty strings for strings, empty arrays for arrays, and 0000-00-00 for dates or 0000-00-00 00:00:00 for dates with time. NULLs are not supported.
If the default expression is defined, the column type is optional. If there isn't an explicitly defined type, the default expression type is used. Example: %%EventDate DEFAULT toDate(EventTime)%% - the 'Date' type will be used for the 'EventDate' column.
If the data type and default expression are defined explicitly, this expression will be cast to the specified type using type casting functions. Example: %%Hits UInt32 DEFAULT 0%% means the same thing as %%Hits UInt32 DEFAULT toUInt32(0)%%.
Default expressions may be defined as an arbitrary expression from table constants and columns. When creating and changing the table structure, it checks that expressions don't contain loops. For INSERT, it checks that expressions are resolvable - that all columns they can be calculated from have been passed.
Normal default value. If the INSERT query doesn't specify the corresponding column, it will be filled in by computing the corresponding expression.
In addition, this column is not substituted when using an asterisk in a SELECT query. This is to preserve the invariant that the dump obtained using SELECT * can be inserted back into the table using INSERT without specifying the list of columns.
When using the ALTER query to add new columns, old data for these columns is not written. Instead, when reading old data that does not have values for the new columns, expressions are computed on the fly by default. However, if running the expressions requires different columns that are not indicated in the query, these columns will additionally be read, but only for the blocks of data that need it.
If you add a new column to a table but later change its default expression, the values used for old data will change (for data where values were not stored on the disk). Note that when running background merges, data for columns that are missing in one of the merging parts is written to the merged part.
- Temporary tables disappear when the session ends, including if the connection is lost.
- A temporary table is created with the Memory engine. The other table engines are not supported.
- The DB can't be specified for a temporary table. It is created outside of databases.
- If a temporary table has the same name as another one and a query specifies the table name without specifying the DB, the temporary table will be used.
In most cases, temporary tables are not created manually, but when using external data for a query, or for distributed (GLOBAL) IN. For more information, see the appropriate sections.
Normal views don't store any data, but just perform a read from another table. In other words, a normal view is nothing more than a saved query. When reading from a view, this saved query is used as a subquery in the FROM clause.
When creating a materialized view, you can specify ENGINE - the table engine for storing data. By default, it uses the same engine as for the table that the SELECT query is made from.
A materialized view is arranged as follows: when inserting data to the table specified in SELECT, part of the inserted data is converted by this SELECT query, and the result is inserted in the view.
If you specify POPULATE, the existing table data is inserted in the view when creating it, as if making a CREATE TABLE ... AS SELECT ... query. Otherwise, the query contains only the data inserted in the table after creating the view. We don't recommend using POPULATE, since data inserted in the table during the view creation will not be inserted in it.
The SELECT query can contain DISTINCT, GROUP BY, ORDER BY, LIMIT ... Note that the corresponding conversions are performed independently on each block of inserted data. For example, if GROUP BY is set, data is aggregated during insertion, but only within a single packet of inserted data. The data won't be further aggregated. The exception is when using an ENGINE that independently performs data aggregation, such as SummingMergeTree.
- The query doesn't create data on the disk, but assumes that data is already in the appropriate places, and just adds information about the table to the server.
This query is used when starting the server. The server stores table metadata as files with ATTACH queries, which it simply runs at launch (with the exception of system tables, which are explicitly created on the server).
Deletes information about the table from the server. The server stops knowing about the table's existence. This does not delete the table's data or metadata. On the next server launch, the server will read the metadata and find out about the table again. Similarly, a "detached" table can be re-attached using the ATTACH query (with the exception of system tables, which do not have metadata stored for them).
Renames one or more tables. All tables are renamed under global locking. Renaming tables is a light operation. If you indicated another database after TO, the table will be moved to this database. However, the directories with databases must reside in the same file system (otherwise, an error is returned).
Adds a new column to the table with the specified name, type, and default expression (see the section "Default expressions"). If you specify 'AFTER name_after' (the name of another column), the column is added after the specified one in the list of table columns. Otherwise, the column is added to the end of the table. Note that there is no way to add a column to the beginning of a table. For a chain of actions, 'name_after' can be the name of a column that is added in one of the previous actions.
Adding a column just changes the table structure, without performing any actions with data. The data doesn't appear on the disk after ALTER. If the data is missing for a column when reading from the table, it is filled in with default values (by performing the default expression if there is one, or using zeros or empty strings). The column appears on the disk after merging data parts (see MergeTree).
Changes the 'name' column's type to 'type' and/or the default expression to 'default_expr'. When changing the type, values are converted as if the 'to<i>Type</i>' function were applied to them.
Only the first stage takes time. If there is a failure at this stage, the data is not changed.
If there is a failure during one of the successive stages, data can be restored manually. The exception is if the old files were deleted from the file system but the data for the new files did not get written to the disk and was lost.
The ALTER query lets you create and delete separate elements (columns) in nested data structures, but not whole nested data structures. To add a nested data structure, you can add columns with a name like 'name.nested_name' and the type 'Array(<i>T</i>)'. A nested data structure is equivalent to multiple array columns with a name that has the same prefix before the dot.
If the ALTER query is not sufficient for making the table changes you need, you can create a new table, copy the data to it using the INSERT SELECT query, then switch the tables using the RENAME query and delete the old table.
The ALTER query blocks all reads and writes for the table. In other words, if a long SELECT is running at the time of the ALTER query, the ALTER query will wait for the SELECT to complete. At the same time, all new queries to the same table will wait while this ALTER is running.
For tables that don't store data themselves (Merge and Distributed), ALTER just changes the table structure, and does not change the structure of subordinate tables. For example, when running ALTER for a Distributed table, you will also need to run ALTER for the tables on all remote servers.
The ALTER query for changing columns is replicated. The instructions are saved in ZooKeeper, then each replica applies them. All ALTER queries are run in the same order. The query waits for the appropriate actions to be completed on the other replicas. However, a query to change columns in a replicated table can be interrupted, and all actions will be performed asynchronously.
A partition in a table is data for a single calendar month. This is determined by the values of the date key specified in the table engine parameters. Each month's data is stored separately in order to simplify manipulations with this data.
active - Only count active parts. Inactive parts are, for example, source parts remaining after merging to a larger part - these parts are deleted approximately 10 minutes after merging.
On an operating server, you can't manually change the set of parts or their data on the file system, since the server won't know about it. For non-replicated tables, you can do this when the server is stopped, but we don't recommended it. For replicated tables, the set of parts can't be changed in any case.
The 'detached' directory contains parts that are not used by the server - detached from the table using the ALTER ... DETACH query. Parts that are damaged are also moved to this directory, instead of deleting them. You can add, delete, or modify the data in the 'detached' directory at any time - the server won't know about this until you make the ALTER TABLE ... ATTACH query.
After the query is executed, you can do whatever you want with the data in the 'detached' directory — delete it from the file system, or just leave it.
The query is replicated - data will be moved to the 'detached' directory and forgotten on all replicas. The query can only be sent to a leader replica. To find out if a replica is a leader, perform SELECT to the 'system.replicas' system table. Alternatively, it is easier to make a query on all replicas, and all except one will throw an exception.
Similar to the DETACH operation. Deletes data from the table. Data parts will be tagged as inactive and will be completely deleted in approximately 10 minutes. The query is replicated - data will be deleted on all replicas.
The query is replicated. Each replica checks whether there is data in the 'detached' directory. If there is data, it checks the integrity, verifies that it matches the data on the server that initiated the query, and then adds it if everything is correct. If not, it downloads data from the query requestor replica, or from another replica where the data has already been added.
Creates a local backup of one or multiple partitions. The name can be the full name of the partition (for example, 201403), or its prefix (for example, 2014) - then the backup will be created for all the corresponding partitions.
The query does the following: for a data snapshot at the time of execution, it creates hardlinks to table data in the directory /opt/clickhouse/shadow/N/...
The backup is created almost instantly (but first it waits for current queries to the corresponding table to finish running). At first, the backup doesn't take any space on the disk. As the system works, the backup can take disk space, as data is modified. If the backup is made for old enough data, it won't take space on the disk.
After creating the backup, data from /opt/clickhouse/shadow/ can be copied to the remote server and then deleted on the local server. The entire backup process is performed without stopping the server.
As an alternative, you can manually copy data from the /opt/clickhouse/data/database/table directory. But if you do this while the server is running, race conditions are possible when copying directories with files being added or changed, and the backup may be inconsistent. You can do this if the server isn't running - then the resulting data will be the same as after the ALTER TABLE t FREEZE PARTITION query.
ALTER TABLE ... FREEZE PARTITION only copies data, not table metadata. To make a backup of table metadata, copy the file /opt/clickhouse/metadata/database/table.sql
Replication provides protection from device failures. If all data disappeared on one of your replicas, follow the instructions in the "Restoration after failure" section to restore it.
Backups protect against human error (accidentally deleting data, deleting the wrong data or in the wrong cluster, or corrupting data). For high-volume databases, it can be difficult to copy backups to remote servers. In such cases, to protect from human error, you can keep a backup on the same server (it will reside in /opt/clickhouse/shadow/).
It downloads the specified partition from the shard that has its ZooKeeper path specified in the FROM clause, then puts it in the 'detached' directory for the specified table.
The path to ZooKeeper is specified in the FROM clause. For example, %%/clickhouse/tables/01-01/visits%%.
Before downloading, the system checks that the partition exists and the table structure matches. The most appropriate replica is selected automatically from the healthy replicas.
The ALTER ... FETCH PARTITION query is not replicated. The partition will be downloaded to the 'detached' directory only on the local server. Note that if after this you use the ALTER TABLE ... ATTACH query to add data to the table, the data will be added on all replicas (on one of the replicas it will be added from the 'detached' directory, and on the rest it will be loaded from neighboring replicas).
For non-replicatable tables, all ALTER queries are performed synchronously. For replicatable tables, the query just adds instructions for the appropriate actions to ZooKeeper, and the actions themselves are performed as soon as possible. However, the query can wait for these actions to be completed on all the replicas.
<b>user</b> is the user who made the query. Keep in mind that for distributed processing, queries are sent to remote servers under the 'default' user. SHOW PROCESSLIST shows the username for a specific query, not for a query that this query initiated.
<b>address</b> is the name of the host that the query was sent from. For distributed processing, on remote servers, this is the name of the query requestor host. To track where a distributed query was originally made from, look at SHOW PROCESSLIST on the query requestor server.
<b>rows_read</b>, <b>bytes_read</b> - How many rows and bytes of uncompressed data were read when processing the query. For distributed processing, data is totaled from all the remote servers. This is the data used for restrictions and quotas.
<b>query_id</b> - The query identifier. Non-empty only if it was explicitly defined by the user. For distributed processing, the query ID is not passed to remote servers.
Returns a single UInt8-type column, which contains the single value 0 if the table or database doesn't exist, or 1 if the table exists in the specified database.
Lets you set the 'param' setting to 'value'. You can also make all the settings from the specified settings profile in a single query. To do this, specify 'profile' as the setting name. For more information, see the section "Settings". The setting is made for the session, or for the server (globally) if GLOBAL is specified.
When making a global setting, the setting is not applied to sessions already running, including the current session. It will only be used for new sessions.
Settings made using SET GLOBAL have a lower priority compared with settings made in the config file in the user profile. In other words, user settings can't be overridden by SET GLOBAL.
For replicated tables, the OPTIMIZE query is only applied to non-replicated data (this data exists only after converting non-replicated tables to replicated ones). If there isn't any, the query doesn't do anything.
The data itself comes after 'format', after all space symbols up to the first line break if there is one and including it, or after all space symbols if there isn't a line break. We recommend writing data starting from the next line (this is important if the data starts with space characters).
For more information about data formats, see the section "Formats". The "Interfaces" section describes how to insert data separately from the query when using the command-line client or the HTTP interface.
The query may optionally specify a list of columns for insertion. In this case, the default values are written to the other columns.
Default values are calculated from DEFAULT expressions specified in table definitions, or, if the DEFAULT is not explicitly defined, zeros and empty strings are used. If the 'strict_insert_default' setting is set to 1, all the columns that do not have explicit DEFAULTS must be specified in the query.
If the query omits the DISTINCT, GROUP BY, and ORDER BY clauses and the IN and JOIN subqueries, the query will be completely stream processed, using O(1) amount of RAM.
Otherwise, the query may consume too much RAM, if appropriate restrictions are not defined (max_memory_usage, max_rows_to_group_by, max_rows_to_sort, max_rows_in_distinct, max_bytes_in_distinct, max_rows_in_set, max_bytes_in_set, max_rows_in_join, max_bytes_in_join, max_bytes_before_external_sort). For more information, see the section "Settings". It is possible to use external sorting (saving temporary tables to a disk). The system does not have external aggregation or merge join.
The FROM clause specifies the table to read data from, or a subquery, or a table function; ARRAY JOIN and the regular JOIN may also be included (see below).
Instead of a table, the SELECT subquery may be specified in brackets. In this case, the subquery processing pipeline will be built into the processing pipeline of an external query.
In contrast to standard SQL, a synonym does not need to be specified after a subquery. For compatibility, it is possible to write 'AS name' after a subquery, but the specified name isn't used anywhere.
To execute a query, all the columns listed in the query are extracted from the appropriate table. Any columns not needed for the external query are thrown out of the subqueries.
If a query does not list any columns (for example, SELECT count() FROM t), some column is extracted from the table anyway (the smallest one is preferred), in order to calculate the number of rows.
The FINAL modifier can be used only for a SELECT from a CollapsingMergeTree table. When you specify FINAL, data is selected fully "collapsed". Keep in mind that using FINAL leads to a selection that includes columns related to the primary key, in addition to the columns specified in the SELECT. Additionally, the query will be executed in a single stream, and data will be merged during query execution. This means that when using FINAL, the query is processed more slowly. In most cases, you should avoid using FINAL. For more information, see the section "CollapsingMergeTree engine".
Approximated query processing is only supported by MergeTree* type tables, and only if the sampling expression was specified during table creation (see the section "MergeTree engine").
SAMPLE has the format %%SAMPLE k%%, where 'k' is a decimal number from 0 to 1, or %%SAMPLE n%%, where 'n' is a sufficiently large integer.
In the first case, the query will be executed on 'k' percent of data. For example, %%SAMPLE 0.1%% runs the query on 10% of data.
In the second case, the query will be executed on a sample of no more than 'n' rows. For example, %%SAMPLE 10000000%% runs the query on a maximum of 10,000,000 rows.
In this example, the query is executed on a sample from 0.1 (10%) of data. Values of aggregate functions are not corrected automatically, so to get an approximate result, the value 'count()' is manually multiplied by 10.
When using something like %%SAMPLE 10000000%%, there isn't any information about which relative percent of data was processed or what the aggregate functions should be multiplied by, so this method of writing is not always appropriate to the situation.
A sample with a relative coefficient is "consistent": if we look at all possible data that could be in the table, a sample (when using a single sampling expression specified during table creation) with the same coefficient always selects the same subset of possible data. In other words, a sample from different tables on different servers at different times is made the same way.
For example, a sample of user IDs takes rows with the same subset of all the possible user IDs from different tables. This allows using the sample in subqueries in the IN clause, as well as for manually correlating results of different queries with samples.
Allows executing JOIN with an array or nested data structure. The intent is similar to the 'arrayJoin' function, but its functionality is broader.
An alias can be specified for an array in the ARRAY JOIN clause. In this case, an array item can be accessed by this alias, but the array itself by the original name. Example:
Multiple arrays of the same size can be comma-separated in the ARRAY JOIN clause. In this case, JOIN is performed with them simultaneously (the direct sum, not the direct product).
When specifying names of nested data structures in ARRAY JOIN, the meaning is the same as ARRAY JOIN with all the array elements that it consists of. Example:
The corresponding conversion can be performed before the WHERE/PREWHERE clause (if its result is needed in this clause), or after completing WHERE/PREWHERE (to reduce the volume of calculations).
Performs joins with data from the subquery. At the beginning of query execution, the subquery specified after JOIN is run, and its result is saved in memory. Then it is read from the "left" table specified in the FROM clause, and while it is being read, for each of the read rows from the "left" table, rows are selected from the subquery results table (the "right" table) that meet the condition for matching the values of the columns specified in USING.
The table name can be specified instead of a subquery. This is equivalent to the 'SELECT * FROM table' subquery, except in a special case when the table has the Join engine - an array prepared for joining.
If LEFT is specified, any rows in the left table that don't have matching rows in the right table will be assigned the default value - zeros or empty rows. LEFT OUTER may be written instead of LEFT; the word OUTER does not affect anything.
Using ALL corresponds to the normal JOIN semantic from standard SQL.
Using ANY is optimal. If the right table has only one matching row, the results of ANY and ALL are the same. You must specify either ANY or ALL (neither of them is selected by default).
When using a normal %%JOIN%%, the query is sent to remote servers. Subqueries are run on each of them in order to make the right table, and the join is performed with this table. In other words, the right table is formed on each server separately.
When using %%GLOBAL ... JOIN%%, first the requestor server runs a subquery to calculate the right table. This temporary table is passed to each remote server, and queries are run on them using the temporary data that was transmitted.
When running JOINs, there is no optimization of the order of execution in relation to other stages of the query. The join (a search in the right table) is run before filtering in WHERE and before aggregation. In order to explicitly set the order of execution, we recommend running a JOIN subquery with a subquery.
Subqueries don't allow you to set names or use them for referencing a column from a specific subquery.
The columns specified in USING must have the same names in both subqueries, and the other columns must be named differently. You can use aliases to change the names of columns in subqueries (the example uses the aliases 'hits' and 'visits').
The USING clause specifies one or more columns to join, which establishes the equality of these columns. The list of columns is set without brackets. More complex join conditions are not supported.
Each time a query is run with the same JOIN, the subquery is run again - the result is not cached. To avoid this, use the special 'Join' table engine, which is a prepared array for joining that is always in RAM. For more information, see the section "Table engines, Join".
In some cases, it is more efficient to use IN instead of JOIN. Among the various types of JOINs, the most efficient is ANY LEFT JOIN, then ANY INNER JOIN. The least efficient are ALL LEFT JOIN and ALL INNER JOIN.
If you need a JOIN for joining with dimension tables (these are relatively small tables that contain dimension properties, such as names for advertising campaigns), a JOIN might not be very convenient due to the bulky syntax and the fact that the right table is re-accessed for every query. For such cases, there is an "external dictionaries" feature that you should use instead of JOIN. For more information, see the section "External dictionaries".
This clause has the same meaning as the WHERE clause. The difference is in which data is read from the table. When using PREWHERE, first only the columns necessary for executing PREWHERE are read. Then the other columns are read that are needed for running the query, but only those blocks where the PREWHERE expression is true.
It makes sense to use PREWHERE if there are filtration conditions that are not suitable for indexes that are used by a minority of the columns in the query, but that provide strong data filtration. This reduces the volume of data to read.
Keep in mind that it does not make much sense for PREWHERE to only specify those columns that have an index, because when using an index, only the data blocks that match the index are read.
If the 'optimize_move_to_prewhere' setting is set to 1 and PREWHERE is omitted, the system uses heuristics to automatically move parts of expressions from WHERE to PREWHERE.
If there is a GROUP BY clause, it must contain a list of expressions. Each expression will be referred to here as a "key".
All the expressions in the SELECT, HAVING, and ORDER BY clauses must be calculated from keys or from aggregate functions. In other words, each column selected from the table must be used either in keys or inside aggregate functions.
If a query contains only table columns inside aggregate functions, the GROUP BY clause can be omitted, and aggregation by an empty set of keys is assumed.
However, in contrast to standard SQL, if the table doesn't have any rows (either there aren't any at all, or there aren't any after using WHERE to filter), an empty result is returned, and not the result from one of the rows containing the initial values of aggregate functions.
As opposed to MySQL (and conforming to standard SQL), you can't get some value of some column that is not in a key or aggregate function (except constant expressions). To work around this, you can use the 'any' aggregate function (get the first encountered value) or 'min/max'.
A constant can't be specified as arguments for aggregate functions. Example: sum(1). Instead of this, you can get rid of the constant. Example: count().
If the WITH TOTALS modifier is specified, another row will be calculated. This row will have key columns containing default values (zeros or empty lines), and columns of aggregate functions with the values calculated across all the rows (the "total" values).
In JSON* formats, this row is output as a separate 'totals' field. In TabSeparated formats, the row comes after the main result, preceded by an empty row (after the other data). In Pretty formats, the row is output as a separate table after the main result.
WITH TOTALS can be run in different ways when HAVING is present. The behavior depends on the 'totals_mode' setting.
By default, totals_mode = '<b>before_having</b>'. In this case, 'totals' is calculated across all rows, including the ones that don't pass through HAVING and 'max_rows_to_group_by'.
The other alternatives include only the rows that pass through HAVING in 'totals', and behave differently with the setting 'max_rows_to_group_by' and 'group_by_overflow_mode = 'any''.
<b>after_having_exclusive</b> - Don't include rows that didn't pass through 'max_rows_to_group_by'. In other words, 'totals' will have less than or the same number of rows as it would if 'max_rows_to_group_by' were omitted.
<b>after_having_inclusive</b> - Include all the rows that didn't pass through 'max_rows_to_group_by' in 'totals'. In other words, 'totals' will have more than or the same number of rows as it would if 'max_rows_to_group_by' were omitted.
<b>after_having_auto</b> - Count the number of rows that passed through HAVING. If it is more than a certain amount (by default, 50%), include all the rows that didn't pass through 'max_rows_to_group_by' in 'totals'. Otherwise, do not include them.
If 'max_rows_to_group_by' and 'group_by_overflow_mode = 'any'' are not used, all variations of 'after_having' are the same, and you can use any of them (for example, 'after_having_auto').
Allows filtering the result received after GROUP BY, similar to the WHERE clause.
WHERE and HAVING differ in that WHERE is performed before aggregation (GROUP BY), while HAVING is performed after it. If aggregation is not performed, HAVING can't be used.
The ORDER BY clause contains a list of expressions, which can each be assigned DESC or ASC (the sorting direction). If the direction is not specified, ASC is assumed. ASC is sorted in ascending order, and DESC in descending order. The sorting direction applies to a single expression, not to the entire list. Example: %%ORDER BY Visits DESC, SearchPhrase%%
For sorting by String values, you can specify collation (comparison). Example: %%ORDER BY SearchPhrase COLLATE 'tr'%% - for sorting by keyword in ascending order, using the Turkish alphabet, case insensitive, assuming that strings are UTF-8 encoded. COLLATE can be specified or not for each expression in ORDER BY independently. If ASC or DESC is specified, COLLATE is specified after it. When using COLLATE, sorting is always case-insensitive.
We only recommend using COLLATE for final sorting of a small number of rows, since sorting with COLLATE is less efficient than normal sorting by bytes.
Rows that have identical values for the list of sorting expressions are output in an arbitrary order, which can also be nondeterministic (different each time).
If the ORDER BY clause is omitted, the order of the rows is also undefined, and may be nondeterministic as well.
When floating point numbers are sorted, NaNs are separate from the other values. Regardless of the sorting order, NaNs come at the end. In other words, for ascending sorting they are placed as if they are larger than all the other numbers, while for descending sorting they are placed as if they are smaller than the rest.
Less RAM is used if a small enough LIMIT is specified in addition to ORDER BY. Otherwise, the amount of memory spent is proportional to the volume of data for sorting. For distributed query processing, if GROUP BY is omitted, sorting is partially done on remote servers, and the results are merged on the requestor server. This means that for distributed sorting, the volume of data to sort can be greater than the amount of memory on a single server.
If there is not enough RAM, it is possible to perform sorting in external memory (creating temporary files on a disk). Use the setting %%max_bytes_before_external_sort%% for this purpose. If it is set to 0 (the default), external sorting is disabled. If it is enabled, when the volume of data to sort reaches the specified number of bytes, the collected data is sorted and dumped into a temporary file. After all data is read, all the sorted files are merged and the results are output. Files are written to the /opt/clickhouse/tmp/ directory in the config (by default, but you can use the 'tmp_path' parameter to change this setting).
Running a query may use more memory than 'max_bytes_before_external_sort'. For this reason, this setting must have a value significantly smaller than 'max_memory_usage'. As an example, if your server has 128 GB of RAM and you need to run a single query, set 'max_memory_usage' to 100 GB, and 'max_bytes_before_external_sort' to 80 GB.
The expressions specified in the SELECT clause are analyzed after the calculations for all the clauses listed above are completed.
More specifically, expressions are analyzed that are above the aggregate functions, if there are any aggregate functions. The aggregate functions and everything below them are calculated during aggregation (GROUP BY). These expressions work as if they are applied to separate rows in the result.
The result will be the same as if GROUP BY were specified across all the fields specified in SELECT without aggregate functions. But there are several differences from GROUP BY:
- DISTINCT can be applied together with GROUP BY.
- When ORDER BY is omitted and LIMIT is defined, the query stops running immediately after the required number of different rows has been read. In this case, using DISTINCT is much more optimal.
Only UNION ALL is supported. The regular UNION (UNION DISTINCT) is not supported. If you need UNION DISTINCT, you can write SELECT DISTINCT from a subquery containing UNION ALL.
The structure of results (the number and type of columns) must match for the queries, but the column names can differ. In this case, the column names for the final result will be taken from the first query.
Queries that are parts of UNION ALL can't be enclosed in brackets. ORDER BY and LIMIT are applied to separate queries, not to the final result. If you need to apply a conversion to the final result, you can put all the queries with UNION ALL in a subquery in the FROM clause.
If the FORMAT clause is omitted, the default format is used, which depends on both the settings and the interface used for accessing the DB. For the HTTP interface and the command-line client in batch mode, the default format is TabSeparated. For the command-line client in interactive mode, the default format is PrettyCompact (it has attractive and compact tables).
When using the command-line client, data is passed to the client in an internal efficient format. The client independently interprets the FORMAT clause of the query and formats the data itself (thus relieving the network and the server from the load).
If the left side is a single column that is in the index, and the right side is a set of constants, the system uses the index for processing the query.
Don't list too many values explicitly (i.e. millions). If a data set is large, put it in a temporary table (for example, see the section "External data for query processing"), then use a subquery.
The right side of the operator can be a set of constant expressions, a set of tuples with constant expressions (shown in the examples above), or the name of a database table or SELECT subquery in brackets.
If the right side of the operator is the name of a table (for example, %%UserID IN users%%), this is equivalent to the subquery %%UserID IN (SELECT * FROM users)%%. Use this when working with external data that is sent along with the query. For example, the query can be sent together with a set of user IDs loaded to the 'users' temporary table, which should be filtered.
If the right side of the operator is a table name that has the Set engine (a prepared data set that is always in RAM), the data set will not be created over again for each query.
Exception: If there is an array to the left of IN, it checks that at least one array item belongs to the data set.
For example, %%[1, 2, 3] IN (3, 4, 5)%% returns 1. This is somewhat illogical, but it is convenient for implementing certain Yandex.Metrica functionality.
If there is an array to the left of NOT IN, it checks that at least one array item does not belong to the data set.
For example, %%[1, 2, 3] NOT IN (3, 4, 5)%% returns 1. This is completely illogical, so don't rely on this behavior. We recommend using higher-order functions instead. Example: <spanclass="inline-example">arrayAll(x -> x IN (3, 4, 5), [1, 2, 3])</span> returns 0 - it checks whether all the array items belong to the data set.
There are two versions of INs with subqueries (and for JOINs): the regular %%IN%% / %%JOIN%%, and %%GLOBAL IN%% / %%GLOBAL JOIN%%. They differ in how they are run for distributed query processing.
When using %%GLOBAL IN%% / %%GLOBAL JOIN%%, first all the subqueries for %%GLOBAL IN%% / %%GLOBAL JOIN%% are run, and the results are collected in temporary tables. Then the temporary tables are sent to each remote server, where the queries are run using this temporary data.
Let's look at some examples. Assume that each server in the cluster has a normal <b>local_table</b>. Each server also has a <b>distributed_table</b> table with the <b>Distributed</b> type, which looks at all the servers in the cluster.
and run on each of them in parallel, until it reaches the stage where intermediate results can be combined. Then the intermediate results will be returned to the requestor server and merged on it, and the final result will be sent to the client.
This query will be sent to all the remote servers as
%%SELECT uniq(UserID) FROM local_table WHERE CounterID = 101500 AND UserID IN (SELECT UserID FROM local_table WHERE CounterID = 34)%%
In other words, the data set in the %%IN%% clause will be collected on each server independently, only across the data that is stored locally on each of the servers.
This will work correctly and optimally if you are prepared for this case and have spread data across the cluster servers such that the data for a single UserID resides entirely on a single server. In this case, all the necessary data will be available locally on each server. Otherwise, the result will be inaccurate. We refer to this variation of the query as "local IN".
To correct how the query works when data is spread randomly across the cluster servers, you could specify <b>distributed_table</b> inside a subquery. The query would look like this:
%%SELECT uniq(UserID) FROM distributed_table WHERE CounterID = 101500 AND UserID IN (SELECT UserID FROM distributed_table WHERE CounterID = 34)%%
Each of the remote servers will start running the subquery. Since the subquery uses a distributed table, each remote server will re-send the subquery to every remote server, as
%%SELECT UserID FROM local_table WHERE CounterID = 34%%
For example, if you have a cluster of 100 servers, executing the entire query will require 10,000 elementary requests, which is generally considered unacceptable.
In such cases, you should always use %%GLOBAL IN%% instead of %%IN%%. Let's look at how it works for the query
%%SELECT uniq(UserID) FROM distributed_table WHERE CounterID = 101500 AND UserID GLOBAL IN (SELECT UserID FROM distributed_table WHERE CounterID = 34)%%
%%SELECT uniq(UserID) FROM local_table WHERE CounterID = 101500 AND UserID GLOBAL IN _data1%%
and the temporary table '_data1' will be sent to every remote server together with the query (the name of the temporary table is implementation-defined).
1. When creating a temporary table, data is not made unique. To reduce the volume of data transmitted over the network, specify %%DISTINCT%% in the subquery. (You don't need to do this for a normal IN.)
2. The temporary table will be sent to all the remote servers. Transmission does not account for network topology. For example, if 10 remote servers reside in a datacenter that is very remote in relation to the requestor server, the data will be sent 10 times over the channel to the remote datacenter. Try to avoid large data sets when using %%GLOBAL IN%%.
5. If you need to use %%GLOBAL IN%% often, plan the location of the ClickHouse cluster so that in each datacenter, there will be at least one replica of each shard, and there is a fast network between them - for possibility to process query with transferring data only inside datacenter.
It also makes sense to specify a local table in the %%GLOBAL IN%% clause, in case this local table is only available on the requestor server and you want to use data from it on remote servers.
In addition to results, you can also get minimum and maximum values for the results columns. To do this, set the 'extremes' setting to '1'. Minimums and maximums are calculated for numeric types, dates, and dates with times. For other columns, the default values are output.
An extra two rows are calculated - the minimums and maximums, respectively. These extra two rows are output in JSON*, TabSeparated*, and Pretty* formats, separate from the other rows. They are not output for other formats.
In JSON* formats, the extreme values are output in a separate 'extremes' field. In TabSeparated formats, the row comes after the main result, and after 'totals' if present. It is preceded by an empty row (after the other data). In Pretty formats, the row is output as a separate table after the main result, and after 'totals' if present.
Extreme values are calculated for rows that have passed through LIMIT. However, when using 'LIMIT offset, size', the rows before 'offset' are included in 'extremes'. In stream requests, the result may also include a small number of rows that passed through LIMIT.
You can put an asterisk in any part of a query instead of an expression. When the query is analyzed, the asterisk is expanded to a list of all table columns (excluding the MATERIALIZED and ALIAS columns). There are only a few cases when using an asterisk is justified:
ClickHouse allows sending a server the data that is needed for processing a query, together with a SELECT query. This data is put in a temporary table (see the section "Temporary tables") and can be used in the query (for example, in IN operators).
For example, if you have a text file with important user identifiers, you can upload it to the server along with a query that uses filtration by this list.
If you need to run more than one query with a large volume of external data, don't use this feature. It is better to upload the data to the DB ahead of time.
The files specified in %%file%% will be parsed by the format specified in %%format%%, using the data types specified in %%types%% or %%structure%%. The table will be uploaded to the server and accessible there as a temporary table with the name %%name%%.
%%cat /etc/passwd | sed 's/:/\t/g' | clickhouse-client --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String'
When using the HTTP interface, external data is passed in the multipart/form-data format. Each table is transmitted as a separate file. The table name is taken from the file name. The 'query_string' passes the parameters '<i>name</i>_format', '<i>name</i>_types', and '<i>name</i>_structure', where <i>name</i> is the name of the table that these parameters correspond to. The meaning of the parameters is the same as when using the command-line client.
- When reading data, the engine is only required to extract the necessary set of columns. However, in some cases, the query may be partially processed inside the table engine.
Each column is stored in a separate compressed file.
When writing, data is appended to the end of files.
Concurrent data access is not restricted in any way:
- If you are simultaneously reading from a table and writing to it in a different query, the read operation will complete with an error.
- If you are writing to a table in multiple queries simultaneously, the data will be broken.
The typical way to use this table is write-once: first just write the data one time, then read it as many times as needed.
Queries are executed in a single stream. In other words, this engine is intended for relatively small tables (recommended up to 1,000,000 rows).
It makes sense to use this table engine if you have many small tables, since it is simpler than the Log engine (fewer files need to be opened).
The situation when you have a large number of small tables guarantees poor productivity, but may already be used when working with another DBMS, and you may find it easier to switch to using TinyLog types of tables.
Log differs from TinyLog in that a small file of "marks" resides with the column files. These marks are written on every data block and contain offsets - where to start reading the file in order to skip the specified number of rows. This makes it possible to read table data in multiple threads. For concurrent data access, the read operations can be performed simultaneously, while write operations block reads and each other.
The Log engine does not support indexes. Similarly, if writing to a table failed, the table is broken, and reading from it returns an error. The Log engine is appropriate for temporary data, write-once tables, and for testing or demonstration purposes.
The Memory engine stores data in RAM, in uncompressed form. Data is stored in exactly the same form as it is received when read. In other words, reading from this table is completely free.
Concurrent data access is synchronized. Locks are short: read and write operations don't block each other.
Indexes are not supported. Reading is parallelized.
Maximal productivity (over 10 GB/sec) is reached on simple queries, because there is no reading from the disk, decompressing, or deserializing data. (We should note that in many cases, the productivity of the MergeTree engine is almost as high.)
When restarting a server, data disappears from the table and the table becomes empty.
Normally, using this table engine is not justified. However, it can be used for tests, and for tasks where maximum speed is required on a relatively small number of rows (up to approximately 100,000,000).
The Memory engine is used by the system for temporary tables with external query data (see the section "External data for processing a query"), and for implementing GLOBAL IN (see the section "IN operators").
Reading is automatically parallelized. Writing to a table is not supported. When reading, the indexes of tables that are actually being read are used, if they exist.
Regular expressions are re2 (similar to PCRE), case-sensitive. See the notes about escaping symbols in regular expressions in the "match" section.
Virtual columns are columns that are provided by the table engine, regardless of the table definition. In other words, these columns are not specified in CREATE TABLE, but they are accessible for SELECT.
A Merge table contains the virtual column <b>_table</b> of the String type. (If the table already has a '_table' column, the virtual column is named '_table1', and if it already has '_table1', it is named '_table2', and so on.) It contains the name of the table that data was read from.
If the WHERE or PREWHERE clause contains conditions for the '_table' column that do not depend on other table columns (as one of the conjunction elements, or as an entire expression), these conditions are used as an index. The conditions are performed on a data set of table names to read data from, and the read operation will be performed from only those tables that the condition was triggered on.
Reading is automatically parallelized. During a read, the table indexes on remote servers are used, if there are any.
The Distributed engine accepts parameters: the cluster name in the server's config file, the name of a remote database, the name of a remote table, and (optionally) a sharding key.
For example, for a query with GROUP BY, data will be aggregated on remote servers, and the intermediate states of aggregate functions will be sent to the requestor server. Then data will be further aggregated.
Here a cluster is defined with the name 'logs' that consists of two shards, each of which contains two replicas. Shards refer to the servers that contain different parts of the data (in order to read all the data, you must access all the shards).
Replicas are duplicating servers (in order to read all the data, you can access the data on any one of the replicas).
<b>host</b> - address of remote server. May be specified as domain name or IPv4 or IPv6 address. If you specify domain, server will perform DNS lookup at startup, and result will be cached till server shutdown. If DNS request is failed, server won't start. If you are changing DNS records, restart the server for new records to take effect.
<b>port</b> - TCP-port for interserver communication (tcp_port in configuration file, usually 9000). Don't get confused with http_port.
<b>user</b> - user name to connect to remote server. By default user is 'default'. This user must have access rights to connect to remote server. Access rights are managed in users.xml configuration file. For additional info, consider "Access rights" section.
When specifying replicas, one of the available replicas will be selected for each of the shards when reading. You can configure the algorithm for load balancing (the preference for which replica to access) - see the 'load_balancing' setting.
If the connection with the server is not established, there will be an attempt to connect with a short timeout. If the connection failed, the next replica will be selected, and so on for all the replicas. If the connection attempt failed for all the replicas, the attempt will be repeated the same way, several times.
This works in favor of resiliency, but does not provide complete fault tolerance: a remote server might accept the connection, but might not work, or work poorly.
You can specify just one of the shards (in this case, query processing should be called remote, rather than distributed) or up to any number of shards. In each shard, you can specify from one to any number of replicas. You can specify a different number of replicas for each shard.
The Distributed engine allows working with a cluster like a local server. However, the cluster is inextensible: you must write its configuration in the server config file (even better, for all the cluster's servers).
There is no support for Distributed tables that look at other Distributed tables (except in cases when a Distributed table only has one shard). As an alternative, make the Distributed table look at the "final" tables.
The Distributed engine requires writing clusters to the config file. Adding clusters and servers requires restarting. If you need to send a query to an unknown set of shards and replicas each time, you don't need to create a Distributed table - use the 'remote' table function instead. See the section "Table functions".
First, you can define which servers to write which data to, and perform the write directly on each shard. In other words, perform INSERT in the tables that the distributed table "looks at".
Second, you can perform INSERT in a Distributed table. In this case, the table will distribute the inserted data across servers itself.
In order to write to a Distributed table, it must have a sharding key set (the last parameter). In addition, if there is only one shard, the write operation works without specifying the sharding key, since it doesn't have any meaning in this case.
Each shard can have a weight defined in the config file. By default, the weight is equal to one. Data is distributed across shards in the amount proportional to the shard weight. For example, if there are two shards and the first has a weight of 9 while the second has a weight of 10, the first will be sent 9 / 19 parts of the rows, and the second will be sent 10 / 19.
If this parameter is set to 'true', the write operation selects the first healthy replica and writes data to it. Use this alternative if the Distributed table "looks at" replicated tables. In other words, if the table where data will be written is going to replicate them itself.
If it is set to 'false' (the default), data is written to all replicas. In essence, this means that the Distributed table replicates data itself. This is worse than using replicated tables, because the consistency of replicas is not checked, and over time they will contain slightly different data.
To select the shard that a row of data is sent to, the sharding expression is analyzed, and its remainder is taken from dividing it by the total weight of the shards. The row is sent to the shard that corresponds to the half-interval of the remainders from 'prev_weight' to 'prev_weights + weight', where 'prev_weights' is the total weight of the shards with the smallest number, and 'weight' is the weight of this shard. For example, if there are two shards, and the first has a weight of 9 while the second has a weight of 10, the row will be sent to the first shard for the remainders from the range [0, 9), and to the second for the remainders from the range [10, 19).
The sharding expression can be any expression from constants and table columns that returns an integer. For example, you can use the expression 'rand()' for random distribution of data, or 'UserID' for distribution by the remainder from dividing the user's ID (then the data of a single user will reside on a single shard, which simplifies running IN and JOIN by users). If one of the columns is not distributed evenly enough, you can wrap it in a hash function: intHash64(UserID).
A simple remainder from division is a limited solution for sharding and isn't always appropriate. It works for medium and large volumes of data (dozens of servers), but not for very large volumes of data (hundreds of servers or more). In the latter case, use the sharding scheme required by the subject area, rather than using entries in Distributed tables.
When using Replicated tables, it is possible to reshard data - look at "Resharding" section. But in many cases, better to do without it. SELECT queries are sent to all the shards, and work regardless of how data is distributed across the shards (they can be distributed completely randomly). When you add a new shard, you don't have to transfer the old data to it. You can write new data with a heavier weight - the data will be distributed slightly unevenly, but queries will work correctly and efficiently.
- Queries are used that require joining data (IN or JOIN) by a specific key. If data is sharded by this key, you can use local IN or JOIN instead of GLOBAL IN or GLOBAL JOIN, which is much more efficient.
- A large number of servers is used (hundreds or more) with a large number of small queries (queries of individual clients - websites, advertisers, or partners). In order for the small queries to not affect the entire cluster, it makes sense to locate data for a single client on a single shard. Alternatively, as we've done in Yandex.Metrica, you can set up bi-level sharding: divide the entire cluster into "layers", where a layer may consist of multiple shards. Data for a single client is located on a single layer, but shards can be added to a layer as necessary, and data is randomly distributed within them. Distributed tables are created for each layer, and a single shared distributed table is created for global queries.
Data is written asynchronously. For an INSERT to a Distributed table, the data block is just written to the local file system. The data is sent to the remote servers in the background as soon as possible. You should check whether data is sent successfully by checking the list of files (data waiting to be sent) in the table directory:
If the server ceased to exist or had a rough restart (for example, after a device failure) after an INSERT to a Distributed table, the inserted data might be lost. If a damaged data part is detected in the table directory, it is transferred to the 'broken' subdirectory and no longer used.
The engine accepts parameters: the name of a Date type column containing the date, a sampling expression (optional), a tuple that defines the table's primary key, and the index granularity.
A MergeTree type table must have a separate column containing the date. In this example, it is the 'EventDate' column. The type of the date column must be 'Date' (not 'DateTime').
The sampling expression (optional) can be any expression. It must also be present in the primary key. The example uses a hash of user IDs to pseudo-randomly disperse data in the table for each CounterID and EventDate. In other words, when using the SAMPLE clause in a query, you get an evenly pseudo-random sample of data for a subset of users.
The table is implemented as a set of parts. Each part is sorted by the primary key. In addition, each part has the minimum and maximum date assigned. When inserting in the table, a new sorted part is created. The merge process is periodically initiated in the background. When merging, several parts are selected, usually the smallest ones, and then merged into one large sorted part.
In other words, incremental sorting occurs when inserting to the table. Merging is implemented so that the table always consists of a small number of sorted parts, and the merge itself doesn't do too much work.
During insertion, data belonging to different months is separated into different parts. The parts that correspond to different months are never combined. The purpose of this is to provide local data modification (for ease in backups).
For each part, an index file is also written. The index file contains the primary key value for every 'index_granularity' row in the table. In other words, this is an abbreviated index of sorted data.
When reading from a table, the SELECT query is analyzed for whether indexes can be used. An index can be used if the WHERE or PREWHERE clause has an expression (as one of the conjunction elements, or entirely) that represents an equality or inequality comparison operation, or if it has IN above columns that are in the primary key or date, or Boolean operators over them.
Thus, it is possible to quickly run queries on one or many ranges of the primary key. In the example given, queries will work quickly for a specific counter, for a specific counter and range of dates, for a specific counter and date, for multiple counters and a range of dates, and so on.
%%SELECT count() FROM table WHERE EventDate = toDate(now()) AND CounterID = 34%%
%%SELECT count() FROM table WHERE EventDate = toDate(now()) AND (CounterID = 34 OR CounterID = 42)%%
%%SELECT count() FROM table WHERE ((EventDate >= toDate('2014-01-01') AND EventDate <= toDate('2014-01-31')) OR EventDate = toDate('2014-05-01')) AND CounterID IN (101500, 731962, 160656) AND (CounterID = 101500 OR EventDate != toDate('2014-05-01'))%%
All of these cases will use the index by date and by primary key. The index is used even for complex expressions. Reading from the table is organized so that using the index can't be slower than a full scan.
The index by date only allows reading those parts that contain dates from the desired range. However, a data part may contain data for many dates (up to an entire month), while within a single part the data is ordered by the primary key, which might not contain the date as the first column. Because of this, using a query with only a date condition that does not specify the primary key prefix will cause more data to be read than for a single date.
For concurrent table access, we use multi-versioning. In other words, when a table is simultaneously read and updated, data is read from a set of parts that is current at the time of the query. There are no lengthy locks. Inserts do not get in the way of read operations.
Yandex.Metrica has normal logs (such as hit logs) and change logs. Change logs are used for incrementally calculating statistics on data that is constantly changing. Examples are the log of session changes, or logs of changes to user histories. Sessions are constantly changing in Yandex.Metrica. For example, the number of hits per session increases. We refer to changes in any object as a pair (?old values, ?new values). Old values may be missing if the object was created. New values may be missing if the object was deleted. If the object was changed, but existed previously and was not deleted, both values are present. In the change log, one or two entries are made for each change. Each entry contains all the attributes that the object has, plus a special attribute for differentiating between the old and new values. When objects change, only the new entries are added to the change log, and the existing ones are not touched.
The change log makes it possible to incrementally calculate almost any statistics. To do this, we need to consider "new" rows with a plus sign, and "old" rows with a minus sign. In other words, incremental calculation is possible for all statistics whose algebraic structure contains an operation for taking the inverse of an element. This is true of most statistics. We can also calculate "idempotent" statistics, such as the number of unique visitors, since the unique visitors are not deleted when making changes to sessions.
When merging, each group of consecutive identical primary key values (columns for sorting data) is reduced to no more than one row with the column value 'sign_column = -1' (the "negative row") and no more than one row with the column value 'sign_column = 1' (the "positive row"). In other words, entries from the change log are collapsed.
Otherwise, there will be a logical error and none of the rows will be written. (A logical error can occur if the same section of the log was accidentally inserted more than once. The error is just recorded in the server log, and the merge continues.)
1. Write a query with GROUP BY and aggregate functions that accounts for the sign. For example, to calculate quantity, write 'sum(Sign)' instead of 'count()'. To calculate the sum of something, write 'sum(Sign * x)' instead of 'sum(x)', and so on, and also add 'HAVING sum(Sign) > 0'. Not all amounts can be calculated this way. For example, the aggregate functions 'min' and 'max' can't be rewritten.
2. If you must extract data without aggregation (for example, to check whether rows are present whose newest values match certain conditions), you can use the FINAL modifier for the FROM clause. This approach is significantly less efficient.
The columns to total are implicit. When merging, all rows with the same primary key value (in the example, OrderId, EventDate, BannerID, ...) have their values totaled in numeric columns that are not part of the primary key.
The columns to total are set explicitly (the last parameter - Shows, Clicks, Cost, ...). When merging, all rows with the same primary key value have their values totaled in the specified columns. The specified columns also must be numeric and must not be part of the primary key.
If the name of a nested table ends in 'Map' and it contains at least two columns that meet the following criteria:
- for the first table, numeric ((U)IntN, Date, DateTime), we'll refer to it as 'key'
- for other tables, arithmetic ((U)IntN, Float32/64), we'll refer to it as '(values...)'
then this nested table is interpreted as a mapping of key => (values...), and when merging its rows, the elements of two data sets are merged by 'key' with a summation of the corresponding (values...).
This engine differs from MergeTree in that the merge combines the states of aggregate functions stored in the table for rows with the same primary key value.
In order for this to work, it uses the AggregateFunction data type and the -State and -Merge modifiers for aggregate functions. Let's examine it more closely.
There is an AggregateFunction data type, which is a parametric data type. As parameters, the name of the aggregate function is passed, then the types of its arguments.
To get this type of value, use aggregate functions with the 'State' suffix.
Example: uniqState(UserID), quantilesState(0.5, 0.9)(SendTiming) - in contrast to the corresponding 'uniq' and 'quantiles' functions, these functions return the state, rather than the prepared value. In other words, they return an AggregateFunction type value.
An AggregateFunction type value can't be output in Pretty formats. In other formats, these types of values are output as implementation-specific binary data. The AggregateFunction type values are not intended for output or saving in a dump.
The only useful thing you can do with AggregateFunction type values is combine the states and get a result, which essentially means to finish aggregation. Aggregate functions with the 'Merge' suffix are used for this purpose.
Example: uniqMerge(UserIDState), where UserIDState has the AggregateFunction type.
There is an AggregatingMergeTree engine. Its job during a merge is to combine the states of aggregate functions from different table rows with the same primary key value.
You can't use a normal INSERT to insert a row in a table containing AggregateFunction columns, because you can't explicitly define the AggregateFunction value. Instead, use INSERT SELECT with '-State' aggregate functions for inserting data.
With SELECT from an AggregatingMergeTree table, use GROUP BY and aggregate functions with the '-Merge' modifier in order to complete data aggregation.
Used for implementing views (for more information, see the CREATE VIEW query). It does not store data, but only stores the specified SELECT query. When reading from a table, it runs this query (and deletes all unnecessary columns from the query).
Used for implementing materialized views (for more information, see CREATE MATERIALIZED VIEW). For storing data, it uses a different engine that was specified when creating the view. When reading from a table, it just uses this engine.
Data is always located in RAM. For INSERT, the blocks of inserted data are also written to the directory of tables on the disk. When starting the server, this data is loaded to RAM. In other words, after restarting, the data remains in place.
For a rough server restart, the block of data on the disk might be lost or damaged. In the latter case, you may need to manually delete the file with damaged data.
Engine parameters: ANY|ALL - strictness, and LEFT|INNER - the type. These parameters are set without quotes and must match the JOIN that the table will be used for. k1, k2, ... are the key columns from the USING clause that the join will be made on.
You can use INSERT to add data to the table, similar to the Set engine. For ANY, data for duplicated keys will be ignored. For ALL, it will be counted. You can't perform SELECT directly from the table. The only way to retrieve data is to use it as the "right-hand" table for JOIN.
Buffers the data to write in RAM, periodically flushing it to another table. During the read operation, data is read from the buffer and the other table simultaneously.
database, table - The table to flush data to. Instead of the database name, you can use a constant expression that returns a string.
num_layers - The level of parallelism. Physically, the table will be represented as 'num_layers' of independent buffers. The recommended value is 16.
Data is flushed from the buffer and written to the destination table if all the 'min' conditions or at least one 'max' condition are met.
During the write operation, data is inserted to a 'num_layers' number of random buffers. Or, if the data part to insert is large enough (greater than 'max_rows' or 'max_bytes'), it is written directly to the destination table, omitting the buffer.
The conditions for flushing the data are calculated separately for each of the 'num_layers' buffers. For example, if num_layers = 16 and max_bytes = 100000000, the maximum RAM consumption is 1.6 GB.
Creating a 'merge.hits_buffer' table with the same structure as 'merge.hits' and using the Buffer engine. When writing to this table, data is buffered in RAM and later written to the 'merge.hits' table. 16 buffers are created. The data in each of them is flushed if either 100 seconds have passed, or one million rows have been written, or 100 MB of data have been written; or if simultaneously 10 seconds have passed and 10,000 rows and 10 MB of data have been written. For example, if just one row has been written, after 100 seconds it will be flushed, no matter what. But if many rows have been written, the data will be flushed sooner.
You can set empty strings in single quotation marks for the database and table name. This indicates the absence of a destination table. In this case, when the data flush conditions are reached, the buffer is simply cleared. This may be useful for keeping a window of data in memory.
When reading from a Buffer table, data is processed both from the buffer and from the destination table (if there is one).
Note that the Buffer tables does not support an index. In other words, data in the buffer is fully scanned, which might be slow for large buffers. (For data in a subordinate table, the index it supports will be used.)
If the set of columns in the Buffer table doesn't match the set of columns in a subordinate table, a subset of columns that exist in both tables is inserted.
If the types don't match for one of the columns in the Buffer table and a subordinate table, an error message is entered in the server log and the buffer is cleared.
The same thing happens if the subordinate table doesn't exist when the buffer is flushed.
If you need to run ALTER for a subordinate table and the Buffer table, we recommend first deleting the Buffer table, running ALTER for the subordinate table, then creating the Buffer table again.
PREWHERE, FINAL and SAMPLE do not work correctly for Buffer tables. These conditions are passed to the destination table, but are not used for processing data in the buffer. Because of this, we recommend only using the Buffer table for writing, while reading from the destination table.
Data that is inserted to a Buffer table may end up in the subordinate table in a different order and in different blocks. Because of this, a Buffer table is difficult to use for writing to a CollapsingMergeTree correctly. To avoid problems, you can set 'num_layers' to 1.
If the destination table is replicated, some expected characteristics of replicated tables are lost when writing to a Buffer table. The random changes to the order of rows and sizes of data parts cause data deduplication to quit working, which means it is not possible to have a reliable 'exactly once' write to replicated tables.
A Buffer table is used when too many INSERTs are received from a large number of servers over a unit of time and data can't be buffered before insertion, which means the INSERTs can't run fast enough.
Note that it doesn't make sense to insert data one row at a time, even for Buffer tables. This will only produce a speed of a few thousand rows per second, while inserting larger blocks of data can produce over a million rows per second (see the section "Performance").
Replication is only supported for tables in the MergeTree family. Replication works at the level of an individual table, not the entire server. A server can store both replicated and non-replicated tables at the same time.
INSERT and ALTER are replicated (for more information, see ALTER). Compressed data is replicated, not query texts.
The CREATE, DROP, ATTACH, DETACH, and RENAME queries are not replicated. In other words, they belong to a single server. The CREATE TABLE query creates a new replicatable table on the server where the query is run. If this table already exists on other servers, it adds a new replica. The DROP TABLE query deletes the replica located on the server where the query is run. The RENAME query renames the table on one of the replicas. In other words, replicated tables can have different names on different replicas.
You can specify any existing ZooKeeper cluster - the system will use a directory on it for its own data (the directory is specified when creating a replicatable table).
ZooKeeper isn't used for SELECT queries. In other words, replication doesn't affect the productivity of SELECT queries - they work just as fast as for non-replicated tables.
For each INSERT query (more precisely, for each inserted block of data; the INSERT query contains a single block, or per block for every max_insert_block_size = 1048576 rows), approximately ten entries are made in ZooKeeper in several transactions. This leads to slightly longer latencies for INSERT compared to non-replicated tables. But if you follow the recommendations to insert data in batches of no more than one INSERT per second, it doesn't create any problems. The entire ClickHouse cluster used for coordinating one ZooKeeper cluster has a total of several hundred INSERTs per second. The throughput on data inserts (the number of rows per second) is just as high as for non-replicated data.
For very large clusters, you can use different ZooKeeper clusters for different shards. However, this hasn't proven necessary on the Yandex.Metrica cluster (approximately 300 servers).
Replication is asynchronous and multi-master. INSERT queries (as well as ALTER) can be sent to any available server. Data is inserted on this server, then sent to the other servers. Because it is asynchronous, recently inserted data appears on the other replicas with some latency. If a part of the replicas is not available, the data on them is written when they become available. If a replica is available, the latency is the amount of time it takes to transfer the block of compressed data over the network.
There are no quorum writes. You can't write data with confirmation that it was received by more than one replica. If you write a batch of data to one replica and the server with this data ceases to exist before the data has time to get to the other replicas, this data will be lost.
Each block of data is written atomically. The INSERT query is divided into blocks up to max_insert_block_size = 1048576 rows. In other words, if the INSERT query has less than 1048576 rows, it is made atomically.
Blocks of data are duplicated. For multiple writes of the same data block (data blocks of the same size containing the same rows in the same order), the block is only written once. The reason for this is in case of network failures when the client application doesn't know if the data was written to the DB, so the INSERT query can simply be repeated. It doesn't matter which replica INSERTs were sent to with identical data - INSERTs are idempotent. This only works for the last 100 blocks inserted in a table.
During replication, only the source data to insert is transferred over the network. Further data transformation (merging) is coordinated and performed on all the replicas in the same way. This minimizes network usage, which means that replication works well when replicas reside in different datacenters. (Note that duplicating data in different datacenters is the main goal of replication.)
You can have any number of replicas of the same data. Yandex.Metrica uses double replication in production. Each server uses RAID-5 or RAID-6, and RAID-10 in some cases. This is a relatively reliable and convenient solution.
The system monitors data synchronicity on replicas and is able to recover after a failure. Failover is automatic (for small differences in data) or semi-automatic (when data differs too much, which may indicate a configuration error).
As the example shows, these parameters can contain substitutions in curly brackets. The substituted values are taken from the 'macros' section of the config file. Example:
%%{layer}-{shard}%% is the shard identifier. In this example it consists of two parts, since the Yandex.Metrica cluster uses bi-level sharding. For most tasks, you can leave just the {shard} substitution, which will be expanded to the shard identifier.
%%hits%% is the name of the node for the table in ZooKeeper. It is a good idea to make it the same as the table name. It is defined explicitly, because in contrast to the table name, it doesn't change after a RENAME query.
The replica name identifies different replicas of the same table. You can use the server name for this, as in the example. The name only needs to be unique within each shard.
You can define everything explicitly instead of using substitutions. This might be convenient for testing and for configuring small clusters, but it is inconvenient when working with large clusters.
If you add a new replica after the table already contains some data on other replicas, the data will be copied from the other replicas to the new one after running the query. In other words, the new replica syncs itself with the others.
If ZooKeeper is unavailable when a server starts, replicated tables switch to read-only mode. The system periodically attempts to connect to ZooKeeper.
After connecting to ZooKeeper, the system checks whether the set of data in the local file system matches the expected set of data (ZooKeeper stores this information). If there are minor inconsistencies, the system resolves them by syncing data with the replicas.
If the system detects broken data parts (with the wrong size of files) or unrecognized parts (parts written to the file system but not recorded in ZooKeeper), it moves them to the 'detached' subdirectory (they are not deleted). Any missing parts are copied from the replicas.
When the server starts (or establishes a new session with ZooKeeper), it only checks the quantity and sizes of all files. If the file sizes match but bytes have been changed somewhere in the middle, this is not detected immediately, but only when attempting to read the data for a SELECT query. The query throws an exception about a non-matching checksum or size of a compressed block. In this case, data parts are added to the verification queue and copied from the replicas if necessary.
If the local set of data differs too much from the expected one, a safety mechanism is triggered. The server enters this in the log and refuses to launch. The reason for this is that this case may indicate a configuration error, such as if a replica on a shard was accidentally configured like a replica on a different shard. However, the thresholds for this mechanism are set fairly low, and this situation might occur during normal failure recovery. In this case, data is restored semi-automatically - by "pushing a button".
To start recovery, create the node <spanclass="inline-example">/<i>path_to_table</i>/<i>replica_name</i>/flags/force_restore_data</span> in ZooKeeper with any content or run command to recover all replicated tables:
1. Install ClickHouse on the server. Define substitutions correctly in the config file that contains the shard identifier and replicas, if you use them.
2. If you had unreplicated tables that must be manually duplicated on the servers, copy their data from a replica (in the directory <spanclass="inline-example">/opt/clickhouse/data/<i>db_name</i>/<i>table_name</i>/</span>).
3. Copy table definitions located in %%/opt/clickhouse/metadata/%% from a replica. If a shard or replica identifier is defined explicitly in the table definitions, correct it so that it corresponds to this replica. (Alternatively, launch the server and make all the ATTACH TABLE queries that should have been in the .sql files in %%/opt/clickhouse/metadata/%%.)
4. Create the <spanclass="inline-example">/<i>path_to_table</i>/<i>replica_name</i>/flags/force_restore_data</span> node in ZooKeeper with any content or run command to recover all replicated tables:
An alternative recovery option is to delete information about the lost replica from ZooKeeper ( <spanclass="inline-example">/<i>path_to_table</i>/<i>replica_name</i></span>), then create the replica again as described in "Creating replicated tables".
If you had a MergeTree table that was manually replicated, you can convert it to a replicatable table. You might need to do this if you have already collected a large amount of data in a MergeTree table and now you want to enable replication.
Move the data from the old table to the 'detached' subdirectory inside the directory with the new table data (<spanclass="inline-example">/opt/clickhouse/data/<i>db_name</i>/<i>table_name</i>/</span>).
If exactly the same parts exist on the other replicas, they are added to the working set on them. If not, the parts are downloaded from the replica that has them.
Create a MergeTree table with a different name. Move all the data from the directory with the ReplicatedMergeTree table data to the new table's data directory. Then delete the ReplicatedMergeTree table and restart the server.
%%ALTER TABLE t RESHARD [COPY] [PARTITION partition] TO <i>cluster description</i> USING <i>sharding key</i>%%
Query works only for Replicated tables and for Distributed tables that are looking at Replicated tables.
When executing, query first checks correctness of query, sufficiency of free space on nodes and writes to ZooKeeper at some path a task to to. Next work is done asynchronously.
For using resharding, you must specify path in ZooKeeper for task queue in configuration file:
Sharding key (%%UserID%% in example) has same semantic as for Distributed tables. You could specify %%rand()%% as sharding key for random distribution of data.
When query is run, it checks:
- identity of table structure on all shards.
- availability of free space on local node in amount of partition size in bytes, with additional 10% reserve.
- availability of free space on all replicas of all specified shards, except local replica, if exists, in amount of patition size times ratio of shard weight to total weight of all shards, with additional 10% reserve.
Next, asynchronous processing of query is of following steps:
1. Split patition to parts on local node.
It merges all parts forming a partition and in the same time, splits them to several, according sharding key.
Result is placed to /reshard directory in table data directory.
Source parts doesn't modified and all process doesn't intervent table working data set.
2. Copying all parts to remote nodes (to each replica of corresponding shard).
3. Execution of queries %%ALTER TABLE t DROP PARTITION%% on local node and %%ALTER TABLE t ATTACH PARTITION%% on all shards.
Note: this operation is not atomic. There are time point when user could see absence of data.
When %%COPY%% keyword is specified, source data is not removed. It is suitable for copying data from one cluster to another with changing sharding scheme in same time.
4. Removing temporary data from local node.
When having multiple resharding queries, their tasks will be done sequentially.
Query in example is to reshard single partition.
If you don't specify partition in query, then tasks to reshard all partitions will be created. Example:
%%
ALTER TABLE merge.hits
RESHARD
TO ...
%%
When resharding Distributed tables, each shard will be resharded (corresponding query is sent to each shard).
You could reshard Distributed table to itself or to another table.
Resharding is intended for "old" data: in case when during job, resharded partition was modified, task for that partition will be cancelled.
On each server, resharding is done in single thread. It is doing that way to not disturb normal query processing.
As of June 2016, resharding is in "beta" state: it was tested only for small data sets - up to 5 TB.
System tables are used for implementing part of the system's functionality, and for providing access to information about how the system is working.
user String - Name of the user who made the request. For distributed query processing, this is the user who helped the requestor server send the query to this server, not the user who made the distributed request on the requestor server.
address String - The IP address the request was made from. The same for distributed processing.
elapsed Float64 - The time in seconds since request execution started.
rows_read UInt64 - The number of rows read from the table. For distributed processing, on the requestor server, this is the total for all remote servers.
bytes_read UInt64 - The number of uncompressed bytes read from the table. For distributed processing, on the requestor server, this is the total for all remote servers.
total_rows_approx UInt64 - The approximation of the total number of rows that should be read. For distributed processing, on the requestor server, this is the total for all remote servers. It can be updated during request processing, when new sources to process become known.
memory_usage UInt64 - How much memory the request uses. It might not include some types of dedicated memory.
query String - The query text. For INSERT, it doesn't include the data to insert.
Note that the amount of memory used by the dictionary is not proportional to the number of items stored in it. So for flat and cached dictionaries, all the memory cells are pre-assigned, regardless of how full the dictionary actually is.
Contains information and status for replicated tables residing on the local server. This table can be used for monitoring. The table contains a row for every Replicated* table.
Only one replica can be the leader at a time. The leader is responsible for selecting background merges to perform.
Note that writes can be performed to any replica that is available and has a session in ZK, regardless of whether it is a leader.
is_readonly: Whether the replica is in read-only mode.
This mode is turned on if the config doesn't have sections with ZK, if an unknown error occurred when reinitializing sessions in ZK, and during session reinitialization in ZK.
is_session_expired: Whether the session with ZK has expired.
Basically the same as 'is_readonly'.
future_parts: The number of data parts that will appear as the result of INSERTs or merges that haven't been done yet.
parts_to_check: The number of data parts in the queue for verification.
A part is put in the verification queue if there is suspicion that it might be damaged.
zookeeper_path: Path to table data in ZK.
replica_name: Replica name in ZK. Different replicas of the same table have different names.
replica_path: Path to replica data in ZK. The same as concatenating 'zookeeper_path/replicas/replica_path'.
columns_version: Version number of the table structure. Indicates how many times ALTER was performed. If replicas have different versions, it means some replicas haven't made all of the ALTERs yet.
queue_size: Size of the queue for operations waiting to be performed. Operations include inserting blocks of data, merges, and certain other actions. It usually coincides with 'future_parts'.
inserts_in_queue: Number of inserts of blocks of data that need to be made. Insertions are usually replicated fairly quickly. If this number is large, it means something is wrong.
merges_in_queue: The number of merges waiting to be made. Sometimes merges are lengthy, so this value may be greater than one for a long time.
The next 4 columns have a non-zero value only where there is an active session with ZK.
log_max_index: Maximum entry number in the log of general activity.
log_pointer: Maximum entry number from the log of general activity that the replica copied to its queue for execution, plus one.
If log_pointer is much smaller than log_max_index, something is wrong.
total_replicas: The total number of known replicas of this table.
active_replicas: The number of replicas of this table that have a session in ZK (i.e., the number of functioning replicas).
Contains information about settings that are currently in use (i.e. used for executing the query you are using to read from the system.settings table).
Allows reading data from the ZooKeeper cluster defined in the config.
The query must have a 'path' equality condition in the WHERE clause. This is the path in ZooKeeper for the children that you want to get data for.
This may be just one server address. The server address is host:port, or just the host. The host can be specified as the server name, or as the IPv4 or IPv6 address. An IPv6 address is specified in square brackets. The port is the TCP port on the remote server. If the port is omitted, it uses %%tcp_port%% from the server's config file (by default, 9000).
Multiple addresses can be comma-separated. In this case, the query goes to all the specified addresses (like to shards with different data) and uses distributed processing.
Curly brackets can contain a range of numbers separated by two dots (non-negative integers). In this case, the range is expanded to a set of values that generate shard addresses. If the first number starts with zero, the values are formed with the same zero alignment.
Addresses and fragments in curly brackets can be separated by the pipe (|) symbol. In this case, the corresponding sets of addresses are interpreted as replicas, and the query will be sent to the first healthy replica. The replicas are evaluated in the order currently set in the 'load_balancing' setting.
Using the 'remote' table function is less optimal than creating a Distributed table, because in this case, the server connection is re-established for every request. In addition, if host names are set, the names are resolved, and errors are not counted when working with various replicas. When processing a large number of queries, always create the Distributed table ahead of time, and don't use the 'remote' table function.
The format determines how data is given (written by server as output) to you after SELECTs, and how it is accepted (read by server as input) for INSERTs.
The most efficient format. Data is written and read by blocks in binary format. For each block, the number of rows, number of columns, column names and types, and parts of columns in this block are recorded one after another. In other words, this format is "columnar" - it doesn't convert columns to rows. This is the format used in the native interface for interaction between servers, for using the command-line client, and for C++ clients.
You can use this format to quickly generate dumps that can only be read by the ClickHouse DBMS. It doesn't make sense to work with this format yourself.
In TabSeparated format, data is written by row. Each row contains values separated by tabs. Each value is follow by a tab, except the last value in the row, which is followed by a line break. Strictly Unix line breaks are assumed everywhere. The last row also must contain a line break at the end. Values are written in text format, without enclosing quotation marks, and with special characters escaped.
Numbers are written in decimal form. Numbers may contain an extra "+" symbol at the beginning (but it is not recorded during an output). Non-negative numbers can't contain the negative sign. When parsing, it is allowed to parse an empty string as a zero, or (for signed types) a string consisting of just a minus sign as a zero. Numbers that do not fit into the corresponding data type may be parsed as a different number, without an error message.
Floating-point numbers are formatted in decimal form. The dot is used as the decimal separator. Exponential entries are supported, as are 'inf', '+inf', '-inf', and 'nan'. An entry of floating-point numbers may begin or end with a decimal point.
During formatting, accuracy may be lost on floating-point numbers.
During parsing, a result is not necessarily the nearest machine-representable number.
Dates are formatted in YYYY-MM-DD format and parsed in the same format, but with any characters as separators.
DateTimes are formatted in the format YYYY-MM-DD hh:mm:ss and parsed in the same format, but with any characters as separators.
This all occurs in the system time zone at the time the client or server starts (depending on which one formats data). For DateTimes, daylight saving time is not specified. So if a dump has times during daylight saving time, the dump does not unequivocally match the data, and parsing will select one of the two times.
During a parsing operation, incorrect dates and dates with times can be parsed with natural overflow or as null dates and times, without an error message.
As an exception, parsing DateTime is also supported in Unix timestamp format, if it consists of exactly 10 decimal digits. The result is not time zone-dependent. The formats YYYY-MM-DD hh:mm:ss and NNNNNNNNNN are differentiated automatically.
Strings are parsed and formatted with backslash-escaped special characters. The following escape sequences are used while formatting: %%\b%%, %%\f%%, %%\r,%% %%\n%%, %%\t%%, %%\0%%, %%\'%%, and %%\\%%. For parsing, also supported %%\a%%, %%\v%% and <spanclass="inline-example">\x<i>HH</i></span> (hex escape sequence) and any sequences of the type <spanclass="inline-example">\<i>c</i></span> where <i>c</i> is any character (these sequences are converted to <i>c</i>). This means that parsing supports formats where a line break can be written as %%\n%% or as %%\%% and a line break. For example, the string 'Hello world' with a line break between the words instead of a space can be retrieved in any of the following variations:
Arrays are formatted as a list of comma-separated values in square brackets. Number items in the array are formatted as normally, but dates, dates with times, and strings are formatted in single quotes with the same escaping rules as above.
The TabSeparated format is convenient for processing data using custom programs and scripts. It is used by default in the HTTP interface, and in the command-line client's batch mode. This format also allows transferring data between different DBMSs. For example, you can get a dump from MySQL and upload it to ClickHouse, or vice versa.
The TabSeparated format supports outputting total values (when using WITH TOTALS) and extreme values (when 'extremes' is set to 1). In these cases, the total values and extremes are output after the main data. The main result, total values, and extremes are separated from each other by an empty line. Example:
String values are output in double quotes. Double quote inside a string is output as two consecutive double quotes. That's all escaping rules. Date and DateTime values are output in double quotes. Numbers are output without quotes. Fields are delimited by commas. Rows are delimited by unix newlines (LF). Arrays are output in following way: first, array are serialized to String (as in TabSeparated or Values formats), and then the String value are output in double quotes. Tuples are narrowed and serialized as separate columns.
During parsing, values could be enclosed or not enclosed in quotes. Supported both single and double quotes. In particular, Strings could be represented without quotes - in that case, they are parsed up to comma or newline (CR or LF). Contrary to RFC, in case of parsing strings without quotes, leading and trailing spaces and tabs are ignored. As line delimiter, both Unix (LF), Windows (CR LF) or Mac OS Classic (LF CR) variants are supported.
CSV format supports output of totals and extremes similar to TabSeparated format.
==CSVWithNames==
Also contains header, similar to TabSeparatedWithNames.
A full grid of the table is drawn, and each row occupies two lines in the terminal. Each result block is output as a separate table. This is necessary so that blocks can be output without buffering results (buffering would be necessary in order to pre-calculate the visible width of all the values).
To avoid dumping too much data to the terminal, only the first 10,000 rows are printed. If the number of rows is greater than or equal to 10,000, the message "Showed first 10,000" is printed.
The Pretty format supports outputting total values (when using WITH TOTALS) and extremes (when 'extremes' is set to 1). In these cases, total values and extreme values are output after the main data, in separate tables. Example (shown for the PrettyCompact format):
Differs from Pretty in that the grid is drawn between rows and the result is more compact. This format is used by default in the command-line client in interactive mode.
Differs from Pretty in that ANSI-escape sequences aren't used. This is necessary for displaying this format in a browser, as well as for using the 'watch' command-line utility.
Prints each value on a separate line with the column name specified. This format is convenient for printing just one or a few rows, if each row consists of a large number of columns.
Prints every row in parentheses. Rows are separated by commas. There is no comma after the last row. The values inside the parentheses are also comma-separated. Numbers are output in decimal format without quotes. Arrays are output in square brackets. Strings, dates, and dates with times are output in quotes. Escaping rules and parsing are same as in the TabSeparated format. During formatting, extra spaces aren't inserted, but during parsing, they are allowed and skipped (except for spaces inside array values, which are not allowed).
Outputs data in JSON format. Besides data tables, it also outputs column names and types, along with some additional information - the total number of output rows, and the number of rows that could have been output if there weren't a LIMIT. Example:
JSON is compatible with JavaScript. For this purpose, certain symbols are additionally escaped: the forward slash %%/%% is escaped as %%\/%%; alternative line breaks %%U+2028%% and %%U+2029%%, which don't work in some browsers, are escaped as <spanclass="inline-example">\u<i>XXXX</i></span>-sequences. ASCII control characters are escaped: backspace, form feed, line feed, carriage return, and horizontal tab as %%\b%%, %%\f%%, %%\n%%, %%\r%%, and %%\t%% respectively, along with the rest of the bytes from the range 00-1F using <spanclass="inline-example">\u<i>XXXX</i></span>-sequences. Invalid UTF-8 sequences are changed to the replacement character %%<25>%% and, thus, the output text will consist of valid UTF-8 sequences. UInt64 and Int64 numbers are output in double quotes for compatibility with JavaScript.
In INSERT queries JSON data can be supplied with arbitrary order of columns (JSON key-value pairs). It is also possible to omit values in which case the default value of the column is inserted. N.B. when using JSONEachRow format, complex default values are not supported, so when omitting a column its value will be zeros or empty string depending on its type.
Space characters between JSON objects are skipped. Between objects there can be a comma which is ignored. Newline character is not a mandatory separator for objects.
Similar to TabSeparated, but displays data in %%name=value%% format. Names are displayed just as in TabSeparated. Additionally, a %%=%% symbol is displayed.
%%
SearchPhrase= count()=8267016
SearchPhrase=bathroom interior count()=2166
SearchPhrase=yandex count()=1655
SearchPhrase=spring 2014 fashion count()=1549
SearchPhrase=free-form photo count()=1480
SearchPhrase=Angelina Jolie count()=1245
SearchPhrase=omsk count()=1112
SearchPhrase=photos of dog breeds count()=1091
SearchPhrase=curtain design count()=1064
SearchPhrase=baku count()=1000
%%
In case of many small columns this format is obviously not effective and there usually is no reason to use it. This format is supported because it is used for some cases in Yandex.
Format is supported both for input and output. In INSERT queries data can be supplied with arbitrary order of columns. It is also possible to omit values in which case the default value of the column is inserted. N.B. when using TSKV format, complex default values are not supported, so when omitting a column its value will be zeros or empty string depending on its type.
Nothing is output. However, the query is processed, and when using the command-line client, data is transmitted to the client. This is used for tests, including productivity testing. Obviously, this format is only appropriate for outputting a query result, not for parsing.
If you need to store texts, we recommend using UTF-8 encoding. At the very least, if your terminal uses UTF-8 (as recommended), you can read and write your values without making conversions.
Similarly, certain functions for working with strings have separate variations that work under the assumption that the string contains a set of bytes representing a UTF-8 encoded text.
For example, the 'length' function calculates the string length in bytes, while the 'lengthUTF8' function calculates the string length in Unicode code points, assuming that the value is UTF-8 encoded.
When server reads a string (as an input passed in INSERT query, for example) that contains fewer bytes, the string is padded to N bytes by appending null bytes at the right.
When server reads a string that contains more bytes, an error message is returned.
When server writes a string (as an output of SELECT query, for example), null bytes are not trimmed off of the end of the string, but are output.
A date. Stored in two bytes as the number of days since 1970-01-01 (unsigned). Allows storing values from just after the beginning of the Unix Epoch to the upper threshold defined by a constant at the compilation stage (currently, this is until the year 2038, but it may be expanded to 2106).
Date with time. Stored in four bytes as a Unix timestamp (unsigned). Allows storing values in the same range as for the Date type. The minimal value is output as 0000-00-00 00:00:00. The time is stored with accuracy up to one second (without leap seconds).
The date with time is converted from text (divided into component parts) to binary and back, using the system's time zone at the time the client or server starts. In text format, information about daylight savings is lost.
Note that by default the client adopts the server time zone at the beginning of the session. You can change this behaviour with the --use_client_time_zone command line switch.
Supports only those time zones that never had the time differ from UTC for a partial number of hours (without leap seconds) over the entire time range you will be working with.
So when working with a textual date (for example, when saving text dumps), keep in mind that there may be ambiguity during changes for daylight savings time, and there may be problems matching data if the time zone changed.
Enum8 or Enum16. A set of enumerated string values that are stored as Int8 or Int16. Example:
%%Enum8('hello' = 1, 'world' = 2)%%
- This data type has two possible values - 'hello' and 'world'.
The numeric values must be within -128..127 for %%Enum8%% and -32768..32767 for %%Enum16%%. Every member of the enum must also have different numbers. The empty string is a valid value. The numbers do not need to be sequential and can be in any order. The order does not matter.
In memory, the data is stored in the same way as the numeric types %%Int8%% and %%Int16%%.
When reading in text format, the string is read and the corresponding numeric value is looked up. An exception will be thrown if it is not found.
When writing in text format, the stored number is looked up and the corresponding string is written out. An exception will be thrown if the number does not correspond to a known value.
In binary format, the information is saved in the same way as %%Int8%% and %%Int16%%.
The implicit default value for an Enum is the value having the smallest numeric value.
In %%ORDER BY%%, %%GROUP BY%%, %%IN%%, %%DISTINCT%%, etc. Enums behave like the numeric value. e.g. they will be sorted by the numeric value in an %%ORDER BY%%. Equality and comparison operators behave like they do on the underlying numeric value.
Enum values cannot be compared to numbers, they must be compared to a string. If the string compared to is not a valid value for the Enum, an exception will be thrown. The %%IN%% operator is supported with the Enum on the left hand side and a set of strings on the right hand side.
Most numeric and string operations are not defined for Enum values, e.g. adding a number to an Enum or concatenating a string to an Enum. However, the %%toString%% function can be used to convert the Enum to its string value. Enum values are also convertible to numeric types using the %%to<i>T</i>%% function where <i>T</i> is a numeric type. When T corresponds to the enum's underlying numeric type, this conversion is zero-cost.
It is possible to add new members to the Enum using ALTER. If the only change is to the set of values, the operation will be almost instant. It is also possible to remove members of the Enum using ALTER. Removing members is only safe if the removed value has never been used in the table. As a safeguard, changing the numeric value of a previously defined Enum member will throw an exception.
Using ALTER, it is possible to change an %%Enum8%% to an %%Enum16%% or vice versa - just like changing an %%Int8%% to %%Int16%%.
Array of T-type items. The T type can be any type, including an array.
We don't recommend using multidimensional arrays, because they are not well supported (for example, you can't store multidimensional arrays in any tables except Memory tables).
Tuples can't be written to tables (other than Memory tables). They are used for temporary column grouping. Columns can be grouped when an IN expression is used in a query, and for specifying certain formal parameters of lambda functions. For more information, see "IN operators" and "Higher order functions".
Tuples can be output as the result of running a query. In this case, for text formats other than JSON*, values are comma-separated in brackets. In JSON* formats, tuples are output as arrays (in square brackets).
A nested data structure is like a nested table. The parameters of a nested data structure - the column names and types - are specified the same way as in a CREATE query. Each table row can correspond to any number of rows in a nested data structure.
This example declares the 'Goals' nested data structure, which contains data about conversions (goals reached). Each row in the 'visits' table can correspond to zero or any number of conversions.
In most cases, when working with a nested data structure, its individual columns are specified. To do this, the column names are separated by a dot. These columns make up an array of matching types. All the column arrays of a single nested data structure have the same length.
The only place where a SELECT query can specify the name of an entire nested data structure instead of individual columns is the ARRAY JOIN clause. For more information, see "ARRAY JOIN clause". Example:
For an INSERT query, you should pass all the component column arrays of a nested data structure separately (as if they were individual column arrays). During insertion, the system checks that they have the same length.
The intermediate state of an aggregate function. To get it, use aggregate functions with the '-State' suffix. For more information, see "AggregatingMergeTree".
For efficiency, the 'and' and 'or' functions accept any number of arguments. The corresponding chains of AND and OR operators are transformed to a single call of these functions.
There are at least* two types of functions - regular functions (they are just called "functions") and aggregate functions. These are completely different concepts. Regular functions work as if they are applied to each row separately (for each row, the result of the function doesn't depend on the other rows). Aggregate functions accumulate a set of values from various rows (i.e. they depend on the entire set of rows).
In contrast to standard SQL, ClickHouse has strong typing. In other words, it doesn't make implicit conversions between types. Each function works for a specific set of types. This means that sometimes you need to use type conversion functions.
All expressions in a query that have the same AST (the same record or same result of syntactic parsing) are considered to have identical values. Such expressions are concatenated and executed once. Identical subqueries are also eliminated this way.
All functions return a single return as the result (not several values, and not zero values). The type of result is usually defined only by the types of arguments, not by the values. Exceptions are the tupleElement function (the a.N operator), and the toFixedString function.
For simplicity, certain functions can only work with constants for some arguments. For example, the right argument of the LIKE operator must be a constant.
Almost all functions return a constant for constant arguments. The exception is functions that generate random numbers.
The 'now' function returns different values for queries that were run at different times, but the result is considered a constant, since constancy is only important within a single query.
Functions can be implemented in different ways for constant and non-constant arguments (different code is executed). But the results for a constant and for a true column containing only the same value should match each other.
Functions can't change the values of their arguments - any changes are returned as the result. Thus, the result of calculating separate functions does not depend on the order in which the functions are written in the query.
Some functions might throw an exception if the data is invalid. In this case, the query is canceled and an error text is returned to the client. For distributed processing, when an exception occurs on one of the servers, the other servers also attempt to abort the query.
In almost all programming languages, one of the arguments might not be evaluated for certain operators. This is usually for the operators &&, ||, ?:.
But in ClickHouse, arguments of functions (operators) are always evaluated. This is because entire parts of columns are evaluated at once, instead of calculating each row separately.
For distributed query processing, as many stages of query processing as possible are performed on remote servers, and the rest of the stages (merging intermediate results and everything after that) are performed on the requestor server.
- if %%distributed_table%% has at least two shards, the functions %%g%% and %%h%% are performed on remote servers, and the function %%f%% - is performed on the requestor server.
- if %%distributed_table%% has only one shard, all the functions %%f%%, %%g%%, and %%h%% are performed on this shard's server.
Another example is the %%hostName%% function, which returns the name of the server it is running on in order to make GROUP BY by servers in a SELECT query.
If a function in a query is performed on the requestor server, but you need to perform it on remote servers, you can wrap it in an 'any' aggregate function or add it to a key in GROUP BY.
For all arithmetic functions, the result type is calculated as the smallest number type that the result fits in, if there is such a type. The minimum is taken simultaneously based on the number of bits, whether it is signed, and whether it floats. If there are not enough bits, the highest bit type is taken.
You can also add whole numbers with a date or date and time. In the case of a date, adding a whole number means adding the corresponding number of days. For a date with time, it means adding the corresponding number of seconds.
If arguments are floating-point numbers, they are pre-converted to integers by dropping the decimal portion. The remainder is taken in the same sense as in C++. Truncated division is used for negative numbers.
The result type is an integer with bits equal to the maximum bits of its arguments. If at least one of the arguments is signed, the result is a signed number. If an argument is a floating-point number, it is cast to Int64.
Signed and unsigned numbers are compared the same way as in C++. That is, you can get an incorrect result in some cases. Example: SELECT 9223372036854775807 > -1
When converting to or from a string, the value is formatted or parsed using the same rules as for the TabSeparated format (and almost all other text formats). If the string can't be parsed, an exception is thrown and the request is canceled.
When converting dates to numbers or vice versa, the date corresponds to the number of days since the beginning of the Unix epoch.
When converting dates with times to numbers or vice versa, the date with time corresponds to the number of seconds since the beginning of the Unix epoch.
As an exception, if converting from UInt32, Int32, UInt64, or Int64 type numbers to Date, and if the number is greater than or equal to 65536, the number is interpreted as a Unix timestamp (and not as the number of days) and is rounded to the date. This allows support for the common occurrence of writing 'toDate(unix_timestamp)', which otherwise would be an error and would require writing the more cumbersome 'toDate(toDateTime(unix_timestamp))'.
Converts a String type argument to a FixedString(N) type (a string with fixed length N). N must be a constant. If the string has fewer bytes than N, it is passed with null bytes to the right. If the string has more bytes than N, an exception is thrown.
These functions accept a string and interpret the bytes placed at the beginning of the string as a number in host order (little endian). If the string isn't long enough, the functions work as if the string is padded with the necessary number of null bytes. If the string is longer than needed, the extra bytes are ignored. A date is interpreted as the number of days since the beginning of the Unix Epoch, and a date with time is interpreted as the number of seconds since the beginning of the Unix Epoch.
This function accepts a number or date or date with time, and returns a string containing bytes representing the corresponding value in host order (little endian). Null bytes are dropped from the end. For example, a UInt32 type value of 255 is a string that is one byte long.
- Converts a date with time to a UInt8 number containing the number of the hour in 24-hour time (0-23).
This function assumes that if clocks are moved ahead, it is by one hour and occurs at 2 a.m., and if clocks are moved back, it is by one hour and occurs at 3 a.m. (which is not always true - even in Moscow the clocks were once changed at a different time).
This function is specific to Yandex.Metrica, since half an hour is the minimum amount of time for breaking a session into two sessions if a counter shows a single user's consecutive pageviews that differ in time by strictly more than this amount. This means that tuples (the counter number, user ID, and time slot) can be used to search for pageviews that are included in the corresponding session.
- For a time interval starting at 'StartTime' and continuing for 'Duration' seconds, it returns an array of moments in time, consisting of points from this interval rounded down to the half hour.
For example, %%timeSlots(toDateTime('2012-01-01 12:20:00'), toUInt32(600)) = [toDateTime('2012-01-01 12:00:00'), toDateTime('2012-01-01 12:30:00')]%%.
- Returns the length of a string in Unicode code points (not in characters), assuming that the string contains a set of bytes that make up UTF-8 encoded text. If this assumption is not met, it returns some result (it doesn't throw an exception).
- Converts a string to lowercase, assuming the string contains a set of bytes that make up a UTF-8 encoded text. It doesn't detect the language. So for Turkish the result might not be exactly correct.
- Converts a string to uppercase, assuming the string contains a set of bytes that make up a UTF-8 encoded text. It doesn't detect the language. So for Turkish the result might not be exactly correct.
- Reverses a sequence of Unicode code points, assuming that the string contains a set of bytes representing a UTF-8 text. Otherwise, it does something else (it doesn't throw an exception).
- Returns a substring starting with the byte from the 'offset' index that is 'length' bytes long. Character indexing starts from one (as in standard SQL). The 'offset' and 'length' arguments must be constants.
The same as 'substring', but for Unicode code points. Works under the assumption that the string contains a set of bytes representing a UTF-8 encoded text. If this assumption is not met, it returns some result (it doesn't throw an exception).
The same as 'position', but the position is returned in Unicode code points. Works under the assumption that the string contains a set of bytes representing a UTF-8 encoded text. If this assumption is not met, it returns some result (it doesn't throw an exception).
Note that the backslash symbol (%%\%%) is used for escaping in the regular expression. The same symbol is used for escaping in string literals. So in order to escape the symbol in a regular expression, you must write two backslashes (%%\\%%) in a string literal.
Extracts a fragment of a string using a regular expression. If 'haystack' doesn't match the 'pattern' regex, an empty string is returned. If the regex doesn't contain subpatterns, it takes the fragment that matches the entire regex. Otherwise, it takes the fragment that matches the first subpattern.
Extracts all the fragments of a string using a regular expression. If 'haystack' doesn't match the 'pattern' regex, an empty string is returned. Returns an array of strings consisting of all matches to the regex. In general, the behavior is the same as the 'extract' function (it takes the first subpattern, or the entire expression if there isn't a subpattern).
For regular expressions like%%%needle%%%, the code is more optimal and works as fast as the 'position' function. For other regular expressions, the code is the same as for the 'match' function.
The arguments must be constants and have types that have the smallest common type. At least one argument must be passed, because otherwise it isn't clear which type of array to create. That is, you can't use this function to create an empty array (to do that, use the 'emptyArray*' function described above).
Negative indexes are supported - in this case, it selects the corresponding element numbered from the end. For example, 'arr[-1]' is the last item in the array.
- Returns the number of elements in the array equal to 'x'. Equivalent to <spanclass="inline-example">arrayCount(elem -> elem = x, arr)</span>.
In this example, Reaches is the number of conversions (the strings received after applying ARRAY JOIN), and Hits is the number of pageviews (strings before ARRAY JOIN). In this particular case, you can get the same result in an easier way:
In this example, each goal ID has a calculation of the number of conversions (each element in the Goals nested data structure is a goal that was reached, which we refer to as a conversion) and the number of sessions. Without ARRAY JOIN, we would have counted the number of sessions as %%sum(Sign)%%. But in this particular case, the rows were multiplied by the nested Goals structure, so in order to count each session one time after this, we apply a condition to the value of the %%arrayEnumerateUniq(Goals.ID)%% function.
The arrayEnumerateUniq function can take multiple arrays of the same size as arguments. In this case, uniqueness is considered for tuples of elements in the same positions in all the arrays.
Allows describing a lambda function for passing to a higher-order function. The left side of the arrow has a formal parameter - any ID, or multiple formal parameters - any IDs in a tuple. The right side of the arrow has an expression that can use these formal parameters, as well as any table columns.
A lambda function that accepts multiple arguments can be passed to a higher-order function. In this case, the higher-order function is passed several arrays of identical length that these arguments will correspond to.
For all functions other than 'arrayMap' and 'arrayFilter', the first argument (the lambda function) can be omitted. In this case, identical mapping is assumed.
Returns the number of elements in 'arr' for which 'func' returns something other than 0. If 'func' is not specified, it returns the number of non-zero items in the array.
Returns an array of selected substrings. Empty substrings may be selected if the separator occurs at the beginning or end of the string, or if there are multiple consecutive separators.
The first significant subdomain is a second-level domain if it is 'com', 'net', 'org', or 'co'. Otherwise, it is a third-level domain.
For example, firstSignificantSubdomain('https://news.yandex.ru/') = 'yandex', firstSignificantSubdomain('https://news.yandex.com.tr/') = 'yandex'.
- Selects the value of the 'name' parameter in the URL, if present. Otherwise, selects an empty string. If there are many parameters with this name, it returns the first occurrence. This function works under the assumption that the parameter name is encoded in the URL in exactly the same way as in the argument passed.
- Gets an array containing the URL trimmed to the %%/%%, %%?%% characters in the path and query-string. Consecutive separator characters are counted as one. The cut is made in the position after all the consecutive separator characters. Example:
- Removes the URL parameter named 'name', if present. This function works under the assumption that the parameter name is encoded in the URL exactly the same way as in the passed argument.
Takes a UInt32 number. Interprets it as an IPv4 address in big endian. Returns a string containing the corresponding IPv4 address in the format A.B.C.d (dot-separated numbers in decimal form).
Since using 'xxx' is highly unusual, this may be changed in the future. We recommend that you don't rely on the exact format of this fragment.
The only purpose of this argument is to prevent common subexpression elimination, so that two different instances of the same function return different columns with different random numbers.
SipHash is a cryptographic hash function. It works at least three times faster than MD5. For more information, see <ahref="https://131002.net/siphash/">https://131002.net/siphash/</a>
The function works fairly slowly (SHA-1 processes about 5 million short strings per second per processor core, while SHA-224 and SHA-256 process about 2.2 million).
We recommend using this function only in cases when you need a specific hash function and you can't select it.
Even in these cases, we recommend applying the function offline and pre-calculating values when inserting them into the table, instead of applying it in SELECTS.
URLHash(s) - Calculates a hash from a string without one of the trailing symbols /,? or # at the end, if present.
URL Hash(s, N) - Calculates a hash from a string up to the N level in the URL hierarchy, without one of the trailing symbols /,? or # at the end, if present.
Accepts a string, number, date, or date with time. Returns a string containing the argument's hexadecimal representation. Uses uppercase letters A-F. Doesn't use %%0x%% prefixes or %%h%% suffixes. For strings, all bytes are simply encoded as two hexadecimal numbers. Numbers are converted to big endian ("human readable") format. For numbers, older zeros are trimmed, but only by entire bytes. For example, %%hex(1) = '01'%%. Dates are encoded as the number of days since the beginning of the Unix Epoch. Dates with times are encoded as the number of seconds since the beginning of the Unix Epoch.
Accepts a string containing any number of hexadecimal digits, and returns a string containing the corresponding bytes. Supports both uppercase and lowercase letters A-F. The number of hexadecimal digits doesn't have to be even. If it is odd, the last digit is interpreted as the younger half of the 00-0F byte. If the argument string contains anything other than hexadecimal digits, some implementation-defined result is returned (an exception isn't thrown).
If you want to convert the result to a number, you can use the functions 'reverse' and 'reinterpretAs<i>Type</i>'.
Accepts an integer. Returns a string containing the list of powers of two that total the source number when summed. They are comma-separated without spaces in text format, in ascending order.
Accepts an integer. Returns an array of UInt64 numbers containing the list of powers of two that total the source number when summed. Numbers in the array are in ascending order.
Returns a rounder number that is less than or equal to 'x'.
A round number is a multiple of <spanclass="inline-example">1 / 10<sup>N</sup></span>, or the nearest number of the appropriate data type if <spanclass="inline-example">1 / 10<sup>N</sup></span> isn't exact.
Returns the smallest round number that is greater than or equal to 'x'. In every other way, it is the same as the 'floor' function (see above).
Accepts a number. If the number is less than one, it returns 0. Otherwise, it rounds the number down to the nearest (whole non-negative) degree of two.
Accepts a number. If the number is less than one, it returns 0. Otherwise, it rounds the number down to numbers from the set: 1, 10, 30, 60, 120, 180, 240, 300, 600, 1200, 1800, 3600, 7200, 18000, 36000. This function is specific to Yandex.Metrica and used for implementing the report on session length.
Accepts a number. If the number is less than 18, it returns 0. Otherwise, it rounds the number down to numbers from the set: 18, 25, 35, 45. This function is specific to Yandex.Metrica and used for implementing the report on user age.
All the functions return a Float64 number. The accuracy of the result is close to the maximum precision possible, but the result might not coincide with the machine representable number nearest to the corresponding real number.
If 'x' is non-negative, then %%erf(x / σ√2)%% is the probability that a random variable having a normal distribution with standard deviation 'σ' takes the value that is separated from the expected value by more than 'x'.
In order for the functions below to work, the server config must specify the paths and addresses for getting all the Yandex.Metrica dictionaries. The dictionaries are loaded at the first call of any of these functions. If the reference lists can't be loaded, an exception is thrown.
ClickHouse supports working with multiple alternative geobases (regional hierarchies) simultaneously, in order to support various perspectives on which countries certain regions belong to.
All the dictionaries are re-loaded in runtime (once every certain number of seconds, as defined in the builtin_dictionaries_reload_interval config parameter, or once an hour by default). However, the list of available dictionaries is defined one time, when the server starts.
Accepts a UInt32 number - the region ID from the Yandex geobase. If this region is a city or part of a city, it returns the region ID for the appropriate city. Otherwise, returns 0.
Checks whether a 'lhs' region belongs to a 'rhs' region. Returns a UInt8 number equal to 1 if it belongs, or 0 if it doesn't belong.
The relationship is reflexive - any region also belongs to itself.
Accepts a UInt32 number - the region ID from the Yandex geobase. Returns an array of region IDs consisting of the passed region and all parents along the chain.
Accepts a UInt32 number - the region ID from the Yandex geobase. A string with the name of the language can be passed as a second argument. Supported languages are: ru, en, ua, uk, by, kz, tr. If the second argument is omitted, the language 'ru' is used. If the language is not supported, an exception is thrown. Returns a string - the name of the region in the corresponding language. If the region with the specified ID doesn't exist, an empty string is returned.
Accepts a UInt8 number - the ID of the operating system from the Yandex.Metrica dictionary. If any OS matches the passed number, it returns a UInt8 number - the ID of the corresponding root OS (for example, it converts Windows Vista to Windows). Otherwise, returns 0.
Accepts a UInt8 number - the ID of the operating system from the Yandex.Metrica dictionary. Returns an array with a hierarchy of operating systems. Similar to the 'regionHierarchy' function.
Accepts a UInt8 number - the ID of the search engine from the Yandex.Metrica dictionary. If any search engine matches the passed number, it returns a UInt8 number - the ID of the corresponding root search engine (for example, it converts Yandex.Images to Yandex). Otherwise, returns 0.
Accepts a UInt8 number - the ID of the search engine from the Yandex.Metrica dictionary. Returns an array with a hierarchy of search engines. Similar to the 'regionHierarchy' function.
- For the 'dict_name' hierarchical dictionary, finds out whether the 'child_id' key is located inside 'ancestor_id' (or matches 'ancestor_id'). Returns UInt8.
- For the 'dict_name' hierarchical dictionary, returns an array of dictionary keys starting from 'id' and continuing along the chain of parent elements. Returns Array(UInt64).
In Yandex.Metrica, JSON is passed by users as <i>session parameters</i>. There are several functions for working with this JSON. (Although in most of the cases, the JSONs are additionally pre-processed, and the resulting values are put in separate columns in their processed format.) All these functions are based on strong assumptions about what the JSON can be, but they try not to do anything.
Parses UInt64 from the value of the field named 'name'. If this is a string field, it tries to parse a number from the beginning of the string. If the field doesn't exist, or it exists but doesn't contain a number, it returns 0.
Currently, there is no support for code points not from the basic multilingual plane written in the format \uXXXX\uYYYY (they are converted to CESU-8 instead of UTF-8).
Tuples are normally used as intermediate values for an argument of IN operators, or for creating a list of formal parameters of lambda functions. Tuples can't be written to a table.
'N' is the column index, starting from 1. 'N' must be a constant. 'N' must be a strict postive integer no greater than the size of the tuple.
- Returns a string with the name of the host that this function was performed on. For distributed processing, this is the name of the remote server host, if the function is performed on a remote server.
- Calculates the approximate width when outputting values to the console in text format (tab-separated). This function is used by the system for implementing Pretty formats.
- Turns a constant into a full column containing just one value.
In ClickHouse, full columns and constants are represented differently in memory. Functions work differently for constant arguments and normal arguments (different code is executed), although the result is almost always the same. This function is for debugging this behavior.
If the 'x' value is equal to one of the elements in the 'array_from' array, it returns the existing element (that is numbered the same) from the 'array_to' array. Otherwise, it returns 'default'. If there are multiple matching elements in 'array_from', it returns one of the matches.
Differs from the first variation in that the 'default' argument is omitted.
If the 'x' value is equal to one of the elements in the 'array_from' array, it returns the matching element (that is numbered the same) from the 'array_to' array. Otherwise, it returns 'x'.
Normal functions don't change a set of rows, but just change the values in each row (map). Aggregate functions compress a set of rows (fold or reduce).
The 'arrayJoin' function takes each row and generates a set of rows (unfold).
This function takes an array as an argument, and propagates the source row to multiple rows for the number of elements in the array.
All the values in columns are simply copied, except the values in the column where this function is applied - it is replaced with the corresponding array value.
A 'SELECT count() FROM table' query is not optimized, because the number of entries in the table is not stored separately. It will select some small column from the table and count the number of values in it.
When a SELECT query has the GROUP BY clause or at least one aggregate function, ClickHouse (in contrast to MySQL) requires that all expressions in the SELECT, HAVING, and ORDER BY clauses be calculated from keys or from aggregate functions. That is, each column selected from the table must be used either in keys, or inside aggregate functions. To get behavior like in MySQL, you can put the other columns in the 'any' aggregate function.
Calculates the 'arg' value for a minimal 'val' value. If there are several different values of 'arg' for minimal values of 'val', the first of these values encountered is output.
Calculates the 'arg' value for a maximum 'val' value. If there are several different values of 'arg' for maximum values of 'val', the first of these values encountered is output.
Uses an adaptive sampling algorithm: for the calculation state, it uses a sample of element hash values with a size up to 65535.
Compared with the widely known HyperLogLog algorithm, this algorithm is less effective in terms of accuracy and memory consumption (even up to proportionality), but it is adaptive. This means that with fairly high accuracy, it consumes less memory during simultaneous computation of cardinality for a large number of data sets whose cardinality has power law distribution (i.e. in cases when most of the data sets are small). This algorithm is also very accurate for data sets with small cardinality (up to 65536) and very efficient on CPU (when computing not too many of these functions, using 'uniq' is almost as fast as using other aggregate functions).
There is no compensation for the bias of an estimate, so for large data sets the results are systematically deflated. This function is normally used for computing the number of unique visitors in Yandex.Metrica, so this bias does not play a role.
Uses the HyperLogLog algorithm to approximate the number of different values of the argument. It uses 2<sup>12</sup> 5-bit cells. The size of the state is slightly more than 2.5 KB.
The 'uniqExact' function uses more memory than the 'uniq' function, because the size of the state has unbounded growth as the number of different values increases.
The returned value has the Float32 type. If no values were passed to the function (when using 'medianTimingIf' or 'quantileTimingIf'), 'nan' is returned. The purpose of this is to differentiate these instances from zeros. See the note on sorting NaNs in "ORDER BY clause".
For its purpose (calculating quantiles of page loading times), using this function is more effective and the result is more accurate than for the 'median/quantile' function.
This function works similarly to the 'median' function - it approximates the median. However, in contrast to 'median', the result is deterministic and does not depend on the order of query execution.
To achieve this, the function takes a second argument - the "determinator". This is a number whose hash is used instead of a random number generator in the reservoir sampling algorithm. For the function to work correctly, the same determinator value should not occur too often. For the determinator, you can use an event ID, user ID, and so on.
Don't use this function for calculating timings. The 'medianTiming', 'quantileTiming', and 'quantilesTiming' functions are better suited to this purpose.
Calculates the amount <spanclass="inline-example">Σ((x - x̅)<sup>2</sup>) / (n - 1)</span>, where 'n' is the sample size and 'x̅' is the average value of 'x'.
Calculates the amount <spanclass="inline-example">Σ((x - x̅)<sup>2</sup>) / n</span>, where 'n' is the sample size and 'x̅' is the average value of 'x'.
Some aggregate functions can accept not only argument columns (used for compression), but a set of parameters - constants for initialization. The syntax is two pairs of brackets instead of one. The first is for parameters, and the second is for arguments.
Approximates the 'level' quantile. 'level' is a constant, a floating-point number from 0 to 1. We recommend using a 'level' value in the range of 0.01 .. 0.99.
Don't use a 'level' value equal to 0 or 1 - use the 'min' and 'max' functions for these cases.
The algorithm is the same as for the 'median' function. Actually, 'quantile' and 'median' are internally the same function. You can use the 'quantile' function without parameters - in this case, it calculates the median, and you can use the 'median' function with parameters - in this case, it calculates the quantile of the set level.
When using multiple 'quantile' and 'median' functions with different levels in a query, the internal states are not combined (that is, the query works less efficiently than it could). In this case, use the 'quantiles' function.
The suffix -%%If%% can be appended to the name of any aggregate function. In this case, the aggregate function accepts an extra argument - a condition (Uint8 type). The aggregate function processes only the rows that trigger the condition. If the condition was not triggered even once, it returns a default value (usually zeros or empty strings).
The -%%Array%% suffix can be appended to any aggregate function. In this case, the aggregate function takes arguments of the 'Array(T)' type (arrays) instead of 'T' type arguments. If the aggregate function accepts multiple arguments, this must be arrays of equal lengths. When processing arrays, the aggregate function works like the original aggregate function across all array elements.
Example 1: %%sumArray(arr)%% - Totals all the elements of all 'arr' arrays. In this example, it could have been written more simply: %%sum(arraySum(arr))%%.
Example 2: %%uniqArray(arr)%% - Count the number of unique elements in all 'arr' arrays. This could be done an easier way: %%uniq(arrayJoin(arr))%%, but it's not always possible to add 'arrayJoin' to a query.
The -%%If%% and -%%Array%% combinators can be used together. However, 'Array' must come first, then 'If'. Examples: %%uniqArrayIf(arr, cond)%%, %%quantilesTimingArrayIf(level1, level2)(arr, cond)%%. Due to this order, the 'cond' argument can't be an array.
A dictionary is a mapping (key -> attributes) that can be used in a query as functions. You can think of this as a more convenient and efficient type of JOIN with dimension tables.
All the functions support "translocality," the ability to simultaneously use different perspectives on region ownership. For more information, see the section "Functions for working with Yandex.Metrica dictionaries".
Put the regions_hierarchy*.txt files in the path_to_regions_hierarchy_file directory. This configuration parameter must contain the path to the regions_hierarchy.txt file (the default regional hierarchy), and the other files (regions_hierarchy_ua.txt) must be located in the same directory.
Dictionaries can be updated without the server restart. However, the set of available dictionaries is not updated. For updates, the file modification times are checked. If a file has changed, the dictionary is updated.
Dictionary updates (other than loading at first use) do not block queries. During updates, queries use the old versions of dictionaries. If an error occurs during an update, the error is written to the server log, while queries continue using the old version of dictionaries.
We recommend periodically updating the dictionaries with the geobase. During an update, generate new files and write them to a separate location. When everything is ready, rename them to the files used by the server.
It is possible to add your own dictionaries from various data sources. The data source for a dictionary can be a file in the local file system, the ClickHouse server, or a MySQL server.
A dictionary can be stored completely in RAM and updated regularly, or it can be partially cached in RAM and dynamically load missing values.
The configuration of external dictionaries is in a separate file or files specified in the 'dictionaries_config' configuration parameter.
This parameter contains the absolute or relative path to the file with the dictionary configuration. A relative path is relative to the directory with the server config file. The path can contain wildcards * and ?, in which case all matching files are found. Example: dictionaries/*.xml.
The dictionary configuration, as well as the set of files with the configuration, can be updated without restarting the server. The server checks updates every 5 seconds. This means that dictionaries can be enabled dynamically.
Dictionaries can be created when starting the server, or at first use. This is defined by the 'dictionaries_lazy_load' parameter in the main server config file. This parameter is optional, 'true' by default. If set to 'true', each dictionary is created at first use. If dictionary creation failed, the function that was using the dictionary throws an exception. If 'false', all dictionaries are created when the server starts, and if there is an error, the server shuts down.
<comment>Optional element with any content; completely ignored.</comment>
<!--You can set any number of different dictionaries. -->
<dictionary>
<!-- Dictionary name. The dictionary will be accessed for use by this name. -->
<name>os</name>
<!-- Data source. -->
<source>
<!-- Source is a file in the local file system. -->
<file>
<!-- Path on the local file system. -->
<path>/opt/dictionaries/os.tsv</path>
<!-- Which format to use for reading the file. -->
<format>TabSeparated</format>
</file>
<!-- or the source is a table on a MySQL server.
<mysql>
<!- - These parameters can be specified outside (common for all replicas) or inside a specific replica - ->
<port>3306</port>
<user>metrika</user>
<password>qwerty</password>
<!- - Specify from one to any number of replicas for fault tolerance. - ->
<replica>
<host>example01-1</host>
<priority>1</priority> <!- - The lower the value, the higher the priority. - ->
</replica>
<replica>
<host>example01-2</host>
<priority>1</priority>
</replica>
<db>conv_main</db>
<table>counters</table>
</mysql>
-->
<!-- or the source is a table on the ClickHouse server.
<clickhouse>
<host>example01-01-1</host>
<port>9000</port>
<user>default</user>
<password></password>
<db>default</db>
<table>counters</table>
</clickhouse>
<!- - If the address is similar to localhost, the request is made without network interaction. For fault tolerance, you can create a Distributed table on localhost and enter it. - ->
-->
</source>
<!-- Update interval for fully loaded dictionaries. 0 - never update. -->
<lifetime>
<min>300</min>
<max>360</max>
<!-- The update interval is selected uniformly randomly between min and max, in order to spread out the load when updating dictionaries on a large number of servers. -->
</lifetime>
<!-- or <!- - The update interval for fully loaded dictionaries or invalidation time for cached dictionaries. 0 - never update. - ->
<lifetime>300</lifetime>
-->
<layout> <!-- Method for storing in memory. -->
<flat />
<!-- or <hashed />
or
<cache>
<!- - Cache size in number of cells; rounded up to a degree of two. - ->
<size_in_cells>1000000000</size_in_cells>
</cache> -->
</layout>
<!-- Structure. -->
<structure>
<!-- Description of the column that serves as the dictionary identifier (key). -->
<id>
<!-- Column name with ID. -->
<name>Id</name>
</id>
<attribute>
<!-- Column name. -->
<name>Name</name>
<!-- Column type. (How the column is understood when loading. For MySQL, a table can have TEXT, VARCHAR, and BLOB, but these are all loaded as String) -->
<type>String</type>
<!-- Value to use for a non-existing element. In the example, an empty string. -->
<null_value></null_value>
</attribute>
<!-- Any number of attributes can be specified. -->
<attribute>
<name>ParentID</name>
<type>UInt64</type>
<null_value>0</null_value>
<!-- Whether it defines a hierarchy - mapping to the parent ID (by default, false). -->
<hierarchical>true</hierarchical>
<!-- The mapping id -> attribute can be considered injective, in order to optimize GROUP BY. (by default, false) -->
The dictionary identifier (key attribute) must be a number that fits into UInt64. Compound and string keys are not supported. However, if your dictionary has a complex key, you can hash it and use the hash as the key. You can use View for this purpose (in both ClickHouse and MySQL).
This is the most effective method. It works if all keys are smaller than 500,000. If a larger key is discovered when creating the dictionary, an exception is thrown and the dictionary is not created. The dictionary is loaded to RAM in its entirety. The dictionary uses the amount of memory proportional to maximum key value. With the limit of 500,000, memory consumption is not likely to be high. All types of sources are supported. When updating, data (from a file or from a table) is read in its entirety.
This method is slightly less effective than the first one. The dictionary is also loaded to RAM in its entirety, and can contain any number of items with any identifiers. In practice, it makes sense to use up to tens of millions of items, while there is enough RAM.
3. %%cache%% - This is the least effective method. It is appropriate if the dictionary doesn't fit in RAM. It is a cache of a fixed number of cells, where frequently-used data can be located. MySQL and ClickHouse sources are supported, but file sources are not supported. When searching a dictionary, the cache is searched first. For each data block, all keys not found in the cache (or expired keys) are collected in a package, which is sent to the source with the query %%SELECT attrs... FROM db.table WHERE id IN (k1, k2, ...)%%. The received data is then written to the cache.
Use the cache method only in cases when it is unavoidable. The speed of the cache depends strongly on correct settings and the usage scenario. A cache type dictionary only works normally for high enough hit rates (recommended 99% and higher). You can view the average hit rate in the system.dictionaries table. Set a large enough cache size. You will need to experiment to find the right number of cells - select a value, use a query to get the cache completely full, look at the memory consumption (this information is in the system.dictionaries table), then proportionally increase the number of cells so that a reasonable amount of memory is consumed. We recommend MySQL as the source for the cache, because ClickHouse doesn't handle requests with random reads very well.
In all cases, performance is better if you call the function for working with a dictionary after GROUP BY, and if the attribute being fetched is marked as injective. For a dictionary cache, performance improves if you call the function after LIMIT. To do this, you can use a subquery with LIMIT, and call the function with the dictionary from the outside.
An attribute is called injective if different attribute values correspond to different keys. So when GROUP BY uses a function that fetches an attribute value by the key, this function is automatically taken out of GROUP BY.
When updating dictionaries from a file, first the file modification time is checked, and it is loaded only if the file has changed.
When updating from MySQL, for flat and hashed dictionaries, first a SHOW TABLE STATUS query is made, and the table update time is checked. If it is not NULL, it is compared to the stored time. This works for MyISAM tables, but for InnoDB tables the update time is unknown, so loading from InnoDB is performed on each update.
For cache dictionaries, the expiration (lifetime) of data in the cache can be set. If more time than 'lifetime' has passed since loading the data in a cell, the cell's value is not used, and it is re-requested the next time it needs to be used.
Dictionary updates (other than loading for first use) do not block queries. During updates, the old version of a dictionary is used. If an error occurs during an update, the error is written to the server log, and queries continue using the old version of dictionaries.
Note that you can convert values for a small dictionary by specifying all the contents of the dictionary directly in a SELECT query (see the section "transform function"). This functionality is not related to external dictionaries.
In this section, we review settings that you can make using a SET query or in a config file. Remember that these settings can be set for a session or globally. Settings that can only be made in the server config file are not covered here.
In ClickHouse, data is processed by blocks (sets of column parts). The internal processing cycles for a single block are efficient enough, but there are noticeable expenditures on each block. 'max_block_size' is a recommendation for what size of block (in number of rows) to load from tables. The block size shouldn't be too small, so that the expenditures on each block are still noticeable, but not too large, so that the query with LIMIT that is completed after the first block is processed quickly, so that too much memory isn't consumed when extracting a large number of columns in multiple threads, and so that at least some cache locality is preserved.
Blocks the size of 'max_block_size' are not always loaded from the table. If it is obvious that less data needs to be retrieved, a smaller block is processed.
This setting only applies in cases when the server forms the blocks.
For example, for an INSERT via the HTTP interface, the server parses the data format and forms blocks of the specified size.
But when using clickhouse-client, the client parses the data itself, and the 'max_insert_block_size' setting on the server doesn't affect the size of the inserted blocks.
This is slightly more than 'max_block_size'. The reason for this is because certain table engines (*MergeTree) form a data part on the disk for each inserted block, which is a fairly large entity. Similarly, *MergeTree tables sort data during insertion, and a large enough block size allows sorting more data in RAM.
This parameter applies to threads that perform the same stages of the query execution pipeline in parallel.
For example, if reading from a table, evaluating expressions with functions, filtering with WHERE and pre-aggregating for GROUP BY can all be done in parallel using at least 'max_threads' number of threads, then 'max_threads' are used.
If less than one SELECT query is normally run on a server at a time, set this parameter to a value slightly less than the actual number of processor cores.
For queries that are completed quickly because of a LIMIT, you can set a lower 'max_threads'. For example, if the necessary number of entries are located in every block and max_threads = 8, 8 blocks are retrieved, although it would have been enough to read just one.
The maximum size of blocks of uncompressed data before compressing for writing to a table. By default, 1,048,576 (1 MiB). If the size is reduced, the compression rate is significantly reduced, the compression and decompression speed increases slightly due to cache locality, and memory consumption is reduced. There usually isn't any reason to change this setting.
For *MergeTree tables. In order to reduce latency when processing queries, a block is compressed when writing the next mark if its size is at least 'min_compress_block_size'. By default, 65,536.
The actual size of the block, if the uncompressed data less than 'max_compress_block_size' is no less than this value and no less than the volume of data for one mark.
We are writing a UInt32-type column (4 bytes per value). When writing 8192 rows, the total will be 32 KB of data. Since min_compress_block_size = 65,536, a compressed block will be formed for every two marks.
We are writing a URL column with the String type (average size of 60 bytes per value). When writing 8192 rows, the average will be slightly less than 500 KB of data. Since this is more than 65,536, a compressed block will be formed for each mark. In this case, when reading data from the disk in the range of a single mark, extra data won't be decompressed.
The maximum part of a query that can be taken to RAM for parsing with the SQL parser.
The INSERT query also contains data for INSERT that is processed by a separate stream parser (that consumes O(1) RAM), which is not included in this restriction.
The maximum number of simultaneous connections with remote servers for distributed processing of a single query to a single Distributed table. We recommend setting a value no less than the number of servers in the cluster.
The following parameters are only used when creating Distributed tables (and when launching a server), so there is no reason to change them at runtime.
The maximum number of simultaneous connections with remote servers for distributed processing of all queries to a single Distributed table. We recommend setting a value no less than the number of servers in the cluster.
The timeout in milliseconds for connecting to a remote server for a Distributed table engine, if the 'shard' and 'replica' sections are used in the cluster definition.
Whether to use a cache of uncompressed blocks. Accepts 0 or 1. By default, 0 (disabled).
The uncompressed cache (only for tables in the MergeTree family) allows significantly reducing latency and increasing throughput when working with a large number of short queries. Enable this setting for users who send frequent short requests. Also pay attention to the 'uncompressed_cache_size' configuration parameter (only set in the config file) - the size of uncompressed cache blocks. By default, it is 8 GiB. The uncompressed cache is filled in as needed; the least-used data is automatically deleted.
For queries that read at least a somewhat large volume of data (one million rows or more), the uncompressed cache is disabled automatically in order to save space for truly small queries. So you can keep the 'use_uncompressed_cache' setting always set to 1.
When using the HTTP interface, the 'query_id' parameter can be passed. This is any string that serves as the query identifier.
If a query from the same user with the same 'query_id' already exists at this time, the behavior depends on the 'replace_running_query' parameter.
Yandex.Metrica uses this parameter set to 1 for implementing suggestions for segmentation conditions. After entering the next character, if the old query hasn't finished yet, it should be canceled.
The number of errors is counted for each replica. The query is sent to the replica with the fewest errors, and if there are several of these, to any one of them.
Disadvantages: Server proximity is not accounted for; if the replicas have different data, you will also get different data.
The number of errors is counted for each replica. Every 5 minutes, the number of errors is integrally divided by 2. Thus, the number of errors is calculated for a recent time with exponential smoothing. If there is one replica with a minimal number of errors (i.e. errors occurred recently on the other replicas), the query is sent to it. If there are multiple replicas with the same minimal number of errors, the query is sent to the replica with a host name that is most similar to the server's host name in the config file (for the number of different characters in identical positions, up to the minimum length of both host names).
As an example, example01-01-1 and example01-01-2.yandex.ru are different in one position, while example01-01-1 and example01-02-2 differ in two places.
This method might seem a little stupid, but it doesn't use external data about network topology, and it doesn't compare IP addresses, which would be complicated for our IPv6 addresses.
Thus, if there are equivalent replicas, the closest one by name is preferred.
We can also assume that when sending a query to the same server, in the absence of failures, a distributed query will also go to the same servers. So even if different data is placed on the replicas, the query will return mostly the same results.
Replicas are accessed in the same order as they are specified. The number of errors does not matter. This method is appropriate when you know exactly which replica is preferable.
Most restrictions also have an 'overflow_mode' setting, meaning what to do when the limit is exceeded.
It can take one of two values: 'throw' or 'break'. Restrictions on aggregation (group_by_overflow_mode) also have the value 'any'.
throw - Throw an exception (default).
break - Stop executing the query and return the partial result, as if the source data ran out.
When using the GET method in the HTTP interface, 'readonly = 1' is set automatically. In other words, for queries that modify data, you can only use the POST method. You can send the query itself either in the POST body, or in the URL parameter.
Memory consumption is not fully considered for aggregate function states 'min', 'max', 'any', 'anyLast', 'argMin', and 'argMax' from String and Array arguments.
What to do when the number of unique keys for aggregation exceeds the limit: 'throw', 'break', or 'any'. By default, throw.
Using the 'any' value lets you run an approximation of GROUP BY. The quality of this approximation depends on the statistical nature of the data.
Minimal execution speed in rows per second. Checked on every data block when 'timeout_before_checking_execution_speed' expires. If the execution speed is lower, an exception is thrown.
Maximum number of columns that can be read from a table in a single query. If a query requires reading a greater number of columns, it throws an exception.
Maximum number of temporary columns that must be kept in RAM at the same time when running a query, including constant columns. If there are more temporary columns than this, it throws an exception.
Maximum pipeline depth. Corresponds to the number of transformations that each data block goes through during query processing. Counted within the limits of a single server. If the pipeline depth is greater, an exception is thrown. By default, 1000.
Maximum nesting depth of a query syntactic tree. If exceeded, an exception is thrown. At this time, it isn't checked during parsing, but only after parsing the query. That is, a syntactic tree that is too deep can be created during parsing, but the query will fail. By default, 1000.
In the example, two profiles are set: 'default' and 'web'. The 'default' profile has a special purpose - it must always be present and is applied when starting the server. In other words, the 'default' profile contains default settings. The 'web' profile is a regular profile that can be set using the SET query or using a URL parameter in an HTTP query.
Settings profiles can inherit from each other. To use inheritance, indicate the 'profile' setting before the other settings that are listed in the profile.
The config can also define "substitutions". If an element has the 'incl' attribute, the corresponding substitution from the file will be used as the value. By default, the path to the file with substitutions is '/etc/metrika.xml'. This can be changed in the config in the 'include_from' element. The substitution values are specified in '/yandex/<i>substitution_name</i>' elements of this file.
The 'config.xml' file can specify a separate config with user settings, profiles, and quotas. The relative path to this config is set in the 'users_config' element. By default, it is 'users.xml'. If 'users_config' is omitted, the user settings, profiles, and quotas are specified directly in 'config.xml'. The server tracks changes to 'users_config' and reloads it in runtime. That is, you can add or change users and their settings without relaunching the server.
For 'users.config', overrides and substitutions may also exist in files from the '<i>users_config</i>.d' directory (for example, 'users.d'). Note that the server tracks updates only directly in the 'users.xml' file, so all the possible overrides are not updated in runtime.
For each config file, the server also generates <i>file</i>-preprocessed.xml files on launch. These files contain all the completed substitutions and overrides, and they are intended for informational use. The server itself does not use these files, and you do not need to edit them.
Here we can see that two users are declared: 'default' and 'web'. We added the 'web' user ourselves.
The 'default' user is chosen in cases when the username is not passed, so this user must be present in the config file. The 'default' user is also used for distributed query processing - the system accesses remote servers under this username. So the 'default' user must have an empty password and must not have substantial restrictions or quotas - otherwise, distributed queries will fail.
The password is specified in plain text directly in the config. In this regard, you should not consider these passwords as providing security against potential malicious attacks. Rather, they are necessary for protection from Yandex employees.
A list of networks is specified that access is allowed from. In this example, the list of networks for both users is loaded from a separate file (/etc/metrika.xml) containing the 'networks' substitution. Here is a fragment of it:
We could have defined this list of networks directly in 'users.xml', or in a file in the 'users.d' directory (for more information, see the section "Configuration files").
For use in production, only specify IP elements (IP addresses and their masks), since using 'host' and 'host_regexp' might cause extra latency.
Next the user settings profile is specified (see the section "Settings profiles"). You can specify the default profile, 'default'. The profile can have any name. You can specify the same profile for different users. The most important thing you can write in the settings profile is 'readonly' set to 1, which provides read-only access.
After this, the quota is defined (see the section "Quotas"). You can specify the default quota, 'default'. It is set in the config by default so that it only counts resource usage, but does not restrict it. The quota can have any name. You can specify the same quota for different users - in this case, resource usage is calculated for each user individually.
For the 'statbox' quota, restrictions are set for every hour and for every 24 hours (86,400 seconds). The time interval is counted starting from an implementation-defined fixed moment in time. In other words, the 24-hour interval doesn't necessarily begin at midnight.
If the limit is exceeded for at least one time interval, an exception is thrown with a text about which restriction was exceeded, for which interval, and when the new interval begins (when queries can be sent again).
For distributed query processing, the accumulated amounts are stored on the requestor server. So if the user goes to another server, the quota there will "start over".