Merge pull request #57754 from ClickHouse/fix-loading-dependent-table-materialized-postgresql

Table engine MaterializedPostgreSQL fix dependency loading
This commit is contained in:
Kseniia Sumarokova 2023-12-13 12:17:01 +01:00 committed by GitHub
commit b1325450e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 118 additions and 31 deletions

View File

@ -1,6 +1,10 @@
#include <Databases/DDLLoadingDependencyVisitor.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include "config.h"
#if USE_LIBPQXX
#include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#endif
#include <Interpreters/Context.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTCreateQuery.h>
@ -131,6 +135,14 @@ void DDLLoadingDependencyVisitor::visit(const ASTStorage & storage, Data & data)
extractTableNameFromArgument(*storage.engine, data, 3);
else if (storage.engine->name == "Dictionary")
extractTableNameFromArgument(*storage.engine, data, 0);
#if USE_LIBPQXX
else if (storage.engine->name == "MaterializedPostgreSQL")
{
const auto * create_query = data.create_query->as<ASTCreateQuery>();
auto nested_table = toString(create_query->uuid) + StorageMaterializedPostgreSQL::NESTED_TABLE_SUFFIX;
data.dependencies.emplace(QualifiedTableName{ .database = create_query->getDatabase(), .table = nested_table });
}
#endif
}

View File

@ -139,6 +139,8 @@ void DatabaseOrdinary::loadTableFromMetadata(
assert(name.database == TSA_SUPPRESS_WARNING_FOR_READ(database_name));
const auto & query = ast->as<const ASTCreateQuery &>();
LOG_TRACE(log, "Loading table {}", name.getFullName());
try
{
auto [table_name, table] = createTableFromAST(

View File

@ -47,9 +47,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
static const auto NESTED_TABLE_SUFFIX = "_nested";
static const auto TMP_SUFFIX = "_tmp";
/// For the case of single storage.
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(

View File

@ -63,6 +63,9 @@ namespace DB
class StorageMaterializedPostgreSQL final : public IStorage, WithContext
{
public:
static constexpr auto NESTED_TABLE_SUFFIX = "_nested";
static constexpr auto TMP_SUFFIX = "_tmp";
StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_,
const String & postgres_database_name, const String & postgres_table_name);

View File

@ -280,9 +280,15 @@ class PostgresManager:
f"INSERT INTO {database_name}.{table_name} SELECT number, number from numbers(50)"
)
def create_and_fill_postgres_tables(self, tables_num, numbers=50, database_name=""):
def create_and_fill_postgres_tables(
self,
tables_num,
numbers=50,
database_name="",
table_name_base="postgresql_replica",
):
for i in range(tables_num):
table_name = f"postgresql_replica_{i}"
table_name = f"{table_name_base}_{i}"
create_postgres_table(self.cursor, table_name, database_name)
if numbers > 0:
db = self.database_or_default(database_name)

View File

@ -393,18 +393,19 @@ def test_table_schema_changes(started_cluster):
def test_many_concurrent_queries(started_cluster):
table = "test_many_conc"
query_pool = [
"DELETE FROM postgresql_replica_{} WHERE (value*value) % 3 = 0;",
"UPDATE postgresql_replica_{} SET value = value - 125 WHERE key % 2 = 0;",
"DELETE FROM postgresql_replica_{} WHERE key % 10 = 0;",
"UPDATE postgresql_replica_{} SET value = value*5 WHERE key % 2 = 1;",
"DELETE FROM postgresql_replica_{} WHERE value % 2 = 0;",
"UPDATE postgresql_replica_{} SET value = value + 2000 WHERE key % 5 = 0;",
"DELETE FROM postgresql_replica_{} WHERE value % 3 = 0;",
"UPDATE postgresql_replica_{} SET value = value * 2 WHERE key % 3 = 0;",
"DELETE FROM postgresql_replica_{} WHERE value % 9 = 2;",
"UPDATE postgresql_replica_{} SET value = value + 2 WHERE key % 3 = 1;",
"DELETE FROM postgresql_replica_{} WHERE value%5 = 0;",
"DELETE FROM {} WHERE (value*value) % 3 = 0;",
"UPDATE {} SET value = value - 125 WHERE key % 2 = 0;",
"DELETE FROM {} WHERE key % 10 = 0;",
"UPDATE {} SET value = value*5 WHERE key % 2 = 1;",
"DELETE FROM {} WHERE value % 2 = 0;",
"UPDATE {} SET value = value + 2000 WHERE key % 5 = 0;",
"DELETE FROM {} WHERE value % 3 = 0;",
"UPDATE {} SET value = value * 2 WHERE key % 3 = 0;",
"DELETE FROM {} WHERE value % 9 = 2;",
"UPDATE {} SET value = value + 2 WHERE key % 3 = 1;",
"DELETE FROM {} WHERE value%5 = 0;",
]
NUM_TABLES = 5
@ -415,7 +416,9 @@ def test_many_concurrent_queries(started_cluster):
database=True,
)
cursor = conn.cursor()
pg_manager.create_and_fill_postgres_tables(NUM_TABLES, numbers=10000)
pg_manager.create_and_fill_postgres_tables(
NUM_TABLES, numbers=10000, table_name_base=table
)
def attack(thread_id):
print("thread {}".format(thread_id))
@ -423,17 +426,19 @@ def test_many_concurrent_queries(started_cluster):
for i in range(20):
query_id = random.randrange(0, len(query_pool) - 1)
table_id = random.randrange(0, 5) # num tables
random_table_name = f"{table}_{table_id}"
table_name = f"{table}_{thread_id}"
# random update / delete query
cursor.execute(query_pool[query_id].format(table_id))
print("table {} query {} ok".format(table_id, query_id))
cursor.execute(query_pool[query_id].format(random_table_name))
print("table {} query {} ok".format(random_table_name, query_id))
# allow some thread to do inserts (not to violate key constraints)
if thread_id < 5:
print("try insert table {}".format(thread_id))
instance.query(
"INSERT INTO postgres_database.postgresql_replica_{} SELECT {}*10000*({} + number), number from numbers(1000)".format(
i, thread_id, k
"INSERT INTO postgres_database.{} SELECT {}*10000*({} + number), number from numbers(1000)".format(
table_name, thread_id, k
)
)
k += 1
@ -443,8 +448,8 @@ def test_many_concurrent_queries(started_cluster):
# also change primary key value
print("try update primary key {}".format(thread_id))
cursor.execute(
"UPDATE postgresql_replica_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(
thread_id, i + 1, i + 1
"UPDATE {table}_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(
table_name, i + 1, i + 1
)
)
print("update primary key {} ok".format(thread_id))
@ -467,25 +472,25 @@ def test_many_concurrent_queries(started_cluster):
n[0] = 50000
for table_id in range(NUM_TABLES):
n[0] += 1
table_name = f"{table}_{table_id}"
instance.query(
"INSERT INTO postgres_database.postgresql_replica_{} SELECT {} + number, number from numbers(5000)".format(
table_id, n[0]
"INSERT INTO postgres_database.{} SELECT {} + number, number from numbers(5000)".format(
table_name, n[0]
)
)
# cursor.execute("UPDATE postgresql_replica_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(table_id, table_id+1, table_id+1))
# cursor.execute("UPDATE {table}_{} SET key=key%100000+100000*{} WHERE key%{}=0".format(table_id, table_id+1, table_id+1))
for thread in threads:
thread.join()
for i in range(NUM_TABLES):
check_tables_are_synchronized(instance, "postgresql_replica_{}".format(i))
table_name = f"{table}_{i}"
check_tables_are_synchronized(instance, table_name)
count1 = instance.query(
"SELECT count() FROM postgres_database.postgresql_replica_{}".format(i)
"SELECT count() FROM postgres_database.{}".format(table_name)
)
count2 = instance.query(
"SELECT count() FROM (SELECT * FROM test_database.postgresql_replica_{})".format(
i
)
"SELECT count() FROM (SELECT * FROM test_database.{})".format(table_name)
)
assert int(count1) == int(count2)
print(count1, count2)

View File

@ -24,4 +24,10 @@
<database>postgres_database</database>
</postgres2>
</named_collections>
<text_log>
<database>system</database>
<table>text_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<level>Test</level>
</text_log>
</clickhouse>

View File

@ -1038,6 +1038,62 @@ def test_default_columns(started_cluster):
)
def test_dependent_loading(started_cluster):
table = "test_dependent_loading"
pg_manager.create_postgres_table(table)
instance.query(
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)"
)
instance.query(
f"""
SET allow_experimental_materialized_postgresql_table=1;
CREATE TABLE {table} (key Int32, value Int32)
ENGINE=MaterializedPostgreSQL('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'postgres_database', '{table}', 'postgres', 'mysecretpassword') ORDER BY key
"""
)
check_tables_are_synchronized(
instance,
table,
postgres_database=pg_manager.get_default_database(),
materialized_database="default",
)
assert 50 == int(instance.query(f"SELECT count() FROM {table}"))
instance.restart_clickhouse()
check_tables_are_synchronized(
instance,
table,
postgres_database=pg_manager.get_default_database(),
materialized_database="default",
)
assert 50 == int(instance.query(f"SELECT count() FROM {table}"))
uuid = instance.query(
f"SELECT uuid FROM system.tables WHERE name='{table}' and database='default' limit 1"
).strip()
nested_table = f"default.`{uuid}_nested`"
instance.contains_in_log(
f"Table default.{table} has 1 dependencies: {nested_table} (level 1)"
)
instance.query("SYSTEM FLUSH LOGS")
nested_time = instance.query(
f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{uuid}_nested' and message not like '%like%'"
).strip()
time = instance.query(
f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{table}' and message not like '%like%'"
).strip()
instance.query(
f"SELECT toDateTime64('{nested_time}', 6) < toDateTime64('{time}', 6)"
)
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")