This commit is contained in:
Kseniia Sumarokova 2024-09-19 00:57:24 +02:00 committed by GitHub
commit 51416c2d6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 116 additions and 4 deletions

View File

@ -10,6 +10,7 @@
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Storages/StorageSnapshot.h>
#include <base/sleep.h>
#include <Common/CurrentThread.h>
@ -218,6 +219,11 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper(
{
ObjectStorageQueueTableMetadata table_metadata(settings, columns, format);
if (!settings.processing_threads_num.changed && settings.processing_threads_num <= 1)
{
table_metadata.processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
}
std::vector<std::string> metadata_paths;
size_t buckets_num = 0;
if (settings.mode == ObjectStorageQueueMode::ORDERED)
@ -250,6 +256,7 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper(
LOG_TRACE(log, "Metadata in keeper: {}", metadata_str);
table_metadata.adjustFromKeeper(metadata_from_zk);
table_metadata.checkEquals(metadata_from_zk);
return table_metadata;
}

View File

@ -116,6 +116,18 @@ ObjectStorageQueueTableMetadata ObjectStorageQueueTableMetadata::parse(const Str
return ObjectStorageQueueTableMetadata(json);
}
void ObjectStorageQueueTableMetadata::adjustFromKeeper(const ObjectStorageQueueTableMetadata & from_zk)
{
    /// Reconcile the locally-computed metadata with what is already stored in
    /// keeper. `processing_threads_num` is the one mutable field (see header);
    /// when the values diverge, the keeper value is authoritative.
    if (processing_threads_num == from_zk.processing_threads_num)
        return;

    LOG_TRACE(getLogger("ObjectStorageQueueTableMetadata"),
        "Using `processing_threads_num` from keeper: {} (local: {})",
        from_zk.processing_threads_num, processing_threads_num);
    processing_threads_num = from_zk.processing_threads_num;
}
void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const
{
checkImmutableFieldsEquals(from_zk);

View File

@ -25,7 +25,7 @@ struct ObjectStorageQueueTableMetadata
const UInt64 tracked_files_limit;
const UInt64 tracked_file_ttl_sec;
const UInt64 buckets;
const UInt64 processing_threads_num;
UInt64 processing_threads_num; /// Can be changed from keeper.
const String last_processed_path;
const UInt64 loading_retries;
@ -42,6 +42,8 @@ struct ObjectStorageQueueTableMetadata
ObjectStorageQueueMode getMode() const;
void adjustFromKeeper(const ObjectStorageQueueTableMetadata & from_zk);
void checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
private:

View File

@ -22,7 +22,6 @@
#include <Storages/VirtualColumnUtils.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <filesystem>

View File

@ -0,0 +1,16 @@
<clickhouse>
<remote_servers>
<cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
</clickhouse>

View File

@ -124,6 +124,26 @@ def started_cluster():
with_installed_binary=True,
use_old_analyzer=True,
)
cluster.add_instance(
"node1",
with_zookeeper=True,
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",
"configs/s3queue_log.xml",
"configs/remote_servers.xml",
],
)
cluster.add_instance(
"node2",
with_zookeeper=True,
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",
"configs/s3queue_log.xml",
"configs/remote_servers.xml",
],
)
cluster.add_instance(
"instance_too_many_parts",
user_configs=["configs/users.xml"],
@ -215,6 +235,7 @@ def create_table(
auth=DEFAULT_AUTH,
bucket=None,
expect_error=False,
database_name="default",
):
auth_params = ",".join(auth)
bucket = started_cluster.minio_bucket if bucket is None else bucket
@ -236,7 +257,7 @@ def create_table(
node.query(f"DROP TABLE IF EXISTS {table_name}")
create_query = f"""
CREATE TABLE {table_name} ({format})
CREATE TABLE {database_name}.{table_name} ({format})
ENGINE = {engine_def}
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
@ -514,7 +535,7 @@ def test_direct_select_multiple_files(started_cluster, mode):
table_name,
mode,
files_path,
additional_settings={"keeper_path": keeper_path},
additional_settings={"keeper_path": keeper_path, "processing_threads_num": 3},
)
for i in range(5):
rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
@ -1867,3 +1888,58 @@ def test_commit_on_limit(started_cluster):
for value in expected_failed:
assert value not in processed
assert value in failed
def test_replicated(started_cluster):
    """Check S3Queue inside a Replicated database on a two-node cluster.

    The table is created once through the Replicated database engine, so both
    replicas share the same queue metadata in keeper; materialized views on
    both nodes then consume the queue, and the total row count across the
    cluster must converge to the number of generated files (one row each).
    """
    node1 = started_cluster.instances["node1"]
    node2 = started_cluster.instances["node2"]
    # NOTE: was f"test_replicated" — an f-string with no placeholders.
    table_name = "test_replicated"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
    files_path = f"{table_name}_data"
    files_to_generate = 1000

    # One Replicated database shared by both nodes: DDL executed against it on
    # node1 is replayed on node2.
    node1.query(
        "CREATE DATABASE r ENGINE=Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node1')"
    )
    node2.query(
        "CREATE DATABASE r ENGINE=Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node2')"
    )

    create_table(
        started_cluster,
        node1,
        table_name,
        "ordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
        },
        database_name="r",
    )

    # `processing_threads_num` was not set explicitly, so the server defaults
    # it and persists the value in keeper; 16 is expected here — presumably
    # max(physical cores, 16) with test hosts having <= 16 cores (TODO confirm
    # against syncWithKeeper).
    assert '"processing_threads_num":16' in node1.query(
        f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
    )

    # Populate the source bucket; the returned values are not needed because
    # the check below only counts rows.
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    create_mv(node1, f"r.{table_name}", dst_table_name)
    create_mv(node2, f"r.{table_name}", dst_table_name)

    def get_count():
        # Total rows ingested across both replicas of the destination table.
        return int(
            node1.query(
                f"SELECT count() FROM clusterAllReplicas(cluster, default.{dst_table_name})"
            )
        )

    # Poll for up to 20 seconds for both replicas to finish consuming.
    expected_rows = files_to_generate
    for _ in range(20):
        if expected_rows == get_count():
            break
        time.sleep(1)
    assert expected_rows == get_count()