This commit is contained in:
kssenii 2022-05-16 11:16:46 +02:00
parent 570d006655
commit 49201f217f
13 changed files with 26 additions and 62 deletions

View File

@ -287,7 +287,6 @@ std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
{ {
auto wrapped_path = wrappedPath(path); auto wrapped_path = wrappedPath(path);
auto buffer = delegate->readFile(wrapped_path, settings, read_hint, file_size); auto buffer = delegate->readFile(wrapped_path, settings, read_hint, file_size);
buffer->setReadUntilPosition(FileEncryption::Header::kSize);
if (buffer->eof()) if (buffer->eof())
{ {
/// File is empty, that's a normal case, see DiskEncrypted::truncateFile(). /// File is empty, that's a normal case, see DiskEncrypted::truncateFile().
@ -297,7 +296,6 @@ std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
auto encryption_settings = current_settings.get(); auto encryption_settings = current_settings.get();
FileEncryption::Header header = readHeader(*buffer); FileEncryption::Header header = readHeader(*buffer);
String key = getKey(path, header, *encryption_settings); String key = getKey(path, header, *encryption_settings);
buffer->setReadUntilPosition(0); /// Reset position back.
return std::make_unique<ReadBufferFromEncryptedFile>(settings.local_fs_buffer_size, std::move(buffer), key, header); return std::make_unique<ReadBufferFromEncryptedFile>(settings.local_fs_buffer_size, std::move(buffer), key, header);
} }

View File

@ -142,12 +142,6 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
prefetch_future = {}; prefetch_future = {};
} }
if (position == 0)
{
read_until_position.reset();
return;
}
if (position > read_until_position) if (position > read_until_position)
{ {
read_until_position = position; read_until_position = position;

View File

@ -49,12 +49,13 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(
const ContextPtr & context, const SelectQueryInfo & query_info) const ContextPtr & context, const SelectQueryInfo & query_info)
{ {
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
MergeTreeReaderSettings reader_settings; return
reader_settings.setReadSettings(context->getReadSettings()); {
reader_settings.save_marks_in_cache = true; .read_settings = context->getReadSettings(),
reader_settings.checksum_on_read = settings.checksum_on_read; .save_marks_in_cache = true,
reader_settings.read_in_order = query_info.input_order_info != nullptr; .checksum_on_read = settings.checksum_on_read,
return reader_settings; .read_in_order = query_info.input_order_info != nullptr,
};
} }
static const PrewhereInfoPtr & getPrewhereInfo(const SelectQueryInfo & query_info) static const PrewhereInfoPtr & getPrewhereInfo(const SelectQueryInfo & query_info)

View File

@ -314,7 +314,6 @@ MergeTreeData::MergeTreeData(
else else
{ {
auto buf = version_file.second->readFile(version_file.first); auto buf = version_file.second->readFile(version_file.first);
buf->setReadUntilEnd(); /// For asynchronous reading.
UInt32 read_format_version; UInt32 read_format_version;
readIntText(read_format_version, *buf); readIntText(read_format_version, *buf);
format_version = read_format_version; format_version = read_format_version;

View File

@ -107,8 +107,7 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
size_t marks_file_size = volume->getDisk()->getFileSize(marks_file_path); size_t marks_file_size = volume->getDisk()->getFileSize(marks_file_path);
auto buffer = volume->getDisk()->readFile(marks_file_path, MergeTreeReaderSettings::createReadSettings().adjustBufferSize(marks_file_size), marks_file_size); auto buffer = volume->getDisk()->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size);
buffer->setReadUntilEnd(); /// For asynchronous reading from remote fs.
while (!buffer->eof()) while (!buffer->eof())
{ {
/// Skip offsets for columns /// Skip offsets for columns

View File

@ -125,8 +125,7 @@ void MergeTreeDataPartWide::loadIndexGranularity()
} }
else else
{ {
auto buffer = volume->getDisk()->readFile(marks_file_path, MergeTreeReaderSettings::createReadSettings().adjustBufferSize(marks_file_size), marks_file_size); auto buffer = volume->getDisk()->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size);
buffer->setReadUntilEnd(); /// For asynchronous reading from remote fs.
while (!buffer->eof()) while (!buffer->eof())
{ {
buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block

View File

@ -14,6 +14,8 @@ using MMappedFileCachePtr = std::shared_ptr<MMappedFileCache>;
struct MergeTreeReaderSettings struct MergeTreeReaderSettings
{ {
/// Common read settings.
ReadSettings read_settings;
/// If save_marks_in_cache is false, then, if marks are not in cache, /// If save_marks_in_cache is false, then, if marks are not in cache,
/// we will load them but won't save in the cache, to avoid evicting other data. /// we will load them but won't save in the cache, to avoid evicting other data.
bool save_marks_in_cache = false; bool save_marks_in_cache = false;
@ -21,31 +23,6 @@ struct MergeTreeReaderSettings
bool checksum_on_read = true; bool checksum_on_read = true;
/// True if we read in order of sorting key. /// True if we read in order of sorting key.
bool read_in_order = false; bool read_in_order = false;
MergeTreeReaderSettings()
{
/// Turn on merge tree related assertions for asynchronous reading from remote filesystem.
read_settings.must_read_until_position = true;
}
void setReadSettings(const ReadSettings & settings)
{
read_settings = settings;
read_settings.must_read_until_position = true;
}
const ReadSettings & getReadSettings() const { return read_settings; }
static ReadSettings createReadSettings()
{
ReadSettings settings;
settings.must_read_until_position = true;
return settings;
}
private:
/// Common read settings.
ReadSettings read_settings;
}; };
struct MergeTreeWriterSettings struct MergeTreeWriterSettings

View File

@ -60,13 +60,11 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
ErrorCodes::CORRUPTED_DATA); ErrorCodes::CORRUPTED_DATA);
auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark); auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);
auto read_settings = MergeTreeReaderSettings::createReadSettings().adjustBufferSize(file_size);
if (!index_granularity_info.is_adaptive) if (!index_granularity_info.is_adaptive)
{ {
/// Read directly to marks. /// Read directly to marks.
auto buffer = disk->readFile(mrk_path, read_settings, file_size); auto buffer = disk->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size);
buffer->setReadUntilEnd(); /// For asynchronous reading from remote fs.
buffer->readStrict(reinterpret_cast<char *>(res->data()), file_size); buffer->readStrict(reinterpret_cast<char *>(res->data()), file_size);
if (!buffer->eof()) if (!buffer->eof())
@ -75,8 +73,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
} }
else else
{ {
auto buffer = disk->readFile(mrk_path, read_settings, file_size); auto buffer = disk->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size);
buffer->setReadUntilEnd(); /// For asynchronous reading from remote fs.
size_t i = 0; size_t i = 0;
while (!buffer->eof()) while (!buffer->eof())
{ {

View File

@ -78,10 +78,9 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
/// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data. /// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges); auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges);
if (buffer_size) if (buffer_size)
settings.setReadSettings(settings.getReadSettings().adjustBufferSize(buffer_size)); settings.read_settings = settings.read_settings.adjustBufferSize(buffer_size);
const auto & read_settings = settings.getReadSettings(); if (!settings.read_settings.local_fs_buffer_size || !settings.read_settings.remote_fs_buffer_size)
if (!read_settings.local_fs_buffer_size || !read_settings.remote_fs_buffer_size)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read to empty buffer."); throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read to empty buffer.");
const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION; const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
@ -93,7 +92,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
{ {
return data_part->volume->getDisk()->readFile( return data_part->volume->getDisk()->readFile(
full_data_path, full_data_path,
settings.getReadSettings()); settings.read_settings);
}, },
uncompressed_cache, uncompressed_cache,
/* allow_different_codecs = */ true); /* allow_different_codecs = */ true);
@ -114,7 +113,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
std::make_unique<CompressedReadBufferFromFile>( std::make_unique<CompressedReadBufferFromFile>(
data_part->volume->getDisk()->readFile( data_part->volume->getDisk()->readFile(
full_data_path, full_data_path,
settings.getReadSettings()), settings.read_settings),
/* allow_different_codecs = */ true); /* allow_different_codecs = */ true);
if (profile_callback_) if (profile_callback_)

View File

@ -57,7 +57,8 @@ MergeTreeReaderStream::MergeTreeReaderStream(
/// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality. /// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
/// For example: part has single dictionary and all marks point to the same position. /// For example: part has single dictionary and all marks point to the same position.
ReadSettings read_settings = settings.getReadSettings(); ReadSettings read_settings = settings.read_settings;
read_settings.must_read_until_position = true;
if (max_mark_range_bytes != 0) if (max_mark_range_bytes != 0)
read_settings = read_settings.adjustBufferSize(max_mark_range_bytes); read_settings = read_settings.adjustBufferSize(max_mark_range_bytes);

View File

@ -74,7 +74,7 @@ size_t MergeTreeReaderWide::readRows(
std::unordered_map<String, ISerialization::SubstreamsCache> caches; std::unordered_map<String, ISerialization::SubstreamsCache> caches;
std::unordered_set<std::string> prefetched_streams; std::unordered_set<std::string> prefetched_streams;
if (disk->isRemote() ? settings.getReadSettings().remote_fs_prefetch : settings.getReadSettings().local_fs_prefetch) if (disk->isRemote() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
{ {
/// Request reading of data in advance, /// Request reading of data in advance,
/// so if reading can be asynchronous, it will also be performed in parallel for all columns. /// so if reading can be asynchronous, it will also be performed in parallel for all columns.

View File

@ -59,9 +59,11 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
if (read_with_direct_io) if (read_with_direct_io)
read_settings.direct_io_threshold = 1; read_settings.direct_io_threshold = 1;
MergeTreeReaderSettings reader_settings; MergeTreeReaderSettings reader_settings =
reader_settings.save_marks_in_cache = false; {
reader_settings.setReadSettings(read_settings); .read_settings = read_settings,
.save_marks_in_cache = false
};
reader = data_part->getReader(columns_for_reader, storage_snapshot->metadata, reader = data_part->getReader(columns_for_reader, storage_snapshot->metadata,
MarkRanges{MarkRange(0, data_part->getMarksCount())}, MarkRanges{MarkRange(0, data_part->getMarksCount())},

View File

@ -10,9 +10,7 @@ namespace DB
static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path) static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
{ {
size_t file_size = disk->getFileSize(path); size_t file_size = disk->getFileSize(path);
auto buf = disk->readFile(path, MergeTreeReaderSettings::createReadSettings().adjustBufferSize(file_size), file_size); return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size);
buf->setReadUntilEnd();
return buf;
} }
PartMetadataManagerOrdinary::PartMetadataManagerOrdinary(const IMergeTreeDataPart * part_) : IPartMetadataManager(part_) PartMetadataManagerOrdinary::PartMetadataManagerOrdinary(const IMergeTreeDataPart * part_) : IPartMetadataManager(part_)