mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #35646 from bigo-sg/reduce-mutex-scope
narrow mutex scope when setenv LIBHDFS3_CONF
This commit is contained in:
commit
30c1afe29d
@ -25,6 +25,8 @@ namespace ErrorCodes
|
||||
const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
|
||||
const String HDFS_URL_REGEXP = "^hdfs://[^/]*/.*";
|
||||
|
||||
std::once_flag init_libhdfs3_conf_flag;
|
||||
|
||||
void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config,
|
||||
const String & prefix, bool isUser)
|
||||
{
|
||||
@ -123,19 +125,22 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::A
|
||||
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
// Shall set env LIBHDFS3_CONF *before* HDFSBuilderWrapper construction.
|
||||
String libhdfs3_conf = config.getString(HDFSBuilderWrapper::CONFIG_PREFIX + ".libhdfs3_conf", "");
|
||||
if (!libhdfs3_conf.empty())
|
||||
std::call_once(init_libhdfs3_conf_flag, [&config]()
|
||||
{
|
||||
if (std::filesystem::path{libhdfs3_conf}.is_relative() && !std::filesystem::exists(libhdfs3_conf))
|
||||
String libhdfs3_conf = config.getString(HDFSBuilderWrapper::CONFIG_PREFIX + ".libhdfs3_conf", "");
|
||||
if (!libhdfs3_conf.empty())
|
||||
{
|
||||
const String config_path = config.getString("config-file", "config.xml");
|
||||
const auto config_dir = std::filesystem::path{config_path}.remove_filename();
|
||||
if (std::filesystem::exists(config_dir / libhdfs3_conf))
|
||||
libhdfs3_conf = std::filesystem::absolute(config_dir / libhdfs3_conf);
|
||||
if (std::filesystem::path{libhdfs3_conf}.is_relative() && !std::filesystem::exists(libhdfs3_conf))
|
||||
{
|
||||
const String config_path = config.getString("config-file", "config.xml");
|
||||
const auto config_dir = std::filesystem::path{config_path}.remove_filename();
|
||||
if (std::filesystem::exists(config_dir / libhdfs3_conf))
|
||||
libhdfs3_conf = std::filesystem::absolute(config_dir / libhdfs3_conf);
|
||||
}
|
||||
setenv("LIBHDFS3_CONF", libhdfs3_conf.c_str(), 1);
|
||||
}
|
||||
});
|
||||
|
||||
setenv("LIBHDFS3_CONF", libhdfs3_conf.c_str(), 1);
|
||||
}
|
||||
HDFSBuilderWrapper builder;
|
||||
if (builder.get() == nullptr)
|
||||
throw Exception("Unable to create builder to connect to HDFS: " +
|
||||
|
@ -22,8 +22,6 @@ ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
|
||||
|
||||
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>
|
||||
{
|
||||
/// HDFS create/open functions are not thread safe
|
||||
static std::mutex hdfs_init_mutex;
|
||||
|
||||
String hdfs_uri;
|
||||
String hdfs_file_path;
|
||||
@ -46,8 +44,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
|
||||
, builder(createHDFSBuilder(hdfs_uri_, config_))
|
||||
, read_until_position(read_until_position_)
|
||||
{
|
||||
std::lock_guard lock(hdfs_init_mutex);
|
||||
|
||||
fs = createHDFSFS(builder.get());
|
||||
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
|
||||
|
||||
@ -59,7 +55,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
|
||||
|
||||
~ReadBufferFromHDFSImpl() override
|
||||
{
|
||||
std::lock_guard lock(hdfs_init_mutex);
|
||||
hdfsCloseFile(fs.get(), fin);
|
||||
}
|
||||
|
||||
@ -124,9 +119,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
|
||||
|
||||
ReadBufferFromHDFS::ReadBufferFromHDFS(
|
||||
const String & hdfs_uri_,
|
||||
const String & hdfs_file_path_,
|
||||
|
Loading…
Reference in New Issue
Block a user