Progress on development

Alexey Milovidov 2021-08-25 00:45:58 +03:00
parent 4d9ad3725d
commit 8f57216180
20 changed files with 84 additions and 58 deletions

View File

@@ -41,7 +41,7 @@ std::unique_ptr<ReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
     if (disk)
         return disk->readFile(file_path);
     else
-        return createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr);
+        return createReadBufferFromFileBase(file_path, {}, 0);
 }
 }
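
The new createReadBufferFromFileBase signature takes the path, a ReadSettings, and a numeric argument that appears to be an estimated file size (0 when unknown); that reading is an assumption based only on the call sites in this diff. A minimal compilable mock of the call shape:

#include <cstddef>
#include <memory>
#include <string>

struct ReadSettings { size_t local_fs_buffer_size = 1048576; };
struct ReadBuffer { };

/// Mock with the same shape as the new call sites in this commit:
/// path, read settings, estimated size (assumed; 0 = unknown).
/// The real function returns a ReadBufferFromFileBase; a bare
/// ReadBuffer stands in here.
std::unique_ptr<ReadBuffer> createReadBufferFromFileBase(
    const std::string & /*file_path*/, const ReadSettings & /*settings*/, size_t /*estimated_size*/)
{
    return std::make_unique<ReadBuffer>();
}

int main()
{
    auto buf = createReadBufferFromFileBase("data.bin", {}, 0); /// mirrors the call sites above
    (void)buf;
}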

View File

@@ -10,7 +10,7 @@ namespace
 {
 String readFile(const String & file_path)
 {
-    auto buf = createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr);
+    auto buf = createReadBufferFromFileBase(file_path, {}, 0);
     String s;
     readStringUntilEOF(s, *buf);
     return s;

View File

@@ -578,6 +578,7 @@
     M(607, BACKUP_ELEMENT_DUPLICATE) \
     M(608, CANNOT_RESTORE_TABLE) \
     M(609, CANNOT_ADVISE) \
+    M(610, UNKNOWN_READ_METHOD) \
     \
     M(998, POSTGRESQL_CONNECTION_FAILURE) \
     M(999, KEEPER_EXCEPTION) \

View File

@@ -3,6 +3,7 @@
 #include <Core/BaseSettings.h>
 #include <Core/SettingsEnums.h>
 #include <Core/Defines.h>
+#include <IO/ReadSettings.h>

 namespace Poco::Util
@@ -500,7 +501,7 @@ class IColumn;
     \
     M(String, local_filesystem_read_method, "pread", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
     M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
-    M(Int32, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
+    M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
     \
     /** Experimental functions */ \
     M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \

View File

@@ -177,7 +177,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
     [this, path, buf_size, mode]()
     {
         /// Copy file from cache to actual disk when cached buffer is finalized.
-        auto src_buffer = cache_disk->readFile(path);
+        auto src_buffer = cache_disk->readFile(path, ReadSettings(), 0);
         auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode);
         copyData(*src_buffer, *dst_buffer);
         dst_buffer->finalize();

View File

@@ -262,7 +262,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskEncrypted::writeFile(const String &
     if (old_file_size)
     {
         /// Append mode: we continue to use the same header.
-        auto read_buffer = delegate->readFile(wrapped_path, FileEncryption::Header::kSize);
+        auto read_buffer = delegate->readFile(wrapped_path, ReadSettings().adjustBufferSize(FileEncryption::Header::kSize));
         header = readHeader(*read_buffer);
         key = getKey(path, header, *settings);
     }

View File

@@ -7,6 +7,7 @@
 #include <Common/CurrentMetrics.h>
 #include <IO/ReadBufferFromFileDescriptor.h>
 #include <IO/WriteHelpers.h>
+#include <IO/Progress.h>
 #include <sys/stat.h>

View File

@@ -52,6 +52,14 @@ struct ReadSettings
     /// For 'pread_threadpool' method. Lower is more priority.
     size_t priority = 0;

+    ReadSettings adjustBufferSize(size_t file_size) const
+    {
+        ReadSettings res = *this;
+        res.local_fs_buffer_size = std::min(file_size, local_fs_buffer_size);
+        res.remote_fs_buffer_size = std::min(file_size, remote_fs_buffer_size);
+        return res;
+    }
 };

 }
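
This helper replaces the repeated std::min(buffer_size, file_size) boilerplate that the later hunks delete from every call site: it clamps both buffer sizes to the size of the file about to be read, so a small file no longer gets a full default-sized buffer. A standalone sketch of the same pattern, with a simplified stand-in struct and 1 MiB standing in for DBMS_DEFAULT_BUFFER_SIZE:

#include <algorithm>
#include <cstddef>
#include <cstdio>

/// Simplified stand-in for the ReadSettings struct in the hunk above.
struct ReadSettings
{
    size_t local_fs_buffer_size = 1048576;   /// stand-in for DBMS_DEFAULT_BUFFER_SIZE
    size_t remote_fs_buffer_size = 1048576;

    /// Clamp both buffer sizes to the file size, as in the diff.
    ReadSettings adjustBufferSize(size_t file_size) const
    {
        ReadSettings res = *this;
        res.local_fs_buffer_size = std::min(file_size, local_fs_buffer_size);
        res.remote_fs_buffer_size = std::min(file_size, remote_fs_buffer_size);
        return res;
    }
};

int main()
{
    ReadSettings settings;
    /// A 4 KiB file gets a 4 KiB buffer instead of the 1 MiB default.
    std::printf("%zu\n", settings.adjustBufferSize(4096).local_fs_buffer_size);
}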

View File

@@ -118,6 +118,7 @@ namespace ErrorCodes
     extern const int TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT;
     extern const int LOGICAL_ERROR;
     extern const int NOT_IMPLEMENTED;
+    extern const int UNKNOWN_READ_METHOD;
 }
@@ -2691,4 +2692,31 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const
     return ignored_part_uuids;
 }

+ReadSettings Context::getReadSettings() const
+{
+    ReadSettings res;
+
+    if (settings.local_filesystem_read_method.value == "read")
+        res.local_fs_method = ReadMethod::read;
+    else if (settings.local_filesystem_read_method.value == "pread")
+        res.local_fs_method = ReadMethod::pread;
+    else if (settings.local_filesystem_read_method.value == "mmap")
+        res.local_fs_method = ReadMethod::mmap;
+    else if (settings.local_filesystem_read_method.value == "pread_threadpool")
+        res.local_fs_method = ReadMethod::pread_threadpool;
+    else
+        throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}'", settings.local_filesystem_read_method.value);
+
+    res.local_fs_prefetch = settings.local_filesystem_read_prefetch;
+    res.local_fs_buffer_size = settings.max_read_buffer_size;
+    res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
+    res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
+    res.priority = settings.read_priority;
+    res.mmap_cache = getMMappedFileCache().get();
+
+    return res;
+}
+
 }
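
Context::getReadSettings() becomes the single point where the string-valued local_filesystem_read_method setting is mapped to the ReadMethod enum, throwing the new UNKNOWN_READ_METHOD (610) error for unrecognized values. A standalone sketch of the same dispatch, with std::runtime_error standing in for ClickHouse's Exception class:

#include <stdexcept>
#include <string>

enum class ReadMethod { read, pread, mmap, pread_threadpool };

/// Simplified stand-in for the mapping in Context::getReadSettings().
ReadMethod parseReadMethod(const std::string & name)
{
    if (name == "read")
        return ReadMethod::read;
    if (name == "pread")
        return ReadMethod::pread;
    if (name == "mmap")
        return ReadMethod::mmap;
    if (name == "pread_threadpool")
        return ReadMethod::pread_threadpool;
    /// Corresponds to error code 610 (UNKNOWN_READ_METHOD) in the hunk above.
    throw std::runtime_error("Unknown read method '" + name + "'");
}

int main()
{
    return parseReadMethod("pread") == ReadMethod::pread ? 0 : 1;
}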

View File

@@ -825,6 +825,9 @@ public:
     ReadTaskCallback getReadTaskCallback() const;
     void setReadTaskCallback(ReadTaskCallback && callback);

+    /** Get settings for reading from filesystem. */
+    ReadSettings getReadSettings() const;

 private:
     std::unique_lock<std::recursive_mutex> getLock() const;

View File

@@ -43,11 +43,9 @@ namespace ErrorCodes
 static MergeTreeReaderSettings getMergeTreeReaderSettings(const ContextPtr & context)
 {
     const auto & settings = context->getSettingsRef();
-    return {
-        .min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io,
-        .min_bytes_to_use_mmap_io = settings.min_bytes_to_use_mmap_io,
-        .mmap_cache = context->getMMappedFileCache(),
-        .max_read_buffer_size = settings.max_read_buffer_size,
+    return
+    {
+        .read_settings = context->getReadSettings(),
         .save_marks_in_cache = true,
         .checksum_on_read = settings.checksum_on_read,
     };

View File

@@ -57,10 +57,7 @@ namespace ErrorCodes
 static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
 {
     size_t file_size = disk->getFileSize(path);
-    ReadSettings settings;
-    settings.local_fs_buffer_size = std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, file_size);
-    settings.remote_fs_buffer_size = settings.local_fs_buffer_size;
-    return disk->readFile(path, settings, file_size);
+    return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size);
 }

 void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path)

View File

@@ -107,9 +107,7 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
     size_t marks_file_size = volume->getDisk()->getFileSize(marks_file_path);

-    ReadSettings settings;
-    settings.local_fs_buffer_size = settings.remote_fs_buffer_size = marks_file_size;
-    auto buffer = volume->getDisk()->readFile(marks_file_path, settings, marks_file_size);
+    auto buffer = volume->getDisk()->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size);

     while (!buffer->eof())
     {
         /// Skip offsets for columns

View File

@@ -129,9 +129,7 @@ void MergeTreeDataPartWide::loadIndexGranularity()
     }
     else
     {
-        ReadSettings settings;
-        settings.local_fs_buffer_size = settings.remote_fs_buffer_size = marks_file_size;
-        auto buffer = volume->getDisk()->readFile(marks_file_path, marks_file_size);
+        auto buffer = volume->getDisk()->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size);
         while (!buffer->eof())
         {
             buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block

View File

@@ -13,10 +13,8 @@ using MMappedFileCachePtr = std::shared_ptr<MMappedFileCache>;

 struct MergeTreeReaderSettings
 {
-    size_t min_bytes_to_use_direct_io = 0;
-    size_t min_bytes_to_use_mmap_io = 0;
-    MMappedFileCachePtr mmap_cache;
-    size_t max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
+    /// Common read settings.
+    ReadSettings read_settings;
     /// If save_marks_in_cache is false, then, if marks are not in cache,
     /// we will load them but won't save in the cache, to avoid evicting other data.
     bool save_marks_in_cache = false;
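
Folding the four per-reader fields into a single ReadSettings member is what lets getMergeTreeReaderSettings (earlier hunk) shrink to one designated initializer. A simplified, compilable stand-in showing the consolidation; the field set here is reduced to essentials and is not the full struct (C++20 designated initializers):

#include <cstddef>

struct ReadSettings
{
    size_t local_fs_buffer_size = 1048576;
    size_t priority = 0;
};

/// Simplified stand-in: the duplicated fields collapse into read_settings.
struct MergeTreeReaderSettings
{
    ReadSettings read_settings;
    bool save_marks_in_cache = false;
    bool checksum_on_read = true;
};

MergeTreeReaderSettings makeReaderSettings(const ReadSettings & from_context)
{
    /// Mirrors the designated-initializer style used in the diff.
    return
    {
        .read_settings = from_context,
        .save_marks_in_cache = true,
        .checksum_on_read = true,
    };
}

int main()
{
    auto s = makeReaderSettings(ReadSettings{});
    return s.save_marks_in_cache ? 0 : 1;
}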

View File

@@ -60,13 +60,10 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
     auto res = std::make_shared<MarksInCompressedFile>(marks_count * columns_in_mark);

-    ReadSettings settings;
-    settings.local_fs_buffer_size = settings.remote_fs_buffer_size = file_size;

     if (!index_granularity_info.is_adaptive)
     {
         /// Read directly to marks.
-        auto buffer = disk->readFile(mrk_path, settings, file_size);
+        auto buffer = disk->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size);
         buffer->readStrict(reinterpret_cast<char *>(res->data()), file_size);

         if (!buffer->eof())
@@ -75,7 +72,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
     }
     else
     {
-        auto buffer = disk->readFile(mrk_path, settings, file_size);
+        auto buffer = disk->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size);
         size_t i = 0;
         while (!buffer->eof())
         {

View File

@@ -172,9 +172,7 @@ namespace
 static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
 {
     size_t file_size = disk->getFileSize(path);
-    ReadSettings settings;
-    settings.local_fs_buffer_size = settings.remote_fs_buffer_size = std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, file_size);
-    return disk->readFile(path, settings, file_size);
+    return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size);
 }

 String MergeTreePartition::getID(const MergeTreeData & storage) const

View File

@@ -63,14 +63,14 @@ public:
     LogSource(
         size_t block_size_, const NamesAndTypesList & columns_, StorageLog & storage_,
-        size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
+        size_t mark_number_, size_t rows_limit_, ReadSettings read_settings_)
         : SourceWithProgress(getHeader(columns_)),
         block_size(block_size_),
         columns(columns_),
         storage(storage_),
         mark_number(mark_number_),
         rows_limit(rows_limit_),
-        max_read_buffer_size(max_read_buffer_size_)
+        read_settings(std::move(read_settings_))
     {
     }
@@ -86,14 +86,14 @@ private:
     size_t mark_number;     /// from what mark to read data
     size_t rows_limit;      /// The maximum number of rows that can be read
     size_t rows_read = 0;
-    size_t max_read_buffer_size;
+    ReadSettings read_settings;
     std::unordered_map<String, SerializationPtr> serializations;

     struct Stream
     {
-        Stream(const DiskPtr & disk, const String & data_path, size_t offset, size_t max_read_buffer_size_)
-            : plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path))))
+        Stream(const DiskPtr & disk, const String & data_path, size_t offset, ReadSettings read_settings_)
+            : plain(disk->readFile(data_path, read_settings_.adjustBufferSize(disk->getFileSize(data_path))))
             , compressed(*plain)
         {
             if (offset)
@@ -188,7 +188,7 @@ void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & colu
             offset = file_it->second.marks[mark_number].offset;
         auto & data_file_path = file_it->second.data_file_path;
-        auto it = streams.try_emplace(stream_name, storage.disk, data_file_path, offset, max_read_buffer_size).first;
+        auto it = streams.try_emplace(stream_name, storage.disk, data_file_path, offset, read_settings).first;
         return &it->second.compressed;
     };
@@ -563,7 +563,7 @@ void StorageLog::loadMarks(std::chrono::seconds lock_timeout)
     for (auto & file : files_by_index)
         file->second.marks.reserve(marks_count);

-    std::unique_ptr<ReadBuffer> marks_rb = disk->readFile(marks_file_path, 32768);
+    std::unique_ptr<ReadBuffer> marks_rb = disk->readFile(marks_file_path, ReadSettings().adjustBufferSize(32768));
     while (!marks_rb->eof())
     {
         for (auto & file : files_by_index)
@@ -678,7 +678,7 @@ Pipe StorageLog::read(
     if (num_streams > marks_size)
         num_streams = marks_size;

-    size_t max_read_buffer_size = context->getSettingsRef().max_read_buffer_size;
+    ReadSettings read_settings = context->getReadSettings();

     for (size_t stream = 0; stream < num_streams; ++stream)
     {
@@ -694,7 +694,7 @@ Pipe StorageLog::read(
             *this,
             mark_begin,
             rows_end - rows_begin,
-            max_read_buffer_size));
+            read_settings));
     }

     /// No need to hold lock while reading because we read fixed range of data that does not change while appending more data.

View File

@@ -78,7 +78,7 @@ public:
         StorageStripeLog & storage_,
         const StorageMetadataPtr & metadata_snapshot_,
         const Names & column_names,
-        size_t max_read_buffer_size_,
+        ReadSettings read_settings_,
         std::shared_ptr<const IndexForNativeFormat> & index_,
         IndexForNativeFormat::Blocks::const_iterator index_begin_,
         IndexForNativeFormat::Blocks::const_iterator index_end_)
@@ -86,7 +86,7 @@ public:
             getHeader(storage_, metadata_snapshot_, column_names, index_begin_, index_end_))
         , storage(storage_)
         , metadata_snapshot(metadata_snapshot_)
-        , max_read_buffer_size(max_read_buffer_size_)
+        , read_settings(std::move(read_settings_))
         , index(index_)
        , index_begin(index_begin_)
        , index_end(index_end_)
@@ -123,7 +123,7 @@ protected:
 private:
     StorageStripeLog & storage;
     StorageMetadataPtr metadata_snapshot;
-    size_t max_read_buffer_size;
+    ReadSettings read_settings;
     std::shared_ptr<const IndexForNativeFormat> index;
     IndexForNativeFormat::Blocks::const_iterator index_begin;
@@ -145,9 +145,7 @@ private:
             started = true;

             String data_file_path = storage.table_path + "data.bin";
-            size_t buffer_size = std::min(max_read_buffer_size, storage.disk->getFileSize(data_file_path));
-            data_in.emplace(storage.disk->readFile(data_file_path, buffer_size));
+            data_in.emplace(storage.disk->readFile(data_file_path, read_settings.adjustBufferSize(storage.disk->getFileSize(data_file_path))));
             block_in.emplace(*data_in, 0, index_begin, index_end);
         }
     }
@@ -345,7 +343,9 @@ Pipe StorageStripeLog::read(
         return Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
     }

-    CompressedReadBufferFromFile index_in(disk->readFile(index_file, 4096));
+    ReadSettings read_settings = context->getReadSettings();
+    CompressedReadBufferFromFile index_in(disk->readFile(index_file, read_settings.adjustBufferSize(4096)));
     std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};

     size_t size = index->blocks.size();
@@ -361,7 +361,7 @@ Pipe StorageStripeLog::read(
         std::advance(end, (stream + 1) * size / num_streams);

         pipes.emplace_back(std::make_shared<StripeLogSource>(
-            *this, metadata_snapshot, column_names, context->getSettingsRef().max_read_buffer_size, index, begin, end));
+            *this, metadata_snapshot, column_names, read_settings, index, begin, end));
     }

     /// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change.

View File

@@ -70,11 +70,11 @@ public:
         size_t block_size_,
         const NamesAndTypesList & columns_,
         StorageTinyLog & storage_,
-        size_t max_read_buffer_size_,
+        ReadSettings read_settings_,
         FileChecker::Map file_sizes_)
         : SourceWithProgress(getHeader(columns_))
         , block_size(block_size_), columns(columns_), storage(storage_)
-        , max_read_buffer_size(max_read_buffer_size_), file_sizes(std::move(file_sizes_))
+        , read_settings(std::move(read_settings_)), file_sizes(std::move(file_sizes_))
     {
     }
@@ -88,13 +88,15 @@ private:
     NamesAndTypesList columns;
     StorageTinyLog & storage;
     bool is_finished = false;
-    size_t max_read_buffer_size;
+    ReadSettings read_settings;
     FileChecker::Map file_sizes;

     struct Stream
     {
-        Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_, size_t file_size)
-            : plain(file_size ? disk->readFile(data_path, std::min(max_read_buffer_size_, file_size)) : std::make_unique<ReadBuffer>(nullptr, 0)),
+        Stream(const DiskPtr & disk, const String & data_path, ReadSettings read_settings_, size_t file_size)
+            : plain(file_size
+                ? disk->readFile(data_path, read_settings_.adjustBufferSize(file_size))
+                : std::make_unique<ReadBuffer>(nullptr, 0)),
             limited(std::make_unique<LimitReadBuffer>(*plain, file_size, false)),
             compressed(*limited)
         {
@@ -178,7 +180,7 @@ void TinyLogSource::readData(const NameAndTypePair & name_and_type,
     {
         String file_path = storage.files[stream_name].data_file_path;
         stream = std::make_unique<Stream>(
-            storage.disk, file_path, max_read_buffer_size, file_sizes[fileName(file_path)]);
+            storage.disk, file_path, read_settings, file_sizes[fileName(file_path)]);
     }

     return &stream->compressed;
@@ -493,8 +495,6 @@ Pipe StorageTinyLog::read(
     // When reading, we lock the entire storage, because we only have one file
     // per column and can't modify it concurrently.
-    const Settings & settings = context->getSettingsRef();
-
     std::shared_lock lock{rwlock, getLockTimeout(context)};
     if (!lock)
         throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
@@ -504,7 +504,7 @@ Pipe StorageTinyLog::read(
         max_block_size,
         Nested::convertToSubcolumns(all_columns),
         *this,
-        settings.max_read_buffer_size,
+        context->getReadSettings(),
         file_checker.getFileSizes()));
 }