Merge pull request #32049 from leosunli/32039

#32039 Make HDFS replication configurable in WriteBufferFromHDFS#WriteBufferFromHDFSImpl
This commit is contained in:
Kseniia Sumarokova 2021-12-06 20:24:29 +03:00 committed by GitHub
commit a2dfa883cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 17 additions and 9 deletions

View File

@ -75,6 +75,7 @@ class IColumn;
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \

View File

@ -97,7 +97,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
/// Single O_WRONLY in libhdfs adds O_TRUNC
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(hdfs_path,
config, buf_size,
config, settings->replication, buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>>(std::move(hdfs_buffer),
@ -142,12 +142,13 @@ bool DiskHDFS::checkUniqueId(const String & hdfs_uri) const
namespace
{
std::unique_ptr<DiskHDFSSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
std::unique_ptr<DiskHDFSSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Settings & settings)
{
return std::make_unique<DiskHDFSSettings>(
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
settings.hdfs_replication);
}
}
@ -173,7 +174,7 @@ void registerDiskHDFS(DiskFactory & factory)
return std::make_shared<DiskHDFS>(
name, uri,
getSettings(config, config_prefix),
getSettings(config, config_prefix, context_->getSettingsRef()),
metadata_disk, config);
};

View File

@ -14,14 +14,17 @@ struct DiskHDFSSettings
size_t min_bytes_for_seek;
int thread_pool_size;
int objects_chunk_size_to_delete;
int replication;
DiskHDFSSettings(
int min_bytes_for_seek_,
int thread_pool_size_,
int objects_chunk_size_to_delete_)
int objects_chunk_size_to_delete_,
int replication_)
: min_bytes_for_seek(min_bytes_for_seek_)
, thread_pool_size(thread_pool_size_)
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_) {}
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_)
, replication(replication_) {}
};

View File

@ -203,7 +203,7 @@ public:
const CompressionMethod compression_method)
: SinkToStorage(sample_block)
{
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context->getGlobalContext()->getConfigRef()), compression_method, 3);
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context->getGlobalContext()->getConfigRef(), context->getSettingsRef().hdfs_replication), compression_method, 3);
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context);
}

View File

@ -29,6 +29,7 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
explicit WriteBufferFromHDFSImpl(
const std::string & hdfs_uri_,
const Poco::Util::AbstractConfiguration & config_,
int replication_,
int flags)
: hdfs_uri(hdfs_uri_)
, builder(createHDFSBuilder(hdfs_uri, config_))
@ -43,7 +44,7 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
if (!hdfsExists(fs.get(), path.c_str()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "File {} already exists", path);
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNC here
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, replication_, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNC here
if (fout == nullptr)
{
@ -82,10 +83,11 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
WriteBufferFromHDFS::WriteBufferFromHDFS(
const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_,
int replication_,
size_t buf_size_,
int flags_)
: BufferWithOwnMemory<WriteBuffer>(buf_size_)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, config_, flags_))
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, config_, replication_, flags_))
{
}

View File

@ -23,6 +23,7 @@ public:
WriteBufferFromHDFS(
const String & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_,
int replication_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
int flags = O_WRONLY);