This commit is contained in:
zhongyuankai 2022-09-06 23:41:39 +08:00
parent 0bf76fe642
commit c7b2f719c6
8 changed files with 25 additions and 27 deletions

View File

@ -339,6 +339,12 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, parent_part(parent_part_)
, use_metadata_cache(storage.use_metadata_cache)
{
auto mrk_ext = MergeTreeIndexGranularityInfo::getMarksExtensionFromFilesystem(data_part_storage_);
if (mrk_ext)
index_granularity_info = MergeTreeIndexGranularityInfo(storage_, MarkType{*mrk_ext});
else
index_granularity_info = MergeTreeIndexGranularityInfo(storage_, part_type_);
if (parent_part)
state = MergeTreeDataPartState::Active;
incrementStateMetric(state);

View File

@ -11,7 +11,6 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
extern const int NO_FILE_IN_DATA_PART;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int LOGICAL_ERROR;
@ -103,7 +102,6 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
return size;
}
void MergeTreeDataPartWide::loadIndexGranularity()
{
index_granularity_info.changeGranularityIfRequired(data_part_storage);

View File

@ -35,7 +35,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
marks_file_hashing = std::make_unique<HashingWriteBuffer>(*marks_file);
if (MarkType(marks_file_extension).compressed)
if (data_part_->index_granularity_info.mark_type.compressed)
{
marks_compressor = std::make_unique<CompressedWriteBuffer>(
*marks_file_hashing,

View File

@ -10,7 +10,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_PART_TYPE;
extern const int INCORRECT_FILE_NAME;
}
@ -101,7 +100,12 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromF
}
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_)
: mark_type(storage.canUseAdaptiveGranularity(), storage.getSettings()->compress_marks, type_.getValue())
: MergeTreeIndexGranularityInfo(storage, {storage.canUseAdaptiveGranularity(), storage.getSettings()->compress_marks, type_.getValue()})
{
}
MergeTreeIndexGranularityInfo::MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MarkType mark_type_)
: mark_type(mark_type_)
{
fixed_index_granularity = storage.getSettings()->index_granularity;
}

View File

@ -44,6 +44,8 @@ public:
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MergeTreeDataPartType type_);
MergeTreeIndexGranularityInfo(const MergeTreeData & storage, MarkType mark_type_);
void changeGranularityIfRequired(const DataPartStoragePtr & data_part_storage);
String getMarksFilePath(const String & path_prefix) const
@ -51,13 +53,6 @@ public:
return path_prefix + mark_type.getFileExtension();
}
/// TODO: This is non-passable (method overload), remove before merge.
String getMarksFilePath(const DataPartStoragePtr & data_part_storage, const String & path_prefix) const
{
auto mrk_ext = getMarksExtensionFromFilesystem(data_part_storage);
return path_prefix + mrk_ext.value_or(mark_type.getFileExtension());
}
size_t getMarkSizeInBytes(size_t columns_num = 1) const;
static std::optional<std::string> getMarksExtensionFromFilesystem(const DataPartStoragePtr & data_part_storage);

View File

@ -61,12 +61,9 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_uncompressed_size = mark_size * marks_count;
std::string file_extension = fs::path(mrk_path).extension();
bool compressed_marks = MarkType(file_extension).compressed;
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
if (!compressed_marks && expected_uncompressed_size != file_size)
if (!index_granularity_info.mark_type.compressed && expected_uncompressed_size != file_size)
throw Exception(
ErrorCodes::CORRUPTED_DATA,
"Bad size of marks file '{}': {}, must be: {}",
@ -75,7 +72,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
auto buffer = data_part_storage->readFile(mrk_path, read_settings.adjustBufferSize(file_size), file_size, std::nullopt);
std::unique_ptr<ReadBuffer> reader;
if (!compressed_marks)
if (!index_granularity_info.mark_type.compressed)
reader = std::move(buffer);
else
reader = std::make_unique<CompressedReadBufferFromFile>(std::move(buffer));

View File

@ -37,7 +37,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
, marks_loader(
data_part->data_part_storage,
mark_cache,
data_part->index_granularity_info.getMarksFilePath(data_part_->data_part_storage, MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part->index_granularity_info.getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part->getMarksCount(),
data_part->index_granularity_info,
settings.save_marks_in_cache,

View File

@ -5,7 +5,7 @@
#include <Common/ErrorCodes.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace ProfileEvents
@ -48,16 +48,14 @@ std::unique_ptr<ReadBuffer> PartMetadataManagerWithCache::read(const String & fi
if (!status.ok())
{
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheMiss);
auto in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
std::unique_ptr<ReadBuffer> reader;
if (!isCompressedFromFileName(file_name))
{
auto in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
readStringUntilEOF(value, *in);
}
reader = std::move(in);
else
{
auto in = CompressedReadBuffer(*part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt));
readStringUntilEOF(value, in);
}
reader = std::make_unique<CompressedReadBufferFromFile>(std::move(in));
readStringUntilEOF(value, *reader);
cache->put(key, value);
}
else