fix mutations with mixed-granularity parts

This commit is contained in:
CurtizJ 2019-12-25 16:03:59 +03:00
parent 4bd4ac715c
commit c8393f2c8b
7 changed files with 16 additions and 22 deletions

View File

@ -227,7 +227,6 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & columns_)
sample_block.clear();
for (const auto & column : columns)
sample_block.insert({column.type, column.name});
index_granularity_info.initialize(storage, getType(), columns.size());
}
IMergeTreeDataPart::~IMergeTreeDataPart() = default;
@ -591,7 +590,7 @@ void IMergeTreeDataPart::loadColumns(bool require)
columns.readText(file);
}
index_granularity_info.initialize(storage, getType(), columns.size());
index_granularity_info = MergeTreeIndexGranularityInfo{storage, getType(), columns.size()};
for (const auto & it : columns)
sample_block.insert({it.type, it.name});
}

View File

@ -1536,11 +1536,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const NamesAndTypesList & columns_list,
size_t bytes_uncompressed,
size_t rows_count,
const String & relative_path) const
const String & relative_path,
const MergeTreeIndexGranularityInfo * index_granularity_info) const
{
auto part = createPart(name, choosePartType(bytes_uncompressed, rows_count), part_info, disk, relative_path);
auto type = choosePartType(bytes_uncompressed, rows_count);
auto part = createPart(name, type, part_info, disk, relative_path);
part->index_granularity_info = index_granularity_info ? *index_granularity_info
: MergeTreeIndexGranularityInfo{*this, type, columns_list.size()};
part->setColumns(columns_list);
/// Don't save rows_count count here as it can change later
/// Don't save rows_count count here as it can be changed later
return part;
}

View File

@ -187,7 +187,9 @@ public:
MutableDataPartPtr createPart(const String & name,
const MergeTreePartInfo & part_info,const DiskPtr & disk,
const NamesAndTypesList & columns,
size_t bytes_on_disk, size_t rows_num, const String & relative_path) const;
size_t bytes_on_disk, size_t rows_num,
const String & relative_path,
const MergeTreeIndexGranularityInfo * index_granularity_info = nullptr) const;
MutableDataPartPtr createPart(const String & name,
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,

View File

@ -992,14 +992,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
std::move(new_columns),
source_part->bytes_on_disk,
source_part->rows_count,
"tmp_mut_" + future_part.name);
"tmp_mut_" + future_part.name,
&source_part->index_granularity_info); /// Granularity info can't be changed during mutation.
new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos;
/// FIXME Now it's wrong code. Check if nothing will break
// new_data_part->index_granularity_info = source_part->index_granularity_info;
String new_part_tmp_path = new_data_part->getFullPath();
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex

View File

@ -160,7 +160,7 @@ String MergeTreeDataPartCompact::getColumnNameWithMinumumCompressedSize() const
void MergeTreeDataPartCompact::loadIndexGranularity()
{
index_granularity_info.initialize(storage, getType(), columns.size());
index_granularity_info = MergeTreeIndexGranularityInfo{storage, getType(), columns.size()};
String full_path = getFullPath();
if (columns.empty())

View File

@ -33,14 +33,6 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(
const MergeTreeData & storage, MergeTreeDataPartType part_type, size_t columns_num)
{
initialize(storage, part_type, columns_num);
}
void MergeTreeIndexGranularityInfo::initialize(const MergeTreeData & storage, MergeTreeDataPartType part_type, size_t columns_num)
{
if (initialized)
return;
const auto storage_settings = storage.getSettings();
fixed_index_granularity = storage_settings->index_granularity;
@ -53,12 +45,12 @@ void MergeTreeIndexGranularityInfo::initialize(const MergeTreeData & storage, Me
}
else
setAdaptive(storage_settings->index_granularity_bytes, part_type, columns_num);
}
initialized = true;
}
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const std::string & path_to_part)
{
/// FIXME check when we cant create compact part
auto mrk_ext = getMrkExtensionFromFS(path_to_part);
if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension())
setNonAdaptive();

View File

@ -162,7 +162,6 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr);
// new_part->setColumns(*total_column_list);
new_part->index = writer->releaseIndexColumns();
new_part->checksums = checksums;
new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();