polymorphic parts (development)

This commit is contained in:
CurtizJ 2019-10-11 18:37:16 +03:00
parent b433add65c
commit 18163e4d7f
12 changed files with 189 additions and 25 deletions

View File

@ -144,7 +144,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
, disk(disk_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage) {}
, index_granularity_info(shared_from_this()) {}
IMergeTreeDataPart::IMergeTreeDataPart(
const MergeTreeData & storage_,
@ -157,7 +157,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, info(info_)
, disk(disk_)
, relative_path(relative_path_.value_or(name_))
, index_granularity_info(storage) {}
, index_granularity_info(shared_from_this()) {}
ColumnSize IMergeTreeDataPart::getColumnSize(const String & column_name, const IDataType & type) const

View File

@ -84,6 +84,10 @@ public:
/// If no checksums are present returns the name of the first physically existing column.
virtual String getColumnNameWithMinumumCompressedSize() const = 0;
virtual String getMarkExtension(bool /* is_adaptive */) const { return ""; }
virtual size_t getMarkSize(bool /* is_adaptive */) const { return 0; }
// virtual void detach() = 0;
// virtual Checksums check(

View File

@ -65,6 +65,32 @@ static bool arrayHasNoElementsRead(const IColumn & column)
return last_offset != 0;
}
IMergeTreeReader::MarksPtr IMergeTreeReader::loadMarks(const String & mrk_path, const LoadFunc & load_func)
{
MarksPtr marks;
if (mark_cache)
{
auto key = mark_cache->hash(mrk_path);
if (settings.save_marks_in_cache)
{
marks = mark_cache->getOrSet(key, load_func);
}
else
{
marks = mark_cache->get(key);
if (!marks)
marks = load_func();
}
}
else
marks = load_func();
if (!marks)
throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR);
return marks;
}
void IMergeTreeReader::fillMissingColumns(Block & res, bool & should_reorder, bool & should_evaluate_missing_defaults, size_t num_rows)
{

View File

@ -55,7 +55,13 @@ public:
return all_mark_ranges.back().begin;
}
using MarksPtr = MarkCache::MappedPtr;
protected:
using LoadFunc = std::function<MarksPtr()>;
MarksPtr loadMarks(const String & mrk_path, const LoadFunc & load_func);
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk

View File

@ -58,6 +58,13 @@ public:
bool isStoredOnDisk() const override { return true; }
String getMarkExtension(bool /* is_adaptive */) const override { return ".mrk3"; }
bool getMarkSize(bool is_adaptive)
{
return sizeof(size_t) + columns.size() * sizeof(size_t) * 2;
}
void remove() const override;
/// NOTE: Returns zeros if column files are not found in checksums.

View File

@ -315,7 +315,7 @@ void MergeTreeDataPartWide::loadColumnsChecksumsIndexes(bool require_columns_che
void MergeTreeDataPartWide::loadIndexGranularity()
{
String full_path = getFullPath();
index_granularity_info.changeGranularityIfRequired(full_path);
index_granularity_info.changeGranularityIfRequired(shared_from_this());
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);

View File

@ -62,6 +62,14 @@ public:
bool supportsVerticalMerge() const override { return true; }
String getMarkExtension(bool is_adaptive) const override { return is_adaptive ? ".mrk2" : ".mrk"; }
size_t getMarkSize(bool is_adaptive) const override
{
size_t nums = is_adaptive ? 3 : 2;
return sizeof(size_t) * nums;
}
/// NOTE: Returns zeros if column files are not found in checksums.
/// NOTE: You must ensure that no ALTERs are in progress when calculating ColumnSizes.
/// (either by locking columns_lock, or by locking table structure).

View File

@ -7,6 +7,13 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(const std::string & path_to_part) const
{
if (Poco::File(path_to_part).exists())
@ -22,39 +29,42 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(
return {};
}
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(
const MergeTreeData & storage)
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeDataPartPtr & part)
{
const auto storage_settings = storage.getSettings();
const auto storage_settings = part->storage.getSettings();
fixed_index_granularity = storage_settings->index_granularity;
/// Granularity is fixed
if (!storage.canUseAdaptiveGranularity())
if (!part->storage.canUseAdaptiveGranularity())
setNonAdaptive();
else
setAdaptive(storage_settings->index_granularity_bytes);
mark_size_in_bytes = part->getMarkSize(is_adaptive);
marks_file_extension = part->getMarkExtension(is_adaptive);
}
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const std::string & path_to_part)
void MergeTreeIndexGranularityInfo::changeGranularityIfRequired(const MergeTreeDataPartPtr & part)
{
auto mrk_ext = getMrkExtensionFromFS(part->getFullPath());
if (mrk_ext && *mrk_ext == ".mrk") /// TODO
{
auto mrk_ext = getMrkExtensionFromFS(path_to_part);
if (mrk_ext && *mrk_ext == getNonAdaptiveMrkExtension())
setNonAdaptive();
mark_size_in_bytes = part->getMarkSize(is_adaptive);
marks_file_extension = part->getMarkExtension(is_adaptive);
}
}
void MergeTreeIndexGranularityInfo::setAdaptive(size_t index_granularity_bytes_)
{
is_adaptive = true;
mark_size_in_bytes = getAdaptiveMrkSize();
marks_file_extension = getAdaptiveMrkExtension();
index_granularity_bytes = index_granularity_bytes_;
}
void MergeTreeIndexGranularityInfo::setNonAdaptive()
{
is_adaptive = false;
mark_size_in_bytes = getNonAdaptiveMrkSize();
marks_file_extension = getNonAdaptiveMrkExtension();
index_granularity_bytes = 0;
}

View File

@ -2,15 +2,20 @@
#include <optional>
#include <Core/Types.h>
// #include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{
class MergeTreeData;
class IMergeTreeDataPart;
/// Meta information about index granularity
struct MergeTreeIndexGranularityInfo
{
public:
using MergeTreeDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
/// Marks file extension '.mrk' or '.mrk2'
String marks_file_extension;
@ -26,14 +31,13 @@ public:
/// Approximate bytes size of one granule
size_t index_granularity_bytes;
MergeTreeIndexGranularityInfo(
const MergeTreeData & storage);
MergeTreeIndexGranularityInfo(const MergeTreeDataPartPtr & part);
void changeGranularityIfRequired(const std::string & path_to_part);
void changeGranularityIfRequired(const MergeTreeDataPartPtr & part);
String getMarksFilePath(const String & column_path) const
String getMarksFilePath(const String & path_prefix) const
{
return column_path + marks_file_extension;
return path_prefix + marks_file_extension;
}
private:

View File

@ -0,0 +1,68 @@
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Poco/File.h>
namespace DB
{
MergeTreeReaderCompact::MergeTreeReaderCompact(const MergeTreeData::DataPartPtr & data_part_,
const NamesAndTypesList & columns_, UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
const MarkRanges & mark_ranges_, const ReaderSettings & settings_, const ValueSizeMap & avg_value_size_hints_)
: IMergeTreeReader(data_part_, columns_
, uncompressed_cache_, mark_cache_, mark_ranges_
, settings_, avg_value_size_hints_)
{
}
size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res)
{
UNUSED(from_mark);
UNUSED(continue_reading);
UNUSED(max_rows_to_read);
UNUSED(res);
return 0;
}
void MergeTreeReaderCompact::loadMarks()
{
const auto & index_granularity_info = data_part->index_granularity_info;
size_t marks_count = data_part->getMarksCount();
std::string mrk_path = index_granularity_info.getMarksFilePath(NAME_OF_FILE_WITH_DATA);
auto load_func = [&]() -> MarkCache::MappedPtr
{
size_t file_size = Poco::File(mrk_path).getSize();
size_t expected_file_size = index_granularity_info.mark_size_in_bytes * marks_count;
if (expected_file_size != file_size)
throw Exception(
"Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
ErrorCodes::CORRUPTED_DATA);
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns.size());
ReadBufferFromFile buffer(mrk_path, file_size);
size_t i = 0;
while (!buffer.eof())
{
buffer.seek(sizeof(size_t));
buffer.read(marks.getRowAddress(i), marks.getRowSize());
++i;
}
if (i * index_granularity_info.mark_size_in_bytes != file_size)
throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
res->protect();
return res;
};
auto marks_array = IMergeTreeReader::loadMarks(mrk_path, load_func);
marks = MarksInCompressedFileCompact(marks_array, columns.size());
}
}

View File

@ -8,6 +8,36 @@
namespace DB
{
class MarksInCompressedFileCompact
{
public:
using MarksPtr = MarkCache::MappedPtr;
MarksInCompressedFileCompact() = default;
MarksInCompressedFileCompact(const MarksPtr & data_, size_t columns_num_)
: data(data_), columns_num(columns_num_) {}
const MarkInCompressedFile & getMark(size_t index, size_t column) const
{
return (*data)[index * columns_num + column];
}
char * getRowAddress(size_t index) const
{
return reinterpret_cast<char *>(data->data() + index * columns_num);
}
size_t getRowSize() const
{
return sizeof(MarkInCompressedFile) * columns_num;
}
private:
MarksPtr data;
size_t columns_num;
};
/// Reads the data between pairs of marks in the same part. When reading consecutive ranges, avoids unnecessary seeks.
/// When ranges are almost consecutive, seeks are fast because they are performed inside the buffer.
/// Avoids loading the marks file if it is not needed (e.g. when reading the whole part).
@ -20,9 +50,7 @@ public:
MarkCache * mark_cache_,
const MarkRanges & mark_ranges_,
const ReaderSettings & settings_,
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{},
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{},
clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);
const ValueSizeMap & avg_value_size_hints_ = ValueSizeMap{});
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
@ -31,8 +59,11 @@ public:
private:
ReadBuffer * data_buffer;
using Offsets = std::vector<MarkCache::MappedPtr>;
Offsets offsets;
MarksInCompressedFileCompact marks;
void loadMarks();
static auto constexpr NAME_OF_FILE_WITH_DATA = "data";
/// Columns that are read.