diff --git a/docs/en/sql-reference/functions/other-functions.md b/docs/en/sql-reference/functions/other-functions.md index 2b0215115cb..11ee471d709 100644 --- a/docs/en/sql-reference/functions/other-functions.md +++ b/docs/en/sql-reference/functions/other-functions.md @@ -3301,3 +3301,31 @@ The setting is not enabled by default for security reasons, because some headers HTTP headers are case sensitive for this function. If the function is used in the context of a distributed query, it returns non-empty result only on the initiator node. + +## showCertificate + +Shows information about the current server's Secure Sockets Layer (SSL) certificate if it has been configured. See [Configuring SSL-TLS](https://clickhouse.com/docs/en/guides/sre/configuring-ssl) for more information on how to configure ClickHouse to use OpenSSL certificates to validate connections. + +**Syntax** + +```sql +showCertificate() +``` + +**Returned value** + +- Map of key-value pairs relating to the configured SSL certificate. [Map](../../sql-reference/data-types/map.md)([String](../../sql-reference/data-types/string.md), [String](../../sql-reference/data-types/string.md)). + +**Example** + +Query: + +```sql +SELECT showCertificate() FORMAT LineAsString; +``` + +Result: + +```response +{'version':'1','serial_number':'2D9071D64530052D48308473922C7ADAFA85D6C5','signature_algo':'sha256WithRSAEncryption','issuer':'/CN=marsnet.local CA','not_before':'May 7 17:01:21 2024 GMT','not_after':'May 7 17:01:21 2025 GMT','subject':'/CN=chnode1','pkey_algo':'rsaEncryption'} +``` diff --git a/docs/ru/operations/utilities/backupview.md b/docs/ru/operations/utilities/backupview.md index 702fafadc17..671d41cb016 100644 --- a/docs/ru/operations/utilities/backupview.md +++ b/docs/ru/operations/utilities/backupview.md @@ -1,5 +1,5 @@ --- -slug: /en/operations/utilities/backupview +slug: /ru/operations/utilities/backupview title: clickhouse_backupview --- diff --git a/docs/ru/sql-reference/functions/null-functions.md b/docs/ru/sql-reference/functions/null-functions.md index 7934b0f68c9..9b045d8a97d 100644 --- a/docs/ru/sql-reference/functions/null-functions.md +++ b/docs/ru/sql-reference/functions/null-functions.md @@ -1,5 +1,5 @@ --- -slug: /ru/sql-reference/functions/functions-for-nulls +slug: /ru/sql-reference/functions/null-functions sidebar_position: 63 sidebar_label: "Функции для работы с Nullable-аргументами" --- diff --git a/docs/zh/sql-reference/functions/null-functions.md b/docs/zh/sql-reference/functions/null-functions.md index b3dca3ac549..c721bca458c 100644 --- a/docs/zh/sql-reference/functions/null-functions.md +++ b/docs/zh/sql-reference/functions/null-functions.md @@ -1,5 +1,5 @@ --- -slug: /zh/sql-reference/functions/functions-for-nulls +slug: /zh/sql-reference/functions/null-functions --- # Nullable处理函数 {#nullablechu-li-han-shu} diff --git a/src/AggregateFunctions/AggregateFunctionGroupArray.cpp b/src/AggregateFunctions/AggregateFunctionGroupArray.cpp index 2b1e1a7c339..d4fb7afcb78 100644 --- a/src/AggregateFunctions/AggregateFunctionGroupArray.cpp +++ b/src/AggregateFunctions/AggregateFunctionGroupArray.cpp @@ -753,13 +753,21 @@ size_t getMaxArraySize() return 0xFFFFFF; } +bool hasLimitArraySize() +{ + if (auto context = Context::getGlobalContextInstance()) + return context->getServerSettings().aggregate_function_group_array_has_limit_size; + + return false; +} + template AggregateFunctionPtr createAggregateFunctionGroupArray( const std::string & name, const DataTypes & argument_types, const Array & parameters, const 
Settings *) { assertUnary(name, argument_types); - bool limit_size = false; + bool limit_size = hasLimitArraySize(); UInt64 max_elems = getMaxArraySize(); if (parameters.empty()) diff --git a/src/Client/Suggest.cpp b/src/Client/Suggest.cpp index 487271a2cf9..0188ebc8173 100644 --- a/src/Client/Suggest.cpp +++ b/src/Client/Suggest.cpp @@ -28,7 +28,7 @@ namespace ErrorCodes extern const int USER_SESSION_LIMIT_EXCEEDED; } -static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion) +static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion, UInt64 server_revision) { /// NOTE: Once you will update the completion list, /// do not forget to update 01676_clickhouse_client_autocomplete.sh @@ -60,7 +60,9 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti add_column("name", "data_type_families", false, {}); add_column("name", "merge_tree_settings", false, {}); add_column("name", "settings", false, {}); - add_column("keyword", "keywords", false, {}); + + if (server_revision >= DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE) + add_column("keyword", "keywords", false, {}); if (!basic_suggestion) { @@ -101,7 +103,11 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p auto connection = ConnectionType::createConnection(connection_parameters, my_context); fetch(*connection, connection_parameters.timeouts, - getLoadSuggestionQuery(suggestion_limit, std::is_same_v), + getLoadSuggestionQuery( + suggestion_limit, + std::is_same_v, + connection->getServerRevision(connection_parameters.timeouts) + ), my_context->getClientInfo()); } catch (const Exception & e) @@ -146,7 +152,7 @@ void Suggest::load(IServerConnection & connection, { try { - fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true), client_info); + fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true, connection.getServerRevision(timeouts)), client_info); } catch (...) { diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index ed0b29c7b44..d3525010419 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -625,6 +625,8 @@ The server successfully detected this situation and will download merged part fr M(S3QueueSetFileProcessingMicroseconds, "Time spent to set file as processing")\ M(S3QueueSetFileProcessedMicroseconds, "Time spent to set file as processed")\ M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed")\ + M(S3QueueFailedFiles, "Number of files which failed to be processed")\ + M(S3QueueProcessedFiles, "Number of files which were processed")\ M(S3QueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent to set file as failed")\ M(S3QueuePullMicroseconds, "Time spent to read file data")\ M(S3QueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\ diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index 159a4c28b6d..7e6893c6d85 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -79,6 +79,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION = 54466; /// Send read-only flag for Replicated tables as well static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467; +static constexpr auto DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE = 54468; + /// Version of ClickHouse TCP protocol. /// /// Should be incremented manually on protocol changes. 
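The `hasLimitArraySize()` helper and the `limit_size = hasLimitArraySize()` default above tie `groupArray` to the new `aggregate_function_group_array_has_limit_size` server setting added later in this diff. A minimal SQL sketch of the intended behaviour, mirroring the `test_limit_size` integration test below (the table name and the limit value of 10 are illustrative):

```sql
-- Assumes the server config sets
--   aggregate_function_group_array_max_element_size = 10
--   aggregate_function_group_array_has_limit_size = true
-- With the flag enabled, excess elements are discarded instead of raising the
-- "Too large array size" exception.
CREATE TABLE tab (x AggregateFunction(groupArray, Array(UInt8))) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO tab SELECT groupArrayState([zero]) FROM zeros(100);
SELECT length(groupArrayMerge(x)) FROM tab; -- expected: 10
```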
@@ -86,6 +88,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467; /// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION, /// later is just a number for server version (one number instead of commit SHA) /// for simplicity (sometimes it may be more convenient in some use cases). -static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54467; +static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54468; } diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index f41c596282f..db0a097a813 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -50,6 +50,7 @@ namespace DB M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \ M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \ M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \ + M(Bool, aggregate_function_group_array_has_limit_size, false, "When the max array element size is exceeded, a `Too large array size` exception will be thrown by default. When set to true, no exception will be thrown, and the excess elements will be discarded.", 0) \ M(UInt64, max_server_memory_usage, 0, "Maximum total memory usage of the server in bytes. Zero means unlimited.", 0) \ M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to RAM ratio. Allows to lower max memory on low-memory systems.", 0) \ M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Maximum total memory usage for merges and mutations in bytes. Zero means unlimited.", 0) \ @@ -57,7 +58,7 @@ namespace DB M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \ M(UInt64, cgroups_memory_usage_observer_wait_time, 15, "Polling interval in seconds to read the current memory usage from cgroups. Zero means disabled.", 0) \ M(Double, cgroup_memory_watcher_hard_limit_ratio, 0.95, "Hard memory limit ratio for cgroup memory usage observer", 0) \ - M(Double, cgroup_memory_watcher_soft_limit_ratio, 0.9, "Sort memory limit ratio limit for cgroup memory usage observer", 0) \ + M(Double, cgroup_memory_watcher_soft_limit_ratio, 0.9, "Soft memory limit ratio limit for cgroup memory usage observer", 0) \ M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \ M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \ M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. 
This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \ diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h index ece48620618..5f3e9ffb611 100644 --- a/src/Core/SettingsChangesHistory.h +++ b/src/Core/SettingsChangesHistory.h @@ -92,7 +92,7 @@ static std::map sett {"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."}, {"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."}, {"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"}, - {"output_format_pretty_preserve_border_for_multiline_string", 1, 1, "Applies better rendering for multiline strings."}, + {"output_format_pretty_preserve_border_for_multiline_string", 0, 1, "Applies better rendering for multiline strings."}, }}, {"24.4", {{"input_format_json_throw_on_bad_escape_sequence", true, true, "Allow to save JSON strings with bad escape sequences"}, {"max_parsing_threads", 0, 0, "Add a separate setting to control number of threads in parallel parsing from files"}, diff --git a/src/Disks/ObjectStorages/ObjectStorageFactory.cpp b/src/Disks/ObjectStorages/ObjectStorageFactory.cpp index 264ec2b258e..761ff24e648 100644 --- a/src/Disks/ObjectStorages/ObjectStorageFactory.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageFactory.cpp @@ -139,7 +139,11 @@ namespace S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context) { String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint")); - S3::URI uri(endpoint); + String endpoint_subpath; + if (config.has(config_prefix + ".endpoint_subpath")) + endpoint_subpath = context->getMacros()->expand(config.getString(config_prefix + ".endpoint_subpath")); + + S3::URI uri(fs::path(endpoint) / endpoint_subpath); /// An empty key remains empty. 
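/// Illustrative example for the new `endpoint_subpath` option above (values are
/// hypothetical, not taken from this diff): with endpoint
/// "http://minio1:9001/root/data/" and endpoint_subpath "node1", the resulting
/// URI is "http://minio1:9001/root/data/node1", so several servers can share a
/// single bucket under distinct prefixes. The reworked s3_plain_rewritable
/// integration test below supplies per-node values through an ENDPOINT_SUBPATH
/// environment variable, presumably referenced from the storage configuration.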
if (!uri.key.empty() && !uri.key.ends_with('/')) diff --git a/src/Functions/FunctionsConversion.cpp b/src/Functions/FunctionsConversion.cpp index c727ce663b3..beb7e6feb47 100644 --- a/src/Functions/FunctionsConversion.cpp +++ b/src/Functions/FunctionsConversion.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -3770,6 +3771,12 @@ private: } else if (const auto * from_array = typeid_cast(from_type_untyped.get())) { + if (typeid_cast(from_array->getNestedType().get())) + return [nested = to_type->getNestedType()](ColumnsWithTypeAndName &, const DataTypePtr &, const ColumnNullable *, size_t size) + { + return ColumnMap::create(nested->createColumnConstWithDefaultValue(size)->convertToFullColumnIfConst()); + }; + const auto * nested_tuple = typeid_cast(from_array->getNestedType().get()); if (!nested_tuple || nested_tuple->getElements().size() != 2) throw Exception( diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index 604344df177..fe123d7fa0b 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -505,7 +505,7 @@ ASTPtr InterpreterCreateQuery::formatProjections(const ProjectionsDescription & } ColumnsDescription InterpreterCreateQuery::getColumnsDescription( - const ASTExpressionList & columns_ast, ContextPtr context_, LoadingStrictnessLevel mode) + const ASTExpressionList & columns_ast, ContextPtr context_, LoadingStrictnessLevel mode, bool is_restore_from_backup) { /// First, deduce implicit types. @@ -514,7 +514,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription( ASTPtr default_expr_list = std::make_shared(); NamesAndTypesList column_names_and_types; - bool make_columns_nullable = mode <= LoadingStrictnessLevel::CREATE && context_->getSettingsRef().data_type_default_nullable; + bool make_columns_nullable = mode <= LoadingStrictnessLevel::SECONDARY_CREATE && !is_restore_from_backup && context_->getSettingsRef().data_type_default_nullable; bool has_columns_with_default_without_type = false; for (const auto & ast : columns_ast.children) @@ -694,7 +694,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription( res.add(std::move(column)); } - if (mode <= LoadingStrictnessLevel::CREATE && context_->getSettingsRef().flatten_nested) + if (mode <= LoadingStrictnessLevel::SECONDARY_CREATE && !is_restore_from_backup && context_->getSettingsRef().flatten_nested) res.flattenNested(); @@ -739,7 +739,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti if (create.columns_list->columns) { - properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), mode); + properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), mode, is_restore_from_backup); } if (create.columns_list->indices) diff --git a/src/Interpreters/InterpreterCreateQuery.h b/src/Interpreters/InterpreterCreateQuery.h index 71bdeda05df..be4a10eaf1d 100644 --- a/src/Interpreters/InterpreterCreateQuery.h +++ b/src/Interpreters/InterpreterCreateQuery.h @@ -74,7 +74,7 @@ public: /// Obtain information about columns, their types, default values and column comments, /// for case when columns in CREATE query is specified explicitly. 
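The `FunctionsConversion.cpp` hunk above adds a special case for casting an array whose nested type is `Nothing` (i.e. an empty array literal) to `Map`, returning a default-valued map instead of requiring a two-element tuple element type. A hedged sketch of the expected effect:

```sql
-- Previously this cast failed because the source array's element type is not a
-- (key, value) tuple; with the new branch it should produce an empty map.
SELECT CAST([] AS Map(String, UInt64)); -- expected: {}
```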
- static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, LoadingStrictnessLevel mode); + static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, LoadingStrictnessLevel mode, bool is_restore_from_backup = false); static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints); static void prepareOnClusterQuery(ASTCreateQuery & create, ContextPtr context, const String & cluster_name); diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index c6df06cfac6..b374175d466 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -700,8 +700,10 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed) /// We need to check type of masks before `addConditionJoinColumn`, because it assumes that types is correct JoinCommon::checkTypesOfMasks(block, mask_column_name_left, right_sample_block, mask_column_name_right); - /// Add auxiliary column, will be removed after joining - addConditionJoinColumn(block, JoinTableSide::Left); + if (!not_processed) + /// Add an auxiliary column, which will be removed after joining + /// We do not need to add it twice when we are continuing to process the block from the previous iteration + addConditionJoinColumn(block, JoinTableSide::Left); /// Types of keys can be checked only after `checkTypesOfKeys` JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right); diff --git a/src/Parsers/ParserInsertQuery.cpp b/src/Parsers/ParserInsertQuery.cpp index 0bbb181b39c..04759f80388 100644 --- a/src/Parsers/ParserInsertQuery.cpp +++ b/src/Parsers/ParserInsertQuery.cpp @@ -107,6 +107,9 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (!columns_p.parse(pos, columns, expected)) return false; + /// Optional trailing comma + ParserToken(TokenType::Comma).ignore(pos); + if (!s_rparen.ignore(pos, expected)) return false; } diff --git a/src/Parsers/TokenIterator.cpp b/src/Parsers/TokenIterator.cpp index fa792e7c8b5..08877e0b2fe 100644 --- a/src/Parsers/TokenIterator.cpp +++ b/src/Parsers/TokenIterator.cpp @@ -4,20 +4,6 @@ namespace DB { -Tokens::Tokens(const char * begin, const char * end, size_t max_query_size, bool skip_insignificant) -{ - Lexer lexer(begin, end, max_query_size); - - bool stop = false; - do - { - Token token = lexer.nextToken(); - stop = token.isEnd() || token.type == TokenType::ErrorMaxQuerySizeExceeded; - if (token.isSignificant() || (!skip_insignificant && !data.empty() && data.back().isSignificant())) - data.emplace_back(std::move(token)); - } while (!stop); -} - UnmatchedParentheses checkUnmatchedParentheses(TokenIterator begin) { /// We have just two kind of parentheses: () and []. diff --git a/src/Parsers/TokenIterator.h b/src/Parsers/TokenIterator.h index 192f2f55e6a..207ddadb8bf 100644 --- a/src/Parsers/TokenIterator.h +++ b/src/Parsers/TokenIterator.h @@ -15,25 +15,44 @@ namespace DB */ /** Used as an input for parsers. - * All whitespace and comment tokens are transparently skipped. + * All whitespace and comment tokens are transparently skipped if `skip_insignificant`. 
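The `ParserInsertQuery.cpp` change above makes a trailing comma after the column list optional, and the matching `ValuesBlockInputFormat.cpp` change later in this diff does the same inside each `VALUES` tuple. A small sketch of what should now parse (hypothetical table):

```sql
CREATE TABLE t (a UInt32, b UInt32) ENGINE = Memory;
-- Both trailing commas below were previously rejected.
INSERT INTO t (a, b,) VALUES (1, 2,);
```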
*/ class Tokens { private: std::vector data; - std::size_t last_accessed_index = 0; + Lexer lexer; + bool skip_insignificant; public: - Tokens(const char * begin, const char * end, size_t max_query_size = 0, bool skip_insignificant = true); - - ALWAYS_INLINE inline const Token & operator[](size_t index) + Tokens(const char * begin, const char * end, size_t max_query_size = 0, bool skip_insignificant_ = true) + : lexer(begin, end, max_query_size), skip_insignificant(skip_insignificant_) { - assert(index < data.size()); - last_accessed_index = std::max(last_accessed_index, index); - return data[index]; } - ALWAYS_INLINE inline const Token & max() { return data[last_accessed_index]; } + const Token & operator[] (size_t index) + { + while (true) + { + if (index < data.size()) + return data[index]; + + if (!data.empty() && data.back().isEnd()) + return data.back(); + + Token token = lexer.nextToken(); + + if (!skip_insignificant || token.isSignificant()) + data.emplace_back(token); + } + } + + const Token & max() + { + if (data.empty()) + return (*this)[0]; + return data.back(); + } }; diff --git a/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp b/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp index 4fc62afa125..a5cbb85eecd 100644 --- a/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp @@ -13,10 +13,14 @@ namespace ErrorCodes { extern const int INCORRECT_DATA; extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; } -RegexpFieldExtractor::RegexpFieldExtractor(const FormatSettings & format_settings) : regexp(format_settings.regexp.regexp), skip_unmatched(format_settings.regexp.skip_unmatched) +RegexpFieldExtractor::RegexpFieldExtractor(const FormatSettings & format_settings) : regexp_str(format_settings.regexp.regexp), regexp(regexp_str), skip_unmatched(format_settings.regexp.skip_unmatched) { + if (regexp_str.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "The regular expression is not set for the `Regexp` format. It requires setting the value of the `format_regexp` setting."); + size_t fields_count = regexp.NumberOfCapturingGroups(); matched_fields.resize(fields_count); re2_arguments.resize(fields_count); @@ -58,8 +62,8 @@ bool RegexpFieldExtractor::parseRow(PeekableReadBuffer & buf) static_cast(re2_arguments_ptrs.size())); if (!match && !skip_unmatched) - throw Exception(ErrorCodes::INCORRECT_DATA, "Line \"{}\" doesn't match the regexp.", - std::string(buf.position(), line_to_match)); + throw Exception(ErrorCodes::INCORRECT_DATA, "Line \"{}\" doesn't match the regexp: `{}`", + std::string(buf.position(), line_to_match), regexp_str); buf.position() += line_size; if (!buf.eof() && !checkChar('\n', buf)) diff --git a/src/Processors/Formats/Impl/RegexpRowInputFormat.h b/src/Processors/Formats/Impl/RegexpRowInputFormat.h index 7612228f8c4..8016593691f 100644 --- a/src/Processors/Formats/Impl/RegexpRowInputFormat.h +++ b/src/Processors/Formats/Impl/RegexpRowInputFormat.h @@ -31,6 +31,7 @@ public: size_t getNumberOfGroups() const { return regexp.NumberOfCapturingGroups(); } private: + String regexp_str; const re2::RE2 regexp; // The vector of fields extracted from line using regexp. 
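With the `RegexpRowInputFormat` change above, an empty `format_regexp` now raises `BAD_ARGUMENTS` up front, and unmatched-line errors include the pattern itself. A hedged usage sketch (the file name and pattern are illustrative):

```sql
-- The Regexp input format only works when format_regexp is set; otherwise the
-- new BAD_ARGUMENTS exception is thrown when the format is instantiated.
SELECT *
FROM file('data.txt', 'Regexp', 'id UInt32, name String')
SETTINGS format_regexp = '(\\d+)\\s+(\\w+)', format_regexp_skip_unmatched = 0;
```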
std::vector matched_fields; diff --git a/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp b/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp index 353de76eea8..1493779ec2d 100644 --- a/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp @@ -572,9 +572,16 @@ bool ValuesBlockInputFormat::checkDelimiterAfterValue(size_t column_idx) skipWhitespaceIfAny(*buf); if (likely(column_idx + 1 != num_columns)) + { return checkChar(',', *buf); + } else + { + /// Optional trailing comma. + if (checkChar(',', *buf)) + skipWhitespaceIfAny(*buf); return checkChar(')', *buf); + } } bool ValuesBlockInputFormat::shouldDeduceNewTemplate(size_t column_idx) diff --git a/src/Processors/QueryPlan/Optimizations/removeRedundantDistinct.cpp b/src/Processors/QueryPlan/Optimizations/removeRedundantDistinct.cpp index 232d3118612..8b92cc45cee 100644 --- a/src/Processors/QueryPlan/Optimizations/removeRedundantDistinct.cpp +++ b/src/Processors/QueryPlan/Optimizations/removeRedundantDistinct.cpp @@ -64,36 +64,61 @@ namespace return non_const_columns; } + /// build actions DAG from stack of steps + ActionsDAGPtr buildActionsForPlanPath(std::vector & dag_stack) + { + if (dag_stack.empty()) + return nullptr; + + ActionsDAGPtr path_actions = dag_stack.back()->clone(); + dag_stack.pop_back(); + while (!dag_stack.empty()) + { + ActionsDAGPtr clone = dag_stack.back()->clone(); + logActionsDAG("DAG to merge", clone); + dag_stack.pop_back(); + path_actions->mergeInplace(std::move(*clone)); + } + return path_actions; + } + bool compareAggregationKeysWithDistinctColumns( - const Names & aggregation_keys, const DistinctColumns & distinct_columns, const ActionsDAGPtr & path_actions) + const Names & aggregation_keys, const DistinctColumns & distinct_columns, std::vector> actions_chain) { logDebug("aggregation_keys", aggregation_keys); logDebug("aggregation_keys size", aggregation_keys.size()); logDebug("distinct_columns size", distinct_columns.size()); - std::set original_distinct_columns; - FindOriginalNodeForOutputName original_node_finder(path_actions); - for (const auto & column : distinct_columns) + std::set current_columns(begin(distinct_columns), end(distinct_columns)); + std::set source_columns; + for (auto & actions : actions_chain) { - logDebug("distinct column name", column); - const auto * alias_node = original_node_finder.find(String(column)); - if (!alias_node) + FindOriginalNodeForOutputName original_node_finder(buildActionsForPlanPath(actions)); + for (const auto & column : current_columns) { - logDebug("original name for alias is not found", column); - original_distinct_columns.insert(column); - } - else - { - logDebug("alias result name", alias_node->result_name); - original_distinct_columns.insert(alias_node->result_name); + logDebug("distinct column name", column); + const auto * alias_node = original_node_finder.find(String(column)); + if (!alias_node) + { + logDebug("original name for alias is not found", column); + source_columns.insert(String(column)); + } + else + { + logDebug("alias result name", alias_node->result_name); + source_columns.insert(alias_node->result_name); + } } + + current_columns = std::move(source_columns); + source_columns.clear(); } /// if aggregation keys are part of distinct columns then rows already distinct for (const auto & key : aggregation_keys) { - if (!original_distinct_columns.contains(key)) + if (!current_columns.contains(key)) { - logDebug("aggregation key NOT found: {}", key); + logDebug("aggregation key 
NOT found", key); return false; } } @@ -122,30 +147,13 @@ namespace return false; } - /// build actions DAG from stack of steps - ActionsDAGPtr buildActionsForPlanPath(std::vector & dag_stack) - { - if (dag_stack.empty()) - return nullptr; - - ActionsDAGPtr path_actions = dag_stack.back()->clone(); - dag_stack.pop_back(); - while (!dag_stack.empty()) - { - ActionsDAGPtr clone = dag_stack.back()->clone(); - logActionsDAG("DAG to merge", clone); - dag_stack.pop_back(); - path_actions->mergeInplace(std::move(*clone)); - } - return path_actions; - } - bool passTillAggregation(const QueryPlan::Node * distinct_node) { const DistinctStep * distinct_step = typeid_cast(distinct_node->step.get()); chassert(distinct_step); std::vector dag_stack; + std::vector> actions_chain; const DistinctStep * inner_distinct_step = nullptr; const IQueryPlanStep * aggregation_before_distinct = nullptr; const QueryPlan::Node * node = distinct_node; @@ -163,6 +171,12 @@ namespace break; } + if (typeid_cast(current_step)) + { + actions_chain.push_back(std::move(dag_stack)); + dag_stack.clear(); + } + if (const auto * const expr = typeid_cast(current_step); expr) dag_stack.push_back(expr->getExpression()); else if (const auto * const filter = typeid_cast(current_step); filter) @@ -177,16 +191,22 @@ namespace if (aggregation_before_distinct) { - ActionsDAGPtr actions = buildActionsForPlanPath(dag_stack); - logActionsDAG("aggregation pass: merged DAG", actions); + if (actions_chain.empty()) + actions_chain.push_back(std::move(dag_stack)); const auto distinct_columns = getDistinctColumns(distinct_step); if (const auto * aggregating_step = typeid_cast(aggregation_before_distinct); aggregating_step) - return compareAggregationKeysWithDistinctColumns(aggregating_step->getParams().keys, distinct_columns, actions); + { + return compareAggregationKeysWithDistinctColumns( + aggregating_step->getParams().keys, distinct_columns, std::move(actions_chain)); + } else if (const auto * merging_aggregated_step = typeid_cast(aggregation_before_distinct); merging_aggregated_step) - return compareAggregationKeysWithDistinctColumns(merging_aggregated_step->getParams().keys, distinct_columns, actions); + { + return compareAggregationKeysWithDistinctColumns( + merging_aggregated_step->getParams().keys, distinct_columns, std::move(actions_chain)); + } } return false; diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp index e902c60f780..e1583b8329c 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp @@ -24,6 +24,8 @@ namespace ProfileEvents extern const Event S3QueueSetFileProcessingMicroseconds; extern const Event S3QueueSetFileProcessedMicroseconds; extern const Event S3QueueSetFileFailedMicroseconds; + extern const Event S3QueueFailedFiles; + extern const Event S3QueueProcessedFiles; extern const Event S3QueueCleanupMaxSetSizeOrTTLMicroseconds; extern const Event S3QueueLockLocalFileStatusesMicroseconds; extern const Event CannotRemoveEphemeralNode; @@ -138,7 +140,7 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, con , zookeeper_failed_path(zookeeper_path_ / "failed") , zookeeper_shards_path(zookeeper_path_ / "shards") , zookeeper_cleanup_lock_path(zookeeper_path_ / "cleanup_lock") - , log(getLogger("S3QueueFilesMetadata")) + , log(getLogger("StorageS3Queue(" + zookeeper_path_.string() + ")")) { if (mode == S3QueueMode::UNORDERED && (max_set_size || max_set_age_sec)) { @@ -472,7 +474,7 @@ 
S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & if (code == Coordination::Error::ZOK) { auto holder = std::make_unique( - node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client); + node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client, log); return std::pair{SetFileProcessingResult::Success, std::move(holder)}; } @@ -558,7 +560,7 @@ S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & p if (code == Coordination::Error::ZOK) { auto holder = std::make_unique( - node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client); + node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client, log); LOG_TEST(log, "File {} is ready to be processed", path); return std::pair{SetFileProcessingResult::Success, std::move(holder)}; @@ -609,6 +611,8 @@ void S3QueueFilesMetadata::setFileProcessed(ProcessingNodeHolderPtr holder) break; } } + + ProfileEvents::increment(ProfileEvents::S3QueueProcessedFiles); } void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder) @@ -626,7 +630,7 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder Coordination::Responses responses; if (holder->remove(&requests, &responses)) { - LOG_TEST(log, "Moved file `{}` to processed", path); + LOG_TRACE(log, "Moved file `{}` to processed", path); if (max_loading_retries) zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1); return; @@ -663,7 +667,7 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl( const auto node_metadata = createNodeMetadata(path).toString(); const auto zk_client = getZooKeeper(); - LOG_TEST(log, "Setting file `{}` as processed (at {})", path, processed_node_path); + LOG_TRACE(log, "Setting file `{}` as processed (at {})", path, processed_node_path); while (true) { std::string res; @@ -693,7 +697,7 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl( { if (holder->remove(&requests, &responses)) { - LOG_TEST(log, "Moved file `{}` to processed", path); + LOG_TRACE(log, "Moved file `{}` to processed", path); if (max_loading_retries) zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1); return; @@ -704,7 +708,7 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl( auto code = zk_client->tryMulti(requests, responses); if (code == Coordination::Error::ZOK) { - LOG_TEST(log, "Moved file `{}` to processed", path); + LOG_TRACE(log, "Moved file `{}` to processed", path); return; } } @@ -712,7 +716,8 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl( /// Failed to update max processed node, retry. if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK) { - LOG_TRACE(log, "Failed to update processed node ({}). Will retry.", magic_enum::enum_name(responses[0]->error)); + LOG_TRACE(log, "Failed to update processed node for path {} ({}). 
Will retry.", + path, magic_enum::enum_name(responses[0]->error)); continue; } @@ -752,6 +757,8 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S file_status->processing_end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); } + ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles); + SCOPE_EXIT({ file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileFailedMicroseconds, timer.get()); timer.cancel(); @@ -774,7 +781,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S Coordination::Responses responses; if (holder->remove(&requests, &responses)) { - LOG_TEST(log, "File `{}` failed to process and will not be retried. " + LOG_TRACE(log, "File `{}` failed to process and will not be retried. " "Error: {}", path, exception_message); return; } @@ -788,6 +795,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S LOG_WARNING(log, "Cannot set file ({}) as processed since processing node " "does not exist with expected processing id does not exist, " "this could be a result of expired zookeeper session", path); + return; } @@ -812,7 +820,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S file_status->retries = node_metadata.retries; } - LOG_TEST(log, "File `{}` failed to process, try {}/{} (Error: {})", + LOG_TRACE(log, "File `{}` failed to process, try {}/{} (Error: {})", path, node_metadata.retries, max_loading_retries, exception_message); /// Check if file can be retried further or not. @@ -868,13 +876,14 @@ S3QueueFilesMetadata::ProcessingNodeHolder::ProcessingNodeHolder( const std::string & path_, const std::string & zk_node_path_, FileStatusPtr file_status_, - zkutil::ZooKeeperPtr zk_client_) + zkutil::ZooKeeperPtr zk_client_, + LoggerPtr logger_) : zk_client(zk_client_) , file_status(file_status_) , path(path_) , zk_node_path(zk_node_path_) , processing_id(processing_id_) - , log(getLogger("ProcessingNodeHolder")) + , log(logger_) { } @@ -939,7 +948,7 @@ bool S3QueueFilesMetadata::ProcessingNodeHolder::remove(Coordination::Requests * catch (...) { ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode); - DB::tryLogCurrentException(__PRETTY_FUNCTION__, "Cannot remove " + zk_node_path); + LOG_ERROR(log, "Failed to remove processing node for file {}: {}", path, getCurrentExceptionMessage(true)); } return false; } @@ -958,7 +967,7 @@ void S3QueueFilesMetadata::cleanupThreadFunc() } catch (...) 
{ - tryLogCurrentException(__PRETTY_FUNCTION__); + LOG_ERROR(log, "Failed to cleanup nodes in zookeeper: {}", getCurrentExceptionMessage(true)); } if (shutdown) @@ -970,38 +979,52 @@ void S3QueueFilesMetadata::cleanupThreadFunc() void S3QueueFilesMetadata::cleanupThreadFuncImpl() { auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueCleanupMaxSetSizeOrTTLMicroseconds); - - chassert(max_set_size || max_set_age_sec); - - const bool check_nodes_limit = max_set_size > 0; - const bool check_nodes_ttl = max_set_age_sec > 0; - const auto zk_client = getZooKeeper(); - Strings nodes; - auto code = zk_client->tryGetChildren(zookeeper_processed_path, nodes); + + Strings processed_nodes; + auto code = zk_client->tryGetChildren(zookeeper_processed_path, processed_nodes); if (code != Coordination::Error::ZOK) { if (code == Coordination::Error::ZNONODE) { - LOG_TEST(log, "A `processed` not is not yet created"); - return; + LOG_TEST(log, "Path {} does not exist", zookeeper_processed_path.string()); } - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code)); + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code)); } - if (nodes.empty()) + Strings failed_nodes; + code = zk_client->tryGetChildren(zookeeper_failed_path, failed_nodes); + if (code != Coordination::Error::ZOK) { - LOG_TEST(log, "A set of nodes is empty"); + if (code == Coordination::Error::ZNONODE) + { + LOG_TEST(log, "Path {} does not exist", zookeeper_failed_path.string()); + } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code)); + } + + const size_t nodes_num = processed_nodes.size() + failed_nodes.size(); + if (!nodes_num) + { + LOG_TEST(log, "There are neither processed nor failed nodes"); return; } - const bool nodes_limit_exceeded = nodes.size() > max_set_size; - if (!nodes_limit_exceeded && check_nodes_limit && !check_nodes_ttl) + chassert(max_set_size || max_set_age_sec); + const bool check_nodes_limit = max_set_size > 0; + const bool check_nodes_ttl = max_set_age_sec > 0; + + const bool nodes_limit_exceeded = nodes_num > max_set_size; + if ((!nodes_limit_exceeded || !check_nodes_limit) && !check_nodes_ttl) { LOG_TEST(log, "No limit exceeded"); return; } + LOG_TRACE(log, "Will check limits for {} nodes", nodes_num); + /// Create a lock so that with distributed processing /// multiple nodes do not execute cleanup in parallel. auto ephemeral_node = zkutil::EphemeralNodeHolder::tryCreate(zookeeper_cleanup_lock_path, *zk_client, toString(getCurrentTime())); @@ -1014,7 +1037,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() struct Node { - std::string name; + std::string zk_path; NodeMetadata metadata; }; auto node_cmp = [](const Node & a, const Node & b) @@ -1026,24 +1049,57 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() /// Ordered in ascending order of timestamps. 
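/// Since this change, the cleanup collects children of both the `processed` and
/// the `failed` subtrees into the timestamp-ordered set below, so the
/// max-set-size and TTL limits apply to the combined node count rather than to
/// processed nodes alone.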
std::set sorted_nodes(node_cmp); - LOG_TRACE(log, "Found {} nodes", nodes.size()); - - for (const auto & node : nodes) + for (const auto & node : processed_nodes) { + const std::string path = zookeeper_processed_path / node; try { std::string metadata_str; - if (zk_client->tryGet(zookeeper_processed_path / node, metadata_str)) + if (zk_client->tryGet(path, metadata_str)) { - sorted_nodes.emplace(node, NodeMetadata::fromString(metadata_str)); - LOG_TEST(log, "Fetched metadata for node {}", node); + sorted_nodes.emplace(path, NodeMetadata::fromString(metadata_str)); + LOG_TEST(log, "Fetched metadata for node {}", path); } else - LOG_TEST(log, "Failed to fetch node metadata {}", node); + LOG_ERROR(log, "Failed to fetch node metadata {}", path); } - catch (...) + catch (const zkutil::KeeperException & e) { - tryLogCurrentException(__PRETTY_FUNCTION__); + if (e.code != Coordination::Error::ZCONNECTIONLOSS) + { + LOG_WARNING(log, "Unexpected exception: {}", getCurrentExceptionMessage(true)); + chassert(false); + } + + /// Will retry with a new zk connection. + throw; + } + } + + for (const auto & node : failed_nodes) + { + const std::string path = zookeeper_failed_path / node; + try + { + std::string metadata_str; + if (zk_client->tryGet(path, metadata_str)) + { + sorted_nodes.emplace(path, NodeMetadata::fromString(metadata_str)); + LOG_TEST(log, "Fetched metadata for node {}", path); + } + else + LOG_ERROR(log, "Failed to fetch node metadata {}", path); + } + catch (const zkutil::KeeperException & e) + { + if (e.code != Coordination::Error::ZCONNECTIONLOSS) + { + LOG_WARNING(log, "Unexpected exception: {}", getCurrentExceptionMessage(true)); + chassert(false); + } + + /// Will retry with a new zk connection. + throw; } } @@ -1056,37 +1112,35 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() }; LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", max_set_size, max_set_age_sec, get_nodes_str()); - size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes.size() - max_set_size : 0; + size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? 
nodes_num - max_set_size : 0; for (const auto & node : sorted_nodes) { if (nodes_to_remove) { - auto path = zookeeper_processed_path / node.name; - LOG_TEST(log, "Removing node at path {} ({}) because max files limit is reached", - node.metadata.file_path, path.string()); + LOG_TRACE(log, "Removing node at path {} ({}) because max files limit is reached", + node.metadata.file_path, node.zk_path); local_file_statuses.remove(node.metadata.file_path, /* if_exists */true); - code = zk_client->tryRemove(path); + code = zk_client->tryRemove(node.zk_path); if (code == Coordination::Error::ZOK) --nodes_to_remove; else - LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code); + LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", node.zk_path, code); } else if (check_nodes_ttl) { UInt64 node_age = getCurrentTime() - node.metadata.last_processed_timestamp; if (node_age >= max_set_age_sec) { - auto path = zookeeper_processed_path / node.name; - LOG_TEST(log, "Removing node at path {} ({}) because file is reached", - node.metadata.file_path, path.string()); + LOG_TRACE(log, "Removing node at path {} ({}) because file is reached", + node.metadata.file_path, node.zk_path); local_file_statuses.remove(node.metadata.file_path, /* if_exists */true); - code = zk_client->tryRemove(path); + code = zk_client->tryRemove(node.zk_path); if (code != Coordination::Error::ZOK) - LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code); + LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", node.zk_path, code); } else if (!nodes_to_remove) { diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h index 8ea9d8c3633..e26af1d25c5 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.h +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h @@ -193,7 +193,8 @@ public: const std::string & path_, const std::string & zk_node_path_, FileStatusPtr file_status_, - zkutil::ZooKeeperPtr zk_client_); + zkutil::ZooKeeperPtr zk_client_, + LoggerPtr logger_); ~ProcessingNodeHolder(); diff --git a/src/Storages/S3Queue/S3QueueMetadataFactory.cpp b/src/Storages/S3Queue/S3QueueMetadataFactory.cpp index 92cdab6355d..0c3c26adfe0 100644 --- a/src/Storages/S3Queue/S3QueueMetadataFactory.cpp +++ b/src/Storages/S3Queue/S3QueueMetadataFactory.cpp @@ -21,7 +21,8 @@ S3QueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const S3 auto it = metadata_by_path.find(zookeeper_path); if (it == metadata_by_path.end()) { - it = metadata_by_path.emplace(zookeeper_path, std::make_shared(fs::path(zookeeper_path), settings)).first; + auto files_metadata = std::make_shared(zookeeper_path, settings); + it = metadata_by_path.emplace(zookeeper_path, std::move(files_metadata)).first; } else if (it->second.metadata->checkSettings(settings)) { diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp index 19c69d5c589..b5bee2cc8da 100644 --- a/src/Storages/S3Queue/S3QueueSource.cpp +++ b/src/Storages/S3Queue/S3QueueSource.cpp @@ -44,11 +44,12 @@ StorageS3QueueSource::FileIterator::FileIterator( std::shared_ptr metadata_, std::unique_ptr glob_iterator_, size_t current_shard_, - std::atomic & shutdown_called_) + std::atomic & shutdown_called_, + LoggerPtr logger_) : metadata(metadata_) , glob_iterator(std::move(glob_iterator_)) , shutdown_called(shutdown_called_) - , log(&Poco::Logger::get("StorageS3QueueSource")) + , log(logger_) , sharded_processing(metadata->isShardedProcessing()) , current_shard(current_shard_) { @@ 
-237,7 +238,8 @@ Chunk StorageS3QueueSource::generate() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + LOG_ERROR(log, "Failed to set file {} as failed: {}", + key_with_info->key, getCurrentExceptionMessage(true)); } appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false); @@ -263,7 +265,8 @@ Chunk StorageS3QueueSource::generate() } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); + LOG_ERROR(log, "Failed to set file {} as failed: {}", + key_with_info->key, getCurrentExceptionMessage(true)); } appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false); diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h index 8fc7305ea08..a657459ed9d 100644 --- a/src/Storages/S3Queue/S3QueueSource.h +++ b/src/Storages/S3Queue/S3QueueSource.h @@ -42,7 +42,8 @@ public: std::shared_ptr metadata_, std::unique_ptr glob_iterator_, size_t current_shard_, - std::atomic & shutdown_called_); + std::atomic & shutdown_called_, + LoggerPtr logger_); /// Note: /// List results in s3 are always returned in UTF-8 binary order. @@ -56,7 +57,7 @@ public: const std::unique_ptr glob_iterator; std::atomic & shutdown_called; std::mutex mutex; - Poco::Logger * log; + LoggerPtr log; const bool sharded_processing; const size_t current_shard; diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index c267e3f1f25..90751ece767 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -116,7 +116,7 @@ StorageS3Queue::StorageS3Queue( , configuration{configuration_} , format_settings(format_settings_) , reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms) - , log(getLogger("StorageS3Queue (" + table_id_.table_name + ")")) + , log(getLogger("StorageS3Queue (" + table_id_.getFullTableName() + ")")) { if (configuration.url.key.empty()) { @@ -170,14 +170,7 @@ StorageS3Queue::StorageS3Queue( LOG_INFO(log, "Using zookeeper path: {}", zk_path.string()); task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); }); - try - { - createOrCheckMetadata(storage_metadata); - } - catch (...) - { - throw; - } + createOrCheckMetadata(storage_metadata); /// Get metadata manager from S3QueueMetadataFactory, /// it will increase the ref count for the metadata object. @@ -469,7 +462,7 @@ void StorageS3Queue::threadFunc() } catch (...) 
{ - tryLogCurrentException(__PRETTY_FUNCTION__); + LOG_ERROR(log, "Failed to process data: {}", getCurrentExceptionMessage(true)); } if (!shutdown_called) @@ -562,17 +555,21 @@ void StorageS3Queue::createOrCheckMetadata(const StorageInMemoryMetadata & stora requests.emplace_back(zkutil::makeCreateRequest(zk_path / "metadata", metadata, zkutil::CreateMode::Persistent)); } - Coordination::Responses responses; - auto code = zookeeper->tryMulti(requests, responses); - if (code == Coordination::Error::ZNODEEXISTS) + if (!requests.empty()) { - LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path.string()); - continue; - } - else if (code != Coordination::Error::ZOK) - { - zkutil::KeeperMultiException::check(code, requests, responses); + Coordination::Responses responses; + auto code = zookeeper->tryMulti(requests, responses); + if (code == Coordination::Error::ZNODEEXISTS) + { + LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path.string()); + continue; + } + else if (code != Coordination::Error::ZOK) + { + zkutil::KeeperMultiException::check(code, requests, responses); + } } + return; } @@ -614,7 +611,8 @@ std::shared_ptr StorageS3Queue::createFileIterator *configuration.client, configuration.url, predicate, getVirtualsList(), local_context, /* read_keys */nullptr, configuration.request_settings); - return std::make_shared(files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called); + return std::make_shared( + files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called, log); } void registerStorageS3Queue(StorageFactory & factory) diff --git a/tests/ci/ci.py b/tests/ci/ci.py index 6f72d6b43c2..08048564383 100644 --- a/tests/ci/ci.py +++ b/tests/ci/ci.py @@ -1806,7 +1806,7 @@ def _upload_build_profile_data( address, size, type, - symbol, + symbol ) SELECT {pr_info.number}, '{pr_info.sha}', '{job_report.start_time}', '{build_name}', '{instance_type}', '{instance_id}', file, reinterpretAsUInt64(reverse(unhex(address))), reinterpretAsUInt64(reverse(unhex(size))), type, symbol diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 693e41253cb..c2bea3060aa 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -4294,6 +4294,9 @@ class ClickHouseInstance: ) return xml_str + def get_machine_name(self): + return platform.machine() + @property def odbc_drivers(self): if self.odbc_ini_path: @@ -4301,12 +4304,12 @@ class ClickHouseInstance: "SQLite3": { "DSN": "sqlite3_odbc", "Database": "/tmp/sqliteodbc", - "Driver": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so", - "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so", + "Driver": f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/libsqlite3odbc.so", + "Setup": f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/libsqlite3odbc.so", }, "MySQL": { "DSN": "mysql_odbc", - "Driver": "/usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so", + "Driver": f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/libmyodbc.so", "Database": odbc_mysql_db, "Uid": odbc_mysql_uid, "Pwd": odbc_mysql_pass, @@ -4323,8 +4326,8 @@ class ClickHouseInstance: "ReadOnly": "No", "RowVersioning": "No", "ShowSystemTables": "No", - "Driver": "/usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so", - "Setup": "/usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so", + "Driver": 
f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/psqlodbca.so", + "Setup": f"/usr/lib/{self.get_machine_name()}-linux-gnu/odbc/libodbcpsqlS.so", "ConnSettings": "", }, } diff --git a/tests/integration/test_backward_compatibility/test_short_strings_aggregation.py b/tests/integration/test_backward_compatibility/test_short_strings_aggregation.py index 60375196366..ad573c7ffe0 100644 --- a/tests/integration/test_backward_compatibility/test_short_strings_aggregation.py +++ b/tests/integration/test_backward_compatibility/test_short_strings_aggregation.py @@ -1,13 +1,14 @@ import pytest -from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION +from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION, is_arm +# For arm version see https://github.com/ClickHouse/ClickHouse/pull/59132 cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( "node1", with_zookeeper=False, image="clickhouse/clickhouse-server", - tag=CLICKHOUSE_CI_MIN_TESTED_VERSION, + tag="24.1" if is_arm() else CLICKHOUSE_CI_MIN_TESTED_VERSION, stay_alive=True, with_installed_binary=True, ) @@ -15,7 +16,7 @@ node2 = cluster.add_instance( "node2", with_zookeeper=False, image="clickhouse/clickhouse-server", - tag=CLICKHOUSE_CI_MIN_TESTED_VERSION, + tag="24.1" if is_arm() else CLICKHOUSE_CI_MIN_TESTED_VERSION, stay_alive=True, with_installed_binary=True, ) diff --git a/tests/integration/test_catboost_evaluate/model/libcatboostmodel.so b/tests/integration/test_catboost_evaluate/model/libcatboostmodel.so deleted file mode 100755 index 388d9f887b4..00000000000 Binary files a/tests/integration/test_catboost_evaluate/model/libcatboostmodel.so and /dev/null differ diff --git a/tests/integration/test_catboost_evaluate/model/libcatboostmodel.so_aarch64 b/tests/integration/test_catboost_evaluate/model/libcatboostmodel.so_aarch64 new file mode 100644 index 00000000000..d4102ba7dfb Binary files /dev/null and b/tests/integration/test_catboost_evaluate/model/libcatboostmodel.so_aarch64 differ diff --git a/tests/integration/test_catboost_evaluate/model/libcatboostmodel.so_x86_64 b/tests/integration/test_catboost_evaluate/model/libcatboostmodel.so_x86_64 new file mode 100644 index 00000000000..3f251e595b8 Binary files /dev/null and b/tests/integration/test_catboost_evaluate/model/libcatboostmodel.so_x86_64 differ diff --git a/tests/integration/test_catboost_evaluate/test.py b/tests/integration/test_catboost_evaluate/test.py index bf4f9f85cac..22c24aa6ea8 100644 --- a/tests/integration/test_catboost_evaluate/test.py +++ b/tests/integration/test_catboost_evaluate/test.py @@ -23,13 +23,23 @@ def ch_cluster(): try: cluster.start() - os.system( - "docker cp {local} {cont_id}:{dist}".format( - local=os.path.join(SCRIPT_DIR, "model/."), - cont_id=instance.docker_id, - dist="/etc/clickhouse-server/model", + instance.exec_in_container(["mkdir", f"/etc/clickhouse-server/model/"]) + + machine = instance.get_machine_name() + for source_name in os.listdir(os.path.join(SCRIPT_DIR, "model/.")): + dest_name = source_name + if machine in source_name: + machine_suffix = "_" + machine + dest_name = source_name[: -len(machine_suffix)] + + os.system( + "docker cp {local} {cont_id}:{dist}".format( + local=os.path.join(SCRIPT_DIR, f"model/{source_name}"), + cont_id=instance.docker_id, + dist=f"/etc/clickhouse-server/model/{dest_name}", + ) ) - ) + instance.restart_clickhouse() yield cluster diff --git a/tests/integration/test_crash_log/test.py b/tests/integration/test_crash_log/test.py index 
a5b82039a84..fe24777de94 100644 --- a/tests/integration/test_crash_log/test.py +++ b/tests/integration/test_crash_log/test.py @@ -39,6 +39,10 @@ def wait_for_clickhouse_stop(started_node): assert result == "OK", "ClickHouse process is still running" +@pytest.mark.skipif( + helpers.cluster.is_arm(), + reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855", +) def test_pkill(started_node): if ( started_node.is_built_with_thread_sanitizer() @@ -59,6 +63,10 @@ def test_pkill(started_node): ) +@pytest.mark.skipif( + helpers.cluster.is_arm(), + reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855", +) def test_pkill_query_log(started_node): for signal in ["SEGV", "4"]: # force create query_log if it was not created diff --git a/tests/integration/test_disk_types/configs/storage.xml b/tests/integration/test_disk_types/configs/storage_amd.xml similarity index 100% rename from tests/integration/test_disk_types/configs/storage.xml rename to tests/integration/test_disk_types/configs/storage_amd.xml diff --git a/tests/integration/test_disk_types/configs/storage_arm.xml b/tests/integration/test_disk_types/configs/storage_arm.xml new file mode 100644 index 00000000000..a246cc8469e --- /dev/null +++ b/tests/integration/test_disk_types/configs/storage_arm.xml @@ -0,0 +1,17 @@ + + + + + s3 + http://minio1:9001/root/data/ + minio + minio123 + + + encrypted + disk_s3 + 1234567812345678 + + + + diff --git a/tests/integration/test_disk_types/test.py b/tests/integration/test_disk_types/test.py index 3c4169be4de..1cc5048eb69 100644 --- a/tests/integration/test_disk_types/test.py +++ b/tests/integration/test_disk_types/test.py @@ -19,7 +19,9 @@ def cluster(): cluster = ClickHouseCluster(__file__) cluster.add_instance( "node", - main_configs=["configs/storage.xml"], + main_configs=["configs/storage_arm.xml"] + if is_arm() + else ["configs/storage_amd.xml"], with_minio=True, with_hdfs=not is_arm(), ) diff --git a/tests/integration/test_group_array_element_size/configs/group_array_max_element_size.xml b/tests/integration/test_group_array_element_size/configs/group_array_max_element_size.xml index 7a9cda7ccbd..80409d3e18b 100644 --- a/tests/integration/test_group_array_element_size/configs/group_array_max_element_size.xml +++ b/tests/integration/test_group_array_element_size/configs/group_array_max_element_size.xml @@ -1,3 +1,4 @@ 10 + false diff --git a/tests/integration/test_group_array_element_size/test.py b/tests/integration/test_group_array_element_size/test.py index 86b1d5feeee..1eb7647d734 100644 --- a/tests/integration/test_group_array_element_size/test.py +++ b/tests/integration/test_group_array_element_size/test.py @@ -9,6 +9,12 @@ node1 = cluster.add_instance( stay_alive=True, ) +node2 = cluster.add_instance( + "node2", + main_configs=["configs/group_array_max_element_size.xml"], + stay_alive=True, +) + @pytest.fixture(scope="module") def started_cluster(): @@ -63,3 +69,33 @@ def test_max_exement_size(started_cluster): node1.restart_clickhouse() assert node1.query("select length(groupArrayMerge(x)) from tab3") == "21\n" + + +def test_limit_size(started_cluster): + node2.query( + "CREATE TABLE tab4 (x AggregateFunction(groupArray, Array(UInt8))) ENGINE = MergeTree ORDER BY tuple()" + ) + node2.query("insert into tab4 select groupArrayState([zero]) from zeros(10)") + assert node2.query("select length(groupArrayMerge(x)) from tab4") == "10\n" + + node2.replace_in_config( + "/etc/clickhouse-server/config.d/group_array_max_element_size.xml", + "false", + 
"true", + ) + + node2.restart_clickhouse() + + node2.query("insert into tab4 select groupArrayState([zero]) from zeros(100)") + assert node2.query("select length(groupArrayMerge(x)) from tab4") == "10\n" + + node2.replace_in_config( + "/etc/clickhouse-server/config.d/group_array_max_element_size.xml", + "true", + "false", + ) + + node2.restart_clickhouse() + + with pytest.raises(Exception, match=r"Too large array size"): + node2.query("insert into tab4 select groupArrayState([zero]) from zeros(11)") diff --git a/tests/integration/test_non_default_compression/test.py b/tests/integration/test_non_default_compression/test.py index 18e2eb43813..187cae5c957 100644 --- a/tests/integration/test_non_default_compression/test.py +++ b/tests/integration/test_non_default_compression/test.py @@ -2,7 +2,7 @@ import random import string import pytest -from helpers.cluster import ClickHouseCluster +from helpers.cluster import ClickHouseCluster, is_arm cluster = ClickHouseCluster(__file__) @@ -255,6 +255,11 @@ def test_uncompressed_cache_plus_zstd_codec(start_cluster): def test_preconfigured_deflateqpl_codec(start_cluster): + if is_arm(): + pytest.skip( + "Skipping test because it's special test for Intel code (doesn't work on ARM)" + ) + node6.query( """ CREATE TABLE compression_codec_multiple_with_key ( diff --git a/tests/integration/test_replicated_database/test.py b/tests/integration/test_replicated_database/test.py index 706620cfaef..6d1fa498e92 100644 --- a/tests/integration/test_replicated_database/test.py +++ b/tests/integration/test_replicated_database/test.py @@ -79,6 +79,29 @@ def started_cluster(): cluster.shutdown() +def test_flatten_nested(started_cluster): + main_node.query( + "CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica' || '1');" + ) + dummy_node.query( + "CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica2');" + ) + + main_node.query( + "CREATE TABLE create_replicated_table.replicated_table (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);" + ) + + main_node.query( + "CREATE MATERIALIZED VIEW create_replicated_table.mv ENGINE=ReplicatedMergeTree ORDER BY tuple() AS select d, cast([(k, toString(i32))] as Nested(a UInt64, b String)) from create_replicated_table.replicated_table" + ) + + assert main_node.query( + "show create create_replicated_table.mv" + ) == dummy_node.query("show create create_replicated_table.mv") + main_node.query("DROP DATABASE create_replicated_table SYNC") + dummy_node.query("DROP DATABASE create_replicated_table SYNC") + + def test_create_replicated_table(started_cluster): main_node.query( "CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica' || '1');" diff --git a/tests/integration/test_s3_plain_rewritable/configs/storage_conf.xml b/tests/integration/test_s3_plain_rewritable/configs/storage_conf.xml index 1e4641fc8b2..560e6b6eca4 100644 --- a/tests/integration/test_s3_plain_rewritable/configs/storage_conf.xml +++ b/tests/integration/test_s3_plain_rewritable/configs/storage_conf.xml @@ -4,6 +4,7 @@ s3_plain_rewritable http://minio1:9001/root/data/ + minio minio123 diff --git a/tests/integration/test_s3_plain_rewritable/test.py b/tests/integration/test_s3_plain_rewritable/test.py index 5e27a690f1f..51786c55dab 100644 --- a/tests/integration/test_s3_plain_rewritable/test.py +++ b/tests/integration/test_s3_plain_rewritable/test.py @@ -1,24 
 import pytest
 import random
 import string
+import threading
 
 from helpers.cluster import ClickHouseCluster
 
 cluster = ClickHouseCluster(__file__)
-node = cluster.add_instance(
-    "node",
-    main_configs=["configs/storage_conf.xml"],
-    with_minio=True,
-    stay_alive=True,
-)
-insert_values = [
-    "(0,'data'),(1,'data')",
-    ",".join(
+NUM_WORKERS = 5
+
+nodes = []
+for i in range(NUM_WORKERS):
+    name = "node{}".format(i + 1)
+    node = cluster.add_instance(
+        name,
+        main_configs=["configs/storage_conf.xml"],
+        env_variables={"ENDPOINT_SUBPATH": name},
+        with_minio=True,
+        stay_alive=True,
+    )
+    nodes.append(node)
+
+MAX_ROWS = 1000
+
+
+def gen_insert_values(size):
+    return ",".join(
         f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')"
-        for i in range(10)
-    ),
-]
+        for i in range(size)
+    )
+
+
+insert_values = ",".join(
+    f"({i},'{''.join(random.choices(string.ascii_lowercase, k=5))}')" for i in range(10)
+)
 
 
 @pytest.fixture(scope="module", autouse=True)
@@ -32,47 +47,71 @@ def start_cluster():
 
 @pytest.mark.order(0)
 def test_insert():
-    for index, value in enumerate(insert_values):
+    def create_insert(node, insert_values):
         node.query(
             """
-            CREATE TABLE test_{} (
+            CREATE TABLE test (
                 id Int64,
                 data String
             ) ENGINE=MergeTree() ORDER BY id SETTINGS storage_policy='s3_plain_rewritable'
-            """.format(
-                index
-            )
+            """
         )
+        node.query("INSERT INTO test VALUES {}".format(insert_values))
 
-        node.query("INSERT INTO test_{} VALUES {}".format(index, value))
+    insert_values_arr = [
+        gen_insert_values(random.randint(1, MAX_ROWS)) for _ in range(0, NUM_WORKERS)
+    ]
+    threads = []
+    for i in range(NUM_WORKERS):
+        t = threading.Thread(
+            target=create_insert, args=(nodes[i], insert_values_arr[i])
+        )
+        threads.append(t)
+        t.start()
+
+    for t in threads:
+        t.join()
+
+    for i in range(NUM_WORKERS):
         assert (
-            node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index))
-            == value
+            nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
+            == insert_values_arr[i]
        )
 
 
 @pytest.mark.order(1)
 def test_restart():
-    for index, value in enumerate(insert_values):
-        assert (
-            node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index))
-            == value
+    insert_values_arr = []
+    for i in range(NUM_WORKERS):
+        insert_values_arr.append(
+            nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
         )
-    node.restart_clickhouse()
 
-    for index, value in enumerate(insert_values):
+    def restart(node):
+        node.restart_clickhouse()
+
+    threads = []
+    for i in range(NUM_WORKERS):
+        t = threading.Thread(target=restart, args=(nodes[i],))
+        threads.append(t)
+        t.start()
+
+    for t in threads:
+        t.join()
+
+    for i in range(NUM_WORKERS):
         assert (
-            node.query("SELECT * FROM test_{} ORDER BY id FORMAT Values".format(index))
-            == value
+            nodes[i].query("SELECT * FROM test ORDER BY id FORMAT Values")
+            == insert_values_arr[i]
         )
 
 
 @pytest.mark.order(2)
 def test_drop():
-    for index, value in enumerate(insert_values):
-        node.query("DROP TABLE IF EXISTS test_{} SYNC".format(index))
+    for i in range(NUM_WORKERS):
+        nodes[i].query("DROP TABLE IF EXISTS test SYNC")
 
     it = cluster.minio_client.list_objects(
         cluster.minio_bucket, "data/", recursive=True
diff --git a/tests/integration/test_send_crash_reports/test.py b/tests/integration/test_send_crash_reports/test.py
index 83c0827f891..15a15a13e2f 100644
--- a/tests/integration/test_send_crash_reports/test.py
+++ b/tests/integration/test_send_crash_reports/test.py
@@ -35,6 +35,10 @@ def started_node():
         pass
 
 
+@pytest.mark.skipif(
+    helpers.cluster.is_arm(),
+    reason="Fails on ARM, issue https://github.com/ClickHouse/ClickHouse/issues/63855",
+)
 def test_send_segfault(started_node):
     # NOTE: another option is to increase waiting time.
     if (
diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index c7893c3e643..e7925d55d00 100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -783,6 +783,7 @@ def test_max_set_age(started_cluster):
             "s3queue_tracked_file_ttl_sec": max_age,
             "s3queue_cleanup_interval_min_ms": 0,
             "s3queue_cleanup_interval_max_ms": 0,
+            "s3queue_loading_retries": 0,
         },
     )
     create_mv(node, table_name, dst_table_name)
@@ -829,6 +830,61 @@ def test_max_set_age(started_cluster):
     for path_count in paths_count:
         assert 2 == path_count
 
+    failed_count = int(
+        node.query(
+            "SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
+        )
+    )
+
+    values = [
+        ["failed", 1, 1],
+    ]
+    values_csv = (
+        "\n".join((",".join(map(str, row)) for row in values)) + "\n"
+    ).encode()
+    put_s3_file_content(started_cluster, f"{files_path}/fff.csv", values_csv)
+
+    for _ in range(30):
+        if failed_count + 1 == int(
+            node.query(
+                "SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
+            )
+        ):
+            break
+        time.sleep(1)
+
+    assert failed_count + 1 == int(
+        node.query(
+            "SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
+        )
+    )
+
+    node.query("SYSTEM FLUSH LOGS")
+    assert "Cannot parse input" in node.query(
+        "SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv'"
+    )
+    assert 1 == int(
+        node.query(
+            "SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
+        )
+    )
+
+    time.sleep(max_age + 1)
+
+    assert failed_count + 2 == int(
+        node.query("SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles'")
+    )
+
+    node.query("SYSTEM FLUSH LOGS")
+    assert "Cannot parse input" in node.query(
+        "SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv' ORDER BY processing_end_time DESC LIMIT 1"
+    )
+    assert 2 == int(
+        node.query(
+            "SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
+        )
+    )
+
 
 def test_max_set_size(started_cluster):
     node = started_cluster.instances["instance"]
@@ -902,9 +958,9 @@ def test_drop_table(started_cluster):
     node.wait_for_log_line(f"Reading from file: test_drop_data")
     node.query(f"DROP TABLE {table_name} SYNC")
     assert node.contains_in_log(
-        f"StorageS3Queue ({table_name}): Table is being dropped"
+        f"StorageS3Queue (default.{table_name}): Table is being dropped"
    ) or node.contains_in_log(
-        f"StorageS3Queue ({table_name}): Shutdown was called, stopping sync"
+        f"StorageS3Queue (default.{table_name}): Shutdown was called, stopping sync"
     )
 
 
diff --git a/tests/queries/0_stateless/01787_map_remote.reference b/tests/queries/0_stateless/01787_map_remote.reference
index c7828769f9f..c411899c87c 100644
--- a/tests/queries/0_stateless/01787_map_remote.reference
+++ b/tests/queries/0_stateless/01787_map_remote.reference
@@ -2,3 +2,6 @@
 {'a':1,'b':2}
 {'a':1,'b':2}
 {'a':1,'b':2}
+{}
+{}
+{'':''}
diff --git a/tests/queries/0_stateless/01787_map_remote.sql b/tests/queries/0_stateless/01787_map_remote.sql
index 217308e5141..e169ed62781 100644
--- a/tests/queries/0_stateless/01787_map_remote.sql
+++ b/tests/queries/0_stateless/01787_map_remote.sql
@@ -1,2 +1,8 @@
 SELECT map('a', 1, 'b', 2) FROM remote('127.0.0.{1,2}', system, one);
 SELECT map('a', 1, 'b', 2) FROM remote('127.0.0.{1,2}');
+
+SELECT map() from remote('127.0.0.{1,2}', system,one);
+
+drop table if exists bug_repro_local;
+CREATE TABLE bug_repro_local (`attributes` Map(LowCardinality(String), String)) ENGINE = Log as select map('','');
+SELECT if(1, attributes, map()) from remote('127.0.0.{1,2}', currentDatabase(), bug_repro_local) limit 1;
diff --git a/tests/queries/0_stateless/02455_one_row_from_csv_memory_usage.sh b/tests/queries/0_stateless/02455_one_row_from_csv_memory_usage.sh
index 5b54666a6a3..05de3f05562 100755
--- a/tests/queries/0_stateless/02455_one_row_from_csv_memory_usage.sh
+++ b/tests/queries/0_stateless/02455_one_row_from_csv_memory_usage.sh
@@ -9,7 +9,7 @@ USER_FILES_PATH=$($CLICKHOUSE_CLIENT --query "select _path,_file from file('none
 
 cp "$CUR_DIR"/data_csv/10m_rows.csv.xz $USER_FILES_PATH/
 
-${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings max_threads=1, max_memory_usage=1000000000"
-${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings max_threads=1, max_memory_usage=100000000"
+${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings input_format_parallel_parsing=1, max_threads=1, max_parsing_threads=16, min_chunk_bytes_for_parallel_parsing=10485760, max_memory_usage=1000000000"
+${CLICKHOUSE_CLIENT} --query="SELECT * FROM file('10m_rows.csv.xz' , 'CSVWithNames') order by identifier, number, name, surname, birthday LIMIT 1 settings input_format_parallel_parsing=1, max_threads=1, max_parsing_threads=16, min_chunk_bytes_for_parallel_parsing=10485760, max_memory_usage=100000000"
 
 rm $USER_FILES_PATH/10m_rows.csv.xz
diff --git a/tests/queries/0_stateless/03144_invalid_filter.reference b/tests/queries/0_stateless/03144_invalid_filter.reference
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/queries/0_stateless/03144_invalid_filter.sql b/tests/queries/0_stateless/03144_invalid_filter.sql
new file mode 100644
index 00000000000..deb8d7b96b3
--- /dev/null
+++ b/tests/queries/0_stateless/03144_invalid_filter.sql
@@ -0,0 +1,14 @@
+-- https://github.com/ClickHouse/ClickHouse/issues/48049
+SET allow_experimental_analyzer = 1;
+
+CREATE TABLE test_table (`id` UInt64, `value` String) ENGINE = TinyLog() AS Select number, number::String from numbers(10);
+
+WITH CAST(tuple(1), 'Tuple (value UInt64)') AS compound_value
+SELECT id, test_table.* APPLY x -> compound_value.*
+FROM test_table
+WHERE arrayMap(x -> toString(x) AS lambda, [NULL, 256, 257, NULL, NULL])
+SETTINGS convert_query_to_cnf = true, optimize_using_constraints = true, optimize_substitute_columns = true; -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
+
+DESCRIBE TABLE (SELECT test_table.COLUMNS(id) FROM test_table WHERE '2147483647'); -- { serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER }
+
+DROP TABLE test_table;
diff --git a/tests/queries/0_stateless/03151_pmj_join_non_procssed_clash.reference b/tests/queries/0_stateless/03151_pmj_join_non_procssed_clash.reference
new file mode 100644
index 00000000000..3fbea507f11
--- /dev/null
+++ b/tests/queries/0_stateless/03151_pmj_join_non_procssed_clash.reference
@@ -0,0 +1 @@
+9900 49990050 49990050 49990050
diff --git a/tests/queries/0_stateless/03151_pmj_join_non_procssed_clash.sql b/tests/queries/0_stateless/03151_pmj_join_non_procssed_clash.sql
new file mode 100644
index 00000000000..a54de889760
--- /dev/null
+++ b/tests/queries/0_stateless/03151_pmj_join_non_procssed_clash.sql
@@ -0,0 +1,8 @@
+SET join_algorithm = 'partial_merge';
+SET max_joined_block_size_rows = 100;
+
+
+SELECT count(ignore(*)), sum(t1.a), sum(t1.b), sum(t2.a)
+FROM ( SELECT number AS a, number AS b FROM numbers(10000) ) t1
+JOIN ( SELECT number + 100 AS a FROM numbers(10000) ) t2
+ON t1.a = t2.a AND t1.b > 0;
diff --git a/tests/queries/0_stateless/03151_redundant_distinct_with_window.reference b/tests/queries/0_stateless/03151_redundant_distinct_with_window.reference
new file mode 100644
index 00000000000..e321055f1e2
--- /dev/null
+++ b/tests/queries/0_stateless/03151_redundant_distinct_with_window.reference
@@ -0,0 +1,7 @@
+1
+2
+3
+--------
+1 2023-01-14 00:00:00
+2 2023-01-14 00:00:00
+3 2023-01-14 00:00:00
diff --git a/tests/queries/0_stateless/03151_redundant_distinct_with_window.sql b/tests/queries/0_stateless/03151_redundant_distinct_with_window.sql
new file mode 100644
index 00000000000..79e0074e91b
--- /dev/null
+++ b/tests/queries/0_stateless/03151_redundant_distinct_with_window.sql
@@ -0,0 +1,21 @@
+DROP TABLE IF EXISTS tab;
+DROP TABLE IF EXISTS tab_v;
+
+CREATE TABLE tab (id Int32, val Nullable(Float64), dt Nullable(DateTime64(6)), type Nullable(Int32)) ENGINE = MergeTree ORDER BY id;
+
+insert into tab values (1,10,'2023-01-14 00:00:00',1),(2,20,'2023-01-14 00:00:00',1),(3,20,'2023-01-14 00:00:00',2),(4,40,'2023-01-14 00:00:00',3),(5,50,'2023-01-14 00:00:00',3);
+
+CREATE VIEW tab_v AS SELECT
+    t1.type AS type,
+    sum(t1.val) AS sval,
+    toStartOfDay(t1.dt) AS sday,
+    anyLast(sval) OVER w AS lval
+FROM tab AS t1
+GROUP BY
+    type,
+    sday
+WINDOW w AS (PARTITION BY type);
+
+select distinct type from tab_v order by type;
+select '--------';
+select distinct type, sday from tab_v order by type, sday;
diff --git a/tests/queries/0_stateless/03152_trailing_comma_in_columns_list_in_insert.reference b/tests/queries/0_stateless/03152_trailing_comma_in_columns_list_in_insert.reference
new file mode 100644
index 00000000000..6622044ee6e
--- /dev/null
+++ b/tests/queries/0_stateless/03152_trailing_comma_in_columns_list_in_insert.reference
@@ -0,0 +1,2 @@
+1 2 3
+4 5 6
diff --git a/tests/queries/0_stateless/03152_trailing_comma_in_columns_list_in_insert.sql b/tests/queries/0_stateless/03152_trailing_comma_in_columns_list_in_insert.sql
new file mode 100644
index 00000000000..4031f9a7762
--- /dev/null
+++ b/tests/queries/0_stateless/03152_trailing_comma_in_columns_list_in_insert.sql
@@ -0,0 +1,4 @@
+CREATE TEMPORARY TABLE test (a UInt8, b UInt8, c UInt8);
+INSERT INTO test (a, b, c, ) VALUES (1, 2, 3);
+INSERT INTO test (a, b, c) VALUES (4, 5, 6);
+SELECT * FROM test ORDER BY a;
diff --git a/tests/queries/0_stateless/03153_format_regexp_usability.reference b/tests/queries/0_stateless/03153_format_regexp_usability.reference
new file mode 100644
index 00000000000..c41b0adbd84
--- /dev/null
+++ b/tests/queries/0_stateless/03153_format_regexp_usability.reference
@@ -0,0 +1,2 @@
+regular expression is not set
+`Upyachka`
diff --git a/tests/queries/0_stateless/03153_format_regexp_usability.sh b/tests/queries/0_stateless/03153_format_regexp_usability.sh
new file mode 100755
index 00000000000..03bed10dd17
--- /dev/null
+++ b/tests/queries/0_stateless/03153_format_regexp_usability.sh
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+# Tags: no-fasttest, no-parallel, no-ordinary-database, long
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+${CLICKHOUSE_LOCAL} --query "SELECT * FROM format(Regexp, 's String', 'Hello')" 2>&1 | grep -o -F 'regular expression is not set'
+${CLICKHOUSE_LOCAL} --query "SELECT * FROM format(Regexp, 's String', 'Hello') SETTINGS format_regexp = 'Upyachka'" 2>&1 | grep -o -F '`Upyachka`'
diff --git a/tests/queries/0_stateless/03153_trailing_comma_in_values_list_in_insert.reference b/tests/queries/0_stateless/03153_trailing_comma_in_values_list_in_insert.reference
new file mode 100644
index 00000000000..e115855485d
--- /dev/null
+++ b/tests/queries/0_stateless/03153_trailing_comma_in_values_list_in_insert.reference
@@ -0,0 +1,3 @@
+1 2 3
+4 5 6
+7 8 9
diff --git a/tests/queries/0_stateless/03153_trailing_comma_in_values_list_in_insert.sql b/tests/queries/0_stateless/03153_trailing_comma_in_values_list_in_insert.sql
new file mode 100644
index 00000000000..65301c977c2
--- /dev/null
+++ b/tests/queries/0_stateless/03153_trailing_comma_in_values_list_in_insert.sql
@@ -0,0 +1,5 @@
+CREATE TEMPORARY TABLE test (a UInt8, b UInt8, c UInt8);
+INSERT INTO test (a, b, c) VALUES (1, 2, 3, );
+INSERT INTO test (a, b, c) VALUES (4, 5, 6,);
+INSERT INTO test (a, b, c) VALUES (7, 8, 9);
+SELECT * FROM test ORDER BY a;
diff --git a/tests/queries/0_stateless/03154_lazy_token_iterator.reference b/tests/queries/0_stateless/03154_lazy_token_iterator.reference
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/queries/0_stateless/03154_lazy_token_iterator.sh b/tests/queries/0_stateless/03154_lazy_token_iterator.sh
new file mode 100755
index 00000000000..4794dafda4b
--- /dev/null
+++ b/tests/queries/0_stateless/03154_lazy_token_iterator.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+# Tags: no-fasttest, no-parallel, no-ordinary-database, long
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+# In previous versions this command took longer than ten minutes. Now it takes less than a second in release mode:
+
+python3 -c 'import sys; import struct; sys.stdout.buffer.write(b"".join(struct.pack("