better marks reading

CurtizJ 2020-02-03 15:46:25 +03:00
parent 257bb3b599
commit a0635ed390
7 changed files with 101 additions and 132 deletions

View File

@@ -40,6 +40,15 @@ struct MarkInCompressedFile
};
using MarksInCompressedFile = PODArray<MarkInCompressedFile>;
class MarksInCompressedFile : public PODArray<MarkInCompressedFile>
{
public:
MarksInCompressedFile(size_t n) : PODArray(n) {}
void read(ReadBuffer & buffer, size_t from, size_t count)
{
buffer.readStrict(reinterpret_cast<char *>(data() + from), count * sizeof(MarkInCompressedFile));
}
};
}
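
For context, a minimal sketch of how the new read() helper is intended to be used (the buffer and size names here are illustrative; the real caller is MergeTreeMarksLoader::loadMarksImpl in the next file):

    /// Sketch only: read marks_count rows of columns_in_mark offset pairs from a ReadBuffer `mrk_in`.
    MarksInCompressedFile marks(marks_count * columns_in_mark);
    for (size_t i = 0; i < marks_count; ++i)
    {
        marks.read(mrk_in, i * columns_in_mark, columns_in_mark);
        /// An adaptive-granularity caller would additionally skip the rows count stored after the offsets.
    }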

View File

@@ -1,50 +1,106 @@
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <IO/ReadBufferFromFile.h>
#include <Poco/File.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
extern const int CORRUPTED_DATA;
extern const int LOGICAL_ERROR;
}
MergeTreeMarksLoader::MergeTreeMarksLoader(
MarkCache * mark_cache_,
const String & mrk_path_,
const LoadFunc & load_func_,
size_t marks_count_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
bool save_marks_in_cache_,
size_t columns_num_)
size_t columns_in_mark_)
: mark_cache(mark_cache_)
, mrk_path(mrk_path_)
, load_func(load_func_)
, marks_count(marks_count_)
, index_granularity_info(index_granularity_info_)
, save_marks_in_cache(save_marks_in_cache_)
, columns_num(columns_num_) {}
, columns_in_mark(columns_in_mark_) {}
const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index)
{
if (!marks)
loadMarks();
if (column_index >= columns_num)
throw Exception("Column index: " + toString(column_index)
+ " is out of range [0, " + toString(columns_num) + ")", ErrorCodes::LOGICAL_ERROR);
return (*marks)[row_index * columns_num + column_index];
#ifndef NDEBUG
if (column_index >= columns_in_mark)
throw Exception("Column index: " + toString(column_index)
+ " is out of range [0, " + toString(columns_in_mark) + ")", ErrorCodes::LOGICAL_ERROR);
#endif
return (*marks)[row_index * columns_in_mark + column_index];
}
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
{
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
size_t file_size = Poco::File(mrk_path).getSize();
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_file_size = mark_size * marks_count;
if (expected_file_size != file_size)
throw Exception(
"Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
ErrorCodes::CORRUPTED_DATA);
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
if (!index_granularity_info.is_adaptive)
{
/// Read directly to marks.
ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));
if (buffer.eof() || buffer.buffer().size() != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
}
else
{
ReadBufferFromFile buffer(mrk_path, file_size, -1);
size_t i = 0;
while (!buffer.eof())
{
res->read(buffer, i * columns_in_mark, columns_in_mark);
buffer.seek(sizeof(size_t), SEEK_CUR); /// skip the rows count that adaptive mark files store after the per-column offsets
++i;
}
if (i * mark_size != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
}
res->protect();
return res;
}
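/// Sketch of the size arithmetic above (illustrative only; the real computation is in
/// MergeTreeIndexGranularityInfo::getMarkSizeInBytes). Each mark row holds one
/// MarkInCompressedFile (a pair of offsets) per column, and adaptive mark files
/// additionally store one size_t with the number of rows in the granule, which the loop above skips:
///     mark_size (adaptive)     = columns_in_mark * sizeof(MarkInCompressedFile) + sizeof(size_t)
///     mark_size (non-adaptive) = columns_in_mark * sizeof(MarkInCompressedFile)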
void MergeTreeMarksLoader::loadMarks()
{
auto load = std::bind(load_func, mrk_path);
if (mark_cache)
{
auto key = mark_cache->hash(mrk_path);
if (save_marks_in_cache)
{
marks = mark_cache->getOrSet(key, load);
auto callback = std::bind(&MergeTreeMarksLoader::loadMarksImpl, this);
marks = mark_cache->getOrSet(key, callback);
}
else
{
marks = mark_cache->get(key);
if (!marks)
marks = load();
marks = loadMarksImpl();
}
}
else
marks = load();
marks = loadMarksImpl();
if (!marks)
throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR);

View File

@@ -3,19 +3,20 @@
namespace DB
{
struct MergeTreeIndexGranularityInfo;
class MergeTreeMarksLoader
{
public:
using MarksPtr = MarkCache::MappedPtr;
using LoadFunc = std::function<MarksPtr(const String &)>;
MergeTreeMarksLoader() {}
MergeTreeMarksLoader(MarkCache * mark_cache_,
const String & mrk_path_,
const LoadFunc & load_func_,
MergeTreeMarksLoader(
MarkCache * mark_cache_,
const String & mrk_path,
size_t marks_count_,
const MergeTreeIndexGranularityInfo & index_granularity_info_,
bool save_marks_in_cache_,
size_t columns_num_ = 1);
size_t columns_in_mark_ = 1);
const MarkInCompressedFile & getMark(size_t row_index, size_t column_index = 0);
@@ -24,12 +25,14 @@ public:
private:
MarkCache * mark_cache = nullptr;
String mrk_path;
LoadFunc load_func;
size_t marks_count;
const MergeTreeIndexGranularityInfo & index_granularity_info;
bool save_marks_in_cache = false;
size_t columns_num;
MarksPtr marks;
size_t columns_in_mark;
MarkCache::MappedPtr marks;
void loadMarks();
MarkCache::MappedPtr loadMarksImpl();
};
}
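
A hypothetical caller sketch (the path, counts and cache pointer below are placeholders), showing the simplified interface now that the LoadFunc callback is gone and the loader reads the marks file itself:

    MergeTreeMarksLoader loader(
        mark_cache,               /// may be nullptr, in which case marks are loaded without caching
        marks_path,               /// placeholder: path to the part's marks file
        marks_count,
        index_granularity_info,
        /* save_marks_in_cache = */ true,
        columns_in_mark);
    const MarkInCompressedFile & mark = loader.getMark(row_index, /* column_index = */ 0);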

View File

@@ -19,11 +19,14 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(const MergeTreeData::DataPartPtr
const NamesAndTypesList & columns_, UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
const MarkRanges & mark_ranges_, const MergeTreeReaderSettings & settings_, const ValueSizeMap & avg_value_size_hints_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_)
: IMergeTreeReader(data_part_, columns_
, uncompressed_cache_, mark_cache_, mark_ranges_
, settings_, avg_value_size_hints_)
: IMergeTreeReader(data_part_, columns_,
uncompressed_cache_, mark_cache_, mark_ranges_,
settings_, avg_value_size_hints_)
, marks_loader(mark_cache,
data_part->index_granularity_info.getMarksFilePath(path + MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part->getMarksCount(), data_part->index_granularity_info,
settings.save_marks_in_cache, data_part->getColumns().size())
{
initMarksLoader();
size_t buffer_size = settings.max_read_buffer_size;
const String full_data_path = path + MergeTreeDataPartCompact::DATA_FILE_NAME + MergeTreeDataPartCompact::DATA_FILE_EXTENSION;
@@ -194,51 +197,6 @@ void MergeTreeReaderCompact::readData(
}
void MergeTreeReaderCompact::initMarksLoader()
{
if (marks_loader.initialized())
return;
size_t columns_num = data_part->getColumns().size();
auto load = [this, columns_num](const String & mrk_path) -> MarkCache::MappedPtr
{
size_t file_size = Poco::File(mrk_path).getSize();
size_t marks_count = data_part->getMarksCount();
size_t mark_size_in_bytes = data_part->index_granularity_info.getMarkSizeInBytes(columns_num);
size_t expected_file_size = mark_size_in_bytes * marks_count;
if (expected_file_size != file_size)
throw Exception(
"Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
ErrorCodes::CORRUPTED_DATA);
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_num);
ReadBufferFromFile buffer(mrk_path, file_size);
size_t i = 0;
while (!buffer.eof())
{
buffer.readStrict(reinterpret_cast<char *>(res->data() + i * columns_num), sizeof(MarkInCompressedFile) * columns_num);
buffer.seek(sizeof(size_t), SEEK_CUR);
++i;
}
if (i * mark_size_in_bytes != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
res->protect();
return res;
};
auto mrk_path = data_part->index_granularity_info.getMarksFilePath(path + MergeTreeDataPartCompact::DATA_FILE_NAME);
marks_loader = MergeTreeMarksLoader{mark_cache, std::move(mrk_path), load, settings.save_marks_in_cache, columns_num};
}
void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
{
MarkInCompressedFile mark = marks_loader.getMark(row_index, column_index);

View File

@@ -46,7 +46,6 @@ private:
size_t next_mark = 0;
std::optional<std::pair<size_t, size_t>> last_read_granule;
void initMarksLoader();
void seekToMark(size_t row_index, size_t column_index);
void readData(const String & name, IColumn & column, const IDataType & type,

View File

@@ -16,7 +16,7 @@ namespace ErrorCodes
MergeTreeReaderStream::MergeTreeReaderStream(
const String & path_prefix_,const String & data_file_extension_, size_t marks_count_,
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const MarkRanges & all_mark_ranges,
const MergeTreeReaderSettings & settings,
MarkCache * mark_cache_,
@@ -26,15 +26,13 @@ MergeTreeReaderStream::MergeTreeReaderStream(
: path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
, mark_cache(mark_cache_), save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, marks_loader(mark_cache, index_granularity_info->getMarksFilePath(path_prefix),
marks_count, *index_granularity_info, save_marks_in_cache)
{
/// Compute the size of the buffer.
size_t max_mark_range_bytes = 0;
size_t sum_mark_range_bytes = 0;
/// Care should be taken to not load marks when the part is empty (marks_count == 0).
initMarksLoader();
for (const auto & mark_range : all_mark_ranges)
{
size_t left_mark = mark_range.begin;
@@ -106,58 +104,6 @@ MergeTreeReaderStream::MergeTreeReaderStream(
}
void MergeTreeReaderStream::initMarksLoader()
{
if (marks_loader.initialized())
return;
auto load = [this](const String & mrk_path) -> MarkCache::MappedPtr
{
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
size_t file_size = Poco::File(mrk_path).getSize();
size_t mark_size = index_granularity_info->getMarkSizeInBytes();
size_t expected_file_size = mark_size * marks_count;
if (expected_file_size != file_size)
throw Exception(
"Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
ErrorCodes::CORRUPTED_DATA);
auto res = std::make_shared<MarksInCompressedFile>(marks_count);
if (!index_granularity_info->is_adaptive)
{
/// Read directly to marks.
ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));
if (buffer.eof() || buffer.buffer().size() != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
}
else
{
ReadBufferFromFile buffer(mrk_path, file_size, -1);
size_t i = 0;
while (!buffer.eof())
{
readIntBinary((*res)[i].offset_in_compressed_file, buffer);
readIntBinary((*res)[i].offset_in_decompressed_block, buffer);
buffer.seek(sizeof(size_t), SEEK_CUR);
++i;
}
if (i * mark_size != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
}
res->protect();
return res;
};
auto mrk_path = index_granularity_info->getMarksFilePath(path_prefix);
marks_loader = MergeTreeMarksLoader{mark_cache, std::move(mrk_path), load, save_marks_in_cache};
}
void MergeTreeReaderStream::seekToMark(size_t index)
{
MarkInCompressedFile mark = marks_loader.getMark(index);

View File

@@ -31,8 +31,6 @@ public:
ReadBuffer * data_buffer;
private:
void initMarksLoader();
std::string path_prefix;
std::string data_file_extension;