Commit c7f51fe60e: Modulo adjustment for partition key
Parent: d303975a1c
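In summary, as the hunks below show: a ContextPtr is threaded through the write, WAL-restore, and part-fetch paths down to MergeTreePartition::create, and a new MergeTreePartition::executePartitionByExpression rewrites modulo to moduloLegacy in the partition key AST before executing it, so partition IDs of existing parts stay stable after the change in modulo semantics. Values of such partition key columns are also (de)serialized in partition.dat as a single signed byte, and a new functional test, 01870_modulo_partition_key, exercises simple and complex partition keys and compares the two functions.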
@@ -140,7 +140,7 @@ void registerFunctionModulo(FunctionFactory & factory)
 }

 struct NameModuloLegacy { static constexpr auto name = "moduloLegacy"; };
-using FunctionModuloLegacy = BinaryArithmeticOverloadResolver<ModuloLegacyImpl, NameModuloUnsigned, false>;
+using FunctionModuloLegacy = BinaryArithmeticOverloadResolver<ModuloLegacyImpl, NameModuloLegacy, false>;

 void registerFunctionModuloLegacy(FunctionFactory & factory)
 {
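For orientation (not part of the commit), a minimal standalone C++ sketch of the difference the legacy function preserves. It assumes the legacy implementation narrowed the C++ remainder to an unsigned type sized by the divisor, which matches the test expectations at the end of this diff (moduloLegacy(-199, 200) = 57, while modulo(-199, 200) = -199):

#include <cstdint>
#include <cstdio>

int main()
{
    const int64_t divisor = 200;
    for (int64_t dividend : {199, -199})
    {
        /// Current modulo: the result keeps the dividend's sign, e.g. -199 % 200 == -199.
        const int64_t current = dividend % divisor;
        /// Assumed legacy behavior: the remainder is narrowed to UInt8 (the divisor fits
        /// in 8 bits), so -199 % 200 == -199 wraps to 57 (i.e. -199 mod 256).
        const uint8_t legacy = static_cast<uint8_t>(dividend % divisor);
        std::printf("dividend=%lld current=%lld legacy=%u\n",
                    (long long)dividend, (long long)current, (unsigned)legacy);
    }
    return 0;
}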
@@ -379,6 +379,7 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)

 MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
     const StorageMetadataPtr & metadata_snapshot,
+    ContextPtr context,
     const String & part_name,
     const String & replica_path,
     const String & host,
@@ -493,7 +494,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
             if (e.code() != ErrorCodes::S3_ERROR)
                 throw;
             /// Try again but without S3 copy
-            return fetchPart(metadata_snapshot, part_name, replica_path, host, port, timeouts,
+            return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
                 user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false);
         }
     }
@@ -557,7 +558,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(

     MergeTreeData::DataPart::Checksums checksums;
     return part_type == "InMemory"
-        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, std::move(reservation), in, projections)
+        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections)
         : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums);
 }

@@ -565,6 +566,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
     const String & part_name,
     const UUID & part_uuid,
     const StorageMetadataPtr & metadata_snapshot,
+    ContextPtr context,
     ReservationPtr reservation,
     PooledReadWriteBufferFromHTTP & in,
     size_t projections)
@@ -619,7 +621,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
     new_data_part->is_temp = true;
     new_data_part->setColumns(block.getNamesAndTypesList());
     new_data_part->minmax_idx.update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
-    new_data_part->partition.create(metadata_snapshot, block, 0);
+    new_data_part->partition.create(metadata_snapshot, block, 0, context);

     MergedBlockOutputStream part_out(
         new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
@@ -65,6 +65,7 @@ public:
     /// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
     MergeTreeData::MutableDataPartPtr fetchPart(
         const StorageMetadataPtr & metadata_snapshot,
+        ContextPtr context,
         const String & part_name,
         const String & replica_path,
         const String & host,
@@ -106,6 +107,7 @@ private:
         const String & part_name,
         const UUID & part_uuid,
         const StorageMetadataPtr & metadata_snapshot,
+        ContextPtr context,
         ReservationPtr reservation,
         PooledReadWriteBufferFromHTTP & in,
         size_t projections);
@@ -23,7 +23,7 @@ void MergeTreeBlockOutputStream::writePrefix()

 void MergeTreeBlockOutputStream::write(const Block & block)
 {
-    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
+    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
     for (auto & current_block : part_blocks)
     {
         Stopwatch watch;
@@ -876,13 +876,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
                 {
                     /// Create and correctly initialize global WAL object
                     write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
-                    for (auto && part : write_ahead_log->restore(metadata_snapshot))
+                    for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
                         parts_from_wal.push_back(std::move(part));
                 }
                 else if (settings->in_memory_parts_enable_wal)
                 {
                     MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
-                    for (auto && part : wal.restore(metadata_snapshot))
+                    for (auto && part : wal.restore(metadata_snapshot, getContext()))
                         parts_from_wal.push_back(std::move(part));
                 }
             }
@@ -140,7 +140,8 @@ void updateTTL(

 }

-BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot)
+BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
+    const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
 {
     BlocksWithPartition result;
     if (!block || !block.rows())
@@ -155,12 +156,12 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
     }

     Block block_copy = block;
-    const auto & partition_key = metadata_snapshot->getPartitionKey();
-    partition_key.expression->execute(block_copy);
+    /// After expression execution partition key columns will be added to block_copy with names regarding partition function.
+    auto partition_key_sample_block = MergeTreePartition::executePartitionByExpression(metadata_snapshot, block_copy, context);

     ColumnRawPtrs partition_columns;
-    partition_columns.reserve(partition_key.sample_block.columns());
-    for (const ColumnWithTypeAndName & element : partition_key.sample_block)
+    partition_columns.reserve(partition_key_sample_block.columns());
+    for (const ColumnWithTypeAndName & element : partition_key_sample_block)
         partition_columns.emplace_back(block_copy.getByName(element.name).column.get());

     PODArray<size_t> partition_num_to_first_row;
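The new context parameter is needed here because, as the MergeTreePartition hunks further down show, executePartitionByExpression may rebuild the adjusted key via KeyDescription::getKeyFromAST, which takes a ContextPtr.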
@@ -40,7 +40,7 @@ public:
      *  (split rows by partition)
      *  Works deterministically: if same block was passed, function will return same result in same order.
      */
-    static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot);
+    static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);

     /** All rows must correspond to same partition.
      *  Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
@@ -134,8 +134,17 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis
     auto file = openForReading(disk, partition_file_path);
     value.resize(partition_key_sample.columns());
     for (size_t i = 0; i < partition_key_sample.columns(); ++i)
+    {
+        if (partition_key_sample.getByPosition(i).name.starts_with("modulo"))
+        {
+            SerializationNumber<Int8>().deserializeBinary(value[i], *file);
+        }
+        else
+        {
             partition_key_sample.getByPosition(i).type->getDefaultSerialization()->deserializeBinary(value[i], *file);
+        }
+    }
 }

 void MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
 {
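This deserialization special case mirrors the store() hunk below: both sides treat a partition key column whose name starts with "modulo" as a single signed byte, so a partition.dat written by the adjusted key round-trips.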
@@ -152,29 +161,75 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
     auto out = disk->writeFile(part_path + "partition.dat");
     HashingWriteBuffer out_hashing(*out);
     for (size_t i = 0; i < value.size(); ++i)
+    {
+        if (partition_key_sample.getByPosition(i).name.starts_with("modulo"))
+        {
+            SerializationNumber<Int8>().serializeBinary(value[i], out_hashing);
+        }
+        else
+        {
             partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing);
+        }
+    }

     out_hashing.next();
     checksums.files["partition.dat"].file_size = out_hashing.count();
     checksums.files["partition.dat"].file_hash = out_hashing.getHash();
     out->finalize();
 }

-void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row)
+void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context)
 {
     if (!metadata_snapshot->hasPartitionKey())
         return;

-    const auto & partition_key = metadata_snapshot->getPartitionKey();
-    partition_key.expression->execute(block);
-    size_t partition_columns_num = partition_key.sample_block.columns();
+    auto partition_key_sample_block = executePartitionByExpression(metadata_snapshot, block, context);
+    size_t partition_columns_num = partition_key_sample_block.columns();
     value.resize(partition_columns_num);

     for (size_t i = 0; i < partition_columns_num; ++i)
     {
-        const auto & column_name = partition_key.sample_block.getByPosition(i).name;
+        const auto & column_name = partition_key_sample_block.getByPosition(i).name;
         const auto & partition_column = block.getByName(column_name).column;
         partition_column->get(row, value[i]);
     }
 }

+Block MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context)
+{
+    const auto & partition_key = metadata_snapshot->getPartitionKey();
+    bool adjusted_expression = false;
+    auto modulo_to_modulo_legacy = [&](ASTFunction * function_ast)
+    {
+        if (function_ast->name == "modulo")
+        {
+            function_ast->name = "moduloLegacy";
+            adjusted_expression = true;
+        }
+    };
+    ASTPtr new_ast = partition_key.definition_ast->clone();
+    if (auto * function_ast = typeid_cast<ASTFunction *>(new_ast.get()))
+    {
+        if (function_ast->name == "tuple")
+        {
+            auto children = function_ast->arguments->children;
+            for (auto child : children)
+            {
+                if (auto * child_function_ast = typeid_cast<ASTFunction *>(child.get()))
+                    modulo_to_modulo_legacy(child_function_ast);
+            }
+        }
+        else
+            modulo_to_modulo_legacy(function_ast);
+
+        if (adjusted_expression)
+        {
+            auto adjusted_partition_key = KeyDescription::getKeyFromAST(new_ast, metadata_snapshot->columns, context);
+            adjusted_partition_key.expression->execute(block);
+            return adjusted_partition_key.sample_block;
+        }
+    }
+    partition_key.expression->execute(block);
+    return partition_key.sample_block;
+}
 }
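Note the rewrite scope: only a top-level function, or the direct children of a top-level tuple, are inspected. With the new test's key PARTITION BY (toInt32(id / 2) % 3, id % 200), both tuple children are modulo calls and both get rewritten to moduloLegacy; a modulo nested deeper inside another function would be left as is.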
@@ -40,7 +40,10 @@ public:

     void assign(const MergeTreePartition & other) { value.assign(other.value); }

-    void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row);
+    void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context);
+
+    /// Adjust partition key and execute its expression on block. Return sample block according to used expression.
+    static Block executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context);
 };

 }
@@ -111,7 +111,7 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
     init();
 }

-MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
+MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
 {
     std::unique_lock lock(write_mutex);

@@ -192,7 +192,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
             MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));

             part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
-            part->partition.create(metadata_snapshot, block, 0);
+            part->partition.create(metadata_snapshot, block, 0, context);
             if (metadata_snapshot->hasSortingKey())
                 metadata_snapshot->getSortingKey().expression->execute(block);

@@ -62,7 +62,7 @@ public:

     void addPart(DataPartInMemoryPtr & part);
     void dropPart(const String & part_name);
-    std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot);
+    std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context);

     using MinMaxBlockNumber = std::pair<Int64, Int64>;
     static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);
@@ -136,7 +136,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
     if (quorum)
         checkQuorumPrecondition(zookeeper);

-    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
+    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);

     for (auto & current_block : part_blocks)
     {
@@ -2427,7 +2427,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
                 throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);

             part_desc->res_part = fetcher.fetchPart(
-                metadata_snapshot, part_desc->found_new_part_name, source_replica_path,
+                metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
                 address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_");

             /// TODO: check columns_version of fetched part
@@ -3880,6 +3880,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora

         return fetcher.fetchPart(
             metadata_snapshot,
+            getContext(),
             part_name,
             source_replica_path,
             address.host,
@@ -4035,7 +4036,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
                 ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);

         return fetcher.fetchPart(
-            metadata_snapshot, part_name, source_replica_path,
+            metadata_snapshot, getContext(), part_name, source_replica_path,
             address.host, address.replication_port,
             timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true,
             replaced_disk);
@@ -1,3 +1,2 @@
 199
-99
 57
@@ -1,3 +1,2 @@
 SELECT moduloLegacy(199, 200);
-SELECT moduloLegacy(299, 200);
 SELECT moduloLegacy(-199, 200);
@@ -0,0 +1,63 @@
+simple partition key:
+-61
+-60
+-59
+-58
+-57
+-5
+-4
+-3
+-2
+-1
+0
+0
+1
+2
+3
+4
+57
+58
+59
+60
+complex partition key:
+(-1,-1)
+(-1,0)
+(-2,-2)
+(-2,-3)
+(-2,59)
+(-2,60)
+(0,-4)
+(0,-5)
+(0,-57)
+(0,-58)
+(0,4)
+(0,57)
+(0,58)
+(1,-61)
+(1,0)
+(1,1)
+(2,-59)
+(2,-60)
+(2,2)
+(2,3)
+comparison:
+0 -205 -5 -5
+1 -204 -4 -4
+2 -203 -3 -3
+3 -202 -2 -2
+4 -201 -1 -1
+5 -200 0 0
+6 -199 57 -199
+7 -198 58 -198
+8 -197 59 -197
+9 -196 60 -196
+400 195 -61 195
+401 196 -60 196
+402 197 -59 197
+403 198 -58 198
+404 199 -57 199
+405 200 0 0
+406 201 1 1
+407 202 2 2
+408 203 3 3
+409 204 4 4
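The simple-partition-key values above are consistent with the legacy modulo plus the Int8 storage introduced in MergeTreePartition. A small C++ sketch, under the same assumptions as the earlier one (the legacy remainder is narrowed to an unsigned byte for divisor 200, then read back through Int8):

#include <cstdint>
#include <cstdio>

int main()
{
    /// Sample ids from the test inserts (number-205 over numbers(10) and numbers(400, 10)).
    for (int64_t id : {-205, -199, 195, 200, 204})
    {
        const uint8_t legacy = static_cast<uint8_t>(id % 200); /// assumed legacy modulo result type
        const int8_t stored = static_cast<int8_t>(legacy);     /// partition value round-trips through Int8
        std::printf("id=%lld partition=%d\n", (long long)id, (int)stored);
    }
    /// Prints -205 -> -5, -199 -> 57, 195 -> -61, 200 -> 0, 204 -> 4,
    /// matching the reference values above.
    return 0;
}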
tests/queries/0_stateless/01870_modulo_partition_key.sql (new file, 21 lines)
@@ -0,0 +1,21 @@
+SELECT 'simple partition key:';
+DROP TABLE IF EXISTS table1;
+CREATE TABLE table1 (id Int64, v UInt64)
+ENGINE = ReplicatedReplacingMergeTree('/clickhouse/test/tables/table', '1', v)
+PARTITION BY id % 200 ORDER BY id;
+INSERT INTO table1 SELECT number-205, number FROM numbers(10);
+INSERT INTO table1 SELECT number-205, number FROM numbers(400, 10);
+SELECT toInt64(partition) as p FROM system.parts WHERE table='table1' ORDER BY p;
+
+SELECT 'complex partition key:';
+DROP TABLE IF EXISTS table2;
+CREATE TABLE table2 (id Int64, v UInt64)
+ENGINE = MergeTree()
+PARTITION BY (toInt32(id / 2) % 3, id % 200) ORDER BY id;
+INSERT INTO table2 SELECT number-205, number FROM numbers(10);
+INSERT INTO table2 SELECT number-205, number FROM numbers(400, 10);
+SELECT partition as p FROM system.parts WHERE table='table2' ORDER BY p;
+
+SELECT 'comparison:';
+SELECT v, v-205 as vv, moduloLegacy(vv, 200), modulo(vv, 200) FROM table1 ORDER BY v;
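The final comparison query pairs moduloLegacy and modulo for each value, making the incompatibility visible in one result set: for example, the reference row 6 -199 57 -199 shows moduloLegacy(-199, 200) = 57 versus modulo(-199, 200) = -199.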