polymorphic parts (development)

This commit is contained in:
CurtizJ 2019-11-25 23:19:43 +03:00
parent b54f1629ab
commit 9e7adf4cbe
11 changed files with 58 additions and 53 deletions

View File

@ -126,7 +126,7 @@ void fillIndexGranularityImpl(
index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block); index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
/// FIXME correct index granularity for compact /// FIXME correct index granularity for compact
index_granularity_for_block = rows_in_block; // index_granularity_for_block = rows_in_block;
/// FIXME: split/join last mark for compact parts /// FIXME: split/join last mark for compact parts
for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block) for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)

View File

@ -1579,10 +1579,10 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
} }
MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_on_disk, size_t rows_count) const MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, size_t rows_count) const
{ {
const auto settings = getSettings(); const auto settings = getSettings();
if (bytes_on_disk < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part) if (bytes_uncompressed < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part)
return MergeTreeDataPartType::COMPACT; return MergeTreeDataPartType::COMPACT;
return MergeTreeDataPartType::WIDE; return MergeTreeDataPartType::WIDE;
@ -1601,15 +1601,18 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
throw Exception("Unknown part type", ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown part type", ErrorCodes::LOGICAL_ERROR);
} }
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name, MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
const MergeTreePartInfo & part_info, const DiskSpace::DiskPtr & disk, const String & name,
size_t bytes_on_disk, size_t rows_count, const String & relative_path) const const MergeTreePartInfo & part_info,
const DiskSpace::DiskPtr & disk,
const NamesAndTypesList & columns,
size_t bytes_uncompressed,
size_t rows_count,
const String & relative_path) const
{ {
auto part = createPart(name, choosePartType(bytes_on_disk, rows_count), part_info, disk, relative_path); auto part = createPart(name, choosePartType(bytes_uncompressed, rows_count), part_info, disk, relative_path);
part->setColumns(columns);
part->bytes_on_disk = bytes_on_disk; /// Don't save rows_count count here as it can change later
part->rows_count = rows_count;
return part; return part;
} }

View File

@ -180,10 +180,11 @@ public:
MergeTreeDataPartType choosePartType(size_t bytes_on_disk, size_t rows_count) const; MergeTreeDataPartType choosePartType(size_t bytes_on_disk, size_t rows_count) const;
/// After this methods setColumns must be called
/// FIXME make this inside this function
MutableDataPartPtr createPart(const String & name, MutableDataPartPtr createPart(const String & name,
const MergeTreePartInfo & part_info,const DiskSpace::DiskPtr & disk, const MergeTreePartInfo & part_info,const DiskSpace::DiskPtr & disk,
const NamesAndTypesList & columns,
size_t bytes_on_disk, size_t rows_num, const String & relative_path) const; size_t bytes_on_disk, size_t rows_num, const String & relative_path) const;
MutableDataPartPtr createPart(const String & name, MutableDataPartPtr createPart(const String & name,

View File

@ -573,11 +573,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
future_part.name, future_part.name,
future_part.part_info, future_part.part_info,
space_reservation->getDisk(), space_reservation->getDisk(),
all_columns,
estimated_bytes_uncompressed, estimated_bytes_uncompressed,
sum_input_rows_upper_bound, sum_input_rows_upper_bound,
TMP_PREFIX + future_part.name); TMP_PREFIX + future_part.name);
new_data_part->setColumns(all_columns);
new_data_part->partition.assign(future_part.getPartition()); new_data_part->partition.assign(future_part.getPartition());
new_data_part->is_temp = true; new_data_part->is_temp = true;
@ -958,15 +958,32 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
else else
LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation); LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);
auto in = mutations_interpreter.execute(table_lock_holder);
const auto & updated_header = mutations_interpreter.getUpdatedHeader();
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart( NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
future_part.name, source_part->getType(),
future_part.part_info, space_reservation->getDisk(), const auto & source_column_names = source_part->columns.getNames();
const auto & updated_column_names = updated_header.getNames();
NameSet new_columns_set(source_column_names.begin(), source_column_names.end());
new_columns_set.insert(updated_column_names.begin(), updated_column_names.end());
auto new_columns = all_columns.filter(new_columns_set);
auto new_data_part = data.createPart(
future_part.name,
future_part.part_info,
space_reservation->getDisk(),
std::move(new_columns),
source_part->bytes_on_disk,
source_part->rows_count,
"tmp_mut_" + future_part.name); "tmp_mut_" + future_part.name);
new_data_part->is_temp = true; new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos; new_data_part->ttl_infos = source_part->ttl_infos;
new_data_part->index_granularity_info = source_part->index_granularity_info;
/// FIXME Now it's wrong code. Check if nothing will break
// new_data_part->index_granularity_info = source_part->index_granularity_info;
String new_part_tmp_path = new_data_part->getFullPath(); String new_part_tmp_path = new_data_part->getFullPath();
@ -981,10 +998,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
Poco::File(new_part_tmp_path).createDirectories(); Poco::File(new_part_tmp_path).createDirectories();
auto in = mutations_interpreter.execute(table_lock_holder);
const auto & updated_header = mutations_interpreter.getUpdatedHeader();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
const auto data_settings = data.getSettings(); const auto data_settings = data.getSettings();
Block in_header = in->getHeader(); Block in_header = in->getHeader();
@ -1141,18 +1154,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096); WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096);
new_data_part->checksums.write(out_checksums); new_data_part->checksums.write(out_checksums);
} }
/// Write the columns list of the resulting part in the same order as all_columns.
Names source_column_names = source_part->columns.getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
for (auto it = all_columns.begin(); it != all_columns.end();)
{
if (source_columns_name_set.count(it->name) || updated_header.has(it->name))
++it;
else
it = new_data_part->columns.erase(it);
}
new_data_part->setColumns(all_columns);
{ {
/// Write a file with a description of columns. /// Write a file with a description of columns.
WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096); WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096);

View File

@ -203,10 +203,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
size_t expected_size = block.bytes(); size_t expected_size = block.bytes();
auto reservation = data.reserveSpace(expected_size); auto reservation = data.reserveSpace(expected_size);
NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
auto new_data_part = data.createPart( auto new_data_part = data.createPart(
part_name, new_part_info, part_name, new_part_info,
reservation->getDisk(), reservation->getDisk(),
expected_size, block.rows(), columns,
expected_size,
block.rows(),
TMP_PREFIX + part_name); TMP_PREFIX + part_name);
new_data_part->partition = std::move(partition); new_data_part->partition = std::move(partition);
@ -263,9 +267,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
/// either default lz4 or compression method with zero thresholds on absolute and relative part size. /// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.global_context.chooseCompressionCodec(0, 0); auto compression_codec = data.global_context.chooseCompressionCodec(0, 0);
NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
new_data_part->setColumns(columns);
MergedBlockOutputStream out(new_data_part, columns, compression_codec); MergedBlockOutputStream out(new_data_part, columns, compression_codec);
out.writePrefix(); out.writePrefix();

View File

@ -26,6 +26,7 @@ MergeTreeIndexGranulePtr MergeTreeIndexReader::read()
{ {
auto granule = index->createIndexGranule(); auto granule = index->createIndexGranule();
granule->deserializeBinary(*stream.data_buffer); granule->deserializeBinary(*stream.data_buffer);
std::cerr << "(MergeTreeIndexReader) granule.empty(): " << granule->empty() << "\n";
return granule; return granule;
} }

View File

@ -27,22 +27,23 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz
void MergeTreeMarksLoader::loadMarks() void MergeTreeMarksLoader::loadMarks()
{ {
auto load = std::bind(load_func, mrk_path);
if (mark_cache) if (mark_cache)
{ {
auto key = mark_cache->hash(mrk_path); auto key = mark_cache->hash(mrk_path);
if (save_marks_in_cache) if (save_marks_in_cache)
{ {
marks = mark_cache->getOrSet(key, load_func); marks = mark_cache->getOrSet(key, load);
} }
else else
{ {
marks = mark_cache->get(key); marks = mark_cache->get(key);
if (!marks) if (!marks)
marks = load_func(); marks = load();
} }
} }
else else
marks = load_func(); marks = load();
if (!marks) if (!marks)
throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR); throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR);

View File

@ -7,7 +7,7 @@ class MergeTreeMarksLoader
{ {
public: public:
using MarksPtr = MarkCache::MappedPtr; using MarksPtr = MarkCache::MappedPtr;
using LoadFunc = std::function<MarksPtr()>; using LoadFunc = std::function<MarksPtr(const String &)>;
MergeTreeMarksLoader() {} MergeTreeMarksLoader() {}

View File

@ -129,11 +129,9 @@ void MergeTreeReaderCompact::initMarksLoader()
if (marks_loader.initialized()) if (marks_loader.initialized())
return; return;
std::string mrk_path = data_part->index_granularity_info.getMarksFilePath(path + NAME_OF_FILE_WITH_DATA);
size_t columns_num = data_part->columns.size(); size_t columns_num = data_part->columns.size();
/// FIXME pass mrk_path as argument auto load = [this, columns_num](const String & mrk_path) -> MarkCache::MappedPtr
auto load = [this, columns_num, mrk_path]() -> MarkCache::MappedPtr
{ {
size_t file_size = Poco::File(mrk_path).getSize(); size_t file_size = Poco::File(mrk_path).getSize();
size_t marks_count = data_part->getMarksCount(); size_t marks_count = data_part->getMarksCount();
@ -178,9 +176,8 @@ void MergeTreeReaderCompact::initMarksLoader()
return res; return res;
}; };
marks_loader = MergeTreeMarksLoader{mark_cache, mrk_path, load, settings.save_marks_in_cache, columns_num}; auto mrk_path = data_part->index_granularity_info.getMarksFilePath(path + NAME_OF_FILE_WITH_DATA);
marks_loader = MergeTreeMarksLoader{mark_cache, std::move(mrk_path), load, settings.save_marks_in_cache, columns_num};
std::cerr << "(MergeTreeReaderCompact::loadMarks) end marks load..." << "\n";
} }
void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index) void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)

View File

@ -108,13 +108,13 @@ void MergeTreeReaderStream::initMarksLoader()
if (marks_loader.initialized()) if (marks_loader.initialized())
return; return;
auto load = [this]() -> MarkCache::MappedPtr auto load = [this](const String & mrk_path) -> MarkCache::MappedPtr
{ {
std::cerr << "reading marks from path: " << mrk_path << "\n";
std::cerr << "marks: " << marks_count << "\n";
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache. /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
std::string mrk_path = index_granularity_info->getMarksFilePath(path_prefix);
size_t file_size = Poco::File(mrk_path).getSize(); size_t file_size = Poco::File(mrk_path).getSize();
size_t expected_file_size = index_granularity_info->mark_size_in_bytes * marks_count; size_t expected_file_size = index_granularity_info->mark_size_in_bytes * marks_count;
if (expected_file_size != file_size) if (expected_file_size != file_size)
@ -151,7 +151,7 @@ void MergeTreeReaderStream::initMarksLoader()
}; };
auto mrk_path = index_granularity_info->getMarksFilePath(path_prefix); auto mrk_path = index_granularity_info->getMarksFilePath(path_prefix);
marks_loader = MergeTreeMarksLoader{mark_cache, mrk_path, load, save_marks_in_cache}; marks_loader = MergeTreeMarksLoader{mark_cache, std::move(mrk_path), load, save_marks_in_cache};
} }

View File

@ -146,7 +146,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->rows_count = rows_count; new_part->rows_count = rows_count;
new_part->modification_time = time(nullptr); new_part->modification_time = time(nullptr);
new_part->setColumns(*total_column_list); // new_part->setColumns(*total_column_list);
new_part->index = writer->releaseIndexColumns(); new_part->index = writer->releaseIndexColumns();
new_part->checksums = checksums; new_part->checksums = checksums;
new_part->bytes_on_disk = checksums.getTotalSizeOnDisk(); new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();