diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 41ae41b38d4..f9645875a6c 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -262,6 +262,7 @@ M(RemoteFSNewReaders, "Number of created impl objects") \ M(RemoteFSAsyncBuffers, "Total number of AsycnhronousReadIndirectBufferFromREmoteFS buffers") \ M(RemoteFSSimpleBuffers, "Total number of ReadIndirectBufferFromREmoteFS buffers") \ + M(RemoteFSRedundantlyReadBytes, "") \ \ M(SleepFunctionCalls, "Number of times a sleep function (sleep, sleepEachRow) has been called.") \ M(SleepFunctionMicroseconds, "Time spent sleeping due to a sleep function call.") \ diff --git a/src/Compression/CachedCompressedReadBuffer.cpp b/src/Compression/CachedCompressedReadBuffer.cpp index 4a583773b4b..c19e854dd45 100644 --- a/src/Compression/CachedCompressedReadBuffer.cpp +++ b/src/Compression/CachedCompressedReadBuffer.cpp @@ -28,6 +28,12 @@ void CachedCompressedReadBuffer::initInput() } +void CachedCompressedReadBuffer::prefetch() +{ + file_in->prefetch(); +} + + bool CachedCompressedReadBuffer::nextImpl() { /// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists. diff --git a/src/Compression/CachedCompressedReadBuffer.h b/src/Compression/CachedCompressedReadBuffer.h index bb24f699eed..6eedf66a487 100644 --- a/src/Compression/CachedCompressedReadBuffer.h +++ b/src/Compression/CachedCompressedReadBuffer.h @@ -33,8 +33,11 @@ private: UncompressedCache::MappedPtr owned_cell; void initInput(); + bool nextImpl() override; + void prefetch() override; + /// Passed into file_in. ReadBufferFromFileBase::ProfileCallback profile_callback; clockid_t clock_type {}; diff --git a/src/Compression/CompressedReadBufferBase.cpp b/src/Compression/CompressedReadBufferBase.cpp index 81e49e445a7..b0609b9ba26 100644 --- a/src/Compression/CompressedReadBufferBase.cpp +++ b/src/Compression/CompressedReadBufferBase.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -107,6 +108,13 @@ static void validateChecksum(char * data, size_t size, const Checksum expected_c } +void CompressedReadBufferBase::setRightOffset(size_t offset) +{ + if (auto * async_in = dynamic_cast(compressed_in)) + async_in->setRightOffset(offset); +} + + /// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need. /// Returns number of compressed bytes read. size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum, bool always_copy) diff --git a/src/Compression/CompressedReadBufferBase.h b/src/Compression/CompressedReadBufferBase.h index c32a169aecc..0f788ec445d 100644 --- a/src/Compression/CompressedReadBufferBase.h +++ b/src/Compression/CompressedReadBufferBase.h @@ -60,6 +60,12 @@ public: disable_checksum = true; } + /** + * For asynchronous range reading from remote fs need to update last offset for current task, + * when newer tasks read behind previous task last mark. + */ + void setRightOffset(size_t offset); + public: CompressionCodecPtr codec; }; diff --git a/src/Compression/CompressedReadBufferFromFile.cpp b/src/Compression/CompressedReadBufferFromFile.cpp index 2cfd6d65c1c..d5aae38cf34 100644 --- a/src/Compression/CompressedReadBufferFromFile.cpp +++ b/src/Compression/CompressedReadBufferFromFile.cpp @@ -44,12 +44,6 @@ bool CompressedReadBufferFromFile::nextImpl() } -void CompressedReadBufferFromFile::prefetch() -{ - file_in.prefetch(); -} - - CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr buf, bool allow_different_codecs_) : BufferWithOwnMemory(0), p_file_in(std::move(buf)), file_in(*p_file_in) { @@ -72,6 +66,12 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile( } +void CompressedReadBufferFromFile::prefetch() +{ + file_in.prefetch(); +} + + void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block) { /// Nothing to do if we already at required position diff --git a/src/Compression/CompressedReadBufferFromFile.h b/src/Compression/CompressedReadBufferFromFile.h index 5f027851da3..125e80a0078 100644 --- a/src/Compression/CompressedReadBufferFromFile.h +++ b/src/Compression/CompressedReadBufferFromFile.h @@ -42,6 +42,7 @@ private: /* size_t nextimpl_working_buffer_offset; */ bool nextImpl() override; + void prefetch() override; public: @@ -61,6 +62,7 @@ public: { file_in.setProfileCallback(profile_callback_, clock_type_); } + }; } diff --git a/src/Disks/DiskWebServer.cpp b/src/Disks/DiskWebServer.cpp index d6fae0aa7dc..148e34cf9c5 100644 --- a/src/Disks/DiskWebServer.cpp +++ b/src/Disks/DiskWebServer.cpp @@ -168,7 +168,7 @@ std::unique_ptr DiskWebServer::readFile(const String & p bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool; - auto web_impl = std::make_unique(url, meta, getContext(), threadpool_read, read_settings); + auto web_impl = std::make_unique(path, url, meta, getContext(), threadpool_read, read_settings); if (threadpool_read) { diff --git a/src/Disks/HDFS/DiskHDFS.cpp b/src/Disks/HDFS/DiskHDFS.cpp index 9ed861a5f34..cb3e1e00277 100644 --- a/src/Disks/HDFS/DiskHDFS.cpp +++ b/src/Disks/HDFS/DiskHDFS.cpp @@ -73,7 +73,7 @@ std::unique_ptr DiskHDFS::readFile(const String & path, "Read from file by path: {}. Existing HDFS objects: {}", backQuote(metadata_path + path), metadata.remote_fs_objects.size()); - auto hdfs_impl = std::make_unique(config, remote_fs_root_path, metadata, read_settings.remote_fs_buffer_size); + auto hdfs_impl = std::make_unique(path, config, remote_fs_root_path, metadata, read_settings.remote_fs_buffer_size); if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool) { diff --git a/src/Disks/ReadBufferFromRemoteFSGather.cpp b/src/Disks/ReadBufferFromRemoteFSGather.cpp index 5ffb8b9f589..67ba4448d20 100644 --- a/src/Disks/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/ReadBufferFromRemoteFSGather.cpp @@ -12,6 +12,7 @@ #include #endif +#include #include #include @@ -27,31 +28,32 @@ namespace ErrorCodes #if USE_AWS_S3 -SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path) const +SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t last_offset) const { return std::make_unique(client_ptr, bucket, - fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, settings, threadpool_read); + fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, settings, threadpool_read, last_offset); } #endif -SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path) const +SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path, size_t last_offset) const { - return std::make_unique(fs::path(uri) / path, context, settings, threadpool_read); + return std::make_unique(fs::path(uri) / path, context, settings, threadpool_read, last_offset); } #if USE_HDFS -SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path) const +SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path, size_t last_offset) const { - return std::make_unique(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size); + return std::make_unique(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size, last_offset); } #endif -ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_) +ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const String & path_) : ReadBuffer(nullptr, 0) , metadata(metadata_) + , path(path_) { } @@ -91,7 +93,7 @@ void ReadBufferFromRemoteFSGather::initialize() /// Do not create a new buffer if we already have what we need. if (!current_buf || buf_idx != i) { - current_buf = createImplementationBuffer(file_path); + current_buf = createImplementationBuffer(file_path, last_offset); buf_idx = i; } @@ -126,8 +128,8 @@ bool ReadBufferFromRemoteFSGather::nextImpl() ++current_buf_idx; - const auto & path = metadata.remote_fs_objects[current_buf_idx].first; - current_buf = createImplementationBuffer(path); + const auto & current_path = metadata.remote_fs_objects[current_buf_idx].first; + current_buf = createImplementationBuffer(current_path, last_offset); return readImpl(); } @@ -145,6 +147,7 @@ bool ReadBufferFromRemoteFSGather::readImpl() if (bytes_to_ignore) current_buf->ignore(bytes_to_ignore); + LOG_DEBUG(&Poco::Logger::get("Gather"), "Reading from path: {}", path); auto result = current_buf->next(); swap(*current_buf); @@ -158,8 +161,17 @@ bool ReadBufferFromRemoteFSGather::readImpl() void ReadBufferFromRemoteFSGather::seek(off_t offset) { + current_buf.reset(); absolute_position = offset; - initialize(); + // initialize(); +} + + +void ReadBufferFromRemoteFSGather::setRightOffset(size_t offset) +{ + assert(last_offset < offset); + current_buf.reset(); + last_offset = offset; } @@ -168,4 +180,13 @@ void ReadBufferFromRemoteFSGather::reset() current_buf.reset(); } + +String ReadBufferFromRemoteFSGather::getFileName() const +{ + return path; + // if (current_buf) + // return fs::path(metadata.metadata_file_path) / metadata.remote_fs_objects[buf_idx].first; + // return metadata.metadata_file_path; +} + } diff --git a/src/Disks/ReadBufferFromRemoteFSGather.h b/src/Disks/ReadBufferFromRemoteFSGather.h index efd3d7a2483..3285d7190d2 100644 --- a/src/Disks/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/ReadBufferFromRemoteFSGather.h @@ -13,28 +13,31 @@ namespace Aws namespace S3 { class S3Client; -}} +} +} + namespace DB { class ReadBufferFromRemoteFSGather : public ReadBuffer { -friend class ThreadPoolRemoteFSReader; friend class ReadIndirectBufferFromRemoteFS; public: - explicit ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_); + explicit ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const String & path_); - String getFileName() const { return metadata.metadata_file_path; } + String getFileName() const; void reset(); void seek(off_t offset); /// SEEK_SET only. -protected: + void setRightOffset(size_t offset); + size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0); - virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) const = 0; +protected: + virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const = 0; RemoteMetadata metadata; @@ -54,6 +57,10 @@ private: size_t buf_idx = 0; size_t bytes_to_ignore = 0; + + size_t last_offset = 0; + + String path; }; @@ -63,13 +70,14 @@ class ReadBufferFromS3Gather final : public ReadBufferFromRemoteFSGather { public: ReadBufferFromS3Gather( + const String & path_, std::shared_ptr client_ptr_, const String & bucket_, IDiskRemote::Metadata metadata_, size_t max_single_read_retries_, const ReadSettings & settings_, bool threadpool_read_ = false) - : ReadBufferFromRemoteFSGather(metadata_) + : ReadBufferFromRemoteFSGather(metadata_, path_) , client_ptr(std::move(client_ptr_)) , bucket(bucket_) , max_single_read_retries(max_single_read_retries_) @@ -78,7 +86,7 @@ public: { } - SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; + SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const override; private: std::shared_ptr client_ptr; @@ -94,12 +102,13 @@ class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather { public: ReadBufferFromWebServerGather( + const String & path_, const String & uri_, RemoteMetadata metadata_, ContextPtr context_, size_t threadpool_read_, const ReadSettings & settings_) - : ReadBufferFromRemoteFSGather(metadata_) + : ReadBufferFromRemoteFSGather(metadata_, path_) , uri(uri_) , context(context_) , threadpool_read(threadpool_read_) @@ -107,7 +116,7 @@ public: { } - SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; + SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const override; private: String uri; @@ -123,11 +132,12 @@ class ReadBufferFromHDFSGather final : public ReadBufferFromRemoteFSGather { public: ReadBufferFromHDFSGather( + const String & path_, const Poco::Util::AbstractConfiguration & config_, const String & hdfs_uri_, IDiskRemote::Metadata metadata_, size_t buf_size_) - : ReadBufferFromRemoteFSGather(metadata_) + : ReadBufferFromRemoteFSGather(metadata_, path_) , config(config_) , buf_size(buf_size_) { @@ -136,7 +146,7 @@ public: hdfs_uri = hdfs_uri_.substr(0, begin_of_path); } - SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; + SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const override; private: const Poco::Util::AbstractConfiguration & config; diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index c22bc32c84b..b6c94ca0802 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -205,6 +205,7 @@ std::unique_ptr DiskS3::readFile(const String & path, co bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool; auto s3_impl = std::make_unique( + path, settings->client, bucket, metadata, settings->s3_max_single_read_retries, read_settings, threadpool_read); diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp index 5eabbfff5f8..1db9c3938c6 100644 --- a/src/Disks/S3/registerDiskS3.cpp +++ b/src/Disks/S3/registerDiskS3.cpp @@ -200,27 +200,27 @@ void registerDiskS3(DiskFactory & factory) s3disk->startup(); - bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true); + // bool cache_enabled = config.getBool(config_prefix + ".cache_enabled", true); - if (cache_enabled) - { - String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/"); + // if (cache_enabled) + // { + // String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/"); - if (metadata_path == cache_path) - throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS); + // if (metadata_path == cache_path) + // throw Exception("Metadata and cache path should be different: " + metadata_path, ErrorCodes::BAD_ARGUMENTS); - auto cache_disk = std::make_shared("s3-cache", cache_path, 0); - auto cache_file_predicate = [] (const String & path) - { - return path.ends_with("idx") // index files. - || path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") // mark files. - || path.ends_with("txt") || path.ends_with("dat"); - }; + // auto cache_disk = std::make_shared("s3-cache", cache_path, 0); + // auto cache_file_predicate = [] (const String & path) + // { + // return path.ends_with("idx") // index files. + // || path.ends_with("mrk") || path.ends_with("mrk2") || path.ends_with("mrk3") // mark files. + // || path.ends_with("txt") || path.ends_with("dat"); + // }; - s3disk = std::make_shared(s3disk, cache_disk, cache_file_predicate); - } + // s3disk = std::make_shared(s3disk, cache_disk, cache_file_predicate); + // } - return std::make_shared(s3disk); + return s3disk; }; factory.registerDiskType("s3", creator); } diff --git a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index c69eb893663..9dd3aeb4625 100644 --- a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -2,6 +2,7 @@ #include #include +#include namespace CurrentMetrics @@ -35,14 +36,16 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe Int32 priority_, std::shared_ptr impl_, size_t buf_size_, - size_t min_bytes_for_seek_) + size_t /* min_bytes_for_seek_ */) : ReadBufferFromFileBase(buf_size_, nullptr, 0) , reader(reader_) , priority(priority_) , impl(impl_) , prefetch_buffer(buf_size_) - , min_bytes_for_seek(min_bytes_for_seek_) + // , min_bytes_for_seek(min_bytes_for_seek_) { + ProfileEvents::increment(ProfileEvents::RemoteFSAsyncBuffers); + buffer_events += impl->getFileName() + " : "; } @@ -66,15 +69,31 @@ std::future AsynchronousReadIndirectBufferFromRemot void AsynchronousReadIndirectBufferFromRemoteFS::prefetch() { - if (hasPendingData()) - return; + if (hasPendingData()) + return; + if (prefetch_future.valid()) + return; + + prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); + ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches); + buffer_events += "-- Prefetch (" + toString(absolute_position) + ") --"; +} + + +void AsynchronousReadIndirectBufferFromRemoteFS::setRightOffset(size_t offset) +{ + buffer_events += "-- Set last offset " + toString(offset) + "--"; if (prefetch_future.valid()) - return; + { + buffer_events += "-- Cancelling because of offset update --"; + ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches); + prefetch_future.wait(); + prefetch_future = {}; + } - prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); - ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches); - buffer_events += "-- Prefetch --"; + last_offset = offset; + impl->setRightOffset(offset); } @@ -86,7 +105,6 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() if (prefetch_future.valid()) { ProfileEvents::increment(ProfileEvents::RemoteFSPrefetchReads); - buffer_events += "-- Read from prefetch --"; CurrentMetrics::Increment metric_increment{CurrentMetrics::AsynchronousReadWait}; Stopwatch watch; @@ -100,13 +118,17 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() absolute_position += size; } } + + buffer_events += fmt::format("-- Read from prefetch from offset: {}, upper bound: {}, actually read: {} --", + toString(absolute_position), toString(last_offset), toString(size)); watch.stop(); ProfileEvents::increment(ProfileEvents::AsynchronousReadWaitMicroseconds, watch.elapsedMicroseconds()); } else { - buffer_events += "-- Read without prefetch --"; size = readInto(memory.data(), memory.size()).get(); + buffer_events += fmt::format("-- Read without prefetch from offset: {}, upper bound: {}, actually read: {} --", + toString(absolute_position), toString(last_offset), toString(size)); if (size) { set(memory.data(), memory.size()); @@ -115,7 +137,6 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl() } } - buffer_events += " + " + toString(size) + " + "; prefetch_future = {}; return size; } @@ -163,6 +184,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence if (prefetch_future.valid()) { + buffer_events += "-- cancelling prefetch because of seek --"; ProfileEvents::increment(ProfileEvents::RemoteFSSeekCancelledPrefetches); prefetch_future.wait(); prefetch_future = {}; @@ -170,16 +192,18 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence pos = working_buffer.end(); - if (static_cast(absolute_position) >= getPosition() - && static_cast(absolute_position) < getPosition() + static_cast(min_bytes_for_seek)) - { - /** - * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. - */ - bytes_to_ignore = absolute_position - getPosition(); - } - else + // if (static_cast(absolute_position) >= getPosition() + // && static_cast(absolute_position) < getPosition() + static_cast(min_bytes_for_seek)) + // { + // /** + // * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. + // */ + // // bytes_to_ignore = absolute_position - getPosition(); + // impl->seek(absolute_position); /// SEEK_SET. + // } + // else { + buffer_events += "-- Impl seek --"; impl->seek(absolute_position); /// SEEK_SET. } @@ -189,14 +213,14 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence void AsynchronousReadIndirectBufferFromRemoteFS::finalize() { - std::cerr << "\n\n\nBuffer events: " << buffer_events << std::endl; - if (prefetch_future.valid()) { + buffer_events += "-- cancelling prefetch in finalize --"; ProfileEvents::increment(ProfileEvents::RemoteFSUnusedCancelledPrefetches); prefetch_future.wait(); prefetch_future = {}; } + std::cerr << "Buffer events: " << buffer_events << std::endl; } diff --git a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h index 3c600562252..af67efe1218 100644 --- a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h +++ b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -47,6 +47,8 @@ public: void prefetch() override; + void setRightOffset(size_t offset); + private: bool nextImpl() override; @@ -68,8 +70,10 @@ private: String buffer_events; - size_t min_bytes_for_seek; + // size_t min_bytes_for_seek; + size_t bytes_to_ignore = 0; + Int64 last_offset = 0; }; } diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index bf974440be2..40bda7d6bfe 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -32,7 +32,7 @@ namespace ErrorCodes ReadBufferFromS3::ReadBufferFromS3( std::shared_ptr client_ptr_, const String & bucket_, const String & key_, - UInt64 max_single_read_retries_, const ReadSettings & settings_, bool use_external_buffer_) + UInt64 max_single_read_retries_, const ReadSettings & settings_, bool use_external_buffer_, size_t last_offset_) : SeekableReadBuffer(nullptr, 0) , client_ptr(std::move(client_ptr_)) , bucket(bucket_) @@ -40,11 +40,22 @@ ReadBufferFromS3::ReadBufferFromS3( , max_single_read_retries(max_single_read_retries_) , read_settings(settings_) , use_external_buffer(use_external_buffer_) + , last_offset(last_offset_) { } bool ReadBufferFromS3::nextImpl() { + if (last_offset) + { + if (static_cast(last_offset) == offset) + { + impl.reset(); + working_buffer.resize(0); + return false; + } + } + bool next_result = false; /// `impl` has been initialized earlier and now we're at the end of the current portion of data. @@ -162,16 +173,17 @@ std::unique_ptr ReadBufferFromS3::initialize() req.SetBucket(bucket); req.SetKey(key); - auto right_offset = read_settings.remote_read_right_offset; - if (right_offset) + // auto right_offset = read_settings.remote_read_right_offset; + + if (last_offset) { - req.SetRange(fmt::format("bytes={}-{}", offset, right_offset)); - LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, right_offset); + req.SetRange(fmt::format("bytes={}-{}", offset, last_offset - 1)); + LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Range: {}-{}", bucket, key, offset, last_offset - 1); } else { req.SetRange(fmt::format("bytes={}-", offset)); - LOG_TRACE(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset); + LOG_DEBUG(log, "Read S3 object. Bucket: {}, Key: {}, Offset: {}", bucket, key, offset); } Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req); diff --git a/src/IO/ReadBufferFromS3.h b/src/IO/ReadBufferFromS3.h index b27de8aa0b4..336893ca5b0 100644 --- a/src/IO/ReadBufferFromS3.h +++ b/src/IO/ReadBufferFromS3.h @@ -45,8 +45,10 @@ public: const String & key_, UInt64 max_single_read_retries_, const ReadSettings & settings_, - bool use_external_buffer = false); + bool use_external_buffer = false, + size_t last_offset_ = 0); + size_t right = 0; bool nextImpl() override; off_t seek(off_t off, int whence) override; @@ -57,6 +59,7 @@ private: ReadSettings read_settings; bool use_external_buffer; + size_t last_offset; }; } diff --git a/src/IO/ReadBufferFromWebServer.cpp b/src/IO/ReadBufferFromWebServer.cpp index bfbca078248..41b4c80b2c8 100644 --- a/src/IO/ReadBufferFromWebServer.cpp +++ b/src/IO/ReadBufferFromWebServer.cpp @@ -23,7 +23,7 @@ namespace ErrorCodes static constexpr size_t HTTP_MAX_TRIES = 10; ReadBufferFromWebServer::ReadBufferFromWebServer( - const String & url_, ContextPtr context_, const ReadSettings & settings_, bool use_external_buffer_) + const String & url_, ContextPtr context_, const ReadSettings & settings_, bool use_external_buffer_, size_t) : SeekableReadBuffer(nullptr, 0) , log(&Poco::Logger::get("ReadBufferFromWebServer")) , context(context_) @@ -108,7 +108,7 @@ void ReadBufferFromWebServer::initializeWithRetry() if (i == num_tries - 1) throw; - LOG_ERROR(&Poco::Logger::get("ReadBufferFromWeb"), e.what()); + LOG_ERROR(&Poco::Logger::get("ReadBufferFromWeb"), "Error: {}, code: {}", e.what(), e.code()); sleepForMilliseconds(milliseconds_to_wait); milliseconds_to_wait *= 2; } diff --git a/src/IO/ReadBufferFromWebServer.h b/src/IO/ReadBufferFromWebServer.h index 780a4b16442..c4d847b9f39 100644 --- a/src/IO/ReadBufferFromWebServer.h +++ b/src/IO/ReadBufferFromWebServer.h @@ -20,7 +20,8 @@ public: explicit ReadBufferFromWebServer( const String & url_, ContextPtr context_, const ReadSettings & settings_ = {}, - bool use_external_buffer_ = false); + bool use_external_buffer_ = false, + size_t last_offset = 0); bool nextImpl() override; diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/HDFS/ReadBufferFromHDFS.cpp index 96d67ad0e08..f24705d7f65 100644 --- a/src/Storages/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/HDFS/ReadBufferFromHDFS.cpp @@ -125,7 +125,7 @@ ReadBufferFromHDFS::ReadBufferFromHDFS( const String & hdfs_uri_, const String & hdfs_file_path_, const Poco::Util::AbstractConfiguration & config_, - size_t buf_size_) + size_t buf_size_, size_t) : SeekableReadBuffer(nullptr, 0) , impl(std::make_unique(hdfs_uri_, hdfs_file_path_, config_, buf_size_)) { diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.h b/src/Storages/HDFS/ReadBufferFromHDFS.h index 035a55bd0fa..38c8047ba93 100644 --- a/src/Storages/HDFS/ReadBufferFromHDFS.h +++ b/src/Storages/HDFS/ReadBufferFromHDFS.h @@ -27,7 +27,9 @@ struct ReadBufferFromHDFSImpl; public: ReadBufferFromHDFS(const String & hdfs_uri_, const String & hdfs_file_path_, - const Poco::Util::AbstractConfiguration & config_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); + const Poco::Util::AbstractConfiguration & config_, + size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE, + size_t last_offset = 0); ~ReadBufferFromHDFS() override; diff --git a/src/Storages/MergeTree/IMergeTreeReader.h b/src/Storages/MergeTree/IMergeTreeReader.h index 696cc2f105b..bcb51f2fce6 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.h +++ b/src/Storages/MergeTree/IMergeTreeReader.h @@ -31,7 +31,8 @@ public: /// Return the number of rows has been read or zero if there is no columns to read. /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark - virtual size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) = 0; + virtual size_t readRows(size_t from_mark, size_t current_task_last_mark, + bool continue_reading, size_t max_rows_to_read, Columns & res_columns) = 0; virtual bool canReadIncompleteGranules() const = 0; diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/src/Storages/MergeTree/MergeTreeRangeReader.cpp index 2f6bc10e472..31e8fe6454f 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -55,8 +55,11 @@ static void filterColumns(Columns & columns, const ColumnPtr & filter) MergeTreeRangeReader::DelayedStream::DelayedStream( - size_t from_mark, IMergeTreeReader * merge_tree_reader_) + size_t from_mark, + size_t current_task_last_mark_, + IMergeTreeReader * merge_tree_reader_) : current_mark(from_mark), current_offset(0), num_delayed_rows(0) + , current_task_last_mark(current_task_last_mark_) , merge_tree_reader(merge_tree_reader_) , index_granularity(&(merge_tree_reader->data_part->index_granularity)) , continue_reading(false), is_finished(false) @@ -73,7 +76,8 @@ size_t MergeTreeRangeReader::DelayedStream::readRows(Columns & columns, size_t n { if (num_rows) { - size_t rows_read = merge_tree_reader->readRows(current_mark, continue_reading, num_rows, columns); + size_t rows_read = merge_tree_reader->readRows( + current_mark, current_task_last_mark, continue_reading, num_rows, columns); continue_reading = true; /// Zero rows_read maybe either because reading has finished @@ -151,13 +155,13 @@ size_t MergeTreeRangeReader::DelayedStream::finalize(Columns & columns) MergeTreeRangeReader::Stream::Stream( - size_t from_mark, size_t to_mark, IMergeTreeReader * merge_tree_reader_) + size_t from_mark, size_t to_mark, size_t current_task_last_mark, IMergeTreeReader * merge_tree_reader_) : current_mark(from_mark), offset_after_current_mark(0) , last_mark(to_mark) , merge_tree_reader(merge_tree_reader_) , index_granularity(&(merge_tree_reader->data_part->index_granularity)) , current_mark_index_granularity(index_granularity->getMarkRows(from_mark)) - , stream(from_mark, merge_tree_reader) + , stream(from_mark, current_task_last_mark, merge_tree_reader) { size_t marks_count = index_granularity->getMarksCount(); if (from_mark >= marks_count) @@ -280,9 +284,9 @@ void MergeTreeRangeReader::ReadResult::adjustLastGranule() throw Exception("Can't adjust last granule because no granules were added.", ErrorCodes::LOGICAL_ERROR); if (num_rows_to_subtract > rows_per_granule.back()) - throw Exception("Can't adjust last granule because it has " + toString(rows_per_granule.back()) - + " rows, but try to subtract " + toString(num_rows_to_subtract) + " rows.", - ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Can't adjust last granule because it has {} rows, but try to subtract {} rows.", + toString(rows_per_granule.back()), toString(num_rows_to_subtract)); rows_per_granule.back() -= num_rows_to_subtract; total_rows_per_granule -= num_rows_to_subtract; @@ -750,6 +754,16 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t ReadResult result; result.columns.resize(merge_tree_reader->getColumns().size()); + auto current_task_last_mark_range = std::max_element(ranges.begin(), ranges.end(), + [&](const MarkRange & range1, const MarkRange & range2) + { + return range1.end < range2.end; + }); + + size_t current_task_last_mark = 0; + if (current_task_last_mark_range != ranges.end()) + current_task_last_mark = current_task_last_mark_range->end; + /// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to /// result.num_rows_read until call to stream.finalize(). Also result.num_added_rows may be less than /// result.num_rows_read if the last granule in range also the last in part (so we have to adjust last granule). @@ -760,7 +774,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t if (stream.isFinished()) { result.addRows(stream.finalize(result.columns)); - stream = Stream(ranges.front().begin, ranges.front().end, merge_tree_reader); + stream = Stream(ranges.front().begin, ranges.front().end, current_task_last_mark, merge_tree_reader); result.addRange(ranges.front()); ranges.pop_front(); } @@ -818,7 +832,7 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t & num_rows += stream.finalize(columns); const auto & range = started_ranges[next_range_to_start].range; ++next_range_to_start; - stream = Stream(range.begin, range.end, merge_tree_reader); + stream = Stream(range.begin, range.end, 0, merge_tree_reader); } bool last = i + 1 == size; diff --git a/src/Storages/MergeTree/MergeTreeRangeReader.h b/src/Storages/MergeTree/MergeTreeRangeReader.h index d099d2475d2..c913b476b73 100644 --- a/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -62,7 +62,7 @@ public: { public: DelayedStream() = default; - DelayedStream(size_t from_mark, IMergeTreeReader * merge_tree_reader); + DelayedStream(size_t from_mark, size_t current_task_last_mark_, IMergeTreeReader * merge_tree_reader); /// Read @num_rows rows from @from_mark starting from @offset row /// Returns the number of rows added to block. @@ -81,6 +81,8 @@ public: size_t current_offset = 0; /// Num of rows we have to read size_t num_delayed_rows = 0; + /// Last mark from all ranges of current task. + size_t current_task_last_mark = 0; /// Actual reader of data from disk IMergeTreeReader * merge_tree_reader = nullptr; @@ -99,7 +101,8 @@ public: { public: Stream() = default; - Stream(size_t from_mark, size_t to_mark, IMergeTreeReader * merge_tree_reader); + Stream(size_t from_mark, size_t to_mark, + size_t current_task_last_mark, IMergeTreeReader * merge_tree_reader); /// Returns the number of rows added to block. size_t read(Columns & columns, size_t num_rows, bool skip_remaining_rows_in_current_granule); @@ -122,6 +125,7 @@ public: /// Invariant: offset_after_current_mark + skipped_rows_after_offset < index_granularity size_t offset_after_current_mark = 0; + /// Last mark in current range. size_t last_mark = 0; IMergeTreeReader * merge_tree_reader = nullptr; diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index c898874f737..35da14319ba 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -121,7 +121,8 @@ MergeTreeReaderCompact::MergeTreeReaderCompact( } } -size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) +size_t MergeTreeReaderCompact::readRows( + size_t from_mark, size_t /* current_task_last_mark */, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) { if (continue_reading) from_mark = next_mark; diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.h b/src/Storages/MergeTree/MergeTreeReaderCompact.h index dbfaa7868fa..5a419a23642 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.h +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.h @@ -32,7 +32,8 @@ public: /// Return the number of rows has been read or zero if there is no columns to read. /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark - size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override; + size_t readRows(size_t from_mark, size_t current_task_last_mark, + bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override; bool canReadIncompleteGranules() const override { return false; } diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp index 3e81fec5145..8a69183e858 100644 --- a/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.cpp @@ -37,7 +37,8 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory( } } -size_t MergeTreeReaderInMemory::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) +size_t MergeTreeReaderInMemory::readRows( + size_t from_mark, size_t /* current_task_last_mark */, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) { if (!continue_reading) total_rows_read = 0; diff --git a/src/Storages/MergeTree/MergeTreeReaderInMemory.h b/src/Storages/MergeTree/MergeTreeReaderInMemory.h index 4526b19c4a8..ff6eb92d9c3 100644 --- a/src/Storages/MergeTree/MergeTreeReaderInMemory.h +++ b/src/Storages/MergeTree/MergeTreeReaderInMemory.h @@ -23,7 +23,8 @@ public: /// Return the number of rows has been read or zero if there is no columns to read. /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark - size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override; + size_t readRows(size_t from_mark, size_t current_tasl_last_mark, + bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override; bool canReadIncompleteGranules() const override { return true; } diff --git a/src/Storages/MergeTree/MergeTreeReaderStream.cpp b/src/Storages/MergeTree/MergeTreeReaderStream.cpp index 04b1411d939..707a8c85c73 100644 --- a/src/Storages/MergeTree/MergeTreeReaderStream.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderStream.cpp @@ -1,6 +1,7 @@ #include #include +#include #include @@ -12,68 +13,38 @@ namespace ErrorCodes extern const int ARGUMENT_OUT_OF_BOUND; } - MergeTreeReaderStream::MergeTreeReaderStream( DiskPtr disk_, const String & path_prefix_, const String & data_file_extension_, size_t marks_count_, const MarkRanges & all_mark_ranges, const MergeTreeReaderSettings & settings, MarkCache * mark_cache_, - UncompressedCache * uncompressed_cache, size_t file_size, + UncompressedCache * uncompressed_cache, size_t file_size_, const MergeTreeIndexGranularityInfo * index_granularity_info_, const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type) - : disk(std::move(disk_)), path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_) - , mark_cache(mark_cache_), save_marks_in_cache(settings.save_marks_in_cache) - , index_granularity_info(index_granularity_info_) - , marks_loader(disk, mark_cache, index_granularity_info->getMarksFilePath(path_prefix), - marks_count, *index_granularity_info, save_marks_in_cache) + : disk(std::move(disk_)) + , path_prefix(path_prefix_) + , data_file_extension(data_file_extension_) + , marks_count(marks_count_) + , file_size(file_size_) + , mark_cache(mark_cache_) + , save_marks_in_cache(settings.save_marks_in_cache) + , index_granularity_info(index_granularity_info_) + , marks_loader(disk, mark_cache, index_granularity_info->getMarksFilePath(path_prefix), + marks_count, *index_granularity_info, save_marks_in_cache) { /// Compute the size of the buffer. size_t max_mark_range_bytes = 0; size_t sum_mark_range_bytes = 0; - /// Rightmost bound to read. - size_t right_bound = 0; for (const auto & mark_range : all_mark_ranges) { size_t left_mark = mark_range.begin; size_t right_mark = mark_range.end; - - /// NOTE: if we are reading the whole file, then right_mark == marks_count - /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks. - - /// If the end of range is inside the block, we will need to read it too. - if (right_mark < marks_count && marks_loader.getMark(right_mark).offset_in_decompressed_block > 0) - { - auto indices = collections::range(right_mark, marks_count); - auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, [this](size_t i, size_t j) - { - return marks_loader.getMark(i).offset_in_compressed_file < marks_loader.getMark(j).offset_in_compressed_file; - }); - - right_mark = (it == indices.end() ? marks_count : *it); - } - - size_t mark_range_bytes; - size_t current_right_offset; - - /// If there are no marks after the end of range, just use file size - if (right_mark >= marks_count - || (right_mark + 1 == marks_count - && marks_loader.getMark(right_mark).offset_in_compressed_file == marks_loader.getMark(mark_range.end).offset_in_compressed_file)) - { - mark_range_bytes = file_size - (left_mark < marks_count ? marks_loader.getMark(left_mark).offset_in_compressed_file : 0); - current_right_offset = file_size; - } - else - { - mark_range_bytes = marks_loader.getMark(right_mark).offset_in_compressed_file - marks_loader.getMark(left_mark).offset_in_compressed_file; - current_right_offset = marks_loader.getMark(right_mark).offset_in_compressed_file; - } + auto [right_offset, mark_range_bytes] = getRightOffsetAndBytesRange(left_mark, right_mark); max_mark_range_bytes = std::max(max_mark_range_bytes, mark_range_bytes); sum_mark_range_bytes += mark_range_bytes; - right_bound = std::max(right_bound, current_right_offset); } /// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality. @@ -82,9 +53,6 @@ MergeTreeReaderStream::MergeTreeReaderStream( if (max_mark_range_bytes != 0) read_settings = read_settings.adjustBufferSize(max_mark_range_bytes); - /// Set bound for reading from remote disk. - read_settings.remote_read_right_offset = right_bound; - /// Initialize the objects that shall be used to perform read operations. if (uncompressed_cache) { @@ -128,6 +96,45 @@ MergeTreeReaderStream::MergeTreeReaderStream( } +std::pair MergeTreeReaderStream::getRightOffsetAndBytesRange(size_t left_mark, size_t right_mark) +{ + /// NOTE: if we are reading the whole file, then right_mark == marks_count + /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks. + + /// If the end of range is inside the block, we will need to read it too. + size_t result_right_mark = right_mark; + if (right_mark < marks_count && marks_loader.getMark(right_mark).offset_in_decompressed_block > 0) + { + auto indices = collections::range(right_mark, marks_count); + auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, [this](size_t i, size_t j) + { + return marks_loader.getMark(i).offset_in_compressed_file < marks_loader.getMark(j).offset_in_compressed_file; + }); + + result_right_mark = (it == indices.end() ? marks_count : *it); + } + + size_t right_offset; + size_t mark_range_bytes; + + /// If there are no marks after the end of range, just use file size + if (result_right_mark >= marks_count + || (result_right_mark + 1 == marks_count + && marks_loader.getMark(result_right_mark).offset_in_compressed_file == marks_loader.getMark(right_mark).offset_in_compressed_file)) + { + right_offset = file_size; + mark_range_bytes = right_offset - (left_mark < marks_count ? marks_loader.getMark(left_mark).offset_in_compressed_file : 0); + } + else + { + right_offset = marks_loader.getMark(result_right_mark).offset_in_compressed_file; + mark_range_bytes = right_offset - marks_loader.getMark(left_mark).offset_in_compressed_file; + } + + return std::make_pair(right_offset, mark_range_bytes); +} + + void MergeTreeReaderStream::seekToMark(size_t index) { MarkInCompressedFile mark = marks_loader.getMark(index); @@ -172,4 +179,18 @@ void MergeTreeReaderStream::seekToStart() } } + +void MergeTreeReaderStream::adjustForRange(size_t left_mark, size_t right_mark) +{ + auto [right_offset, mark_range_bytes] = getRightOffsetAndBytesRange(left_mark, right_mark); + if (right_offset > last_right_offset) + { + last_right_offset = right_offset; + if (cached_buffer) + cached_buffer->setRightOffset(last_right_offset); + if (non_cached_buffer) + non_cached_buffer->setRightOffset(last_right_offset); + } +} + } diff --git a/src/Storages/MergeTree/MergeTreeReaderStream.h b/src/Storages/MergeTree/MergeTreeReaderStream.h index 32b9c45ccab..b8244d6252f 100644 --- a/src/Storages/MergeTree/MergeTreeReaderStream.h +++ b/src/Storages/MergeTree/MergeTreeReaderStream.h @@ -23,25 +23,32 @@ public: const MarkRanges & all_mark_ranges, const MergeTreeReaderSettings & settings_, MarkCache * mark_cache, UncompressedCache * uncompressed_cache, - size_t file_size, const MergeTreeIndexGranularityInfo * index_granularity_info_, + size_t file_size_, const MergeTreeIndexGranularityInfo * index_granularity_info_, const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type); void seekToMark(size_t index); void seekToStart(); + void adjustForRange(size_t left_mark, size_t right_mark); + ReadBuffer * data_buffer; private: + std::pair getRightOffsetAndBytesRange(size_t left_mark, size_t right_mark); + DiskPtr disk; std::string path_prefix; std::string data_file_extension; size_t marks_count; + size_t file_size; MarkCache * mark_cache; bool save_marks_in_cache; + size_t last_right_offset = 0; + const MergeTreeIndexGranularityInfo * index_granularity_info; std::unique_ptr cached_buffer; diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index 206469da7be..a47563397be 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -61,7 +61,8 @@ MergeTreeReaderWide::MergeTreeReaderWide( } -size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) +size_t MergeTreeReaderWide::readRows( + size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) { size_t read_rows = 0; try @@ -87,7 +88,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si try { auto & cache = caches[column_from_part.getNameInStorage()]; - prefetch(column_from_part, from_mark, continue_reading, cache, prefetched_streams); + prefetch(column_from_part, from_mark, continue_reading, current_task_last_mark, cache, prefetched_streams); } catch (Exception & e) { @@ -117,7 +118,7 @@ size_t MergeTreeReaderWide::readRows(size_t from_mark, bool continue_reading, si auto & cache = caches[column_from_part.getNameInStorage()]; readData( - column_from_part, column, from_mark, continue_reading, + column_from_part, column, from_mark, continue_reading, current_task_last_mark, max_rows_to_read, cache, /* was_prefetched =*/ !prefetched_streams.empty()); /// For elements of Nested, column_size_before_reading may be greater than column size @@ -199,6 +200,7 @@ static ReadBuffer * getStream( MergeTreeReaderWide::FileStreams & streams, const NameAndTypePair & name_and_type, size_t from_mark, bool seek_to_mark, + size_t current_task_last_mark, ISerialization::SubstreamsCache & cache) { /// If substream have already been read. @@ -212,6 +214,7 @@ static ReadBuffer * getStream( return nullptr; MergeTreeReaderStream & stream = *it->second; + stream.adjustForRange(seek_to_start ? 0 : from_mark, current_task_last_mark); if (seek_to_start) stream.seekToStart(); @@ -226,6 +229,7 @@ void MergeTreeReaderWide::prefetch( const NameAndTypePair & name_and_type, size_t from_mark, bool continue_reading, + size_t current_task_last_mark, ISerialization::SubstreamsCache & cache, std::unordered_set & prefetched_streams) { @@ -239,7 +243,7 @@ void MergeTreeReaderWide::prefetch( if (!prefetched_streams.count(stream_name)) { bool seek_to_mark = !continue_reading; - if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, cache)) + if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache)) buf->prefetch(); prefetched_streams.insert(stream_name); @@ -250,8 +254,8 @@ void MergeTreeReaderWide::prefetch( void MergeTreeReaderWide::readData( const NameAndTypePair & name_and_type, ColumnPtr & column, - size_t from_mark, bool continue_reading, size_t max_rows_to_read, - ISerialization::SubstreamsCache & cache, bool was_prefetched) + size_t from_mark, bool continue_reading, size_t current_task_last_mark, + size_t max_rows_to_read, ISerialization::SubstreamsCache & cache, bool was_prefetched) { double & avg_value_size_hint = avg_value_size_hints[name_and_type.name]; ISerialization::DeserializeBinaryBulkSettings deserialize_settings; @@ -264,7 +268,7 @@ void MergeTreeReaderWide::readData( { deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path) { - return getStream(/* seek_to_start = */true, substream_path, streams, name_and_type, from_mark, /* seek_to_mark = */false, cache); + return getStream(/* seek_to_start = */true, substream_path, streams, name_and_type, from_mark, /* seek_to_mark = */false, current_task_last_mark, cache); }; serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]); } @@ -275,7 +279,7 @@ void MergeTreeReaderWide::readData( return getStream( /* seek_to_start = */false, substream_path, streams, name_and_type, from_mark, - seek_to_mark, cache); + seek_to_mark, current_task_last_mark, cache); }; deserialize_settings.continuous_reading = continue_reading; auto & deserialize_state = deserialize_binary_bulk_state_map[name]; diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.h b/src/Storages/MergeTree/MergeTreeReaderWide.h index 08d743370a9..a71475acd60 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.h +++ b/src/Storages/MergeTree/MergeTreeReaderWide.h @@ -28,7 +28,8 @@ public: /// Return the number of rows has been read or zero if there is no columns to read. /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark - size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override; + size_t readRows(size_t from_mark, size_t current_task_last_mark, + bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override; bool canReadIncompleteGranules() const override { return true; } @@ -39,13 +40,14 @@ private: FileStreams streams; Serializations serializations; DiskPtr disk; + std::map> marks; void addStreams(const NameAndTypePair & name_and_type, const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type); void readData( const NameAndTypePair & name_and_type, ColumnPtr & column, - size_t from_mark, bool continue_reading, size_t max_rows_to_read, + size_t from_mark, bool continue_reading, size_t current_task_last_mark, size_t max_rows_to_read, ISerialization::SubstreamsCache & cache, bool was_prefetched); /// Make next readData more simple by calling 'prefetch' of all related ReadBuffers (column streams). @@ -53,6 +55,7 @@ private: const NameAndTypePair & name_and_type, size_t from_mark, bool continue_reading, + size_t current_task_last_mark, ISerialization::SubstreamsCache & cache, std::unordered_set & prefetched_streams); /// if stream was already prefetched do nothing }; diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index 9ea9383c7f0..df8d6a7c127 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -78,7 +78,8 @@ try const auto & sample = reader->getColumns(); Columns columns(sample.size()); - size_t rows_read = reader->readRows(current_mark, continue_reading, rows_to_read, columns); + /// TODO: pass stream size instead of zero? + size_t rows_read = reader->readRows(current_mark, 0, continue_reading, rows_to_read, columns); if (rows_read) {