From 49106f407ffbc8cacca0f71babddbf81934230ec Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 19 Oct 2021 09:34:06 +0300 Subject: [PATCH] Fix some checks --- src/Compression/CachedCompressedReadBuffer.h | 6 +++++- src/Disks/ReadBufferFromRemoteFSGather.cpp | 18 +++++++++--------- src/Disks/ReadBufferFromRemoteFSGather.h | 4 ++-- src/Storages/MergeTree/MergeTreeReadPool.cpp | 2 +- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/Compression/CachedCompressedReadBuffer.h b/src/Compression/CachedCompressedReadBuffer.h index eb45667dc25..86c7d3d1ce4 100644 --- a/src/Compression/CachedCompressedReadBuffer.h +++ b/src/Compression/CachedCompressedReadBuffer.h @@ -59,7 +59,11 @@ public: clock_type = clock_type_; } - void setReadUntilPosition(size_t position) override { file_in->setReadUntilPosition(position); } + void setReadUntilPosition(size_t position) override + { + if (file_in) + file_in->setReadUntilPosition(position); + } }; } diff --git a/src/Disks/ReadBufferFromRemoteFSGather.cpp b/src/Disks/ReadBufferFromRemoteFSGather.cpp index f8246f41340..ff9231bf9d3 100644 --- a/src/Disks/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/ReadBufferFromRemoteFSGather.cpp @@ -28,24 +28,24 @@ namespace ErrorCodes #if USE_AWS_S3 -SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t last_offset) const +SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t offset) const { return std::make_unique(client_ptr, bucket, - fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, settings, threadpool_read, last_offset); + fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, settings, threadpool_read, offset); } #endif -SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path, size_t last_offset) const +SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path, size_t offset) const { - return std::make_unique(fs::path(uri) / path, context, settings, threadpool_read, last_offset); + return std::make_unique(fs::path(uri) / path, context, settings, threadpool_read, offset); } #if USE_HDFS -SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path, size_t last_offset) const +SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path, size_t offset) const { - return std::make_unique(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size, last_offset); + return std::make_unique(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size, offset); } #endif @@ -53,7 +53,7 @@ SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const String & path_) : ReadBuffer(nullptr, 0) , metadata(metadata_) - , path(path_) + , canonical_path(path_) { } @@ -147,7 +147,7 @@ bool ReadBufferFromRemoteFSGather::readImpl() if (bytes_to_ignore) current_buf->ignore(bytes_to_ignore); - LOG_DEBUG(&Poco::Logger::get("Gather"), "Reading from path: {}", path); + LOG_DEBUG(&Poco::Logger::get("Gather"), "Reading from path: {}", canonical_path); auto result = current_buf->next(); swap(*current_buf); @@ -182,7 +182,7 @@ void ReadBufferFromRemoteFSGather::reset() String ReadBufferFromRemoteFSGather::getFileName() const { - return path; + return canonical_path; // if (current_buf) // return fs::path(metadata.metadata_file_path) / metadata.remote_fs_objects[buf_idx].first; // return metadata.metadata_file_path; diff --git a/src/Disks/ReadBufferFromRemoteFSGather.h b/src/Disks/ReadBufferFromRemoteFSGather.h index 70d3adc8fb1..72088012aeb 100644 --- a/src/Disks/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/ReadBufferFromRemoteFSGather.h @@ -37,7 +37,7 @@ public: size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0); protected: - virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const = 0; + virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t offset) const = 0; RemoteMetadata metadata; @@ -60,7 +60,7 @@ private: size_t last_offset = 0; - String path; + String canonical_path; }; diff --git a/src/Storages/MergeTree/MergeTreeReadPool.cpp b/src/Storages/MergeTree/MergeTreeReadPool.cpp index 8f0f20c7d2c..326ad843e7e 100644 --- a/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -48,7 +48,7 @@ MergeTreeReadPool::MergeTreeReadPool( do_not_steal_tasks = true; min_marks_for_concurrent_read = std::max(min_marks_for_concurrent_read, sum_marks_ / threads_); } - fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read_); + fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read); }