Merge pull request #56808 from CheSema/retry-fetch-part

Fetching a part waits until that part is fully committed on the remote replica
Sema Checherinda 2023-11-19 01:57:25 +01:00 committed by GitHub
commit c465933b1d
5 changed files with 81 additions and 10 deletions
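
In short: with allow_remote_fs_zero_copy_replication enabled, the serving replica no longer hands out a part that is still PreActive. It polls for up to 1 second, in 5 ms steps, until the part is committed, and otherwise aborts with a retryable error so the fetcher tries again later. The standalone sketch below re-expresses that bounded, sleep-based polling pattern using only the standard library; waitFor, the committer thread and main are illustrative stand-ins, not code from this PR.

#include <atomic>
#include <chrono>
#include <cstdio>
#include <functional>
#include <thread>

/// Poll `pred` every `step` until it returns true or `timeout` elapses.
/// Same contract as the wait_loop() helper added in the diff below,
/// re-expressed with std::chrono purely for illustration.
static bool waitFor(std::chrono::milliseconds timeout,
                    std::chrono::milliseconds step,
                    const std::function<bool()> & pred)
{
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!pred())
    {
        if (std::chrono::steady_clock::now() >= deadline)
            return false; /// timed out; the caller decides how to react
        std::this_thread::sleep_for(step);
    }
    return true;
}

int main()
{
    std::atomic<bool> committed{false};

    /// Simulate another thread committing the part after ~200 ms.
    std::thread committer([&]
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        committed = true;
    });

    /// The serving side waits up to 1 s, checking every 5 ms (the values used in the diff).
    bool ok = waitFor(std::chrono::milliseconds(1000), std::chrono::milliseconds(5),
                      [&] { return committed.load(); });

    std::printf("part visible to fetcher: %s\n", ok ? "yes" : "no (client should retry)");
    committer.join();
    return 0;
}

Polling with a short fixed step keeps the helper trivial; the real wait_loop() bounds itself with chassert(wait_timeout_ms < 2000) precisely because a sleep-based wait must stay short.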

View File

@@ -65,8 +65,6 @@ constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION = 8;
// Reserved for ALTER PRIMARY KEY
// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 9;
std::string getEndpointId(const std::string & node_id)
{
@@ -351,22 +349,60 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
return data_checksums;
}
bool wait_loop(UInt32 wait_timeout_ms, const std::function<bool()> & pred)
{
static const UInt32 loop_delay_ms = 5;
/// this is sleep-based wait, it has to be short
chassert(wait_timeout_ms < 2000);
if (pred())
return true;
Stopwatch timer;
sleepForMilliseconds(loop_delay_ms);
while (!pred() && timer.elapsedMilliseconds() < wait_timeout_ms)
{
sleepForMilliseconds(loop_delay_ms);
}
return pred();
}
MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
/// It is important to include Outdated parts here because remote replicas cannot reliably
/// determine the local state of the part, so queries for the parts in these states are completely normal.
MergeTreeData::DataPartPtr part;
part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
bool zero_copy_enabled = data.getSettings()->allow_remote_fs_zero_copy_replication;
if (!zero_copy_enabled)
return part;
/// Ephemeral zero-copy lock may be lost for PreActive parts
/// do not expose PreActive parts for zero-copy
static const UInt32 wait_timeout_ms = 1000;
auto pred = [&] ()
{
auto lock = data.lockParts();
return part->getState() != MergeTreeDataPartState::PreActive;
};
bool pred_result = wait_loop(wait_timeout_ms, pred);
if (!pred_result)
throw Exception(
ErrorCodes::ABORTED,
"Could not exchange part {} as it's in preActive state ({} ms) and it uses zero copy replication. "
"This is expected behaviour and the client will retry fetching the part automatically.",
name, wait_timeout_ms);
return part;
}
Fetcher::Fetcher(StorageReplicatedMergeTree & data_)
@@ -493,7 +529,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
static_cast<uint64_t>(data_settings->replicated_max_parallel_fetches_for_host));
int server_protocol_version = parse<int>(in->getResponseCookie("server_protocol_version", "0"));
String remote_fs_metadata = parse<String>(in->getResponseCookie("remote_fs_metadata", ""));
DiskPtr preffered_disk = disk;
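
The ABORTED branch above is intentionally retryable: the message promises that "the client will retry fetching the part automatically". The sketch below is only a hypothetical illustration of that caller-side contract; in ClickHouse itself the retry presumably comes from the replication queue re-scheduling the fetch task, and FetchAborted, fetchPartWithRetries, fetch_once and the backoff values are made up for this example.

#include <chrono>
#include <functional>
#include <stdexcept>
#include <string>
#include <thread>

/// Hypothetical error type standing in for the ABORTED exception thrown by findPart().
struct FetchAborted : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

/// Retry a single fetch attempt (`fetch_once`, supplied by the caller) while the
/// remote replica reports that the part is still PreActive, backing off between tries.
static bool fetchPartWithRetries(const std::string & part_name,
                                 const std::function<bool(const std::string &)> & fetch_once,
                                 int max_attempts = 5)
{
    for (int attempt = 1; attempt <= max_attempts; ++attempt)
    {
        try
        {
            return fetch_once(part_name);
        }
        catch (const FetchAborted &)
        {
            /// The part exists remotely but is not committed yet; back off and retry
            /// instead of failing the whole fetch.
            std::this_thread::sleep_for(std::chrono::milliseconds(100 * attempt));
        }
    }
    return false;
}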

View File

@@ -84,6 +84,7 @@ struct Settings;
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background.", 0) \
M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
\
/* Part removal settings. */ \
M(UInt64, simultaneous_parts_removal_limit, 0, "Maximum number of parts to remove during one CleanupThread iteration (0 means unlimited).", 0) \

View File

@@ -924,6 +924,14 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT
if (multi_code == Coordination::Error::ZOK)
{
auto sleep_before_commit_local_part_in_replicated_table_ms = storage.getSettings()->sleep_before_commit_local_part_in_replicated_table_ms;
if (sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds())
{
LOG_INFO(log, "committing part {}, triggered sleep_before_commit_local_part_in_replicated_table_ms {}",
part->name, sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds());
sleepForMilliseconds(sleep_before_commit_local_part_in_replicated_table_ms.totalMilliseconds());
}
part->new_part_was_committed_to_zookeeper_after_rename_on_disk = true;
transaction.commit();
storage.merge_selecting_task->schedule();

View File

@@ -0,0 +1,26 @@
-- Tags: no-replicated-database, no-fasttest
-- Tag no-replicated-database: different number of replicas
create table tableIn (n int)
engine=ReplicatedMergeTree('/test/02916/{database}/table', '1')
order by tuple()
settings
storage_policy='s3_cache',
allow_remote_fs_zero_copy_replication=1,
sleep_before_commit_local_part_in_replicated_table_ms=5000;
create table tableOut (n int)
engine=ReplicatedMergeTree('/test/02916/{database}/table', '2')
order by tuple()
settings
storage_policy='s3_cache',
allow_remote_fs_zero_copy_replication=1;
SET send_logs_level='error';
insert into tableIn values(1);
insert into tableIn values(2);
system sync replica tableOut;
select count() from tableOut;
drop table tableIn;
drop table tableOut;