mirror of
synced 2024-12-03 13:02:00 +00:00
This reverts commita09e6bbb8e
, reversing changes made toce38d64c5a
235 lines
6.7 KiB
235 lines
6.7 KiB
#include "config.h"
#include <Databases/DatabaseHDFS.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Storages/IStorage.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Poco/URI.h>
#include <re2/re2.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TABLE;
extern const int BAD_ARGUMENTS;
extern const int FILE_DOESNT_EXIST;
extern const int UNACCEPTABLE_URL;
extern const int ACCESS_DENIED;
extern const int DATABASE_ACCESS_DENIED;
extern const int HDFS_ERROR;
static constexpr std::string_view HDFS_HOST_REGEXP = "^hdfs://[^/]*";
DatabaseHDFS::DatabaseHDFS(const String & name_, const String & source_url, ContextPtr context_)
: IDatabase(name_)
, WithContext(context_->getGlobalContext())
, source(source_url)
, log(&Poco::Logger::get("DatabaseHDFS(" + name_ + ")"))
if (!source.empty())
if (!re2::RE2::FullMatch(source, std::string(HDFS_HOST_REGEXP)))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad hdfs host: {}. "
"It should have structure 'hdfs://<host_name>:<port>'", source);
void DatabaseHDFS::addTable(const std::string & table_name, StoragePtr table_storage) const
std::lock_guard lock(mutex);
auto [_, inserted] = loaded_tables.emplace(table_name, table_storage);
if (!inserted)
throw Exception(
"Table with name `{}` already exists in database `{}` (engine {})",
table_name, getDatabaseName(), getEngineName());
std::string DatabaseHDFS::getTablePath(const std::string & table_name) const
if (table_name.starts_with("hdfs://"))
return table_name;
if (source.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad hdfs url: {}. "
"It should have structure 'hdfs://<host_name>:<port>/path'", table_name);
return fs::path(source) / table_name;
bool DatabaseHDFS::checkUrl(const std::string & url, ContextPtr context_, bool throw_on_error) const
catch (...)
if (throw_on_error)
return false;
return true;
bool DatabaseHDFS::isTableExist(const String & name, ContextPtr context_) const
std::lock_guard lock(mutex);
if (loaded_tables.find(name) != loaded_tables.end())
return true;
return checkUrl(name, context_, false);
StoragePtr DatabaseHDFS::getTableImpl(const String & name, ContextPtr context_) const
/// Check if the table exists in the loaded tables map.
std::lock_guard lock(mutex);
auto it = loaded_tables.find(name);
if (it != loaded_tables.end())
return it->second;
auto url = getTablePath(name);
checkUrl(url, context_, true);
auto args = makeASTFunction("hdfs", std::make_shared<ASTLiteral>(url));
auto table_function = TableFunctionFactory::instance().get(args, context_);
if (!table_function)
return nullptr;
/// TableFunctionHDFS throws exceptions, if table cannot be created.
auto table_storage = table_function->execute(args, context_, name);
if (table_storage)
addTable(name, table_storage);
return table_storage;
StoragePtr DatabaseHDFS::getTable(const String & name, ContextPtr context_) const
/// Rethrow all exceptions from TableFunctionHDFS to show correct error to user.
if (auto storage = getTableImpl(name, context_))
return storage;
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist",
backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
StoragePtr DatabaseHDFS::tryGetTable(const String & name, ContextPtr context_) const
return getTableImpl(name, context_);
catch (const Exception & e)
// Ignore exceptions thrown by TableFunctionHDFS, which indicate that there is no table
if (e.code() == ErrorCodes::BAD_ARGUMENTS
|| e.code() == ErrorCodes::ACCESS_DENIED
|| e.code() == ErrorCodes::DATABASE_ACCESS_DENIED
|| e.code() == ErrorCodes::FILE_DOESNT_EXIST
|| e.code() == ErrorCodes::UNACCEPTABLE_URL
|| e.code() == ErrorCodes::HDFS_ERROR
|| e.code() == ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE)
return nullptr;
catch (const Poco::URISyntaxException &)
return nullptr;
bool DatabaseHDFS::empty() const
std::lock_guard lock(mutex);
return loaded_tables.empty();
ASTPtr DatabaseHDFS::getCreateDatabaseQuery() const
const auto & settings = getContext()->getSettingsRef();
ParserCreateQuery parser;
const String query = fmt::format("CREATE DATABASE {} ENGINE = HDFS('{}')", backQuoteIfNeed(getDatabaseName()), source);
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0, settings.max_parser_depth);
if (const auto database_comment = getDatabaseComment(); !database_comment.empty())
auto & ast_create_query = ast->as<ASTCreateQuery &>();
ast_create_query.set(ast_create_query.comment, std::make_shared<ASTLiteral>(database_comment));
return ast;
void DatabaseHDFS::shutdown()
Tables tables_snapshot;
std::lock_guard lock(mutex);
tables_snapshot = loaded_tables;
for (const auto & kv : tables_snapshot)
auto table_id = kv.second->getStorageID();
std::lock_guard lock(mutex);
* Returns an empty vector because the database is read-only and no tables can be backed up
std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseHDFS::getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const
return {};
* Returns an empty iterator because the database does not have its own tables
* But only caches them for quick access
DatabaseTablesIteratorPtr DatabaseHDFS::getTablesIterator(ContextPtr, const FilterByNameFunction &) const
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
} // DB