mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 17:50:47 +00:00
Revert "Revert "Revert "Merge pull request #34219 from ClickHouse/revert-34212-revert-33291-add-pool-to-s3-write-buffer"""
This reverts commit fb77d7a7d5
.
This commit is contained in:
parent
40b86e3dae
commit
2bc2ea485e
@ -8,22 +8,15 @@
|
||||
namespace DB
|
||||
{
|
||||
/**
|
||||
* This buffer writes to cache, but after finalize() copy written file from cache to disk.
|
||||
* Write buffer with possibility to set and invoke callback after 'finalize' call.
|
||||
*/
|
||||
class WritingToCacheWriteBuffer final : public WriteBufferFromFileDecorator
|
||||
class CompletionAwareWriteBuffer : public WriteBufferFromFileDecorator
|
||||
{
|
||||
public:
|
||||
WritingToCacheWriteBuffer(
|
||||
std::unique_ptr<WriteBufferFromFileBase> impl_,
|
||||
std::function<std::unique_ptr<ReadBuffer>()> create_read_buffer_,
|
||||
std::function<std::unique_ptr<WriteBuffer>()> create_write_buffer_)
|
||||
: WriteBufferFromFileDecorator(std::move(impl_))
|
||||
, create_read_buffer(std::move(create_read_buffer_))
|
||||
, create_write_buffer(std::move(create_write_buffer_))
|
||||
{
|
||||
}
|
||||
CompletionAwareWriteBuffer(std::unique_ptr<WriteBufferFromFileBase> impl_, std::function<void()> completion_callback_)
|
||||
: WriteBufferFromFileDecorator(std::move(impl_)), completion_callback(completion_callback_) { }
|
||||
|
||||
~WritingToCacheWriteBuffer() override
|
||||
~CompletionAwareWriteBuffer() override
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -35,36 +28,15 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void preFinalize() override
|
||||
{
|
||||
impl->next();
|
||||
impl->preFinalize();
|
||||
impl->finalize();
|
||||
|
||||
read_buffer = create_read_buffer();
|
||||
write_buffer = create_write_buffer();
|
||||
copyData(*read_buffer, *write_buffer);
|
||||
write_buffer->next();
|
||||
write_buffer->preFinalize();
|
||||
|
||||
is_prefinalized = true;
|
||||
}
|
||||
|
||||
void finalizeImpl() override
|
||||
{
|
||||
if (!is_prefinalized)
|
||||
preFinalize();
|
||||
WriteBufferFromFileDecorator::finalizeImpl();
|
||||
|
||||
write_buffer->finalize();
|
||||
completion_callback();
|
||||
}
|
||||
|
||||
private:
|
||||
std::function<std::unique_ptr<ReadBuffer>()> create_read_buffer;
|
||||
std::function<std::unique_ptr<WriteBuffer>()> create_write_buffer;
|
||||
std::unique_ptr<ReadBuffer> read_buffer;
|
||||
std::unique_ptr<WriteBuffer> write_buffer;
|
||||
|
||||
bool is_prefinalized = false;
|
||||
const std::function<void()> completion_callback;
|
||||
};
|
||||
|
||||
enum FileDownloadStatus
|
||||
@ -193,22 +165,21 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
|
||||
if (!cache_file_predicate(path))
|
||||
return DiskDecorator::writeFile(path, buf_size, mode);
|
||||
|
||||
LOG_TEST(log, "Write file {} to cache", backQuote(path));
|
||||
LOG_TRACE(log, "Write file {} to cache", backQuote(path));
|
||||
|
||||
auto dir_path = directoryPath(path);
|
||||
if (!cache_disk->exists(dir_path))
|
||||
cache_disk->createDirectories(dir_path);
|
||||
|
||||
return std::make_unique<WritingToCacheWriteBuffer>(
|
||||
return std::make_unique<CompletionAwareWriteBuffer>(
|
||||
cache_disk->writeFile(path, buf_size, mode),
|
||||
[this, path]()
|
||||
{
|
||||
/// Copy file from cache to actual disk when cached buffer is finalized.
|
||||
return cache_disk->readFile(path, ReadSettings(), /* read_hint= */ {}, /* file_size= */ {});
|
||||
},
|
||||
[this, path, buf_size, mode]()
|
||||
{
|
||||
return DiskDecorator::writeFile(path, buf_size, mode);
|
||||
/// Copy file from cache to actual disk when cached buffer is finalized.
|
||||
auto src_buffer = cache_disk->readFile(path, ReadSettings(), /* read_hint= */ {}, /* file_size= */ {});
|
||||
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode);
|
||||
copyData(*src_buffer, *dst_buffer);
|
||||
dst_buffer->finalize();
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -151,11 +151,6 @@ void DiskDecorator::removeSharedFile(const String & path, bool keep_s3)
|
||||
delegate->removeSharedFile(path, keep_s3);
|
||||
}
|
||||
|
||||
void DiskDecorator::removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs)
|
||||
{
|
||||
delegate->removeSharedFiles(files, keep_in_remote_fs);
|
||||
}
|
||||
|
||||
void DiskDecorator::removeSharedRecursive(const String & path, bool keep_s3)
|
||||
{
|
||||
delegate->removeSharedRecursive(path, keep_s3);
|
||||
|
@ -52,7 +52,6 @@ public:
|
||||
void removeRecursive(const String & path) override;
|
||||
void removeSharedFile(const String & path, bool keep_s3) override;
|
||||
void removeSharedRecursive(const String & path, bool keep_s3) override;
|
||||
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs) override;
|
||||
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
|
||||
Poco::Timestamp getLastModified(const String & path) override;
|
||||
void setReadOnly(const String & path) override;
|
||||
|
@ -234,12 +234,6 @@ void DiskRestartProxy::removeSharedFile(const String & path, bool keep_s3)
|
||||
DiskDecorator::removeSharedFile(path, keep_s3);
|
||||
}
|
||||
|
||||
void DiskRestartProxy::removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs)
|
||||
{
|
||||
ReadLock lock (mutex);
|
||||
DiskDecorator::removeSharedFiles(files, keep_in_remote_fs);
|
||||
}
|
||||
|
||||
void DiskRestartProxy::removeSharedRecursive(const String & path, bool keep_s3)
|
||||
{
|
||||
ReadLock lock (mutex);
|
||||
|
@ -54,7 +54,6 @@ public:
|
||||
void removeDirectory(const String & path) override;
|
||||
void removeRecursive(const String & path) override;
|
||||
void removeSharedFile(const String & path, bool keep_s3) override;
|
||||
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs) override;
|
||||
void removeSharedRecursive(const String & path, bool keep_s3) override;
|
||||
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
|
||||
Poco::Timestamp getLastModified(const String & path) override;
|
||||
|
@ -197,32 +197,6 @@ public:
|
||||
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3
|
||||
virtual void removeSharedFileIfExists(const String & path, bool) { removeFileIfExists(path); }
|
||||
|
||||
struct RemoveRequest
|
||||
{
|
||||
String path;
|
||||
bool if_exists = false;
|
||||
|
||||
explicit RemoveRequest(String path_, bool if_exists_ = false)
|
||||
: path(std::move(path_)), if_exists(std::move(if_exists_))
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
using RemoveBatchRequest = std::vector<RemoveRequest>;
|
||||
|
||||
/// Batch request to remove multiple files.
|
||||
/// May be much faster for blob storage.
|
||||
virtual void removeSharedFiles(const RemoveBatchRequest & files, bool keep_in_remote_fs)
|
||||
{
|
||||
for (const auto & file : files)
|
||||
{
|
||||
if (file.if_exists)
|
||||
removeSharedFileIfExists(file.path, keep_in_remote_fs);
|
||||
else
|
||||
removeSharedFile(file.path, keep_in_remote_fs);
|
||||
}
|
||||
}
|
||||
|
||||
/// Set last modified time to file or directory at `path`.
|
||||
virtual void setLastModified(const String & path, const Poco::Timestamp & timestamp) = 0;
|
||||
|
||||
|
@ -415,20 +415,6 @@ void IDiskRemote::removeSharedFileIfExists(const String & path, bool delete_meta
|
||||
}
|
||||
}
|
||||
|
||||
void IDiskRemote::removeSharedFiles(const RemoveBatchRequest & files, bool delete_metadata_only)
|
||||
{
|
||||
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
|
||||
for (const auto & file : files)
|
||||
{
|
||||
bool skip = file.if_exists && !metadata_disk->exists(file.path);
|
||||
if (!skip)
|
||||
removeMetadata(file.path, fs_paths_keeper);
|
||||
}
|
||||
|
||||
if (!delete_metadata_only)
|
||||
removeFromRemoteFS(fs_paths_keeper);
|
||||
}
|
||||
|
||||
void IDiskRemote::removeSharedRecursive(const String & path, bool delete_metadata_only)
|
||||
{
|
||||
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
|
||||
@ -568,12 +554,4 @@ UInt32 IDiskRemote::getRefCount(const String & path) const
|
||||
return readMetadata(path).ref_count;
|
||||
}
|
||||
|
||||
ThreadPool & IDiskRemote::getThreadPoolWriter()
|
||||
{
|
||||
constexpr size_t pool_size = 100;
|
||||
constexpr size_t queue_size = 1000000;
|
||||
static ThreadPool writer(pool_size, pool_size, queue_size);
|
||||
return writer;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -100,8 +100,6 @@ public:
|
||||
|
||||
void removeSharedFileIfExists(const String & path, bool delete_metadata_only) override;
|
||||
|
||||
void removeSharedFiles(const RemoveBatchRequest & files, bool delete_metadata_only) override;
|
||||
|
||||
void removeSharedRecursive(const String & path, bool delete_metadata_only) override;
|
||||
|
||||
void listFiles(const String & path, std::vector<String> & file_names) override;
|
||||
@ -139,7 +137,6 @@ public:
|
||||
virtual RemoteFSPathKeeperPtr createFSPathKeeper() const = 0;
|
||||
|
||||
static AsynchronousReaderPtr getThreadPoolReader();
|
||||
static ThreadPool & getThreadPoolWriter();
|
||||
|
||||
DiskPtr getMetadataDiskIfExistsOrSelf() override { return metadata_disk; }
|
||||
|
||||
|
@ -9,7 +9,6 @@
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
#include <base/scope_guard_safe.h>
|
||||
#include <base/unit.h>
|
||||
#include <base/FnTraits.h>
|
||||
|
||||
@ -262,21 +261,6 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
|
||||
LOG_TRACE(log, "{} to file by path: {}. S3 path: {}",
|
||||
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), remote_fs_root_path + s3_path);
|
||||
|
||||
ScheduleFunc schedule = [pool = &getThreadPoolWriter(), thread_group = CurrentThread::getGroup()](auto callback)
|
||||
{
|
||||
pool->scheduleOrThrow([callback = std::move(callback), thread_group]()
|
||||
{
|
||||
if (thread_group)
|
||||
CurrentThread::attachTo(thread_group);
|
||||
|
||||
SCOPE_EXIT_SAFE(
|
||||
if (thread_group)
|
||||
CurrentThread::detachQueryIfNotDetached();
|
||||
);
|
||||
callback();
|
||||
});
|
||||
};
|
||||
|
||||
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
|
||||
settings->client,
|
||||
bucket,
|
||||
@ -284,8 +268,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
|
||||
settings->s3_min_upload_part_size,
|
||||
settings->s3_max_single_part_upload_size,
|
||||
std::move(object_metadata),
|
||||
buf_size,
|
||||
std::move(schedule));
|
||||
buf_size);
|
||||
|
||||
auto create_metadata_callback = [this, path, s3_path, mode] (size_t count)
|
||||
{
|
||||
|
@ -104,14 +104,10 @@ public:
|
||||
++pos;
|
||||
}
|
||||
|
||||
/// This method may be called before finalize() to tell there would not be any more data written.
|
||||
/// Used does not have to call it, implementation should check it itself if needed.
|
||||
///
|
||||
/// The idea is similar to prefetch. In case if all data is written, we can flush the buffer
|
||||
/// and start sending data asynchronously. It may improve writing performance in case you have
|
||||
/// multiple files to finalize. Mainly, for blob storage, finalization has high latency,
|
||||
/// and calling preFinalize in a loop may parallelize it.
|
||||
virtual void preFinalize() { next(); }
|
||||
virtual void sync()
|
||||
{
|
||||
next();
|
||||
}
|
||||
|
||||
/// Write the last data.
|
||||
void finalize()
|
||||
@ -134,13 +130,6 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for data to be reliably written. Mainly, call fsync for fd.
|
||||
/// May be called after finalize() if needed.
|
||||
virtual void sync()
|
||||
{
|
||||
next();
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void finalizeImpl()
|
||||
{
|
||||
|
@ -14,10 +14,6 @@ WriteBufferFromFileDecorator::WriteBufferFromFileDecorator(std::unique_ptr<Write
|
||||
void WriteBufferFromFileDecorator::finalizeImpl()
|
||||
{
|
||||
next();
|
||||
|
||||
if (!is_prefinalized)
|
||||
WriteBufferFromFileDecorator::preFinalize();
|
||||
|
||||
impl->finalize();
|
||||
}
|
||||
|
||||
|
@ -17,15 +17,6 @@ public:
|
||||
|
||||
std::string getFileName() const override;
|
||||
|
||||
void preFinalize() override
|
||||
{
|
||||
next();
|
||||
impl->preFinalize();
|
||||
is_prefinalized = true;
|
||||
}
|
||||
|
||||
const WriteBuffer & getImpl() const { return *impl; }
|
||||
|
||||
protected:
|
||||
void finalizeImpl() override;
|
||||
|
||||
@ -33,8 +24,6 @@ protected:
|
||||
|
||||
private:
|
||||
void nextImpl() override;
|
||||
|
||||
bool is_prefinalized = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -34,20 +34,6 @@ namespace ErrorCodes
|
||||
extern const int S3_ERROR;
|
||||
}
|
||||
|
||||
struct WriteBufferFromS3::UploadPartTask
|
||||
{
|
||||
Aws::S3::Model::UploadPartRequest req;
|
||||
bool is_finised = false;
|
||||
std::string tag;
|
||||
std::exception_ptr exception;
|
||||
};
|
||||
|
||||
struct WriteBufferFromS3::PutObjectTask
|
||||
{
|
||||
Aws::S3::Model::PutObjectRequest req;
|
||||
bool is_finised = false;
|
||||
std::exception_ptr exception;
|
||||
};
|
||||
|
||||
WriteBufferFromS3::WriteBufferFromS3(
|
||||
std::shared_ptr<Aws::S3::S3Client> client_ptr_,
|
||||
@ -56,8 +42,7 @@ WriteBufferFromS3::WriteBufferFromS3(
|
||||
size_t minimum_upload_part_size_,
|
||||
size_t max_single_part_upload_size_,
|
||||
std::optional<std::map<String, String>> object_metadata_,
|
||||
size_t buffer_size_,
|
||||
ScheduleFunc schedule_)
|
||||
size_t buffer_size_)
|
||||
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
|
||||
, bucket(bucket_)
|
||||
, key(key_)
|
||||
@ -65,7 +50,6 @@ WriteBufferFromS3::WriteBufferFromS3(
|
||||
, client_ptr(std::move(client_ptr_))
|
||||
, minimum_upload_part_size(minimum_upload_part_size_)
|
||||
, max_single_part_upload_size(max_single_part_upload_size_)
|
||||
, schedule(std::move(schedule_))
|
||||
{
|
||||
allocateBuffer();
|
||||
}
|
||||
@ -90,8 +74,6 @@ void WriteBufferFromS3::nextImpl()
|
||||
writePart();
|
||||
allocateBuffer();
|
||||
}
|
||||
|
||||
waitForReadyBackGroundTasks();
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::allocateBuffer()
|
||||
@ -106,7 +88,7 @@ WriteBufferFromS3::~WriteBufferFromS3()
|
||||
finalize();
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::preFinalize()
|
||||
void WriteBufferFromS3::finalizeImpl()
|
||||
{
|
||||
next();
|
||||
|
||||
@ -118,20 +100,8 @@ void WriteBufferFromS3::preFinalize()
|
||||
{
|
||||
/// Write rest of the data as last part.
|
||||
writePart();
|
||||
}
|
||||
|
||||
is_prefinalized = true;
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::finalizeImpl()
|
||||
{
|
||||
if (!is_prefinalized)
|
||||
preFinalize();
|
||||
|
||||
waitForAllBackGroundTasks();
|
||||
|
||||
if (!multipart_upload_id.empty())
|
||||
completeMultipartUpload();
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::createMultipartUpload()
|
||||
@ -174,68 +144,22 @@ void WriteBufferFromS3::writePart()
|
||||
LOG_WARNING(log, "Maximum part number in S3 protocol has reached (too many parts). Server may not accept this whole upload.");
|
||||
}
|
||||
|
||||
if (schedule)
|
||||
{
|
||||
UploadPartTask * task = nullptr;
|
||||
int part_number;
|
||||
{
|
||||
std::lock_guard lock(bg_tasks_mutex);
|
||||
task = &upload_object_tasks.emplace_back();
|
||||
++num_added_bg_tasks;
|
||||
part_number = num_added_bg_tasks;
|
||||
}
|
||||
Aws::S3::Model::UploadPartRequest req;
|
||||
|
||||
fillUploadRequest(task->req, part_number);
|
||||
schedule([this, task]()
|
||||
{
|
||||
try
|
||||
{
|
||||
processUploadRequest(*task);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
task->exception = std::current_exception();
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(bg_tasks_mutex);
|
||||
task->is_finised = true;
|
||||
++num_finished_bg_tasks;
|
||||
|
||||
/// Notification under mutex is important here.
|
||||
/// Othervies, WriteBuffer could be destroyed in between
|
||||
/// Releasing lock and condvar notification.
|
||||
bg_tasks_condvar.notify_one();
|
||||
}
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
UploadPartTask task;
|
||||
fillUploadRequest(task.req, part_tags.size() + 1);
|
||||
processUploadRequest(task);
|
||||
part_tags.push_back(task.tag);
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & req, int part_number)
|
||||
{
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
req.SetPartNumber(part_number);
|
||||
req.SetPartNumber(part_tags.size() + 1);
|
||||
req.SetUploadId(multipart_upload_id);
|
||||
req.SetContentLength(temporary_buffer->tellp());
|
||||
req.SetContentLength(size);
|
||||
req.SetBody(temporary_buffer);
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
|
||||
{
|
||||
auto outcome = client_ptr->UploadPart(task.req);
|
||||
auto outcome = client_ptr->UploadPart(req);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
task.tag = outcome.GetResult().GetETag();
|
||||
LOG_DEBUG(log, "Writing part finished. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", bucket, key, multipart_upload_id, task.tag, part_tags.size());
|
||||
auto etag = outcome.GetResult().GetETag();
|
||||
part_tags.push_back(etag);
|
||||
LOG_DEBUG(log, "Writing part finished. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", bucket, key, multipart_upload_id, etag, part_tags.size());
|
||||
}
|
||||
else
|
||||
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
@ -267,19 +191,14 @@ void WriteBufferFromS3::completeMultipartUpload()
|
||||
if (outcome.IsSuccess())
|
||||
LOG_DEBUG(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", bucket, key, multipart_upload_id, part_tags.size());
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::S3_ERROR, "{} Tags:{}",
|
||||
outcome.GetError().GetMessage(),
|
||||
fmt::join(part_tags.begin(), part_tags.end(), " "));
|
||||
}
|
||||
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::makeSinglepartUpload()
|
||||
{
|
||||
auto size = temporary_buffer->tellp();
|
||||
bool with_pool = bool(schedule);
|
||||
|
||||
LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}, Size: {}, WithPool: {}", bucket, key, size, with_pool);
|
||||
LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}, Size: {}", bucket, key, size);
|
||||
|
||||
if (size < 0)
|
||||
throw Exception("Failed to make single part upload. Buffer in invalid state", ErrorCodes::S3_ERROR);
|
||||
@ -290,114 +209,22 @@ void WriteBufferFromS3::makeSinglepartUpload()
|
||||
return;
|
||||
}
|
||||
|
||||
if (schedule)
|
||||
{
|
||||
put_object_task = std::make_unique<PutObjectTask>();
|
||||
fillPutRequest(put_object_task->req);
|
||||
schedule([this]()
|
||||
{
|
||||
try
|
||||
{
|
||||
processPutRequest(*put_object_task);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
put_object_task->exception = std::current_exception();
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(bg_tasks_mutex);
|
||||
put_object_task->is_finised = true;
|
||||
|
||||
/// Notification under mutex is important here.
|
||||
/// Othervies, WriteBuffer could be destroyed in between
|
||||
/// Releasing lock and condvar notification.
|
||||
bg_tasks_condvar.notify_one();
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
PutObjectTask task;
|
||||
fillPutRequest(task.req);
|
||||
processPutRequest(task);
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
|
||||
{
|
||||
Aws::S3::Model::PutObjectRequest req;
|
||||
req.SetBucket(bucket);
|
||||
req.SetKey(key);
|
||||
req.SetContentLength(temporary_buffer->tellp());
|
||||
req.SetContentLength(size);
|
||||
req.SetBody(temporary_buffer);
|
||||
if (object_metadata.has_value())
|
||||
req.SetMetadata(object_metadata.value());
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::processPutRequest(PutObjectTask & task)
|
||||
{
|
||||
auto outcome = client_ptr->PutObject(task.req);
|
||||
bool with_pool = bool(schedule);
|
||||
auto outcome = client_ptr->PutObject(req);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
|
||||
LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}", bucket, key, req.GetContentLength());
|
||||
else
|
||||
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::waitForReadyBackGroundTasks()
|
||||
{
|
||||
if (schedule)
|
||||
{
|
||||
std::lock_guard lock(bg_tasks_mutex);
|
||||
{
|
||||
while (!upload_object_tasks.empty() && upload_object_tasks.front().is_finised)
|
||||
{
|
||||
auto & task = upload_object_tasks.front();
|
||||
auto exception = std::move(task.exception);
|
||||
auto tag = std::move(task.tag);
|
||||
upload_object_tasks.pop_front();
|
||||
|
||||
if (exception)
|
||||
{
|
||||
waitForAllBackGroundTasks();
|
||||
std::rethrow_exception(exception);
|
||||
}
|
||||
|
||||
part_tags.push_back(tag);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void WriteBufferFromS3::waitForAllBackGroundTasks()
|
||||
{
|
||||
if (schedule)
|
||||
{
|
||||
std::unique_lock lock(bg_tasks_mutex);
|
||||
bg_tasks_condvar.wait(lock, [this]() { return num_added_bg_tasks == num_finished_bg_tasks; });
|
||||
|
||||
while (!upload_object_tasks.empty())
|
||||
{
|
||||
auto & task = upload_object_tasks.front();
|
||||
if (task.exception)
|
||||
std::rethrow_exception(std::move(task.exception));
|
||||
|
||||
part_tags.push_back(task.tag);
|
||||
|
||||
upload_object_tasks.pop_front();
|
||||
}
|
||||
|
||||
if (put_object_task)
|
||||
{
|
||||
bg_tasks_condvar.wait(lock, [this]() { return put_object_task->is_finised; });
|
||||
if (put_object_task->exception)
|
||||
std::rethrow_exception(std::move(put_object_task->exception));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -14,24 +14,14 @@
|
||||
|
||||
# include <aws/core/utils/memory/stl/AWSStringStream.h>
|
||||
|
||||
# include <Common/ThreadPool.h>
|
||||
|
||||
namespace Aws::S3
|
||||
{
|
||||
class S3Client;
|
||||
}
|
||||
|
||||
namespace Aws::S3::Model
|
||||
{
|
||||
class UploadPartRequest;
|
||||
class PutObjectRequest;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using ScheduleFunc = std::function<void(std::function<void()>)>;
|
||||
|
||||
/**
|
||||
* Buffer to write a data to a S3 object with specified bucket and key.
|
||||
* If data size written to the buffer is less than 'max_single_part_upload_size' write is performed using singlepart upload.
|
||||
@ -39,7 +29,7 @@ using ScheduleFunc = std::function<void(std::function<void()>)>;
|
||||
* Data is divided on chunks with size greater than 'minimum_upload_part_size'. Last chunk can be less than this threshold.
|
||||
* Each chunk is written as a part to S3.
|
||||
*/
|
||||
class WriteBufferFromS3 final : public BufferWithOwnMemory<WriteBuffer>
|
||||
class WriteBufferFromS3 : public BufferWithOwnMemory<WriteBuffer>
|
||||
{
|
||||
public:
|
||||
explicit WriteBufferFromS3(
|
||||
@ -49,15 +39,12 @@ public:
|
||||
size_t minimum_upload_part_size_,
|
||||
size_t max_single_part_upload_size_,
|
||||
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
|
||||
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
ScheduleFunc schedule_ = {});
|
||||
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
|
||||
|
||||
~WriteBufferFromS3() override;
|
||||
|
||||
void nextImpl() override;
|
||||
|
||||
void preFinalize() override;
|
||||
|
||||
private:
|
||||
void allocateBuffer();
|
||||
|
||||
@ -70,17 +57,6 @@ private:
|
||||
/// Receives response from the server after sending all data.
|
||||
void finalizeImpl() override;
|
||||
|
||||
struct UploadPartTask;
|
||||
void fillUploadRequest(Aws::S3::Model::UploadPartRequest & req, int part_number);
|
||||
void processUploadRequest(UploadPartTask & task);
|
||||
|
||||
struct PutObjectTask;
|
||||
void fillPutRequest(Aws::S3::Model::PutObjectRequest & req);
|
||||
void processPutRequest(PutObjectTask & task);
|
||||
|
||||
void waitForReadyBackGroundTasks();
|
||||
void waitForAllBackGroundTasks();
|
||||
|
||||
String bucket;
|
||||
String key;
|
||||
std::optional<std::map<String, String>> object_metadata;
|
||||
@ -96,18 +72,6 @@ private:
|
||||
String multipart_upload_id;
|
||||
std::vector<String> part_tags;
|
||||
|
||||
bool is_prefinalized = false;
|
||||
|
||||
/// Following fields are for background uploads in thread pool (if specified).
|
||||
/// We use std::function to avoid dependency of Interpreters
|
||||
ScheduleFunc schedule;
|
||||
std::unique_ptr<PutObjectTask> put_object_task;
|
||||
std::list<UploadPartTask> upload_object_tasks;
|
||||
size_t num_added_bg_tasks = 0;
|
||||
size_t num_finished_bg_tasks = 0;
|
||||
std::mutex bg_tasks_mutex;
|
||||
std::condition_variable bg_tasks_condvar;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
|
||||
};
|
||||
|
||||
|
@ -40,12 +40,10 @@ struct BlockIO
|
||||
pipeline.reset();
|
||||
}
|
||||
|
||||
void onException()
|
||||
void onException() const
|
||||
{
|
||||
if (exception_callback)
|
||||
exception_callback();
|
||||
|
||||
pipeline.reset();
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -300,7 +300,7 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
|
||||
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
|
||||
|
||||
if (hashing_out.count() != size)
|
||||
throw Exception(ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, "Unexpected size of file {}, expected {} got {}", path, hashing_out.count(), size);
|
||||
throw Exception(ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, "Unexpected size of file {}", path);
|
||||
|
||||
writePODBinary(hashing_out.getHash(), out);
|
||||
|
||||
@ -595,7 +595,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
|
||||
CompressionCodecFactory::instance().get("NONE", {}));
|
||||
|
||||
part_out.write(block);
|
||||
part_out.finalizePart(new_projection_part, false);
|
||||
part_out.writeSuffixAndFinalizePart(new_projection_part);
|
||||
new_projection_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
|
||||
new_data_part->addProjectionPart(projection_name, std::move(new_projection_part));
|
||||
}
|
||||
@ -619,7 +619,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
|
||||
CompressionCodecFactory::instance().get("NONE", {}));
|
||||
|
||||
part_out.write(block);
|
||||
part_out.finalizePart(new_data_part, false);
|
||||
part_out.writeSuffixAndFinalizePart(new_data_part);
|
||||
new_data_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
|
||||
|
||||
return new_data_part;
|
||||
|
@ -99,7 +99,7 @@ void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const Dis
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::store(
|
||||
void IMergeTreeDataPart::MinMaxIndex::store(
|
||||
const MergeTreeData & data, const DiskPtr & disk_, const String & part_path, Checksums & out_checksums) const
|
||||
{
|
||||
auto metadata_snapshot = data.getInMemoryMetadataPtr();
|
||||
@ -108,10 +108,10 @@ IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::s
|
||||
auto minmax_column_names = data.getMinMaxColumnsNames(partition_key);
|
||||
auto minmax_column_types = data.getMinMaxColumnsTypes(partition_key);
|
||||
|
||||
return store(minmax_column_names, minmax_column_types, disk_, part_path, out_checksums);
|
||||
store(minmax_column_names, minmax_column_types, disk_, part_path, out_checksums);
|
||||
}
|
||||
|
||||
IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::store(
|
||||
void IMergeTreeDataPart::MinMaxIndex::store(
|
||||
const Names & column_names,
|
||||
const DataTypes & data_types,
|
||||
const DiskPtr & disk_,
|
||||
@ -122,8 +122,6 @@ IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::s
|
||||
throw Exception("Attempt to store uninitialized MinMax index for part " + part_path + ". This is a bug.",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
WrittenFiles written_files;
|
||||
|
||||
for (size_t i = 0; i < column_names.size(); ++i)
|
||||
{
|
||||
String file_name = "minmax_" + escapeForFileName(column_names[i]) + ".idx";
|
||||
@ -136,11 +134,8 @@ IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::s
|
||||
out_hashing.next();
|
||||
out_checksums.files[file_name].file_size = out_hashing.count();
|
||||
out_checksums.files[file_name].file_hash = out_hashing.getHash();
|
||||
out->preFinalize();
|
||||
written_files.emplace_back(std::move(out));
|
||||
out->finalize();
|
||||
}
|
||||
|
||||
return written_files;
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::MinMaxIndex::update(const Block & block, const Names & column_names)
|
||||
@ -1276,7 +1271,6 @@ void IMergeTreeDataPart::remove() const
|
||||
try
|
||||
{
|
||||
/// Remove each expected file in directory, then remove directory itself.
|
||||
IDisk::RemoveBatchRequest request;
|
||||
|
||||
#if !defined(__clang__)
|
||||
# pragma GCC diagnostic push
|
||||
@ -1285,19 +1279,18 @@ void IMergeTreeDataPart::remove() const
|
||||
for (const auto & [file, _] : checksums.files)
|
||||
{
|
||||
if (projection_directories.find(file) == projection_directories.end())
|
||||
request.emplace_back(fs::path(to) / file);
|
||||
disk->removeSharedFile(fs::path(to) / file, *keep_shared_data);
|
||||
}
|
||||
#if !defined(__clang__)
|
||||
# pragma GCC diagnostic pop
|
||||
#endif
|
||||
|
||||
for (const auto & file : {"checksums.txt", "columns.txt"})
|
||||
request.emplace_back(fs::path(to) / file);
|
||||
disk->removeSharedFile(fs::path(to) / file, *keep_shared_data);
|
||||
|
||||
request.emplace_back(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, true);
|
||||
request.emplace_back(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, true);
|
||||
disk->removeSharedFileIfExists(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, *keep_shared_data);
|
||||
disk->removeSharedFileIfExists(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, *keep_shared_data);
|
||||
|
||||
disk->removeSharedFiles(request, *keep_shared_data);
|
||||
disk->removeDirectory(to);
|
||||
}
|
||||
catch (...)
|
||||
@ -1331,24 +1324,22 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_sh
|
||||
try
|
||||
{
|
||||
/// Remove each expected file in directory, then remove directory itself.
|
||||
IDisk::RemoveBatchRequest request;
|
||||
|
||||
#if !defined(__clang__)
|
||||
# pragma GCC diagnostic push
|
||||
# pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#endif
|
||||
for (const auto & [file, _] : checksums.files)
|
||||
request.emplace_back(fs::path(to) / file);
|
||||
disk->removeSharedFile(fs::path(to) / file, keep_shared_data);
|
||||
#if !defined(__clang__)
|
||||
# pragma GCC diagnostic pop
|
||||
#endif
|
||||
|
||||
for (const auto & file : {"checksums.txt", "columns.txt"})
|
||||
request.emplace_back(fs::path(to) / file);
|
||||
request.emplace_back(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, true);
|
||||
request.emplace_back(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, true);
|
||||
disk->removeSharedFile(fs::path(to) / file, keep_shared_data);
|
||||
disk->removeSharedFileIfExists(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, keep_shared_data);
|
||||
disk->removeSharedFileIfExists(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, keep_shared_data);
|
||||
|
||||
disk->removeSharedFiles(request, keep_shared_data);
|
||||
disk->removeSharedRecursive(to, keep_shared_data);
|
||||
}
|
||||
catch (...)
|
||||
|
@ -300,11 +300,9 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>;
|
||||
|
||||
void load(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path);
|
||||
[[nodiscard]] WrittenFiles store(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
|
||||
[[nodiscard]] WrittenFiles store(const Names & column_names, const DataTypes & data_types, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
|
||||
void store(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
|
||||
void store(const Names & column_names, const DataTypes & data_types, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
|
||||
|
||||
void update(const Block & block, const Names & column_names);
|
||||
void merge(const MinMaxIndex & other);
|
||||
|
@ -32,9 +32,7 @@ public:
|
||||
|
||||
virtual void write(const Block & block, const IColumn::Permutation * permutation) = 0;
|
||||
|
||||
virtual void fillChecksums(IMergeTreeDataPart::Checksums & checksums) = 0;
|
||||
|
||||
virtual void finish(bool sync) = 0;
|
||||
virtual void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) = 0;
|
||||
|
||||
Columns releaseIndexColumns();
|
||||
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
|
||||
|
@ -478,11 +478,9 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
|
||||
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
|
||||
|
||||
ctx->executor.reset();
|
||||
auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns);
|
||||
auto changed_checksums = ctx->column_to->writeSuffixAndGetChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns, ctx->need_sync);
|
||||
global_ctx->checksums_gathered_columns.add(std::move(changed_checksums));
|
||||
|
||||
ctx->delayed_streams.emplace_back(std::move(ctx->column_to));
|
||||
|
||||
if (global_ctx->rows_written != ctx->column_elems_written)
|
||||
{
|
||||
throw Exception("Written " + toString(ctx->column_elems_written) + " elements of column " + column_name +
|
||||
@ -507,8 +505,9 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
|
||||
|
||||
bool MergeTask::VerticalMergeStage::finalizeVerticalMergeForAllColumns() const
|
||||
{
|
||||
for (auto & stream : ctx->delayed_streams)
|
||||
stream->finish(ctx->need_sync);
|
||||
/// No need to execute this part if it is horizontal merge.
|
||||
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
|
||||
return false;
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -634,9 +633,9 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
|
||||
}
|
||||
|
||||
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
|
||||
global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync);
|
||||
global_ctx->to->writeSuffixAndFinalizePart(global_ctx->new_data_part, ctx->need_sync);
|
||||
else
|
||||
global_ctx->to->finalizePart(
|
||||
global_ctx->to->writeSuffixAndFinalizePart(
|
||||
global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns);
|
||||
|
||||
global_ctx->promise.set_value(global_ctx->new_data_part);
|
||||
|
@ -268,7 +268,6 @@ private:
|
||||
|
||||
Float64 progress_before = 0;
|
||||
std::unique_ptr<MergedColumnOnlyOutputStream> column_to{nullptr};
|
||||
std::vector<std::unique_ptr<MergedColumnOnlyOutputStream>> delayed_streams;
|
||||
size_t column_elems_written{0};
|
||||
QueryPipeline column_parts_pipeline;
|
||||
std::unique_ptr<PullingPipelineExecutor> executor;
|
||||
|
@ -125,12 +125,12 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
|
||||
projection_compression_codec);
|
||||
|
||||
projection_out.write(projection_part->block);
|
||||
projection_out.finalizePart(projection_data_part, false);
|
||||
projection_out.writeSuffixAndFinalizePart(projection_data_part);
|
||||
new_data_part->addProjectionPart(projection_name, std::move(projection_data_part));
|
||||
}
|
||||
}
|
||||
|
||||
out.finalizePart(new_data_part, false);
|
||||
out.writeSuffixAndFinalizePart(new_data_part);
|
||||
}
|
||||
|
||||
void MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const
|
||||
|
@ -218,7 +218,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums)
|
||||
void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
|
||||
{
|
||||
if (columns_buffer.size() != 0)
|
||||
{
|
||||
@ -253,12 +253,6 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(IMergeTreeDataPart::Check
|
||||
marks.next();
|
||||
addToChecksums(checksums);
|
||||
|
||||
plain_file->preFinalize();
|
||||
marks_file->preFinalize();
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterCompact::finishDataSerialization(bool sync)
|
||||
{
|
||||
plain_file->finalize();
|
||||
marks_file->finalize();
|
||||
if (sync)
|
||||
@ -362,28 +356,16 @@ size_t MergeTreeDataPartWriterCompact::ColumnsBuffer::size() const
|
||||
return accumulated_columns.at(0)->size();
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterCompact::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
|
||||
void MergeTreeDataPartWriterCompact::finish(IMergeTreeDataPart::Checksums & checksums, bool sync)
|
||||
{
|
||||
// If we don't have anything to write, skip finalization.
|
||||
if (!columns_list.empty())
|
||||
fillDataChecksums(checksums);
|
||||
finishDataSerialization(checksums, sync);
|
||||
|
||||
if (settings.rewrite_primary_key)
|
||||
fillPrimaryIndexChecksums(checksums);
|
||||
finishPrimaryIndexSerialization(checksums, sync);
|
||||
|
||||
fillSkipIndicesChecksums(checksums);
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterCompact::finish(bool sync)
|
||||
{
|
||||
// If we don't have anything to write, skip finalization.
|
||||
if (!columns_list.empty())
|
||||
finishDataSerialization(sync);
|
||||
|
||||
if (settings.rewrite_primary_key)
|
||||
finishPrimaryIndexSerialization(sync);
|
||||
|
||||
finishSkipIndicesSerialization(sync);
|
||||
finishSkipIndicesSerialization(checksums, sync);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,13 +20,11 @@ public:
|
||||
|
||||
void write(const Block & block, const IColumn::Permutation * permutation) override;
|
||||
|
||||
void fillChecksums(IMergeTreeDataPart::Checksums & checksums) override;
|
||||
void finish(bool sync) override;
|
||||
void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) override;
|
||||
|
||||
private:
|
||||
/// Finish serialization of the data. Flush rows in buffer to disk, compute checksums.
|
||||
void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums);
|
||||
void finishDataSerialization(bool sync);
|
||||
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync);
|
||||
|
||||
void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
|
||||
|
||||
|
@ -76,7 +76,7 @@ void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Bl
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
|
||||
void MergeTreeDataPartWriterInMemory::finish(IMergeTreeDataPart::Checksums & checksums, bool /* sync */)
|
||||
{
|
||||
/// If part is empty we still need to initialize block by empty columns.
|
||||
if (!part_in_memory->block)
|
||||
|
@ -18,8 +18,7 @@ public:
|
||||
/// You can write only one block. In-memory part can be written only at INSERT.
|
||||
void write(const Block & block, const IColumn::Permutation * permutation) override;
|
||||
|
||||
void fillChecksums(IMergeTreeDataPart::Checksums & checksums) override;
|
||||
void finish(bool /*sync*/) override {}
|
||||
void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) override;
|
||||
|
||||
private:
|
||||
void calculateAndSerializePrimaryIndex(const Block & primary_index_block);
|
||||
|
@ -2,7 +2,6 @@
|
||||
#include <Common/MemoryTrackerBlockerInThread.h>
|
||||
|
||||
#include <utility>
|
||||
#include "IO/WriteBufferFromFileDecorator.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -11,24 +10,13 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterOnDisk::Stream::preFinalize()
|
||||
void MergeTreeDataPartWriterOnDisk::Stream::finalize()
|
||||
{
|
||||
compressed.next();
|
||||
/// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
|
||||
plain_hashing.next();
|
||||
marks.next();
|
||||
|
||||
plain_file->preFinalize();
|
||||
marks_file->preFinalize();
|
||||
|
||||
is_prefinalized = true;
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterOnDisk::Stream::finalize()
|
||||
{
|
||||
if (!is_prefinalized)
|
||||
preFinalize();
|
||||
|
||||
plain_file->finalize();
|
||||
marks_file->finalize();
|
||||
}
|
||||
@ -257,7 +245,8 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::DataPart::Checksums & checksums)
|
||||
void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(
|
||||
MergeTreeData::DataPart::Checksums & checksums, bool sync)
|
||||
{
|
||||
bool write_final_mark = (with_final_mark && data_written);
|
||||
if (write_final_mark && compute_granularity)
|
||||
@ -280,14 +269,6 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
|
||||
index_stream->next();
|
||||
checksums.files["primary.idx"].file_size = index_stream->count();
|
||||
checksums.files["primary.idx"].file_hash = index_stream->getHash();
|
||||
index_file_stream->preFinalize();
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync)
|
||||
{
|
||||
if (index_stream)
|
||||
{
|
||||
index_file_stream->finalize();
|
||||
if (sync)
|
||||
index_file_stream->sync();
|
||||
@ -295,7 +276,8 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(bool sync)
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::DataPart::Checksums & checksums)
|
||||
void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(
|
||||
MergeTreeData::DataPart::Checksums & checksums, bool sync)
|
||||
{
|
||||
for (size_t i = 0; i < skip_indices.size(); ++i)
|
||||
{
|
||||
@ -304,18 +286,10 @@ void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::Data
|
||||
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
|
||||
}
|
||||
|
||||
for (auto & stream : skip_indices_streams)
|
||||
{
|
||||
stream->preFinalize();
|
||||
stream->addToChecksums(checksums);
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(bool sync)
|
||||
{
|
||||
for (auto & stream : skip_indices_streams)
|
||||
{
|
||||
stream->finalize();
|
||||
stream->addToChecksums(checksums);
|
||||
if (sync)
|
||||
stream->sync();
|
||||
}
|
||||
|
@ -71,10 +71,6 @@ public:
|
||||
std::unique_ptr<WriteBufferFromFileBase> marks_file;
|
||||
HashingWriteBuffer marks;
|
||||
|
||||
bool is_prefinalized = false;
|
||||
|
||||
void preFinalize();
|
||||
|
||||
void finalize();
|
||||
|
||||
void sync() const;
|
||||
@ -111,11 +107,9 @@ protected:
|
||||
void calculateAndSerializeSkipIndices(const Block & skip_indexes_block, const Granules & granules_to_write);
|
||||
|
||||
/// Finishes primary index serialization: write final primary index row (if required) and compute checksums
|
||||
void fillPrimaryIndexChecksums(MergeTreeData::DataPart::Checksums & checksums);
|
||||
void finishPrimaryIndexSerialization(bool sync);
|
||||
void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync);
|
||||
/// Finishes skip indices serialization: write all accumulated data to disk and compute checksums
|
||||
void fillSkipIndicesChecksums(MergeTreeData::DataPart::Checksums & checksums);
|
||||
void finishSkipIndicesSerialization(bool sync);
|
||||
void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums, bool sync);
|
||||
|
||||
/// Get global number of the current which we are writing (or going to start to write)
|
||||
size_t getCurrentMark() const { return current_mark; }
|
||||
|
@ -5,7 +5,6 @@
|
||||
#include <DataTypes/Serializations/ISerialization.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Columns/ColumnSparse.h>
|
||||
#include <base/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -520,7 +519,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const NameAndTypePai
|
||||
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksums & checksums)
|
||||
void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
|
||||
{
|
||||
const auto & global_settings = storage.getContext()->getSettingsRef();
|
||||
ISerialization::SerializeBinaryBulkSettings serialize_settings;
|
||||
@ -553,19 +552,10 @@ void MergeTreeDataPartWriterWide::fillDataChecksums(IMergeTreeDataPart::Checksum
|
||||
writeFinalMark(*it, offset_columns, serialize_settings.path);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto & stream : column_streams)
|
||||
{
|
||||
stream.second->preFinalize();
|
||||
stream.second->addToChecksums(checksums);
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterWide::finishDataSerialization(bool sync)
|
||||
{
|
||||
for (auto & stream : column_streams)
|
||||
{
|
||||
stream.second->finalize();
|
||||
stream.second->addToChecksums(checksums);
|
||||
if (sync)
|
||||
stream.second->sync();
|
||||
}
|
||||
@ -589,28 +579,16 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(bool sync)
|
||||
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterWide::fillChecksums(IMergeTreeDataPart::Checksums & checksums)
|
||||
void MergeTreeDataPartWriterWide::finish(IMergeTreeDataPart::Checksums & checksums, bool sync)
|
||||
{
|
||||
// If we don't have anything to write, skip finalization.
|
||||
if (!columns_list.empty())
|
||||
fillDataChecksums(checksums);
|
||||
finishDataSerialization(checksums, sync);
|
||||
|
||||
if (settings.rewrite_primary_key)
|
||||
fillPrimaryIndexChecksums(checksums);
|
||||
finishPrimaryIndexSerialization(checksums, sync);
|
||||
|
||||
fillSkipIndicesChecksums(checksums);
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterWide::finish(bool sync)
|
||||
{
|
||||
// If we don't have anything to write, skip finalization.
|
||||
if (!columns_list.empty())
|
||||
finishDataSerialization(sync);
|
||||
|
||||
if (settings.rewrite_primary_key)
|
||||
finishPrimaryIndexSerialization(sync);
|
||||
|
||||
finishSkipIndicesSerialization(sync);
|
||||
finishSkipIndicesSerialization(checksums, sync);
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterWide::writeFinalMark(
|
||||
|
@ -29,15 +29,12 @@ public:
|
||||
|
||||
void write(const Block & block, const IColumn::Permutation * permutation) override;
|
||||
|
||||
void fillChecksums(IMergeTreeDataPart::Checksums & checksums) final;
|
||||
|
||||
void finish(bool sync) final;
|
||||
void finish(IMergeTreeDataPart::Checksums & checksums, bool sync) final;
|
||||
|
||||
private:
|
||||
/// Finish serialization of data: write final mark if required and compute checksums
|
||||
/// Also validate written data in debug mode
|
||||
void fillDataChecksums(IMergeTreeDataPart::Checksums & checksums);
|
||||
void finishDataSerialization(bool sync);
|
||||
void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync);
|
||||
|
||||
/// Write data of one column.
|
||||
/// Return how many marks were written and
|
||||
|
@ -137,12 +137,6 @@ void updateTTL(
|
||||
|
||||
}
|
||||
|
||||
void MergeTreeDataWriter::TemporaryPart::finalize()
|
||||
{
|
||||
for (auto & stream : streams)
|
||||
stream.finalizer.finish();
|
||||
}
|
||||
|
||||
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
|
||||
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
|
||||
{
|
||||
@ -276,10 +270,9 @@ Block MergeTreeDataWriter::mergeBlock(
|
||||
return block.cloneWithColumns(status.chunk.getColumns());
|
||||
}
|
||||
|
||||
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
|
||||
BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
|
||||
{
|
||||
TemporaryPart temp_part;
|
||||
Block & block = block_with_partition.block;
|
||||
|
||||
static const String TMP_PREFIX = "tmp_insert_";
|
||||
@ -350,7 +343,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
|
||||
/// If optimize_on_insert is true, block may become empty after merge.
|
||||
/// There is no need to create empty part.
|
||||
if (expected_size == 0)
|
||||
return temp_part;
|
||||
return nullptr;
|
||||
|
||||
DB::IMergeTreeDataPart::TTLInfos move_ttl_infos;
|
||||
const auto & move_ttl_entries = metadata_snapshot->getMoveTTLs();
|
||||
@ -426,37 +419,30 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
|
||||
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
|
||||
|
||||
const auto & index_factory = MergeTreeIndexFactory::instance();
|
||||
auto out = std::make_unique<MergedBlockOutputStream>(new_data_part, metadata_snapshot,columns,
|
||||
MergedBlockOutputStream out(new_data_part, metadata_snapshot,columns,
|
||||
index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);
|
||||
|
||||
out->writeWithPermutation(block, perm_ptr);
|
||||
bool sync_on_insert = data_settings->fsync_after_insert;
|
||||
|
||||
out.writeWithPermutation(block, perm_ptr);
|
||||
|
||||
for (const auto & projection : metadata_snapshot->getProjections())
|
||||
{
|
||||
auto projection_block = projection.calculate(block, context);
|
||||
if (projection_block.rows())
|
||||
{
|
||||
auto proj_temp_part = writeProjectionPart(data, log, projection_block, projection, new_data_part.get());
|
||||
new_data_part->addProjectionPart(projection.name, std::move(proj_temp_part.part));
|
||||
for (auto & stream : proj_temp_part.streams)
|
||||
temp_part.streams.emplace_back(std::move(stream));
|
||||
}
|
||||
new_data_part->addProjectionPart(
|
||||
projection.name, writeProjectionPart(data, log, projection_block, projection, new_data_part.get()));
|
||||
}
|
||||
auto finalizer = out->finalizePartAsync(new_data_part, data_settings->fsync_after_insert);
|
||||
|
||||
temp_part.part = new_data_part;
|
||||
temp_part.streams.emplace_back(TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
|
||||
|
||||
/// out.finish(new_data_part, std::move(written_files), sync_on_insert);
|
||||
out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
|
||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
|
||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk());
|
||||
|
||||
return temp_part;
|
||||
return new_data_part;
|
||||
}
|
||||
|
||||
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl(
|
||||
const String & part_name,
|
||||
MergeTreeDataPartType part_type,
|
||||
const String & relative_path,
|
||||
@ -467,7 +453,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
|
||||
Block block,
|
||||
const ProjectionDescription & projection)
|
||||
{
|
||||
TemporaryPart temp_part;
|
||||
const StorageMetadataPtr & metadata_snapshot = projection.metadata;
|
||||
MergeTreePartInfo new_part_info("all", 0, 0, 0);
|
||||
auto new_data_part = data.createPart(
|
||||
@ -540,28 +525,24 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
|
||||
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
|
||||
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
|
||||
|
||||
auto out = std::make_unique<MergedBlockOutputStream>(
|
||||
MergedBlockOutputStream out(
|
||||
new_data_part,
|
||||
metadata_snapshot,
|
||||
columns,
|
||||
MergeTreeIndices{},
|
||||
{},
|
||||
compression_codec);
|
||||
|
||||
out->writeWithPermutation(block, perm_ptr);
|
||||
auto finalizer = out->finalizePartAsync(new_data_part, false);
|
||||
temp_part.part = new_data_part;
|
||||
temp_part.streams.emplace_back(TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
|
||||
|
||||
// out.finish(new_data_part, std::move(written_files), false);
|
||||
out.writeWithPermutation(block, perm_ptr);
|
||||
out.writeSuffixAndFinalizePart(new_data_part);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterRows, block.rows());
|
||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterUncompressedBytes, block.bytes());
|
||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterCompressedBytes, new_data_part->getBytesOnDisk());
|
||||
|
||||
return temp_part;
|
||||
return new_data_part;
|
||||
}
|
||||
|
||||
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPart(
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPart(
|
||||
MergeTreeData & data, Poco::Logger * log, Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part)
|
||||
{
|
||||
String part_name = projection.name;
|
||||
@ -593,7 +574,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPart(
|
||||
|
||||
/// This is used for projection materialization process which may contain multiple stages of
|
||||
/// projection part merges.
|
||||
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempProjectionPart(
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart(
|
||||
MergeTreeData & data,
|
||||
Poco::Logger * log,
|
||||
Block block,
|
||||
@ -628,7 +609,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempProjectionPart(
|
||||
projection);
|
||||
}
|
||||
|
||||
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeInMemoryProjectionPart(
|
||||
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeInMemoryProjectionPart(
|
||||
const MergeTreeData & data,
|
||||
Poco::Logger * log,
|
||||
Block block,
|
||||
|
@ -10,7 +10,6 @@
|
||||
#include <Interpreters/sortBlock.h>
|
||||
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/MergedBlockOutputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -47,28 +46,11 @@ public:
|
||||
*/
|
||||
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert);
|
||||
|
||||
/// This structure contains not completely written temporary part.
|
||||
/// Some writes may happen asynchronously, e.g. for blob storages.
|
||||
/// You should call finalize() to wait until all data is written.
|
||||
struct TemporaryPart
|
||||
{
|
||||
MergeTreeData::MutableDataPartPtr part;
|
||||
|
||||
struct Stream
|
||||
{
|
||||
std::unique_ptr<MergedBlockOutputStream> stream;
|
||||
MergedBlockOutputStream::Finalizer finalizer;
|
||||
};
|
||||
|
||||
std::vector<Stream> streams;
|
||||
|
||||
void finalize();
|
||||
};
|
||||
|
||||
TemporaryPart writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
|
||||
MergeTreeData::MutableDataPartPtr
|
||||
writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
|
||||
|
||||
/// For insertion.
|
||||
static TemporaryPart writeProjectionPart(
|
||||
static MergeTreeData::MutableDataPartPtr writeProjectionPart(
|
||||
MergeTreeData & data,
|
||||
Poco::Logger * log,
|
||||
Block block,
|
||||
@ -76,7 +58,7 @@ public:
|
||||
const IMergeTreeDataPart * parent_part);
|
||||
|
||||
/// For mutation: MATERIALIZE PROJECTION.
|
||||
static TemporaryPart writeTempProjectionPart(
|
||||
static MergeTreeData::MutableDataPartPtr writeTempProjectionPart(
|
||||
MergeTreeData & data,
|
||||
Poco::Logger * log,
|
||||
Block block,
|
||||
@ -85,7 +67,7 @@ public:
|
||||
size_t block_num);
|
||||
|
||||
/// For WriteAheadLog AddPart.
|
||||
static TemporaryPart writeInMemoryProjectionPart(
|
||||
static MergeTreeData::MutableDataPartPtr writeInMemoryProjectionPart(
|
||||
const MergeTreeData & data,
|
||||
Poco::Logger * log,
|
||||
Block block,
|
||||
@ -100,7 +82,7 @@ public:
|
||||
const MergeTreeData::MergingParams & merging_params);
|
||||
|
||||
private:
|
||||
static TemporaryPart writeProjectionPartImpl(
|
||||
static MergeTreeData::MutableDataPartPtr writeProjectionPartImpl(
|
||||
const String & part_name,
|
||||
MergeTreeDataPartType part_type,
|
||||
const String & relative_path,
|
||||
|
@ -375,17 +375,17 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis
|
||||
partition_key_sample.getByPosition(i).type->getDefaultSerialization()->deserializeBinary(value[i], *file);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
|
||||
void MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
|
||||
{
|
||||
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
|
||||
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage.getContext()).sample_block;
|
||||
return store(partition_key_sample, disk, part_path, checksums);
|
||||
store(partition_key_sample, disk, part_path, checksums);
|
||||
}
|
||||
|
||||
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
|
||||
void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
|
||||
{
|
||||
if (!partition_key_sample)
|
||||
return nullptr;
|
||||
return;
|
||||
|
||||
auto out = disk->writeFile(part_path + "partition.dat");
|
||||
HashingWriteBuffer out_hashing(*out);
|
||||
@ -395,8 +395,7 @@ std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const Block &
|
||||
out_hashing.next();
|
||||
checksums.files["partition.dat"].file_size = out_hashing.count();
|
||||
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
|
||||
out->preFinalize();
|
||||
return out;
|
||||
out->finalize();
|
||||
}
|
||||
|
||||
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context)
|
||||
|
@ -38,10 +38,8 @@ public:
|
||||
void serializeText(const MergeTreeData & storage, WriteBuffer & out, const FormatSettings & format_settings) const;
|
||||
|
||||
void load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path);
|
||||
/// Store functions return write buffer with written but not finalized data.
|
||||
/// User must call finish() for returned object.
|
||||
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
|
||||
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
|
||||
void store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
|
||||
void store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
|
||||
|
||||
void assign(const MergeTreePartition & other) { value = other.value; }
|
||||
|
||||
|
@ -7,21 +7,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
MergeTreeSink::~MergeTreeSink() = default;
|
||||
|
||||
MergeTreeSink::MergeTreeSink(
|
||||
StorageMergeTree & storage_,
|
||||
StorageMetadataPtr metadata_snapshot_,
|
||||
size_t max_parts_per_block_,
|
||||
ContextPtr context_)
|
||||
: SinkToStorage(metadata_snapshot_->getSampleBlock())
|
||||
, storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, max_parts_per_block(max_parts_per_block_)
|
||||
, context(context_)
|
||||
{
|
||||
}
|
||||
|
||||
void MergeTreeSink::onStart()
|
||||
{
|
||||
/// Only check "too many parts" before write,
|
||||
@ -29,42 +14,22 @@ void MergeTreeSink::onStart()
|
||||
storage.delayInsertOrThrowIfNeeded();
|
||||
}
|
||||
|
||||
void MergeTreeSink::onFinish()
|
||||
{
|
||||
finishDelayedChunk();
|
||||
}
|
||||
|
||||
struct MergeTreeSink::DelayedChunk
|
||||
{
|
||||
struct Partition
|
||||
{
|
||||
MergeTreeDataWriter::TemporaryPart temp_part;
|
||||
UInt64 elapsed_ns;
|
||||
String block_dedup_token;
|
||||
};
|
||||
|
||||
std::vector<Partition> partitions;
|
||||
};
|
||||
|
||||
|
||||
void MergeTreeSink::consume(Chunk chunk)
|
||||
{
|
||||
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
|
||||
String block_dedup_token;
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
|
||||
std::vector<MergeTreeSink::DelayedChunk::Partition> partitions;
|
||||
for (auto & current_block : part_blocks)
|
||||
{
|
||||
Stopwatch watch;
|
||||
String block_dedup_token;
|
||||
|
||||
auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
|
||||
|
||||
UInt64 elapsed_ns = watch.elapsed();
|
||||
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
|
||||
|
||||
/// If optimize_on_insert setting is true, current_block could become empty after merge
|
||||
/// and we didn't create part.
|
||||
if (!temp_part.part)
|
||||
if (!part)
|
||||
continue;
|
||||
|
||||
if (storage.getDeduplicationLog())
|
||||
@ -79,41 +44,15 @@ void MergeTreeSink::consume(Chunk chunk)
|
||||
}
|
||||
}
|
||||
|
||||
partitions.emplace_back(MergeTreeSink::DelayedChunk::Partition
|
||||
{
|
||||
.temp_part = std::move(temp_part),
|
||||
.elapsed_ns = elapsed_ns,
|
||||
.block_dedup_token = std::move(block_dedup_token)
|
||||
});
|
||||
}
|
||||
|
||||
finishDelayedChunk();
|
||||
delayed_chunk = std::make_unique<MergeTreeSink::DelayedChunk>();
|
||||
delayed_chunk->partitions = std::move(partitions);
|
||||
}
|
||||
|
||||
void MergeTreeSink::finishDelayedChunk()
|
||||
{
|
||||
if (!delayed_chunk)
|
||||
return;
|
||||
|
||||
for (auto & partition : delayed_chunk->partitions)
|
||||
{
|
||||
partition.temp_part.finalize();
|
||||
|
||||
auto & part = partition.temp_part.part;
|
||||
|
||||
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
|
||||
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog(), partition.block_dedup_token))
|
||||
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog(), block_dedup_token))
|
||||
{
|
||||
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns);
|
||||
PartLog::addNewPart(storage.getContext(), part, watch.elapsed());
|
||||
|
||||
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
|
||||
storage.background_operations_assignee.trigger();
|
||||
}
|
||||
}
|
||||
|
||||
delayed_chunk.reset();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -16,16 +16,20 @@ class MergeTreeSink : public SinkToStorage
|
||||
public:
|
||||
MergeTreeSink(
|
||||
StorageMergeTree & storage_,
|
||||
StorageMetadataPtr metadata_snapshot_,
|
||||
const StorageMetadataPtr metadata_snapshot_,
|
||||
size_t max_parts_per_block_,
|
||||
ContextPtr context_);
|
||||
|
||||
~MergeTreeSink() override;
|
||||
ContextPtr context_)
|
||||
: SinkToStorage(metadata_snapshot_->getSampleBlock())
|
||||
, storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, max_parts_per_block(max_parts_per_block_)
|
||||
, context(context_)
|
||||
{
|
||||
}
|
||||
|
||||
String getName() const override { return "MergeTreeSink"; }
|
||||
void consume(Chunk chunk) override;
|
||||
void onStart() override;
|
||||
void onFinish() override;
|
||||
|
||||
private:
|
||||
StorageMergeTree & storage;
|
||||
@ -33,12 +37,6 @@ private:
|
||||
size_t max_parts_per_block;
|
||||
ContextPtr context;
|
||||
uint64_t chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
|
||||
|
||||
/// We can delay processing for previous chunk and start writing a new one.
|
||||
struct DelayedChunk;
|
||||
std::unique_ptr<DelayedChunk> delayed_chunk;
|
||||
|
||||
void finishDelayedChunk();
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -210,12 +210,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
|
||||
for (const auto & projection : metadata_snapshot->getProjections())
|
||||
{
|
||||
auto projection_block = projection.calculate(block, context);
|
||||
auto temp_part = MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get());
|
||||
temp_part.finalize();
|
||||
if (projection_block.rows())
|
||||
part->addProjectionPart(projection.name, std::move(temp_part.part));
|
||||
part->addProjectionPart(
|
||||
projection.name,
|
||||
MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get()));
|
||||
}
|
||||
part_out.finalizePart(part, false);
|
||||
part_out.writeSuffixAndFinalizePart(part);
|
||||
|
||||
min_block_number = std::min(min_block_number, part->info.min_block);
|
||||
max_block_number = std::max(max_block_number, part->info.max_block);
|
||||
|
@ -51,66 +51,7 @@ void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IC
|
||||
writeImpl(block, permutation);
|
||||
}
|
||||
|
||||
struct MergedBlockOutputStream::Finalizer::Impl
|
||||
{
|
||||
IMergeTreeDataPartWriter & writer;
|
||||
MergeTreeData::MutableDataPartPtr part;
|
||||
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
|
||||
bool sync;
|
||||
|
||||
Impl(IMergeTreeDataPartWriter & writer_, MergeTreeData::MutableDataPartPtr part_, bool sync_)
|
||||
: writer(writer_), part(std::move(part_)), sync(sync_) {}
|
||||
|
||||
void finish();
|
||||
};
|
||||
|
||||
void MergedBlockOutputStream::Finalizer::finish()
|
||||
{
|
||||
std::unique_ptr<Impl> to_finish = std::move(impl);
|
||||
if (to_finish)
|
||||
to_finish->finish();
|
||||
}
|
||||
|
||||
void MergedBlockOutputStream::Finalizer::Impl::finish()
|
||||
{
|
||||
writer.finish(sync);
|
||||
|
||||
for (auto & file : written_files)
|
||||
{
|
||||
file->finalize();
|
||||
if (sync)
|
||||
file->sync();
|
||||
}
|
||||
|
||||
part->storage.lockSharedData(*part);
|
||||
}
|
||||
|
||||
MergedBlockOutputStream::Finalizer::~Finalizer()
|
||||
{
|
||||
try
|
||||
{
|
||||
finish();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException("MergedBlockOutputStream");
|
||||
}
|
||||
}
|
||||
|
||||
MergedBlockOutputStream::Finalizer::Finalizer(Finalizer &&) = default;
|
||||
MergedBlockOutputStream::Finalizer & MergedBlockOutputStream::Finalizer::operator=(Finalizer &&) = default;
|
||||
MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr<Impl> impl_) : impl(std::move(impl_)) {}
|
||||
|
||||
void MergedBlockOutputStream::finalizePart(
|
||||
MergeTreeData::MutableDataPartPtr & new_part,
|
||||
bool sync,
|
||||
const NamesAndTypesList * total_columns_list,
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums)
|
||||
{
|
||||
finalizePartAsync(new_part, sync, total_columns_list, additional_column_checksums).finish();
|
||||
}
|
||||
|
||||
MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
|
||||
void MergedBlockOutputStream::writeSuffixAndFinalizePart(
|
||||
MergeTreeData::MutableDataPartPtr & new_part,
|
||||
bool sync,
|
||||
const NamesAndTypesList * total_columns_list,
|
||||
@ -123,9 +64,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
|
||||
checksums = std::move(*additional_column_checksums);
|
||||
|
||||
/// Finish columns serialization.
|
||||
writer->fillChecksums(checksums);
|
||||
|
||||
LOG_TRACE(&Poco::Logger::get("MergedBlockOutputStream"), "filled checksums {}", new_part->getNameWithState());
|
||||
writer->finish(checksums, sync);
|
||||
|
||||
for (const auto & [projection_name, projection_part] : new_part->getProjectionParts())
|
||||
checksums.addFile(
|
||||
@ -145,9 +84,8 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
|
||||
new_part->setSerializationInfos(serialization_infos);
|
||||
}
|
||||
|
||||
auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part, sync);
|
||||
if (new_part->isStoredOnDisk())
|
||||
finalizer->written_files = finalizePartOnDisk(new_part, checksums);
|
||||
finalizePartOnDisk(new_part, checksums, sync);
|
||||
|
||||
new_part->rows_count = rows_count;
|
||||
new_part->modification_time = time(nullptr);
|
||||
@ -159,15 +97,15 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
|
||||
|
||||
if (default_codec != nullptr)
|
||||
new_part->default_codec = default_codec;
|
||||
|
||||
return Finalizer(std::move(finalizer));
|
||||
new_part->storage.lockSharedData(*new_part);
|
||||
}
|
||||
|
||||
MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDisk(
|
||||
void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
const MergeTreeData::DataPartPtr & new_part,
|
||||
MergeTreeData::DataPart::Checksums & checksums)
|
||||
MergeTreeData::DataPart::Checksums & checksums,
|
||||
bool sync)
|
||||
{
|
||||
WrittenFiles written_files;
|
||||
|
||||
if (new_part->isProjectionPart())
|
||||
{
|
||||
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
|
||||
@ -178,8 +116,6 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
|
||||
count_out_hashing.next();
|
||||
checksums.files["count.txt"].file_size = count_out_hashing.count();
|
||||
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
|
||||
count_out->preFinalize();
|
||||
written_files.emplace_back(std::move(count_out));
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -191,21 +127,16 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
|
||||
writeUUIDText(new_part->uuid, out_hashing);
|
||||
checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
|
||||
checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
|
||||
out->preFinalize();
|
||||
written_files.emplace_back(std::move(out));
|
||||
out->finalize();
|
||||
if (sync)
|
||||
out->sync();
|
||||
}
|
||||
|
||||
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
{
|
||||
if (auto file = new_part->partition.store(storage, volume->getDisk(), part_path, checksums))
|
||||
written_files.emplace_back(std::move(file));
|
||||
|
||||
new_part->partition.store(storage, volume->getDisk(), part_path, checksums);
|
||||
if (new_part->minmax_idx->initialized)
|
||||
{
|
||||
auto files = new_part->minmax_idx->store(storage, volume->getDisk(), part_path, checksums);
|
||||
for (auto & file : files)
|
||||
written_files.emplace_back(std::move(file));
|
||||
}
|
||||
new_part->minmax_idx->store(storage, volume->getDisk(), part_path, checksums);
|
||||
else if (rows_count)
|
||||
throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
|
||||
+ ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
@ -218,8 +149,9 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
|
||||
count_out_hashing.next();
|
||||
checksums.files["count.txt"].file_size = count_out_hashing.count();
|
||||
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
|
||||
count_out->preFinalize();
|
||||
written_files.emplace_back(std::move(count_out));
|
||||
count_out->finalize();
|
||||
if (sync)
|
||||
count_out->sync();
|
||||
}
|
||||
}
|
||||
|
||||
@ -231,8 +163,9 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
|
||||
new_part->ttl_infos.write(out_hashing);
|
||||
checksums.files["ttl.txt"].file_size = out_hashing.count();
|
||||
checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
|
||||
out->preFinalize();
|
||||
written_files.emplace_back(std::move(out));
|
||||
out->finalize();
|
||||
if (sync)
|
||||
out->sync();
|
||||
}
|
||||
|
||||
if (!new_part->getSerializationInfos().empty())
|
||||
@ -242,24 +175,25 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
|
||||
new_part->getSerializationInfos().writeJSON(out_hashing);
|
||||
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
|
||||
checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
|
||||
out->preFinalize();
|
||||
written_files.emplace_back(std::move(out));
|
||||
out->finalize();
|
||||
if (sync)
|
||||
out->sync();
|
||||
}
|
||||
|
||||
{
|
||||
/// Write a file with a description of columns.
|
||||
auto out = volume->getDisk()->writeFile(fs::path(part_path) / "columns.txt", 4096);
|
||||
new_part->getColumns().writeText(*out);
|
||||
out->preFinalize();
|
||||
written_files.emplace_back(std::move(out));
|
||||
out->finalize();
|
||||
if (sync)
|
||||
out->sync();
|
||||
}
|
||||
|
||||
if (default_codec != nullptr)
|
||||
{
|
||||
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
|
||||
DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
|
||||
out->preFinalize();
|
||||
written_files.emplace_back(std::move(out));
|
||||
out->finalize();
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -271,11 +205,10 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis
|
||||
/// Write file with checksums.
|
||||
auto out = volume->getDisk()->writeFile(fs::path(part_path) / "checksums.txt", 4096);
|
||||
checksums.write(*out);
|
||||
out->preFinalize();
|
||||
written_files.emplace_back(std::move(out));
|
||||
out->finalize();
|
||||
if (sync)
|
||||
out->sync();
|
||||
}
|
||||
|
||||
return written_files;
|
||||
}
|
||||
|
||||
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
|
||||
|
@ -32,33 +32,11 @@ public:
|
||||
*/
|
||||
void writeWithPermutation(const Block & block, const IColumn::Permutation * permutation);
|
||||
|
||||
/// Finalizer is a structure which is returned from by finalizePart().
|
||||
/// Files from part may be written asynchronously, e.g. for blob storages.
|
||||
/// You should call finish() to wait until all data is written.
|
||||
struct Finalizer
|
||||
{
|
||||
struct Impl;
|
||||
std::unique_ptr<Impl> impl;
|
||||
|
||||
explicit Finalizer(std::unique_ptr<Impl> impl_);
|
||||
~Finalizer();
|
||||
Finalizer(Finalizer &&);
|
||||
Finalizer & operator=(Finalizer &&);
|
||||
|
||||
void finish();
|
||||
};
|
||||
|
||||
/// Finalize writing part and fill inner structures
|
||||
/// If part is new and contains projections, they should be added before invoking this method.
|
||||
Finalizer finalizePartAsync(
|
||||
void writeSuffixAndFinalizePart(
|
||||
MergeTreeData::MutableDataPartPtr & new_part,
|
||||
bool sync,
|
||||
const NamesAndTypesList * total_columns_list = nullptr,
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
|
||||
|
||||
void finalizePart(
|
||||
MergeTreeData::MutableDataPartPtr & new_part,
|
||||
bool sync,
|
||||
bool sync = false,
|
||||
const NamesAndTypesList * total_columns_list = nullptr,
|
||||
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
|
||||
|
||||
@ -68,10 +46,10 @@ private:
|
||||
*/
|
||||
void writeImpl(const Block & block, const IColumn::Permutation * permutation);
|
||||
|
||||
using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>;
|
||||
WrittenFiles finalizePartOnDisk(
|
||||
void finalizePartOnDisk(
|
||||
const MergeTreeData::DataPartPtr & new_part,
|
||||
MergeTreeData::DataPart::Checksums & checksums);
|
||||
MergeTreeData::DataPart::Checksums & checksums,
|
||||
bool sync);
|
||||
|
||||
NamesAndTypesList columns_list;
|
||||
IMergeTreeDataPart::MinMaxIndex minmax_idx;
|
||||
|
@ -55,13 +55,14 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
|
||||
}
|
||||
|
||||
MergeTreeData::DataPart::Checksums
|
||||
MergedColumnOnlyOutputStream::fillChecksums(
|
||||
MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums(
|
||||
MergeTreeData::MutableDataPartPtr & new_part,
|
||||
MergeTreeData::DataPart::Checksums & all_checksums)
|
||||
MergeTreeData::DataPart::Checksums & all_checksums,
|
||||
bool sync)
|
||||
{
|
||||
/// Finish columns serialization.
|
||||
MergeTreeData::DataPart::Checksums checksums;
|
||||
writer->fillChecksums(checksums);
|
||||
writer->finish(checksums, sync);
|
||||
|
||||
for (const auto & [projection_name, projection_part] : new_part->getProjectionParts())
|
||||
checksums.addFile(
|
||||
@ -84,9 +85,4 @@ MergedColumnOnlyOutputStream::fillChecksums(
|
||||
return checksums;
|
||||
}
|
||||
|
||||
void MergedColumnOnlyOutputStream::finish(bool sync)
|
||||
{
|
||||
writer->finish(sync);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -25,11 +25,8 @@ public:
|
||||
|
||||
Block getHeader() const { return header; }
|
||||
void write(const Block & block) override;
|
||||
|
||||
MergeTreeData::DataPart::Checksums
|
||||
fillChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums);
|
||||
|
||||
void finish(bool sync);
|
||||
writeSuffixAndGetChecksums(MergeTreeData::MutableDataPartPtr & new_part, MergeTreeData::DataPart::Checksums & all_checksums, bool sync = false);
|
||||
|
||||
private:
|
||||
Block header;
|
||||
|
@ -804,12 +804,8 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
|
||||
const auto & projection = *ctx->projections_to_build[i];
|
||||
auto projection_block = projection_squashes[i].add(projection.calculate(cur_block, ctx->context));
|
||||
if (projection_block)
|
||||
{
|
||||
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
|
||||
tmp_part.finalize();
|
||||
projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
|
||||
}
|
||||
projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num));
|
||||
}
|
||||
|
||||
(*ctx->mutate_entry)->rows_written += cur_block.rows();
|
||||
@ -827,10 +823,8 @@ bool PartMergerWriter::mutateOriginalPartAndPrepareProjections()
|
||||
auto projection_block = projection_squash.add({});
|
||||
if (projection_block)
|
||||
{
|
||||
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num);
|
||||
temp_part.finalize();
|
||||
projection_parts[projection.name].emplace_back(std::move(temp_part.part));
|
||||
projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*ctx->data, ctx->log, projection_block, projection, ctx->new_data_part.get(), ++block_num));
|
||||
}
|
||||
}
|
||||
|
||||
@ -982,7 +976,7 @@ private:
|
||||
ctx->mutating_executor.reset();
|
||||
ctx->mutating_pipeline.reset();
|
||||
|
||||
static_pointer_cast<MergedBlockOutputStream>(ctx->out)->finalizePart(ctx->new_data_part, ctx->need_sync);
|
||||
static_pointer_cast<MergedBlockOutputStream>(ctx->out)->writeSuffixAndFinalizePart(ctx->new_data_part, ctx->need_sync);
|
||||
ctx->out.reset();
|
||||
}
|
||||
|
||||
@ -1138,11 +1132,9 @@ private:
|
||||
ctx->mutating_pipeline.reset();
|
||||
|
||||
auto changed_checksums =
|
||||
static_pointer_cast<MergedColumnOnlyOutputStream>(ctx->out)->fillChecksums(
|
||||
ctx->new_data_part, ctx->new_data_part->checksums);
|
||||
static_pointer_cast<MergedColumnOnlyOutputStream>(ctx->out)->writeSuffixAndGetChecksums(
|
||||
ctx->new_data_part, ctx->new_data_part->checksums, ctx->need_sync);
|
||||
ctx->new_data_part->checksums.add(std::move(changed_checksums));
|
||||
|
||||
static_pointer_cast<MergedColumnOnlyOutputStream>(ctx->out)->finish(ctx->need_sync);
|
||||
}
|
||||
|
||||
for (const auto & [rename_from, rename_to] : ctx->files_to_rename)
|
||||
|
@ -32,17 +32,6 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
struct ReplicatedMergeTreeSink::DelayedChunk
|
||||
{
|
||||
struct Partition
|
||||
{
|
||||
MergeTreeDataWriter::TemporaryPart temp_part;
|
||||
UInt64 elapsed_ns;
|
||||
String block_id;
|
||||
};
|
||||
|
||||
std::vector<Partition> partitions;
|
||||
};
|
||||
|
||||
ReplicatedMergeTreeSink::ReplicatedMergeTreeSink(
|
||||
StorageReplicatedMergeTree & storage_,
|
||||
@ -71,8 +60,6 @@ ReplicatedMergeTreeSink::ReplicatedMergeTreeSink(
|
||||
quorum = 0;
|
||||
}
|
||||
|
||||
ReplicatedMergeTreeSink::~ReplicatedMergeTreeSink() = default;
|
||||
|
||||
|
||||
/// Allow to verify that the session in ZooKeeper is still alive.
|
||||
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
|
||||
@ -139,6 +126,8 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
|
||||
{
|
||||
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
|
||||
|
||||
last_block_is_duplicate = false;
|
||||
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
assertSessionIsNotExpired(zookeeper);
|
||||
|
||||
@ -151,7 +140,6 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
|
||||
checkQuorumPrecondition(zookeeper);
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
|
||||
std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition> partitions;
|
||||
String block_dedup_token;
|
||||
|
||||
for (auto & current_block : part_blocks)
|
||||
@ -160,11 +148,11 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
|
||||
|
||||
/// Write part to the filesystem under temporary name. Calculate a checksum.
|
||||
|
||||
auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
|
||||
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
|
||||
|
||||
/// If optimize_on_insert setting is true, current_block could become empty after merge
|
||||
/// and we didn't create part.
|
||||
if (!temp_part.part)
|
||||
if (!part)
|
||||
continue;
|
||||
|
||||
String block_id;
|
||||
@ -182,7 +170,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
|
||||
block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
|
||||
++chunk_dedup_seqnum;
|
||||
}
|
||||
block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
|
||||
block_id = part->getZeroLevelPartBlockID(block_dedup_token);
|
||||
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
|
||||
}
|
||||
else
|
||||
@ -190,63 +178,27 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
|
||||
LOG_DEBUG(log, "Wrote block with {} rows", current_block.block.rows());
|
||||
}
|
||||
|
||||
UInt64 elapsed_ns = watch.elapsed();
|
||||
|
||||
partitions.emplace_back(ReplicatedMergeTreeSink::DelayedChunk::Partition{
|
||||
.temp_part = std::move(temp_part),
|
||||
.elapsed_ns = elapsed_ns,
|
||||
.block_id = std::move(block_id)
|
||||
});
|
||||
}
|
||||
|
||||
finishDelayedChunk(zookeeper);
|
||||
delayed_chunk = std::make_unique<ReplicatedMergeTreeSink::DelayedChunk>();
|
||||
delayed_chunk->partitions = std::move(partitions);
|
||||
|
||||
/// If deduplicated data should not be inserted into MV, we need to set proper
|
||||
/// value for `last_block_is_duplicate`, which is possible only after the part is committed.
|
||||
/// Othervide we can delay commit.
|
||||
/// TODO: we can also delay commit if there is no MVs.
|
||||
if (!context->getSettingsRef().deduplicate_blocks_in_dependent_materialized_views)
|
||||
finishDelayedChunk(zookeeper);
|
||||
}
|
||||
|
||||
void ReplicatedMergeTreeSink::finishDelayedChunk(zkutil::ZooKeeperPtr & zookeeper)
|
||||
{
|
||||
if (!delayed_chunk)
|
||||
return;
|
||||
|
||||
last_block_is_duplicate = false;
|
||||
|
||||
for (auto & partition : delayed_chunk->partitions)
|
||||
{
|
||||
partition.temp_part.finalize();
|
||||
|
||||
auto & part = partition.temp_part.part;
|
||||
|
||||
try
|
||||
{
|
||||
commitPart(zookeeper, part, partition.block_id);
|
||||
|
||||
last_block_is_duplicate = last_block_is_duplicate || part->is_duplicate;
|
||||
commitPart(zookeeper, part, block_id);
|
||||
|
||||
/// Set a special error code if the block is duplicate
|
||||
int error = (deduplicate && part->is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
|
||||
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns, ExecutionStatus(error));
|
||||
int error = (deduplicate && last_block_is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
|
||||
PartLog::addNewPart(storage.getContext(), part, watch.elapsed(), ExecutionStatus(error));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns, ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
|
||||
PartLog::addNewPart(storage.getContext(), part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
delayed_chunk.reset();
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
|
||||
{
|
||||
last_block_is_duplicate = false;
|
||||
|
||||
/// NOTE: No delay in this case. That's Ok.
|
||||
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
@ -404,6 +356,7 @@ void ReplicatedMergeTreeSink::commitPart(
|
||||
if (storage.getActiveContainingPart(existing_part_name))
|
||||
{
|
||||
part->is_duplicate = true;
|
||||
last_block_is_duplicate = true;
|
||||
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
|
||||
if (quorum)
|
||||
{
|
||||
@ -578,12 +531,6 @@ void ReplicatedMergeTreeSink::onStart()
|
||||
storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);
|
||||
}
|
||||
|
||||
void ReplicatedMergeTreeSink::onFinish()
|
||||
{
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
assertSessionIsNotExpired(zookeeper);
|
||||
finishDelayedChunk(zookeeper);
|
||||
}
|
||||
|
||||
void ReplicatedMergeTreeSink::waitForQuorum(
|
||||
zkutil::ZooKeeperPtr & zookeeper,
|
||||
|
@ -35,11 +35,8 @@ public:
|
||||
// needed to set the special LogEntryType::ATTACH_PART
|
||||
bool is_attach_ = false);
|
||||
|
||||
~ReplicatedMergeTreeSink() override;
|
||||
|
||||
void onStart() override;
|
||||
void consume(Chunk chunk) override;
|
||||
void onFinish() override;
|
||||
|
||||
String getName() const override { return "ReplicatedMergeTreeSink"; }
|
||||
|
||||
@ -93,12 +90,6 @@ private:
|
||||
|
||||
ContextPtr context;
|
||||
UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
|
||||
|
||||
/// We can delay processing for previous chunk and start writing a new one.
|
||||
struct DelayedChunk;
|
||||
std::unique_ptr<DelayedChunk> delayed_chunk;
|
||||
|
||||
void finishDelayedChunk(zkutil::ZooKeeperPtr & zookeeper);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -7559,7 +7559,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
|
||||
/// TODO(ab): What projections should we add to the empty part? How can we make sure that it
|
||||
/// won't block future merges? Perhaps we should also check part emptiness when selecting parts
|
||||
/// to merge.
|
||||
out.finalizePart(new_data_part, sync_on_insert);
|
||||
out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert);
|
||||
|
||||
try
|
||||
{
|
||||
|
@ -1,5 +1,4 @@
|
||||
from bottle import request, route, run, response
|
||||
from threading import Lock
|
||||
|
||||
|
||||
# Endpoint can be configured to throw 500 error on N-th request attempt.
|
||||
@ -7,7 +6,6 @@ from threading import Lock
|
||||
|
||||
# Dict to the number of request should be failed.
|
||||
cache = {}
|
||||
mutex = Lock()
|
||||
|
||||
|
||||
@route('/fail_request/<_request_number>')
|
||||
@ -40,34 +38,23 @@ def delete(_bucket):
|
||||
|
||||
@route('/<_bucket>/<_path:path>', ['GET', 'POST', 'PUT', 'DELETE'])
|
||||
def server(_bucket, _path):
|
||||
if cache.get('request_number', None):
|
||||
request_number = cache.pop('request_number') - 1
|
||||
if request_number > 0:
|
||||
cache['request_number'] = request_number
|
||||
else:
|
||||
response.status = 500
|
||||
response.content_type = 'text/xml'
|
||||
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpectedError</Code><Message>Expected Error</Message><RequestId>txfbd566d03042474888193-00608d7537</RequestId></Error>'
|
||||
|
||||
# It's delete query for failed part
|
||||
if _path.endswith('delete'):
|
||||
response.set_header("Location", "http://minio1:9001/" + _bucket + '/' + _path)
|
||||
response.status = 307
|
||||
return 'Redirected'
|
||||
|
||||
mutex.acquire()
|
||||
try:
|
||||
if cache.get('request_number', None):
|
||||
request_number = cache.pop('request_number') - 1
|
||||
if request_number > 0:
|
||||
cache['request_number'] = request_number
|
||||
else:
|
||||
response.status = 500
|
||||
response.content_type = 'text/xml'
|
||||
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpectedError</Code><Message>Expected Error</Message><RequestId>txfbd566d03042474888193-00608d7537</RequestId></Error>'
|
||||
|
||||
if cache.get('throttle_request_number', None):
|
||||
request_number = cache.pop('throttle_request_number') - 1
|
||||
if request_number > 0:
|
||||
cache['throttle_request_number'] = request_number
|
||||
else:
|
||||
response.status = 429
|
||||
response.content_type = 'text/xml'
|
||||
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>TooManyRequestsException</Code><Message>Please reduce your request rate.</Message><RequestId>txfbd566d03042474888193-00608d7538</RequestId></Error>'
|
||||
finally:
|
||||
mutex.release()
|
||||
if cache.get('throttle_request_number', None):
|
||||
request_number = cache.pop('throttle_request_number') - 1
|
||||
if request_number > 0:
|
||||
cache['throttle_request_number'] = request_number
|
||||
else:
|
||||
response.status = 429
|
||||
response.content_type = 'text/xml'
|
||||
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>TooManyRequestsException</Code><Message>Please reduce your request rate.</Message><RequestId>txfbd566d03042474888193-00608d7538</RequestId></Error>'
|
||||
|
||||
response.set_header("Location", "http://minio1:9001/" + _bucket + '/' + _path)
|
||||
response.status = 307
|
||||
|
Loading…
Reference in New Issue
Block a user