Merge branch 'master' into thread_local_rng

This commit is contained in:
Alexey Milovidov 2019-07-30 03:19:48 +03:00
commit b5b80fb342
18 changed files with 63 additions and 39 deletions

View File

@ -322,7 +322,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, parallel_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.") \
M(SettingBool, enable_debug_queries, false, "Enables debug queries such as AST.") \
M(SettingBool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this setting is enabled, arrays will be resized to the longest one.") \
M(SettingBool, optimize_pk_order, true, "Enable order by optimization in reading in primary key order.") \
M(SettingBool, optimize_read_in_order, true, "Enable ORDER BY optimization for reading data in corresponding order in MergeTree tables.") \
M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.") \
M(SettingBool, allow_experimental_multiple_joins_emulation, true, "Emulate multiple joins using subselects") \
M(SettingBool, allow_experimental_cross_to_join_conversion, true, "Convert CROSS JOIN to INNER JOIN if possible") \

View File

@ -12,9 +12,9 @@ using namespace MySQLProtocol;
MySQLWireBlockOutputStream::MySQLWireBlockOutputStream(WriteBuffer & buf, const Block & header, Context & context)
: header(header)
, context(context)
, packet_sender(std::make_shared<PacketSender>(buf, context.mysql.sequence_id))
, packet_sender(buf, context.mysql.sequence_id)
{
packet_sender->max_packet_size = context.mysql.max_packet_size;
packet_sender.max_packet_size = context.mysql.max_packet_size;
}
void MySQLWireBlockOutputStream::writePrefix()
@ -22,17 +22,17 @@ void MySQLWireBlockOutputStream::writePrefix()
if (header.columns() == 0)
return;
packet_sender->sendPacket(LengthEncodedNumber(header.columns()));
packet_sender.sendPacket(LengthEncodedNumber(header.columns()));
for (const ColumnWithTypeAndName & column : header.getColumnsWithTypeAndName())
{
ColumnDefinition column_definition(column.name, CharacterSet::binary, 0, ColumnType::MYSQL_TYPE_STRING, 0, 0);
packet_sender->sendPacket(column_definition);
packet_sender.sendPacket(column_definition);
}
if (!(context.mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF))
{
packet_sender->sendPacket(EOF_Packet(0, 0));
packet_sender.sendPacket(EOF_Packet(0, 0));
}
}
@ -49,7 +49,7 @@ void MySQLWireBlockOutputStream::write(const Block & block)
column.type->serializeAsText(*column.column.get(), i, ostr, format_settings);
row_packet.appendColumn(std::move(ostr.str()));
}
packet_sender->sendPacket(row_packet);
packet_sender.sendPacket(row_packet);
}
}
@ -67,17 +67,17 @@ void MySQLWireBlockOutputStream::writeSuffix()
<< formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
if (header.columns() == 0)
packet_sender->sendPacket(OK_Packet(0x0, context.mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
packet_sender.sendPacket(OK_Packet(0x0, context.mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
else
if (context.mysql.client_capabilities & CLIENT_DEPRECATE_EOF)
packet_sender->sendPacket(OK_Packet(0xfe, context.mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
packet_sender.sendPacket(OK_Packet(0xfe, context.mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
else
packet_sender->sendPacket(EOF_Packet(0, 0), true);
packet_sender.sendPacket(EOF_Packet(0, 0), true);
}
void MySQLWireBlockOutputStream::flush()
{
packet_sender->out->next();
packet_sender.out->next();
}
}

View File

@ -27,7 +27,7 @@ public:
private:
Block header;
Context & context;
std::shared_ptr<MySQLProtocol::PacketSender> packet_sender;
MySQLProtocol::PacketSender packet_sender;
FormatSettings format_settings;
};
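For context, this header change (together with the .cpp hunks above) replaces a PacketSender allocated on the heap behind a shared_ptr with a plain value member, so the output stream no longer pays for an allocation and a level of indirection on every send. A minimal sketch of the before/after pattern, with hypothetical names standing in for the real classes:

#include <memory>

struct Sender
{
    int max_packet_size = 0;
};

// Before: the member lives on the heap behind a shared_ptr and is reached through '->'.
struct StreamBefore
{
    std::shared_ptr<Sender> sender;
    StreamBefore() : sender(std::make_shared<Sender>()) { sender->max_packet_size = 1 << 24; }
};

// After: the member is owned by value; no heap allocation, direct access through '.'.
struct StreamAfter
{
    Sender sender;
    StreamAfter() { sender.max_packet_size = 1 << 24; }
};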

View File

@ -120,9 +120,10 @@ private:
template <typename Base>
struct AggregationDataWithNullKeyTwoLevel : public Base
{
using Base::Base;
using Base::impls;
AggregationDataWithNullKeyTwoLevel() {}
template <typename Other>
explicit AggregationDataWithNullKeyTwoLevel(const Other & other) : Base(other)
{

View File

@ -648,7 +648,7 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c
}
static SortingInfoPtr optimizeSortingWithPK(const MergeTreeData & merge_tree, const ASTSelectQuery & query, const Context & context)
static SortingInfoPtr optimizeReadInOrder(const MergeTreeData & merge_tree, const ASTSelectQuery & query, const Context & context)
{
if (!merge_tree.hasSortingKey())
return {};
@ -794,10 +794,10 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
}
SortingInfoPtr sorting_info;
if (settings.optimize_pk_order && storage && query.orderBy() && !query.groupBy() && !query.final())
if (settings.optimize_read_in_order && storage && query.orderBy() && !query.groupBy() && !query.final())
{
if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
sorting_info = optimizeSortingWithPK(*merge_tree_data, query, context);
sorting_info = optimizeReadInOrder(*merge_tree_data, query, context);
}
if (dry_run)

View File

@ -589,15 +589,23 @@ BlockInputStreams MergeTreeDataSelectExecutor::readFromParts(
virt_column_names,
settings);
}
else if (settings.optimize_pk_order && query_info.sorting_info)
else if (settings.optimize_read_in_order && query_info.sorting_info)
{
res = spreadMarkRangesAmongStreamsPKOrder(
size_t prefix_size = query_info.sorting_info->prefix_order_descr.size();
auto order_key_prefix_ast = data.sorting_key_expr_ast->clone();
order_key_prefix_ast->children.resize(prefix_size);
auto syntax_result = SyntaxAnalyzer(context).analyze(order_key_prefix_ast, data.getColumns().getAllPhysical());
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActions(false);
res = spreadMarkRangesAmongStreamsWithOrder(
std::move(parts_with_ranges),
num_streams,
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
query_info,
sorting_key_prefix_expr,
virt_column_names,
settings);
}
@ -807,13 +815,14 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
return res;
}
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsPKOrder(
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sorting_key_prefix_expr,
const Names & virt_columns,
const Settings & settings) const
{
@ -981,7 +990,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsPKOrd
sorting_info->direction, 1);
for (auto & stream : streams_per_thread)
stream = std::make_shared<ExpressionBlockInputStream>(stream, data.sorting_key_expr);
stream = std::make_shared<ExpressionBlockInputStream>(stream, sorting_key_prefix_expr);
streams.push_back(std::make_shared<MergingSortedBlockInputStream>(
streams_per_thread, sort_description, max_block_size));

View File

@ -56,13 +56,14 @@ private:
const Names & virt_columns,
const Settings & settings) const;
BlockInputStreams spreadMarkRangesAmongStreamsPKOrder(
BlockInputStreams spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sorting_key_prefix_expr,
const Names & virt_columns,
const Settings & settings) const;

View File

@ -124,17 +124,12 @@ def main():
sql_file = open(base_name + '.sql', 'wt')
ref_file = open(base_name + '.reference', 'wt')
num_int_tests = len(VALUES) ** 2
num_int_tests = len(list(itertools.combinations(VALUES, 2)))
if 'int1' in sys.argv[1:]:
for (v1, v2) in itertools.islice(itertools.combinations(VALUES, 2), None, num_int_tests / 2):
q, a = test_pair(v1, v2)
if GENERATE_TEST_FILES:
sql_file.write(q + ";\n")
ref_file.write(a + "\n")
if 'int2' in sys.argv[1:]:
for (v1, v2) in itertools.islice(itertools.combinations(VALUES, 2), num_int_tests / 2, None):
num_parts = 4
for part in xrange(0, num_parts):
if 'int' + str(part + 1) in sys.argv[1:]:
for (v1, v2) in itertools.islice(itertools.combinations(VALUES, 2), part * num_int_tests / num_parts, (part + 1) * num_int_tests / num_parts):
q, a = test_pair(v1, v2)
if GENERATE_TEST_FILES:
sql_file.write(q + ";\n")

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python $CURDIR/00411_long_accurate_number_comparison.python int3

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python $CURDIR/00411_long_accurate_number_comparison.python int4

View File

@ -1,7 +1,7 @@
CREATE DATABASE IF NOT EXISTS test;
DROP TABLE IF EXISTS test.pk_order;
SET optimize_pk_order = 1;
SET optimize_read_in_order = 1;
CREATE TABLE test.pk_order(a UInt64, b UInt64, c UInt64, d UInt64) ENGINE=MergeTree() ORDER BY (a, b);
INSERT INTO test.pk_order(a, b, c, d) VALUES (1, 1, 101, 1), (1, 2, 102, 1), (1, 3, 103, 1), (1, 4, 104, 1);

View File

@ -1,4 +1,4 @@
SET optimize_pk_order = 1;
SET optimize_read_in_order = 1;
SELECT CounterID FROM test.hits ORDER BY CounterID DESC LIMIT 50;
SELECT CounterID FROM test.hits ORDER BY CounterID LIMIT 50;
SELECT CounterID FROM test.hits ORDER BY CounterID, EventDate LIMIT 50;

View File

@ -1,8 +1,8 @@
# Overview of ClickHouse Architecture
ClickHouse is a true column-oriented database management system (DBMS). In ClickHouse, data is always stored by columns, including during vectorized execution (on vectors, i.e. chunks of columns). Whenever possible, operations are dispatched on vectors rather than on individual values; this is called "vectorized query execution", and it helps lower the cost of actual data processing. (The change replaces the mistyped character 失量 with 矢量, the correct word for "vector", both here and in the quote below.)
> This idea is nothing new. It dates back to the `APL` programming language and its descendants: `A +`, `J`, `K`, and `Q`. Vector programming is widely used in scientific data processing. Nor is the idea new in relational databases: for example, vector programming is also used heavily in the `Vectorwise` system.
There are generally two ways to speed up query processing: vectorized query execution and runtime code generation. With the latter, code is generated dynamically for each class of query, eliminating indirection and dynamic dispatch. Neither approach is strictly better than the other. Runtime code generation can be better at fusing many operations together, making full use of CPU execution units and the pipeline. Vectorized query execution can be less practical because it involves temporary vectors that must be written to cache and read back; if the temporary data does not fit in the L2 cache, this becomes a problem. On the other hand, vectorized query execution makes it easier to exploit the SIMD capabilities of the CPU. A [research paper](http://15721.courses.cs.cmu.edu/spring2016/papers/p5-sompolski.pdf) written by our friends shows that combining both approaches works better. ClickHouse uses vectorized query execution and has limited initial support for runtime code generation.
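As a loose illustration of the distinction drawn above (not ClickHouse code; the types and function names are made up): with per-value execution every element pays for an indirect dispatch, while vectorized execution dispatches once per block of values and leaves a tight loop that the compiler can auto-vectorize with SIMD.

#include <cstddef>
#include <cstdint>
#include <vector>

// Per-value execution: the operation is resolved through a virtual call for every element.
struct IOperation
{
    virtual int64_t apply(int64_t x) const = 0;
    virtual ~IOperation() = default;
};

struct PlusOne : IOperation
{
    int64_t apply(int64_t x) const override { return x + 1; }
};

void runPerValue(const IOperation & op, const std::vector<int64_t> & in, std::vector<int64_t> & out)
{
    out.resize(in.size());
    for (size_t i = 0; i < in.size(); ++i)
        out[i] = op.apply(in[i]);   // indirect dispatch on every single value
}

// Vectorized execution: dispatch once per column (a block of values),
// then run a plain loop over the whole vector that is easy to accelerate with SIMD.
void runVectorized(const std::vector<int64_t> & in, std::vector<int64_t> & out)
{
    out.resize(in.size());
    for (size_t i = 0; i < in.size(); ++i)
        out[i] = in[i] + 1;         // one dispatch for the whole block
}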