mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-17 20:02:05 +00:00
Merge pull request #35922 from kssenii/fix-race-in-cached-buffer
Fix race in cached buffer
This commit is contained in:
commit
77c7545395
@ -334,15 +334,17 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getImplementationBuffer(File
|
|||||||
read_buffer_for_file_segment->seek(file_offset_of_buffer_end, SEEK_SET);
|
read_buffer_for_file_segment->seek(file_offset_of_buffer_end, SEEK_SET);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto impl_range = read_buffer_for_file_segment->getRemainingReadRange();
|
|
||||||
auto download_offset = file_segment->getDownloadOffset();
|
auto download_offset = file_segment->getDownloadOffset();
|
||||||
if (download_offset != static_cast<size_t>(read_buffer_for_file_segment->getPosition()))
|
if (download_offset != static_cast<size_t>(read_buffer_for_file_segment->getPosition()))
|
||||||
|
{
|
||||||
|
auto impl_range = read_buffer_for_file_segment->getRemainingReadRange();
|
||||||
throw Exception(
|
throw Exception(
|
||||||
ErrorCodes::LOGICAL_ERROR,
|
ErrorCodes::LOGICAL_ERROR,
|
||||||
"Buffer's offsets mismatch; cached buffer offset: {}, download_offset: {}, position: {}, implementation buffer offset: {}, "
|
"Buffer's offsets mismatch; cached buffer offset: {}, download_offset: {}, position: {}, implementation buffer offset: {}, "
|
||||||
"implementation buffer reading until: {}, file segment info: {}",
|
"implementation buffer reading until: {}, file segment info: {}",
|
||||||
file_offset_of_buffer_end, download_offset, read_buffer_for_file_segment->getPosition(),
|
file_offset_of_buffer_end, download_offset, read_buffer_for_file_segment->getPosition(),
|
||||||
impl_range.left, *impl_range.right, file_segment->getInfoForLog());
|
impl_range.left, *impl_range.right, file_segment->getInfoForLog());
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -802,12 +804,14 @@ std::optional<size_t> CachedReadBufferFromRemoteFS::getLastNonDownloadedOffset()
|
|||||||
|
|
||||||
String CachedReadBufferFromRemoteFS::getInfoForLog()
|
String CachedReadBufferFromRemoteFS::getInfoForLog()
|
||||||
{
|
{
|
||||||
auto implementation_buffer_read_range_str =
|
String implementation_buffer_read_range_str;
|
||||||
implementation_buffer ?
|
if (implementation_buffer)
|
||||||
std::to_string(implementation_buffer->getRemainingReadRange().left)
|
{
|
||||||
+ '-'
|
auto read_range = implementation_buffer->getRemainingReadRange();
|
||||||
+ (implementation_buffer->getRemainingReadRange().right ? std::to_string(*implementation_buffer->getRemainingReadRange().right) : "None")
|
implementation_buffer_read_range_str = std::to_string(read_range.left) + '-' + (read_range.right ? std::to_string(*read_range.right) : "None");
|
||||||
: "None";
|
}
|
||||||
|
else
|
||||||
|
implementation_buffer_read_range_str = "None";
|
||||||
|
|
||||||
auto current_file_segment_info = current_file_segment_it == file_segments_holder->file_segments.end() ? "None" : (*current_file_segment_it)->getInfoForLog();
|
auto current_file_segment_info = current_file_segment_it == file_segments_holder->file_segments.end() ? "None" : (*current_file_segment_it)->getInfoForLog();
|
||||||
|
|
||||||
|
@ -240,7 +240,7 @@ void ReadBufferFromS3::setReadUntilPosition(size_t position)
|
|||||||
|
|
||||||
SeekableReadBuffer::Range ReadBufferFromS3::getRemainingReadRange() const
|
SeekableReadBuffer::Range ReadBufferFromS3::getRemainingReadRange() const
|
||||||
{
|
{
|
||||||
return Range{.left = static_cast<size_t>(offset), .right = read_until_position ? std::optional{read_until_position - 1} : std::nullopt};
|
return Range{ .left = static_cast<size_t>(offset), .right = read_until_position ? std::optional{read_until_position - 1} : std::nullopt };
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
|
||||||
|
@ -33,8 +33,11 @@ private:
|
|||||||
String key;
|
String key;
|
||||||
UInt64 max_single_read_retries;
|
UInt64 max_single_read_retries;
|
||||||
|
|
||||||
off_t offset = 0;
|
/// These variables are atomic because they can be used for `logging only`
|
||||||
off_t read_until_position = 0;
|
/// (where it is not important to get consistent result)
|
||||||
|
/// from separate thread other than the one which uses the buffer for s3 reading.
|
||||||
|
std::atomic<off_t> offset = 0;
|
||||||
|
std::atomic<off_t> read_until_position = 0;
|
||||||
|
|
||||||
Aws::S3::Model::GetObjectResult read_result;
|
Aws::S3::Model::GetObjectResult read_result;
|
||||||
std::unique_ptr<ReadBuffer> impl;
|
std::unique_ptr<ReadBuffer> impl;
|
||||||
|
Loading…
Reference in New Issue
Block a user