Merge branch 'master' into hdfs-virtuals

This commit is contained in:
Nikolai Kochetov 2024-01-04 18:00:10 +00:00
commit 85c1bb80fd
83 changed files with 1513 additions and 788 deletions

View File

@ -3847,6 +3847,8 @@ Possible values:
- `none` — Is similar to `throw`, but the distributed DDL query returns no result set.
- `null_status_on_timeout` — Returns `NULL` as execution status in some rows of result set instead of throwing `TIMEOUT_EXCEEDED` if query is not finished on the corresponding hosts.
- `never_throw` — Do not throw `TIMEOUT_EXCEEDED` and do not rethrow exceptions if query has failed on some hosts.
- `null_status_on_timeout_only_active` — Similar to `null_status_on_timeout`, but doesn't wait for inactive replicas of the `Replicated` database.
- `throw_only_active` — Similar to `throw`, but doesn't wait for inactive replicas of the `Replicated` database.
Default value: `throw`.
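
For example, a minimal sketch of the new modes (the cluster name `my_cluster` is a placeholder):

``` sql
-- Return as soon as all active replicas have executed the query;
-- inactive replicas of a `Replicated` database are not waited for.
SET distributed_ddl_output_mode = 'throw_only_active';
DROP TABLE IF EXISTS t ON CLUSTER my_cluster;
```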

View File

@ -1483,7 +1483,9 @@ For mode values with a meaning of “with 4 or more days this year,” weeks are
- Otherwise, it is the last week of the previous year, and the next week is week 1.
For mode values with a meaning of “contains January 1”, the week that contains January 1 is week 1. It does not matter how many days of the new year the week contains, even if it contains only one day.
For mode values with a meaning of “contains January 1”, the week that contains January 1 is week 1.
It does not matter how many days of the new year the week contains, even if it contains only one day.
I.e. if the last week of December contains January 1 of the next year, it will be week 1 of the next year.
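
For example (a sketch assuming mode `9`, one of the Monday-first “contains January 1” modes):

``` sql
-- 2016-12-27 falls in the week Mon 2016-12-26 … Sun 2017-01-01.
-- That week contains January 1, so it counts as week 1 of 2017.
SELECT toWeek(toDate('2016-12-27'), 9) AS week;
```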
**Syntax**

View File

@ -53,7 +53,7 @@ The rounded number of the same type as the input number.
**Example of use with Float**
``` sql
SELECT number / 2 AS x, round(x) FROM system.numbers LIMIT 3
SELECT number / 2 AS x, round(x) FROM system.numbers LIMIT 3;
```
``` text
@ -67,7 +67,22 @@ SELECT number / 2 AS x, round(x) FROM system.numbers LIMIT 3
**Example of use with Decimal**
``` sql
SELECT cast(number / 2 AS Decimal(10,4)) AS x, round(x) FROM system.numbers LIMIT 3
SELECT cast(number / 2 AS Decimal(10,4)) AS x, round(x) FROM system.numbers LIMIT 3;
```
``` text
┌───x─┬─round(CAST(divide(number, 2), 'Decimal(10, 4)'))─┐
│   0 │                                                0 │
│ 0.5 │                                                1 │
│   1 │                                                1 │
└─────┴──────────────────────────────────────────────────┘
```
If you want to keep the trailing zeros, you need to enable `output_format_decimal_trailing_zeros`:
``` sql
SELECT cast(number / 2 AS Decimal(10,4)) AS x, round(x) FROM system.numbers LIMIT 3 settings output_format_decimal_trailing_zeros=1;
```
``` text
┌──────x─┬─round(CAST(divide(number, 2), 'Decimal(10, 4)'))─┐
│ 0.0000 │                                           0.0000 │
│ 0.5000 │                                           1.0000 │
│ 1.0000 │                                           1.0000 │
└────────┴──────────────────────────────────────────────────┘
```
View File

@ -578,7 +578,9 @@ SELECT
- Otherwise, it is the last week of the previous year, and the next week is week 1.
For mode values with a meaning of “contains January 1”, the week that contains January 1 is week 1. It does not matter how many days of the new year the week contains, even if it contains only one day.
For mode values with a meaning of “contains January 1”, the week that contains January 1 is week 1.
It does not matter how many days of the new year the week contains, even if it contains only one day.
That is, if the last week of December contains January 1 of the next year, it will be week 1 of the next year.
**Example**

View File

@ -1260,11 +1260,11 @@ try
{
Settings::checkNoSettingNamesAtTopLevel(*config, config_path);
ServerSettings server_settings_;
server_settings_.loadSettingsFromConfig(*config);
ServerSettings new_server_settings;
new_server_settings.loadSettingsFromConfig(*config);
size_t max_server_memory_usage = server_settings_.max_server_memory_usage;
double max_server_memory_usage_to_ram_ratio = server_settings_.max_server_memory_usage_to_ram_ratio;
size_t max_server_memory_usage = new_server_settings.max_server_memory_usage;
double max_server_memory_usage_to_ram_ratio = new_server_settings.max_server_memory_usage_to_ram_ratio;
size_t current_physical_server_memory = getMemoryAmount(); /// With cgroups, the amount of memory available to the server can be changed dynamically.
size_t default_max_server_memory_usage = static_cast<size_t>(current_physical_server_memory * max_server_memory_usage_to_ram_ratio);
@ -1294,9 +1294,9 @@ try
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t merges_mutations_memory_usage_soft_limit = new_server_settings.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(current_physical_server_memory * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(current_physical_server_memory * new_server_settings.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
@ -1304,7 +1304,7 @@ try
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(current_physical_server_memory),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
new_server_settings.merges_mutations_memory_usage_to_ram_ratio);
}
else if (merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
@ -1313,7 +1313,7 @@ try
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(current_physical_server_memory),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
new_server_settings.merges_mutations_memory_usage_to_ram_ratio);
}
LOG_INFO(log, "Merges and mutations memory limit is set to {}",
@ -1322,7 +1322,7 @@ try
background_memory_tracker.setDescription("(background)");
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);
total_memory_tracker.setAllowUseJemallocMemory(new_server_settings.allow_use_jemalloc_memory);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
total_memory_tracker.setOvercommitTracker(global_overcommit_tracker);
@ -1346,26 +1346,26 @@ try
global_context->setRemoteHostFilter(*config);
global_context->setHTTPHeaderFilter(*config);
global_context->setMaxTableSizeToDrop(server_settings_.max_table_size_to_drop);
global_context->setMaxPartitionSizeToDrop(server_settings_.max_partition_size_to_drop);
global_context->setMaxTableNumToWarn(server_settings_.max_table_num_to_warn);
global_context->setMaxDatabaseNumToWarn(server_settings_.max_database_num_to_warn);
global_context->setMaxPartNumToWarn(server_settings_.max_part_num_to_warn);
global_context->setMaxTableSizeToDrop(new_server_settings.max_table_size_to_drop);
global_context->setMaxPartitionSizeToDrop(new_server_settings.max_partition_size_to_drop);
global_context->setMaxTableNumToWarn(new_server_settings.max_table_num_to_warn);
global_context->setMaxDatabaseNumToWarn(new_server_settings.max_database_num_to_warn);
global_context->setMaxPartNumToWarn(new_server_settings.max_part_num_to_warn);
ConcurrencyControl::SlotCount concurrent_threads_soft_limit = ConcurrencyControl::Unlimited;
if (server_settings_.concurrent_threads_soft_limit_num > 0 && server_settings_.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = server_settings_.concurrent_threads_soft_limit_num;
if (server_settings_.concurrent_threads_soft_limit_ratio_to_cores > 0)
if (new_server_settings.concurrent_threads_soft_limit_num > 0 && new_server_settings.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = new_server_settings.concurrent_threads_soft_limit_num;
if (new_server_settings.concurrent_threads_soft_limit_ratio_to_cores > 0)
{
auto value = server_settings_.concurrent_threads_soft_limit_ratio_to_cores * std::thread::hardware_concurrency();
auto value = new_server_settings.concurrent_threads_soft_limit_ratio_to_cores * std::thread::hardware_concurrency();
if (value > 0 && value < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = value;
}
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
global_context->getProcessList().setMaxSize(server_settings_.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(server_settings_.max_concurrent_insert_queries);
global_context->getProcessList().setMaxSelectQueriesAmount(server_settings_.max_concurrent_select_queries);
global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);
global_context->getProcessList().setMaxSelectQueriesAmount(new_server_settings.max_concurrent_select_queries);
if (config->has("keeper_server"))
global_context->updateKeeperConfiguration(*config);
@ -1376,68 +1376,68 @@ try
/// This is done for backward compatibility.
if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings_.background_pool_size;
auto new_ratio = server_settings_.background_merges_mutations_concurrency_ratio;
auto new_pool_size = new_server_settings.background_pool_size;
auto new_ratio = new_server_settings.background_merges_mutations_concurrency_ratio;
global_context->getMergeMutateExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, static_cast<size_t>(new_pool_size * new_ratio));
global_context->getMergeMutateExecutor()->updateSchedulingPolicy(server_settings_.background_merges_mutations_scheduling_policy.toString());
global_context->getMergeMutateExecutor()->updateSchedulingPolicy(new_server_settings.background_merges_mutations_scheduling_policy.toString());
}
if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings_.background_move_pool_size;
auto new_pool_size = new_server_settings.background_move_pool_size;
global_context->getMovesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings_.background_fetches_pool_size;
auto new_pool_size = new_server_settings.background_fetches_pool_size;
global_context->getFetchesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings_.background_common_pool_size;
auto new_pool_size = new_server_settings.background_common_pool_size;
global_context->getCommonExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
global_context->getBufferFlushSchedulePool().increaseThreadsCount(server_settings_.background_buffer_flush_schedule_pool_size);
global_context->getSchedulePool().increaseThreadsCount(server_settings_.background_schedule_pool_size);
global_context->getMessageBrokerSchedulePool().increaseThreadsCount(server_settings_.background_message_broker_schedule_pool_size);
global_context->getDistributedSchedulePool().increaseThreadsCount(server_settings_.background_distributed_schedule_pool_size);
global_context->getBufferFlushSchedulePool().increaseThreadsCount(new_server_settings.background_buffer_flush_schedule_pool_size);
global_context->getSchedulePool().increaseThreadsCount(new_server_settings.background_schedule_pool_size);
global_context->getMessageBrokerSchedulePool().increaseThreadsCount(new_server_settings.background_message_broker_schedule_pool_size);
global_context->getDistributedSchedulePool().increaseThreadsCount(new_server_settings.background_distributed_schedule_pool_size);
global_context->getAsyncLoader().setMaxThreads(TablesLoaderForegroundPoolId, server_settings_.tables_loader_foreground_pool_size);
global_context->getAsyncLoader().setMaxThreads(TablesLoaderBackgroundLoadPoolId, server_settings_.tables_loader_background_pool_size);
global_context->getAsyncLoader().setMaxThreads(TablesLoaderBackgroundStartupPoolId, server_settings_.tables_loader_background_pool_size);
global_context->getAsyncLoader().setMaxThreads(TablesLoaderForegroundPoolId, new_server_settings.tables_loader_foreground_pool_size);
global_context->getAsyncLoader().setMaxThreads(TablesLoaderBackgroundLoadPoolId, new_server_settings.tables_loader_background_pool_size);
global_context->getAsyncLoader().setMaxThreads(TablesLoaderBackgroundStartupPoolId, new_server_settings.tables_loader_background_pool_size);
getIOThreadPool().reloadConfiguration(
server_settings.max_io_thread_pool_size,
server_settings.max_io_thread_pool_free_size,
server_settings.io_thread_pool_queue_size);
new_server_settings.max_io_thread_pool_size,
new_server_settings.max_io_thread_pool_free_size,
new_server_settings.io_thread_pool_queue_size);
getBackupsIOThreadPool().reloadConfiguration(
server_settings.max_backups_io_thread_pool_size,
server_settings.max_backups_io_thread_pool_free_size,
server_settings.backups_io_thread_pool_queue_size);
new_server_settings.max_backups_io_thread_pool_size,
new_server_settings.max_backups_io_thread_pool_free_size,
new_server_settings.backups_io_thread_pool_queue_size);
getActivePartsLoadingThreadPool().reloadConfiguration(
server_settings.max_active_parts_loading_thread_pool_size,
new_server_settings.max_active_parts_loading_thread_pool_size,
0, // We don't need any threads once all the parts will be loaded
server_settings.max_active_parts_loading_thread_pool_size);
new_server_settings.max_active_parts_loading_thread_pool_size);
getOutdatedPartsLoadingThreadPool().reloadConfiguration(
server_settings.max_outdated_parts_loading_thread_pool_size,
new_server_settings.max_outdated_parts_loading_thread_pool_size,
0, // We don't need any threads once all the parts will be loaded
server_settings.max_outdated_parts_loading_thread_pool_size);
new_server_settings.max_outdated_parts_loading_thread_pool_size);
/// It could grow if we need to synchronously wait until all the data parts will be loaded.
getOutdatedPartsLoadingThreadPool().setMaxTurboThreads(
server_settings.max_active_parts_loading_thread_pool_size
new_server_settings.max_active_parts_loading_thread_pool_size
);
getPartsCleaningThreadPool().reloadConfiguration(
server_settings.max_parts_cleaning_thread_pool_size,
new_server_settings.max_parts_cleaning_thread_pool_size,
0, // We don't need any threads once all the parts will be deleted
server_settings.max_parts_cleaning_thread_pool_size);
new_server_settings.max_parts_cleaning_thread_pool_size);
if (config->has("resources"))
{

View File

@ -1379,6 +1379,9 @@
<!-- Controls how many tasks could be in the queue -->
<!-- <max_tasks_in_queue>1000</max_tasks_in_queue> -->
<!-- Host name of the current node. If specified, will only compare and not resolve hostnames inside the DDL tasks -->
<!-- <host_name>replica</host_name> -->
</distributed_ddl>
<!-- Settings to fine tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->

View File

@ -14,8 +14,9 @@
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <Common/PODArray.h>
#include <Common/assert_cast.h>
#include <Common/PODArray.h>
#include <Common/iota.h>
#include <base/types.h>
#include <boost/math/distributions/normal.hpp>
@ -48,7 +49,7 @@ struct LargestTriangleThreeBucketsData : public StatisticalSample<Float64, Float
// sort the this->x and this->y in ascending order of this->x using index
std::vector<size_t> index(this->x.size());
std::iota(index.begin(), index.end(), 0);
iota(index.data(), index.size(), size_t(0));
::sort(index.begin(), index.end(), [&](size_t i1, size_t i2) { return this->x[i1] < this->x[i2]; });
SampleX temp_x{};

View File

@ -6,6 +6,7 @@
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/NaNUtils.h>
#include <Common/iota.h>
namespace DB
@ -63,10 +64,9 @@ struct QuantileLevels
if (isNaN(levels[i]) || levels[i] < 0 || levels[i] > 1)
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, "Quantile level is out of range [0..1]");
permutation[i] = i;
}
iota(permutation.data(), size, Permutation::value_type(0));
::sort(permutation.begin(), permutation.end(), [this] (size_t a, size_t b) { return levels[a] < levels[b]; });
}
};

View File

@ -7,6 +7,7 @@
#include <base/sort.h>
#include <Common/ArenaAllocator.h>
#include <Common/iota.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
@ -30,7 +31,7 @@ std::pair<RanksArray, Float64> computeRanksAndTieCorrection(const Values & value
const size_t size = values.size();
/// Save initial positions, then sort indices according to the values.
std::vector<size_t> indexes(size);
std::iota(indexes.begin(), indexes.end(), 0);
iota(indexes.data(), indexes.size(), size_t(0));
std::sort(indexes.begin(), indexes.end(),
[&] (size_t lhs, size_t rhs) { return values[lhs] < values[rhs]; });

View File

@ -1,5 +1,6 @@
#include <Analyzer/Passes/FuseFunctionsPass.h>
#include <Common/iota.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
@ -184,7 +185,7 @@ FunctionNodePtr createFusedQuantilesNode(std::vector<QueryTreeNodePtr *> & nodes
{
/// Sort nodes and parameters in ascending order of quantile level
std::vector<size_t> permutation(nodes.size());
std::iota(permutation.begin(), permutation.end(), 0);
iota(permutation.data(), permutation.size(), size_t(0));
std::sort(permutation.begin(), permutation.end(), [&](size_t i, size_t j) { return parameters[i].get<Float64>() < parameters[j].get<Float64>(); });
std::vector<QueryTreeNodePtr *> new_nodes;

View File

@ -1,18 +1,19 @@
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/MaskOperations.h>
#include <Common/assert_cast.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromArena.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/FieldVisitorToString.h>
#include <Common/SipHash.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/AlignedBuffer.h>
#include <Common/typeid_cast.h>
#include <Common/Arena.h>
#include <Common/WeakHash.h>
#include <Common/FieldVisitorToString.h>
#include <Common/HashTable/Hash.h>
#include <Common/SipHash.h>
#include <Common/WeakHash.h>
#include <Common/assert_cast.h>
#include <Common/iota.h>
#include <Common/typeid_cast.h>
namespace DB
@ -626,8 +627,7 @@ void ColumnAggregateFunction::getPermutation(PermutationSortDirection /*directio
{
size_t s = data.size();
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
iota(res.data(), s, IColumn::Permutation::value_type(0));
}
void ColumnAggregateFunction::updatePermutation(PermutationSortDirection, PermutationSortStability,

View File

@ -2,9 +2,10 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsCommon.h>
#include <Common/typeid_cast.h>
#include <Common/WeakHash.h>
#include <Common/HashTable/Hash.h>
#include <Common/WeakHash.h>
#include <Common/iota.h>
#include <Common/typeid_cast.h>
#include <base/defines.h>
@ -128,8 +129,7 @@ void ColumnConst::getPermutation(PermutationSortDirection /*direction*/, Permuta
size_t /*limit*/, int /*nan_direction_hint*/, Permutation & res) const
{
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
iota(res.data(), s, IColumn::Permutation::value_type(0));
}
void ColumnConst::updatePermutation(PermutationSortDirection /*direction*/, PermutationSortStability /*stability*/,

View File

@ -1,10 +1,11 @@
#include <Common/Exception.h>
#include <Common/Arena.h>
#include <Common/SipHash.h>
#include <Common/assert_cast.h>
#include <Common/WeakHash.h>
#include <Common/Exception.h>
#include <Common/HashTable/Hash.h>
#include <Common/RadixSort.h>
#include <Common/SipHash.h>
#include <Common/WeakHash.h>
#include <Common/assert_cast.h>
#include <Common/iota.h>
#include <base/sort.h>
@ -163,8 +164,7 @@ void ColumnDecimal<T>::getPermutation(IColumn::PermutationSortDirection directio
if (limit >= data_size)
limit = 0;
for (size_t i = 0; i < data_size; ++i)
res[i] = i;
iota(res.data(), data_size, IColumn::Permutation::value_type(0));
if constexpr (is_arithmetic_v<NativeT> && !is_big_int_v<NativeT>)
{
@ -183,8 +183,7 @@ void ColumnDecimal<T>::getPermutation(IColumn::PermutationSortDirection directio
/// Thresholds on size. Lower threshold is arbitrary. Upper threshold is chosen by the type for histogram counters.
if (data_size >= 256 && data_size <= std::numeric_limits<UInt32>::max() && use_radix_sort)
{
for (size_t i = 0; i < data_size; ++i)
res[i] = i;
iota(res.data(), data_size, IColumn::Permutation::value_type(0));
bool try_sort = false;

View File

@ -2,6 +2,7 @@
#include <Columns/ColumnObject.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Common/iota.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/getLeastSupertype.h>
#include <DataTypes/DataTypeNothing.h>
@ -838,7 +839,7 @@ MutableColumnPtr ColumnObject::cloneResized(size_t new_size) const
void ColumnObject::getPermutation(PermutationSortDirection, PermutationSortStability, size_t, int, Permutation & res) const
{
res.resize(num_rows);
std::iota(res.begin(), res.end(), 0);
iota(res.data(), res.size(), size_t(0));
}
void ColumnObject::compareColumn(const IColumn & rhs, size_t rhs_row_num,

View File

@ -1,11 +1,12 @@
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnTuple.h>
#include <Common/WeakHash.h>
#include <Common/SipHash.h>
#include <Common/HashTable/Hash.h>
#include <Columns/ColumnsCommon.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/HashTable/Hash.h>
#include <Common/SipHash.h>
#include <Common/WeakHash.h>
#include <Common/iota.h>
#include <algorithm>
#include <bit>
@ -499,8 +500,7 @@ void ColumnSparse::getPermutationImpl(IColumn::PermutationSortDirection directio
res.resize(_size);
if (offsets->empty())
{
for (size_t i = 0; i < _size; ++i)
res[i] = i;
iota(res.data(), _size, IColumn::Permutation::value_type(0));
return;
}

View File

@ -1,16 +1,17 @@
#include <Columns/ColumnTuple.h>
#include <base/sort.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/IColumnImpl.h>
#include <Core/Field.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <DataTypes/Serializations/SerializationInfoTuple.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <base/sort.h>
#include <Common/WeakHash.h>
#include <Common/assert_cast.h>
#include <Common/iota.h>
#include <Common/typeid_cast.h>
#include <DataTypes/Serializations/SerializationInfoTuple.h>
namespace DB
@ -378,8 +379,7 @@ void ColumnTuple::getPermutationImpl(IColumn::PermutationSortDirection direction
{
size_t rows = size();
res.resize(rows);
for (size_t i = 0; i < rows; ++i)
res[i] = i;
iota(res.data(), rows, IColumn::Permutation::value_type(0));
if (limit >= rows)
limit = 0;

View File

@ -1,24 +1,25 @@
#include "ColumnVector.h"
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/MaskOperations.h>
#include <Columns/RadixSortHelper.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteHelpers.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <base/bit_cast.h>
#include <base/scope_guard.h>
#include <base/sort.h>
#include <base/unaligned.h>
#include <Common/Arena.h>
#include <Common/Exception.h>
#include <Common/HashTable/Hash.h>
#include <Common/NaNUtils.h>
#include <Common/RadixSort.h>
#include <Common/SipHash.h>
#include <Common/WeakHash.h>
#include <Common/TargetSpecific.h>
#include <Common/WeakHash.h>
#include <Common/assert_cast.h>
#include <base/sort.h>
#include <base/unaligned.h>
#include <base/bit_cast.h>
#include <base/scope_guard.h>
#include <Common/iota.h>
#include <bit>
#include <cmath>
@ -244,8 +245,7 @@ void ColumnVector<T>::getPermutation(IColumn::PermutationSortDirection direction
if (limit >= data_size)
limit = 0;
for (size_t i = 0; i < data_size; ++i)
res[i] = i;
iota(res.data(), data_size, IColumn::Permutation::value_type(0));
if constexpr (is_arithmetic_v<T> && !is_big_int_v<T>)
{

View File

@ -1,7 +1,8 @@
#include <Common/Arena.h>
#include <Core/Field.h>
#include <Columns/IColumnDummy.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/IColumnDummy.h>
#include <Core/Field.h>
#include <Common/Arena.h>
#include <Common/iota.h>
namespace DB
@ -87,8 +88,7 @@ void IColumnDummy::getPermutation(IColumn::PermutationSortDirection /*direction*
size_t /*limit*/, int /*nan_direction_hint*/, Permutation & res) const
{
res.resize(s);
for (size_t i = 0; i < s; ++i)
res[i] = i;
iota(res.data(), s, IColumn::Permutation::value_type(0));
}
ColumnPtr IColumnDummy::replicate(const Offsets & offsets) const

View File

@ -6,10 +6,11 @@
* implementation.
*/
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
#include <base/sort.h>
#include <algorithm>
#include <Columns/IColumn.h>
#include <base/sort.h>
#include <Common/PODArray.h>
#include <Common/iota.h>
namespace DB
@ -299,8 +300,7 @@ void IColumn::getPermutationImpl(
if (limit >= data_size)
limit = 0;
for (size_t i = 0; i < data_size; ++i)
res[i] = i;
iota(res.data(), data_size, Permutation::value_type(0));
if (limit)
{

View File

@ -1,6 +1,7 @@
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnsNumber.h>
#include <Common/iota.h>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
#include <gtest/gtest.h>
@ -191,7 +192,7 @@ TEST(ColumnSparse, Permute)
auto [sparse_src, full_src] = createColumns(n, k);
IColumn::Permutation perm(n);
std::iota(perm.begin(), perm.end(), 0);
iota(perm.data(), perm.size(), size_t(0));
std::shuffle(perm.begin(), perm.end(), rng);
auto sparse_dst = sparse_src->permute(perm, limit);

View File

@ -9,7 +9,6 @@
#include <Columns/ColumnUnique.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
@ -17,6 +16,7 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/iota.h>
using namespace DB;
@ -32,8 +32,7 @@ void stableGetColumnPermutation(
size_t size = column.size();
out_permutation.resize(size);
for (size_t i = 0; i < size; ++i)
out_permutation[i] = i;
iota(out_permutation.data(), size, IColumn::Permutation::value_type(0));
std::stable_sort(
out_permutation.begin(),
@ -146,10 +145,7 @@ void assertColumnPermutations(ColumnCreateFunc column_create_func, ValueTransfor
std::vector<std::vector<Field>> ranges(ranges_size);
std::vector<size_t> ranges_permutations(ranges_size);
for (size_t i = 0; i < ranges_size; ++i)
{
ranges_permutations[i] = i;
}
iota(ranges_permutations.data(), ranges_size, IColumn::Permutation::value_type(0));
IColumn::Permutation actual_permutation;
IColumn::Permutation expected_permutation;

View File

@ -288,6 +288,18 @@ The server successfully detected this situation and will download merged part fr
M(OSReadChars, "Number of bytes read from filesystem, including page cache.") \
M(OSWriteChars, "Number of bytes written to filesystem, including page cache.") \
\
M(ParallelReplicasHandleRequestMicroseconds, "Time spent processing requests for marks from replicas") \
M(ParallelReplicasHandleAnnouncementMicroseconds, "Time spent processing replicas announcements") \
\
M(ParallelReplicasReadAssignedMarks, "Sum across all replicas of how many of scheduled marks were assigned by consistent hash") \
M(ParallelReplicasReadUnassignedMarks, "Sum across all replicas of how many unassigned marks were scheduled") \
M(ParallelReplicasReadAssignedForStealingMarks, "Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash") \
\
M(ParallelReplicasStealingByHashMicroseconds, "Time spent collecting segments meant for stealing by hash") \
M(ParallelReplicasProcessingPartsMicroseconds, "Time spent processing data parts") \
M(ParallelReplicasStealingLeftoversMicroseconds, "Time spent collecting orphaned segments") \
M(ParallelReplicasCollectingOwnedSegmentsMicroseconds, "Time spent collecting segments meant for this replica by consistent hash") \
\
M(PerfCpuCycles, "Total cycles. Be wary of what happens during CPU frequency scaling.") \
M(PerfInstructions, "Retired instructions. Be careful, these can be affected by various issues, most notably hardware interrupt counts.") \
M(PerfCacheReferences, "Cache accesses. Usually, this indicates Last Level Cache accesses, but this may vary depending on your CPU. This may include prefetches and coherency messages; again this depends on the design of your CPU.") \
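
The new counters can be inspected at runtime via `system.events` (a usage sketch):

``` sql
SELECT event, value, description
FROM system.events
WHERE event LIKE 'ParallelReplicas%';
```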

View File

@ -365,7 +365,7 @@ DECLARE_AVX512VBMI2_SPECIFIC_CODE(
FUNCTION_HEADER \
\
name \
FUNCTION_BODY \
FUNCTION_BODY \
/// NOLINTNEXTLINE
#define MULTITARGET_FUNCTION_AVX512BW_AVX512F_AVX2_SSE42(FUNCTION_HEADER, name, FUNCTION_BODY) \

36
src/Common/iota.cpp Normal file
View File

@ -0,0 +1,36 @@
#include <base/defines.h>
#include <Common/iota.h>
#include <Common/TargetSpecific.h>
namespace DB
{
MULTITARGET_FUNCTION_AVX2_SSE42(
MULTITARGET_FUNCTION_HEADER(template <iota_supported_types T> void NO_INLINE),
iotaImpl, MULTITARGET_FUNCTION_BODY((T * begin, size_t count, T first_value) /// NOLINT
{
for (size_t i = 0; i < count; i++)
*(begin + i) = static_cast<T>(first_value + i);
})
)
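/// Runtime dispatch: prefer the widest SIMD variant this CPU supports,
/// otherwise fall back to the generic scalar loop compiled above.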
template <iota_supported_types T>
void iota(T * begin, size_t count, T first_value)
{
#if USE_MULTITARGET_CODE
if (isArchSupported(TargetArch::AVX2))
return iotaImplAVX2(begin, count, first_value);
if (isArchSupported(TargetArch::SSE42))
return iotaImplSSE42(begin, count, first_value);
#endif
return iotaImpl(begin, count, first_value);
}
template void iota(UInt8 * begin, size_t count, UInt8 first_value);
template void iota(UInt32 * begin, size_t count, UInt32 first_value);
template void iota(UInt64 * begin, size_t count, UInt64 first_value);
#if defined(OS_DARWIN)
template void iota(size_t * begin, size_t count, size_t first_value);
#endif
}

34
src/Common/iota.h Normal file
View File

@ -0,0 +1,34 @@
#pragma once
#include <base/types.h>
#include <Common/Concepts.h>
/// This is a replacement for std::iota to use dynamic dispatch
/// Note that it is defined only for containers with contiguous memory.
namespace DB
{
/// Make sure to add any new type to the extern declaration at the end of the file and instantiate it in iota.cpp
template <typename T>
concept iota_supported_types = (is_any_of<
T,
UInt8,
UInt32,
UInt64
#if defined(OS_DARWIN)
,
size_t
#endif
>);
template <iota_supported_types T> void iota(T * begin, size_t count, T first_value);
extern template void iota(UInt8 * begin, size_t count, UInt8 first_value);
extern template void iota(UInt32 * begin, size_t count, UInt32 first_value);
extern template void iota(UInt64 * begin, size_t count, UInt64 first_value);
#if defined(OS_DARWIN)
extern template void iota(size_t * begin, size_t count, size_t first_value);
#endif
}

View File

@ -1,5 +1,6 @@
#include <Common/levenshteinDistance.h>
#include <Common/PODArray.h>
#include <Common/iota.h>
#include <Common/levenshteinDistance.h>
namespace DB
{
@ -11,8 +12,7 @@ size_t levenshteinDistance(const String & lhs, const String & rhs)
PODArrayWithStackMemory<size_t, 64> row(n + 1);
for (size_t i = 1; i <= n; ++i)
row[i] = i;
iota(row.data() + 1, n, size_t(1));
for (size_t j = 1; j <= m; ++j)
{

View File

@ -6,6 +6,7 @@
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HashTable/Hash.h>
#include <Common/iota.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteHelpers.h>
@ -20,7 +21,7 @@ namespace
std::vector<UInt64> getVectorWithNumbersUpToN(size_t n)
{
std::vector<UInt64> res(n);
std::iota(res.begin(), res.end(), 0);
iota(res.data(), res.size(), UInt64(0));
return res;
}

View File

@ -185,6 +185,7 @@ class IColumn;
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \
M(UInt64, parallel_replicas_mark_segment_size, 128, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing", 0) \
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \
\
@ -584,6 +585,7 @@ class IColumn;
M(Bool, enable_early_constant_folding, true, "Enable query optimization where we analyze function and subqueries results and rewrite query if there're constants there", 0) \
M(Bool, deduplicate_blocks_in_dependent_materialized_views, false, "Should deduplicate blocks for materialized views if the block is not a duplicate for the table. Use true to always deduplicate in dependent tables.", 0) \
M(Bool, materialized_views_ignore_errors, false, "Allows to ignore errors for MATERIALIZED VIEW, and deliver original block to the table regardless of MVs", 0) \
M(Bool, ignore_materialized_views_with_dropped_target_table, false, "Ignore MVs with dropped target table during pushing to views", 0) \
M(Bool, allow_experimental_refreshable_materialized_view, false, "Allow refreshable materialized views (CREATE MATERIALIZED VIEW <name> REFRESH ...).", 0) \
M(Bool, stop_refreshable_materialized_views_on_startup, false, "On server startup, prevent scheduling of refreshable materialized views, as if with SYSTEM STOP VIEWS. You can manually start them with SYSTEM START VIEWS or SYSTEM START VIEW <name> afterwards. Also applies to newly created views. Has no effect on non-refreshable materialized views.", 0) \
M(Bool, use_compact_format_in_distributed_parts_names, true, "Changes format of directories names for distributed table insert parts.", 0) \
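
A usage sketch for the two settings added above (the values and the experimental flag are illustrative):

``` sql
-- Parts are virtually split into segments of this many marks
-- when reads are distributed between parallel replicas.
SET allow_experimental_parallel_reading_from_replicas = 1,
    max_parallel_replicas = 3,
    parallel_replicas_mark_segment_size = 256;

-- Skip materialized views whose target table was dropped
-- instead of failing the whole INSERT.
SET ignore_materialized_views_with_dropped_target_table = 1;
```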

View File

@ -115,6 +115,8 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS,
{{"none", DistributedDDLOutputMode::NONE},
{"throw", DistributedDDLOutputMode::THROW},
{"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT},
{"throw_only_active", DistributedDDLOutputMode::THROW_ONLY_ACTIVE},
{"null_status_on_timeout_only_active", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE},
{"never_throw", DistributedDDLOutputMode::NEVER_THROW}})
IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS,

View File

@ -173,6 +173,8 @@ enum class DistributedDDLOutputMode
THROW,
NULL_STATUS_ON_TIMEOUT,
NEVER_THROW,
THROW_ONLY_ACTIVE,
NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE,
};
DECLARE_SETTING_ENUM(DistributedDDLOutputMode)

View File

@ -2,6 +2,7 @@
#include <Dictionaries/IDictionary.h>
#include <Common/CurrentThread.h>
#include <Common/iota.h>
#include <Common/scope_guard_safe.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
@ -53,7 +54,7 @@ public:
LOG_TRACE(dictionary.log, "Will load the dictionary using {} threads (with {} backlog)", shards, backlog);
shards_slots.resize(shards);
std::iota(shards_slots.begin(), shards_slots.end(), 0);
iota(shards_slots.data(), shards_slots.size(), UInt64(0));
for (size_t shard = 0; shard < shards; ++shard)
{

View File

@ -5,6 +5,7 @@
#include <base/sort.h>
#include <Common/iota.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeArray.h>
@ -507,7 +508,7 @@ const IColumn * unrollSimplePolygons(const ColumnPtr & column, Offset & offset)
if (!ptr_polygons)
throw Exception(ErrorCodes::TYPE_MISMATCH, "Expected a column containing arrays of points");
offset.ring_offsets.assign(ptr_polygons->getOffsets());
std::iota(offset.polygon_offsets.begin(), offset.polygon_offsets.end(), 1);
iota<IColumn::Offsets::value_type>(offset.polygon_offsets.data(), offset.polygon_offsets.size(), IColumn::Offsets::value_type(1));
offset.multi_polygon_offsets.assign(offset.polygon_offsets);
return ptr_polygons->getDataPtr().get();

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/types.h>
#include <Common/iota.h>
#include <Common/ThreadPool.h>
#include <Poco/Logger.h>
@ -184,7 +185,7 @@ public:
{
setBoundingBox();
std::vector<size_t> order(polygons.size());
std::iota(order.begin(), order.end(), 0);
iota(order.data(), order.size(), size_t(0));
root = makeCell(min_x, min_y, max_x, max_y, order);
}

View File

@ -6,6 +6,7 @@
#include <Functions/FunctionsStringSimilarity.h>
#include <Common/PODArray.h>
#include <Common/UTF8Helpers.h>
#include <Common/iota.h>
#ifdef __SSE4_2__
# include <nmmintrin.h>
@ -246,8 +247,7 @@ struct ByteEditDistanceImpl
ResultType insertion = 0;
ResultType deletion = 0;
for (size_t i = 0; i <= haystack_size; ++i)
distances0[i] = i;
iota(distances0.data(), haystack_size + 1, ResultType(0));
for (size_t pos_needle = 0; pos_needle < needle_size; ++pos_needle)
{
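
This appears to be the kernel behind the SQL-level `editDistance` function (an assumption based on the file being patched); a quick sanity check:

``` sql
SELECT editDistance('clickhouse', 'clckhouse'); -- 1: a single deletion
```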

View File

@ -1,5 +1,6 @@
#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Common/iota.h>
#include <Common/randomSeed.h>
#include <DataTypes/DataTypeArray.h>
#include <Functions/FunctionFactory.h>
@ -80,7 +81,7 @@ public:
const size_t cur_samples = std::min(num_elements, samples);
indices.resize(num_elements);
std::iota(indices.begin(), indices.end(), prev_array_offset);
iota(indices.data(), indices.size(), prev_array_offset);
std::shuffle(indices.begin(), indices.end(), rng);
for (UInt64 i = 0; i < cur_samples; i++)

View File

@ -7,6 +7,7 @@
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Common/assert_cast.h>
#include <Common/iota.h>
#include <Common/randomSeed.h>
#include <Common/shuffle.h>
#include <Common/typeid_cast.h>
@ -150,7 +151,7 @@ ColumnPtr FunctionArrayShuffleImpl<Traits>::executeGeneric(const ColumnArray & a
size_t size = offsets.size();
size_t nested_size = array.getData().size();
IColumn::Permutation permutation(nested_size);
std::iota(std::begin(permutation), std::end(permutation), 0);
iota(permutation.data(), permutation.size(), IColumn::Permutation::value_type(0));
ColumnArray::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)

View File

@ -1,5 +1,6 @@
#include <Functions/array/arraySort.h>
#include <Functions/FunctionFactory.h>
#include <Functions/array/arraySort.h>
#include <Common/iota.h>
namespace DB
{
@ -55,9 +56,7 @@ ColumnPtr ArraySortImpl<positive, is_partial>::execute(
size_t size = offsets.size();
size_t nested_size = array.getData().size();
IColumn::Permutation permutation(nested_size);
for (size_t i = 0; i < nested_size; ++i)
permutation[i] = i;
iota(permutation.data(), nested_size, IColumn::Permutation::value_type(0));
ColumnArray::Offset current_offset = 0;
for (size_t i = 0; i < size; ++i)

View File

@ -56,8 +56,7 @@ public:
auto column = ColumnUInt64::create();
auto & data = column->getData();
data.resize(input_rows_count);
for (size_t i = 0; i < input_rows_count; ++i)
data[i] = i;
iota(data.data(), input_rows_count, UInt64(0));
return column;
}

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Common/iota.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/UTF8Helpers.h>
#include <Common/HashTable/HashMap.h>
@ -31,7 +32,7 @@ struct TranslateImpl
if (map_from.size() != map_to.size())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second and third arguments must be the same length");
std::iota(map.begin(), map.end(), 0);
iota(map.data(), map.size(), UInt8(0));
for (size_t i = 0; i < map_from.size(); ++i)
{
@ -129,7 +130,7 @@ struct TranslateUTF8Impl
if (map_from_size != map_to_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second and third arguments must be the same length");
std::iota(map_ascii.begin(), map_ascii.end(), 0);
iota(map_ascii.data(), map_ascii.size(), UInt32(0));
const UInt8 * map_from_ptr = reinterpret_cast<const UInt8 *>(map_from.data());
const UInt8 * map_from_end = map_from_ptr + map_from.size();
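
For context, the function being patched performs one-to-one character mapping, e.g.:

``` sql
SELECT translate('Hello', 'el', 'ip'); -- 'Hippo': e→i, l→p
```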

View File

@ -412,7 +412,8 @@ void executeQueryWithParallelReplicas(
new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
}
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(new_cluster->getShardCount());
auto coordinator
= std::make_shared<ParallelReplicasReadingCoordinator>(new_cluster->getShardCount(), settings.parallel_replicas_mark_segment_size);
auto external_tables = new_context->getExternalTables();
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,

View File

@ -215,20 +215,47 @@ ContextMutablePtr DDLTaskBase::makeQueryContext(ContextPtr from_context, const Z
}
bool DDLTask::findCurrentHostID(ContextPtr global_context, Poco::Logger * log, const ZooKeeperPtr & zookeeper)
bool DDLTask::findCurrentHostID(ContextPtr global_context, Poco::Logger * log, const ZooKeeperPtr & zookeeper, const std::optional<std::string> & config_host_name)
{
bool host_in_hostlist = false;
std::exception_ptr first_exception = nullptr;
const auto maybe_secure_port = global_context->getTCPPortSecure();
const auto port = global_context->getTCPPort();
if (config_host_name)
{
bool is_local_port = (maybe_secure_port && HostID(*config_host_name, *maybe_secure_port).isLocalAddress(*maybe_secure_port)) ||
HostID(*config_host_name, port).isLocalAddress(port);
if (!is_local_port)
throw Exception(
ErrorCodes::DNS_ERROR,
"{} is not a local address. Check parameter 'host_name' in the configuration",
*config_host_name);
}
for (const HostID & host : entry.hosts)
{
auto maybe_secure_port = global_context->getTCPPortSecure();
if (config_host_name)
{
if (config_host_name != host.host_name)
continue;
if (maybe_secure_port != host.port && port != host.port)
continue;
host_in_hostlist = true;
host_id = host;
host_id_str = host.toString();
break;
}
try
{
/// The port is considered local if it matches TCP or TCP secure port that the server is listening.
bool is_local_port
= (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(global_context->getTCPPort());
= (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(port);
if (!is_local_port)
continue;

View File

@ -44,6 +44,9 @@ struct HostID
explicit HostID(const Cluster::Address & address)
: host_name(address.host_name), port(address.port) {}
HostID(const String & host_name_, UInt16 port_)
: host_name(host_name_), port(port_) {}
static HostID fromString(const String & host_port_str);
String toString() const
@ -143,7 +146,7 @@ struct DDLTask : public DDLTaskBase
{
DDLTask(const String & name, const String & path) : DDLTaskBase(name, path) {}
bool findCurrentHostID(ContextPtr global_context, Poco::Logger * log, const ZooKeeperPtr & zookeeper);
bool findCurrentHostID(ContextPtr global_context, Poco::Logger * log, const ZooKeeperPtr & zookeeper, const std::optional<std::string> & config_host_name);
void setClusterInfo(ContextPtr context, Poco::Logger * log);

View File

@ -107,6 +107,9 @@ DDLWorker::DDLWorker(
cleanup_delay_period = config->getUInt64(prefix + ".cleanup_delay_period", static_cast<UInt64>(cleanup_delay_period));
max_tasks_in_queue = std::max<UInt64>(1, config->getUInt64(prefix + ".max_tasks_in_queue", max_tasks_in_queue));
if (config->has(prefix + ".host_name"))
config_host_name = config->getString(prefix + ".host_name");
if (config->has(prefix + ".profile"))
context->setSetting("profile", config->getString(prefix + ".profile"));
}
@ -214,7 +217,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
/// Stage 2: resolve host_id and check if we should execute query or not
/// Multiple clusters can use single DDL queue path in ZooKeeper,
/// So we should skip task if we cannot find current host in cluster hosts list.
if (!task->findCurrentHostID(context, log, zookeeper))
if (!task->findCurrentHostID(context, log, zookeeper, config_host_name))
{
out_reason = "There is no local address in host list";
return add_to_skip_set();

View File

@ -153,6 +153,8 @@ protected:
ContextMutablePtr context;
Poco::Logger * log;
std::optional<std::string> config_host_name; /// host_name from config
std::string host_fqdn; /// current host domain name
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// dir with queue of queries

View File

@ -2501,7 +2501,12 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
max_block_size = std::max<UInt64>(1, max_block_limited);
max_threads_execute_query = max_streams = 1;
}
if (max_block_limited < local_limits.local_limits.size_limits.max_rows)
if (local_limits.local_limits.size_limits.max_rows != 0)
{
if (max_block_limited < local_limits.local_limits.size_limits.max_rows)
query_info.limit = max_block_limited;
}
else
{
query_info.limit = max_block_limited;
}
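
Roughly, the effect: a small `LIMIT` is now pushed down to the read step even when `max_rows_to_read` is left at its default of 0, and is compared against that limit only when one is set (`big_table` is a placeholder):

``` sql
-- Limit pushdown now also applies with the default max_rows_to_read = 0.
SELECT * FROM big_table LIMIT 10;
```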

View File

@ -200,8 +200,6 @@ public:
Status prepare() override;
private:
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path);
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait);
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
@ -228,7 +226,8 @@ private:
NameSet waiting_hosts; /// hosts from task host list
NameSet finished_hosts; /// finished hosts from host list
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
Strings current_active_hosts; /// Hosts that were in active state at the last check
Strings current_active_hosts; /// Hosts that are currently executing the task
NameSet offline_hosts; /// Hosts that are not currently running
size_t num_hosts_finished = 0;
/// Save the first detected error and throw it at the end of execution
@ -237,7 +236,10 @@ private:
Int64 timeout_seconds = 120;
bool is_replicated_database = false;
bool throw_on_timeout = true;
bool only_running_hosts = false;
bool timeout_exceeded = false;
bool stop_waiting_offline_hosts = false;
};
@ -310,12 +312,15 @@ DDLQueryStatusSource::DDLQueryStatusSource(
, log(&Poco::Logger::get("DDLQueryStatusSource"))
{
auto output_mode = context->getSettingsRef().distributed_ddl_output_mode;
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE
|| output_mode == DistributedDDLOutputMode::NONE;
if (hosts_to_wait)
{
waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
is_replicated_database = true;
only_running_hosts = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE ||
output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE;
}
else
{
@ -377,6 +382,38 @@ Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
return Chunk(std::move(columns), unfinished_hosts.size());
}
static NameSet getOfflineHosts(const String & node_path, const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper, Poco::Logger * log)
{
fs::path replicas_path;
if (node_path.ends_with('/'))
replicas_path = fs::path(node_path).parent_path().parent_path().parent_path() / "replicas";
else
replicas_path = fs::path(node_path).parent_path().parent_path() / "replicas";
Strings paths;
Strings hosts_array;
for (const auto & host : hosts_to_wait)
{
hosts_array.push_back(host);
paths.push_back(replicas_path / host / "active");
}
NameSet offline;
auto res = zookeeper->tryGet(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZNONODE)
offline.insert(hosts_array[i]);
if (offline.size() == hosts_to_wait.size())
{
/// Avoid reporting that all hosts are offline
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
return {};
}
return offline;
}
Chunk DDLQueryStatusSource::generate()
{
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
@ -398,7 +435,7 @@ Chunk DDLQueryStatusSource::generate()
if (isCancelled())
return {};
if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
if (stop_waiting_offline_hosts || (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds))
{
timeout_exceeded = true;
@ -406,7 +443,7 @@ Chunk DDLQueryStatusSource::generate()
size_t num_active_hosts = current_active_hosts.size();
constexpr auto msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
"There are {} unfinished hosts ({} of them are currently active), "
"There are {} unfinished hosts ({} of them are currently executing the task), "
"they are going to execute the query in background";
if (throw_on_timeout)
{
@ -425,10 +462,7 @@ Chunk DDLQueryStatusSource::generate()
return generateChunkWithUnfinishedHosts();
}
if (num_hosts_finished != 0 || try_number != 0)
{
sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));
}
sleepForMilliseconds(std::min<size_t>(1000, 50 * try_number));
bool node_exists = false;
Strings tmp_hosts;
@ -440,9 +474,21 @@ Chunk DDLQueryStatusSource::generate()
retries_ctl.retryLoop([&]()
{
auto zookeeper = context->getZooKeeper();
node_exists = zookeeper->exists(node_path);
tmp_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / node_to_wait);
tmp_active_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / "active");
Strings paths = {String(fs::path(node_path) / node_to_wait), String(fs::path(node_path) / "active")};
auto res = zookeeper->tryGetChildren(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error != Coordination::Error::ZOK && res[i].error != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(res[i].error, paths[i]);
if (res[0].error == Coordination::Error::ZNONODE)
node_exists = zookeeper->exists(node_path);
else
node_exists = true;
tmp_hosts = res[0].names;
tmp_active_hosts = res[1].names;
if (only_running_hosts)
offline_hosts = getOfflineHosts(node_path, waiting_hosts, zookeeper, log);
});
}
@ -460,6 +506,17 @@ Chunk DDLQueryStatusSource::generate()
Strings new_hosts = getNewAndUpdate(tmp_hosts);
++try_number;
if (only_running_hosts)
{
size_t num_finished_or_offline = 0;
for (const auto & host : waiting_hosts)
num_finished_or_offline += finished_hosts.contains(host) || offline_hosts.contains(host);
if (num_finished_or_offline == waiting_hosts.size())
stop_waiting_offline_hosts = true;
}
if (new_hosts.empty())
continue;
@ -470,7 +527,13 @@ Chunk DDLQueryStatusSource::generate()
{
ExecutionStatus status(-1, "Cannot obtain error message");
if (node_to_wait == "finished")
/// Replicated database retries in case of error, it should not write error status.
#ifdef ABORT_ON_LOGICAL_ERROR
bool need_check_status = true;
#else
bool need_check_status = !is_replicated_database;
#endif
if (need_check_status)
{
String status_data;
bool finished_exists = false;
@ -496,7 +559,6 @@ Chunk DDLQueryStatusSource::generate()
if (status.code != 0 && !first_exception
&& context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW)
{
/// Replicated database retries in case of error, it should not write error status.
if (is_replicated_database)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
@ -555,15 +617,6 @@ IProcessor::Status DDLQueryStatusSource::prepare()
return ISource::prepare();
}
Strings DDLQueryStatusSource::getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
{
Strings res;
Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(code, node_path);
return res;
}
Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
{
Strings diff;

View File

@ -4,6 +4,7 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnTuple.h>
#include <Functions/FunctionHelpers.h>
#include <Common/iota.h>
#ifdef __SSE2__
#include <emmintrin.h>
@ -155,8 +156,7 @@ void getBlockSortPermutationImpl(const Block & block, const SortDescription & de
{
size_t size = block.rows();
permutation.resize(size);
for (size_t i = 0; i < size; ++i)
permutation[i] = i;
iota(permutation.data(), size, IColumn::Permutation::value_type(0));
if (limit >= size)
limit = 0;

View File

@ -11,6 +11,7 @@
#include <memory>
#include <thread>
#include <Common/iota.h>
#include <Common/randomSeed.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
@ -788,7 +789,7 @@ TEST_F(FileCacheTest, writeBuffer)
/// get random permutation of indexes
std::vector<size_t> indexes(data.size());
std::iota(indexes.begin(), indexes.end(), 0);
iota(indexes.data(), indexes.size(), size_t(0));
std::shuffle(indexes.begin(), indexes.end(), rng);
for (auto i : indexes)

View File

@ -645,7 +645,12 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
max_threads_execute_query = 1;
}
if (max_block_size_limited < select_query_info.local_storage_limits.local_limits.size_limits.max_rows)
if (select_query_info.local_storage_limits.local_limits.size_limits.max_rows != 0)
{
if (max_block_size_limited < select_query_info.local_storage_limits.local_limits.size_limits.max_rows)
table_expression_query_info.limit = max_block_size_limited;
}
else
{
table_expression_query_info.limit = max_block_size_limited;
}

View File

@ -127,7 +127,12 @@ struct Pattern
{
hash.update(rule_type);
hash.update(regexp_str);
hash.update(function->getName());
if (function)
{
hash.update(function->getName());
for (const auto & p : function->getParameters())
hash.update(toString(p));
}
for (const auto & r : retentions)
{
hash.update(r.age);

View File

@ -418,7 +418,13 @@ Pipe ReadFromMergeTree::readFromPool(
&& settings.allow_prefetched_read_pool_for_local_filesystem
&& MergeTreePrefetchedReadPool::checkReadMethodAllowed(reader_settings.read_settings.local_fs_method);
if (allow_prefetched_remote || allow_prefetched_local)
/** Do not use prefetched read pool if query is trivial limit query.
* Because time spent during filling per thread tasks can be greater than whole query
* execution for big tables with small limit.
*/
bool use_prefetched_read_pool = query_info.limit == 0 && (allow_prefetched_remote || allow_prefetched_local);
if (use_prefetched_read_pool)
{
pool = std::make_shared<MergeTreePrefetchedReadPool>(
std::move(parts_with_range),

View File

@ -9,6 +9,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/System/StorageSystemNumbers.h>
#include <Common/iota.h>
#include <Common/typeid_cast.h>
namespace DB
@ -40,11 +41,10 @@ protected:
auto column = ColumnUInt64::create(block_size);
ColumnUInt64::Container & vec = column->getData();
size_t curr = next; /// The local variable for some reason works faster (>20%) than member of class.
UInt64 curr = next; /// The local variable for some reason works faster (>20%) than member of class.
UInt64 * pos = vec.data(); /// This also accelerates the code.
UInt64 * end = &vec[block_size];
while (pos < end)
*pos++ = curr++;
iota(pos, static_cast<size_t>(end - pos), curr);
next += step;
@ -211,17 +211,18 @@ protected:
{
auto start_value_64 = static_cast<UInt64>(start_value);
auto end_value_64 = static_cast<UInt64>(end_value);
while (start_value_64 < end_value_64)
*(pos++) = start_value_64++;
auto size = end_value_64 - start_value_64;
iota(pos, static_cast<size_t>(size), start_value_64);
pos += size;
}
};
if (can_provide > need)
{
UInt64 start_value = first_value(range) + cursor.offset_in_range;
UInt64 end_value = start_value + need; /// end_value will never overflow
while (start_value < end_value)
*(pos++) = start_value++;
/// end_value will never overflow
iota(pos, static_cast<size_t>(need), start_value);
pos += need;
provided += need;
cursor.offset_in_range += need;

View File

@ -1,7 +1,8 @@
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Interpreters/sortBlock.h>
#include <Core/SortCursor.h>
#include <Interpreters/sortBlock.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Common/PODArray.h>
#include <Common/iota.h>
namespace DB
{
@ -36,9 +37,7 @@ size_t getFilterMask(const ColumnRawPtrs & raw_block_columns, const Columns & th
else
{
rows_to_compare.resize(num_rows);
for (size_t i = 0; i < num_rows; ++i)
rows_to_compare[i] = i;
iota(rows_to_compare.data(), num_rows, UInt64(0));
size_t size = description.size();
for (size_t i = 0; i < size; ++i)

View File

@ -39,6 +39,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TABLE;
}
ThreadStatusesHolder::~ThreadStatusesHolder()
@ -316,7 +317,21 @@ Chain buildPushingToViewsChain(
type = QueryViewsLogElement::ViewType::MATERIALIZED;
result_chain.addTableLock(lock);
StoragePtr inner_table = materialized_view->getTargetTable();
StoragePtr inner_table = materialized_view->tryGetTargetTable();
/// If target table was dropped, ignore this materialized view.
if (!inner_table)
{
if (context->getSettingsRef().ignore_materialized_views_with_dropped_target_table)
continue;
throw Exception(
ErrorCodes::UNKNOWN_TABLE,
"Target table '{}' of view '{}' doesn't exists. To ignore this view use setting "
"ignore_materialized_views_with_dropped_target_table",
materialized_view->getTargetTableId().getFullTableName(),
view_id.getFullTableName());
}
auto inner_table_id = inner_table->getStorageID();
auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr();

View File

@ -1,14 +1,12 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/CurrentThread.h>
#include <Common/typeid_cast.h>
#include "Core/UUID.h"
#include <Core/SortDescription.h>
#include <Core/UUID.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <IO/WriteHelpers.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/DelayedPortsProcessor.h>
#include <Processors/Executors/PipelineExecutor.h>
@ -25,11 +23,14 @@
#include <Processors/Transforms/ExtremesTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Transforms/MergeJoinTransform.h>
#include <Processors/Transforms/PasteJoinTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/PasteJoinTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <QueryPipeline/narrowPipe.h>
#include <Common/CurrentThread.h>
#include <Common/iota.h>
#include <Common/typeid_cast.h>
namespace DB
{
@ -619,8 +620,7 @@ void QueryPipelineBuilder::addPipelineBefore(QueryPipelineBuilder pipeline)
bool has_extremes = pipe.getExtremesPort();
size_t num_extra_ports = (has_totals ? 1 : 0) + (has_extremes ? 1 : 0);
IProcessor::PortNumbers delayed_streams(pipe.numOutputPorts() + num_extra_ports);
for (size_t i = 0; i < delayed_streams.size(); ++i)
delayed_streams[i] = i;
iota(delayed_streams.data(), delayed_streams.size(), IProcessor::PortNumbers::value_type(0));
auto * collected_processors = pipe.collected_processors;

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>
namespace DB
{
@ -30,12 +31,10 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
settings_,
context_)
, extension(std::move(extension_))
, coordination_mode(CoordinationMode::Default)
{
extension.all_callback(InitialAllRangesAnnouncement(
CoordinationMode::Default,
parts_ranges.getDescriptions(),
extension.number_of_current_replica
));
extension.all_callback(
InitialAllRangesAnnouncement(coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica));
}
MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_idx*/, MergeTreeReadTask * previous_task)
@ -48,7 +47,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_id
if (buffered_ranges.empty())
{
auto result = extension.callback(ParallelReadRequest(
CoordinationMode::Default,
coordination_mode,
extension.number_of_current_replica,
pool_settings.min_marks_for_concurrent_read * pool_settings.threads,
/// For Default coordination mode we don't need to pass part names.

View File

@ -31,6 +31,7 @@ private:
mutable std::mutex mutex;
const ParallelReadingExtension extension;
const CoordinationMode coordination_mode;
RangesInDataPartsDescription buffered_ranges;
bool no_more_tasks_available{false};
Poco::Logger * log = &Poco::Logger::get("MergeTreeReadPoolParallelReplicas");

View File

@ -216,6 +216,10 @@ size_t MergeTreeReaderCompact::readRows(
{
size_t rows_to_read = data_part_info_for_read->getIndexGranularity().getMarkRows(from_mark);
/// If we need to read multiple subcolumns from a single column in storage,
/// we will read this column only once and then reuse it to extract all subcolumns.
std::unordered_map<String, ColumnPtr> columns_cache_for_subcolumns;
for (size_t pos = 0; pos < num_columns; ++pos)
{
if (!res_columns[pos])
@ -226,7 +230,7 @@ size_t MergeTreeReaderCompact::readRows(
auto & column = res_columns[pos];
size_t column_size_before_reading = column->size();
readData(columns_to_read[pos], column, from_mark, current_task_last_mark, *column_positions[pos], rows_to_read, columns_for_offsets[pos]);
readData(columns_to_read[pos], column, from_mark, current_task_last_mark, *column_positions[pos], rows_to_read, columns_for_offsets[pos], columns_cache_for_subcolumns);
size_t read_rows_in_column = column->size() - column_size_before_reading;
if (read_rows_in_column != rows_to_read)
@ -265,7 +269,7 @@ size_t MergeTreeReaderCompact::readRows(
void MergeTreeReaderCompact::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, size_t current_task_last_mark, size_t column_position, size_t rows_to_read,
ColumnNameLevel name_level_for_offsets)
ColumnNameLevel name_level_for_offsets, std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns)
{
const auto & [name, type] = name_and_type;
std::optional<NameAndTypePair> column_for_offsets;
@ -327,34 +331,54 @@ void MergeTreeReaderCompact::readData(
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
bool columns_cache_was_used = false;
if (name_and_type.isSubcolumn())
{
NameAndTypePair name_type_in_storage{name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()};
ColumnPtr temp_column;
/// In case of reading only offsets, use the correct serialization for reading the prefix
auto serialization = getSerializationInPart(name_type_in_storage);
ColumnPtr temp_column = name_type_in_storage.type->createColumn(*serialization);
if (column_for_offsets)
auto it = columns_cache_for_subcolumns.find(name_type_in_storage.name);
if (!column_for_offsets && it != columns_cache_for_subcolumns.end())
{
auto serialization_for_prefix = getSerializationInPart(*column_for_offsets);
temp_column = it->second;
auto subcolumn = name_type_in_storage.type->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
if (column->empty())
column = IColumn::mutate(subcolumn);
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
deserialize_settings.getter = buffer_getter_for_prefix;
serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix);
columns_cache_was_used = true;
}
deserialize_settings.getter = buffer_getter;
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, state, nullptr);
auto subcolumn = name_type_in_storage.type->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
/// TODO: Avoid extra copying.
if (column->empty())
column = subcolumn;
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
{
/// In case of reading only offsets, use the correct serialization for reading the prefix
auto serialization = getSerializationInPart(name_type_in_storage);
temp_column = name_type_in_storage.type->createColumn(*serialization);
if (column_for_offsets)
{
auto serialization_for_prefix = getSerializationInPart(*column_for_offsets);
deserialize_settings.getter = buffer_getter_for_prefix;
serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix);
}
deserialize_settings.getter = buffer_getter;
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, state, nullptr);
if (!column_for_offsets)
columns_cache_for_subcolumns[name_type_in_storage.name] = temp_column;
auto subcolumn = name_type_in_storage.type->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
/// TODO: Avoid extra copying.
if (column->empty())
column = subcolumn;
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
}
}
else
{
@ -374,8 +398,8 @@ void MergeTreeReaderCompact::readData(
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state, nullptr);
}
/// The buffer is left in an inconsistent state after reading single offsets
if (name_level_for_offsets.has_value())
/// The buffer is left in an inconsistent state after reading single offsets or after using the columns cache during subcolumn reading.
if (name_level_for_offsets.has_value() || columns_cache_was_used)
last_read_granule.reset();
else
last_read_granule.emplace(from_mark, column_position);

View File

@ -76,7 +76,7 @@ private:
void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t from_mark,
size_t current_task_last_mark, size_t column_position,
size_t rows_to_read, ColumnNameLevel name_level_for_offsets);
size_t rows_to_read, ColumnNameLevel name_level_for_offsets, std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns);
/// Returns maximal value of granule size in compressed file from @mark_ranges.
/// This value is used as size of read buffer.

View File

@ -1,27 +1,77 @@
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <iterator>
#include <map>
#include <mutex>
#include <numeric>
#include <vector>
#include <map>
#include <set>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <vector>
#include <consistent_hashing.h>
#include "Common/Exception.h"
#include <Common/logger_useful.h>
#include <Common/SipHash.h>
#include <Common/thread_local_rng.h>
#include <base/types.h>
#include "IO/WriteBufferFromString.h"
#include <IO/Progress.h>
#include "Storages/MergeTree/RangesInDataPart.h"
#include "Storages/MergeTree/RequestResponse.h"
#include <Storages/MergeTree/MarkRange.h>
#include <IO/WriteBufferFromString.h>
#include <Storages/MergeTree/IntersectionsIndexes.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <base/defines.h>
#include <base/types.h>
#include <boost/algorithm/string/split.hpp>
#include <fmt/core.h>
#include <fmt/format.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Common/logger_useful.h>
#include <Common/thread_local_rng.h>
using namespace DB;
namespace
{
size_t roundDownToMultiple(size_t num, size_t multiple)
{
return (num / multiple) * multiple;
}
size_t
takeFromRange(const MarkRange & range, size_t min_number_of_marks, size_t & current_marks_amount, RangesInDataPartDescription & result)
{
const auto marks_needed = min_number_of_marks - current_marks_amount;
chassert(marks_needed);
auto range_we_take = MarkRange{range.begin, range.begin + std::min(marks_needed, range.getNumberOfMarks())};
if (!result.ranges.empty() && result.ranges.back().end == range_we_take.begin)
/// Can extend the previous range
result.ranges.back().end = range_we_take.end;
else
result.ranges.emplace_back(range_we_take);
current_marks_amount += range_we_take.getNumberOfMarks();
return range_we_take.getNumberOfMarks();
}
}
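As a worked illustration of the `takeFromRange` helper above, here is a self-contained sketch of the same arithmetic with simplified stand-in types (the `MarkRange` struct below is a stripped-down assumption for illustration, not the real one):

``` cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

/// Stripped-down stand-in for the real MarkRange (illustration only).
struct MarkRange
{
    size_t begin;
    size_t end;
    size_t getNumberOfMarks() const { return end - begin; }
};

/// Same logic as takeFromRange above: take at most the missing number of marks
/// from `range`, extending the last accumulated range when they are adjacent.
size_t takeFromRange(const MarkRange & range, size_t min_number_of_marks, size_t & current_marks_amount, std::vector<MarkRange> & result)
{
    const size_t marks_needed = min_number_of_marks - current_marks_amount;
    MarkRange range_we_take{range.begin, range.begin + std::min(marks_needed, range.getNumberOfMarks())};
    if (!result.empty() && result.back().end == range_we_take.begin)
        result.back().end = range_we_take.end; /// Can extend the previous range
    else
        result.push_back(range_we_take);
    current_marks_amount += range_we_take.getNumberOfMarks();
    return range_we_take.getNumberOfMarks();
}

int main()
{
    std::vector<MarkRange> result;
    size_t current_marks_amount = 4;
    /// We still need 10 - 4 = 6 marks, so only [5, 11) is taken from [5, 20).
    size_t taken = takeFromRange({5, 20}, /*min_number_of_marks=*/10, current_marks_amount, result);
    std::cout << taken << ' ' << current_marks_amount << ' ' << result.back().begin << '-' << result.back().end << '\n'; /// prints: 6 10 5-11
}
```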
namespace ProfileEvents
{
extern const Event ParallelReplicasHandleRequestMicroseconds;
extern const Event ParallelReplicasHandleAnnouncementMicroseconds;
extern const Event ParallelReplicasStealingByHashMicroseconds;
extern const Event ParallelReplicasProcessingPartsMicroseconds;
extern const Event ParallelReplicasStealingLeftoversMicroseconds;
extern const Event ParallelReplicasCollectingOwnedSegmentsMicroseconds;
extern const Event ParallelReplicasReadAssignedMarks;
extern const Event ParallelReplicasReadUnassignedMarks;
extern const Event ParallelReplicasReadAssignedForStealingMarks;
}
namespace ProfileEvents
{
@ -58,7 +108,8 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
class ParallelReplicasReadingCoordinator::ImplInterface
@ -68,6 +119,15 @@ public:
{
size_t number_of_requests{0};
size_t sum_marks{0};
/// Marks assigned to the given replica by consistent hash
size_t assigned_to_me = 0;
/// Marks stolen from other replicas
size_t stolen_unassigned = 0;
/// Stolen marks that were assigned for stealing to the given replica by hash. Makes sense only for DefaultCoordinator
size_t stolen_by_hash = 0;
bool is_unavailable{false};
};
using Stats = std::vector<Stat>;
@ -76,7 +136,15 @@ public:
String result = "Statistics: ";
std::vector<String> stats_by_replica;
for (size_t i = 0; i < stats.size(); ++i)
stats_by_replica.push_back(fmt::format("replica {}{} - {{requests: {} marks: {}}}", i, stats[i].is_unavailable ? " is unavailable" : "", stats[i].number_of_requests, stats[i].sum_marks));
stats_by_replica.push_back(fmt::format(
"replica {}{} - {{requests: {} marks: {} assigned_to_me: {} stolen_by_hash: {} stolen_unassigned: {}}}",
i,
stats[i].is_unavailable ? " is unavailable" : "",
stats[i].number_of_requests,
stats[i].sum_marks,
stats[i].assigned_to_me,
stats[i].stolen_by_hash,
stats[i].stolen_unassigned));
result += fmt::format("{}", fmt::join(stats_by_replica, "; "));
return result;
}
@ -92,6 +160,7 @@ public:
{}
virtual ~ImplInterface() = default;
virtual ParallelReadResponse handleRequest(ParallelReadRequest request) = 0;
virtual void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) = 0;
virtual void markReplicaAsUnavailable(size_t replica_number) = 0;
@ -103,165 +172,227 @@ using Parts = std::set<Part>;
using PartRefs = std::deque<Parts::iterator>;
/// This coordinator relies heavily on the fact that we work with a single shard,
/// i.e. the difference in parts contained in each replica's snapshot is rather negligible (only recently inserted or merged parts differ).
/// So the guarantees we provide here are basically the same as with single-node reading: we will read from parts as they were seen by some node at the moment when the query started.
///
/// Knowing that almost every part can be read by each node, we assume the ranges of each part to be available to all replicas and thus distribute them evenly between them
/// (of course we still check if a replica has access to the given part before scheduling a read from it).
///
/// Of course we want to distribute marks evenly. It looks better to split parts into reasonably small segments of equal size
/// (something between 16 and 128 granules, i.e. ~100K and ~1M rows, should work).
/// This approach seems to work well for all three main cases: full scan, reading random sub-ranges and reading only a {pre,suf}fix of parts.
/// Also we can expect that a more granular division will make the distribution more even, up to a certain point.
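A minimal sketch of that segment-to-replica assignment, with `std::hash` standing in for the SipHash + `ConsistentHashing` combination the coordinator actually uses (the stand-in hash and the helper name below are assumptions for illustration):

``` cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>

/// Round a mark down to the beginning of its segment (same helper as above).
size_t roundDownToMultiple(size_t num, size_t multiple) { return (num / multiple) * multiple; }

/// Illustration only: the real coordinator hashes (part_name, segment_begin, scan_mode)
/// with SipHash and maps the result onto replicas with ConsistentHashing().
size_t ownerOfSegment(const std::string & part_name, size_t mark, size_t mark_segment_size, size_t replicas_count)
{
    const size_t segment_begin = roundDownToMultiple(mark, mark_segment_size);
    return std::hash<std::string>{}(part_name + '/' + std::to_string(segment_begin)) % replicas_count;
}

int main()
{
    /// Every replica computes the same owner for the same segment,
    /// so the assignment needs no upfront coordination.
    for (size_t mark : {0, 100, 200, 300})
        std::cout << "mark " << mark << " -> replica "
                  << ownerOfSegment("all_1_1_0", mark, /*mark_segment_size=*/128, /*replicas_count=*/3) << '\n';
}
```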
class DefaultCoordinator : public ParallelReplicasReadingCoordinator::ImplInterface
{
public:
using ParallelReadRequestPtr = std::unique_ptr<ParallelReadRequest>;
using PartToMarkRanges = std::map<PartToRead::PartAndProjectionNames, HalfIntervals>;
explicit DefaultCoordinator(size_t replicas_count_)
explicit DefaultCoordinator(size_t replicas_count_, size_t mark_segment_size_)
: ParallelReplicasReadingCoordinator::ImplInterface(replicas_count_)
, reading_state(replicas_count_)
, mark_segment_size(mark_segment_size_)
, replica_status(replicas_count_)
, distribution_by_hash_queue(replicas_count_)
{
if (mark_segment_size == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`");
}
~DefaultCoordinator() override;
struct PartitionReading
{
PartSegments part_ranges;
PartToMarkRanges mark_ranges_in_part;
};
ParallelReadResponse handleRequest(ParallelReadRequest request) override;
using PartitionToBlockRanges = std::map<String, PartitionReading>;
PartitionToBlockRanges partitions;
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) override;
void markReplicaAsUnavailable(size_t replica_number) override;
private:
/// This many granules will represent a single segment of marks that will be assigned to a replica
const size_t mark_segment_size{0};
size_t sent_initial_requests{0};
bool state_initialized{false};
size_t finished_replicas{0};
Parts all_parts_to_read;
/// Contains only parts which we haven't started to read from
PartRefs delayed_parts;
/// Per-replica preferred parts split by consistent hash
/// Once all of its tasks are done, a replica can steal tasks from others
std::vector<PartRefs> reading_state;
struct ReplicaStatus
{
bool is_finished{false};
bool is_announcement_received{false};
};
std::vector<ReplicaStatus> replica_status;
Poco::Logger * log = &Poco::Logger::get("DefaultCoordinator");
std::atomic<bool> state_initialized{false};
/// Workflow of a segment:
/// 0. `all_parts_to_read` contains all the parts and thus all the segments initially present there (virtually)
/// 1. when we traverse `all_parts_to_read` in selectPartsAndRanges() we either:
/// * take this segment into output
/// * put this segment into `distribution_by_hash_queue` for its owner if it's available and can read from it
/// * otherwise put this segment into `distribution_by_hash_queue` for its stealer_by_hash if it's available and can read from it
/// * otherwise put this segment into `ranges_for_stealing_queue`
/// 2. when we traverse `distribution_by_hash_queue` in `selectPartsAndRanges` we either:
/// * take this segment into output
/// * otherwise put this segment into `distribution_by_hash_queue` for its stealer_by_hash if it's available and can read from it
/// * otherwise put this segment into `ranges_for_stealing_queue`
/// 3. when we figure out that some replica is unavailable, we move all segments from its `distribution_by_hash_queue` to their stealers by hash or to `ranges_for_stealing_queue`
/// 4. when we get the announcement from a replica, we move all segments it cannot read to their stealers by hash or to `ranges_for_stealing_queue`
///
/// So, segments always move in one direction down this path (possibly skipping some stops):
/// `all_parts_to_read` -> `distribution_by_hash_queue[owner]` -> `distribution_by_hash_queue[stealer_by_hash]` -> `ranges_for_stealing_queue`
ParallelReadResponse handleRequest(ParallelReadRequest request) override;
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) override;
void markReplicaAsUnavailable(size_t replica_number) override;
/// We take the set of parts announced by this replica as the working set for the whole query.
/// For this replica we know for sure that
/// 1. it sees all the parts from this set
/// 2. it was available at the beginning of execution (since we got its announcement), so if it becomes unavailable at some point - the query will fail with an exception.
/// This means that we can delegate reading of all leftover segments (i.e. segments that were not read by their owner or stealer by hash) to this node.
size_t source_replica_for_parts_snapshot{0};
void updateReadingState(InitialAllRangesAnnouncement announcement);
void finalizeReadingState();
/// Parts view from the first announcement we received
std::vector<Part> all_parts_to_read;
size_t computeConsistentHash(const MergeTreePartInfo & info) const
std::unordered_map<std::string, std::unordered_set<size_t>> part_visibility; /// part_name -> set of replicas that announced this part
/// We order parts from biggest (= oldest) to newest and steal from the newest. We assume
/// that they are going to be merged soon anyway, so we should already expect a worse cache hit rate for them.
struct BiggerPartsFirst
{
auto hash = SipHash();
hash.update(info.getPartNameV1());
return ConsistentHashing(hash.get64(), replicas_count);
}
bool operator()(const auto & lhs, const auto & rhs) const { return lhs.info.getBlocksCount() > rhs.info.getBlocksCount(); }
};
void selectPartsAndRanges(const PartRefs & container, size_t replica_num, size_t min_number_of_marks, size_t & current_mark_size, ParallelReadResponse & response) const;
/// We don't precalculate the whole assignment for each node at the start.
/// When replica asks coordinator for a new portion of data to read, it traverses `all_parts_to_read` to find ranges relevant to this replica (by consistent hash).
/// Many hashes are calculated during this process, and in order not to lose this work, for all the ranges
/// observed along the way we save the information about which node they belong to.
/// Ranges in this queue might belong to a part that the given replica cannot read from - the corresponding check happens later.
/// TODO: consider making it bounded in size
std::vector<std::multiset<RangesInDataPartDescription, BiggerPartsFirst>> distribution_by_hash_queue;
/// For some ranges neither the owner nor the stealer (by consistent hash) can read from the given part at all, so such ranges have to be stolen anyway.
/// TODO: consider making it bounded in size
RangesInDataPartsDescription ranges_for_stealing_queue;
/// We take only the first replica's set of parts as the whole working set for this query.
/// For other replicas we'll just discard parts that they know about but that weren't present in the first request we received.
/// The second and all subsequent announcements are needed only to understand whether we can schedule reading from the given part on the given replica.
void initializeReadingState(InitialAllRangesAnnouncement announcement);
void setProgressCallback();
enum class ScanMode
{
/// Main working set for the replica
TakeWhatsMineByHash,
/// We need to steal to optimize tail latency, let's do it by hash nevertheless
TakeWhatsMineForStealing,
/// All bets are off, we need to steal "for correctness" - to not leave any segments unread
TakeEverythingAvailable
};
void selectPartsAndRanges(
size_t replica_num,
ScanMode scan_mode,
size_t min_number_of_marks,
size_t & current_marks_amount,
RangesInDataPartsDescription & description);
size_t computeConsistentHash(const std::string & part_name, size_t segment_begin, ScanMode scan_mode) const;
void tryToTakeFromDistributionQueue(
size_t replica_num, size_t min_number_of_marks, size_t & current_marks_amount, RangesInDataPartsDescription & description);
void tryToStealFromQueues(
size_t replica_num,
ScanMode scan_mode,
size_t min_number_of_marks,
size_t & current_marks_amount,
RangesInDataPartsDescription & description);
void tryToStealFromQueue(
auto & queue,
ssize_t owner, /// In case `queue` is `distribution_by_hash_queue[replica]`
size_t replica_num,
ScanMode scan_mode,
size_t min_number_of_marks,
size_t & current_marks_amount,
RangesInDataPartsDescription & description);
void processPartsFurther(
size_t replica_num,
ScanMode scan_mode,
size_t min_number_of_marks,
size_t & current_marks_amount,
RangesInDataPartsDescription & description);
bool possiblyCanReadPart(size_t replica, const MergeTreePartInfo & info) const;
void enqueueSegment(const MergeTreePartInfo & info, const MarkRange & segment, size_t owner);
void enqueueToStealerOrStealingQueue(const MergeTreePartInfo & info, const MarkRange & segment);
};
DefaultCoordinator::~DefaultCoordinator()
{
LOG_DEBUG(log, "Coordination done: {}", toString(stats));
try
{
LOG_DEBUG(log, "Coordination done: {}", toString(stats));
}
catch (...)
{
tryLogCurrentException(log);
}
}
void DefaultCoordinator::updateReadingState(InitialAllRangesAnnouncement announcement)
void DefaultCoordinator::initializeReadingState(InitialAllRangesAnnouncement announcement)
{
PartRefs parts_diff;
/// To get rid of duplicates
for (auto && part_ranges: announcement.description)
for (const auto & part : announcement.description)
{
Part part{.description = std::move(part_ranges), .replicas = {announcement.replica_num}};
const MergeTreePartInfo & announced_part = part.description.info;
auto it = std::lower_bound(cbegin(all_parts_to_read), cend(all_parts_to_read), part);
if (it != all_parts_to_read.cend())
{
const MergeTreePartInfo & found_part = it->description.info;
if (found_part == announced_part)
{
/// We have the same part - add the info about presence on current replica
it->replicas.insert(announcement.replica_num);
continue;
}
else
{
/// check if it is a covering or covered part
/// need to compare with the 2 nearest parts in the set - lesser and greater than the part from the announcement
bool is_disjoint = found_part.isDisjoint(announced_part);
if (it != all_parts_to_read.cbegin() && is_disjoint)
{
const MergeTreePartInfo & lesser_part = (--it)->description.info;
is_disjoint &= lesser_part.isDisjoint(announced_part);
}
if (!is_disjoint)
continue;
}
}
else if (!all_parts_to_read.empty())
{
/// the announced part is the greatest - check if it's disjoint with the lesser part
const MergeTreePartInfo & lesser_part = all_parts_to_read.crbegin()->description.info;
if (!lesser_part.isDisjoint(announced_part))
continue;
}
auto [insert_it, _] = all_parts_to_read.emplace(std::move(part));
parts_diff.push_back(insert_it);
/// We don't really care here whether this part will be included in the working set or not
part_visibility[part.info.getPartNameV1()].insert(announcement.replica_num);
}
/// Split all parts by consistent hash
while (!parts_diff.empty())
/// If the state is already initialized - just register the availability info and leave
if (state_initialized)
return;
for (auto && part : announcement.description)
{
auto current_part_it = parts_diff.front();
parts_diff.pop_front();
auto consistent_hash = computeConsistentHash(current_part_it->description.info);
auto intersecting_it = std::find_if(
all_parts_to_read.begin(),
all_parts_to_read.end(),
[&part](const Part & other) { return !other.description.info.isDisjoint(part.info); });
/// Check whether the new part can easily go to the replica queue
if (current_part_it->replicas.contains(consistent_hash))
{
reading_state[consistent_hash].emplace_back(current_part_it);
continue;
}
if (intersecting_it != all_parts_to_read.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Intersecting parts found in announcement");
/// Add to delayed parts
delayed_parts.emplace_back(current_part_it);
all_parts_to_read.push_back(Part{.description = std::move(part), .replicas = {announcement.replica_num}});
}
std::ranges::sort(
all_parts_to_read, [](const Part & lhs, const Part & rhs) { return BiggerPartsFirst()(lhs.description, rhs.description); });
state_initialized = true;
source_replica_for_parts_snapshot = announcement.replica_num;
LOG_DEBUG(log, "Reading state is fully initialized: {}", fmt::join(all_parts_to_read, "; "));
}
void DefaultCoordinator::markReplicaAsUnavailable(size_t replica_number)
{
if (stats[replica_number].is_unavailable == false)
LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);
++unavailable_replicas_count;
stats[replica_number].is_unavailable = true;
if (sent_initial_requests == replicas_count - unavailable_replicas_count)
setProgressCallback();
for (const auto & segment : distribution_by_hash_queue[replica_number])
{
LOG_DEBUG(log, "Replica number {} is unavailable", replica_number);
stats[replica_number].is_unavailable = true;
++unavailable_replicas_count;
if (sent_initial_requests == replicas_count - unavailable_replicas_count)
finalizeReadingState();
chassert(segment.ranges.size() == 1);
enqueueToStealerOrStealingQueue(segment.info, segment.ranges.front());
}
distribution_by_hash_queue[replica_number].clear();
}
void DefaultCoordinator::finalizeReadingState()
void DefaultCoordinator::setProgressCallback()
{
/// Clear the whole delayed queue
while (!delayed_parts.empty())
{
auto current_part_it = delayed_parts.front();
auto consistent_hash = computeConsistentHash(current_part_it->description.info);
if (current_part_it->replicas.contains(consistent_hash))
{
reading_state[consistent_hash].emplace_back(current_part_it);
delayed_parts.pop_front();
continue;
}
/// In this situation just assign to a random replica which has this part
auto replica = *(std::next(current_part_it->replicas.begin(), thread_local_rng() % current_part_it->replicas.size()));
reading_state[replica].emplace_back(current_part_it);
delayed_parts.pop_front();
}
// update progress with total rows
// Update progress with total rows
if (progress_callback)
{
size_t total_rows_to_read = 0;
@ -274,116 +405,378 @@ void DefaultCoordinator::finalizeReadingState()
LOG_DEBUG(log, "Total rows to read: {}", total_rows_to_read);
}
LOG_DEBUG(log, "Reading state is fully initialized: {}", fmt::join(all_parts_to_read, "; "));
}
void DefaultCoordinator::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
{
const auto replica_num = announcement.replica_num;
updateReadingState(std::move(announcement));
LOG_DEBUG(log, "Initial request from replica {}: {}", announcement.replica_num, announcement.describe());
initializeReadingState(std::move(announcement));
if (replica_num >= stats.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica number ({}) is bigger than total replicas count ({})", replica_num, stats.size());
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Replica number ({}) is bigger than total replicas count ({})", replica_num, stats.size());
++stats[replica_num].number_of_requests;
replica_status[replica_num].is_announcement_received = true;
++sent_initial_requests;
LOG_DEBUG(log, "Sent initial requests: {} Replicas count: {}", sent_initial_requests, replicas_count);
if (sent_initial_requests == replicas_count)
finalizeReadingState();
}
setProgressCallback();
void DefaultCoordinator::selectPartsAndRanges(const PartRefs & container, size_t replica_num, size_t min_number_of_marks, size_t & current_mark_size, ParallelReadResponse & response) const
{
for (const auto & part : container)
/// Sift the queue to move out all invisible segments
for (const auto & segment : distribution_by_hash_queue[replica_num])
{
if (current_mark_size >= min_number_of_marks)
if (!part_visibility[segment.info.getPartNameV1()].contains(replica_num))
{
LOG_TEST(log, "Current mark size {} is bigger than min_number_marks {}", current_mark_size, min_number_of_marks);
break;
}
if (part->description.ranges.empty())
{
LOG_TEST(log, "Part {} is already empty in reading state", part->description.info.getPartNameV1());
continue;
}
if (std::find(part->replicas.begin(), part->replicas.end(), replica_num) == part->replicas.end())
{
LOG_TEST(log, "Not found part {} on replica {}", part->description.info.getPartNameV1(), replica_num);
continue;
}
response.description.push_back({
.info = part->description.info,
.ranges = {},
});
while (!part->description.ranges.empty() && current_mark_size < min_number_of_marks)
{
auto & range = part->description.ranges.front();
const size_t needed = min_number_of_marks - current_mark_size;
if (range.getNumberOfMarks() > needed)
{
auto range_we_take = MarkRange{range.begin, range.begin + needed};
response.description.back().ranges.emplace_back(range_we_take);
current_mark_size += range_we_take.getNumberOfMarks();
range.begin += needed;
break;
}
response.description.back().ranges.emplace_back(range);
current_mark_size += range.getNumberOfMarks();
part->description.ranges.pop_front();
chassert(segment.ranges.size() == 1);
enqueueToStealerOrStealingQueue(segment.info, segment.ranges.front());
}
}
}
void DefaultCoordinator::tryToTakeFromDistributionQueue(
size_t replica_num, size_t min_number_of_marks, size_t & current_marks_amount, RangesInDataPartsDescription & description)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ParallelReplicasCollectingOwnedSegmentsMicroseconds);
auto & distribution_queue = distribution_by_hash_queue[replica_num];
auto replica_can_read_part = [&](auto replica, const auto & part) { return part_visibility[part.getPartNameV1()].contains(replica); };
RangesInDataPartDescription result;
while (!distribution_queue.empty() && current_marks_amount < min_number_of_marks)
{
if (result.ranges.empty() || distribution_queue.begin()->info != result.info)
{
if (!result.ranges.empty())
/// We're switching to a different part, so have to save currently accumulated ranges
description.push_back(result);
result = {.info = distribution_queue.begin()->info};
}
/// NOTE: this works because ranges are not considered by the comparator
auto & part_ranges = const_cast<RangesInDataPartDescription &>(*distribution_queue.begin());
chassert(part_ranges.ranges.size() == 1);
auto & range = part_ranges.ranges.front();
if (replica_can_read_part(replica_num, part_ranges.info))
{
if (auto taken = takeFromRange(range, min_number_of_marks, current_marks_amount, result); taken == range.getNumberOfMarks())
distribution_queue.erase(distribution_queue.begin());
else
{
range.begin += taken;
break;
}
}
else
{
/// It might be that `replica_num` is the stealer by hash itself - no problem,
/// we'll just have a redundant hash computation inside this function
enqueueToStealerOrStealingQueue(part_ranges.info, range);
distribution_queue.erase(distribution_queue.begin());
}
}
if (!result.ranges.empty())
description.push_back(result);
}
void DefaultCoordinator::tryToStealFromQueues(
size_t replica_num,
ScanMode scan_mode,
size_t min_number_of_marks,
size_t & current_marks_amount,
RangesInDataPartsDescription & description)
{
auto steal_from_other_replicas = [&]()
{
/// Try to steal from other replicas starting from replicas with longest queues
std::vector<size_t> order(replicas_count);
std::iota(order.begin(), order.end(), 0);
std::ranges::sort(
order, [&](auto lhs, auto rhs) { return distribution_by_hash_queue[lhs].size() > distribution_by_hash_queue[rhs].size(); });
for (auto replica : order)
tryToStealFromQueue(
distribution_by_hash_queue[replica],
replica,
replica_num,
scan_mode,
min_number_of_marks,
current_marks_amount,
description);
};
if (scan_mode == ScanMode::TakeWhatsMineForStealing)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ParallelReplicasStealingByHashMicroseconds);
steal_from_other_replicas();
}
else
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ParallelReplicasStealingLeftoversMicroseconds);
/// Check orphaned ranges
tryToStealFromQueue(
ranges_for_stealing_queue, /*owner=*/-1, replica_num, scan_mode, min_number_of_marks, current_marks_amount, description);
/// Last hope. In case we haven't yet figured out that some node is unavailable, its segments are still in the distribution queue.
steal_from_other_replicas();
}
}
void DefaultCoordinator::tryToStealFromQueue(
auto & queue,
ssize_t owner,
size_t replica_num,
ScanMode scan_mode,
size_t min_number_of_marks,
size_t & current_marks_amount,
RangesInDataPartsDescription & description)
{
auto replica_can_read_part = [&](auto replica, const auto & part) { return part_visibility[part.getPartNameV1()].contains(replica); };
RangesInDataPartDescription result;
auto it = queue.rbegin();
while (it != queue.rend() && current_marks_amount < min_number_of_marks)
{
auto & part_ranges = const_cast<RangesInDataPartDescription &>(*it);
chassert(part_ranges.ranges.size() == 1);
auto & range = part_ranges.ranges.front();
if (result.ranges.empty() || part_ranges.info != result.info)
{
if (!result.ranges.empty())
/// We're switching to a different part, so have to save currently accumulated ranges
description.push_back(result);
result = {.info = part_ranges.info};
}
if (replica_can_read_part(replica_num, part_ranges.info))
{
bool can_take = false;
if (scan_mode == ScanMode::TakeWhatsMineForStealing)
{
chassert(owner >= 0);
const size_t segment_begin = roundDownToMultiple(range.begin, mark_segment_size);
can_take = computeConsistentHash(part_ranges.info.getPartNameV1(), segment_begin, scan_mode) == replica_num;
}
else
{
/// Don't steal segments whose owner is alive and sees them
can_take = owner == -1 || stats[owner].is_unavailable || !replica_status[owner].is_announcement_received;
}
if (can_take)
{
if (auto taken = takeFromRange(range, min_number_of_marks, current_marks_amount, result); taken == range.getNumberOfMarks())
{
it = decltype(it)(queue.erase(std::next(it).base()));
continue;
}
else
range.begin += taken;
}
}
++it;
}
if (!result.ranges.empty())
description.push_back(result);
}
void DefaultCoordinator::processPartsFurther(
size_t replica_num,
ScanMode scan_mode,
size_t min_number_of_marks,
size_t & current_marks_amount,
RangesInDataPartsDescription & description)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ParallelReplicasProcessingPartsMicroseconds);
for (const auto & part : all_parts_to_read)
{
if (current_marks_amount >= min_number_of_marks)
{
LOG_TEST(log, "Current mark size {} is bigger than min_number_marks {}", current_marks_amount, min_number_of_marks);
return;
}
RangesInDataPartDescription result{.info = part.description.info};
while (!part.description.ranges.empty() && current_marks_amount < min_number_of_marks)
{
auto & range = part.description.ranges.front();
/// Parts are divided into segments of `mark_segment_size` granules starting from the 0-th granule
for (size_t segment_begin = roundDownToMultiple(range.begin, mark_segment_size);
segment_begin < range.end && current_marks_amount < min_number_of_marks;
segment_begin += mark_segment_size)
{
const auto cur_segment
= MarkRange{std::max(range.begin, segment_begin), std::min(range.end, segment_begin + mark_segment_size)};
const auto owner = computeConsistentHash(part.description.info.getPartNameV1(), segment_begin, scan_mode);
if (owner == replica_num)
{
const auto taken = takeFromRange(cur_segment, min_number_of_marks, current_marks_amount, result);
if (taken == range.getNumberOfMarks())
part.description.ranges.pop_front();
else
{
range.begin += taken;
break;
}
}
else
{
chassert(scan_mode == ScanMode::TakeWhatsMineByHash);
enqueueSegment(part.description.info, cur_segment, owner);
range.begin += cur_segment.getNumberOfMarks();
if (range.getNumberOfMarks() == 0)
part.description.ranges.pop_front();
}
}
}
if (!result.ranges.empty())
description.push_back(std::move(result));
}
}
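For instance, with `mark_segment_size = 8` a range `[10, 30)` is visited as segments beginning at 8, 16 and 24, producing `cur_segment` values `[10, 16)`, `[16, 24)` and `[24, 30)`: each one is either taken into the result (when the requesting replica is the owner by hash) or enqueued for its owner via `enqueueSegment`.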
void DefaultCoordinator::selectPartsAndRanges(
size_t replica_num,
ScanMode scan_mode,
size_t min_number_of_marks,
size_t & current_marks_amount,
RangesInDataPartsDescription & description)
{
if (scan_mode == ScanMode::TakeWhatsMineByHash)
{
tryToTakeFromDistributionQueue(replica_num, min_number_of_marks, current_marks_amount, description);
processPartsFurther(replica_num, scan_mode, min_number_of_marks, current_marks_amount, description);
/// We might back-fill `distribution_by_hash_queue` for this replica in `enqueueToStealerOrStealingQueue`
tryToTakeFromDistributionQueue(replica_num, min_number_of_marks, current_marks_amount, description);
}
else
tryToStealFromQueues(replica_num, scan_mode, min_number_of_marks, current_marks_amount, description);
}
bool DefaultCoordinator::possiblyCanReadPart(size_t replica, const MergeTreePartInfo & info) const
{
/// At this point we might not be sure if `owner` can read from the given part.
/// Then we will check it while processing `owner`'s data requests - they are guaranteed to come after the announcement.
return !stats[replica].is_unavailable && !replica_status[replica].is_finished
&& (!replica_status[replica].is_announcement_received || part_visibility.at(info.getPartNameV1()).contains(replica));
}
void DefaultCoordinator::enqueueSegment(const MergeTreePartInfo & info, const MarkRange & segment, size_t owner)
{
if (possiblyCanReadPart(owner, info))
{
/// TODO: optimize me (maybe we can store something lighter than RangesInDataPartDescription)
distribution_by_hash_queue[owner].insert(RangesInDataPartDescription{.info = info, .ranges = {segment}});
LOG_TEST(log, "Segment {} is added to its owner's ({}) queue", segment, owner);
}
else
enqueueToStealerOrStealingQueue(info, segment);
}
void DefaultCoordinator::enqueueToStealerOrStealingQueue(const MergeTreePartInfo & info, const MarkRange & segment)
{
auto && range = RangesInDataPartDescription{.info = info, .ranges = {segment}};
const auto stealer_by_hash = computeConsistentHash(
info.getPartNameV1(), roundDownToMultiple(segment.begin, mark_segment_size), ScanMode::TakeWhatsMineForStealing);
if (possiblyCanReadPart(stealer_by_hash, info))
{
distribution_by_hash_queue[stealer_by_hash].insert(std::move(range));
LOG_TEST(log, "Segment {} is added to its stealer's ({}) queue", segment, stealer_by_hash);
}
else
{
ranges_for_stealing_queue.push_back(std::move(range));
LOG_TEST(log, "Segment {} is added to stealing queue", segment);
}
}
size_t DefaultCoordinator::computeConsistentHash(const std::string & part_name, size_t segment_begin, ScanMode scan_mode) const
{
chassert(segment_begin % mark_segment_size == 0);
auto hash = SipHash();
hash.update(part_name);
hash.update(segment_begin);
hash.update(scan_mode);
return ConsistentHashing(hash.get64(), replicas_count);
}
ParallelReadResponse DefaultCoordinator::handleRequest(ParallelReadRequest request)
{
LOG_TRACE(log, "Handling request from replica {}, minimal marks size is {}", request.replica_num, request.min_number_of_marks);
size_t current_mark_size = 0;
ParallelReadResponse response;
/// 1. Try to select from preferred set of parts for current replica
selectPartsAndRanges(reading_state[request.replica_num], request.replica_num, request.min_number_of_marks, current_mark_size, response);
size_t current_mark_size = 0;
/// 2. Try to use parts from delayed queue
while (!delayed_parts.empty() && current_mark_size < request.min_number_of_marks)
{
auto part = delayed_parts.front();
delayed_parts.pop_front();
reading_state[request.replica_num].emplace_back(part);
selectPartsAndRanges(reading_state[request.replica_num], request.replica_num, request.min_number_of_marks, current_mark_size, response);
}
/// 1. Try to select ranges meant for this replica by consistent hash
selectPartsAndRanges(
request.replica_num, ScanMode::TakeWhatsMineByHash, request.min_number_of_marks, current_mark_size, response.description);
const size_t assigned_to_me = current_mark_size;
/// 3. Try to steal tasks;
if (current_mark_size < request.min_number_of_marks)
{
for (size_t i = 0; i < replicas_count; ++i)
{
if (i != request.replica_num)
selectPartsAndRanges(reading_state[i], request.replica_num, request.min_number_of_marks, current_mark_size, response);
/// 2. Try to steal but with caching again (with different key)
selectPartsAndRanges(
request.replica_num, ScanMode::TakeWhatsMineForStealing, request.min_number_of_marks, current_mark_size, response.description);
const size_t stolen_by_hash = current_mark_size - assigned_to_me;
if (current_mark_size >= request.min_number_of_marks)
break;
}
}
/// 3. Try to steal with no preference. We're trying to postpone it as much as possible.
if (current_mark_size == 0 && request.replica_num == source_replica_for_parts_snapshot)
selectPartsAndRanges(
request.replica_num, ScanMode::TakeEverythingAvailable, request.min_number_of_marks, current_mark_size, response.description);
const size_t stolen_unassigned = current_mark_size - stolen_by_hash - assigned_to_me;
stats[request.replica_num].number_of_requests += 1;
stats[request.replica_num].sum_marks += current_mark_size;
stats[request.replica_num].assigned_to_me += assigned_to_me;
stats[request.replica_num].stolen_by_hash += stolen_by_hash;
stats[request.replica_num].stolen_unassigned += stolen_unassigned;
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadAssignedMarks, assigned_to_me);
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadUnassignedMarks, stolen_unassigned);
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadAssignedForStealingMarks, stolen_by_hash);
if (response.description.empty())
{
response.finish = true;
LOG_TRACE(log, "Going to respond to replica {} with {}", request.replica_num, response.describe());
replica_status[request.replica_num].is_finished = true;
if (++finished_replicas == replicas_count - unavailable_replicas_count)
{
/// Nobody will come to process any more data
if (!ranges_for_stealing_queue.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Some orphaned segments were left unread");
for (size_t replica = 0; replica < replicas_count; ++replica)
if (!distribution_by_hash_queue[replica].empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Non-empty distribution_by_hash_queue for replica {}", replica);
}
}
LOG_DEBUG(
log,
"Going to respond to replica {} with {}; mine_marks={}, stolen_by_hash={}, stolen_rest={}",
request.replica_num,
response.describe(),
assigned_to_me,
stolen_by_hash,
stolen_unassigned);
return response;
}
@ -456,6 +849,8 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
std::sort(ranges.begin(), ranges.end());
}
++stats[announcement.replica_num].number_of_requests;
if (new_rows_to_read > 0)
{
Progress progress;
@ -557,6 +952,8 @@ ParallelReadResponse InOrderCoordinator<mode>::handleRequest(ParallelReadRequest
void ParallelReplicasReadingCoordinator::handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ParallelReplicasHandleAnnouncementMicroseconds);
std::lock_guard lock(mutex);
if (!pimpl)
@ -570,6 +967,8 @@ void ParallelReplicasReadingCoordinator::handleInitialAllRangesAnnouncement(Init
ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelReadRequest request)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ParallelReplicasHandleRequestMicroseconds);
std::lock_guard lock(mutex);
if (!pimpl)
@ -604,7 +1003,7 @@ void ParallelReplicasReadingCoordinator::initialize()
switch (mode)
{
case CoordinationMode::Default:
pimpl = std::make_unique<DefaultCoordinator>(replicas_count);
pimpl = std::make_unique<DefaultCoordinator>(replicas_count, mark_segment_size);
break;
case CoordinationMode::WithOrder:
pimpl = std::make_unique<InOrderCoordinator<CoordinationMode::WithOrder>>(replicas_count);
@ -621,7 +1020,10 @@ void ParallelReplicasReadingCoordinator::initialize()
pimpl->markReplicaAsUnavailable(replica);
}
ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_) : replicas_count(replicas_count_) {}
ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_)
: replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
{
}
ParallelReplicasReadingCoordinator::~ParallelReplicasReadingCoordinator() = default;

View File

@ -15,7 +15,7 @@ class ParallelReplicasReadingCoordinator
public:
class ImplInterface;
explicit ParallelReplicasReadingCoordinator(size_t replicas_count_);
explicit ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_ = 0);
~ParallelReplicasReadingCoordinator();
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement);
@ -35,8 +35,8 @@ private:
std::mutex mutex;
size_t replicas_count{0};
size_t mark_segment_size{0};
CoordinationMode mode{CoordinationMode::Default};
std::atomic<bool> initialized{false};
std::unique_ptr<ImplInterface> pimpl;
ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation
std::set<size_t> replicas_used;

View File

@ -73,6 +73,7 @@ public:
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;
StorageID getTargetTableId() const;
/// Get the virtual columns of the target table.
NamesAndTypesList getVirtuals() const override;
@ -119,7 +120,6 @@ private:
std::tuple<ContextMutablePtr, std::shared_ptr<ASTInsertQuery>> prepareRefresh() const;
StorageID exchangeTargetTable(StorageID fresh_table, ContextPtr refresh_context);
StorageID getTargetTableId() const;
void setTargetTableId(StorageID id);
void updateTargetTableId(std::optional<String> database_name, std::optional<String> table_name);
};

View File

@ -26,4 +26,28 @@
</retention>
</default>
</graphite_rollup_alternative>
<graphite_rollup_alternative_no_function>
<version_column_name>Version</version_column_name>
<pattern>
<regexp>sum</regexp>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>17280</age>
<precision>6000</precision>
</retention>
</pattern>
<default>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>17280</age>
<precision>6000</precision>
</retention>
</default>
</graphite_rollup_alternative_no_function>
</clickhouse>

View File

@ -0,0 +1,19 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<allow_zookeeper_write>1</allow_zookeeper_write>
<distributed_ddl>
<host_name>node1</host_name>
</distributed_ddl>
</clickhouse>

View File

@ -0,0 +1,76 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_ddl_queue_delete_add_replica(started_cluster):
# Some query was started on the cluster, then we deleted an unfinished node
# and added a new node to the cluster. Considering that there are fewer
# finished nodes than expected and we can't resolve the deleted node's hostname,
# the queue will get stuck on the new node.
# <host_name> inside <distributed_ddl> allows us to discard the deleted
# node's hostname by a simple comparison, without trying to resolve it.
node1.query(
"create table hostname_change on cluster test_cluster (n int) engine=Log"
)
# There's no easy way to change the hostname of a container, so let's update the values in zk
query_znode = node1.query(
"select max(name) from system.zookeeper where path='/clickhouse/task_queue/ddl'"
)[:-1]
value = (
node1.query(
f"select value from system.zookeeper where path='/clickhouse/task_queue/ddl' and name='{query_znode}' format TSVRaw"
)[:-1]
.replace(
"hosts: ['node1:9000']", "hosts: ['finished_node:9000','deleted_node:9000']"
)
.replace("initiator: node1:9000", "initiator: finished_node:9000")
.replace("\\'", "#")
.replace("'", "\\'")
.replace("\n", "\\n")
.replace("#", "\\'")
)
finished_znode = node1.query(
f"select name from system.zookeeper where path='/clickhouse/task_queue/ddl/{query_znode}/finished' and name like '%node1%'"
)[:-1]
node1.query(
f"insert into system.zookeeper (name, path, value) values ('{query_znode}', '/clickhouse/task_queue/ddl', '{value}')"
)
started_cluster.get_kazoo_client("zoo1").delete(
f"/clickhouse/task_queue/ddl/{query_znode}/finished/{finished_znode}"
)
finished_znode = finished_znode.replace("node1", "finished_node")
node1.query(
f"insert into system.zookeeper (name, path, value) values ('{finished_znode}', '/clickhouse/task_queue/ddl/{query_znode}/finished', '0\\n')"
)
node1.restart_clickhouse(kill=True)
node1.query(
"create table hostname_change2 on cluster test_cluster (n int) engine=Log"
)

View File

@ -0,0 +1,32 @@
<clickhouse>
<remote_servers>
<parallel_replicas_with_unavailable_nodes>
<shard>
<replica>
<host>node0</host>
<port>9000</port>
</replica>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<host>node4</host>
<port>9000</port>
</replica>
<replica>
<host>node5</host>
<port>9000</port>
</replica>
</shard>
</parallel_replicas_with_unavailable_nodes>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,156 @@
import json
import pytest
from helpers.cluster import ClickHouseCluster
from random import randint
cluster = ClickHouseCluster(__file__)
cluster_name = "parallel_replicas_with_unavailable_nodes"
nodes = [
cluster.add_instance(
f"node{num}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
for num in range(3)
]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def _create_tables(table_name, table_size, index_granularity):
for num in range(len(nodes)):
nodes[num].query(f"DROP TABLE IF EXISTS {table_name}")
nodes[num].query(
f"""
CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String)
Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', '{num}')
ORDER BY (key)
SETTINGS index_granularity = {index_granularity}
"""
)
nodes[0].query(
f"""
INSERT INTO {table_name}
SELECT number, toString(number) FROM numbers_mt({table_size})
"""
)
def _create_query(query_tmpl, table_name):
rand_set = [randint(0, 500) for i in range(42)]
return query_tmpl.format(table_name=table_name, rand_set=rand_set)
def _get_result_without_parallel_replicas(query):
return nodes[0].query(
query,
settings={
"allow_experimental_parallel_reading_from_replicas": 0,
},
)
def _get_result_with_parallel_replicas(
query, query_id, cluster_name, parallel_replicas_mark_segment_size
):
return nodes[0].query(
query,
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 6,
"cluster_for_parallel_replicas": f"{cluster_name}",
"parallel_replicas_mark_segment_size": parallel_replicas_mark_segment_size,
"query_id": query_id,
},
)
def _get_expected_amount_of_marks_to_read(query):
return json.loads(
nodes[0].query(
f"""
EXPLAIN ESTIMATE
{query}
FORMAT JSONEachRow
"""
)
)["marks"]
def _get_number_of_marks_read_by_replicas(query_id):
nodes[0].query("SYSTEM FLUSH LOGS")
return (
nodes[0]
.query(
f"""
SELECT sum(
ProfileEvents['ParallelReplicasReadAssignedMarks']
+ ProfileEvents['ParallelReplicasReadUnassignedMarks']
+ ProfileEvents['ParallelReplicasReadAssignedForStealingMarks']
)
FROM system.query_log
WHERE query_id = '{query_id}'
"""
)
.strip()
)
@pytest.mark.parametrize(
"query_tmpl",
[
"SELECT sum(cityHash64(*)) FROM {table_name}",
"SELECT sum(cityHash64(*)) FROM {table_name} WHERE intDiv(key, 100) IN {rand_set}",
],
)
@pytest.mark.parametrize(
"table_size",
[1000, 10000, 100000],
)
@pytest.mark.parametrize(
"index_granularity",
[10, 100],
)
@pytest.mark.parametrize(
"parallel_replicas_mark_segment_size",
[1, 10],
)
def test_number_of_marks_read(
start_cluster,
query_tmpl,
table_size,
index_granularity,
parallel_replicas_mark_segment_size,
):
if nodes[0].is_built_with_sanitizer():
pytest.skip("Disabled for sanitizers (too slow)")
table_name = f"tbl_{len(query_tmpl)}_{cluster_name}_{table_size}_{index_granularity}_{parallel_replicas_mark_segment_size}"
_create_tables(table_name, table_size, index_granularity)
if "where" in query_tmpl.lower():
# We need all the replicas to see the same state of parts to make sure that index analysis will pick the same number of marks for reading
# regardless of which replica's state is chosen as the working set. This should become redundant once we start to always use the initiator's snapshot.
nodes[0].query(f"OPTIMIZE TABLE {table_name} FINAL", settings={"alter_sync": 2})
for node in nodes:
node.query(f"SYSTEM SYNC REPLICA {table_name} STRICT")
query = _create_query(query_tmpl, table_name)
query_id = f"{table_name}_{randint(0, 1e9)}"
assert _get_result_with_parallel_replicas(
query, query_id, cluster_name, parallel_replicas_mark_segment_size
) == _get_result_without_parallel_replicas(query)
assert _get_number_of_marks_read_by_replicas(
query_id
) == _get_expected_amount_of_marks_to_read(query)

View File

@ -1,22 +0,0 @@
<clickhouse>
<remote_servers>
<test_single_shard_multiple_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
</test_single_shard_multiple_replicas>
</remote_servers>
</clickhouse>

View File

@ -1,156 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
nodes = [
cluster.add_instance(
f"n{i}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
for i in (1, 2, 3)
]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def create_tables(cluster, table_name):
"""create replicated tables in special way
- each table is populated by equal number of rows
- fetches are disabled, so each replica will have different set of rows
which enforce parallel replicas read from each replica
"""
# create replicated tables
for node in nodes:
node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
nodes[0].query(
f"""CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1')
ORDER BY (key)"""
)
nodes[1].query(
f"""CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2')
ORDER BY (key)"""
)
nodes[2].query(
f"""CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3')
ORDER BY (key)"""
)
# stop merges
nodes[0].query(f"system stop merges {table_name}")
nodes[1].query(f"system stop merges {table_name}")
nodes[2].query(f"system stop merges {table_name}")
# stop fetches
nodes[0].query(f"system stop fetches {table_name}")
nodes[1].query(f"system stop fetches {table_name}")
nodes[2].query(f"system stop fetches {table_name}")
# create distributed table
nodes[0].query(f"DROP TABLE IF EXISTS {table_name}_d SYNC")
nodes[0].query(
f"""
CREATE TABLE {table_name}_d AS {table_name}
Engine=Distributed(
{cluster},
currentDatabase(),
{table_name},
rand()
)
"""
)
# populate data, equal number of rows for each replica
nodes[0].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(10)",
settings={"distributed_foreground_insert": 1},
)
nodes[0].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(10, 10)",
settings={"distributed_foreground_insert": 1},
)
nodes[1].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(20, 10)",
settings={"distributed_foreground_insert": 1},
)
nodes[1].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(30, 10)",
settings={"distributed_foreground_insert": 1},
)
nodes[2].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(40, 10)",
settings={"distributed_foreground_insert": 1},
)
nodes[2].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(50, 10)",
settings={"distributed_foreground_insert": 1},
)
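# 60 rows with keys 0..59: count=60, min=0, max=59, sum=59*60/2=1770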
return "60\t0\t59\t1770\n"
@pytest.mark.parametrize(
"prefer_localhost_replica",
[
pytest.param(0),
pytest.param(1),
],
)
def test_read_equally_from_each_replica(start_cluster, prefer_localhost_replica):
"""create and populate table in special way (see create_table()),
so parallel replicas will read equal number of rows from each replica
"""
cluster = "test_single_shard_multiple_replicas"
table_name = "test_table"
expected_result = create_tables(cluster, table_name)
# parallel replicas
assert (
nodes[0].query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"prefer_localhost_replica": prefer_localhost_replica,
"max_parallel_replicas": 3,
},
)
== expected_result
)
# check logs for coordinator statistic
for n in nodes:
n.query("SYSTEM FLUSH LOGS")
# each replica has 2 distinct parts (non-intersecting with the other replicas'),
# each part is smaller than the index granularity, therefore each replica has 2 marks to handle
coordinator_statistic = "replica 0 - {requests: 3 marks: 2}; replica 1 - {requests: 3 marks: 2}; replica 2 - {requests: 3 marks: 2}"
assert (
nodes[0].contains_in_log(coordinator_statistic)
or nodes[1].contains_in_log(coordinator_statistic)
or nodes[2].contains_in_log(coordinator_statistic)
)
# w/o parallel replicas
# start fetches again, otherwise the result will not be as expected
nodes[0].query(f"system start fetches {table_name}")
nodes[1].query(f"system start fetches {table_name}")
nodes[2].query(f"system start fetches {table_name}")
# ensure that the replica is in sync before querying it, to get a stable result
nodes[0].query(f"system start merges {table_name}")
nodes[0].query(f"system sync replica {table_name}")
assert (
nodes[0].query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
settings={
"allow_experimental_parallel_reading_from_replicas": 0,
},
)
== expected_result
)

View File

@ -1,22 +0,0 @@
<clickhouse>
<remote_servers>
<test_single_shard_multiple_replicas>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>n1</host>
<port>9000</port>
</replica>
<replica>
<host>n2</host>
<port>9000</port>
</replica>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
</test_single_shard_multiple_replicas>
</remote_servers>
</clickhouse>

View File

@ -1,140 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
nodes = [
cluster.add_instance(
f"n{i}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
for i in (1, 2, 3)
]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def create_tables(cluster, table_name, node_with_covering_part):
# create replicated tables
for node in nodes:
node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
nodes[0].query(
f"""CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r1')
ORDER BY (key)"""
)
nodes[1].query(
f"""CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r2')
ORDER BY (key)"""
)
nodes[2].query(
f"""CREATE TABLE IF NOT EXISTS {table_name} (key Int64, value String) Engine=ReplicatedMergeTree('/test_parallel_replicas/shard1/{table_name}', 'r3')
ORDER BY (key)"""
)
# stop merges to keep the original parts
# stop fetches to keep only the parts created on the node itself
for i in (0, 1, 2):
if i != node_with_covering_part:
nodes[i].query(f"system stop fetches {table_name}")
nodes[i].query(f"system stop merges {table_name}")
# populate data, equal number of rows for each replica
nodes[0].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(10)",
)
nodes[0].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(10, 10)"
)
nodes[1].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(20, 10)"
)
nodes[1].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(30, 10)"
)
nodes[2].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(40, 10)"
)
nodes[2].query(
f"INSERT INTO {table_name} SELECT number, number FROM numbers(50, 10)"
)
nodes[node_with_covering_part].query(f"system sync replica {table_name}")
nodes[node_with_covering_part].query(f"optimize table {table_name}")
# check that we have the expected set of parts
expected_active_parts = ""
if node_with_covering_part == 0:
expected_active_parts = (
"all_0_5_1\nall_2_2_0\nall_3_3_0\nall_4_4_0\nall_5_5_0\n"
)
if node_with_covering_part == 1:
expected_active_parts = (
"all_0_0_0\nall_0_5_1\nall_1_1_0\nall_4_4_0\nall_5_5_0\n"
)
if node_with_covering_part == 2:
expected_active_parts = (
"all_0_0_0\nall_0_5_1\nall_1_1_0\nall_2_2_0\nall_3_3_0\n"
)
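# all_0_5_1 is the covering part produced by OPTIMIZE (blocks 0..5 merged, level 1);
# the remaining names are the original insert parts that stay active on the nodes
# with merges and fetches stopped.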
assert (
nodes[0].query(
f"select distinct name from clusterAllReplicas({cluster}, system.parts) where table='{table_name}' and active order by name"
)
== expected_active_parts
)
@pytest.mark.parametrize("node_with_covering_part", [0, 1, 2])
def test_covering_part_in_announcement(start_cluster, node_with_covering_part):
"""create and populate table in special way (see create_table()),
node_with_covering_part contains all parts merged into one,
other nodes contain only parts which are result of insert via the node
"""
cluster = "test_single_shard_multiple_replicas"
table_name = "test_table"
create_tables(cluster, table_name, node_with_covering_part)
# query result can be one of the following outcomes
# (1) query result if parallel replicas working set contains all_0_5_1
expected_full_result = "60\t0\t59\t1770\n"
expected_results = {expected_full_result}
# (2) query result if parallel replicas working set DOESN'T contain all_0_5_1
if node_with_covering_part == 0:
expected_results.add("40\t20\t59\t1580\n")
if node_with_covering_part == 1:
expected_results.add("40\t0\t59\t1180\n")
if node_with_covering_part == 2:
expected_results.add("40\t0\t39\t780\n")
# parallel replicas
result = nodes[0].query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}",
settings={
"allow_experimental_parallel_reading_from_replicas": 2,
"prefer_localhost_replica": 0,
"max_parallel_replicas": 3,
"use_hedged_requests": 0,
"cluster_for_parallel_replicas": cluster,
},
)
assert result in expected_results
# w/o parallel replicas
assert (
nodes[node_with_covering_part].query(
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}",
settings={
"allow_experimental_parallel_reading_from_replicas": 0,
},
)
== expected_full_result
)

View File

@ -507,7 +507,7 @@ def test_alters_from_different_replicas(started_cluster):
settings = {"distributed_ddl_task_timeout": 5}
assert (
"There are 1 unfinished hosts (0 of them are currently active)"
"There are 1 unfinished hosts (0 of them are currently executing the task"
in competing_node.query_and_get_error(
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added0 UInt32;",
settings=settings,

View File

@ -96,7 +96,7 @@ def test_cluster_groups(started_cluster):
main_node_2.stop_clickhouse()
settings = {"distributed_ddl_task_timeout": 5}
assert (
"There are 1 unfinished hosts (0 of them are currently active)"
"There are 1 unfinished hosts (0 of them are currently executing the task)"
in main_node_1.query_and_get_error(
"CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);",
settings=settings,

View File

@ -3,7 +3,7 @@ Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57. Error: Table default.none already exists. (TABLE_ALREADY_EXISTS)
(query: create table none on cluster test_shard_localhost (n int) engine=Memory;)
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently executing the task), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
(query: drop table if exists none on cluster test_unavailable_shard;)
throw
localhost 9000 0 0 0
@ -12,7 +12,7 @@ Code: 57. Error: Received from localhost:9000. Error: There was an error on [loc
(query: create table throw on cluster test_shard_localhost (n int) engine=Memory format Null;)
localhost 9000 0 1 0
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently executing the task), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
(query: drop table if exists throw on cluster test_unavailable_shard;)
null_status_on_timeout
localhost 9000 0 0 0

View File

@ -33,7 +33,7 @@ function run_until_out_contains()
done
}
RAND_COMMENT="01175_DDL_$RANDOM"
RAND_COMMENT="01175_DDL_$CLICKHOUSE_DATABASE"
LOG_COMMENT="${CLICKHOUSE_LOG_COMMENT}_$RAND_COMMENT"
CLICKHOUSE_CLIENT_WITH_SETTINGS=${CLICKHOUSE_CLIENT/--log_comment ${CLICKHOUSE_LOG_COMMENT}/--log_comment ${LOG_COMMENT}}

View File

@ -13,9 +13,15 @@ t
rdb_default 1 1 s1 r1 1
2
2
s1 r1 OK 2 0
s1 r2 QUEUED 2 0
s2 r1 QUEUED 2 0
2
rdb_default 1 1 s1 r1 1
rdb_default 1 2 s1 r2 0
2
2
t
t2
t3
rdb_default_4 1 1 s1 r1 1

View File

@ -32,6 +32,10 @@ $CLICKHOUSE_CLIENT -q "system sync database replica $db"
$CLICKHOUSE_CLIENT -q "select cluster, shard_num, replica_num, database_shard_name, database_replica_name, is_active from system.clusters where cluster='$db' and shard_num=1 and replica_num=1"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from database $db2" 2>&1| grep -Fac "is active, cannot drop it"
# Also check that it doesn't exceed distributed_ddl_task_timeout waiting for inactive replicas
timeout 60s $CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=1000 --distributed_ddl_output_mode=throw_only_active -q "create table $db.t2 (n int) engine=Log" 2>&1| grep -Fac "TIMEOUT_EXCEEDED"
timeout 60s $CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=1000 --distributed_ddl_output_mode=null_status_on_timeout_only_active -q "create table $db.t3 (n int) engine=Log" | sort
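# Both *_only_active modes above don't wait for inactive replicas, so the queries return
# well before the (deliberately large) distributed_ddl_task_timeout.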
$CLICKHOUSE_CLIENT -q "detach database $db3"
$CLICKHOUSE_CLIENT -q "system drop database replica 'r1' from shard 's2' from database $db"
$CLICKHOUSE_CLIENT -q "attach database $db3" 2>/dev/null

View File

@ -14,7 +14,7 @@ function insert {
offset=500
while true;
do
${CLICKHOUSE_CLIENT} -q "INSERT INTO test_race_condition_landing SELECT number, toString(number), toString(number) from system.numbers limit $i, $offset"
${CLICKHOUSE_CLIENT} -q "INSERT INTO test_race_condition_landing SELECT number, toString(number), toString(number) from system.numbers limit $i, $offset settings ignore_materialized_views_with_dropped_target_table=1"
i=$(( $i + $RANDOM % 100 + 400 ))
done
}

View File

@ -8,13 +8,22 @@ CREATE TABLE t
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r1', legacy_ver)
ORDER BY id;
CREATE TABLE t_r
CREATE TABLE t_r_ok
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r2', legacy_ver)
ORDER BY id;
CREATE TABLE t_r_error
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r2')
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r3')
ORDER BY id; -- { serverError METADATA_MISMATCH }
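-- Replicas sharing one ZooKeeper path must declare the engine identically;
-- a different (or missing) version column yields METADATA_MISMATCH.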
CREATE TABLE t2
@ -27,14 +36,24 @@ CREATE TABLE t2
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r1', legacy_ver)
ORDER BY id;
CREATE TABLE t2_r
CREATE TABLE t2_r_ok
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r2', legacy_ver, deleted)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r2', legacy_ver)
ORDER BY id;
CREATE TABLE t2_r_error
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r3', legacy_ver, deleted)
ORDER BY id; -- { serverError METADATA_MISMATCH }
CREATE TABLE t3
@ -46,13 +65,23 @@ CREATE TABLE t3
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r1', metrics1)
ORDER BY key;
CREATE TABLE t3_r
CREATE TABLE t3_r_ok
(
`key` UInt64,
`metrics1` UInt64,
`metrics2` UInt64
)
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r2', metrics2)
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r2', metrics1)
ORDER BY key;
CREATE TABLE t3_r_error
(
`key` UInt64,
`metrics1` UInt64,
`metrics2` UInt64
)
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r3', metrics2)
ORDER BY key; -- { serverError METADATA_MISMATCH }
CREATE TABLE t4
@ -67,7 +96,7 @@ CREATE TABLE t4
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r1', 'graphite_rollup')
ORDER BY key;
CREATE TABLE t4_r
CREATE TABLE t4_r_ok
(
`key` UInt32,
`Path` String,
@ -76,5 +105,30 @@ CREATE TABLE t4_r
`Version` UInt32,
`col` UInt64
)
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r2', 'graphite_rollup_alternative')
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r2', 'graphite_rollup')
ORDER BY key;
CREATE TABLE t4_r_error
(
`key` UInt32,
`Path` String,
`Time` DateTime('UTC'),
`Value` Float64,
`Version` UInt32,
`col` UInt64
)
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r3', 'graphite_rollup_alternative')
ORDER BY key; -- { serverError METADATA_MISMATCH }
-- https://github.com/ClickHouse/ClickHouse/issues/58451
CREATE TABLE t4_r_error_2
(
`key` UInt32,
`Path` String,
`Time` DateTime('UTC'),
`Value` Float64,
`Version` UInt32,
`col` UInt64
)
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r4', 'graphite_rollup_alternative_no_function')
ORDER BY key; -- { serverError METADATA_MISMATCH }

View File

@ -0,0 +1,21 @@
set ignore_materialized_views_with_dropped_target_table = 1;
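-- With this setting enabled, an INSERT into a source table skips materialized views
-- whose target table has been dropped, instead of throwing an error.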
drop table if exists from_table;
drop table if exists to_table;
drop table if exists mv;
create table from_table (x UInt32) engine=MergeTree order by x;
create table to_table (x UInt32) engine=MergeTree order by x;
create materialized view mv to to_table as select * from from_table;
insert into from_table select 42;
select * from from_table;
select * from to_table;
drop table to_table;
insert into from_table select 42;
select * from from_table;
drop table from_table;
drop view mv;