Merge pull request #24157 from kssenii/modulo-partition-key

Adjust modulo function when used in partition key
This commit is contained in:
alesapin 2021-05-26 10:30:54 +03:00 committed by GitHub
commit 069d45790f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 395 additions and 43 deletions

View File

@ -116,6 +116,12 @@ template <typename A, typename B> struct ResultOfModulo
using Type = std::conditional_t<std::is_floating_point_v<A> || std::is_floating_point_v<B>, Float64, Type0>;
};
template <typename A, typename B> struct ResultOfModuloLegacy
{
using Type0 = typename Construct<is_signed_v<A> || is_signed_v<B>, false, sizeof(B)>::Type;
using Type = std::conditional_t<std::is_floating_point_v<A> || std::is_floating_point_v<B>, Float64, Type0>;
};
template <typename A> struct ResultOfNegate
{
using Type = typename Construct<

View File

@ -172,4 +172,10 @@ struct ModuloImpl
#endif
};
template <typename A, typename B>
struct ModuloLegacyImpl : ModuloImpl<A, B>
{
using ResultType = typename NumberTraits::ResultOfModuloLegacy<A, B>::Type;
};
}

View File

@ -96,6 +96,11 @@ struct ModuloByConstantImpl
}
};
template <typename A, typename B>
struct ModuloLegacyByConstantImpl : ModuloByConstantImpl<A, B>
{
using Op = ModuloLegacyImpl<A, B>;
};
}
/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
@ -134,4 +139,12 @@ void registerFunctionModulo(FunctionFactory & factory)
factory.registerAlias("mod", "modulo", FunctionFactory::CaseInsensitive);
}
struct NameModuloLegacy { static constexpr auto name = "moduloLegacy"; };
using FunctionModuloLegacy = BinaryArithmeticOverloadResolver<ModuloLegacyImpl, NameModuloLegacy, false>;
void registerFunctionModuloLegacy(FunctionFactory & factory)
{
factory.registerFunction<FunctionModuloLegacy>();
}
}

View File

@ -11,6 +11,7 @@ void registerFunctionIntDiv(FunctionFactory & factory);
void registerFunctionIntDivOrZero(FunctionFactory & factory);
void registerFunctionModulo(FunctionFactory & factory);
void registerFunctionModuloOrZero(FunctionFactory & factory);
void registerFunctionModuloLegacy(FunctionFactory & factory);
void registerFunctionNegate(FunctionFactory & factory);
void registerFunctionAbs(FunctionFactory & factory);
void registerFunctionBitAnd(FunctionFactory & factory);
@ -51,6 +52,7 @@ void registerFunctionsArithmetic(FunctionFactory & factory)
registerFunctionIntDivOrZero(factory);
registerFunctionModulo(factory);
registerFunctionModuloOrZero(factory);
registerFunctionModuloLegacy(factory);
registerFunctionNegate(factory);
registerFunctionAbs(factory);
registerFunctionBitAnd(factory);

View File

@ -2,6 +2,7 @@
#include <Functions/IFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
@ -86,6 +87,30 @@ KeyDescription KeyDescription::getKeyFromAST(
return getSortingKeyFromAST(definition_ast, columns, context, {});
}
bool KeyDescription::moduloToModuloLegacyRecursive(ASTPtr node_expr)
{
if (!node_expr)
return false;
auto * function_expr = node_expr->as<ASTFunction>();
bool modulo_in_ast = false;
if (function_expr)
{
if (function_expr->name == "modulo")
{
function_expr->name = "moduloLegacy";
modulo_in_ast = true;
}
if (function_expr->arguments)
{
auto children = function_expr->arguments->children;
for (const auto & child : children)
modulo_in_ast |= moduloToModuloLegacyRecursive(child);
}
}
return modulo_in_ast;
}
KeyDescription KeyDescription::getSortingKeyFromAST(
const ASTPtr & definition_ast,
const ColumnsDescription & columns,

View File

@ -69,6 +69,9 @@ struct KeyDescription
/// unintentionaly share AST variables and modify them.
KeyDescription(const KeyDescription & other);
KeyDescription & operator=(const KeyDescription & other);
/// Substitute modulo with moduloLegacy. Used in KeyCondition to allow proper comparison with keys.
static bool moduloToModuloLegacyRecursive(ASTPtr node_expr);
};
}

View File

@ -379,6 +379,7 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & replica_path,
const String & host,
@ -520,7 +521,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
if (e.code() != ErrorCodes::S3_ERROR)
throw;
/// Try again but without S3 copy
return fetchPart(metadata_snapshot, part_name, replica_path, host, port, timeouts,
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false);
}
}
@ -584,7 +585,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
MergeTreeData::DataPart::Checksums checksums;
return part_type == "InMemory"
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, std::move(reservation), in, projections)
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums);
}
@ -592,6 +593,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
const String & part_name,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in,
size_t projections)
@ -646,7 +648,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
new_data_part->is_temp = true;
new_data_part->setColumns(block.getNamesAndTypesList());
new_data_part->minmax_idx.update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
new_data_part->partition.create(metadata_snapshot, block, 0);
new_data_part->partition.create(metadata_snapshot, block, 0, context);
MergedBlockOutputStream part_out(
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));

View File

@ -65,6 +65,7 @@ public:
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
MergeTreeData::MutableDataPartPtr fetchPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & replica_path,
const String & host,
@ -106,6 +107,7 @@ private:
const String & part_name,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in,
size_t projections);

View File

@ -21,6 +21,7 @@
#include <Parsers/ASTIdentifier.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Storages/KeyDescription.h>
#include <cassert>
#include <stack>
@ -591,6 +592,30 @@ void KeyCondition::traverseAST(const ASTPtr & node, ContextPtr context, Block &
rpn.emplace_back(std::move(element));
}
bool KeyCondition::canConstantBeWrapped(const ASTPtr & node, const String & expr_name, String & result_expr_name)
{
const auto & sample_block = key_expr->getSampleBlock();
/// sample_block from key_expr cannot contain modulo and moduloLegacy at the same time.
/// For partition key it is always moduloLegacy.
if (sample_block.has(expr_name))
{
result_expr_name = expr_name;
}
else
{
auto adjusted_ast = node->clone();
KeyDescription::moduloToModuloLegacyRecursive(adjusted_ast);
String adjusted_expr_name = adjusted_ast->getColumnName();
if (!sample_block.has(adjusted_expr_name))
return false;
result_expr_name = adjusted_expr_name;
}
return true;
}
bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
const ASTPtr & node,
@ -600,11 +625,13 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
DataTypePtr & out_type)
{
// Constant expr should use alias names if any
String expr_name = node->getColumnName();
const auto & sample_block = key_expr->getSampleBlock();
if (!sample_block.has(expr_name))
String passed_expr_name = node->getColumnName();
String expr_name;
if (!canConstantBeWrapped(node, passed_expr_name, expr_name))
return false;
const auto & sample_block = key_expr->getSampleBlock();
/// TODO Nullable index is not yet landed.
if (out_value.isNull())
return false;
@ -668,11 +695,13 @@ bool KeyCondition::canConstantBeWrappedByFunctions(
const ASTPtr & ast, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type)
{
// Constant expr should use alias names if any
String expr_name = ast->getColumnName();
const auto & sample_block = key_expr->getSampleBlock();
if (!sample_block.has(expr_name))
String passed_expr_name = ast->getColumnName();
String expr_name;
if (!canConstantBeWrapped(ast, passed_expr_name, expr_name))
return false;
const auto & sample_block = key_expr->getSampleBlock();
/// TODO Nullable index is not yet landed.
if (out_value.isNull())
return false;

View File

@ -419,6 +419,12 @@ private:
bool canConstantBeWrappedByFunctions(
const ASTPtr & ast, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type);
/// Check if ASTPtr node, passed to canConstantBeWrappedBy*, can be used by them for further checks.
/// Always call this method at start of other methods, which require key comparison, because it also checks if adjusted
/// key expression can also be used (with substitution from modulo to moduloLegacy). This is needed because partition key
/// is always modified, when passed into keyCondition, - with recursive substitution from modulo to moduloLegacy.
bool canConstantBeWrapped(const ASTPtr & node, const String & expr_name, String & result_expr_name);
/// If it's possible to make an RPNElement
/// that will filter values (possibly tuples) by the content of 'prepared_set',
/// do it and return true.

View File

@ -23,7 +23,7 @@ void MergeTreeBlockOutputStream::writePrefix()
void MergeTreeBlockOutputStream::write(const Block & block)
{
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
for (auto & current_block : part_blocks)
{
Stopwatch watch;

View File

@ -773,7 +773,7 @@ std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
// Generate valid expressions for filtering
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, local_context, virtual_columns_block, expression_ast);
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, local_context, true /* strict */);
PartitionPruner partition_pruner(metadata_snapshot, query_info, local_context, true /* strict */);
if (partition_pruner.isUseless() && !valid)
return {};
@ -877,13 +877,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
/// Create and correctly initialize global WAL object
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot))
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
}
else if (settings->in_memory_parts_enable_wal)
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot))
for (auto && part : wal.restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
}
}

View File

@ -503,7 +503,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
minmax_idx_condition.emplace(
query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)));
partition_pruner.emplace(metadata_snapshot_base->getPartitionKey(), query_info, context, false /* strict */);
partition_pruner.emplace(metadata_snapshot_base, query_info, context, false /* strict */);
if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless()))
{

View File

@ -140,7 +140,8 @@ void updateTTL(
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot)
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
BlocksWithPartition result;
if (!block || !block.rows())
@ -155,12 +156,12 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
}
Block block_copy = block;
const auto & partition_key = metadata_snapshot->getPartitionKey();
partition_key.expression->execute(block_copy);
/// After expression execution partition key columns will be added to block_copy with names regarding partition function.
auto partition_key_names_and_types = MergeTreePartition::executePartitionByExpression(metadata_snapshot, block_copy, context);
ColumnRawPtrs partition_columns;
partition_columns.reserve(partition_key.sample_block.columns());
for (const ColumnWithTypeAndName & element : partition_key.sample_block)
partition_columns.reserve(partition_key_names_and_types.size());
for (const auto & element : partition_key_names_and_types)
partition_columns.emplace_back(block_copy.getByName(element.name).column.get());
PODArray<size_t> partition_num_to_first_row;

View File

@ -39,7 +39,7 @@ public:
* (split rows by partition)
* Works deterministically: if same block was passed, function will return same result in same order.
*/
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot);
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.

View File

@ -129,7 +129,7 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis
if (!metadata_snapshot->hasPartitionKey())
return;
const auto & partition_key_sample = metadata_snapshot->getPartitionKey().sample_block;
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage.getContext()).sample_block;
auto partition_file_path = part_path + "partition.dat";
auto file = openForReading(disk, partition_file_path);
value.resize(partition_key_sample.columns());
@ -140,7 +140,7 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis
void MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
const auto & partition_key_sample = metadata_snapshot->getPartitionKey().sample_block;
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage.getContext()).sample_block;
store(partition_key_sample, disk, part_path, checksums);
}
@ -153,28 +153,62 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
HashingWriteBuffer out_hashing(*out);
for (size_t i = 0; i < value.size(); ++i)
partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing);
out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
out->finalize();
}
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row)
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context)
{
if (!metadata_snapshot->hasPartitionKey())
return;
const auto & partition_key = metadata_snapshot->getPartitionKey();
partition_key.expression->execute(block);
size_t partition_columns_num = partition_key.sample_block.columns();
value.resize(partition_columns_num);
auto partition_key_names_and_types = executePartitionByExpression(metadata_snapshot, block, context);
value.resize(partition_key_names_and_types.size());
for (size_t i = 0; i < partition_columns_num; ++i)
/// Executing partition_by expression adds new columns to passed block according to partition functions.
/// The block is passed by reference and is used afterwards. `moduloLegacy` needs to be substituted back
/// with just `modulo`, because it was a temporary substitution.
static constexpr auto modulo_legacy_function_name = "moduloLegacy";
size_t i = 0;
for (const auto & element : partition_key_names_and_types)
{
const auto & column_name = partition_key.sample_block.getByPosition(i).name;
const auto & partition_column = block.getByName(column_name).column;
partition_column->get(row, value[i]);
auto & partition_column = block.getByName(element.name);
if (element.name.starts_with(modulo_legacy_function_name))
partition_column.name = "modulo" + partition_column.name.substr(std::strlen(modulo_legacy_function_name));
partition_column.column->get(row, value[i++]);
}
}
NamesAndTypesList MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context)
{
auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context);
adjusted_partition_key.expression->execute(block);
return adjusted_partition_key.sample_block.getNamesAndTypesList();
}
KeyDescription MergeTreePartition::adjustPartitionKey(const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
const auto & partition_key = metadata_snapshot->getPartitionKey();
if (!partition_key.definition_ast)
return partition_key;
ASTPtr ast_copy = partition_key.definition_ast->clone();
/// Implementation of modulo function was changed from 8bit result type to 16bit. For backward compatibility partition by expression is always
/// calculated according to previous version - `moduloLegacy`.
if (KeyDescription::moduloToModuloLegacyRecursive(ast_copy))
{
auto adjusted_partition_key = KeyDescription::getKeyFromAST(ast_copy, metadata_snapshot->columns, context);
return adjusted_partition_key;
}
return partition_key;
}
}

View File

@ -3,9 +3,9 @@
#include <common/types.h>
#include <Disks/IDisk.h>
#include <IO/WriteBuffer.h>
#include <Storages/KeyDescription.h>
#include <Core/Field.h>
namespace DB
{
@ -41,7 +41,13 @@ public:
void assign(const MergeTreePartition & other) { value = other.value; }
void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row);
void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context);
/// Adjust partition key and execute its expression on block. Return sample block according to used expression.
static NamesAndTypesList executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context);
/// Make a modified partition key with substitution from modulo to moduloLegacy. Used in paritionPruner.
static KeyDescription adjustPartitionKey(const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
};
}

View File

@ -111,7 +111,7 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
init();
}
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
std::unique_lock lock(write_mutex);
@ -192,7 +192,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0);
part->partition.create(metadata_snapshot, block, 0, context);
if (metadata_snapshot->hasSortingKey())
metadata_snapshot->getSortingKey().expression->execute(block);

View File

@ -62,7 +62,7 @@ public:
void addPart(DataPartInMemoryPtr & part);
void dropPart(const String & part_name);
std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot);
std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
using MinMaxBlockNumber = std::pair<Int64, Int64>;
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);

View File

@ -14,15 +14,18 @@ class PartitionPruner
{
private:
std::unordered_map<String, bool> partition_filter_map;
const KeyDescription & partition_key;
/// partition_key is adjusted here (with substitution from modulo to moduloLegacy).
KeyDescription partition_key;
KeyCondition partition_condition;
bool useless;
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
public:
PartitionPruner(const KeyDescription & partition_key_, const SelectQueryInfo & query_info, ContextPtr context, bool strict)
: partition_key(partition_key_)
PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict)
: partition_key(MergeTreePartition::adjustPartitionKey(metadata, context))
, partition_condition(
query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
, useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue())

View File

@ -136,7 +136,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
if (quorum)
checkQuorumPrecondition(zookeeper);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
for (auto & current_block : part_blocks)
{

View File

@ -2500,7 +2500,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
part_desc->res_part = fetcher.fetchPart(
metadata_snapshot, part_desc->found_new_part_name, source_replica_path,
metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
@ -2616,7 +2616,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
ErrorCodes::LOGICAL_ERROR);
return fetcher.fetchPart(
metadata_snapshot, entry.new_part_name, source_replica_path,
metadata_snapshot, getContext(), entry.new_part_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, true);
};
@ -4016,6 +4016,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
return fetcher.fetchPart(
metadata_snapshot,
getContext(),
part_name,
source_replica_path,
address.host,
@ -4171,7 +4172,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);
return fetcher.fetchPart(
metadata_snapshot, part_name, source_replica_path,
metadata_snapshot, getContext(), part_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true,
replaced_disk);

View File

@ -0,0 +1,28 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server', tag='21.2', with_installed_binary=True, stay_alive=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_modulo_partition_key_after_update(start_cluster):
node1.query("CREATE TABLE test (id Int64, v UInt64, value String) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/table1', '1', v) PARTITION BY id % 20 ORDER BY (id, v)")
node1.query("INSERT INTO test SELECT number, number, toString(number) FROM numbers(10)")
expected = node1.query("SELECT number, number, toString(number) FROM numbers(10)")
partition_data = node1.query("SELECT partition, name FROM system.parts WHERE table='test' ORDER BY partition")
assert(expected == node1.query("SELECT * FROM test ORDER BY id"))
node1.restart_with_latest_version(signal=9)
assert(expected == node1.query("SELECT * FROM test ORDER BY id"))
assert(partition_data == node1.query("SELECT partition, name FROM system.parts WHERE table='test' ORDER BY partition"))

View File

@ -0,0 +1,2 @@
199
57

View File

@ -0,0 +1,2 @@
SELECT moduloLegacy(199, 200);
SELECT moduloLegacy(-199, 200);

View File

@ -0,0 +1,130 @@
simple partition key:
-61
-60
-59
-58
-57
-5
-4
-3
-2
-1
0
0
1
2
3
4
57
58
59
60
where id % 200 = +-2:
-202
202
where id % 200 > 0:
195
196
197
198
199
201
202
203
204
where id % 200 < 0:
-205
-204
-203
-202
-201
-199
-198
-197
-196
tuple as partition key:
(-1,-1)
(-1,0)
(-2,-2)
(-2,-3)
(-2,59)
(-2,60)
(0,-4)
(0,-5)
(0,-57)
(0,-58)
(0,4)
(0,57)
(0,58)
(1,-61)
(1,0)
(1,1)
(2,-59)
(2,-60)
(2,2)
(2,3)
recursive modulo partition key:
(-1,-1,0)
(-2,-2,-1)
(-3,-3,-2)
(-4,-4,-2)
(-5,-5,-2)
(-57,-7,-28)
(-58,-8,-29)
(-59,-9,-30)
(-60,0,-30)
(-61,-1,-30)
(0,0,0)
(0,0,0)
(1,1,0)
(2,2,1)
(3,3,2)
(4,4,2)
(57,7,28)
(58,8,29)
(59,9,30)
(60,0,30)
After detach:
(-1,-1,0)
(-2,-2,-1)
(-3,-3,-2)
(-4,-4,-2)
(-5,-5,-2)
(-57,-7,-28)
(-58,-8,-29)
(-59,-9,-30)
(-60,0,-30)
(-61,-1,-30)
(0,0,0)
(0,0,0)
(1,1,0)
(2,2,1)
(3,3,2)
(4,4,2)
(57,7,28)
(58,8,29)
(59,9,30)
(60,0,30)
Indexes:
100
comparison:
0 -205 -5 -5
1 -204 -4 -4
2 -203 -3 -3
3 -202 -2 -2
4 -201 -1 -1
5 -200 0 0
6 -199 -199 57
7 -198 -198 58
8 -197 -197 59
9 -196 -196 60
400 195 195 -61
401 196 196 -60
402 197 197 -59
403 198 198 -58
404 199 199 -57
405 200 0 0
406 201 1 1
407 202 2 2
408 203 3 3
409 204 4 4

View File

@ -0,0 +1,50 @@
SELECT 'simple partition key:';
DROP TABLE IF EXISTS table1 SYNC;
CREATE TABLE table1 (id Int64, v UInt64)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/test/tables/table12', '1', v)
PARTITION BY id % 200 ORDER BY id;
INSERT INTO table1 SELECT number-205, number FROM numbers(10);
INSERT INTO table1 SELECT number-205, number FROM numbers(400, 10);
SELECT toInt64(partition) as p FROM system.parts WHERE table='table1' and database=currentDatabase() ORDER BY p;
select 'where id % 200 = +-2:';
select id from table1 where id % 200 = 2 OR id % 200 = -2 order by id;
select 'where id % 200 > 0:';
select id from table1 where id % 200 > 0 order by id;
select 'where id % 200 < 0:';
select id from table1 where id % 200 < 0 order by id;
SELECT 'tuple as partition key:';
DROP TABLE IF EXISTS table2;
CREATE TABLE table2 (id Int64, v UInt64)
ENGINE = MergeTree()
PARTITION BY (toInt32(id / 2) % 3, id % 200) ORDER BY id;
INSERT INTO table2 SELECT number-205, number FROM numbers(10);
INSERT INTO table2 SELECT number-205, number FROM numbers(400, 10);
SELECT partition as p FROM system.parts WHERE table='table2' and database=currentDatabase() ORDER BY p;
SELECT 'recursive modulo partition key:';
DROP TABLE IF EXISTS table3;
CREATE TABLE table3 (id Int64, v UInt64)
ENGINE = MergeTree()
PARTITION BY (id % 200, (id % 200) % 10, toInt32(round((id % 200) / 2, 0))) ORDER BY id;
INSERT INTO table3 SELECT number-205, number FROM numbers(10);
INSERT INTO table3 SELECT number-205, number FROM numbers(400, 10);
SELECT partition as p FROM system.parts WHERE table='table3' and database=currentDatabase() ORDER BY p;
DETACH TABLE table3;
ATTACH TABLE table3;
SELECT 'After detach:';
SELECT partition as p FROM system.parts WHERE table='table3' and database=currentDatabase() ORDER BY p;
SELECT 'Indexes:';
DROP TABLE IF EXISTS table4;
CREATE TABLE table4 (id Int64, v UInt64, s String,
INDEX a (id * 2, s) TYPE minmax GRANULARITY 3
) ENGINE = MergeTree() PARTITION BY id % 10 ORDER BY v;
INSERT INTO table4 SELECT number, number, toString(number) FROM numbers(1000);
SELECT count() FROM table4 WHERE id % 10 = 7;
SELECT 'comparison:';
SELECT v, v-205 as vv, modulo(vv, 200), moduloLegacy(vv, 200) FROM table1 ORDER BY v;

View File

@ -723,6 +723,7 @@
"01850_dist_INSERT_preserve_error", // uses cluster with different static databases shard_0/shard_1
"01821_table_comment",
"01710_projection_fetch",
"01870_modulo_partition_key",
"01870_buffer_flush" // creates database
]
}