mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #27484 from excitoon/patch-10
Don't silently ignore errors and don't count delays in `ReadBufferFromS3`
This commit is contained in:
commit
739caf86d5
@ -43,7 +43,6 @@ ReadBufferFromS3::ReadBufferFromS3(
|
|||||||
|
|
||||||
bool ReadBufferFromS3::nextImpl()
|
bool ReadBufferFromS3::nextImpl()
|
||||||
{
|
{
|
||||||
Stopwatch watch;
|
|
||||||
bool next_result = false;
|
bool next_result = false;
|
||||||
|
|
||||||
if (impl)
|
if (impl)
|
||||||
@ -62,19 +61,27 @@ bool ReadBufferFromS3::nextImpl()
|
|||||||
auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
|
auto sleep_time_with_backoff_milliseconds = std::chrono::milliseconds(100);
|
||||||
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
|
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
|
||||||
{
|
{
|
||||||
|
Stopwatch watch;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
/// Try to read a next portion of data.
|
/// Try to read a next portion of data.
|
||||||
next_result = impl->next();
|
next_result = impl->next();
|
||||||
|
watch.stop();
|
||||||
|
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
{
|
{
|
||||||
|
watch.stop();
|
||||||
|
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
|
||||||
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
|
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors, 1);
|
||||||
|
|
||||||
LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
|
LOG_INFO(log, "Caught exception while reading S3 object. Bucket: {}, Key: {}, Offset: {}, Attempt: {}, Message: {}",
|
||||||
bucket, key, getPosition(), attempt, e.message());
|
bucket, key, getPosition(), attempt, e.message());
|
||||||
|
|
||||||
|
if (attempt + 1 == max_single_read_retries)
|
||||||
|
throw;
|
||||||
|
|
||||||
/// Pause before next attempt.
|
/// Pause before next attempt.
|
||||||
std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
|
std::this_thread::sleep_for(sleep_time_with_backoff_milliseconds);
|
||||||
sleep_time_with_backoff_milliseconds *= 2;
|
sleep_time_with_backoff_milliseconds *= 2;
|
||||||
@ -86,9 +93,6 @@ bool ReadBufferFromS3::nextImpl()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
watch.stop();
|
|
||||||
ProfileEvents::increment(ProfileEvents::S3ReadMicroseconds, watch.elapsedMicroseconds());
|
|
||||||
|
|
||||||
if (!next_result)
|
if (!next_result)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
@ -1265,8 +1265,14 @@ def test_kill_while_insert(start_cluster):
|
|||||||
disks = get_used_disks_for_table(node1, name)
|
disks = get_used_disks_for_table(node1, name)
|
||||||
assert set(disks) == {"jbod1"}
|
assert set(disks) == {"jbod1"}
|
||||||
|
|
||||||
|
def ignore_exceptions(f, *args):
|
||||||
|
try:
|
||||||
|
f(*args)
|
||||||
|
except:
|
||||||
|
"""(っಠ‿ಠ)っ"""
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
long_select = threading.Thread(target=node1.query, args=("SELECT sleep(3) FROM {name}".format(name=name),))
|
long_select = threading.Thread(target=ignore_exceptions, args=(node1.query, "SELECT sleep(3) FROM {name}".format(name=name)))
|
||||||
long_select.start()
|
long_select.start()
|
||||||
|
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
Loading…
Reference in New Issue
Block a user