Add missing methods

This commit is contained in:
kssenii 2021-04-12 10:13:53 +00:00
parent ab06ca3144
commit ed8fec4cb1
4 changed files with 38 additions and 25 deletions

View File

@ -34,13 +34,13 @@ DiskHDFS::DiskHDFS(
const String & name_,
const String & hdfs_name_,
const String & metadata_path_,
const Context & context_)
: log(&Poco::Logger::get("DiskHDFS"))
ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get("DiskHDFS"))
, name(name_)
, hdfs_name(hdfs_name_)
, metadata_path(std::move(metadata_path_))
, context(context_)
, config(context.getGlobalContext().getConfigRef())
, config(context_->getGlobalContext()->getConfigRef())
, builder(createHDFSBuilder(hdfs_name, config))
, fs(createHDFSFS(builder.get()))
{
@ -131,7 +131,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
"Read from file by path: {}. Existing HDFS objects: {}",
backQuote(metadata_path + path), metadata.hdfs_objects.size());
return std::make_unique<ReadIndirectBufferFromHDFS>(context, hdfs_name, "", metadata, buf_size);
return std::make_unique<ReadIndirectBufferFromHDFS>(getContext(), hdfs_name, "", metadata, buf_size);
}
@ -153,7 +153,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
LOG_DEBUG(log,
"Write to file by path: {}. New hdfs path: {}", backQuote(metadata_path + path), hdfs_path);
return std::make_unique<WriteIndirectBufferFromHDFS>(context, hdfs_path, file_name, metadata, buf_size);
return std::make_unique<WriteIndirectBufferFromHDFS>(getContext(), hdfs_path, file_name, metadata, buf_size);
}
else
{
@ -163,12 +163,12 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
"Append to file by path: {}. New hdfs path: {}. Existing HDFS objects: {}",
backQuote(metadata_path + path), hdfs_path, metadata.hdfs_objects.size());
return std::make_unique<WriteIndirectBufferFromHDFS>(context, hdfs_path, file_name, metadata, buf_size);
return std::make_unique<WriteIndirectBufferFromHDFS>(getContext(), hdfs_path, file_name, metadata, buf_size);
}
}
void DiskHDFS::remove(const String & path)
void DiskHDFS::removeFile(const String & path)
{
LOG_DEBUG(&Poco::Logger::get("DiskHDFS"), "Remove file by path: {}", backQuote(metadata_path + path));
@ -202,6 +202,14 @@ void DiskHDFS::remove(const String & path)
}
void DiskHDFS::removeFileIfExists(const String & path)
{
int exists_status = hdfsExists(fs.get(), path.data());
if (exists_status == 0)
removeFile(path);
}
void DiskHDFS::removeRecursive(const String & path)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
@ -209,7 +217,7 @@ void DiskHDFS::removeRecursive(const String & path)
Poco::File file(metadata_path + path);
if (file.isFile())
{
remove(path);
removeFile(path);
}
else
{
@ -220,6 +228,12 @@ void DiskHDFS::removeRecursive(const String & path)
}
void DiskHDFS::removeDirectory(const String & path)
{
Poco::File(metadata_path + path).remove();
}
void DiskHDFS::listFiles(const String & path, std::vector<String> & file_names)
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
@ -305,9 +319,9 @@ void registerDiskHDFS(DiskFactory & factory)
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Context & context) -> DiskPtr
ContextConstPtr context) -> DiskPtr
{
Poco::File disk{context.getPath() + "disks/" + name};
Poco::File disk{context->getPath() + "disks/" + name};
disk.createDirectories();
DB::String uri{config.getString(config_prefix + ".endpoint")};
@ -315,13 +329,15 @@ void registerDiskHDFS(DiskFactory & factory)
if (uri.back() != '/')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
String metadata_path = context.getPath() + "disks/" + name + "/";
return std::make_shared<DiskHDFS>(name, uri, metadata_path, context);
String metadata_path = context->getPath() + "disks/" + name + "/";
auto copy_context = Context::createCopy(context);
return std::make_shared<DiskHDFS>(name, uri, metadata_path, copy_context);
};
factory.registerDiskType("hdfs", creator);
}
//void DiskHDFS::copyFile(const String & from_path, const String & to_path)
//{
// if (exists(to_path))

View File

@ -16,7 +16,7 @@ namespace DB
* Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file)
* that contains HDFS object key with actual data.
*/
class DiskHDFS : public IDisk
class DiskHDFS : public IDisk, WithContext
{
friend class DiskHDFSReservation;
@ -27,7 +27,7 @@ public:
const String & name_,
const String & hdfs_name_,
const String & metadata_path_,
const Context & context_);
ContextPtr context_);
DiskType::Type getType() const override { return DiskType::Type::HDFS; }
@ -67,15 +67,13 @@ public:
void replaceFile(const String & from_path, const String & to_path) override;
void copy(const String & /* from_path */, const std::shared_ptr<IDisk> & /* to_disk */, const String & /* to_path */) override {}
void listFiles(const String & path, std::vector<String> & file_names) override;
void removeFile(const String & /* path */) override {}
void removeFile(const String & path) override;
void removeFileIfExists(const String & /* path */) override {}
void removeFileIfExists(const String & path) override;
void removeDirectory(const String & /* path */) override {}
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;
@ -113,7 +111,6 @@ private:
const String name;
const String hdfs_name;
String metadata_path;
Context context;
const Poco::Util::AbstractConfiguration & config;
HDFSBuilderWrapper builder;

View File

@ -18,12 +18,12 @@ class ReadIndirectBufferFromHDFS final : public ReadBufferFromFileBase
{
public:
ReadIndirectBufferFromHDFS(
const Context & context,
ContextPtr context,
const String & hdfs_name_,
const String & /* bucket */,
Metadata metadata_,
size_t buf_size_)
: config(context.getGlobalContext().getConfigRef())
: config(context->getGlobalContext()->getConfigRef())
, hdfs_name(hdfs_name_)
, metadata(std::move(metadata_))
, buf_size(buf_size_)

View File

@ -12,13 +12,13 @@ class WriteIndirectBufferFromHDFS final : public WriteBufferFromFileBase
{
public:
WriteIndirectBufferFromHDFS(
const Context & context,
ContextPtr context,
const String & hdfs_name_,
const String & hdfs_path_,
Metadata metadata_,
size_t buf_size_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, impl(WriteBufferFromHDFS(hdfs_name_, context.getGlobalContext().getConfigRef(), buf_size_))
, impl(WriteBufferFromHDFS(hdfs_name_, context->getGlobalContext()->getConfigRef(), buf_size_))
, metadata(std::move(metadata_))
, hdfs_path(hdfs_path_)
{