pass config, not context to createHDFSBuilder

This commit is contained in:
Ilya Golshtein 2020-11-28 01:17:05 +03:00
parent fc0a1af214
commit 815856bc3d
7 changed files with 21 additions and 22 deletions

View File

@ -102,7 +102,7 @@ void HDFSBuilderWrapper::runKinit()
}
}
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context)
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration & config)
{
const Poco::URI uri(uri_str);
const auto & host = uri.getHost();
@ -139,8 +139,6 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
hdfsBuilderSetNameNodePort(builder.get(), port);
}
// const auto & config = context.getGlobalContext().getConfigRef();
const auto & config = context.getConfigRef();
if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX))
{
builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX);

View File

@ -99,14 +99,14 @@ public:
HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context);
friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
};
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
/// TODO Allow to tune from query Settings.
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context);
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
}
#endif

View File

@ -26,9 +26,10 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_, const Context & context_)
explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_)
: hdfs_uri(hdfs_name_),
builder(createHDFSBuilder(hdfs_uri, context_))
builder(createHDFSBuilder(hdfs_uri, config_))
{
std::lock_guard lock(hdfs_init_mutex);
@ -61,12 +62,12 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, const Context & context, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(buf_size)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, context))
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_)
: BufferWithOwnMemory<ReadBuffer>(buf_size_)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, config_))
{
// auto modified_context = std::make_shared<Context>(context);
// impl = std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, modified_context);
}

View File

@ -25,7 +25,7 @@ class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
struct ReadBufferFromHDFSImpl;
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
public:
ReadBufferFromHDFS(const std::string & hdfs_name_, const Context & context, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
ReadBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
~ReadBufferFromHDFS() override;
bool nextImpl() override;

View File

@ -121,7 +121,7 @@ public:
current_path = uri + path;
auto compression = chooseCompressionMethod(path, compression_method);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path, context.getGlobalContext()), compression);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path, context.getGlobalContext().getConfigRef()), compression);
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
@ -180,7 +180,7 @@ public:
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context.getGlobalContext()), compression_method, 3);
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context.getGlobalContext().getConfigRef()), compression_method, 3);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
@ -274,7 +274,7 @@ Pipe StorageHDFS::read(
const String path_from_uri = uri.substr(begin_of_path);
const String uri_without_path = uri.substr(0, begin_of_path);
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_.getGlobalContext());
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_.getGlobalContext().getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
auto sources_info = std::make_shared<HDFSSource::SourcesInfo>();

View File

@ -27,9 +27,9 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
explicit WriteBufferFromHDFSImpl(const std::string & hdfs_name_, const Context & context)
explicit WriteBufferFromHDFSImpl(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_)
: hdfs_uri(hdfs_name_)
, builder(createHDFSBuilder(hdfs_uri, context))
, builder(createHDFSBuilder(hdfs_uri,config_))
, fs(createHDFSFS(builder.get()))
{
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
@ -73,9 +73,9 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
}
};
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const Context & context, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, context))
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_, size_t buf_size_)
: BufferWithOwnMemory<WriteBuffer>(buf_size_)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, config_))
{
}

View File

@ -18,7 +18,7 @@ class WriteBufferFromHDFS : public BufferWithOwnMemory<WriteBuffer>
struct WriteBufferFromHDFSImpl;
std::unique_ptr<WriteBufferFromHDFSImpl> impl;
public:
WriteBufferFromHDFS(const std::string & hdfs_name_, const Context & context, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
WriteBufferFromHDFS(WriteBufferFromHDFS &&) = default;
void nextImpl() override;