Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-27 10:02:01 +00:00)

Merge branch 'master' into better-tests-for-data-lakes

Commit 54518bf928

.github/workflows/nightly.yml (vendored), 6 changes
@@ -118,9 +118,11 @@ jobs:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
SonarCloud:
# TODO: Remove if: whenever SonarCloud supports c++23
if: ${{ false }}
runs-on: [self-hosted, builder]
env:
SONAR_SCANNER_VERSION: 4.7.0.2747
SONAR_SCANNER_VERSION: 4.8.0.2856
SONAR_SERVER_URL: "https://sonarcloud.io"
BUILD_WRAPPER_OUT_DIR: build_wrapper_output_directory # Directory where build-wrapper output will be placed
CC: clang-15
@@ -173,4 +175,4 @@ jobs:
--define sonar.cfamily.build-wrapper-output="${{ env.BUILD_WRAPPER_OUT_DIR }}" \
--define sonar.projectKey="ClickHouse_ClickHouse" \
--define sonar.organization="clickhouse-java" \
--define sonar.exclusions="**/*.java,**/*.ts,**/*.js,**/*.css,**/*.sql"
--define sonar.exclusions="**/*.java,**/*.ts,**/*.js,**/*.css,**/*.sql" \
@@ -1045,7 +1045,7 @@ Default value: `0`.

## background_pool_size {#background_pool_size}

Sets the number of threads performing background merges and mutations for tables with MergeTree engines. For backward compatibility, this setting can also be applied at server startup from the `default` profile configuration. You can only increase the number of threads at runtime; to lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. A smaller pool size utilizes less CPU and disk resources, but background processes advance more slowly, which might eventually impact query performance.

Before changing it, please also take a look at related MergeTree settings, such as [number_of_free_entries_in_pool_to_lower_max_size_of_merge](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-lower-max-size-of-merge) and [number_of_free_entries_in_pool_to_execute_mutation](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-execute-mutation).
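For reference, a minimal config sketch (an editor's illustration, not part of this diff): raising the pool size in the server configuration, where `32` is only an example value above the default of 16 mentioned below.

```xml
<background_pool_size>32</background_pool_size>
```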
@@ -1063,8 +1063,8 @@ Default value: 16.

## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio}

Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals to 2 and
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operations could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
The same as for `background_pool_size` setting `background_merges_mutations_concurrency_ratio` could be applied from the `default` profile for backward compatibility.

Possible values:

@@ -1079,33 +1079,6 @@ Default value: 2.

```xml
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
```

## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit}

Sets the limit on how much RAM is allowed to use for performing merge and mutation operations.
Zero means unlimited.
If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks.

Possible values:

- Any positive integer.

**Example**

```xml
<merges_mutations_memory_usage_soft_limit>0</merges_mutations_memory_usage_soft_limit>
```

## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio}

The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`.

Default value: `0.5`.
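A quick worked sketch (an editor's illustration, not from the diff): with the default ratio of `0.5`, a server with 64 GiB of RAM gets a soft limit of about 32 GiB for merges and mutations; lowering the ratio shrinks the limit accordingly, for example:

```xml
<merges_mutations_memory_usage_to_ram_ratio>0.25</merges_mutations_memory_usage_to_ram_ratio>
```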
**See also**

- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage)
- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit)

## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}

Algorithm used to select the next merge or mutation to be executed by the background thread pool. The policy may be changed at runtime without a server restart.
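A minimal sketch of setting the policy in the server config (an editor's illustration, not part of this diff). The hunk is cut off before listing the accepted values; `round_robin` and `shortest_task_first` are assumed here from the upstream settings reference, so verify them against your version:

```xml
<background_merges_mutations_scheduling_policy>shortest_task_first</background_merges_mutations_scheduling_policy>
```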
@@ -135,7 +135,6 @@ namespace CurrentMetrics
extern const Metric Revision;
extern const Metric VersionInteger;
extern const Metric MemoryTracking;
extern const Metric MergesMutationsMemoryTracking;
extern const Metric MaxDDLEntryID;
extern const Metric MaxPushedDDLEntryID;
}
@@ -1193,12 +1192,12 @@ try
{
Settings::checkNoSettingNamesAtTopLevel(*config, config_path);

ServerSettings server_settings;
server_settings.loadSettingsFromConfig(*config);
ServerSettings server_settings_;
server_settings_.loadSettingsFromConfig(*config);

size_t max_server_memory_usage = server_settings.max_server_memory_usage;
size_t max_server_memory_usage = server_settings_.max_server_memory_usage;

double max_server_memory_usage_to_ram_ratio = server_settings.max_server_memory_usage_to_ram_ratio;
double max_server_memory_usage_to_ram_ratio = server_settings_.max_server_memory_usage_to_ram_ratio;
size_t default_max_server_memory_usage = static_cast<size_t>(memory_amount * max_server_memory_usage_to_ram_ratio);

if (max_server_memory_usage == 0)
@@ -1226,26 +1225,7 @@ try
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);

size_t merges_mutations_memory_usage_soft_limit = server_settings.merges_mutations_memory_usage_soft_limit;

size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
server_settings.merges_mutations_memory_usage_to_ram_ratio);
}

LOG_INFO(log, "Merges and mutations memory limit is set to {}",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit));
background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit);
background_memory_tracker.setDescription("(background)");
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);

total_memory_tracker.setAllowUseJemallocMemory(server_settings.allow_use_jemalloc_memory);
total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);

auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
total_memory_tracker.setOvercommitTracker(global_overcommit_tracker);
@@ -1263,23 +1243,23 @@ try

global_context->setRemoteHostFilter(*config);

global_context->setMaxTableSizeToDrop(server_settings.max_table_size_to_drop);
global_context->setMaxPartitionSizeToDrop(server_settings.max_partition_size_to_drop);
global_context->setMaxTableSizeToDrop(server_settings_.max_table_size_to_drop);
global_context->setMaxPartitionSizeToDrop(server_settings_.max_partition_size_to_drop);

ConcurrencyControl::SlotCount concurrent_threads_soft_limit = ConcurrencyControl::Unlimited;
if (server_settings.concurrent_threads_soft_limit_num > 0 && server_settings.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = server_settings.concurrent_threads_soft_limit_num;
if (server_settings.concurrent_threads_soft_limit_ratio_to_cores > 0)
if (server_settings_.concurrent_threads_soft_limit_num > 0 && server_settings_.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = server_settings_.concurrent_threads_soft_limit_num;
if (server_settings_.concurrent_threads_soft_limit_ratio_to_cores > 0)
{
auto value = server_settings.concurrent_threads_soft_limit_ratio_to_cores * std::thread::hardware_concurrency();
auto value = server_settings_.concurrent_threads_soft_limit_ratio_to_cores * std::thread::hardware_concurrency();
if (value > 0 && value < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = value;
}
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);

global_context->getProcessList().setMaxSize(server_settings.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(server_settings.max_concurrent_insert_queries);
global_context->getProcessList().setMaxSelectQueriesAmount(server_settings.max_concurrent_select_queries);
global_context->getProcessList().setMaxSize(server_settings_.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(server_settings_.max_concurrent_insert_queries);
global_context->getProcessList().setMaxSelectQueriesAmount(server_settings_.max_concurrent_select_queries);

if (config->has("keeper_server"))
global_context->updateKeeperConfiguration(*config);
@@ -1290,34 +1270,34 @@ try
/// This is done for backward compatibility.
if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings.background_pool_size;
auto new_ratio = server_settings.background_merges_mutations_concurrency_ratio;
auto new_pool_size = server_settings_.background_pool_size;
auto new_ratio = server_settings_.background_merges_mutations_concurrency_ratio;
global_context->getMergeMutateExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, static_cast<size_t>(new_pool_size * new_ratio));
global_context->getMergeMutateExecutor()->updateSchedulingPolicy(server_settings.background_merges_mutations_scheduling_policy.toString());
global_context->getMergeMutateExecutor()->updateSchedulingPolicy(server_settings_.background_merges_mutations_scheduling_policy.toString());
}

if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings.background_move_pool_size;
auto new_pool_size = server_settings_.background_move_pool_size;
global_context->getMovesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}

if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings.background_fetches_pool_size;
auto new_pool_size = server_settings_.background_fetches_pool_size;
global_context->getFetchesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}

if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings.background_common_pool_size;
auto new_pool_size = server_settings_.background_common_pool_size;
global_context->getCommonExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}

global_context->getBufferFlushSchedulePool().increaseThreadsCount(server_settings.background_buffer_flush_schedule_pool_size);
global_context->getSchedulePool().increaseThreadsCount(server_settings.background_schedule_pool_size);
global_context->getMessageBrokerSchedulePool().increaseThreadsCount(server_settings.background_message_broker_schedule_pool_size);
global_context->getDistributedSchedulePool().increaseThreadsCount(server_settings.background_distributed_schedule_pool_size);
global_context->getBufferFlushSchedulePool().increaseThreadsCount(server_settings_.background_buffer_flush_schedule_pool_size);
global_context->getSchedulePool().increaseThreadsCount(server_settings_.background_schedule_pool_size);
global_context->getMessageBrokerSchedulePool().increaseThreadsCount(server_settings_.background_message_broker_schedule_pool_size);
global_context->getDistributedSchedulePool().increaseThreadsCount(server_settings_.background_distributed_schedule_pool_size);

if (config->has("resources"))
{
@@ -53,7 +53,6 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \
@@ -96,7 +96,6 @@ using namespace std::chrono_literals;
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;

MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User);

std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;

@@ -529,10 +528,3 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value))
;
}

bool canEnqueueBackgroundTask()
{
auto limit = background_memory_tracker.getSoftLimit();
auto amount = background_memory_tracker.get();
return limit == 0 || amount < limit;
}
@@ -110,22 +110,6 @@ public:
return amount.load(std::memory_order_relaxed);
}

// Merges and mutations may pass memory ownership to other threads thus in the end of execution
// MemoryTracker for background task may have a non-zero counter.
// This method is intended to fix the counter inside of background_memory_tracker.
// NOTE: We can't use alloc/free methods to do it, because they also will change the value inside
// of total_memory_tracker.
void adjustOnBackgroundTaskEnd(const MemoryTracker * child)
{
auto background_memory_consumption = child->amount.load(std::memory_order_relaxed);
amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed);

// Also fix CurrentMetrics::MergesMutationsMemoryTracking
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::sub(metric_loaded, background_memory_consumption);
}

Int64 getPeak() const
{
return peak.load(std::memory_order_relaxed);
@@ -236,6 +220,3 @@ public:
};

extern MemoryTracker total_memory_tracker;
extern MemoryTracker background_memory_tracker;

bool canEnqueueBackgroundTask();
@@ -40,8 +40,6 @@ namespace DB
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to ram ratio. Allows to lower max memory on low-memory systems.", 0) \
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \
M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but in to ram ratio. Allows to lower memory limit on low-memory systems.", 0) \
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
\
M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \
@@ -177,7 +177,7 @@ void TablesLoader::removeUnresolvableDependencies()
}

void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool)
void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool_)
{
/// Compatibility setting which should be enabled by default on attach
/// Otherwise server will be unable to start for some old-format of IPv6/IPv4 types of columns
@@ -189,12 +189,12 @@ void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool)

for (size_t level = 0; level != tables_to_load.size(); ++level)
{
startLoadingTables(pool, load_context, tables_to_load[level], level);
pool.wait();
startLoadingTables(pool_, load_context, tables_to_load[level], level);
pool_.wait();
}
}

void TablesLoader::startLoadingTables(ThreadPool & pool, ContextMutablePtr load_context, const std::vector<StorageID> & tables_to_load, size_t level)
void TablesLoader::startLoadingTables(ThreadPool & pool_, ContextMutablePtr load_context, const std::vector<StorageID> & tables_to_load, size_t level)
{
size_t total_tables = metadata.parsed_tables.size();

@@ -202,7 +202,7 @@ void TablesLoader::startLoadingTables(ThreadPool & pool, ContextMutablePtr load_

for (const auto & table_id : tables_to_load)
{
pool.scheduleOrThrowOnError([this, load_context, total_tables, table_name = table_id.getQualifiedName()]()
pool_.scheduleOrThrowOnError([this, load_context, total_tables, table_name = table_id.getQualifiedName()]()
{
const auto & path_and_query = metadata.parsed_tables[table_name];
databases[table_name.database]->loadTableFromMetadata(load_context, path_and_query.path, table_name, path_and_query.ast, strictness_mode);
@@ -16,9 +16,6 @@
#include <Interpreters/IExternalLoadable.h>

/// Clang mistakenly warns about the names in enum class.
#pragma clang diagnostic ignored "-Wshadow"

namespace DB
{
using TypeIndexUnderlying = magic_enum::underlying_type_t<TypeIndex>;
@@ -139,13 +139,13 @@ private:
void threadWorker(size_t shard)
{
Block block;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder_;
auto & shard_queue = *shards_queues[shard];

while (shard_queue.pop(block))
{
Stopwatch watch;
dictionary.blockToAttributes(block, arena_holder, shard);
dictionary.blockToAttributes(block, arena_holder_, shard);
UInt64 elapsed_ms = watch.elapsedMilliseconds();
if (elapsed_ms > 1'000)
LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {}).", shard, elapsed_ms, block.rows());
@@ -1227,7 +1227,7 @@ Pipe RangeHashedDictionary<dictionary_key_type>::read(const Names & column_names
DictionarySourceCoordinator::ReadColumnsFunc read_keys_func = [dictionary_copy = dictionary](
const Strings & attribute_names,
const DataTypes & result_types,
const Columns & key_columns,
const Columns & key_columns_,
const DataTypes,
const Columns &)
{
@@ -1238,15 +1238,15 @@ Pipe RangeHashedDictionary<dictionary_key_type>::read(const Names & column_names
Columns result;
result.reserve(attribute_names_size);

const ColumnPtr & key_column = key_columns.back();
const ColumnPtr & key_column = key_columns_.back();

const auto * key_to_index_column = typeid_cast<const ColumnUInt64 *>(key_column.get());
if (!key_to_index_column)
const auto * key_to_index_column_ = typeid_cast<const ColumnUInt64 *>(key_column.get());
if (!key_to_index_column_)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Dictionary {} read expect indexes column with type UInt64",
range_dictionary_ptr->getFullName());

const auto & data = key_to_index_column->getData();
const auto & data = key_to_index_column_->getData();

for (size_t i = 0; i < attribute_names_size; ++i)
{
@@ -198,22 +198,22 @@ void RegExpTreeDictionary::initRegexNodes(Block & block)
Array keys = (*keys_column)[i].safeGet<Array>();
Array values = (*values_column)[i].safeGet<Array>();
size_t keys_size = keys.size();
for (size_t i = 0; i < keys_size; i++)
for (size_t j = 0; j < keys_size; j++)
{
const String & name = keys[i].safeGet<String>();
const String & value = values[i].safeGet<String>();
if (structure.hasAttribute(name))
const String & name_ = keys[j].safeGet<String>();
const String & value = values[j].safeGet<String>();
if (structure.hasAttribute(name_))
{
const auto & attr = structure.getAttribute(name);
const auto & attr = structure.getAttribute(name_);
auto string_pieces = createStringPieces(value, num_captures, regex, logger);
if (!string_pieces.empty())
{
node->attributes[name] = RegexTreeNode::AttributeValue{.field = values[i], .pieces = std::move(string_pieces)};
node->attributes[name_] = RegexTreeNode::AttributeValue{.field = values[j], .pieces = std::move(string_pieces)};
}
else
{
Field field = parseStringToField(values[i].safeGet<String>(), attr.type);
node->attributes[name] = RegexTreeNode::AttributeValue{.field = std::move(field)};
Field field = parseStringToField(values[j].safeGet<String>(), attr.type);
node->attributes[name_] = RegexTreeNode::AttributeValue{.field = std::move(field)};
}
}
}
@@ -424,23 +424,23 @@ bool RegExpTreeDictionary::setAttributes(
return attributes_to_set.size() == attributes.size();
visited_nodes.emplace(id);
const auto & node_attributes = regex_nodes.at(id)->attributes;
for (const auto & [name, value] : node_attributes)
for (const auto & [name_, value] : node_attributes)
{
if (!attributes.contains(name) || attributes_to_set.contains(name))
if (!attributes.contains(name_) || attributes_to_set.contains(name_))
continue;
if (value.containsBackRefs())
{
auto [updated_str, use_default] = processBackRefs(data, regex_nodes.at(id)->searcher, value.pieces);
if (use_default)
{
DefaultValueProvider default_value(attributes.at(name).null_value, defaults.at(name));
attributes_to_set[name] = default_value.getDefaultValue(key_index);
DefaultValueProvider default_value(attributes.at(name_).null_value, defaults.at(name_));
attributes_to_set[name_] = default_value.getDefaultValue(key_index);
}
else
attributes_to_set[name] = parseStringToField(updated_str, attributes.at(name).type);
attributes_to_set[name_] = parseStringToField(updated_str, attributes.at(name_).type);
}
else
attributes_to_set[name] = value.field;
attributes_to_set[name_] = value.field;
}

auto parent_id = regex_nodes.at(id)->parent_id;
@@ -541,11 +541,11 @@ std::unordered_map<String, ColumnPtr> RegExpTreeDictionary::match(
std::unordered_map<String, MutableColumnPtr> columns;

/// initialize columns
for (const auto & [name, attr] : attributes)
for (const auto & [name_, attr] : attributes)
{
auto col_ptr = attr.type->createColumn();
col_ptr->reserve(keys_offsets.size());
columns[name] = std::move(col_ptr);
columns[name_] = std::move(col_ptr);
}

UInt64 offset = 0;
@@ -628,25 +628,25 @@ std::unordered_map<String, ColumnPtr> RegExpTreeDictionary::match(
break;
}

for (const auto & [name, attr] : attributes)
for (const auto & [name_, attr] : attributes)
{
if (attributes_to_set.contains(name))
if (attributes_to_set.contains(name_))
continue;

DefaultValueProvider default_value(attr.null_value, defaults.at(name));
columns[name]->insert(default_value.getDefaultValue(key_idx));
DefaultValueProvider default_value(attr.null_value, defaults.at(name_));
columns[name_]->insert(default_value.getDefaultValue(key_idx));
}

/// insert to columns
for (const auto & [name, value] : attributes_to_set)
columns[name]->insert(value);
for (const auto & [name_, value] : attributes_to_set)
columns[name_]->insert(value);

offset = key_offset;
}

std::unordered_map<String, ColumnPtr> result;
for (auto & [name, mutable_ptr] : columns)
result.emplace(name, std::move(mutable_ptr));
for (auto & [name_, mutable_ptr] : columns)
result.emplace(name_, std::move(mutable_ptr));

return result;
}
@@ -684,8 +684,8 @@ Columns RegExpTreeDictionary::getColumns(
defaults);

Columns result;
for (const String & name : attribute_names)
result.push_back(columns_map.at(name));
for (const String & name_ : attribute_names)
result.push_back(columns_map.at(name_));

return result;
}
@@ -229,23 +229,23 @@ void parseMatchNode(UInt64 parent_id, UInt64 & id, const YAML::Node & node, Resu
{
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Yaml match rule must contain key {}", key_name);
}
for (const auto & [key, node] : match)
for (const auto & [key, node_] : match)
{
if (key == key_name)
{
if (!node.IsScalar())
if (!node_.IsScalar())
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "`{}` should be a String", key_name);

attributes_to_insert.reg_exp = node.as<String>();
attributes_to_insert.reg_exp = node_.as<String>();
}
else if (structure.hasAttribute(key))
{
attributes_to_insert.keys.push_back(key);
attributes_to_insert.values.push_back(node.as<String>());
attributes_to_insert.values.push_back(node_.as<String>());
}
else if (node.IsSequence())
else if (node_.IsSequence())
{
parseMatchList(attributes_to_insert.id, id, node, result, key_name, structure);
parseMatchList(attributes_to_insert.id, id, node_, result, key_name, structure);
}
/// unknown attributes.
}
@@ -68,8 +68,7 @@ public:
String timezone;
if (arguments.size() == 2)
{
if (arguments[1].column)
timezone = extractTimeZoneNameFromColumn(*arguments[1].column);
timezone = extractTimeZoneNameFromColumn(arguments[1].column.get(), arguments[1].name);

if (timezone.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
@@ -1068,11 +1068,11 @@ public:
FunctionDictGetDescendantsExecutable(
String name_,
size_t level_,
DictionaryHierarchicalParentToChildIndexPtr hierarchical_parent_to_child_index,
DictionaryHierarchicalParentToChildIndexPtr hierarchical_parent_to_child_index_,
std::shared_ptr<FunctionDictHelper> dictionary_helper_)
: name(std::move(name_))
, level(level_)
, hierarchical_parent_to_child_index(std::move(hierarchical_parent_to_child_index))
, hierarchical_parent_to_child_index(std::move(hierarchical_parent_to_child_index_))
, dictionary_helper(std::move(dictionary_helper_))
{}

@@ -1110,13 +1110,13 @@ public:
const DataTypes & argument_types_,
const DataTypePtr & result_type_,
size_t level_,
DictionaryHierarchicalParentToChildIndexPtr hierarchical_parent_to_child_index,
DictionaryHierarchicalParentToChildIndexPtr hierarchical_parent_to_child_index_,
std::shared_ptr<FunctionDictHelper> helper_)
: name(std::move(name_))
, argument_types(argument_types_)
, result_type(result_type_)
, level(level_)
, hierarchical_parent_to_child_index(std::move(hierarchical_parent_to_child_index))
, hierarchical_parent_to_child_index(std::move(hierarchical_parent_to_child_index_))
, helper(std::move(helper_))
{}
@@ -245,7 +245,8 @@ private:
{
if (additional_argument_index < arguments.size())
{
time_zone = extractTimeZoneNameFromColumn(*arguments[additional_argument_index].column);
time_zone = extractTimeZoneNameFromColumn(arguments[additional_argument_index].column.get(),
arguments[additional_argument_index].name);
++additional_argument_index;
}
}
@@ -17,14 +17,14 @@ namespace ErrorCodes
}

std::string extractTimeZoneNameFromColumn(const IColumn & column)
std::string extractTimeZoneNameFromColumn(const IColumn * column, const String & column_name)
{
const ColumnConst * time_zone_column = checkAndGetColumnConst<ColumnString>(&column);
const ColumnConst * time_zone_column = checkAndGetColumnConst<ColumnString>(column);

if (!time_zone_column)
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of time zone argument of function, must be constant string",
column.getName());
"Illegal column {} of time zone argument of function, must be a constant string",
column_name);

return time_zone_column->getValue<String>();
}
@@ -33,9 +33,9 @@ std::string extractTimeZoneNameFromColumn(const IColumn & column)
std::string extractTimeZoneNameFromFunctionArguments(const ColumnsWithTypeAndName & arguments, size_t time_zone_arg_num, size_t datetime_arg_num)
{
/// Explicit time zone may be passed in last argument.
if (arguments.size() == time_zone_arg_num + 1 && arguments[time_zone_arg_num].column)
if (arguments.size() == time_zone_arg_num + 1)
{
return extractTimeZoneNameFromColumn(*arguments[time_zone_arg_num].column);
return extractTimeZoneNameFromColumn(arguments[time_zone_arg_num].column.get(), arguments[time_zone_arg_num].name);
}
else
{
@@ -57,7 +57,7 @@ const DateLUTImpl & extractTimeZoneFromFunctionArguments(const ColumnsWithTypeAn
{
if (arguments.size() == time_zone_arg_num + 1)
{
std::string time_zone = extractTimeZoneNameFromColumn(*arguments[time_zone_arg_num].column);
std::string time_zone = extractTimeZoneNameFromColumn(arguments[time_zone_arg_num].column.get(), arguments[time_zone_arg_num].name);
if (time_zone.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Provided time zone must be non-empty and be a valid time zone");
return DateLUT::instance(time_zone);
@@ -12,7 +12,7 @@ namespace DB

class Block;

std::string extractTimeZoneNameFromColumn(const IColumn & column);
std::string extractTimeZoneNameFromColumn(const IColumn * column, const String & column_name);

/// Determine working timezone either from optional argument with time zone name or from time zone in DateTime type of argument.
/// Returns empty string if default time zone should be used.
@@ -8,7 +8,6 @@
#include <IO/WriteHelpers.h>
#include <Common/assert_cast.h>

namespace DB
{
namespace ErrorCodes
@@ -100,6 +99,7 @@ public:
"Should be DateTime or DateTime64", arguments[0].type->getName(), getName());

String time_zone_name = extractTimeZoneNameFromFunctionArguments(arguments, 1, 0);

if (which_type.isDateTime())
return std::make_shared<DataTypeDateTime>(time_zone_name);
@@ -342,10 +342,15 @@ void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
}
}

static void appendElementsToLogSafe(
namespace
{

using TimePoint = std::chrono::time_point<std::chrono::system_clock>;

void appendElementsToLogSafe(
AsynchronousInsertLog & log,
std::vector<AsynchronousInsertLogElement> elements,
std::chrono::time_point<std::chrono::system_clock> flush_time,
TimePoint flush_time,
const String & flush_query_id,
const String & flush_exception)
try
@@ -367,6 +372,8 @@ catch (...)
tryLogCurrentException("AsynchronousInsertQueue", "Failed to add elements to AsynchronousInsertLog");
}

}

// static
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)
try
@@ -473,8 +480,26 @@ try
format->addBuffer(std::move(last_buffer));
auto insert_query_id = insert_context->getCurrentQueryId();

auto finish_entries = [&]
{
for (const auto & entry : data->entries)
{
if (!entry->isFinished())
entry->finish();
}

if (!log_elements.empty())
{
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*insert_log, std::move(log_elements), flush_time, insert_query_id, "");
}
};

if (total_rows == 0)
{
finish_entries();
return;
}

try
{
@@ -502,17 +527,7 @@ try
throw;
}

for (const auto & entry : data->entries)
{
if (!entry->isFinished())
entry->finish();
}

if (!log_elements.empty())
{
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*insert_log, std::move(log_elements), flush_time, insert_query_id, "");
}
finish_entries();
}
catch (const Exception & e)
{
@@ -1525,9 +1525,9 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const

asterisk = true;
}
else if (auto * function = (*expression)->as<ASTFunction>())
else if (auto * func = (*expression)->as<ASTFunction>())
{
if (use_structure_from_insertion_table_in_table_functions == 2 && findIdentifier(function))
if (use_structure_from_insertion_table_in_table_functions == 2 && findIdentifier(func))
{
use_columns_from_insert_query = false;
break;
@@ -2074,9 +2074,9 @@ BackupsWorker & Context::getBackupsWorker() const
const bool allow_concurrent_restores = this->getConfigRef().getBool("backups.allow_concurrent_restores", true);

const auto & config = getConfigRef();
const auto & settings = getSettingsRef();
UInt64 backup_threads = config.getUInt64("backup_threads", settings.backup_threads);
UInt64 restore_threads = config.getUInt64("restore_threads", settings.restore_threads);
const auto & settings_ = getSettingsRef();
UInt64 backup_threads = config.getUInt64("backup_threads", settings_.backup_threads);
UInt64 restore_threads = config.getUInt64("restore_threads", settings_.restore_threads);

if (!shared->backups_worker)
shared->backups_worker.emplace(backup_threads, restore_threads, allow_concurrent_backups, allow_concurrent_restores);
@@ -4285,10 +4285,10 @@ ReadSettings Context::getReadSettings() const

ReadSettings Context::getBackupReadSettings() const
{
ReadSettings settings = getReadSettings();
settings.remote_throttler = getBackupsThrottler();
settings.local_throttler = getBackupsThrottler();
return settings;
ReadSettings settings_ = getReadSettings();
settings_.remote_throttler = getBackupsThrottler();
settings_.local_throttler = getBackupsThrottler();
return settings_;
}

WriteSettings Context::getWriteSettings() const
@@ -4317,14 +4317,14 @@ std::shared_ptr<AsyncReadCounters> Context::getAsyncReadCounters() const

Context::ParallelReplicasMode Context::getParallelReplicasMode() const
{
const auto & settings = getSettingsRef();
const auto & settings_ = getSettingsRef();

using enum Context::ParallelReplicasMode;
if (!settings.parallel_replicas_custom_key.value.empty())
if (!settings_.parallel_replicas_custom_key.value.empty())
return CUSTOM_KEY;

if (settings.allow_experimental_parallel_reading_from_replicas
&& !settings.use_hedged_requests)
if (settings_.allow_experimental_parallel_reading_from_replicas
&& !settings_.use_hedged_requests)
return READ_TASKS;

return SAMPLE_KEY;
@@ -4332,17 +4332,17 @@ Context::ParallelReplicasMode Context::getParallelReplicasMode() const

bool Context::canUseParallelReplicasOnInitiator() const
{
const auto & settings = getSettingsRef();
const auto & settings_ = getSettingsRef();
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS
&& settings.max_parallel_replicas > 1
&& settings_.max_parallel_replicas > 1
&& !getClientInfo().collaborate_with_initiator;
}

bool Context::canUseParallelReplicasOnFollower() const
{
const auto & settings = getSettingsRef();
const auto & settings_ = getSettingsRef();
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS
&& settings.max_parallel_replicas > 1
&& settings_.max_parallel_replicas > 1
&& getClientInfo().collaborate_with_initiator;
}
@@ -1866,8 +1866,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
{
ExpressionActionsChain::Step & step = *chain.steps.at(prewhere_step_num);

auto required_columns = prewhere_info->prewhere_actions->getRequiredColumnsNames();
NameSet required_source_columns(required_columns.begin(), required_columns.end());
auto required_columns_ = prewhere_info->prewhere_actions->getRequiredColumnsNames();
NameSet required_source_columns(required_columns_.begin(), required_columns_.end());
/// Add required columns to required output in order not to remove them after prewhere execution.
/// TODO: add sampling and final execution to common chain.
for (const auto & column : additional_required_columns_after_prewhere)
@@ -825,11 +825,11 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (query.replica.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name is empty");

auto check_not_local_replica = [](const DatabaseReplicated * replicated, const ASTSystemQuery & query)
auto check_not_local_replica = [](const DatabaseReplicated * replicated, const ASTSystemQuery & query_)
{
if (!query.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query.replica_zk_path))
if (!query_.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query_.replica_zk_path))
return;
if (replicated->getFullReplicaName() != query.replica)
if (replicated->getFullReplicaName() != query_.replica)
return;

throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "There is a local database {}, which has the same path in ZooKeeper "
@@ -301,7 +301,7 @@ void JoinedTables::rewriteDistributedInAndJoins(ASTPtr & query)
}
}

std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & select_query)
std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & select_query_)
{
if (tables_with_columns.size() < 2)
return {};
@@ -310,7 +310,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume());

const ASTTablesInSelectQueryElement * ast_join = select_query.join();
const ASTTablesInSelectQueryElement * ast_join = select_query_.join();
const auto & table_to_join = ast_join->table_expression->as<ASTTableExpression &>();

/// TODO This syntax does not support specifying a database name.
@@ -352,16 +352,16 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se

if (!table_join->isSpecialStorage() &&
settings.enable_optimize_predicate_expression)
replaceJoinedTable(select_query);
replaceJoinedTable(select_query_);

return table_join;
}

void JoinedTables::reset(const ASTSelectQuery & select_query)
void JoinedTables::reset(const ASTSelectQuery & select_query_)
{
table_expressions = getTableExpressions(select_query);
left_table_expression = extractTableExpression(select_query, 0);
left_db_and_table = getDatabaseAndTable(select_query, 0);
table_expressions = getTableExpressions(select_query_);
left_table_expression = extractTableExpression(select_query_, 0);
left_db_and_table = getDatabaseAndTable(select_query_, 0);
}

}
@@ -84,7 +84,6 @@ ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_contex
group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator);
group->memory_tracker.setParent(&background_memory_tracker);
if (settings.memory_tracker_fault_probability > 0.0)
group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);
@@ -15,23 +15,18 @@ ColumnPtr tryGetColumnFromBlock(const Block & block, const NameAndTypePair & req
if (!elem)
return nullptr;

DataTypePtr elem_type;
ColumnPtr elem_column;
auto elem_type = elem->type;
auto elem_column = elem->column->decompress();

if (requested_column.isSubcolumn())
{
auto subcolumn_name = requested_column.getSubcolumnName();
elem_type = elem->type->tryGetSubcolumnType(subcolumn_name);
elem_column = elem->type->tryGetSubcolumn(subcolumn_name, elem->column);
elem_column = elem_type->tryGetSubcolumn(subcolumn_name, elem_column);
elem_type = elem_type->tryGetSubcolumnType(subcolumn_name);

if (!elem_type || !elem_column)
return nullptr;
}
else
{
elem_type = elem->type;
elem_column = elem->column;
}

return castColumn({elem_column, elem_type, ""}, requested_column.type);
}
@@ -607,8 +607,8 @@ std::shared_ptr<DirectKeyValueJoin> tryDirectJoin(const std::shared_ptr<TableJoi

for (const auto & right_table_expression_column : right_table_expression_header)
{
const auto * table_column_name = right_table_expression_data.getColumnNameOrNull(right_table_expression_column.name);
if (!table_column_name)
const auto * table_column_name_ = right_table_expression_data.getColumnNameOrNull(right_table_expression_column.name);
if (!table_column_name_)
return {};

auto right_table_expression_column_with_storage_column_name = right_table_expression_column;
@@ -647,9 +647,9 @@ QueryPipelineBuilder StorageLiveView::completeQuery(Pipes pipes)
}
else
{
auto inner_blocks_query = getInnerBlocksQuery();
auto inner_blocks_query_ = getInnerBlocksQuery();
block_context->addExternalTable(getBlocksTableName(), std::move(blocks_storage_table_holder));
InterpreterSelectQuery interpreter(inner_blocks_query,
InterpreterSelectQuery interpreter(inner_blocks_query_,
block_context,
StoragePtr(),
nullptr,
@@ -78,9 +78,4 @@ MergeInfo MergeListElement::getInfo() const
return res;
}

MergeListElement::~MergeListElement()
{
background_memory_tracker.adjustOnBackgroundTaskEnd(&getMemoryTracker());
}

}
@@ -113,8 +113,6 @@ struct MergeListElement : boost::noncopyable
MergeListElement * ptr() { return this; }

MergeListElement & ref() { return *this; }

~MergeListElement();
};

/** Maintains a list of currently running merges.
@@ -179,8 +179,6 @@ void ReplicatedMergeTreeAttachThread::runImpl()
/// don't allow to reinitialize them, delete each of them immediately.
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
storage.clearOldWriteAheadLogs();
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
storage.clearOldBrokenPartsFromDetachedDirectory();

storage.createNewZooKeeperNodes();
storage.syncPinnedPartUUIDs();
@@ -7,7 +7,6 @@
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <Common/ProfileEventsScope.h>
#include <Common/typeid_cast.h>
@@ -42,7 +41,6 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <fmt/core.h>

namespace DB
{
@@ -920,14 +918,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(

SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;

if (!canEnqueueBackgroundTask())
{
if (out_disable_reason)
*out_disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
}
else if (partition_id.empty())
if (partition_id.empty())
{
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
@@ -5,7 +5,6 @@

#include <base/hex.h>
#include <Common/Macros.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEventsScope.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/KeeperException.h>
@@ -3224,14 +3223,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()

auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations;
if (!canEnqueueBackgroundTask())
{
LOG_TRACE(log, "Reached memory limit for the background tasks ({}), so won't select new parts to merge or mutate."
"Current background tasks memory usage: {}.",
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()));
}
else if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})"
" is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.",
@@ -725,12 +725,12 @@ ASTPtr StorageWindowView::getSourceTableSelectQuery()

if (!is_time_column_func_now)
{
auto query = select_query->clone();
auto query_ = select_query->clone();
DropTableIdentifierMatcher::Data drop_table_identifier_data;
DropTableIdentifierMatcher::Visitor(drop_table_identifier_data).visit(query);
DropTableIdentifierMatcher::Visitor(drop_table_identifier_data).visit(query_);

WindowFunctionMatcher::Data query_info_data;
WindowFunctionMatcher::Visitor(query_info_data).visit(query);
WindowFunctionMatcher::Visitor(query_info_data).visit(query_);

auto order_by = std::make_shared<ASTExpressionList>();
auto order_by_elem = std::make_shared<ASTOrderByElement>();
@@ -749,12 +749,12 @@ ASTPtr StorageWindowView::getSourceTableSelectQuery()
return select_with_union_query;
}

ASTPtr StorageWindowView::getInnerTableCreateQuery(const ASTPtr & inner_query, const StorageID & inner_table_id)
ASTPtr StorageWindowView::getInnerTableCreateQuery(const ASTPtr & inner_query, const StorageID & inner_table_id_)
{
/// We will create a query to create an internal table.
auto inner_create_query = std::make_shared<ASTCreateQuery>();
inner_create_query->setDatabase(inner_table_id.getDatabaseName());
inner_create_query->setTable(inner_table_id.getTableName());
inner_create_query->setDatabase(inner_table_id_.getDatabaseName());
inner_create_query->setTable(inner_table_id_.getTableName());

Aliases aliases;
QueryAliasesVisitor(aliases).visit(inner_query);
@@ -1208,7 +1208,7 @@ StorageWindowView::StorageWindowView(
if (has_inner_target_table)
{
/// create inner target table
auto create_context = Context::createCopy(context_);
auto create_context_ = Context::createCopy(context_);
auto target_create_query = std::make_shared<ASTCreateQuery>();
target_create_query->setDatabase(table_id_.database_name);
target_create_query->setTable(generateTargetTableName(table_id_));
@@ -1219,9 +1219,9 @@ StorageWindowView::StorageWindowView(
target_create_query->set(target_create_query->columns_list, new_columns_list);
target_create_query->set(target_create_query->storage, query.storage->ptr());

InterpreterCreateQuery create_interpreter(target_create_query, create_context);
create_interpreter.setInternal(true);
create_interpreter.execute();
InterpreterCreateQuery create_interpreter_(target_create_query, create_context_);
create_interpreter_.setInternal(true);
create_interpreter_.execute();
}
}
@@ -1,38 +0,0 @@
import pytest

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

node = cluster.add_instance("node")


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_memory_limit_success():
    node.query(
        "CREATE TABLE test_merge_oom ENGINE=AggregatingMergeTree ORDER BY id EMPTY AS SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(20000)"
    )
    node.query("SYSTEM STOP MERGES test_merge_oom")
    node.query(
        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
    )
    node.query(
        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
    )
    node.query(
        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
    )
    _, error = node.query_and_get_answer_with_error(
        "SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL"
    )

    assert not error
    node.query("DROP TABLE test_merge_oom")
@@ -20,3 +20,17 @@ SELECT toTimeZone(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'), 'Asia/Kolk
SELECT toString(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'));
SELECT toString(toTimeZone(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'), 'Asia/Kolkata'));
SELECT toString(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'), 'Asia/Kolkata');

SELECT toTimeZone(dt, tz) FROM (
SELECT toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul') AS dt, arrayJoin(['Asia/Kolkata', 'UTC']) AS tz
); -- { serverError ILLEGAL_COLUMN }
SELECT materialize('Asia/Kolkata') t, toTimeZone(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'), t); -- { serverError ILLEGAL_COLUMN }

CREATE TEMPORARY TABLE tmp AS SELECT arrayJoin(['Europe/Istanbul', 'Asia/Istanbul']);
SELECT toTimeZone(now(), (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT now((*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT now64(1, (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT toStartOfInterval(now(), INTERVAL 3 HOUR, (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT snowflakeToDateTime(toInt64(123), (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT toUnixTimestamp(now(), (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT toDateTimeOrDefault('2023-04-12 16:43:32', (*,).1, now()) FROM tmp; -- { serverError ILLEGAL_COLUMN }
@@ -0,0 +1,2 @@
1 foo ['0','1','2','3','4'] {'k1':'v1'}
2 bar ['0','1','2','3','4'] {'k2':'v2'}

@@ -0,0 +1,11 @@
DROP TABLE IF EXISTS t_memory_compressed;

CREATE TABLE t_memory_compressed (id UInt64, s String, arr Array(LowCardinality(String)), m Map(String, String))
ENGINE = Memory SETTINGS compress = 1;

INSERT INTO t_memory_compressed VALUES (1, 'foo', range(5), map('k1', 'v1'));
INSERT INTO t_memory_compressed VALUES (2, 'bar', range(5), map('k2', 'v2'));

SELECT * FROM t_memory_compressed ORDER BY id;

DROP TABLE t_memory_compressed;
@@ -0,0 +1,2 @@
0
Ok 0
tests/queries/0_stateless/02714_async_inserts_empty_data.sh (new executable file, 18 lines)
@@ -0,0 +1,18 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"

${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_async_insert_empty_data"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_async_insert_empty_data (id UInt32) ENGINE = Memory"

echo -n '' | ${CLICKHOUSE_CURL} -sS "$url&query=INSERT%20INTO%20t_async_insert_empty_data%20FORMAT%20JSONEachRow" --data-binary @-

${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_async_insert_empty_data"
${CLICKHOUSE_CLIENT} -q "SELECT status, bytes FROM system.asynchronous_insert_log WHERE database = '$CLICKHOUSE_DATABASE' AND table = 't_async_insert_empty_data'"

${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_async_insert_empty_data"