support rename

This commit is contained in:
Han Fei 2023-10-04 00:58:26 +02:00
parent d6c1c0e805
commit ba878fb43a
9 changed files with 74 additions and 30 deletions

View File

@ -727,6 +727,8 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
rename_visitor.visit(column_to_modify.default_desc.expression);
if (column_to_modify.ttl)
rename_visitor.visit(column_to_modify.ttl);
if (column_to_modify.name == column_name && column_to_modify.stat)
column_to_modify.stat->column_name = rename_to;
});
}
if (metadata.table_ttl.definition_ast)

View File

@ -331,7 +331,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeStatistics(const Block
{
for (const auto & stat_ptr : stats)
{
stat_ptr->update(block);
stat_ptr->update(block.getByName(stat_ptr->columnName()).column);
}
}

View File

@ -275,7 +275,7 @@ void MergeTreeWhereOptimizer::analyzeImpl(Conditions & res, const RPNBuilderTree
cond.selectivity = estimator.estimateSelectivity(node);
if (node.getASTNode() != nullptr)
LOG_TEST(log, "Condition {} has selectivity {}", node.getASTNode()->dumpTree(), cond.selectivity);
LOG_TRACE(log, "Condition {} has selectivity {}", node.getASTNode()->dumpTree(), cond.selectivity);
}
if (where_optimizer_context.move_primary_key_columns_to_end_of_prewhere)

View File

@ -733,6 +733,10 @@ static NameToNameVector collectFilesForRenames(
if (auto serialization = source_part->tryGetSerialization(command.column_name))
serialization->enumerateStreams(callback);
/// if we rename a column with statistic, we should also rename the stat file.
if (source_part->checksums.has(STAT_FILE_PREFIX + command.column_name + STAT_FILE_SUFFIX))
add_rename(STAT_FILE_PREFIX + command.column_name + STAT_FILE_SUFFIX, STAT_FILE_PREFIX + command.rename_to + STAT_FILE_SUFFIX);
}
else if (command.type == MutationCommand::Type::READ_COLUMN)
{
@ -919,7 +923,7 @@ struct MutationContext
std::set<MergeTreeIndexPtr> indices_to_recalc;
std::set<StatisticPtr> stats_to_recalc;
std::set<ProjectionDescriptionRawPtr> projections_to_recalc;
MergeTreeData::DataPart::Checksums existing_indices_checksums;
MergeTreeData::DataPart::Checksums existing_indices_stats_checksums;
NameSet files_to_skip;
NameToNameVector files_to_rename;
@ -1340,6 +1344,8 @@ private:
NameSet removed_indices;
NameSet removed_stats;
/// A stat file need to be renamed iff the column is renamed.
NameToNameMap renamed_stats;
for (const auto & command : ctx->for_file_renames)
{
if (command.type == MutationCommand::DROP_INDEX)
@ -1347,6 +1353,9 @@ private:
else if (command.type == MutationCommand::DROP_STATISTIC)
for (const auto & column_name : command.statistic_columns)
removed_stats.insert(column_name);
else if (command.type == MutationCommand::RENAME_COLUMN
&& ctx->source_part->checksums.files.contains(STAT_FILE_PREFIX + command.column_name + STAT_FILE_SUFFIX))
renamed_stats[STAT_FILE_PREFIX + command.column_name + STAT_FILE_SUFFIX] = STAT_FILE_PREFIX + command.rename_to + STAT_FILE_SUFFIX;
}
bool is_full_part_storage = isFullPartStorage(ctx->new_data_part->getDataPartStorage());
@ -1376,7 +1385,7 @@ private:
break;
entries_to_hardlink.insert(it->first);
ctx->existing_indices_checksums.addFile(it->first, it->second.file_size, it->second.file_hash);
ctx->existing_indices_stats_checksums.addFile(it->first, it->second.file_size, it->second.file_hash);
++it;
}
}
@ -1395,15 +1404,15 @@ private:
}
else
{
/// We only hard-link statistics which
/// 1. not in `DROP STATISTIC` statement. It is filtered by `removed_stats`
/// 2. not in column list anymore, including `DROP COLUMN`. It is not touched by this loop.
auto prefix = fmt::format("{}{}.", STAT_FILE_PREFIX, col.name);
auto it = ctx->source_part->checksums.files.upper_bound(prefix);
if (it != ctx->source_part->checksums.files.end() && startsWith(it->first, prefix))
/// We do not hard-link statistics which
/// 1. In `DROP STATISTIC` statement. It is filtered by `removed_stats`
/// 2. Not in column list anymore, including `DROP COLUMN`. It is not touched by this loop.
String stat_file_name = STAT_FILE_PREFIX + col.name + STAT_FILE_SUFFIX;
auto it = ctx->source_part->checksums.files.find(stat_file_name);
if (it != ctx->source_part->checksums.files.end())
{
entries_to_hardlink.insert(it->first);
ctx->existing_indices_checksums.addFile(it->first, it->second.file_size, it->second.file_hash);
ctx->existing_indices_stats_checksums.addFile(it->first, it->second.file_size, it->second.file_hash);
}
}
}
@ -1441,9 +1450,18 @@ private:
for (auto it = ctx->source_part->getDataPartStorage().iterate(); it->isValid(); it->next())
{
if (!entries_to_hardlink.contains(it->name()))
continue;
if (it->isFile())
{
if (renamed_stats.contains(it->name()))
{
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), it->name(), renamed_stats.at(it->name()));
hardlinked_files.insert(it->name());
/// Also we need to "rename" checksums to finalize correctly.
const auto & check_sum = ctx->source_part->checksums.files.at(it->name());
ctx->existing_indices_stats_checksums.addFile(renamed_stats.at(it->name()), check_sum.file_size, check_sum.file_hash);
}
}
else if (it->isFile())
{
ctx->new_data_part->getDataPartStorage().createHardLinkFrom(
ctx->source_part->getDataPartStorage(), it->name(), it->name());
@ -1526,7 +1544,7 @@ private:
ctx->mutating_pipeline.reset();
static_pointer_cast<MergedBlockOutputStream>(ctx->out)->finalizePart(
ctx->new_data_part, ctx->need_sync, nullptr, &ctx->existing_indices_checksums);
ctx->new_data_part, ctx->need_sync, nullptr, &ctx->existing_indices_stats_checksums);
ctx->out.reset();
}

View File

@ -17,9 +17,20 @@ namespace ErrorCodes
extern const int ILLEGAL_STATISTIC;
}
void TDigestStatistic::update(const ColumnPtr & column)
{
size_t size = column->size();
for (size_t i = 0; i < size; ++i)
{
/// TODO: support more types.
Float64 value = column->getFloat64(i);
data.add(value, 1);
}
}
StatisticPtr TDigestCreator(const StatisticDescription & stat)
{
/// TODO: check column data types.
return StatisticPtr(new TDigestStatistic(stat));
}

View File

@ -35,7 +35,6 @@ public:
}
virtual ~IStatistic() = default;
/// statistic_[col_name]_[type]
String getFileName() const
{
return STAT_FILE_PREFIX + columnName();
@ -50,7 +49,7 @@ public:
virtual void deserialize(ReadBuffer & buf) = 0;
virtual void update(const Block & block) = 0;
virtual void update(const ColumnPtr & column) = 0;
virtual UInt64 count() = 0;
@ -84,18 +83,7 @@ public:
data.deserialize(buf);
}
void update(const Block & block) override
{
const auto & column_with_type = block.getByName(columnName());
size_t size = block.rows();
for (size_t i = 0; i < size; ++i)
{
/// TODO: support more types.
Float64 value = column_with_type.column->getFloat64(i);
data.add(value, 1);
}
}
void update(const ColumnPtr & column) override;
UInt64 count() override
{

View File

@ -86,6 +86,18 @@ def run_test_single_node(started_cluster):
check_stat_file_on_disk(node1, "test_stat", "all_1_1_0_6", "b", True)
check_stat_file_on_disk(node1, "test_stat", "all_1_1_0_6", "c", False)
node1.query("ALTER TABLE test_stat RENAME COLUMN b TO c")
check_stat_file_on_disk(node1, "test_stat", "all_1_1_0_7", "a", True)
check_stat_file_on_disk(node1, "test_stat", "all_1_1_0_7", "b", False)
check_stat_file_on_disk(node1, "test_stat", "all_1_1_0_7", "c", True)
node1.query("ALTER TABLE test_stat RENAME COLUMN c TO b")
check_stat_file_on_disk(node1, "test_stat", "all_1_1_0_8", "a", True)
check_stat_file_on_disk(node1, "test_stat", "all_1_1_0_8", "b", True)
check_stat_file_on_disk(node1, "test_stat", "all_1_1_0_8", "c", False)
def test_single_node_wide(started_cluster):
node1.query("DROP TABLE IF EXISTS test_stat")

View File

@ -23,3 +23,9 @@ SELECT count()
FROM t1
PREWHERE (a < 10) AND (b < 10)
20
CREATE TABLE default.t1\n(\n `a` Float64 STATISTIC(tdigest),\n `c` Int64 STATISTIC(tdigest),\n `pk` String\n)\nENGINE = MergeTree\nORDER BY pk\nSETTINGS min_bytes_for_wide_part = 0, index_granularity = 8192
After rename
SELECT count()
FROM t1
PREWHERE (a < 10) AND (c < 10)
20

View File

@ -47,4 +47,11 @@ SELECT 'After merge';
EXPLAIN SYNTAX SELECT count(*) FROM t1 WHERE b < 10 and a < 10;
SELECT count(*) FROM t1 WHERE b < 10 and a < 10;
ALTER TABLE t1 RENAME COLUMN b TO c;
SHOW CREATE TABLE t1;
SELECT 'After rename';
EXPLAIN SYNTAX SELECT count(*) FROM t1 WHERE c < 10 and a < 10;
SELECT count(*) FROM t1 WHERE c < 10 and a < 10;
DROP TABLE IF EXISTS t1;