mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-01 20:12:02 +00:00
Better
This commit is contained in:
parent
aef99dedba
commit
c91b86e220
@ -590,7 +590,7 @@ LRUFileCache::FileSegmentCell::FileSegmentCell(FileSegmentPtr file_segment_, LRU
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
String LRUFileCache::dumpStructure()
|
String LRUFileCache::dumpStructure(const Key & key_)
|
||||||
{
|
{
|
||||||
std::lock_guard cache_lock(mutex);
|
std::lock_guard cache_lock(mutex);
|
||||||
|
|
||||||
@ -598,10 +598,13 @@ String LRUFileCache::dumpStructure()
|
|||||||
for (auto it = queue.begin(); it != queue.end(); ++it)
|
for (auto it = queue.begin(); it != queue.end(); ++it)
|
||||||
{
|
{
|
||||||
auto [key, offset] = *it;
|
auto [key, offset] = *it;
|
||||||
|
if (key == key_)
|
||||||
|
{
|
||||||
auto * cell = getCell(key, offset, cache_lock);
|
auto * cell = getCell(key, offset, cache_lock);
|
||||||
result << (it != queue.begin() ? ", " : "") << cell->file_segment->range().toString();
|
result << (it != queue.begin() ? ", " : "") << cell->file_segment->range().toString();
|
||||||
result << "(state: " << cell->file_segment->download_state << ")";
|
result << "(state: " << cell->file_segment->download_state << ")";
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return result.str();
|
return result.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ public:
|
|||||||
virtual void remove(const Key & key) = 0;
|
virtual void remove(const Key & key) = 0;
|
||||||
|
|
||||||
/// For debug.
|
/// For debug.
|
||||||
virtual String dumpStructure() = 0;
|
virtual String dumpStructure(const Key & key) = 0;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
String cache_base_path;
|
String cache_base_path;
|
||||||
@ -173,7 +173,7 @@ public:
|
|||||||
|
|
||||||
Stat getStat();
|
Stat getStat();
|
||||||
|
|
||||||
String dumpStructure() override;
|
String dumpStructure(const Key & key_) override;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -53,9 +53,11 @@ String FileSegment::getOrSetDownloader()
|
|||||||
if (downloader_id.empty())
|
if (downloader_id.empty())
|
||||||
{
|
{
|
||||||
downloader_id = getCallerId();
|
downloader_id = getCallerId();
|
||||||
|
LOG_TEST(&Poco::Logger::get("kssenii " + range().toString() + " "), "Set downloader: {}, prev state: {}", downloader_id, toString(download_state));
|
||||||
download_state = State::DOWNLOADING;
|
download_state = State::DOWNLOADING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG_TEST(&Poco::Logger::get("kssenii " + range().toString() + " "), "Returning with downloader: {} and state: {}", downloader_id, toString(download_state));
|
||||||
return downloader_id;
|
return downloader_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,7 +99,15 @@ FileSegment::State FileSegment::wait()
|
|||||||
|
|
||||||
if (download_state == State::DOWNLOADING)
|
if (download_state == State::DOWNLOADING)
|
||||||
{
|
{
|
||||||
LOG_TEST(&Poco::Logger::get("kssenii"), "Waiting on: {}", range().toString());
|
LOG_TEST(&Poco::Logger::get("kssenii " + range().toString() + " "), "{} waiting on: {}", downloader_id, range().toString());
|
||||||
|
|
||||||
|
assert(!downloader_id.empty() && downloader_id != getCallerId());
|
||||||
|
|
||||||
|
#ifndef NDEBUG
|
||||||
|
std::lock_guard cache_lock(cache->mutex);
|
||||||
|
assert(!cache->isLastFileSegmentHolder(key(), offset(), cache_lock));
|
||||||
|
#endif
|
||||||
|
|
||||||
cv.wait_for(segment_lock, std::chrono::seconds(60)); /// TODO: pass through settings
|
cv.wait_for(segment_lock, std::chrono::seconds(60)); /// TODO: pass through settings
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,8 +126,9 @@ bool FileSegment::reserve(size_t size)
|
|||||||
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
|
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
|
||||||
size, range().toString(), downloaded_size);
|
size, range().toString(), downloaded_size);
|
||||||
|
|
||||||
if (downloader_id != getCallerId())
|
auto caller_id = getCallerId();
|
||||||
throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Space can be reserved only by downloader");
|
if (downloader_id != caller_id)
|
||||||
|
throw Exception(ErrorCodes::FILE_CACHE_ERROR, "Space can be reserved only by downloader (current: {}, expected: {})", caller_id, downloader_id);
|
||||||
|
|
||||||
assert(reserved_size >= downloaded_size);
|
assert(reserved_size >= downloaded_size);
|
||||||
|
|
||||||
@ -139,22 +150,48 @@ bool FileSegment::reserve(size_t size)
|
|||||||
return reserved;
|
return reserved;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FileSegment::completeBatch()
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard segment_lock(mutex);
|
||||||
|
|
||||||
|
bool is_downloader = downloader_id == getCallerId();
|
||||||
|
if (!is_downloader)
|
||||||
|
{
|
||||||
|
cv.notify_all();
|
||||||
|
throw Exception(ErrorCodes::FILE_CACHE_ERROR, "File segment can be completed only by downloader");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (downloaded_size == range().size())
|
||||||
|
download_state = State::DOWNLOADED;
|
||||||
|
|
||||||
|
downloader_id.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
cv.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
void FileSegment::complete(State state)
|
void FileSegment::complete(State state)
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard segment_lock(mutex);
|
std::lock_guard segment_lock(mutex);
|
||||||
|
|
||||||
bool is_downloader = downloader_id == getCallerId();
|
bool is_downloader = downloader_id == getCallerId();
|
||||||
|
|
||||||
if (!is_downloader)
|
if (!is_downloader)
|
||||||
|
{
|
||||||
|
cv.notify_all();
|
||||||
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
|
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
|
||||||
"File segment can be completed only by downloader or downloader's FileSegmentsHodler");
|
"File segment can be completed only by downloader or downloader's FileSegmentsHodler");
|
||||||
|
}
|
||||||
|
|
||||||
if (state != State::DOWNLOADED
|
if (state != State::DOWNLOADED
|
||||||
&& state != State::PARTIALLY_DOWNLOADED
|
&& state != State::PARTIALLY_DOWNLOADED
|
||||||
&& state != State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
|
&& state != State::PARTIALLY_DOWNLOADED_NO_CONTINUATION)
|
||||||
|
{
|
||||||
|
cv.notify_all();
|
||||||
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
|
throw Exception(ErrorCodes::FILE_CACHE_ERROR,
|
||||||
"Cannot complete file segment with state: {}", toString(state));
|
"Cannot complete file segment with state: {}", toString(state));
|
||||||
|
}
|
||||||
|
|
||||||
download_state = state;
|
download_state = state;
|
||||||
completeImpl(segment_lock);
|
completeImpl(segment_lock);
|
||||||
@ -174,7 +211,7 @@ void FileSegment::complete()
|
|||||||
if (downloaded_size == range().size() && download_state != State::DOWNLOADED)
|
if (downloaded_size == range().size() && download_state != State::DOWNLOADED)
|
||||||
download_state = State::DOWNLOADED;
|
download_state = State::DOWNLOADED;
|
||||||
|
|
||||||
if (download_state == State::DOWNLOADING)
|
if (download_state == State::DOWNLOADING || download_state == State::EMPTY)
|
||||||
download_state = State::PARTIALLY_DOWNLOADED;
|
download_state = State::PARTIALLY_DOWNLOADED;
|
||||||
|
|
||||||
completeImpl(segment_lock);
|
completeImpl(segment_lock);
|
||||||
@ -200,6 +237,7 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & /* segment_lock */)
|
|||||||
if (!downloaded_size)
|
if (!downloaded_size)
|
||||||
{
|
{
|
||||||
download_state = State::SKIP_CACHE;
|
download_state = State::SKIP_CACHE;
|
||||||
|
LOG_TEST(&Poco::Logger::get("kssenii " + range().toString() + " "), "Remove cell {} (downloaded: {})", range().toString(), downloaded_size);
|
||||||
cache->remove(key(), offset(), cache_lock);
|
cache->remove(key(), offset(), cache_lock);
|
||||||
}
|
}
|
||||||
else if (is_last_holder)
|
else if (is_last_holder)
|
||||||
@ -210,13 +248,17 @@ void FileSegment::completeImpl(std::lock_guard<std::mutex> & /* segment_lock */)
|
|||||||
* in FileSegmentsHolder represent a contiguous range, so we can resize
|
* in FileSegmentsHolder represent a contiguous range, so we can resize
|
||||||
* it only when nobody needs it.
|
* it only when nobody needs it.
|
||||||
*/
|
*/
|
||||||
|
LOG_TEST(&Poco::Logger::get("kssenii " + range().toString() + " "), "Resize cell {} to downloaded: {}", range().toString(), downloaded_size);
|
||||||
cache->reduceSizeToDownloaded(key(), offset(), cache_lock);
|
cache->reduceSizeToDownloaded(key(), offset(), cache_lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (downloader_id == getCallerId())
|
if (downloader_id == getCallerId())
|
||||||
|
{
|
||||||
|
LOG_TEST(&Poco::Logger::get("kssenii " + range().toString() + " "), "Clearing downloader id: {}, current state: {}", downloader_id, toString(download_state));
|
||||||
downloader_id.clear();
|
downloader_id.clear();
|
||||||
|
}
|
||||||
|
|
||||||
if (!download_can_continue && download_buffer)
|
if (!download_can_continue && download_buffer)
|
||||||
{
|
{
|
||||||
|
@ -72,6 +72,8 @@ public:
|
|||||||
|
|
||||||
Range(size_t left_, size_t right_) : left(left_), right(right_) {}
|
Range(size_t left_, size_t right_) : left(left_), right(right_) {}
|
||||||
|
|
||||||
|
/// Ranges compare equal only when both the left and the right bound match.
bool operator==(const Range & other) const
{
    if (left != other.left)
        return false;
    return right == other.right;
}
|
||||||
|
|
||||||
size_t size() const { return right - left + 1; }
|
size_t size() const { return right - left + 1; }
|
||||||
|
|
||||||
String toString() const { return '[' + std::to_string(left) + ',' + std::to_string(right) + ']'; }
|
String toString() const { return '[' + std::to_string(left) + ',' + std::to_string(right) + ']'; }
|
||||||
@ -91,6 +93,8 @@ public:
|
|||||||
|
|
||||||
void complete(State state);
|
void complete(State state);
|
||||||
|
|
||||||
|
void completeBatch();
|
||||||
|
|
||||||
String getOrSetDownloader();
|
String getOrSetDownloader();
|
||||||
|
|
||||||
bool isDownloader() const;
|
bool isDownloader() const;
|
||||||
@ -99,6 +103,7 @@ public:
|
|||||||
|
|
||||||
static String getCallerId();
|
static String getCallerId();
|
||||||
|
|
||||||
|
String downloader_id;
|
||||||
private:
|
private:
|
||||||
size_t availableSize() const { return reserved_size - downloaded_size; }
|
size_t availableSize() const { return reserved_size - downloaded_size; }
|
||||||
bool lastFileSegmentHolder() const;
|
bool lastFileSegmentHolder() const;
|
||||||
@ -108,7 +113,6 @@ private:
|
|||||||
const Range segment_range;
|
const Range segment_range;
|
||||||
|
|
||||||
State download_state;
|
State download_state;
|
||||||
String downloader_id;
|
|
||||||
|
|
||||||
std::unique_ptr<WriteBufferFromFile> download_buffer;
|
std::unique_ptr<WriteBufferFromFile> download_buffer;
|
||||||
|
|
||||||
|
@ -446,7 +446,6 @@ TEST(LRUFileCache, get)
|
|||||||
holder.~FileSegmentsHolder();
|
holder.~FileSegmentsHolder();
|
||||||
other_1.join();
|
other_1.join();
|
||||||
printRanges(segments);
|
printRanges(segments);
|
||||||
std::cerr << "kssenii: " << cache.dumpStructure() << "\n";
|
|
||||||
ASSERT_TRUE(segments[1]->state() == DB::FileSegment::State::DOWNLOADED);
|
ASSERT_TRUE(segments[1]->state() == DB::FileSegment::State::DOWNLOADED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -458,8 +457,6 @@ TEST(LRUFileCache, get)
|
|||||||
/// Test LRUCache::restore().
|
/// Test LRUCache::restore().
|
||||||
|
|
||||||
auto cache2 = DB::LRUFileCache(cache_base_path, 30, 5);
|
auto cache2 = DB::LRUFileCache(cache_base_path, 30, 5);
|
||||||
std::cerr << "kssenii cache: " << cache.dumpStructure() << "\n";
|
|
||||||
std::cerr << "kssenii cache2: " << cache2.dumpStructure() << "\n";
|
|
||||||
ASSERT_EQ(cache2.getStat().downloaded_size, 5);
|
ASSERT_EQ(cache2.getStat().downloaded_size, 5);
|
||||||
|
|
||||||
auto holder1 = cache2.getOrSet(key, 2, 28); /// Get [2, 29]
|
auto holder1 = cache2.getOrSet(key, 2, 28); /// Get [2, 29]
|
||||||
|
@ -537,6 +537,7 @@ class IColumn;
|
|||||||
\
|
\
|
||||||
M(Int64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
|
M(Int64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
|
||||||
M(Int64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
|
M(Int64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
|
||||||
|
M(Bool, remote_fs_enable_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (must me done via disk config), but allows to bypass cache for some queries if intended", 0) \
|
||||||
\
|
\
|
||||||
M(UInt64, http_max_tries, 1, "Max attempts to read via http.", 0) \
|
M(UInt64, http_max_tries, 1, "Max attempts to read via http.", 0) \
|
||||||
M(UInt64, http_retry_initial_backoff_ms, 100, "Min milliseconds for backoff, when retrying read via http", 0) \
|
M(UInt64, http_retry_initial_backoff_ms, 100, "Min milliseconds for backoff, when retrying read via http", 0) \
|
||||||
|
@ -63,8 +63,8 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::createCacheReadBuffer(size_t
|
|||||||
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::createReadBuffer(FileSegmentPtr file_segment)
|
SeekableReadBufferPtr CachedReadBufferFromRemoteFS::createReadBuffer(FileSegmentPtr file_segment)
|
||||||
{
|
{
|
||||||
auto range = file_segment->range();
|
auto range = file_segment->range();
|
||||||
|
bool first_segment_read_in_range = impl == nullptr;
|
||||||
assert((impl && range.left == file_offset_of_buffer_end) || (!impl && range.left <= file_offset_of_buffer_end));
|
bytes_to_predownload = 0;
|
||||||
|
|
||||||
SeekableReadBufferPtr implementation_buffer;
|
SeekableReadBufferPtr implementation_buffer;
|
||||||
|
|
||||||
@ -109,33 +109,117 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::createReadBuffer(FileSegment
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
case FileSegment::State::DOWNLOADED:
|
case FileSegment::State::DOWNLOADED:
|
||||||
case FileSegment::State::PARTIALLY_DOWNLOADED:
|
|
||||||
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
|
|
||||||
{
|
{
|
||||||
read_type = ReadType::CACHED;
|
read_type = ReadType::CACHED;
|
||||||
implementation_buffer = createCacheReadBuffer(range.left);
|
implementation_buffer = createCacheReadBuffer(range.left);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION:
|
||||||
|
{
|
||||||
|
/// If downloader failed before downloading anything, it is determined
|
||||||
|
/// whether continuation is possible. In case of no continuation and
|
||||||
|
/// downloaded_size == 0 - cache cell is removed and state is switched to SKIP_CACHE.
|
||||||
|
assert(file_segment->downloadOffset() > 0);
|
||||||
|
|
||||||
|
read_type = ReadType::CACHED;
|
||||||
|
implementation_buffer = createCacheReadBuffer(range.left);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case FileSegment::State::PARTIALLY_DOWNLOADED:
|
||||||
|
{
|
||||||
|
auto downloader_id = file_segment->getOrSetDownloader();
|
||||||
|
if (downloader_id == file_segment->getCallerId())
|
||||||
|
{
|
||||||
|
size_t download_offset = file_segment->downloadOffset();
|
||||||
|
bool can_start_from_cache = download_offset && download_offset >= file_offset_of_buffer_end;
|
||||||
|
|
||||||
|
if (can_start_from_cache)
|
||||||
|
{
|
||||||
|
/// segment{k}
|
||||||
|
/// cache: [______|___________
|
||||||
|
/// ^
|
||||||
|
/// download_offset
|
||||||
|
/// requested_range: [__________]
|
||||||
|
/// ^
|
||||||
|
/// file_offset_of_buffer_end
|
||||||
|
|
||||||
|
read_type = ReadType::CACHED;
|
||||||
|
implementation_buffer = createCacheReadBuffer(range.left);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
read_type = ReadType::REMOTE_FS_READ_AND_DOWNLOAD;
|
||||||
|
implementation_buffer = downloader;
|
||||||
|
|
||||||
|
if (download_offset && download_offset < file_offset_of_buffer_end)
|
||||||
|
{
|
||||||
|
/// segment{1}
|
||||||
|
/// cache: [_____|___________
|
||||||
|
/// ^
|
||||||
|
/// download_offset
|
||||||
|
/// requested_range: [__________]
|
||||||
|
/// ^
|
||||||
|
/// file_offset_of_buffer_end
|
||||||
|
|
||||||
|
assert(first_segment_read_in_range);
|
||||||
|
bytes_to_predownload = file_offset_of_buffer_end - file_segment->downloadOffset() - 1;
|
||||||
|
LOG_TEST(log, "Bytes to predownload {} for {}", bytes_to_predownload, downloader_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
download_state = FileSegment::State::DOWNLOADING;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_TEST(log, "Current file segment: {}, read type: {}", range.toString(), toString(read_type));
|
assert((!first_segment_read_in_range && range.left == file_offset_of_buffer_end)
|
||||||
|
|| (first_segment_read_in_range && range.left <= file_offset_of_buffer_end));
|
||||||
|
assert(file_segment->range() == range);
|
||||||
|
|
||||||
|
LOG_TEST(log, "Current file segment: {}, read type: {}, current file offset: {}", range.toString(), toString(read_type), file_offset_of_buffer_end);
|
||||||
|
|
||||||
implementation_buffer->setReadUntilPosition(range.right + 1); /// [..., range.right]
|
implementation_buffer->setReadUntilPosition(range.right + 1); /// [..., range.right]
|
||||||
|
|
||||||
if (read_type == ReadType::CACHED)
|
switch (read_type)
|
||||||
|
{
|
||||||
|
case ReadType::CACHED:
|
||||||
{
|
{
|
||||||
implementation_buffer->seek(file_offset_of_buffer_end - range.left, SEEK_SET);
|
implementation_buffer->seek(file_offset_of_buffer_end - range.left, SEEK_SET);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ReadType::REMOTE_FS_READ:
|
||||||
|
{
|
||||||
|
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ReadType::REMOTE_FS_READ_AND_DOWNLOAD:
|
||||||
|
{
|
||||||
|
if (bytes_to_predownload)
|
||||||
|
{
|
||||||
|
size_t download_offset = file_segment->downloadOffset();
|
||||||
|
assert(download_offset);
|
||||||
|
implementation_buffer->seek(download_offset + 1, SEEK_SET);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
assert(read_type != ReadType::REMOTE_FS_READ_AND_DOWNLOAD || file_offset_of_buffer_end == range.left);
|
assert(file_offset_of_buffer_end == range.left);
|
||||||
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
|
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return implementation_buffer;
|
return implementation_buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -152,6 +236,7 @@ bool CachedReadBufferFromRemoteFS::completeFileSegmentAndGetNext()
|
|||||||
if (read_type == ReadType::REMOTE_FS_READ_AND_DOWNLOAD)
|
if (read_type == ReadType::REMOTE_FS_READ_AND_DOWNLOAD)
|
||||||
(*file_segment_it)->complete(DB::FileSegment::State::DOWNLOADED);
|
(*file_segment_it)->complete(DB::FileSegment::State::DOWNLOADED);
|
||||||
|
|
||||||
|
LOG_TEST(log, "Removing file segment: {}, downloader: {}", (*file_segment_it)->range().toString(), (*file_segment_it)->downloader_id, (*file_segment_it)->state());
|
||||||
/// Do not hold pointer to file segment if it is not needed anymore
|
/// Do not hold pointer to file segment if it is not needed anymore
|
||||||
/// so can become releasable and can be evicted from cache.
|
/// so can become releasable and can be evicted from cache.
|
||||||
file_segments_holder->file_segments.erase(file_segment_it);
|
file_segments_holder->file_segments.erase(file_segment_it);
|
||||||
@ -198,8 +283,16 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
|
|||||||
if (current_file_segment_it == file_segments_holder->file_segments.end())
|
if (current_file_segment_it == file_segments_holder->file_segments.end())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
bytes_to_predownload = 0;
|
||||||
|
|
||||||
if (impl)
|
if (impl)
|
||||||
{
|
{
|
||||||
|
if (!use_external_buffer)
|
||||||
|
{
|
||||||
|
impl->position() = position();
|
||||||
|
assert(!impl->hasPendingData());
|
||||||
|
}
|
||||||
|
|
||||||
auto current_read_range = (*current_file_segment_it)->range();
|
auto current_read_range = (*current_file_segment_it)->range();
|
||||||
auto current_state = (*current_file_segment_it)->state();
|
auto current_state = (*current_file_segment_it)->state();
|
||||||
|
|
||||||
@ -222,29 +315,71 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
|
|||||||
impl = createReadBuffer(*current_file_segment_it);
|
impl = createReadBuffer(*current_file_segment_it);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto current_read_range = (*current_file_segment_it)->range();
|
|
||||||
|
|
||||||
assert(current_read_range.left <= file_offset_of_buffer_end);
|
|
||||||
assert(current_read_range.right >= file_offset_of_buffer_end);
|
|
||||||
|
|
||||||
if (use_external_buffer)
|
if (use_external_buffer)
|
||||||
{
|
{
|
||||||
assert(!internal_buffer.empty());
|
assert(!internal_buffer.empty());
|
||||||
swap(*impl);
|
swap(*impl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto & file_segment = *current_file_segment_it;
|
||||||
|
auto current_read_range = file_segment->range();
|
||||||
|
|
||||||
|
assert(current_read_range.left <= file_offset_of_buffer_end);
|
||||||
|
assert(current_read_range.right >= file_offset_of_buffer_end);
|
||||||
|
|
||||||
|
bool result = false;
|
||||||
|
size_t size = 0;
|
||||||
|
|
||||||
|
if (bytes_to_predownload)
|
||||||
|
{
|
||||||
|
/// Consider this case. Some user needed segment [a, b] and downloaded it partially
|
||||||
|
/// or did not download it at all. But before he called complete(state) or his holder
|
||||||
|
/// called complete(), some other user, who needed segment [a', b'] with a < a' < b',
|
||||||
|
/// started waiting on [a, b] to be downloaded as it intersects with the range he needs.
|
||||||
|
/// But the first downloader fails and second must continue. In this case we need to
|
||||||
|
/// download from offset a'' < a', but return buffer from offset a'.
|
||||||
|
LOG_TEST(log, "Bytes to predownload: {}, caller_id: {}", bytes_to_predownload, FileSegment::getCallerId());
|
||||||
|
|
||||||
|
while (bytes_to_predownload
|
||||||
|
&& file_segment->downloadOffset() + 1 != file_offset_of_buffer_end
|
||||||
|
&& downloader->next())
|
||||||
|
{
|
||||||
|
if (file_segment->reserve(downloader->buffer().size()))
|
||||||
|
{
|
||||||
|
size_t size_to_cache = std::min(bytes_to_predownload, downloader->buffer().size());
|
||||||
|
|
||||||
|
file_segment->write(downloader->buffer().begin(), size_to_cache);
|
||||||
|
|
||||||
|
bytes_to_predownload -= size_to_cache;
|
||||||
|
downloader->position() += size_to_cache;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
impl->position() = position();
|
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||||
assert(!impl->hasPendingData());
|
|
||||||
|
read_type = ReadType::REMOTE_FS_READ;
|
||||||
|
bytes_to_predownload = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (file_segment->downloadOffset() + 1 != file_offset_of_buffer_end
|
||||||
|
&& read_type == ReadType::REMOTE_FS_READ_AND_DOWNLOAD)
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Predownloading failed");
|
||||||
|
|
||||||
|
result = downloader->hasPendingData();
|
||||||
|
size = downloader->available();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool result;
|
|
||||||
auto & file_segment = *current_file_segment_it;
|
|
||||||
auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_DOWNLOAD;
|
auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_DOWNLOAD;
|
||||||
|
|
||||||
try
|
try
|
||||||
|
{
|
||||||
|
if (!result)
|
||||||
{
|
{
|
||||||
result = impl->next();
|
result = impl->next();
|
||||||
|
size = impl->buffer().size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
@ -264,16 +399,14 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
|
|||||||
{
|
{
|
||||||
if (download_current_segment)
|
if (download_current_segment)
|
||||||
{
|
{
|
||||||
size_t size = impl->buffer().size();
|
|
||||||
|
|
||||||
if (file_segment->reserve(size))
|
if (file_segment->reserve(size))
|
||||||
{
|
{
|
||||||
file_segment->write(impl->buffer().begin(), size);
|
file_segment->write(impl->buffer().begin(), size);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
|
|
||||||
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||||
|
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -312,13 +445,10 @@ bool CachedReadBufferFromRemoteFS::nextImpl()
|
|||||||
else
|
else
|
||||||
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
|
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
|
||||||
|
|
||||||
LOG_TEST(log, "Key: {}. Returning with {} bytes, current range: {}, current offset: {}, file segment state: {}, download offset: {}, size on local fs: {}",
|
LOG_TEST(log, "Key: {}. Returning with {} bytes, current range: {}, current offset: {}, file segment state: {}, download offset: {}",
|
||||||
getHexUIntLowercase(key), working_buffer.size(), current_read_range.toString(),
|
getHexUIntLowercase(key), working_buffer.size(), current_read_range.toString(),
|
||||||
file_offset_of_buffer_end, FileSegment::toString(file_segment->state()), file_segment->downloadOffset());
|
file_offset_of_buffer_end, FileSegment::toString(file_segment->state()), file_segment->downloadOffset());
|
||||||
|
|
||||||
if (file_offset_of_buffer_end > current_read_range.right)
|
|
||||||
completeFileSegmentAndGetNext();
|
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,8 +43,7 @@ private:
|
|||||||
|
|
||||||
size_t read_until_position;
|
size_t read_until_position;
|
||||||
size_t file_offset_of_buffer_end = 0;
|
size_t file_offset_of_buffer_end = 0;
|
||||||
|
size_t bytes_to_predownload = 0;
|
||||||
String query_id;
|
|
||||||
|
|
||||||
std::optional<FileSegmentsHolder> file_segments_holder;
|
std::optional<FileSegmentsHolder> file_segments_holder;
|
||||||
FileSegments::iterator current_file_segment_it;
|
FileSegments::iterator current_file_segment_it;
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
#include <base/logger_useful.h>
|
#include <base/logger_useful.h>
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <Common/hex.h>
|
||||||
|
|
||||||
namespace fs = std::filesystem;
|
namespace fs = std::filesystem;
|
||||||
|
|
||||||
@ -27,7 +28,7 @@ namespace DB
|
|||||||
{
|
{
|
||||||
|
|
||||||
#if USE_AWS_S3
|
#if USE_AWS_S3
|
||||||
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path)
|
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t file_size)
|
||||||
{
|
{
|
||||||
current_path = path;
|
current_path = path;
|
||||||
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||||
@ -37,8 +38,12 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
|
|||||||
settings, use_external_buffer, read_until_position, true);
|
settings, use_external_buffer, read_until_position, true);
|
||||||
|
|
||||||
auto cache = settings.remote_fs_cache;
|
auto cache = settings.remote_fs_cache;
|
||||||
if (cache && !cache->shouldBypassCache())
|
if (cache && settings.remote_fs_enable_cache && !cache->shouldBypassCache())
|
||||||
return std::make_shared<CachedReadBufferFromRemoteFS>(path, cache, std::move(reader), settings, read_until_position);
|
{
|
||||||
|
LOG_TEST(&Poco::Logger::get("kssenii"), "New cacheable buffer for{}: {}", getHexUIntLowercase(cache->hash(path)), read_until_position);
|
||||||
|
return std::make_shared<CachedReadBufferFromRemoteFS>(
|
||||||
|
path, cache, std::move(reader), settings, read_until_position ? read_until_position : file_size);
|
||||||
|
}
|
||||||
|
|
||||||
return std::move(reader);
|
return std::move(reader);
|
||||||
}
|
}
|
||||||
@ -46,7 +51,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S
|
|||||||
|
|
||||||
|
|
||||||
#if USE_AZURE_BLOB_STORAGE
|
#if USE_AZURE_BLOB_STORAGE
|
||||||
SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBuffer(const String & path)
|
SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBuffer(const String & path, size_t /* file_size */)
|
||||||
{
|
{
|
||||||
current_path = path;
|
current_path = path;
|
||||||
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||||
@ -56,7 +61,7 @@ SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementation
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path)
|
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path, size_t /* file_size */)
|
||||||
{
|
{
|
||||||
current_path = path;
|
current_path = path;
|
||||||
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool;
|
||||||
@ -65,7 +70,7 @@ SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(
|
|||||||
|
|
||||||
|
|
||||||
#if USE_HDFS
|
#if USE_HDFS
|
||||||
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path)
|
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path, size_t /* file_size */)
|
||||||
{
|
{
|
||||||
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size);
|
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size);
|
||||||
}
|
}
|
||||||
@ -116,7 +121,7 @@ void ReadBufferFromRemoteFSGather::initialize()
|
|||||||
if (!current_buf || current_buf_idx != i)
|
if (!current_buf || current_buf_idx != i)
|
||||||
{
|
{
|
||||||
current_buf_idx = i;
|
current_buf_idx = i;
|
||||||
current_buf = createImplementationBuffer(file_path);
|
current_buf = createImplementationBuffer(file_path, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
current_buf->seek(current_buf_offset, SEEK_SET);
|
current_buf->seek(current_buf_offset, SEEK_SET);
|
||||||
@ -152,7 +157,7 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
|
|||||||
++current_buf_idx;
|
++current_buf_idx;
|
||||||
|
|
||||||
const auto & [path, size] = metadata.remote_fs_objects[current_buf_idx];
|
const auto & [path, size] = metadata.remote_fs_objects[current_buf_idx];
|
||||||
current_buf = createImplementationBuffer(path);
|
current_buf = createImplementationBuffer(path, size);
|
||||||
|
|
||||||
return readImpl();
|
return readImpl();
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,7 @@ public:
|
|||||||
bool initialized() const { return current_buf != nullptr; }
|
bool initialized() const { return current_buf != nullptr; }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) = 0;
|
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) = 0;
|
||||||
|
|
||||||
RemoteMetadata metadata;
|
RemoteMetadata metadata;
|
||||||
|
|
||||||
@ -104,7 +104,7 @@ public:
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
SeekableReadBufferPtr createImplementationBuffer(const String & path) override;
|
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<Aws::S3::S3Client> client_ptr;
|
std::shared_ptr<Aws::S3::S3Client> client_ptr;
|
||||||
@ -135,7 +135,7 @@ public:
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
SeekableReadBufferPtr createImplementationBuffer(const String & path) override;
|
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
|
std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
|
||||||
@ -162,7 +162,7 @@ public:
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
SeekableReadBufferPtr createImplementationBuffer(const String & path) override;
|
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
String uri;
|
String uri;
|
||||||
@ -191,7 +191,7 @@ public:
|
|||||||
hdfs_uri = hdfs_uri_.substr(0, begin_of_path);
|
hdfs_uri = hdfs_uri_.substr(0, begin_of_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
SeekableReadBufferPtr createImplementationBuffer(const String & path) override;
|
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const Poco::Util::AbstractConfiguration & config;
|
const Poco::Util::AbstractConfiguration & config;
|
||||||
|
@ -154,6 +154,8 @@ bool ReadBufferFromS3::nextImpl()
|
|||||||
|
|
||||||
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
|
off_t ReadBufferFromS3::seek(off_t offset_, int whence)
|
||||||
{
|
{
|
||||||
|
LOG_TEST(&Poco::Logger::get("kssenii"), "kssenii read buffer from s3 seek to: {}", offset_);
|
||||||
|
|
||||||
if (impl && restricted_seek)
|
if (impl && restricted_seek)
|
||||||
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
throw Exception("Seek is allowed only before first read attempt from the buffer.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
||||||
|
|
||||||
@ -221,8 +223,11 @@ off_t ReadBufferFromS3::getPosition()
|
|||||||
|
|
||||||
void ReadBufferFromS3::setReadUntilPosition(size_t position)
|
void ReadBufferFromS3::setReadUntilPosition(size_t position)
|
||||||
{
|
{
|
||||||
|
if (position != static_cast<size_t>(read_until_position))
|
||||||
|
{
|
||||||
read_until_position = position;
|
read_until_position = position;
|
||||||
impl.reset();
|
impl.reset();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
||||||
|
@ -77,6 +77,7 @@ struct ReadSettings
|
|||||||
|
|
||||||
size_t remote_fs_read_max_backoff_ms = 10000;
|
size_t remote_fs_read_max_backoff_ms = 10000;
|
||||||
size_t remote_fs_read_backoff_max_tries = 4;
|
size_t remote_fs_read_backoff_max_tries = 4;
|
||||||
|
bool remote_fs_enable_cache = true;
|
||||||
|
|
||||||
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
|
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;
|
||||||
|
|
||||||
|
@ -3124,6 +3124,7 @@ ReadSettings Context::getReadSettings() const
|
|||||||
|
|
||||||
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
|
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
|
||||||
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
|
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
|
||||||
|
res.remote_fs_enable_cache = settings.remote_fs_enable_cache;
|
||||||
|
|
||||||
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
|
res.remote_read_min_bytes_for_seek = settings.remote_read_min_bytes_for_seek;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user