From 36c583d0dee412d398a7c253e12dff6c6b670447 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 1 Apr 2022 16:45:15 +0200 Subject: [PATCH] Better version of cache on insert --- src/Common/CurrentThread.h | 1 + src/Common/FileSegment.cpp | 72 +++++++++++++++++++ src/Common/FileSegment.h | 4 ++ src/IO/ParallelReadBuffer.cpp | 2 +- src/IO/WriteBufferFromS3.cpp | 66 +++++++++++++++-- src/IO/WriteBufferFromS3.h | 7 +- src/Interpreters/ThreadStatusExt.cpp | 10 +++ src/Interpreters/threadPoolCallbackRunner.cpp | 9 ++- src/Interpreters/threadPoolCallbackRunner.h | 2 +- tests/config/install.sh | 2 +- 10 files changed, 163 insertions(+), 12 deletions(-) diff --git a/src/Common/CurrentThread.h b/src/Common/CurrentThread.h index 9dbe8d355d6..4888adb511a 100644 --- a/src/Common/CurrentThread.h +++ b/src/Common/CurrentThread.h @@ -91,6 +91,7 @@ public: struct QueryScope { explicit QueryScope(ContextMutablePtr query_context); + explicit QueryScope(ContextPtr query_context); ~QueryScope(); void logPeakMemoryUsage(); diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index 76749d24f43..d8e7a994df4 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -262,6 +262,78 @@ void FileSegment::write(const char * from, size_t size, size_t offset_) assert(getDownloadOffset() == offset_ + size); } +void FileSegment::writeInMemory(const char * from, size_t size) +{ + if (!size) + throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed"); + + if (availableSize() < size) + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Not enough space is reserved. Available: {}, expected: {}", availableSize(), size); + + std::lock_guard segment_lock(mutex); + + if (cache_writer) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer already initialized"); + + auto download_path = cache->getPathInLocalCache(key(), offset()); + cache_writer = std::make_unique(download_path, size + 1); + + try + { + cache_writer->write(from, size); + } + catch (...) + { + LOG_ERROR(log, "Failed to write to cache. File segment info: {}", getInfoForLogImpl(segment_lock)); + + download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; + + cache_writer->finalize(); + cache_writer.reset(); + + throw; + } +} + +size_t FileSegment::finalizeWrite() +{ + std::lock_guard segment_lock(mutex); + + if (!cache_writer) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache writer not initialized"); + + size_t size = cache_writer->offset(); + + if (size == 0) + throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing size is not allowed"); + + try + { + cache_writer->next(); + } + catch (...) + { + download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; + + cache_writer->finalize(); + cache_writer.reset(); + + throw; + } + + downloaded_size += size; + cache_writer.reset(); + downloader_id.clear(); + download_state = State::DOWNLOADED; + + if (downloaded_size != range().size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected {} == {}", downloaded_size, range().size()); + + return size; +} + FileSegment::State FileSegment::wait() { std::unique_lock segment_lock(mutex); diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h index ed9d33d37d1..c9e4146c726 100644 --- a/src/Common/FileSegment.h +++ b/src/Common/FileSegment.h @@ -97,6 +97,10 @@ public: void write(const char * from, size_t size, size_t offset_); + void writeInMemory(const char * from, size_t size); + + size_t finalizeWrite(); + RemoteFileReaderPtr getRemoteFileReader(); void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_); diff --git a/src/IO/ParallelReadBuffer.cpp b/src/IO/ParallelReadBuffer.cpp index f036d6a08c8..64550e9430b 100644 --- a/src/IO/ParallelReadBuffer.cpp +++ b/src/IO/ParallelReadBuffer.cpp @@ -33,7 +33,7 @@ bool ParallelReadBuffer::addReaderToPool(std::unique_lock & /*buffer auto worker = read_workers.emplace_back(std::make_shared(std::move(reader))); - schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }); + schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); }, nullptr); return true; } diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp index b5e61724ede..20d9a054230 100644 --- a/src/IO/WriteBufferFromS3.cpp +++ b/src/IO/WriteBufferFromS3.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -43,6 +44,7 @@ struct WriteBufferFromS3::UploadPartTask bool is_finised = false; std::string tag; std::exception_ptr exception; + std::optional cache_files; }; struct WriteBufferFromS3::PutObjectTask @@ -93,25 +95,50 @@ void WriteBufferFromS3::nextImpl() size_t size = offset(); temporary_buffer->write(working_buffer.begin(), size); + ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup() + ? CurrentThread::get().getThreadGroup() + : MainThreadStatus::getInstance().getThreadGroup(); + + if (CurrentThread::isInitialized()) + query_context = CurrentThread::get().getQueryContext(); + + if (!query_context) + { + if (!shared_query_context) + { + ContextPtr global_context = CurrentThread::isInitialized() ? CurrentThread::get().getGlobalContext() : nullptr; + if (global_context) + { + shared_query_context = Context::createCopy(global_context); + shared_query_context->makeQueryContext(); + } + } + + if (shared_query_context) + { + shared_query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4())); + query_context = shared_query_context; + } + } + if (cacheEnabled()) { if (blob_name.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty blob name"); auto cache_key = cache->hash(blob_name); - auto file_segments_holder = cache->setDownloading(cache_key, current_download_offset, size); + file_segments_holder.emplace(cache->setDownloading(cache_key, current_download_offset, size)); current_download_offset += size; size_t remaining_size = size; - for (const auto & file_segment : file_segments_holder.file_segments) + for (const auto & file_segment : file_segments_holder->file_segments) { size_t current_size = std::min(file_segment->range().size(), remaining_size); remaining_size -= current_size; if (file_segment->reserve(current_size)) { - file_segment->write(working_buffer.begin(), current_size); - ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, current_size); + file_segment->writeInMemory(working_buffer.begin(), current_size); } else { @@ -273,7 +300,9 @@ void WriteBufferFromS3::writePart() /// Releasing lock and condvar notification. bg_tasks_condvar.notify_one(); } - }); + + finalizeCacheIfNeeded(); + }, query_context); } else { @@ -281,6 +310,7 @@ void WriteBufferFromS3::writePart() fillUploadRequest(task.req, part_tags.size() + 1); processUploadRequest(task); part_tags.push_back(task.tag); + finalizeCacheIfNeeded(); } } @@ -389,13 +419,15 @@ void WriteBufferFromS3::makeSinglepartUpload() bg_tasks_condvar.notify_one(); } - }); + finalizeCacheIfNeeded(); + }, query_context); } else { PutObjectTask task; fillPutRequest(task.req); processPutRequest(task); + finalizeCacheIfNeeded(); } } @@ -423,6 +455,28 @@ void WriteBufferFromS3::processPutRequest(PutObjectTask & task) throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR); } +void WriteBufferFromS3::finalizeCacheIfNeeded() +{ + if (!file_segments_holder) + return; + + auto & file_segments = file_segments_holder->file_segments; + for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();) + { + try + { + size_t size = (*file_segment_it)->finalizeWrite(); + file_segment_it = file_segments.erase(file_segment_it); + + ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + void WriteBufferFromS3::waitForReadyBackGroundTasks() { if (schedule) diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h index ecddd72b9e8..1987bbe76a5 100644 --- a/src/IO/WriteBufferFromS3.h +++ b/src/IO/WriteBufferFromS3.h @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -32,7 +33,7 @@ namespace Aws::S3::Model namespace DB { -using ScheduleFunc = std::function)>; +using ScheduleFunc = std::function, ContextPtr)>; class WriteBufferFromFile; /** @@ -125,6 +126,10 @@ private: const String blob_name; FileCachePtr cache; size_t current_download_offset = 0; + std::optional file_segments_holder; + void finalizeCacheIfNeeded(); + ContextMutablePtr shared_query_context; + ContextPtr query_context; }; } diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 2ea371d3d03..8fbbdb44c99 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -597,6 +597,16 @@ CurrentThread::QueryScope::QueryScope(ContextMutablePtr query_context) query_context->makeQueryContext(); } +CurrentThread::QueryScope::QueryScope(ContextPtr query_context) +{ + if (!query_context->hasQueryContext()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, "Cannot initialize query scope without query context"); + + CurrentThread::initializeQuery(); + CurrentThread::attachQueryContext(query_context); +} + void CurrentThread::QueryScope::logPeakMemoryUsage() { auto group = CurrentThread::getGroup(); diff --git a/src/Interpreters/threadPoolCallbackRunner.cpp b/src/Interpreters/threadPoolCallbackRunner.cpp index 288079e49d2..9eeea986d09 100644 --- a/src/Interpreters/threadPoolCallbackRunner.cpp +++ b/src/Interpreters/threadPoolCallbackRunner.cpp @@ -9,14 +9,19 @@ namespace DB CallbackRunner threadPoolCallbackRunner(ThreadPool & pool) { - return [pool = &pool, thread_group = CurrentThread::getGroup()](auto callback) mutable + return [pool = &pool, thread_group = CurrentThread::getGroup()](auto callback, ContextPtr query_context) mutable { pool->scheduleOrThrow( - [&, callback = std::move(callback), thread_group]() + [&, callback = std::move(callback), thread_group, query_context]() { if (thread_group) CurrentThread::attachTo(thread_group); + std::optional query_scope; + + if (query_context && !CurrentThread::get().getQueryContext()) + query_scope.emplace(query_context); + SCOPE_EXIT_SAFE({ if (thread_group) CurrentThread::detachQueryIfNotDetached(); diff --git a/src/Interpreters/threadPoolCallbackRunner.h b/src/Interpreters/threadPoolCallbackRunner.h index 59d06f2f1bc..8d9d5d4d45b 100644 --- a/src/Interpreters/threadPoolCallbackRunner.h +++ b/src/Interpreters/threadPoolCallbackRunner.h @@ -7,7 +7,7 @@ namespace DB { /// High-order function to run callbacks (functions with 'void()' signature) somewhere asynchronously -using CallbackRunner = std::function)>; +using CallbackRunner = std::function, ContextPtr)>; /// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()' CallbackRunner threadPoolCallbackRunner(ThreadPool & pool); diff --git a/tests/config/install.sh b/tests/config/install.sh index f1b4fe1a588..323ded10370 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -85,7 +85,7 @@ fi if [[ -n "$EXPORT_S3_STORAGE_POLICIES" ]]; then ln -sf $SRC_PATH/config.d/storage_conf.xml $DEST_SERVER_PATH/config.d/ - ln -sf $SRC_PATH/users.d/s3_cache.xml $DEST_SERVER_PATH/s3_cache/ + ln -sf $SRC_PATH/users.d/s3_cache.xml $DEST_SERVER_PATH/config.d/ fi if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then