mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-06 07:32:27 +00:00
Different approach to fix modulo
This commit is contained in:
parent
bd0bc8e3e7
commit
665f1aa0f6
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
#include <Functions/IFunction.h>
|
#include <Functions/IFunction.h>
|
||||||
#include <Parsers/ASTIdentifier.h>
|
#include <Parsers/ASTIdentifier.h>
|
||||||
|
#include <Parsers/ASTFunction.h>
|
||||||
#include <Interpreters/ExpressionActions.h>
|
#include <Interpreters/ExpressionActions.h>
|
||||||
#include <Interpreters/ExpressionAnalyzer.h>
|
#include <Interpreters/ExpressionAnalyzer.h>
|
||||||
#include <Interpreters/TreeRewriter.h>
|
#include <Interpreters/TreeRewriter.h>
|
||||||
@ -86,6 +87,27 @@ KeyDescription KeyDescription::getKeyFromAST(
|
|||||||
return getSortingKeyFromAST(definition_ast, columns, context, {});
|
return getSortingKeyFromAST(definition_ast, columns, context, {});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void moduloToModuloLegacyRecursive(ASTPtr node_expr)
|
||||||
|
{
|
||||||
|
if (!node_expr)
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto * function_expr = node_expr->as<ASTFunction>();
|
||||||
|
if (function_expr)
|
||||||
|
{
|
||||||
|
if (function_expr->name == "modulo")
|
||||||
|
{
|
||||||
|
function_expr->name = "moduloLegacy";
|
||||||
|
}
|
||||||
|
if (function_expr->arguments)
|
||||||
|
{
|
||||||
|
auto children = function_expr->arguments->children;
|
||||||
|
for (const auto & child : children)
|
||||||
|
moduloToModuloLegacyRecursive(child);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
KeyDescription KeyDescription::getSortingKeyFromAST(
|
KeyDescription KeyDescription::getSortingKeyFromAST(
|
||||||
const ASTPtr & definition_ast,
|
const ASTPtr & definition_ast,
|
||||||
const ColumnsDescription & columns,
|
const ColumnsDescription & columns,
|
||||||
@ -93,8 +115,11 @@ KeyDescription KeyDescription::getSortingKeyFromAST(
|
|||||||
const std::optional<String> & additional_column)
|
const std::optional<String> & additional_column)
|
||||||
{
|
{
|
||||||
KeyDescription result;
|
KeyDescription result;
|
||||||
result.definition_ast = definition_ast;
|
ASTPtr adjusted_ast = definition_ast;
|
||||||
result.expression_list_ast = extractKeyExpressionList(definition_ast);
|
moduloToModuloLegacyRecursive(adjusted_ast);
|
||||||
|
|
||||||
|
result.definition_ast = adjusted_ast;
|
||||||
|
result.expression_list_ast = extractKeyExpressionList(adjusted_ast);
|
||||||
|
|
||||||
if (additional_column)
|
if (additional_column)
|
||||||
{
|
{
|
||||||
|
@ -379,7 +379,6 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
|
|||||||
|
|
||||||
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||||
const StorageMetadataPtr & metadata_snapshot,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
ContextPtr context,
|
|
||||||
const String & part_name,
|
const String & part_name,
|
||||||
const String & replica_path,
|
const String & replica_path,
|
||||||
const String & host,
|
const String & host,
|
||||||
@ -494,7 +493,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
|||||||
if (e.code() != ErrorCodes::S3_ERROR)
|
if (e.code() != ErrorCodes::S3_ERROR)
|
||||||
throw;
|
throw;
|
||||||
/// Try again but without S3 copy
|
/// Try again but without S3 copy
|
||||||
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
|
return fetchPart(metadata_snapshot, part_name, replica_path, host, port, timeouts,
|
||||||
user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false);
|
user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -558,7 +557,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
|||||||
|
|
||||||
MergeTreeData::DataPart::Checksums checksums;
|
MergeTreeData::DataPart::Checksums checksums;
|
||||||
return part_type == "InMemory"
|
return part_type == "InMemory"
|
||||||
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections)
|
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, std::move(reservation), in, projections)
|
||||||
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums);
|
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -566,7 +565,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
|
|||||||
const String & part_name,
|
const String & part_name,
|
||||||
const UUID & part_uuid,
|
const UUID & part_uuid,
|
||||||
const StorageMetadataPtr & metadata_snapshot,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
ContextPtr context,
|
|
||||||
ReservationPtr reservation,
|
ReservationPtr reservation,
|
||||||
PooledReadWriteBufferFromHTTP & in,
|
PooledReadWriteBufferFromHTTP & in,
|
||||||
size_t projections)
|
size_t projections)
|
||||||
@ -621,7 +619,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
|
|||||||
new_data_part->is_temp = true;
|
new_data_part->is_temp = true;
|
||||||
new_data_part->setColumns(block.getNamesAndTypesList());
|
new_data_part->setColumns(block.getNamesAndTypesList());
|
||||||
new_data_part->minmax_idx.update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
|
new_data_part->minmax_idx.update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
|
||||||
new_data_part->partition.create(metadata_snapshot, block, 0, context);
|
new_data_part->partition.create(metadata_snapshot, block, 0);
|
||||||
|
|
||||||
MergedBlockOutputStream part_out(
|
MergedBlockOutputStream part_out(
|
||||||
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
|
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
|
||||||
|
@ -65,7 +65,6 @@ public:
|
|||||||
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
|
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
|
||||||
MergeTreeData::MutableDataPartPtr fetchPart(
|
MergeTreeData::MutableDataPartPtr fetchPart(
|
||||||
const StorageMetadataPtr & metadata_snapshot,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
ContextPtr context,
|
|
||||||
const String & part_name,
|
const String & part_name,
|
||||||
const String & replica_path,
|
const String & replica_path,
|
||||||
const String & host,
|
const String & host,
|
||||||
@ -107,7 +106,6 @@ private:
|
|||||||
const String & part_name,
|
const String & part_name,
|
||||||
const UUID & part_uuid,
|
const UUID & part_uuid,
|
||||||
const StorageMetadataPtr & metadata_snapshot,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
ContextPtr context,
|
|
||||||
ReservationPtr reservation,
|
ReservationPtr reservation,
|
||||||
PooledReadWriteBufferFromHTTP & in,
|
PooledReadWriteBufferFromHTTP & in,
|
||||||
size_t projections);
|
size_t projections);
|
||||||
|
@ -23,7 +23,7 @@ void MergeTreeBlockOutputStream::writePrefix()
|
|||||||
|
|
||||||
void MergeTreeBlockOutputStream::write(const Block & block)
|
void MergeTreeBlockOutputStream::write(const Block & block)
|
||||||
{
|
{
|
||||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
|
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
|
||||||
for (auto & current_block : part_blocks)
|
for (auto & current_block : part_blocks)
|
||||||
{
|
{
|
||||||
Stopwatch watch;
|
Stopwatch watch;
|
||||||
|
@ -772,7 +772,7 @@ std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
|
|||||||
// Generate valid expressions for filtering
|
// Generate valid expressions for filtering
|
||||||
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, local_context, virtual_columns_block, expression_ast);
|
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, local_context, virtual_columns_block, expression_ast);
|
||||||
|
|
||||||
PartitionPruner partition_pruner(metadata_snapshot, query_info, local_context, true /* strict */);
|
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, local_context, true /* strict */);
|
||||||
if (partition_pruner.isUseless() && !valid)
|
if (partition_pruner.isUseless() && !valid)
|
||||||
return {};
|
return {};
|
||||||
|
|
||||||
@ -876,13 +876,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
|||||||
{
|
{
|
||||||
/// Create and correctly initialize global WAL object
|
/// Create and correctly initialize global WAL object
|
||||||
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
|
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
|
||||||
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
|
for (auto && part : write_ahead_log->restore(metadata_snapshot))
|
||||||
parts_from_wal.push_back(std::move(part));
|
parts_from_wal.push_back(std::move(part));
|
||||||
}
|
}
|
||||||
else if (settings->in_memory_parts_enable_wal)
|
else if (settings->in_memory_parts_enable_wal)
|
||||||
{
|
{
|
||||||
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
|
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
|
||||||
for (auto && part : wal.restore(metadata_snapshot, getContext()))
|
for (auto && part : wal.restore(metadata_snapshot))
|
||||||
parts_from_wal.push_back(std::move(part));
|
parts_from_wal.push_back(std::move(part));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2350,7 +2350,7 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(c
|
|||||||
DataPartsVector parts_to_remove;
|
DataPartsVector parts_to_remove;
|
||||||
|
|
||||||
if (drop_range.min_block > drop_range.max_block)
|
if (drop_range.min_block > drop_range.max_block)
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid drop range: {}", drop_range.getPartName());
|
return parts_to_remove;
|
||||||
|
|
||||||
auto partition_range = getDataPartsPartitionRange(drop_range.partition_id);
|
auto partition_range = getDataPartsPartitionRange(drop_range.partition_id);
|
||||||
|
|
||||||
|
@ -503,7 +503,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
|
|||||||
|
|
||||||
minmax_idx_condition.emplace(
|
minmax_idx_condition.emplace(
|
||||||
query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)));
|
query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)));
|
||||||
partition_pruner.emplace(metadata_snapshot_base, query_info, context, false /* strict */);
|
partition_pruner.emplace(metadata_snapshot_base->getPartitionKey(), query_info, context, false /* strict */);
|
||||||
|
|
||||||
if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless()))
|
if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless()))
|
||||||
{
|
{
|
||||||
|
@ -140,8 +140,7 @@ void updateTTL(
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
|
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot)
|
||||||
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
|
|
||||||
{
|
{
|
||||||
BlocksWithPartition result;
|
BlocksWithPartition result;
|
||||||
if (!block || !block.rows())
|
if (!block || !block.rows())
|
||||||
@ -156,12 +155,12 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
|
|||||||
}
|
}
|
||||||
|
|
||||||
Block block_copy = block;
|
Block block_copy = block;
|
||||||
/// After expression execution partition key columns will be added to block_copy with names regarding partition function.
|
const auto & partition_key = metadata_snapshot->getPartitionKey();
|
||||||
auto partition_key_sample_block = MergeTreePartition::executePartitionByExpression(metadata_snapshot, block_copy, context);
|
partition_key.expression->execute(block_copy);
|
||||||
|
|
||||||
ColumnRawPtrs partition_columns;
|
ColumnRawPtrs partition_columns;
|
||||||
partition_columns.reserve(partition_key_sample_block.columns());
|
partition_columns.reserve(partition_key.sample_block.columns());
|
||||||
for (const ColumnWithTypeAndName & element : partition_key_sample_block)
|
for (const ColumnWithTypeAndName & element : partition_key.sample_block)
|
||||||
partition_columns.emplace_back(block_copy.getByName(element.name).column.get());
|
partition_columns.emplace_back(block_copy.getByName(element.name).column.get());
|
||||||
|
|
||||||
PODArray<size_t> partition_num_to_first_row;
|
PODArray<size_t> partition_num_to_first_row;
|
||||||
|
@ -40,7 +40,7 @@ public:
|
|||||||
* (split rows by partition)
|
* (split rows by partition)
|
||||||
* Works deterministically: if same block was passed, function will return same result in same order.
|
* Works deterministically: if same block was passed, function will return same result in same order.
|
||||||
*/
|
*/
|
||||||
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
|
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot);
|
||||||
|
|
||||||
/** All rows must correspond to same partition.
|
/** All rows must correspond to same partition.
|
||||||
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
|
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
|
||||||
|
@ -169,79 +169,22 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
|
|||||||
out->finalize();
|
out->finalize();
|
||||||
}
|
}
|
||||||
|
|
||||||
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context)
|
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row)
|
||||||
{
|
{
|
||||||
if (!metadata_snapshot->hasPartitionKey())
|
if (!metadata_snapshot->hasPartitionKey())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
auto partition_key_sample_block = executePartitionByExpression(metadata_snapshot, block, context);
|
const auto & partition_key = metadata_snapshot->getPartitionKey();
|
||||||
size_t partition_columns_num = partition_key_sample_block.columns();
|
partition_key.expression->execute(block);
|
||||||
|
size_t partition_columns_num = partition_key.sample_block.columns();
|
||||||
value.resize(partition_columns_num);
|
value.resize(partition_columns_num);
|
||||||
const String modulo_legacy_function_name = "moduloLegacy";
|
|
||||||
|
|
||||||
for (size_t i = 0; i < partition_columns_num; ++i)
|
for (size_t i = 0; i < partition_columns_num; ++i)
|
||||||
{
|
{
|
||||||
const auto & column_name = partition_key_sample_block.getByPosition(i).name;
|
const auto & column_name = partition_key.sample_block.getByPosition(i).name;
|
||||||
auto & partition_column = block.getByName(column_name);
|
auto & partition_column = block.getByName(column_name).column;
|
||||||
|
partition_column->get(row, value[i]);
|
||||||
/// Executing partition_by expression adds new columns to passed block according to partition functions.
|
|
||||||
/// The block is passed by reference and is used afterwards. `moduloLegacy` needs to be substituted back
|
|
||||||
/// with just `modulo`, because it was a temporary substitution.
|
|
||||||
if (column_name.starts_with(modulo_legacy_function_name))
|
|
||||||
partition_column.name = "modulo" + partition_column.name.substr(modulo_legacy_function_name.size());
|
|
||||||
|
|
||||||
partition_column.column->get(row, value[i]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool moduloToModuloLegacyRecursive(ASTPtr node_expr)
|
|
||||||
{
|
|
||||||
if (!node_expr)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
auto * function_expr = node_expr->as<ASTFunction>();
|
|
||||||
bool modulo_in_partition_key = false;
|
|
||||||
if (function_expr)
|
|
||||||
{
|
|
||||||
if (function_expr->name == "modulo")
|
|
||||||
{
|
|
||||||
function_expr->name = "moduloLegacy";
|
|
||||||
modulo_in_partition_key = true;
|
|
||||||
}
|
|
||||||
if (function_expr->arguments)
|
|
||||||
{
|
|
||||||
auto children = function_expr->arguments->children;
|
|
||||||
for (const auto & child : children)
|
|
||||||
modulo_in_partition_key |= moduloToModuloLegacyRecursive(child);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return modulo_in_partition_key;
|
|
||||||
}
|
|
||||||
|
|
||||||
Block MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context)
|
|
||||||
{
|
|
||||||
auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context);
|
|
||||||
adjusted_partition_key.expression->execute(block);
|
|
||||||
return adjusted_partition_key.sample_block;
|
|
||||||
}
|
|
||||||
|
|
||||||
KeyDescription MergeTreePartition::adjustPartitionKey(const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
|
|
||||||
{
|
|
||||||
const auto & partition_key = metadata_snapshot->getPartitionKey();
|
|
||||||
if (!partition_key.definition_ast)
|
|
||||||
return partition_key;
|
|
||||||
|
|
||||||
ASTPtr ast_copy = partition_key.definition_ast->clone();
|
|
||||||
|
|
||||||
/// Implementation of modulo function was changed from 8bit result type to 16bit. For backward compatibility partition by expression is always
|
|
||||||
/// calculated according to previous version - `moduloLegacy`.
|
|
||||||
if (moduloToModuloLegacyRecursive(ast_copy))
|
|
||||||
{
|
|
||||||
auto adjusted_partition_key = KeyDescription::getKeyFromAST(ast_copy, metadata_snapshot->columns, context);
|
|
||||||
return adjusted_partition_key;
|
|
||||||
}
|
|
||||||
|
|
||||||
return partition_key;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -41,13 +41,7 @@ public:
|
|||||||
|
|
||||||
void assign(const MergeTreePartition & other) { value.assign(other.value); }
|
void assign(const MergeTreePartition & other) { value.assign(other.value); }
|
||||||
|
|
||||||
void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context);
|
void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row);
|
||||||
|
|
||||||
/// Adjust partition key and execute its expression on block. Return sample block according to used expression.
|
|
||||||
static Block executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context);
|
|
||||||
|
|
||||||
/// Make a modified partition key with substitution from modulo to moduloLegacy. Used in paritionPruner.
|
|
||||||
static KeyDescription adjustPartitionKey(const StorageMetadataPtr & partition_key, ContextPtr context);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -111,7 +111,7 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
|
|||||||
init();
|
init();
|
||||||
}
|
}
|
||||||
|
|
||||||
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
|
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
|
||||||
{
|
{
|
||||||
std::unique_lock lock(write_mutex);
|
std::unique_lock lock(write_mutex);
|
||||||
|
|
||||||
@ -192,7 +192,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
|
|||||||
MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
|
MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
|
||||||
|
|
||||||
part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
|
part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
|
||||||
part->partition.create(metadata_snapshot, block, 0, context);
|
part->partition.create(metadata_snapshot, block, 0);
|
||||||
if (metadata_snapshot->hasSortingKey())
|
if (metadata_snapshot->hasSortingKey())
|
||||||
metadata_snapshot->getSortingKey().expression->execute(block);
|
metadata_snapshot->getSortingKey().expression->execute(block);
|
||||||
|
|
||||||
|
@ -62,7 +62,7 @@ public:
|
|||||||
|
|
||||||
void addPart(DataPartInMemoryPtr & part);
|
void addPart(DataPartInMemoryPtr & part);
|
||||||
void dropPart(const String & part_name);
|
void dropPart(const String & part_name);
|
||||||
std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
|
std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot);
|
||||||
|
|
||||||
using MinMaxBlockNumber = std::pair<Int64, Int64>;
|
using MinMaxBlockNumber = std::pair<Int64, Int64>;
|
||||||
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);
|
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);
|
||||||
|
@ -14,18 +14,15 @@ class PartitionPruner
|
|||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
std::unordered_map<String, bool> partition_filter_map;
|
std::unordered_map<String, bool> partition_filter_map;
|
||||||
|
const KeyDescription & partition_key;
|
||||||
/// partition_key is adjusted here (with substitution from modulo to moduloLegacy).
|
|
||||||
KeyDescription partition_key;
|
|
||||||
|
|
||||||
KeyCondition partition_condition;
|
KeyCondition partition_condition;
|
||||||
bool useless;
|
bool useless;
|
||||||
using DataPart = IMergeTreeDataPart;
|
using DataPart = IMergeTreeDataPart;
|
||||||
using DataPartPtr = std::shared_ptr<const DataPart>;
|
using DataPartPtr = std::shared_ptr<const DataPart>;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict)
|
PartitionPruner(const KeyDescription & partition_key_, const SelectQueryInfo & query_info, ContextPtr context, bool strict)
|
||||||
: partition_key(MergeTreePartition::adjustPartitionKey(metadata, context))
|
: partition_key(partition_key_)
|
||||||
, partition_condition(
|
, partition_condition(
|
||||||
query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
|
query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
|
||||||
, useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue())
|
, useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue())
|
||||||
|
@ -136,7 +136,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
|
|||||||
if (quorum)
|
if (quorum)
|
||||||
checkQuorumPrecondition(zookeeper);
|
checkQuorumPrecondition(zookeeper);
|
||||||
|
|
||||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
|
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
|
||||||
|
|
||||||
for (auto & current_block : part_blocks)
|
for (auto & current_block : part_blocks)
|
||||||
{
|
{
|
||||||
|
@ -206,16 +206,18 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat
|
|||||||
", local: " + DB::toString(data_format_version.toUnderType()),
|
", local: " + DB::toString(data_format_version.toUnderType()),
|
||||||
ErrorCodes::METADATA_MISMATCH);
|
ErrorCodes::METADATA_MISMATCH);
|
||||||
|
|
||||||
if (partition_key != from_zk.partition_key)
|
//if (partition_key != from_zk.partition_key)
|
||||||
throw Exception(
|
// throw Exception(
|
||||||
"Existing table metadata in ZooKeeper differs in partition key expression."
|
// "Existing table metadata in ZooKeeper differs in partition key expression."
|
||||||
" Stored in ZooKeeper: " + from_zk.partition_key + ", local: " + partition_key,
|
// " Stored in ZooKeeper: " + from_zk.partition_key + ", local: " + partition_key,
|
||||||
ErrorCodes::METADATA_MISMATCH);
|
// ErrorCodes::METADATA_MISMATCH);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReplicatedMergeTreeTableMetadata::checkEquals(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const
|
void ReplicatedMergeTreeTableMetadata::checkEquals(const ReplicatedMergeTreeTableMetadata & from_zk, const ColumnsDescription & columns, ContextPtr context) const
|
||||||
{
|
{
|
||||||
|
std::cerr << "\n\nKSSENII partitition key: " << partition_key << ", in zk: " << from_zk.partition_key << std::endl;
|
||||||
|
//String parsed = IndicesDescription::parse(from_zk.partition_key, columns, context).toString();
|
||||||
|
|
||||||
checkImmutableFieldsEquals(from_zk);
|
checkImmutableFieldsEquals(from_zk);
|
||||||
|
|
||||||
|
@ -2466,7 +2466,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
part_desc->res_part = fetcher.fetchPart(
|
part_desc->res_part = fetcher.fetchPart(
|
||||||
metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
|
metadata_snapshot, part_desc->found_new_part_name, source_replica_path,
|
||||||
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_");
|
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_");
|
||||||
|
|
||||||
/// TODO: check columns_version of fetched part
|
/// TODO: check columns_version of fetched part
|
||||||
@ -3929,7 +3929,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
|
|||||||
|
|
||||||
return fetcher.fetchPart(
|
return fetcher.fetchPart(
|
||||||
metadata_snapshot,
|
metadata_snapshot,
|
||||||
getContext(),
|
|
||||||
part_name,
|
part_name,
|
||||||
source_replica_path,
|
source_replica_path,
|
||||||
address.host,
|
address.host,
|
||||||
@ -4085,7 +4084,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
|
|||||||
ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);
|
ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);
|
||||||
|
|
||||||
return fetcher.fetchPart(
|
return fetcher.fetchPart(
|
||||||
metadata_snapshot, getContext(), part_name, source_replica_path,
|
metadata_snapshot, part_name, source_replica_path,
|
||||||
address.host, address.replication_port,
|
address.host, address.replication_port,
|
||||||
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true,
|
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true,
|
||||||
replaced_disk);
|
replaced_disk);
|
||||||
|
Loading…
Reference in New Issue
Block a user