diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index b2ddd87d173..e712ada1551 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -81,7 +81,6 @@ jobs: cat >> "$GITHUB_ENV" << 'EOF' BUILD_NAME=coverity CACHES_PATH=${{runner.temp}}/../ccaches - CHECK_NAME=ClickHouse build check (actions) IMAGES_PATH=${{runner.temp}}/images_path REPO_COPY=${{runner.temp}}/build_check/ClickHouse TEMP_PATH=${{runner.temp}}/build_check @@ -99,13 +98,15 @@ jobs: id: coverity-checkout uses: actions/checkout@v2 with: - submodules: 'true' + fetch-depth: 0 # otherwise we will have no info about contributors - name: Build run: | + git -C "$GITHUB_WORKSPACE" submodule sync + git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --init --jobs=10 sudo rm -fr "$TEMP_PATH" mkdir -p "$TEMP_PATH" cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH" - cd "$REPO_COPY/tests/ci" && python3 build_check.py "$CHECK_NAME" "$BUILD_NAME" + cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME" - name: Upload Coverity Analysis if: ${{ success() || failure() }} run: | diff --git a/contrib/libxml2 b/contrib/libxml2 index a075d256fd9..7846b0a677f 160000 --- a/contrib/libxml2 +++ b/contrib/libxml2 @@ -1 +1 @@ -Subproject commit a075d256fd9ff15590b86d981b75a50ead124fca +Subproject commit 7846b0a677f8d3ce72486125fa281e92ac9970e8 diff --git a/docs/en/sql-reference/functions/array-functions.md b/docs/en/sql-reference/functions/array-functions.md index 4fe7e44aad7..c445c322aac 100644 --- a/docs/en/sql-reference/functions/array-functions.md +++ b/docs/en/sql-reference/functions/array-functions.md @@ -352,7 +352,7 @@ Elements set to `NULL` are handled as normal values. ## arrayCount(\[func,\] arr1, …) {#array-count} -Returns the number of elements in the arr array for which func returns something other than 0. If ‘func’ is not specified, it returns the number of non-zero elements in the array. +Returns the number of elements for which `func(arr1[i], …, arrN[i])` returns something other than 0. If `func` is not specified, it returns the number of non-zero elements in the array. Note that the `arrayCount` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You can pass a lambda function to it as the first argument. @@ -1244,7 +1244,7 @@ Result: ## arrayMap(func, arr1, …) {#array-map} -Returns an array obtained from the original application of the `func` function to each element in the `arr` array. +Returns an array obtained from the original arrays by application of `func(arr1[i], …, arrN[i])` for each element. Arrays `arr1` … `arrN` must have the same number of elements. Examples: @@ -1274,7 +1274,7 @@ Note that the `arrayMap` is a [higher-order function](../../sql-reference/functi ## arrayFilter(func, arr1, …) {#array-filter} -Returns an array containing only the elements in `arr1` for which `func` returns something other than 0. +Returns an array containing only the elements in `arr1` for which `func(arr1[i], …, arrN[i])` returns something other than 0. Examples: @@ -1307,7 +1307,7 @@ Note that the `arrayFilter` is a [higher-order function](../../sql-reference/fun ## arrayFill(func, arr1, …) {#array-fill} -Scan through `arr1` from the first element to the last element and replace `arr1[i]` by `arr1[i - 1]` if `func` returns 0. The first element of `arr1` will not be replaced. +Scan through `arr1` from the first element to the last element and replace `arr1[i]` by `arr1[i - 1]` if `func(arr1[i], …, arrN[i])` returns 0. 
The first element of `arr1` will not be replaced. Examples: @@ -1325,7 +1325,7 @@ Note that the `arrayFill` is a [higher-order function](../../sql-reference/funct ## arrayReverseFill(func, arr1, …) {#array-reverse-fill} -Scan through `arr1` from the last element to the first element and replace `arr1[i]` by `arr1[i + 1]` if `func` returns 0. The last element of `arr1` will not be replaced. +Scan through `arr1` from the last element to the first element and replace `arr1[i]` by `arr1[i + 1]` if `func(arr1[i], …, arrN[i])` returns 0. The last element of `arr1` will not be replaced. Examples: @@ -1343,7 +1343,7 @@ Note that the `arrayReverseFill` is a [higher-order function](../../sql-referenc ## arraySplit(func, arr1, …) {#array-split} -Split `arr1` into multiple arrays. When `func` returns something other than 0, the array will be split on the left hand side of the element. The array will not be split before the first element. +Split `arr1` into multiple arrays. When `func(arr1[i], …, arrN[i])` returns something other than 0, the array will be split on the left hand side of the element. The array will not be split before the first element. Examples: @@ -1361,7 +1361,7 @@ Note that the `arraySplit` is a [higher-order function](../../sql-reference/func ## arrayReverseSplit(func, arr1, …) {#array-reverse-split} -Split `arr1` into multiple arrays. When `func` returns something other than 0, the array will be split on the right hand side of the element. The array will not be split after the last element. +Split `arr1` into multiple arrays. When `func(arr1[i], …, arrN[i])` returns something other than 0, the array will be split on the right hand side of the element. The array will not be split after the last element. Examples: @@ -1379,37 +1379,37 @@ Note that the `arrayReverseSplit` is a [higher-order function](../../sql-referen ## arrayExists(\[func,\] arr1, …) {#arrayexistsfunc-arr1} -Returns 1 if there is at least one element in `arr` for which `func` returns something other than 0. Otherwise, it returns 0. +Returns 1 if there is at least one element in `arr` for which `func(arr1[i], …, arrN[i])` returns something other than 0. Otherwise, it returns 0. Note that the `arrayExists` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You can pass a lambda function to it as the first argument. ## arrayAll(\[func,\] arr1, …) {#arrayallfunc-arr1} -Returns 1 if `func` returns something other than 0 for all the elements in `arr`. Otherwise, it returns 0. +Returns 1 if `func(arr1[i], …, arrN[i])` returns something other than 0 for all the elements in arrays. Otherwise, it returns 0. Note that the `arrayAll` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You can pass a lambda function to it as the first argument. ## arrayFirst(func, arr1, …) {#array-first} -Returns the first element in the `arr1` array for which `func` returns something other than 0. +Returns the first element in the `arr1` array for which `func(arr1[i], …, arrN[i])` returns something other than 0. Note that the `arrayFirst` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can’t be omitted. ## arrayLast(func, arr1, …) {#array-last} -Returns the last element in the `arr1` array for which `func` returns something other than 0. +Returns the last element in the `arr1` array for which `func(arr1[i], …, arrN[i])` returns something other than 0. 
Note that the `arrayLast` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can’t be omitted. ## arrayFirstIndex(func, arr1, …) {#array-first-index} -Returns the index of the first element in the `arr1` array for which `func` returns something other than 0. +Returns the index of the first element in the `arr1` array for which `func(arr1[i], …, arrN[i])` returns something other than 0. Note that the `arrayFirstIndex` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can’t be omitted. ## arrayLastIndex(func, arr1, …) {#array-last-index} -Returns the index of the last element in the `arr1` array for which `func` returns something other than 0. +Returns the index of the last element in the `arr1` array for which `func(arr1[i], …, arrN[i])` returns something other than 0. Note that the `arrayLastIndex` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can’t be omitted. @@ -1635,7 +1635,7 @@ Result: ## arrayCumSum(\[func,\] arr1, …) {#arraycumsumfunc-arr1} -Returns an array of partial sums of elements in the source array (a running sum). If the `func` function is specified, then the values of the array elements are converted by this function before summing. +Returns an array of partial sums of elements in the source array (a running sum). If the `func` function is specified, then the values of the array elements are converted by `func(arr1[i], …, arrN[i])` before summing. Example: diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 18123a5a784..1b3e5de8adb 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -695,7 +695,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, output_format_json_quote_denormals, false, "Enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format.", 0) \ \ M(Bool, output_format_json_escape_forward_slashes, true, "Controls escaping forward slashes for string outputs in JSON output format. This is intended for compatibility with JavaScript. 
Don't confuse with backslashes that are always escaped.", 0) \ - M(Bool, output_format_json_named_tuples_as_objects, false, "Serialize named tuple columns as JSON objects.", 0) \ + M(Bool, output_format_json_named_tuples_as_objects, true, "Serialize named tuple columns as JSON objects.", 0) \ M(Bool, output_format_json_array_of_rows, false, "Output a JSON array of all rows in JSONEachRow(Compact) format.", 0) \ \ M(UInt64, output_format_pretty_max_rows, 10000, "Rows limit for Pretty formats.", 0) \ diff --git a/src/Databases/MySQL/DatabaseMySQL.cpp b/src/Databases/MySQL/DatabaseMySQL.cpp index 446518be5cd..58be682bd73 100644 --- a/src/Databases/MySQL/DatabaseMySQL.cpp +++ b/src/Databases/MySQL/DatabaseMySQL.cpp @@ -26,6 +26,7 @@ # include # include # include +# include namespace fs = std::filesystem; @@ -148,8 +149,16 @@ ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, Context auto storage_engine_arguments = ast_storage->engine->arguments; /// Add table_name to engine arguments - auto mysql_table_name = std::make_shared(table_name); - storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name); + if (typeid_cast(storage_engine_arguments->children[0].get())) + { + storage_engine_arguments->children.push_back( + makeASTFunction("equals", std::make_shared("table"), std::make_shared(table_name))); + } + else + { + auto mysql_table_name = std::make_shared(table_name); + storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name); + } /// Unset settings std::erase_if(storage_children, [&](const ASTPtr & element) { return element.get() == ast_storage->settings; }); diff --git a/src/Functions/FunctionHashID.h b/src/Functions/FunctionHashID.h index fbfb368bec7..30f08c96eca 100644 --- a/src/Functions/FunctionHashID.h +++ b/src/Functions/FunctionHashID.h @@ -51,9 +51,11 @@ public: bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; } + bool useDefaultImplementationForConstants() const override { return true; } + DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override { - if (arguments.size() < 1) + if (arguments.empty()) throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, "Function {} expects at least one argument", getName()); const auto & id_col = arguments[0]; @@ -114,18 +116,16 @@ public: const auto & numcolumn = arguments[0].column; if (checkAndGetColumn(numcolumn.get()) || checkAndGetColumn(numcolumn.get()) - || checkAndGetColumn(numcolumn.get()) || checkAndGetColumn(numcolumn.get()) - || checkAndGetColumnConst(numcolumn.get()) || checkAndGetColumnConst(numcolumn.get()) - || checkAndGetColumnConst(numcolumn.get()) || checkAndGetColumnConst(numcolumn.get())) + || checkAndGetColumn(numcolumn.get()) || checkAndGetColumn(numcolumn.get())) { std::string salt; - UInt8 minLength = 0; + UInt8 min_length = 0; std::string alphabet; if (arguments.size() >= 4) { const auto & alphabetcolumn = arguments[3].column; - if (auto alpha_col = checkAndGetColumnConst(alphabetcolumn.get())) + if (const auto * alpha_col = checkAndGetColumnConst(alphabetcolumn.get())) { alphabet = alpha_col->getValue(); if (alphabet.find('\0') != std::string::npos) @@ -138,18 +138,18 @@ public: if (arguments.size() >= 3) { const auto & minlengthcolumn = arguments[2].column; - if (auto min_length_col = checkAndGetColumnConst(minlengthcolumn.get())) - minLength = min_length_col->getValue(); + if (const auto * 
min_length_col = checkAndGetColumnConst(minlengthcolumn.get())) + min_length = min_length_col->getValue(); } if (arguments.size() >= 2) { const auto & saltcolumn = arguments[1].column; - if (auto salt_col = checkAndGetColumnConst(saltcolumn.get())) + if (const auto * salt_col = checkAndGetColumnConst(saltcolumn.get())) salt = salt_col->getValue(); } - hashidsxx::Hashids hash(salt, minLength, alphabet); + hashidsxx::Hashids hash(salt, min_length, alphabet); auto col_res = ColumnString::create(); diff --git a/src/Functions/FunctionsConversion.h b/src/Functions/FunctionsConversion.h index bffc15cdc57..419c45ce6df 100644 --- a/src/Functions/FunctionsConversion.h +++ b/src/Functions/FunctionsConversion.h @@ -705,7 +705,7 @@ template <> struct FormatImpl { template - static ReturnType execute(const DataTypeDate::FieldType x, WriteBuffer & wb, const DataTypeDate32 *, const DateLUTImpl *) + static ReturnType execute(const DataTypeDate32::FieldType x, WriteBuffer & wb, const DataTypeDate32 *, const DateLUTImpl *) { writeDateText(ExtendedDayNum(x), wb); return ReturnType(true); diff --git a/src/IO/ParallelReadBuffer.cpp b/src/IO/ParallelReadBuffer.cpp index 512f1c856b7..926d10bda5b 100644 --- a/src/IO/ParallelReadBuffer.cpp +++ b/src/IO/ParallelReadBuffer.cpp @@ -49,7 +49,15 @@ ParallelReadBuffer::ParallelReadBuffer( , schedule(std::move(schedule_)) , reader_factory(std::move(reader_factory_)) { - addReaders(); + try + { + addReaders(); + } + catch (const Exception &) + { + finishAndWait(); + throw; + } } bool ParallelReadBuffer::addReaderToPool() diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index 2570a0d7c17..1d3ec6095d5 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -152,7 +152,7 @@ void WriteBufferFromS3::allocateBuffer() WriteBufferFromS3::~WriteBufferFromS3() { #ifndef NDEBUG - if (!is_finalized) + if (!finalized) { LOG_ERROR(log, "WriteBufferFromS3 is not finalized in destructor. It's a bug"); std::terminate(); @@ -200,8 +200,6 @@ void WriteBufferFromS3::finalizeImpl() if (!multipart_upload_id.empty()) completeMultipartUpload(); - - is_finalized = true; } void WriteBufferFromS3::createMultipartUpload() diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index 7dbaad72940..4cdc39b80a0 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -106,7 +106,6 @@ private: std::vector part_tags; bool is_prefinalized = false; - bool is_finalized = false; /// Following fields are for background uploads in thread pool (if specified). 
/// We use std::function to avoid dependency of Interpreters diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index d817988e7b6..54f55c7b1f6 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -418,6 +418,8 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, if (address.is_local) info.local_addresses.push_back(address); + info.all_addresses.push_back(address); + auto pool = ConnectionPoolFactory::instance().get( settings.distributed_connections_pool_size, address.host_name, address.port, @@ -485,6 +487,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, } Addresses shard_local_addresses; + Addresses shard_all_addresses; ConnectionPoolPtrs all_replicas_pools; all_replicas_pools.reserve(replica_addresses.size()); @@ -502,6 +505,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, all_replicas_pools.emplace_back(replica_pool); if (replica.is_local) shard_local_addresses.push_back(replica); + shard_all_addresses.push_back(replica); } ConnectionPoolWithFailoverPtr shard_pool = std::make_shared( @@ -516,6 +520,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, current_shard_num, weight, std::move(shard_local_addresses), + std::move(shard_all_addresses), std::move(shard_pool), std::move(all_replicas_pools), internal_replication @@ -571,6 +576,7 @@ Cluster::Cluster( addresses_with_failover.emplace_back(current); Addresses shard_local_addresses; + Addresses all_addresses; ConnectionPoolPtrs all_replicas; all_replicas.reserve(current.size()); @@ -585,6 +591,7 @@ Cluster::Cluster( all_replicas.emplace_back(replica_pool); if (replica.is_local && !treat_local_as_remote) shard_local_addresses.push_back(replica); + all_addresses.push_back(replica); } ConnectionPoolWithFailoverPtr shard_pool = std::make_shared( @@ -597,6 +604,7 @@ Cluster::Cluster( current_shard_num, default_weight, std::move(shard_local_addresses), + std::move(all_addresses), std::move(shard_pool), std::move(all_replicas), false // has_internal_replication @@ -680,6 +688,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti if (address.is_local) info.local_addresses.push_back(address); + info.all_addresses.push_back(address); + auto pool = ConnectionPoolFactory::instance().get( settings.distributed_connections_pool_size, address.host_name, diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index 7c8d15d0350..5ce011782fc 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -202,6 +202,7 @@ public: UInt32 shard_num = 0; UInt32 weight = 1; Addresses local_addresses; + Addresses all_addresses; /// nullptr if there are no remote addresses ConnectionPoolWithFailoverPtr pool; /// Connection pool for each replica, contains nullptr for local replicas diff --git a/src/Interpreters/ClusterProxy/IStreamFactory.h b/src/Interpreters/ClusterProxy/IStreamFactory.h deleted file mode 100644 index 483ce9dcab9..00000000000 --- a/src/Interpreters/ClusterProxy/IStreamFactory.h +++ /dev/null @@ -1,65 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace DB -{ - -struct Settings; -class Cluster; -class Throttler; -struct SelectQueryInfo; - -class Pipe; -using Pipes = std::vector; - -class QueryPlan; -using QueryPlanPtr = std::unique_ptr; - -struct StorageID; - -namespace ClusterProxy -{ - -/// Base class for the implementation of the details of distributed query -/// execution that are specific to the query type. 
-class IStreamFactory -{ -public: - virtual ~IStreamFactory() = default; - - struct Shard - { - /// Query and header may be changed depending on shard. - ASTPtr query; - Block header; - - size_t shard_num = 0; - size_t num_replicas = 0; - ConnectionPoolWithFailoverPtr pool; - ConnectionPoolPtrs per_replica_pools; - - /// If we connect to replicas lazily. - /// (When there is a local replica with big delay). - bool lazy = false; - UInt32 local_delay = 0; - }; - - using Shards = std::vector; - - virtual void createForShard( - const Cluster::ShardInfo & shard_info, - const ASTPtr & query_ast, - const StorageID & main_table, - const ASTPtr & table_func_ptr, - ContextPtr context, - std::vector & local_plans, - Shards & remote_shards, - UInt32 shard_count) = 0; -}; - -} - -} diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index a2afed3759f..89123cda531 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -10,14 +11,15 @@ #include #include +#include #include #include +#include #include #include #include #include - namespace ProfileEvents { extern const Event DistributedConnectionMissingTable; @@ -63,7 +65,8 @@ void SelectStreamFactory::createForShard( auto emplace_local_stream = [&]() { - local_plans.emplace_back(createLocalPlan(query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, /*coordinator=*/nullptr)); + local_plans.emplace_back(createLocalPlan( + query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, /*replica_num=*/0, /*replica_count=*/0, /*coordinator=*/nullptr)); }; auto emplace_remote_stream = [&](bool lazy = false, UInt32 local_delay = 0) @@ -71,10 +74,7 @@ void SelectStreamFactory::createForShard( remote_shards.emplace_back(Shard{ .query = query_ast, .header = header, - .shard_num = shard_info.shard_num, - .num_replicas = shard_info.getAllNodeCount(), - .pool = shard_info.pool, - .per_replica_pools = shard_info.per_replica_pools, + .shard_info = shard_info, .lazy = lazy, .local_delay = local_delay, }); @@ -173,5 +173,97 @@ void SelectStreamFactory::createForShard( emplace_remote_stream(); } + +SelectStreamFactory::ShardPlans SelectStreamFactory::createForShardWithParallelReplicas( + const Cluster::ShardInfo & shard_info, + const ASTPtr & query_ast, + const StorageID & main_table, + const ASTPtr & table_function_ptr, + const ThrottlerPtr & throttler, + ContextPtr context, + UInt32 shard_count) +{ + SelectStreamFactory::ShardPlans result; + + if (auto it = objects_by_shard.find(shard_info.shard_num); it != objects_by_shard.end()) + replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast); + + const auto & settings = context->getSettingsRef(); + + auto is_local_replica_obsolete = [&]() + { + auto resolved_id = context->resolveStorageID(main_table); + auto main_table_storage = DatabaseCatalog::instance().tryGetTable(resolved_id, context); + const auto * replicated_storage = dynamic_cast(main_table_storage.get()); + + if (!replicated_storage) + return false; + + UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries; + + if (!max_allowed_delay) + return false; + + UInt32 local_delay = replicated_storage->getAbsoluteDelay(); + return local_delay >= max_allowed_delay; + }; + + size_t next_replica_number = 0; + size_t all_replicas_count = shard_info.getRemoteNodeCount(); + + 
auto coordinator = std::make_shared(); + auto remote_plan = std::make_unique(); + + + if (settings.prefer_localhost_replica && shard_info.isLocal()) + { + /// We don't need more than one local replica in parallel reading + if (!is_local_replica_obsolete()) + { + ++all_replicas_count; + + result.local_plan = createLocalPlan( + query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, next_replica_number, all_replicas_count, coordinator); + + ++next_replica_number; + } + } + + Scalars scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{}; + scalars.emplace( + "_shard_count", Block{{DataTypeUInt32().createColumnConst(1, shard_count), std::make_shared(), "_shard_count"}}); + auto external_tables = context->getExternalTables(); + + auto shard = Shard{ + .query = query_ast, + .header = header, + .shard_info = shard_info, + .lazy = false, + .local_delay = 0, + }; + + if (shard_info.hasRemoteConnections()) + { + auto read_from_remote = std::make_unique( + coordinator, + shard, + header, + processed_stage, + main_table, + table_function_ptr, + context, + throttler, + std::move(scalars), + std::move(external_tables), + &Poco::Logger::get("ReadFromParallelRemoteReplicasStep"), + shard_count); + + remote_plan->addStep(std::move(read_from_remote)); + result.remote_plan = std::move(remote_plan); + } + + return result; +} + } } diff --git a/src/Interpreters/ClusterProxy/SelectStreamFactory.h b/src/Interpreters/ClusterProxy/SelectStreamFactory.h index 731bf3acd10..f64e57e1316 100644 --- a/src/Interpreters/ClusterProxy/SelectStreamFactory.h +++ b/src/Interpreters/ClusterProxy/SelectStreamFactory.h @@ -1,22 +1,56 @@ #pragma once #include -#include #include #include #include +#include +#include +#include namespace DB { + +struct Settings; +class Cluster; +class Throttler; +struct SelectQueryInfo; + +class Pipe; +using Pipes = std::vector; + +class QueryPlan; +using QueryPlanPtr = std::unique_ptr; + +struct StorageID; + namespace ClusterProxy { + using ColumnsDescriptionByShardNum = std::unordered_map; -class SelectStreamFactory final : public IStreamFactory +class SelectStreamFactory { public: + + struct Shard + { + /// Query and header may be changed depending on shard. + ASTPtr query; + Block header; + + Cluster::ShardInfo shard_info; + + /// If we connect to replicas lazily. + /// (When there is a local replica with big delay). 
+ bool lazy = false; + UInt32 local_delay = 0; + }; + + using Shards = std::vector; + SelectStreamFactory( const Block & header_, const ColumnsDescriptionByShardNum & objects_by_shard_, @@ -31,7 +65,26 @@ public: ContextPtr context, std::vector & local_plans, Shards & remote_shards, - UInt32 shard_count) override; + UInt32 shard_count); + + struct ShardPlans + { + /// If a shard has local replicas this won't be nullptr + std::unique_ptr local_plan; + + /// Contains several steps to read from all remote replicas + std::unique_ptr remote_plan; + }; + + ShardPlans createForShardWithParallelReplicas( + const Cluster::ShardInfo & shard_info, + const ASTPtr & query_ast, + const StorageID & main_table, + const ASTPtr & table_function_ptr, + const ThrottlerPtr & throttler, + ContextPtr context, + UInt32 shard_count + ); private: const Block header; diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 011ef8e0740..ddac4e7b466 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -20,6 +20,7 @@ namespace DB namespace ErrorCodes { extern const int TOO_LARGE_DISTRIBUTED_DEPTH; + extern const int LOGICAL_ERROR; } namespace ClusterProxy @@ -106,21 +107,19 @@ void executeQuery( QueryProcessingStage::Enum processed_stage, const StorageID & main_table, const ASTPtr & table_func_ptr, - IStreamFactory & stream_factory, Poco::Logger * log, + SelectStreamFactory & stream_factory, Poco::Logger * log, const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info, const ExpressionActionsPtr & sharding_key_expr, const std::string & sharding_key_column_name, const ClusterPtr & not_optimized_cluster) { - assert(log); - const Settings & settings = context->getSettingsRef(); if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth) throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH); std::vector plans; - IStreamFactory::Shards remote_shards; + SelectStreamFactory::Shards remote_shards; auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, log); @@ -215,6 +214,91 @@ void executeQuery( query_plan.unitePlans(std::move(union_step), std::move(plans)); } + +void executeQueryWithParallelReplicas( + QueryPlan & query_plan, + const StorageID & main_table, + const ASTPtr & table_func_ptr, + SelectStreamFactory & stream_factory, + const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info, + const ExpressionActionsPtr & sharding_key_expr, + const std::string & sharding_key_column_name, + const ClusterPtr & not_optimized_cluster) +{ + const Settings & settings = context->getSettingsRef(); + + ThrottlerPtr user_level_throttler; + if (auto * process_list_element = context->getProcessListElement()) + user_level_throttler = process_list_element->getUserNetworkThrottler(); + + /// Network bandwidth limit, if needed. 
+ ThrottlerPtr throttler; + if (settings.max_network_bandwidth || settings.max_network_bytes) + { + throttler = std::make_shared( + settings.max_network_bandwidth, + settings.max_network_bytes, + "Limit for bytes to send or receive over network exceeded.", + user_level_throttler); + } + else + throttler = user_level_throttler; + + + std::vector plans; + size_t shards = query_info.getCluster()->getShardCount(); + + for (const auto & shard_info : query_info.getCluster()->getShardsInfo()) + { + ASTPtr query_ast_for_shard; + if (query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1) + { + query_ast_for_shard = query_ast->clone(); + + OptimizeShardingKeyRewriteInVisitor::Data visitor_data{ + sharding_key_expr, + sharding_key_expr->getSampleBlock().getByPosition(0).type, + sharding_key_column_name, + shard_info, + not_optimized_cluster->getSlotToShard(), + }; + OptimizeShardingKeyRewriteInVisitor visitor(visitor_data); + visitor.visit(query_ast_for_shard); + } + else + query_ast_for_shard = query_ast; + + auto shard_plans = stream_factory.createForShardWithParallelReplicas(shard_info, + query_ast_for_shard, main_table, table_func_ptr, throttler, context, shards); + + if (!shard_plans.local_plan && !shard_plans.remote_plan) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No plans were generated for reading from shard. This is a bug"); + + if (shard_plans.local_plan) + plans.emplace_back(std::move(shard_plans.local_plan)); + + if (shard_plans.remote_plan) + plans.emplace_back(std::move(shard_plans.remote_plan)); + } + + if (plans.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No plans were generated for reading from Distributed. This is a bug"); + + if (plans.size() == 1) + { + query_plan = std::move(*plans.front()); + return; + } + + DataStreams input_streams; + input_streams.reserve(plans.size()); + for (const auto & plan : plans) + input_streams.emplace_back(plan->getCurrentDataStream()); + + auto union_step = std::make_unique(std::move(input_streams)); + query_plan.unitePlans(std::move(union_step), std::move(plans)); +} + } } diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index d38bbe0fd5b..1a5035015a7 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -23,7 +23,7 @@ struct StorageID; namespace ClusterProxy { -class IStreamFactory; +class SelectStreamFactory; /// Update settings for Distributed query. 
/// @@ -46,7 +46,18 @@ void executeQuery( QueryProcessingStage::Enum processed_stage, const StorageID & main_table, const ASTPtr & table_func_ptr, - IStreamFactory & stream_factory, Poco::Logger * log, + SelectStreamFactory & stream_factory, Poco::Logger * log, + const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info, + const ExpressionActionsPtr & sharding_key_expr, + const std::string & sharding_key_column_name, + const ClusterPtr & not_optimized_cluster); + + +void executeQueryWithParallelReplicas( + QueryPlan & query_plan, + const StorageID & main_table, + const ASTPtr & table_func_ptr, + SelectStreamFactory & stream_factory, const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info, const ExpressionActionsPtr & sharding_key_expr, const std::string & sharding_key_column_name, diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 2589df0986a..c5ca5748066 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -119,7 +119,11 @@ TemporaryTableHolder & TemporaryTableHolder::operator=(TemporaryTableHolder && r TemporaryTableHolder::~TemporaryTableHolder() { if (id != UUIDHelpers::Nil) + { + auto table = getTable(); + table->flushAndShutdown(); temporary_tables->dropTable(getContext(), "_tmp_" + toString(id)); + } } StorageID TemporaryTableHolder::getGlobalTableID() const diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 699f73abd67..00333503db1 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1999,7 +1999,6 @@ void ExpressionAnalysisResult::checkActions() const }; check_actions(prewhere_info->prewhere_actions); - check_actions(prewhere_info->alias_actions); } } diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index e9a90da3032..94ac7c26183 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1637,15 +1637,6 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan( { auto & prewhere_info = *prewhere_info_ptr; - if (prewhere_info.alias_actions) - { - pipe.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, - std::make_shared(prewhere_info.alias_actions)); - }); - } - if (prewhere_info.row_level_filter) { pipe.addSimpleTransform([&](const Block & header) @@ -1711,12 +1702,11 @@ void InterpreterSelectQuery::setMergeTreeReadTaskCallbackAndClientInfo(MergeTree context->setMergeTreeReadTaskCallback(std::move(callback)); } -void InterpreterSelectQuery::setProperClientInfo() +void InterpreterSelectQuery::setProperClientInfo(size_t replica_num, size_t replica_count) { context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - assert(options.shard_count.has_value() && options.shard_num.has_value()); - context->getClientInfo().count_participating_replicas = *options.shard_count; - context->getClientInfo().number_of_current_replica = *options.shard_num; + context->getClientInfo().count_participating_replicas = replica_count; + context->getClientInfo().number_of_current_replica = replica_num; } bool InterpreterSelectQuery::shouldMoveToPrewhere() @@ -1885,19 +1875,6 @@ void InterpreterSelectQuery::addPrewhereAliasActions() for (const auto & name : required_columns) prewhere_info->prewhere_actions->tryRestoreColumn(name); - auto analyzed_result - = TreeRewriter(context).analyze(required_columns_from_prewhere_expr, 
metadata_snapshot->getColumns().getAllPhysical()); - prewhere_info->alias_actions - = ExpressionAnalyzer(required_columns_from_prewhere_expr, analyzed_result, context).getActionsDAG(true, false); - - /// Add (physical?) columns required by alias actions. - auto required_columns_from_alias = prewhere_info->alias_actions->getRequiredColumns(); - Block prewhere_actions_result = prewhere_info->prewhere_actions->getResultColumns(); - for (auto & column : required_columns_from_alias) - if (!prewhere_actions_result.has(column.name)) - if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column.name)) - required_columns.push_back(column.name); - /// Add physical columns required by prewhere actions. for (const auto & column : required_columns_from_prewhere) if (!required_aliases_from_prewhere.contains(column)) diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h index efaa55b7b21..40afaaaeed0 100644 --- a/src/Interpreters/InterpreterSelectQuery.h +++ b/src/Interpreters/InterpreterSelectQuery.h @@ -126,7 +126,7 @@ public: void setMergeTreeReadTaskCallbackAndClientInfo(MergeTreeReadTaskCallback && callback); /// It will set shard_num and shard_count to the client_info - void setProperClientInfo(); + void setProperClientInfo(size_t replica_num, size_t replica_count); private: InterpreterSelectQuery( diff --git a/src/Interpreters/TreeRewriter.cpp b/src/Interpreters/TreeRewriter.cpp index fac70e862d4..52b7de8bcab 100644 --- a/src/Interpreters/TreeRewriter.cpp +++ b/src/Interpreters/TreeRewriter.cpp @@ -1213,7 +1213,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect( all_source_columns_set.insert(name); } - normalize(query, result.aliases, all_source_columns_set, select_options.ignore_alias, settings, /* allow_self_aliases = */ true); + normalize(query, result.aliases, all_source_columns_set, select_options.ignore_alias, settings, /* allow_self_aliases = */ true, getContext()); /// Remove unneeded columns according to 'required_result_columns'. /// Leave all selected columns in case of DISTINCT; columns that contain arrayJoin function inside. @@ -1309,7 +1309,7 @@ TreeRewriterResultPtr TreeRewriter::analyze( TreeRewriterResult result(source_columns, storage, storage_snapshot, false); - normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases); + normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases, getContext()); /// Executing scalar subqueries. Column defaults could be a scalar subquery. executeScalarSubqueries(query, getContext(), 0, result.scalars, result.local_scalars, !execute_scalar_subqueries); @@ -1338,7 +1338,7 @@ TreeRewriterResultPtr TreeRewriter::analyze( } void TreeRewriter::normalize( - ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases) + ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases, ContextPtr context_) { UserDefinedSQLFunctionVisitor::Data data_user_defined_functions_visitor; UserDefinedSQLFunctionVisitor(data_user_defined_functions_visitor).visit(query); @@ -1400,7 +1400,10 @@ void TreeRewriter::normalize( MarkTableIdentifiersVisitor(identifiers_data).visit(query); /// Rewrite function names to their canonical ones. 
- if (settings.normalize_function_names) + /// Notice: function name normalization is disabled when it's a secondary query, because queries are either + /// already normalized on initiator node, or not normalized and should remain unnormalized for + /// compatibility. + if (context_->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY && settings.normalize_function_names) FunctionNameNormalizer().visit(query.get()); /// Common subexpression elimination. Rewrite rules. diff --git a/src/Interpreters/TreeRewriter.h b/src/Interpreters/TreeRewriter.h index 7fbe4e45fb3..2c246455ade 100644 --- a/src/Interpreters/TreeRewriter.h +++ b/src/Interpreters/TreeRewriter.h @@ -129,7 +129,7 @@ public: std::shared_ptr table_join = {}) const; private: - static void normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases); + static void normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases, ContextPtr context_); }; } diff --git a/src/Interpreters/evaluateConstantExpression.cpp b/src/Interpreters/evaluateConstantExpression.cpp index f5ad0337629..039d79ed445 100644 --- a/src/Interpreters/evaluateConstantExpression.cpp +++ b/src/Interpreters/evaluateConstantExpression.cpp @@ -31,11 +31,21 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +static std::pair> getFieldAndDataTypeFromLiteral(ASTLiteral * literal) +{ + auto type = applyVisitor(FieldToDataType(), literal->value); + /// In case of Array field nested fields can have different types. + /// Example: Array [1, 2.3] will have 2 fields with types UInt64 and Float64 + /// when result type is Array(Float64). + /// So, we need to convert this field to the result type. + Field res = convertFieldToType(literal->value, *type); + return {res, type}; +} std::pair> evaluateConstantExpression(const ASTPtr & node, ContextPtr context) { if (ASTLiteral * literal = node->as()) - return std::make_pair(literal->value, applyVisitor(FieldToDataType(), literal->value)); + return getFieldAndDataTypeFromLiteral(literal); NamesAndTypesList source_columns = {{ "_dummy", std::make_shared() }}; @@ -54,7 +64,10 @@ std::pair> evaluateConstantExpression(co ReplaceQueryParameterVisitor param_visitor(context->getQueryParameters()); param_visitor.visit(ast); - if (context->getSettingsRef().normalize_function_names) + /// Notice: function name normalization is disabled when it's a secondary query, because queries are either + /// already normalized on initiator node, or not normalized and should remain unnormalized for + /// compatibility. + if (context->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY && context->getSettingsRef().normalize_function_names) FunctionNameNormalizer().visit(ast.get()); String name = ast->getColumnName(); @@ -63,7 +76,7 @@ std::pair> evaluateConstantExpression(co /// AST potentially could be transformed to literal during TreeRewriter analyze. /// For example if we have SQL user defined function that return literal AS subquery. 
if (ASTLiteral * literal = ast->as()) - return std::make_pair(literal->value, applyVisitor(FieldToDataType(), literal->value)); + return getFieldAndDataTypeFromLiteral(literal); ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer(ast, syntax_result, context).getConstActions(); diff --git a/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp b/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp index 854a677afb9..f91c8020509 100644 --- a/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp +++ b/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp @@ -41,6 +41,8 @@ std::unique_ptr createLocalPlan( QueryProcessingStage::Enum processed_stage, UInt32 shard_num, UInt32 shard_count, + size_t replica_num, + size_t replica_count, std::shared_ptr coordinator) { checkStackSize(); @@ -56,7 +58,7 @@ std::unique_ptr createLocalPlan( .setShardInfo(shard_num, shard_count) .ignoreASTOptimizations()); - interpreter.setProperClientInfo(); + interpreter.setProperClientInfo(replica_num, replica_count); if (coordinator) { interpreter.setMergeTreeReadTaskCallbackAndClientInfo([coordinator](PartitionReadRequest request) -> std::optional diff --git a/src/Processors/QueryPlan/DistributedCreateLocalPlan.h b/src/Processors/QueryPlan/DistributedCreateLocalPlan.h index fdfe1709833..b55cedf9871 100644 --- a/src/Processors/QueryPlan/DistributedCreateLocalPlan.h +++ b/src/Processors/QueryPlan/DistributedCreateLocalPlan.h @@ -15,6 +15,8 @@ std::unique_ptr createLocalPlan( QueryProcessingStage::Enum processed_stage, UInt32 shard_num, UInt32 shard_count, + size_t replica_num, + size_t replica_count, std::shared_ptr coordinator); } diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 67cc2472deb..d1454c8942a 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -17,6 +17,8 @@ #include #include +#include + namespace DB { @@ -63,7 +65,7 @@ static String formattedAST(const ASTPtr & ast) } ReadFromRemote::ReadFromRemote( - ClusterProxy::IStreamFactory::Shards shards_, + ClusterProxy::SelectStreamFactory::Shards shards_, Block header_, QueryProcessingStage::Enum stage_, StorageID main_table_, @@ -90,10 +92,7 @@ ReadFromRemote::ReadFromRemote( { } -void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard, - std::shared_ptr coordinator, - std::shared_ptr pool, - std::optional replica_info) +void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard) { bool add_agg_info = stage == QueryProcessingStage::WithMergeableState; bool add_totals = false; @@ -106,10 +105,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto } auto lazily_create_stream = [ - replica_info = replica_info, - pool = pool ? 
pool : shard.pool, - coordinator = coordinator, - shard_num = shard.shard_num, shard_count = shard_count, query = shard.query, header = shard.header, + shard = shard, shard_count = shard_count, query = shard.query, header = shard.header, context = context, throttler = throttler, main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables, @@ -125,15 +121,15 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto try { if (table_func_ptr) - try_results = pool->getManyForTableFunction(timeouts, ¤t_settings, PoolMode::GET_MANY); + try_results = shard.shard_info.pool->getManyForTableFunction(timeouts, ¤t_settings, PoolMode::GET_MANY); else - try_results = pool->getManyChecked(timeouts, ¤t_settings, PoolMode::GET_MANY, main_table.getQualifiedName()); + try_results = shard.shard_info.pool->getManyChecked(timeouts, ¤t_settings, PoolMode::GET_MANY, main_table.getQualifiedName()); } catch (const Exception & ex) { if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED) LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"), - "Connections to remote replicas of local shard {} failed, will use stale local replica", shard_num); + "Connections to remote replicas of local shard {} failed, will use stale local replica", shard.shard_info.shard_num); else throw; } @@ -147,7 +143,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto if (try_results.empty() || local_delay < max_remote_delay) { - auto plan = createLocalPlan(query, header, context, stage, shard_num, shard_count, coordinator); + auto plan = createLocalPlan(query, header, context, stage, shard.shard_info.shard_num, shard_count, 0, 0, /*coordinator=*/nullptr); return std::move(*plan->buildQueryPipeline( QueryPlanOptimizationSettings::fromContext(context), @@ -163,10 +159,9 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto String query_string = formattedAST(query); scalars["_shard_num"] - = Block{{DataTypeUInt32().createColumnConst(1, shard_num), std::make_shared(), "_shard_num"}}; + = Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared(), "_shard_num"}}; auto remote_query_executor = std::make_shared( - pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage, - RemoteQueryExecutor::Extension{.parallel_reading_coordinator = std::move(coordinator), .replica_info = replica_info}); + shard.shard_info.pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage); auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read); QueryPipelineBuilder builder; @@ -179,10 +174,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto addConvertingActions(pipes.back(), output_stream->header); } -void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard, - std::shared_ptr coordinator, - std::shared_ptr pool, - std::optional replica_info) +void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard) { bool add_agg_info = stage == QueryProcessingStage::WithMergeableState; bool add_totals = false; @@ -197,20 +189,15 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory:: String query_string = formattedAST(shard.query); scalars["_shard_num"] - = Block{{DataTypeUInt32().createColumnConst(1, 
shard.shard_num), std::make_shared(), "_shard_num"}}; + = Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared(), "_shard_num"}}; std::shared_ptr remote_query_executor; remote_query_executor = std::make_shared( - pool ? pool : shard.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage, - RemoteQueryExecutor::Extension{.parallel_reading_coordinator = std::move(coordinator), .replica_info = std::move(replica_info)}); + shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage); remote_query_executor->setLogger(log); - - /// In case of parallel reading from replicas we have a connection pool per replica. - /// Setting PoolMode will make no sense. - if (!pool) - remote_query_executor->setPoolMode(PoolMode::GET_MANY); + remote_query_executor->setPoolMode(PoolMode::GET_MANY); if (!table_func_ptr) remote_query_executor->setMainTable(main_table); @@ -223,48 +210,80 @@ void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const B { Pipes pipes; - const auto & settings = context->getSettingsRef(); - const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas; - - /// We have to create a pipe for each replica - /// FIXME: The second condition is only for tests to work, because hedged connections enabled by default. - if (settings.max_parallel_replicas > 1 && !enable_sample_offset_parallel_processing && !context->getSettingsRef().use_hedged_requests) + for (const auto & shard : shards) { - const Settings & current_settings = context->getSettingsRef(); - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); - - for (const auto & shard : shards) - { - auto coordinator = std::make_shared(); - - for (size_t replica_num = 0; replica_num < shard.num_replicas; ++replica_num) - { - IConnections::ReplicaInfo replica_info - { - .all_replicas_count = shard.num_replicas, - .number_of_current_replica = replica_num - }; - - auto pool = shard.per_replica_pools[replica_num]; - auto pool_with_failover = std::make_shared( - ConnectionPoolPtrs{pool}, current_settings.load_balancing); - - if (shard.lazy) - addLazyPipe(pipes, shard, coordinator, pool_with_failover, replica_info); - else - addPipe(pipes, shard, coordinator, pool_with_failover, replica_info); - } - } + if (shard.lazy) + addLazyPipe(pipes, shard); + else + addPipe(pipes, shard); } - else + + auto pipe = Pipe::unitePipes(std::move(pipes)); + pipeline.init(std::move(pipe)); +} + + +ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( + ParallelReplicasReadingCoordinatorPtr coordinator_, + ClusterProxy::SelectStreamFactory::Shard shard_, + Block header_, + QueryProcessingStage::Enum stage_, + StorageID main_table_, + ASTPtr table_func_ptr_, + ContextPtr context_, + ThrottlerPtr throttler_, + Scalars scalars_, + Tables external_tables_, + Poco::Logger * log_, + UInt32 shard_count_) + : ISourceStep(DataStream{.header = std::move(header_)}) + , coordinator(std::move(coordinator_)) + , shard(std::move(shard_)) + , stage(std::move(stage_)) + , main_table(std::move(main_table_)) + , table_func_ptr(table_func_ptr_) + , context(context_) + , throttler(throttler_) + , scalars(scalars_) + , external_tables{external_tables_} + , log(log_) + , shard_count(shard_count_) +{ + std::vector description; + + for (const auto & address : shard.shard_info.all_addresses) + if (!address.is_local) + 
description.push_back(fmt::format("Replica: {}", address.host_name)); + + setStepDescription(boost::algorithm::join(description, ", ")); +} + + +void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) +{ + Pipes pipes; + + const Settings & current_settings = context->getSettingsRef(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); + + for (size_t replica_num = 0; replica_num < shard.shard_info.getAllNodeCount(); ++replica_num) { - for (const auto & shard : shards) + if (shard.shard_info.all_addresses[replica_num].is_local) + continue; + + IConnections::ReplicaInfo replica_info { - if (shard.lazy) - addLazyPipe(pipes, shard, /*coordinator=*/nullptr, /*pool*/{}, /*replica_info*/std::nullopt); - else - addPipe(pipes, shard, /*coordinator=*/nullptr, /*pool*/{}, /*replica_info*/std::nullopt); - } + .all_replicas_count = shard.shard_info.getAllNodeCount(), + .number_of_current_replica = replica_num + }; + + auto pool = shard.shard_info.per_replica_pools[replica_num]; + assert(pool); + + auto pool_with_failover = std::make_shared( + ConnectionPoolPtrs{pool}, current_settings.load_balancing); + + addPipeForSingeReplica(pipes, pool_with_failover, replica_info); } auto pipe = Pipe::unitePipes(std::move(pipes)); @@ -273,6 +292,41 @@ void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const B processor->setStorageLimits(storage_limits); pipeline.init(std::move(pipe)); + +} + + +void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(Pipes & pipes, std::shared_ptr pool, IConnections::ReplicaInfo replica_info) +{ + bool add_agg_info = stage == QueryProcessingStage::WithMergeableState; + bool add_totals = false; + bool add_extremes = false; + bool async_read = context->getSettingsRef().async_socket_for_remote; + if (stage == QueryProcessingStage::Complete) + { + add_totals = shard.query->as().group_by_with_totals; + add_extremes = context->getSettingsRef().extremes; + } + + String query_string = formattedAST(shard.query); + + scalars["_shard_num"] + = Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared(), "_shard_num"}}; + + std::shared_ptr remote_query_executor; + + remote_query_executor = std::make_shared( + pool, query_string, shard.header, context, throttler, scalars, external_tables, stage, + RemoteQueryExecutor::Extension{.parallel_reading_coordinator = coordinator, .replica_info = std::move(replica_info)}); + + remote_query_executor->setLogger(log); + + if (!table_func_ptr) + remote_query_executor->setMainTable(main_table); + + pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read)); + pipes.back().addInterpreterContext(context); + addConvertingActions(pipes.back(), output_stream->header); } } diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index bdee9039e65..eb416710df6 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include namespace DB @@ -22,7 +22,7 @@ class ReadFromRemote final : public ISourceStep { public: ReadFromRemote( - ClusterProxy::IStreamFactory::Shards shards_, + ClusterProxy::SelectStreamFactory::Shards shards_, Block header_, QueryProcessingStage::Enum stage_, StorageID main_table_, @@ -46,7 +46,7 @@ private: PerShard }; - ClusterProxy::IStreamFactory::Shards shards; + 
ClusterProxy::SelectStreamFactory::Shards shards; QueryProcessingStage::Enum stage; StorageID main_table; @@ -63,16 +63,52 @@ private: Poco::Logger * log; UInt32 shard_count; - void addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard, - std::shared_ptr coordinator, - std::shared_ptr pool, - std::optional replica_info); - void addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard, - std::shared_ptr coordinator, - std::shared_ptr pool, - std::optional replica_info); + void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard); + void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard); +}; - void addPipeForReplica(); + +class ReadFromParallelRemoteReplicasStep : public ISourceStep +{ +public: + ReadFromParallelRemoteReplicasStep( + ParallelReplicasReadingCoordinatorPtr coordinator_, + ClusterProxy::SelectStreamFactory::Shard shard, + Block header_, + QueryProcessingStage::Enum stage_, + StorageID main_table_, + ASTPtr table_func_ptr_, + ContextPtr context_, + ThrottlerPtr throttler_, + Scalars scalars_, + Tables external_tables_, + Poco::Logger * log_, + UInt32 shard_count_); + + String getName() const override { return "ReadFromRemoteParallelReplicas"; } + + void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; + +private: + + void addPipeForSingeReplica(Pipes & pipes, std::shared_ptr pool, IConnections::ReplicaInfo replica_info); + + ParallelReplicasReadingCoordinatorPtr coordinator; + ClusterProxy::SelectStreamFactory::Shard shard; + QueryProcessingStage::Enum stage; + + StorageID main_table; + ASTPtr table_func_ptr; + + ContextPtr context; + + ThrottlerPtr throttler; + Scalars scalars; + Tables external_tables; + + Poco::Logger * log; + + UInt32 shard_count{0}; }; } diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index f056842926d..6171971cb85 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1359,14 +1359,6 @@ void TCPHandler::receiveQuery() /// so we have to apply the changes first. query_context->setCurrentQueryId(state.query_id); - /// Disable function name normalization when it's a secondary query, because queries are either - /// already normalized on initiator node, or not normalized and should remain unnormalized for - /// compatibility. - if (query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) - { - query_context->setSetting("normalize_function_names", false); - } - /// For testing hedged requests if (unlikely(sleep_after_receiving_query.totalMilliseconds())) { diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index c381fe30efa..2e4c608dc9d 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -419,7 +419,9 @@ public: void onException() override { - write_buf->finalize(); + if (!writer) + return; + onFinish(); } void onFinish() override @@ -433,6 +435,7 @@ public: } catch (...) { + /// Stop ParallelFormattingOutputFormat correctly. 
writer.reset(); throw; } diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index f056eb067c2..43b67657a87 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -261,11 +261,6 @@ std::string PrewhereInfo::dump() const WriteBufferFromOwnString ss; ss << "PrewhereDagInfo\n"; - if (alias_actions) - { - ss << "alias_actions " << alias_actions->dumpDAG() << "\n"; - } - if (prewhere_actions) { ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n"; diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index ff43473f08e..ca5e7393666 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -72,8 +72,6 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( if (prewhere_info) { prewhere_actions = std::make_unique<PrewhereExprInfo>(); - if (prewhere_info->alias_actions) - prewhere_actions->alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions, actions_settings); if (prewhere_info->row_level_filter) prewhere_actions->row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings); @@ -556,9 +554,6 @@ Block MergeTreeBaseSelectProcessor::transformHeader( { if (prewhere_info) { - if (prewhere_info->alias_actions) - block = prewhere_info->alias_actions->updateHeader(std::move(block)); - if (prewhere_info->row_level_filter) { block = prewhere_info->row_level_filter->updateHeader(std::move(block)); diff --git a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index 5cc22503348..f74823eaec2 100644 --- a/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -281,21 +281,16 @@ MergeTreeReadTaskColumns getReadTaskColumns( if (prewhere_info) { - if (prewhere_info->alias_actions) - pre_column_names = prewhere_info->alias_actions->getRequiredColumnsNames(); - else + pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames(); + + if (prewhere_info->row_level_filter) { - pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames(); + NameSet names(pre_column_names.begin(), pre_column_names.end()); - if (prewhere_info->row_level_filter) + for (auto & name : prewhere_info->row_level_filter->getRequiredColumnsNames()) { - NameSet names(pre_column_names.begin(), pre_column_names.end()); - - for (auto & name : prewhere_info->row_level_filter->getRequiredColumnsNames()) - { - if (!names.contains(name)) - pre_column_names.push_back(name); - } + if (!names.contains(name)) + pre_column_names.push_back(name); } } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index c4c99e66873..0d959ec4821 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -5439,17 +5439,6 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg candidate.prewhere_info->row_level_filter = row_level_filter_actions; } - if (candidate.prewhere_info->alias_actions) - { - auto alias_actions = candidate.prewhere_info->alias_actions->clone(); - // alias_action should not add missing keys.
- auto new_prewhere_required_columns - = alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false); - if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty()) - return false; - prewhere_required_columns = std::move(new_prewhere_required_columns); - candidate.prewhere_info->alias_actions = alias_actions; - } required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end()); } @@ -5619,8 +5608,6 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg if (minmax_count_projection_candidate->prewhere_info) { const auto & prewhere_info = minmax_count_projection_candidate->prewhere_info; - if (prewhere_info->alias_actions) - ExpressionActions(prewhere_info->alias_actions, actions_settings).execute(query_info.minmax_count_projection_block); if (prewhere_info->row_level_filter) { diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/src/Storages/MergeTree/MergeTreeRangeReader.cpp index c0a6ed9a962..50bcdfaf0f6 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -593,9 +593,6 @@ MergeTreeRangeReader::MergeTreeRangeReader( if (prewhere_info) { - if (prewhere_info->alias_actions) - prewhere_info->alias_actions->execute(sample_block, true); - if (prewhere_info->row_level_filter) { prewhere_info->row_level_filter->execute(sample_block, true); @@ -1058,9 +1055,6 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r ++pos; } - if (prewhere_info->alias_actions) - prewhere_info->alias_actions->execute(block); - /// Columns might be projected out. We need to store them here so that default columns can be evaluated later. result.block_before_prewhere = block; diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.h b/src/Storages/MergeTree/MergeTreeRangeReader.h index ed5cc16add8..21ed35e6a78 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -21,8 +21,6 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>; /// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG struct PrewhereExprInfo { - /// Actions which are executed in order to alias columns are used for prewhere actions. - ExpressionActionsPtr alias_actions; /// Actions for row level security filter. Applied separately before prewhere_actions. /// These actions are separate because the prewhere condition should not be executed over filtered rows.
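/// After this change, PREWHERE evaluation is simply the optional row_level_filter below followed by prewhere_actions.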
ExpressionActionsPtr row_level_filter; diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h index af74e0fae49..bd2082be6c2 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h @@ -17,4 +17,6 @@ private: std::unique_ptr<Impl> pimpl; }; +using ParallelReplicasReadingCoordinatorPtr = std::shared_ptr<ParallelReplicasReadingCoordinator>; + } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index fbdb1dabd88..9f679f121b8 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -2183,6 +2183,29 @@ bool ReplicatedMergeTreeMergePredicate::canMergeSinglePart( } +bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String * out_reason) const +{ + std::lock_guard lock(queue.state_mutex); + for (const auto & entry : queue.queue) + { + if (entry->type != ReplicatedMergeTreeLogEntry::REPLACE_RANGE) + continue; + + for (const auto & part_name : entry->replace_range_entry->new_part_names) + { + if (part->info.isDisjoint(MergeTreePartInfo::fromPartName(part_name, queue.format_version))) + continue; + + if (out_reason) + *out_reason = fmt::format("Part {} participates in REPLACE_RANGE {} ({})", part_name, entry->new_part_name, entry->znode_name); + + return true; + } + } + return false; +} + + std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const { /// Assigning mutations is easier than assigning merges because mutations appear in the same order as diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 0c0e872b0ac..dea4d0573db 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -501,6 +501,10 @@ public: /// This predicate is checked for the first part of each range. bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const; + /// Returns true if the part is needed by some REPLACE_RANGE entry. + /// We should not drop the part in this case, because the replication queue may get stuck without it. + bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String * out_reason) const; + /// Return nonempty optional of desired mutation version and alter version. /// If we have no alter (modify/drop) mutations in mutations queue, then we return the biggest possible /// mutation version (and -1 as alter version). In other case, we return biggest mutation version with
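The new predicate closes a race between DROP PART and an in-flight REPLACE_RANGE: dropping a part that a queued REPLACE_RANGE entry still needs could leave the replication queue stuck. A hypothetical sequence (table and part names are illustrative) that is now rejected:

```sql
-- replica 1: REPLACE PARTITION queues a REPLACE_RANGE entry for partition 201901
ALTER TABLE t2 REPLACE PARTITION 201901 FROM t1;

-- replica 2: the part is still referenced by the queued REPLACE_RANGE,
-- so the drop now fails with PART_IS_TEMPORARILY_LOCKED
ALTER TABLE t2 DROP PART '201901_1_1_0';
```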
diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 6bd2c1922f5..bdb4c392c48 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -49,8 +49,6 @@ using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>; struct PrewhereInfo { - /// Actions which are executed in order to alias columns are used for prewhere actions. - ActionsDAGPtr alias_actions; /// Actions for row level security filter. Applied separately before prewhere_actions. /// These actions are separate because the prewhere condition should not be executed over filtered rows. ActionsDAGPtr row_level_filter; diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index bb7f58048b1..4c962f36e4f 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -353,15 +353,6 @@ void StorageBuffer::read( if (query_info.prewhere_info) { auto actions_settings = ExpressionActionsSettings::fromContext(local_context); - if (query_info.prewhere_info->alias_actions) - { - pipe_from_buffers.addSimpleTransform([&](const Block & header) - { - return std::make_shared<ExpressionTransform>( - header, - std::make_shared<ExpressionActions>(query_info.prewhere_info->alias_actions, actions_settings)); - }); - } if (query_info.prewhere_info->row_level_filter) { diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 054eac9d2b6..1c785df9be4 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -691,13 +691,25 @@ void StorageDistributed::read( storage_snapshot, processed_stage); - ClusterProxy::executeQuery( - query_plan, header, processed_stage, - main_table, remote_table_function_ptr, - select_stream_factory, log, modified_query_ast, - local_context, query_info, - sharding_key_expr, sharding_key_column_name, - query_info.cluster); + + const auto & settings = local_context->getSettingsRef(); + bool parallel_replicas = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas && !settings.use_hedged_requests; + + if (parallel_replicas) + ClusterProxy::executeQueryWithParallelReplicas( + query_plan, main_table, remote_table_function_ptr, + select_stream_factory, modified_query_ast, + local_context, query_info, + sharding_key_expr, sharding_key_column_name, + query_info.cluster); + else + ClusterProxy::executeQuery( + query_plan, header, processed_stage, + main_table, remote_table_function_ptr, + select_stream_factory, log, modified_query_ast, + local_context, query_info, + sharding_key_expr, sharding_key_column_name, + query_info.cluster);
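The choice between the two paths is driven entirely by the three settings checked above. As a sketch, a query like the following (the table name is a placeholder) would take the new executeQueryWithParallelReplicas branch:

```sql
SELECT count()
FROM distributed_table
SETTINGS
    max_parallel_replicas = 3,
    allow_experimental_parallel_reading_from_replicas = 1,
    use_hedged_requests = 0;
```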
/// This is a bug, it is possible only when there are no shards to query, and this is handled earlier. if (!query_plan.isInitialized()) @@ -1504,4 +1516,3 @@ void registerStorageDistributed(StorageFactory & factory) } } - diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index d3cbfc380e6..d466096c8ba 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -813,7 +813,9 @@ public: void onException() override { - write_buf->finalize(); + if (!writer) + return; + onFinish(); } void onFinish() override diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index c744f880fd7..f10838c8148 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -7111,6 +7111,13 @@ bool StorageReplicatedMergeTree::dropPartImpl( return false; } + if (merge_pred.partParticipatesInReplaceRange(part, &out_reason)) + { + if (throw_if_noop) + throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, out_reason); + return false; + } + if (partIsLastQuorumPart(part->info)) { if (throw_if_noop) diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 0ab3333c7f1..f524a405c9b 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -603,7 +603,9 @@ public: void onException() override { - write_buf->finalize(); + if (!writer) + return; + onFinish(); } void onFinish() override diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 6fb6204cee8..cd55c32fb9c 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -445,14 +445,25 @@ void StorageURLSink::consume(Chunk chunk) void StorageURLSink::onException() { - write_buf->finalize(); + if (!writer) + return; + onFinish(); } void StorageURLSink::onFinish() { - writer->finalize(); - writer->flush(); - write_buf->finalize(); + try + { + writer->finalize(); + writer->flush(); + write_buf->finalize(); + } + catch (...) + { + /// Stop ParallelFormattingOutputFormat correctly. + writer.reset(); + throw; + } } class PartitionedStorageURLSink : public PartitionedSink diff --git a/src/Storages/System/StorageSystemZooKeeper.cpp b/src/Storages/System/StorageSystemZooKeeper.cpp index 4ba6c00ad9d..1212d9da60a 100644 --- a/src/Storages/System/StorageSystemZooKeeper.cpp +++ b/src/Storages/System/StorageSystemZooKeeper.cpp @@ -16,8 +16,12 @@ #include #include #include +#include +#include +#include #include #include +#include namespace DB @@ -28,6 +32,157 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +/** ZkNodeCache is a trie used to cache all the ZooKeeper writes. The purpose of this struct is to avoid creating/setting nodes + * repeatedly. For example, if we create the paths /a/b/c/d/e and /a/b/d/f in the same transaction, we don't want to create + * their common path "/a/b" twice. This data structure caches these changes and generates the eventual requests in one pass. + */ +struct ZkNodeCache +{ + using ZkNodeCachePtr = std::shared_ptr<ZkNodeCache>; + + std::unordered_map<String, ZkNodeCachePtr> children; + String value; + String path; + bool exists; + bool changed; + + ZkNodeCache() : exists(true), changed(false) { } + ZkNodeCache(String path_, bool exists_) : path(path_), exists(exists_), changed(false) { } + + void insert(const std::vector<String> & nodes, zkutil::ZooKeeperPtr zookeeper, const String & value_to_set, size_t index) + { + /// If this node has an empty name, just skip it. + /// A path like "/a//b///c//d/" will produce empty node names.
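+ /// For example, boost::split of "/a//b/" by '/' yields ["", "a", "", "b", ""].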
+ while (index < nodes.size() && nodes[index].empty()) + ++index; + + if (index == nodes.size()) + { + value = value_to_set; + changed = true; + return; + } + const String & child_name = nodes[index]; + ++index; + if (!children.contains(child_name)) + { + String sub_path = path + "/" + child_name; + bool child_exist = false; + if (exists) + { + /// If this node doesn't exist, neither will its children. + child_exist = zookeeper->exists(sub_path); + } + children[child_name] = std::make_shared<ZkNodeCache>(sub_path, child_exist); + } + children[child_name]->insert(nodes, zookeeper, value_to_set, index); + } + + void generateRequests(Coordination::Requests & requests) + { + /** If the node doesn't exist, we should generate a create request. + * If the node exists, we should generate a set request. + * This DFS guarantees that ancestor nodes are processed before their descendants. + */ + if (!exists) + { + auto request = zkutil::makeCreateRequest(path, value, zkutil::CreateMode::Persistent); + requests.push_back(request); + } + else if (changed) + { + auto request = zkutil::makeSetRequest(path, value, -1); + requests.push_back(request); + } + for (const auto & [_, child] : children) + child->generateRequests(requests); + } +}; + +class ZooKeeperSink : public SinkToStorage +{ + zkutil::ZooKeeperPtr zookeeper; + + ZkNodeCache cache; + +public: + ZooKeeperSink(const Block & header, ContextPtr context) : SinkToStorage(header), zookeeper(context->getZooKeeper()) { } + String getName() const override { return "ZooKeeperSink"; } + + void consume(Chunk chunk) override + { + auto block = getHeader().cloneWithColumns(chunk.getColumns()); + size_t rows = block.rows(); + for (size_t i = 0; i < rows; i++) + { + String name = block.getByPosition(0).column->getDataAt(i).toString(); + String value = block.getByPosition(1).column->getDataAt(i).toString(); + String path = block.getByPosition(2).column->getDataAt(i).toString(); + + /// We don't expect `name` to contain a path. + if (name.find('/') != std::string::npos) + { + throw Exception("Column `name` should not contain '/'", ErrorCodes::BAD_ARGUMENTS); + } + + if (name.empty()) + { + throw Exception("Column `name` should not be empty", ErrorCodes::BAD_ARGUMENTS); + } + + if (path.empty()) + { + throw Exception("Column `path` should not be empty", ErrorCodes::BAD_ARGUMENTS); + } + + if (path.size() + name.size() > PATH_MAX) + { + throw Exception("Sum of `name` length and `path` length should not exceed PATH_MAX", ErrorCodes::BAD_ARGUMENTS); + } + + std::vector<String> path_vec; + boost::split(path_vec, path, boost::is_any_of("/")); + path_vec.push_back(name); + cache.insert(path_vec, zookeeper, value, 0); + } + } + + void onFinish() override + { + Coordination::Requests requests; + cache.generateRequests(requests); + zookeeper->multi(requests); + } +}; + +StorageSystemZooKeeper::StorageSystemZooKeeper(const StorageID & table_id_) + : IStorageSystemOneBlock<StorageSystemZooKeeper>(table_id_) +{ + StorageInMemoryMetadata storage_metadata; + ColumnsDescription desc; + auto columns = getNamesAndTypes(); + for (const auto & col : columns) + { + ColumnDescription col_desc(col.name, col.type); + /// Only the columns `name`, `path` and `value` are allowed for INSERT.
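+ /// A MATERIALIZED column cannot be listed in an INSERT, so every other column becomes read-only here.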
+ if (col.name != "name" && col.name != "path" && col.name != "value") + col_desc.default_desc.kind = ColumnDefaultKind::Materialized; + desc.add(col_desc); + } + storage_metadata.setColumns(desc); + setInMemoryMetadata(storage_metadata); +} + +SinkToStoragePtr StorageSystemZooKeeper::write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context) +{ + if (!context->getConfigRef().getBool("allow_zookeeper_write", false)) + throw Exception("Prohibit writing to system.zookeeper, unless config `allow_zookeeper_write` as true", ErrorCodes::BAD_ARGUMENTS); + Block write_header; + write_header.insert(ColumnWithTypeAndName(std::make_shared(), "name")); + write_header.insert(ColumnWithTypeAndName(std::make_shared(), "value")); + write_header.insert(ColumnWithTypeAndName(std::make_shared(), "path")); + return std::make_shared(write_header, context); +} NamesAndTypesList StorageSystemZooKeeper::getNamesAndTypes() { diff --git a/src/Storages/System/StorageSystemZooKeeper.h b/src/Storages/System/StorageSystemZooKeeper.h index 32ca767ac24..20ad29af481 100644 --- a/src/Storages/System/StorageSystemZooKeeper.h +++ b/src/Storages/System/StorageSystemZooKeeper.h @@ -14,10 +14,14 @@ class Context; class StorageSystemZooKeeper final : public IStorageSystemOneBlock { public: + explicit StorageSystemZooKeeper(const StorageID & table_id_); + std::string getName() const override { return "SystemZooKeeper"; } static NamesAndTypesList getNamesAndTypes(); + SinkToStoragePtr write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override; + protected: using IStorageSystemOneBlock::IStorageSystemOneBlock; diff --git a/tests/config/config.d/zookeeper_write.xml b/tests/config/config.d/zookeeper_write.xml new file mode 100644 index 00000000000..ce484261aba --- /dev/null +++ b/tests/config/config.d/zookeeper_write.xml @@ -0,0 +1,3 @@ + + true + diff --git a/tests/config/install.sh b/tests/config/install.sh index fb015b4f931..e4a4fdacdd0 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -16,6 +16,7 @@ mkdir -p $DEST_SERVER_PATH/users.d/ mkdir -p $DEST_CLIENT_PATH ln -sf $SRC_PATH/config.d/zookeeper.xml $DEST_SERVER_PATH/config.d/ +ln -sf $SRC_PATH/config.d/zookeeper_write.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/listen.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/text_log.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/custom_settings_prefixes.xml $DEST_SERVER_PATH/config.d/ diff --git a/tests/integration/ci-runner.py b/tests/integration/ci-runner.py index 9ced81d73f0..e4bd1be9027 100755 --- a/tests/integration/ci-runner.py +++ b/tests/integration/ci-runner.py @@ -117,6 +117,7 @@ def get_counters(fname): # Lines like: # [gw0] [ 7%] ERROR test_mysql_protocol/test.py::test_golang_client + # [gw3] [ 40%] PASSED test_replicated_users/test.py::test_rename_replicated[QUOTA] state = line_arr[-2] test_name = line_arr[-1] @@ -941,6 +942,16 @@ class ClickhouseIntegrationTestsRunner: if "(memory)" in self.params["context_name"]: result_state = "success" + for res in test_result: + # It's not easy to parse output of pytest + # Especially when test names may contain spaces + # Do not allow it to avoid obscure failures + if " " not in res[0]: + continue + logging.warning("Found invalid test name with space: %s", res[0]) + status_text = "Found test with invalid name, see main log" + result_state = "failure" + return result_state, status_text, test_result, [] diff --git a/tests/integration/helpers/cluster.py 
b/tests/integration/helpers/cluster.py index 6716157082c..f8ad9213e5b 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -16,12 +16,6 @@ import traceback import urllib.parse import shlex import urllib3 -from cassandra.policies import RoundRobinPolicy -import cassandra.cluster -import psycopg2 -import pymongo -import meilisearch -import pymysql import requests try: @@ -34,6 +28,7 @@ try: from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import pymongo import pymysql + import meilisearch from confluent_kafka.avro.cached_schema_registry_client import ( CachedSchemaRegistryClient, ) diff --git a/tests/integration/test_mysql_database_engine/test.py b/tests/integration/test_mysql_database_engine/test.py index 3e0a1a549d1..8626980a768 100644 --- a/tests/integration/test_mysql_database_engine/test.py +++ b/tests/integration/test_mysql_database_engine/test.py @@ -930,6 +930,12 @@ def test_predefined_connection_configuration(started_cluster): == "100" ) + result = clickhouse_node.query("show create table test_database.test_table") + assert ( + result.strip() + == "CREATE TABLE test_database.test_table\\n(\\n `id` Int32\\n)\\nENGINE = MySQL(mysql1, table = \\'test_table\\')" + ) + clickhouse_node.query("DROP DATABASE test_database") clickhouse_node.query_and_get_error( "CREATE DATABASE test_database ENGINE = MySQL(mysql2)" diff --git a/tests/integration/test_replicated_users/test.py b/tests/integration/test_replicated_users/test.py index add45d262e6..56383f0d2df 100644 --- a/tests/integration/test_replicated_users/test.py +++ b/tests/integration/test_replicated_users/test.py @@ -41,7 +41,7 @@ entities = [ def get_entity_id(entity): - return entity.keyword + return entity.keyword.replace(" ", "_") @pytest.mark.parametrize("entity", entities, ids=get_entity_id) diff --git a/tests/queries/0_stateless/00814_replicated_minimalistic_part_header_zookeeper.sql b/tests/queries/0_stateless/00814_replicated_minimalistic_part_header_zookeeper.sh old mode 100644 new mode 100755 similarity index 64% rename from tests/queries/0_stateless/00814_replicated_minimalistic_part_header_zookeeper.sql rename to tests/queries/0_stateless/00814_replicated_minimalistic_part_header_zookeeper.sh index 5d03823dde3..6f609065c01 --- a/tests/queries/0_stateless/00814_replicated_minimalistic_part_header_zookeeper.sql +++ b/tests/queries/0_stateless/00814_replicated_minimalistic_part_header_zookeeper.sh @@ -1,4 +1,13 @@ --- Tags: replica +#!/usr/bin/env bash +# Tags: replica + +set -e + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. 
"$CURDIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -nm -q " DROP TABLE IF EXISTS part_header_r1; DROP TABLE IF EXISTS part_header_r2; @@ -6,13 +15,13 @@ DROP TABLE IF EXISTS part_header_r2; SET replication_alter_partitions_sync = 2; CREATE TABLE part_header_r1(x UInt32, y UInt32) - ENGINE ReplicatedMergeTree('/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/{shard}', '1{replica}') ORDER BY x + ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/{shard}', '1{replica}') ORDER BY x SETTINGS use_minimalistic_part_header_in_zookeeper = 0, old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 0; CREATE TABLE part_header_r2(x UInt32, y UInt32) - ENGINE ReplicatedMergeTree('/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/{shard}', '2{replica}') ORDER BY x + ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/{shard}', '2{replica}') ORDER BY x SETTINGS use_minimalistic_part_header_in_zookeeper = 1, old_parts_lifetime = 1, cleanup_delay_period = 0, @@ -36,15 +45,26 @@ SELECT _part, x FROM part_header_r1 ORDER BY x; SELECT '*** replica 2 ***'; SELECT _part, x FROM part_header_r2 ORDER BY x; -SELECT sleep(3) FORMAT Null; +" + +elapsed=1 +until [ $elapsed -eq 5 ]; +do + sleep $(( elapsed++ )) + count1=$($CLICKHOUSE_CLIENT --query="SELECT count(name) FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/s1/replicas/1r1/parts'") + count2=$($CLICKHOUSE_CLIENT --query="SELECT count(name) FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/s1/replicas/2r1/parts'") + [[ $count1 == 1 && $count2 == 1 ]] && break +done + +$CLICKHOUSE_CLIENT -nm -q " SELECT '*** Test part removal ***'; SELECT '*** replica 1 ***'; SELECT name FROM system.parts WHERE active AND database = currentDatabase() AND table = 'part_header_r1'; -SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/s1/replicas/1r1/parts'; +SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/s1/replicas/1r1/parts'; SELECT '*** replica 2 ***'; SELECT name FROM system.parts WHERE active AND database = currentDatabase() AND table = 'part_header_r2'; -SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/s1/replicas/1r1/parts'; +SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/s1/replicas/2r1/parts'; SELECT '*** Test ALTER ***'; ALTER TABLE part_header_r1 MODIFY COLUMN y String; @@ -63,3 +83,5 @@ SELECT x, length(y) FROM part_header_r2 ORDER BY x; DROP TABLE part_header_r1; DROP TABLE part_header_r2; + +" diff --git a/tests/queries/0_stateless/00900_orc_load.sh b/tests/queries/0_stateless/00900_orc_load.sh index b3f2c39e5d2..62149fa554e 100755 --- a/tests/queries/0_stateless/00900_orc_load.sh +++ b/tests/queries/0_stateless/00900_orc_load.sh @@ -5,16 +5,13 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . 
"$CUR_DIR"/../shell_config.sh -DATA_FILE=$CUR_DIR/data_orc/test.orc - ${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS orc_load" ${CLICKHOUSE_CLIENT} --query="CREATE TABLE orc_load (int Int32, smallint Int8, bigint Int64, float Float32, double Float64, date Date, y String, datetime64 DateTime64(3)) ENGINE = Memory" ${CLICKHOUSE_CLIENT} --query="insert into orc_load values (0, 0, 0, 0, 0, '2019-01-01', 'test1', toDateTime64('2019-01-01 02:03:04.567', 3)), (2147483647, -1, 9223372036854775806, 123.345345, 345345.3453451212, '2019-01-01', 'test2', toDateTime64('2019-01-01 02:03:04.567', 3))" -${CLICKHOUSE_CLIENT} --query="select * from orc_load FORMAT ORC" > $DATA_FILE +${CLICKHOUSE_CLIENT} --query="select * from orc_load FORMAT ORC" > "${CLICKHOUSE_TMP}"/test.orc ${CLICKHOUSE_CLIENT} --query="truncate table orc_load" -cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "insert into orc_load format ORC" -timeout 3 ${CLICKHOUSE_CLIENT} -q "insert into orc_load format ORC" < $DATA_FILE +cat "${CLICKHOUSE_TMP}"/test.orc | ${CLICKHOUSE_CLIENT} -q "insert into orc_load format ORC" +timeout 3 ${CLICKHOUSE_CLIENT} -q "insert into orc_load format ORC" < "${CLICKHOUSE_TMP}"/test.orc ${CLICKHOUSE_CLIENT} --query="select * from orc_load" ${CLICKHOUSE_CLIENT} --query="drop table orc_load" -rm -rf "$DATA_FILE" diff --git a/tests/queries/0_stateless/01921_datatype_date32.reference b/tests/queries/0_stateless/01921_datatype_date32.reference index a6b03123b33..70eebc76c01 100644 --- a/tests/queries/0_stateless/01921_datatype_date32.reference +++ b/tests/queries/0_stateless/01921_datatype_date32.reference @@ -283,3 +283,9 @@ 1925-01-01 \N 1925-01-01 \N +1925-01-01 +1969-12-31 +1970-01-01 +2149-06-06 +2149-06-07 +2283-11-11 diff --git a/tests/queries/0_stateless/01921_datatype_date32.sql b/tests/queries/0_stateless/01921_datatype_date32.sql index 0805b94a9fe..ef6e3e5ee89 100644 --- a/tests/queries/0_stateless/01921_datatype_date32.sql +++ b/tests/queries/0_stateless/01921_datatype_date32.sql @@ -118,4 +118,15 @@ select toDate32OrZero('1924-01-01'), toDate32OrNull('1924-01-01'); select toDate32OrZero(''), toDate32OrNull(''); select (select toDate32OrZero('')); select (select toDate32OrNull('')); +SELECT toString(T.d) dateStr +FROM + ( + SELECT '1925-01-01'::Date32 d + UNION ALL SELECT '1969-12-31'::Date32 + UNION ALL SELECT '1970-01-01'::Date32 + UNION ALL SELECT '2149-06-06'::Date32 + UNION ALL SELECT '2149-06-07'::Date32 + UNION ALL SELECT '2283-11-11'::Date32 + ) AS T +ORDER BY T.d diff --git a/tests/queries/0_stateless/02293_hashid.reference b/tests/queries/0_stateless/02293_hashid.reference index 9ae4cce3944..f36b1500288 100644 --- a/tests/queries/0_stateless/02293_hashid.reference +++ b/tests/queries/0_stateless/02293_hashid.reference @@ -8,4 +8,5 @@ 2 obmgndljgajpkeao 3 dldokmpjpgjgeanb 4 nkdlpgajngjnobme -YQrvD5XGvbx +xkOpDGxQpVB +jR diff --git a/tests/queries/0_stateless/02293_hashid.sql b/tests/queries/0_stateless/02293_hashid.sql index 145bd76ccbf..45aaefe7356 100644 --- a/tests/queries/0_stateless/02293_hashid.sql +++ b/tests/queries/0_stateless/02293_hashid.sql @@ -3,3 +3,5 @@ SET allow_experimental_hash_functions = 1; select number, hashid(number) from system.numbers limit 5; select number, hashid(number, 's3cr3t', 16, 'abcdefghijklmnop') from system.numbers limit 5; select hashid(1234567890123456, 's3cr3t'); + +SELECT hashid(1, hashid(2)); diff --git a/tests/queries/0_stateless/02311_system_zookeeper_insert.reference b/tests/queries/0_stateless/02311_system_zookeeper_insert.reference new file mode 
100644 index 00000000000..d8a35d57e07 --- /dev/null +++ b/tests/queries/0_stateless/02311_system_zookeeper_insert.reference @@ -0,0 +1,27 @@ +/default/1-insert-testc c +/default/1-insert-testc/c c +/default/1-insert-testc/c/c c +/default/1-insert-testc/c/c d +/default/1-insert-testc/c/c e +/default/1-insert-testc/c/c f +/default/1-insert-testc/c/c kk +/default/1-insert-testc/c/c/c c +/default/1-insert-testc/c/c/c/c c +/default/1-insert-testc/c/c/c/c/c c +/default/1-insert-testc/c/c/c/c/c/c c 9 +/default/1-insert-testc/c/c/c/c/c/c/c c 10 +/default/1-insert-testc/c/c/d e 10 +/default/1-insert-testc/c/c/d f 11 +/default/1-insert-testc/c/c/d g 12 +/default/1-insert-testc/c/c/e g 13 +/default/1-insert-testc/c/c/f g 14 +/default/1-insert-testc/c/c/kk g 14 +------------------------- +/default/2-insert-testx testb z +/default/2-insert-testx testc x +/default/2-insert-testx testz y +/default/2-insert-testz c +/default/2-insert-testz/c cd +/default/2-insert-testz/c/cd dd +/default/2-insert-testz/c/cd testc +/default/2-insert-testz/c/cd/dd testc y diff --git a/tests/queries/0_stateless/02311_system_zookeeper_insert.sql b/tests/queries/0_stateless/02311_system_zookeeper_insert.sql new file mode 100644 index 00000000000..e1c42278086 --- /dev/null +++ b/tests/queries/0_stateless/02311_system_zookeeper_insert.sql @@ -0,0 +1,43 @@ +-- Tags: zookeeper + +set allow_unrestricted_reads_from_keeper = 'true'; + +drop table if exists test_zkinsert; + +create table test_zkinsert ( + name String, + path String, + value String +) ENGINE Memory; + +-- test recursive create and big transaction +insert into test_zkinsert (name, path, value) values ('c', '/1-insert-testc/c/c/c/c/c/c', 11), ('e', '/1-insert-testc/c/c/d', 10), ('c', '/1-insert-testc/c/c/c/c/c/c/c', 10), ('c', '/1-insert-testc/c/c/c/c/c/c', 9), ('f', '/1-insert-testc/c/c/d', 11), ('g', '/1-insert-testc/c/c/d', 12), ('g', '/1-insert-testc/c/c/e', 13), ('g', '/1-insert-testc/c/c/f', 14), ('g', '/1-insert-testc/c/c/kk', 14); +-- inserting the same values is supposed to have no side effects +insert into system.zookeeper (name, path, value) SELECT name, '/' || currentDatabase() || path, value from test_zkinsert; + +SELECT * FROM (SELECT path, name, value FROM system.zookeeper ORDER BY path, name) WHERE path LIKE '/' || currentDatabase() || '/1-insert-test%'; + +SELECT '-------------------------'; + +-- test inserting into root path
insert into test_zkinsert (name, path, value) values ('testc', '/2-insert-testx', 'x'); +insert into test_zkinsert (name, path, value) values ('testz', '/2-insert-testx', 'y'); +insert into test_zkinsert (name, path, value) values ('testc', '/2-insert-testz//c/cd/dd//', 'y'); +insert into test_zkinsert (name, path) values ('testc', '/2-insert-testz//c/cd/'); +insert into test_zkinsert (name, value, path) values ('testb', 'z', '/2-insert-testx'); + +insert into system.zookeeper (name, path, value) SELECT name, '/' || currentDatabase() || path, value from test_zkinsert; + +SELECT * FROM (SELECT path, name, value FROM system.zookeeper ORDER BY path, name) WHERE path LIKE '/' || currentDatabase() || '/2-insert-test%'; + +-- test exceptions +insert into system.zookeeper (name, value) values ('abc', 'y'); -- { serverError 36 } +insert into system.zookeeper (path, value) values ('a/b/c', 'y'); -- { serverError 36 } +insert into system.zookeeper (name, version) values ('abc', 111); -- { serverError 44 } +insert into system.zookeeper (name, versionxyz) values ('abc', 111); -- { serverError 16 } +insert into system.zookeeper (name, path, value) values
('a/b/c', '/', 'y'); -- { serverError 36 } +insert into system.zookeeper (name, path, value) values ('/', '/a/b/c', 'z'); -- { serverError 36 } +insert into system.zookeeper (name, path, value) values ('', '/', 'y'); -- { serverError 36 } +insert into system.zookeeper (name, path, value) values ('abc', '/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/a
bc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc', 'y'); -- { serverError 36 } + +drop table if exists test_zkinsert; diff --git a/tests/queries/0_stateless/02311_system_zookeeper_insert_priv.reference b/tests/queries/0_stateless/02311_system_zookeeper_insert_priv.reference new file mode 100644 index 00000000000..d00491fd7e5 --- /dev/null +++ b/tests/queries/0_stateless/02311_system_zookeeper_insert_priv.reference @@ -0,0 +1 @@ +1 diff --git a/tests/queries/0_stateless/02311_system_zookeeper_insert_priv.sh b/tests/queries/0_stateless/02311_system_zookeeper_insert_priv.sh new file mode 100755 index 00000000000..24a1f7e7c39 --- /dev/null +++ b/tests/queries/0_stateless/02311_system_zookeeper_insert_priv.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash +# Tags: no-parallel + + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + + +${CLICKHOUSE_CLIENT_BINARY} --query "drop user if exists u_02311" +${CLICKHOUSE_CLIENT_BINARY} --query "create user u_02311" +error="$(${CLICKHOUSE_CLIENT_BINARY} --user=u_02311 --query "insert into system.zookeeper (path, name, value) values ('//3-insert-testc/c/c/kk', 'kk', '11')" 2>&1 > /dev/null)" +echo "${error}" | grep -Fc "ACCESS_DENIED" + +${CLICKHOUSE_CLIENT_BINARY} --query "drop user u_02311" diff --git a/tests/queries/0_stateless/02312_is_not_null_prewhere.reference b/tests/queries/0_stateless/02312_is_not_null_prewhere.reference new file mode 100644 index 00000000000..bdaa7374c1b --- /dev/null +++ b/tests/queries/0_stateless/02312_is_not_null_prewhere.reference @@ -0,0 +1,3 @@ +2022-01-01 00:00:00 1 +2022-01-01 00:00:00 1 +2022-01-01 00:00:00 1 diff --git a/tests/queries/0_stateless/02312_is_not_null_prewhere.sql b/tests/queries/0_stateless/02312_is_not_null_prewhere.sql new file mode 100644 index 00000000000..56371d0ec6c --- /dev/null +++ b/tests/queries/0_stateless/02312_is_not_null_prewhere.sql @@ -0,0 +1,21 @@ +DROP TABLE IF EXISTS bug_36995; + +CREATE TABLE bug_36995( + `time` DateTime, + `product` String) +ENGINE = MergeTree +ORDER BY time AS +SELECT '2022-01-01 00:00:00','1'; + +SELECT * FROM bug_36995 +WHERE (time IS NOT NULL) AND (product IN (SELECT '1')) +SETTINGS optimize_move_to_prewhere = 1; + +SELECT * FROM bug_36995 +WHERE (time IS NOT NULL) AND (product IN (SELECT '1')) +SETTINGS optimize_move_to_prewhere = 0; + +SELECT * FROM bug_36995 +PREWHERE (time IS NOT NULL) WHERE (product IN (SELECT '1')); + +DROP TABLE bug_36995; diff --git a/tests/queries/0_stateless/02316_const_string_intersact.sql b/tests/queries/0_stateless/02316_const_string_intersact.sql index ace3c8d03c5..18af398aa5d 100644 --- a/tests/queries/0_stateless/02316_const_string_intersact.sql +++ 
b/tests/queries/0_stateless/02316_const_string_intersact.sql @@ -1 +1,3 @@ +-- Tags: no-backward-compatibility-check + SELECT 'Play ClickHouse' InterSect SELECT 'Play ClickHouse' diff --git a/tests/queries/0_stateless/02316_values_table_func_bug.reference b/tests/queries/0_stateless/02316_values_table_func_bug.reference new file mode 100644 index 00000000000..63f5d8d96c8 --- /dev/null +++ b/tests/queries/0_stateless/02316_values_table_func_bug.reference @@ -0,0 +1,2 @@ +[1,2.2] +[[1,2,3],[1.1,2.2,3.3]] diff --git a/tests/queries/0_stateless/02316_values_table_func_bug.sql b/tests/queries/0_stateless/02316_values_table_func_bug.sql new file mode 100644 index 00000000000..7c66cf125e1 --- /dev/null +++ b/tests/queries/0_stateless/02316_values_table_func_bug.sql @@ -0,0 +1,2 @@ +select * from values([1, 2.2]); +select * from values([[1, 2, 3], [1.1, 2.2, 3.3]]); diff --git a/tests/queries/1_stateful/00168_parallel_processing_on_replicas_part_1.sh b/tests/queries/1_stateful/00168_parallel_processing_on_replicas_part_1.sh index bbb5d903ea9..ecd0d281b53 100755 --- a/tests/queries/1_stateful/00168_parallel_processing_on_replicas_part_1.sh +++ b/tests/queries/1_stateful/00168_parallel_processing_on_replicas_part_1.sh @@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # All replicas are localhost, disable `prefer_localhost_replica` option to test network interface # Currently this feature could not work with hedged requests # Enabling `enable_sample_offset_parallel_processing` feature could lead to intersecting marks, so some of them would be thrown away and it will lead to incorrect result of SELECT query -SETTINGS="--max_parallel_replicas=3 --use_hedged_requests=false --async_socket_for_remote=false --allow_experimental_parallel_reading_from_replicas=true" +SETTINGS="--max_parallel_replicas=3 --use_hedged_requests=false --allow_experimental_parallel_reading_from_replicas=true" # Prepare tables $CLICKHOUSE_CLIENT $SETTINGS -nm -q '''