Create the async_blocks ZooKeeper path for old replicated tables and add the "async_insert_deduplicate" setting

This commit is contained in:
Han Fei 2022-12-14 14:06:12 +01:00
parent b2cce09004
commit 34a589a7d8
3 changed files with 6 additions and 3 deletions

View File

@ -222,6 +222,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_concurrent_queries_for_user, 0, "The maximum number of concurrent requests per user.", 0) \
\
M(Bool, insert_deduplicate, true, "For INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be performed", 0) \
M(Bool, async_insert_deduplicate, false, "For async INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be performed", 0) \
\
M(UInt64Auto, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled, 'auto' - use majority", 0) \
M(Milliseconds, insert_quorum_timeout, 600000, "If the quorum of replicas did not meet in specified time (in milliseconds), exception will be thrown and insertion is aborted.", 0) \

View File

@ -633,6 +633,8 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/pinned_part_uuids", getPinnedPartUUIDs()->toString(), zkutil::CreateMode::Persistent));
/// For ALTER PARTITION with multi-leaders
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/alter_partition_version", String(), zkutil::CreateMode::Persistent));
/// For deduplication of async inserts
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/async_blocks", String(), zkutil::CreateMode::Persistent));
/// As of now, the "/temp" node must exist, but we want to be able to remove it in the future
if (zookeeper->exists(zookeeper_path + "/temp"))
@ -4535,7 +4537,7 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
const auto storage_settings_ptr = getSettings();
const Settings & query_settings = local_context->getSettingsRef();
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
bool async_deduplicate = query_settings.async_insert && storage_settings_ptr->replicated_deduplication_window_for_async_inserts != 0 && query_settings.insert_deduplicate;
bool async_deduplicate = query_settings.async_insert && query_settings.async_insert_deduplicate && storage_settings_ptr->replicated_deduplication_window_for_async_inserts != 0 && query_settings.insert_deduplicate;
if (async_deduplicate)
return std::make_shared<ReplicatedMergeTreeSinkWithAsyncDeduplicate>(
*this, metadata_snapshot, query_settings.insert_quorum.valueOr(0),
@ -6562,7 +6564,7 @@ void StorageReplicatedMergeTree::getClearBlocksInPartitionOpsImpl(
{
Strings blocks;
if (Coordination::Error::ZOK != zookeeper.tryGetChildren(fs::path(zookeeper_path) / blocks_dir_name, blocks))
throw Exception(zookeeper_path + "/" + blocks_dir_name + "blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
throw Exception(zookeeper_path + "/" + blocks_dir_name + "doesn't exist", ErrorCodes::NOT_FOUND_NODE);
String partition_prefix = partition_id + "_";
Strings paths_to_get;

View File

@ -68,7 +68,7 @@ def generate_data(q, total_number):
def fetch_and_insert_data(q, client):
while True:
insert = q.get()
client.query(insert, settings = {"async_insert": 1, "wait_for_async_insert": 0, "async_insert_busy_timeout_ms": 1500, "insert_keeper_fault_injection_probability": 0})
client.query(insert, settings = {"async_insert": 1, "async_insert_deduplicate": 1, "wait_for_async_insert": 0, "async_insert_busy_timeout_ms": 1500, "insert_keeper_fault_injection_probability": 0})
q.task_done()
sleep_time = random.randint(50, 500)
time.sleep(sleep_time/1000.0)