Unite writeBuffers

This commit is contained in:
kssenii 2021-05-20 10:44:02 +00:00
parent 550a4e33f9
commit 2f6770c9a0
7 changed files with 109 additions and 140 deletions

View File

@ -426,6 +426,7 @@ endif()
# When HDFS support is enabled, link the HDFS3 library and expose its headers.
if (USE_HDFS)
dbms_target_link_libraries(PRIVATE ${HDFS3_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
# The clickhouse_common_io target gets the HDFS3 include directory as well.
# NOTE(review): presumably required because code under IO/ now includes HDFS write-buffer headers — confirm.
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
endif()
if (USE_AWS_S3)

View File

@ -2,9 +2,9 @@
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include "WriteIndirectBufferFromHDFS.h"
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/ReadIndirectBufferFromRemoteFS.h>
#include <IO/WriteIndirectBufferFromRemoteFS.h>
#include <Common/checkStackSize.h>
#include <Common/quoteString.h>
#include <common/logger_useful.h>
@ -59,7 +59,7 @@ DiskHDFS::DiskHDFS(
SettingsPtr settings_,
const String & metadata_path_,
const Poco::Util::AbstractConfiguration & config_)
: IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskS3")
: IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskHDFS")
, config(config_)
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
@ -92,9 +92,14 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
LOG_DEBUG(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
backQuote(metadata_path + path), remote_fs_root_path + hdfs_path);
return std::make_unique<WriteIndirectBufferFromHDFS>(
config, hdfs_path, file_name, metadata, buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND); /// Single O_WRONLY in libhdfs adds O_TRUNC
/// Single O_WRONLY in libhdfs adds O_TRUNC
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(hdfs_path,
config, buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>>(std::move(hdfs_buffer),
std::move(metadata),
file_name);
}
@ -153,5 +158,4 @@ void registerDiskHDFS(DiskFactory & factory)
factory.registerDiskType("hdfs", creator);
}
}

View File

@ -1,82 +0,0 @@
#pragma once
#include <IO/WriteBufferFromFile.h>
#include <Disks/IDiskRemote.h>
namespace DB
{
/// Write buffer that pushes data to HDFS through an owned WriteBufferFromHDFS and, once finalized,
/// registers the object key (HDFS path) together with `count()` in the metadata file on local FS.
/// If the file does not exist, it will be created before writing.
class WriteIndirectBufferFromHDFS final : public WriteBufferFromFileBase
{
public:
    /// `hdfs_name_`, `config_`, `buf_size_` and `flags_` are forwarded to the underlying
    /// WriteBufferFromHDFS; `hdfs_path_` is the key later handed to `metadata.addObject`.
    WriteIndirectBufferFromHDFS(
        const Poco::Util::AbstractConfiguration & config_,
        const String & hdfs_name_,
        const String & hdfs_path_,
        DiskHDFS::Metadata metadata_,
        size_t buf_size_,
        int flags_)
        : WriteBufferFromFileBase(buf_size_, nullptr, 0),
          impl(hdfs_name_, config_, buf_size_, flags_),
          metadata(std::move(metadata_)),
          hdfs_path(hdfs_path_)
    {
    }

    /// Finalizes on destruction; any exception is logged instead of escaping the destructor.
    ~WriteIndirectBufferFromHDFS() override
    {
        try
        {
            finalize();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    /// Flushes pending data, finalizes the HDFS buffer, then records the object in metadata
    /// and saves it. Does nothing when already finalized.
    void finalize() override
    {
        if (!finalized)
        {
            next();
            impl.finalize();

            metadata.addObject(hdfs_path, count());
            metadata.save();

            finalized = true;
        }
    }

    /// Saves metadata with the `true` argument, but only after finalize() has completed.
    void sync() override
    {
        if (!finalized)
            return;

        metadata.save(true);
    }

    /// Reports the path of the metadata file rather than the HDFS path.
    std::string getFileName() const override
    {
        return metadata.metadata_file_path;
    }

private:
    void nextImpl() override
    {
        /// Lend the current working buffer to the HDFS buffer ...
        impl.swap(*this);

        /// ... let it write the actual data to HDFS ...
        impl.next();

        /// ... and take the working buffer back.
        impl.swap(*this);
    }

    /// Declaration order is kept: it determines member initialization order.
    WriteBufferFromHDFS impl;
    bool finalized = false;
    DiskHDFS::Metadata metadata;
    String hdfs_path;
};
}

View File

@ -9,10 +9,10 @@
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadIndirectBufferFromRemoteFS.h>
#include <IO/WriteIndirectBufferFromRemoteFS.h>
#include <IO/ReadHelpers.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDecorator.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
@ -98,55 +98,6 @@ private:
size_t buf_size;
};
/// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS.
class WriteIndirectBufferFromS3 final : public WriteBufferFromFileDecorator
{
public:
WriteIndirectBufferFromS3(
std::unique_ptr<WriteBufferFromS3> impl_,
DiskS3::Metadata metadata_,
String & s3_path_)
: WriteBufferFromFileDecorator(std::move(impl_))
, metadata(std::move(metadata_))
, s3_path(s3_path_) { }
virtual ~WriteIndirectBufferFromS3() override
{
try
{
WriteIndirectBufferFromS3::finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void finalize() override
{
if (finalized)
return;
WriteBufferFromFileDecorator::finalize();
metadata.addObject(s3_path, count());
metadata.save();
}
void sync() override
{
if (finalized)
metadata.save(true);
}
std::string getFileName() const override { return metadata.metadata_file_path; }
private:
DiskS3::Metadata metadata;
String s3_path;
};
/// Runs tasks asynchronously using thread pool.
class AsyncExecutor : public Executor
{
@ -277,7 +228,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
std::move(object_metadata),
buf_size);
return std::make_unique<WriteIndirectBufferFromS3>(std::move(s3_buffer), std::move(metadata), s3_path);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>>(std::move(s3_buffer), std::move(metadata), s3_path);
}
void DiskS3::removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper)

View File

@ -4,7 +4,7 @@
#include "gtest_disk.h"
/// To enable tests, set to 1. It is set to 0 because there is no HDFS instance in CI.
#define RUN_HDFS_TEST 1
#define RUN_HDFS_TEST 0
#if RUN_HDFS_TEST

View File

@ -0,0 +1,63 @@
#include "WriteIndirectBufferFromRemoteFS.h"
#include <IO/WriteBufferFromS3.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
namespace DB
{
/// Wraps a remote FS write buffer (S3/HDFS): takes ownership of `impl_`, moves the metadata in
/// and keeps a copy of the remote path that is registered in the metadata on finalize.
template <typename T>
WriteIndirectBufferFromRemoteFS<T>::WriteIndirectBufferFromRemoteFS(
    std::unique_ptr<T> impl_, IDiskRemote::Metadata metadata_, const String & remote_fs_path_)
    : WriteBufferFromFileDecorator(std::move(impl_)),
      metadata(std::move(metadata_)),
      remote_fs_path(remote_fs_path_)
{
}
/// Finalizes the buffer on destruction; a failure is logged rather than propagated.
template <typename T>
WriteIndirectBufferFromRemoteFS<T>::~WriteIndirectBufferFromRemoteFS()
{
    try
    {
        /// Explicitly qualified, so the call is not dispatched virtually.
        WriteIndirectBufferFromRemoteFS<T>::finalize();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
/// Finalizes the wrapped buffer through the decorator and then stores the remote path together
/// with `count()` in the metadata, which is saved afterwards. A second call is a no-op.
template <typename T>
void WriteIndirectBufferFromRemoteFS<T>::finalize()
{
    if (!finalized)
    {
        WriteBufferFromFileDecorator::finalize();

        metadata.addObject(remote_fs_path, count());
        metadata.save();
    }
}
/// Saves the metadata with the `true` argument; has no effect before the buffer is finalized.
template <typename T>
void WriteIndirectBufferFromRemoteFS<T>::sync()
{
    if (!finalized)
        return;

    metadata.save(true);
}
/// Explicit instantiations: the member definitions live in this translation unit, so every
/// buffer type used with the template has to be listed here.
/// NOTE(review): both instantiations are unconditional — confirm this builds when S3 or HDFS
/// support is disabled.
template class WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>;
template class WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>;
}

View File

@ -0,0 +1,32 @@
#pragma once

#include <Disks/IDiskRemote.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDecorator.h>

namespace DB
{

/// Stores data in a remote FS (S3/HDFS) through the wrapped buffer of type T and adds the object
/// key (remote FS path) and object size to metadata file on local FS.
/// Member functions are defined out of line, so T must be one of the explicitly instantiated types.
template <typename T>
class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorator
{
public:
    /// @param impl_            Remote write buffer to wrap; ownership is taken.
    /// @param metadata_        Metadata object that receives the written object on finalize.
    /// @param remote_fs_path_  Object key registered in the metadata.
    WriteIndirectBufferFromRemoteFS(
        std::unique_ptr<T> impl_,
        IDiskRemote::Metadata metadata_,
        const String & remote_fs_path_);

    ~WriteIndirectBufferFromRemoteFS() override;

    void finalize() override;

    void sync() override;

    /// Returns the path of the local metadata file, not the remote object key.
    String getFileName() const override { return metadata.metadata_file_path; }

private:
    IDiskRemote::Metadata metadata;

    String remote_fs_path;
};

}