Merge branch 'master' into improve-performance-of-aggregate-functions

Alexey Milovidov 2021-02-02 06:48:08 +03:00
commit 06bd0345eb
26 changed files with 631 additions and 99 deletions

View File

@ -1,22 +1,21 @@
# system.distributed_ddl_queue {#system_tables-distributed_ddl_queue}
Contains information about distributed ddl queries (ON CLUSTER queries) that were executed on a cluster.
Contains information about [distributed ddl queries (ON CLUSTER clause)](../../sql-reference/distributed-ddl.md) that were executed on a cluster.
Columns:
- `entry` ([String](../../sql-reference/data-types/string.md)) - Query id.
- `host_name` ([String](../../sql-reference/data-types/string.md)) - Hostname.
- `host_address` ([String](../../sql-reference/data-types/string.md)) - IP address that the Hostname resolves to.
- `port` ([UInt16](../../sql-reference/data-types/int-uint.md)) - Host Port.
- `status` ([Enum](../../sql-reference/data-types/enum.md)) - Stats of the query.
- `cluster` ([String](../../sql-reference/data-types/string.md)) - Cluster name.
- `query` ([String](../../sql-reference/data-types/string.md)) - Query executed.
- `initiator` ([String](../../sql-reference/data-types/string.md)) - Nod that executed the query.
- `query_start_time` ([Date](../../sql-reference/data-types/date.md)) — Query start time.
- `query_finish_time` ([Date](../../sql-reference/data-types/date.md)) — Query finish time.
- `query_duration_ms` ([UInt64](../../sql-reference/data-types/datetime64.md)) — Duration of query execution in milliseconds.
- `exception_code` ([Enum](../../sql-reference/data-types/enum.md)) - Exception code from ZooKeeper.
- `entry` ([String](../../sql-reference/data-types/string.md)) — Query id.
- `host_name` ([String](../../sql-reference/data-types/string.md)) — Hostname.
- `host_address` ([String](../../sql-reference/data-types/string.md)) — IP address that the Hostname resolves to.
- `port` ([UInt16](../../sql-reference/data-types/int-uint.md)) — Host Port.
- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — Status of the query.
- `cluster` ([String](../../sql-reference/data-types/string.md)) — Cluster name.
- `query` ([String](../../sql-reference/data-types/string.md)) — Query executed.
- `initiator` ([String](../../sql-reference/data-types/string.md)) — Node that executed the query.
- `query_start_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Query start time.
- `query_finish_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Query finish time.
- `query_duration_ms` ([UInt64](../../sql-reference/data-types/datetime64.md)) — Duration of query execution (in milliseconds).
- `exception_code` ([Enum8](../../sql-reference/data-types/enum.md)) — Exception code from [ZooKeeper](../../operations/tips.md#zookeeper).
**Example**
@ -62,6 +61,5 @@ exception_code: ZOK
2 rows in set. Elapsed: 0.025 sec.
```
[Original article](https://clickhouse.tech/docs/en/operations/system_tables/distributed_ddl_queuedistributed_ddl_queue.md) <!--hide-->

View File

@ -0,0 +1,65 @@
# system.distributed_ddl_queue {#system_tables-distributed_ddl_queue}
Contains information about [distributed ddl queries (ON CLUSTER clause)](../../sql-reference/distributed-ddl.md) that were executed on a cluster.
Columns:
- `entry` ([String](../../sql-reference/data-types/string.md)) — Query id.
- `host_name` ([String](../../sql-reference/data-types/string.md)) — Hostname.
- `host_address` ([String](../../sql-reference/data-types/string.md)) — Host IP address.
- `port` ([UInt16](../../sql-reference/data-types/int-uint.md)) — Port for connecting to the server.
- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — Status of the query.
- `cluster` ([String](../../sql-reference/data-types/string.md)) — Cluster name.
- `query` ([String](../../sql-reference/data-types/string.md)) — Query executed.
- `initiator` ([String](../../sql-reference/data-types/string.md)) — Node that executed the query.
- `query_start_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Query start time.
- `query_finish_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Query finish time.
- `query_duration_ms` ([UInt64](../../sql-reference/data-types/datetime64.md)) — Duration of query execution (in milliseconds).
- `exception_code` ([Enum8](../../sql-reference/data-types/enum.md)) — Exception code from [ZooKeeper](../../operations/tips.md#zookeeper).
**Example**
``` sql
SELECT *
FROM system.distributed_ddl_queue
WHERE cluster = 'test_cluster'
LIMIT 2
FORMAT Vertical
Query id: f544e72a-6641-43f1-836b-24baa1c9632a
Row 1:
──────
entry: query-0000000000
host_name: clickhouse01
host_address: 172.23.0.11
port: 9000
status: Finished
cluster: test_cluster
query: CREATE DATABASE test_db UUID '4a82697e-c85e-4e5b-a01e-a36f2a758456' ON CLUSTER test_cluster
initiator: clickhouse01:9000
query_start_time: 2020-12-30 13:07:51
query_finish_time: 2020-12-30 13:07:51
query_duration_ms: 6
exception_code: ZOK
Row 2:
──────
entry: query-0000000000
host_name: clickhouse02
host_address: 172.23.0.12
port: 9000
status: Finished
cluster: test_cluster
query: CREATE DATABASE test_db UUID '4a82697e-c85e-4e5b-a01e-a36f2a758456' ON CLUSTER test_cluster
initiator: clickhouse01:9000
query_start_time: 2020-12-30 13:07:51
query_finish_time: 2020-12-30 13:07:51
query_duration_ms: 6
exception_code: ZOK
2 rows in set. Elapsed: 0.025 sec.
```
[Original article](https://clickhouse.tech/docs/ru/operations/system_tables/distributed_ddl_queuedistributed_ddl_queue.md) <!--hide-->

View File

@ -409,6 +409,15 @@ Block Block::cloneWithoutColumns() const
return res;
}
Block Block::cloneWithCutColumns(size_t start, size_t length) const
{
Block copy = *this;
for (auto & column_to_cut : copy.data)
column_to_cut.column = column_to_cut.column->cut(start, length);
return copy;
}
Block Block::sortColumns() const
{

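To make the new helper's contract concrete, here is a minimal sketch of how `cloneWithCutColumns` could be used to slice a prepared block into row ranges. The block contents and the helper name `makeExampleBlock` are illustrative assumptions, not part of this commit.

``` cpp
#include <Core/Block.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>

/// Hypothetical helper: build a single-column block with ids 0..99.
static DB::Block makeExampleBlock()
{
    auto column = DB::ColumnUInt64::create();
    for (UInt64 i = 0; i < 100; ++i)
        column->insertValue(i);
    return DB::Block{{std::move(column), std::make_shared<DB::DataTypeUInt64>(), "id"}};
}

static void sliceExample()
{
    DB::Block block = makeExampleBlock();
    /// Each clone keeps the schema but holds only the requested row range.
    DB::Block first_half = block.cloneWithCutColumns(0, 50);   /// rows [0, 50)
    DB::Block second_half = block.cloneWithCutColumns(50, 50); /// rows [50, 100)
}
```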
View File

@ -129,6 +129,7 @@ public:
void setColumns(const Columns & columns);
Block cloneWithColumns(const Columns & columns) const;
Block cloneWithoutColumns() const;
Block cloneWithCutColumns(size_t start, size_t length) const;
/** Get empty columns with the same types as in block. */
MutableColumns cloneEmptyColumns() const;

View File

@ -1,6 +1,5 @@
#include "DictionarySourceHelpers.h"
#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/DataTypesNumber.h>
@ -13,44 +12,54 @@
namespace DB
{
/// For simple key
void formatIDs(BlockOutputStreamPtr & out, const std::vector<UInt64> & ids)
void formatBlock(BlockOutputStreamPtr & out, const Block & block)
{
auto column = ColumnUInt64::create(ids.size());
memcpy(column->getData().data(), ids.data(), ids.size() * sizeof(ids.front()));
Block block{{std::move(column), std::make_shared<DataTypeUInt64>(), "id"}};
out->writePrefix();
out->write(block);
out->writeSuffix();
out->flush();
}
/// For composite key
void formatKeys(
/// For simple key
Block blockForIds(
const DictionaryStructure & dict_struct,
const std::vector<UInt64> & ids)
{
auto column = ColumnUInt64::create(ids.size());
memcpy(column->getData().data(), ids.data(), ids.size() * sizeof(ids.front()));
Block block{{std::move(column), std::make_shared<DataTypeUInt64>(), (*dict_struct.id).name}};
return block;
}
/// For composite key
Block blockForKeys(
const DictionaryStructure & dict_struct,
BlockOutputStreamPtr & out,
const Columns & key_columns,
const std::vector<size_t> & requested_rows)
{
Block block;
for (size_t i = 0, size = key_columns.size(); i < size; ++i)
{
const ColumnPtr & source_column = key_columns[i];
auto filtered_column = source_column->cloneEmpty();
filtered_column->reserve(requested_rows.size());
size_t column_rows_size = source_column->size();
PaddedPODArray<UInt8> filter(column_rows_size, false);
for (size_t idx : requested_rows)
filtered_column->insertFrom(*source_column, idx);
filter[idx] = true;
block.insert({std::move(filtered_column), (*dict_struct.key)[i].type, toString(i)});
auto filtered_column = source_column->filter(filter, requested_rows.size());
block.insert({std::move(filtered_column), (*dict_struct.key)[i].type, (*dict_struct.key)[i].name});
}
out->writePrefix();
out->write(block);
out->writeSuffix();
out->flush();
return block;
}
Context copyContextAndApplySettings(

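A hedged sketch of the calling convention this refactor introduces: instead of `formatIDs`/`formatKeys` writing directly to the stream, callers first materialize the requested keys as a Block (`blockForIds` or `blockForKeys`) and then serialize it with `formatBlock`. The wrapper function below is hypothetical.

``` cpp
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Dictionaries/DictionaryStructure.h>

/// Hypothetical caller: send the requested ids to a dictionary source.
/// formatBlock writes the prefix, the block itself, the suffix, and flushes.
void writeRequestedIds(
    const DB::DictionaryStructure & dict_struct,
    const std::vector<UInt64> & ids,
    DB::BlockOutputStreamPtr & out)
{
    /// One UInt64 column named after the dictionary's id attribute.
    DB::Block block = DB::blockForIds(dict_struct, ids);
    DB::formatBlock(out, block);
}
```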
View File

@ -1,11 +1,15 @@
#pragma once
#include <vector>
#include <Columns/IColumn.h>
#include <common/types.h>
#include <Poco/File.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Columns/IColumn.h>
#include <Core/Block.h>
namespace DB
{
class IBlockOutputStream;
@ -16,13 +20,18 @@ class Context;
/// Write keys to block output stream.
void formatBlock(BlockOutputStreamPtr & out, const Block & block);
/// For simple key
void formatIDs(BlockOutputStreamPtr & out, const std::vector<UInt64> & ids);
Block blockForIds(
const DictionaryStructure & dict_struct,
const std::vector<UInt64> & ids);
/// For composite key
void formatKeys(
Block blockForKeys(
const DictionaryStructure & dict_struct,
BlockOutputStreamPtr & out,
const Columns & key_columns,
const std::vector<size_t> & requested_rows);
@ -36,4 +45,5 @@ void applySettingsToContext(
const std::string & config_prefix,
Context & context,
const Poco::Util::AbstractConfiguration & config);
}

View File

@ -281,6 +281,21 @@ size_t DictionaryStructure::getKeySize() const
});
}
Strings DictionaryStructure::getKeysNames() const
{
if (id)
return { id->name };
const auto & key_attributes = *key;
Strings keys_names;
keys_names.reserve(key_attributes.size());
for (const auto & key_attribute : key_attributes)
keys_names.emplace_back(key_attribute.name);
return keys_names;
}
static void checkAttributeKeys(const Poco::Util::AbstractConfiguration::Keys & keys)
{

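A brief illustration of what the new `getKeysNames` accessor returns; the layouts referenced here match the test dictionaries added later in this commit, and the snippet itself is only a sketch.

``` cpp
/// For a simple-key dictionary the result is the single id attribute name,
/// for a composite-key dictionary it is every key attribute name in order:
///
///   simple key    -> { "id" }
///   composite key -> { "id", "id_key" }
DB::Strings key_names = dict_struct.getKeysNames();
```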
View File

@ -158,6 +158,8 @@ struct DictionaryStructure final
std::string getKeyDescription() const;
bool isKeySizeFixed() const;
size_t getKeySize() const;
Strings getKeysNames() const;
private:
/// range_min and range_max have to be parsed before this function call
std::vector<DictionaryAttribute> getAttributes(

View File

@ -26,6 +26,8 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int DICTIONARY_ACCESS_DENIED;
extern const int UNSUPPORTED_METHOD;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
namespace
@ -65,18 +67,34 @@ ExecutableDictionarySource::ExecutableDictionarySource(
const Context & context_)
: log(&Poco::Logger::get("ExecutableDictionarySource"))
, dict_struct{dict_struct_}
, implicit_key{config.getBool(config_prefix + ".implicit_key", false)}
, command{config.getString(config_prefix + ".command")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, format{config.getString(config_prefix + ".format")}
, sample_block{sample_block_}
, context(context_)
{
/// Remove keys from sample_block for implicit_key dictionary because
/// these columns will not be returned from source
/// Implicit key means that the source script will return only values,
/// and the correspondence to the requested keys is determined implicitly - by the order of rows in the result.
if (implicit_key)
{
auto keys_names = dict_struct.getKeysNames();
for (auto & key_name : keys_names)
{
size_t key_column_position_in_block = sample_block.getPositionByName(key_name);
sample_block.erase(key_column_position_in_block);
}
}
}
ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other)
: log(&Poco::Logger::get("ExecutableDictionarySource"))
, update_time{other.update_time}
, dict_struct{other.dict_struct}
, implicit_key{other.implicit_key}
, command{other.command}
, update_field{other.update_field}
, format{other.format}
@ -87,6 +105,9 @@ ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionar
BlockInputStreamPtr ExecutableDictionarySource::loadAll()
{
if (implicit_key)
throw Exception("ExecutableDictionarySource with implicit_key does not support loadAll method", ErrorCodes::UNSUPPORTED_METHOD);
LOG_TRACE(log, "loadAll {}", toString());
auto process = ShellCommand::execute(command);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
@ -95,6 +116,9 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll()
BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
{
if (implicit_key)
throw Exception("ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method", ErrorCodes::UNSUPPORTED_METHOD);
time_t new_update_time = time(nullptr);
SCOPE_EXIT(update_time = new_update_time);
@ -173,6 +197,77 @@ namespace
std::function<void(WriteBufferFromFile &)> send_data;
ThreadFromGlobalPool thread;
};
/** A stream that adds additional columns to each block it reads from the inner stream.
*
* The number of rows in block_to_add must be equal to the total number of rows across all blocks of the inner stream.
*/
class BlockInputStreamWithAdditionalColumns final: public IBlockInputStream
{
public:
BlockInputStreamWithAdditionalColumns(
Block block_to_add_,
std::unique_ptr<IBlockInputStream>&& stream_)
: block_to_add(std::move(block_to_add_))
, stream(std::move(stream_))
{
}
Block getHeader() const override
{
auto header = stream->getHeader();
if (header)
{
for (Int64 i = static_cast<Int64>(block_to_add.columns() - 1); i >= 0; --i)
header.insert(0, block_to_add.getByPosition(i).cloneEmpty());
}
return header;
}
Block readImpl() override
{
auto block = stream->read();
if (block)
{
auto block_rows = block.rows();
auto cut_block = block_to_add.cloneWithCutColumns(current_range_index, block_rows);
if (cut_block.rows() != block_rows)
throw Exception(
"Number of rows in the block to add after cut must be equal to the number of rows in the block from the inner stream",
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
for (Int64 i = static_cast<Int64>(cut_block.columns() - 1); i >= 0; --i)
block.insert(0, cut_block.getByPosition(i));
current_range_index += block_rows;
}
return block;
}
void readPrefix() override
{
stream->readPrefix();
}
void readSuffix() override
{
stream->readSuffix();
}
String getName() const override { return "BlockInputStreamWithAdditionalColumns"; }
private:
Block block_to_add;
std::unique_ptr<IBlockInputStream> stream;
size_t current_range_index = 0;
};
}
@ -180,28 +275,44 @@ BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
return std::make_shared<BlockInputStreamWithBackgroundThread>(
auto block = blockForIds(dict_struct, ids);
auto stream = std::make_unique<BlockInputStreamWithBackgroundThread>(
context, format, sample_block, command, log,
[&ids, this](WriteBufferFromFile & out) mutable
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputStream(format, out, sample_block);
formatIDs(output_stream, ids);
auto output_stream = context.getOutputStream(format, out, block.cloneEmpty());
formatBlock(output_stream, block);
out.close();
});
if (implicit_key)
{
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
}
else
return std::shared_ptr<BlockInputStreamWithBackgroundThread>(stream.release());
}
BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
return std::make_shared<BlockInputStreamWithBackgroundThread>(
auto block = blockForKeys(dict_struct, key_columns, requested_rows);
auto stream = std::make_unique<BlockInputStreamWithBackgroundThread>(
context, format, sample_block, command, log,
[key_columns, &requested_rows, this](WriteBufferFromFile & out) mutable
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputStream(format, out, sample_block);
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
auto output_stream = context.getOutputStream(format, out, block.cloneEmpty());
formatBlock(output_stream, block);
out.close();
});
if (implicit_key)
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
else
return std::shared_ptr<BlockInputStreamWithBackgroundThread>(stream.release());
}
bool ExecutableDictionarySource::isModified() const

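To make the implicit_key path easier to follow, here is a hedged sketch of how `loadIds` stitches the requested keys back onto the executable's output; `script_output` stands for a hypothetical stream over the command's stdout, and the literal ids are invented.

``` cpp
/// The executable prints only value columns; row order defines the key mapping.
DB::Block keys = DB::blockForIds(dict_struct, std::vector<UInt64>{1, 2, 3, 4});

/// Wrap the (hypothetical) stream reading the script's stdout.
auto stream = std::make_shared<BlockInputStreamWithAdditionalColumns>(
    keys, std::move(script_output));

while (DB::Block block = stream->read())
{
    /// Each returned block now starts with the matching slice of the key
    /// columns (cut from `keys` by row offset), followed by the script's values.
    /// If the script's total row count differs from the number of requested keys,
    /// the wrapper throws SIZES_OF_COLUMNS_DOESNT_MATCH.
}
```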
View File

@ -49,9 +49,9 @@ public:
private:
Poco::Logger * log;
time_t update_time = 0;
const DictionaryStructure dict_struct;
bool implicit_key;
const std::string command;
const std::string update_field;
const std::string format;

View File

@ -131,11 +131,13 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector<UInt64> & id
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr)
auto block = blockForIds(dict_struct, ids);
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
formatIDs(output_stream, ids);
formatBlock(output_stream, block);
};
Poco::URI uri(url);
@ -150,11 +152,13 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys(const Columns & key_columns,
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [&](std::ostream & ostr)
auto block = blockForKeys(dict_struct, key_columns, requested_rows);
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [block, this](std::ostream & ostr)
{
WriteBufferFromOStream out_buffer(ostr);
auto output_stream = context.getOutputStream(format, out_buffer, sample_block);
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
formatBlock(output_stream, block);
};
Poco::URI uri(url);

View File

@ -1,7 +1,6 @@
#pragma once
#include <Core/UUID.h>
#include <Common/UInt128.h>
#include <common/DayNum.h>
#include <memory>

View File

@ -3,8 +3,10 @@
#include <cmath>
#include <type_traits>
#include <Common/Exception.h>
#include <Common/NaNUtils.h>
#include <DataTypes/NumberTraits.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
@ -87,7 +89,19 @@ struct DivideIntegralImpl
return static_cast<Result>(checkedDivision(static_cast<SignedCastA>(a), static_cast<SignedCastB>(b)));
}
else
{
if constexpr (std::is_floating_point_v<A>)
if (isNaN(a) || a > std::numeric_limits<CastA>::max() || a < std::numeric_limits<CastA>::lowest())
throw Exception("Cannot perform integer division on infinite or too large floating point numbers",
ErrorCodes::ILLEGAL_DIVISION);
if constexpr (std::is_floating_point_v<B>)
if (isNaN(b) || b > std::numeric_limits<CastB>::max() || b < std::numeric_limits<CastB>::lowest())
throw Exception("Cannot perform integer division on infinite or too large floating point numbers",
ErrorCodes::ILLEGAL_DIVISION);
return static_cast<Result>(checkedDivision(CastA(a), CastB(b)));
}
}
#if USE_EMBEDDED_COMPILER
@ -114,6 +128,16 @@ struct ModuloImpl
}
else
{
if constexpr (std::is_floating_point_v<A>)
if (isNaN(a) || a > std::numeric_limits<IntegerAType>::max() || a < std::numeric_limits<IntegerAType>::lowest())
throw Exception("Cannot perform integer division on infinite or too large floating point numbers",
ErrorCodes::ILLEGAL_DIVISION);
if constexpr (std::is_floating_point_v<B>)
if (isNaN(b) || b > std::numeric_limits<IntegerBType>::max() || b < std::numeric_limits<IntegerBType>::lowest())
throw Exception("Cannot perform integer division on infinite or too large floating point numbers",
ErrorCodes::ILLEGAL_DIVISION);
throwIfDivisionLeadsToFPE(IntegerAType(a), IntegerBType(b));
if constexpr (is_big_int_v<IntegerAType> || is_big_int_v<IntegerBType>)

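The checks above reject floating point arguments that cannot be represented in the target integer type before the cast happens. A simplified standalone sketch of that precondition follows; the function name and the use of `std::runtime_error` (instead of ClickHouse's `Exception` with `ErrorCodes::ILLEGAL_DIVISION`) are assumptions for illustration.

``` cpp
#include <cmath>
#include <limits>
#include <stdexcept>

/// Sketch of the new precondition for intDiv/modulo on floating point inputs:
/// NaN, infinities and values outside the target integer range must be rejected
/// before the cast, otherwise the conversion is undefined behaviour.
template <typename Float, typename Int>
void checkConvertibleForIntegerDivision(Float x)
{
    if (std::isnan(x)
        || x > static_cast<Float>(std::numeric_limits<Int>::max())
        || x < static_cast<Float>(std::numeric_limits<Int>::lowest()))
        throw std::runtime_error(
            "Cannot perform integer division on infinite or too large floating point numbers");
}

/// Example: intDiv(number, nan) now fails with error 153 (ILLEGAL_DIVISION)
/// instead of producing an undefined result; see the new test at the end of this diff.
```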
View File

@ -246,7 +246,7 @@ void SystemLog<LogElement>::add(const LogElement & element)
/// The size of allocation can be in order of a few megabytes.
/// But this should not be accounted for query memory usage.
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flaky.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);
/// Should not log messages under mutex.
bool queue_is_half_full = false;

View File

@ -269,14 +269,6 @@ ASTPtr tryParseQuery(
// most of the checks.
if (insert && insert->data)
{
if (!parse_res)
{
// Generic parse error.
out_error_message = getSyntaxErrorMessage(query_begin, all_queries_end,
last_token, expected, hilite, query_description);
return nullptr;
}
return res;
}

View File

@ -1394,6 +1394,12 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
std::vector<QueryPlanPtr> partition_plans;
/// If do_not_merge_across_partitions_select_final is true and num_streams > 1
/// we will store lonely parts with level > 0 to use parallel select on them.
std::vector<RangesInDataPart> lonely_parts;
size_t total_rows_in_lonely_parts = 0;
size_t sum_marks_in_lonely_parts = 0;
for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
{
QueryPlanPtr plan;
@ -1401,25 +1407,41 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
{
Pipes pipes;
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't postprocess this part and if num_streams > 1 we
/// can use parallel select on such parts. We save such parts in one vector and then use
/// MergeTreeReadPool and MergeTreeThreadSelectBlockInputProcessor for parallel select.
if (num_streams > 1 && settings.do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0)
{
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data,
metadata_snapshot,
part_it->data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
part_it->ranges,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part_it->part_index_in_query);
total_rows_in_lonely_parts += parts_to_merge_ranges[range_index]->getRowsCount();
sum_marks_in_lonely_parts += parts_to_merge_ranges[range_index]->getMarksCount();
lonely_parts.push_back(std::move(*parts_to_merge_ranges[range_index]));
continue;
}
else
{
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
{
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data,
metadata_snapshot,
part_it->data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
part_it->ranges,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part_it->part_index_in_query);
pipes.emplace_back(std::move(source_processor));
pipes.emplace_back(std::move(source_processor));
}
}
if (pipes.empty())
@ -1434,6 +1456,13 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
plan = createPlanFromPipe(std::move(pipe), query_id, data, "with final");
}
auto expression_step = std::make_unique<ExpressionStep>(
plan->getCurrentDataStream(),
metadata_snapshot->getSortingKey().expression->getActionsDAG().clone());
expression_step->setStepDescription("Calculate sorting key expression");
plan->addStep(std::move(expression_step));
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't postprocess this part
if (settings.do_not_merge_across_partitions_select_final &&
@ -1444,13 +1473,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
continue;
}
auto expression_step = std::make_unique<ExpressionStep>(
plan->getCurrentDataStream(),
metadata_snapshot->getSortingKey().expression->getActionsDAG().clone());
expression_step->setStepDescription("Calculate sorting key expression");
plan->addStep(std::move(expression_step));
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
@ -1476,6 +1498,69 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
partition_plans.emplace_back(std::move(plan));
}
if (!lonely_parts.empty())
{
Pipes pipes;
size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size();
const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,
data_settings->index_granularity,
index_granularity_bytes,
sum_marks_in_lonely_parts);
/// Reduce the number of num_streams_for_lonely_parts if the data is small.
if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read && lonely_parts.size() < num_streams_for_lonely_parts)
num_streams_for_lonely_parts = std::max((sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, lonely_parts.size());
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
num_streams_for_lonely_parts,
sum_marks_in_lonely_parts,
min_marks_for_concurrent_read,
std::move(lonely_parts),
data,
metadata_snapshot,
query_info.prewhere_info,
true,
column_names,
MergeTreeReadPool::BackoffSettings(settings),
settings.preferred_block_size_bytes,
false);
LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows_in_lonely_parts, num_streams_for_lonely_parts);
for (size_t i = 0; i < num_streams_for_lonely_parts; ++i)
{
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
query_info.prewhere_info, reader_settings, virt_columns);
pipes.emplace_back(std::move(source));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe.getHeader());
QueryPlanPtr plan = createPlanFromPipe(std::move(pipe), query_id, data, "with final");
auto expression_step = std::make_unique<ExpressionStep>(
plan->getCurrentDataStream(),
metadata_snapshot->getSortingKey().expression->getActionsDAG().clone());
expression_step->setStepDescription("Calculate sorting key expression");
plan->addStep(std::move(expression_step));
partition_plans.emplace_back(std::move(plan));
}
if (partition_plans.empty())
return {};

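A hedged numeric sketch of the stream-count reduction applied to lonely parts above; all numbers are invented and serve only to show how the formula caps parallelism when the data is small.

``` cpp
#include <algorithm>
#include <cstddef>

/// Invented numbers: 8 query streams, one lonely part with 20 marks,
/// and a minimum of 16 marks per concurrent read.
size_t streamsForLonelyPartsExample()
{
    size_t num_streams = 8;
    size_t lonely_parts_count = 1;                  /// level > 0 parts alone in their partition
    size_t sum_marks_in_lonely_parts = 20;
    size_t min_marks_for_concurrent_read = 16;

    size_t num_streams_for_lonely_parts = num_streams * lonely_parts_count;  /// 8

    /// The data is small (20 < 8 * 16), so cap the streams at
    /// max(ceil(20 / 16), 1) = 2 instead of spawning 8.
    if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read
        && lonely_parts_count < num_streams_for_lonely_parts)
        num_streams_for_lonely_parts = std::max(
            (sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read,
            lonely_parts_count);

    return num_streams_for_lonely_parts;  /// 2
}
```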
View File

@ -105,4 +105,152 @@
</structure>
</dictionary>
<dictionary>
<name>simple_executable_cache_dictionary_no_implicit_key</name>
<structure>
<id>
<name>id</name>
<type>UInt64</type>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<source>
<executable>
<command>echo "1\tValue"</command>
<format>TabSeparated</format>
<implicit_key>false</implicit_key>
</executable>
</source>
<layout>
<cache>
<size_in_cells>10000</size_in_cells>
</cache>
</layout>
<lifetime>300</lifetime>
</dictionary>
<dictionary>
<name>simple_executable_cache_dictionary_implicit_key</name>
<structure>
<id>
<name>id</name>
<type>UInt64</type>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<source>
<executable>
<command>echo "Value"</command>
<format>TabSeparated</format>
<implicit_key>true</implicit_key>
</executable>
</source>
<layout>
<cache>
<size_in_cells>10000</size_in_cells>
</cache>
</layout>
<lifetime>300</lifetime>
</dictionary>
<dictionary>
<name>complex_executable_cache_dictionary_no_implicit_key</name>
<structure>
<key>
<attribute>
<name>id</name>
<type>UInt64</type>
<null_value></null_value>
</attribute>
<attribute>
<name>id_key</name>
<type>String</type>
<null_value></null_value>
</attribute>
</key>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<source>
<executable>
<command>echo "1\tFirstKey\tValue"</command>
<format>TabSeparated</format>
<implicit_key>false</implicit_key>
</executable>
</source>
<layout>
<complex_key_cache>
<size_in_cells>10000</size_in_cells>
</complex_key_cache>
</layout>
<lifetime>300</lifetime>
</dictionary>
<dictionary>
<name>complex_executable_cache_dictionary_implicit_key</name>
<structure>
<key>
<attribute>
<name>id</name>
<type>UInt64</type>
<null_value></null_value>
</attribute>
<attribute>
<name>id_key</name>
<type>String</type>
<null_value></null_value>
</attribute>
</key>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<source>
<executable>
<command>echo "Value"</command>
<format>TabSeparated</format>
<implicit_key>true</implicit_key>
</executable>
</source>
<layout>
<complex_key_cache>
<size_in_cells>10000</size_in_cells>
</complex_key_cache>
</layout>
<lifetime>300</lifetime>
</dictionary>
</dictionaries>

View File

@ -0,0 +1,20 @@
<test max_ignored_relative_change="0.2">
<settings>
<do_not_merge_across_partitions_select_final>1</do_not_merge_across_partitions_select_final>
</settings>
<create_query>
CREATE TABLE optimized_select_final (t DateTime, x Int32, s String)
ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(t) ORDER BY x
</create_query>
<fill_query>INSERT INTO optimized_select_final SELECT toDate('2020-01-01'), number, 'string' FROM numbers(50000000)</fill_query>
<fill_query>OPTIMIZE TABLE optimized_select_final FINAL</fill_query>
<query>SELECT * FROM optimized_select_final FINAL where s = 'string' FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS optimized_select_final</drop_query>
</test>

View File

@ -1,6 +1,9 @@
2000-01-01 00:00:00 0
2020-01-01 00:00:00 0
2000-01-01 00:00:00 1
2020-01-01 00:00:00 1
2000-01-01 00:00:00 2
2020-01-01 00:00:00 2
2000-01-01 00:00:00 0
2020-01-01 00:00:00 0
2000-01-01 00:00:00 1
2020-01-01 00:00:00 1
2000-01-01 00:00:00 2
2020-01-01 00:00:00 2
1
499999
5

View File

@ -1,15 +1,40 @@
DROP TABLE IF EXISTS select_final;
CREATE TABLE select_final (t DateTime, x Int32) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY x;
SET do_not_merge_across_partitions_select_final = 1;
INSERT INTO select_final SELECT toDate('2000-01-01'), number FROM numbers(2);
INSERT INTO select_final SELECT toDate('2000-01-01'), number + 1 FROM numbers(2);
CREATE TABLE select_final (t DateTime, x Int32, string String) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY (x, t);
INSERT INTO select_final SELECT toDate('2020-01-01'), number FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1 FROM numbers(2);
INSERT INTO select_final SELECT toDate('2000-01-01'), number, '' FROM numbers(2);
INSERT INTO select_final SELECT toDate('2000-01-01'), number + 1, '' FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number, '' FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1, '' FROM numbers(2);
SELECT * FROM select_final FINAL ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1;
SELECT * FROM select_final FINAL ORDER BY x;
TRUNCATE TABLE select_final;
INSERT INTO select_final SELECT toDate('2000-01-01'), number, '' FROM numbers(2);
INSERT INTO select_final SELECT toDate('2000-01-01'), number, 'updated' FROM numbers(2);
OPTIMIZE TABLE select_final FINAL;
INSERT INTO select_final SELECT toDate('2020-01-01'), number, '' FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number, 'updated' FROM numbers(2);
SELECT max(x) FROM select_final FINAL where string = 'updated';
TRUNCATE TABLE select_final;
INSERT INTO select_final SELECT toDate('2000-01-01'), number, '' FROM numbers(500000);
OPTIMIZE TABLE select_final FINAL;
SELECT max(x) FROM select_final FINAL;
SYSTEM FLUSH LOGS;
SELECT length(thread_ids) FROM system.query_log WHERE query='SELECT max(x) FROM select_final FINAL;' AND type='QueryFinish' AND current_database = currentDatabase() ORDER BY event_time DESC LIMIT 1;
DROP TABLE select_final;

View File

@ -0,0 +1,4 @@
Value
Value
Value
Value

View File

@ -0,0 +1,5 @@
SELECT dictGet('simple_executable_cache_dictionary_no_implicit_key', 'value', toUInt64(1));
SELECT dictGet('simple_executable_cache_dictionary_implicit_key', 'value', toUInt64(1));
SELECT dictGet('complex_executable_cache_dictionary_no_implicit_key', 'value', (toUInt64(1), 'FirstKey'));
SELECT dictGet('complex_executable_cache_dictionary_implicit_key', 'value', (toUInt64(1), 'FirstKey'));

View File

@ -0,0 +1 @@
SELECT DISTINCT intDiv(number, nan) FROM numbers(10); -- { serverError 153 }

View File

@ -198,3 +198,4 @@
01659_test_base64Decode_mysql_compatibility
01675_data_type_coroutine
01671_aggregate_function_group_bitmap_data
01674_executable_dictionary_implicit_key

View File

@ -311,7 +311,8 @@
"01643_system_suspend",
"01655_plan_optimizations",
"01475_read_subcolumns_storages",
"01674_clickhouse_client_query_param_cte"
"01674_clickhouse_client_query_param_cte",
"01666_merge_tree_max_query_limit"
],
"parallel":
[