diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index c6422aea453..be601d59baf 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -57,6 +57,12 @@ String FileSegment::getOrSetDownloader() if (downloader_id.empty()) { + if (download_state != State::EMPTY + && download_state != State::PARTIALLY_DOWNLOADED) + throw Exception(ErrorCodes::FILE_CACHE_ERROR, + "Can set downloader only for file segment with state EMPTY or PARTIALLY_DOWNLOADED, but got: {}", + download_state); + downloader_id = getCallerId(); LOG_TEST(log, "Set downloader: {}, prev state: {}", downloader_id, stateToString(download_state)); download_state = State::DOWNLOADING; @@ -77,6 +83,7 @@ String FileSegment::getDownloader() const bool FileSegment::isDownloader() const { std::lock_guard segment_lock(mutex); + LOG_TEST(log, "Checking for current downloader. Caller: {}, downloader: {}, current state: {}", getCallerId(), downloader_id, stateToString(download_state)); return getCallerId() == downloader_id; } @@ -121,6 +128,8 @@ void FileSegment::write(const char * from, size_t size) } cache_writer->write(from, size); + cache_writer->next(); + downloaded_size += size; } @@ -135,7 +144,8 @@ FileSegment::State FileSegment::wait() { LOG_TEST(log, "{} waiting on: {}, current downloader: {}", getCallerId(), range().toString(), downloader_id); - assert(!downloader_id.empty() && downloader_id != getCallerId()); + assert(!downloader_id.empty()); + assert(downloader_id != getCallerId()); #ifndef NDEBUG { @@ -192,6 +202,8 @@ void FileSegment::completeBatchAndResetDownloader() std::lock_guard segment_lock(mutex); bool is_downloader = downloader_id == getCallerId(); + std::cerr << "caller id: " << getCallerId() << "\n"; + std::cerr << "downloader id: " << downloader_id << "\n"; if (!is_downloader) { cv.notify_all(); @@ -214,7 +226,7 @@ void FileSegment::completeBatchAndResetDownloader() cv.notify_all(); } -void FileSegment::complete(State state) +void FileSegment::complete(State state, bool error) { { std::lock_guard segment_lock(mutex); @@ -226,6 +238,10 @@ void FileSegment::complete(State state) throw Exception(ErrorCodes::FILE_CACHE_ERROR, "File segment can be completed only by downloader or downloader's FileSegmentsHodler"); } + else if (error) + { + remote_file_reader.reset(); + } if (state != State::DOWNLOADED && state != State::PARTIALLY_DOWNLOADED diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h index 20107241106..ad70e56dcf8 100644 --- a/src/Common/FileSegment.h +++ b/src/Common/FileSegment.h @@ -113,7 +113,7 @@ public: void completeBatchAndResetDownloader(); - void complete(State state); + void complete(State state, bool error = false); private: size_t availableSize() const { return reserved_size - downloaded_size; } diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp index f19de554184..bddea8d07a9 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp @@ -64,7 +64,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe { switch (read_type_) { - case ReadType::REMOTE_FS_AND_PUT_IN_CACHE: + case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: { /** * Implementation (s3, hdfs, web) buffer might be passed through file segments. @@ -106,6 +106,8 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment(FileSegmentPtr file_segment) { + assert(!file_segment->isDownloader()); + auto range = file_segment->range(); [[maybe_unused]] bool first_segment_read_in_range = impl == nullptr; bytes_to_predownload = 0; @@ -132,7 +134,7 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( auto downloader_id = file_segment->getOrSetDownloader(); if (downloader_id == file_segment->getCallerId()) { - read_type = ReadType::REMOTE_FS_AND_PUT_IN_CACHE; + read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE; implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type); break; @@ -177,61 +179,65 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( } case FileSegment::State::PARTIALLY_DOWNLOADED: { - auto downloader_id = file_segment->getOrSetDownloader(); - if (downloader_id == file_segment->getCallerId()) + size_t download_offset = file_segment->downloadOffset(); + bool can_start_from_cache = download_offset && download_offset >= file_offset_of_buffer_end; + + if (can_start_from_cache) { - size_t download_offset = file_segment->downloadOffset(); - bool can_start_from_cache = download_offset && download_offset >= file_offset_of_buffer_end; + /// segment{k} + /// cache: [______|___________ + /// ^ + /// download_offset + /// requested_range: [__________] + /// ^ + /// file_offset_of_buffer_end - if (can_start_from_cache) - { - /// segment{k} - /// cache: [______|___________ - /// ^ - /// download_offset - /// requested_range: [__________] - /// ^ - /// file_offset_of_buffer_end - - read_type = ReadType::CACHED; - implementation_buffer = getCacheReadBuffer(range.left); - } - else - { - read_type = ReadType::REMOTE_FS_AND_PUT_IN_CACHE; - implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type); - - LOG_TEST(log, "Current download offset: {}, file offset of buffer end: {}", download_offset, file_offset_of_buffer_end); - if (download_offset && download_offset + 1 < file_offset_of_buffer_end) - { - /// segment{1} - /// cache: [_____|___________ - /// ^ - /// download_offset - /// requested_range: [__________] - /// ^ - /// file_offset_of_buffer_end - - assert(first_segment_read_in_range); - bytes_to_predownload = file_offset_of_buffer_end - file_segment->downloadOffset() - 1; - - LOG_TEST(log, "Bytes to predownload {} for {}", bytes_to_predownload, downloader_id); - } - } + read_type = ReadType::CACHED; + implementation_buffer = getCacheReadBuffer(range.left); break; } - else + + LOG_TEST(log, "Current download offset: {}, file offset of buffer end: {}", download_offset, file_offset_of_buffer_end); + if (download_offset && download_offset + 1 < file_offset_of_buffer_end) { - download_state = FileSegment::State::DOWNLOADING; - continue; + /// segment{1} + /// cache: [_____|___________ + /// ^ + /// download_offset + /// requested_range: [__________] + /// ^ + /// file_offset_of_buffer_end + + read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; + implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type); + + // assert(first_segment_read_in_range); + // bytes_to_predownload = file_offset_of_buffer_end - file_segment->downloadOffset() - 1; + + break; } + + auto downloader_id = file_segment->getOrSetDownloader(); + if (downloader_id == file_segment->getCallerId()) + { + read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE; + implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type); + + break; + } + + download_state = FileSegment::State::DOWNLOADING; + continue; } } break; } + [[maybe_unused]] auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE; + assert(download_current_segment == file_segment->isDownloader()); + assert(file_segment->range() == range); assert(file_offset_of_buffer_end >= range.left && file_offset_of_buffer_end <= range.right); @@ -250,8 +256,13 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( implementation_buffer->seek(seek_offset, SEEK_SET); auto * file_reader = dynamic_cast(implementation_buffer.get()); - LOG_TEST(log, "Cache file: {}. Cached seek to: {}, file size: {}", - file_reader->getFileName(), seek_offset, file_reader->size()); + size_t file_size = file_reader->size(); + auto state = file_segment->state(); + + LOG_TEST(log, "Cache file: {}. Cached seek to: {}, file size: {}, file segment state: {}, download offset: {}", + file_reader->getFileName(), seek_offset, file_size, state, file_segment->downloadOffset()); + + assert(file_size > 0); break; } case ReadType::REMOTE_FS_READ_BYPASS_CACHE: @@ -259,8 +270,10 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET); break; } - case ReadType::REMOTE_FS_AND_PUT_IN_CACHE: + case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: { + assert(file_segment->isDownloader()); + if (bytes_to_predownload) { size_t download_offset = file_segment->downloadOffset(); @@ -271,11 +284,13 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( } else { - assert(!first_segment_read_in_range || file_offset_of_buffer_end == range.left); - implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET); + LOG_TEST(log, "Impl buffer seek to : {}, range: {}, download_offset: {}", + file_offset_of_buffer_end, range.toString(), file_segment->downloadOffset()); - LOG_TEST(log, "Impl buffer seek to current offset: {}", file_offset_of_buffer_end); + // assert(!first_segment_read_in_range || file_offset_of_buffer_end == range.left); + implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET); } + break; } } @@ -338,7 +353,9 @@ bool CachedReadBufferFromRemoteFS::checkForPartialDownload() if (file_offset_of_buffer_end > last_downloaded_offset) { if (file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED) - read_type = ReadType::REMOTE_FS_AND_PUT_IN_CACHE; + { + impl = getReadBufferForFileSegment(*current_file_segment_it); + } else { read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; @@ -349,29 +366,10 @@ bool CachedReadBufferFromRemoteFS::checkForPartialDownload() LOG_TEST( log, "Changing read buffer from cache to remote filesystem read for file segment: {}, starting from offset: {}", file_segment->range().toString(), file_offset_of_buffer_end); - - return true; } - } - } - if (read_type == ReadType::REMOTE_FS_AND_PUT_IN_CACHE) - { - /** - * ReadType::REMOTE_FS_AND_PUT_IN_CACHE means that on previous getReadBufferForFileSegment() call - * current buffer successfully called file_segment->getOrSetDownloader() and became a downloader - * for this file segment. However, the downloader's term has a lifespan of 1 nextImpl() call, - * e.g. downloader reads buffer_size byte and calls completeBatchAndResetDownloader() and some other - * thread can become a downloader if it calls getOrSetDownloader() faster. - * - * So downloader is committed to download only buffer_size bytes and then is not a downloader anymore, - * because there is no guarantee on a higher level, that current buffer will not disappear without - * being destructed till the end of query or without finishing the read range, which he was supposed - * to read by marks range given to him. Therefore, each nextImpl() call, in case of - * READ_AND_PUT_IN_CACHE, starts with getOrSetDownloader(). - */ - impl = getReadBufferForFileSegment(*current_file_segment_it); - return true; + return true; + } } return false; @@ -389,6 +387,11 @@ bool CachedReadBufferFromRemoteFS::nextImpl() if (impl) { + { + auto & file_segment = *current_file_segment_it; + LOG_TEST(log, "Prev init. current read type: {}, range: {}, state: {}", toString(read_type), file_segment->range().toString(), file_segment->state()); + } + if (!use_external_buffer) { impl->position() = position(); @@ -400,21 +403,44 @@ bool CachedReadBufferFromRemoteFS::nextImpl() assert(current_read_range.left <= file_offset_of_buffer_end); + bool new_buf = false; if (file_offset_of_buffer_end > current_read_range.right) { - if (!completeFileSegmentAndGetNext()) + new_buf = completeFileSegmentAndGetNext(); + if (!new_buf) return false; } if (current_state == FileSegment::State::PARTIALLY_DOWNLOADED || current_state == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION) { - checkForPartialDownload(); + new_buf = checkForPartialDownload(); } + + if (!new_buf && read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE) + { + /** + * ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE means that on previous getReadBufferForFileSegment() call + * current buffer successfully called file_segment->getOrSetDownloader() and became a downloader + * for this file segment. However, the downloader's term has a lifespan of 1 nextImpl() call, + * e.g. downloader reads buffer_size byte and calls completeBatchAndResetDownloader() and some other + * thread can become a downloader if it calls getOrSetDownloader() faster. + * + * So downloader is committed to download only buffer_size bytes and then is not a downloader anymore, + * because there is no guarantee on a higher level, that current buffer will not disappear without + * being destructed till the end of query or without finishing the read range, which he was supposed + * to read by marks range given to him. Therefore, each nextImpl() call, in case of + * READ_AND_PUT_IN_CACHE, starts with getOrSetDownloader(). + */ + impl = getReadBufferForFileSegment(*current_file_segment_it); + } + + LOG_TEST(log, "Non-first initialization"); } else { impl = getReadBufferForFileSegment(*current_file_segment_it); + LOG_TEST(log, "First initialization"); } if (use_external_buffer) @@ -444,52 +470,66 @@ bool CachedReadBufferFromRemoteFS::nextImpl() /// download from offset a'' < a', but return buffer from offset a'. LOG_TEST(log, "Bytes to predownload: {}, caller_id: {}", bytes_to_predownload, FileSegment::getCallerId()); - while (true) - { - if (bytes_to_predownload - && file_segment->downloadOffset() + 1 != file_offset_of_buffer_end - && !impl->eof()) - { - result = impl->hasPendingData(); - size = impl->available(); + // while (true) + // { + // if (!bytes_to_predownload || impl->eof()) + // { + // if (file_segment->downloadOffset() + 1 != file_offset_of_buffer_end) + // throw Exception( + // ErrorCodes::LOGICAL_ERROR, + // "Failed to predownload. Current file segment: {}, current download offset: {}, expected: {}, eof: {}", + // file_segment->range().toString(), file_segment->downloadOffset(), file_offset_of_buffer_end, impl->eof()); - break; - } + // result = impl->hasPendingData(); + // size = impl->available(); - if (file_segment->reserve(impl->buffer().size())) - { - size_t size_to_cache = std::min(bytes_to_predownload, impl->buffer().size()); - LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, impl->buffer().size()); + // size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1; + // remaining_size_to_read = std::min(impl->available(), remaining_size_to_read); - file_segment->write(impl->buffer().begin(), size_to_cache); + // if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end()) + // { + // LOG_TEST(log, "Resize. Offset: {}, remaining size: {}, file offset: {}, range: {}", + // offset(), remaining_size_to_read, file_offset_of_buffer_end, file_segment->range().toString()); + // impl->buffer().resize(offset() + remaining_size_to_read); + // } - bytes_to_predownload -= size_to_cache; - impl->position() += size_to_cache; - } - else - { - file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); - bytes_to_predownload = 0; + // break; + // } - read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; - impl = getRemoteFSReadBuffer(file_segment, read_type); - impl->seek(file_offset_of_buffer_end, SEEK_SET); + // if (file_segment->reserve(impl->buffer().size())) + // { + // size_t size_to_cache = std::min(bytes_to_predownload, impl->buffer().size()); + // LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, impl->buffer().size()); - LOG_TEST( - log, "Predownload failed because of space limit. Will read from remote filesystem starting from offset: {}", - file_offset_of_buffer_end); + // file_segment->write(impl->buffer().begin(), size_to_cache); - break; - } + // bytes_to_predownload -= size_to_cache; + // impl->position() += size_to_cache; + // } + // else + // { + // file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); + // bytes_to_predownload = 0; - if (file_segment->downloadOffset() + 1 != file_offset_of_buffer_end - && read_type == ReadType::REMOTE_FS_AND_PUT_IN_CACHE) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Predownloading failed"); - } + // read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; + // impl = getRemoteFSReadBuffer(file_segment, read_type); + // impl->seek(file_offset_of_buffer_end, SEEK_SET); + + // LOG_TEST( + // log, "Predownload failed because of space limit. Will read from remote filesystem starting from offset: {}", + // file_offset_of_buffer_end); + + // break; + // } + // } } - auto download_current_segment = read_type == ReadType::REMOTE_FS_AND_PUT_IN_CACHE; - assert(!download_current_segment || file_segment->isDownloader()); + auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE; + if (download_current_segment != file_segment->isDownloader()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Incorrect segment state. Having read type: {}, Caller id: {}, downloader id: {}, file segment state: {}", + toString(read_type), file_segment->getCallerId(), file_segment->getDownloader(), file_segment->state()); try { @@ -497,6 +537,14 @@ bool CachedReadBufferFromRemoteFS::nextImpl() { result = impl->next(); size = impl->buffer().size(); + + size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1; + + if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end()) + { + size = std::min(size, remaining_size_to_read); + impl->buffer().resize(size); + } } } catch (...) @@ -504,7 +552,7 @@ bool CachedReadBufferFromRemoteFS::nextImpl() tryLogCurrentException(__PRETTY_FUNCTION__); if (download_current_segment) - file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED); + file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED, true); /// Note: If exception happens in another place -- out of scope of this buffer, then /// downloader's FileSegmentsHolder is responsible to call file_segment->complete(). @@ -515,6 +563,8 @@ bool CachedReadBufferFromRemoteFS::nextImpl() if (result) { + file_offset_of_buffer_end += size; + if (download_current_segment) { if (file_segment->reserve(size)) @@ -528,31 +578,22 @@ bool CachedReadBufferFromRemoteFS::nextImpl() } } - /// just implement setReadUntilPosition() for local filesysteam read buffer? - if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end()) - { - size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1; - impl->buffer().resize(std::min(impl->buffer().size(), remaining_size_to_read)); - } - - file_offset_of_buffer_end += impl->buffer().size(); - switch (read_type) { case ReadType::CACHED: { - ProfileEvents::increment(ProfileEvents::RemoteFSCacheReadBytes, working_buffer.size()); + ProfileEvents::increment(ProfileEvents::RemoteFSCacheReadBytes, size); break; } case ReadType::REMOTE_FS_READ_BYPASS_CACHE: { - ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, working_buffer.size()); + ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, size); break; } - case ReadType::REMOTE_FS_AND_PUT_IN_CACHE: + case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: { - ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, working_buffer.size()); - ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, working_buffer.size()); + ProfileEvents::increment(ProfileEvents::RemoteFSReadBytes, size); + ProfileEvents::increment(ProfileEvents::RemoteFSCacheDownloadBytes, size); break; } } diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.h b/src/Disks/IO/CachedReadBufferFromRemoteFS.h index f956baa7128..cfb319235fc 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.h +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.h @@ -38,7 +38,7 @@ private: { CACHED, REMOTE_FS_READ_BYPASS_CACHE, - REMOTE_FS_AND_PUT_IN_CACHE, + REMOTE_FS_READ_AND_PUT_IN_CACHE, }; SeekableReadBufferPtr getRemoteFSReadBuffer(FileSegmentPtr file_segment, ReadType read_type_); @@ -80,8 +80,8 @@ private: return "CACHED"; case ReadType::REMOTE_FS_READ_BYPASS_CACHE: return "REMOTE_FS_READ_BYPASS_CACHE"; - case ReadType::REMOTE_FS_AND_PUT_IN_CACHE: - return "REMOTE_FS_AND_PUT_IN_CACHE"; + case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: + return "REMOTE_FS_READ_AND_PUT_IN_CACHE"; } } }; diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index fb9c59b3f16..39f07523899 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -160,7 +160,10 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence) return offset; if (impl && restricted_seek) - throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); + throw Exception( + ErrorCodes::CANNOT_SEEK_THROUGH_FILE, + "Seek is allowed only before first read attempt from the buffer (current offset: {}, new offset: {})", + offset, offset_); if (whence != SEEK_SET) throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);