Move reader selection logic back to StorageFile.

This commit is contained in:
pufit 2022-12-11 16:15:41 -05:00
parent e38a93c45a
commit 1d6e77a29a
3 changed files with 86 additions and 107 deletions

View File

@ -2,7 +2,6 @@
#include <IO/ReadBufferFromEmptyFile.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/MMapReadBufferFromFileWithCache.h>
#include <IO/MMapReadBufferFromFile.h>
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <IO/SynchronousReader.h>
@ -24,37 +23,22 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int CANNOT_STAT;
}
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileOrFileDescriptorBase(
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size,
int flags,
char * existing_memory,
size_t alignment,
bool read_from_fd,
int fd)
size_t alignment)
{
if (file_size.has_value() && !*file_size)
return std::make_unique<ReadBufferFromEmptyFile>();
struct stat file_stat{};
if (read_from_fd)
{
if (0 != fstat(fd, &file_stat))
throwFromErrno("Cannot stat file descriptor", ErrorCodes::CANNOT_STAT);
}
else
{
if (0 != stat(filename.c_str(), &file_stat))
throwFromErrno("Cannot stat file " + filename, ErrorCodes::CANNOT_STAT);
}
size_t estimated_size = file_stat.st_size;
size_t estimated_size = 0;
if (read_hint.has_value())
estimated_size = *read_hint;
else if (file_size.has_value())
@ -63,24 +47,23 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileOrFileDescriptor
if (!existing_memory
&& settings.local_fs_method == LocalFSReadMethod::mmap
&& settings.mmap_threshold
&& estimated_size >= settings.mmap_threshold
&& S_ISREG(file_stat.st_mode))
&& settings.mmap_cache
&& estimated_size >= settings.mmap_threshold)
{
try
{
std::unique_ptr<ReadBufferFromFileBase> res;
if (settings.mmap_cache)
res = std::make_unique<MMapReadBufferFromFileWithCache>(*settings.mmap_cache, filename, 0, estimated_size);
std::unique_ptr<MMapReadBufferFromFileWithCache> res;
if (file_size)
res = std::make_unique<MMapReadBufferFromFileWithCache>(*settings.mmap_cache, filename, 0, *file_size);
else
res = std::make_unique<MMapReadBufferFromFile>(filename, 0, estimated_size);
res = std::make_unique<MMapReadBufferFromFileWithCache>(*settings.mmap_cache, filename, 0, *file_size);
ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMap);
return res;
}
catch (const ErrnoException &)
{
/// Fallback if mmap is not supported.
/// Fallback if mmap is not supported (example: pipe).
ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMapFailed);
}
}
@ -89,21 +72,13 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileOrFileDescriptor
{
std::unique_ptr<ReadBufferFromFileBase> res;
/// Pread works only with regular files, so we explicitly fallback to read in other cases.
if (settings.local_fs_method == LocalFSReadMethod::read || !S_ISREG(file_stat.st_mode))
if (settings.local_fs_method == LocalFSReadMethod::read)
{
if (read_from_fd)
res = std::make_unique<ReadBufferFromFileDescriptor>(fd, buffer_size, existing_memory, alignment, file_size);
else
res = std::make_unique<ReadBufferFromFile>(filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
}
else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap)
{
if (read_from_fd)
res = std::make_unique<ReadBufferFromFileDescriptorPRead>(fd, buffer_size, existing_memory, alignment, file_size);
else
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(
filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{
@ -112,11 +87,6 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileOrFileDescriptor
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
if (read_from_fd)
res = std::make_unique<AsynchronousReadBufferFromFileDescriptor>(
reader, settings.priority, fd, buffer_size, existing_memory, alignment, file_size);
else
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
}
@ -127,11 +97,6 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileOrFileDescriptor
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
if (read_from_fd)
res = std::make_unique<AsynchronousReadBufferFromFileDescriptor>(
reader, settings.priority, fd, buffer_size, existing_memory, alignment, file_size);
else
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size);
}
@ -209,26 +174,4 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileOrFileDescriptor
return create(buffer_size, flags);
}
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size,
int flags_,
char * existing_memory,
size_t alignment)
{
return createReadBufferFromFileOrFileDescriptorBase(filename, settings, read_hint, file_size, flags_, existing_memory, alignment);
}
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileDescriptorBase(
int fd,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size,
char * existing_memory ,
size_t alignment)
{
return createReadBufferFromFileOrFileDescriptorBase({}, settings, read_hint, file_size, -1, existing_memory, alignment, true, fd);
}
}

View File

@ -14,17 +14,6 @@ namespace DB
* @param read_hint - the number of bytes to read hint
* @param file_size - size of file
*/
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileOrFileDescriptorBase(
const std::string & filename,
const ReadSettings & settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {},
int flags_ = -1,
char * existing_memory = nullptr,
size_t alignment = 0,
bool read_from_fd = false,
int fd = 0);
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
const ReadSettings & settings,
@ -34,11 +23,4 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
char * existing_memory = nullptr,
size_t alignment = 0);
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileDescriptorBase(
int fd,
const ReadSettings & settings,
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {},
char * existing_memory = nullptr,
size_t alignment = 0);
}

View File

@ -38,8 +38,7 @@
#include <Common/typeid_cast.h>
#include <Common/parseGlobs.h>
#include <Common/filesystemHelpers.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Common/ProfileEvents.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -51,6 +50,13 @@
#include <filesystem>
namespace ProfileEvents
{
extern const Event CreatedReadBufferOrdinary;
extern const Event CreatedReadBufferMMap;
extern const Event CreatedReadBufferMMapFailed;
}
namespace fs = std::filesystem;
namespace DB
@ -69,6 +75,7 @@ namespace ErrorCodes
extern const int FILE_DOESNT_EXIST;
extern const int TIMEOUT_EXCEEDED;
extern const int INCOMPATIBLE_COLUMNS;
extern const int CANNOT_STAT;
extern const int LOGICAL_ERROR;
extern const int CANNOT_APPEND_TO_FILE;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
@ -182,6 +189,7 @@ void checkCreationIsAllowed(
std::unique_ptr<ReadBuffer> createReadBuffer(
const String & current_path,
bool use_table_fd,
const String & storage_name,
int table_fd,
const String & compression_method,
ContextPtr context)
@ -189,27 +197,73 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
std::unique_ptr<ReadBuffer> nested_buffer;
CompressionMethod method;
auto read_method = context->getSettingsRef().storage_file_read_method.value;
auto read_settings = context->getReadSettings();
read_settings.mmap_threshold = 1;
read_settings.mmap_cache = nullptr; /// Turn off mmap cache for Storage File
if (auto opt_method = magic_enum::enum_cast<LocalFSReadMethod>(read_method))
read_settings.local_fs_method = *opt_method;
auto read_method_string = context->getSettingsRef().storage_file_read_method.value;
LocalFSReadMethod read_method;
if (auto opt_method = magic_enum::enum_cast<LocalFSReadMethod>(read_method_string))
read_method = *opt_method;
else
throwFromErrno("Unknown read method " + read_method, ErrorCodes::UNKNOWN_READ_METHOD);
throwFromErrno("Unknown read method " + read_method_string, ErrorCodes::UNKNOWN_READ_METHOD);
struct stat file_stat{};
if (use_table_fd)
{
nested_buffer = createReadBufferFromFileDescriptorBase(table_fd, read_settings);
/// Check if file descriptor allows random reads (and reading it twice).
if (0 != fstat(table_fd, &file_stat))
throwFromErrno("Cannot stat table file descriptor, inside " + storage_name, ErrorCodes::CANNOT_STAT);
method = chooseCompressionMethod("", compression_method);
}
else
{
nested_buffer = createReadBufferFromFileBase(current_path, read_settings);
/// Check if file descriptor allows random reads (and reading it twice).
if (0 != stat(current_path.c_str(), &file_stat))
throwFromErrno("Cannot stat file " + current_path, ErrorCodes::CANNOT_STAT);
method = chooseCompressionMethod(current_path, compression_method);
}
bool mmap_failed = false;
if (S_ISREG(file_stat.st_mode) && read_method == LocalFSReadMethod::mmap)
{
try
{
if (use_table_fd)
nested_buffer = std::make_unique<MMapReadBufferFromFileDescriptor>(table_fd, 0);
else
nested_buffer = std::make_unique<MMapReadBufferFromFile>(current_path, 0);
ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMap);
}
catch (const ErrnoException &)
{
/// Fallback if mmap is not supported.
ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMapFailed);
mmap_failed = true;
}
}
if (S_ISREG(file_stat.st_mode) && (read_method == LocalFSReadMethod::pread || mmap_failed))
{
if (use_table_fd)
nested_buffer = std::make_unique<ReadBufferFromFileDescriptorPRead>(table_fd);
else
nested_buffer = std::make_unique<ReadBufferFromFilePRead>(current_path, context->getSettingsRef().max_read_buffer_size);
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
}
else
{
if (use_table_fd)
nested_buffer = std::make_unique<ReadBufferFromFileDescriptor>(table_fd);
else
nested_buffer = std::make_unique<ReadBufferFromFile>(current_path, context->getSettingsRef().max_read_buffer_size);
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
}
/// For clickhouse-local and clickhouse-client add progress callback to display progress bar.
if (context->getApplicationType() == Context::ApplicationType::LOCAL
|| context->getApplicationType() == Context::ApplicationType::CLIENT)
@ -277,7 +331,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c
{
/// We will use PeekableReadBuffer to create a checkpoint, so we need a place
/// where we can store the original read buffer.
read_buffer_from_fd = createReadBuffer("", true, table_fd, compression_method, context);
read_buffer_from_fd = createReadBuffer("", true, getName(), table_fd, compression_method, context);
auto read_buf = std::make_unique<PeekableReadBuffer>(*read_buffer_from_fd);
read_buf->setCheckpoint();
return read_buf;
@ -326,7 +380,7 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
if (it == paths.end())
return nullptr;
return createReadBuffer(*it++, false, -1, compression_method, context);
return createReadBuffer(*it++, false, "File", -1, compression_method, context);
};
ColumnsDescription columns;
@ -543,7 +597,7 @@ public:
}
if (!read_buf)
read_buf = createReadBuffer(current_path, storage->use_table_fd, storage->table_fd, storage->compression_method, context);
read_buf = createReadBuffer(current_path, storage->use_table_fd, storage->getName(), storage->table_fd, storage->compression_method, context);
auto format
= context->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size, storage->format_settings);