From 785f3ac5c9297dc70cfd5b0dbb9790b869551436 Mon Sep 17 00:00:00 2001 From: avogar Date: Fri, 21 Jun 2024 18:40:49 +0000 Subject: [PATCH] Revert unrelated changes --- .../domains/data-types-binary-encoding.md | 0 src/Core/Settings.h | 4 ++ src/Core/SettingsChangesHistory.h | 4 ++ src/Formats/FormatFactory.cpp | 2 + src/Formats/FormatSettings.h | 2 + src/Formats/NativeReader.cpp | 2 +- src/Interpreters/Context.cpp | 64 +++++++++++++++++-- src/Server/TCPHandler.cpp | 24 ++++--- 8 files changed, 87 insertions(+), 15 deletions(-) delete mode 100644 docs/en/sql-reference/data-types/domains/data-types-binary-encoding.md diff --git a/docs/en/sql-reference/data-types/domains/data-types-binary-encoding.md b/docs/en/sql-reference/data-types/domains/data-types-binary-encoding.md deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 23c5d7fc1a2..65146a65a0f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -31,6 +31,7 @@ class IColumn; * for tracking settings changes in different versions and for special `compatibility` setting to work correctly. */ +// clang-format off #define COMMON_SETTINGS(M, ALIAS) \ M(Dialect, dialect, Dialect::clickhouse, "Which dialect will be used to parse query", 0)\ M(UInt64, min_compress_block_size, 65536, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.", 0) \ @@ -933,6 +934,7 @@ class IColumn; M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm.", 0) \ M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \ M(Bool, allow_deprecated_error_prone_window_functions, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)", 0) \ + M(Bool, uniform_snowflake_conversion_functions, true, "Enables functions snowflakeIDToDateTime[64] and dateTime[64]ToSnowflakeID while disabling functions snowflakeToDateTime[64] and dateTime[64]ToSnowflake.", 0) \ // End of COMMON_SETTINGS // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS. @@ -1149,6 +1151,8 @@ class IColumn; M(UInt64, output_format_pretty_max_value_width_apply_for_single_value, false, "Only cut values (see the `output_format_pretty_max_value_width` setting) when it is not a single value in a block. Otherwise output it entirely, which is useful for the `SHOW CREATE TABLE` query.", 0) \ M(UInt64Auto, output_format_pretty_color, "auto", "Use ANSI escape sequences in Pretty formats. 0 - disabled, 1 - enabled, 'auto' - enabled if a terminal.", 0) \ M(String, output_format_pretty_grid_charset, "UTF-8", "Charset for printing grid borders. Available charsets: ASCII, UTF-8 (default one).", 0) \ + M(UInt64, output_format_pretty_display_footer_column_names, true, "Display column names in the footer if there are 999 or more rows.", 0) \ + M(UInt64, output_format_pretty_display_footer_column_names_min_rows, 50, "Sets the minimum threshold value of rows for which to enable displaying column names in the footer. 50 (default)", 0) \ M(UInt64, output_format_parquet_row_group_size, 1000000, "Target row group size in rows.", 0) \ M(UInt64, output_format_parquet_row_group_size_bytes, 512 * 1024 * 1024, "Target row group size in bytes, before compression.", 0) \ M(Bool, output_format_parquet_string_as_string, true, "Use Parquet String type instead of Binary for String columns.", 0) \ diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h index e9da55e66c5..e48503bb705 100644 --- a/src/Core/SettingsChangesHistory.h +++ b/src/Core/SettingsChangesHistory.h @@ -75,6 +75,7 @@ namespace SettingsChangesHistory using SettingsChanges = std::vector; } +// clang-format off /// History of settings changes that controls some backward incompatible changes /// across all ClickHouse versions. It maps ClickHouse version to settings changes that were done /// in this version. This history contains both changes to existing settings and newly added settings. @@ -106,6 +107,7 @@ static const std::mapformat_settings_, + std::optional format_settings_, BlockMissingValues * block_missing_values_) : istr(istr_) , header(header_) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index db5c5a37125..90c52d683c2 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -280,6 +281,8 @@ struct ContextSharedPart : boost::noncopyable String default_profile_name; /// Default profile name used for default values. String system_profile_name; /// Profile used by system processes String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying + String merge_workload TSA_GUARDED_BY(mutex); /// Workload setting value that is used by all merges + String mutation_workload TSA_GUARDED_BY(mutex); /// Workload setting value that is used by all mutations std::unique_ptr access_control TSA_GUARDED_BY(mutex); mutable OnceFlag resource_manager_initialized; mutable ResourceManagerPtr resource_manager; @@ -610,6 +613,8 @@ struct ContextSharedPart : boost::noncopyable LOG_TRACE(log, "Shutting down database catalog"); DatabaseCatalog::shutdown(); + NamedCollectionFactory::instance().shutdown(); + delete_async_insert_queue.reset(); SHUTDOWN(log, "merges executor", merge_mutate_executor, wait()); @@ -830,6 +835,7 @@ ContextMutablePtr Context::createGlobal(ContextSharedPart * shared_part) auto res = std::shared_ptr(new Context); res->shared = shared_part; res->query_access_info = std::make_shared(); + res->query_privileges_info = std::make_shared(); return res; } @@ -1422,7 +1428,7 @@ void Context::checkAccess(const AccessFlags & flags, const StorageID & table_id, void Context::checkAccess(const AccessRightsElement & element) const { checkAccessImpl(element); } void Context::checkAccess(const AccessRightsElements & elements) const { checkAccessImpl(elements); } -std::shared_ptr Context::getAccess() const +std::shared_ptr Context::getAccess() const { /// A helper function to collect parameters for calculating access rights, called with Context::getLocalSharedLock() acquired. auto get_params = [this]() @@ -1439,14 +1445,14 @@ std::shared_ptr Context::getAccess() const { SharedLockGuard lock(mutex); if (access && !need_recalculate_access) - return access; /// No need to recalculate access rights. + return std::make_shared(access, shared_from_this()); /// No need to recalculate access rights. params.emplace(get_params()); if (access && (access->getParams() == *params)) { need_recalculate_access = false; - return access; /// No need to recalculate access rights. + return std::make_shared(access, shared_from_this()); /// No need to recalculate access rights. } } @@ -1466,7 +1472,7 @@ std::shared_ptr Context::getAccess() const } } - return res; + return std::make_shared(res, shared_from_this()); } RowPolicyFilterPtr Context::getRowPolicyFilter(const String & database, const String & table_name, RowPolicyFilterType filter_type) const @@ -1558,11 +1564,36 @@ ResourceManagerPtr Context::getResourceManager() const ClassifierPtr Context::getWorkloadClassifier() const { std::lock_guard lock(mutex); + // NOTE: Workload cannot be changed after query start, and getWorkloadClassifier() should not be called before proper `workload` is set if (!classifier) classifier = getResourceManager()->acquire(getSettingsRef().workload); return classifier; } +String Context::getMergeWorkload() const +{ + SharedLockGuard lock(shared->mutex); + return shared->merge_workload; +} + +void Context::setMergeWorkload(const String & value) +{ + std::lock_guard lock(shared->mutex); + shared->merge_workload = value; +} + +String Context::getMutationWorkload() const +{ + SharedLockGuard lock(shared->mutex); + return shared->mutation_workload; +} + +void Context::setMutationWorkload(const String & value) +{ + std::lock_guard lock(shared->mutex); + shared->mutation_workload = value; +} + Scalars Context::getScalars() const { @@ -1827,6 +1858,15 @@ void Context::addQueryFactoriesInfo(QueryLogFactories factory_type, const String } } +void Context::addQueryPrivilegesInfo(const String & privilege, bool granted) const +{ + std::lock_guard lock(query_privileges_info->mutex); + if (granted) + query_privileges_info->used_privileges.emplace(privilege); + else + query_privileges_info->missing_privileges.emplace(privilege); +} + static bool findIdentifier(const ASTFunction * function) { if (!function || !function->arguments) @@ -2508,6 +2548,21 @@ void Context::makeQueryContext() local_read_query_throttler.reset(); local_write_query_throttler.reset(); backups_query_throttler.reset(); + query_privileges_info = std::make_shared(*query_privileges_info); +} + +void Context::makeQueryContextForMerge(const MergeTreeSettings & merge_tree_settings) +{ + makeQueryContext(); + classifier.reset(); // It is assumed that there are no active queries running using this classifier, otherwise this will lead to crashes + settings.workload = merge_tree_settings.merge_workload.value.empty() ? getMergeWorkload() : merge_tree_settings.merge_workload; +} + +void Context::makeQueryContextForMutate(const MergeTreeSettings & merge_tree_settings) +{ + makeQueryContext(); + classifier.reset(); // It is assumed that there are no active queries running using this classifier, otherwise this will lead to crashes + settings.workload = merge_tree_settings.mutation_workload.value.empty() ? getMutationWorkload() : merge_tree_settings.mutation_workload; } void Context::makeSessionContext() @@ -3943,7 +3998,6 @@ std::shared_ptr Context::getQueryThreadLog() const std::shared_ptr Context::getQueryViewsLog() const { SharedLockGuard lock(shared->mutex); - if (!shared->system_logs) return {}; diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index c8f86b1c2c9..d340fbe7e77 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1,9 +1,8 @@ -#include "Interpreters/AsynchronousInsertQueue.h" -#include "Interpreters/SquashingTransform.h" -#include "Parsers/ASTInsertQuery.h" +#include +#include +#include #include #include -#include #include #include #include @@ -246,7 +245,6 @@ TCPHandler::~TCPHandler() void TCPHandler::runImpl() { setThreadName("TCPHandler"); - ThreadStatus thread_status; extractConnectionSettingsFromContext(server.context()); @@ -886,13 +884,16 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro using PushResult = AsynchronousInsertQueue::PushResult; startInsertQuery(); - SquashingTransform squashing(0, query_context->getSettingsRef().async_insert_max_data_size); + Squashing squashing(state.input_header, 0, query_context->getSettingsRef().async_insert_max_data_size); while (readDataNext()) { - auto result = squashing.add(std::move(state.block_for_insert)); - if (result) + squashing.header = state.block_for_insert; + auto planned_chunk = squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()}); + if (planned_chunk.hasChunkInfo()) { + Chunk result_chunk = DB::Squashing::squash(std::move(planned_chunk)); + auto result = state.block_for_insert.cloneWithColumns(result_chunk.getColumns()); return PushResult { .status = PushResult::TOO_MUCH_DATA, @@ -901,7 +902,12 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro } } - auto result = squashing.add({}); + auto planned_chunk = squashing.flush(); + Chunk result_chunk; + if (planned_chunk.hasChunkInfo()) + result_chunk = DB::Squashing::squash(std::move(planned_chunk)); + + auto result = squashing.header.cloneWithColumns(result_chunk.getColumns()); return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context); }