Merge pull request #26791 from ClickHouse/async-reads

Experiment with asynchronous readers
This commit is contained in:
alesapin 2021-08-31 13:17:45 +03:00 committed by GitHub
commit 525999145e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
97 changed files with 1505 additions and 602 deletions

View File

@ -18,9 +18,6 @@
<!-- One NUMA node w/o hyperthreading -->
<max_threads>12</max_threads>
<!-- mmap shows some improvements in perf tests -->
<min_bytes_to_use_mmap_io>64Mi</min_bytes_to_use_mmap_io>
<!-- disable jit for perf tests -->
<compile_expressions>0</compile_expressions>
<compile_aggregate_expressions>0</compile_aggregate_expressions>

View File

@ -44,7 +44,7 @@ void processTableFiles(const fs::path & path, const String & files_prefix, Strin
writeIntText(fs::file_size(file_path), metadata_buf);
writeChar('\n', metadata_buf);
auto src_buf = createReadBufferFromFileBase(file_path, fs::file_size(file_path), 0, 0, nullptr);
auto src_buf = createReadBufferFromFileBase(file_path, {}, fs::file_size(file_path));
auto dst_buf = create_dst_buf(remote_file_name);
copyData(*src_buf, *dst_buf);

View File

@ -41,7 +41,7 @@ std::unique_ptr<ReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
if (disk)
return disk->readFile(file_path);
else
return createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr);
return createReadBufferFromFileBase(file_path, {}, 0);
}
}

View File

@ -10,7 +10,7 @@ namespace
{
String readFile(const String & file_path)
{
auto buf = createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr);
auto buf = createReadBufferFromFileBase(file_path, {}, 0);
String s;
readStringUntilEOF(s, *buf);
return s;

View File

@ -76,6 +76,7 @@
M(ActiveAsyncDrainedConnections, "Number of active connections drained asynchronously.") \
M(SyncDrainedConnections, "Number of connections drained synchronously.") \
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
namespace CurrentMetrics
{

View File

@ -583,6 +583,8 @@
M(612, OBJECT_ALREADY_STORED_ON_DISK) \
M(613, OBJECT_WAS_NOT_STORED_ON_DISK) \
M(614, POSTGRESQL_CONNECTION_FAILURE) \
M(615, CANNOT_ADVISE) \
M(616, UNKNOWN_READ_METHOD) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -251,6 +251,15 @@
\
M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \
M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \
\
M(ThreadPoolReaderPageCacheHit, "Number of times the read inside ThreadPoolReader was done from page cache.") \
M(ThreadPoolReaderPageCacheHitBytes, "Number of bytes read inside ThreadPoolReader when it was done from page cache.") \
M(ThreadPoolReaderPageCacheHitElapsedMicroseconds, "Time spent reading data from page cache in ThreadPoolReader.") \
M(ThreadPoolReaderPageCacheMiss, "Number of times the read inside ThreadPoolReader was not done from page cache and was hand off to thread pool.") \
M(ThreadPoolReaderPageCacheMissBytes, "Number of bytes read inside ThreadPoolReader when read was not done from page cache and was hand off to thread pool.") \
M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from page cache.") \
\
M(AsynchronousReadWaitMicroseconds, "Time spent in waiting for asynchronous reads.") \
namespace ProfileEvents

View File

@ -173,7 +173,7 @@ void QueryProfilerBase<ProfilerImpl>::tryCleanup()
}
template class QueryProfilerBase<QueryProfilerReal>;
template class QueryProfilerBase<QueryProfilerCpu>;
template class QueryProfilerBase<QueryProfilerCPU>;
QueryProfilerReal::QueryProfilerReal(const UInt64 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_MONOTONIC, period, SIGUSR1)
@ -185,11 +185,11 @@ void QueryProfilerReal::signalHandler(int sig, siginfo_t * info, void * context)
writeTraceInfo(TraceType::Real, sig, info, context);
}
QueryProfilerCpu::QueryProfilerCpu(const UInt64 thread_id, const UInt32 period)
QueryProfilerCPU::QueryProfilerCPU(const UInt64 thread_id, const UInt32 period)
: QueryProfilerBase(thread_id, CLOCK_THREAD_CPUTIME_ID, period, SIGUSR2)
{}
void QueryProfilerCpu::signalHandler(int sig, siginfo_t * info, void * context)
void QueryProfilerCPU::signalHandler(int sig, siginfo_t * info, void * context)
{
DENY_ALLOCATIONS_IN_SCOPE;
writeTraceInfo(TraceType::CPU, sig, info, context);

View File

@ -26,7 +26,7 @@ namespace DB
* 3. write collected stack trace to trace_pipe for TraceCollector
*
* Destructor tries to unset timer and restore previous signal handler.
* Note that signal handler implementation is defined by template parameter. See QueryProfilerReal and QueryProfilerCpu.
* Note that signal handler implementation is defined by template parameter. See QueryProfilerReal and QueryProfilerCPU.
*/
template <typename ProfilerImpl>
class QueryProfilerBase
@ -62,10 +62,10 @@ public:
};
/// Query profiler with timer based on CPU clock
class QueryProfilerCpu : public QueryProfilerBase<QueryProfilerCpu>
class QueryProfilerCPU : public QueryProfilerBase<QueryProfilerCPU>
{
public:
QueryProfilerCpu(const UInt64 thread_id, const UInt32 period);
QueryProfilerCPU(const UInt64 thread_id, const UInt32 period);
static void signalHandler(int sig, siginfo_t * info, void * context);
};

View File

@ -158,7 +158,7 @@ std::pair<bool, std::string> StudentTTest::compareAndReport(size_t confidence_le
if (mean_difference > mean_confidence_interval && (mean_difference - mean_confidence_interval > 0.0001)) /// difference must be more than 0.0001, to take into account connection latency.
{
ss << "Difference at " << confidence_level[confidence_level_index] << "% confidence : ";
ss << "Difference at " << confidence_level[confidence_level_index] << "% confidence: ";
ss << std::fixed << std::setprecision(8) << "mean difference is " << mean_difference << ", but confidence interval is " << mean_confidence_interval;
return {false, ss.str()};
}

View File

@ -29,7 +29,7 @@ namespace DB
class QueryStatus;
class ThreadStatus;
class QueryProfilerReal;
class QueryProfilerCpu;
class QueryProfilerCPU;
class QueryThreadLog;
struct OpenTelemetrySpanHolder;
class TasksStatsCounters;
@ -140,7 +140,7 @@ protected:
// CPU and Real time query profilers
std::unique_ptr<QueryProfilerReal> query_profiler_real;
std::unique_ptr<QueryProfilerCpu> query_profiler_cpu;
std::unique_ptr<QueryProfilerCPU> query_profiler_cpu;
Poco::Logger * log = nullptr;

View File

@ -32,7 +32,8 @@ protected:
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
///
/// If always_copy is true then even if the compressed block is already stored in compressed_in.buffer() it will be copied into own_compressed_buffer.
/// If always_copy is true then even if the compressed block is already stored in compressed_in.buffer()
/// it will be copied into own_compressed_buffer.
/// This is required for CheckingCompressedReadBuffer, since this is just a proxy.
///
/// Returns number of compressed bytes read.

View File

@ -36,6 +36,13 @@ bool CompressedReadBufferFromFile::nextImpl()
return true;
}
void CompressedReadBufferFromFile::prefetch()
{
file_in.prefetch();
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0), p_file_in(std::move(buf)), file_in(*p_file_in)
{
@ -46,14 +53,11 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadB
CompressedReadBufferFromFile::CompressedReadBufferFromFile(
const std::string & path,
const ReadSettings & settings,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache,
size_t buf_size,
bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0)
, p_file_in(createReadBufferFromFileBase(path, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache, buf_size))
, p_file_in(createReadBufferFromFileBase(path, settings, estimated_size))
, file_in(*p_file_in)
{
compressed_in = &file_in;

View File

@ -1,7 +1,8 @@
#pragma once
#include "CompressedReadBufferBase.h"
#include <Compression/CompressedReadBufferBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <time.h>
#include <memory>
@ -28,13 +29,13 @@ private:
size_t size_compressed = 0;
bool nextImpl() override;
void prefetch() override;
public:
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false);
CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, bool allow_different_codecs_ = false);
const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@ -37,7 +37,7 @@ int main(int argc, char ** argv)
path,
[&]()
{
return createReadBufferFromFileBase(path, 0, 0, 0, nullptr);
return createReadBufferFromFileBase(path, {}, 0);
},
&cache
);
@ -56,7 +56,7 @@ int main(int argc, char ** argv)
path,
[&]()
{
return createReadBufferFromFileBase(path, 0, 0, 0, nullptr);
return createReadBufferFromFileBase(path, {}, 0);
},
&cache
);

View File

@ -3,6 +3,7 @@
#include <Core/BaseSettings.h>
#include <Core/SettingsEnums.h>
#include <Core/Defines.h>
#include <IO/ReadSettings.h>
namespace Poco::Util
@ -499,6 +500,11 @@ class IColumn;
M(UInt64, function_range_max_elements_in_block, 500000000, "Maximum number of values generated by function 'range' per block of data (sum of array sizes for every row in a block, see also 'max_block_size' and 'min_insert_block_size_rows'). It is a safety threshold.", 0) \
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable', 'disable', 'force_enable'", 0) \
\
M(String, local_filesystem_read_method, "pread", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \

View File

@ -88,19 +88,16 @@ std::shared_ptr<FileDownloadMetadata> DiskCacheWrapper::acquireDownloadMetadata(
std::unique_ptr<ReadBufferFromFileBase>
DiskCacheWrapper::readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const
const ReadSettings & settings,
size_t estimated_size) const
{
if (!cache_file_predicate(path))
return DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return DiskDecorator::readFile(path, settings, estimated_size);
LOG_DEBUG(log, "Read file {} from cache", backQuote(path));
if (cache_disk->exists(path))
return cache_disk->readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return cache_disk->readFile(path, settings, estimated_size);
auto metadata = acquireDownloadMetadata(path);
@ -134,8 +131,8 @@ DiskCacheWrapper::readFile(
auto tmp_path = path + ".tmp";
{
auto src_buffer = DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
auto dst_buffer = cache_disk->writeFile(tmp_path, buf_size, WriteMode::Rewrite);
auto src_buffer = DiskDecorator::readFile(path, settings, estimated_size);
auto dst_buffer = cache_disk->writeFile(tmp_path, settings.local_fs_buffer_size, WriteMode::Rewrite);
copyData(*src_buffer, *dst_buffer);
}
cache_disk->moveFile(tmp_path, path);
@ -158,9 +155,9 @@ DiskCacheWrapper::readFile(
}
if (metadata->status == DOWNLOADED)
return cache_disk->readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return cache_disk->readFile(path, settings, estimated_size);
return DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return DiskDecorator::readFile(path, settings, estimated_size);
}
std::unique_ptr<WriteBufferFromFileBase>
@ -180,7 +177,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
[this, path, buf_size, mode]()
{
/// Copy file from cache to actual disk when cached buffer is finalized.
auto src_buffer = cache_disk->readFile(path, buf_size, 0, 0, 0, nullptr);
auto src_buffer = cache_disk->readFile(path, ReadSettings(), 0);
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode);
copyData(*src_buffer, *dst_buffer);
dst_buffer->finalize();

View File

@ -36,11 +36,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;

View File

@ -115,9 +115,9 @@ void DiskDecorator::listFiles(const String & path, std::vector<String> & file_na
std::unique_ptr<ReadBufferFromFileBase>
DiskDecorator::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) const
const String & path, const ReadSettings & settings, size_t estimated_size) const
{
return delegate->readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
return delegate->readFile(path, settings, estimated_size);
}
std::unique_ptr<WriteBufferFromFileBase>

View File

@ -37,11 +37,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -64,7 +61,8 @@ public:
void sync(int fd) const;
String getUniqueId(const String & path) const override { return delegate->getUniqueId(path); }
bool checkUniqueId(const String & id) const override { return delegate->checkUniqueId(id); }
DiskType::Type getType() const override { return delegate->getType(); }
DiskType getType() const override { return delegate->getType(); }
bool isRemote() const override { return delegate->isRemote(); }
bool supportZeroCopyReplication() const override { return delegate->supportZeroCopyReplication(); }
void onFreeze(const String & path) override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;

View File

@ -238,18 +238,15 @@ void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk>
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const
const ReadSettings & settings,
size_t estimated_size) const
{
auto wrapped_path = wrappedPath(path);
auto buffer = delegate->readFile(wrapped_path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
auto settings = current_settings.get();
auto buffer = delegate->readFile(wrapped_path, settings, estimated_size);
auto encryption_settings = current_settings.get();
FileEncryption::Header header = readHeader(*buffer);
String key = getKey(path, header, *settings);
return std::make_unique<ReadBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header);
String key = getKey(path, header, *encryption_settings);
return std::make_unique<ReadBufferFromEncryptedFile>(settings.local_fs_buffer_size, std::move(buffer), key, header);
}
std::unique_ptr<WriteBufferFromFileBase> DiskEncrypted::writeFile(const String & path, size_t buf_size, WriteMode mode)
@ -265,7 +262,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskEncrypted::writeFile(const String &
if (old_file_size)
{
/// Append mode: we continue to use the same header.
auto read_buffer = delegate->readFile(wrapped_path, FileEncryption::Header::kSize);
auto read_buffer = delegate->readFile(wrapped_path, ReadSettings().adjustBufferSize(FileEncryption::Header::kSize));
header = readHeader(*read_buffer);
key = getKey(path, header, *settings);
}

View File

@ -121,11 +121,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -215,7 +212,8 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override;
DiskType::Type getType() const override { return DiskType::Type::Encrypted; }
DiskType getType() const override { return DiskType::Encrypted; }
bool isRemote() const override { return delegate->isRemote(); }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;

View File

@ -259,11 +259,9 @@ void DiskLocal::replaceFile(const String & from_path, const String & to_path)
fs::rename(from_file, to_file);
}
std::unique_ptr<ReadBufferFromFileBase>
DiskLocal::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache) const
std::unique_ptr<ReadBufferFromFileBase> DiskLocal::readFile(const String & path, const ReadSettings & settings, size_t estimated_size) const
{
return createReadBufferFromFileBase(fs::path(disk_path) / path, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache, buf_size);
return createReadBufferFromFileBase(fs::path(disk_path) / path, settings, estimated_size);
}
std::unique_ptr<WriteBufferFromFileBase>

View File

@ -73,11 +73,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -99,7 +96,8 @@ public:
void truncateFile(const String & path, size_t size) override;
DiskType::Type getType() const override { return DiskType::Type::Local; }
DiskType getType() const override { return DiskType::Local; }
bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }

View File

@ -313,7 +313,7 @@ void DiskMemory::replaceFileImpl(const String & from_path, const String & to_pat
files.insert(std::move(node));
}
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, size_t /*buf_size*/, size_t, size_t, size_t, MMappedFileCache *) const
std::unique_ptr<ReadBufferFromFileBase> DiskMemory::readFile(const String & path, const ReadSettings &, size_t) const
{
std::lock_guard lock(mutex);

View File

@ -64,11 +64,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -90,7 +87,8 @@ public:
void truncateFile(const String & path, size_t size) override;
DiskType::Type getType() const override { return DiskType::Type::RAM; }
DiskType getType() const override { return DiskType::RAM; }
bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }

View File

@ -187,11 +187,10 @@ void DiskRestartProxy::listFiles(const String & path, std::vector<String> & file
}
std::unique_ptr<ReadBufferFromFileBase> DiskRestartProxy::readFile(
const String & path, size_t buf_size, size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache)
const
const String & path, const ReadSettings & settings, size_t estimated_size) const
{
ReadLock lock (mutex);
auto impl = DiskDecorator::readFile(path, buf_size, estimated_size, direct_io_threshold, mmap_threshold, mmap_cache);
auto impl = DiskDecorator::readFile(path, settings, estimated_size);
return std::make_unique<RestartAwareReadBuffer>(*this, std::move(impl));
}

View File

@ -45,11 +45,8 @@ public:
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;

View File

@ -5,37 +5,34 @@
namespace DB
{
struct DiskType
enum class DiskType
{
enum class Type
{
Local,
RAM,
S3,
HDFS,
Encrypted,
WebServer
};
WebServer,
};
static String toString(Type disk_type)
{
inline String toString(DiskType disk_type)
{
switch (disk_type)
{
case Type::Local:
case DiskType::Local:
return "local";
case Type::RAM:
case DiskType::RAM:
return "memory";
case Type::S3:
case DiskType::S3:
return "s3";
case Type::HDFS:
case DiskType::HDFS:
return "hdfs";
case Type::Encrypted:
case DiskType::Encrypted:
return "encrypted";
case Type::WebServer:
case DiskType::WebServer:
return "web";
}
__builtin_unreachable();
}
};
}
}

View File

@ -226,7 +226,7 @@ bool DiskWebServer::exists(const String & path) const
}
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & path, const ReadSettings & read_settings, size_t) const
{
LOG_DEBUG(log, "Read from file by path: {}", path);
@ -237,7 +237,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
RemoteMetadata meta(uri, fs::path(path).parent_path() / fs::path(path).filename());
meta.remote_fs_objects.emplace_back(std::make_pair(getFileName(path), file.size));
auto reader = std::make_unique<ReadBufferFromWebServer>(uri, meta, getContext(), settings->max_read_tries, buf_size);
auto reader = std::make_unique<ReadBufferFromWebServer>(uri, meta, getContext(), settings->max_read_tries, read_settings.remote_fs_buffer_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}

View File

@ -107,14 +107,14 @@ public:
String getFileName(const String & path) const;
DiskType::Type getType() const override { return DiskType::Type::WebServer; }
DiskType getType() const override { return DiskType::WebServer; }
bool isRemote() const override { return true; }
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
/// Disk info
const String & getName() const final override { return name; }

View File

@ -93,15 +93,15 @@ DiskHDFS::DiskHDFS(
}
std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path, const ReadSettings & read_settings, size_t) const
{
auto metadata = readMeta(path);
LOG_DEBUG(log,
LOG_TRACE(log,
"Read from file by path: {}. Existing HDFS objects: {}",
backQuote(metadata_path + path), metadata.remote_fs_objects.size());
auto reader = std::make_unique<ReadIndirectBufferFromHDFS>(config, remote_fs_root_path, metadata, buf_size);
auto reader = std::make_unique<ReadIndirectBufferFromHDFS>(config, remote_fs_root_path, metadata, read_settings.remote_fs_buffer_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
@ -114,7 +114,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
auto file_name = getRandomName();
auto hdfs_path = remote_fs_root_path + file_name;
LOG_DEBUG(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
LOG_TRACE(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
backQuote(metadata_path + path), hdfs_path);
/// Single O_WRONLY in libhdfs adds O_TRUNC

View File

@ -42,17 +42,15 @@ public:
const String & metadata_path_,
const Poco::Util::AbstractConfiguration & config_);
DiskType::Type getType() const override { return DiskType::Type::HDFS; }
DiskType getType() const override { return DiskType::HDFS; }
bool isRemote() const override { return true; }
bool supportZeroCopyReplication() const override { return true; }
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;

View File

@ -8,24 +8,34 @@
#include <Common/Exception.h>
#include <Disks/Executor.h>
#include <Disks/DiskType.h>
#include <IO/ReadSettings.h>
#include <memory>
#include <mutex>
#include <utility>
#include <boost/noncopyable.hpp>
#include "Poco/Util/AbstractConfiguration.h"
#include <Poco/Timestamp.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
}
namespace CurrentMetrics
{
extern const Metric DiskSpaceReservedForMerge;
extern const Metric DiskSpaceReservedForMerge;
}
namespace DB
{
class IDiskDirectoryIterator;
using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
@ -155,11 +165,8 @@ public:
/// Open the file for read and return ReadBufferFromFileBase object.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
size_t estimated_size = 0,
size_t direct_io_threshold = 0,
size_t mmap_threshold = 0,
MMappedFileCache * mmap_cache = nullptr) const = 0;
const ReadSettings & settings = ReadSettings{},
size_t estimated_size = 0) const = 0;
/// Open the file for write and return WriteBufferFromFileBase object.
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(
@ -210,7 +217,10 @@ public:
virtual void truncateFile(const String & path, size_t size);
/// Return disk type - "local", "s3", etc.
virtual DiskType::Type getType() const = 0;
virtual DiskType getType() const = 0;
/// Involves network interaction.
virtual bool isRemote() const = 0;
/// Whether this disk support zero-copy replication.
/// Overrode in remote fs disks.
@ -240,7 +250,7 @@ public:
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
/// Applies new settings for disk in runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) { }
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) {}
protected:
friend class DiskDecorator;

View File

@ -39,7 +39,6 @@ public:
/// mutations files
virtual DiskPtr getAnyDisk() const = 0;
virtual DiskPtr getDiskByName(const String & disk_name) const = 0;
virtual Disks getDisksByType(DiskType::Type type) const = 0;
/// Get free space from most free disk
virtual UInt64 getMaxUnreservedFreeSpace() const = 0;
/// Reserves space on any volume with index > min_volume_index or returns nullptr

View File

@ -184,7 +184,7 @@ void DiskS3::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
if (s3_paths_keeper)
s3_paths_keeper->removePaths([&](S3PathKeeper::Chunk && chunk)
{
LOG_DEBUG(log, "Remove AWS keys {}", S3PathKeeper::getChunkKeys(chunk));
LOG_TRACE(log, "Remove AWS keys {}", S3PathKeeper::getChunkKeys(chunk));
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(chunk);
/// TODO: Make operation idempotent. Do not throw exception if key is already deleted.
@ -221,15 +221,16 @@ void DiskS3::moveFile(const String & from_path, const String & to_path, bool sen
fs::rename(fs::path(metadata_path) / from_path, fs::path(metadata_path) / to_path);
}
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, const ReadSettings & read_settings, size_t) const
{
auto settings = current_settings.get();
auto metadata = readMeta(path);
LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}",
LOG_TRACE(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.remote_fs_objects.size());
auto reader = std::make_unique<ReadIndirectBufferFromS3>(settings->client, bucket, metadata, settings->s3_max_single_read_retries, buf_size);
auto reader = std::make_unique<ReadIndirectBufferFromS3>(
settings->client, bucket, metadata, settings->s3_max_single_read_retries, read_settings.remote_fs_buffer_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
@ -251,7 +252,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
s3_path = "r" + revisionToString(revision) + "-file-" + s3_path;
}
LOG_DEBUG(log, "{} to file by path: {}. S3 path: {}",
LOG_TRACE(log, "{} to file by path: {}. S3 path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), remote_fs_root_path + s3_path);
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
@ -351,7 +352,7 @@ void DiskS3::findLastRevision()
{
auto revision_prefix = revision + "1";
LOG_DEBUG(log, "Check object exists with revision prefix {}", revision_prefix);
LOG_TRACE(log, "Check object exists with revision prefix {}", revision_prefix);
/// Check file or operation with such revision prefix exists.
if (checkObjectExists(bucket, remote_fs_root_path + "r" + revision_prefix)
@ -405,7 +406,7 @@ void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & met
void DiskS3::migrateFileToRestorableSchema(const String & path)
{
LOG_DEBUG(log, "Migrate file {} to restorable schema", metadata_path + path);
LOG_TRACE(log, "Migrate file {} to restorable schema", metadata_path + path);
auto meta = readMeta(path);
@ -422,7 +423,7 @@ void DiskS3::migrateToRestorableSchemaRecursive(const String & path, Futures & r
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
LOG_DEBUG(log, "Migrate directory {} to restorable schema", metadata_path + path);
LOG_TRACE(log, "Migrate directory {} to restorable schema", metadata_path + path);
bool dir_contains_only_files = true;
for (auto it = iterateDirectory(path); it->isValid(); it->next())
@ -595,7 +596,7 @@ void DiskS3::copyObjectMultipartImpl(const String & src_bucket, const String & s
std::optional<Aws::S3::Model::HeadObjectResult> head,
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata) const
{
LOG_DEBUG(log, "Multipart copy upload has created. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, Metadata: {}",
LOG_TRACE(log, "Multipart copy upload has created. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, Metadata: {}",
src_bucket, src_key, dst_bucket, dst_key, metadata ? "REPLACE" : "NOT_SET");
auto settings = current_settings.get();
@ -669,7 +670,7 @@ void DiskS3::copyObjectMultipartImpl(const String & src_bucket, const String & s
throwIfError(outcome);
LOG_DEBUG(log, "Multipart copy upload has completed. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, "
LOG_TRACE(log, "Multipart copy upload has completed. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, "
"Upload_id: {}, Parts: {}", src_bucket, src_key, dst_bucket, dst_key, multipart_upload_id, part_tags.size());
}
}
@ -871,7 +872,7 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
metadata.addObject(relative_key, head_result.GetContentLength());
metadata.save();
LOG_DEBUG(log, "Restored file {}", path);
LOG_TRACE(log, "Restored file {}", path);
}
}
@ -918,7 +919,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
if (exists(from_path))
{
moveFile(from_path, to_path, send_metadata);
LOG_DEBUG(log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
LOG_TRACE(log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path);
if (restore_information.detached && isDirectory(to_path))
{
@ -945,7 +946,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
{
createDirectories(directoryPath(dst_path));
createHardLink(src_path, dst_path, send_metadata);
LOG_DEBUG(log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path);
LOG_TRACE(log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path);
}
}
}
@ -976,7 +977,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio
auto detached_path = pathToDetached(path);
LOG_DEBUG(log, "Move directory to 'detached' {} -> {}", path, detached_path);
LOG_TRACE(log, "Move directory to 'detached' {} -> {}", path, detached_path);
fs::path from_path = fs::path(metadata_path) / path;
fs::path to_path = fs::path(metadata_path) / detached_path;

View File

@ -76,11 +76,8 @@ public:
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
const ReadSettings & settings,
size_t estimated_size) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
@ -97,7 +94,8 @@ public:
void createHardLink(const String & src_path, const String & dst_path) override;
void createHardLink(const String & src_path, const String & dst_path, bool send_metadata);
DiskType::Type getType() const override { return DiskType::Type::S3; }
DiskType getType() const override { return DiskType::S3; }
bool isRemote() const override { return true; }
bool supportZeroCopyReplication() const override { return true; }

View File

@ -39,7 +39,7 @@ void checkWriteAccess(IDisk & disk)
void checkReadAccess(const String & disk_name, IDisk & disk)
{
auto file = disk.readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
auto file = disk.readFile("test_acl");
String buf(4, '0');
file->readStrict(buf.data(), 4);
if (buf != "test")

View File

@ -157,17 +157,6 @@ Disks StoragePolicy::getDisks() const
}
Disks StoragePolicy::getDisksByType(DiskType::Type type) const
{
Disks res;
for (const auto & volume : volumes)
for (const auto & disk : volume->getDisks())
if (disk->getType() == type)
res.push_back(disk);
return res;
}
DiskPtr StoragePolicy::getAnyDisk() const
{
/// StoragePolicy must contain at least one Volume

View File

@ -47,9 +47,6 @@ public:
/// Returns disks ordered by volumes priority
Disks getDisks() const override;
/// Returns disks by type ordered by volumes priority
Disks getDisksByType(DiskType::Type type) const override;
/// Returns any disk
/// Used when it's not important, for example for
/// mutations files

View File

@ -53,7 +53,7 @@ TEST(DiskTestHDFS, WriteReadHDFS)
{
DB::String result;
auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
auto in = disk.readFile(file_name, {}, 1024);
readString(result, *in);
EXPECT_EQ("Test write to file", result);
}
@ -76,7 +76,7 @@ TEST(DiskTestHDFS, RewriteFileHDFS)
{
String result;
auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
auto in = disk.readFile(file_name, {}, 1024);
readString(result, *in);
EXPECT_EQ("Text10", result);
readString(result, *in);
@ -104,7 +104,7 @@ TEST(DiskTestHDFS, AppendFileHDFS)
{
String result, expected;
auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
auto in = disk.readFile(file_name, {}, 1024);
readString(result, *in);
EXPECT_EQ("Text0123456789", result);
@ -131,7 +131,7 @@ TEST(DiskTestHDFS, SeekHDFS)
/// Test SEEK_SET
{
String buf(4, '0');
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, {}, 1024);
in->seek(5, SEEK_SET);
@ -141,7 +141,7 @@ TEST(DiskTestHDFS, SeekHDFS)
/// Test SEEK_CUR
{
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, {}, 1024);
String buf(4, '0');
in->readStrict(buf.data(), 4);

View File

@ -6,6 +6,7 @@
# include <sys/syscall.h>
# include <unistd.h>
# include <utility>
/** Small wrappers for asynchronous I/O.
@ -50,9 +51,21 @@ AIOContext::AIOContext(unsigned int nr_events)
AIOContext::~AIOContext()
{
if (ctx)
io_destroy(ctx);
}
AIOContext::AIOContext(AIOContext && rhs)
{
*this = std::move(rhs);
}
AIOContext & AIOContext::operator=(AIOContext && rhs)
{
std::swap(ctx, rhs.ctx);
return *this;
}
#elif defined(OS_FREEBSD)
# include <Common/Exception.h>

View File

@ -33,10 +33,13 @@ int io_getevents(aio_context_t ctx, long min_nr, long max_nr, io_event * events,
struct AIOContext : private boost::noncopyable
{
aio_context_t ctx;
aio_context_t ctx = 0;
AIOContext(unsigned int nr_events = 128);
AIOContext() {}
AIOContext(unsigned int nr_events);
~AIOContext();
AIOContext(AIOContext && rhs);
AIOContext & operator=(AIOContext && rhs);
};
#elif defined(OS_FREEBSD)

View File

@ -1,172 +0,0 @@
#if defined(OS_LINUX) || defined(__FreeBSD__)
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <Common/MemorySanitizer.h>
#include <Poco/Logger.h>
#include <boost/range/iterator_range.hpp>
#include <errno.h>
#include <IO/AIOContextPool.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_IO_SUBMIT;
extern const int CANNOT_IO_GETEVENTS;
}
AIOContextPool::~AIOContextPool()
{
cancelled.store(true, std::memory_order_relaxed);
io_completion_monitor.join();
}
void AIOContextPool::doMonitor()
{
/// continue checking for events unless cancelled
while (!cancelled.load(std::memory_order_relaxed))
waitForCompletion();
/// wait until all requests have been completed
while (!promises.empty())
waitForCompletion();
}
void AIOContextPool::waitForCompletion()
{
/// array to hold completion events
std::vector<io_event> events(max_concurrent_events);
try
{
const auto num_events = getCompletionEvents(events.data(), max_concurrent_events);
fulfillPromises(events.data(), num_events);
notifyProducers(num_events);
}
catch (...)
{
/// there was an error, log it, return to any producer and continue
reportExceptionToAnyProducer();
tryLogCurrentException("AIOContextPool::waitForCompletion()");
}
}
int AIOContextPool::getCompletionEvents(io_event events[], const int max_events) const
{
timespec timeout{timeout_sec, 0};
auto num_events = 0;
/// request 1 to `max_events` events
while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
if (errno != EINTR)
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", ErrorCodes::CANNOT_IO_GETEVENTS, errno);
/// Unpoison the memory returned from a non-instrumented system call.
__msan_unpoison(events, sizeof(*events) * num_events);
return num_events;
}
void AIOContextPool::fulfillPromises(const io_event events[], const int num_events)
{
if (num_events == 0)
return;
const std::lock_guard lock{mutex};
/// look at returned events and find corresponding promise, set result and erase promise from map
for (const auto & event : boost::make_iterator_range(events, events + num_events))
{
/// get id from event
#if defined(__FreeBSD__)
const auto completed_id = (reinterpret_cast<struct iocb *>(event.udata))->aio_data;
#else
const auto completed_id = event.data;
#endif
/// set value via promise and release it
const auto it = promises.find(completed_id);
if (it == std::end(promises))
{
LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id {}", completed_id);
continue;
}
#if defined(__FreeBSD__)
it->second.set_value(aio_return(reinterpret_cast<struct aiocb *>(event.udata)));
#else
it->second.set_value(event.res);
#endif
promises.erase(it);
}
}
void AIOContextPool::notifyProducers(const int num_producers) const
{
if (num_producers == 0)
return;
if (num_producers > 1)
have_resources.notify_all();
else
have_resources.notify_one();
}
void AIOContextPool::reportExceptionToAnyProducer()
{
const std::lock_guard lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
}
std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb)
{
std::unique_lock lock{mutex};
/// get current id and increment it by one
const auto request_id = next_id;
++next_id;
/// create a promise and put request in "queue"
promises.emplace(request_id, std::promise<BytesRead>{});
/// store id in AIO request for further identification
iocb.aio_data = request_id;
struct iocb * requests[] { &iocb };
/// submit a request
while (io_submit(aio_context.ctx, 1, requests) < 0)
{
if (errno == EAGAIN)
/// wait until at least one event has been completed (or a spurious wakeup) and try again
have_resources.wait(lock);
else if (errno != EINTR)
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
}
return promises[request_id].get_future();
}
AIOContextPool & AIOContextPool::instance()
{
static AIOContextPool instance;
return instance;
}
}
#endif

View File

@ -1,53 +0,0 @@
#pragma once
#if defined(OS_LINUX) || defined(__FreeBSD__)
#include <condition_variable>
#include <future>
#include <mutex>
#include <map>
#include <IO/AIO.h>
#include <Common/ThreadPool.h>
namespace DB
{
class AIOContextPool : private boost::noncopyable
{
static const auto max_concurrent_events = 128;
static const auto timeout_sec = 1;
AIOContext aio_context{max_concurrent_events};
using ID = size_t;
using BytesRead = ssize_t;
/// Autoincremental id used to identify completed requests
ID next_id{};
mutable std::mutex mutex;
mutable std::condition_variable have_resources;
std::map<ID, std::promise<BytesRead>> promises;
std::atomic<bool> cancelled{false};
ThreadFromGlobalPool io_completion_monitor{&AIOContextPool::doMonitor, this};
~AIOContextPool();
void doMonitor();
void waitForCompletion();
int getCompletionEvents(io_event events[], const int max_events) const;
void fulfillPromises(const io_event events[], const int num_events);
void notifyProducers(const int num_producers) const;
void reportExceptionToAnyProducer();
public:
static AIOContextPool & instance();
/// Request AIO read operation for iocb, returns a future with number of bytes read
std::future<BytesRead> post(struct iocb & iocb);
};
}
#endif

View File

@ -0,0 +1,106 @@
#include <fcntl.h>
#include <IO/AsynchronousReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/ProfileEvents.h>
#include <errno.h>
namespace ProfileEvents
{
extern const Event FileOpen;
}
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_CLOSE_FILE;
}
AsynchronousReadBufferFromFile::AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_,
Int32 priority_,
const std::string & file_name_,
size_t buf_size,
int flags,
char * existing_memory,
size_t alignment)
: AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, -1, buf_size, existing_memory, alignment), file_name(file_name_)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
#ifdef __APPLE__
bool o_direct = (flags != -1) && (flags & O_DIRECT);
if (o_direct)
flags = flags & ~O_DIRECT;
#endif
fd = ::open(file_name.c_str(), flags == -1 ? O_RDONLY | O_CLOEXEC : flags | O_CLOEXEC);
if (-1 == fd)
throwFromErrnoWithPath("Cannot open file " + file_name, file_name,
errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
#ifdef __APPLE__
if (o_direct)
{
if (fcntl(fd, F_NOCACHE, 1) == -1)
throwFromErrnoWithPath("Cannot set F_NOCACHE on file " + file_name, file_name, ErrorCodes::CANNOT_OPEN_FILE);
}
#endif
}
AsynchronousReadBufferFromFile::AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_,
Int32 priority_,
int & fd_,
const std::string & original_file_name,
size_t buf_size,
char * existing_memory,
size_t alignment)
:
AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, fd_, buf_size, existing_memory, alignment),
file_name(original_file_name.empty() ? "(fd = " + toString(fd_) + ")" : original_file_name)
{
fd_ = -1;
}
AsynchronousReadBufferFromFile::~AsynchronousReadBufferFromFile()
{
/// Must wait for events in flight before closing the file.
finalize();
if (fd < 0)
return;
::close(fd);
}
void AsynchronousReadBufferFromFile::close()
{
if (fd < 0)
return;
if (0 != ::close(fd))
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
fd = -1;
}
AsynchronousReadBufferFromFileWithDescriptorsCache::~AsynchronousReadBufferFromFileWithDescriptorsCache()
{
/// Must wait for events in flight before potentially closing the file by destroying OpenedFilePtr.
finalize();
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <IO/AsynchronousReadBufferFromFileDescriptor.h>
#include <IO/OpenedFileCache.h>
namespace DB
{
class AsynchronousReadBufferFromFile : public AsynchronousReadBufferFromFileDescriptor
{
protected:
std::string file_name;
public:
explicit AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_, Int32 priority_,
const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0);
/// Use pre-opened file descriptor.
explicit AsynchronousReadBufferFromFile(
AsynchronousReaderPtr reader_, Int32 priority_,
int & fd, /// Will be set to -1 if constructor didn't throw and ownership of file descriptor is passed to the object.
const std::string & original_file_name = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr, size_t alignment = 0);
~AsynchronousReadBufferFromFile() override;
/// Close file before destruction of object.
void close();
std::string getFileName() const override
{
return file_name;
}
};
/** Similar to AsynchronousReadBufferFromFile but also transparently shares open file descriptors.
*/
class AsynchronousReadBufferFromFileWithDescriptorsCache : public AsynchronousReadBufferFromFileDescriptor
{
private:
std::string file_name;
OpenedFileCache::OpenedFilePtr file;
public:
AsynchronousReadBufferFromFileWithDescriptorsCache(
AsynchronousReaderPtr reader_, Int32 priority_,
const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: AsynchronousReadBufferFromFileDescriptor(std::move(reader_), priority_, -1, buf_size, existing_memory, alignment),
file_name(file_name_)
{
file = OpenedFileCache::instance().get(file_name, flags);
fd = file->getFD();
}
~AsynchronousReadBufferFromFileWithDescriptorsCache() override;
std::string getFileName() const override
{
return file_name;
}
};
}

View File

@ -0,0 +1,204 @@
#include <errno.h>
#include <time.h>
#include <optional>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <IO/AsynchronousReadBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
namespace ProfileEvents
{
extern const Event AsynchronousReadWaitMicroseconds;
}
namespace CurrentMetrics
{
extern const Metric AsynchronousReadWait;
}
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
std::string AsynchronousReadBufferFromFileDescriptor::getFileName() const
{
return "(fd = " + toString(fd) + ")";
}
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromFileDescriptor::readInto(char * data, size_t size)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<IAsynchronousReader::LocalFileDescriptor>(fd);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = priority;
return reader->submit(request);
}
void AsynchronousReadBufferFromFileDescriptor::prefetch()
{
if (prefetch_future.valid())
return;
/// Will request the same amount of data that is read in nextImpl.
prefetch_buffer.resize(internal_buffer.size());
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
}
bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
{
if (prefetch_future.valid())
{
/// Read request already in flight. Wait for its completion.
size_t size = 0;
{
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait};
size = prefetch_future.get();
ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds());
}
prefetch_future = {};
file_offset_of_buffer_end += size;
if (size)
{
prefetch_buffer.swap(memory);
set(memory.data(), memory.size());
working_buffer.resize(size);
return true;
}
return false;
}
else
{
/// No pending request. Do synchronous read.
auto size = readInto(memory.data(), memory.size()).get();
file_offset_of_buffer_end += size;
if (size)
{
set(memory.data(), memory.size());
working_buffer.resize(size);
return true;
}
return false;
}
}
void AsynchronousReadBufferFromFileDescriptor::finalize()
{
if (prefetch_future.valid())
{
prefetch_future.wait();
prefetch_future = {};
}
}
AsynchronousReadBufferFromFileDescriptor::~AsynchronousReadBufferFromFileDescriptor()
{
finalize();
}
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence)
{
size_t new_pos;
if (whence == SEEK_SET)
{
assert(offset >= 0);
new_pos = offset;
}
else if (whence == SEEK_CUR)
{
new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset;
}
else
{
throw Exception("ReadBufferFromFileDescriptor::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
/// Position is unchanged.
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
return new_pos;
if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos <= file_offset_of_buffer_end)
{
/// Position is still inside the buffer.
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return new_pos;
}
else
{
if (prefetch_future.valid())
{
//std::cerr << "Ignoring prefetched data" << "\n";
prefetch_future.wait();
prefetch_future = {};
}
/// Position is out of the buffer, we need to do real seek.
off_t seek_pos = required_alignment > 1
? new_pos / required_alignment * required_alignment
: new_pos;
off_t offset_after_seek_pos = new_pos - seek_pos;
/// First put position at the end of the buffer so the next read will fetch new data to the buffer.
pos = working_buffer.end();
/// Just update the info about the next position in file.
file_offset_of_buffer_end = seek_pos;
if (offset_after_seek_pos > 0)
ignore(offset_after_seek_pos);
return seek_pos;
}
}
void AsynchronousReadBufferFromFileDescriptor::rewind()
{
if (prefetch_future.valid())
{
prefetch_future.wait();
prefetch_future = {};
}
/// Clearing the buffer with existing data. New data will be read on subsequent call to 'next'.
working_buffer.resize(0);
pos = working_buffer.begin();
file_offset_of_buffer_end = 0;
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <IO/ReadBufferFromFileBase.h>
#include <IO/AsynchronousReader.h>
#include <Interpreters/Context.h>
#include <optional>
#include <unistd.h>
namespace DB
{
/** Use ready file descriptor. Does not open or close a file.
*/
class AsynchronousReadBufferFromFileDescriptor : public ReadBufferFromFileBase
{
protected:
AsynchronousReaderPtr reader;
Int32 priority;
Memory<> prefetch_buffer;
std::future<IAsynchronousReader::Result> prefetch_future;
const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned.
size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end().
int fd;
bool nextImpl() override;
/// Name or some description of file.
std::string getFileName() const override;
void finalize();
public:
AsynchronousReadBufferFromFileDescriptor(
AsynchronousReaderPtr reader_, Int32 priority_,
int fd_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment),
reader(std::move(reader_)), priority(priority_), required_alignment(alignment), fd(fd_)
{
}
~AsynchronousReadBufferFromFileDescriptor() override;
void prefetch() override;
int getFD() const
{
return fd;
}
off_t getPosition() override
{
return file_offset_of_buffer_end - (working_buffer.end() - pos);
}
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t seek(off_t off, int whence) override;
/// Seek to the beginning, discarding already read data if any. Useful to reread file that changes on every read.
void rewind();
private:
std::future<IAsynchronousReader::Result> readInto(char * data, size_t size);
};
}

View File

@ -0,0 +1,69 @@
#pragma once
#include <Core/Types.h>
#include <optional>
#include <memory>
#include <future>
namespace DB
{
/** Interface for asynchronous reads from file descriptors.
* It can abstract Linux AIO, io_uring or normal reads from separate thread pool,
* and also reads from non-local filesystems.
* The implementation not necessarily to be efficient for large number of small requests,
* instead it should be ok for moderate number of sufficiently large requests
* (e.g. read 1 MB of data 50 000 times per seconds; BTW this is normal performance for reading from page cache).
* For example, this interface may not suffice if you want to serve 10 000 000 of 4 KiB requests per second.
* This interface is fairly limited.
*/
class IAsynchronousReader
{
public:
/// For local filesystems, the file descriptor is simply integer
/// but it can be arbitrary opaque object for remote filesystems.
struct IFileDescriptor
{
virtual ~IFileDescriptor() = default;
};
using FileDescriptorPtr = std::shared_ptr<IFileDescriptor>;
struct LocalFileDescriptor : public IFileDescriptor
{
LocalFileDescriptor(int fd_) : fd(fd_) {}
int fd;
};
/// Read from file descriptor at specified offset up to size bytes into buf.
/// Some implementations may require alignment and it is responsibility of
/// the caller to provide conforming requests.
struct Request
{
FileDescriptorPtr descriptor;
size_t offset = 0;
size_t size = 0;
char * buf = nullptr;
int64_t priority = 0;
};
/// Less than requested amount of data can be returned.
/// If size is zero - the file has ended.
/// (for example, EINTR must be handled by implementation automatically)
using Result = size_t;
/// Submit request and obtain a handle. This method don't perform any waits.
/// If this method did not throw, the caller must wait for the result with 'wait' method
/// or destroy the whole reader before destroying the buffer for request.
/// The method can be called concurrently from multiple threads.
virtual std::future<Result> submit(Request request) = 0;
/// Destructor must wait for all not completed request and ignore the results.
/// It may also cancel the requests.
virtual ~IAsynchronousReader() = default;
};
using AsynchronousReaderPtr = std::shared_ptr<IAsynchronousReader>;
}

View File

@ -48,18 +48,22 @@ struct Memory : boost::noncopyable, Allocator
dealloc();
}
Memory(Memory && rhs) noexcept
{
*this = std::move(rhs);
}
Memory & operator=(Memory && rhs) noexcept
void swap(Memory & rhs) noexcept
{
std::swap(m_capacity, rhs.m_capacity);
std::swap(m_size, rhs.m_size);
std::swap(m_data, rhs.m_data);
std::swap(alignment, rhs.alignment);
}
Memory(Memory && rhs) noexcept
{
swap(rhs);
}
Memory & operator=(Memory && rhs) noexcept
{
swap(rhs);
return *this;
}

View File

@ -65,6 +65,12 @@ public:
it->second = res;
return res;
}
static OpenedFileCache & instance()
{
static OpenedFileCache res;
return res;
}
};
using OpenedFileCachePtr = std::shared_ptr<OpenedFileCache>;

View File

@ -197,6 +197,11 @@ public:
return read(to, n);
}
/** Do something to allow faster subsequent call to 'nextImpl' if possible.
* It's used for asynchronous readers with double-buffering.
*/
virtual void prefetch() {}
protected:
/// The number of bytes to ignore from the initial position of `working_buffer`
/// buffer. Apparently this is an additional out-parameter for nextImpl(),

View File

@ -88,7 +88,4 @@ void ReadBufferFromFile::close()
metric_increment.destroy();
}
OpenedFileCache ReadBufferFromFilePReadWithCache::cache;
}

View File

@ -4,10 +4,6 @@
#include <IO/OpenedFileCache.h>
#include <Common/CurrentMetrics.h>
#ifndef O_DIRECT
#define O_DIRECT 00040000
#endif
namespace CurrentMetrics
{
@ -65,21 +61,19 @@ public:
/** Similar to ReadBufferFromFilePRead but also transparently shares open file descriptors.
*/
class ReadBufferFromFilePReadWithCache : public ReadBufferFromFileDescriptorPRead
class ReadBufferFromFilePReadWithDescriptorsCache : public ReadBufferFromFileDescriptorPRead
{
private:
static OpenedFileCache cache;
std::string file_name;
OpenedFileCache::OpenedFilePtr file;
public:
ReadBufferFromFilePReadWithCache(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
ReadBufferFromFilePReadWithDescriptorsCache(const std::string & file_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, int flags = -1,
char * existing_memory = nullptr, size_t alignment = 0)
: ReadBufferFromFileDescriptorPRead(-1, buf_size, existing_memory, alignment),
file_name(file_name_)
{
file = cache.get(file_name, flags);
file = OpenedFileCache::instance().get(file_name, flags);
fd = file->getFD();
}

View File

@ -7,8 +7,14 @@
#include <functional>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#ifndef O_DIRECT
#define O_DIRECT 00040000
#endif
namespace DB
{

View File

@ -6,13 +6,9 @@
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <sys/stat.h>
#include <Common/UnicodeBar.h>
#include <Common/TerminalSize.h>
#include <IO/Operators.h>
#include <IO/Progress.h>
#include <sys/stat.h>
namespace ProfileEvents
@ -39,6 +35,7 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_SELECT;
extern const int CANNOT_FSTAT;
extern const int CANNOT_ADVISE;
}
@ -111,6 +108,20 @@ bool ReadBufferFromFileDescriptor::nextImpl()
}
void ReadBufferFromFileDescriptor::prefetch()
{
#if defined(POSIX_FADV_WILLNEED)
/// For direct IO, loading data into page cache is pointless.
if (required_alignment)
return;
/// Ask OS to prefetch data into page cache.
if (0 != posix_fadvise(fd, file_offset_of_buffer_end, internal_buffer.size(), POSIX_FADV_WILLNEED))
throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE);
#endif
}
/// If 'offset' is small enough to stay in buffer after seek, then true seek in file does not happen.
off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
{
@ -133,16 +144,15 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
return new_pos;
/// file_offset_of_buffer_end corresponds to working_buffer.end(); it's a past-the-end pos,
/// so the second inequality is strict.
if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos < file_offset_of_buffer_end)
&& new_pos <= file_offset_of_buffer_end)
{
/// Position is still inside the buffer.
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
pos = working_buffer.end() - file_offset_of_buffer_end + new_pos;
assert(pos >= working_buffer.begin());
assert(pos < working_buffer.end());
assert(pos <= working_buffer.end());
return new_pos;
}

View File

@ -21,6 +21,7 @@ protected:
int fd;
bool nextImpl() override;
void prefetch() override;
/// Name or some description of file.
std::string getFileName() const override;

View File

@ -76,7 +76,7 @@ bool ReadBufferFromS3::nextImpl()
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
LOG_DEBUG(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
bucket, key, getPosition(), attempt, e.message());
if (attempt + 1 == max_single_read_retries)

32
src/IO/ReadSettings.cpp Normal file
View File

@ -0,0 +1,32 @@
#include <IO/ReadSettings.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_READ_METHOD;
}
const char * toString(ReadMethod read_method)
{
switch (read_method)
{
#define CASE_READ_METHOD(NAME) case ReadMethod::NAME: return #NAME;
FOR_EACH_READ_METHOD(CASE_READ_METHOD)
#undef CASE_READ_METHOD
}
__builtin_unreachable();
}
ReadMethod parseReadMethod(const std::string & name)
{
#define CASE_READ_METHOD(NAME) if (name == #NAME) return ReadMethod::NAME;
FOR_EACH_READ_METHOD(CASE_READ_METHOD)
#undef CASE_READ_METHOD
throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}'", name);
}
}

80
src/IO/ReadSettings.h Normal file
View File

@ -0,0 +1,80 @@
#pragma once
#include <cstddef>
#include <string>
#include <Core/Defines.h>
namespace DB
{
#define FOR_EACH_READ_METHOD(M) \
/** Simple synchronous reads with 'read'. \
Can use direct IO after specified size. Can use prefetch by asking OS to perform readahead. */ \
M(read) \
\
/** Simple synchronous reads with 'pread'. \
In contrast to 'read', shares single file descriptor from multiple threads. \
Can use direct IO after specified size. Can use prefetch by asking OS to perform readahead. */ \
M(pread) \
\
/** Use mmap after specified size or simple synchronous reads with 'pread'. \
Can use prefetch by asking OS to perform readahead. */ \
M(mmap) \
\
/** Checks if data is in page cache with 'preadv2' on modern Linux kernels. \
If data is in page cache, read from the same thread. \
If not, offload IO to separate threadpool. \
Can do prefetch with double buffering. \
Can use specified priorities and limit the number of concurrent reads. */ \
M(pread_threadpool) \
\
/** It's using asynchronous reader with fake backend that in fact synchronous. \
Only used for testing purposes. */ \
M(pread_fake_async) \
enum class ReadMethod
{
#define DEFINE_READ_METHOD(NAME) NAME,
FOR_EACH_READ_METHOD(DEFINE_READ_METHOD)
#undef DEFINE_READ_METHOD
};
const char * toString(ReadMethod read_method);
ReadMethod parseReadMethod(const std::string & name);
class MMappedFileCache;
struct ReadSettings
{
/// Method to use reading from local filesystem.
ReadMethod local_fs_method = ReadMethod::pread;
size_t local_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
size_t remote_fs_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
bool local_fs_prefetch = false;
bool remote_fs_prefetch = false;
/// For 'read', 'pread' and 'pread_threadpool' methods.
size_t direct_io_threshold = 0;
/// For 'mmap' method.
size_t mmap_threshold = 0;
MMappedFileCache * mmap_cache = nullptr;
/// For 'pread_threadpool' method. Lower is more priority.
size_t priority = 0;
ReadSettings adjustBufferSize(size_t file_size) const
{
ReadSettings res = *this;
res.local_fs_buffer_size = std::min(file_size, local_fs_buffer_size);
res.remote_fs_buffer_size = std::min(file_size, remote_fs_buffer_size);
return res;
}
};
}

View File

@ -0,0 +1,89 @@
#include <IO/SynchronousReader.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <common/errnoToString.h>
#include <unordered_map>
#include <mutex>
#include <unistd.h>
#include <fcntl.h>
namespace ProfileEvents
{
extern const Event ReadBufferFromFileDescriptorRead;
extern const Event ReadBufferFromFileDescriptorReadFailed;
extern const Event ReadBufferFromFileDescriptorReadBytes;
extern const Event DiskReadElapsedMicroseconds;
extern const Event Seek;
}
namespace CurrentMetrics
{
extern const Metric Read;
}
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int CANNOT_ADVISE;
}
std::future<IAsynchronousReader::Result> SynchronousReader::submit(Request request)
{
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
#if defined(POSIX_FADV_WILLNEED)
if (0 != posix_fadvise(fd, request.offset, request.size, POSIX_FADV_WILLNEED))
throwFromErrno("Cannot posix_fadvise", ErrorCodes::CANNOT_ADVISE);
#endif
return std::async(std::launch::deferred, [fd, request]
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead);
Stopwatch watch(CLOCK_MONOTONIC);
size_t bytes_read = 0;
while (!bytes_read)
{
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
res = ::pread(fd, request.buf, request.size, request.offset);
}
if (!res)
break;
if (-1 == res && errno != EINTR)
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
if (res > 0)
bytes_read += res;
}
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
/// It reports real time spent including the time spent while thread was preempted doing nothing.
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
/// (TaskStatsInfoGetter has about 500K RPS).
watch.stop();
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
return bytes_read;
});
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <IO/AsynchronousReader.h>
namespace DB
{
/** Implementation of IAsynchronousReader that in fact synchronous.
* The only addition is posix_fadvise.
*/
class SynchronousReader final : public IAsynchronousReader
{
public:
std::future<Result> submit(Request request) override;
};
}

226
src/IO/ThreadPoolReader.cpp Normal file
View File

@ -0,0 +1,226 @@
#include <IO/ThreadPoolReader.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <common/errnoToString.h>
#include <Poco/Event.h>
#include <future>
#include <unistd.h>
#include <fcntl.h>
#if defined(__linux__)
#include <sys/syscall.h>
#include <sys/uio.h>
/// We don't want to depend on specific glibc version.
#if !defined(RWF_NOWAIT)
#define RWF_NOWAIT 8
#endif
#if !defined(SYS_preadv2)
#if defined(__x86_64__)
#define SYS_preadv2 327
#elif defined(__aarch64__)
#define SYS_preadv2 286
#elif defined(__ppc64__)
#define SYS_preadv2 380
#else
#error "Unsupported architecture"
#endif
#endif
#endif
namespace ProfileEvents
{
extern const Event ThreadPoolReaderPageCacheHit;
extern const Event ThreadPoolReaderPageCacheHitBytes;
extern const Event ThreadPoolReaderPageCacheHitElapsedMicroseconds;
extern const Event ThreadPoolReaderPageCacheMiss;
extern const Event ThreadPoolReaderPageCacheMissBytes;
extern const Event ThreadPoolReaderPageCacheMissElapsedMicroseconds;
extern const Event ReadBufferFromFileDescriptorRead;
extern const Event ReadBufferFromFileDescriptorReadFailed;
extern const Event ReadBufferFromFileDescriptorReadBytes;
extern const Event DiskReadElapsedMicroseconds;
}
namespace CurrentMetrics
{
extern const Metric Read;
}
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
}
ThreadPoolReader::ThreadPoolReader(size_t pool_size, size_t queue_size_)
: pool(pool_size, pool_size, queue_size_)
{
}
std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request request)
{
int fd = assert_cast<const LocalFileDescriptor &>(*request.descriptor).fd;
#if defined(__linux__)
/// Check if data is already in page cache with preadv2 syscall.
/// We don't want to depend on new Linux kernel.
static std::atomic<bool> has_pread_nowait_support{true};
if (has_pread_nowait_support.load(std::memory_order_relaxed))
{
Stopwatch watch(CLOCK_MONOTONIC);
std::promise<Result> promise;
std::future<Result> future = promise.get_future();
size_t bytes_read = 0;
while (!bytes_read)
{
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
struct iovec io_vec{ .iov_base = request.buf, .iov_len = request.size };
res = syscall(
SYS_preadv2, fd,
&io_vec, 1,
/// This is kind of weird calling convention for syscall.
static_cast<int64_t>(request.offset), static_cast<int64_t>(request.offset >> 32),
/// This flag forces read from page cache or returning EAGAIN.
RWF_NOWAIT);
}
if (!res)
{
/// The file has ended.
promise.set_value(0);
watch.stop();
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
return future;
}
if (-1 == res)
{
if (errno == ENOSYS || errno == EOPNOTSUPP)
{
/// No support for the syscall or the flag in the Linux kernel.
has_pread_nowait_support.store(false, std::memory_order_relaxed);
break;
}
else if (errno == EAGAIN)
{
/// Data is not available in page cache. Will hand off to thread pool.
break;
}
else if (errno == EINTR)
{
/// Interrupted by a signal.
continue;
}
else
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
promise.set_exception(std::make_exception_ptr(ErrnoException(
fmt::format("Cannot read from file {}, {}", fd,
errnoToString(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)),
ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, errno)));
return future;
}
}
else
{
bytes_read += res;
}
}
if (bytes_read)
{
/// It reports real time spent including the time spent while thread was preempted doing nothing.
/// And it is Ok for the purpose of this watch (it is used to lower the number of threads to read from tables).
/// Sometimes it is better to use taskstats::blkio_delay_total, but it is quite expensive to get it
/// (TaskStatsInfoGetter has about 500K RPS).
watch.stop();
/// Read successfully from page cache.
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHit);
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitBytes, bytes_read);
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheHitElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
promise.set_value(bytes_read);
return future;
}
}
#endif
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMiss);
auto task = std::make_shared<std::packaged_task<Result()>>([request, fd]
{
setThreadName("ThreadPoolRead");
Stopwatch watch(CLOCK_MONOTONIC);
size_t bytes_read = 0;
while (!bytes_read)
{
ssize_t res = 0;
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::Read};
res = ::pread(fd, request.buf, request.size, request.offset);
}
/// File has ended.
if (!res)
break;
if (-1 == res && errno != EINTR)
{
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadFailed);
throwFromErrno(fmt::format("Cannot read from file {}", fd), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
bytes_read += res;
}
watch.stop();
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissBytes, bytes_read);
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
return bytes_read;
});
auto future = task->get_future();
/// ThreadPool is using "bigger is higher priority" instead of "smaller is more priority".
pool.scheduleOrThrow([task]{ (*task)(); }, -request.priority);
return future;
}
}

39
src/IO/ThreadPoolReader.h Normal file
View File

@ -0,0 +1,39 @@
#pragma once
#include <IO/AsynchronousReader.h>
#include <Common/ThreadPool.h>
namespace DB
{
/** Perform reads from separate thread pool of specified size.
*
* Note: doing reads from thread pool is usually bad idea for the following reasons:
* - for fast devices or when data is in page cache, it is less cache-friendly and less NUMA friendly
* and also involves extra synchronization overhead;
* - for fast devices and lots of small random reads, it cannot utilize the device, because
* OS will spent all the time in switching between threads and wasting CPU;
* - you don't know how many threads do you need, for example, when reading from page cache,
* you need the number of threads similar to the number of CPU cores;
* when reading from HDD array you need the number of threads as the number of disks times N;
* when reading from SSD you may need at least hundreds of threads for efficient random reads,
* but it is impractical;
* For example, this method is used in POSIX AIO that is notoriously useless (in contrast to Linux AIO).
*
* This is intended only as example for implementation of readers from remote filesystems,
* where this method can be feasible.
*/
class ThreadPoolReader final : public IAsynchronousReader
{
private:
ThreadPool pool;
public:
ThreadPoolReader(size_t pool_size, size_t queue_size_);
std::future<Result> submit(Request request) override;
/// pool automatically waits for all tasks in destructor.
};
}

View File

@ -1,6 +1,9 @@
#include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/MMapReadBufferFromFileWithCache.h>
#include <IO/AsynchronousReadBufferFromFile.h>
#include <IO/ThreadPoolReader.h>
#include <IO/SynchronousReader.h>
#include <Common/ProfileEvents.h>
@ -16,16 +19,29 @@ namespace ProfileEvents
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
size_t estimated_size, size_t direct_io_threshold, size_t mmap_threshold, MMappedFileCache * mmap_cache,
size_t buffer_size, int flags, char * existing_memory, size_t alignment)
const ReadSettings & settings,
size_t estimated_size,
int flags,
char * existing_memory,
size_t alignment)
{
if (!existing_memory && mmap_threshold && mmap_cache && estimated_size >= mmap_threshold)
if (!existing_memory
&& settings.local_fs_method == ReadMethod::mmap
&& settings.mmap_threshold
&& settings.mmap_cache
&& estimated_size >= settings.mmap_threshold)
{
try
{
auto res = std::make_unique<MMapReadBufferFromFileWithCache>(*mmap_cache, filename, 0);
auto res = std::make_unique<MMapReadBufferFromFileWithCache>(*settings.mmap_cache, filename, 0);
ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMap);
return res;
}
@ -36,8 +52,41 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
}
auto create = [&](size_t buffer_size, int actual_flags)
{
std::unique_ptr<ReadBufferFromFileBase> res;
if (settings.local_fs_method == ReadMethod::read)
{
res = std::make_unique<ReadBufferFromFile>(filename, buffer_size, actual_flags, existing_memory, alignment);
}
else if (settings.local_fs_method == ReadMethod::pread || settings.local_fs_method == ReadMethod::mmap)
{
res = std::make_unique<ReadBufferFromFilePReadWithDescriptorsCache>(filename, buffer_size, actual_flags, existing_memory, alignment);
}
else if (settings.local_fs_method == ReadMethod::pread_fake_async)
{
static AsynchronousReaderPtr reader = std::make_shared<SynchronousReader>();
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment);
}
else if (settings.local_fs_method == ReadMethod::pread_threadpool)
{
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolReader>(16, 1000000);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read method");
return res;
};
if (flags == -1)
flags = O_RDONLY | O_CLOEXEC;
#if defined(OS_LINUX) || defined(__FreeBSD__)
if (direct_io_threshold && estimated_size >= direct_io_threshold)
if (settings.direct_io_threshold && estimated_size >= settings.direct_io_threshold)
{
/** O_DIRECT
* The O_DIRECT flag may impose alignment restrictions on the length and address of user-space buffers and the file offset of I/Os.
@ -61,6 +110,8 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
else if (alignment % min_alignment)
alignment = align_up(alignment);
size_t buffer_size = settings.local_fs_buffer_size;
if (buffer_size % min_alignment)
{
existing_memory = nullptr; /// Cannot reuse existing memory is it has unaligned size.
@ -75,8 +126,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
/// Attempt to open a file with O_DIRECT
try
{
auto res = std::make_unique<ReadBufferFromFilePReadWithCache>(
filename, buffer_size, (flags == -1 ? O_RDONLY | O_CLOEXEC : flags) | O_DIRECT, existing_memory, alignment);
std::unique_ptr<ReadBufferFromFileBase> res = create(buffer_size, flags | O_DIRECT);
ProfileEvents::increment(ProfileEvents::CreatedReadBufferDirectIO);
return res;
}
@ -86,13 +136,10 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
ProfileEvents::increment(ProfileEvents::CreatedReadBufferDirectIOFailed);
}
}
#else
(void)direct_io_threshold;
(void)estimated_size;
#endif
ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary);
return std::make_unique<ReadBufferFromFilePReadWithCache>(filename, buffer_size, flags, existing_memory, alignment);
return create(settings.local_fs_buffer_size, flags);
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadSettings.h>
#include <string>
#include <memory>
@ -8,23 +9,13 @@
namespace DB
{
class MMappedFileCache;
/** Create an object to read data from a file.
* estimated_size - the number of bytes to read
* direct_io_threshold - the minimum number of bytes for asynchronous reads
*
* If direct_io_threshold = 0 or estimated_size < direct_io_threshold, read operations are executed synchronously.
* Otherwise, the read operations are performed asynchronously.
*/
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
const ReadSettings & settings,
size_t estimated_size,
size_t direct_io_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache,
size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE,
int flags_ = -1,
char * existing_memory = nullptr,
size_t alignment = 0);

View File

@ -20,7 +20,6 @@ PEERDIR(
SRCS(
AIO.cpp
AIOContextPool.cpp
BrotliReadBuffer.cpp
BrotliWriteBuffer.cpp
Bzip2ReadBuffer.cpp

View File

@ -2707,4 +2707,24 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const
return ignored_part_uuids;
}
ReadSettings Context::getReadSettings() const
{
ReadSettings res;
res.local_fs_method = parseReadMethod(settings.local_filesystem_read_method.value);
res.local_fs_prefetch = settings.local_filesystem_read_prefetch;
res.remote_fs_prefetch = settings.remote_filesystem_read_prefetch;
res.local_fs_buffer_size = settings.max_read_buffer_size;
res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
res.priority = settings.read_priority;
res.mmap_cache = getMMappedFileCache().get();
return res;
}
}

View File

@ -827,6 +827,9 @@ public:
ReadTaskCallback getReadTaskCallback() const;
void setReadTaskCallback(ReadTaskCallback && callback);
/** Get settings for reading from filesystem. */
ReadSettings getReadSettings() const;
private:
std::unique_lock<std::recursive_mutex> getLock() const;

View File

@ -328,7 +328,7 @@ void ThreadStatus::initQueryProfiler()
/* period */ static_cast<UInt32>(settings.query_profiler_real_time_period_ns));
if (settings.query_profiler_cpu_time_period_ns > 0)
query_profiler_cpu = std::make_unique<QueryProfilerCpu>(thread_id,
query_profiler_cpu = std::make_unique<QueryProfilerCPU>(thread_id,
/* period */ static_cast<UInt32>(settings.query_profiler_cpu_time_period_ns));
}
catch (...)

View File

@ -43,11 +43,9 @@ namespace ErrorCodes
static MergeTreeReaderSettings getMergeTreeReaderSettings(const ContextPtr & context)
{
const auto & settings = context->getSettingsRef();
return {
.min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io,
.min_bytes_to_use_mmap_io = settings.min_bytes_to_use_mmap_io,
.mmap_cache = context->getMMappedFileCache(),
.max_read_buffer_size = settings.max_read_buffer_size,
return
{
.read_settings = context->getReadSettings(),
.save_marks_in_cache = true,
.checksum_on_read = settings.checksum_on_read,
};

View File

@ -183,7 +183,7 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
{
auto disk = part->volume->getDisk();
auto disk_type = DiskType::toString(disk->getType());
auto disk_type = toString(disk->getType());
if (disk->supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
{
/// Send metadata if the receiver's capability covers the source disk type.
@ -364,7 +364,7 @@ void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part
writeStringBinary(it.first, out);
writeBinary(file_size, out);
auto file_in = createReadBufferFromFileBase(metadata_file, 0, 0, 0, nullptr, DBMS_DEFAULT_BUFFER_SIZE);
auto file_in = createReadBufferFromFileBase(metadata_file, {}, 0);
HashingWriteBuffer hashing_out(out);
copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
if (blocker.isCancelled())
@ -431,23 +431,20 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{
if (!disk)
{
DiskType::Type zero_copy_disk_types[] = {DiskType::Type::S3, DiskType::Type::HDFS};
for (auto disk_type: zero_copy_disk_types)
{
Disks disks = data.getDisksByType(disk_type);
if (!disks.empty())
{
capability.push_back(DiskType::toString(disk_type));
}
}
Disks disks = data.getDisks();
for (const auto & data_disk : disks)
if (data_disk->supportZeroCopyReplication())
capability.push_back(toString(data_disk->getType()));
}
else if (disk->supportZeroCopyReplication())
{
capability.push_back(DiskType::toString(disk->getType()));
capability.push_back(toString(disk->getType()));
}
}
if (!capability.empty())
{
std::sort(capability.begin(), capability.end());
capability.erase(std::unique(capability.begin(), capability.end()), capability.end());
const String & remote_fs_metadata = boost::algorithm::join(capability, ", ");
uri.addQueryParameter("remote_fs_metadata", remote_fs_metadata);
}

View File

@ -56,7 +56,8 @@ namespace ErrorCodes
static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
{
return disk->readFile(path, std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), disk->getFileSize(path)));
size_t file_size = disk->getFileSize(path);
return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size);
}
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path)

View File

@ -741,7 +741,7 @@ public:
/// Reserves 0 bytes
ReservationPtr makeEmptyReservationOnLargestDisk() const { return getStoragePolicy()->makeEmptyReservationOnLargestDisk(); }
Disks getDisksByType(DiskType::Type type) const { return getStoragePolicy()->getDisksByType(type); }
Disks getDisks() const { return getStoragePolicy()->getDisks(); }
/// Return alter conversions for part which must be applied on fly.
AlterConversions getAlterConversionsForPart(MergeTreeDataPartPtr part) const;

View File

@ -107,7 +107,7 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
size_t marks_file_size = volume->getDisk()->getFileSize(marks_file_path);
auto buffer = volume->getDisk()->readFile(marks_file_path, marks_file_size);
auto buffer = volume->getDisk()->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size);
while (!buffer->eof())
{
/// Skip offsets for columns

View File

@ -129,7 +129,7 @@ void MergeTreeDataPartWide::loadIndexGranularity()
}
else
{
auto buffer = volume->getDisk()->readFile(marks_file_path, marks_file_size);
auto buffer = volume->getDisk()->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size);
while (!buffer->eof())
{
buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block

View File

@ -13,10 +13,8 @@ using MMappedFileCachePtr = std::shared_ptr<MMappedFileCache>;
struct MergeTreeReaderSettings
{
size_t min_bytes_to_use_direct_io = 0;
size_t min_bytes_to_use_mmap_io = 0;
MMappedFileCachePtr mmap_cache;
size_t max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
/// Common read settings.
ReadSettings read_settings;
/// If save_marks_in_cache is false, then, if marks are not in cache,
/// we will load them but won't save in the cache, to avoid evicting other data.
bool save_marks_in_cache = false;

View File

@ -63,7 +63,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
if (!index_granularity_info.is_adaptive)
{
/// Read directly to marks.
auto buffer = disk->readFile(mrk_path, file_size);
auto buffer = disk->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size);
buffer->readStrict(reinterpret_cast<char *>(res->data()), file_size);
if (!buffer->eof())
@ -72,7 +72,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
}
else
{
auto buffer = disk->readFile(mrk_path, file_size);
auto buffer = disk->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size);
size_t i = 0;
while (!buffer->eof())
{

View File

@ -171,7 +171,8 @@ namespace
static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
{
return disk->readFile(path, std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), disk->getFileSize(path)));
size_t file_size = disk->getFileSize(path);
return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size);
}
String MergeTreePartition::getID(const MergeTreeData & storage) const

View File

@ -68,23 +68,19 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
/// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges);
if (!buffer_size || settings.max_read_buffer_size < buffer_size)
buffer_size = settings.max_read_buffer_size;
if (buffer_size)
settings.read_settings = settings.read_settings.adjustBufferSize(buffer_size);
const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
fullPath(data_part->volume->getDisk(), full_data_path),
[this, full_data_path, buffer_size]()
[this, full_data_path]()
{
return data_part->volume->getDisk()->readFile(
full_data_path,
buffer_size,
0,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io,
settings.mmap_cache.get());
settings.read_settings);
},
uncompressed_cache,
/* allow_different_codecs = */ true);
@ -104,11 +100,8 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
std::make_unique<CompressedReadBufferFromFile>(
data_part->volume->getDisk()->readFile(
full_data_path,
buffer_size,
0,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io,
settings.mmap_cache.get()),
settings.read_settings,
0),
/* allow_different_codecs = */ true);
if (profile_callback_)

View File

@ -72,25 +72,21 @@ MergeTreeReaderStream::MergeTreeReaderStream(
/// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
/// For example: part has single dictionary and all marks point to the same position.
if (max_mark_range_bytes == 0)
max_mark_range_bytes = settings.max_read_buffer_size;
size_t buffer_size = std::min(settings.max_read_buffer_size, max_mark_range_bytes);
ReadSettings read_settings = settings.read_settings;
if (max_mark_range_bytes != 0)
read_settings = read_settings.adjustBufferSize(max_mark_range_bytes);
/// Initialize the objects that shall be used to perform read operations.
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
fullPath(disk, path_prefix + data_file_extension),
[this, buffer_size, sum_mark_range_bytes, &settings]()
[this, sum_mark_range_bytes, read_settings]()
{
return disk->readFile(
path_prefix + data_file_extension,
buffer_size,
sum_mark_range_bytes,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io,
settings.mmap_cache.get());
read_settings,
sum_mark_range_bytes);
},
uncompressed_cache);
@ -108,12 +104,8 @@ MergeTreeReaderStream::MergeTreeReaderStream(
auto buffer = std::make_unique<CompressedReadBufferFromFile>(
disk->readFile(
path_prefix + data_file_extension,
buffer_size,
sum_mark_range_bytes,
settings.min_bytes_to_use_direct_io,
settings.min_bytes_to_use_mmap_io,
settings.mmap_cache.get())
);
read_settings,
sum_mark_range_bytes));
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);

View File

@ -46,6 +46,7 @@ MergeTreeReaderWide::MergeTreeReaderWide(
{
try
{
disk = data_part->volume->getDisk();
for (const NameAndTypePair & column : columns)
{
auto column_from_part = getColumnFromPart(column);
@ -74,6 +75,28 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si
OffsetColumns offset_columns;
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
if (disk->isRemote() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
{
/// Request reading of data in advance,
/// so if reading can be asynchronous, it will also be performed in parallel for all columns.
auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
auto column_from_part = getColumnFromPart(*name_and_type);
try
{
auto & cache = caches[column_from_part.getNameInStorage()];
prefetch(column_from_part, from_mark, continue_reading, cache);
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading column " + column_from_part.name + ")");
throw;
}
}
}
auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
@ -153,7 +176,7 @@ void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,
return;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
data_part->volume->getDisk(), data_part->getFullRelativePath() + stream_name, DATA_FILE_EXTENSION,
disk, data_part->getFullRelativePath() + stream_name, DATA_FILE_EXTENSION,
data_part->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part->index_granularity_info,
@ -166,15 +189,14 @@ void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,
}
void MergeTreeReaderWide::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
static ReadBuffer * getStream(
bool stream_for_prefix,
const ISerialization::SubstreamPath & substream_path,
MergeTreeReaderWide::FileStreams & streams,
const NameAndTypePair & name_and_type,
size_t from_mark, bool continue_reading,
ISerialization::SubstreamsCache & cache)
{
auto get_stream_getter = [&](bool stream_for_prefix) -> ISerialization::InputStreamGetter
{
return [&, stream_for_prefix](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer * //-V1047
{
/// If substream have already been read.
if (cache.count(ISerialization::getSubcolumnNameForStream(substream_path)))
return nullptr;
@ -188,35 +210,60 @@ void MergeTreeReaderWide::readData(
MergeTreeReaderStream & stream = *it->second;
if (stream_for_prefix)
{
stream.seekToStart();
continue_reading = false;
}
else if (!continue_reading)
stream.seekToMark(from_mark);
return stream.data_buffer;
};
};
}
void MergeTreeReaderWide::prefetch(
const NameAndTypePair & name_and_type,
size_t from_mark,
bool continue_reading,
ISerialization::SubstreamsCache & cache)
{
const auto & name = name_and_type.name;
auto & serialization = serializations[name];
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache))
buf->prefetch();
});
}
void MergeTreeReaderWide::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
ISerialization::SubstreamsCache & cache)
{
double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
deserialize_settings.avg_value_size_hint = avg_value_size_hint;
const auto & name = name_and_type.name;
auto serialization = serializations[name];
auto & serialization = serializations[name];
if (deserialize_binary_bulk_state_map.count(name) == 0)
{
deserialize_settings.getter = get_stream_getter(true);
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
return getStream(true, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
};
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
}
deserialize_settings.getter = get_stream_getter(false);
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
return getStream(false, substream_path, streams, name_and_type, from_mark, continue_reading, cache);
};
deserialize_settings.continuous_reading = continue_reading;
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
serializations[name]->deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, deserialize_settings, deserialize_state, &cache);
serialization->deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, deserialize_settings, deserialize_state, &cache);
IDataType::updateAvgValueSizeHint(*column, avg_value_size_hint);
}

View File

@ -32,12 +32,13 @@ public:
bool canReadIncompleteGranules() const override { return true; }
private:
using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;
using Serializations = std::map<std::string, SerializationPtr>;
private:
FileStreams streams;
Serializations serializations;
DiskPtr disk;
void addStreams(const NameAndTypePair & name_and_type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
@ -46,6 +47,13 @@ private:
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
ISerialization::SubstreamsCache & cache);
/// Make next readData more simple by calling 'prefetch' of all related ReadBuffers.
void prefetch(
const NameAndTypePair & name_and_type,
size_t from_mark,
bool continue_reading,
ISerialization::SubstreamsCache & cache);
};
}

View File

@ -51,11 +51,13 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
columns_for_reader = data_part->getColumns().addTypes(columns_to_read);
}
ReadSettings read_settings;
if (read_with_direct_io)
read_settings.direct_io_threshold = 1;
MergeTreeReaderSettings reader_settings =
{
/// bytes to use direct IO (this is hack)
.min_bytes_to_use_direct_io = read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(),
.max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE,
.read_settings = read_settings,
.save_marks_in_cache = false
};

View File

@ -118,7 +118,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
std::unique_lock lock(write_mutex);
MergeTreeData::MutableDataPartsVector parts;
auto in = disk->readFile(path, DBMS_DEFAULT_BUFFER_SIZE);
auto in = disk->readFile(path, {}, 0);
NativeBlockInputStream block_in(*in, 0);
NameSet dropped_parts;

View File

@ -63,14 +63,14 @@ public:
LogSource(
size_t block_size_, const NamesAndTypesList & columns_, StorageLog & storage_,
size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
size_t mark_number_, size_t rows_limit_, ReadSettings read_settings_)
: SourceWithProgress(getHeader(columns_)),
block_size(block_size_),
columns(columns_),
storage(storage_),
mark_number(mark_number_),
rows_limit(rows_limit_),
max_read_buffer_size(max_read_buffer_size_)
read_settings(std::move(read_settings_))
{
}
@ -86,14 +86,14 @@ private:
size_t mark_number; /// from what mark to read data
size_t rows_limit; /// The maximum number of rows that can be read
size_t rows_read = 0;
size_t max_read_buffer_size;
ReadSettings read_settings;
std::unordered_map<String, SerializationPtr> serializations;
struct Stream
{
Stream(const DiskPtr & disk, const String & data_path, size_t offset, size_t max_read_buffer_size_)
: plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path))))
Stream(const DiskPtr & disk, const String & data_path, size_t offset, ReadSettings read_settings_)
: plain(disk->readFile(data_path, read_settings_.adjustBufferSize(disk->getFileSize(data_path))))
, compressed(*plain)
{
if (offset)
@ -188,7 +188,7 @@ void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & colu
offset = file_it->second.marks[mark_number].offset;
auto & data_file_path = file_it->second.data_file_path;
auto it = streams.try_emplace(stream_name, storage.disk, data_file_path, offset, max_read_buffer_size).first;
auto it = streams.try_emplace(stream_name, storage.disk, data_file_path, offset, read_settings).first;
return &it->second.compressed;
};
@ -563,7 +563,7 @@ void StorageLog::loadMarks(std::chrono::seconds lock_timeout)
for (auto & file : files_by_index)
file->second.marks.reserve(marks_count);
std::unique_ptr<ReadBuffer> marks_rb = disk->readFile(marks_file_path, 32768);
std::unique_ptr<ReadBuffer> marks_rb = disk->readFile(marks_file_path, ReadSettings().adjustBufferSize(32768));
while (!marks_rb->eof())
{
for (auto & file : files_by_index)
@ -678,7 +678,7 @@ Pipe StorageLog::read(
if (num_streams > marks_size)
num_streams = marks_size;
size_t max_read_buffer_size = context->getSettingsRef().max_read_buffer_size;
ReadSettings read_settings = context->getReadSettings();
for (size_t stream = 0; stream < num_streams; ++stream)
{
@ -694,7 +694,7 @@ Pipe StorageLog::read(
*this,
mark_begin,
rows_end - rows_begin,
max_read_buffer_size));
read_settings));
}
/// No need to hold lock while reading because we read fixed range of data that does not change while appending more data.

View File

@ -7246,7 +7246,7 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part)
DiskPtr disk = part.volume->getDisk();
if (!disk || !disk->supportZeroCopyReplication())
return;
String zero_copy = fmt::format("zero_copy_{}", DiskType::toString(disk->getType()));
String zero_copy = fmt::format("zero_copy_{}", toString(disk->getType()));
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
@ -7286,7 +7286,7 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
DiskPtr disk = part.volume->getDisk();
if (!disk || !disk->supportZeroCopyReplication())
return true;
String zero_copy = fmt::format("zero_copy_{}", DiskType::toString(disk->getType()));
String zero_copy = fmt::format("zero_copy_{}", toString(disk->getType()));
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
@ -7346,7 +7346,7 @@ bool StorageReplicatedMergeTree::tryToFetchIfShared(
String StorageReplicatedMergeTree::getSharedDataReplica(
const IMergeTreeDataPart & part, DiskType::Type disk_type) const
const IMergeTreeDataPart & part, DiskType disk_type) const
{
String best_replica;
@ -7354,7 +7354,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
if (!zookeeper)
return best_replica;
String zero_copy = fmt::format("zero_copy_{}", DiskType::toString(disk_type));
String zero_copy = fmt::format("zero_copy_{}", toString(disk_type));
String zookeeper_part_node = fs::path(zookeeper_path) / zero_copy / "shared" / part.name;
Strings ids;

View File

@ -239,7 +239,7 @@ public:
bool tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
/// Get best replica having this partition on a same type remote disk
String getSharedDataReplica(const IMergeTreeDataPart & part, DiskType::Type disk_type) const;
String getSharedDataReplica(const IMergeTreeDataPart & part, DiskType disk_type) const;
inline String getReplicaName() const { return replica_name; }

View File

@ -78,7 +78,7 @@ public:
StorageStripeLog & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Names & column_names,
size_t max_read_buffer_size_,
ReadSettings read_settings_,
std::shared_ptr<const IndexForNativeFormat> & index_,
IndexForNativeFormat::Blocks::const_iterator index_begin_,
IndexForNativeFormat::Blocks::const_iterator index_end_)
@ -86,7 +86,7 @@ public:
getHeader(storage_, metadata_snapshot_, column_names, index_begin_, index_end_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, max_read_buffer_size(max_read_buffer_size_)
, read_settings(std::move(read_settings_))
, index(index_)
, index_begin(index_begin_)
, index_end(index_end_)
@ -123,7 +123,7 @@ protected:
private:
StorageStripeLog & storage;
StorageMetadataPtr metadata_snapshot;
size_t max_read_buffer_size;
ReadSettings read_settings;
std::shared_ptr<const IndexForNativeFormat> index;
IndexForNativeFormat::Blocks::const_iterator index_begin;
@ -145,9 +145,7 @@ private:
started = true;
String data_file_path = storage.table_path + "data.bin";
size_t buffer_size = std::min(max_read_buffer_size, storage.disk->getFileSize(data_file_path));
data_in.emplace(storage.disk->readFile(data_file_path, buffer_size));
data_in.emplace(storage.disk->readFile(data_file_path, read_settings.adjustBufferSize(storage.disk->getFileSize(data_file_path))));
block_in.emplace(*data_in, 0, index_begin, index_end);
}
}
@ -345,7 +343,9 @@ Pipe StorageStripeLog::read(
return Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
}
CompressedReadBufferFromFile index_in(disk->readFile(index_file, 4096));
ReadSettings read_settings = context->getReadSettings();
CompressedReadBufferFromFile index_in(disk->readFile(index_file, read_settings.adjustBufferSize(4096)));
std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};
size_t size = index->blocks.size();
@ -361,7 +361,7 @@ Pipe StorageStripeLog::read(
std::advance(end, (stream + 1) * size / num_streams);
pipes.emplace_back(std::make_shared<StripeLogSource>(
*this, metadata_snapshot, column_names, context->getSettingsRef().max_read_buffer_size, index, begin, end));
*this, metadata_snapshot, column_names, read_settings, index, begin, end));
}
/// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change.

View File

@ -70,11 +70,11 @@ public:
size_t block_size_,
const NamesAndTypesList & columns_,
StorageTinyLog & storage_,
size_t max_read_buffer_size_,
ReadSettings read_settings_,
FileChecker::Map file_sizes_)
: SourceWithProgress(getHeader(columns_))
, block_size(block_size_), columns(columns_), storage(storage_)
, max_read_buffer_size(max_read_buffer_size_), file_sizes(std::move(file_sizes_))
, read_settings(std::move(read_settings_)), file_sizes(std::move(file_sizes_))
{
}
@ -88,13 +88,15 @@ private:
NamesAndTypesList columns;
StorageTinyLog & storage;
bool is_finished = false;
size_t max_read_buffer_size;
ReadSettings read_settings;
FileChecker::Map file_sizes;
struct Stream
{
Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_, size_t file_size)
: plain(file_size ? disk->readFile(data_path, std::min(max_read_buffer_size_, file_size)) : std::make_unique<ReadBuffer>(nullptr, 0)),
Stream(const DiskPtr & disk, const String & data_path, ReadSettings read_settings_, size_t file_size)
: plain(file_size
? disk->readFile(data_path, read_settings_.adjustBufferSize(file_size))
: std::make_unique<ReadBuffer>(nullptr, 0)),
limited(std::make_unique<LimitReadBuffer>(*plain, file_size, false)),
compressed(*limited)
{
@ -178,7 +180,7 @@ void TinyLogSource::readData(const NameAndTypePair & name_and_type,
{
String file_path = storage.files[stream_name].data_file_path;
stream = std::make_unique<Stream>(
storage.disk, file_path, max_read_buffer_size, file_sizes[fileName(file_path)]);
storage.disk, file_path, read_settings, file_sizes[fileName(file_path)]);
}
return &stream->compressed;
@ -493,8 +495,6 @@ Pipe StorageTinyLog::read(
// When reading, we lock the entire storage, because we only have one file
// per column and can't modify it concurrently.
const Settings & settings = context->getSettingsRef();
std::shared_lock lock{rwlock, getLockTimeout(context)};
if (!lock)
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
@ -504,7 +504,7 @@ Pipe StorageTinyLog::read(
max_block_size,
Nested::convertToSubcolumns(all_columns),
*this,
settings.max_read_buffer_size,
context->getReadSettings(),
file_checker.getFileSizes()));
}

View File

@ -51,7 +51,7 @@ Pipe StorageSystemDisks::read(
col_free->insert(disk_ptr->getAvailableSpace());
col_total->insert(disk_ptr->getTotalSpace());
col_keep->insert(disk_ptr->getKeepingFreeSpace());
col_type->insert(DiskType::toString(disk_ptr->getType()));
col_type->insert(toString(disk_ptr->getType()));
}
Columns res_columns;

View File

@ -1,6 +1,6 @@
<test>
<preconditions>
<table_exists>hits_10m_single</table_exists>
<table_exists>test.hits</table_exists>
</preconditions>
<create_query>CREATE TABLE hits_none (Title String CODEC(NONE)) ENGINE = MergeTree ORDER BY tuple()</create_query>

View File

@ -1,9 +1,10 @@
<test>
<preconditions>
<table_exists>hits_10m_single</table_exists>
<table_exists>test.hits</table_exists>
</preconditions>
<settings>
<local_filesystem_read_method>mmap</local_filesystem_read_method>
<min_bytes_to_use_mmap_io>1</min_bytes_to_use_mmap_io>
</settings>

View File

@ -2,7 +2,7 @@ DROP TABLE IF EXISTS test_01343;
CREATE TABLE test_01343 (x String) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO test_01343 VALUES ('Hello, world');
SET min_bytes_to_use_mmap_io = 1;
SET local_filesystem_read_method = 'mmap', min_bytes_to_use_mmap_io = 1;
SELECT * FROM test_01343;
SYSTEM FLUSH LOGS;

View File

@ -2,7 +2,7 @@ DROP TABLE IF EXISTS test_01344;
CREATE TABLE test_01344 (x String, INDEX idx (x) TYPE set(10) GRANULARITY 1) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO test_01344 VALUES ('Hello, world');
SET min_bytes_to_use_mmap_io = 1;
SET local_filesystem_read_method = 'mmap', min_bytes_to_use_mmap_io = 1;
SELECT * FROM test_01344 WHERE x = 'Hello, world';
SYSTEM FLUSH LOGS;

View File

@ -19,7 +19,7 @@
static void checkByCompressedReadBuffer(const std::string & mrk_path, const std::string & bin_path)
{
DB::ReadBufferFromFile mrk_in(mrk_path);
DB::CompressedReadBufferFromFile bin_in(bin_path, 0, 0, 0, nullptr);
DB::CompressedReadBufferFromFile bin_in(bin_path, {}, 0);
DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);
bool mrk2_format = boost::algorithm::ends_with(mrk_path, ".mrk2");