rework WriteBufferFromS3, squashed

This commit is contained in:
Sema Checherinda 2023-04-18 13:11:42 +02:00
parent adad880c14
commit 7fbf87be17
30 changed files with 2102 additions and 591 deletions

View File

@ -1,15 +1,30 @@
set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest")
set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/googletest")
add_library(_gtest "${SRC_DIR}/src/gtest-all.cc")
add_library(_gtest "${SRC_DIR}/googletest/src/gtest-all.cc")
set_target_properties(_gtest PROPERTIES VERSION "1.0.0")
target_compile_definitions (_gtest PUBLIC GTEST_HAS_POSIX_RE=0)
target_include_directories(_gtest SYSTEM PUBLIC "${SRC_DIR}/include")
target_include_directories(_gtest PRIVATE "${SRC_DIR}")
target_include_directories(_gtest SYSTEM PUBLIC "${SRC_DIR}/googletest/include")
target_include_directories(_gtest PRIVATE "${SRC_DIR}/googletest")
add_library(_gtest_main "${SRC_DIR}/src/gtest_main.cc")
add_library(_gtest_main "${SRC_DIR}/googletest/src/gtest_main.cc")
set_target_properties(_gtest_main PROPERTIES VERSION "1.0.0")
target_link_libraries(_gtest_main PUBLIC _gtest)
add_library(_gtest_all INTERFACE)
target_link_libraries(_gtest_all INTERFACE _gtest _gtest_main)
add_library(ch_contrib::gtest_all ALIAS _gtest_all)
add_library(_gmock "${SRC_DIR}/googlemock/src/gmock-all.cc")
set_target_properties(_gmock PROPERTIES VERSION "1.0.0")
target_compile_definitions (_gmock PUBLIC GTEST_HAS_POSIX_RE=0)
target_include_directories(_gmock SYSTEM PUBLIC "${SRC_DIR}/googlemock/include" "${SRC_DIR}/googletest/include")
target_include_directories(_gmock PRIVATE "${SRC_DIR}/googlemock")
add_library(_gmock_main "${SRC_DIR}/googlemock/src/gmock_main.cc")
set_target_properties(_gmock_main PROPERTIES VERSION "1.0.0")
target_link_libraries(_gmock_main PUBLIC _gmock)
add_library(_gmock_all INTERFACE)
target_link_libraries(_gmock_all INTERFACE _gmock _gmock_main)
add_library(ch_contrib::gmock_all ALIAS _gmock_all)

View File

@ -253,7 +253,6 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
fs::path(s3_uri.key) / file_name,
request_settings,
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupWriterS3"));
}

View File

@ -612,6 +612,7 @@ if (ENABLE_TESTS)
target_link_libraries(unit_tests_dbms PRIVATE
ch_contrib::gtest_all
ch_contrib::gmock_all
clickhouse_functions
clickhouse_aggregate_functions
clickhouse_parsers

View File

@ -544,6 +544,7 @@ try
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk_ptr);
auto buf = std::make_unique<WriteBufferFromTemporaryFile>(std::move(tmp_file));
buf->write(data.data, data.PAGE_SIZE_IN_BYTES);
buf->finalize();
buf->sync();
}
return true;

View File

@ -211,10 +211,16 @@ void CachedOnDiskWriteBufferFromFile::nextImpl()
{
size_t size = offset();
/// Write data to cache.
cacheData(working_buffer.begin(), size, throw_on_error_from_cache);
current_download_offset += size;
try
{
SwapHelper swap(*this, *impl);
/// Write data to the underlying buffer.
/// Actually here WriteBufferFromFileDecorator::nextImpl has to be called, but it is pivate method.
/// In particular WriteBufferFromFileDecorator introduces logic with swaps in order to achieve delegation.
impl->next();
}
catch (...)
@ -225,10 +231,6 @@ void CachedOnDiskWriteBufferFromFile::nextImpl()
throw;
}
/// Write data to cache.
cacheData(working_buffer.begin(), size, throw_on_error_from_cache);
current_download_offset += size;
}
void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool throw_on_error)
@ -292,8 +294,7 @@ void CachedOnDiskWriteBufferFromFile::finalizeImpl()
{
try
{
SwapHelper swap(*this, *impl);
impl->finalize();
WriteBufferFromFileDecorator::finalizeImpl();
}
catch (...)
{

View File

@ -161,7 +161,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
WriteMode mode, // S3 doesn't support append, only rewrite
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size,
size_t buf_size [[maybe_unused]],
const WriteSettings & write_settings)
{
WriteSettings disk_write_settings = IObjectStorage::patchSettings(write_settings);
@ -180,7 +180,6 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
object.remote_path,
settings_ptr->request_settings,
attributes,
buf_size,
std::move(scheduler),
disk_write_settings);

View File

@ -255,7 +255,7 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));
auto result = Aws::S3::S3Client::HeadObject(request);
auto result = HeadObject(static_cast<const Model::HeadObjectRequest&>(request));
if (result.IsSuccess())
return result;
@ -312,70 +312,75 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c
request.overrideURI(std::move(*bucket_uri));
return Aws::S3::S3Client::HeadObject(request);
/// The next call is NOT a recurcive call
/// This is a virtuall call Aws::S3::S3Client::HeadObject(const Model::HeadObjectRequest&)
return HeadObject(static_cast<const Model::HeadObjectRequest&>(request));
}
/// For each request, we wrap the request functions from Aws::S3::Client with doRequest
/// doRequest calls virtuall function from Aws::S3::Client while DB::S3::Client has not virtual calls for each request type
Model::ListObjectsV2Outcome Client::ListObjectsV2(const ListObjectsV2Request & request) const
{
return doRequest(request, [this](const Model::ListObjectsV2Request & req) { return Aws::S3::S3Client::ListObjectsV2(req); });
return doRequest(request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
}
Model::ListObjectsOutcome Client::ListObjects(const ListObjectsRequest & request) const
{
return doRequest(request, [this](const Model::ListObjectsRequest & req) { return Aws::S3::S3Client::ListObjects(req); });
return doRequest(request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); });
}
Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) const
{
return doRequest(request, [this](const Model::GetObjectRequest & req) { return Aws::S3::S3Client::GetObject(req); });
return doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); });
}
Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(const AbortMultipartUploadRequest & request) const
{
return doRequest(
request, [this](const Model::AbortMultipartUploadRequest & req) { return Aws::S3::S3Client::AbortMultipartUpload(req); });
request, [this](const Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); });
}
Model::CreateMultipartUploadOutcome Client::CreateMultipartUpload(const CreateMultipartUploadRequest & request) const
{
return doRequest(
request, [this](const Model::CreateMultipartUploadRequest & req) { return Aws::S3::S3Client::CreateMultipartUpload(req); });
request, [this](const Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); });
}
Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const CompleteMultipartUploadRequest & request) const
{
return doRequest(
request, [this](const Model::CompleteMultipartUploadRequest & req) { return Aws::S3::S3Client::CompleteMultipartUpload(req); });
request, [this](const Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); });
}
Model::CopyObjectOutcome Client::CopyObject(const CopyObjectRequest & request) const
{
return doRequest(request, [this](const Model::CopyObjectRequest & req) { return Aws::S3::S3Client::CopyObject(req); });
return doRequest(request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); });
}
Model::PutObjectOutcome Client::PutObject(const PutObjectRequest & request) const
{
return doRequest(request, [this](const Model::PutObjectRequest & req) { return Aws::S3::S3Client::PutObject(req); });
return doRequest(request, [this](const Model::PutObjectRequest & req) { return PutObject(req); });
}
Model::UploadPartOutcome Client::UploadPart(const UploadPartRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartRequest & req) { return Aws::S3::S3Client::UploadPart(req); });
return doRequest(request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); });
}
Model::UploadPartCopyOutcome Client::UploadPartCopy(const UploadPartCopyRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartCopyRequest & req) { return Aws::S3::S3Client::UploadPartCopy(req); });
return doRequest(request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
}
Model::DeleteObjectOutcome Client::DeleteObject(const DeleteObjectRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return Aws::S3::S3Client::DeleteObject(req); });
return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); });
}
Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return Aws::S3::S3Client::DeleteObjects(req); });
return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); });
}
template <typename RequestType, typename RequestFn>

View File

@ -40,6 +40,11 @@ struct ServerSideEncryptionKMSConfig
#include <aws/core/client/AWSErrorMarshaller.h>
#include <aws/core/client/RetryStrategy.h>
namespace MockS3
{
struct Client;
}
namespace DB::S3
{
@ -195,6 +200,8 @@ public:
bool supportsMultiPartCopy() const;
private:
friend struct ::MockS3::Client;
Client(size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider>& credentials_provider,

17
src/IO/SwapHelper.cpp Normal file
View File

@ -0,0 +1,17 @@
#include <IO/SwapHelper.h>
namespace DB
{
SwapHelper::SwapHelper(BufferBase & b1_, BufferBase & b2_)
: b1(b1_), b2(b2_)
{
b1.swap(b2);
}
SwapHelper::~SwapHelper()
{
b1.swap(b2);
}
}

View File

@ -1,16 +1,19 @@
#pragma once
#include <IO/BufferBase.h>
namespace DB
{
class SwapHelper
{
public:
SwapHelper(BufferBase & b1_, BufferBase & b2_) : b1(b1_), b2(b2_) { b1.swap(b2); }
~SwapHelper() { b1.swap(b2); }
SwapHelper(BufferBase & b1_, BufferBase & b2_);
~SwapHelper();
private:
BufferBase & b1;
BufferBase & b2;
};
}

View File

@ -42,7 +42,8 @@ public:
{
if (!offset())
return;
bytes += offset();
auto bytes_in_buffer = offset();
try
{
@ -54,9 +55,11 @@ public:
* so that later (for example, when the stack was expanded) there was no second attempt to write data.
*/
pos = working_buffer.begin();
bytes += bytes_in_buffer;
throw;
}
bytes += bytes_in_buffer;
pos = working_buffer.begin();
}

View File

@ -1,6 +1,7 @@
#include "WriteBufferFromFileDecorator.h"
#include <IO/WriteBuffer.h>
#include <IO/SwapHelper.h>
namespace DB
{
@ -13,13 +14,19 @@ WriteBufferFromFileDecorator::WriteBufferFromFileDecorator(std::unique_ptr<Write
void WriteBufferFromFileDecorator::finalizeImpl()
{
next();
/// In case of exception in preFinalize as a part of finalize call
/// WriteBufferFromFileDecorator.finalized is set as true
/// but impl->finalized is remain false
/// That leads to situation when the destructor of impl is called with impl->finalized equal false.
if (!is_prefinalized)
WriteBufferFromFileDecorator::preFinalize();
{
SwapHelper swap(*this, *impl);
impl->finalize();
}
}
WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()
{
@ -31,12 +38,22 @@ WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// It is not a mistake that swap is called here
/// Swap has been called at constructor, it should be called at destructor
/// In oreder to provide valid buffer for impl's d-tor call
swap(*impl);
}
void WriteBufferFromFileDecorator::sync()
{
next();
{
SwapHelper swap(*this, *impl);
impl->sync();
}
}
std::string WriteBufferFromFileDecorator::getFileName() const
{
@ -45,11 +62,22 @@ std::string WriteBufferFromFileDecorator::getFileName() const
return std::string();
}
void WriteBufferFromFileDecorator::preFinalize()
{
next();
{
SwapHelper swap(*this, *impl);
impl->preFinalize();
}
is_prefinalized = true;
}
void WriteBufferFromFileDecorator::nextImpl()
{
swap(*impl);
SwapHelper swap(*this, *impl);
impl->next();
swap(*impl);
}
}

View File

@ -17,12 +17,7 @@ public:
std::string getFileName() const override;
void preFinalize() override
{
next();
impl->preFinalize();
is_prefinalized = true;
}
void preFinalize() override;
const WriteBuffer & getImpl() const { return *impl; }

File diff suppressed because it is too large Load Diff

View File

@ -4,20 +4,16 @@
#if USE_AWS_S3
#include <memory>
#include <vector>
#include <list>
#include <base/types.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <IO/S3/Requests.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
#include <memory>
#include <vector>
#include <list>
namespace Aws::S3
{
@ -27,8 +23,6 @@ class Client;
namespace DB
{
class WriteBufferFromFile;
/**
* Buffer to write a data to a S3 object with specified bucket and key.
* If data size written to the buffer is less than 'max_single_part_upload_size' write is performed using singlepart upload.
@ -45,81 +39,74 @@ public:
const String & key_,
const S3Settings::RequestSettings & request_settings_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ThreadPoolCallbackRunner<void> schedule_ = {},
const WriteSettings & write_settings_ = {});
~WriteBufferFromS3() override;
void nextImpl() override;
void preFinalize() override;
private:
void allocateBuffer();
void processWithStrictParts();
void processWithDynamicParts();
void fillCreateMultipartRequest(S3::CreateMultipartUploadRequest & req);
void createMultipartUpload();
void writePart();
void completeMultipartUpload();
void makeSinglepartUpload();
/// Receives response from the server after sending all data.
void finalizeImpl() override;
struct UploadPartTask;
void fillUploadRequest(S3::UploadPartRequest & req);
void processUploadRequest(UploadPartTask & task);
String getLogDetails() const;
struct PutObjectTask;
void fillPutRequest(S3::PutObjectRequest & req);
void processPutRequest(const PutObjectTask & task);
struct PartData;
void hidePartialData();
void allocateFirstBuffer();
void reallocateFirstBuffer();
void detachBuffer();
void allocateBuffer();
void setFakeBufferWhenPreFinalized();
void waitForReadyBackgroundTasks();
void waitForAllBackgroundTasks();
void waitForAllBackgroundTasksUnlocked(std::unique_lock<std::mutex> & bg_tasks_lock);
S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data);
void writePart(PartData && data);
void writeMultipartUpload();
void createMultipartUpload();
void completeMultipartUpload();
void abortMultipartUpload();
void tryToAbortMultipartUpload();
S3::PutObjectRequest getPutRequest(PartData & data);
void makeSinglepartUpload(PartData && data);
const String bucket;
const String key;
const S3Settings::RequestSettings request_settings;
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
const WriteSettings write_settings;
const std::shared_ptr<const S3::Client> client_ptr;
const std::optional<std::map<String, String>> object_metadata;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
/// Strict/static Part size, no adjustments will be done on fly.
size_t strict_upload_part_size = 0;
/// Part size will be adjusted on fly (for bigger uploads)
size_t current_upload_part_size = 0;
std::shared_ptr<Aws::StringStream> temporary_buffer; /// Buffer to accumulate data.
size_t last_part_size = 0;
size_t part_number = 0;
struct BufferAllocationPolicy;
std::unique_ptr<BufferAllocationPolicy> buffer_allocation_policy;
/// Upload in S3 is made in parts.
/// We initiate upload, then upload each part and get ETag as a response, and then finalizeImpl() upload with listing all our parts.
String multipart_upload_id;
std::vector<String> TSA_GUARDED_BY(bg_tasks_mutex) part_tags;
std::deque<String> multipart_tags;
bool multipart_upload_finished = false;
/// Track that prefinalize() is called only once
bool is_prefinalized = false;
/// Following fields are for background uploads in thread pool (if specified).
/// We use std::function to avoid dependency of Interpreters
const ThreadPoolCallbackRunner<void> schedule;
/// First fully filled buffer has to be delayed
/// There are two ways after:
/// First is to call prefinalize/finalize, which leads to single part upload
/// Second is to write more data, which leads to multi part upload
std::deque<PartData> detached_part_data;
char fake_buffer_when_prefinalized[1] = {};
std::unique_ptr<PutObjectTask> put_object_task; /// Does not need protection by mutex because of the logic around is_finished field.
std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) upload_object_tasks;
int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
/// offset() and count() are unstable inside nextImpl
/// For example nextImpl changes position hence offset() and count() is changed
/// This vars are dedicated to store information about sizes when offset() and count() are unstable
size_t total_size = 0;
size_t hidden_size = 0;
std::mutex bg_tasks_mutex;
std::condition_variable bg_tasks_condvar;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
WriteSettings write_settings;
class TaskTracker;
std::unique_ptr<TaskTracker> task_tracker;
};
}

View File

@ -0,0 +1,68 @@
#include "config.h"
#if USE_AWS_S3
#include <IO/WriteBufferFromS3MemoryStream.h>
namespace DB
{
MemoryStream::MemoryBuf::MemoryBuf(char * begin_, size_t size_)
: begin(begin_)
, size(size_)
{
this->setg(begin, begin, begin + size);
}
MemoryStream::MemoryBuf::int_type MemoryStream::MemoryBuf::underflow()
{
if (gptr() < egptr())
return traits_type::to_int_type(*gptr());
return traits_type::eof();
}
MemoryStream::MemoryBuf::pos_type MemoryStream::MemoryBuf::seekoff(off_type off, std::ios_base::seekdir way,
std::ios_base::openmode mode)
{
bool out_mode = (std::ios_base::out & mode) != 0;
if (out_mode)
return off_type(-1);
off_type ret(-1);
if (way == std::ios_base::beg)
ret = 0;
else if (way == std::ios_base::cur)
ret = gptr() - begin;
else if (way == std::ios_base::end)
ret = size;
if (ret == off_type(-1))
return ret;
ret += off;
if (!(ret >= 0 && size_t(ret) <= size))
return off_type(-1);
this->setg(begin, begin + ret, begin + size);
return pos_type(ret);
}
MemoryStream::MemoryBuf::pos_type MemoryStream::MemoryBuf::seekpos(pos_type sp,
std::ios_base::openmode mode)
{
return seekoff(off_type(sp), std::ios_base::beg, mode);
}
MemoryStream::MemoryStream(char * begin_, size_t size_)
: std::iostream(nullptr)
, mem_buf(begin_, size_)
{
init(&mem_buf);
}
}
#endif

View File

@ -0,0 +1,39 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include "WriteBufferFromS3.h"
#include <iostream>
namespace DB
{
struct MemoryStream: std::iostream
{
struct MemoryBuf: std::streambuf
{
MemoryBuf(char * begin_, size_t size_);
int_type underflow() override;
pos_type seekoff(off_type off, std::ios_base::seekdir way,
std::ios_base::openmode mode) override;
pos_type seekpos(pos_type sp,
std::ios_base::openmode mode) override;
char * begin = nullptr;
size_t size = 0;
};
MemoryStream(char * begin_, size_t size_);
MemoryBuf mem_buf;
};
}
#endif

View File

@ -0,0 +1,137 @@
#include "config.h"
#if USE_AWS_S3
#include <IO/WriteBufferFromS3TaskTracker.h>
namespace DB
{
WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_)
: is_async(bool(scheduler_))
, scheduler(scheduler_ ? std::move(scheduler_) : syncRunner())
{}
WriteBufferFromS3::TaskTracker::~TaskTracker()
{
safeWaitAll();
}
ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
{
return [](Callback && callback, int64_t) mutable -> std::future<void>
{
auto package = std::packaged_task<void()>(std::move(callback));
/// No exceptions are propagated, exceptions are packed to future
package();
return package.get_future();
};
}
void WriteBufferFromS3::TaskTracker::getReady()
{
LOG_TEST(log, "getReady, in queue {}", futures.size());
/// Exceptions are propagated
auto it = futures.begin();
while (it != futures.end())
{
chassert(it->valid());
if (it->wait_for(std::chrono::seconds(0)) != std::future_status::ready)
{
++it;
continue;
}
try
{
it->get();
} catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
it = futures.erase(it);
}
LOG_TEST(log, "getReady ended, in queue {}", futures.size());
}
void WriteBufferFromS3::TaskTracker::getAll()
{
LOG_TEST(log, "getAll, in queue {}", futures.size());
/// Exceptions are propagated
for (auto & future : futures)
{
try
{
future.get();
} catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}
futures.clear();
}
void WriteBufferFromS3::TaskTracker::safeWaitAll()
{
LOG_TEST(log, "safeWaitAll, wait in queue {}", futures.size());
/// Exceptions are not propagated
for (auto & future : futures)
{
LOG_TEST(log, "safeWaitAll, wait future");
if (future.valid())
future.wait();
}
LOG_TEST(log, "safeWaitAll, get in queue {}", futures.size());
for (auto & future : futures)
{
if (future.valid())
{
try
{
future.get();
} catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
futures.clear();
LOG_TEST(log, "safeWaitAll ended, get in queue {}", futures.size());
}
void WriteBufferFromS3::TaskTracker::add(Callback && func)
{
LOG_TEST(log, "add, in queue {}", futures.size());
auto future = scheduler(std::move(func), 0);
auto exit_scope = scope_guard(
[&future]()
{
future.wait();
}
);
futures.push_back(std::move(future));
exit_scope.release();
LOG_TEST(log, "add ended, in queue {}", futures.size());
}
bool WriteBufferFromS3::TaskTracker::isAsync() const
{
return is_async;
}
}
#endif

View File

@ -0,0 +1,37 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include "WriteBufferFromS3.h"
namespace DB
{
class WriteBufferFromS3::TaskTracker
{
public:
using Callback = std::function<void()>;
explicit TaskTracker(ThreadPoolCallbackRunner<void> scheduler_);
~TaskTracker();
static ThreadPoolCallbackRunner<void> syncRunner();
bool isAsync() const;
void getReady();
void getAll();
void safeWaitAll();
void add(Callback && func);
private:
bool is_async;
ThreadPoolCallbackRunner<void> scheduler;
std::list<std::future<void>> futures;
Poco::Logger * log = &Poco::Logger::get("TaskTracker");
};
}
#endif

File diff suppressed because it is too large Load Diff

View File

@ -290,6 +290,7 @@ void MergeTreeData::initializeDirectoriesAndFormatVersion(const std::string & re
{
auto buf = disk->writeFile(format_version_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, getContext()->getWriteSettings());
writeIntText(format_version.toUnderType(), *buf);
buf->finalize();
if (getContext()->getSettingsRef().fsync_metadata)
buf->sync();
}

View File

@ -160,7 +160,10 @@ void MergeTreeDeduplicationLog::rotate()
existing_logs.emplace(current_log_number, log_description);
if (current_writer)
{
current_writer->finalize();
current_writer->sync();
}
current_writer = disk->writeFile(log_description.path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append);
}

View File

@ -75,6 +75,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
TransactionID::write(tid, *out);
*out << "\n";
}
out->finalize();
out->sync();
}
catch (...)

View File

@ -777,7 +777,6 @@ public:
key,
configuration_.request_settings,
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
threadPoolCallbackRunner<void>(IOThreadPool::get(), "S3ParallelWrite"),
context->getWriteSettings()),
compression_method,

View File

@ -6,6 +6,7 @@ SET skip_download_if_exceeds_query_cache=1;
SET filesystem_cache_max_download_size=128;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_4', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false;
SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT * FROM test FORMAT Null;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;

View File

@ -9,8 +9,8 @@ SET filesystem_cache_max_download_size=128;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_4', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false;
SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT * FROM test FORMAT Null;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;
SYSTEM DROP FILESYSTEM CACHE;

View File

@ -1,6 +1,6 @@
Using storage policy: s3_cache
0
0
0 0
Row 1:
──────
file_segment_range_begin: 0
@ -8,11 +8,11 @@ file_segment_range_end: 745
size: 746
state: DOWNLOADED
8
8
8 1100
0
2
2
8
8 1100
Row 1:
──────
file_segment_range_begin: 0
@ -20,17 +20,17 @@ file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
8
8
8
8
24
35
43
8 2014
8 2014
8 2014
24 84045
35 168815
44 252113
5010500
18816
Using storage policy: local_cache
0
0
0 0
Row 1:
──────
file_segment_range_begin: 0
@ -38,11 +38,11 @@ file_segment_range_end: 745
size: 746
state: DOWNLOADED
8
8
8 1100
0
2
2
8
8 1100
Row 1:
──────
file_segment_range_begin: 0
@ -50,11 +50,11 @@ file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
8
8
8
8
24
35
43
8 2014
8 2014
8 2014
24 84045
35 168815
44 252113
5010500
18816

View File

@ -33,7 +33,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
FORMAT Vertical"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)"
@ -54,7 +54,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
FORMAT Vertical"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
@ -64,7 +64,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
$CLICKHOUSE_CLIENT --query "SELECT * FROM test_02241 FORMAT Null"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
$CLICKHOUSE_CLIENT --query "SELECT count() size FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) size FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE"
@ -87,24 +87,23 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
FORMAT Vertical;"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000)"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SYSTEM START MERGES test_02241"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "OPTIMIZE TABLE test_02241 FINAL"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --mutations_sync=2 --query "ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000)"
$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"

View File

@ -8,7 +8,7 @@ SYSTEM STOP MERGES nopers;
INSERT INTO nopers SELECT number, toString(number) FROM numbers(10);
SELECT * FROM nopers FORMAT Null;
SELECT sum(size) FROM system.filesystem_cache;
194
195
SELECT extract(local_path, '.*/([\w.]+)') as file, extract(cache_path, '.*/([\w.]+)') as cache, size
FROM
(
@ -21,17 +21,18 @@ ON data_paths.cache_path = caches.cache_path
ORDER BY file, cache, size;
data.bin 0 114
data.mrk3 0 80
format_version.txt 0 1
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_small', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false;
SYSTEM STOP MERGES test;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT * FROM test FORMAT Null;
SELECT sum(size) FROM system.filesystem_cache;
1020
1021
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
4
5
SELECT count() FROM system.filesystem_cache;
4
5
SELECT extract(local_path, '.*/([\w.]+)') as file, extract(cache_path, '.*/([\w.]+)') as cache, size
FROM
(
@ -46,17 +47,18 @@ data.bin 0 114
data.bin 0 746
data.mrk3 0 80
data.mrk3 0_persistent 80
format_version.txt 0 1
DROP TABLE IF EXISTS test2;
CREATE TABLE test2 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_small', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false;
SYSTEM STOP MERGES test2;
INSERT INTO test2 SELECT number, toString(number) FROM numbers(100000);
SELECT * FROM test2 FORMAT Null;
SELECT sum(size) FROM system.filesystem_cache;
794
795
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
4
5
SELECT count() FROM system.filesystem_cache;
4
5
SELECT extract(local_path, '.*/([\w.]+)') as file, extract(cache_path, '.*/([\w.]+)') as cache, size
FROM
(
@ -71,6 +73,7 @@ data.bin 0 114
data.mrk3 0 80
data.mrk3 0_persistent 80
data.mrk3 0_persistent 520
format_version.txt 0 1
DROP TABLE test;
DROP TABLE test2;
DROP TABLE nopers;

View File

@ -1,8 +1,8 @@
INSERT TO S3
[ 0 ] S3CompleteMultipartUpload: 1
[ 0 ] S3CreateMultipartUpload: 1
[ 0 ] S3HeadObject: 1
[ 0 ] S3ReadRequestsCount: 1
[ 0 ] S3HeadObject: 2
[ 0 ] S3ReadRequestsCount: 2
[ 0 ] S3UploadPart: 1
[ 0 ] S3WriteRequestsCount: 3
CHECK WITH query_log