Merge branch 'threadpool-fix-ub' of github.com:yandex/ClickHouse into threadpool-fix-ub

This commit is contained in:
Alexey Milovidov 2019-03-06 22:58:28 +03:00
commit 2f368b24df
19 changed files with 265 additions and 43 deletions

View File

@ -3,6 +3,7 @@
#include <Formats/ODBCDriver2BlockOutputStream.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
@ -82,8 +83,10 @@ void ODBCDriver2BlockOutputStream::writePrefix()
writeODBCString(out, "type");
for (size_t i = 0; i < columns; ++i)
{
const ColumnWithTypeAndName & col = header.getByPosition(i);
writeODBCString(out, col.type->getName());
auto type = header.getByPosition(i).type;
if (type->lowCardinality())
type = recursiveRemoveLowCardinality(type);
writeODBCString(out, type->getName());
}
}

View File

@ -103,7 +103,7 @@ struct Settings
\
M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3, "The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization ") \
\
M(SettingUInt64, min_bytes_to_use_direct_io, 0, "The minimum number of bytes for input/output operations is bypassing the page cache. 0 - disabled.") \
M(SettingUInt64, min_bytes_to_use_direct_io, 0, "The minimum number of bytes for reading the data with O_DIRECT option during SELECT queries execution. 0 - disabled.") \
\
M(SettingBool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.") \
M(SettingBool, force_primary_key, 0, "Throw an exception if there is primary key in a table, and it is not used.") \

View File

@ -11,6 +11,7 @@
#include <Storages/MergeTree/MarkRange.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <DataTypes/DataTypeLowCardinality.h>
constexpr auto INDEX_FILE_PREFIX = "skp_idx_";

View File

@ -18,7 +18,7 @@ namespace ErrorCodes
}
/// 0b11 -- can be true and false at the same time
const Field UNKNOWN_FIELD(3);
const Field UNKNOWN_FIELD(3u);
MergeTreeSetIndexGranule::MergeTreeSetIndexGranule(const MergeTreeSetSkippingIndex & index)
@ -47,7 +47,16 @@ void MergeTreeSetIndexGranule::serializeBinary(WriteBuffer & ostr) const
for (size_t i = 0; i < index.columns.size(); ++i)
{
const auto & type = index.data_types[i];
type->serializeBinaryBulk(*columns[i], ostr, 0, size());
IDataType::SerializeBinaryBulkSettings settings;
settings.getter = [&ostr](IDataType::SubstreamPath) -> WriteBuffer * { return &ostr; };
settings.position_independent_encoding = false;
settings.low_cardinality_max_dictionary_size = 0;
IDataType::SerializeBinaryBulkStatePtr state;
type->serializeBinaryBulkStatePrefix(settings, state);
type->serializeBinaryBulkWithMultipleStreams(*columns[i], 0, size(), settings, state);
type->serializeBinaryBulkStateSuffix(settings, state);
}
}
@ -66,11 +75,21 @@ void MergeTreeSetIndexGranule::deserializeBinary(ReadBuffer & istr)
size_type->deserializeBinary(field_rows, istr);
size_t rows_to_read = field_rows.get<size_t>();
if (rows_to_read == 0)
return;
for (size_t i = 0; i < index.columns.size(); ++i)
{
const auto & type = index.data_types[i];
auto new_column = type->createColumn();
type->deserializeBinaryBulk(*new_column, istr, rows_to_read, 0);
IDataType::DeserializeBinaryBulkSettings settings;
settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return &istr; };
settings.position_independent_encoding = false;
IDataType::DeserializeBinaryBulkStatePtr state;
type->deserializeBinaryBulkStatePrefix(settings, state);
type->deserializeBinaryBulkWithMultipleStreams(*new_column, rows_to_read, settings, state);
block.insert(ColumnWithTypeAndName(new_column->getPtr(), type, index.columns[i]));
}
@ -177,10 +196,24 @@ bool SetIndexCondition::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx_granule)
Block result = granule->getElementsBlock();
actions->execute(result);
const auto & column = result.getByName(expression_ast->getColumnName()).column;
auto column = result.getByName(expression_ast->getColumnName()).column->convertToFullColumnIfLowCardinality();
auto * col_uint8 = typeid_cast<const ColumnUInt8 *>(column.get());
const NullMap * null_map = nullptr;
if (auto * col_nullable = typeid_cast<const ColumnNullable *>(column.get()))
{
col_uint8 = typeid_cast<const ColumnUInt8 *>(&col_nullable->getNestedColumn());
null_map = &col_nullable->getNullMapData();
}
if (!col_uint8)
throw Exception("ColumnUInt8 expected as Set index condition result.", ErrorCodes::LOGICAL_ERROR);
auto & condition = col_uint8->getData();
for (size_t i = 0; i < column->size(); ++i)
if (column->getInt(i) & 1)
if ((!null_map || (*null_map)[i] == 0) && condition[i] & 1)
return true;
return false;

View File

@ -0,0 +1,30 @@
1 a
-
2 b
-
--
1 a
-
2 b
-
--
1 a
-
2 b
-
----
1 a
-
2 b
-
--
1 a
-
2 b
-
--
1 a
-
2 b
-
----

View File

@ -0,0 +1,69 @@
SET allow_experimental_data_skipping_indices=1;
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b Nullable(String), INDEX b_index b TYPE set(0) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a');
insert into test.nullable_set_index values (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '--';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b Nullable(String), INDEX b_index b TYPE set(1) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a');
insert into test.nullable_set_index values (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '--';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b Nullable(String), INDEX b_index b TYPE set(0) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a'), (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '----';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b LowCardinality(Nullable(String)), INDEX b_index b TYPE set(0) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a');
insert into test.nullable_set_index values (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '--';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b LowCardinality(Nullable(String)), INDEX b_index b TYPE set(1) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a');
insert into test.nullable_set_index values (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '--';
drop table if exists test.nullable_set_index;
create table test.nullable_set_index (a UInt64, b LowCardinality(Nullable(String)), INDEX b_index b TYPE set(0) GRANULARITY 8192) engine = MergeTree order by a;
insert into test.nullable_set_index values (1, 'a'), (2, 'b');
select * from test.nullable_set_index where b = 'a';
select '-';
select * from test.nullable_set_index where b = 'b';
select '-';
select * from test.nullable_set_index where b = 'c';
select '----';
drop table if exists test.nullable_set_index;

View File

@ -0,0 +1,20 @@
SET allow_experimental_data_skipping_indices=1;
drop table if exists test.null_lc_set_index;
CREATE TABLE test.null_lc_set_index (
timestamp DateTime,
action LowCardinality(Nullable(String)),
user LowCardinality(Nullable(String)),
INDEX test_user_idx (user) TYPE set(0) GRANULARITY 8192
) ENGINE=MergeTree
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp, action, cityHash64(user))
SAMPLE BY cityHash64(user);
INSERT INTO test.null_lc_set_index VALUES (1550883010, 'subscribe', 'alice');
INSERT INTO test.null_lc_set_index VALUES (1550883020, 'follow', 'bob');
SELECT action, user FROM test.null_lc_set_index WHERE user = 'alice';
drop table if exists test.null_lc_set_index;

View File

@ -10,20 +10,37 @@ $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.cannot_kill_query"
$CLICKHOUSE_CLIENT -q "CREATE TABLE test.cannot_kill_query (x UInt64) ENGINE = MergeTree ORDER BY x" &> /dev/null
$CLICKHOUSE_CLIENT -q "INSERT INTO test.cannot_kill_query SELECT * FROM numbers(10000000)" &> /dev/null
# This SELECT query will run for a long time. It's used as bloker for ALTER query. It will be killed with SYNC kill.
query_for_pending="SELECT count() FROM test.cannot_kill_query WHERE NOT ignore(sleep(1)) SETTINGS max_threads=1, max_block_size=1"
$CLICKHOUSE_CLIENT -q "$query_for_pending" &>/dev/null &
sleep 1 # queries should be in strict order
# This ALTER query will wait until $query_for_pending finished. Also it will block $query_to_kill.
$CLICKHOUSE_CLIENT -q "ALTER TABLE test.cannot_kill_query MODIFY COLUMN x UInt64" &>/dev/null &
sleep 1
# This SELECT query will also run for a long time. Also it's blocked by ALTER query. It will be killed with ASYNC kill.
# This is main idea which we check -- blocked queries can be killed with ASYNC kill.
query_to_kill="SELECT sum(1) FROM test.cannot_kill_query WHERE NOT ignore(sleep(1)) SETTINGS max_threads=1"
$CLICKHOUSE_CLIENT -q "$query_to_kill" &>/dev/null &
sleep 3 # just to be sure that 'KILL ...' will be executed after 'SELECT ... WHERE NOT ignore(sleep(1))'
sleep 1 # just to be sure that kill of $query_to_kill will be executed after $query_to_kill.
timeout 15 $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_to_kill' SYNC" &>/dev/null
# Kill $query_to_kill with ASYNC kill. We will check that information about KILL is not lost.
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_to_kill' ASYNC" &>/dev/null
sleep 1
# Kill $query_for_pending SYNC. This query is not blocker, so it should be killed fast.
timeout 5 $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_for_pending' SYNC" &>/dev/null
# But let's sleep a little time, just to be sure
sleep 3
# Both queries have to be killed, doesn't matter with SYNC or ASYNC kill
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes where query='$query_for_pending'"
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes where query='$query_to_kill'"
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_for_pending'" &>/dev/null & # kill pending query
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.cannot_kill_query" &>/dev/null

2
debian/changelog.in vendored
View File

@ -1,4 +1,4 @@
clickhouse (2:@VERSION_STRING@) unstable; urgency=low
clickhouse (@VERSION_STRING@) unstable; urgency=low
* Modified source code

View File

@ -32,6 +32,7 @@
- [RClickhouse](https://github.com/IMSMWU/RClickhouse)
- Java
- [clickhouse-client-java](https://github.com/VirtusAI/clickhouse-client-java)
- [clickhouse-client](https://github.com/Ecwid/clickhouse-client)
- Scala
- [clickhouse-scala-client](https://github.com/crobox/clickhouse-scala-client)
- Kotlin

View File

@ -196,7 +196,7 @@ For more details, see [GraphiteMergeTree](../../operations/table_engines/graphit
The port for connecting to the server over HTTP(s).
If `https_port` is specified, [openSSL](#openssl) must be configured.
If `https_port` is specified, [openSSL](#server_settings-openssl) must be configured.
If `http_port` is specified, the openSSL configuration is ignored even if it is set.
@ -417,7 +417,7 @@ The value 0 means that you can delete all tables without any restrictions.
## merge_tree {#server_settings-merge_tree}
Fine tuning for tables in the [ MergeTree](../../operations/table_engines/mergetree.md).
Fine tuning for tables in the [MergeTree](../../operations/table_engines/mergetree.md).
For more information, see the MergeTreeSettings.h header file.
@ -430,7 +430,7 @@ For more information, see the MergeTreeSettings.h header file.
```
## openSSL
## openSSL {#server_settings-openssl}
SSL client/server configuration.
@ -609,6 +609,19 @@ Port for communicating with clients over the TCP protocol.
<tcp_port>9000</tcp_port>
```
## tcp_port_secure {#server_settings-tcp_port_secure}
Port for communicating with the clients over the secure connection by TCP protocol. Use it with [OpenSSL](#server_settings-openssl) settings.
**Possible values**
Positive integer.
**Default value**
```xml
<tcp_port_secure>9440</tcp_port_secure>
```
## tmp_path

View File

@ -175,6 +175,20 @@ Any positive integer.
**Default value**: 1048576.
## min_bytes_to_use_direct_io {#settings-min_bytes_to_use_direct_io}
The minimum data volume to be read from storage required for using of the direct I/O access to the storage disk.
ClickHouse uses this setting when selecting the data from tables. If summary storage volume of all the data to be read exceeds `min_bytes_to_use_direct_io` bytes, then ClickHouse reads the data from the storage disk with `O_DIRECT` option.
**Possible values**
Positive integer.
0 — The direct I/O is disabled.
**Default value**: 0.
## log_queries
Setting up query logging.

View File

@ -9,38 +9,38 @@ Kafka lets you:
- Process streams as they become available.
Old format:
## Creating a Table {#table_engine-kafka-creating-a-table}
```
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_skip_broken_messages = <0|1>]
```
New format:
```
Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_schema = '',
kafka_num_consumers = 2
```
Required parameters:
- `kafka_broker_list` A comma-separated list of brokers (`localhost:9092`).
- `kafka_topic_list` A list of Kafka topics (`my_topic`).
- `kafka_group_name` A group of Kafka consumers (`group1`). Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` Message format. Uses the same notation as the SQL ` FORMAT` function, such as ` JSONEachRow`. For more information, see the "Formats" section.
- `kafka_broker_list` A comma-separated list of brokers (for example, `localhost:9092`).
- `kafka_topic_list` A list of Kafka topics.
- `kafka_group_name` A group of Kafka consumers. Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` Message format. Uses the same notation as the SQL `FORMAT` function, such as ` JSONEachRow`. For more information, see the [Formats](../../interfaces/formats.md) section.
Optional parameters:
- `kafka_row_delimiter` - Character-delimiter of records (rows), which ends the message.
- `kafka_schema` An optional parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_row_delimiter` Delimiter character, which ends the message.
- `kafka_schema` Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
- `kafka_skip_broken_messages` Mode of Kafka messages parser. If `kafka_skip_broken_messages = 1` then the engine skips the Kafka messages (message equals a row of data) that can't be parsed.
Examples:
@ -72,6 +72,23 @@ Examples:
kafka_num_consumers = 4;
```
<details markdown="1"><summary>Deprecated Method for Creating a Table</summary>
!!! attention
Do not use this method in new projects and, if possible, switch the old projects to the method described above.
```
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])
```
</details>
## Description
The delivered messages are tracked automatically, so each message in a group is only counted once. If you want to get the data twice, then create a copy of the table with another group name.
Groups are flexible and synced on the cluster. For instance, if you have 10 topics and 5 copies of a table in a cluster, then each copy gets 2 topics. If the number of copies changes, the topics are redistributed across the copies automatically. Read more about this at [http://kafka.apache.org/intro](http://kafka.apache.org/intro).

View File

@ -70,6 +70,8 @@ For a description of request parameters, see [request description](../../query_l
- `SETTINGS` — Additional parameters that control the behavior of the `MergeTree`:
- `index_granularity` — The granularity of an index. The number of data rows between the "marks" of an index. By default, 8192. The list of all available parameters you can see in [MergeTreeSettings.h](https://github.com/yandex/ClickHouse/blob/master/dbms/src/Storages/MergeTree/MergeTreeSettings.h).
- `min_merge_bytes_to_use_direct_io` — The minimum data volume for merge operation required for using of the direct I/O access to the storage disk. During the merging of the data parts, ClickHouse calculates summary storage volume of all the data to be merged. If the volume exceeds `min_merge_bytes_to_use_direct_io` bytes, thеn ClickHouse reads and writes the data using direct I/O interface (`O_DIRECT` option) to the storage disk. If `min_merge_bytes_to_use_direct_io = 0`, then the direct I/O is disabled. Default value: `10 * 1024 * 1024 * 1024` bytes.
**Example of sections setting**

View File

@ -111,8 +111,8 @@ Check:
Check:
- The `tcp_port_secure` setting.
- Settings for SSL sertificates.
- The [tcp_port_secure](server_settings/settings.md#server_settings-tcp_port_secure) setting.
- Settings for [SSL sertificates](server_settings/settings.md#server_settings-openssl).
Use proper parameters while connecting. For example, use the `port_secure` parameter with `clickhouse_client`.

View File

@ -88,7 +88,7 @@ Example of settings:
</source>
```
In order for ClickHouse to access an HTTPS resource, you must [configure openSSL](../../operations/server_settings/settings.md) in the server configuration.
In order for ClickHouse to access an HTTPS resource, you must [configure openSSL](../../operations/server_settings/settings.md#server_settings-openssl) in the server configuration.
Setting fields:

View File

@ -72,6 +72,6 @@ The `remote` table function can be useful in the following cases:
If the user is not specified, `default` is used.
If the password is not specified, an empty password is used.
`remoteSecure` - same as `remote` but with secured connection. Default port - `tcp_port_secure` from config or 9440.
`remoteSecure` - same as `remote` but with secured connection. Default port — [tcp_port_secure](../../operations/server_settings/settings.md#server_settings-tcp_port_secure) from config or 9440.
[Original article](https://clickhouse.yandex/docs/en/query_language/table_functions/remote/) <!--hide-->