diff --git a/base/mysqlxx/Pool.cpp b/base/mysqlxx/Pool.cpp index 2f47aa67356..cee386311d4 100644 --- a/base/mysqlxx/Pool.cpp +++ b/base/mysqlxx/Pool.cpp @@ -7,10 +7,22 @@ #endif #include - #include - #include +#include + + +namespace +{ + +inline uint64_t clock_gettime_ns(clockid_t clock_type = CLOCK_MONOTONIC) +{ + struct timespec ts; + clock_gettime(clock_type, &ts); + return uint64_t(ts.tv_sec * 1000000000LL + ts.tv_nsec); +} + +} namespace mysqlxx @@ -124,10 +136,15 @@ Pool::~Pool() } -Pool::Entry Pool::get() +Pool::Entry Pool::get(uint64_t wait_timeout) { std::unique_lock lock(mutex); + uint64_t deadline = 0; + /// UINT64_MAX -- wait indefinitely + if (wait_timeout && wait_timeout != UINT64_MAX) + deadline = clock_gettime_ns() + wait_timeout * 1'000'000'000; + initialize(); for (;;) { @@ -153,6 +170,12 @@ Pool::Entry Pool::get() logger.trace("(%s): Unable to create a new connection: Max number of connections has been reached.", getDescription()); } + if (!wait_timeout) + throw Poco::Exception("mysqlxx::Pool is full (wait is disabled, see connection_wait_timeout setting)"); + + if (deadline && clock_gettime_ns() >= deadline) + throw Poco::Exception("mysqlxx::Pool is full (connection_wait_timeout is exceeded)"); + lock.unlock(); logger.trace("(%s): Sleeping for %d seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL); diff --git a/base/mysqlxx/Pool.h b/base/mysqlxx/Pool.h index 530e2c78cf2..08d8b85b4ac 100644 --- a/base/mysqlxx/Pool.h +++ b/base/mysqlxx/Pool.h @@ -189,7 +189,7 @@ public: ~Pool(); /// Allocates connection. - Entry get(); + Entry get(uint64_t wait_timeout); /// Allocates connection. /// If database is not accessible, returns empty Entry object. 
diff --git a/base/mysqlxx/PoolWithFailover.cpp b/base/mysqlxx/PoolWithFailover.cpp index e317ab7f228..14c0db9ecd5 100644 --- a/base/mysqlxx/PoolWithFailover.cpp +++ b/base/mysqlxx/PoolWithFailover.cpp @@ -21,8 +21,9 @@ PoolWithFailover::PoolWithFailover( const unsigned max_connections_, const size_t max_tries_) : max_tries(max_tries_) + , shareable(config_.getBool(config_name_ + ".share_connection", false)) + , wait_timeout(UINT64_MAX) { - shareable = config_.getBool(config_name_ + ".share_connection", false); if (config_.has(config_name_ + ".replica")) { Poco::Util::AbstractConfiguration::Keys replica_keys; @@ -80,9 +81,11 @@ PoolWithFailover::PoolWithFailover( const std::string & password, unsigned default_connections_, unsigned max_connections_, - size_t max_tries_) + size_t max_tries_, + uint64_t wait_timeout_) : max_tries(max_tries_) , shareable(false) + , wait_timeout(wait_timeout_) { /// Replicas have the same priority, but traversed replicas are moved to the end of the queue. for (const auto & [host, port] : addresses) @@ -101,6 +104,7 @@ PoolWithFailover::PoolWithFailover( PoolWithFailover::PoolWithFailover(const PoolWithFailover & other) : max_tries{other.max_tries} , shareable{other.shareable} + , wait_timeout(other.wait_timeout) { if (shareable) { @@ -140,7 +144,7 @@ PoolWithFailover::Entry PoolWithFailover::get() try { - Entry entry = shareable ? pool->get() : pool->tryGet(); + Entry entry = shareable ? 
pool->get(wait_timeout) : pool->tryGet(); if (!entry.isNull()) { @@ -172,7 +176,7 @@ PoolWithFailover::Entry PoolWithFailover::get() if (full_pool) { app.logger().error("All connections failed, trying to wait on a full pool " + (*full_pool)->getDescription()); - return (*full_pool)->get(); + return (*full_pool)->get(wait_timeout); } std::stringstream message; diff --git a/base/mysqlxx/PoolWithFailover.h b/base/mysqlxx/PoolWithFailover.h index 1c7a63e76c0..2bd5ec9f30a 100644 --- a/base/mysqlxx/PoolWithFailover.h +++ b/base/mysqlxx/PoolWithFailover.h @@ -80,6 +80,8 @@ namespace mysqlxx std::mutex mutex; /// Can the Pool be shared bool shareable; + /// Timeout for waiting free connection. + uint64_t wait_timeout = 0; public: using Entry = Pool::Entry; @@ -96,6 +98,7 @@ namespace mysqlxx * default_connections Number of connection in pool to each replica at start. * max_connections Maximum number of connections in pool to each replica. * max_tries_ Max number of connection tries. + * wait_timeout_ Timeout for waiting free connection. 
*/ PoolWithFailover( const std::string & config_name_, @@ -117,7 +120,8 @@ namespace mysqlxx const std::string & password, unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS, - size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES); + size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, + uint64_t wait_timeout_ = UINT64_MAX); PoolWithFailover(const PoolWithFailover & other); diff --git a/docs/en/engines/database-engines/materialized-postgresql.md b/docs/en/engines/database-engines/materialized-postgresql.md index 89c7c803bb3..77a5f2af0e0 100644 --- a/docs/en/engines/database-engines/materialized-postgresql.md +++ b/docs/en/engines/database-engines/materialized-postgresql.md @@ -31,6 +31,10 @@ ENGINE = MaterializedPostgreSQL('host:port', ['database' | database], 'user', 'p - [materialized_postgresql_allow_automatic_update](../../operations/settings/settings.md#materialized-postgresql-allow-automatic-update) +- [materialized_postgresql_replication_slot](../../operations/settings/settings.md#materialized-postgresql-replication-slot) + +- [materialized_postgresql_snapshot](../../operations/settings/settings.md#materialized-postgresql-snapshot) + ``` sql CREATE DATABASE database1 ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password') @@ -73,7 +77,7 @@ WHERE oid = 'postgres_table'::regclass; !!! warning "Warning" Replication of [**TOAST**](https://www.postgresql.org/docs/9.5/storage-toast.html) values is not supported. The default value for the data type will be used. - + ## Example of Use {#example-of-use} ``` sql @@ -82,3 +86,11 @@ ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres SELECT * FROM postgresql_db.postgres_table; ``` + +## Notes {#notes} + +- Failover of the logical replication slot. 
+ +Logical Replication Slots which exist on the primary are not available on standby replicas. +So if there is a failover, the new primary (the old physical standby) will not be aware of any slots that existed on the old primary. This will lead to a broken replication from PostgreSQL. +A solution to this is to manage replication slots yourself and define a permanent replication slot (some information can be found [here](https://patroni.readthedocs.io/en/latest/SETTINGS.html)). You'll need to pass slot name via `materialized_postgresql_replication_slot` setting, and it has to be exported with `EXPORT SNAPSHOT` option. The snapshot identifier needs to be passed via `materialized_postgresql_snapshot` setting. diff --git a/docs/en/engines/table-engines/integrations/mysql.md b/docs/en/engines/table-engines/integrations/mysql.md index a6402e00bc9..7eac159a645 100644 --- a/docs/en/engines/table-engines/integrations/mysql.md +++ b/docs/en/engines/table-engines/integrations/mysql.md @@ -19,6 +19,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] SETTINGS [connection_pool_size=16, ] [connection_max_tries=3, ] + [connection_wait_timeout=5, ] /* 0 -- do not wait */ [connection_auto_close=true ] ; ``` diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 5d162f6a426..c99c39fe05c 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -3436,6 +3436,14 @@ Possible values: Default value: `0`. +## materialized_postgresql_replication_slot {#materialized-postgresql-replication-slot} + +Allows using user-managed replication slots. Must be used together with `materialized_postgresql_snapshot`. + +## materialized_postgresql_snapshot {#materialized-postgresql-snapshot} + +A text string identifying a snapshot, from which initial dump of tables will be performed. Must be used together with `materialized_postgresql_replication_slot`. 
+ ## allow_experimental_projection_optimization {#allow-experimental-projection-optimization} Enables or disables [projection](../../engines/table-engines/mergetree-family/mergetree.md#projections) optimization when processing `SELECT` queries. @@ -3449,7 +3457,7 @@ Default value: `0`. ## force_optimize_projection {#force-optimize-projection} -Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md#projections) in `SELECT` queries, when projection optimization is enabled (see [allow_experimental_projection_optimization](#allow-experimental-projection-optimization) setting). +Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md#projections) in `SELECT` queries, when projection optimization is enabled (see [allow_experimental_projection_optimization](#allow-experimental-projection-optimization) setting). Possible values: diff --git a/docs/en/operations/system-tables/views.md b/docs/en/operations/system-tables/views.md deleted file mode 100644 index 8edebf00a91..00000000000 --- a/docs/en/operations/system-tables/views.md +++ /dev/null @@ -1,44 +0,0 @@ -# system.views {#system-views} - -Contains the dependencies of all views and the type to which the view belongs. The metadata of the view comes from the [system.tables](tables.md). - -Columns: - -- `database` ([String](../../sql-reference/data-types/string.md)) — The name of the database the view is in. - -- `name` ([String](../../sql-reference/data-types/string.md)) — Name of the view. - -- `main_dependency_database` ([String](../../sql-reference/data-types/string.md)) — The name of the database on which the view depends. - -- `main_dependency_table` ([String](../../sql-reference/data-types/string.md)) - The name of the table on which the view depends. - -- `view_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of the view. 
Values: - - `'Default' = 1` — [Default views](../../sql-reference/statements/create/view.md#normal). Should not appear in this log. - - `'Materialized' = 2` — [Materialized views](../../sql-reference/statements/create/view.md#materialized). - - `'Live' = 3` — [Live views](../../sql-reference/statements/create/view.md#live-view). - -**Example** - -```sql -SELECT * FROM system.views LIMIT 2 FORMAT Vertical; -``` - -```text -Row 1: -────── -database: default -name: live_view -main_dependency_database: default -main_dependency_table: view_source_tb -view_type: Live - -Row 2: -────── -database: default -name: materialized_view -main_dependency_database: default -main_dependency_table: view_source_tb -view_type: Materialized -``` - -[Original article](https://clickhouse.tech/docs/en/operations/system-tables/views) diff --git a/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp index 53495aa3cb1..560d2d716c9 100644 --- a/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializedMySQLSyncThread.cpp @@ -247,7 +247,7 @@ void MaterializedMySQLSyncThread::assertMySQLAvailable() { try { - checkMySQLVariables(pool.get(), getContext()->getSettingsRef()); + checkMySQLVariables(pool.get(/* wait_timeout= */ UINT64_MAX), getContext()->getSettingsRef()); } catch (const mysqlxx::ConnectionFailed & e) { @@ -729,7 +729,7 @@ void MaterializedMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPt { /// Some behaviors(such as changing the value of "binlog_checksum") rotate the binlog file. 
/// To ensure that the synchronization continues, we need to handle these events - metadata.fetchMasterVariablesValue(pool.get()); + metadata.fetchMasterVariablesValue(pool.get(/* wait_timeout= */ UINT64_MAX)); client.setBinlogChecksum(metadata.binlog_checksum); } else if (receive_event->header.type != HEARTBEAT_EVENT) diff --git a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp index c9ea8d12ef2..218dda94d31 100644 --- a/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp +++ b/src/Databases/PostgreSQL/DatabaseMaterializedPostgreSQL.cpp @@ -61,10 +61,8 @@ void DatabaseMaterializedPostgreSQL::startSynchronization() connection_info, getContext(), is_attach, - settings->materialized_postgresql_max_block_size.value, - settings->materialized_postgresql_allow_automatic_update, - /* is_materialized_postgresql_database = */ true, - settings->materialized_postgresql_tables_list.value); + *settings, + /* is_materialized_postgresql_database = */ true); postgres::Connection connection(connection_info); NameSet tables_to_replicate; diff --git a/src/IO/ZstdInflatingReadBuffer.cpp b/src/IO/ZstdInflatingReadBuffer.cpp index 6c03ea420a9..da6768f7c4a 100644 --- a/src/IO/ZstdInflatingReadBuffer.cpp +++ b/src/IO/ZstdInflatingReadBuffer.cpp @@ -28,41 +28,49 @@ ZstdInflatingReadBuffer::~ZstdInflatingReadBuffer() bool ZstdInflatingReadBuffer::nextImpl() { - if (eof) - return false; - - if (input.pos >= input.size) + do { - in->nextIfAtEnd(); - input.src = reinterpret_cast(in->position()); - input.pos = 0; - input.size = in->buffer().end() - in->position(); - } + // If it is known that end of file was reached, return false + if (eof) + return false; - output.dst = reinterpret_cast(internal_buffer.begin()); - output.size = internal_buffer.size(); - output.pos = 0; + /// If end was reached, get next part + if (input.pos >= input.size) + { + in->nextIfAtEnd(); + input.src = reinterpret_cast(in->position()); + 
input.pos = 0; + input.size = in->buffer().end() - in->position(); + } - size_t ret = ZSTD_decompressStream(dctx, &output, &input); - if (ZSTD_isError(ret)) - throw Exception( - ErrorCodes::ZSTD_DECODER_FAILED, "Zstd stream decoding failed: error code: {}; zstd version: {}", ret, ZSTD_VERSION_STRING); + /// fill output + output.dst = reinterpret_cast(internal_buffer.begin()); + output.size = internal_buffer.size(); + output.pos = 0; - in->position() = in->buffer().begin() + input.pos; - working_buffer.resize(output.pos); + /// Decompress data and check errors. + size_t ret = ZSTD_decompressStream(dctx, &output, &input); + if (ZSTD_isError(ret)) + throw Exception( + ErrorCodes::ZSTD_DECODER_FAILED, "Zstd stream decoding failed: error code: {}; zstd version: {}", ret, ZSTD_VERSION_STRING); - if (in->eof()) - { - eof = true; - return !working_buffer.empty(); - } - else if (output.pos == 0) - { + /// Check that something has changed after decompress (input or output position) + assert(output.pos > 0 || in->position() < in->buffer().begin() + input.pos); + + /// move position to the end of read data + in->position() = in->buffer().begin() + input.pos; + working_buffer.resize(output.pos); + + /// If end of file is reached, fill eof variable and return true if there is some data in buffer, otherwise return false + if (in->eof()) + { + eof = true; + return !working_buffer.empty(); + } /// It is possible, that input buffer is not at eof yet, but nothing was decompressed in current iteration. /// But there are cases, when such behaviour is not allowed - i.e. if input buffer is not eof, then /// it has to be guaranteed that working_buffer is not empty. So if it is empty, continue. 
- return nextImpl(); - } + } while (output.pos == 0); return true; } diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index af60eeeaba3..20ebc0a9ee5 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -617,12 +617,6 @@ Dependencies DatabaseCatalog::getDependencies(const StorageID & from) const return Dependencies(iter->second.begin(), iter->second.end()); } -ViewDependencies DatabaseCatalog::getViewDependencies() const -{ - std::lock_guard lock{databases_mutex}; - return ViewDependencies(view_dependencies.begin(), view_dependencies.end()); -} - void DatabaseCatalog::updateDependency(const StorageID & old_from, const StorageID & old_where, const StorageID & new_from, const StorageID & new_where) diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index 72dd28d335b..071b80690df 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -175,7 +175,6 @@ public: void addDependency(const StorageID & from, const StorageID & where); void removeDependency(const StorageID & from, const StorageID & where); Dependencies getDependencies(const StorageID & from) const; - ViewDependencies getViewDependencies() const; /// For Materialized and Live View void updateDependency(const StorageID & old_from, const StorageID & old_where,const StorageID & new_from, const StorageID & new_where); diff --git a/src/Interpreters/Set.cpp b/src/Interpreters/Set.cpp index 5ab59ba3f07..5304859aeea 100644 --- a/src/Interpreters/Set.cpp +++ b/src/Interpreters/Set.cpp @@ -402,8 +402,8 @@ void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) c + data_types[set_type_idx]->getName() + " on the right", ErrorCodes::TYPE_MISMATCH); } -MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector && index_mapping_) - : indexes_mapping(std::move(index_mapping_)) +MergeTreeSetIndex::MergeTreeSetIndex(const Columns & 
set_elements, std::vector && indexes_mapping_) + : has_all_keys(set_elements.size() == indexes_mapping_.size()), indexes_mapping(std::move(indexes_mapping_)) { std::sort(indexes_mapping.begin(), indexes_mapping.end(), [](const KeyTuplePositionMapping & l, const KeyTuplePositionMapping & r) @@ -548,11 +548,11 @@ BoolMask MergeTreeSetIndex::checkInRange(const std::vector & key_ranges, break; } } - if (one_element_range) + if (one_element_range && has_all_keys) { /// Here we know that there is one element in range. /// The main difference with the normal case is that we can definitely say that - /// condition in this range always TRUE (can_be_false = 0) xor always FALSE (can_be_true = 0). + /// condition in this range is always TRUE (can_be_false = 0) or always FALSE (can_be_true = 0). /// Check if it's an empty range if (!left_included || !right_included) diff --git a/src/Interpreters/Set.h b/src/Interpreters/Set.h index 727a2c144a1..578913dd0d2 100644 --- a/src/Interpreters/Set.h +++ b/src/Interpreters/Set.h @@ -208,7 +208,7 @@ public: std::vector functions; }; - MergeTreeSetIndex(const Columns & set_elements, std::vector && index_mapping_); + MergeTreeSetIndex(const Columns & set_elements, std::vector && indexes_mapping_); size_t size() const { return ordered_set.at(0)->size(); } @@ -217,6 +217,8 @@ public: BoolMask checkInRange(const std::vector & key_ranges, const DataTypes & data_types) const; private: + // If all arguments in tuple are key columns, we can optimize NOT IN when there is only one element. 
+ bool has_all_keys; Columns ordered_set; std::vector indexes_mapping; diff --git a/src/Interpreters/TextLog.cpp b/src/Interpreters/TextLog.cpp index baf98b6771d..51ffbdd66ee 100644 --- a/src/Interpreters/TextLog.cpp +++ b/src/Interpreters/TextLog.cpp @@ -26,7 +26,8 @@ NamesAndTypesList TextLogElement::getNamesAndTypes() {"Notice", static_cast(Message::PRIO_NOTICE)}, {"Information", static_cast(Message::PRIO_INFORMATION)}, {"Debug", static_cast(Message::PRIO_DEBUG)}, - {"Trace", static_cast(Message::PRIO_TRACE)} + {"Trace", static_cast(Message::PRIO_TRACE)}, + {"Test", static_cast(Message::PRIO_TEST)}, }); return diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 734a67da758..92529b00faa 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -174,7 +174,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( if (query_info.projection->desc->is_minmax_count_projection) { Pipe pipe(std::make_shared( - query_info.minmax_count_projection_block, + query_info.minmax_count_projection_block.cloneEmpty(), Chunk(query_info.minmax_count_projection_block.getColumns(), query_info.minmax_count_projection_block.rows()))); auto read_from_pipe = std::make_unique(std::move(pipe)); projection_plan->addStep(std::move(read_from_pipe)); diff --git a/src/Storages/MySQL/MySQLSettings.h b/src/Storages/MySQL/MySQLSettings.h index da8723c2ea6..872b0607e20 100644 --- a/src/Storages/MySQL/MySQLSettings.h +++ b/src/Storages/MySQL/MySQLSettings.h @@ -17,6 +17,7 @@ class ASTStorage; #define LIST_OF_MYSQL_SETTINGS(M) \ M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \ M(UInt64, connection_max_tries, 3, "Number of retries for pool with failover", 0) \ + M(UInt64, connection_wait_timeout, 5, "Timeout (in seconds) for waiting for free 
connection (in case of there is already connection_pool_size active connections), 0 - do not wait.", 0) \ M(Bool, connection_auto_close, true, "Auto-close connection after query execution, i.e. disable connection reuse.", 0) \ DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS) diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp index b43e7656084..1fc279bff23 100644 --- a/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp +++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLConsumer.cpp @@ -625,9 +625,8 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot() tryLogCurrentException(__PRETTY_FUNCTION__); return false; } - catch (const pqxx::broken_connection & e) + catch (const pqxx::broken_connection &) { - LOG_ERROR(log, "Connection error: {}", e.what()); connection->tryUpdateConnection(); return false; } @@ -641,6 +640,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot() if (error_message.find("out of relcache_callback_list slots") == std::string::npos) tryLogCurrentException(__PRETTY_FUNCTION__); + connection->tryUpdateConnection(); return false; } catch (const pqxx::conversion_error & e) diff --git a/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h b/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h index 1d986b223e9..cc147a01d32 100644 --- a/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h +++ b/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.h @@ -17,6 +17,8 @@ namespace DB M(UInt64, materialized_postgresql_max_block_size, 65536, "Number of row collected before flushing data into table.", 0) \ M(String, materialized_postgresql_tables_list, "", "List of tables for MaterializedPostgreSQL database engine", 0) \ M(Bool, materialized_postgresql_allow_automatic_update, false, "Allow to reload table in the background, when schema changes are detected", 0) \ + M(String, materialized_postgresql_replication_slot, 
"", "A user-created replication slot", 0) \ + M(String, materialized_postgresql_snapshot, "", "User provided snapshot in case he manages replication slots himself", 0) \ DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS) diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp index c8c74d2ddaa..1bda6d13e11 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp @@ -32,24 +32,28 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler( const postgres::ConnectionInfo & connection_info_, ContextPtr context_, bool is_attach_, - const size_t max_block_size_, - bool allow_automatic_update_, - bool is_materialized_postgresql_database_, - const String tables_list_) + const MaterializedPostgreSQLSettings & replication_settings, + bool is_materialized_postgresql_database_) : log(&Poco::Logger::get("PostgreSQLReplicationHandler")) , context(context_) , is_attach(is_attach_) , remote_database_name(remote_database_name_) , current_database_name(current_database_name_) , connection_info(connection_info_) - , max_block_size(max_block_size_) - , allow_automatic_update(allow_automatic_update_) + , max_block_size(replication_settings.materialized_postgresql_max_block_size) + , allow_automatic_update(replication_settings.materialized_postgresql_allow_automatic_update) , is_materialized_postgresql_database(is_materialized_postgresql_database_) - , tables_list(tables_list_) + , tables_list(replication_settings.materialized_postgresql_tables_list) + , user_provided_snapshot(replication_settings.materialized_postgresql_snapshot) , connection(std::make_shared(connection_info_)) , milliseconds_to_wait(RESCHEDULE_MS) { - replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier); + replication_slot = replication_settings.materialized_postgresql_replication_slot; + 
if (replication_slot.empty()) + { + user_managed_slot = false; + replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier); + } publication_name = fmt::format("{}_ch_publication", replication_identifier); startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); }); @@ -121,7 +125,20 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) auto initial_sync = [&]() { - createReplicationSlot(tx, start_lsn, snapshot_name); + LOG_TRACE(log, "Starting tables sync load"); + + if (user_managed_slot) + { + if (user_provided_snapshot.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Using a user-defined replication slot must be provided with a snapshot from EXPORT SNAPSHOT when the slot is created." + "Pass it to `materialized_postgresql_snapshot` setting"); + snapshot_name = user_provided_snapshot; + } + else + { + createReplicationSlot(tx, start_lsn, snapshot_name); + } for (const auto & [table_name, storage] : materialized_storages) { @@ -147,12 +164,17 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error) /// Recreation of a replication slot imposes reloading of all tables. if (!isReplicationSlotExist(tx, start_lsn, /* temporary */false)) { + if (user_managed_slot) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Having replication slot `{}` from settings, but it does not exist", replication_slot); + initial_sync(); } /// Always drop replication slot if it is CREATE query and not ATTACH. 
else if (!is_attach || new_publication) { - dropReplicationSlot(tx); + if (!user_managed_slot) + dropReplicationSlot(tx); + initial_sync(); } /// Synchronization and initial load already took place - do not create any new tables, just fetch StoragePtr's @@ -376,6 +398,8 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction & void PostgreSQLReplicationHandler::createReplicationSlot( pqxx::nontransaction & tx, String & start_lsn, String & snapshot_name, bool temporary) { + assert(temporary || !user_managed_slot); + String query_str, slot_name; if (temporary) slot_name = replication_slot + "_tmp"; @@ -401,6 +425,8 @@ void PostgreSQLReplicationHandler::createReplicationSlot( void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx, bool temporary) { + assert(temporary || !user_managed_slot); + std::string slot_name; if (temporary) slot_name = replication_slot + "_tmp"; @@ -433,14 +459,17 @@ void PostgreSQLReplicationHandler::shutdownFinal() connection->execWithRetry([&](pqxx::nontransaction & tx) { - if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false)) - dropReplicationSlot(tx, /* temporary */false); + if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true)) + dropReplicationSlot(tx, /* temporary */true); }); + if (user_managed_slot) + return; + connection->execWithRetry([&](pqxx::nontransaction & tx) { - if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true)) - dropReplicationSlot(tx, /* temporary */true); + if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false)) + dropReplicationSlot(tx, /* temporary */false); }); } catch (Exception & e) diff --git a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h index 3a0bedc0852..eacf6b69b3b 100644 --- a/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h +++ b/src/Storages/PostgreSQL/PostgreSQLReplicationHandler.h @@ -1,6 +1,7 @@ #pragma once 
#include "MaterializedPostgreSQLConsumer.h" +#include "MaterializedPostgreSQLSettings.h" #include #include @@ -25,10 +26,8 @@ public: const postgres::ConnectionInfo & connection_info_, ContextPtr context_, bool is_attach_, - const size_t max_block_size_, - bool allow_automatic_update_, - bool is_materialized_postgresql_database_, - const String tables_list = ""); + const MaterializedPostgreSQLSettings & replication_settings, + bool is_materialized_postgresql_database_); /// Activate task to be run from a separate thread: wait until connection is available and call startReplication(). void startup(); @@ -108,6 +107,9 @@ private: /// A coma-separated list of tables, which are going to be replicated for database engine. By default, a whole database is replicated. String tables_list; + bool user_managed_slot = true; + String user_provided_snapshot; + String replication_slot, publication_name; /// Shared between replication_consumer and replication_handler, but never accessed concurrently. diff --git a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp index aa27a54cdac..73a685af9b4 100644 --- a/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp +++ b/src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp @@ -64,6 +64,8 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL( setInMemoryMetadata(storage_metadata); String replication_identifier = remote_database_name + "_" + remote_table_name_; + replication_settings->materialized_postgresql_tables_list = remote_table_name_; + replication_handler = std::make_unique( replication_identifier, remote_database_name, @@ -71,8 +73,8 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL( connection_info, getContext(), is_attach, - replication_settings->materialized_postgresql_max_block_size.value, - /* allow_automatic_update */ false, /* is_materialized_postgresql_database */false); + *replication_settings, + /* 
is_materialized_postgresql_database */false); if (!is_attach) { diff --git a/src/Storages/StorageMySQL.cpp b/src/Storages/StorageMySQL.cpp index 79bb1f59cc7..7f458ef82af 100644 --- a/src/Storages/StorageMySQL.cpp +++ b/src/Storages/StorageMySQL.cpp @@ -267,11 +267,15 @@ void registerStorageMySQL(StorageFactory & factory) throw Exception("connection_pool_size cannot be zero.", ErrorCodes::BAD_ARGUMENTS); auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306); - mysqlxx::PoolWithFailover pool(remote_database, addresses, - username, password, + mysqlxx::PoolWithFailover pool( + remote_database, + addresses, + username, + password, MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, mysql_settings.connection_pool_size, - mysql_settings.connection_max_tries); + mysql_settings.connection_max_tries, + mysql_settings.connection_wait_timeout); bool replace_query = false; std::string on_duplicate_clause; diff --git a/src/Storages/System/StorageSystemViews.cpp b/src/Storages/System/StorageSystemViews.cpp deleted file mode 100644 index 0bb2724b358..00000000000 --- a/src/Storages/System/StorageSystemViews.cpp +++ /dev/null @@ -1,68 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -class Context; - -NamesAndTypesList StorageSystemViews::getNamesAndTypes() -{ - auto view_type_datatype = std::make_shared(DataTypeEnum8::Values{ - {"Default", static_cast(QueryViewsLogElement::ViewType::DEFAULT)}, - {"Materialized", static_cast(QueryViewsLogElement::ViewType::MATERIALIZED)}, - {"Live", static_cast(QueryViewsLogElement::ViewType::LIVE)}}); - - return { - {"database", std::make_shared()}, - {"name", std::make_shared()}, - {"main_dependency_database", std::make_shared()}, - {"main_dependency_table", std::make_shared()}, - {"view_type", std::move(view_type_datatype)}, - }; -} - -void StorageSystemViews::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const 
-{ - const auto access = context->getAccess(); - const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES); - - for (const auto & [table_id, view_ids] : DatabaseCatalog::instance().getViewDependencies()) - { - const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, table_id.database_name); - - if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, table_id.database_name, table_id.table_name)) - continue; - - size_t col_num; - for (const auto & view_id : view_ids) - { - auto view_ptr = DatabaseCatalog::instance().getTable(view_id, context); - QueryViewsLogElement::ViewType type = QueryViewsLogElement::ViewType::DEFAULT; - - if (typeid_cast(view_ptr.get())) - { - type = QueryViewsLogElement::ViewType::MATERIALIZED; - } - else if (typeid_cast(view_ptr.get())) - { - type = QueryViewsLogElement::ViewType::LIVE; - } - - col_num = 0; - res_columns[col_num++]->insert(view_id.database_name); - res_columns[col_num++]->insert(view_id.table_name); - res_columns[col_num++]->insert(table_id.database_name); - res_columns[col_num++]->insert(table_id.table_name); - res_columns[col_num++]->insert(type); - } - } -} - -} diff --git a/src/Storages/System/StorageSystemViews.h b/src/Storages/System/StorageSystemViews.h deleted file mode 100644 index 67fcb79067e..00000000000 --- a/src/Storages/System/StorageSystemViews.h +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include -#include - -namespace DB -{ - -class StorageSystemViews final : public shared_ptr_helper, public IStorageSystemOneBlock -{ - friend struct shared_ptr_helper; -protected: - using IStorageSystemOneBlock::IStorageSystemOneBlock; - - void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override; - -public: - std::string getName() const override { return "SystemViews"; } - - static NamesAndTypesList getNamesAndTypes(); - -}; - -} diff --git 
a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index 3656a239adb..95e86487073 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -44,7 +44,6 @@ #include #include #include -#include #include #include #include @@ -96,7 +95,6 @@ void attachSystemTablesLocal(IDatabase & system_database) attach(system_database, "zeros_mt", true); attach(system_database, "databases"); attach(system_database, "tables"); - attach(system_database, "views"); attach(system_database, "columns"); attach(system_database, "functions"); attach(system_database, "events"); diff --git a/tests/integration/test_keeper_two_nodes_cluster/__init__.py b/tests/integration/test_keeper_two_nodes_cluster/__init__.py new file mode 100644 index 00000000000..e5a0d9b4834 --- /dev/null +++ b/tests/integration/test_keeper_two_nodes_cluster/__init__.py @@ -0,0 +1 @@ +#!/usr/bin/env python3 diff --git a/tests/integration/test_keeper_two_nodes_cluster/configs/enable_keeper1.xml b/tests/integration/test_keeper_two_nodes_cluster/configs/enable_keeper1.xml new file mode 100644 index 00000000000..21601ff4cc0 --- /dev/null +++ b/tests/integration/test_keeper_two_nodes_cluster/configs/enable_keeper1.xml @@ -0,0 +1,33 @@ + + + 9181 + 1 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + 75 + trace + + + + + 1 + node1 + 44444 + true + 3 + + + 2 + node2 + 44444 + true + true + 2 + + + + diff --git a/tests/integration/test_keeper_two_nodes_cluster/configs/enable_keeper2.xml b/tests/integration/test_keeper_two_nodes_cluster/configs/enable_keeper2.xml new file mode 100644 index 00000000000..baee6b578a0 --- /dev/null +++ b/tests/integration/test_keeper_two_nodes_cluster/configs/enable_keeper2.xml @@ -0,0 +1,33 @@ + + + 9181 + 2 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + 75 + trace + + + + + 1 + node1 + 44444 + true + 3 + 
+ + 2 + node2 + 44444 + true + true + 2 + + + + diff --git a/tests/integration/test_keeper_two_nodes_cluster/configs/use_keeper.xml b/tests/integration/test_keeper_two_nodes_cluster/configs/use_keeper.xml new file mode 100644 index 00000000000..740b2afaab9 --- /dev/null +++ b/tests/integration/test_keeper_two_nodes_cluster/configs/use_keeper.xml @@ -0,0 +1,12 @@ + + + + node1 + 9181 + + + node2 + 9181 + + + diff --git a/tests/integration/test_keeper_two_nodes_cluster/test.py b/tests/integration/test_keeper_two_nodes_cluster/test.py new file mode 100644 index 00000000000..e6e3eb37af2 --- /dev/null +++ b/tests/integration/test_keeper_two_nodes_cluster/test.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 + +import pytest +from helpers.cluster import ClickHouseCluster +import random +import string +import os +import time +from multiprocessing.dummy import Pool +from helpers.network import PartitionManager +from helpers.test_tools import assert_eq_with_retry + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/use_keeper.xml'], stay_alive=True) +node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/use_keeper.xml'], stay_alive=True) + +from kazoo.client import KazooClient, KazooState + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + + finally: + cluster.shutdown() + +def smaller_exception(ex): + return '\n'.join(str(ex).split('\n')[0:2]) + +def wait_node(node): + for _ in range(100): + zk = None + try: + node.query("SELECT * FROM system.zookeeper WHERE path = '/'") + zk = get_fake_zk(node.name, timeout=30.0) + zk.create("/test", sequence=True) + print("node", node.name, "ready") + break + except Exception as ex: + time.sleep(0.2) + print("Waiting until", node.name, "will be ready, exception", ex) + finally: + if zk: + zk.stop() + zk.close() + else: + raise Exception("Can't wait node", node.name, "to 
become ready") + +def wait_nodes(): + for node in [node1, node2]: + wait_node(node) + + +def get_fake_zk(nodename, timeout=30.0): + _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) + _fake_zk_instance.start() + return _fake_zk_instance + +def test_read_write_two_nodes(started_cluster): + try: + wait_nodes() + node1_zk = get_fake_zk("node1") + node2_zk = get_fake_zk("node2") + + node1_zk.create("/test_read_write_multinode_node1", b"somedata1") + node2_zk.create("/test_read_write_multinode_node2", b"somedata2") + + # stale reads are allowed + while node1_zk.exists("/test_read_write_multinode_node2") is None: + time.sleep(0.1) + + # stale reads are allowed + while node2_zk.exists("/test_read_write_multinode_node1") is None: + time.sleep(0.1) + + assert node2_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1" + assert node1_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1" + + assert node2_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2" + assert node1_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2" + + finally: + try: + for zk_conn in [node1_zk, node2_zk, node3_zk]: + zk_conn.stop() + zk_conn.close() + except: + pass + +def test_read_write_two_nodes_with_blocade(started_cluster): + try: + wait_nodes() + node1_zk = get_fake_zk("node1", timeout=5.0) + node2_zk = get_fake_zk("node2", timeout=5.0) + + print("Blocking nodes") + with PartitionManager() as pm: + pm.partition_instances(node2, node1) + + # We will respond conection loss but process this query + # after blocade will be removed + with pytest.raises(Exception): + node1_zk.create("/test_read_write_blocked_node1", b"somedata1") + + # This node is not leader and will not process anything + with pytest.raises(Exception): + node2_zk.create("/test_read_write_blocked_node2", b"somedata2") + + + print("Nodes unblocked") + for i in range(10): + try: + node1_zk = get_fake_zk("node1") + node2_zk = get_fake_zk("node2") + 
break + except: + time.sleep(0.5) + + + for i in range(100): + try: + node1_zk.create("/test_after_block1", b"somedata12") + break + except: + time.sleep(0.1) + else: + raise Exception("node1 cannot recover after blockade") + + print("Node1 created it's value") + + for i in range(100): + try: + node2_zk.create("/test_after_block2", b"somedata12") + break + except: + time.sleep(0.1) + else: + raise Exception("node2 cannot recover after blockade") + + print("Node2 created it's value") + + # stale reads are allowed + while node1_zk.exists("/test_after_block2") is None: + time.sleep(0.1) + + # stale reads are allowed + while node2_zk.exists("/test_after_block1") is None: + time.sleep(0.1) + + assert node1_zk.exists("/test_after_block1") is not None + assert node1_zk.exists("/test_after_block2") is not None + assert node2_zk.exists("/test_after_block1") is not None + assert node2_zk.exists("/test_after_block2") is not None + + finally: + try: + for zk_conn in [node1_zk, node2_zk, node3_zk]: + zk_conn.stop() + zk_conn.close() + except: + pass diff --git a/tests/integration/test_postgresql_replica_database_engine/test.py b/tests/integration/test_postgresql_replica_database_engine/test.py index 68b42d91fb6..1dd096087ff 100644 --- a/tests/integration/test_postgresql_replica_database_engine/test.py +++ b/tests/integration/test_postgresql_replica_database_engine/test.py @@ -31,18 +31,33 @@ postgres_table_template_3 = """ key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL) """ -def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database'): +def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False): if database == True: conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name) else: conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port) + if 
replication: + conn_string += " replication='database'" + conn = psycopg2.connect(conn_string) if auto_commit: conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) conn.autocommit = True return conn +def create_replication_slot(conn, slot_name='user_slot'): + cursor = conn.cursor() + cursor.execute('CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT'.format(slot_name)) + result = cursor.fetchall() + print(result[0][0]) # slot name + print(result[0][1]) # start lsn + print(result[0][2]) # snapshot + return result[0][2] + +def drop_replication_slot(conn, slot_name='user_slot'): + cursor = conn.cursor() + cursor.execute("select pg_drop_replication_slot('{}')".format(slot_name)) def create_postgres_db(cursor, name='postgres_database'): cursor.execute("CREATE DATABASE {}".format(name)) @@ -941,6 +956,34 @@ def test_quoting(started_cluster): drop_materialized_db() +def test_user_managed_slots(started_cluster): + conn = get_postgres_conn(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + database=True) + cursor = conn.cursor() + table_name = 'test_table' + create_postgres_table(cursor, table_name); + instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name)) + + slot_name = 'user_slot' + replication_connection = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port, + database=True, replication=True, auto_commit=True) + snapshot = create_replication_slot(replication_connection, slot_name=slot_name) + create_materialized_db(ip=started_cluster.postgres_ip, + port=started_cluster.postgres_port, + settings=["materialized_postgresql_replication_slot = '{}'".format(slot_name), + "materialized_postgresql_snapshot = '{}'".format(snapshot)]) + check_tables_are_synchronized(table_name); + instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name)) + check_tables_are_synchronized(table_name); + 
instance.restart_clickhouse() + instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(20000, 10000)".format(table_name)) + check_tables_are_synchronized(table_name); + drop_postgres_table(cursor, table_name) + drop_materialized_db() + drop_replication_slot(replication_connection, slot_name) + + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...") diff --git a/tests/integration/test_storage_mysql/test.py b/tests/integration/test_storage_mysql/test.py index a044528cacf..c7ede8dede4 100644 --- a/tests/integration/test_storage_mysql/test.py +++ b/tests/integration/test_storage_mysql/test.py @@ -3,7 +3,10 @@ from contextlib import contextmanager ## sudo -H pip install PyMySQL import pymysql.cursors import pytest +import time +import threading from helpers.cluster import ClickHouseCluster +from helpers.client import QueryRuntimeException cluster = ClickHouseCluster(__file__) @@ -319,6 +322,51 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL conn.close() +# Check that limited connection_wait_timeout (via connection_pool_size=1) will throw. 
+def test_settings_connection_wait_timeout(started_cluster): + table_name = 'test_settings_connection_wait_timeout' + node1.query(f'DROP TABLE IF EXISTS {table_name}') + wait_timeout = 2 + + conn = get_mysql_conn(started_cluster, cluster.mysql_ip) + drop_mysql_table(conn, table_name) + create_mysql_table(conn, table_name) + + node1.query(''' + CREATE TABLE {} + ( + id UInt32, + name String, + age UInt32, + money UInt32 + ) + ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse') + SETTINGS connection_wait_timeout={}, connection_pool_size=1 + '''.format(table_name, table_name, wait_timeout) + ) + + node1.query("INSERT INTO {} (id, name) SELECT number, concat('name_', toString(number)) from numbers(10) ".format(table_name)) + + def worker(): + node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name)) + + worker_thread = threading.Thread(target=worker) + worker_thread.start() + + # ensure that first query started in worker_thread + time.sleep(1) + + started = time.time() + with pytest.raises(QueryRuntimeException, match=r"Exception: mysqlxx::Pool is full \(connection_wait_timeout is exceeded\)"): + node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name)) + ended = time.time() + assert (ended - started) >= wait_timeout + + worker_thread.join() + + drop_mysql_table(conn, table_name) + conn.close() + if __name__ == '__main__': with contextmanager(started_cluster)() as cluster: for name, instance in list(cluster.instances.items()): diff --git a/tests/queries/0_stateless/01710_minmax_count_projection.reference b/tests/queries/0_stateless/01710_minmax_count_projection.reference index 882d808069e..ad9b87b998d 100644 --- a/tests/queries/0_stateless/01710_minmax_count_projection.reference +++ b/tests/queries/0_stateless/01710_minmax_count_projection.reference @@ -2,3 +2,4 @@ 0 9998 5000 1 9999 5000 0 9998 5000 +1 diff --git a/tests/queries/0_stateless/01710_minmax_count_projection.sql 
b/tests/queries/0_stateless/01710_minmax_count_projection.sql index 3ee19fe8c2e..58af11f01f7 100644 --- a/tests/queries/0_stateless/01710_minmax_count_projection.sql +++ b/tests/queries/0_stateless/01710_minmax_count_projection.sql @@ -11,4 +11,7 @@ select min(i), max(i), count() from d group by _partition_id order by _partition select min(i), max(i), count() from d where _partition_value.1 = 0 group by _partition_id order by _partition_id; select min(i), max(i), count() from d where _partition_value.1 = 10 group by _partition_id order by _partition_id; +-- fuzz crash +select min(i) from d where 1 = _partition_value.1; + drop table d; diff --git a/tests/queries/0_stateless/01891_not_in_partition_prune.reference b/tests/queries/0_stateless/01891_not_in_partition_prune.reference index 628053cd4f8..9d2517ad760 100644 --- a/tests/queries/0_stateless/01891_not_in_partition_prune.reference +++ b/tests/queries/0_stateless/01891_not_in_partition_prune.reference @@ -4,3 +4,5 @@ 7 107 8 108 9 109 +1970-01-01 1 one +1970-01-01 3 three diff --git a/tests/queries/0_stateless/01891_not_in_partition_prune.sql b/tests/queries/0_stateless/01891_not_in_partition_prune.sql index edbfad93e5d..5bf90fdd65c 100644 --- a/tests/queries/0_stateless/01891_not_in_partition_prune.sql +++ b/tests/queries/0_stateless/01891_not_in_partition_prune.sql @@ -8,3 +8,18 @@ set max_rows_to_read = 5; select * from test1 where i not in (1,2,3,4,5) order by i; drop table test1; + +drop table if exists t1; +drop table if exists t2; + +create table t1 (date Date, a Float64, b String) Engine=MergeTree ORDER BY date; +create table t2 (date Date, a Float64, b String) Engine=MergeTree ORDER BY date; + +insert into t1(a, b) values (1, 'one'), (2, 'two'); +insert into t2(a, b) values (2, 'two'), (3, 'three'); + +select date, a, b from t1 where (date, a, b) NOT IN (select date,a,b from t2); +select date, a, b from t2 where (date, a, b) NOT IN (select date,a,b from t1); + +drop table t1; +drop table t2; diff --git 
a/tests/queries/0_stateless/02015_system_views.reference b/tests/queries/0_stateless/02015_system_views.reference deleted file mode 100644 index a1b1b2a9fd3..00000000000 --- a/tests/queries/0_stateless/02015_system_views.reference +++ /dev/null @@ -1 +0,0 @@ -02015_db materialized_view 02015_db view_source_tb Materialized diff --git a/tests/queries/0_stateless/02015_system_views.sql b/tests/queries/0_stateless/02015_system_views.sql deleted file mode 100644 index a6375dcb591..00000000000 --- a/tests/queries/0_stateless/02015_system_views.sql +++ /dev/null @@ -1,14 +0,0 @@ -DROP DATABASE IF EXISTS 02015_db; -CREATE DATABASE IF NOT EXISTS 02015_db; - -DROP TABLE IF EXISTS 02015_db.view_source_tb; -CREATE TABLE IF NOT EXISTS 02015_db.view_source_tb (a UInt8, s String) ENGINE = MergeTree() ORDER BY a; - -DROP TABLE IF EXISTS 02015_db.materialized_view; -CREATE MATERIALIZED VIEW IF NOT EXISTS 02015_db.materialized_view ENGINE = ReplacingMergeTree() ORDER BY a AS SELECT * FROM 02015_db.view_source_tb; - -SELECT * FROM system.views WHERE database='02015_db' and name = 'materialized_view'; - -DROP TABLE IF EXISTS 02015_db.materialized_view; -DROP TABLE IF EXISTS 02015_db.view_source_tb; -DROP DATABASE IF EXISTS 02015_db; diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index 0143cc78dbe..335ed370b9b 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -512,7 +512,6 @@ "01532_execute_merges_on_single_replica", /// static zk path "01530_drop_database_atomic_sync", /// creates database "02001_add_default_database_to_system_users", ///create user - "02002_row_level_filter_bug", ///create user - "02015_system_views" + "02002_row_level_filter_bug" ///create user ] }