Merge pull request #49660 from CurtizJ/fix-sparse-columns-reload

Fix reading from sparse columns after restart
This commit is contained in:
Alexey Milovidov 2023-05-10 18:08:22 +03:00 committed by GitHub
commit 4a4eb5b171
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 59 additions and 10 deletions

View File

@ -246,7 +246,8 @@ void SerializationInfoByName::writeJSON(WriteBuffer & out) const
return writeString(oss.str(), out);
}
void SerializationInfoByName::readJSON(ReadBuffer & in)
SerializationInfoByName SerializationInfoByName::readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in)
{
String json_str;
readString(json_str, in);
@ -262,8 +263,13 @@ void SerializationInfoByName::readJSON(ReadBuffer & in)
"Unknown version of serialization infos ({}). Should be less or equal than {}",
object->getValue<size_t>(KEY_VERSION), SERIALIZATION_INFO_VERSION);
SerializationInfoByName infos;
if (object->has(KEY_COLUMNS))
{
std::unordered_map<std::string_view, const IDataType *> column_type_by_name;
for (const auto & [name, type] : columns)
column_type_by_name.emplace(name, type.get());
auto array = object->getArray(KEY_COLUMNS);
for (const auto & elem : *array)
{
@ -271,13 +277,22 @@ void SerializationInfoByName::readJSON(ReadBuffer & in)
if (!elem_object->has(KEY_NAME))
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Missed field '{}' in SerializationInfo of columns", KEY_NAME);
"Missed field '{}' in serialization infos", KEY_NAME);
auto name = elem_object->getValue<String>(KEY_NAME);
if (auto it = find(name); it != end())
it->second->fromJSON(*elem_object);
}
auto it = column_type_by_name.find(name);
if (it == column_type_by_name.end())
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Found unexpected column '{}' in serialization infos", name);
auto info = it->second->createSerializationInfo(settings);
info->fromJSON(*elem_object);
infos.emplace(name, std::move(info));
}
}
return infos;
}
}

View File

@ -96,8 +96,10 @@ using MutableSerializationInfos = std::vector<MutableSerializationInfoPtr>;
class SerializationInfoByName : public std::map<String, MutableSerializationInfoPtr>
{
public:
using Settings = SerializationInfo::Settings;
SerializationInfoByName() = default;
SerializationInfoByName(const NamesAndTypesList & columns, const SerializationInfo::Settings & settings);
SerializationInfoByName(const NamesAndTypesList & columns, const Settings & settings);
void add(const Block & block);
void add(const SerializationInfoByName & other);
@ -108,7 +110,9 @@ public:
void replaceData(const SerializationInfoByName & other);
void writeJSON(WriteBuffer & out) const;
void readJSON(ReadBuffer & in);
static SerializationInfoByName readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in);
};
}

View File

@ -1341,11 +1341,11 @@ void IMergeTreeDataPart::loadColumns(bool require)
.choose_kind = false,
};
SerializationInfoByName infos(loaded_columns, settings);
SerializationInfoByName infos;
if (metadata_manager->exists(SERIALIZATION_FILE_NAME))
{
auto in = metadata_manager->read(SERIALIZATION_FILE_NAME);
infos.readJSON(*in);
infos = SerializationInfoByName::readJSON(loaded_columns, settings, *in);
}
int32_t loaded_metadata_version;

View File

@ -326,6 +326,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (!ctx->need_remove_expired_values)
{
size_t expired_columns = 0;
auto part_serialization_infos = global_ctx->new_data_part->getSerializationInfos();
for (auto & [column_name, ttl] : global_ctx->new_data_part->ttl_infos.columns_ttl)
{
@ -335,6 +336,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
LOG_TRACE(ctx->log, "Adding expired column {} for part {}", column_name, global_ctx->new_data_part->name);
std::erase(global_ctx->gathering_column_names, column_name);
std::erase(global_ctx->merging_column_names, column_name);
std::erase(global_ctx->all_column_names, column_name);
part_serialization_infos.erase(column_name);
++expired_columns;
}
}
@ -343,6 +346,12 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
{
global_ctx->gathering_columns = global_ctx->gathering_columns.filter(global_ctx->gathering_column_names);
global_ctx->merging_columns = global_ctx->merging_columns.filter(global_ctx->merging_column_names);
global_ctx->storage_columns = global_ctx->storage_columns.filter(global_ctx->all_column_names);
global_ctx->new_data_part->setColumns(
global_ctx->storage_columns,
part_serialization_infos,
global_ctx->metadata_snapshot->getMetadataVersion());
}
}

View File

@ -94,12 +94,13 @@ IMergeTreeDataPart::Checksums checkDataPart(
};
auto ratio_of_defaults = data_part->storage.getSettings()->ratio_of_defaults_for_sparse_serialization;
SerializationInfoByName serialization_infos(columns_txt, SerializationInfo::Settings{ratio_of_defaults, false});
SerializationInfoByName serialization_infos;
if (data_part_storage.exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME))
{
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, {}, std::nullopt, std::nullopt);
serialization_infos.readJSON(*serialization_file);
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
}
auto get_serialization = [&serialization_infos](const auto & column)

View File

@ -0,0 +1,2 @@
100000
100000

View File

@ -0,0 +1,18 @@
-- Regression test for reading a MergeTree part after the table is reloaded
-- with a different ratio_of_defaults_for_sparse_serialization than the one
-- in effect when the part was written.
-- Both SELECTs below are expected to print 100000 (every inserted row).
DROP TABLE IF EXISTS t_sparse_reload;
CREATE TABLE t_sparse_reload (id UInt64, v UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.95;
-- Every value of v is 0, so v consists only of default values;
-- presumably this makes the part store v in sparse form under the 0.95 threshold.
INSERT INTO t_sparse_reload SELECT number, 0 FROM numbers(100000);
-- NOT ignore(*) references every column; presumably this forces the columns
-- to be actually read instead of count() being answered without touching data.
SELECT count() FROM t_sparse_reload WHERE NOT ignore(*);
-- Change the table-level threshold so it no longer matches the value used at insert time.
ALTER TABLE t_sparse_reload MODIFY SETTING ratio_of_defaults_for_sparse_serialization = 1.0;
-- Reload the table; presumably this stands in for a server restart and makes
-- the existing part's metadata be loaded again under the new setting.
DETACH TABLE t_sparse_reload;
ATTACH TABLE t_sparse_reload;
-- Same query as before the reload; it must still return all rows.
SELECT count() FROM t_sparse_reload WHERE NOT ignore(*);
DROP TABLE t_sparse_reload;