server side waiting

This commit is contained in:
Sema Checherinda 2023-11-16 14:29:53 +01:00
parent f3f8392051
commit ddca2c2187
4 changed files with 57 additions and 69 deletions

View File

@ -65,7 +65,6 @@ constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION = 8;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_WAITING_PREACTIVE = 9;
std::string getEndpointId(const std::string & node_id)
{
@ -121,7 +120,7 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
MergeTreePartInfo::fromPartName(part_name, data.format_version);
/// We pretend to work as older server version, to be sure that client will correctly process our version
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_WAITING_PREACTIVE))});
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION))});
LOG_TRACE(log, "Sending part {}", part_name);
@ -139,29 +138,6 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
{
part = findPart(part_name);
/// Ephemeral zero-copy lock may be lost for PreActive parts
/// do not expose PreActive parts
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_WAITING_PREACTIVE)
{
bool part_is_ready = part->getState() != MergeTreeDataPartState::PreActive;
writeBinary(part_is_ready, out);
if (!part_is_ready)
{
LOG_TRACE(log, "Part {} is in PreActive state, reply to the client that part is not ready yet", part_name);
return;
}
}
else
{
bool zero_copy_enabled = data.getSettings()->allow_remote_fs_zero_copy_replication;
if (part->getState() == MergeTreeDataPartState::PreActive && zero_copy_enabled)
{
/// report error, client will try again later, error message would be printed
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", part_name);
}
}
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};
if (part->getDataPartStorage().isStoredOnRemoteDisk())
@ -373,6 +349,25 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
return data_checksums;
}
/// Polls `pred` until it reports true or until `wait_timeout_ms` milliseconds have passed
/// since the first failed check, sleeping a fixed short delay between polls.
/// Returns the result of one final evaluation of `pred` made after polling stops,
/// so the caller learns the state observed last rather than the state that ended the loop.
bool wait_loop(UInt32 wait_timeout_ms, std::function<bool()> pred)
{
    /// The wait is implemented with sleeps, so only short timeouts are acceptable here.
    chassert(wait_timeout_ms < 2000);

    static const UInt32 poll_interval_ms = 5;

    /// Fast path: nothing to wait for, do not even start the timer.
    if (pred())
        return true;

    Stopwatch elapsed;
    for (;;)
    {
        /// The predicate is checked before the clock, exactly one call per iteration.
        if (pred())
            break;

        if (elapsed.elapsedMilliseconds() >= wait_timeout_ms)
            break;

        sleepForMilliseconds(poll_interval_ms);
    }

    /// Re-evaluate once more: this is the value reported to the caller.
    return pred();
}
MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
/// It is important to include Outdated parts here because remote replicas cannot reliably
@ -381,10 +376,26 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
if (part)
if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
bool zero_copy_enabled = data.getSettings()->allow_remote_fs_zero_copy_replication;
if (!zero_copy_enabled)
return part;
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
/// Ephemeral zero-copy lock may be lost for PreActive parts
/// do not expose PreActive parts for zero-copy
static const UInt32 wait_timeout_ms = 1000;
bool pred_result = wait_loop(wait_timeout_ms, [&] () { return part->getState() != MergeTreeDataPartState::PreActive; });
if (!pred_result)
throw Exception(
ErrorCodes::ABORTED,
"Part {} is in PreActive state for {} ms. Another host has to be asked.",
name, wait_timeout_ms);
return part;
}
Fetcher::Fetcher(StorageReplicatedMergeTree & data_)
@ -442,7 +453,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
{
{"endpoint", endpoint_id},
{"part", part_name},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_WAITING_PREACTIVE)},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_METADATA_VERSION)},
{"compress", "false"}
});
@ -500,43 +511,17 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
creds.setPassword(password);
}
std::unique_ptr<PooledReadWriteBufferFromHTTP> in;
int server_protocol_version = 0;
bool part_is_ready = true;
static const UInt32 part_not_ready_attempts = 5;
static const UInt32 wait_sleep_time_ms = 100;
for (UInt32 attempt = 1; attempt <= part_not_ready_attempts; ++attempt)
{
in = std::make_unique<PooledReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_POST,
nullptr,
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
0, /* no redirects */
static_cast<uint64_t>(data_settings->replicated_max_parallel_fetches_for_host));
server_protocol_version = parse<int>(in->getResponseCookie("server_protocol_version", "0"));
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_WAITING_PREACTIVE)
readBinary(part_is_ready, *in);
if (part_is_ready)
break;
sleepForMilliseconds(wait_sleep_time_ms);
if (blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Fetching of part was cancelled");
}
if (!part_is_ready)
throw Exception(ErrorCodes::ABORTED, "Part {} is still not ready in host {} after {} attempts, try another host",
part_name, host, part_not_ready_attempts);
std::unique_ptr<PooledReadWriteBufferFromHTTP> in = std::make_unique<PooledReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_POST,
nullptr,
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
0, /* no redirects */
static_cast<uint64_t>(data_settings->replicated_max_parallel_fetches_for_host));
int server_protocol_version = parse<int>(in->getResponseCookie("server_protocol_version", "0"));
String remote_fs_metadata = parse<String>(in->getResponseCookie("remote_fs_metadata", ""));
DiskPtr preffered_disk = disk;

View File

@ -83,7 +83,7 @@ struct Settings;
M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background.", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background.", 0) \
M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
\
/* Part removal settings. */ \
@ -122,7 +122,7 @@ struct Settings;
M(UInt64, max_replicated_sends_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \
M(Milliseconds, wait_for_unique_parts_send_before_shutdown_ms, 0, "Before shutdown table will wait for required amount time for unique parts (exist only on current replica) to be fetched by other replicas (0 means disabled).", 0) \
M(Float, fault_probability_before_part_commit, 0, "For testing. Do not change it.", 0) \
M(Float, fault_probability_after_part_commit, 0, "For testing. Do not change it.", 0) \
M(Float, fault_probability_after_part_commit, 0, "For testing. Do not change it.", 0) \
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \

View File

@ -674,7 +674,7 @@ class MergeTreeSettingsRandomizer:
"replace_long_file_name_to_hash": lambda: random.randint(0, 1),
"max_file_name_length": threshold_generator(0.3, 0.3, 0, 128),
"sleep_before_commit_local_part_in_replicated_table_ms": threshold_generator(
0.3, 0.3, 0, 250
0.7, 0.7, 0, 100
),
}

View File

@ -7,7 +7,7 @@ create table tableIn (n int)
settings
storage_policy='s3_cache',
allow_remote_fs_zero_copy_replication=1,
sleep_before_commit_local_part_in_replicated_table_ms=50000;
sleep_before_commit_local_part_in_replicated_table_ms=5000;
create table tableOut (n int)
engine=ReplicatedMergeTree('/test/02916/{database}/table', '2')
order by tuple()
@ -15,9 +15,12 @@ create table tableOut (n int)
storage_policy='s3_cache',
allow_remote_fs_zero_copy_replication=1;
SET send_logs_level = 'error';
SET send_logs_level='error';
insert into tableIn values(1);
insert into tableIn values(2);
system sync replica tableOut;
select count() from tableOut;
drop table tableIn
drop table tableOut