Review fixes

This commit is contained in:
kssenii 2021-06-27 19:09:17 +00:00
parent db998c3f6c
commit c3a2fc0584
21 changed files with 330 additions and 296 deletions

View File

@ -1,15 +1,15 @@
---
toc_priority: 30
toc_title: MaterializePostgreSQL
toc_title: MaterializedPostgreSQL
---
# MaterializePostgreSQL {#materialize-postgresql}
# MaterializedPostgreSQL {#materialize-postgresql}
## Creating a Database {#creating-a-database}
``` sql
CREATE DATABASE test_database
ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password'
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password'
SELECT * FROM test_database.postgres_table;
```
@ -17,17 +17,17 @@ SELECT * FROM test_database.postgres_table;
## Settings {#settings}
1. `materialize_postgresql_max_block_size` - Number of rows collected before flushing data into table. Default: `65536`.
1. `materialized_postgresql_max_block_size` - Number of rows collected before flushing data into table. Default: `65536`.
2. `materialize_postgresql_tables_list` - List of tables for MaterializePostgreSQL database engine. Default: `whole database`.
2. `materialized_postgresql_tables_list` - List of tables for MaterializedPostgreSQL database engine. Default: `whole database`.
3. `materialize_postgresql_allow_automatic_update` - Allow to reload table in the background, when schema changes are detected. Default: `0` (`false`).
3. `materialized_postgresql_allow_automatic_update` - Allow to reload table in the background, when schema changes are detected. Default: `0` (`false`).
``` sql
CREATE DATABASE test_database
ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password'
SETTINGS materialize_postgresql_max_block_size = 65536,
materialize_postgresql_tables_list = 'table1,table2,table3';
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password'
SETTINGS materialized_postgresql_max_block_size = 65536,
materialized_postgresql_tables_list = 'table1,table2,table3';
SELECT * FROM test_database.table1;
```
@ -64,3 +64,8 @@ postgres# SELECT CASE relreplident
FROM pg_class
WHERE oid = 'postgres_table'::regclass;
```
## WARNINGS {#warnings}
1. **TOAST** values convertions is not supported. Default value for the data type will be used.

View File

@ -3,13 +3,13 @@ toc_priority: 12
toc_title: MateriaziePostgreSQL
---
# MaterializePostgreSQL {#materialize-postgresql}
# MaterializedPostgreSQL {#materialize-postgresql}
## Creating a Table {#creating-a-table}
``` sql
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres_user', 'postgres_password')
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres_user', 'postgres_password')
PRIMARY KEY key;
```
@ -18,7 +18,7 @@ PRIMARY KEY key;
- Setting `wal_level`to `logical` and `max_replication_slots` to at least `2` in the postgresql config file.
- A table with engine `MaterializePostgreSQL` must have a primary key - the same as a replica identity index (default: primary key) of a postgres table (See [details on replica identity index](../../database-engines/materialize-postgresql.md#requirements)).
- A table with engine `MaterializedPostgreSQL` must have a primary key - the same as a replica identity index (default: primary key) of a postgres table (See [details on replica identity index](../../database-engines/materialize-postgresql.md#requirements)).
- Only database `Atomic` is allowed.
@ -34,8 +34,13 @@ These columns do not need to be added, when table is created. They are always ac
``` sql
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres_user', 'postgres_password')
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres_user', 'postgres_password')
PRIMARY KEY key;
SELECT key, value, _version FROM test.postgresql_replica;
```
## WARNINGS {#warnings}
1. **TOAST** values convertions is not supported. Default value for the data type will be used.

View File

@ -428,7 +428,7 @@ class IColumn;
M(Bool, cast_keep_nullable, false, "CAST operator keep Nullable for result data type", 0) \
M(Bool, alter_partition_verbose_result, false, "Output information about affected parts. Currently works only for FREEZE and ATTACH commands.", 0) \
M(Bool, allow_experimental_database_materialize_mysql, false, "Allow to create database with Engine=MaterializeMySQL(...).", 0) \
M(Bool, allow_experimental_database_materialize_postgresql, false, "Allow to create database with Engine=MaterializePostgreSQL(...).", 0) \
M(Bool, allow_experimental_database_materialized_postgresql, false, "Allow to create database with Engine=MaterializedPostgreSQL(...).", 0) \
M(Bool, system_events_show_zero_values, false, "Include all metrics, even with zero values", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precision are seen as String on ClickHouse's side.", 0) \
M(Bool, optimize_trivial_insert_select, true, "Optimize trivial 'INSERT INTO table SELECT ... FROM TABLES' query", 0) \

View File

@ -36,8 +36,8 @@
#if USE_LIBPQXX
#include <Databases/PostgreSQL/DatabasePostgreSQL.h> // Y_IGNORE
#include <Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h>
#include <Storages/PostgreSQL/MaterializePostgreSQLSettings.h>
#include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h>
#include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h>
#endif
namespace fs = std::filesystem;
@ -100,14 +100,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const UUID & uuid = create.uuid;
bool engine_may_have_arguments = engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "Lazy" ||
engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "MaterializePostgreSQL";
engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "MaterializedPostgreSQL";
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
bool has_unexpected_element = engine_define->engine->parameters || engine_define->partition_by ||
engine_define->primary_key || engine_define->order_by ||
engine_define->sample_by;
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializePostgreSQL";
bool may_have_settings = endsWith(engine_name, "MySQL") || engine_name == "Replicated" || engine_name == "MaterializedPostgreSQL";
if (has_unexpected_element || (!may_have_settings && engine_define->settings))
throw Exception("Database engine " + engine_name + " cannot have parameters, primary_key, order_by, sample_by, settings",
ErrorCodes::UNKNOWN_ELEMENT_IN_AST);
@ -263,7 +263,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
return std::make_shared<DatabasePostgreSQL>(
context, metadata_path, engine_define, database_name, postgres_database_name, connection_pool, use_table_cache);
}
else if (engine_name == "MaterializePostgreSQL")
else if (engine_name == "MaterializedPostgreSQL")
{
const ASTFunction * engine = engine_define->engine;
@ -287,12 +287,12 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
auto parsed_host_port = parseAddress(host_port, 5432);
auto connection_info = postgres::formatConnectionString(postgres_database_name, parsed_host_port.first, parsed_host_port.second, username, password);
auto postgresql_replica_settings = std::make_unique<MaterializePostgreSQLSettings>();
auto postgresql_replica_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (engine_define->settings)
postgresql_replica_settings->loadFromQuery(*engine_define);
return std::make_shared<DatabaseMaterializePostgreSQL>(
return std::make_shared<DatabaseMaterializedPostgreSQL>(
context, metadata_path, uuid, engine_define,
database_name, postgres_database_name, connection_info,
std::move(postgresql_replica_settings));

View File

@ -1,8 +1,8 @@
#include <Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h>
#include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h>
#if USE_LIBPQXX
#include <Storages/PostgreSQL/StorageMaterializePostgreSQL.h>
#include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <DataTypes/DataTypeNullable.h>
@ -31,7 +31,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL(
DatabaseMaterializedPostgreSQL::DatabaseMaterializedPostgreSQL(
ContextPtr context_,
const String & metadata_path_,
UUID uuid_,
@ -39,8 +39,8 @@ DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL(
const String & database_name_,
const String & postgres_database_name,
const postgres::ConnectionInfo & connection_info_,
std::unique_ptr<MaterializePostgreSQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializePostgreSQL (" + database_name_ + ")", context_)
std::unique_ptr<MaterializedPostgreSQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid_, "DatabaseMaterializedPostgreSQL (" + database_name_ + ")", context_)
, database_engine_define(database_engine_define_->clone())
, remote_database_name(postgres_database_name)
, connection_info(connection_info_)
@ -49,7 +49,7 @@ DatabaseMaterializePostgreSQL::DatabaseMaterializePostgreSQL(
}
void DatabaseMaterializePostgreSQL::startSynchronization()
void DatabaseMaterializedPostgreSQL::startSynchronization()
{
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
/* replication_identifier */database_name,
@ -57,10 +57,10 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
database_name,
connection_info,
getContext(),
settings->materialize_postgresql_max_block_size.value,
settings->materialize_postgresql_allow_automatic_update,
/* is_materialize_postgresql_database = */ true,
settings->materialize_postgresql_tables_list.value);
settings->materialized_postgresql_max_block_size.value,
settings->materialized_postgresql_allow_automatic_update,
/* is_materialized_postgresql_database = */ true,
settings->materialized_postgresql_tables_list.value);
postgres::Connection connection(connection_info);
std::unordered_set<std::string> tables_to_replicate = replication_handler->fetchRequiredTables(connection.getRef());
@ -73,19 +73,19 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
if (storage)
{
/// Nested table was already created and synchronized.
storage = StorageMaterializePostgreSQL::create(storage, getContext());
storage = StorageMaterializedPostgreSQL::create(storage, getContext());
}
else
{
/// Nested table does not exist and will be created by replication thread.
storage = StorageMaterializePostgreSQL::create(StorageID(database_name, table_name), getContext());
storage = StorageMaterializedPostgreSQL::create(StorageID(database_name, table_name), getContext());
}
/// Cache MaterializePostgreSQL wrapper over nested table.
/// Cache MaterializedPostgreSQL wrapper over nested table.
materialized_tables[table_name] = storage;
/// Let replication thread now, which tables it needs to keep in sync.
replication_handler->addStorage(table_name, storage->as<StorageMaterializePostgreSQL>());
/// Let replication thread know, which tables it needs to keep in sync.
replication_handler->addStorage(table_name, storage->as<StorageMaterializedPostgreSQL>());
}
LOG_TRACE(log, "Loaded {} tables. Starting synchronization", materialized_tables.size());
@ -93,7 +93,7 @@ void DatabaseMaterializePostgreSQL::startSynchronization()
}
void DatabaseMaterializePostgreSQL::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
void DatabaseMaterializedPostgreSQL::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
{
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
@ -112,9 +112,9 @@ void DatabaseMaterializePostgreSQL::loadStoredObjects(ContextMutablePtr local_co
}
StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, ContextPtr local_context) const
StoragePtr DatabaseMaterializedPostgreSQL::tryGetTable(const String & name, ContextPtr local_context) const
{
/// In otder to define which table access is needed - to MaterializePostgreSQL table (only in case of SELECT queries) or
/// In otder to define which table access is needed - to MaterializedPostgreSQL table (only in case of SELECT queries) or
/// to its nested ReplacingMergeTree table (in all other cases), the context of a query os modified.
/// Also if materialzied_tables set is empty - it means all access is done to ReplacingMergeTree tables - it is a case after
/// replication_handler was shutdown.
@ -123,14 +123,14 @@ StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, Conte
return DatabaseAtomic::tryGetTable(name, local_context);
}
/// Note: In select query we call MaterializePostgreSQL table and it calls tryGetTable from its nested.
/// So the only point, where synchronization is needed - access to MaterializePostgreSQL table wrapper over nested table.
/// Note: In select query we call MaterializedPostgreSQL table and it calls tryGetTable from its nested.
/// So the only point, where synchronization is needed - access to MaterializedPostgreSQL table wrapper over nested table.
std::lock_guard lock(tables_mutex);
auto table = materialized_tables.find(name);
/// Return wrapper over ReplacingMergeTree table. If table synchronization just started, table will not
/// be accessible immediately. Table is considered to exist once its nested table was created.
if (table != materialized_tables.end() && table->second->as <StorageMaterializePostgreSQL>()->hasNested())
if (table != materialized_tables.end() && table->second->as <StorageMaterializedPostgreSQL>()->hasNested())
{
return table->second;
}
@ -139,7 +139,7 @@ StoragePtr DatabaseMaterializePostgreSQL::tryGetTable(const String & name, Conte
}
void DatabaseMaterializePostgreSQL::createTable(ContextPtr local_context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
void DatabaseMaterializedPostgreSQL::createTable(ContextPtr local_context, const String & table_name, const StoragePtr & table, const ASTPtr & query)
{
/// Create table query can only be called from replication thread.
if (local_context->isInternalQuery())
@ -153,7 +153,7 @@ void DatabaseMaterializePostgreSQL::createTable(ContextPtr local_context, const
}
void DatabaseMaterializePostgreSQL::stopReplication()
void DatabaseMaterializedPostgreSQL::stopReplication()
{
if (replication_handler)
replication_handler->shutdown();
@ -163,27 +163,27 @@ void DatabaseMaterializePostgreSQL::stopReplication()
}
void DatabaseMaterializePostgreSQL::dropTable(ContextPtr local_context, const String & table_name, bool no_delay)
void DatabaseMaterializedPostgreSQL::dropTable(ContextPtr local_context, const String & table_name, bool no_delay)
{
/// Modify context into nested_context and pass query to Atomic database.
DatabaseAtomic::dropTable(StorageMaterializePostgreSQL::makeNestedTableContext(local_context), table_name, no_delay);
DatabaseAtomic::dropTable(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), table_name, no_delay);
}
void DatabaseMaterializePostgreSQL::drop(ContextPtr local_context)
void DatabaseMaterializedPostgreSQL::drop(ContextPtr local_context)
{
if (replication_handler)
replication_handler->shutdownFinal();
DatabaseAtomic::drop(StorageMaterializePostgreSQL::makeNestedTableContext(local_context));
DatabaseAtomic::drop(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context));
}
DatabaseTablesIteratorPtr DatabaseMaterializePostgreSQL::getTablesIterator(
DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator(
ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name)
{
/// Modify context into nested_context and pass query to Atomic database.
return DatabaseAtomic::getTablesIterator(StorageMaterializePostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);
}

View File

@ -7,7 +7,7 @@
#if USE_LIBPQXX
#include <Storages/PostgreSQL/PostgreSQLReplicationHandler.h>
#include <Storages/PostgreSQL/MaterializePostgreSQLSettings.h>
#include <Storages/PostgreSQL/MaterializedPostgreSQLSettings.h>
#include <Databases/DatabasesCommon.h>
#include <Core/BackgroundSchedulePool.h>
@ -24,11 +24,11 @@ class PostgreSQLConnection;
using PostgreSQLConnectionPtr = std::shared_ptr<PostgreSQLConnection>;
class DatabaseMaterializePostgreSQL : public DatabaseAtomic
class DatabaseMaterializedPostgreSQL : public DatabaseAtomic
{
public:
DatabaseMaterializePostgreSQL(
DatabaseMaterializedPostgreSQL(
ContextPtr context_,
const String & metadata_path_,
UUID uuid_,
@ -36,9 +36,9 @@ public:
const String & database_name_,
const String & postgres_database_name,
const postgres::ConnectionInfo & connection_info,
std::unique_ptr<MaterializePostgreSQLSettings> settings_);
std::unique_ptr<MaterializedPostgreSQLSettings> settings_);
String getEngineName() const override { return "MaterializePostgreSQL"; }
String getEngineName() const override { return "MaterializedPostgreSQL"; }
String getMetadataPath() const override { return metadata_path; }
@ -63,7 +63,7 @@ private:
ASTPtr database_engine_define;
String remote_database_name;
postgres::ConnectionInfo connection_info;
std::unique_ptr<MaterializePostgreSQLSettings> settings;
std::unique_ptr<MaterializedPostgreSQLSettings> settings;
std::shared_ptr<PostgreSQLReplicationHandler> replication_handler;
std::map<std::string, StoragePtr> materialized_tables;

View File

@ -151,7 +151,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
throw Exception(ErrorCodes::UNKNOWN_DATABASE_ENGINE, "Unknown database engine: {}", serializeAST(*create.storage));
}
if (create.storage->engine->name == "Atomic" || create.storage->engine->name == "Replicated" || create.storage->engine->name == "MaterializePostgreSQL")
if (create.storage->engine->name == "Atomic" || create.storage->engine->name == "Replicated" || create.storage->engine->name == "MaterializedPostgreSQL")
{
if (create.attach && create.uuid == UUIDHelpers::Nil)
throw Exception(ErrorCodes::INCORRECT_QUERY, "UUID must be specified for ATTACH. "
@ -217,9 +217,9 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
"Enable allow_experimental_database_replicated to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
if (create.storage->engine->name == "MaterializePostgreSQL" && !getContext()->getSettingsRef().allow_experimental_database_materialize_postgresql && !internal)
if (create.storage->engine->name == "MaterializedPostgreSQL" && !getContext()->getSettingsRef().allow_experimental_database_materialized_postgresql && !internal)
{
throw Exception("MaterializePostgreSQL is an experimental database engine. "
throw Exception("MaterializedPostgreSQL is an experimental database engine. "
"Enable allow_experimental_database_postgresql_replica to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}

View File

@ -21,7 +21,7 @@
#endif
#if USE_LIBPQXX
# include <Databases/PostgreSQL/DatabaseMaterializePostgreSQL.h>
# include <Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.h>
#endif
namespace DB
@ -321,7 +321,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
if (auto * replicated = typeid_cast<DatabaseReplicated *>(database.get()))
replicated->stopReplication();
#if USE_LIBPQXX
if (auto * materialize_postgresql = typeid_cast<DatabaseMaterializePostgreSQL *>(database.get()))
if (auto * materialize_postgresql = typeid_cast<DatabaseMaterializedPostgreSQL *>(database.get()))
materialize_postgresql->stopReplication();
#endif

View File

@ -1,30 +0,0 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Core/BaseSettings.h>
namespace DB
{
class ASTStorage;
#define LIST_OF_MATERIALIZE_POSTGRESQL_SETTINGS(M) \
M(UInt64, materialize_postgresql_max_block_size, 65536, "Number of row collected before flushing data into table.", 0) \
M(String, materialize_postgresql_tables_list, "", "List of tables for MaterializePostgreSQL database engine", 0) \
M(Bool, materialize_postgresql_allow_automatic_update, 0, "Allow to reload table in the background, when schema changes are detected", 0) \
DECLARE_SETTINGS_TRAITS(MaterializePostgreSQLSettingsTraits, LIST_OF_MATERIALIZE_POSTGRESQL_SETTINGS)
struct MaterializePostgreSQLSettings : public BaseSettings<MaterializePostgreSQLSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}
#endif

View File

@ -1,6 +1,6 @@
#include "MaterializePostgreSQLConsumer.h"
#include "MaterializedPostgreSQLConsumer.h"
#include "StorageMaterializePostgreSQL.h"
#include "StorageMaterializedPostgreSQL.h"
#include <Columns/ColumnNullable.h>
#include <Common/hex.h>
#include <DataStreams/copyData.h>
@ -16,10 +16,9 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TABLE;
}
MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
MaterializedPostgreSQLConsumer::MaterializedPostgreSQLConsumer(
ContextPtr context_,
std::shared_ptr<postgres::Connection> connection_,
const std::string & replication_slot_name_,
@ -46,7 +45,7 @@ MaterializePostgreSQLConsumer::MaterializePostgreSQLConsumer(
}
void MaterializePostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storage)
void MaterializedPostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storage)
{
const auto storage_metadata = storage->getInMemoryMetadataPtr();
const Block sample_block = storage_metadata->getSampleBlock();
@ -60,7 +59,7 @@ void MaterializePostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storage
auto insert_columns = std::make_shared<ASTExpressionList>();
auto table_id = storage->getStorageID();
LOG_TRACE(&Poco::Logger::get("MaterializePostgreSQLBuffer"), "New buffer for table {}.{} ({}), structure: {}",
LOG_TRACE(&Poco::Logger::get("MaterializedPostgreSQLBuffer"), "New buffer for table {}.{} ({}), structure: {}",
table_id.database_name, table_id.table_name, toString(table_id.uuid), sample_block.dumpStructure());
assert(description.sample_block.columns() == storage_columns.size());
@ -79,7 +78,7 @@ void MaterializePostgreSQLConsumer::Buffer::createEmptyBuffer(StoragePtr storage
}
void MaterializePostgreSQLConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx)
void MaterializedPostgreSQLConsumer::insertValue(Buffer & buffer, const std::string & value, size_t column_idx)
{
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
bool is_nullable = buffer.description.types[column_idx].second;
@ -105,14 +104,14 @@ void MaterializePostgreSQLConsumer::insertValue(Buffer & buffer, const std::stri
}
void MaterializePostgreSQLConsumer::insertDefaultValue(Buffer & buffer, size_t column_idx)
void MaterializedPostgreSQLConsumer::insertDefaultValue(Buffer & buffer, size_t column_idx)
{
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column);
}
void MaterializePostgreSQLConsumer::readString(const char * message, size_t & pos, size_t size, String & result)
void MaterializedPostgreSQLConsumer::readString(const char * message, size_t & pos, size_t size, String & result)
{
assert(size > pos + 2);
char current = unhex2(message + pos);
@ -127,7 +126,7 @@ void MaterializePostgreSQLConsumer::readString(const char * message, size_t & po
template<typename T>
T MaterializePostgreSQLConsumer::unhexN(const char * message, size_t pos, size_t n)
T MaterializedPostgreSQLConsumer::unhexN(const char * message, size_t pos, size_t n)
{
T result = 0;
for (size_t i = 0; i < n; ++i)
@ -139,7 +138,7 @@ T MaterializePostgreSQLConsumer::unhexN(const char * message, size_t pos, size_t
}
Int64 MaterializePostgreSQLConsumer::readInt64(const char * message, size_t & pos, [[maybe_unused]] size_t size)
Int64 MaterializedPostgreSQLConsumer::readInt64(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size >= pos + 16);
Int64 result = unhexN<Int64>(message, pos, 8);
@ -148,7 +147,7 @@ Int64 MaterializePostgreSQLConsumer::readInt64(const char * message, size_t & po
}
Int32 MaterializePostgreSQLConsumer::readInt32(const char * message, size_t & pos, [[maybe_unused]] size_t size)
Int32 MaterializedPostgreSQLConsumer::readInt32(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size >= pos + 8);
Int32 result = unhexN<Int32>(message, pos, 4);
@ -157,7 +156,7 @@ Int32 MaterializePostgreSQLConsumer::readInt32(const char * message, size_t & po
}
Int16 MaterializePostgreSQLConsumer::readInt16(const char * message, size_t & pos, [[maybe_unused]] size_t size)
Int16 MaterializedPostgreSQLConsumer::readInt16(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size >= pos + 4);
Int16 result = unhexN<Int16>(message, pos, 2);
@ -166,7 +165,7 @@ Int16 MaterializePostgreSQLConsumer::readInt16(const char * message, size_t & po
}
Int8 MaterializePostgreSQLConsumer::readInt8(const char * message, size_t & pos, [[maybe_unused]] size_t size)
Int8 MaterializedPostgreSQLConsumer::readInt8(const char * message, size_t & pos, [[maybe_unused]] size_t size)
{
assert(size >= pos + 2);
Int8 result = unhex2(message + pos);
@ -175,7 +174,7 @@ Int8 MaterializePostgreSQLConsumer::readInt8(const char * message, size_t & pos,
}
void MaterializePostgreSQLConsumer::readTupleData(
void MaterializedPostgreSQLConsumer::readTupleData(
Buffer & buffer, const char * message, size_t & pos, [[maybe_unused]] size_t size, PostgreSQLQuery type, bool old_value)
{
Int16 num_columns = readInt16(message, pos, size);
@ -247,7 +246,7 @@ void MaterializePostgreSQLConsumer::readTupleData(
/// https://www.postgresql.org/docs/13/protocol-logicalrep-message-formats.html
void MaterializePostgreSQLConsumer::processReplicationMessage(const char * replication_message, size_t size)
void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * replication_message, size_t size)
{
/// Skip '\x'
size_t pos = 2;
@ -456,7 +455,7 @@ void MaterializePostgreSQLConsumer::processReplicationMessage(const char * repli
}
void MaterializePostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransaction> tx)
void MaterializedPostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransaction> tx)
{
try
{
@ -500,7 +499,7 @@ void MaterializePostgreSQLConsumer::syncTables(std::shared_ptr<pqxx::nontransact
}
String MaterializePostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> tx)
String MaterializedPostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontransaction> tx)
{
std::string query_str = fmt::format("SELECT end_lsn FROM pg_replication_slot_advance('{}', '{}')", replication_slot_name, final_lsn);
pqxx::result result{tx->exec(query_str)};
@ -516,7 +515,7 @@ String MaterializePostgreSQLConsumer::advanceLSN(std::shared_ptr<pqxx::nontransa
/// Sync for some table might not be allowed if:
/// 1. Table schema changed and might break synchronization.
/// 2. There is no storage for this table. (As a result of some exception or incorrect pg_publication)
bool MaterializePostgreSQLConsumer::isSyncAllowed(Int32 relation_id)
bool MaterializedPostgreSQLConsumer::isSyncAllowed(Int32 relation_id)
{
auto table_with_lsn = skip_list.find(relation_id);
@ -547,7 +546,7 @@ bool MaterializePostgreSQLConsumer::isSyncAllowed(Int32 relation_id)
}
void MaterializePostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const String & relation_name)
void MaterializedPostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const String & relation_name)
{
/// Empty lsn string means - continue waiting for valid lsn.
skip_list.insert({relation_id, ""});
@ -568,7 +567,7 @@ void MaterializePostgreSQLConsumer::markTableAsSkipped(Int32 relation_id, const
/// Read binary changes from replication slot via COPY command (starting from current lsn in a slot).
bool MaterializePostgreSQLConsumer::readFromReplicationSlot()
bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
{
std::shared_ptr<pqxx::nontransaction> tx;
bool slot_empty = true;
@ -626,7 +625,7 @@ bool MaterializePostgreSQLConsumer::readFromReplicationSlot()
}
catch (const pqxx::conversion_error & e)
{
LOG_ERROR(log, "Convertion error: {}", e.what());
LOG_ERROR(log, "Conversion error: {}", e.what());
return false;
}
catch (const pqxx::broken_connection & e)
@ -662,7 +661,7 @@ bool MaterializePostgreSQLConsumer::readFromReplicationSlot()
}
bool MaterializePostgreSQLConsumer::consume(std::vector<std::pair<Int32, String>> & skipped_tables)
bool MaterializedPostgreSQLConsumer::consume(std::vector<std::pair<Int32, String>> & skipped_tables)
{
/// Check if there are tables, which are skipped from being updated by changes from replication stream,
/// because schema changes were detected. Update them, if it is allowed.
@ -687,7 +686,7 @@ bool MaterializePostgreSQLConsumer::consume(std::vector<std::pair<Int32, String>
}
void MaterializePostgreSQLConsumer::updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn)
void MaterializedPostgreSQLConsumer::updateNested(const String & table_name, StoragePtr nested_storage, Int32 table_id, const String & table_start_lsn)
{
/// Cache new pointer to replacingMergeTree table.
storages[table_name] = nested_storage;

View File

@ -14,12 +14,12 @@
namespace DB
{
class MaterializePostgreSQLConsumer
class MaterializedPostgreSQLConsumer
{
public:
using Storages = std::unordered_map<String, StoragePtr>;
MaterializePostgreSQLConsumer(
MaterializedPostgreSQLConsumer(
ContextPtr context_,
std::shared_ptr<postgres::Connection> connection_,
const String & replication_slot_name_,

View File

@ -1,4 +1,4 @@
#include "MaterializePostgreSQLSettings.h"
#include "MaterializedPostgreSQLSettings.h"
#if USE_LIBPQXX
#include <Parsers/ASTCreateQuery.h>
@ -15,9 +15,9 @@ namespace ErrorCodes
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(MaterializePostgreSQLSettingsTraits, LIST_OF_MATERIALIZE_POSTGRESQL_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS)
void MaterializePostgreSQLSettings::loadFromQuery(ASTStorage & storage_def)
void MaterializedPostgreSQLSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{

View File

@ -0,0 +1,30 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include "config_core.h"
#endif
#if USE_LIBPQXX
#include <Core/BaseSettings.h>
namespace DB
{
class ASTStorage;
#define LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS(M) \
M(UInt64, materialized_postgresql_max_block_size, 65536, "Number of row collected before flushing data into table.", 0) \
M(String, materialized_postgresql_tables_list, "", "List of tables for MaterializedPostgreSQL database engine", 0) \
M(Bool, materialized_postgresql_allow_automatic_update, 0, "Allow to reload table in the background, when schema changes are detected", 0) \
DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS)
struct MaterializedPostgreSQLSettings : public BaseSettings<MaterializedPostgreSQLSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}
#endif

View File

@ -2,7 +2,7 @@
#include <DataStreams/PostgreSQLBlockInputStream.h>
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Storages/PostgreSQL/StorageMaterializePostgreSQL.h>
#include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
@ -15,7 +15,7 @@ namespace DB
{
static const auto RESCHEDULE_MS = 500;
static const auto BACKOFF_TRESHOLD = 5000;
static const auto BACKOFF_TRESHOLD_MS = 10000;
namespace ErrorCodes
{
@ -30,7 +30,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
ContextPtr context_,
const size_t max_block_size_,
bool allow_automatic_update_,
bool is_materialize_postgresql_database_,
bool is_materialized_postgresql_database_,
const String tables_list_)
: log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
, context(context_)
@ -39,7 +39,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
, connection_info(connection_info_)
, max_block_size(max_block_size_)
, allow_automatic_update(allow_automatic_update_)
, is_materialize_postgresql_database(is_materialize_postgresql_database_)
, is_materialized_postgresql_database(is_materialized_postgresql_database_)
, tables_list(tables_list_)
, connection(std::make_shared<postgres::Connection>(connection_info_))
, milliseconds_to_wait(RESCHEDULE_MS)
@ -52,7 +52,7 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
}
void PostgreSQLReplicationHandler::addStorage(const std::string & table_name, StorageMaterializePostgreSQL * storage)
void PostgreSQLReplicationHandler::addStorage(const std::string & table_name, StorageMaterializedPostgreSQL * storage)
{
materialized_storages[table_name] = storage;
}
@ -122,14 +122,14 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
{
try
{
nested_storages[table_name] = loadFromSnapshot(snapshot_name, table_name, storage->as <StorageMaterializePostgreSQL>());
nested_storages[table_name] = loadFromSnapshot(snapshot_name, table_name, storage->as <StorageMaterializedPostgreSQL>());
}
catch (Exception & e)
{
e.addMessage("while loading table {}.{}", remote_database_name, table_name);
tryLogCurrentException(__PRETTY_FUNCTION__);
/// Throw in case of single MaterializePostgreSQL storage, because initial setup is done immediately
/// Throw in case of single MaterializedPostgreSQL storage, because initial setup is done immediately
/// (unlike database engine where it is done in a separate thread).
if (throw_on_error)
throw;
@ -137,7 +137,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
}
};
/// There is one replication slot for each replication handler. In case of MaterializePostgreSQL database engine,
/// There is one replication slot for each replication handler. In case of MaterializedPostgreSQL database engine,
/// there is one replication slot per database. Its lifetime must be equal to the lifetime of replication handler.
/// Recreation of a replication slot imposes reloading of all tables.
if (!isReplicationSlotExist(tx, start_lsn, /* temporary */false))
@ -159,7 +159,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
LOG_TRACE(log, "Loading {} tables...", materialized_storages.size());
for (const auto & [table_name, storage] : materialized_storages)
{
auto * materialized_storage = storage->as <StorageMaterializePostgreSQL>();
auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>();
try
{
/// Try load nested table, set materialized table metadata.
@ -181,7 +181,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// Pass current connection to consumer. It is not std::moved implicitly, but a shared_ptr is passed.
/// Consumer and replication handler are always executed one after another (not concurrently) and share the same connection.
/// Handler uses it only for loadFromSnapshot and shutdown methods.
consumer = std::make_shared<MaterializePostgreSQLConsumer>(
consumer = std::make_shared<MaterializedPostgreSQLConsumer>(
context,
connection,
replication_slot,
@ -199,7 +199,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
StoragePtr PostgreSQLReplicationHandler::loadFromSnapshot(String & snapshot_name, const String & table_name,
StorageMaterializePostgreSQL * materialized_storage)
StorageMaterializedPostgreSQL * materialized_storage)
{
auto tx = std::make_shared<pqxx::ReplicationTransaction>(connection->getRef());
@ -270,7 +270,7 @@ void PostgreSQLReplicationHandler::consumerFunc()
else
{
consumer_task->scheduleAfter(milliseconds_to_wait);
if (milliseconds_to_wait < BACKOFF_TRESHOLD)
if (milliseconds_to_wait < BACKOFF_TRESHOLD_MS)
milliseconds_to_wait *= 2;
LOG_TRACE(log, "Scheduling replication thread: after {} ms", milliseconds_to_wait);
@ -432,7 +432,7 @@ void PostgreSQLReplicationHandler::shutdownFinal()
}
/// Used by MaterializePostgreSQL database engine.
/// Used by MaterializedPostgreSQL database engine.
NameSet PostgreSQLReplicationHandler::fetchRequiredTables(pqxx::connection & connection_)
{
pqxx::work tx(connection_);
@ -474,7 +474,7 @@ NameSet PostgreSQLReplicationHandler::fetchTablesFromPublication(pqxx::work & tx
PostgreSQLTableStructurePtr PostgreSQLReplicationHandler::fetchTableStructure(
pqxx::ReplicationTransaction & tx, const std::string & table_name) const
{
if (!is_materialize_postgresql_database)
if (!is_materialized_postgresql_database)
return nullptr;
return std::make_unique<PostgreSQLTableStructure>(fetchPostgreSQLTableStructure(tx, table_name, true, true, true));
@ -486,31 +486,36 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
/// If table schema has changed, the table stops consuming changes from replication stream.
/// If `allow_automatic_update` is true, create a new table in the background, load new table schema
/// and all data from scratch. Then execute REPLACE query.
/// This is only allowed for MaterializePostgreSQL database engine.
/// This is only allowed for MaterializedPostgreSQL database engine.
try
{
postgres::Connection replication_connection(connection_info, /* replication */true);
pqxx::nontransaction tx(replication_connection.getRef());
String snapshot_name, start_lsn;
if (isReplicationSlotExist(tx, start_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
createReplicationSlot(tx, start_lsn, snapshot_name, /* temporary */true);
for (const auto & [relation_id, table_name] : relation_data)
{
auto storage = DatabaseCatalog::instance().getTable(StorageID(current_database_name, table_name), context);
auto * materialized_storage = storage->as <StorageMaterializePostgreSQL>();
auto * materialized_storage = storage->as <StorageMaterializedPostgreSQL>();
/// If for some reason this temporary table already exists - also drop it.
auto temp_materialized_storage = materialized_storage->createTemporary();
/// This snapshot is valid up to the end of the transaction, which exported it.
StoragePtr temp_nested_storage = loadFromSnapshot(snapshot_name, table_name, temp_materialized_storage->as <StorageMaterializePostgreSQL>());
StoragePtr temp_nested_storage = loadFromSnapshot(snapshot_name, table_name,
temp_materialized_storage->as <StorageMaterializedPostgreSQL>());
auto table_id = materialized_storage->getNestedStorageID();
auto temp_table_id = temp_nested_storage->getStorageID();
LOG_TRACE(log, "Starting background update of table {}.{} ({}) with table {}.{} ({})",
table_id.database_name, table_id.table_name, toString(table_id.uuid),
temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid));
LOG_TRACE(log, "Starting background update of table {} with table {}",
table_id.getNameForLogs(), temp_table_id.getNameForLogs());
auto ast_rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Element elem
@ -529,7 +534,8 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
InterpreterRenameQuery(ast_rename, nested_context).execute();
{
auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name), nested_context);
auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, table_id.table_name),
nested_context);
auto nested_table_lock = nested_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
auto nested_table_id = nested_storage->getStorageID();
@ -538,8 +544,8 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
auto nested_storage_metadata = nested_storage->getInMemoryMetadataPtr();
auto nested_sample_block = nested_storage_metadata->getSampleBlock();
LOG_TRACE(log, "Updated table {}.{} ({}). New structure: {}",
nested_table_id.database_name, nested_table_id.table_name, toString(nested_table_id.uuid), nested_sample_block.dumpStructure());
LOG_TRACE(log, "Updated table {}. New structure: {}",
nested_table_id.getNameForLogs(), nested_sample_block.dumpStructure());
auto materialized_storage_metadata = nested_storage->getInMemoryMetadataPtr();
auto materialized_sample_block = materialized_storage_metadata->getSampleBlock();
@ -550,7 +556,7 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
consumer->updateNested(table_name, nested_storage, relation_id, start_lsn);
}
LOG_DEBUG(log, "Dropping table {}.{} ({})", temp_table_id.database_name, temp_table_id.table_name, toString(temp_table_id.uuid));
LOG_DEBUG(log, "Dropping table {}", temp_table_id.getNameForLogs());
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, nested_context, nested_context, temp_table_id, true);
}
catch (...)
@ -562,5 +568,10 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
dropReplicationSlot(tx, /* temporary */true);
tx.commit();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include "MaterializePostgreSQLConsumer.h"
#include "MaterializedPostgreSQLConsumer.h"
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Core/PostgreSQL/Utils.h>
@ -13,7 +13,7 @@ namespace DB
/// exist in CH, it can be loaded via snapshot while stream is stopped and then comparing wal positions with
/// current lsn and table start lsn.
class StorageMaterializePostgreSQL;
class StorageMaterializedPostgreSQL;
class PostgreSQLReplicationHandler
{
@ -26,7 +26,7 @@ public:
ContextPtr context_,
const size_t max_block_size_,
bool allow_automatic_update_,
bool is_materialize_postgresql_database_,
bool is_materialized_postgresql_database_,
const String tables_list = "");
/// Activate task to be run from a separate thread: wait until connection is available and call startReplication().
@ -39,7 +39,7 @@ public:
void shutdownFinal();
/// Add storage pointer to let handler know which tables it needs to keep in sync.
void addStorage(const std::string & table_name, StorageMaterializePostgreSQL * storage);
void addStorage(const std::string & table_name, StorageMaterializedPostgreSQL * storage);
/// Fetch list of tables which are going to be replicated. Used for database engine.
NameSet fetchRequiredTables(pqxx::connection & connection_);
@ -48,7 +48,7 @@ public:
void startSynchronization(bool throw_on_error);
private:
using MaterializedStorages = std::unordered_map<String, StorageMaterializePostgreSQL *>;
using MaterializedStorages = std::unordered_map<String, StorageMaterializedPostgreSQL *>;
/// Methods to manage Publication.
@ -74,7 +74,7 @@ private:
void consumerFunc();
StoragePtr loadFromSnapshot(std::string & snapshot_name, const String & table_name, StorageMaterializePostgreSQL * materialized_storage);
StoragePtr loadFromSnapshot(std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage);
void reloadFromSnapshot(const std::vector<std::pair<Int32, String>> & relation_data);
@ -95,8 +95,8 @@ private:
/// This setting allows to reloas table in the background.
bool allow_automatic_update = false;
/// To distinguish whether current replication handler belongs to a MaterializePostgreSQL database engine or single storage.
bool is_materialize_postgresql_database;
/// To distinguish whether current replication handler belongs to a MaterializedPostgreSQL database engine or single storage.
bool is_materialized_postgresql_database;
/// A coma-separated list of tables, which are going to be replicated for database engine. By default, a whole database is replicated.
String tables_list;
@ -107,7 +107,7 @@ private:
std::shared_ptr<postgres::Connection> connection;
/// Replication consumer. Manages decoding of replication stream and syncing into tables.
std::shared_ptr<MaterializePostgreSQLConsumer> consumer;
std::shared_ptr<MaterializedPostgreSQLConsumer> consumer;
BackgroundSchedulePool::TaskHolder startup_task, consumer_task;
@ -118,7 +118,7 @@ private:
/// 2. at replication startup
bool new_publication_created = false;
/// MaterializePostgreSQL tables. Used for managing all operations with its internal nested tables.
/// MaterializedPostgreSQL tables. Used for managing all operations with its internal nested tables.
MaterializedStorages materialized_storages;
UInt64 milliseconds_to_wait;

View File

@ -1,4 +1,4 @@
#include "StorageMaterializePostgreSQL.h"
#include "StorageMaterializedPostgreSQL.h"
#if USE_LIBPQXX
#include <Common/Macros.h>
@ -40,7 +40,7 @@ static const auto TMP_SUFFIX = "_tmp";
/// For the case of single storage.
StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
const StorageID & table_id_,
bool is_attach_,
const String & remote_database_name,
@ -48,10 +48,10 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
const postgres::ConnectionInfo & connection_info,
const StorageInMemoryMetadata & storage_metadata,
ContextPtr context_,
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings)
std::unique_ptr<MaterializedPostgreSQLSettings> replication_settings)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, is_materialize_postgresql_database(false)
, is_materialized_postgresql_database(false)
, has_nested(false)
, nested_context(makeNestedTableContext(context_->getGlobalContext()))
, nested_table_id(StorageID(table_id_.database_name, getNestedTableName()))
@ -59,7 +59,7 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
, is_attach(is_attach_)
{
if (table_id_.uuid == UUIDHelpers::Nil)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage MaterializePostgreSQL is allowed only for Atomic database");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage MaterializedPostgreSQL is allowed only for Atomic database");
setInMemoryMetadata(storage_metadata);
@ -70,31 +70,31 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(
table_id_.database_name,
connection_info,
getContext(),
replication_settings->materialize_postgresql_max_block_size.value,
/* allow_automatic_update */ false, /* is_materialize_postgresql_database */false);
replication_settings->materialized_postgresql_max_block_size.value,
/* allow_automatic_update */ false, /* is_materialized_postgresql_database */false);
}
/// For the case of MaterializePosgreSQL database engine.
/// It is used when nested ReplacingMergeeTree table has not yet be created by replication thread.
/// In this case this storage can't be used for read queries.
StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(const StorageID & table_id_, ContextPtr context_)
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, is_materialize_postgresql_database(true)
, is_materialized_postgresql_database(true)
, has_nested(false)
, nested_context(makeNestedTableContext(context_->getGlobalContext()))
{
}
/// Constructor for MaterializePostgreSQL table engine - for the case of MaterializePosgreSQL database engine.
/// Constructor for MaterializedPostgreSQL table engine - for the case of MaterializePosgreSQL database engine.
/// It is used when nested ReplacingMergeeTree table has already been created by replication thread.
/// This storage is ready to handle read queries.
StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(StoragePtr nested_storage_, ContextPtr context_)
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(StoragePtr nested_storage_, ContextPtr context_)
: IStorage(nested_storage_->getStorageID())
, WithContext(context_->getGlobalContext())
, is_materialize_postgresql_database(true)
, is_materialized_postgresql_database(true)
, has_nested(true)
, nested_context(makeNestedTableContext(context_->getGlobalContext()))
, nested_table_id(nested_storage_->getStorageID())
@ -105,72 +105,82 @@ StorageMaterializePostgreSQL::StorageMaterializePostgreSQL(StoragePtr nested_sto
/// A temporary clone table might be created for current table in order to update its schema and reload
/// all data in the background while current table will still handle read requests.
StoragePtr StorageMaterializePostgreSQL::createTemporary() const
StoragePtr StorageMaterializedPostgreSQL::createTemporary() const
{
auto table_id = getStorageID();
auto new_context = Context::createCopy(context);
auto tmp_table_id = StorageID(table_id.database_name, table_id.table_name + TMP_SUFFIX);
return StorageMaterializePostgreSQL::create(StorageID(table_id.database_name, table_id.table_name + TMP_SUFFIX), new_context);
/// If for some reason it already exists - drop it.
auto tmp_storage = DatabaseCatalog::instance().tryGetTable(tmp_table_id, nested_context);
if (tmp_storage)
{
LOG_TRACE(&Poco::Logger::get("MaterializedPostgreSQLStorage"), "Temporary table {} already exists, dropping", tmp_table_id.getNameForLogs());
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), getContext(), tmp_table_id, /* no delay */true);
}
auto new_context = Context::createCopy(context);
return StorageMaterializedPostgreSQL::create(tmp_table_id, new_context);
}
StoragePtr StorageMaterializePostgreSQL::getNested() const
StoragePtr StorageMaterializedPostgreSQL::getNested() const
{
return DatabaseCatalog::instance().getTable(getNestedStorageID(), nested_context);
}
StoragePtr StorageMaterializePostgreSQL::tryGetNested() const
StoragePtr StorageMaterializedPostgreSQL::tryGetNested() const
{
return DatabaseCatalog::instance().tryGetTable(getNestedStorageID(), nested_context);
}
String StorageMaterializePostgreSQL::getNestedTableName() const
String StorageMaterializedPostgreSQL::getNestedTableName() const
{
auto table_id = getStorageID();
if (is_materialize_postgresql_database)
if (is_materialized_postgresql_database)
return table_id.table_name;
return toString(table_id.uuid) + NESTED_TABLE_SUFFIX;
}
StorageID StorageMaterializePostgreSQL::getNestedStorageID() const
StorageID StorageMaterializedPostgreSQL::getNestedStorageID() const
{
if (nested_table_id.has_value())
return nested_table_id.value();
auto table_id = getStorageID();
throw Exception(ErrorCodes::LOGICAL_ERROR,
"No storageID found for inner table. ({}.{}, {})", table_id.database_name, table_id.table_name, toString(table_id.uuid));
"No storageID found for inner table. ({})", table_id.getNameForLogs());
}
void StorageMaterializePostgreSQL::createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure)
void StorageMaterializedPostgreSQL::createNestedIfNeeded(PostgreSQLTableStructurePtr table_structure)
{
const auto ast_create = getCreateNestedTableQuery(std::move(table_structure));
auto table_id = getStorageID();
auto tmp_nested_table_id = StorageID(table_id.database_name, getNestedTableName());
try
{
InterpreterCreateQuery interpreter(ast_create, nested_context);
interpreter.execute();
auto table_id = getStorageID();
auto nested_storage = DatabaseCatalog::instance().getTable(StorageID(table_id.database_name, getNestedTableName()), nested_context);
auto nested_storage = DatabaseCatalog::instance().getTable(tmp_nested_table_id, nested_context);
/// Save storage_id with correct uuid.
nested_table_id = nested_storage->getStorageID();
}
catch (...)
catch (Exception & e)
{
e.addMessage("while creating nested table: {}", tmp_nested_table_id.getNameForLogs());
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
std::shared_ptr<Context> StorageMaterializePostgreSQL::makeNestedTableContext(ContextPtr from_context)
std::shared_ptr<Context> StorageMaterializedPostgreSQL::makeNestedTableContext(ContextPtr from_context)
{
auto new_context = Context::createCopy(from_context);
new_context->setInternalQuery(true);
@ -178,7 +188,7 @@ std::shared_ptr<Context> StorageMaterializePostgreSQL::makeNestedTableContext(Co
}
StoragePtr StorageMaterializePostgreSQL::prepare()
StoragePtr StorageMaterializedPostgreSQL::prepare()
{
auto nested_table = getNested();
setInMemoryMetadata(nested_table->getInMemoryMetadata());
@ -187,9 +197,10 @@ StoragePtr StorageMaterializePostgreSQL::prepare()
}
void StorageMaterializePostgreSQL::startup()
void StorageMaterializedPostgreSQL::startup()
{
if (!is_materialize_postgresql_database)
/// replication_handler != nullptr only in case of single table engine MaterializedPostgreSQL.
if (replication_handler)
{
replication_handler->addStorage(remote_table_name, this);
@ -202,7 +213,7 @@ void StorageMaterializePostgreSQL::startup()
else
{
/// Start synchronization preliminary setup immediately and throw in case of failure.
/// It should be guaranteed that if MaterializePostgreSQL table was created successfully, then
/// It should be guaranteed that if MaterializedPostgreSQL table was created successfully, then
/// its nested table was also created.
replication_handler->startSynchronization(/* throw_on_error */ true);
}
@ -210,25 +221,29 @@ void StorageMaterializePostgreSQL::startup()
}
void StorageMaterializePostgreSQL::shutdown()
void StorageMaterializedPostgreSQL::shutdown()
{
if (replication_handler)
replication_handler->shutdown();
}
void StorageMaterializePostgreSQL::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
void StorageMaterializedPostgreSQL::dropInnerTableIfAny(bool no_delay, ContextPtr local_context)
{
if (replication_handler)
/// If it is a table with database engine MaterializedPostgreSQL - return, becuase delition of
/// internal tables is managed there.
if (is_materialized_postgresql_database)
return;
replication_handler->shutdownFinal();
auto nested_table = getNested();
if (nested_table && !is_materialize_postgresql_database)
if (nested_table)
InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, getNestedStorageID(), no_delay);
}
NamesAndTypesList StorageMaterializePostgreSQL::getVirtuals() const
NamesAndTypesList StorageMaterializedPostgreSQL::getVirtuals() const
{
return NamesAndTypesList{
{"_sign", std::make_shared<DataTypeInt8>()},
@ -237,7 +252,7 @@ NamesAndTypesList StorageMaterializePostgreSQL::getVirtuals() const
}
Pipe StorageMaterializePostgreSQL::read(
Pipe StorageMaterializedPostgreSQL::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info,
@ -253,7 +268,7 @@ Pipe StorageMaterializePostgreSQL::read(
}
std::shared_ptr<ASTColumnDeclaration> StorageMaterializePostgreSQL::getMaterializedColumnsDeclaration(
std::shared_ptr<ASTColumnDeclaration> StorageMaterializedPostgreSQL::getMaterializedColumnsDeclaration(
const String name, const String type, UInt64 default_value)
{
auto column_declaration = std::make_shared<ASTColumnDeclaration>();
@ -271,7 +286,7 @@ std::shared_ptr<ASTColumnDeclaration> StorageMaterializePostgreSQL::getMateriali
}
ASTPtr StorageMaterializePostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) const
ASTPtr StorageMaterializedPostgreSQL::getColumnDeclaration(const DataTypePtr & data_type) const
{
WhichDataType which(data_type);
@ -312,17 +327,17 @@ ASTPtr StorageMaterializePostgreSQL::getColumnDeclaration(const DataTypePtr & da
}
/// For single storage MaterializePostgreSQL get columns and primary key columns from storage definition.
/// For database engine MaterializePostgreSQL get columns and primary key columns by fetching from PostgreSQL, also using the same
/// For single storage MaterializedPostgreSQL get columns and primary key columns from storage definition.
/// For database engine MaterializedPostgreSQL get columns and primary key columns by fetching from PostgreSQL, also using the same
/// transaction with snapshot, which is used for initial tables dump.
ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure)
ASTPtr StorageMaterializedPostgreSQL::getCreateNestedTableQuery(PostgreSQLTableStructurePtr table_structure)
{
auto create_table_query = std::make_shared<ASTCreateQuery>();
auto table_id = getStorageID();
create_table_query->table = getNestedTableName();
create_table_query->database = table_id.database_name;
if (is_materialize_postgresql_database)
if (is_materialized_postgresql_database)
create_table_query->uuid = table_id.uuid;
auto columns_declare_list = std::make_shared<ASTColumns>();
@ -333,7 +348,7 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt
const auto & columns = metadata_snapshot->getColumns();
NamesAndTypesList ordinary_columns_and_types;
if (!is_materialize_postgresql_database)
if (!is_materialized_postgresql_database)
{
ordinary_columns_and_types = columns.getOrdinary();
}
@ -416,19 +431,19 @@ ASTPtr StorageMaterializePostgreSQL::getCreateNestedTableQuery(PostgreSQLTableSt
}
void registerStorageMaterializePostgreSQL(StorageFactory & factory)
void registerStorageMaterializedPostgreSQL(StorageFactory & factory)
{
auto creator_fn = [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
bool has_settings = args.storage_def->settings;
auto postgresql_replication_settings = std::make_unique<MaterializePostgreSQLSettings>();
auto postgresql_replication_settings = std::make_unique<MaterializedPostgreSQLSettings>();
if (has_settings)
postgresql_replication_settings->loadFromQuery(*args.storage_def);
if (engine_args.size() != 5)
throw Exception("Storage MaterializePostgreSQL requires 5 parameters: "
throw Exception("Storage MaterializedPostgreSQL requires 5 parameters: "
"PostgreSQL('host:port', 'database', 'table', 'username', 'password'",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -443,7 +458,7 @@ void registerStorageMaterializePostgreSQL(StorageFactory & factory)
args.storage_def->set(args.storage_def->order_by, args.storage_def->primary_key->clone());
if (!args.storage_def->order_by)
throw Exception("Storage MaterializePostgreSQL needs order by key or primary key", ErrorCodes::BAD_ARGUMENTS);
throw Exception("Storage MaterializedPostgreSQL needs order by key or primary key", ErrorCodes::BAD_ARGUMENTS);
if (args.storage_def->primary_key)
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.getContext());
@ -462,14 +477,14 @@ void registerStorageMaterializePostgreSQL(StorageFactory & factory)
engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(),
engine_args[4]->as<ASTLiteral &>().value.safeGet<String>());
return StorageMaterializePostgreSQL::create(
return StorageMaterializedPostgreSQL::create(
args.table_id, args.attach, remote_database, remote_table, connection_info,
metadata, args.getContext(),
std::move(postgresql_replication_settings));
};
factory.registerStorage(
"MaterializePostgreSQL",
"MaterializedPostgreSQL",
creator_fn,
StorageFactory::StorageFeatures{
.supports_settings = true,

View File

@ -6,7 +6,7 @@
#if USE_LIBPQXX
#include "PostgreSQLReplicationHandler.h"
#include "MaterializePostgreSQLSettings.h"
#include "MaterializedPostgreSQLSettings.h"
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
@ -24,9 +24,9 @@
namespace DB
{
/** Case of single MaterializePostgreSQL table engine.
/** Case of single MaterializedPostgreSQL table engine.
*
* A user creates a table with engine MaterializePostgreSQL. Order by expression must be specified (needed for
* A user creates a table with engine MaterializedPostgreSQL. Order by expression must be specified (needed for
* nested ReplacingMergeTree table). This storage owns its own replication handler, which loads table data
* from PostgreSQL into nested ReplacingMergeTree table. If table is not created, but attached, replication handler
* will not start loading-from-snapshot procedure, instead it will continue from last committed lsn.
@ -37,47 +37,47 @@ namespace DB
**/
/** Case of MaterializePostgreSQL database engine.
/** Case of MaterializedPostgreSQL database engine.
*
* MaterializePostgreSQL table exists only in memory and acts as a wrapper for nested table, i.e. only provides an
* MaterializedPostgreSQL table exists only in memory and acts as a wrapper for nested table, i.e. only provides an
* interface to work with nested table. Both tables share the same StorageID.
*
* Main table is never created or dropped via database method. The only way database engine interacts with
* MaterializePostgreSQL table - in tryGetTable() method, a MaterializePostgreSQL table is returned in order to wrap
* MaterializedPostgreSQL table - in tryGetTable() method, a MaterializedPostgreSQL table is returned in order to wrap
* and redirect read requests. Set of such wrapper-tables is cached inside database engine. All other methods in
* regard to materializePostgreSQL table are handled by replication handler.
*
* All database methods, apart from tryGetTable(), are devoted only to nested table.
* NOTE: It makes sense to allow rename method for MaterializePostgreSQL table via database method.
* NOTE: It makes sense to allow rename method for MaterializedPostgreSQL table via database method.
* TODO: Make sure replication-to-table data channel is done only by relation_id.
*
* Also main table has the same InMemoryMetadata as its nested table, so if metadata of nested table changes - main table also has
* to update its metadata, because all read requests are passed to MaterializePostgreSQL table and then it redirects read
* to update its metadata, because all read requests are passed to MaterializedPostgreSQL table and then it redirects read
* into nested table.
*
* When there is a need to update table structure, there will be created a new MaterializePostgreSQL table with its own nested table,
* When there is a need to update table structure, there will be created a new MaterializedPostgreSQL table with its own nested table,
* it will have updated table schema and all data will be loaded from scratch in the background, while previous table with outadted table
* structure will still serve read requests. When data is loaded, nested tables will be swapped, metadata of metarialzied table will be
* updated according to nested table.
*
**/
class StorageMaterializePostgreSQL final : public shared_ptr_helper<StorageMaterializePostgreSQL>, public IStorage, WithContext
class StorageMaterializedPostgreSQL final : public shared_ptr_helper<StorageMaterializedPostgreSQL>, public IStorage, WithContext
{
friend struct shared_ptr_helper<StorageMaterializePostgreSQL>;
friend struct shared_ptr_helper<StorageMaterializedPostgreSQL>;
public:
StorageMaterializePostgreSQL(const StorageID & table_id_, ContextPtr context_);
StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_);
StorageMaterializePostgreSQL(StoragePtr nested_storage_, ContextPtr context_);
StorageMaterializedPostgreSQL(StoragePtr nested_storage_, ContextPtr context_);
String getName() const override { return "MaterializePostgreSQL"; }
String getName() const override { return "MaterializedPostgreSQL"; }
void startup() override;
void shutdown() override;
/// Used only for single MaterializePostgreSQL storage.
/// Used only for single MaterializedPostgreSQL storage.
void dropInnerTableIfAny(bool no_delay, ContextPtr local_context) override;
NamesAndTypesList getVirtuals() const override;
@ -102,7 +102,7 @@ public:
StoragePtr tryGetNested() const;
/// Create a temporary MaterializePostgreSQL table with current_table_name + TMP_SUFFIX.
/// Create a temporary MaterializedPostgreSQL table with current_table_name + TMP_SUFFIX.
/// An empty wrapper is returned - it does not have inMemory metadata, just acts as an empty wrapper over
/// temporary nested, which will be created shortly after.
StoragePtr createTemporary() const;
@ -120,7 +120,7 @@ public:
StoragePtr prepare();
protected:
StorageMaterializePostgreSQL(
StorageMaterializedPostgreSQL(
const StorageID & table_id_,
bool is_attach_,
const String & remote_database_name,
@ -128,7 +128,7 @@ protected:
const postgres::ConnectionInfo & connection_info,
const StorageInMemoryMetadata & storage_metadata,
ContextPtr context_,
std::unique_ptr<MaterializePostgreSQLSettings> replication_settings);
std::unique_ptr<MaterializedPostgreSQLSettings> replication_settings);
private:
static std::shared_ptr<ASTColumnDeclaration> getMaterializedColumnsDeclaration(
@ -140,37 +140,36 @@ private:
String getNestedTableName() const;
/// Not nullptr only for single MaterializePostgreSQL storage, because for MaterializePostgreSQL
/// Not nullptr only for single MaterializedPostgreSQL storage, because for MaterializedPostgreSQL
/// database engine there is one replication handler for all tables.
std::unique_ptr<PostgreSQLReplicationHandler> replication_handler;
/// Distinguish between single MaterilizePostgreSQL table engine and MaterializePostgreSQL database engine,
/// Distinguish between single MaterilizePostgreSQL table engine and MaterializedPostgreSQL database engine,
/// because table with engine MaterilizePostgreSQL acts differently in each case.
bool is_materialize_postgresql_database = false;
bool is_materialized_postgresql_database = false;
/// Will be set to `true` only once - when nested table was loaded by replication thread.
/// After that, it will never be changed. Needed for MaterializePostgreSQL database engine
/// After that, it will never be changed. Needed for MaterializedPostgreSQL database engine
/// because there is an invariant - table exists only if its nested table exists, but nested
/// table is not loaded immediately. It is made atomic, because it is accessed only by database engine,
/// and updated by replication handler (only once).
std::atomic<bool> has_nested = false;
/// Nested table context is a copy of global context, but contains query context with defined
/// ReplacingMergeTree storage in factoriesLog. This is needed to let database engine know
/// whether to access nested table or a wrapper over nested (materialized table).
/// Nested table context is a copy of global context, but modified to answer isInternalQuery() == true.
/// This is needed to let database engine know whether to access nested table or a wrapper over nested (materialized table).
ContextMutablePtr nested_context;
/// Save nested storageID to be able to fetch it. It is set once nested is created and will be
/// updated only when nested is reloaded or renamed.
std::optional<StorageID> nested_table_id;
/// Needed only for the case of single MaterializePostgreSQL storage - in order to make
/// Needed only for the case of single MaterializedPostgreSQL storage - in order to make
/// delayed storage forwarding into replication handler.
String remote_table_name;
/// Needed only for the case of single MaterializePostgreSQL storage, because in case of create
/// Needed only for the case of single MaterializedPostgreSQL storage, because in case of create
/// query (not attach) initial setup will be done immediately and error message is thrown at once.
/// It results in the fact: single MaterializePostgreSQL storage is created only if its nested table is created.
/// It results in the fact: single MaterializedPostgreSQL storage is created only if its nested table is created.
/// In case of attach - this setup will be done in a separate thread in the background. It will also
/// be checked for nested table and attempted to load it if it does not exist for some reason.
bool is_attach = true;

View File

@ -60,7 +60,7 @@ void registerStorageEmbeddedRocksDB(StorageFactory & factory);
#if USE_LIBPQXX
void registerStoragePostgreSQL(StorageFactory & factory);
void registerStorageMaterializePostgreSQL(StorageFactory & factory);
void registerStorageMaterializedPostgreSQL(StorageFactory & factory);
#endif
#if USE_MYSQL || USE_LIBPQXX
@ -122,7 +122,7 @@ void registerStorages()
#if USE_LIBPQXX
registerStoragePostgreSQL(factory);
registerStorageMaterializePostgreSQL(factory);
registerStorageMaterializedPostgreSQL(factory);
#endif
#if USE_MYSQL || USE_LIBPQXX

View File

@ -2,7 +2,7 @@
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_postgresql>1</allow_experimental_database_materialize_postgresql>
<allow_experimental_database_materialized_postgresql>1</allow_experimental_database_materialized_postgresql>
</default>
</profiles>
</yandex>

View File

@ -62,7 +62,7 @@ def create_materialized_db(ip, port,
materialized_database='test_database',
postgres_database='postgres_database',
settings=[]):
create_query = "CREATE DATABASE {} ENGINE = MaterializePostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')".format(materialized_database, ip, port, postgres_database)
create_query = "CREATE DATABASE {} ENGINE = MaterializedPostgreSQL('{}:{}', '{}', 'postgres', 'mysecretpassword')".format(materialized_database, ip, port, postgres_database)
if len(settings) > 0:
create_query += " SETTINGS "
for i in range(len(settings)):
@ -328,7 +328,7 @@ def test_load_and_sync_subset_of_database_tables(started_cluster):
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialize_postgresql_tables_list = '{}'".format(publication_tables)])
settings=["materialized_postgresql_tables_list = '{}'".format(publication_tables)])
assert 'test_database' in instance.query('SHOW DATABASES')
time.sleep(1)
@ -391,7 +391,7 @@ def test_clickhouse_restart(started_cluster):
create_postgres_table(cursor, 'postgresql_replica_{}'.format(i));
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT number, {} from numbers(50)".format(i, i))
instance.query("CREATE DATABASE test_database ENGINE = MaterializePostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
instance.query("CREATE DATABASE test_database ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres', 'mysecretpassword')")
for i in range(NUM_TABLES):
table_name = 'postgresql_replica_{}'.format(i)
@ -449,7 +449,7 @@ def test_table_schema_changes(started_cluster):
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialize_postgresql_allow_automatic_update = 1"])
settings=["materialized_postgresql_allow_automatic_update = 1"])
for i in range(NUM_TABLES):
instance.query("INSERT INTO postgres_database.postgresql_replica_{} SELECT 25 + number, {}, {}, {} from numbers(25)".format(i, i, i, i))
@ -608,7 +608,7 @@ def test_virtual_columns(started_cluster):
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialize_postgresql_allow_automatic_update = 1"])
settings=["materialized_postgresql_allow_automatic_update = 1"])
assert_nested_table_is_created('postgresql_replica_0')
instance.query("INSERT INTO postgres_database.postgresql_replica_0 SELECT number, number from numbers(10)")
check_tables_are_synchronized('postgresql_replica_0');

View File

@ -40,7 +40,7 @@ def create_clickhouse_postgres_db(ip, port, name='postgres_database'):
def create_materialized_table(ip, port):
instance.query('''
CREATE TABLE test.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializePostgreSQL(
ENGINE = MaterializedPostgreSQL(
'{}:{}', 'postgres_database', 'postgresql_replica', 'postgres', 'mysecretpassword')
PRIMARY KEY key; '''.format(ip, port))