reduce mutex scope when setenv LIBHDFS3_CONF

This commit is contained in:
shuchaome 2022-03-28 12:23:51 +08:00
parent 97f29ac0fe
commit a565a93740
2 changed files with 14 additions and 17 deletions

View File

@ -25,6 +25,8 @@ namespace ErrorCodes
const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs"; const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
const String HDFS_URL_REGEXP = "^hdfs://[^/]*/.*"; const String HDFS_URL_REGEXP = "^hdfs://[^/]*/.*";
std::once_flag init_libhdfs3_conf_flag;
void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config, void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config,
const String & prefix, bool isUser) const String & prefix, bool isUser)
{ {
@ -123,19 +125,22 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::A
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS); throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
// Shall set env LIBHDFS3_CONF *before* HDFSBuilderWrapper construction. // Shall set env LIBHDFS3_CONF *before* HDFSBuilderWrapper construction.
String libhdfs3_conf = config.getString(HDFSBuilderWrapper::CONFIG_PREFIX + ".libhdfs3_conf", ""); std::call_once(init_libhdfs3_conf_flag, [&config]()
if (!libhdfs3_conf.empty())
{ {
if (std::filesystem::path{libhdfs3_conf}.is_relative() && !std::filesystem::exists(libhdfs3_conf)) String libhdfs3_conf = config.getString(HDFSBuilderWrapper::CONFIG_PREFIX + ".libhdfs3_conf", "");
if (!libhdfs3_conf.empty())
{ {
const String config_path = config.getString("config-file", "config.xml"); if (std::filesystem::path{libhdfs3_conf}.is_relative() && !std::filesystem::exists(libhdfs3_conf))
const auto config_dir = std::filesystem::path{config_path}.remove_filename(); {
if (std::filesystem::exists(config_dir / libhdfs3_conf)) const String config_path = config.getString("config-file", "config.xml");
libhdfs3_conf = std::filesystem::absolute(config_dir / libhdfs3_conf); const auto config_dir = std::filesystem::path{config_path}.remove_filename();
if (std::filesystem::exists(config_dir / libhdfs3_conf))
libhdfs3_conf = std::filesystem::absolute(config_dir / libhdfs3_conf);
}
setenv("LIBHDFS3_CONF", libhdfs3_conf.c_str(), 1);
} }
});
setenv("LIBHDFS3_CONF", libhdfs3_conf.c_str(), 1);
}
HDFSBuilderWrapper builder; HDFSBuilderWrapper builder;
if (builder.get() == nullptr) if (builder.get() == nullptr)
throw Exception("Unable to create builder to connect to HDFS: " + throw Exception("Unable to create builder to connect to HDFS: " +

View File

@ -22,8 +22,6 @@ ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer> struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<SeekableReadBuffer>
{ {
/// HDFS create/open functions are not thread safe
static std::mutex hdfs_init_mutex;
String hdfs_uri; String hdfs_uri;
String hdfs_file_path; String hdfs_file_path;
@ -46,8 +44,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
, builder(createHDFSBuilder(hdfs_uri_, config_)) , builder(createHDFSBuilder(hdfs_uri_, config_))
, read_until_position(read_until_position_) , read_until_position(read_until_position_)
{ {
std::lock_guard lock(hdfs_init_mutex);
fs = createHDFSFS(builder.get()); fs = createHDFSFS(builder.get());
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0); fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
@ -59,7 +55,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
~ReadBufferFromHDFSImpl() override ~ReadBufferFromHDFSImpl() override
{ {
std::lock_guard lock(hdfs_init_mutex);
hdfsCloseFile(fs.get(), fin); hdfsCloseFile(fs.get(), fin);
} }
@ -124,9 +119,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
} }
}; };
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
ReadBufferFromHDFS::ReadBufferFromHDFS( ReadBufferFromHDFS::ReadBufferFromHDFS(
const String & hdfs_uri_, const String & hdfs_uri_,
const String & hdfs_file_path_, const String & hdfs_file_path_,