fix ALTER MODIFY COLUMN from nested

Anton Popov 2022-08-03 15:08:38 +00:00
parent a333cc4146
commit 5da32fafb3
5 changed files with 115 additions and 39 deletions

View File

@@ -434,11 +434,34 @@ std::set<ProjectionDescriptionRawPtr> getProjectionsToRecalculate(
     return projections_to_recalc;
 }
 
+static std::unordered_map<String, size_t> getStreamCounts(
+    const MergeTreeDataPartPtr & data_part, const Names & column_names)
+{
+    std::unordered_map<String, size_t> stream_counts;
+    for (const auto & column_name : column_names)
+    {
+        if (auto serialization = data_part->getSerialization(column_name))
+        {
+            auto callback = [&](const ISerialization::SubstreamPath & substream_path)
+            {
+                auto stream_name = ISerialization::getFileNameForStream(column_name, substream_path);
+                ++stream_counts[stream_name];
+            };
+
+            serialization->enumerateStreams(callback);
+        }
+    }
+
+    return stream_counts;
+}
+
 /// Files, that we don't need to remove and don't need to hardlink, for example columns.txt and checksums.txt.
 /// Because we will generate new versions of them after we perform mutation.
-NameSet collectFilesToSkip(
+static NameSet collectFilesToSkip(
     const MergeTreeDataPartPtr & source_part,
+    const MergeTreeDataPartPtr & new_part,
     const Block & updated_header,
     const std::set<MergeTreeIndexPtr> & indices_to_recalc,
     const String & mrk_extension,
@@ -446,24 +469,31 @@ NameSet collectFilesToSkip(
 {
     NameSet files_to_skip = source_part->getFileNamesWithoutChecksums();
 
+    auto new_stream_counts = getStreamCounts(new_part, new_part->getColumns().getNames());
+    auto source_updated_stream_counts = getStreamCounts(source_part, updated_header.getNames());
+    auto new_updated_stream_counts = getStreamCounts(new_part, updated_header.getNames());
+
     /// Skip updated files
-    for (const auto & entry : updated_header)
+    for (const auto & [stream_name, _] : source_updated_stream_counts)
     {
-        ISerialization::StreamCallback callback = [&](const ISerialization::SubstreamPath & substream_path)
+        /// If we read shared stream and do not write it
+        /// (e.g. while ALTER MODIFY COLUMN from array of Nested type to String),
+        /// we need to hardlink its files, because they will be lost otherwise.
+        bool need_hardlink = new_updated_stream_counts[stream_name] == 0 && new_stream_counts[stream_name] != 0;
+
+        if (!need_hardlink)
         {
-            String stream_name = ISerialization::getFileNameForStream(entry.name, substream_path);
             files_to_skip.insert(stream_name + ".bin");
             files_to_skip.insert(stream_name + mrk_extension);
-        };
-
-        if (auto serialization = source_part->tryGetSerialization(entry.name))
-            serialization->enumerateStreams(callback);
+        }
     }
 
     for (const auto & index : indices_to_recalc)
     {
         files_to_skip.insert(index->getFileName() + ".idx");
         files_to_skip.insert(index->getFileName() + mrk_extension);
     }
 
     for (const auto & projection : projections_to_recalc)
         files_to_skip.insert(projection->getDirectoryName());
@@ -482,19 +512,7 @@ static NameToNameVector collectFilesForRenames(
     const String & mrk_extension)
 {
     /// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
-    std::unordered_map<String, size_t> stream_counts;
-    for (const auto & column : source_part->getColumns())
-    {
-        if (auto serialization = source_part->tryGetSerialization(column.name))
-        {
-            serialization->enumerateStreams(
-                [&](const ISerialization::SubstreamPath & substream_path)
-                {
-                    ++stream_counts[ISerialization::getFileNameForStream(column, substream_path)];
-                });
-        }
-    }
+    auto stream_counts = getStreamCounts(source_part, source_part->getColumns().getNames());
 
     NameToNameVector rename_vector;
 
     /// Remove old data
@@ -560,26 +578,12 @@ static NameToNameVector collectFilesForRenames(
             /// but were removed in new_part by MODIFY COLUMN from
             /// type with higher number of streams (e.g. LowCardinality -> String).
-            auto collect_stream_names = [&](const auto & data_part)
-            {
-                NameSet res;
-                if (auto serialization = data_part->tryGetSerialization(command.column_name))
-                {
-                    serialization->enumerateStreams(
-                        [&](const ISerialization::SubstreamPath & substream_path)
-                        {
-                            res.insert(ISerialization::getFileNameForStream(command.column_name, substream_path));
-                        });
-                }
-                return res;
-            };
-
-            auto old_streams = collect_stream_names(source_part);
-            auto new_streams = collect_stream_names(new_part);
+            auto old_streams = getStreamCounts(source_part, source_part->getColumns().getNames());
+            auto new_streams = getStreamCounts(new_part, source_part->getColumns().getNames());
 
-            for (const auto & old_stream : old_streams)
+            for (const auto & [old_stream, _] : old_streams)
             {
-                if (!new_streams.contains(old_stream))
+                if (!new_streams.contains(old_stream) && --stream_counts[old_stream] == 0)
                 {
                     rename_vector.emplace_back(old_stream + ".bin", "");
                     rename_vector.emplace_back(old_stream + mrk_extension, "");
@@ -1580,6 +1584,7 @@ bool MutateTask::prepare()
         ctx->files_to_skip = MutationHelpers::collectFilesToSkip(
             ctx->source_part,
+            ctx->new_data_part,
             ctx->updated_header,
             ctx->indices_to_recalc,
             ctx->mrk_extension,
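
For intuition, the sketch below (plain standalone C++, not ClickHouse code) walks through the counting idea behind this fix for the Nested case exercised by the new test: `n.a` and `n.b` share the array-sizes stream `n.size0`, and after MODIFY COLUMN `n.b` String the mutation reads that stream but no longer writes it. The stream layout, the `streamCounts` helper, and all names here are illustrative assumptions; only the `need_hardlink` condition is copied from `collectFilesToSkip` above.

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

/// One (column, stream) pair per physical stream a column writes; a stream shared by
/// several columns (like the array-sizes stream of a Nested column) appears once per column.
using ColumnStreams = std::vector<std::pair<std::string, std::string>>;

/// Rough analogue of getStreamCounts: how many of the given columns use each stream.
std::map<std::string, size_t> streamCounts(const ColumnStreams & part, const std::vector<std::string> & columns)
{
    std::map<std::string, size_t> counts;
    for (const auto & [column, stream] : part)
        for (const auto & name : columns)
            if (column == name)
                ++counts[stream];
    return counts;
}

int main()
{
    /// Hypothetical layout for ALTER TABLE t MODIFY COLUMN `n.b` String, where `n.a` and
    /// `n.b` belong to a Nested column and share the sizes stream `n.size0` in the source part.
    ColumnStreams source_part = {{"id", "id"}, {"n.a", "n.a"}, {"n.a", "n.size0"}, {"n.b", "n.b"}, {"n.b", "n.size0"}};
    ColumnStreams new_part    = {{"id", "id"}, {"n.a", "n.a"}, {"n.a", "n.size0"}, {"n.b", "n.b"}};

    std::vector<std::string> updated_columns = {"n.b"};          /// columns rewritten by the mutation
    std::vector<std::string> new_columns = {"id", "n.a", "n.b"}; /// all columns of the new part

    auto new_stream_counts = streamCounts(new_part, new_columns);
    auto source_updated_stream_counts = streamCounts(source_part, updated_columns);
    auto new_updated_stream_counts = streamCounts(new_part, updated_columns);

    for (const auto & [stream_name, _] : source_updated_stream_counts)
    {
        /// Condition taken from collectFilesToSkip: the mutation reads this stream but no longer
        /// writes it, while the new part still needs it for another column, so its files must be
        /// hardlinked from the source part instead of being skipped.
        bool need_hardlink = new_updated_stream_counts[stream_name] == 0 && new_stream_counts[stream_name] != 0;
        std::cout << stream_name << (need_hardlink ? ": hardlink from source part\n" : ": skip, will be rewritten\n");
    }
    /// Prints:
    ///   n.b: skip, will be rewritten
    ///   n.size0: hardlink from source part
}

The `--stream_counts[old_stream] == 0` check added to `collectFilesForRenames` is the complementary guard on the removal side: files of a stream shared by several columns are only scheduled for deletion once no column still uses them.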

View File

@@ -0,0 +1 @@
2 1

View File

@@ -0,0 +1,43 @@
DROP TABLE IF EXISTS t_modify_from_lc_1;
DROP TABLE IF EXISTS t_modify_from_lc_2;

SET allow_suspicious_low_cardinality_types = 1;

CREATE TABLE t_modify_from_lc_1
(
    id UInt64,
    a LowCardinality(UInt32) CODEC(NONE)
)
ENGINE = MergeTree ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 0;

CREATE TABLE t_modify_from_lc_2
(
    id UInt64,
    a LowCardinality(UInt32) CODEC(NONE)
)
ENGINE = MergeTree ORDER BY tuple()
SETTINGS min_bytes_for_wide_part = 0;

INSERT INTO t_modify_from_lc_1 SELECT number, number FROM numbers(100000);
INSERT INTO t_modify_from_lc_2 SELECT number, number FROM numbers(100000);

OPTIMIZE TABLE t_modify_from_lc_1 FINAL;
OPTIMIZE TABLE t_modify_from_lc_2 FINAL;

ALTER TABLE t_modify_from_lc_1 MODIFY COLUMN a UInt32;

-- Check that dictionary of LowCardinality is actually
-- dropped and total size on disk is reduced.
WITH groupArray((table, bytes))::Map(String, UInt64) AS stats
SELECT
    length(stats), stats['t_modify_from_lc_1'] < stats['t_modify_from_lc_2']
FROM
(
    SELECT table, sum(bytes_on_disk) AS bytes FROM system.parts
    WHERE database = currentDatabase() AND table LIKE 't_modify_from_lc%' AND active
    GROUP BY table
);

DROP TABLE IF EXISTS t_modify_from_lc_1;
DROP TABLE IF EXISTS t_modify_from_lc_2;

View File

@@ -0,0 +1,6 @@
1 [2] ['aa'] Array(String)
2 [44,55] ['bb','cc'] Array(String)
1 [2] [\'aa\'] String
2 [44,55] [\'bb\',\'cc\'] String
1 [2] [\'aa\'] String
2 [44,55] [\'bb\',\'cc\'] String

View File

@@ -0,0 +1,21 @@
DROP TABLE IF EXISTS t_nested_modify;

CREATE TABLE t_nested_modify (id UInt64, `n.a` Array(UInt32), `n.b` Array(String))
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0;

INSERT INTO t_nested_modify VALUES (1, [2], ['aa']);
INSERT INTO t_nested_modify VALUES (2, [44, 55], ['bb', 'cc']);

SELECT id, `n.a`, `n.b`, toTypeName(`n.b`) FROM t_nested_modify ORDER BY id;

ALTER TABLE t_nested_modify MODIFY COLUMN `n.b` String;

SELECT id, `n.a`, `n.b`, toTypeName(`n.b`) FROM t_nested_modify ORDER BY id;

DETACH TABLE t_nested_modify;
ATTACH TABLE t_nested_modify;

SELECT id, `n.a`, `n.b`, toTypeName(`n.b`) FROM t_nested_modify ORDER BY id;

DROP TABLE t_nested_modify;