Add severals ifs for format_version, but have to move this routine to single class

This commit is contained in:
alesapin 2018-11-12 20:44:43 +03:00
parent 6f5fef5344
commit 0a64ef5de8
8 changed files with 41 additions and 14 deletions

View File

@ -257,7 +257,7 @@ public:
/// Size of memory, allocated for column. /// Size of memory, allocated for column.
/// This is greater or equals to byteSize due to memory reservation in containers. /// This is greater or equals to byteSize due to memory reservation in containers.
/// Zero, if could be determined. /// Zero, if could not be determined.
virtual size_t allocatedBytes() const = 0; virtual size_t allocatedBytes() const = 0;
/// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them. /// If the column contains subcolumns (such as Array, Nullable, etc), do callback on them.

View File

@ -17,11 +17,12 @@ struct MarkInCompressedFile
{ {
size_t offset_in_compressed_file; size_t offset_in_compressed_file;
size_t offset_in_decompressed_block; size_t offset_in_decompressed_block;
size_t index_granularity;
bool operator==(const MarkInCompressedFile & rhs) const bool operator==(const MarkInCompressedFile & rhs) const
{ {
return std::tie(offset_in_compressed_file, offset_in_decompressed_block) return std::tie(offset_in_compressed_file, offset_in_decompressed_block, index_granularity)
== std::tie(rhs.offset_in_compressed_file, rhs.offset_in_decompressed_block); == std::tie(rhs.offset_in_compressed_file, rhs.offset_in_decompressed_block, rhs.index_granularity);
} }
bool operator!=(const MarkInCompressedFile & rhs) const bool operator!=(const MarkInCompressedFile & rhs) const
{ {
@ -30,7 +31,7 @@ struct MarkInCompressedFile
String toString() const String toString() const
{ {
return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + ")"; return "(" + DB::toString(offset_in_compressed_file) + "," + DB::toString(offset_in_decompressed_block) + "," + DB::toString(index_granularity) + ")";
} }
}; };

View File

@ -114,7 +114,6 @@ MergeTreeData::MergeTreeData(
data_parts_by_info(data_parts_indexes.get<TagByInfo>()), data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>()) data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
{ {
std::cerr << "LOADING PART\n";
/// NOTE: using the same columns list as is read when performing actual merges. /// NOTE: using the same columns list as is read when performing actual merges.
merging_params.check(getColumns().getAllPhysical()); merging_params.check(getColumns().getAllPhysical());

View File

@ -23,7 +23,10 @@
#include <common/logger_useful.h> #include <common/logger_useful.h>
#define MERGE_TREE_MARK_SIZE (2 * sizeof(UInt64)) namespace {
constexpr auto MERGE_TREE_MARK_SIZE_FIXED_INDEX_GRANULARITY = 2 * sizeof(UInt64);
constexpr auto MERGE_TREE_MARK_SIZE_ADAPTIVE_INDEX_GRANULARITY = 3 * sizeof(UInt64);
}
namespace DB namespace DB
@ -195,10 +198,12 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
for (const auto & column : columns) for (const auto & column : columns)
{ {
std::cerr << "Searching for column:" << column.name << std::endl;
if (!hasColumnFiles(column.name)) if (!hasColumnFiles(column.name))
continue; continue;
const auto size = getColumnSize(column.name, *column.type).data_compressed; const auto size = getColumnSize(column.name, *column.type).data_compressed;
std::cerr << "Column size:" <<size<<std::endl;
if (size < minimum_size) if (size < minimum_size)
{ {
minimum_size = size; minimum_size = size;
@ -444,8 +449,13 @@ void MergeTreeDataPart::loadIndex()
if (columns.empty()) if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
if (storage.format_version < MERGE_TREE_MARK_SIZE_ADAPTIVE_INDEX_GRANULARITY)
marks_count = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk") marks_count = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk")
.getSize() / MERGE_TREE_MARK_SIZE; .getSize() / MERGE_TREE_MARK_SIZE_FIXED_INDEX_GRANULARITY;
else
marks_count = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk2")
.getSize() / MERGE_TREE_MARK_SIZE_ADAPTIVE_INDEX_GRANULARITY;
} }
size_t key_size = storage.primary_sort_columns.size(); size_t key_size = storage.primary_sort_columns.size();

View File

@ -3,6 +3,7 @@
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/MemoryTracker.h> #include <Common/MemoryTracker.h>
#include <IO/CachedCompressedReadBuffer.h> #include <IO/CachedCompressedReadBuffer.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <IO/CompressedReadBufferFromFile.h> #include <IO/CompressedReadBufferFromFile.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h> #include <Interpreters/evaluateMissingDefaults.h>
@ -162,9 +163,10 @@ MergeTreeReader::Stream::Stream(
MarkCache * mark_cache_, bool save_marks_in_cache_, MarkCache * mark_cache_, bool save_marks_in_cache_,
UncompressedCache * uncompressed_cache, UncompressedCache * uncompressed_cache,
size_t aio_threshold, size_t max_read_buffer_size, size_t aio_threshold, size_t max_read_buffer_size,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type) const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type,
MergeTreeDataFormatVersion format_version_)
: path_prefix(path_prefix_), extension(extension_), marks_count(marks_count_) : path_prefix(path_prefix_), extension(extension_), marks_count(marks_count_)
, mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_) , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_), format_version(format_version_)
{ {
/// Compute the size of the buffer. /// Compute the size of the buffer.
size_t max_mark_range = 0; size_t max_mark_range = 0;
@ -263,6 +265,8 @@ const MarkInCompressedFile & MergeTreeReader::Stream::getMark(size_t index)
void MergeTreeReader::Stream::loadMarks() void MergeTreeReader::Stream::loadMarks()
{ {
std::string path = path_prefix + ".mrk"; std::string path = path_prefix + ".mrk";
if (format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY)
path += "2";
auto load = [&]() -> MarkCache::MappedPtr auto load = [&]() -> MarkCache::MappedPtr
{ {
@ -270,7 +274,11 @@ void MergeTreeReader::Stream::loadMarks()
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
size_t file_size = Poco::File(path).getSize(); size_t file_size = Poco::File(path).getSize();
size_t expected_file_size = sizeof(MarkInCompressedFile) * marks_count; size_t expected_file_size;
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY)
expected_file_size = (sizeof(MarkInCompressedFile) - sizeof(size_t)) * marks_count;
else
expected_file_size = sizeof(MarkInCompressedFile) * marks_count;
if (expected_file_size != file_size) if (expected_file_size != file_size)
throw Exception( throw Exception(
"bad size of marks file `" + path + "':" + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size), "bad size of marks file `" + path + "':" + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
@ -375,7 +383,7 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type, co
streams.emplace(stream_name, std::make_unique<Stream>( streams.emplace(stream_name, std::make_unique<Stream>(
path + stream_name, DATA_FILE_EXTENSION, data_part->marks_count, path + stream_name, DATA_FILE_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache, all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type)); uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type, storage.format_version));
}; };
IDataType::SubstreamPath path; IDataType::SubstreamPath path;

View File

@ -63,7 +63,8 @@ private:
MarkCache * mark_cache, bool save_marks_in_cache, MarkCache * mark_cache, bool save_marks_in_cache,
UncompressedCache * uncompressed_cache, UncompressedCache * uncompressed_cache,
size_t aio_threshold, size_t max_read_buffer_size, size_t aio_threshold, size_t max_read_buffer_size,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type); const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type,
MergeTreeDataFormatVersion format_version_);
void seekToMark(size_t index); void seekToMark(size_t index);
void seekToStart(); void seekToStart();
@ -87,6 +88,7 @@ private:
bool save_marks_in_cache; bool save_marks_in_cache;
MarkCache::MappedPtr marks; MarkCache::MappedPtr marks;
MergeTreeDataFormatVersion format_version;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer; std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer; std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
}; };

View File

@ -119,6 +119,7 @@ void IMergedBlockOutputStream::writeData(
serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size; serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0; serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
std::cerr << "Index granularity:" << index_granularity << std::endl;
size_t size = column.size(); size_t size = column.size();
size_t prev_mark = 0; size_t prev_mark = 0;
while (prev_mark < size) while (prev_mark < size)
@ -153,8 +154,11 @@ void IMergedBlockOutputStream::writeData(
writeIntBinary(stream.plain_hashing.count(), stream.marks); writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks); writeIntBinary(stream.compressed.offset(), stream.marks);
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY) std::cerr << "Format version:" << storage.format_version << std::endl;
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_ADAPTIVE_INDEX_GRANULARITY) {
std::cerr << "Writing index granularity:" << index_granularity << std::endl;
writeIntBinary(index_granularity, stream.marks); writeIntBinary(index_granularity, stream.marks);
}
}, serialize_settings.path); }, serialize_settings.path);
} }

View File

@ -27,3 +27,6 @@ target_link_libraries (get_current_inserts_in_replicated dbms)
add_executable (get_abandonable_lock_in_all_partitions get_abandonable_lock_in_all_partitions.cpp) add_executable (get_abandonable_lock_in_all_partitions get_abandonable_lock_in_all_partitions.cpp)
target_link_libraries (get_abandonable_lock_in_all_partitions dbms) target_link_libraries (get_abandonable_lock_in_all_partitions dbms)
add_executable (marks_file_reader marks_file_reader.cpp)
target_link_libraries (marks_file_reader dbms)