fix nested

Anton Popov 2021-11-02 06:03:52 +03:00
parent d50137013c
commit 9823f28855
20 changed files with 101 additions and 105 deletions

View File

@ -128,7 +128,7 @@ SerializationInfoByName::SerializationInfoByName(
const NamesAndTypesList & columns,
const SerializationInfo::Settings & settings)
{
if (settings.ratio_for_sparse >= 1.0)
if (settings.isAlwaysDefault())
return;
for (const auto & column : columns)
@ -206,7 +206,7 @@ void SerializationInfoByName::readText(ReadBuffer & in)
if (it == end())
throw Exception(ErrorCodes::CORRUPTED_DATA,
"There is not column {} in serialization infos", name);
"There is no column {} in serialization infos", name);
it->second->fromJSON(*elem_object);
}

View File

@ -40,6 +40,8 @@ public:
{
const double ratio_for_sparse = 1.0;
const bool choose_kind = false;
bool isAlwaysDefault() const { return ratio_for_sparse >= 1.0; }
};
SerializationInfo(ISerialization::Kind kind_, const Settings & settings_);
@ -82,9 +84,7 @@ class SerializationInfoByName : public std::unordered_map<String, MutableSeriali
{
public:
SerializationInfoByName() = default;
SerializationInfoByName(
const NamesAndTypesList & columns,
const SerializationInfo::Settings & settings);
SerializationInfoByName(const NamesAndTypesList & columns, const SerializationInfo::Settings & settings);
void add(const Block & block);
void add(const SerializationInfoByName & other);
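
Note: the Settings hunk above folds the threshold comparison into a named predicate, so call sites ask settings.isAlwaysDefault() instead of repeating the ratio_for_sparse >= 1.0 check. A minimal standalone sketch of that pattern, using stand-in types rather than the actual ClickHouse classes:

    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <vector>

    /// Stand-ins for illustration only; the real SerializationInfo classes
    /// carry per-column statistics and JSON (de)serialization.
    struct SettingsSketch
    {
        double ratio_for_sparse = 1.0;   /// 1.0 effectively disables sparse serialization
        bool choose_kind = false;

        /// A ratio of 1.0 or more means every column keeps the default
        /// serialization, so building per-column infos can be skipped.
        bool isAlwaysDefault() const { return ratio_for_sparse >= 1.0; }
    };

    struct InfoByNameSketch : public std::unordered_map<std::string, std::string>
    {
        InfoByNameSketch(const std::vector<std::string> & columns, const SettingsSketch & settings)
        {
            if (settings.isAlwaysDefault())   /// replaces the inline ">= 1.0" comparison
                return;

            for (const auto & name : columns)
                emplace(name, "default");
        }
    };

    int main()
    {
        SettingsSketch settings;
        settings.ratio_for_sparse = 0.9;

        InfoByNameSketch infos({"id", "value"}, settings);
        std::cout << infos.size() << " infos built\n";   /// prints: 2 infos built
    }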

View File

@ -47,8 +47,7 @@ void SerializationLowCardinality::enumerateStreams(
{
const auto * column_lc = data.column ? &getColumnLowCardinality(*data.column) : nullptr;
path.push_back(Substream::DictionaryKeys);
path.back().data =
SubstreamData dict_data =
{
dict_inner_serialization,
data.type ? dictionary_type : nullptr,
@ -56,7 +55,10 @@ void SerializationLowCardinality::enumerateStreams(
data.serialization_info,
};
dict_inner_serialization->enumerateStreams(path, callback, path.back().data);
path.push_back(Substream::DictionaryKeys);
path.back().data = dict_data;
dict_inner_serialization->enumerateStreams(path, callback, dict_data);
path.back() = Substream::DictionaryIndexes;
path.back().data = data;
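
Note: the reordering above builds the SubstreamData for the dictionary keys in a local variable (dict_data) before pushing the new path element, so the value passed into the recursive enumerateStreams call no longer refers into the path container itself. A small self-contained sketch of why that ordering is safer when the recursion appends to the same vector; hypothetical names, not the real serialization types:

    #include <iostream>
    #include <string>
    #include <vector>

    struct PathElem
    {
        std::string kind;
        std::string data;   /// stand-in for the SubstreamData payload
    };

    /// Recursive walk in the spirit of enumerateStreams: every level appends to
    /// the same path vector. If the payload argument aliased path.back().data,
    /// a push_back below could reallocate the vector and invalidate it; building
    /// the payload as a local, as the rewritten hunk does, avoids that hazard.
    void enumerate(std::vector<PathElem> & path, const std::string & payload, int depth)
    {
        if (depth == 0)
        {
            std::cout << "leaf under " << path.front().kind << " sees: " << payload << "\n";
            return;
        }
        path.push_back({"child", "..."});
        enumerate(path, payload, depth - 1);
        path.pop_back();
    }

    int main()
    {
        std::vector<PathElem> path;

        /// Safe ordering, mirroring the new code: build the payload as a local,
        /// push the path element, then pass the local down the recursion.
        std::string dict_data = "dictionary keys payload";
        path.push_back({"DictionaryKeys", dict_data});
        enumerate(path, dict_data, 3);
    }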

View File

@ -416,8 +416,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const
for (const auto & column : columns)
{
auto & serialization = serializations[column.name];
column_name_to_position.emplace(column.name, pos);
column_name_to_position.emplace(column.name, pos++);
auto it = new_infos.find(column.name);
if (it != new_infos.end())
@ -426,37 +425,19 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const
const auto & new_info = it->second;
if (old_info)
{
old_info->replaceData(*new_info);
}
else
{
old_info = new_info->clone();
serialization = column.type->getSerialization(*old_info);
}
}
else
{
serialization = column.type->getDefaultSerialization();
}
IDataType::forEachSubcolumn([&](const auto &, const auto & subname, const auto & subdata)
{
auto subcolumn_name = Nested::concatenateName(column.name, subname);
column_name_to_position.emplace(subcolumn_name, pos);
serializations.emplace(subcolumn_name, subdata.serialization);
}, {serialization, column.type, nullptr, nullptr});
++pos;
}
}
SerializationPtr IMergeTreeDataPart::getSerializationOrDefault(const NameAndTypePair & column) const
SerializationPtr IMergeTreeDataPart::getSerialization(const NameAndTypePair & column) const
{
auto it = serializations.find(column.name);
return it == serializations.end()
? column.type->getDefaultSerialization()
: it->second;
auto it = serialization_infos.find(column.getNameInStorage());
return it == serialization_infos.end()
? IDataType::getSerialization(column)
: IDataType::getSerialization(column, *it->second);
}
void IMergeTreeDataPart::removeIfNeeded()
@ -788,7 +769,7 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
if (column_size.data_compressed != 0 && !storage_columns.hasCompressionCodec(part_column.name))
{
String path_to_data_file;
serializations.at(part_column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
getSerialization(part_column)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
if (path_to_data_file.empty())
{
@ -974,7 +955,7 @@ void IMergeTreeDataPart::loadRowsCount()
for (const NameAndTypePair & column : columns)
{
ColumnPtr column_col = column.type->createColumn(*serializations.at(column.name));
ColumnPtr column_col = column.type->createColumn(*getSerialization(column));
if (!column_col->isFixedAndContiguous() || column_col->lowCardinality())
continue;
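
Note: getSerializationOrDefault becomes getSerialization here. It resolves the serialization from the part's serialization_infos by the column's name in storage and falls back to the type's default serialization when no info is present (for example, a part written before serialization infos existed). A minimal sketch of that lookup-with-fallback shape, using stand-in types:

    #include <iostream>
    #include <string>
    #include <unordered_map>

    /// Stand-ins for illustration; the real code returns ISerialization
    /// objects produced by IDataType::getSerialization.
    enum class Kind { Default, Sparse };

    struct ColumnSketch
    {
        std::string name_in_storage;   /// for a subcolumn like "n.x" this would be "n"
    };

    /// Shape of the new IMergeTreeDataPart::getSerialization: consult the
    /// per-part serialization infos, fall back to the default serialization
    /// when the column has no stored info.
    Kind getSerializationKind(const std::unordered_map<std::string, Kind> & infos, const ColumnSketch & column)
    {
        auto it = infos.find(column.name_in_storage);
        return it == infos.end() ? Kind::Default : it->second;
    }

    int main()
    {
        std::unordered_map<std::string, Kind> infos{{"value", Kind::Sparse}};

        std::cout << (getSerializationKind(infos, {"value"}) == Kind::Sparse) << "\n";   /// 1
        std::cout << (getSerializationKind(infos, {"id"}) == Kind::Default) << "\n";     /// 1
    }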

View File

@ -132,9 +132,7 @@ public:
const NamesAndTypesList & getColumns() const { return columns; }
const SerializationInfoByName & getSerializationInfos() const { return serialization_infos; }
const SerializationByName & getSerializations() const { return serializations; }
const SerializationPtr & getSerialization(const String & column_name) const { return serializations.at(column_name); }
SerializationPtr getSerializationOrDefault(const NameAndTypePair & column) const;
SerializationPtr getSerialization(const NameAndTypePair & column) const;
/// Throws an exception if part is not stored in on-disk format.
void assertOnDisk() const;
@ -427,10 +425,6 @@ protected:
/// Columns description. Cannot be changed, after part initialization.
NamesAndTypesList columns;
SerializationInfoByName serialization_infos;
SerializationByName serializations;
const Type part_type;
/// Not null when it's a projection part.
@ -455,6 +449,10 @@ private:
/// In compact parts order of columns is necessary
NameToNumber column_name_to_position;
SerializationInfoByName serialization_infos;
SerializationByName serializations;
/// Reads part unique identifier (if exists) from uuid.txt
void loadUUID();

View File

@ -40,9 +40,16 @@ IMergeTreeReader::IMergeTreeReader(
, storage(data_part_->storage)
, metadata_snapshot(metadata_snapshot_)
, all_mark_ranges(all_mark_ranges_)
, serializations(data_part_->getSerializations())
, alter_conversions(storage.getAlterConversionsForPart(data_part))
{
if (isWidePart(data_part))
{
/// For wide parts convert plain arrays of Nested to subcolumns
/// to allow to use shared offset column from cache.
columns = Nested::convertToSubcolumns(columns);
part_columns = Nested::collect(part_columns);
}
for (const auto & column_from_part : part_columns)
columns_from_part[column_from_part.name] = &column_from_part.type;
}
@ -290,7 +297,7 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const St
{
if (typeid_cast<const DataTypeArray *>(part_column.type.get()))
{
auto position = data_part->getColumnPosition(part_column.name);
auto position = data_part->getColumnPosition(part_column.getNameInStorage());
if (position && Nested::extractTableName(part_column.name) == table_name)
return position;
}
@ -299,14 +306,6 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const St
return {};
}
SerializationPtr IMergeTreeReader::getSerialization(const NameAndTypePair & column) const
{
auto it = serializations.find(column.name);
if (it != serializations.end())
return it->second;
return column.type->getDefaultSerialization();
}
void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
{
if (num_columns_to_read != columns.size())
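
Note: for wide parts the reader now converts plain Nested arrays to subcolumns (Nested::convertToSubcolumns / Nested::collect) so the shared offsets column can be reused from the cache, and findColumnForOffsets looks up positions by the name in storage. A toy sketch of how the subcolumns of one Nested column map to a single shared array-sizes stream; the helpers are simplified stand-ins, not the real NestedUtils:

    #include <iostream>
    #include <string>

    /// Simplified helper in the spirit of Nested::extractTableName.
    std::string extractTableName(const std::string & nested_name)
    {
        auto pos = nested_name.find('.');
        return pos == std::string::npos ? nested_name : nested_name.substr(0, pos);
    }

    /// All subcolumns of one Nested column share a single array-sizes stream,
    /// so "n.x" and "n.y" resolve to the same offsets stream. The ".size0"
    /// suffix is used here purely for illustration of such a shared name.
    std::string offsetsStreamFor(const std::string & nested_name)
    {
        return extractTableName(nested_name) + ".size0";
    }

    int main()
    {
        std::cout << offsetsStreamFor("n.x") << "\n";   /// n.size0
        std::cout << offsetsStreamFor("n.y") << "\n";   /// n.size0 (shared with n.x)
    }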

View File

@ -89,13 +89,9 @@ protected:
using ColumnPosition = std::optional<size_t>;
ColumnPosition findColumnForOffsets(const String & column_name) const;
SerializationPtr getSerialization(const NameAndTypePair & column) const;
friend class MergeTreeRangeReader::DelayedStream;
private:
const SerializationByName & serializations;
/// Alter conversions, which must be applied on fly if required
MergeTreeData::AlterConversions alter_conversions;

View File

@ -17,7 +17,15 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
, reset_columns(reset_columns_)
{
if (reset_columns)
new_serialization_infos = SerializationInfoByName(columns_list, {});
{
SerializationInfo::Settings info_settings =
{
.ratio_for_sparse = storage.getSettings()->ratio_of_defaults_for_sparse_serialization,
.choose_kind = false,
};
new_serialization_infos = SerializationInfoByName(columns_list, info_settings);
}
}
NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
@ -34,10 +42,9 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
/// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
std::map<String, size_t> stream_counts;
const auto & serializations = data_part->getSerializations();
for (const auto & column : columns)
{
serializations.at(column.name)->enumerateStreams(
data_part->getSerialization(column)->enumerateStreams(
[&](const ISerialization::SubstreamPath & substream_path)
{
++stream_counts[ISerialization::getFileNameForStream(column, substream_path)];
@ -63,7 +70,7 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
}
};
serializations.at(column_name)->enumerateStreams(callback);
data_part->getSerialization(*column_with_type)->enumerateStreams(callback);
}
/// Remove files on disk and checksums
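
Note: the stream-count collection above now goes through getSerialization(column) on the part. The counts exist because Nested subcolumns share streams (the array-sizes stream mentioned in the comment), so a file may only be deleted once no remaining column references it. A small standalone illustration of that reference-counting idea, with made-up column and stream names:

    #include <cstddef>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    int main()
    {
        /// column -> streams it writes (hypothetical names)
        std::map<std::string, std::vector<std::string>> column_streams = {
            {"n.x", {"n.size0", "n.x"}},
            {"n.y", {"n.size0", "n.y"}},
        };

        /// Count how many columns reference each stream.
        std::map<std::string, std::size_t> stream_counts;
        for (const auto & [column, streams] : column_streams)
            for (const auto & stream : streams)
                ++stream_counts[stream];

        /// Drop column "n.x": its private stream goes, the shared offsets stay.
        for (const auto & stream : column_streams["n.x"])
        {
            if (--stream_counts[stream] == 0)
                std::cout << "remove file " << stream << ".bin\n";           /// n.x.bin only
            else
                std::cout << "keep file " << stream << ".bin (still used)\n";   /// n.size0.bin
        }
    }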

View File

@ -80,7 +80,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
if (checksums.empty())
return size;
serializations.at(column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
getSerialization(column)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
String file_name = ISerialization::getFileNameForStream(column, substream_path);
@ -162,7 +162,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
{
for (const NameAndTypePair & name_type : columns)
{
serializations.at(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
getSerialization(name_type)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
String file_name = ISerialization::getFileNameForStream(name_type, substream_path);
String mrk_file_name = file_name + index_granularity_info.marks_file_extension;
@ -183,7 +183,7 @@ void MergeTreeDataPartWide::checkConsistency(bool require_part_metadata) const
std::optional<UInt64> marks_size;
for (const NameAndTypePair & name_type : columns)
{
serializations.at(name_type.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
getSerialization(name_type)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
auto file_path = path + ISerialization::getFileNameForStream(name_type, substream_path) + index_granularity_info.marks_file_extension;
@ -218,7 +218,7 @@ bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const
};
bool res = true;
getSerializationOrDefault(column)->enumerateStreams([&](const auto & substream_path)
getSerialization(column)->enumerateStreams([&](const auto & substream_path)
{
String file_name = ISerialization::getFileNameForStream(column, substream_path);
if (!check_stream_exists(file_name))
@ -231,7 +231,7 @@ bool MergeTreeDataPartWide::hasColumnFiles(const NameAndTypePair & column) const
String MergeTreeDataPartWide::getFileNameForColumn(const NameAndTypePair & column) const
{
String filename;
serializations.at(column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
getSerialization(column)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
if (filename.empty())
filename = ISerialization::getFileNameForStream(column, substream_path);
@ -253,7 +253,7 @@ void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_col
if (rows_count != 0
&& column.type->isValueRepresentedByNumber()
&& !column.type->haveSubtypes()
&& serializations.at(column.name)->getKind() == ISerialization::Kind::DEFAULT)
&& getSerialization(column)->getKind() == ISerialization::Kind::DEFAULT)
{
size_t rows_in_column = size.data_uncompressed / column.type->getSizeOfValueInMemory();
if (rows_in_column != rows_count)
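
Note: the last hunk of this file keeps the sanity check that a fixed-width column with DEFAULT serialization must occupy exactly rows_count * sizeof(value) uncompressed bytes; a sparse column stores fewer values than rows, which is why the check is gated on the serialization kind. A tiny worked example with made-up numbers:

    #include <cstdint>
    #include <iostream>

    int main()
    {
        const uint64_t rows_count = 8192;            /// rows in the part
        const uint64_t value_size = 4;               /// e.g. a UInt32 column
        const uint64_t data_uncompressed = 32768;    /// bytes before compression

        /// 32768 / 4 == 8192, so the column is consistent with the part.
        uint64_t rows_in_column = data_uncompressed / value_size;
        if (rows_in_column != rows_count)
            std::cout << "column size is inconsistent with rows count\n";
        else
            std::cout << "ok: " << rows_in_column << " rows\n";
    }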

View File

@ -66,7 +66,7 @@ void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column,
};
ISerialization::SubstreamPath path;
serializations.at(column.name)->enumerateStreams(path, callback, column.type);
data_part->getSerialization(column)->enumerateStreams(path, callback, column.type);
}
namespace
@ -207,7 +207,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
writeIntBinary(UInt64(0), marks);
writeColumnSingleGranule(
block.getByName(name_and_type->name), serializations.at(name_and_type->name),
block.getByName(name_and_type->name), data_part->getSerialization(*name_and_type),
stream_getter, granule.start_row, granule.rows_to_write);
/// Each type always have at least one substream

View File

@ -75,7 +75,6 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
, skip_indices(indices_to_recalc_)
, part_path(data_part_->getFullRelativePath())
, marks_file_extension(marks_file_extension_)
, serializations(data_part_->getSerializations())
, default_codec(default_codec_)
, compute_granularity(index_granularity.empty())
{

View File

@ -123,7 +123,6 @@ protected:
const String part_path;
const String marks_file_extension;
const SerializationByName & serializations;
const CompressionCodecPtr default_codec;
const bool compute_granularity;

View File

@ -118,7 +118,7 @@ void MergeTreeDataPartWriterWide::addStreams(
};
ISerialization::SubstreamPath path;
serializations.at(column.name)->enumerateStreams(path, callback, column.type);
data_part->getSerialization(column)->enumerateStreams(path, callback, column.type);
}
@ -213,7 +213,7 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
{
auto & column = block_to_write.getByName(it->name);
if (serializations.at(column.name)->getKind() != ISerialization::Kind::SPARSE)
if (data_part->getSerialization(*it)->getKind() != ISerialization::Kind::SPARSE)
column.column = recursiveRemoveSparse(column.column);
if (permutation)
@ -275,7 +275,7 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
ISerialization::SubstreamPath & path)
{
StreamsWithMarks result;
serializations.at(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
data_part->getSerialization(column)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
@ -310,7 +310,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
ISerialization::SerializeBinaryBulkSettings & serialize_settings,
const Granule & granule)
{
const auto & serialization = serializations.at(name_and_type.name);
const auto & serialization = data_part->getSerialization(name_and_type);
serialization->serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.rows_to_write, serialize_settings, serialization_state);
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
@ -340,12 +340,13 @@ void MergeTreeDataPartWriterWide::writeColumn(
const auto & [name, type] = name_and_type;
auto [it, inserted] = serialization_states.emplace(name, nullptr);
auto serialization = data_part->getSerialization(name_and_type);
if (inserted)
{
ISerialization::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.getter = createStreamGetter(name_and_type, offset_columns);
serializations.at(name)->serializeBinaryBulkStatePrefix(serialize_settings, it->second);
serialization->serializeBinaryBulkStatePrefix(serialize_settings, it->second);
}
const auto & global_settings = storage.getContext()->getSettingsRef();
@ -386,7 +387,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
}
}
serializations.at(name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
serialization->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (is_offsets)
@ -398,12 +399,13 @@ void MergeTreeDataPartWriterWide::writeColumn(
}
void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name, const IDataType & type)
void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePair & name_type)
{
const auto & serialization = serializations.at(name);
const auto & [name, type] = name_type;
const auto & serialization = data_part->getSerialization(name_type);
if (!type.isValueRepresentedByNumber() || type.haveSubtypes() || serialization->getKind() != ISerialization::Kind::DEFAULT)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot validate column of non fixed type {}", type.getName());
if (!type->isValueRepresentedByNumber() || type->haveSubtypes() || serialization->getKind() != ISerialization::Kind::DEFAULT)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot validate column of non fixed type {}", type->getName());
auto disk = data_part->volume->getDisk();
String escaped_name = escapeForFileName(name);
@ -443,7 +445,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
if (index_granularity_rows == 0)
{
auto column = type.createColumn();
auto column = type->createColumn();
serialization->deserializeBinaryBulk(*column, bin_in, 1000000000, 0.0);
@ -463,7 +465,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for part {} for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {}",
data_part->getFullPath(), mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows, index_granularity.getMarksCount());
auto column = type.createColumn();
auto column = type->createColumn();
serialization->deserializeBinaryBulk(*column, bin_in, index_granularity_rows, 0.0);
@ -502,7 +504,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
"Still have something in marks stream, last mark #{} index granularity size {}, last rows {}", mark_num, index_granularity.getMarksCount(), index_granularity_rows);
if (!bin_in.eof())
{
auto column = type.createColumn();
auto column = type->createColumn();
serialization->deserializeBinaryBulk(*column, bin_in, 1000000000, 0.0);
@ -538,7 +540,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
if (!serialization_states.empty())
{
serialize_settings.getter = createStreamGetter(*it, written_offset_columns ? *written_offset_columns : offset_columns);
serializations.at(it->name)->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[it->name]);
data_part->getSerialization(*it)->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[it->name]);
}
if (write_final_mark)
@ -563,9 +565,9 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
{
if (column.type->isValueRepresentedByNumber()
&& !column.type->haveSubtypes()
&& serializations.at(column.name)->getKind() == ISerialization::Kind::DEFAULT)
&& data_part->getSerialization(column)->getKind() == ISerialization::Kind::DEFAULT)
{
validateColumnOfFixedSize(column.name, *column.type);
validateColumnOfFixedSize(column);
}
}
#endif
@ -591,7 +593,7 @@ void MergeTreeDataPartWriterWide::writeFinalMark(
{
writeSingleMark(column, offset_columns, 0, path);
/// Memoize information about offsets
serializations.at(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
data_part->getSerialization(column)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (is_offsets)

View File

@ -84,7 +84,7 @@ private:
/// Method for self check (used in debug-build only). Checks that written
/// data and corresponding marks are consistent. Otherwise throws logical
/// errors.
void validateColumnOfFixedSize(const String & name, const IDataType & type);
void validateColumnOfFixedSize(const NameAndTypePair & name_type);
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;

View File

@ -54,7 +54,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{
auto column_from_part = getColumnFromPart(*name_and_type);
auto position = data_part->getColumnPosition(column_from_part.name);
auto position = data_part->getColumnPosition(column_from_part.getNameInStorage());
if (!position && typeid_cast<const DataTypeArray *>(column_from_part.type.get()))
{
/// If array of Nested column is missing in part,
@ -139,7 +139,10 @@ size_t MergeTreeReaderCompact::readRows(
auto column_from_part = getColumnFromPart(*column_it);
if (res_columns[i] == nullptr)
res_columns[i] = column_from_part.type->createColumn(*getSerialization(column_from_part));
{
auto serialization = data_part->getSerialization(column_from_part);
res_columns[i] = column_from_part.type->createColumn(*serialization);
}
}
while (read_rows < max_rows_to_read)
@ -220,7 +223,7 @@ void MergeTreeReaderCompact::readData(
const auto & type_in_storage = name_and_type.getTypeInStorage();
const auto & name_in_storage = name_and_type.getNameInStorage();
auto serialization = getSerialization(NameAndTypePair{name_in_storage, type_in_storage});
auto serialization = data_part->getSerialization(NameAndTypePair{name_in_storage, type_in_storage});
ColumnPtr temp_column = type_in_storage->createColumn(*serialization);
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
@ -236,7 +239,7 @@ void MergeTreeReaderCompact::readData(
}
else
{
auto serialization = getSerialization(name_and_type);
auto serialization = data_part->getSerialization(name_and_type);
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state, nullptr);
}

View File

@ -106,7 +106,10 @@ size_t MergeTreeReaderWide::readRows(
/// The column is already present in the block so we will append the values to the end.
bool append = res_columns[pos] != nullptr;
if (!append)
res_columns[pos] = type->createColumn(*getSerialization(column_from_part));
{
auto serialization = data_part->getSerialization(column_from_part);
res_columns[pos] = type->createColumn(*serialization);
}
auto & column = res_columns[pos];
try
@ -185,7 +188,7 @@ void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,
profile_callback, clock_type));
};
getSerialization(name_and_type)->enumerateStreams(callback);
data_part->getSerialization(name_and_type)->enumerateStreams(callback);
}
@ -228,7 +231,7 @@ void MergeTreeReaderWide::prefetch(
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams)
{
auto serialization = getSerialization(name_and_type);
auto serialization = data_part->getSerialization(name_and_type);
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
@ -256,7 +259,7 @@ void MergeTreeReaderWide::readData(
deserialize_settings.avg_value_size_hint = avg_value_size_hint;
const auto & name = name_and_type.name;
auto serialization = getSerialization(name_and_type);
auto serialization = data_part->getSerialization(name_and_type);
if (deserialize_binary_bulk_state_map.count(name) == 0)
{

View File

@ -199,6 +199,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
part->minmax_idx->update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0, context);
part->setColumns(block.getNamesAndTypesList());
if (metadata_snapshot->hasSortingKey())
metadata_snapshot->getSortingKey().expression->execute(block);

View File

@ -313,7 +313,7 @@ NameSet collectFilesToSkip(
files_to_skip.insert(stream_name + mrk_extension);
};
source_part->getSerialization(entry.name)->enumerateStreams(callback);
source_part->getSerialization({entry.name, entry.type})->enumerateStreams(callback);
}
for (const auto & index : indices_to_recalc)
{
@ -338,7 +338,7 @@ static NameToNameVector collectFilesForRenames(
std::map<String, size_t> stream_counts;
for (const auto & column : source_part->getColumns())
{
source_part->getSerialization(column.name)->enumerateStreams(
source_part->getSerialization(column)->enumerateStreams(
[&](const ISerialization::SubstreamPath & substream_path)
{
++stream_counts[ISerialization::getFileNameForStream(column, substream_path)];
@ -382,7 +382,7 @@ static NameToNameVector collectFilesForRenames(
auto column = source_part->getColumns().tryGetByName(command.column_name);
if (column)
source_part->getSerialization(column->name)->enumerateStreams(callback);
source_part->getSerialization(*column)->enumerateStreams(callback);
}
else if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
@ -404,7 +404,7 @@ static NameToNameVector collectFilesForRenames(
auto column = source_part->getColumns().tryGetByName(command.column_name);
if (column)
source_part->getSerialization(column->name)->enumerateStreams(callback);
source_part->getSerialization(*column)->enumerateStreams(callback);
}
}

View File

@ -107,6 +107,14 @@ IMergeTreeDataPart::Checksums checkDataPart(
serialization_infos.readText(*serialization_file);
}
auto get_serialization = [&serialization_infos](const auto & column)
{
auto it = serialization_infos.find(column.name);
return it == serialization_infos.end()
? column.type->getDefaultSerialization()
: column.type->getSerialization(*it->second);
};
/// This function calculates only checksum of file content (compressed or uncompressed).
/// It also calculates checksum of projections.
auto checksum_file = [&](const String & file_path, const String & file_name)
@ -141,8 +149,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
const NamesAndTypesList & projection_columns_list = projection->getColumns();
for (const auto & projection_column : projection_columns_list)
{
auto serialization = projection_column.type->getSerialization(*serialization_infos.at(projection_column.name));
serialization->enumerateStreams(
get_serialization(projection_column)->enumerateStreams(
[&](const ISerialization::SubstreamPath & substream_path)
{
String projection_file_name = ISerialization::getFileNameForStream(projection_column, substream_path) + ".bin";
@ -214,8 +221,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
{
for (const auto & column : columns_list)
{
auto serialization = column.type->getSerialization(*serialization_infos.at(column.name));
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
get_serialization(column)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
String file_name = ISerialization::getFileNameForStream(column, substream_path) + ".bin";
checksums_data.files[file_name] = checksum_compressed_file(disk, path + file_name);

View File

@ -222,7 +222,7 @@ void StorageSystemPartsColumns::processNextStorage(
if (columns_mask[src_index++])
columns[res_index++]->insert(column_size.marks);
auto serialization = part->getSerialization(column.name);
auto serialization = part->getSerialization(column);
if (columns_mask[src_index++])
columns[res_index++]->insert(ISerialization::kindToString(serialization->getKind()));