Merge pull request #11058 from overshov/master

Implement IDisk interface for HDFS
This commit is contained in:
Kseniia Sumarokova 2021-05-26 14:14:06 +03:00 committed by GitHub
commit cb8af0fd4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 2162 additions and 850 deletions

View File

@ -80,7 +80,8 @@ RUN python3 -m pip install \
redis \
tzlocal \
urllib3 \
requests-kerberos
requests-kerberos \
pyhdfs
COPY modprobe.sh /usr/local/bin/modprobe
COPY dockerd-entrypoint.sh /usr/local/bin/

View File

@ -101,6 +101,7 @@ endif()
if (USE_HDFS)
add_headers_and_sources(dbms Storages/HDFS)
add_headers_and_sources(dbms Disks/HDFS)
endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})

View File

@ -11,7 +11,8 @@ struct DiskType
{
Local,
RAM,
S3
S3,
HDFS
};
static String toString(Type disk_type)
{
@ -23,10 +24,11 @@ struct DiskType
return "memory";
case Type::S3:
return "s3";
case Type::HDFS:
return "hdfs";
}
__builtin_unreachable();
}
};
}

194
src/Disks/HDFS/DiskHDFS.cpp Normal file
View File

@ -0,0 +1,194 @@
#include <Disks/HDFS/DiskHDFS.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/WriteIndirectBufferFromRemoteFS.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
class HDFSPathKeeper : public RemoteFSPathKeeper
{
public:
using Chunk = std::vector<std::string>;
using Chunks = std::list<Chunk>;
explicit HDFSPathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {}
void addPath(const String & path) override
{
if (chunks.empty() || chunks.back().size() >= chunk_limit)
{
chunks.push_back(Chunks::value_type());
chunks.back().reserve(chunk_limit);
}
chunks.back().push_back(path.data());
}
void removePaths(const std::function<void(Chunk &&)> & remove_chunk_func)
{
for (auto & chunk : chunks)
remove_chunk_func(std::move(chunk));
}
private:
Chunks chunks;
};
/// Reads data from HDFS using stored paths in metadata.
class ReadIndirectBufferFromHDFS final : public ReadIndirectBufferFromRemoteFS<ReadBufferFromHDFS>
{
public:
ReadIndirectBufferFromHDFS(
const Poco::Util::AbstractConfiguration & config_,
const String & hdfs_uri_,
DiskHDFS::Metadata metadata_,
size_t buf_size_)
: ReadIndirectBufferFromRemoteFS<ReadBufferFromHDFS>(metadata_)
, config(config_)
, buf_size(buf_size_)
{
const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2);
hdfs_directory = hdfs_uri_.substr(begin_of_path);
hdfs_uri = hdfs_uri_.substr(0, begin_of_path);
}
std::unique_ptr<ReadBufferFromHDFS> createReadBuffer(const String & path) override
{
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_directory + path, config, buf_size);
}
private:
const Poco::Util::AbstractConfiguration & config;
String hdfs_uri;
String hdfs_directory;
size_t buf_size;
};
DiskHDFS::DiskHDFS(
const String & disk_name_,
const String & hdfs_root_path_,
SettingsPtr settings_,
const String & metadata_path_,
const Poco::Util::AbstractConfiguration & config_)
: IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskHDFS", settings_->thread_pool_size)
, config(config_)
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
, settings(std::move(settings_))
{
}
std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const
{
auto metadata = readMeta(path);
LOG_DEBUG(log,
"Read from file by path: {}. Existing HDFS objects: {}",
backQuote(metadata_path + path), metadata.remote_fs_objects.size());
auto reader = std::make_unique<ReadIndirectBufferFromHDFS>(config, remote_fs_root_path, metadata, buf_size);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), settings->min_bytes_for_seek);
}
std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
/// Path to store new HDFS object.
auto file_name = getRandomName();
auto hdfs_path = remote_fs_root_path + file_name;
LOG_DEBUG(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append",
backQuote(metadata_path + path), remote_fs_root_path + hdfs_path);
/// Single O_WRONLY in libhdfs adds O_TRUNC
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(hdfs_path,
config, buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>>(std::move(hdfs_buffer),
std::move(metadata),
file_name);
}
RemoteFSPathKeeperPtr DiskHDFS::createFSPathKeeper() const
{
return std::make_shared<HDFSPathKeeper>(settings->objects_chunk_size_to_delete);
}
void DiskHDFS::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper)
{
auto * hdfs_paths_keeper = dynamic_cast<HDFSPathKeeper *>(fs_paths_keeper.get());
if (hdfs_paths_keeper)
hdfs_paths_keeper->removePaths([&](std::vector<std::string> && chunk)
{
for (const auto & hdfs_object_path : chunk)
{
const String & hdfs_path = hdfs_object_path;
const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2);
/// Add path from root to file name
int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0);
if (res == -1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path);
}
});
}
namespace
{
std::unique_ptr<DiskHDFSSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
return std::make_unique<DiskHDFSSettings>(
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
}
}
void registerDiskHDFS(DiskFactory & factory)
{
auto creator = [](const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextConstPtr context_) -> DiskPtr
{
Poco::File disk{context_->getPath() + "disks/" + name};
disk.createDirectories();
String uri{config.getString(config_prefix + ".endpoint")};
if (uri.back() != '/')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
String metadata_path = context_->getPath() + "disks/" + name + "/";
return std::make_shared<DiskHDFS>(
name, uri,
getSettings(config, config_prefix),
metadata_path, config);
};
factory.registerDiskType("hdfs", creator);
}
}

72
src/Disks/HDFS/DiskHDFS.h Normal file
View File

@ -0,0 +1,72 @@
#pragma once
#include <Disks/IDiskRemote.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Core/UUID.h>
#include <memory>
namespace DB
{
struct DiskHDFSSettings
{
size_t min_bytes_for_seek;
int thread_pool_size;
int objects_chunk_size_to_delete;
DiskHDFSSettings(
int min_bytes_for_seek_,
int thread_pool_size_,
int objects_chunk_size_to_delete_)
: min_bytes_for_seek(min_bytes_for_seek_)
, thread_pool_size(thread_pool_size_)
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_) {}
};
/**
* Storage for persisting data in HDFS and metadata on the local disk.
* Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file)
* that contains HDFS object key with actual data.
*/
class DiskHDFS final : public IDiskRemote
{
public:
using SettingsPtr = std::unique_ptr<DiskHDFSSettings>;
DiskHDFS(
const String & disk_name_,
const String & hdfs_root_path_,
SettingsPtr settings_,
const String & metadata_path_,
const Poco::Util::AbstractConfiguration & config_);
DiskType::Type getType() const override { return DiskType::Type::HDFS; }
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
size_t estimated_size,
size_t aio_threshold,
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) override;
RemoteFSPathKeeperPtr createFSPathKeeper() const override;
private:
String getRandomName() { return toString(UUIDHelpers::generateV4()); }
const Poco::Util::AbstractConfiguration & config;
HDFSBuilderWrapper hdfs_builder;
HDFSFSPtr hdfs_fs;
SettingsPtr settings;
};
}

View File

@ -7,6 +7,7 @@
#include <Common/Exception.h>
#include <Disks/Executor.h>
#include <Disks/DiskType.h>
#include "Disks/Executor.h"
#include <memory>
#include <mutex>
@ -178,17 +179,17 @@ public:
virtual void removeRecursive(const String & path) = 0;
/// Remove file. Throws exception if file doesn't exists or if directory is not empty.
/// Differs from removeFile for S3 disks
/// Differs from removeFile for S3/HDFS disks
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3
virtual void removeSharedFile(const String & path, bool) { removeFile(path); }
/// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exists.
/// Differs from removeRecursive for S3 disks
/// Differs from removeRecursive for S3/HDFS disks
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3
virtual void removeSharedRecursive(const String & path, bool) { removeRecursive(path); }
/// Remove file or directory if it exists.
/// Differs from removeFileIfExists for S3 disks
/// Differs from removeFileIfExists for S3/HDFS disks
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3
virtual void removeSharedFileIfExists(const String & path, bool) { removeFileIfExists(path); }

487
src/Disks/IDiskRemote.cpp Normal file
View File

@ -0,0 +1,487 @@
#include <Disks/IDiskRemote.h>
#include "Disks/DiskFactory.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <common/logger_useful.h>
#include <Common/checkStackSize.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DISK_INDEX;
extern const int UNKNOWN_FORMAT;
extern const int FILE_ALREADY_EXISTS;
extern const int PATH_ACCESS_DENIED;;
extern const int CANNOT_DELETE_DIRECTORY;
}
/// Load metadata by path or create empty if `create` flag is set.
IDiskRemote::Metadata::Metadata(
const String & remote_fs_root_path_,
const String & disk_path_,
const String & metadata_file_path_,
bool create)
: remote_fs_root_path(remote_fs_root_path_)
, disk_path(disk_path_)
, metadata_file_path(metadata_file_path_)
, total_size(0), remote_fs_objects(0), ref_count(0)
{
if (create)
return;
try
{
ReadBufferFromFile buf(disk_path + metadata_file_path, 1024); /* reasonable buffer size for small file */
UInt32 version;
readIntText(version, buf);
if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG)
throw Exception(
ErrorCodes::UNKNOWN_FORMAT,
"Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}",
disk_path + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
assertChar('\n', buf);
UInt32 remote_fs_objects_count;
readIntText(remote_fs_objects_count, buf);
assertChar('\t', buf);
readIntText(total_size, buf);
assertChar('\n', buf);
remote_fs_objects.resize(remote_fs_objects_count);
for (size_t i = 0; i < remote_fs_objects_count; ++i)
{
String remote_fs_object_path;
size_t remote_fs_object_size;
readIntText(remote_fs_object_size, buf);
assertChar('\t', buf);
readEscapedString(remote_fs_object_path, buf);
if (version == VERSION_ABSOLUTE_PATHS)
{
if (!boost::algorithm::starts_with(remote_fs_object_path, remote_fs_root_path))
throw Exception(
ErrorCodes::UNKNOWN_FORMAT,
"Path in metadata does not correspond S3 root path. Path: {}, root path: {}, disk path: {}",
remote_fs_object_path, remote_fs_root_path, disk_path_);
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
}
assertChar('\n', buf);
remote_fs_objects[i] = {remote_fs_object_path, remote_fs_object_size};
}
readIntText(ref_count, buf);
assertChar('\n', buf);
if (version >= VERSION_READ_ONLY_FLAG)
{
readBoolText(read_only, buf);
assertChar('\n', buf);
}
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
throw;
throw Exception("Failed to read metadata file", e, ErrorCodes::UNKNOWN_FORMAT);
}
}
void IDiskRemote::Metadata::addObject(const String & path, size_t size)
{
total_size += size;
remote_fs_objects.emplace_back(path, size);
}
/// Fsync metadata file if 'sync' flag is set.
void IDiskRemote::Metadata::save(bool sync)
{
WriteBufferFromFile buf(disk_path + metadata_file_path, 1024);
writeIntText(VERSION_RELATIVE_PATHS, buf);
writeChar('\n', buf);
writeIntText(remote_fs_objects.size(), buf);
writeChar('\t', buf);
writeIntText(total_size, buf);
writeChar('\n', buf);
for (const auto & [remote_fs_object_path, remote_fs_object_size] : remote_fs_objects)
{
writeIntText(remote_fs_object_size, buf);
writeChar('\t', buf);
writeEscapedString(remote_fs_object_path, buf);
writeChar('\n', buf);
}
writeIntText(ref_count, buf);
writeChar('\n', buf);
writeBoolText(read_only, buf);
writeChar('\n', buf);
buf.finalize();
if (sync)
buf.sync();
}
IDiskRemote::Metadata IDiskRemote::readOrCreateMetaForWriting(const String & path, WriteMode mode)
{
bool exist = exists(path);
if (exist)
{
auto metadata = readMeta(path);
if (metadata.read_only)
throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED);
if (mode == WriteMode::Rewrite)
removeFile(path); /// Remove for re-write.
else
return metadata;
}
auto metadata = createMeta(path);
/// Save empty metadata to disk to have ability to get file size while buffer is not finalized.
metadata.save();
return metadata;
}
IDiskRemote::Metadata IDiskRemote::readMeta(const String & path) const
{
return Metadata(remote_fs_root_path, metadata_path, path);
}
IDiskRemote::Metadata IDiskRemote::createMeta(const String & path) const
{
return Metadata(remote_fs_root_path, metadata_path, path, true);
}
void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
{
LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path));
Poco::File file(metadata_path + path);
if (!file.isFile())
throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path);
try
{
auto metadata = readMeta(path);
/// If there is no references - delete content from remote FS.
if (metadata.ref_count == 0)
{
file.remove();
for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects)
fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path);
}
else /// In other case decrement number of references, save metadata and delete file.
{
--metadata.ref_count;
metadata.save();
file.remove();
}
}
catch (const Exception & e)
{
/// If it's impossible to read meta - just remove it from FS.
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
{
LOG_WARNING(log,
"Metadata file {} can't be read by reason: {}. Removing it forcibly.",
backQuote(path), e.nested() ? e.nested()->message() : e.message());
file.remove();
}
else
throw;
}
}
void IDiskRemote::removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
Poco::File file(metadata_path + path);
if (file.isFile())
{
removeMeta(path, fs_paths_keeper);
}
else
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
removeMetaRecursive(it->path(), fs_paths_keeper);
file.remove();
}
}
DiskPtr DiskRemoteReservation::getDisk(size_t i) const
{
if (i != 0)
throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX);
return disk;
}
void DiskRemoteReservation::update(UInt64 new_size)
{
std::lock_guard lock(disk->reservation_mutex);
disk->reserved_bytes -= size;
size = new_size;
disk->reserved_bytes += size;
}
DiskRemoteReservation::~DiskRemoteReservation()
{
try
{
std::lock_guard lock(disk->reservation_mutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName());
}
else
{
disk->reserved_bytes -= size;
}
if (disk->reservation_count == 0)
LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName());
else
--disk->reservation_count;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
IDiskRemote::IDiskRemote(
const String & name_,
const String & remote_fs_root_path_,
const String & metadata_path_,
const String & log_name_,
size_t thread_pool_size)
: IDisk(std::make_unique<AsyncExecutor>(log_name_, thread_pool_size))
, log(&Poco::Logger::get(log_name_))
, name(name_)
, remote_fs_root_path(remote_fs_root_path_)
, metadata_path(metadata_path_)
{
}
bool IDiskRemote::exists(const String & path) const
{
return Poco::File(metadata_path + path).exists();
}
bool IDiskRemote::isFile(const String & path) const
{
return Poco::File(metadata_path + path).isFile();
}
void IDiskRemote::createFile(const String & path)
{
/// Create empty metadata file.
auto metadata = createMeta(path);
metadata.save();
}
size_t IDiskRemote::getFileSize(const String & path) const
{
auto metadata = readMeta(path);
return metadata.total_size;
}
void IDiskRemote::moveFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path);
}
void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
{
const String tmp_path = to_path + ".old";
moveFile(to_path, tmp_path);
moveFile(from_path, to_path);
removeFile(tmp_path);
}
else
moveFile(from_path, to_path);
}
void IDiskRemote::removeFileIfExists(const String & path)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
if (Poco::File(metadata_path + path).exists())
{
removeMeta(path, fs_paths_keeper);
removeFromRemoteFS(fs_paths_keeper);
}
}
void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
removeMeta(path, fs_paths_keeper);
if (!keep_in_remote_fs)
removeFromRemoteFS(fs_paths_keeper);
}
void IDiskRemote::removeSharedRecursive(const String & path, bool keep_in_remote_fs)
{
RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
removeMetaRecursive(path, fs_paths_keeper);
if (!keep_in_remote_fs)
removeFromRemoteFS(fs_paths_keeper);
}
void IDiskRemote::setReadOnly(const String & path)
{
/// We should store read only flag inside metadata file (instead of using FS flag),
/// because we modify metadata file when create hard-links from it.
auto metadata = readMeta(path);
metadata.read_only = true;
metadata.save();
}
bool IDiskRemote::isDirectory(const String & path) const
{
return Poco::File(metadata_path + path).isDirectory();
}
void IDiskRemote::createDirectory(const String & path)
{
Poco::File(metadata_path + path).createDirectory();
}
void IDiskRemote::createDirectories(const String & path)
{
Poco::File(metadata_path + path).createDirectories();
}
void IDiskRemote::clearDirectory(const String & path)
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
if (isFile(it->path()))
removeFile(it->path());
}
void IDiskRemote::removeDirectory(const String & path)
{
Poco::File(metadata_path + path).remove();
}
DiskDirectoryIteratorPtr IDiskRemote::iterateDirectory(const String & path)
{
return std::make_unique<RemoteDiskDirectoryIterator>(metadata_path + path, path);
}
void IDiskRemote::listFiles(const String & path, std::vector<String> & file_names)
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
file_names.push_back(it->name());
}
void IDiskRemote::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
Poco::File(metadata_path + path).setLastModified(timestamp);
}
Poco::Timestamp IDiskRemote::getLastModified(const String & path)
{
return Poco::File(metadata_path + path).getLastModified();
}
void IDiskRemote::createHardLink(const String & src_path, const String & dst_path)
{
/// Increment number of references.
auto src = readMeta(src_path);
++src.ref_count;
src.save();
/// Create FS hardlink to metadata file.
DB::createHardLink(metadata_path + src_path, metadata_path + dst_path);
}
ReservationPtr IDiskRemote::reserve(UInt64 bytes)
{
if (!tryReserve(bytes))
return {};
return std::make_unique<DiskRemoteReservation>(std::static_pointer_cast<IDiskRemote>(shared_from_this()), bytes);
}
bool IDiskRemote::tryReserve(UInt64 bytes)
{
std::lock_guard lock(reservation_mutex);
if (bytes == 0)
{
LOG_DEBUG(log, "Reserving 0 bytes on remote_fs disk {}", backQuote(name));
++reservation_count;
return true;
}
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (unreserved_space >= bytes)
{
LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.",
ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
return true;
}
return false;
}
}

283
src/Disks/IDiskRemote.h Normal file
View File

@ -0,0 +1,283 @@
#pragma once
#include <Common/config.h>
#include <atomic>
#include "Disks/DiskFactory.h"
#include "Disks/Executor.h"
#include <Poco/DirectoryIterator.h>
#include <utility>
#include <Common/MultiVersion.h>
#include <Common/ThreadPool.h>
namespace DB
{
/// Helper class to collect paths into chunks of maximum size.
/// For s3 it is Aws::vector<ObjectIdentifier>, for hdfs it is std::vector<std::string>.
class RemoteFSPathKeeper
{
public:
RemoteFSPathKeeper(size_t chunk_limit_) : chunk_limit(chunk_limit_) {}
virtual ~RemoteFSPathKeeper() = default;
virtual void addPath(const String & path) = 0;
protected:
size_t chunk_limit;
};
using RemoteFSPathKeeperPtr = std::shared_ptr<RemoteFSPathKeeper>;
/// Base Disk class for remote FS's, which are not posix-compatible (DiskS3 and DiskHDFS)
class IDiskRemote : public IDisk
{
friend class DiskRemoteReservation;
public:
IDiskRemote(
const String & name_,
const String & remote_fs_root_path_,
const String & metadata_path_,
const String & log_name_,
size_t thread_pool_size);
struct Metadata;
const String & getName() const final override { return name; }
const String & getPath() const final override { return metadata_path; }
Metadata readMeta(const String & path) const;
Metadata createMeta(const String & path) const;
Metadata readOrCreateMetaForWriting(const String & path, WriteMode mode);
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getKeepingFreeSpace() const override { return 0; }
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
void createFile(const String & path) override;
size_t getFileSize(const String & path) const override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void removeFile(const String & path) override { removeSharedFile(path, false); }
void removeFileIfExists(const String & path) override;
void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
void removeSharedFile(const String & path, bool keep_in_remote_fs) override;
void removeSharedRecursive(const String & path, bool keep_in_remote_fs) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
void setReadOnly(const String & path) override;
bool isDirectory(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); }
void removeDirectory(const String & path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
ReservationPtr reserve(UInt64 bytes) override;
virtual void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) = 0;
virtual RemoteFSPathKeeperPtr createFSPathKeeper() const = 0;
protected:
Poco::Logger * log;
const String name;
const String remote_fs_root_path;
const String metadata_path;
private:
void removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
void removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper);
bool tryReserve(UInt64 bytes);
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
};
using RemoteDiskPtr = std::shared_ptr<IDiskRemote>;
/// Remote FS (S3, HDFS) metadata file layout:
/// Number of FS objects, Total size of all FS objects.
/// Each FS object represents path where object located in FS and size of object.
struct IDiskRemote::Metadata
{
/// Metadata file version.
static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1;
static constexpr UInt32 VERSION_RELATIVE_PATHS = 2;
static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3;
using PathAndSize = std::pair<String, size_t>;
/// Remote FS (S3, HDFS) root path.
const String & remote_fs_root_path;
/// Disk path.
const String & disk_path;
/// Relative path to metadata file on local FS.
String metadata_file_path;
/// Total size of all remote FS (S3, HDFS) objects.
size_t total_size = 0;
/// Remote FS (S3, HDFS) objects paths and their sizes.
std::vector<PathAndSize> remote_fs_objects;
/// Number of references (hardlinks) to this metadata file.
UInt32 ref_count = 0;
/// Flag indicates that file is read only.
bool read_only = false;
/// Load metadata by path or create empty if `create` flag is set.
Metadata(const String & remote_fs_root_path_,
const String & disk_path_,
const String & metadata_file_path_,
bool create = false);
void addObject(const String & path, size_t size);
/// Fsync metadata file if 'sync' flag is set.
void save(bool sync = false);
};
class RemoteDiskDirectoryIterator final : public IDiskDirectoryIterator
{
public:
RemoteDiskDirectoryIterator(const String & full_path, const String & folder_path_) : iter(full_path), folder_path(folder_path_) {}
void next() override { ++iter; }
bool isValid() const override { return iter != Poco::DirectoryIterator(); }
String path() const override
{
if (iter->isDirectory())
return folder_path + iter.name() + '/';
else
return folder_path + iter.name();
}
String name() const override { return iter.name(); }
private:
Poco::DirectoryIterator iter;
String folder_path;
};
class DiskRemoteReservation final : public IReservation
{
public:
DiskRemoteReservation(const RemoteDiskPtr & disk_, UInt64 size_)
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
UInt64 getSize() const override { return size; }
DiskPtr getDisk(size_t i) const override;
Disks getDisks() const override { return {disk}; }
void update(UInt64 new_size) override;
~DiskRemoteReservation() override;
private:
RemoteDiskPtr disk;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
/// Runs tasks asynchronously using thread pool.
class AsyncExecutor : public Executor
{
public:
explicit AsyncExecutor(const String & name_, int thread_pool_size)
: name(name_)
, pool(ThreadPool(thread_pool_size)) {}
std::future<void> execute(std::function<void()> task) override
{
auto promise = std::make_shared<std::promise<void>>();
pool.scheduleOrThrowOnError(
[promise, task]()
{
try
{
task();
promise->set_value();
}
catch (...)
{
tryLogCurrentException("Failed to run async task");
try
{
promise->set_exception(std::current_exception());
}
catch (...) {}
}
});
return promise->get_future();
}
void setMaxThreads(size_t threads)
{
pool.setMaxThreads(threads);
}
private:
String name;
ThreadPool pool;
};
}

View File

@ -0,0 +1,128 @@
#include "ReadIndirectBufferFromRemoteFS.h"
#if USE_AWS_S3 || USE_HDFS
#include <IO/ReadBufferFromS3.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
}
template<typename T>
ReadIndirectBufferFromRemoteFS<T>::ReadIndirectBufferFromRemoteFS(
IDiskRemote::Metadata metadata_)
: metadata(std::move(metadata_))
{
}
template<typename T>
off_t ReadIndirectBufferFromRemoteFS<T>::seek(off_t offset_, int whence)
{
if (whence == SEEK_CUR)
{
/// If position within current working buffer - shift pos.
if (!working_buffer.empty() && size_t(getPosition() + offset_) < absolute_position)
{
pos += offset_;
return getPosition();
}
else
{
absolute_position += offset_;
}
}
else if (whence == SEEK_SET)
{
/// If position within current working buffer - shift pos.
if (!working_buffer.empty() && size_t(offset_) >= absolute_position - working_buffer.size()
&& size_t(offset_) < absolute_position)
{
pos = working_buffer.end() - (absolute_position - offset_);
return getPosition();
}
else
{
absolute_position = offset_;
}
}
else
throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
current_buf = initialize();
pos = working_buffer.end();
return absolute_position;
}
template<typename T>
std::unique_ptr<T> ReadIndirectBufferFromRemoteFS<T>::initialize()
{
size_t offset = absolute_position;
for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i)
{
current_buf_idx = i;
const auto & [file_path, size] = metadata.remote_fs_objects[i];
if (size > offset)
{
auto buf = createReadBuffer(file_path);
buf->seek(offset, SEEK_SET);
return buf;
}
offset -= size;
}
return nullptr;
}
template<typename T>
bool ReadIndirectBufferFromRemoteFS<T>::nextImpl()
{
/// Find first available buffer that fits to given offset.
if (!current_buf)
current_buf = initialize();
/// If current buffer has remaining data - use it.
if (current_buf && current_buf->next())
{
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();
return true;
}
/// If there is no available buffers - nothing to read.
if (current_buf_idx + 1 >= metadata.remote_fs_objects.size())
return false;
++current_buf_idx;
const auto & path = metadata.remote_fs_objects[current_buf_idx].first;
current_buf = createReadBuffer(path);
current_buf->next();
working_buffer = current_buf->buffer();
absolute_position += working_buffer.size();
return true;
}
#if USE_AWS_S3
template
class ReadIndirectBufferFromRemoteFS<ReadBufferFromS3>;
#endif
#if USE_HDFS
template
class ReadIndirectBufferFromRemoteFS<ReadBufferFromHDFS>;
#endif
}
#endif

View File

@ -0,0 +1,46 @@
#pragma once
#include <Common/config.h>
#if USE_AWS_S3 || USE_HDFS
#include <IO/ReadBufferFromFile.h>
#include <Disks/IDiskRemote.h>
#include <utility>
namespace DB
{
/// Reads data from S3/HDFS using stored paths in metadata.
template <typename T>
class ReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase
{
public:
ReadIndirectBufferFromRemoteFS(IDiskRemote::Metadata metadata_);
off_t seek(off_t offset_, int whence) override;
off_t getPosition() override { return absolute_position - available(); }
String getFileName() const override { return metadata.metadata_file_path; }
virtual std::unique_ptr<T> createReadBuffer(const String & path) = 0;
protected:
IDiskRemote::Metadata metadata;
private:
std::unique_ptr<T> initialize();
bool nextImpl() override;
size_t absolute_position = 0;
size_t current_buf_idx = 0;
std::unique_ptr<T> current_buf;
};
}
#endif

File diff suppressed because it is too large Load Diff

View File

@ -2,7 +2,6 @@
#include <atomic>
#include <common/logger_useful.h>
#include <Common/MultiVersion.h>
#include "Disks/DiskFactory.h"
#include "Disks/Executor.h"
@ -12,6 +11,7 @@
#include <Poco/DirectoryIterator.h>
#include <re2/re2.h>
#include <Disks/IDiskRemote.h>
namespace DB
@ -28,7 +28,8 @@ struct DiskS3Settings
size_t min_bytes_for_seek_,
bool send_metadata_,
int thread_pool_size_,
int list_object_keys_size_);
int list_object_keys_size_,
int objects_chunk_size_to_delete_);
std::shared_ptr<Aws::S3::S3Client> client;
size_t s3_max_single_read_retries;
@ -38,25 +39,24 @@ struct DiskS3Settings
bool send_metadata;
int thread_pool_size;
int list_object_keys_size;
int objects_chunk_size_to_delete;
};
/**
* Storage for persisting data in S3 and metadata on the local disk.
* Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file)
* that contains S3 object key with actual data.
*/
class DiskS3 : public IDisk
class DiskS3 final : public IDiskRemote
{
public:
using ObjectMetadata = std::map<std::string, std::string>;
using Futures = std::vector<std::future<void>>;
using SettingsPtr = std::unique_ptr<DiskS3Settings>;
using GetDiskSettings = std::function<SettingsPtr(const Poco::Util::AbstractConfiguration &, const String, ContextConstPtr)>;
friend class DiskS3Reservation;
class AwsS3KeyKeeper;
struct Metadata;
struct RestoreInformation;
DiskS3(
@ -67,44 +67,6 @@ public:
SettingsPtr settings_,
GetDiskSettings settings_getter_);
const String & getName() const override { return name; }
const String & getPath() const override { return metadata_path; }
ReservationPtr reserve(UInt64 bytes) override;
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getKeepingFreeSpace() const override { return 0; }
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); }
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void moveFile(const String & from_path, const String & to_path, bool send_metadata);
void replaceFile(const String & from_path, const String & to_path) override;
void listFiles(const String & path, std::vector<String> & file_names) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
size_t buf_size,
@ -118,25 +80,16 @@ public:
size_t buf_size,
WriteMode mode) override;
void removeFile(const String & path) override { removeSharedFile(path, false); }
void removeFileIfExists(const String & path) override;
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
void removeFromRemoteFS(RemoteFSPathKeeperPtr keeper) override;
void removeSharedFile(const String & path, bool keep_s3) override;
void removeSharedRecursive(const String & path, bool keep_s3) override;
RemoteFSPathKeeperPtr createFSPathKeeper() const override;
void moveFile(const String & from_path, const String & to_path, bool send_metadata);
void moveFile(const String & from_path, const String & to_path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void createHardLink(const String & src_path, const String & dst_path, bool send_metadata);
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
Poco::Timestamp getLastModified(const String & path) override;
void createFile(const String & path) override;
void setReadOnly(const String & path) override;
DiskType::Type getType() const override { return DiskType::Type::S3; }
void shutdown() override;
@ -157,16 +110,6 @@ public:
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context) override;
private:
bool tryReserve(UInt64 bytes);
void removeMeta(const String & path, AwsS3KeyKeeper & keys);
void removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys);
void removeAws(const AwsS3KeyKeeper & keys);
Metadata readOrCreateMetaForWriting(const String & path, WriteMode mode);
Metadata readMeta(const String & path) const;
Metadata createMeta(const String & path) const;
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
/// Converts revision to binary string with leading zeroes (64 bit).
static String revisionToString(UInt64 revision);
@ -200,19 +143,12 @@ private:
/// Forms detached path '../../detached/part_name/' from '../../part_name/'
static String pathToDetached(const String & source_path);
const String name;
const String bucket;
const String s3_root_path;
const String metadata_path;
MultiVersion<DiskS3Settings> current_settings;
MultiVersion<DiskS3Settings> current_settings;
/// Gets disk settings from context.
GetDiskSettings settings_getter;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
std::atomic<UInt64> revision_counter = 0;
static constexpr UInt64 LATEST_REVISION = std::numeric_limits<UInt64>::max();
static constexpr UInt64 UNKNOWN_REVISION = 0;
@ -229,8 +165,6 @@ private:
static constexpr int RESTORABLE_SCHEMA_VERSION = 1;
/// Directories with data.
const std::vector<String> data_roots {"data", "store"};
Poco::Logger * log = &Poco::Logger::get("DiskS3");
};
}

View File

@ -156,7 +156,8 @@ std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfigurat
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getBool(config_prefix + ".send_metadata", false),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".list_object_keys_size", 1000));
config.getInt(config_prefix + ".list_object_keys_size", 1000),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
}
}
@ -225,4 +226,3 @@ void registerDiskS3(DiskFactory & factory)
void registerDiskS3(DiskFactory &) {}
#endif

View File

@ -0,0 +1,71 @@
#include "WriteIndirectBufferFromRemoteFS.h"
#if USE_AWS_S3 || USE_HDFS
#include <IO/WriteBufferFromS3.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
namespace DB
{
/// Stores data in S3/HDFS and adds the object key (S3 path) and object size to metadata file on local FS.
template <typename T>
WriteIndirectBufferFromRemoteFS<T>::WriteIndirectBufferFromRemoteFS(
std::unique_ptr<T> impl_,
IDiskRemote::Metadata metadata_,
const String & remote_fs_path_)
: WriteBufferFromFileDecorator(std::move(impl_))
, metadata(std::move(metadata_))
, remote_fs_path(remote_fs_path_)
{
}
template <typename T>
WriteIndirectBufferFromRemoteFS<T>::~WriteIndirectBufferFromRemoteFS()
{
try
{
WriteIndirectBufferFromRemoteFS::finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
template <typename T>
void WriteIndirectBufferFromRemoteFS<T>::finalize()
{
if (finalized)
return;
WriteBufferFromFileDecorator::finalize();
metadata.addObject(remote_fs_path, count());
metadata.save();
}
template <typename T>
void WriteIndirectBufferFromRemoteFS<T>::sync()
{
if (finalized)
metadata.save(true);
}
#if USE_AWS_S3
template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromS3>;
#endif
#if USE_HDFS
template
class WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>;
#endif
}
#endif

View File

@ -0,0 +1,39 @@
#pragma once
#include <Common/config.h>
#if USE_AWS_S3 || USE_HDFS
#include <Disks/IDiskRemote.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileDecorator.h>
namespace DB
{
/// Stores data in S3/HDFS and adds the object key (S3 path) and object size to metadata file on local FS.
template <typename T>
class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorator
{
public:
WriteIndirectBufferFromRemoteFS(
std::unique_ptr<T> impl_,
IDiskRemote::Metadata metadata_,
const String & remote_fs_path_);
virtual ~WriteIndirectBufferFromRemoteFS() override;
void finalize() override;
void sync() override;
String getFileName() const override { return metadata.metadata_file_path; }
private:
IDiskRemote::Metadata metadata;
String remote_fs_path;
};
}
#endif

View File

@ -8,21 +8,33 @@
namespace DB
{
void registerDiskLocal(DiskFactory & factory);
void registerDiskMemory(DiskFactory & factory);
#if USE_AWS_S3
void registerDiskS3(DiskFactory & factory);
#endif
#if USE_HDFS
void registerDiskHDFS(DiskFactory & factory);
#endif
void registerDisks()
{
auto & factory = DiskFactory::instance();
registerDiskLocal(factory);
registerDiskMemory(factory);
#if USE_AWS_S3
registerDiskS3(factory);
#endif
#if USE_HDFS
registerDiskHDFS(factory);
#endif
}
}

View File

@ -1,14 +1,15 @@
#include <gtest/gtest.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include "gtest_disk.h"
#if !defined(__clang__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wsuggest-override"
#endif
template <typename T>
DB::DiskPtr createDisk();

View File

@ -1,4 +1,5 @@
#pragma once
#include <Disks/DiskLocal.h>
#include <Disks/DiskMemory.h>
#include <Disks/IDisk.h>

View File

@ -0,0 +1,160 @@
#include <gtest/gtest.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include "gtest_disk.h"
#define RUN_HDFS_TEST 0
#if RUN_HDFS_TEST
#include <Disks/HDFS/DiskHDFS.h>
#include <Poco/Util/XMLConfiguration.h>
const String hdfs_uri = "hdfs://172.20.0.2:9000/disk_test/";
const String metadata_path = "/path/to/metadata/";
const String config_path = "/path/to/config.xml";
const String file_name = "test.txt";
TEST(DiskTestHDFS, RemoveFileHDFS)
{
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
DB::HDFSBuilderWrapper builder = DB::createHDFSBuilder(hdfs_uri, *config);
DB::HDFSFSPtr fs = DB::createHDFSFS(builder.get());
disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite);
auto metadata = disk.readMeta(file_name);
const String hdfs_file_name = metadata.remote_fs_objects[0].first;
const String hdfs_file_path = "/disk_test/" + hdfs_file_name;
auto ret = hdfsExists(fs.get(), hdfs_file_path.data());
EXPECT_EQ(0, ret);
disk.removeFile(file_name);
ret = hdfsExists(fs.get(), hdfs_file_path.data());
EXPECT_EQ(-1, ret);
}
TEST(DiskTestHDFS, WriteReadHDFS)
{
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
{
auto out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite);
writeString("Test write to file", *out);
}
{
DB::String result;
auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
readString(result, *in);
EXPECT_EQ("Test write to file", result);
}
disk.removeFileIfExists(file_name);
}
TEST(DiskTestHDFS, RewriteFileHDFS)
{
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
for (size_t i = 1; i <= 10; ++i)
{
std::unique_ptr<DB::WriteBuffer> out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite);
writeString("Text" + DB::toString(i), *out);
}
{
String result;
auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
readString(result, *in);
EXPECT_EQ("Text10", result);
readString(result, *in);
EXPECT_EQ("", result);
}
disk.removeFileIfExists(file_name);
}
TEST(DiskTestHDFS, AppendFileHDFS)
{
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
{
std::unique_ptr<DB::WriteBuffer> out = disk.writeFile(file_name, 1024, DB::WriteMode::Append);
writeString("Text", *out);
for (size_t i = 0; i < 10; ++i)
{
writeIntText(i, *out);
}
}
{
String result, expected;
auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
readString(result, *in);
EXPECT_EQ("Text0123456789", result);
readString(result, *in);
EXPECT_EQ("", result);
}
disk.removeFileIfExists(file_name);
}
TEST(DiskTestHDFS, SeekHDFS)
{
Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path);
auto settings = std::make_unique<DB::DiskHDFSSettings>(1024 * 1024);
auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config);
{
std::unique_ptr<DB::WriteBuffer> out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite);
writeString("test data", *out);
}
/// Test SEEK_SET
{
String buf(4, '0');
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
in->seek(5, SEEK_SET);
in->readStrict(buf.data(), 4);
EXPECT_EQ("data", buf);
}
/// Test SEEK_CUR
{
std::unique_ptr<DB::SeekableReadBuffer> in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr);
String buf(4, '0');
in->readStrict(buf.data(), 4);
EXPECT_EQ("test", buf);
// Skip whitespace
in->seek(1, SEEK_CUR);
in->readStrict(buf.data(), 4);
EXPECT_EQ("data", buf);
}
disk.removeFileIfExists(file_name);
}
#endif

View File

@ -7,8 +7,7 @@ PEERDIR(
)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F S3 | sed 's/^\.\// /' | sort ?>
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -145,6 +145,7 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::A
hdfsBuilderSetUserName(builder.get(), user.c_str());
}
hdfsBuilderSetNameNode(builder.get(), host.c_str());
if (port != 0)
{

View File

@ -1,13 +1,15 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_HDFS
#include <memory>
#include <type_traits>
#include <vector>
#include <hdfs/hdfs.h>
#include <hdfs/hdfs.h> // Y_IGNORE
#include <common/types.h>
#include <mutex>

View File

@ -13,6 +13,8 @@ namespace ErrorCodes
{
extern const int NETWORK_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
@ -29,6 +31,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
off_t offset = 0;
bool initialized = false;
explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
const std::string & hdfs_file_path_,
@ -48,8 +53,30 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));
}
int read(char * start, size_t size) const
~ReadBufferFromHDFSImpl()
{
std::lock_guard lock(hdfs_init_mutex);
hdfsCloseFile(fs.get(), fin);
}
void initialize() const
{
if (!offset)
return;
int seek_status = hdfsSeek(fs.get(), fin, offset);
if (seek_status != 0)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Fail to seek HDFS file: {}, error: {}", hdfs_uri, std::string(hdfsGetLastError()));
}
int read(char * start, size_t size)
{
if (!initialized)
{
initialize();
initialized = true;
}
int bytes_read = hdfsRead(fs.get(), fin, start, size);
if (bytes_read < 0)
throw Exception(ErrorCodes::NETWORK_ERROR,
@ -58,10 +85,25 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
return bytes_read;
}
~ReadBufferFromHDFSImpl()
int seek(off_t offset_, int whence)
{
std::lock_guard lock(hdfs_init_mutex);
hdfsCloseFile(fs.get(), fin);
if (initialized)
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", std::to_string(offset_));
offset = offset_;
return offset;
}
int tell() const
{
return offset;
}
};
@ -73,7 +115,7 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
const String & hdfs_file_path_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_)
: BufferWithOwnMemory<ReadBuffer>(buf_size_)
: BufferWithOwnMemory<SeekableReadBuffer>(buf_size_)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri_, hdfs_file_path_, config_))
{
}
@ -90,6 +132,18 @@ bool ReadBufferFromHDFS::nextImpl()
return true;
}
off_t ReadBufferFromHDFS::seek(off_t off, int whence)
{
return impl->seek(off, whence);
}
off_t ReadBufferFromHDFS::getPosition()
{
return impl->tell() + count();
}
}
#endif

View File

@ -7,28 +7,34 @@
#include <IO/BufferWithOwnMemory.h>
#include <string>
#include <memory>
#include <hdfs/hdfs.h>
#include <hdfs/hdfs.h> // Y_IGNORE
#include <common/types.h>
#include <Interpreters/Context.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
{
/** Accepts HDFS path to file and opens it.
* Closes file by himself (thus "owns" a file descriptor).
*/
class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
class ReadBufferFromHDFS : public BufferWithOwnMemory<SeekableReadBuffer>
{
struct ReadBufferFromHDFSImpl;
public:
ReadBufferFromHDFS(const String & hdfs_uri_, const String & hdfs_file_path_,
const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
const Poco::Util::AbstractConfiguration & config_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
~ReadBufferFromHDFS() override;
bool nextImpl() override;
off_t seek(off_t offset_, int whence) override;
off_t getPosition() override;
private:
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
};

View File

@ -64,7 +64,6 @@ public:
struct SourcesInfo
{
std::vector<String> uris;
std::atomic<size_t> next_uri_to_read = 0;
bool need_path_column = false;

View File

@ -27,20 +27,24 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
explicit WriteBufferFromHDFSImpl(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_)
: hdfs_uri(hdfs_name_)
explicit WriteBufferFromHDFSImpl(
const std::string & hdfs_uri_,
const Poco::Util::AbstractConfiguration & config_,
int flags)
: hdfs_uri(hdfs_uri_)
, builder(createHDFSBuilder(hdfs_uri, config_))
, fs(createHDFSFS(builder.get()))
{
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const String path = hdfs_uri.substr(begin_of_path);
if (path.find_first_of("*?{") != std::string::npos)
throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE);
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "URI '{}' contains globs, so the table is in readonly mode", hdfs_uri);
if (!hdfsExists(fs.get(), path.c_str()))
throw Exception("File: " + path + " is already exists", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "File {} already exists", path);
fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
if (fout == nullptr)
{
@ -76,9 +80,13 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
}
};
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_, size_t buf_size_)
WriteBufferFromHDFS::WriteBufferFromHDFS(
const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_,
int flags_)
: BufferWithOwnMemory<WriteBuffer>(buf_size_)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, config_))
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, config_, flags_))
{
}

View File

@ -8,6 +8,7 @@
#include <string>
#include <memory>
namespace DB
{
/** Accepts HDFS path to file and opens it.
@ -15,11 +16,13 @@ namespace DB
*/
class WriteBufferFromHDFS final : public BufferWithOwnMemory<WriteBuffer>
{
struct WriteBufferFromHDFSImpl;
std::unique_ptr<WriteBufferFromHDFSImpl> impl;
public:
WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
WriteBufferFromHDFS(
const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
int flags = O_WRONLY);
WriteBufferFromHDFS(WriteBufferFromHDFS &&) = default;
@ -30,6 +33,11 @@ public:
void sync() override;
void finalize() override;
private:
struct WriteBufferFromHDFSImpl;
std::unique_ptr<WriteBufferFromHDFSImpl> impl;
};
}
#endif

View File

@ -11,6 +11,10 @@
<disk_memory>
<type>memory</type>
</disk_memory>
<disk_hdfs>
<type>hdfs</type>
<endpoint>http://hdfs1:9000/data/</endpoint>
</disk_hdfs>
</disks>
</storage_configuration>
</yandex>

View File

@ -5,6 +5,7 @@ disk_types = {
"default": "local",
"disk_s3": "s3",
"disk_memory": "memory",
"disk_hdfs": "hdfs",
}
@ -12,7 +13,7 @@ disk_types = {
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", main_configs=["configs/storage.xml"], with_minio=True)
cluster.add_instance("node", main_configs=["configs/storage.xml"], with_minio=True, with_hdfs=True)
cluster.start()
yield cluster
finally:
@ -35,3 +36,4 @@ def test_select_by_type(cluster):
node = cluster.instances["node"]
for name, disk_type in list(disk_types.items()):
assert node.query("SELECT name FROM system.disks WHERE type='" + disk_type + "'") == name + "\n"

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,11 @@
<?xml version="1.0"?>
<yandex>
<storage_configuration>
<disks>
<hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
</hdfs>
</disks>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,59 @@
import logging
import sys
import pytest
from helpers.cluster import ClickHouseCluster
from pyhdfs import HdfsClient
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node",
main_configs=["configs/storage_conf.xml", "configs/config.d/log_conf.xml"],
with_hdfs=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
fs = HdfsClient(hosts='localhost')
fs.mkdirs('/clickhouse')
yield cluster
finally:
cluster.shutdown()
def assert_objects_count(cluster, objects_count, path='data/'):
fs = HdfsClient(hosts='localhost')
hdfs_objects = fs.listdir('/clickhouse')
assert objects_count == len(hdfs_objects)
@pytest.mark.parametrize(
"log_engine,files_overhead,files_overhead_per_insert",
[("TinyLog", 1, 1), ("Log", 2, 1), ("StripeLog", 1, 2)])
def test_log_family_hdfs(cluster, log_engine, files_overhead, files_overhead_per_insert):
node = cluster.instances["node"]
node.query("CREATE TABLE hdfs_test (id UInt64) ENGINE={} SETTINGS disk = 'hdfs'".format(log_engine))
node.query("INSERT INTO hdfs_test SELECT number FROM numbers(5)")
assert node.query("SELECT * FROM hdfs_test") == "0\n1\n2\n3\n4\n"
assert_objects_count(cluster, files_overhead_per_insert + files_overhead)
node.query("INSERT INTO hdfs_test SELECT number + 5 FROM numbers(3)")
assert node.query("SELECT * FROM hdfs_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n"
assert_objects_count(cluster, files_overhead_per_insert * 2 + files_overhead)
node.query("INSERT INTO hdfs_test SELECT number + 8 FROM numbers(1)")
assert node.query("SELECT * FROM hdfs_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n8\n"
assert_objects_count(cluster, files_overhead_per_insert * 3 + files_overhead)
node.query("TRUNCATE TABLE hdfs_test")
assert_objects_count(cluster, 0)
node.query("DROP TABLE hdfs_test")

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,30 @@
<yandex>
<storage_configuration>
<disks>
<hdfs>
<type>hdfs</type>
<endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
</hdfs>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<hdfs>
<volumes>
<main>
<disk>hdfs</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</hdfs>
</policies>
</storage_configuration>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
</merge_tree>
</yandex>

View File

@ -0,0 +1,317 @@
import logging
import random
import string
import time
import threading
import os
import pytest
from helpers.cluster import ClickHouseCluster
from pyhdfs import HdfsClient
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node/configs/config.d/storage_conf.xml')
def create_table(cluster, table_name, additional_settings=None):
node = cluster.instances["node"]
create_table_statement = """
CREATE TABLE {} (
dt Date, id Int64, data String,
INDEX min_max (id) TYPE minmax GRANULARITY 3
) ENGINE=MergeTree()
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS
storage_policy='hdfs',
old_parts_lifetime=0,
index_granularity=512
""".format(table_name)
if additional_settings:
create_table_statement += ","
create_table_statement += additional_settings
node.query(create_table_statement)
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
def random_string(length):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(length))
def generate_values(date_str, count, sign=1):
data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
data.sort(key=lambda tup: tup[1])
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", main_configs=["configs/config.d/storage_conf.xml",
"configs/config.d/log_conf.xml"], with_hdfs=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
fs = HdfsClient(hosts='localhost')
fs.mkdirs('/clickhouse')
logging.info("Created HDFS directory")
yield cluster
finally:
cluster.shutdown()
def wait_for_delete_hdfs_objects(cluster, expected, num_tries=30):
fs = HdfsClient(hosts='localhost')
while num_tries > 0:
num_hdfs_objects = len(fs.listdir('/clickhouse'))
if num_hdfs_objects == expected:
break;
num_tries -= 1
time.sleep(1)
assert(len(fs.listdir('/clickhouse')) == expected)
@pytest.fixture(autouse=True)
def drop_table(cluster):
node = cluster.instances["node"]
fs = HdfsClient(hosts='localhost')
hdfs_objects = fs.listdir('/clickhouse')
print('Number of hdfs objects to delete:', len(hdfs_objects), sep=' ')
node.query("DROP TABLE IF EXISTS hdfs_test SYNC")
try:
wait_for_delete_hdfs_objects(cluster, 0)
finally:
hdfs_objects = fs.listdir('/clickhouse')
if len(hdfs_objects) == 0:
return
print("Manually removing extra objects to prevent tests cascade failing: ", hdfs_objects)
for path in hdfs_objects:
fs.delete(path)
@pytest.mark.parametrize("min_rows_for_wide_part,files_per_part", [(0, FILES_OVERHEAD_PER_PART_WIDE), (8192, FILES_OVERHEAD_PER_PART_COMPACT)])
def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
create_table(cluster, "hdfs_test", additional_settings="min_rows_for_wide_part={}".format(min_rows_for_wide_part))
node = cluster.instances["node"]
values1 = generate_values('2020-01-03', 4096)
node.query("INSERT INTO hdfs_test VALUES {}".format(values1))
assert node.query("SELECT * FROM hdfs_test order by dt, id FORMAT Values") == values1
fs = HdfsClient(hosts='localhost')
hdfs_objects = fs.listdir('/clickhouse')
print(hdfs_objects)
assert len(hdfs_objects) == FILES_OVERHEAD + files_per_part
values2 = generate_values('2020-01-04', 4096)
node.query("INSERT INTO hdfs_test VALUES {}".format(values2))
assert node.query("SELECT * FROM hdfs_test ORDER BY dt, id FORMAT Values") == values1 + "," + values2
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + files_per_part * 2
assert node.query("SELECT count(*) FROM hdfs_test where id = 1 FORMAT Values") == "(2)"
def test_alter_table_columns(cluster):
create_table(cluster, "hdfs_test")
node = cluster.instances["node"]
fs = HdfsClient(hosts='localhost')
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096, -1)))
node.query("ALTER TABLE hdfs_test ADD COLUMN col1 UInt64 DEFAULT 1")
# To ensure parts have merged
node.query("OPTIMIZE TABLE hdfs_test")
assert node.query("SELECT sum(col1) FROM hdfs_test FORMAT Values") == "(8192)"
assert node.query("SELECT sum(col1) FROM hdfs_test WHERE id > 0 FORMAT Values") == "(4096)"
wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN)
node.query("ALTER TABLE hdfs_test MODIFY COLUMN col1 String", settings={"mutations_sync": 2})
assert node.query("SELECT distinct(col1) FROM hdfs_test FORMAT Values") == "('1')"
# and file with mutation
wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1)
node.query("ALTER TABLE hdfs_test DROP COLUMN col1", settings={"mutations_sync": 2})
# and 2 files with mutations
wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2)
def test_attach_detach_partition(cluster):
create_table(cluster, "hdfs_test")
node = cluster.instances["node"]
fs = HdfsClient(hosts='localhost')
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096)))
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
node.query("ALTER TABLE hdfs_test DETACH PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(4096)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
node.query("ALTER TABLE hdfs_test ATTACH PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
node.query("ALTER TABLE hdfs_test DROP PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(4096)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
node.query("ALTER TABLE hdfs_test DETACH PARTITION '2020-01-04'")
node.query("ALTER TABLE hdfs_test DROP DETACHED PARTITION '2020-01-04'", settings={"allow_drop_detached": 1})
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(0)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD
def test_move_partition_to_another_disk(cluster):
create_table(cluster, "hdfs_test")
node = cluster.instances["node"]
fs = HdfsClient(hosts='localhost')
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096)))
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'")
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-04' TO DISK 'hdfs'")
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
def test_table_manipulations(cluster):
create_table(cluster, "hdfs_test")
node = cluster.instances["node"]
fs = HdfsClient(hosts='localhost')
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096)))
node.query("RENAME TABLE hdfs_test TO hdfs_renamed")
assert node.query("SELECT count(*) FROM hdfs_renamed FORMAT Values") == "(8192)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
node.query("RENAME TABLE hdfs_renamed TO hdfs_test")
assert node.query("CHECK TABLE hdfs_test FORMAT Values") == "(1)"
node.query("DETACH TABLE hdfs_test")
node.query("ATTACH TABLE hdfs_test")
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
node.query("TRUNCATE TABLE hdfs_test")
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(0)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD
def test_move_replace_partition_to_another_table(cluster):
create_table(cluster, "hdfs_test")
node = cluster.instances["node"]
fs = HdfsClient(hosts='localhost')
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096)))
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-06', 4096, -1)))
assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
create_table(cluster, "hdfs_clone")
node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-03' TO TABLE hdfs_clone")
node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-05' TO TABLE hdfs_clone")
assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"
assert node.query("SELECT sum(id) FROM hdfs_clone FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM hdfs_clone FORMAT Values") == "(8192)"
# Number of objects in HDFS should be unchanged.
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4
# Add new partitions to source table, but with different values and replace them from copied table.
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096, -1)))
node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-05', 4096)))
assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)"
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 6
node.query("ALTER TABLE hdfs_test REPLACE PARTITION '2020-01-03' FROM hdfs_clone")
node.query("ALTER TABLE hdfs_test REPLACE PARTITION '2020-01-05' FROM hdfs_clone")
assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)"
assert node.query("SELECT sum(id) FROM hdfs_clone FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM hdfs_clone FORMAT Values") == "(8192)"
# Wait for outdated partitions deletion.
print(1)
wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4)
node.query("DROP TABLE hdfs_clone NO DELAY")
assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)"
# Data should remain in hdfs
hdfs_objects = fs.listdir('/clickhouse')
assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4