Always materialize columns while merge [#METR-21119].

This commit is contained in:
Alexey Milovidov 2016-05-18 05:49:52 +03:00
parent 7aae41f5dd
commit c4407306d5

View File

@ -427,21 +427,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
if (Poco::File(merged_dir).exists())
throw Exception("Directory " + merged_dir + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
NameSet union_columns_set;
for (const MergeTreeData::DataPartPtr & part : parts)
{
Poco::ScopedReadRWLock part_lock(part->columns_lock);
Names part_columns = part->columns.getNames();
union_columns_set.insert(part_columns.begin(), part_columns.end());
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
merge_entry->total_size_marks += part->size;
}
NamesAndTypesList columns_list = data.getColumnsList();
NamesAndTypesList union_columns = columns_list.filter(union_columns_set);
Names union_column_names = union_columns.getNames();
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
if (aio_threshold > 0)
{
@ -449,6 +442,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
part->accumulateColumnSizes(merged_column_to_size);
}
Names column_names = data.getColumnNamesList();
NamesAndTypesList column_names_and_types = data.getColumnsList();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
ActiveDataPartSet::parsePartName(merged_name, *new_data_part);
new_data_part->name = "tmp_" + merged_name;
@ -469,7 +465,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
String part_path = data.getFullPath() + parts[i]->name + '/';
auto input = std::make_unique<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_names, data,
parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
@ -543,7 +539,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
merge_entry->total_size_bytes_compressed,
static_cast<double>(merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());
MergedBlockOutputStream to{data, new_part_tmp_path, union_columns, compression_method, merged_column_to_size, aio_threshold};
MergedBlockOutputStream to{
data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold};
merged_stream->readPrefix();
to.writePrefix();
@ -568,7 +565,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
merged_stream->readSuffix();
new_data_part->columns = union_columns;
new_data_part->columns = column_names_and_types;
new_data_part->checksums = to.writeSuffixAndGetChecksums();
new_data_part->index.swap(to.getIndex());
@ -653,21 +650,14 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
/// Слияние всех кусков партиции.
NameSet union_columns_set;
for (const MergeTreeData::DataPartPtr & part : parts)
{
Poco::ScopedReadRWLock part_lock(part->columns_lock);
Names part_columns = part->columns.getNames();
union_columns_set.insert(part_columns.begin(), part_columns.end());
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
merge_entry->total_size_marks += part->size;
}
NamesAndTypesList columns_list = data.getColumnsList();
NamesAndTypesList union_columns = columns_list.filter(union_columns_set);
Names union_column_names = union_columns.getNames();
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
if (aio_threshold > 0)
{
@ -675,6 +665,9 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
part->accumulateColumnSizes(merged_column_to_size);
}
Names column_names = data.getColumnNamesList();
NamesAndTypesList column_names_and_types = data.getColumnsList();
BlockInputStreams src_streams;
size_t sum_rows_approx = 0;
@ -688,7 +681,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
String part_path = data.getFullPath() + parts[i]->name + '/';
auto input = std::make_unique<MergeTreeBlockInputStream>(
part_path, DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
part_path, DEFAULT_MERGE_BLOCK_SIZE, column_names, data,
parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
@ -753,7 +746,8 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
Poco::File(new_part_tmp_path).createDirectories();
MergedBlockOutputStreamPtr output_stream;
output_stream = std::make_unique<MergedBlockOutputStream>(data, new_part_tmp_path, union_columns, compression_method, merged_column_to_size, aio_threshold);
output_stream = std::make_unique<MergedBlockOutputStream>(
data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold);
per_shard_output.emplace(shard_no, std::move(output_stream));
}
@ -864,7 +858,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
MergeTreeData::MutableDataPartPtr & data_part = per_shard_data_parts.at(shard_no);
data_part->columns = union_columns;
data_part->columns = column_names_and_types;
data_part->checksums = output_stream->writeSuffixAndGetChecksums();
data_part->index.swap(output_stream->getIndex());
data_part->size = output_stream->marksCount();