Merge pull request #33849 from CurtizJ/fix-sparse-columns

Fix sparse columns
This commit is contained in:
alexey-milovidov 2022-01-21 21:13:02 +03:00 committed by GitHub
commit 43f2e2c178
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 125 additions and 46 deletions

View File

@ -158,6 +158,19 @@ void SerializationInfoByName::add(const SerializationInfoByName & other)
}
}
/// Pulls serialization data from `other` into this map.
/// For a column already present here, the existing info absorbs the
/// new data via replaceData(); a column known only to `other` is
/// cloned into this map.
void SerializationInfoByName::replaceData(const SerializationInfoByName & other)
{
    for (const auto & [column_name, source_info] : other)
    {
        /// operator[] default-constructs a null entry for unseen columns.
        auto & target_info = (*this)[column_name];
        if (!target_info)
            target_info = source_info->clone();
        else
            target_info->replaceData(*source_info);
    }
}
void SerializationInfoByName::writeJSON(WriteBuffer & out) const
{
Poco::JSON::Object object;

View File

@ -89,6 +89,11 @@ public:
void add(const Block & block);
void add(const SerializationInfoByName & other);
/// Takes data from @other, but keeps current serialization kinds.
/// If column exists in @other infos, but not in current infos,
/// it's cloned to current infos.
void replaceData(const SerializationInfoByName & other);
void writeJSON(WriteBuffer & out) const;
void readJSON(ReadBuffer & in);
};

View File

@ -416,7 +416,7 @@ std::pair<time_t, time_t> IMergeTreeDataPart::getMinMaxTime() const
}
void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos)
void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns)
{
columns = new_columns;
@ -425,21 +425,12 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const
size_t pos = 0;
for (const auto & column : columns)
{
column_name_to_position.emplace(column.name, pos++);
}
auto it = new_infos.find(column.name);
if (it != new_infos.end())
{
auto & old_info = serialization_infos[column.name];
const auto & new_info = it->second;
if (old_info)
old_info->replaceData(*new_info);
else
old_info = new_info->clone();
}
}
/// Replaces the part's serialization infos wholesale with `new_infos`.
/// Unlike the merge logic in SerializationInfoByName::replaceData, this is a
/// plain overwrite: existing infos for all columns are discarded.
void IMergeTreeDataPart::setSerializationInfos(const SerializationInfoByName & new_infos)
{
    serialization_infos = new_infos;
}
SerializationPtr IMergeTreeDataPart::getSerialization(const NameAndTypePair & column) const
@ -1098,7 +1089,8 @@ void IMergeTreeDataPart::loadColumns(bool require)
if (volume->getDisk()->exists(path))
infos.readJSON(*volume->getDisk()->readFile(path));
setColumns(loaded_columns, infos);
setColumns(loaded_columns);
setSerializationInfos(infos);
}
bool IMergeTreeDataPart::shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const

View File

@ -128,11 +128,14 @@ public:
String getTypeName() const { return getType().toString(); }
void setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos = {});
void setColumns(const NamesAndTypesList & new_columns);
const NamesAndTypesList & getColumns() const { return columns; }
void setSerializationInfos(const SerializationInfoByName & new_infos);
const SerializationInfoByName & getSerializationInfos() const { return serialization_infos; }
SerializationInfoByName & getSerializationInfos() { return serialization_infos; }
SerializationPtr getSerialization(const NameAndTypePair & column) const;
/// Throws an exception if part is not stored in on-disk format.

View File

@ -186,7 +186,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
infos.add(part->getSerializationInfos());
}
global_ctx->new_data_part->setColumns(global_ctx->storage_columns, infos);
global_ctx->new_data_part->setColumns(global_ctx->storage_columns);
global_ctx->new_data_part->setSerializationInfos(infos);
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
if (local_part_min_ttl && local_part_min_ttl <= global_ctx->time_of_merge)

View File

@ -370,7 +370,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
SerializationInfoByName infos(columns, settings);
infos.add(block);
new_data_part->setColumns(columns, infos);
new_data_part->setColumns(columns);
new_data_part->setSerializationInfos(infos);
new_data_part->rows_count = block.rows();
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
@ -468,7 +469,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl(
SerializationInfoByName infos(columns, settings);
infos.add(block);
new_data_part->setColumns(columns, infos);
new_data_part->setColumns(columns);
new_data_part->setSerializationInfos(infos);
if (new_data_part->isStoredOnDisk())
{

View File

@ -72,21 +72,20 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
projection_part->checksums.getTotalSizeOnDisk(),
projection_part->checksums.getTotalChecksumUInt128());
NamesAndTypesList part_columns;
if (!total_columns_list)
part_columns = columns_list;
else
part_columns = *total_columns_list;
if (reset_columns)
{
auto part_columns = total_columns_list ? *total_columns_list : columns_list;
auto serialization_infos = new_part->getSerializationInfos();
auto & serialization_infos = reset_columns
? new_serialization_infos
: new_part->getSerializationInfos();
serialization_infos.replaceData(new_serialization_infos);
removeEmptyColumnsFromPart(new_part, part_columns, serialization_infos, checksums);
new_part->setColumns(part_columns);
new_part->setSerializationInfos(serialization_infos);
}
if (new_part->isStoredOnDisk())
finalizePartOnDisk(new_part, part_columns, serialization_infos, checksums, sync);
if (reset_columns)
new_part->setColumns(part_columns, serialization_infos);
finalizePartOnDisk(new_part, checksums, sync);
new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr);
@ -102,9 +101,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
}
void MergedBlockOutputStream::finalizePartOnDisk(
const MergeTreeData::MutableDataPartPtr & new_part,
NamesAndTypesList & part_columns,
SerializationInfoByName & serialization_infos,
const MergeTreeData::DataPartPtr & new_part,
MergeTreeData::DataPart::Checksums & checksums,
bool sync)
{
@ -171,13 +168,11 @@ void MergedBlockOutputStream::finalizePartOnDisk(
out->sync();
}
removeEmptyColumnsFromPart(new_part, part_columns, serialization_infos, checksums);
if (!serialization_infos.empty())
if (!new_part->getSerializationInfos().empty())
{
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096);
HashingWriteBuffer out_hashing(*out);
serialization_infos.writeJSON(out_hashing);
new_part->getSerializationInfos().writeJSON(out_hashing);
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
out->finalize();
@ -188,7 +183,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
{
/// Write a file with a description of columns.
auto out = volume->getDisk()->writeFile(fs::path(part_path) / "columns.txt", 4096);
part_columns.writeText(*out);
new_part->getColumns().writeText(*out);
out->finalize();
if (sync)
out->sync();

View File

@ -47,9 +47,7 @@ private:
void writeImpl(const Block & block, const IColumn::Permutation * permutation);
void finalizePartOnDisk(
const MergeTreeData::MutableDataPartPtr & new_part,
NamesAndTypesList & part_columns,
SerializationInfoByName & serialization_infos,
const MergeTreeData::DataPartPtr & new_part,
MergeTreeData::DataPart::Checksums & checksums,
bool sync);

View File

@ -71,13 +71,17 @@ MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums(
projection_part->checksums.getTotalChecksumUInt128());
auto columns = new_part->getColumns();
auto serialization_infos = new_part->getSerializationInfos();
serialization_infos.replaceData(new_serialization_infos);
auto removed_files = removeEmptyColumnsFromPart(new_part, columns, new_serialization_infos, checksums);
auto removed_files = removeEmptyColumnsFromPart(new_part, columns, serialization_infos, checksums);
for (const String & removed_file : removed_files)
if (all_checksums.files.count(removed_file))
all_checksums.files.erase(removed_file);
new_part->setColumns(columns, new_serialization_infos);
new_part->setColumns(columns);
new_part->setSerializationInfos(serialization_infos);
return checksums;
}

View File

@ -1295,7 +1295,8 @@ bool MutateTask::prepare()
ctx->source_part, ctx->updated_header, ctx->storage_columns,
ctx->source_part->getSerializationInfos(), ctx->commands_for_part);
ctx->new_data_part->setColumns(new_columns, new_infos);
ctx->new_data_part->setColumns(new_columns);
ctx->new_data_part->setSerializationInfos(new_infos);
ctx->new_data_part->partition.assign(ctx->source_part->partition);
ctx->disk = ctx->new_data_part->volume->getDisk();

View File

@ -0,0 +1,12 @@
1000
id Default
s Sparse
1000
id Default
s Sparse
1000
id Default
s Sparse
1000
id Default
s Sparse

View File

@ -0,0 +1,53 @@
-- Regression test: sparse serialization kinds must survive DETACH/ATTACH,
-- for parts produced by both the horizontal and the vertical merge algorithm.
DROP TABLE IF EXISTS t_sparse_detach;
CREATE TABLE t_sparse_detach(id UInt64, s String)
ENGINE = MergeTree ORDER BY id
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.9;
-- Only 1 row in 20 has a non-default value for `s` (5% non-default, above the
-- 0.9 default-ratio threshold), so `s` should be stored as Sparse.
INSERT INTO t_sparse_detach SELECT number, number % 20 = 0 ? toString(number) : '' FROM numbers(10000);
INSERT INTO t_sparse_detach SELECT number, number % 20 = 0 ? toString(number) : '' FROM numbers(10000);
-- Merge the two inserted parts (horizontal algorithm with default settings).
OPTIMIZE TABLE t_sparse_detach FINAL;
SELECT count() FROM t_sparse_detach WHERE s != '';
-- Expect: id Default, s Sparse.
SELECT column, serialization_kind FROM system.parts_columns
WHERE table = 't_sparse_detach' AND database = currentDatabase() AND active
ORDER BY column;
-- Re-attach the table: serialization infos are re-read from disk and must match.
DETACH TABLE t_sparse_detach;
ATTACH TABLE t_sparse_detach;
SELECT count() FROM t_sparse_detach WHERE s != '';
SELECT column, serialization_kind FROM system.parts_columns
WHERE table = 't_sparse_detach' AND database = currentDatabase() AND active
ORDER BY column;
-- Second pass: force the vertical merge algorithm (thresholds lowered to 1)
-- and repeat the same checks.
TRUNCATE TABLE t_sparse_detach;
ALTER TABLE t_sparse_detach
MODIFY SETTING vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO t_sparse_detach SELECT number, number % 20 = 0 ? toString(number) : '' FROM numbers(10000);
INSERT INTO t_sparse_detach SELECT number, number % 20 = 0 ? toString(number) : '' FROM numbers(10000);
-- This merge takes the vertical code path due to the settings above.
OPTIMIZE TABLE t_sparse_detach FINAL;
SELECT count() FROM t_sparse_detach WHERE s != '';
SELECT column, serialization_kind FROM system.parts_columns
WHERE table = 't_sparse_detach' AND database = currentDatabase() AND active
ORDER BY column;
DETACH TABLE t_sparse_detach;
ATTACH TABLE t_sparse_detach;
SELECT count() FROM t_sparse_detach WHERE s != '';
SELECT column, serialization_kind FROM system.parts_columns
WHERE table = 't_sparse_detach' AND database = currentDatabase() AND active
ORDER BY column;
DROP TABLE t_sparse_detach;