Merge branch 'master' into refactor-read-metrics-and-callbacks

Nikolai Kochetov 2022-06-02 17:00:08 +00:00
commit 8991f39412
70 changed files with 961 additions and 330 deletions

View File

@ -81,7 +81,6 @@ jobs:
cat >> "$GITHUB_ENV" << 'EOF'
BUILD_NAME=coverity
CACHES_PATH=${{runner.temp}}/../ccaches
CHECK_NAME=ClickHouse build check (actions)
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
TEMP_PATH=${{runner.temp}}/build_check
@ -99,13 +98,15 @@ jobs:
id: coverity-checkout
uses: actions/checkout@v2
with:
submodules: 'true'
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
git -C "$GITHUB_WORKSPACE" submodule sync
git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --init --jobs=10
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$CHECK_NAME" "$BUILD_NAME"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload Coverity Analysis
if: ${{ success() || failure() }}
run: |

contrib/libxml2 vendored

@ -1 +1 @@
Subproject commit a075d256fd9ff15590b86d981b75a50ead124fca
Subproject commit 7846b0a677f8d3ce72486125fa281e92ac9970e8

View File

@ -352,7 +352,7 @@ Elements set to `NULL` are handled as normal values.
## arrayCount(\[func,\] arr1, …) {#array-count}
Returns the number of elements in the arr array for which func returns something other than 0. If func is not specified, it returns the number of non-zero elements in the array.
Returns the number of elements for which `func(arr1[i], …, arrN[i])` returns something other than 0. If `func` is not specified, it returns the number of non-zero elements in the array.
Note that the `arrayCount` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You can pass a lambda function to it as the first argument.
@ -1244,7 +1244,7 @@ Result:
## arrayMap(func, arr1, …) {#array-map}
Returns an array obtained from the original application of the `func` function to each element in the `arr` array.
Returns an array obtained from the original arrays by application of `func(arr1[i], …, arrN[i])` for each element. Arrays `arr1` … `arrN` must have the same number of elements.
Examples:
@ -1274,7 +1274,7 @@ Note that the `arrayMap` is a [higher-order function](../../sql-reference/functi
## arrayFilter(func, arr1, …) {#array-filter}
Returns an array containing only the elements in `arr1` for which `func` returns something other than 0.
Returns an array containing only the elements in `arr1` for which `func(arr1[i], …, arrN[i])` returns something other than 0.
Examples:
@ -1307,7 +1307,7 @@ Note that the `arrayFilter` is a [higher-order function](../../sql-reference/fun
## arrayFill(func, arr1, …) {#array-fill}
Scan through `arr1` from the first element to the last element and replace `arr1[i]` by `arr1[i - 1]` if `func` returns 0. The first element of `arr1` will not be replaced.
Scan through `arr1` from the first element to the last element and replace `arr1[i]` by `arr1[i - 1]` if `func(arr1[i], …, arrN[i])` returns 0. The first element of `arr1` will not be replaced.
Examples:
@ -1325,7 +1325,7 @@ Note that the `arrayFill` is a [higher-order function](../../sql-reference/funct
## arrayReverseFill(func, arr1, …) {#array-reverse-fill}
Scan through `arr1` from the last element to the first element and replace `arr1[i]` by `arr1[i + 1]` if `func` returns 0. The last element of `arr1` will not be replaced.
Scan through `arr1` from the last element to the first element and replace `arr1[i]` by `arr1[i + 1]` if `func(arr1[i], …, arrN[i])` returns 0. The last element of `arr1` will not be replaced.
Examples:
@ -1343,7 +1343,7 @@ Note that the `arrayReverseFill` is a [higher-order function](../../sql-referenc
## arraySplit(func, arr1, …) {#array-split}
Split `arr1` into multiple arrays. When `func` returns something other than 0, the array will be split on the left hand side of the element. The array will not be split before the first element.
Split `arr1` into multiple arrays. When `func(arr1[i], …, arrN[i])` returns something other than 0, the array will be split on the left hand side of the element. The array will not be split before the first element.
Examples:
@ -1361,7 +1361,7 @@ Note that the `arraySplit` is a [higher-order function](../../sql-reference/func
## arrayReverseSplit(func, arr1, …) {#array-reverse-split}
Split `arr1` into multiple arrays. When `func` returns something other than 0, the array will be split on the right hand side of the element. The array will not be split after the last element.
Split `arr1` into multiple arrays. When `func(arr1[i], …, arrN[i])` returns something other than 0, the array will be split on the right hand side of the element. The array will not be split after the last element.
Examples:
@ -1379,37 +1379,37 @@ Note that the `arrayReverseSplit` is a [higher-order function](../../sql-referen
## arrayExists(\[func,\] arr1, …) {#arrayexistsfunc-arr1}
Returns 1 if there is at least one element in `arr` for which `func` returns something other than 0. Otherwise, it returns 0.
Returns 1 if there is at least one element in `arr` for which `func(arr1[i], …, arrN[i])` returns something other than 0. Otherwise, it returns 0.
Note that the `arrayExists` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You can pass a lambda function to it as the first argument.
## arrayAll(\[func,\] arr1, …) {#arrayallfunc-arr1}
Returns 1 if `func` returns something other than 0 for all the elements in `arr`. Otherwise, it returns 0.
Returns 1 if `func(arr1[i], …, arrN[i])` returns something other than 0 for all the elements in arrays. Otherwise, it returns 0.
Note that the `arrayAll` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You can pass a lambda function to it as the first argument.
## arrayFirst(func, arr1, …) {#array-first}
Returns the first element in the `arr1` array for which `func` returns something other than 0.
Returns the first element in the `arr1` array for which `func(arr1[i], …, arrN[i])` returns something other than 0.
Note that the `arrayFirst` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can't be omitted.
## arrayLast(func, arr1, …) {#array-last}
Returns the last element in the `arr1` array for which `func` returns something other than 0.
Returns the last element in the `arr1` array for which `func(arr1[i], …, arrN[i])` returns something other than 0.
Note that the `arrayLast` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can't be omitted.
## arrayFirstIndex(func, arr1, …) {#array-first-index}
Returns the index of the first element in the `arr1` array for which `func` returns something other than 0.
Returns the index of the first element in the `arr1` array for which `func(arr1[i], …, arrN[i])` returns something other than 0.
Note that the `arrayFirstIndex` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can't be omitted.
## arrayLastIndex(func, arr1, …) {#array-last-index}
Returns the index of the last element in the `arr1` array for which `func` returns something other than 0.
Returns the index of the last element in the `arr1` array for which `func(arr1[i], …, arrN[i])` returns something other than 0.
Note that the `arrayLastIndex` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it can't be omitted.
@ -1635,7 +1635,7 @@ Result:
## arrayCumSum(\[func,\] arr1, …) {#arraycumsumfunc-arr1}
Returns an array of partial sums of elements in the source array (a running sum). If the `func` function is specified, then the values of the array elements are converted by this function before summing.
Returns an array of partial sums of elements in the source array (a running sum). If the `func` function is specified, then the values of the array elements are converted by `func(arr1[i], …, arrN[i])` before summing.
Example:
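A few illustrative queries for the `func(arr1[i], …, arrN[i])` convention described in this hunk (a sketch based on the descriptions above, not taken from the diff):

```sql
-- arrayCumSum: running sum; with a lambda, elements are transformed before summing.
SELECT arrayCumSum([1, 1, 1, 1]) AS sums;                           -- [1,2,3,4]
SELECT arrayCumSum((x, y) -> x * y, [1, 2, 3], [10, 10, 10]) AS s;  -- [10,30,60]

-- arrayMap applies the lambda element-wise; the arrays must have the same length.
SELECT arrayMap((x, y) -> x + y, [1, 2, 3], [10, 20, 30]) AS mapped;            -- [11,22,33]

-- arrayFilter keeps the elements of arr1 for which the lambda returns non-zero.
SELECT arrayFilter((x, y) -> y != 0, ['a', 'b', 'c'], [1, 0, 1]) AS filtered;   -- ['a','c']
```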

View File

@ -695,7 +695,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, output_format_json_quote_denormals, false, "Enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format.", 0) \
\
M(Bool, output_format_json_escape_forward_slashes, true, "Controls escaping forward slashes for string outputs in JSON output format. This is intended for compatibility with JavaScript. Don't confuse with backslashes that are always escaped.", 0) \
M(Bool, output_format_json_named_tuples_as_objects, false, "Serialize named tuple columns as JSON objects.", 0) \
M(Bool, output_format_json_named_tuples_as_objects, true, "Serialize named tuple columns as JSON objects.", 0) \
M(Bool, output_format_json_array_of_rows, false, "Output a JSON array of all rows in JSONEachRow(Compact) format.", 0) \
\
M(UInt64, output_format_pretty_max_rows, 10000, "Rows limit for Pretty formats.", 0) \
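The default for `output_format_json_named_tuples_as_objects` flips to `true` here, so named tuples are serialized as nested JSON objects rather than arrays in the JSON* output formats. A hedged illustration of the effect:

```sql
SELECT CAST((1, 'a'), 'Tuple(x UInt8, s String)') AS t
FORMAT JSONEachRow;
-- with output_format_json_named_tuples_as_objects = 1 (new default): {"t":{"x":1,"s":"a"}}
-- with output_format_json_named_tuples_as_objects = 0 (old default): {"t":[1,"a"]}
```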

View File

@ -26,6 +26,7 @@
# include <Common/setThreadName.h>
# include <filesystem>
# include <Common/filesystemHelpers.h>
# include <Parsers/ASTIdentifier.h>
namespace fs = std::filesystem;
@ -148,8 +149,16 @@ ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const String & table_name, Context
auto storage_engine_arguments = ast_storage->engine->arguments;
/// Add table_name to engine arguments
auto mysql_table_name = std::make_shared<ASTLiteral>(table_name);
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name);
if (typeid_cast<ASTIdentifier *>(storage_engine_arguments->children[0].get()))
{
storage_engine_arguments->children.push_back(
makeASTFunction("equals", std::make_shared<ASTIdentifier>("table"), std::make_shared<ASTLiteral>(table_name)));
}
else
{
auto mysql_table_name = std::make_shared<ASTLiteral>(table_name);
storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name);
}
/// Unset settings
std::erase_if(storage_children, [&](const ASTPtr & element) { return element.get() == ast_storage->settings; });
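The branch above distinguishes the two ways the MySQL engine arguments can be written: if the first argument is an identifier (a named collection), the table is appended as a `table = '…'` key; otherwise the table name is inserted as the third positional literal. A rough sketch of the resulting definitions, where `mysql_creds` is a hypothetical named collection:

```sql
-- Positional arguments: the table name becomes the third literal argument.
CREATE TABLE mysql_db.t1 (n Int32)
ENGINE = MySQL('host:3306', 'database', 't1', 'user', 'password');

-- Named collection: the first argument is an identifier, so `table = 't1'` is appended instead.
CREATE TABLE mysql_db.t2 (n Int32)
ENGINE = MySQL(mysql_creds, table = 't1');
```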

View File

@ -51,9 +51,11 @@ public:
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() < 1)
if (arguments.empty())
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, "Function {} expects at least one argument", getName());
const auto & id_col = arguments[0];
@ -114,18 +116,16 @@ public:
const auto & numcolumn = arguments[0].column;
if (checkAndGetColumn<ColumnUInt8>(numcolumn.get()) || checkAndGetColumn<ColumnUInt16>(numcolumn.get())
|| checkAndGetColumn<ColumnUInt32>(numcolumn.get()) || checkAndGetColumn<ColumnUInt64>(numcolumn.get())
|| checkAndGetColumnConst<ColumnUInt8>(numcolumn.get()) || checkAndGetColumnConst<ColumnUInt16>(numcolumn.get())
|| checkAndGetColumnConst<ColumnUInt32>(numcolumn.get()) || checkAndGetColumnConst<ColumnUInt64>(numcolumn.get()))
|| checkAndGetColumn<ColumnUInt32>(numcolumn.get()) || checkAndGetColumn<ColumnUInt64>(numcolumn.get()))
{
std::string salt;
UInt8 minLength = 0;
UInt8 min_length = 0;
std::string alphabet;
if (arguments.size() >= 4)
{
const auto & alphabetcolumn = arguments[3].column;
if (auto alpha_col = checkAndGetColumnConst<ColumnString>(alphabetcolumn.get()))
if (const auto * alpha_col = checkAndGetColumnConst<ColumnString>(alphabetcolumn.get()))
{
alphabet = alpha_col->getValue<String>();
if (alphabet.find('\0') != std::string::npos)
@ -138,18 +138,18 @@ public:
if (arguments.size() >= 3)
{
const auto & minlengthcolumn = arguments[2].column;
if (auto min_length_col = checkAndGetColumnConst<ColumnUInt8>(minlengthcolumn.get()))
minLength = min_length_col->getValue<UInt8>();
if (const auto * min_length_col = checkAndGetColumnConst<ColumnUInt8>(minlengthcolumn.get()))
min_length = min_length_col->getValue<UInt8>();
}
if (arguments.size() >= 2)
{
const auto & saltcolumn = arguments[1].column;
if (auto salt_col = checkAndGetColumnConst<ColumnString>(saltcolumn.get()))
if (const auto * salt_col = checkAndGetColumnConst<ColumnString>(saltcolumn.get()))
salt = salt_col->getValue<String>();
}
hashidsxx::Hashids hash(salt, minLength, alphabet);
hashidsxx::Hashids hash(salt, min_length, alphabet);
auto col_res = ColumnString::create();

View File

@ -705,7 +705,7 @@ template <>
struct FormatImpl<DataTypeDate32>
{
template <typename ReturnType = void>
static ReturnType execute(const DataTypeDate::FieldType x, WriteBuffer & wb, const DataTypeDate32 *, const DateLUTImpl *)
static ReturnType execute(const DataTypeDate32::FieldType x, WriteBuffer & wb, const DataTypeDate32 *, const DateLUTImpl *)
{
writeDateText(ExtendedDayNum(x), wb);
return ReturnType(true);

View File

@ -49,7 +49,15 @@ ParallelReadBuffer::ParallelReadBuffer(
, schedule(std::move(schedule_))
, reader_factory(std::move(reader_factory_))
{
addReaders();
try
{
addReaders();
}
catch (const Exception &)
{
finishAndWait();
throw;
}
}
bool ParallelReadBuffer::addReaderToPool()

View File

@ -152,7 +152,7 @@ void WriteBufferFromS3::allocateBuffer()
WriteBufferFromS3::~WriteBufferFromS3()
{
#ifndef NDEBUG
if (!is_finalized)
if (!finalized)
{
LOG_ERROR(log, "WriteBufferFromS3 is not finalized in destructor. It's a bug");
std::terminate();
@ -200,8 +200,6 @@ void WriteBufferFromS3::finalizeImpl()
if (!multipart_upload_id.empty())
completeMultipartUpload();
is_finalized = true;
}
void WriteBufferFromS3::createMultipartUpload()

View File

@ -106,7 +106,6 @@ private:
std::vector<String> part_tags;
bool is_prefinalized = false;
bool is_finalized = false;
/// Following fields are for background uploads in thread pool (if specified).
/// We use std::function to avoid dependency of Interpreters

View File

@ -418,6 +418,8 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
if (address.is_local)
info.local_addresses.push_back(address);
info.all_addresses.push_back(address);
auto pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
address.host_name, address.port,
@ -485,6 +487,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
}
Addresses shard_local_addresses;
Addresses shard_all_addresses;
ConnectionPoolPtrs all_replicas_pools;
all_replicas_pools.reserve(replica_addresses.size());
@ -502,6 +505,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
all_replicas_pools.emplace_back(replica_pool);
if (replica.is_local)
shard_local_addresses.push_back(replica);
shard_all_addresses.push_back(replica);
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
@ -516,6 +520,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
current_shard_num,
weight,
std::move(shard_local_addresses),
std::move(shard_all_addresses),
std::move(shard_pool),
std::move(all_replicas_pools),
internal_replication
@ -571,6 +576,7 @@ Cluster::Cluster(
addresses_with_failover.emplace_back(current);
Addresses shard_local_addresses;
Addresses all_addresses;
ConnectionPoolPtrs all_replicas;
all_replicas.reserve(current.size());
@ -585,6 +591,7 @@ Cluster::Cluster(
all_replicas.emplace_back(replica_pool);
if (replica.is_local && !treat_local_as_remote)
shard_local_addresses.push_back(replica);
all_addresses.push_back(replica);
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
@ -597,6 +604,7 @@ Cluster::Cluster(
current_shard_num,
default_weight,
std::move(shard_local_addresses),
std::move(all_addresses),
std::move(shard_pool),
std::move(all_replicas),
false // has_internal_replication
@ -680,6 +688,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
if (address.is_local)
info.local_addresses.push_back(address);
info.all_addresses.push_back(address);
auto pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
address.host_name,

View File

@ -202,6 +202,7 @@ public:
UInt32 shard_num = 0;
UInt32 weight = 1;
Addresses local_addresses;
Addresses all_addresses;
/// nullptr if there are no remote addresses
ConnectionPoolWithFailoverPtr pool;
/// Connection pool for each replica, contains nullptr for local replicas

View File

@ -1,65 +0,0 @@
#pragma once
#include <Client/ConnectionPool.h>
#include <Interpreters/Cluster.h>
#include <Parsers/IAST.h>
namespace DB
{
struct Settings;
class Cluster;
class Throttler;
struct SelectQueryInfo;
class Pipe;
using Pipes = std::vector<Pipe>;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
struct StorageID;
namespace ClusterProxy
{
/// Base class for the implementation of the details of distributed query
/// execution that are specific to the query type.
class IStreamFactory
{
public:
virtual ~IStreamFactory() = default;
struct Shard
{
/// Query and header may be changed depending on shard.
ASTPtr query;
Block header;
size_t shard_num = 0;
size_t num_replicas = 0;
ConnectionPoolWithFailoverPtr pool;
ConnectionPoolPtrs per_replica_pools;
/// If we connect to replicas lazily.
/// (When there is a local replica with big delay).
bool lazy = false;
UInt32 local_delay = 0;
};
using Shards = std::vector<Shard>;
virtual void createForShard(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count) = 0;
};
}
}

View File

@ -1,4 +1,5 @@
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
@ -10,14 +11,15 @@
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <DataTypes/ObjectUtils.h>
#include <Client/IConnections.h>
#include <Common/logger_useful.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
namespace ProfileEvents
{
extern const Event DistributedConnectionMissingTable;
@ -63,7 +65,8 @@ void SelectStreamFactory::createForShard(
auto emplace_local_stream = [&]()
{
local_plans.emplace_back(createLocalPlan(query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, /*coordinator=*/nullptr));
local_plans.emplace_back(createLocalPlan(
query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, /*replica_num=*/0, /*replica_count=*/0, /*coordinator=*/nullptr));
};
auto emplace_remote_stream = [&](bool lazy = false, UInt32 local_delay = 0)
@ -71,10 +74,7 @@ void SelectStreamFactory::createForShard(
remote_shards.emplace_back(Shard{
.query = query_ast,
.header = header,
.shard_num = shard_info.shard_num,
.num_replicas = shard_info.getAllNodeCount(),
.pool = shard_info.pool,
.per_replica_pools = shard_info.per_replica_pools,
.shard_info = shard_info,
.lazy = lazy,
.local_delay = local_delay,
});
@ -173,5 +173,97 @@ void SelectStreamFactory::createForShard(
emplace_remote_stream();
}
SelectStreamFactory::ShardPlans SelectStreamFactory::createForShardWithParallelReplicas(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_function_ptr,
const ThrottlerPtr & throttler,
ContextPtr context,
UInt32 shard_count)
{
SelectStreamFactory::ShardPlans result;
if (auto it = objects_by_shard.find(shard_info.shard_num); it != objects_by_shard.end())
replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast);
const auto & settings = context->getSettingsRef();
auto is_local_replica_obsolete = [&]()
{
auto resolved_id = context->resolveStorageID(main_table);
auto main_table_storage = DatabaseCatalog::instance().tryGetTable(resolved_id, context);
const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());
if (!replicated_storage)
return false;
UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
if (!max_allowed_delay)
return false;
UInt32 local_delay = replicated_storage->getAbsoluteDelay();
return local_delay >= max_allowed_delay;
};
size_t next_replica_number = 0;
size_t all_replicas_count = shard_info.getRemoteNodeCount();
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>();
auto remote_plan = std::make_unique<QueryPlan>();
if (settings.prefer_localhost_replica && shard_info.isLocal())
{
/// We don't need more than one local replica in parallel reading
if (!is_local_replica_obsolete())
{
++all_replicas_count;
result.local_plan = createLocalPlan(
query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, next_replica_number, all_replicas_count, coordinator);
++next_replica_number;
}
}
Scalars scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
scalars.emplace(
"_shard_count", Block{{DataTypeUInt32().createColumnConst(1, shard_count), std::make_shared<DataTypeUInt32>(), "_shard_count"}});
auto external_tables = context->getExternalTables();
auto shard = Shard{
.query = query_ast,
.header = header,
.shard_info = shard_info,
.lazy = false,
.local_delay = 0,
};
if (shard_info.hasRemoteConnections())
{
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
coordinator,
shard,
header,
processed_stage,
main_table,
table_function_ptr,
context,
throttler,
std::move(scalars),
std::move(external_tables),
&Poco::Logger::get("ReadFromParallelRemoteReplicasStep"),
shard_count);
remote_plan->addStep(std::move(read_from_remote));
result.remote_plan = std::move(remote_plan);
}
return result;
}
}
}

View File

@ -1,22 +1,56 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Interpreters/StorageID.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/StorageSnapshot.h>
#include <Client/ConnectionPool.h>
#include <Interpreters/Cluster.h>
#include <Parsers/IAST.h>
namespace DB
{
struct Settings;
class Cluster;
class Throttler;
struct SelectQueryInfo;
class Pipe;
using Pipes = std::vector<Pipe>;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
struct StorageID;
namespace ClusterProxy
{
using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
class SelectStreamFactory final : public IStreamFactory
class SelectStreamFactory
{
public:
struct Shard
{
/// Query and header may be changed depending on shard.
ASTPtr query;
Block header;
Cluster::ShardInfo shard_info;
/// If we connect to replicas lazily.
/// (When there is a local replica with big delay).
bool lazy = false;
UInt32 local_delay = 0;
};
using Shards = std::vector<Shard>;
SelectStreamFactory(
const Block & header_,
const ColumnsDescriptionByShardNum & objects_by_shard_,
@ -31,7 +65,26 @@ public:
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count) override;
UInt32 shard_count);
struct ShardPlans
{
/// If a shard has local replicas, this won't be nullptr
std::unique_ptr<QueryPlan> local_plan;
/// Contains several steps to read from all remote replicas
std::unique_ptr<QueryPlan> remote_plan;
};
ShardPlans createForShardWithParallelReplicas(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_function_ptr,
const ThrottlerPtr & throttler,
ContextPtr context,
UInt32 shard_count
);
private:
const Block header;

View File

@ -20,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int LOGICAL_ERROR;
}
namespace ClusterProxy
@ -106,21 +107,19 @@ void executeQuery(
QueryProcessingStage::Enum processed_stage,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
IStreamFactory & stream_factory, Poco::Logger * log,
SelectStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster)
{
assert(log);
const Settings & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::vector<QueryPlanPtr> plans;
IStreamFactory::Shards remote_shards;
SelectStreamFactory::Shards remote_shards;
auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, log);
@ -215,6 +214,91 @@ void executeQuery(
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster)
{
const Settings & settings = context->getSettingsRef();
ThrottlerPtr user_level_throttler;
if (auto * process_list_element = context->getProcessListElement())
user_level_throttler = process_list_element->getUserNetworkThrottler();
/// Network bandwidth limit, if needed.
ThrottlerPtr throttler;
if (settings.max_network_bandwidth || settings.max_network_bytes)
{
throttler = std::make_shared<Throttler>(
settings.max_network_bandwidth,
settings.max_network_bytes,
"Limit for bytes to send or receive over network exceeded.",
user_level_throttler);
}
else
throttler = user_level_throttler;
std::vector<QueryPlanPtr> plans;
size_t shards = query_info.getCluster()->getShardCount();
for (const auto & shard_info : query_info.getCluster()->getShardsInfo())
{
ASTPtr query_ast_for_shard;
if (query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
{
query_ast_for_shard = query_ast->clone();
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),
};
OptimizeShardingKeyRewriteInVisitor visitor(visitor_data);
visitor.visit(query_ast_for_shard);
}
else
query_ast_for_shard = query_ast;
auto shard_plans = stream_factory.createForShardWithParallelReplicas(shard_info,
query_ast_for_shard, main_table, table_func_ptr, throttler, context, shards);
if (!shard_plans.local_plan && !shard_plans.remote_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No plans were generated for reading from shard. This is a bug");
if (shard_plans.local_plan)
plans.emplace_back(std::move(shard_plans.local_plan));
if (shard_plans.remote_plan)
plans.emplace_back(std::move(shard_plans.remote_plan));
}
if (plans.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No plans were generated for reading from Distributed. This is a bug");
if (plans.size() == 1)
{
query_plan = std::move(*plans.front());
return;
}
DataStreams input_streams;
input_streams.reserve(plans.size());
for (const auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
}
}

View File

@ -23,7 +23,7 @@ struct StorageID;
namespace ClusterProxy
{
class IStreamFactory;
class SelectStreamFactory;
/// Update settings for Distributed query.
///
@ -46,7 +46,18 @@ void executeQuery(
QueryProcessingStage::Enum processed_stage,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
IStreamFactory & stream_factory, Poco::Logger * log,
SelectStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster);
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,

View File

@ -119,7 +119,11 @@ TemporaryTableHolder & TemporaryTableHolder::operator=(TemporaryTableHolder && r
TemporaryTableHolder::~TemporaryTableHolder()
{
if (id != UUIDHelpers::Nil)
{
auto table = getTable();
table->flushAndShutdown();
temporary_tables->dropTable(getContext(), "_tmp_" + toString(id));
}
}
StorageID TemporaryTableHolder::getGlobalTableID() const

View File

@ -1999,7 +1999,6 @@ void ExpressionAnalysisResult::checkActions() const
};
check_actions(prewhere_info->prewhere_actions);
check_actions(prewhere_info->alias_actions);
}
}

View File

@ -1637,15 +1637,6 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
{
auto & prewhere_info = *prewhere_info_ptr;
if (prewhere_info.alias_actions)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header,
std::make_shared<ExpressionActions>(prewhere_info.alias_actions));
});
}
if (prewhere_info.row_level_filter)
{
pipe.addSimpleTransform([&](const Block & header)
@ -1711,12 +1702,11 @@ void InterpreterSelectQuery::setMergeTreeReadTaskCallbackAndClientInfo(MergeTree
context->setMergeTreeReadTaskCallback(std::move(callback));
}
void InterpreterSelectQuery::setProperClientInfo()
void InterpreterSelectQuery::setProperClientInfo(size_t replica_num, size_t replica_count)
{
context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
assert(options.shard_count.has_value() && options.shard_num.has_value());
context->getClientInfo().count_participating_replicas = *options.shard_count;
context->getClientInfo().number_of_current_replica = *options.shard_num;
context->getClientInfo().count_participating_replicas = replica_count;
context->getClientInfo().number_of_current_replica = replica_num;
}
bool InterpreterSelectQuery::shouldMoveToPrewhere()
@ -1885,19 +1875,6 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
for (const auto & name : required_columns)
prewhere_info->prewhere_actions->tryRestoreColumn(name);
auto analyzed_result
= TreeRewriter(context).analyze(required_columns_from_prewhere_expr, metadata_snapshot->getColumns().getAllPhysical());
prewhere_info->alias_actions
= ExpressionAnalyzer(required_columns_from_prewhere_expr, analyzed_result, context).getActionsDAG(true, false);
/// Add (physical?) columns required by alias actions.
auto required_columns_from_alias = prewhere_info->alias_actions->getRequiredColumns();
Block prewhere_actions_result = prewhere_info->prewhere_actions->getResultColumns();
for (auto & column : required_columns_from_alias)
if (!prewhere_actions_result.has(column.name))
if (required_columns.end() == std::find(required_columns.begin(), required_columns.end(), column.name))
required_columns.push_back(column.name);
/// Add physical columns required by prewhere actions.
for (const auto & column : required_columns_from_prewhere)
if (!required_aliases_from_prewhere.contains(column))

View File

@ -126,7 +126,7 @@ public:
void setMergeTreeReadTaskCallbackAndClientInfo(MergeTreeReadTaskCallback && callback);
/// It will set shard_num and shard_count to the client_info
void setProperClientInfo();
void setProperClientInfo(size_t replica_num, size_t replica_count);
private:
InterpreterSelectQuery(

View File

@ -1213,7 +1213,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
all_source_columns_set.insert(name);
}
normalize(query, result.aliases, all_source_columns_set, select_options.ignore_alias, settings, /* allow_self_aliases = */ true);
normalize(query, result.aliases, all_source_columns_set, select_options.ignore_alias, settings, /* allow_self_aliases = */ true, getContext());
/// Remove unneeded columns according to 'required_result_columns'.
/// Leave all selected columns in case of DISTINCT; columns that contain arrayJoin function inside.
@ -1309,7 +1309,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
TreeRewriterResult result(source_columns, storage, storage_snapshot, false);
normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases);
normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases, getContext());
/// Executing scalar subqueries. Column defaults could be a scalar subquery.
executeScalarSubqueries(query, getContext(), 0, result.scalars, result.local_scalars, !execute_scalar_subqueries);
@ -1338,7 +1338,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
}
void TreeRewriter::normalize(
ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases)
ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases, ContextPtr context_)
{
UserDefinedSQLFunctionVisitor::Data data_user_defined_functions_visitor;
UserDefinedSQLFunctionVisitor(data_user_defined_functions_visitor).visit(query);
@ -1400,7 +1400,10 @@ void TreeRewriter::normalize(
MarkTableIdentifiersVisitor(identifiers_data).visit(query);
/// Rewrite function names to their canonical ones.
if (settings.normalize_function_names)
/// Notice: function name normalization is disabled when it's a secondary query, because queries are either
/// already normalized on initiator node, or not normalized and should remain unnormalized for
/// compatibility.
if (context_->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY && settings.normalize_function_names)
FunctionNameNormalizer().visit(query.get());
/// Common subexpression elimination. Rewrite rules.
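Function name normalization, which this change now skips for secondary queries inside `TreeRewriter::normalize` itself (instead of disabling the setting in TCPHandler), rewrites function names to their canonical form on the initiator. Roughly, assuming default settings:

```sql
-- With normalize_function_names = 1 (the default), the rewritten query uses canonical names,
-- e.g. EXPLAIN SYNTAX shows something like: SELECT sum(number), count() FROM numbers(3)
EXPLAIN SYNTAX SELECT SUM(number), COUNT() FROM numbers(3);
```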

View File

@ -129,7 +129,7 @@ public:
std::shared_ptr<TableJoin> table_join = {}) const;
private:
static void normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases);
static void normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases, ContextPtr context_);
};
}

View File

@ -31,11 +31,21 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
static std::pair<Field, std::shared_ptr<const IDataType>> getFieldAndDataTypeFromLiteral(ASTLiteral * literal)
{
auto type = applyVisitor(FieldToDataType(), literal->value);
/// In case of an Array field, nested fields can have different types.
/// Example: Array [1, 2.3] will have 2 fields with types UInt64 and Float64
/// when result type is Array(Float64).
/// So, we need to convert this field to the result type.
Field res = convertFieldToType(literal->value, *type);
return {res, type};
}
std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(const ASTPtr & node, ContextPtr context)
{
if (ASTLiteral * literal = node->as<ASTLiteral>())
return std::make_pair(literal->value, applyVisitor(FieldToDataType(), literal->value));
return getFieldAndDataTypeFromLiteral(literal);
NamesAndTypesList source_columns = {{ "_dummy", std::make_shared<DataTypeUInt8>() }};
@ -54,7 +64,10 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
ReplaceQueryParameterVisitor param_visitor(context->getQueryParameters());
param_visitor.visit(ast);
if (context->getSettingsRef().normalize_function_names)
/// Notice: function name normalization is disabled when it's a secondary query, because queries are either
/// already normalized on initiator node, or not normalized and should remain unnormalized for
/// compatibility.
if (context->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY && context->getSettingsRef().normalize_function_names)
FunctionNameNormalizer().visit(ast.get());
String name = ast->getColumnName();
@ -63,7 +76,7 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
/// AST potentially could be transformed to literal during TreeRewriter analyze.
/// For example if we have SQL user defined function that return literal AS subquery.
if (ASTLiteral * literal = ast->as<ASTLiteral>())
return std::make_pair(literal->value, applyVisitor(FieldToDataType(), literal->value));
return getFieldAndDataTypeFromLiteral(literal);
ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer(ast, syntax_result, context).getConstActions();
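The new `getFieldAndDataTypeFromLiteral` helper converts the literal's value to its inferred result type, which matters for array literals whose elements have different field types, as the comment explains. An illustrative query (not part of the diff):

```sql
-- 1 is parsed as UInt64 and 2.3 as Float64, but the inferred result type is
-- Array(Float64), so both fields are converted to Float64.
SELECT [1, 2.3] AS arr, toTypeName(arr) AS type;
-- [1,2.3]    Array(Float64)
```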

View File

@ -41,6 +41,8 @@ std::unique_ptr<QueryPlan> createLocalPlan(
QueryProcessingStage::Enum processed_stage,
UInt32 shard_num,
UInt32 shard_count,
size_t replica_num,
size_t replica_count,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator)
{
checkStackSize();
@ -56,7 +58,7 @@ std::unique_ptr<QueryPlan> createLocalPlan(
.setShardInfo(shard_num, shard_count)
.ignoreASTOptimizations());
interpreter.setProperClientInfo();
interpreter.setProperClientInfo(replica_num, replica_count);
if (coordinator)
{
interpreter.setMergeTreeReadTaskCallbackAndClientInfo([coordinator](PartitionReadRequest request) -> std::optional<PartitionReadResponse>

View File

@ -15,6 +15,8 @@ std::unique_ptr<QueryPlan> createLocalPlan(
QueryProcessingStage::Enum processed_stage,
UInt32 shard_num,
UInt32 shard_count,
size_t replica_num,
size_t replica_count,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator);
}

View File

@ -17,6 +17,8 @@
#include <Client/ConnectionPoolWithFailover.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <boost/algorithm/string/join.hpp>
namespace DB
{
@ -63,7 +65,7 @@ static String formattedAST(const ASTPtr & ast)
}
ReadFromRemote::ReadFromRemote(
ClusterProxy::IStreamFactory::Shards shards_,
ClusterProxy::SelectStreamFactory::Shards shards_,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
@ -90,10 +92,7 @@ ReadFromRemote::ReadFromRemote(
{
}
void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info)
void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard)
{
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
@ -106,10 +105,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
}
auto lazily_create_stream = [
replica_info = replica_info,
pool = pool ? pool : shard.pool,
coordinator = coordinator,
shard_num = shard.shard_num, shard_count = shard_count, query = shard.query, header = shard.header,
shard = shard, shard_count = shard_count, query = shard.query, header = shard.header,
context = context, throttler = throttler,
main_table = main_table, table_func_ptr = table_func_ptr,
scalars = scalars, external_tables = external_tables,
@ -125,15 +121,15 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
try
{
if (table_func_ptr)
try_results = pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
try_results = shard.shard_info.pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
else
try_results = pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table.getQualifiedName());
try_results = shard.shard_info.pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table.getQualifiedName());
}
catch (const Exception & ex)
{
if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"),
"Connections to remote replicas of local shard {} failed, will use stale local replica", shard_num);
"Connections to remote replicas of local shard {} failed, will use stale local replica", shard.shard_info.shard_num);
else
throw;
}
@ -147,7 +143,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
if (try_results.empty() || local_delay < max_remote_delay)
{
auto plan = createLocalPlan(query, header, context, stage, shard_num, shard_count, coordinator);
auto plan = createLocalPlan(query, header, context, stage, shard.shard_info.shard_num, shard_count, 0, 0, /*coordinator=*/nullptr);
return std::move(*plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
@ -163,10 +159,9 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
String query_string = formattedAST(query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = std::move(coordinator), .replica_info = replica_info});
shard.shard_info.pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read);
QueryPipelineBuilder builder;
@ -179,10 +174,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
addConvertingActions(pipes.back(), output_stream->header);
}
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info)
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard)
{
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
@ -197,20 +189,15 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::
String query_string = formattedAST(shard.query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;
remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool ? pool : shard.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = std::move(coordinator), .replica_info = std::move(replica_info)});
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
/// In case of parallel reading from replicas we have a connection pool per replica.
/// Setting PoolMode will make no sense.
if (!pool)
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
@ -223,48 +210,80 @@ void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const B
{
Pipes pipes;
const auto & settings = context->getSettingsRef();
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
/// We have to create a pipe for each replica
/// FIXME: The second condition is only for tests to work, because hedged connections are enabled by default.
if (settings.max_parallel_replicas > 1 && !enable_sample_offset_parallel_processing && !context->getSettingsRef().use_hedged_requests)
for (const auto & shard : shards)
{
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard : shards)
{
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>();
for (size_t replica_num = 0; replica_num < shard.num_replicas; ++replica_num)
{
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = shard.num_replicas,
.number_of_current_replica = replica_num
};
auto pool = shard.per_replica_pools[replica_num];
auto pool_with_failover = std::make_shared<ConnectionPoolWithFailover>(
ConnectionPoolPtrs{pool}, current_settings.load_balancing);
if (shard.lazy)
addLazyPipe(pipes, shard, coordinator, pool_with_failover, replica_info);
else
addPipe(pipes, shard, coordinator, pool_with_failover, replica_info);
}
}
if (shard.lazy)
addLazyPipe(pipes, shard);
else
addPipe(pipes, shard);
}
else
auto pipe = Pipe::unitePipes(std::move(pipes));
pipeline.init(std::move(pipe));
}
ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
ParallelReplicasReadingCoordinatorPtr coordinator_,
ClusterProxy::SelectStreamFactory::Shard shard_,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextPtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_)
: ISourceStep(DataStream{.header = std::move(header_)})
, coordinator(std::move(coordinator_))
, shard(std::move(shard_))
, stage(std::move(stage_))
, main_table(std::move(main_table_))
, table_func_ptr(table_func_ptr_)
, context(context_)
, throttler(throttler_)
, scalars(scalars_)
, external_tables{external_tables_}
, log(log_)
, shard_count(shard_count_)
{
std::vector<String> description;
for (const auto & address : shard.shard_info.all_addresses)
if (!address.is_local)
description.push_back(fmt::format("Replica: {}", address.host_name));
setStepDescription(boost::algorithm::join(description, ", "));
}
void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
Pipes pipes;
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (size_t replica_num = 0; replica_num < shard.shard_info.getAllNodeCount(); ++replica_num)
{
for (const auto & shard : shards)
if (shard.shard_info.all_addresses[replica_num].is_local)
continue;
IConnections::ReplicaInfo replica_info
{
if (shard.lazy)
addLazyPipe(pipes, shard, /*coordinator=*/nullptr, /*pool*/{}, /*replica_info*/std::nullopt);
else
addPipe(pipes, shard, /*coordinator=*/nullptr, /*pool*/{}, /*replica_info*/std::nullopt);
}
.all_replicas_count = shard.shard_info.getAllNodeCount(),
.number_of_current_replica = replica_num
};
auto pool = shard.shard_info.per_replica_pools[replica_num];
assert(pool);
auto pool_with_failover = std::make_shared<ConnectionPoolWithFailover>(
ConnectionPoolPtrs{pool}, current_settings.load_balancing);
addPipeForSingeReplica(pipes, pool_with_failover, replica_info);
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@ -273,6 +292,41 @@ void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const B
processor->setStorageLimits(storage_limits);
pipeline.init(std::move(pipe));
}
void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(Pipes & pipes, std::shared_ptr<ConnectionPoolWithFailover> pool, IConnections::ReplicaInfo replica_info)
{
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
bool add_extremes = false;
bool async_read = context->getSettingsRef().async_socket_for_remote;
if (stage == QueryProcessingStage::Complete)
{
add_totals = shard.query->as<ASTSelectQuery &>().group_by_with_totals;
add_extremes = context->getSettingsRef().extremes;
}
String query_string = formattedAST(shard.query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;
remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool, query_string, shard.header, context, throttler, scalars, external_tables, stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = coordinator, .replica_info = std::move(replica_info)});
remote_query_executor->setLogger(log);
if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read));
pipes.back().addInterpreterContext(context);
addConvertingActions(pipes.back(), output_stream->header);
}
}
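The new `ReadFromParallelRemoteReplicasStep` builds one pipe per remote replica, each tagged with a `ReplicaInfo` (replica count and index) and driven by a shared `ParallelReplicasReadingCoordinator`. A hedged usage sketch with the settings referenced above (`distributed_table` is a placeholder name):

```sql
SET allow_experimental_parallel_reading_from_replicas = 1,
    max_parallel_replicas = 3,
    use_hedged_requests = 0;  -- per the FIXME above, this path is exercised with hedged requests off

SELECT count() FROM distributed_table;
```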

View File

@ -4,7 +4,7 @@
#include <Client/IConnections.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/StorageID.h>
#include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
namespace DB
@ -22,7 +22,7 @@ class ReadFromRemote final : public ISourceStep
{
public:
ReadFromRemote(
ClusterProxy::IStreamFactory::Shards shards_,
ClusterProxy::SelectStreamFactory::Shards shards_,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
@ -46,7 +46,7 @@ private:
PerShard
};
ClusterProxy::IStreamFactory::Shards shards;
ClusterProxy::SelectStreamFactory::Shards shards;
QueryProcessingStage::Enum stage;
StorageID main_table;
@ -63,16 +63,52 @@ private:
Poco::Logger * log;
UInt32 shard_count;
void addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info);
void addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info);
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
};
void addPipeForReplica();
class ReadFromParallelRemoteReplicasStep : public ISourceStep
{
public:
ReadFromParallelRemoteReplicasStep(
ParallelReplicasReadingCoordinatorPtr coordinator_,
ClusterProxy::SelectStreamFactory::Shard shard,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextPtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_);
String getName() const override { return "ReadFromRemoteParallelReplicas"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
private:
void addPipeForSingeReplica(Pipes & pipes, std::shared_ptr<ConnectionPoolWithFailover> pool, IConnections::ReplicaInfo replica_info);
ParallelReplicasReadingCoordinatorPtr coordinator;
ClusterProxy::SelectStreamFactory::Shard shard;
QueryProcessingStage::Enum stage;
StorageID main_table;
ASTPtr table_func_ptr;
ContextPtr context;
ThrottlerPtr throttler;
Scalars scalars;
Tables external_tables;
Poco::Logger * log;
UInt32 shard_count{0};
};
}

View File

@ -1359,14 +1359,6 @@ void TCPHandler::receiveQuery()
/// so we have to apply the changes first.
query_context->setCurrentQueryId(state.query_id);
/// Disable function name normalization when it's a secondary query, because queries are either
/// already normalized on initiator node, or not normalized and should remain unnormalized for
/// compatibility.
if (query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
query_context->setSetting("normalize_function_names", false);
}
/// For testing hedged requests
if (unlikely(sleep_after_receiving_query.totalMilliseconds()))
{

View File

@ -419,7 +419,9 @@ public:
void onException() override
{
write_buf->finalize();
if (!writer)
return;
onFinish();
}
void onFinish() override
@ -433,6 +435,7 @@ public:
}
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
throw;
}

View File

@ -261,11 +261,6 @@ std::string PrewhereInfo::dump() const
WriteBufferFromOwnString ss;
ss << "PrewhereDagInfo\n";
if (alias_actions)
{
ss << "alias_actions " << alias_actions->dumpDAG() << "\n";
}
if (prewhere_actions)
{
ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n";

View File

@ -72,8 +72,6 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
if (prewhere_info)
{
prewhere_actions = std::make_unique<PrewhereExprInfo>();
if (prewhere_info->alias_actions)
prewhere_actions->alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions, actions_settings);
if (prewhere_info->row_level_filter)
prewhere_actions->row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings);
@ -556,9 +554,6 @@ Block MergeTreeBaseSelectProcessor::transformHeader(
{
if (prewhere_info)
{
if (prewhere_info->alias_actions)
block = prewhere_info->alias_actions->updateHeader(std::move(block));
if (prewhere_info->row_level_filter)
{
block = prewhere_info->row_level_filter->updateHeader(std::move(block));

View File

@ -281,21 +281,16 @@ MergeTreeReadTaskColumns getReadTaskColumns(
if (prewhere_info)
{
if (prewhere_info->alias_actions)
pre_column_names = prewhere_info->alias_actions->getRequiredColumnsNames();
else
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
if (prewhere_info->row_level_filter)
{
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
NameSet names(pre_column_names.begin(), pre_column_names.end());
if (prewhere_info->row_level_filter)
for (auto & name : prewhere_info->row_level_filter->getRequiredColumnsNames())
{
NameSet names(pre_column_names.begin(), pre_column_names.end());
for (auto & name : prewhere_info->row_level_filter->getRequiredColumnsNames())
{
if (!names.contains(name))
pre_column_names.push_back(name);
}
if (!names.contains(name))
pre_column_names.push_back(name);
}
}

View File

@ -5439,17 +5439,6 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
candidate.prewhere_info->row_level_filter = row_level_filter_actions;
}
if (candidate.prewhere_info->alias_actions)
{
auto alias_actions = candidate.prewhere_info->alias_actions->clone();
// alias_action should not add missing keys.
auto new_prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false);
if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
return false;
prewhere_required_columns = std::move(new_prewhere_required_columns);
candidate.prewhere_info->alias_actions = alias_actions;
}
required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
}
@ -5619,8 +5608,6 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
if (minmax_count_projection_candidate->prewhere_info)
{
const auto & prewhere_info = minmax_count_projection_candidate->prewhere_info;
if (prewhere_info->alias_actions)
ExpressionActions(prewhere_info->alias_actions, actions_settings).execute(query_info.minmax_count_projection_block);
if (prewhere_info->row_level_filter)
{

View File

@ -593,9 +593,6 @@ MergeTreeRangeReader::MergeTreeRangeReader(
if (prewhere_info)
{
if (prewhere_info->alias_actions)
prewhere_info->alias_actions->execute(sample_block, true);
if (prewhere_info->row_level_filter)
{
prewhere_info->row_level_filter->execute(sample_block, true);
@ -1058,9 +1055,6 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
++pos;
}
if (prewhere_info->alias_actions)
prewhere_info->alias_actions->execute(block);
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
result.block_before_prewhere = block;

View File

@ -21,8 +21,6 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG
struct PrewhereExprInfo
{
/// Actions which are executed in order to alias columns are used for prewhere actions.
ExpressionActionsPtr alias_actions;
/// Actions for row level security filter. Applied separately before prewhere_actions.
/// These actions are separate because the prewhere condition should not be executed over filtered rows.
ExpressionActionsPtr row_level_filter;

View File

@ -17,4 +17,6 @@ private:
std::unique_ptr<Impl> pimpl;
};
using ParallelReplicasReadingCoordinatorPtr = std::shared_ptr<ParallelReplicasReadingCoordinator>;
}

View File

@ -2183,6 +2183,29 @@ bool ReplicatedMergeTreeMergePredicate::canMergeSinglePart(
}
bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String * out_reason) const
{
std::lock_guard<std::mutex> lock(queue.state_mutex);
for (const auto & entry : queue.queue)
{
if (entry->type != ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
continue;
for (const auto & part_name : entry->replace_range_entry->new_part_names)
{
if (part->info.isDisjoint(MergeTreePartInfo::fromPartName(part_name, queue.format_version)))
continue;
if (out_reason)
*out_reason = fmt::format("Part {} participates in REPLACE_RANGE {} ({})", part_name, entry->new_part_name, entry->znode_name);
return true;
}
}
return false;
}
std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const
{
/// Assigning mutations is easier than assigning merges because mutations appear in the same order as

View File

@ -501,6 +501,10 @@ public:
/// This predicate is checked for the first part of each range.
bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const;
/// Returns true if part is needed for some REPLACE_RANGE entry.
/// We should not drop the part in this case, because the replication queue may get stuck without it.
bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String * out_reason) const;
/// Return nonempty optional of desired mutation version and alter version.
/// If there are no alter (modify/drop) mutations in the mutations queue, then we return the biggest possible
/// mutation version (and -1 as the alter version). Otherwise, we return the biggest mutation version with

View File

@ -49,8 +49,6 @@ using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
struct PrewhereInfo
{
/// Actions which are executed in order to alias columns are used for prewhere actions.
ActionsDAGPtr alias_actions;
/// Actions for row level security filter. Applied separately before prewhere_actions.
/// These actions are separate because the prewhere condition should not be executed over filtered rows.
ActionsDAGPtr row_level_filter;

View File

@ -353,15 +353,6 @@ void StorageBuffer::read(
if (query_info.prewhere_info)
{
auto actions_settings = ExpressionActionsSettings::fromContext(local_context);
if (query_info.prewhere_info->alias_actions)
{
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header,
std::make_shared<ExpressionActions>(query_info.prewhere_info->alias_actions, actions_settings));
});
}
if (query_info.prewhere_info->row_level_filter)
{

View File

@ -691,13 +691,25 @@ void StorageDistributed::read(
storage_snapshot,
processed_stage);
ClusterProxy::executeQuery(
query_plan, header, processed_stage,
main_table, remote_table_function_ptr,
select_stream_factory, log, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster);
auto settings = local_context->getSettingsRef();
bool parallel_replicas = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas && !settings.use_hedged_requests;
if (parallel_replicas)
ClusterProxy::executeQueryWithParallelReplicas(
query_plan, main_table, remote_table_function_ptr,
select_stream_factory, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster);
else
ClusterProxy::executeQuery(
query_plan, header, processed_stage,
main_table, remote_table_function_ptr,
select_stream_factory, log, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster);
/// This is a bug; it is possible only when there are no shards to query, and this is handled earlier.
if (!query_plan.isInitialized())
@ -1504,4 +1516,3 @@ void registerStorageDistributed(StorageFactory & factory)
}
}
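For reference, the parallel-replicas branch above is taken only when all three settings line up: more than one parallel replica requested, the experimental parallel reading enabled, and hedged requests disabled (the shell test near the end of this diff uses the same combination). A hedged SQL sketch; the distributed table name is illustrative:
SELECT count()
FROM distributed_table
SETTINGS max_parallel_replicas = 3,
         allow_experimental_parallel_reading_from_replicas = 1,
         use_hedged_requests = 0;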

View File

@ -813,7 +813,9 @@ public:
void onException() override
{
write_buf->finalize();
if (!writer)
return;
onFinish();
}
void onFinish() override

View File

@ -7111,6 +7111,13 @@ bool StorageReplicatedMergeTree::dropPartImpl(
return false;
}
if (merge_pred.partParticipatesInReplaceRange(part, &out_reason))
{
if (throw_if_noop)
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, out_reason);
return false;
}
if (partIsLastQuorumPart(part->info))
{
if (throw_if_noop)

View File
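The user-visible effect of the new predicate: dropping a part that a pending REPLACE_RANGE entry still needs is now refused instead of silently breaking the replication queue. A hedged sketch of such a statement (table and part names are illustrative):
-- Fails with PART_IS_TEMPORARILY_LOCKED while a REPLACE_RANGE entry still references the part.
ALTER TABLE replicated_table DROP PART 'all_0_0_0';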

@ -603,7 +603,9 @@ public:
void onException() override
{
write_buf->finalize();
if (!writer)
return;
onFinish();
}
void onFinish() override

View File

@ -445,14 +445,25 @@ void StorageURLSink::consume(Chunk chunk)
void StorageURLSink::onException()
{
write_buf->finalize();
if (!writer)
return;
onFinish();
}
void StorageURLSink::onFinish()
{
writer->finalize();
writer->flush();
write_buf->finalize();
try
{
writer->finalize();
writer->flush();
write_buf->finalize();
}
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
throw;
}
}
class PartitionedStorageURLSink : public PartitionedSink

View File

@ -16,8 +16,12 @@
#include <Interpreters/Set.h>
#include <Interpreters/interpretSubquery.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string.hpp>
#include <algorithm>
#include <deque>
#include <climits>
namespace DB
@ -28,6 +32,157 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
/** ZkNodeCache is a trie that caches all the ZooKeeper writes. The purpose of this struct is to avoid creating/setting nodes
* repeatedly. For example, if we create path /a/b/c/d/e and path /a/b/d/f in the same transaction, we don't want to create
* their common path "/a/b" twice. This data structure caches the changes and generates the eventual requests in one pass.
*/
struct ZkNodeCache
{
using ZkNodeCachePtr = std::shared_ptr<ZkNodeCache>;
std::unordered_map<String, ZkNodeCachePtr> children;
String value;
String path;
bool exists;
bool changed;
ZkNodeCache() : exists(true), changed(false) { }
ZkNodeCache(String path_, bool exists_) : path(path_), exists(exists_), changed(false) { }
void insert(const std::vector<String> & nodes, zkutil::ZooKeeperPtr zookeeper, const String & value_to_set, size_t index)
{
/// If this node has an empty name, just skip it.
/// A path like "/a//b///c//d/" can produce empty node names.
while (index < nodes.size() && nodes[index].empty())
++index;
if (index == nodes.size())
{
value = value_to_set;
changed = true;
return;
}
const String & child_name = nodes[index];
++index;
if (!children.contains(child_name))
{
String sub_path = path + "/" + child_name;
bool child_exist = false;
if (exists)
{
/// If this node doesn't exist, neither will its children.
child_exist = zookeeper->exists(sub_path);
}
children[child_name] = std::make_shared<ZkNodeCache>(sub_path, child_exist);
}
children[child_name]->insert(nodes, zookeeper, value_to_set, index);
}
void generateRequests(Coordination::Requests & requests)
{
/** If the node doesn't exist, we should generate a create request.
* If the node exists, we should generate a set request.
* This DFS ensures that ancestor nodes are processed before their children.
*/
if (!exists)
{
auto request = zkutil::makeCreateRequest(path, value, zkutil::CreateMode::Persistent);
requests.push_back(request);
}
else if (changed)
{
auto request = zkutil::makeSetRequest(path, value, -1);
requests.push_back(request);
}
for (auto [_, child] : children)
child->generateRequests(requests);
}
};
class ZooKeeperSink : public SinkToStorage
{
zkutil::ZooKeeperPtr zookeeper;
ZkNodeCache cache;
public:
ZooKeeperSink(const Block & header, ContextPtr context) : SinkToStorage(header), zookeeper(context->getZooKeeper()) { }
String getName() const override { return "ZooKeeperSink"; }
void consume(Chunk chunk) override
{
auto block = getHeader().cloneWithColumns(chunk.getColumns());
size_t rows = block.rows();
for (size_t i = 0; i < rows; i++)
{
String name = block.getByPosition(0).column->getDataAt(i).toString();
String value = block.getByPosition(1).column->getDataAt(i).toString();
String path = block.getByPosition(2).column->getDataAt(i).toString();
/// We don't expect `name` to contain a path.
if (name.find('/') != std::string::npos)
{
throw Exception("Column `name` should not contain '/'", ErrorCodes::BAD_ARGUMENTS);
}
if (name.empty())
{
throw Exception("Column `name` should not be empty", ErrorCodes::BAD_ARGUMENTS);
}
if (path.empty())
{
throw Exception("Column `path` should not be empty", ErrorCodes::BAD_ARGUMENTS);
}
if (path.size() + name.size() > PATH_MAX)
{
throw Exception("Sum of `name` length and `path` length should not exceed PATH_MAX", ErrorCodes::BAD_ARGUMENTS);
}
std::vector<String> path_vec;
boost::split(path_vec, path, boost::is_any_of("/"));
path_vec.push_back(name);
cache.insert(path_vec, zookeeper, value, 0);
}
}
void onFinish() override
{
Coordination::Requests requests;
cache.generateRequests(requests);
zookeeper->multi(requests);
}
};
StorageSystemZooKeeper::StorageSystemZooKeeper(const StorageID & table_id_)
: IStorageSystemOneBlock<StorageSystemZooKeeper>(table_id_)
{
StorageInMemoryMetadata storage_metadata;
ColumnsDescription desc;
auto columns = getNamesAndTypes();
for (const auto & col : columns)
{
ColumnDescription col_desc(col.name, col.type);
/// Only the columns `name`, `path` and `value` are allowed for INSERT.
if (col.name != "name" && col.name != "path" && col.name != "value")
col_desc.default_desc.kind = ColumnDefaultKind::Materialized;
desc.add(col_desc);
}
storage_metadata.setColumns(desc);
setInMemoryMetadata(storage_metadata);
}
SinkToStoragePtr StorageSystemZooKeeper::write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr context)
{
if (!context->getConfigRef().getBool("allow_zookeeper_write", false))
throw Exception("Writing to system.zookeeper is prohibited unless the config `allow_zookeeper_write` is set to true", ErrorCodes::BAD_ARGUMENTS);
Block write_header;
write_header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "name"));
write_header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "value"));
write_header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "path"));
return std::make_shared<ZooKeeperSink>(write_header, context);
}
NamesAndTypesList StorageSystemZooKeeper::getNamesAndTypes()
{

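Taken together with the new `allow_zookeeper_write` server config (added further down in this diff), the sink above makes system.zookeeper writable. A minimal SQL sketch; the path shown is illustrative:
-- Requires <allow_zookeeper_write>true</allow_zookeeper_write> in the server config.
INSERT INTO system.zookeeper (name, path, value) VALUES ('node1', '/clickhouse/example', 'hello');
-- The new node is visible through the normal read path.
SELECT name, value FROM system.zookeeper WHERE path = '/clickhouse/example';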
View File

@ -14,10 +14,14 @@ class Context;
class StorageSystemZooKeeper final : public IStorageSystemOneBlock<StorageSystemZooKeeper>
{
public:
explicit StorageSystemZooKeeper(const StorageID & table_id_);
std::string getName() const override { return "SystemZooKeeper"; }
static NamesAndTypesList getNamesAndTypes();
SinkToStoragePtr write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override;
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;

View File

@ -0,0 +1,3 @@
<clickhouse>
<allow_zookeeper_write>true</allow_zookeeper_write>
</clickhouse>

View File

@ -16,6 +16,7 @@ mkdir -p $DEST_SERVER_PATH/users.d/
mkdir -p $DEST_CLIENT_PATH
ln -sf $SRC_PATH/config.d/zookeeper.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/zookeeper_write.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/listen.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/text_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/custom_settings_prefixes.xml $DEST_SERVER_PATH/config.d/

View File

@ -117,6 +117,7 @@ def get_counters(fname):
# Lines like:
# [gw0] [ 7%] ERROR test_mysql_protocol/test.py::test_golang_client
# [gw3] [ 40%] PASSED test_replicated_users/test.py::test_rename_replicated[QUOTA]
state = line_arr[-2]
test_name = line_arr[-1]
@ -941,6 +942,16 @@ class ClickhouseIntegrationTestsRunner:
if "(memory)" in self.params["context_name"]:
result_state = "success"
for res in test_result:
# It's not easy to parse the output of pytest,
# especially when test names may contain spaces.
# Do not allow spaces in test names to avoid obscure failures.
if " " not in res[0]:
continue
logging.warning("Found invalid test name with space: %s", res[0])
status_text = "Found test with invalid name, see main log"
result_state = "failure"
return result_state, status_text, test_result, []

View File

@ -16,12 +16,6 @@ import traceback
import urllib.parse
import shlex
import urllib3
from cassandra.policies import RoundRobinPolicy
import cassandra.cluster
import psycopg2
import pymongo
import meilisearch
import pymysql
import requests
try:
@ -34,6 +28,7 @@ try:
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import pymongo
import pymysql
import meilisearch
from confluent_kafka.avro.cached_schema_registry_client import (
CachedSchemaRegistryClient,
)

View File

@ -930,6 +930,12 @@ def test_predefined_connection_configuration(started_cluster):
== "100"
)
result = clickhouse_node.query("show create table test_database.test_table")
assert (
result.strip()
== "CREATE TABLE test_database.test_table\\n(\\n `id` Int32\\n)\\nENGINE = MySQL(mysql1, table = \\'test_table\\')"
)
clickhouse_node.query("DROP DATABASE test_database")
clickhouse_node.query_and_get_error(
"CREATE DATABASE test_database ENGINE = MySQL(mysql2)"

View File

@ -41,7 +41,7 @@ entities = [
def get_entity_id(entity):
return entity.keyword
return entity.keyword.replace(" ", "_")
@pytest.mark.parametrize("entity", entities, ids=get_entity_id)

View File

@ -1,4 +1,13 @@
-- Tags: replica
#!/usr/bin/env bash
# Tags: replica
set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
DROP TABLE IF EXISTS part_header_r1;
DROP TABLE IF EXISTS part_header_r2;
@ -6,13 +15,13 @@ DROP TABLE IF EXISTS part_header_r2;
SET replication_alter_partitions_sync = 2;
CREATE TABLE part_header_r1(x UInt32, y UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/{shard}', '1{replica}') ORDER BY x
ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/{shard}', '1{replica}') ORDER BY x
SETTINGS use_minimalistic_part_header_in_zookeeper = 0,
old_parts_lifetime = 1,
cleanup_delay_period = 0,
cleanup_delay_period_random_add = 0;
CREATE TABLE part_header_r2(x UInt32, y UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/{shard}', '2{replica}') ORDER BY x
ENGINE ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/{shard}', '2{replica}') ORDER BY x
SETTINGS use_minimalistic_part_header_in_zookeeper = 1,
old_parts_lifetime = 1,
cleanup_delay_period = 0,
@ -36,15 +45,26 @@ SELECT _part, x FROM part_header_r1 ORDER BY x;
SELECT '*** replica 2 ***';
SELECT _part, x FROM part_header_r2 ORDER BY x;
SELECT sleep(3) FORMAT Null;
"
elapsed=1
until [ $elapsed -eq 5 ];
do
sleep $(( elapsed++ ))
count1=$($CLICKHOUSE_CLIENT --query="SELECT count(name) FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/s1/replicas/1r1/parts'")
count2=$($CLICKHOUSE_CLIENT --query="SELECT count(name) FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/s1/replicas/2r1/parts'")
[[ $count1 == 1 && $count2 == 1 ]] && break
done
$CLICKHOUSE_CLIENT -nm -q "
SELECT '*** Test part removal ***';
SELECT '*** replica 1 ***';
SELECT name FROM system.parts WHERE active AND database = currentDatabase() AND table = 'part_header_r1';
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/s1/replicas/1r1/parts';
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/s1/replicas/1r1/parts';
SELECT '*** replica 2 ***';
SELECT name FROM system.parts WHERE active AND database = currentDatabase() AND table = 'part_header_r2';
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/s1/replicas/1r1/parts';
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00814/part_header/s1/replicas/2r1/parts';
SELECT '*** Test ALTER ***';
ALTER TABLE part_header_r1 MODIFY COLUMN y String;
@ -63,3 +83,5 @@ SELECT x, length(y) FROM part_header_r2 ORDER BY x;
DROP TABLE part_header_r1;
DROP TABLE part_header_r2;
"

View File

@ -5,16 +5,13 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
DATA_FILE=$CUR_DIR/data_orc/test.orc
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS orc_load"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE orc_load (int Int32, smallint Int8, bigint Int64, float Float32, double Float64, date Date, y String, datetime64 DateTime64(3)) ENGINE = Memory"
${CLICKHOUSE_CLIENT} --query="insert into orc_load values (0, 0, 0, 0, 0, '2019-01-01', 'test1', toDateTime64('2019-01-01 02:03:04.567', 3)), (2147483647, -1, 9223372036854775806, 123.345345, 345345.3453451212, '2019-01-01', 'test2', toDateTime64('2019-01-01 02:03:04.567', 3))"
${CLICKHOUSE_CLIENT} --query="select * from orc_load FORMAT ORC" > $DATA_FILE
${CLICKHOUSE_CLIENT} --query="select * from orc_load FORMAT ORC" > "${CLICKHOUSE_TMP}"/test.orc
${CLICKHOUSE_CLIENT} --query="truncate table orc_load"
cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "insert into orc_load format ORC"
timeout 3 ${CLICKHOUSE_CLIENT} -q "insert into orc_load format ORC" < $DATA_FILE
cat "${CLICKHOUSE_TMP}"/test.orc | ${CLICKHOUSE_CLIENT} -q "insert into orc_load format ORC"
timeout 3 ${CLICKHOUSE_CLIENT} -q "insert into orc_load format ORC" < "${CLICKHOUSE_TMP}"/test.orc
${CLICKHOUSE_CLIENT} --query="select * from orc_load"
${CLICKHOUSE_CLIENT} --query="drop table orc_load"
rm -rf "$DATA_FILE"

View File

@ -283,3 +283,9 @@
1925-01-01 \N
1925-01-01
\N
1925-01-01
1969-12-31
1970-01-01
2149-06-06
2149-06-07
2283-11-11

View File

@ -118,4 +118,15 @@ select toDate32OrZero('1924-01-01'), toDate32OrNull('1924-01-01');
select toDate32OrZero(''), toDate32OrNull('');
select (select toDate32OrZero(''));
select (select toDate32OrNull(''));
SELECT toString(T.d) dateStr
FROM
(
SELECT '1925-01-01'::Date32 d
UNION ALL SELECT '1969-12-31'::Date32
UNION ALL SELECT '1970-01-01'::Date32
UNION ALL SELECT '2149-06-06'::Date32
UNION ALL SELECT '2149-06-07'::Date32
UNION ALL SELECT '2283-11-11'::Date32
) AS T
ORDER BY T.d

View File

@ -8,4 +8,5 @@
2 obmgndljgajpkeao
3 dldokmpjpgjgeanb
4 nkdlpgajngjnobme
YQrvD5XGvbx
xkOpDGxQpVB
jR

View File

@ -3,3 +3,5 @@ SET allow_experimental_hash_functions = 1;
select number, hashid(number) from system.numbers limit 5;
select number, hashid(number, 's3cr3t', 16, 'abcdefghijklmnop') from system.numbers limit 5;
select hashid(1234567890123456, 's3cr3t');
SELECT hashid(1, hashid(2));

View File

@ -0,0 +1,27 @@
/default/1-insert-testc c
/default/1-insert-testc/c c
/default/1-insert-testc/c/c c
/default/1-insert-testc/c/c d
/default/1-insert-testc/c/c e
/default/1-insert-testc/c/c f
/default/1-insert-testc/c/c kk
/default/1-insert-testc/c/c/c c
/default/1-insert-testc/c/c/c/c c
/default/1-insert-testc/c/c/c/c/c c
/default/1-insert-testc/c/c/c/c/c/c c 9
/default/1-insert-testc/c/c/c/c/c/c/c c 10
/default/1-insert-testc/c/c/d e 10
/default/1-insert-testc/c/c/d f 11
/default/1-insert-testc/c/c/d g 12
/default/1-insert-testc/c/c/e g 13
/default/1-insert-testc/c/c/f g 14
/default/1-insert-testc/c/c/kk g 14
-------------------------
/default/2-insert-testx testb z
/default/2-insert-testx testc x
/default/2-insert-testx testz y
/default/2-insert-testz c
/default/2-insert-testz/c cd
/default/2-insert-testz/c/cd dd
/default/2-insert-testz/c/cd testc
/default/2-insert-testz/c/cd/dd testc y

View File

@ -0,0 +1,43 @@
-- Tags: zookeeper
set allow_unrestricted_reads_from_keeper = 'true';
drop table if exists test_zkinsert;
create table test_zkinsert (
name String,
path String,
value String
) ENGINE Memory;
-- test recursive create and big transaction
insert into test_zkinsert (name, path, value) values ('c', '/1-insert-testc/c/c/c/c/c/c', 11), ('e', '/1-insert-testc/c/c/d', 10), ('c', '/1-insert-testc/c/c/c/c/c/c/c', 10), ('c', '/1-insert-testc/c/c/c/c/c/c', 9), ('f', '/1-insert-testc/c/c/d', 11), ('g', '/1-insert-testc/c/c/d', 12), ('g', '/1-insert-testc/c/c/e', 13), ('g', '/1-insert-testc/c/c/f', 14), ('g', '/1-insert-testc/c/c/kk', 14);
-- inserting the same values is supposed to have no side effects
insert into system.zookeeper (name, path, value) SELECT name, '/' || currentDatabase() || path, value from test_zkinsert;
SELECT * FROM (SELECT path, name, value FROM system.zookeeper ORDER BY path, name) WHERE path LIKE '/' || currentDatabase() || '/1-insert-test%';
SELECT '-------------------------';
-- test inserting into root path
insert into test_zkinsert (name, path, value) values ('testc', '/2-insert-testx', 'x');
insert into test_zkinsert (name, path, value) values ('testz', '/2-insert-testx', 'y');
insert into test_zkinsert (name, path, value) values ('testc', '/2-insert-testz//c/cd/dd//', 'y');
insert into test_zkinsert (name, path) values ('testc', '/2-insert-testz//c/cd/');
insert into test_zkinsert (name, value, path) values ('testb', 'z', '/2-insert-testx');
insert into system.zookeeper (name, path, value) SELECT name, '/' || currentDatabase() || path, value from test_zkinsert;
SELECT * FROM (SELECT path, name, value FROM system.zookeeper ORDER BY path, name) WHERE path LIKE '/' || currentDatabase() || '/2-insert-test%';
-- test exceptions
insert into system.zookeeper (name, value) values ('abc', 'y'); -- { serverError 36 }
insert into system.zookeeper (path, value) values ('a/b/c', 'y'); -- { serverError 36 }
insert into system.zookeeper (name, version) values ('abc', 111); -- { serverError 44 }
insert into system.zookeeper (name, versionxyz) values ('abc', 111); -- { serverError 16 }
insert into system.zookeeper (name, path, value) values ('a/b/c', '/', 'y'); -- { serverError 36 }
insert into system.zookeeper (name, path, value) values ('/', '/a/b/c', 'z'); -- { serverError 36 }
insert into system.zookeeper (name, path, value) values ('', '/', 'y'); -- { serverError 36 }
insert into system.zookeeper (name, path, value) values ('abc', '/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/a
bc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc/abc', 'y'); -- { serverError 36 }
drop table if exists test_zkinsert;

View File

@ -0,0 +1,15 @@
#!/usr/bin/env bash
# Tags: no-parallel
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT_BINARY} --query "drop user if exists u_02311"
${CLICKHOUSE_CLIENT_BINARY} --query "create user u_02311"
error="$(${CLICKHOUSE_CLIENT_BINARY} --user=u_02311 --query "insert into system.zookeeper (path, name, value) values ('//3-insert-testc/c/c/kk', 'kk', '11')" 2>&1 > /dev/null)"
echo "${error}" | grep -Fc "ACCESS_DENIED"
${CLICKHOUSE_CLIENT_BINARY} --query "drop user u_02311"

View File

@ -0,0 +1,3 @@
2022-01-01 00:00:00 1
2022-01-01 00:00:00 1
2022-01-01 00:00:00 1

View File

@ -0,0 +1,21 @@
DROP TABLE IF EXISTS bug_36995;
CREATE TABLE bug_36995(
`time` DateTime,
`product` String)
ENGINE = MergeTree
ORDER BY time AS
SELECT '2022-01-01 00:00:00','1';
SELECT * FROM bug_36995
WHERE (time IS NOT NULL) AND (product IN (SELECT '1'))
SETTINGS optimize_move_to_prewhere = 1;
SELECT * FROM bug_36995
WHERE (time IS NOT NULL) AND (product IN (SELECT '1'))
SETTINGS optimize_move_to_prewhere = 0;
SELECT * FROM bug_36995
PREWHERE (time IS NOT NULL) WHERE (product IN (SELECT '1'));
DROP TABLE bug_36995;

View File

@ -1 +1,3 @@
-- Tags: no-backward-compatibility-check
SELECT 'Play ClickHouse' InterSect SELECT 'Play ClickHouse'

View File

@ -0,0 +1,2 @@
[1,2.2]
[[1,2,3],[1.1,2.2,3.3]]

View File

@ -0,0 +1,2 @@
select * from values([1, 2.2]);
select * from values([[1, 2, 3], [1.1, 2.2, 3.3]]);

View File

@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# All replicas are localhost, disable `prefer_localhost_replica` option to test network interface
# Currently this feature could not work with hedged requests
# Enabling `enable_sample_offset_parallel_processing` feature could lead to intersecting marks, so some of them would be thrown away and it will lead to incorrect result of SELECT query
SETTINGS="--max_parallel_replicas=3 --use_hedged_requests=false --async_socket_for_remote=false --allow_experimental_parallel_reading_from_replicas=true"
SETTINGS="--max_parallel_replicas=3 --use_hedged_requests=false --allow_experimental_parallel_reading_from_replicas=true"
# Prepare tables
$CLICKHOUSE_CLIENT $SETTINGS -nm -q '''