Better version of cache on insert

This commit is contained in:
kssenii 2022-04-01 16:45:15 +02:00
parent 6c8e073a61
commit 36c583d0de
10 changed files with 163 additions and 12 deletions

View File

@ -91,6 +91,7 @@ public:
struct QueryScope struct QueryScope
{ {
explicit QueryScope(ContextMutablePtr query_context); explicit QueryScope(ContextMutablePtr query_context);
explicit QueryScope(ContextPtr query_context);
~QueryScope(); ~QueryScope();
void logPeakMemoryUsage(); void logPeakMemoryUsage();

View File

@ -262,6 +262,78 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
assert(getDownloadOffset() == offset_ + size); assert(getDownloadOffset() == offset_ + size);
} }
/// Copies `size` bytes starting at `from` into a newly created cache writer for this segment.
///
/// The writer targets the path returned by cache->getPathInLocalCache(key(), offset()) and is
/// constructed with a buffer of `size + 1` bytes (presumably so the whole payload stays in the
/// buffer without an implicit flush - TODO confirm). This function never calls next() itself.
///
/// Throws REMOTE_FS_OBJECT_CACHE_ERROR if `size` is zero or exceeds availableSize(),
/// and LOGICAL_ERROR if a cache writer already exists. If the write itself fails, the segment is
/// marked PARTIALLY_DOWNLOADED_NO_CONTINUATION, the writer is dropped and the exception is rethrown.
void FileSegment::writeInMemory(const char * from, size_t size)
{
    if (size == 0)
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed");

    /// Note: this check is performed before the segment mutex is taken.
    if (size > availableSize())
        throw Exception(
            ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
            "Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);

    std::lock_guard segment_lock(mutex);

    /// Only one writer may exist per segment at a time.
    if (cache_writer)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer already initialized");

    const auto local_cache_path = cache->getPathInLocalCache(key(), offset());
    cache_writer = std::make_unique<WriteBufferFromFile>(local_cache_path, size + 1);

    try
    {
        cache_writer->write(from, size);
    }
    catch (...)
    {
        /// Log first, while the segment still reports its pre-failure state.
        LOG_ERROR(log, "Failed to write to cache. File segment info: {}", getInfoForLogImpl(segment_lock));

        download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;

        cache_writer->finalize();
        cache_writer.reset();

        throw;
    }
}
size_t FileSegment::finalizeWrite()
{
std::lock_guard segment_lock(mutex);
if (!cache_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer not initialized");
size_t size = cache_writer->offset();
if (size == 0)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing size is not allowed");
try
{
cache_writer->next();
}
catch (...)
{
download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION;
cache_writer->finalize();
cache_writer.reset();
throw;
}
downloaded_size += size;
cache_writer.reset();
downloader_id.clear();
download_state = State::DOWNLOADED;
if (downloaded_size != range().size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected {} == {}", downloaded_size, range().size());
return size;
}
FileSegment::State FileSegment::wait() FileSegment::State FileSegment::wait()
{ {
std::unique_lock segment_lock(mutex); std::unique_lock segment_lock(mutex);

View File

@ -97,6 +97,10 @@ public:
void write(const char * from, size_t size, size_t offset_); void write(const char * from, size_t size, size_t offset_);
void writeInMemory(const char * from, size_t size);
size_t finalizeWrite();
RemoteFileReaderPtr getRemoteFileReader(); RemoteFileReaderPtr getRemoteFileReader();
void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_); void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_);

View File

@ -33,7 +33,7 @@ bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader))); auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));
schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }); schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }, nullptr);
return true; return true;
} }

View File

@ -7,6 +7,7 @@
#include <IO/WriteBufferFromS3.h> #include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <aws/s3/S3Client.h> #include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h> #include <aws/s3/model/CreateMultipartUploadRequest.h>
@ -43,6 +44,7 @@ struct WriteBufferFromS3::UploadPartTask
bool is_finised = false; bool is_finised = false;
std::string tag; std::string tag;
std::exception_ptr exception; std::exception_ptr exception;
std::optional<FileSegmentsHolder> cache_files;
}; };
struct WriteBufferFromS3::PutObjectTask struct WriteBufferFromS3::PutObjectTask
@ -93,25 +95,50 @@ void WriteBufferFromS3::nextImpl()
size_t size = offset(); size_t size = offset();
temporary_buffer->write(working_buffer.begin(), size); temporary_buffer->write(working_buffer.begin(), size);
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
if (CurrentThread::isInitialized())
query_context = CurrentThread::get().getQueryContext();
if (!query_context)
{
if (!shared_query_context)
{
ContextPtr global_context = CurrentThread::isInitialized() ? CurrentThread::get().getGlobalContext() : nullptr;
if (global_context)
{
shared_query_context = Context::createCopy(global_context);
shared_query_context->makeQueryContext();
}
}
if (shared_query_context)
{
shared_query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()));
query_context = shared_query_context;
}
}
if (cacheEnabled()) if (cacheEnabled())
{ {
if (blob_name.empty()) if (blob_name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty blob name"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty blob name");
auto cache_key = cache->hash(blob_name); auto cache_key = cache->hash(blob_name);
auto file_segments_holder = cache->setDownloading(cache_key, current_download_offset, size); file_segments_holder.emplace(cache->setDownloading(cache_key, current_download_offset, size));
current_download_offset += size; current_download_offset += size;
size_t remaining_size = size; size_t remaining_size = size;
for (const auto & file_segment : file_segments_holder.file_segments) for (const auto & file_segment : file_segments_holder->file_segments)
{ {
size_t current_size = std::min(file_segment->range().size(), remaining_size); size_t current_size = std::min(file_segment->range().size(), remaining_size);
remaining_size -= current_size; remaining_size -= current_size;
if (file_segment->reserve(current_size)) if (file_segment->reserve(current_size))
{ {
file_segment->write(working_buffer.begin(), current_size); file_segment->writeInMemory(working_buffer.begin(), current_size);
ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, current_size);
} }
else else
{ {
@ -273,7 +300,9 @@ void WriteBufferFromS3::writePart()
/// Releasing lock and condvar notification. /// Releasing lock and condvar notification.
bg_tasks_condvar.notify_one(); bg_tasks_condvar.notify_one();
} }
});
finalizeCacheIfNeeded();
}, query_context);
} }
else else
{ {
@ -281,6 +310,7 @@ void WriteBufferFromS3::writePart()
fillUploadRequest(task.req, part_tags.size() + 1); fillUploadRequest(task.req, part_tags.size() + 1);
processUploadRequest(task); processUploadRequest(task);
part_tags.push_back(task.tag); part_tags.push_back(task.tag);
finalizeCacheIfNeeded();
} }
} }
@ -389,13 +419,15 @@ void WriteBufferFromS3::makeSinglepartUpload()
bg_tasks_condvar.notify_one(); bg_tasks_condvar.notify_one();
} }
}); finalizeCacheIfNeeded();
}, query_context);
} }
else else
{ {
PutObjectTask task; PutObjectTask task;
fillPutRequest(task.req); fillPutRequest(task.req);
processPutRequest(task); processPutRequest(task);
finalizeCacheIfNeeded();
} }
} }
@ -423,6 +455,28 @@ void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
} }
/// Finalizes every file segment currently held in file_segments_holder.
///
/// Each successfully finalized segment is removed from the holder and its size is added to the
/// RemoteFSCacheDownloadBytes profile event. A segment whose finalizeWrite() throws is logged
/// and left in the holder; the remaining segments are still processed. Does nothing when no
/// holder is set. Exceptions from finalizeWrite() do not propagate to the caller.
void WriteBufferFromS3::finalizeCacheIfNeeded()
{
    if (!file_segments_holder)
        return;

    auto & file_segments = file_segments_holder->file_segments;
    for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
    {
        size_t size = 0;
        try
        {
            size = (*file_segment_it)->finalizeWrite();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);

            /// Step past the failed segment. Without this the same segment would be retried
            /// forever, because nothing else advances the iterator on failure.
            ++file_segment_it;
            continue;
        }

        /// erase() returns the iterator to the next element, so no explicit increment here.
        file_segment_it = file_segments.erase(file_segment_it);
        ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size);
    }
}
void WriteBufferFromS3::waitForReadyBackGroundTasks() void WriteBufferFromS3::waitForReadyBackGroundTasks()
{ {
if (schedule) if (schedule)

View File

@ -12,6 +12,7 @@
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/FileCache_fwd.h> #include <Common/FileCache_fwd.h>
#include <Common/FileSegment.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
@ -32,7 +33,7 @@ namespace Aws::S3::Model
namespace DB namespace DB
{ {
using ScheduleFunc = std::function<void(std::function<void()>)>; using ScheduleFunc = std::function<void(std::function<void()>, ContextPtr)>;
class WriteBufferFromFile; class WriteBufferFromFile;
/** /**
@ -125,6 +126,10 @@ private:
const String blob_name; const String blob_name;
FileCachePtr cache; FileCachePtr cache;
size_t current_download_offset = 0; size_t current_download_offset = 0;
std::optional<FileSegmentsHolder> file_segments_holder;
void finalizeCacheIfNeeded();
ContextMutablePtr shared_query_context;
ContextPtr query_context;
}; };
} }

View File

@ -597,6 +597,16 @@ CurrentThread::QueryScope::QueryScope(ContextMutablePtr query_context)
query_context->makeQueryContext(); query_context->makeQueryContext();
} }
/// Initializes a query on the current thread and attaches an already prepared query context.
///
/// Unlike the ContextMutablePtr overload, this one cannot modify the context, so the passed
/// context must already report hasQueryContext().
/// Throws LOGICAL_ERROR if `query_context` is null or has no query context.
CurrentThread::QueryScope::QueryScope(ContextPtr query_context)
{
    /// Guard against a null pointer before dereferencing it.
    if (!query_context || !query_context->hasQueryContext())
        throw Exception(
            ErrorCodes::LOGICAL_ERROR, "Cannot initialize query scope without query context");

    CurrentThread::initializeQuery();
    CurrentThread::attachQueryContext(query_context);
}
void CurrentThread::QueryScope::logPeakMemoryUsage() void CurrentThread::QueryScope::logPeakMemoryUsage()
{ {
auto group = CurrentThread::getGroup(); auto group = CurrentThread::getGroup();

View File

@ -9,14 +9,19 @@ namespace DB
CallbackRunner threadPoolCallbackRunner(ThreadPool & pool) CallbackRunner threadPoolCallbackRunner(ThreadPool & pool)
{ {
return [pool = &pool, thread_group = CurrentThread::getGroup()](auto callback) mutable return [pool = &pool, thread_group = CurrentThread::getGroup()](auto callback, ContextPtr query_context) mutable
{ {
pool->scheduleOrThrow( pool->scheduleOrThrow(
[&, callback = std::move(callback), thread_group]() [&, callback = std::move(callback), thread_group, query_context]()
{ {
if (thread_group) if (thread_group)
CurrentThread::attachTo(thread_group); CurrentThread::attachTo(thread_group);
std::optional<CurrentThread::QueryScope> query_scope;
if (query_context && !CurrentThread::get().getQueryContext())
query_scope.emplace(query_context);
SCOPE_EXIT_SAFE({ SCOPE_EXIT_SAFE({
if (thread_group) if (thread_group)
CurrentThread::detachQueryIfNotDetached(); CurrentThread::detachQueryIfNotDetached();

View File

@ -7,7 +7,7 @@ namespace DB
{ {
/// High-order function to run callbacks (functions with 'void()' signature) somewhere asynchronously /// High-order function to run callbacks (functions with 'void()' signature) somewhere asynchronously
using CallbackRunner = std::function<void(std::function<void()>)>; using CallbackRunner = std::function<void(std::function<void()>, ContextPtr)>;
/// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()' /// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()'
CallbackRunner threadPoolCallbackRunner(ThreadPool & pool); CallbackRunner threadPoolCallbackRunner(ThreadPool & pool);

View File

@ -85,7 +85,7 @@ fi
if [[ -n "$EXPORT_S3_STORAGE_POLICIES" ]]; then if [[ -n "$EXPORT_S3_STORAGE_POLICIES" ]]; then
ln -sf $SRC_PATH/config.d/storage_conf.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/storage_conf.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/users.d/s3_cache.xml $DEST_SERVER_PATH/s3_cache/ ln -sf $SRC_PATH/users.d/s3_cache.xml $DEST_SERVER_PATH/config.d/
fi fi
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then