diff --git a/docker/test/integration/runner/Dockerfile b/docker/test/integration/runner/Dockerfile index 783e689ed01..cff49a805fa 100644 --- a/docker/test/integration/runner/Dockerfile +++ b/docker/test/integration/runner/Dockerfile @@ -80,7 +80,8 @@ RUN python3 -m pip install \ redis \ tzlocal \ urllib3 \ - requests-kerberos + requests-kerberos \ + pyhdfs COPY modprobe.sh /usr/local/bin/modprobe COPY dockerd-entrypoint.sh /usr/local/bin/ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a32817928fc..26a68fcbd14 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -101,6 +101,7 @@ endif() if (USE_HDFS) add_headers_and_sources(dbms Storages/HDFS) + add_headers_and_sources(dbms Disks/HDFS) endif() list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD}) diff --git a/src/Disks/DiskType.h b/src/Disks/DiskType.h index 4e0ae226af4..a5c23fe2c2c 100644 --- a/src/Disks/DiskType.h +++ b/src/Disks/DiskType.h @@ -11,7 +11,8 @@ struct DiskType { Local, RAM, - S3 + S3, + HDFS }; static String toString(Type disk_type) { @@ -23,10 +24,11 @@ struct DiskType return "memory"; case Type::S3: return "s3"; + case Type::HDFS: + return "hdfs"; } __builtin_unreachable(); } }; } - diff --git a/src/Disks/HDFS/DiskHDFS.cpp b/src/Disks/HDFS/DiskHDFS.cpp new file mode 100644 index 00000000000..0648fd9f08c --- /dev/null +++ b/src/Disks/HDFS/DiskHDFS.cpp @@ -0,0 +1,194 @@ +#include + +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; +} + + +class HDFSPathKeeper : public RemoteFSPathKeeper +{ +public: + using Chunk = std::vector; + using Chunks = std::list; + + explicit HDFSPathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {} + + void addPath(const String & path) override + { + if (chunks.empty() || chunks.back().size() >= chunk_limit) + { + chunks.push_back(Chunks::value_type()); + chunks.back().reserve(chunk_limit); + } + chunks.back().push_back(path.data()); + } + + void removePaths(const std::function & remove_chunk_func) + { + for (auto & chunk : chunks) + remove_chunk_func(std::move(chunk)); + } + +private: + Chunks chunks; +}; + + +/// Reads data from HDFS using stored paths in metadata. 
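The HDFSPathKeeper above accumulates remote object paths in chunks of at most chunk_limit, so removal can later be issued as a handful of batched calls instead of one call per object — the same pattern S3PathKeeper uses further down, where an AWS DeleteObjects request is capped at 1000 keys. A minimal standalone sketch of that pattern, using plain standard-library types rather than the ClickHouse classes:

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <list>
#include <string>
#include <vector>

/// Illustrative stand-in for the chunked path-keeper pattern (HDFSPathKeeper / S3PathKeeper).
class ChunkedPathKeeper
{
public:
    using Chunk = std::vector<std::string>;

    explicit ChunkedPathKeeper(size_t chunk_limit_) : chunk_limit(chunk_limit_) {}

    void addPath(const std::string & path)
    {
        /// Open a new chunk when the current one is full (or none exists yet).
        if (chunks.empty() || chunks.back().size() >= chunk_limit)
        {
            chunks.emplace_back();
            chunks.back().reserve(chunk_limit);
        }
        chunks.back().push_back(path);
    }

    void removePaths(const std::function<void(Chunk &&)> & remove_chunk_func)
    {
        /// Each chunk becomes one batched remove call (e.g. one DeleteObjects request).
        for (auto & chunk : chunks)
            remove_chunk_func(std::move(chunk));
    }

private:
    size_t chunk_limit;
    std::list<Chunk> chunks;
};

int main()
{
    ChunkedPathKeeper keeper(2);
    for (const auto * path : {"a", "b", "c", "d", "e"})
        keeper.addPath(path);

    keeper.removePaths([](ChunkedPathKeeper::Chunk && chunk)
    {
        std::cout << "batch of " << chunk.size() << " path(s)" << std::endl;   /// prints 2, 2, 1
    });
}
```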
+class ReadIndirectBufferFromHDFS final : public ReadIndirectBufferFromRemoteFS +{ +public: + ReadIndirectBufferFromHDFS( + const Poco::Util::AbstractConfiguration & config_, + const String & hdfs_uri_, + DiskHDFS::Metadata metadata_, + size_t buf_size_) + : ReadIndirectBufferFromRemoteFS(metadata_) + , config(config_) + , buf_size(buf_size_) + { + const size_t begin_of_path = hdfs_uri_.find('/', hdfs_uri_.find("//") + 2); + hdfs_directory = hdfs_uri_.substr(begin_of_path); + hdfs_uri = hdfs_uri_.substr(0, begin_of_path); + } + + std::unique_ptr createReadBuffer(const String & path) override + { + return std::make_unique(hdfs_uri, hdfs_directory + path, config, buf_size); + } + +private: + const Poco::Util::AbstractConfiguration & config; + String hdfs_uri; + String hdfs_directory; + size_t buf_size; +}; + + +DiskHDFS::DiskHDFS( + const String & disk_name_, + const String & hdfs_root_path_, + SettingsPtr settings_, + const String & metadata_path_, + const Poco::Util::AbstractConfiguration & config_) + : IDiskRemote(disk_name_, hdfs_root_path_, metadata_path_, "DiskHDFS", settings_->thread_pool_size) + , config(config_) + , hdfs_builder(createHDFSBuilder(hdfs_root_path_, config)) + , hdfs_fs(createHDFSFS(hdfs_builder.get())) + , settings(std::move(settings_)) +{ +} + + +std::unique_ptr DiskHDFS::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const +{ + auto metadata = readMeta(path); + + LOG_DEBUG(log, + "Read from file by path: {}. Existing HDFS objects: {}", + backQuote(metadata_path + path), metadata.remote_fs_objects.size()); + + auto reader = std::make_unique(config, remote_fs_root_path, metadata, buf_size); + return std::make_unique(std::move(reader), settings->min_bytes_for_seek); +} + + +std::unique_ptr DiskHDFS::writeFile(const String & path, size_t buf_size, WriteMode mode) +{ + auto metadata = readOrCreateMetaForWriting(path, mode); + + /// Path to store new HDFS object. + auto file_name = getRandomName(); + auto hdfs_path = remote_fs_root_path + file_name; + + LOG_DEBUG(log, "{} to file by path: {}. HDFS path: {}", mode == WriteMode::Rewrite ? "Write" : "Append", + backQuote(metadata_path + path), remote_fs_root_path + hdfs_path); + + /// Single O_WRONLY in libhdfs adds O_TRUNC + auto hdfs_buffer = std::make_unique(hdfs_path, + config, buf_size, + mode == WriteMode::Rewrite ? 
O_WRONLY : O_WRONLY | O_APPEND); + + return std::make_unique>(std::move(hdfs_buffer), + std::move(metadata), + file_name); +} + + +RemoteFSPathKeeperPtr DiskHDFS::createFSPathKeeper() const +{ + return std::make_shared(settings->objects_chunk_size_to_delete); +} + + +void DiskHDFS::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) +{ + auto * hdfs_paths_keeper = dynamic_cast(fs_paths_keeper.get()); + if (hdfs_paths_keeper) + hdfs_paths_keeper->removePaths([&](std::vector && chunk) + { + for (const auto & hdfs_object_path : chunk) + { + const String & hdfs_path = hdfs_object_path; + const size_t begin_of_path = hdfs_path.find('/', hdfs_path.find("//") + 2); + + /// Add path from root to file name + int res = hdfsDelete(hdfs_fs.get(), hdfs_path.substr(begin_of_path).c_str(), 0); + if (res == -1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "HDFSDelete failed with path: " + hdfs_path); + } + }); +} + + +namespace +{ +std::unique_ptr getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix) +{ + return std::make_unique( + config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024), + config.getInt(config_prefix + ".thread_pool_size", 16), + config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000)); +} +} + +void registerDiskHDFS(DiskFactory & factory) +{ + auto creator = [](const String & name, + const Poco::Util::AbstractConfiguration & config, + const String & config_prefix, + ContextConstPtr context_) -> DiskPtr + { + Poco::File disk{context_->getPath() + "disks/" + name}; + disk.createDirectories(); + + String uri{config.getString(config_prefix + ".endpoint")}; + + if (uri.back() != '/') + throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri); + + String metadata_path = context_->getPath() + "disks/" + name + "/"; + + return std::make_shared( + name, uri, + getSettings(config, config_prefix), + metadata_path, config); + }; + + factory.registerDiskType("hdfs", creator); +} + +} diff --git a/src/Disks/HDFS/DiskHDFS.h b/src/Disks/HDFS/DiskHDFS.h new file mode 100644 index 00000000000..49fdf44728b --- /dev/null +++ b/src/Disks/HDFS/DiskHDFS.h @@ -0,0 +1,72 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +struct DiskHDFSSettings +{ + size_t min_bytes_for_seek; + int thread_pool_size; + int objects_chunk_size_to_delete; + + DiskHDFSSettings( + int min_bytes_for_seek_, + int thread_pool_size_, + int objects_chunk_size_to_delete_) + : min_bytes_for_seek(min_bytes_for_seek_) + , thread_pool_size(thread_pool_size_) + , objects_chunk_size_to_delete(objects_chunk_size_to_delete_) {} +}; + + +/** + * Storage for persisting data in HDFS and metadata on the local disk. + * Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file) + * that contains HDFS object key with actual data. 
+ */ +class DiskHDFS final : public IDiskRemote +{ +public: + using SettingsPtr = std::unique_ptr; + + DiskHDFS( + const String & disk_name_, + const String & hdfs_root_path_, + SettingsPtr settings_, + const String & metadata_path_, + const Poco::Util::AbstractConfiguration & config_); + + DiskType::Type getType() const override { return DiskType::Type::HDFS; } + + std::unique_ptr readFile( + const String & path, + size_t buf_size, + size_t estimated_size, + size_t aio_threshold, + size_t mmap_threshold, + MMappedFileCache * mmap_cache) const override; + + std::unique_ptr writeFile(const String & path, size_t buf_size, WriteMode mode) override; + + void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) override; + + RemoteFSPathKeeperPtr createFSPathKeeper() const override; + +private: + String getRandomName() { return toString(UUIDHelpers::generateV4()); } + + const Poco::Util::AbstractConfiguration & config; + + HDFSBuilderWrapper hdfs_builder; + HDFSFSPtr hdfs_fs; + + SettingsPtr settings; +}; + +} diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index 0b4e5779ea9..a42a60959c5 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -7,6 +7,7 @@ #include #include #include +#include "Disks/Executor.h" #include #include @@ -178,17 +179,17 @@ public: virtual void removeRecursive(const String & path) = 0; /// Remove file. Throws exception if file doesn't exists or if directory is not empty. - /// Differs from removeFile for S3 disks + /// Differs from removeFile for S3/HDFS disks /// Second bool param is a flag to remove (true) or keep (false) shared data on S3 virtual void removeSharedFile(const String & path, bool) { removeFile(path); } /// Remove file or directory with all children. Use with extra caution. Throws exception if file doesn't exists. - /// Differs from removeRecursive for S3 disks + /// Differs from removeRecursive for S3/HDFS disks /// Second bool param is a flag to remove (true) or keep (false) shared data on S3 virtual void removeSharedRecursive(const String & path, bool) { removeRecursive(path); } /// Remove file or directory if it exists. - /// Differs from removeFileIfExists for S3 disks + /// Differs from removeFileIfExists for S3/HDFS disks /// Second bool param is a flag to remove (true) or keep (false) shared data on S3 virtual void removeSharedFileIfExists(const String & path, bool) { removeFileIfExists(path); } diff --git a/src/Disks/IDiskRemote.cpp b/src/Disks/IDiskRemote.cpp new file mode 100644 index 00000000000..bcb399f5d07 --- /dev/null +++ b/src/Disks/IDiskRemote.cpp @@ -0,0 +1,487 @@ +#include + +#include "Disks/DiskFactory.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INCORRECT_DISK_INDEX; + extern const int UNKNOWN_FORMAT; + extern const int FILE_ALREADY_EXISTS; + extern const int PATH_ACCESS_DENIED;; + extern const int CANNOT_DELETE_DIRECTORY; +} + + +/// Load metadata by path or create empty if `create` flag is set. 
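The Metadata constructor that follows parses the small local file whose layout save() writes later in this file: a version number, then the object count and total size, then one size/relative-path pair per remote object, then the reference count, and — from version 3 on — a read-only flag. A rough self-contained sketch of reading that layout with plain iostreams (the sample file contents and names are made up for illustration; the real code uses ClickHouse's ReadBuffer helpers and escaped strings):

```cpp
#include <cstddef>
#include <iostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

/// Hypothetical stand-in for the layout written by IDiskRemote::Metadata::save():
///   <version>\n
///   <object_count>\t<total_size>\n
///   <object_size>\t<relative_object_path>\n   (one line per remote object)
///   <ref_count>\n
///   <read_only 0|1>\n                         (only from version 3 on)
struct RemoteMetadata
{
    unsigned version = 0;
    size_t total_size = 0;
    std::vector<std::pair<std::string, size_t>> objects;   /// (relative path, size)
    unsigned ref_count = 0;
    bool read_only = false;
};

RemoteMetadata parseMetadata(std::istream & in)
{
    RemoteMetadata meta;
    size_t object_count = 0;

    in >> meta.version;
    in >> object_count >> meta.total_size;

    for (size_t i = 0; i < object_count; ++i)
    {
        size_t size = 0;
        std::string relative_path;
        in >> size >> relative_path;   /// the real code uses readEscapedString()
        meta.objects.emplace_back(relative_path, size);
    }

    in >> meta.ref_count;
    if (meta.version >= 3)             /// VERSION_READ_ONLY_FLAG added the trailing flag
        in >> meta.read_only;

    return meta;
}

int main()
{
    /// A made-up version-2 file describing a single 1 MiB object with a random remote name.
    std::istringstream file("2\n1\t1048576\n1048576\tymbssqmanzyllkvhygvzjrqrqcmxghxf\n0\n");
    auto meta = parseMetadata(file);
    std::cout << meta.objects.size() << " object(s), total " << meta.total_size << " bytes" << std::endl;
}
```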
+IDiskRemote::Metadata::Metadata( + const String & remote_fs_root_path_, + const String & disk_path_, + const String & metadata_file_path_, + bool create) + : remote_fs_root_path(remote_fs_root_path_) + , disk_path(disk_path_) + , metadata_file_path(metadata_file_path_) + , total_size(0), remote_fs_objects(0), ref_count(0) +{ + if (create) + return; + + try + { + ReadBufferFromFile buf(disk_path + metadata_file_path, 1024); /* reasonable buffer size for small file */ + + UInt32 version; + readIntText(version, buf); + + if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG) + throw Exception( + ErrorCodes::UNKNOWN_FORMAT, + "Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}", + disk_path + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG)); + + assertChar('\n', buf); + + UInt32 remote_fs_objects_count; + readIntText(remote_fs_objects_count, buf); + assertChar('\t', buf); + readIntText(total_size, buf); + assertChar('\n', buf); + remote_fs_objects.resize(remote_fs_objects_count); + + for (size_t i = 0; i < remote_fs_objects_count; ++i) + { + String remote_fs_object_path; + size_t remote_fs_object_size; + readIntText(remote_fs_object_size, buf); + assertChar('\t', buf); + readEscapedString(remote_fs_object_path, buf); + if (version == VERSION_ABSOLUTE_PATHS) + { + if (!boost::algorithm::starts_with(remote_fs_object_path, remote_fs_root_path)) + throw Exception( + ErrorCodes::UNKNOWN_FORMAT, + "Path in metadata does not correspond S3 root path. Path: {}, root path: {}, disk path: {}", + remote_fs_object_path, remote_fs_root_path, disk_path_); + + remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size()); + } + assertChar('\n', buf); + remote_fs_objects[i] = {remote_fs_object_path, remote_fs_object_size}; + } + + readIntText(ref_count, buf); + assertChar('\n', buf); + + if (version >= VERSION_READ_ONLY_FLAG) + { + readBoolText(read_only, buf); + assertChar('\n', buf); + } + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_FORMAT) + throw; + + throw Exception("Failed to read metadata file", e, ErrorCodes::UNKNOWN_FORMAT); + } +} + +void IDiskRemote::Metadata::addObject(const String & path, size_t size) +{ + total_size += size; + remote_fs_objects.emplace_back(path, size); +} + +/// Fsync metadata file if 'sync' flag is set. +void IDiskRemote::Metadata::save(bool sync) +{ + WriteBufferFromFile buf(disk_path + metadata_file_path, 1024); + + writeIntText(VERSION_RELATIVE_PATHS, buf); + writeChar('\n', buf); + + writeIntText(remote_fs_objects.size(), buf); + writeChar('\t', buf); + writeIntText(total_size, buf); + writeChar('\n', buf); + + for (const auto & [remote_fs_object_path, remote_fs_object_size] : remote_fs_objects) + { + writeIntText(remote_fs_object_size, buf); + writeChar('\t', buf); + writeEscapedString(remote_fs_object_path, buf); + writeChar('\n', buf); + } + + writeIntText(ref_count, buf); + writeChar('\n', buf); + + writeBoolText(read_only, buf); + writeChar('\n', buf); + + buf.finalize(); + if (sync) + buf.sync(); +} + +IDiskRemote::Metadata IDiskRemote::readOrCreateMetaForWriting(const String & path, WriteMode mode) +{ + bool exist = exists(path); + if (exist) + { + auto metadata = readMeta(path); + if (metadata.read_only) + throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED); + + if (mode == WriteMode::Rewrite) + removeFile(path); /// Remove for re-write. 
+ else + return metadata; + } + + auto metadata = createMeta(path); + /// Save empty metadata to disk to have ability to get file size while buffer is not finalized. + metadata.save(); + + return metadata; +} + + +IDiskRemote::Metadata IDiskRemote::readMeta(const String & path) const +{ + return Metadata(remote_fs_root_path, metadata_path, path); +} + + +IDiskRemote::Metadata IDiskRemote::createMeta(const String & path) const +{ + return Metadata(remote_fs_root_path, metadata_path, path, true); +} + + +void IDiskRemote::removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper) +{ + LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path)); + + Poco::File file(metadata_path + path); + + if (!file.isFile()) + throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path); + + try + { + auto metadata = readMeta(path); + + /// If there is no references - delete content from remote FS. + if (metadata.ref_count == 0) + { + file.remove(); + for (const auto & [remote_fs_object_path, _] : metadata.remote_fs_objects) + fs_paths_keeper->addPath(remote_fs_root_path + remote_fs_object_path); + } + else /// In other case decrement number of references, save metadata and delete file. + { + --metadata.ref_count; + metadata.save(); + file.remove(); + } + } + catch (const Exception & e) + { + /// If it's impossible to read meta - just remove it from FS. + if (e.code() == ErrorCodes::UNKNOWN_FORMAT) + { + LOG_WARNING(log, + "Metadata file {} can't be read by reason: {}. Removing it forcibly.", + backQuote(path), e.nested() ? e.nested()->message() : e.message()); + file.remove(); + } + else + throw; + } +} + + +void IDiskRemote::removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper) +{ + checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks. + + Poco::File file(metadata_path + path); + if (file.isFile()) + { + removeMeta(path, fs_paths_keeper); + } + else + { + for (auto it{iterateDirectory(path)}; it->isValid(); it->next()) + removeMetaRecursive(it->path(), fs_paths_keeper); + file.remove(); + } +} + +DiskPtr DiskRemoteReservation::getDisk(size_t i) const +{ + if (i != 0) + throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX); + return disk; +} + + +void DiskRemoteReservation::update(UInt64 new_size) +{ + std::lock_guard lock(disk->reservation_mutex); + disk->reserved_bytes -= size; + size = new_size; + disk->reserved_bytes += size; +} + + +DiskRemoteReservation::~DiskRemoteReservation() +{ + try + { + std::lock_guard lock(disk->reservation_mutex); + if (disk->reserved_bytes < size) + { + disk->reserved_bytes = 0; + LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName()); + } + else + { + disk->reserved_bytes -= size; + } + + if (disk->reservation_count == 0) + LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName()); + else + --disk->reservation_count; + } + catch (...) 
+ { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + + +IDiskRemote::IDiskRemote( + const String & name_, + const String & remote_fs_root_path_, + const String & metadata_path_, + const String & log_name_, + size_t thread_pool_size) + : IDisk(std::make_unique(log_name_, thread_pool_size)) + , log(&Poco::Logger::get(log_name_)) + , name(name_) + , remote_fs_root_path(remote_fs_root_path_) + , metadata_path(metadata_path_) +{ +} + + +bool IDiskRemote::exists(const String & path) const +{ + return Poco::File(metadata_path + path).exists(); +} + + +bool IDiskRemote::isFile(const String & path) const +{ + return Poco::File(metadata_path + path).isFile(); +} + + +void IDiskRemote::createFile(const String & path) +{ + /// Create empty metadata file. + auto metadata = createMeta(path); + metadata.save(); +} + + +size_t IDiskRemote::getFileSize(const String & path) const +{ + auto metadata = readMeta(path); + return metadata.total_size; +} + + +void IDiskRemote::moveFile(const String & from_path, const String & to_path) +{ + if (exists(to_path)) + throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS); + + Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path); +} + + +void IDiskRemote::replaceFile(const String & from_path, const String & to_path) +{ + if (exists(to_path)) + { + const String tmp_path = to_path + ".old"; + moveFile(to_path, tmp_path); + moveFile(from_path, to_path); + removeFile(tmp_path); + } + else + moveFile(from_path, to_path); +} + + +void IDiskRemote::removeFileIfExists(const String & path) +{ + RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper(); + if (Poco::File(metadata_path + path).exists()) + { + removeMeta(path, fs_paths_keeper); + removeFromRemoteFS(fs_paths_keeper); + } +} + + +void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs) +{ + RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper(); + removeMeta(path, fs_paths_keeper); + if (!keep_in_remote_fs) + removeFromRemoteFS(fs_paths_keeper); +} + + +void IDiskRemote::removeSharedRecursive(const String & path, bool keep_in_remote_fs) +{ + RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper(); + removeMetaRecursive(path, fs_paths_keeper); + if (!keep_in_remote_fs) + removeFromRemoteFS(fs_paths_keeper); +} + + +void IDiskRemote::setReadOnly(const String & path) +{ + /// We should store read only flag inside metadata file (instead of using FS flag), + /// because we modify metadata file when create hard-links from it. 
+ auto metadata = readMeta(path); + metadata.read_only = true; + metadata.save(); +} + + +bool IDiskRemote::isDirectory(const String & path) const +{ + return Poco::File(metadata_path + path).isDirectory(); +} + + +void IDiskRemote::createDirectory(const String & path) +{ + Poco::File(metadata_path + path).createDirectory(); +} + + +void IDiskRemote::createDirectories(const String & path) +{ + Poco::File(metadata_path + path).createDirectories(); +} + + +void IDiskRemote::clearDirectory(const String & path) +{ + for (auto it{iterateDirectory(path)}; it->isValid(); it->next()) + if (isFile(it->path())) + removeFile(it->path()); +} + + +void IDiskRemote::removeDirectory(const String & path) +{ + Poco::File(metadata_path + path).remove(); +} + + +DiskDirectoryIteratorPtr IDiskRemote::iterateDirectory(const String & path) +{ + return std::make_unique(metadata_path + path, path); +} + + +void IDiskRemote::listFiles(const String & path, std::vector & file_names) +{ + for (auto it = iterateDirectory(path); it->isValid(); it->next()) + file_names.push_back(it->name()); +} + + +void IDiskRemote::setLastModified(const String & path, const Poco::Timestamp & timestamp) +{ + Poco::File(metadata_path + path).setLastModified(timestamp); +} + + +Poco::Timestamp IDiskRemote::getLastModified(const String & path) +{ + return Poco::File(metadata_path + path).getLastModified(); +} + + +void IDiskRemote::createHardLink(const String & src_path, const String & dst_path) +{ + /// Increment number of references. + auto src = readMeta(src_path); + ++src.ref_count; + src.save(); + + /// Create FS hardlink to metadata file. + DB::createHardLink(metadata_path + src_path, metadata_path + dst_path); +} + + +ReservationPtr IDiskRemote::reserve(UInt64 bytes) +{ + if (!tryReserve(bytes)) + return {}; + + return std::make_unique(std::static_pointer_cast(shared_from_this()), bytes); +} + + +bool IDiskRemote::tryReserve(UInt64 bytes) +{ + std::lock_guard lock(reservation_mutex); + if (bytes == 0) + { + LOG_DEBUG(log, "Reserving 0 bytes on remote_fs disk {}", backQuote(name)); + ++reservation_count; + return true; + } + + auto available_space = getAvailableSpace(); + UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes); + if (unreserved_space >= bytes) + { + LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.", + ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space)); + ++reservation_count; + reserved_bytes += bytes; + return true; + } + return false; +} + +} diff --git a/src/Disks/IDiskRemote.h b/src/Disks/IDiskRemote.h new file mode 100644 index 00000000000..b32258331a7 --- /dev/null +++ b/src/Disks/IDiskRemote.h @@ -0,0 +1,283 @@ +#pragma once +#include + +#include +#include "Disks/DiskFactory.h" +#include "Disks/Executor.h" +#include +#include +#include +#include + + +namespace DB +{ + +/// Helper class to collect paths into chunks of maximum size. +/// For s3 it is Aws::vector, for hdfs it is std::vector. 
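A detail worth calling out before the header: createHardLink() above increments ref_count in the source metadata and then hard-links the metadata file itself, so removeMeta() only schedules the remote objects for deletion once it sees ref_count == 0, i.e. when the last local file referencing them goes away. A hypothetical, much simplified model of that lifecycle (shared_ptr stands in for the filesystem hard link; none of these names are ClickHouse API):

```cpp
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>

/// Hypothetical model of IDiskRemote's hard links: the local metadata file is
/// hard-linked on the filesystem (modelled here with shared_ptr), ref_count counts
/// the extra links, and the remote objects are only scheduled for deletion when a
/// path whose metadata has ref_count == 0 is removed.
struct Meta
{
    unsigned ref_count = 0;
    std::vector<std::string> remote_objects;
};

std::map<std::string, std::shared_ptr<Meta>> local_files;   /// local path -> shared metadata
std::vector<std::string> remote_objects_to_delete;          /// what the path keeper would collect

void createHardLink(const std::string & src, const std::string & dst)
{
    ++local_files[src]->ref_count;         /// one more reference to the same remote objects
    local_files[dst] = local_files[src];   /// both local paths now share the metadata
}

void removeFile(const std::string & path)
{
    auto meta = local_files[path];
    if (meta->ref_count == 0)
    {
        /// Last reference: the remote objects may really be deleted (in batches).
        for (const auto & object : meta->remote_objects)
            remote_objects_to_delete.push_back(object);
    }
    else
        --meta->ref_count;                 /// drop one reference, keep the remote data
    local_files.erase(path);
}

int main()
{
    auto meta = std::make_shared<Meta>();
    meta->remote_objects = {"r0000000000000001"};
    local_files["part/data.bin"] = meta;

    createHardLink("part/data.bin", "detached/data.bin");
    removeFile("part/data.bin");      /// ref_count goes 1 -> 0, the remote object survives
    removeFile("detached/data.bin");  /// ref_count is 0, so the object is scheduled for deletion
    std::cout << remote_objects_to_delete.size() << " remote object(s) to delete" << std::endl;   /// prints 1
}
```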
+class RemoteFSPathKeeper +{ +public: + RemoteFSPathKeeper(size_t chunk_limit_) : chunk_limit(chunk_limit_) {} + + virtual ~RemoteFSPathKeeper() = default; + + virtual void addPath(const String & path) = 0; + +protected: + size_t chunk_limit; +}; + +using RemoteFSPathKeeperPtr = std::shared_ptr; + + +/// Base Disk class for remote FS's, which are not posix-compatible (DiskS3 and DiskHDFS) +class IDiskRemote : public IDisk +{ + +friend class DiskRemoteReservation; + +public: + IDiskRemote( + const String & name_, + const String & remote_fs_root_path_, + const String & metadata_path_, + const String & log_name_, + size_t thread_pool_size); + + struct Metadata; + + const String & getName() const final override { return name; } + + const String & getPath() const final override { return metadata_path; } + + Metadata readMeta(const String & path) const; + + Metadata createMeta(const String & path) const; + + Metadata readOrCreateMetaForWriting(const String & path, WriteMode mode); + + UInt64 getTotalSpace() const override { return std::numeric_limits::max(); } + + UInt64 getAvailableSpace() const override { return std::numeric_limits::max(); } + + UInt64 getUnreservedSpace() const override { return std::numeric_limits::max(); } + + UInt64 getKeepingFreeSpace() const override { return 0; } + + bool exists(const String & path) const override; + + bool isFile(const String & path) const override; + + void createFile(const String & path) override; + + size_t getFileSize(const String & path) const override; + + void moveFile(const String & from_path, const String & to_path) override; + + void replaceFile(const String & from_path, const String & to_path) override; + + void removeFile(const String & path) override { removeSharedFile(path, false); } + + void removeFileIfExists(const String & path) override; + + void removeRecursive(const String & path) override { removeSharedRecursive(path, false); } + + void removeSharedFile(const String & path, bool keep_in_remote_fs) override; + + void removeSharedRecursive(const String & path, bool keep_in_remote_fs) override; + + void listFiles(const String & path, std::vector & file_names) override; + + void setReadOnly(const String & path) override; + + bool isDirectory(const String & path) const override; + + void createDirectory(const String & path) override; + + void createDirectories(const String & path) override; + + void clearDirectory(const String & path) override; + + void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); } + + void removeDirectory(const String & path) override; + + DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + + void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; + + Poco::Timestamp getLastModified(const String & path) override; + + void createHardLink(const String & src_path, const String & dst_path) override; + + ReservationPtr reserve(UInt64 bytes) override; + + virtual void removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) = 0; + + virtual RemoteFSPathKeeperPtr createFSPathKeeper() const = 0; + +protected: + Poco::Logger * log; + const String name; + const String remote_fs_root_path; + + const String metadata_path; + +private: + void removeMeta(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper); + + void removeMetaRecursive(const String & path, RemoteFSPathKeeperPtr fs_paths_keeper); + + bool tryReserve(UInt64 bytes); + + UInt64 reserved_bytes = 0; + UInt64 reservation_count = 0; + std::mutex 
reservation_mutex; +}; + +using RemoteDiskPtr = std::shared_ptr; + +/// Remote FS (S3, HDFS) metadata file layout: +/// Number of FS objects, Total size of all FS objects. +/// Each FS object represents path where object located in FS and size of object. + +struct IDiskRemote::Metadata +{ + /// Metadata file version. + static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1; + static constexpr UInt32 VERSION_RELATIVE_PATHS = 2; + static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3; + + using PathAndSize = std::pair; + + /// Remote FS (S3, HDFS) root path. + const String & remote_fs_root_path; + + /// Disk path. + const String & disk_path; + + /// Relative path to metadata file on local FS. + String metadata_file_path; + + /// Total size of all remote FS (S3, HDFS) objects. + size_t total_size = 0; + + /// Remote FS (S3, HDFS) objects paths and their sizes. + std::vector remote_fs_objects; + + /// Number of references (hardlinks) to this metadata file. + UInt32 ref_count = 0; + + /// Flag indicates that file is read only. + bool read_only = false; + + /// Load metadata by path or create empty if `create` flag is set. + Metadata(const String & remote_fs_root_path_, + const String & disk_path_, + const String & metadata_file_path_, + bool create = false); + + void addObject(const String & path, size_t size); + + /// Fsync metadata file if 'sync' flag is set. + void save(bool sync = false); + +}; + + +class RemoteDiskDirectoryIterator final : public IDiskDirectoryIterator +{ +public: + RemoteDiskDirectoryIterator(const String & full_path, const String & folder_path_) : iter(full_path), folder_path(folder_path_) {} + + void next() override { ++iter; } + + bool isValid() const override { return iter != Poco::DirectoryIterator(); } + + String path() const override + { + if (iter->isDirectory()) + return folder_path + iter.name() + '/'; + else + return folder_path + iter.name(); + } + + String name() const override { return iter.name(); } + +private: + Poco::DirectoryIterator iter; + String folder_path; +}; + + +class DiskRemoteReservation final : public IReservation +{ +public: + DiskRemoteReservation(const RemoteDiskPtr & disk_, UInt64 size_) + : disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_) + { + } + + UInt64 getSize() const override { return size; } + + DiskPtr getDisk(size_t i) const override; + + Disks getDisks() const override { return {disk}; } + + void update(UInt64 new_size) override; + + ~DiskRemoteReservation() override; + +private: + RemoteDiskPtr disk; + UInt64 size; + CurrentMetrics::Increment metric_increment; +}; + + +/// Runs tasks asynchronously using thread pool. +class AsyncExecutor : public Executor +{ +public: + explicit AsyncExecutor(const String & name_, int thread_pool_size) + : name(name_) + , pool(ThreadPool(thread_pool_size)) {} + + std::future execute(std::function task) override + { + auto promise = std::make_shared>(); + pool.scheduleOrThrowOnError( + [promise, task]() + { + try + { + task(); + promise->set_value(); + } + catch (...) + { + tryLogCurrentException("Failed to run async task"); + + try + { + promise->set_exception(std::current_exception()); + } + catch (...) 
{} + } + }); + + return promise->get_future(); + } + + void setMaxThreads(size_t threads) + { + pool.setMaxThreads(threads); + } + +private: + String name; + ThreadPool pool; +}; + +} diff --git a/src/Disks/ReadIndirectBufferFromRemoteFS.cpp b/src/Disks/ReadIndirectBufferFromRemoteFS.cpp new file mode 100644 index 00000000000..955986e5259 --- /dev/null +++ b/src/Disks/ReadIndirectBufferFromRemoteFS.cpp @@ -0,0 +1,128 @@ +#include "ReadIndirectBufferFromRemoteFS.h" + +#if USE_AWS_S3 || USE_HDFS +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_SEEK_THROUGH_FILE; +} + + +template +ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS( + IDiskRemote::Metadata metadata_) + : metadata(std::move(metadata_)) +{ +} + +template +off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) +{ + if (whence == SEEK_CUR) + { + /// If position within current working buffer - shift pos. + if (!working_buffer.empty() && size_t(getPosition() + offset_) < absolute_position) + { + pos += offset_; + return getPosition(); + } + else + { + absolute_position += offset_; + } + } + else if (whence == SEEK_SET) + { + /// If position within current working buffer - shift pos. + if (!working_buffer.empty() && size_t(offset_) >= absolute_position - working_buffer.size() + && size_t(offset_) < absolute_position) + { + pos = working_buffer.end() - (absolute_position - offset_); + return getPosition(); + } + else + { + absolute_position = offset_; + } + } + else + throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + + current_buf = initialize(); + pos = working_buffer.end(); + + return absolute_position; +} + + +template +std::unique_ptr ReadIndirectBufferFromRemoteFS::initialize() +{ + size_t offset = absolute_position; + for (size_t i = 0; i < metadata.remote_fs_objects.size(); ++i) + { + current_buf_idx = i; + const auto & [file_path, size] = metadata.remote_fs_objects[i]; + if (size > offset) + { + auto buf = createReadBuffer(file_path); + buf->seek(offset, SEEK_SET); + return buf; + } + offset -= size; + } + return nullptr; +} + + +template +bool ReadIndirectBufferFromRemoteFS::nextImpl() +{ + /// Find first available buffer that fits to given offset. + if (!current_buf) + current_buf = initialize(); + + /// If current buffer has remaining data - use it. + if (current_buf && current_buf->next()) + { + working_buffer = current_buf->buffer(); + absolute_position += working_buffer.size(); + return true; + } + + /// If there is no available buffers - nothing to read. + if (current_buf_idx + 1 >= metadata.remote_fs_objects.size()) + return false; + + ++current_buf_idx; + const auto & path = metadata.remote_fs_objects[current_buf_idx].first; + + current_buf = createReadBuffer(path); + current_buf->next(); + + working_buffer = current_buf->buffer(); + absolute_position += working_buffer.size(); + + return true; +} + + +#if USE_AWS_S3 +template +class ReadIndirectBufferFromRemoteFS; +#endif + +#if USE_HDFS +template +class ReadIndirectBufferFromRemoteFS; +#endif + +} + +#endif diff --git a/src/Disks/ReadIndirectBufferFromRemoteFS.h b/src/Disks/ReadIndirectBufferFromRemoteFS.h new file mode 100644 index 00000000000..f80406b5354 --- /dev/null +++ b/src/Disks/ReadIndirectBufferFromRemoteFS.h @@ -0,0 +1,46 @@ +#pragma once +#include + +#if USE_AWS_S3 || USE_HDFS + +#include +#include +#include + + +namespace DB +{ + +/// Reads data from S3/HDFS using stored paths in metadata. 
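The template declared next presents the metadata's object list as one logical file: initialize() walks the per-object sizes to translate an absolute offset into "which remote object, at which local offset", and nextImpl() switches to the following object when the current one is exhausted. A small standalone sketch of just that offset arithmetic, assuming a (path, size) list like remote_fs_objects:

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <utility>
#include <vector>

/// Sketch of the arithmetic behind ReadIndirectBufferFromRemoteFS::initialize():
/// the logical file is the concatenation of several remote objects, so an absolute
/// offset has to be translated into (object index, offset inside that object).
using RemoteObjects = std::vector<std::pair<std::string, size_t>>;   /// (path, size)

std::optional<std::pair<size_t, size_t>> locate(const RemoteObjects & objects, size_t absolute_offset)
{
    size_t offset = absolute_offset;
    for (size_t i = 0; i < objects.size(); ++i)
    {
        if (objects[i].second > offset)
            return std::make_pair(i, offset);   /// open this object and seek to offset inside it
        offset -= objects[i].second;            /// skip the whole object and keep walking
    }
    return std::nullopt;                        /// offset is past the end of the logical file
}

int main()
{
    RemoteObjects objects = {{"obj0", 100}, {"obj1", 50}, {"obj2", 200}};

    if (auto position = locate(objects, 120))   /// falls 20 bytes into the second object
        std::cout << "object #" << position->first << ", local offset " << position->second << std::endl;
}
```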
+template +class ReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase +{ +public: + ReadIndirectBufferFromRemoteFS(IDiskRemote::Metadata metadata_); + + off_t seek(off_t offset_, int whence) override; + + off_t getPosition() override { return absolute_position - available(); } + + String getFileName() const override { return metadata.metadata_file_path; } + + virtual std::unique_ptr createReadBuffer(const String & path) = 0; + +protected: + IDiskRemote::Metadata metadata; + +private: + std::unique_ptr initialize(); + + bool nextImpl() override; + + size_t absolute_position = 0; + + size_t current_buf_idx = 0; + + std::unique_ptr current_buf; +}; + +} + +#endif diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 1a347394012..a3f5fe89870 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -4,33 +4,28 @@ #include #include -#include #include #include #include -#include #include +#include +#include #include #include -#include -#include #include #include #include -#include #include #include #include -#include - +#include +#include #include #include #include #include #include -#include - namespace DB { @@ -39,55 +34,57 @@ namespace ErrorCodes { extern const int S3_ERROR; extern const int FILE_ALREADY_EXISTS; - extern const int CANNOT_SEEK_THROUGH_FILE; extern const int UNKNOWN_FORMAT; - extern const int INCORRECT_DISK_INDEX; extern const int BAD_ARGUMENTS; - extern const int PATH_ACCESS_DENIED; - extern const int CANNOT_DELETE_DIRECTORY; extern const int LOGICAL_ERROR; } - /// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API) -class DiskS3::AwsS3KeyKeeper : public std::list> +/// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html +class S3PathKeeper : public RemoteFSPathKeeper { public: - void addKey(const String & key); - static String getChunkKeys(const Aws::Vector & chunk); + using Chunk = Aws::Vector; + using Chunks = std::list; + + explicit S3PathKeeper(size_t chunk_limit_) : RemoteFSPathKeeper(chunk_limit_) {} + + void addPath(const String & path) override + { + if (chunks.empty() || chunks.back().size() >= chunk_limit) + { + /// add one more chunk + chunks.push_back(Chunks::value_type()); + chunks.back().reserve(chunk_limit); + } + Aws::S3::Model::ObjectIdentifier obj; + obj.SetKey(path); + chunks.back().push_back(obj); + } + + void removePaths(const std::function & remove_chunk_func) + { + for (auto & chunk : chunks) + remove_chunk_func(std::move(chunk)); + } + + static String getChunkKeys(const Chunk & chunk) + { + String res; + for (const auto & obj : chunk) + { + const auto & key = obj.GetKey(); + if (!res.empty()) + res.append(", "); + res.append(key.c_str(), key.size()); + } + return res; + } private: - /// limit for one DeleteObject request - /// see https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html - const static size_t chunk_limit = 1000; + Chunks chunks; }; -void DiskS3::AwsS3KeyKeeper::addKey(const String & key) -{ - if (empty() || back().size() >= chunk_limit) - { /// add one more chunk - push_back(value_type()); - back().reserve(chunk_limit); - } - - Aws::S3::Model::ObjectIdentifier obj; - obj.SetKey(key); - back().push_back(obj); -} - -String DiskS3::AwsS3KeyKeeper::getChunkKeys(const Aws::Vector & chunk) -{ - String res; - for (const auto & obj : chunk) - { - const auto & key = obj.GetKey(); - if (!res.empty()) - res.append(", "); - res.append(key.c_str(), key.size()); - } - return res; -} - String getRandomName() { 
std::uniform_int_distribution distribution('a', 'z'); @@ -117,174 +114,8 @@ void throwIfError(const Aws::Utils::Outcome & response) } } -/** - * S3 metadata file layout: - * Number of S3 objects, Total size of all S3 objects. - * Each S3 object represents path where object located in S3 and size of object. - */ -struct DiskS3::Metadata -{ - /// Metadata file version. - static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1; - static constexpr UInt32 VERSION_RELATIVE_PATHS = 2; - static constexpr UInt32 VERSION_READ_ONLY_FLAG = 3; - - using PathAndSize = std::pair; - - /// S3 root path. - const String & s3_root_path; - - /// Disk path. - const String & disk_path; - /// Relative path to metadata file on local FS. - String metadata_file_path; - /// Total size of all S3 objects. - size_t total_size; - /// S3 objects paths and their sizes. - std::vector s3_objects; - /// Number of references (hardlinks) to this metadata file. - UInt32 ref_count; - /// Flag indicates that file is read only. - bool read_only = false; - - /// Load metadata by path or create empty if `create` flag is set. - explicit Metadata(const String & s3_root_path_, const String & disk_path_, const String & metadata_file_path_, bool create = false) - : s3_root_path(s3_root_path_), disk_path(disk_path_), metadata_file_path(metadata_file_path_), total_size(0), s3_objects(0), ref_count(0) - { - if (create) - return; - - try - { - ReadBufferFromFile buf(disk_path + metadata_file_path, 1024); /* reasonable buffer size for small file */ - - UInt32 version; - readIntText(version, buf); - - if (version < VERSION_ABSOLUTE_PATHS || version > VERSION_READ_ONLY_FLAG) - throw Exception( - "Unknown metadata file version. Path: " + disk_path + metadata_file_path - + " Version: " + std::to_string(version) + ", Maximum expected version: " + std::to_string(VERSION_READ_ONLY_FLAG), - ErrorCodes::UNKNOWN_FORMAT); - - assertChar('\n', buf); - - UInt32 s3_objects_count; - readIntText(s3_objects_count, buf); - assertChar('\t', buf); - readIntText(total_size, buf); - assertChar('\n', buf); - s3_objects.resize(s3_objects_count); - for (UInt32 i = 0; i < s3_objects_count; ++i) - { - String s3_object_path; - size_t s3_object_size; - readIntText(s3_object_size, buf); - assertChar('\t', buf); - readEscapedString(s3_object_path, buf); - if (version == VERSION_ABSOLUTE_PATHS) - { - if (!boost::algorithm::starts_with(s3_object_path, s3_root_path)) - throw Exception( - "Path in metadata does not correspond S3 root path. Path: " + s3_object_path - + ", root path: " + s3_root_path + ", disk path: " + disk_path_, - ErrorCodes::UNKNOWN_FORMAT); - s3_object_path = s3_object_path.substr(s3_root_path.size()); - } - assertChar('\n', buf); - s3_objects[i] = {s3_object_path, s3_object_size}; - } - - readIntText(ref_count, buf); - assertChar('\n', buf); - - if (version >= VERSION_READ_ONLY_FLAG) - { - readBoolText(read_only, buf); - assertChar('\n', buf); - } - } - catch (Exception & e) - { - if (e.code() == ErrorCodes::UNKNOWN_FORMAT) - throw; - - throw Exception("Failed to read metadata file", e, ErrorCodes::UNKNOWN_FORMAT); - } - } - - void addObject(const String & path, size_t size) - { - total_size += size; - s3_objects.emplace_back(path, size); - } - - /// Fsync metadata file if 'sync' flag is set. 
- void save(bool sync = false) - { - WriteBufferFromFile buf(disk_path + metadata_file_path, 1024); - - writeIntText(VERSION_RELATIVE_PATHS, buf); - writeChar('\n', buf); - - writeIntText(s3_objects.size(), buf); - writeChar('\t', buf); - writeIntText(total_size, buf); - writeChar('\n', buf); - for (const auto & [s3_object_path, s3_object_size] : s3_objects) - { - writeIntText(s3_object_size, buf); - writeChar('\t', buf); - writeEscapedString(s3_object_path, buf); - writeChar('\n', buf); - } - - writeIntText(ref_count, buf); - writeChar('\n', buf); - - writeBoolText(read_only, buf); - writeChar('\n', buf); - - buf.finalize(); - if (sync) - buf.sync(); - } -}; - -DiskS3::Metadata DiskS3::readOrCreateMetaForWriting(const String & path, WriteMode mode) -{ - bool exist = exists(path); - if (exist) - { - auto metadata = readMeta(path); - if (metadata.read_only) - throw Exception("File is read-only: " + path, ErrorCodes::PATH_ACCESS_DENIED); - - if (mode == WriteMode::Rewrite) - removeFile(path); /// Remove for re-write. - else - return metadata; - } - - auto metadata = createMeta(path); - /// Save empty metadata to disk to have ability to get file size while buffer is not finalized. - metadata.save(); - - return metadata; -} - -DiskS3::Metadata DiskS3::readMeta(const String & path) const -{ - return Metadata(s3_root_path, metadata_path, path); -} - -DiskS3::Metadata DiskS3::createMeta(const String & path) const -{ - return Metadata(s3_root_path, metadata_path, path, true); -} - /// Reads data from S3 using stored paths in metadata. -class ReadIndirectBufferFromS3 final : public ReadBufferFromFileBase +class ReadIndirectBufferFromS3 final : public ReadIndirectBufferFromRemoteFS { public: ReadIndirectBufferFromS3( @@ -293,291 +124,26 @@ public: DiskS3::Metadata metadata_, size_t s3_max_single_read_retries_, size_t buf_size_) - : client_ptr(std::move(client_ptr_)) + : ReadIndirectBufferFromRemoteFS(metadata_) + , client_ptr(std::move(client_ptr_)) , bucket(bucket_) - , metadata(std::move(metadata_)) , s3_max_single_read_retries(s3_max_single_read_retries_) , buf_size(buf_size_) { } - off_t seek(off_t offset_, int whence) override + std::unique_ptr createReadBuffer(const String & path) override { - if (whence == SEEK_CUR) - { - /// If position within current working buffer - shift pos. - if (!working_buffer.empty() && size_t(getPosition() + offset_) < absolute_position) - { - pos += offset_; - return getPosition(); - } - else - { - absolute_position += offset_; - } - } - else if (whence == SEEK_SET) - { - /// If position within current working buffer - shift pos. 
- if (!working_buffer.empty() && size_t(offset_) >= absolute_position - working_buffer.size() - && size_t(offset_) < absolute_position) - { - pos = working_buffer.end() - (absolute_position - offset_); - return getPosition(); - } - else - { - absolute_position = offset_; - } - } - else - throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); - - current_buf = initialize(); - pos = working_buffer.end(); - - return absolute_position; + return std::make_unique(client_ptr, bucket, metadata.remote_fs_root_path + path, s3_max_single_read_retries, buf_size); } - off_t getPosition() override { return absolute_position - available(); } - - std::string getFileName() const override { return metadata.metadata_file_path; } - private: - std::unique_ptr initialize() - { - size_t offset = absolute_position; - for (size_t i = 0; i < metadata.s3_objects.size(); ++i) - { - current_buf_idx = i; - const auto & [path, size] = metadata.s3_objects[i]; - if (size > offset) - { - auto buf = std::make_unique(client_ptr, bucket, metadata.s3_root_path + path, s3_max_single_read_retries, buf_size); - buf->seek(offset, SEEK_SET); - return buf; - } - offset -= size; - } - return nullptr; - } - - bool nextImpl() override - { - /// Find first available buffer that fits to given offset. - if (!current_buf) - current_buf = initialize(); - - /// If current buffer has remaining data - use it. - if (current_buf && current_buf->next()) - { - working_buffer = current_buf->buffer(); - absolute_position += working_buffer.size(); - return true; - } - - /// If there is no available buffers - nothing to read. - if (current_buf_idx + 1 >= metadata.s3_objects.size()) - return false; - - ++current_buf_idx; - const auto & path = metadata.s3_objects[current_buf_idx].first; - current_buf = std::make_unique(client_ptr, bucket, metadata.s3_root_path + path, s3_max_single_read_retries, buf_size); - current_buf->next(); - working_buffer = current_buf->buffer(); - absolute_position += working_buffer.size(); - - return true; - } - std::shared_ptr client_ptr; const String & bucket; - DiskS3::Metadata metadata; size_t s3_max_single_read_retries; size_t buf_size; - - size_t absolute_position = 0; - size_t current_buf_idx = 0; - std::unique_ptr current_buf; }; -/// Stores data in S3 and adds the object key (S3 path) and object size to metadata file on local FS. -class WriteIndirectBufferFromS3 final : public WriteBufferFromFileDecorator -{ -public: - WriteIndirectBufferFromS3( - std::unique_ptr impl_, - DiskS3::Metadata metadata_, - String & s3_path_) - : WriteBufferFromFileDecorator(std::move(impl_)) - , metadata(std::move(metadata_)) - , s3_path(s3_path_) { } - - virtual ~WriteIndirectBufferFromS3() override - { - try - { - WriteIndirectBufferFromS3::finalize(); - } - catch (...) 
- { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - - void finalize() override - { - if (finalized) - return; - - WriteBufferFromFileDecorator::finalize(); - - metadata.addObject(s3_path, count()); - metadata.save(); - } - - void sync() override - { - if (finalized) - metadata.save(true); - } - - std::string getFileName() const override { return metadata.metadata_file_path; } - -private: - DiskS3::Metadata metadata; - String s3_path; -}; - - -class DiskS3DirectoryIterator final : public IDiskDirectoryIterator -{ -public: - DiskS3DirectoryIterator(const String & full_path, const String & folder_path_) : iter(full_path), folder_path(folder_path_) {} - - void next() override { ++iter; } - - bool isValid() const override { return iter != Poco::DirectoryIterator(); } - - String path() const override - { - if (iter->isDirectory()) - return folder_path + iter.name() + '/'; - else - return folder_path + iter.name(); - } - - String name() const override { return iter.name(); } - -private: - Poco::DirectoryIterator iter; - String folder_path; -}; - - -using DiskS3Ptr = std::shared_ptr; - -class DiskS3Reservation final : public IReservation -{ -public: - DiskS3Reservation(const DiskS3Ptr & disk_, UInt64 size_) - : disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_) - { - } - - UInt64 getSize() const override { return size; } - - DiskPtr getDisk(size_t i) const override - { - if (i != 0) - { - throw Exception("Can't use i != 0 with single disk reservation", ErrorCodes::INCORRECT_DISK_INDEX); - } - return disk; - } - - Disks getDisks() const override { return {disk}; } - - void update(UInt64 new_size) override - { - std::lock_guard lock(disk->reservation_mutex); - disk->reserved_bytes -= size; - size = new_size; - disk->reserved_bytes += size; - } - - ~DiskS3Reservation() override - { - try - { - std::lock_guard lock(disk->reservation_mutex); - if (disk->reserved_bytes < size) - { - disk->reserved_bytes = 0; - LOG_ERROR(disk->log, "Unbalanced reservations size for disk '{}'.", disk->getName()); - } - else - { - disk->reserved_bytes -= size; - } - - if (disk->reservation_count == 0) - LOG_ERROR(disk->log, "Unbalanced reservation count for disk '{}'.", disk->getName()); - else - --disk->reservation_count; - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - -private: - DiskS3Ptr disk; - UInt64 size; - CurrentMetrics::Increment metric_increment; -}; - -/// Runs tasks asynchronously using thread pool. -class AsyncExecutor : public Executor -{ -public: - explicit AsyncExecutor(int thread_pool_size) : pool(ThreadPool(thread_pool_size)) { } - - std::future execute(std::function task) override - { - auto promise = std::make_shared>(); - pool.scheduleOrThrowOnError( - [promise, task]() - { - try - { - task(); - promise->set_value(); - } - catch (...) - { - tryLogCurrentException("DiskS3", "Failed to run async task"); - - try - { - promise->set_exception(std::current_exception()); - } catch (...) 
{ } - } - }); - - return promise->get_future(); - } - - void setMaxThreads(size_t threads) - { - pool.setMaxThreads(threads); - } -private: - ThreadPool pool; -}; - - DiskS3::DiskS3( String name_, String bucket_, @@ -585,73 +151,46 @@ DiskS3::DiskS3( String metadata_path_, SettingsPtr settings_, GetDiskSettings settings_getter_) - : IDisk(std::make_unique(settings_->thread_pool_size)) - , name(std::move(name_)) + : IDiskRemote(name_, s3_root_path_, metadata_path_, "DiskS3", settings_->thread_pool_size) , bucket(std::move(bucket_)) - , s3_root_path(std::move(s3_root_path_)) - , metadata_path(std::move(metadata_path_)) , current_settings(std::move(settings_)) , settings_getter(settings_getter_) { } -ReservationPtr DiskS3::reserve(UInt64 bytes) -{ - if (!tryReserve(bytes)) - return {}; - return std::make_unique(std::static_pointer_cast(shared_from_this()), bytes); -} - -bool DiskS3::exists(const String & path) const -{ - return Poco::File(metadata_path + path).exists(); -} - -bool DiskS3::isFile(const String & path) const -{ - return Poco::File(metadata_path + path).isFile(); -} - -bool DiskS3::isDirectory(const String & path) const -{ - return Poco::File(metadata_path + path).isDirectory(); -} - -size_t DiskS3::getFileSize(const String & path) const -{ - auto metadata = readMeta(path); - return metadata.total_size; -} - -void DiskS3::createDirectory(const String & path) -{ - Poco::File(metadata_path + path).createDirectory(); -} - -void DiskS3::createDirectories(const String & path) -{ - Poco::File(metadata_path + path).createDirectories(); -} - String DiskS3::getUniqueId(const String & path) const { - Metadata metadata(s3_root_path, metadata_path, path); + Metadata metadata(remote_fs_root_path, metadata_path, path); String id; - if (!metadata.s3_objects.empty()) - id = metadata.s3_root_path + metadata.s3_objects[0].first; + if (!metadata.remote_fs_objects.empty()) + id = metadata.remote_fs_root_path + metadata.remote_fs_objects[0].first; return id; } -DiskDirectoryIteratorPtr DiskS3::iterateDirectory(const String & path) +RemoteFSPathKeeperPtr DiskS3::createFSPathKeeper() const { - return std::make_unique(metadata_path + path, path); + auto settings = current_settings.get(); + return std::make_shared(settings->objects_chunk_size_to_delete); } -void DiskS3::clearDirectory(const String & path) +void DiskS3::removeFromRemoteFS(RemoteFSPathKeeperPtr fs_paths_keeper) { - for (auto it{iterateDirectory(path)}; it->isValid(); it->next()) - if (isFile(it->path())) - removeFile(it->path()); + auto settings = current_settings.get(); + auto * s3_paths_keeper = dynamic_cast(fs_paths_keeper.get()); + + if (s3_paths_keeper) + s3_paths_keeper->removePaths([&](S3PathKeeper::Chunk && chunk) + { + LOG_DEBUG(log, "Remove AWS keys {}", S3PathKeeper::getChunkKeys(chunk)); + Aws::S3::Model::Delete delkeys; + delkeys.SetObjects(chunk); + /// TODO: Make operation idempotent. Do not throw exception if key is already deleted. 
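+            /// Each chunk holds at most objects_chunk_size_to_delete keys (previously a hard-coded limit of 1000, the DeleteObjects request cap), so a single request never exceeds the API limit.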
+ Aws::S3::Model::DeleteObjectsRequest request; + request.SetBucket(bucket); + request.SetDelete(delkeys); + auto outcome = settings->client->DeleteObjects(request); + throwIfError(outcome); + }); } void DiskS3::moveFile(const String & from_path, const String & to_path) @@ -679,26 +218,13 @@ void DiskS3::moveFile(const String & from_path, const String & to_path, bool sen Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path); } -void DiskS3::replaceFile(const String & from_path, const String & to_path) -{ - if (exists(to_path)) - { - const String tmp_path = to_path + ".old"; - moveFile(to_path, tmp_path); - moveFile(from_path, to_path); - removeFile(tmp_path); - } - else - moveFile(from_path, to_path); -} - std::unique_ptr DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t, MMappedFileCache *) const { auto settings = current_settings.get(); auto metadata = readMeta(path); LOG_DEBUG(log, "Read from file by path: {}. Existing S3 objects: {}", - backQuote(metadata_path + path), metadata.s3_objects.size()); + backQuote(metadata_path + path), metadata.remote_fs_objects.size()); auto reader = std::make_unique(settings->client, bucket, metadata, settings->s3_max_single_read_retries, buf_size); return std::make_unique(std::move(reader), settings->min_bytes_for_seek); @@ -723,180 +249,23 @@ std::unique_ptr DiskS3::writeFile(const String & path, } LOG_DEBUG(log, "{} to file by path: {}. S3 path: {}", - mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), s3_root_path + s3_path); + mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_path + path), remote_fs_root_path + s3_path); auto s3_buffer = std::make_unique( settings->client, bucket, - metadata.s3_root_path + s3_path, + metadata.remote_fs_root_path + s3_path, settings->s3_min_upload_part_size, settings->s3_max_single_part_upload_size, std::move(object_metadata), buf_size); - return std::make_unique(std::move(s3_buffer), std::move(metadata), s3_path); -} - -void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys) -{ - LOG_DEBUG(log, "Remove file by path: {}", backQuote(metadata_path + path)); - - Poco::File file(metadata_path + path); - - if (!file.isFile()) - throw Exception(ErrorCodes::CANNOT_DELETE_DIRECTORY, "Path '{}' is a directory", path); - - try - { - auto metadata = readMeta(path); - - /// If there is no references - delete content from S3. - if (metadata.ref_count == 0) - { - file.remove(); - - for (const auto & [s3_object_path, _] : metadata.s3_objects) - keys.addKey(s3_root_path + s3_object_path); - } - else /// In other case decrement number of references, save metadata and delete file. - { - --metadata.ref_count; - metadata.save(); - file.remove(); - } - } - catch (const Exception & e) - { - /// If it's impossible to read meta - just remove it from FS. - if (e.code() == ErrorCodes::UNKNOWN_FORMAT) - { - LOG_WARNING( - log, - "Metadata file {} can't be read by reason: {}. Removing it forcibly.", - backQuote(path), - e.nested() ? e.nested()->message() : e.message()); - - file.remove(); - } - else - throw; - } -} - -void DiskS3::removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys) -{ - checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks. 
- - Poco::File file(metadata_path + path); - if (file.isFile()) - { - removeMeta(path, keys); - } - else - { - for (auto it{iterateDirectory(path)}; it->isValid(); it->next()) - removeMetaRecursive(it->path(), keys); - file.remove(); - } -} - -void DiskS3::removeAws(const AwsS3KeyKeeper & keys) -{ - if (!keys.empty()) - { - auto settings = current_settings.get(); - - for (const auto & chunk : keys) - { - LOG_DEBUG(log, "Remove AWS keys {}", AwsS3KeyKeeper::getChunkKeys(chunk)); - - Aws::S3::Model::Delete delkeys; - delkeys.SetObjects(chunk); - - /// TODO: Make operation idempotent. Do not throw exception if key is already deleted. - Aws::S3::Model::DeleteObjectsRequest request; - request.SetBucket(bucket); - request.SetDelete(delkeys); - auto outcome = settings->client->DeleteObjects(request); - throwIfError(outcome); - } - } -} - -void DiskS3::removeFileIfExists(const String & path) -{ - AwsS3KeyKeeper keys; - if (Poco::File(metadata_path + path).exists()) - { - removeMeta(path, keys); - removeAws(keys); - } -} - -void DiskS3::removeDirectory(const String & path) -{ - Poco::File(metadata_path + path).remove(); -} - -void DiskS3::removeSharedFile(const String & path, bool keep_s3) -{ - AwsS3KeyKeeper keys; - removeMeta(path, keys); - if (!keep_s3) - removeAws(keys); -} - -void DiskS3::removeSharedRecursive(const String & path, bool keep_s3) -{ - AwsS3KeyKeeper keys; - removeMetaRecursive(path, keys); - if (!keep_s3) - removeAws(keys); -} - -bool DiskS3::tryReserve(UInt64 bytes) -{ - std::lock_guard lock(reservation_mutex); - if (bytes == 0) - { - LOG_DEBUG(log, "Reserving 0 bytes on s3 disk {}", backQuote(name)); - ++reservation_count; - return true; - } - - auto available_space = getAvailableSpace(); - UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes); - if (unreserved_space >= bytes) - { - LOG_DEBUG(log, "Reserving {} on disk {}, having unreserved {}.", - ReadableSize(bytes), backQuote(name), ReadableSize(unreserved_space)); - ++reservation_count; - reserved_bytes += bytes; - return true; - } - return false; -} - -void DiskS3::listFiles(const String & path, std::vector & file_names) -{ - for (auto it = iterateDirectory(path); it->isValid(); it->next()) - file_names.push_back(it->name()); -} - -void DiskS3::setLastModified(const String & path, const Poco::Timestamp & timestamp) -{ - Poco::File(metadata_path + path).setLastModified(timestamp); -} - -Poco::Timestamp DiskS3::getLastModified(const String & path) -{ - return Poco::File(metadata_path + path).getLastModified(); + return std::make_unique>(std::move(s3_buffer), std::move(metadata), s3_path); } void DiskS3::createHardLink(const String & src_path, const String & dst_path) { auto settings = current_settings.get(); - createHardLink(src_path, dst_path, settings->send_metadata); } @@ -922,22 +291,6 @@ void DiskS3::createHardLink(const String & src_path, const String & dst_path, bo DB::createHardLink(metadata_path + src_path, metadata_path + dst_path); } -void DiskS3::createFile(const String & path) -{ - /// Create empty metadata file. - auto metadata = createMeta(path); - metadata.save(); -} - -void DiskS3::setReadOnly(const String & path) -{ - /// We should store read only flag inside metadata file (instead of using FS flag), - /// because we modify metadata file when create hard-links from it. 
- auto metadata = readMeta(path); - metadata.read_only = true; - metadata.save(); -} - void DiskS3::shutdown() { auto settings = current_settings.get(); @@ -955,7 +308,7 @@ void DiskS3::createFileOperationObject(const String & operation_name, UInt64 rev WriteBufferFromS3 buffer( settings->client, bucket, - s3_root_path + key, + remote_fs_root_path + key, settings->s3_min_upload_part_size, settings->s3_max_single_part_upload_size, metadata); @@ -975,7 +328,7 @@ void DiskS3::startup() restore(); - if (readSchemaVersion(bucket, s3_root_path) < RESTORABLE_SCHEMA_VERSION) + if (readSchemaVersion(bucket, remote_fs_root_path) < RESTORABLE_SCHEMA_VERSION) migrateToRestorableSchema(); findLastRevision(); @@ -995,8 +348,8 @@ void DiskS3::findLastRevision() LOG_DEBUG(log, "Check object exists with revision prefix {}", revision_prefix); /// Check file or operation with such revision prefix exists. - if (checkObjectExists(bucket, s3_root_path + "r" + revision_prefix) - || checkObjectExists(bucket, s3_root_path + "operations/r" + revision_prefix)) + if (checkObjectExists(bucket, remote_fs_root_path + "r" + revision_prefix) + || checkObjectExists(bucket, remote_fs_root_path + "operations/r" + revision_prefix)) revision += "1"; else revision += "0"; @@ -1030,7 +383,7 @@ void DiskS3::saveSchemaVersion(const int & version) WriteBufferFromS3 buffer( settings->client, bucket, - s3_root_path + SCHEMA_VERSION_OBJECT, + remote_fs_root_path + SCHEMA_VERSION_OBJECT, settings->s3_min_upload_part_size, settings->s3_max_single_part_upload_size); @@ -1058,12 +411,12 @@ void DiskS3::migrateFileToRestorableSchema(const String & path) auto meta = readMeta(path); - for (const auto & [key, _] : meta.s3_objects) + for (const auto & [key, _] : meta.remote_fs_objects) { ObjectMetadata metadata { {"path", path} }; - updateObjectMetadata(s3_root_path + key, metadata); + updateObjectMetadata(remote_fs_root_path + key, metadata); } } @@ -1286,7 +639,7 @@ void DiskS3::restore() { RestoreInformation information; information.source_bucket = bucket; - information.source_path = s3_root_path; + information.source_path = remote_fs_root_path; readRestoreInformation(information); if (information.revision == 0) @@ -1298,11 +651,11 @@ void DiskS3::restore() { /// In this case we need to additionally cleanup S3 from objects with later revision. /// Will be simply just restore to different path. - if (information.source_path == s3_root_path && information.revision != LATEST_REVISION) + if (information.source_path == remote_fs_root_path && information.revision != LATEST_REVISION) throw Exception("Restoring to the same bucket and path is allowed if revision is latest (0)", ErrorCodes::BAD_ARGUMENTS); /// This case complicates S3 cleanup in case of unsuccessful restore. 
- if (information.source_path != s3_root_path && s3_root_path.starts_with(information.source_path)) + if (information.source_path != remote_fs_root_path && remote_fs_root_path.starts_with(information.source_path)) throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS); } @@ -1314,7 +667,7 @@ void DiskS3::restore() LOG_INFO(log, "Removing old metadata..."); - bool cleanup_s3 = information.source_bucket != bucket || information.source_path != s3_root_path; + bool cleanup_s3 = information.source_bucket != bucket || information.source_path != remote_fs_root_path; for (const auto & root : data_roots) if (exists(root)) removeSharedRecursive(root + '/', !cleanup_s3); @@ -1408,8 +761,8 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so auto relative_key = shrinkKey(source_path, key); /// Copy object if we restore to different bucket / path. - if (bucket != source_bucket || s3_root_path != source_path) - copyObject(source_bucket, key, bucket, s3_root_path + relative_key); + if (bucket != source_bucket || remote_fs_root_path != source_path) + copyObject(source_bucket, key, bucket, remote_fs_root_path + relative_key); metadata.addObject(relative_key, head_result.GetContentLength()); metadata.save(); @@ -1425,7 +778,7 @@ void DiskS3::restoreFileOperations(const RestoreInformation & restore_informatio LOG_INFO(log, "Starting restore file operations for disk {}", name); /// Enable recording file operations if we restore to different bucket / path. - bool send_metadata = bucket != restore_information.source_bucket || s3_root_path != restore_information.source_path; + bool send_metadata = bucket != restore_information.source_bucket || remote_fs_root_path != restore_information.source_path; std::set renames; auto restore_file_operations = [this, &restore_information, &renames, &send_metadata](auto list_result) @@ -1581,7 +934,8 @@ DiskS3Settings::DiskS3Settings( size_t min_bytes_for_seek_, bool send_metadata_, int thread_pool_size_, - int list_object_keys_size_) + int list_object_keys_size_, + int objects_chunk_size_to_delete_) : client(client_) , s3_max_single_read_retries(s3_max_single_read_retries_) , s3_min_upload_part_size(s3_min_upload_part_size_) @@ -1590,6 +944,7 @@ DiskS3Settings::DiskS3Settings( , send_metadata(send_metadata_) , thread_pool_size(thread_pool_size_) , list_object_keys_size(list_object_keys_size_) + , objects_chunk_size_to_delete(objects_chunk_size_to_delete_) { } diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h index efc7fdcf643..8857a00d709 100644 --- a/src/Disks/S3/DiskS3.h +++ b/src/Disks/S3/DiskS3.h @@ -2,7 +2,6 @@ #include #include -#include #include "Disks/DiskFactory.h" #include "Disks/Executor.h" @@ -12,6 +11,7 @@ #include #include +#include namespace DB @@ -28,7 +28,8 @@ struct DiskS3Settings size_t min_bytes_for_seek_, bool send_metadata_, int thread_pool_size_, - int list_object_keys_size_); + int list_object_keys_size_, + int objects_chunk_size_to_delete_); std::shared_ptr client; size_t s3_max_single_read_retries; @@ -38,25 +39,24 @@ struct DiskS3Settings bool send_metadata; int thread_pool_size; int list_object_keys_size; + int objects_chunk_size_to_delete; }; + /** * Storage for persisting data in S3 and metadata on the local disk. * Files are represented by file in local filesystem (clickhouse_root/disks/disk_name/path/to/file) * that contains S3 object key with actual data. 
*/ -class DiskS3 : public IDisk +class DiskS3 final : public IDiskRemote { public: using ObjectMetadata = std::map; using Futures = std::vector>; + using SettingsPtr = std::unique_ptr; using GetDiskSettings = std::function; - friend class DiskS3Reservation; - - class AwsS3KeyKeeper; - struct Metadata; struct RestoreInformation; DiskS3( @@ -67,44 +67,6 @@ public: SettingsPtr settings_, GetDiskSettings settings_getter_); - const String & getName() const override { return name; } - - const String & getPath() const override { return metadata_path; } - - ReservationPtr reserve(UInt64 bytes) override; - - UInt64 getTotalSpace() const override { return std::numeric_limits::max(); } - - UInt64 getAvailableSpace() const override { return std::numeric_limits::max(); } - - UInt64 getUnreservedSpace() const override { return std::numeric_limits::max(); } - - UInt64 getKeepingFreeSpace() const override { return 0; } - - bool exists(const String & path) const override; - - bool isFile(const String & path) const override; - - bool isDirectory(const String & path) const override; - - size_t getFileSize(const String & path) const override; - - void createDirectory(const String & path) override; - - void createDirectories(const String & path) override; - - void clearDirectory(const String & path) override; - - void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); } - - DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; - - void moveFile(const String & from_path, const String & to_path) override; - void moveFile(const String & from_path, const String & to_path, bool send_metadata); - void replaceFile(const String & from_path, const String & to_path) override; - - void listFiles(const String & path, std::vector & file_names) override; - std::unique_ptr readFile( const String & path, size_t buf_size, @@ -118,25 +80,16 @@ public: size_t buf_size, WriteMode mode) override; - void removeFile(const String & path) override { removeSharedFile(path, false); } - void removeFileIfExists(const String & path) override; - void removeDirectory(const String & path) override; - void removeRecursive(const String & path) override { removeSharedRecursive(path, false); } + void removeFromRemoteFS(RemoteFSPathKeeperPtr keeper) override; - void removeSharedFile(const String & path, bool keep_s3) override; - void removeSharedRecursive(const String & path, bool keep_s3) override; + RemoteFSPathKeeperPtr createFSPathKeeper() const override; + + void moveFile(const String & from_path, const String & to_path, bool send_metadata); + void moveFile(const String & from_path, const String & to_path) override; void createHardLink(const String & src_path, const String & dst_path) override; void createHardLink(const String & src_path, const String & dst_path, bool send_metadata); - void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; - - Poco::Timestamp getLastModified(const String & path) override; - - void createFile(const String & path) override; - - void setReadOnly(const String & path) override; - DiskType::Type getType() const override { return DiskType::Type::S3; } void shutdown() override; @@ -157,16 +110,6 @@ public: void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextConstPtr context) override; private: - bool tryReserve(UInt64 bytes); - - void removeMeta(const String & path, AwsS3KeyKeeper & keys); - void removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys); - void removeAws(const 
AwsS3KeyKeeper & keys); - - Metadata readOrCreateMetaForWriting(const String & path, WriteMode mode); - Metadata readMeta(const String & path) const; - Metadata createMeta(const String & path) const; - void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata); /// Converts revision to binary string with leading zeroes (64 bit). static String revisionToString(UInt64 revision); @@ -200,19 +143,12 @@ private: /// Forms detached path '../../detached/part_name/' from '../../part_name/' static String pathToDetached(const String & source_path); - const String name; const String bucket; - const String s3_root_path; - const String metadata_path; - MultiVersion current_settings; + MultiVersion current_settings; /// Gets disk settings from context. GetDiskSettings settings_getter; - UInt64 reserved_bytes = 0; - UInt64 reservation_count = 0; - std::mutex reservation_mutex; - std::atomic revision_counter = 0; static constexpr UInt64 LATEST_REVISION = std::numeric_limits::max(); static constexpr UInt64 UNKNOWN_REVISION = 0; @@ -229,8 +165,6 @@ private: static constexpr int RESTORABLE_SCHEMA_VERSION = 1; /// Directories with data. const std::vector data_roots {"data", "store"}; - - Poco::Logger * log = &Poco::Logger::get("DiskS3"); }; } diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp index 767e8890a01..e02f413c65e 100644 --- a/src/Disks/S3/registerDiskS3.cpp +++ b/src/Disks/S3/registerDiskS3.cpp @@ -156,7 +156,8 @@ std::unique_ptr getSettings(const Poco::Util::AbstractConfigurat config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024), config.getBool(config_prefix + ".send_metadata", false), config.getInt(config_prefix + ".thread_pool_size", 16), - config.getInt(config_prefix + ".list_object_keys_size", 1000)); + config.getInt(config_prefix + ".list_object_keys_size", 1000), + config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000)); } } @@ -225,4 +226,3 @@ void registerDiskS3(DiskFactory & factory) void registerDiskS3(DiskFactory &) {} #endif - diff --git a/src/Disks/WriteIndirectBufferFromRemoteFS.cpp b/src/Disks/WriteIndirectBufferFromRemoteFS.cpp new file mode 100644 index 00000000000..adc711608d7 --- /dev/null +++ b/src/Disks/WriteIndirectBufferFromRemoteFS.cpp @@ -0,0 +1,71 @@ +#include "WriteIndirectBufferFromRemoteFS.h" + +#if USE_AWS_S3 || USE_HDFS +#include +#include + + +namespace DB +{ + +/// Stores data in S3/HDFS and adds the object key (S3 path) and object size to metadata file on local FS. +template +WriteIndirectBufferFromRemoteFS::WriteIndirectBufferFromRemoteFS( + std::unique_ptr impl_, + IDiskRemote::Metadata metadata_, + const String & remote_fs_path_) + : WriteBufferFromFileDecorator(std::move(impl_)) + , metadata(std::move(metadata_)) + , remote_fs_path(remote_fs_path_) +{ +} + + +template +WriteIndirectBufferFromRemoteFS::~WriteIndirectBufferFromRemoteFS() +{ + try + { + WriteIndirectBufferFromRemoteFS::finalize(); + } + catch (...) 
+ { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + + +template +void WriteIndirectBufferFromRemoteFS::finalize() +{ + if (finalized) + return; + + WriteBufferFromFileDecorator::finalize(); + + metadata.addObject(remote_fs_path, count()); + metadata.save(); +} + + +template +void WriteIndirectBufferFromRemoteFS::sync() +{ + if (finalized) + metadata.save(true); +} + + +#if USE_AWS_S3 +template +class WriteIndirectBufferFromRemoteFS; +#endif + +#if USE_HDFS +template +class WriteIndirectBufferFromRemoteFS; +#endif + +} + +#endif diff --git a/src/Disks/WriteIndirectBufferFromRemoteFS.h b/src/Disks/WriteIndirectBufferFromRemoteFS.h new file mode 100644 index 00000000000..cda7523e19e --- /dev/null +++ b/src/Disks/WriteIndirectBufferFromRemoteFS.h @@ -0,0 +1,39 @@ +#pragma once +#include + +#if USE_AWS_S3 || USE_HDFS + +#include +#include +#include + +namespace DB +{ + +/// Stores data in S3/HDFS and adds the object key (S3 path) and object size to metadata file on local FS. +template +class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorator +{ +public: + WriteIndirectBufferFromRemoteFS( + std::unique_ptr impl_, + IDiskRemote::Metadata metadata_, + const String & remote_fs_path_); + + virtual ~WriteIndirectBufferFromRemoteFS() override; + + void finalize() override; + + void sync() override; + + String getFileName() const override { return metadata.metadata_file_path; } + +private: + IDiskRemote::Metadata metadata; + + String remote_fs_path; +}; + +} + +#endif diff --git a/src/Disks/registerDisks.cpp b/src/Disks/registerDisks.cpp index 2da39e62b19..8f4901e49e5 100644 --- a/src/Disks/registerDisks.cpp +++ b/src/Disks/registerDisks.cpp @@ -8,21 +8,33 @@ namespace DB { + void registerDiskLocal(DiskFactory & factory); void registerDiskMemory(DiskFactory & factory); + #if USE_AWS_S3 void registerDiskS3(DiskFactory & factory); #endif +#if USE_HDFS +void registerDiskHDFS(DiskFactory & factory); +#endif + + void registerDisks() { auto & factory = DiskFactory::instance(); registerDiskLocal(factory); registerDiskMemory(factory); + #if USE_AWS_S3 registerDiskS3(factory); #endif + +#if USE_HDFS + registerDiskHDFS(factory); +#endif } } diff --git a/src/Disks/tests/gtest_disk.cpp b/src/Disks/tests/gtest_disk.cpp index 3b9dca63002..714abf485ee 100644 --- a/src/Disks/tests/gtest_disk.cpp +++ b/src/Disks/tests/gtest_disk.cpp @@ -1,14 +1,15 @@ #include - #include #include #include "gtest_disk.h" + #if !defined(__clang__) # pragma GCC diagnostic push # pragma GCC diagnostic ignored "-Wsuggest-override" #endif + template DB::DiskPtr createDisk(); diff --git a/src/Disks/tests/gtest_disk.h b/src/Disks/tests/gtest_disk.h index d7ca0beca5f..fa1028089cb 100644 --- a/src/Disks/tests/gtest_disk.h +++ b/src/Disks/tests/gtest_disk.h @@ -1,4 +1,5 @@ #pragma once + #include #include #include diff --git a/src/Disks/tests/gtest_disk_hdfs.cpp b/src/Disks/tests/gtest_disk_hdfs.cpp new file mode 100644 index 00000000000..d537ac967e5 --- /dev/null +++ b/src/Disks/tests/gtest_disk_hdfs.cpp @@ -0,0 +1,160 @@ +#include +#include +#include +#include "gtest_disk.h" + + +#define RUN_HDFS_TEST 0 +#if RUN_HDFS_TEST + +#include +#include + +const String hdfs_uri = "hdfs://172.20.0.2:9000/disk_test/"; +const String metadata_path = "/path/to/metadata/"; +const String config_path = "/path/to/config.xml"; +const String file_name = "test.txt"; + + +TEST(DiskTestHDFS, RemoveFileHDFS) +{ + Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path); + auto settings = 
std::make_unique(1024 * 1024); + auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config); + + DB::HDFSBuilderWrapper builder = DB::createHDFSBuilder(hdfs_uri, *config); + DB::HDFSFSPtr fs = DB::createHDFSFS(builder.get()); + + disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite); + auto metadata = disk.readMeta(file_name); + + const String hdfs_file_name = metadata.remote_fs_objects[0].first; + const String hdfs_file_path = "/disk_test/" + hdfs_file_name; + + auto ret = hdfsExists(fs.get(), hdfs_file_path.data()); + EXPECT_EQ(0, ret); + + disk.removeFile(file_name); + ret = hdfsExists(fs.get(), hdfs_file_path.data()); + EXPECT_EQ(-1, ret); +} + + +TEST(DiskTestHDFS, WriteReadHDFS) +{ + Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path); + auto settings = std::make_unique(1024 * 1024); + auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config); + + { + auto out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite); + writeString("Test write to file", *out); + } + + { + DB::String result; + auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr); + readString(result, *in); + EXPECT_EQ("Test write to file", result); + } + + disk.removeFileIfExists(file_name); +} + + +TEST(DiskTestHDFS, RewriteFileHDFS) +{ + Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path); + auto settings = std::make_unique(1024 * 1024); + auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config); + + for (size_t i = 1; i <= 10; ++i) + { + std::unique_ptr out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite); + writeString("Text" + DB::toString(i), *out); + } + + { + String result; + auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr); + readString(result, *in); + EXPECT_EQ("Text10", result); + readString(result, *in); + EXPECT_EQ("", result); + } + + disk.removeFileIfExists(file_name); +} + + +TEST(DiskTestHDFS, AppendFileHDFS) +{ + Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path); + auto settings = std::make_unique(1024 * 1024); + auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config); + + { + std::unique_ptr out = disk.writeFile(file_name, 1024, DB::WriteMode::Append); + writeString("Text", *out); + for (size_t i = 0; i < 10; ++i) + { + writeIntText(i, *out); + } + } + + { + String result, expected; + auto in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr); + + readString(result, *in); + EXPECT_EQ("Text0123456789", result); + + readString(result, *in); + EXPECT_EQ("", result); + } + + disk.removeFileIfExists(file_name); +} + + +TEST(DiskTestHDFS, SeekHDFS) +{ + Poco::Util::AbstractConfiguration *config = new Poco::Util::XMLConfiguration(config_path); + auto settings = std::make_unique(1024 * 1024); + auto disk = DB::DiskHDFS("disk_hdfs", hdfs_uri, std::move(settings), metadata_path, *config); + + { + std::unique_ptr out = disk.writeFile(file_name, 1024, DB::WriteMode::Rewrite); + writeString("test data", *out); + } + + /// Test SEEK_SET + { + String buf(4, '0'); + std::unique_ptr in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr); + + in->seek(5, SEEK_SET); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("data", buf); + } + + /// Test SEEK_CUR + { + std::unique_ptr in = disk.readFile(file_name, 1024, 1024, 1024, 1024, nullptr); + String buf(4, '0'); + + in->readStrict(buf.data(), 4); + 
EXPECT_EQ("test", buf); + + // Skip whitespace + in->seek(1, SEEK_CUR); + + in->readStrict(buf.data(), 4); + EXPECT_EQ("data", buf); + } + + disk.removeFileIfExists(file_name); +} + +#endif diff --git a/src/Disks/ya.make.in b/src/Disks/ya.make.in index d030c7b1482..4f7ccff17ef 100644 --- a/src/Disks/ya.make.in +++ b/src/Disks/ya.make.in @@ -7,8 +7,7 @@ PEERDIR( ) -SRCS( - + ) END() diff --git a/src/Storages/HDFS/HDFSCommon.cpp b/src/Storages/HDFS/HDFSCommon.cpp index 40f52921008..d7e57a0f9eb 100644 --- a/src/Storages/HDFS/HDFSCommon.cpp +++ b/src/Storages/HDFS/HDFSCommon.cpp @@ -145,6 +145,7 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::A hdfsBuilderSetUserName(builder.get(), user.c_str()); } + hdfsBuilderSetNameNode(builder.get(), host.c_str()); if (port != 0) { diff --git a/src/Storages/HDFS/HDFSCommon.h b/src/Storages/HDFS/HDFSCommon.h index 154c253a76b..5c70a8997c3 100644 --- a/src/Storages/HDFS/HDFSCommon.h +++ b/src/Storages/HDFS/HDFSCommon.h @@ -1,13 +1,15 @@ #pragma once +#if !defined(ARCADIA_BUILD) #include +#endif #if USE_HDFS #include #include #include -#include +#include // Y_IGNORE #include #include diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/HDFS/ReadBufferFromHDFS.cpp index 29ea46c7590..f81d7736db3 100644 --- a/src/Storages/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/HDFS/ReadBufferFromHDFS.cpp @@ -13,6 +13,8 @@ namespace ErrorCodes { extern const int NETWORK_ERROR; extern const int CANNOT_OPEN_FILE; + extern const int CANNOT_SEEK_THROUGH_FILE; + extern const int SEEK_POSITION_OUT_OF_BOUND; } ReadBufferFromHDFS::~ReadBufferFromHDFS() = default; @@ -29,6 +31,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl HDFSBuilderWrapper builder; HDFSFSPtr fs; + off_t offset = 0; + bool initialized = false; + explicit ReadBufferFromHDFSImpl( const std::string & hdfs_uri_, const std::string & hdfs_file_path_, @@ -48,8 +53,30 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError())); } - int read(char * start, size_t size) const + ~ReadBufferFromHDFSImpl() { + std::lock_guard lock(hdfs_init_mutex); + hdfsCloseFile(fs.get(), fin); + } + + void initialize() const + { + if (!offset) + return; + + int seek_status = hdfsSeek(fs.get(), fin, offset); + if (seek_status != 0) + throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Fail to seek HDFS file: {}, error: {}", hdfs_uri, std::string(hdfsGetLastError())); + } + + int read(char * start, size_t size) + { + if (!initialized) + { + initialize(); + initialized = true; + } + int bytes_read = hdfsRead(fs.get(), fin, start, size); if (bytes_read < 0) throw Exception(ErrorCodes::NETWORK_ERROR, @@ -58,10 +85,25 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl return bytes_read; } - ~ReadBufferFromHDFSImpl() + int seek(off_t offset_, int whence) { - std::lock_guard lock(hdfs_init_mutex); - hdfsCloseFile(fs.get(), fin); + if (initialized) + throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + + if (whence != SEEK_SET) + throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + + if (offset_ < 0) + throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. 
Offset: {}", std::to_string(offset_)); + + offset = offset_; + + return offset; + } + + int tell() const + { + return offset; } }; @@ -73,7 +115,7 @@ ReadBufferFromHDFS::ReadBufferFromHDFS( const String & hdfs_file_path_, const Poco::Util::AbstractConfiguration & config_, size_t buf_size_) - : BufferWithOwnMemory(buf_size_) + : BufferWithOwnMemory(buf_size_) , impl(std::make_unique(hdfs_uri_, hdfs_file_path_, config_)) { } @@ -90,6 +132,18 @@ bool ReadBufferFromHDFS::nextImpl() return true; } + +off_t ReadBufferFromHDFS::seek(off_t off, int whence) +{ + return impl->seek(off, whence); +} + + +off_t ReadBufferFromHDFS::getPosition() +{ + return impl->tell() + count(); +} + } #endif diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.h b/src/Storages/HDFS/ReadBufferFromHDFS.h index bd14e3d3792..498056ea376 100644 --- a/src/Storages/HDFS/ReadBufferFromHDFS.h +++ b/src/Storages/HDFS/ReadBufferFromHDFS.h @@ -7,28 +7,34 @@ #include #include #include -#include +#include // Y_IGNORE #include #include +#include namespace DB { + /** Accepts HDFS path to file and opens it. * Closes file by himself (thus "owns" a file descriptor). */ -class ReadBufferFromHDFS : public BufferWithOwnMemory +class ReadBufferFromHDFS : public BufferWithOwnMemory { struct ReadBufferFromHDFSImpl; public: ReadBufferFromHDFS(const String & hdfs_uri_, const String & hdfs_file_path_, - const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); + const Poco::Util::AbstractConfiguration & config_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); ~ReadBufferFromHDFS() override; bool nextImpl() override; + off_t seek(off_t offset_, int whence) override; + + off_t getPosition() override; + private: std::unique_ptr impl; }; diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 68c2d022ab8..1428e80f9af 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -64,7 +64,6 @@ public: struct SourcesInfo { std::vector uris; - std::atomic next_uri_to_read = 0; bool need_path_column = false; diff --git a/src/Storages/HDFS/WriteBufferFromHDFS.cpp b/src/Storages/HDFS/WriteBufferFromHDFS.cpp index c1323df86f0..5696e8f247f 100644 --- a/src/Storages/HDFS/WriteBufferFromHDFS.cpp +++ b/src/Storages/HDFS/WriteBufferFromHDFS.cpp @@ -27,20 +27,24 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl HDFSBuilderWrapper builder; HDFSFSPtr fs; - explicit WriteBufferFromHDFSImpl(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_) - : hdfs_uri(hdfs_name_) - , builder(createHDFSBuilder(hdfs_uri,config_)) + explicit WriteBufferFromHDFSImpl( + const std::string & hdfs_uri_, + const Poco::Util::AbstractConfiguration & config_, + int flags) + : hdfs_uri(hdfs_uri_) + , builder(createHDFSBuilder(hdfs_uri, config_)) , fs(createHDFSFS(builder.get())) { const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2); const String path = hdfs_uri.substr(begin_of_path); + if (path.find_first_of("*?{") != std::string::npos) - throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE); + throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "URI '{}' contains globs, so the table is in readonly mode", hdfs_uri); if (!hdfsExists(fs.get(), path.c_str())) - throw Exception("File: " + path + " is already exists", ErrorCodes::BAD_ARGUMENTS); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "File {} already exists", path); - fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0); /// 
O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here + fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here if (fout == nullptr) { @@ -76,9 +80,13 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl } }; -WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_, size_t buf_size_) +WriteBufferFromHDFS::WriteBufferFromHDFS( + const std::string & hdfs_name_, + const Poco::Util::AbstractConfiguration & config_, + size_t buf_size_, + int flags_) : BufferWithOwnMemory(buf_size_) - , impl(std::make_unique(hdfs_name_, config_)) + , impl(std::make_unique(hdfs_name_, config_, flags_)) { } diff --git a/src/Storages/HDFS/WriteBufferFromHDFS.h b/src/Storages/HDFS/WriteBufferFromHDFS.h index a28caaf60ee..9dc74e69d40 100644 --- a/src/Storages/HDFS/WriteBufferFromHDFS.h +++ b/src/Storages/HDFS/WriteBufferFromHDFS.h @@ -8,6 +8,7 @@ #include #include + namespace DB { /** Accepts HDFS path to file and opens it. @@ -15,11 +16,13 @@ namespace DB */ class WriteBufferFromHDFS final : public BufferWithOwnMemory { - struct WriteBufferFromHDFSImpl; - std::unique_ptr impl; public: - WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); + WriteBufferFromHDFS( + const std::string & hdfs_name_, + const Poco::Util::AbstractConfiguration & config_, + size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE, + int flags = O_WRONLY); WriteBufferFromHDFS(WriteBufferFromHDFS &&) = default; @@ -30,6 +33,11 @@ public: void sync() override; void finalize() override; + +private: + struct WriteBufferFromHDFSImpl; + std::unique_ptr impl; }; + } #endif diff --git a/tests/integration/test_disk_types/configs/storage.xml b/tests/integration/test_disk_types/configs/storage.xml index 2bf9a2e363a..1167a4f7382 100644 --- a/tests/integration/test_disk_types/configs/storage.xml +++ b/tests/integration/test_disk_types/configs/storage.xml @@ -11,6 +11,10 @@ memory + + hdfs + http://hdfs1:9000/data/ + diff --git a/tests/integration/test_disk_types/test.py b/tests/integration/test_disk_types/test.py index ad09519a484..3f1a656d98f 100644 --- a/tests/integration/test_disk_types/test.py +++ b/tests/integration/test_disk_types/test.py @@ -5,6 +5,7 @@ disk_types = { "default": "local", "disk_s3": "s3", "disk_memory": "memory", + "disk_hdfs": "hdfs", } @@ -12,7 +13,7 @@ disk_types = { def cluster(): try: cluster = ClickHouseCluster(__file__) - cluster.add_instance("node", main_configs=["configs/storage.xml"], with_minio=True) + cluster.add_instance("node", main_configs=["configs/storage.xml"], with_minio=True, with_hdfs=True) cluster.start() yield cluster finally: @@ -35,3 +36,4 @@ def test_select_by_type(cluster): node = cluster.instances["node"] for name, disk_type in list(disk_types.items()): assert node.query("SELECT name FROM system.disks WHERE type='" + disk_type + "'") == name + "\n" + diff --git a/tests/integration/test_log_family_hdfs/__init__.py b/tests/integration/test_log_family_hdfs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_log_family_hdfs/configs/config.d/log_conf.xml b/tests/integration/test_log_family_hdfs/configs/config.d/log_conf.xml new file mode 100644 index 00000000000..318a6bca95d --- /dev/null +++ b/tests/integration/test_log_family_hdfs/configs/config.d/log_conf.xml @@ -0,0 +1,12 @@ + + 3 + + trace + /var/log/clickhouse-server/log.log + 
/var/log/clickhouse-server/log.err.log + 1000M + 10 + /var/log/clickhouse-server/stderr.log + /var/log/clickhouse-server/stdout.log + + diff --git a/tests/integration/test_log_family_hdfs/configs/storage_conf.xml b/tests/integration/test_log_family_hdfs/configs/storage_conf.xml new file mode 100644 index 00000000000..15ff3891474 --- /dev/null +++ b/tests/integration/test_log_family_hdfs/configs/storage_conf.xml @@ -0,0 +1,11 @@ + + + + + + hdfs + hdfs://hdfs1:9000/clickhouse/ + + + + diff --git a/tests/integration/test_log_family_hdfs/test.py b/tests/integration/test_log_family_hdfs/test.py new file mode 100644 index 00000000000..44f6904e8ea --- /dev/null +++ b/tests/integration/test_log_family_hdfs/test.py @@ -0,0 +1,59 @@ +import logging +import sys + +import pytest +from helpers.cluster import ClickHouseCluster + +from pyhdfs import HdfsClient + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance("node", + main_configs=["configs/storage_conf.xml", "configs/config.d/log_conf.xml"], + with_hdfs=True) + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + fs = HdfsClient(hosts='localhost') + fs.mkdirs('/clickhouse') + + yield cluster + finally: + cluster.shutdown() + + +def assert_objects_count(cluster, objects_count, path='data/'): + fs = HdfsClient(hosts='localhost') + hdfs_objects = fs.listdir('/clickhouse') + assert objects_count == len(hdfs_objects) + + +@pytest.mark.parametrize( + "log_engine,files_overhead,files_overhead_per_insert", + [("TinyLog", 1, 1), ("Log", 2, 1), ("StripeLog", 1, 2)]) +def test_log_family_hdfs(cluster, log_engine, files_overhead, files_overhead_per_insert): + node = cluster.instances["node"] + + node.query("CREATE TABLE hdfs_test (id UInt64) ENGINE={} SETTINGS disk = 'hdfs'".format(log_engine)) + + node.query("INSERT INTO hdfs_test SELECT number FROM numbers(5)") + assert node.query("SELECT * FROM hdfs_test") == "0\n1\n2\n3\n4\n" + assert_objects_count(cluster, files_overhead_per_insert + files_overhead) + + node.query("INSERT INTO hdfs_test SELECT number + 5 FROM numbers(3)") + assert node.query("SELECT * FROM hdfs_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n" + assert_objects_count(cluster, files_overhead_per_insert * 2 + files_overhead) + + node.query("INSERT INTO hdfs_test SELECT number + 8 FROM numbers(1)") + assert node.query("SELECT * FROM hdfs_test order by id") == "0\n1\n2\n3\n4\n5\n6\n7\n8\n" + assert_objects_count(cluster, files_overhead_per_insert * 3 + files_overhead) + + node.query("TRUNCATE TABLE hdfs_test") + assert_objects_count(cluster, 0) + + node.query("DROP TABLE hdfs_test") + diff --git a/tests/integration/test_merge_tree_hdfs/__init__.py b/tests/integration/test_merge_tree_hdfs/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_merge_tree_hdfs/configs/config.d/log_conf.xml b/tests/integration/test_merge_tree_hdfs/configs/config.d/log_conf.xml new file mode 100644 index 00000000000..318a6bca95d --- /dev/null +++ b/tests/integration/test_merge_tree_hdfs/configs/config.d/log_conf.xml @@ -0,0 +1,12 @@ + + 3 + + trace + /var/log/clickhouse-server/log.log + /var/log/clickhouse-server/log.err.log + 1000M + 10 + /var/log/clickhouse-server/stderr.log + /var/log/clickhouse-server/stdout.log + + diff --git a/tests/integration/test_merge_tree_hdfs/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_hdfs/configs/config.d/storage_conf.xml new file mode 100644 index 
00000000000..d43e18fa99c --- /dev/null +++ b/tests/integration/test_merge_tree_hdfs/configs/config.d/storage_conf.xml @@ -0,0 +1,30 @@
+<yandex>
+    <storage_configuration>
+        <disks>
+            <hdfs>
+                <type>hdfs</type>
+                <endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
+            </hdfs>
+            <hdd>
+                <type>local</type>
+                <path>/</path>
+            </hdd>
+        </disks>
+        <policies>
+            <hdfs>
+                <volumes>
+                    <main>
+                        <disk>hdfs</disk>
+                    </main>
+                    <external>
+                        <disk>hdd</disk>
+                    </external>
+                </volumes>
+            </hdfs>
+        </policies>
+    </storage_configuration>
+    <merge_tree>
+        <min_bytes_for_wide_part>0</min_bytes_for_wide_part>
+    </merge_tree>
+</yandex>
+
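(Illustration, not part of the patch.) The config above declares two disks, 'hdfs' and a local 'hdd', under a single 'hdfs' storage policy. A minimal sketch of how that configuration could be verified from the node, in the style of the integration tests below; the helper name and the node fixture are assumptions:

# Hypothetical helper, not included in this PR: verifies that the 'hdfs' and
# 'hdd' disks and the 'hdfs' storage policy from the config above were loaded.
def check_storage_configuration(node):
    disks = node.query("SELECT name, type FROM system.disks ORDER BY name FORMAT TSV")
    assert "hdd\tlocal" in disks
    assert "hdfs\thdfs" in disks

    policy = node.query("SELECT volume_name, disks FROM system.storage_policies "
                        "WHERE policy_name = 'hdfs' FORMAT TSV")
    assert "hdfs" in policy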
diff --git a/tests/integration/test_merge_tree_hdfs/test.py b/tests/integration/test_merge_tree_hdfs/test.py new file mode 100644 index 00000000000..2d0d9d9fb1e --- /dev/null +++ b/tests/integration/test_merge_tree_hdfs/test.py @@ -0,0 +1,317 @@ +import logging +import random +import string +import time +import threading +import os + +import pytest +from helpers.cluster import ClickHouseCluster + +from pyhdfs import HdfsClient + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node/configs/config.d/storage_conf.xml') + + +def create_table(cluster, table_name, additional_settings=None): + node = cluster.instances["node"] + + create_table_statement = """ + CREATE TABLE {} ( + dt Date, id Int64, data String, + INDEX min_max (id) TYPE minmax GRANULARITY 3 + ) ENGINE=MergeTree() + PARTITION BY dt + ORDER BY (dt, id) + SETTINGS + storage_policy='hdfs', + old_parts_lifetime=0, + index_granularity=512 + """.format(table_name) + + if additional_settings: + create_table_statement += "," + create_table_statement += additional_settings + + node.query(create_table_statement) + + +FILES_OVERHEAD = 1 +FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files +FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1 +FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1 + + +def random_string(length): + letters = string.ascii_letters + return ''.join(random.choice(letters) for i in range(length)) + + +def generate_values(date_str, count, sign=1): + data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)] + data.sort(key=lambda tup: tup[1]) + return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data]) + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance("node", main_configs=["configs/config.d/storage_conf.xml", + "configs/config.d/log_conf.xml"], with_hdfs=True) + logging.info("Starting cluster...") + cluster.start() + logging.info("Cluster started") + + fs = HdfsClient(hosts='localhost') + fs.mkdirs('/clickhouse') + + logging.info("Created HDFS directory") + + yield cluster + finally: + cluster.shutdown() + + +def wait_for_delete_hdfs_objects(cluster, expected, num_tries=30): + fs = HdfsClient(hosts='localhost') + while num_tries > 0: + num_hdfs_objects = len(fs.listdir('/clickhouse')) + if num_hdfs_objects == expected: + break; + num_tries -= 1 + time.sleep(1) + assert(len(fs.listdir('/clickhouse')) == expected) + + +@pytest.fixture(autouse=True) +def drop_table(cluster): + node = cluster.instances["node"] + + fs = HdfsClient(hosts='localhost') + hdfs_objects = fs.listdir('/clickhouse') + print('Number of hdfs objects to delete:', len(hdfs_objects), sep=' ') + + node.query("DROP TABLE IF EXISTS hdfs_test SYNC") + + try: + wait_for_delete_hdfs_objects(cluster, 0) + finally: + hdfs_objects = fs.listdir('/clickhouse') + if len(hdfs_objects) == 0: + return + print("Manually removing extra objects to prevent tests cascade failing: ", hdfs_objects) + for path in hdfs_objects: + fs.delete(path) + + +@pytest.mark.parametrize("min_rows_for_wide_part,files_per_part", [(0, FILES_OVERHEAD_PER_PART_WIDE), (8192, FILES_OVERHEAD_PER_PART_COMPACT)]) +def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part): + create_table(cluster, "hdfs_test", additional_settings="min_rows_for_wide_part={}".format(min_rows_for_wide_part)) + + node = cluster.instances["node"] + + values1 = generate_values('2020-01-03', 4096) + node.query("INSERT INTO 
hdfs_test VALUES {}".format(values1)) + assert node.query("SELECT * FROM hdfs_test order by dt, id FORMAT Values") == values1 + + fs = HdfsClient(hosts='localhost') + + hdfs_objects = fs.listdir('/clickhouse') + print(hdfs_objects) + assert len(hdfs_objects) == FILES_OVERHEAD + files_per_part + + values2 = generate_values('2020-01-04', 4096) + node.query("INSERT INTO hdfs_test VALUES {}".format(values2)) + assert node.query("SELECT * FROM hdfs_test ORDER BY dt, id FORMAT Values") == values1 + "," + values2 + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + files_per_part * 2 + + assert node.query("SELECT count(*) FROM hdfs_test where id = 1 FORMAT Values") == "(2)" + + +def test_alter_table_columns(cluster): + create_table(cluster, "hdfs_test") + + node = cluster.instances["node"] + fs = HdfsClient(hosts='localhost') + + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096))) + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096, -1))) + + node.query("ALTER TABLE hdfs_test ADD COLUMN col1 UInt64 DEFAULT 1") + # To ensure parts have merged + node.query("OPTIMIZE TABLE hdfs_test") + + assert node.query("SELECT sum(col1) FROM hdfs_test FORMAT Values") == "(8192)" + assert node.query("SELECT sum(col1) FROM hdfs_test WHERE id > 0 FORMAT Values") == "(4096)" + wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN) + + node.query("ALTER TABLE hdfs_test MODIFY COLUMN col1 String", settings={"mutations_sync": 2}) + + assert node.query("SELECT distinct(col1) FROM hdfs_test FORMAT Values") == "('1')" + # and file with mutation + wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1) + + node.query("ALTER TABLE hdfs_test DROP COLUMN col1", settings={"mutations_sync": 2}) + + # and 2 files with mutations + wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2) + + +def test_attach_detach_partition(cluster): + create_table(cluster, "hdfs_test") + + node = cluster.instances["node"] + fs = HdfsClient(hosts='localhost') + + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096))) + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096))) + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2 + + node.query("ALTER TABLE hdfs_test DETACH PARTITION '2020-01-03'") + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(4096)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2 + + node.query("ALTER TABLE hdfs_test ATTACH PARTITION '2020-01-03'") + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2 + + node.query("ALTER TABLE hdfs_test DROP PARTITION '2020-01-03'") + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(4096)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + + node.query("ALTER TABLE hdfs_test DETACH PARTITION '2020-01-04'") + node.query("ALTER TABLE hdfs_test DROP DETACHED PARTITION 
'2020-01-04'", settings={"allow_drop_detached": 1}) + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(0)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + + +def test_move_partition_to_another_disk(cluster): + create_table(cluster, "hdfs_test") + + node = cluster.instances["node"] + fs = HdfsClient(hosts='localhost') + + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096))) + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096))) + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2 + + node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'") + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + + node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-04' TO DISK 'hdfs'") + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2 + + +def test_table_manipulations(cluster): + create_table(cluster, "hdfs_test") + + node = cluster.instances["node"] + fs = HdfsClient(hosts='localhost') + + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096))) + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096))) + + node.query("RENAME TABLE hdfs_test TO hdfs_renamed") + assert node.query("SELECT count(*) FROM hdfs_renamed FORMAT Values") == "(8192)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2 + + node.query("RENAME TABLE hdfs_renamed TO hdfs_test") + assert node.query("CHECK TABLE hdfs_test FORMAT Values") == "(1)" + + node.query("DETACH TABLE hdfs_test") + node.query("ATTACH TABLE hdfs_test") + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2 + + node.query("TRUNCATE TABLE hdfs_test") + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(0)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + + +def test_move_replace_partition_to_another_table(cluster): + create_table(cluster, "hdfs_test") + + node = cluster.instances["node"] + fs = HdfsClient(hosts='localhost') + + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096))) + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096))) + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-05', 4096, -1))) + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-06', 4096, -1))) + assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)" + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4 + + create_table(cluster, "hdfs_clone") + + node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-03' TO TABLE hdfs_clone") + 
node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-05' TO TABLE hdfs_clone") + assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)" + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)" + assert node.query("SELECT sum(id) FROM hdfs_clone FORMAT Values") == "(0)" + assert node.query("SELECT count(*) FROM hdfs_clone FORMAT Values") == "(8192)" + + # Number of objects in HDFS should be unchanged. + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4 + + # Add new partitions to source table, but with different values and replace them from copied table. + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096, -1))) + node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-05', 4096))) + assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)" + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)" + + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 6 + + node.query("ALTER TABLE hdfs_test REPLACE PARTITION '2020-01-03' FROM hdfs_clone") + node.query("ALTER TABLE hdfs_test REPLACE PARTITION '2020-01-05' FROM hdfs_clone") + assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)" + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)" + assert node.query("SELECT sum(id) FROM hdfs_clone FORMAT Values") == "(0)" + assert node.query("SELECT count(*) FROM hdfs_clone FORMAT Values") == "(8192)" + + # Wait for outdated partitions deletion. + print(1) + wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4) + + node.query("DROP TABLE hdfs_clone NO DELAY") + assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)" + assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)" + + # Data should remain in hdfs + hdfs_objects = fs.listdir('/clickhouse') + assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4 +