Merge branch 'master' into move_docker_images_to_separate_repo

This commit is contained in:
mergify[bot] 2021-09-07 18:29:33 +00:00 committed by GitHub
commit 40c61a98a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 253 additions and 213 deletions

View File

@ -7,10 +7,22 @@
#endif
#include <mysqlxx/Pool.h>
#include <common/sleep.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <ctime>
namespace
{
inline uint64_t clock_gettime_ns(clockid_t clock_type = CLOCK_MONOTONIC)
{
struct timespec ts;
clock_gettime(clock_type, &ts);
return uint64_t(ts.tv_sec * 1000000000LL + ts.tv_nsec);
}
}
namespace mysqlxx
@ -124,10 +136,15 @@ Pool::~Pool()
}
Pool::Entry Pool::get()
Pool::Entry Pool::get(uint64_t wait_timeout)
{
std::unique_lock<std::mutex> lock(mutex);
uint64_t deadline = 0;
/// UINT64_MAX -- wait indefinitely
if (wait_timeout && wait_timeout != UINT64_MAX)
deadline = clock_gettime_ns() + wait_timeout * 1'000'000'000;
initialize();
for (;;)
{
@ -153,6 +170,12 @@ Pool::Entry Pool::get()
logger.trace("(%s): Unable to create a new connection: Max number of connections has been reached.", getDescription());
}
if (!wait_timeout)
throw Poco::Exception("mysqlxx::Pool is full (wait is disabled, see connection_wait_timeout setting)");
if (deadline && clock_gettime_ns() >= deadline)
throw Poco::Exception("mysqlxx::Pool is full (connection_wait_timeout is exceeded)");
lock.unlock();
logger.trace("(%s): Sleeping for %d seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);

View File

@ -189,7 +189,7 @@ public:
~Pool();
/// Allocates connection.
Entry get();
Entry get(uint64_t wait_timeout);
/// Allocates connection.
/// If database is not accessible, returns empty Entry object.

View File

@ -21,8 +21,9 @@ PoolWithFailover::PoolWithFailover(
const unsigned max_connections_,
const size_t max_tries_)
: max_tries(max_tries_)
, shareable(config_.getBool(config_name_ + ".share_connection", false))
, wait_timeout(UINT64_MAX)
{
shareable = config_.getBool(config_name_ + ".share_connection", false);
if (config_.has(config_name_ + ".replica"))
{
Poco::Util::AbstractConfiguration::Keys replica_keys;
@ -80,9 +81,11 @@ PoolWithFailover::PoolWithFailover(
const std::string & password,
unsigned default_connections_,
unsigned max_connections_,
size_t max_tries_)
size_t max_tries_,
uint64_t wait_timeout_)
: max_tries(max_tries_)
, shareable(false)
, wait_timeout(wait_timeout_)
{
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
for (const auto & [host, port] : addresses)
@ -101,6 +104,7 @@ PoolWithFailover::PoolWithFailover(
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
: max_tries{other.max_tries}
, shareable{other.shareable}
, wait_timeout(other.wait_timeout)
{
if (shareable)
{
@ -140,7 +144,7 @@ PoolWithFailover::Entry PoolWithFailover::get()
try
{
Entry entry = shareable ? pool->get() : pool->tryGet();
Entry entry = shareable ? pool->get(wait_timeout) : pool->tryGet();
if (!entry.isNull())
{
@ -172,7 +176,7 @@ PoolWithFailover::Entry PoolWithFailover::get()
if (full_pool)
{
app.logger().error("All connections failed, trying to wait on a full pool " + (*full_pool)->getDescription());
return (*full_pool)->get();
return (*full_pool)->get(wait_timeout);
}
std::stringstream message;

View File

@ -80,6 +80,8 @@ namespace mysqlxx
std::mutex mutex;
/// Can the Pool be shared
bool shareable;
/// Timeout for waiting free connection.
uint64_t wait_timeout = 0;
public:
using Entry = Pool::Entry;
@ -96,6 +98,7 @@ namespace mysqlxx
* default_connections Number of connection in pool to each replica at start.
* max_connections Maximum number of connections in pool to each replica.
* max_tries_ Max number of connection tries.
* wait_timeout_ Timeout for waiting free connection.
*/
PoolWithFailover(
const std::string & config_name_,
@ -117,7 +120,8 @@ namespace mysqlxx
const std::string & password,
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
uint64_t wait_timeout_ = UINT64_MAX);
PoolWithFailover(const PoolWithFailover & other);

View File

@ -31,6 +31,10 @@ ENGINE = MaterializedPostgreSQL('host:port', ['database' | database], 'user', 'p
- [materialized_postgresql_allow_automatic_update](../../operations/settings/settings.md#materialized-postgresql-allow-automatic-update)
- [materialized_postgresql_replication_slot](../../operations/settings/settings.md#materialized-postgresql-replication-slot)
- [materialized_postgresql_snapshot](../../operations/settings/settings.md#materialized-postgresql-snapshot)
``` sql
CREATE DATABASE database1
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
@ -73,7 +77,7 @@ WHERE oid = 'postgres_table'::regclass;
!!! warning "Warning"
Replication of [**TOAST**](https://www.postgresql.org/docs/9.5/storage-toast.html) values is not supported. The default value for the data type will be used.
## Example of Use {#example-of-use}
``` sql
@ -82,3 +86,11 @@ ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres
SELECT * FROM postgresql_db.postgres_table;
```
## Notes {#notes}
- Failover of the logical replication slot.
Logical Replication Slots which exist on the primary are not available on standby replicas.
So if there is a failover, new primary (the old physical standby) wont be aware of any slots which were existing with old primary. This will lead to a broken replication from PostgreSQL.
A solution to this is to manage replication slots yourself and define a permanent replication slot (some information can be found [here](https://patroni.readthedocs.io/en/latest/SETTINGS.html)). You'll need to pass slot name via `materialized_postgresql_replication_slot` setting, and it has to be exported with `EXPORT SNAPSHOT` option. The snapshot identifier needs to be passed via `materialized_postgresql_snapshot` setting.

View File

@ -19,6 +19,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
SETTINGS
[connection_pool_size=16, ]
[connection_max_tries=3, ]
[connection_wait_timeout=5, ] /* 0 -- do not wait */
[connection_auto_close=true ]
;
```

View File

@ -3436,6 +3436,14 @@ Possible values:
Default value: `0`.
## materialized_postgresql_replication_slot {#materialized-postgresql-replication-slot}
Allows to have user-managed replication slots. Must be used together with `materialized_postgresql_snapshot`.
## materialized_postgresql_replication_slot {#materialized-postgresql-replication-slot}
A text string identifying a snapshot, from which initial dump of tables will be performed. Must be used together with `materialized_postgresql_replication_slot`.
## allow_experimental_projection_optimization {#allow-experimental-projection-optimization}
Enables or disables [projection](../../engines/table-engines/mergetree-family/mergetree.md#projections) optimization when processing `SELECT` queries.
@ -3449,7 +3457,7 @@ Default value: `0`.
## force_optimize_projection {#force-optimize-projection}
Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md#projections) in `SELECT` queries, when projection optimization is enabled (see [allow_experimental_projection_optimization](#allow-experimental-projection-optimization) setting).
Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md#projections) in `SELECT` queries, when projection optimization is enabled (see [allow_experimental_projection_optimization](#allow-experimental-projection-optimization) setting).
Possible values:

View File

@ -1,44 +0,0 @@
# system.views {#system-views}
Contains the dependencies of all views and the type to which the view belongs. The metadata of the view comes from the [system.tables](tables.md).
Columns:
- `database` ([String](../../sql-reference/data-types/string.md)) — The name of the database the view is in.
- `name` ([String](../../sql-reference/data-types/string.md)) — Name of the view.
- `main_dependency_database` ([String](../../sql-reference/data-types/string.md)) — The name of the database on which the view depends.
- `main_dependency_table` ([String](../../sql-reference/data-types/string.md)) - The name of the table on which the view depends.
- `view_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of the view. Values:
- `'Default' = 1` — [Default views](../../sql-reference/statements/create/view.md#normal). Should not appear in this log.
- `'Materialized' = 2` — [Materialized views](../../sql-reference/statements/create/view.md#materialized).
- `'Live' = 3` — [Live views](../../sql-reference/statements/create/view.md#live-view).
**Example**
```sql
SELECT * FROM system.views LIMIT 2 FORMAT Vertical;
```
```text
Row 1:
──────
database: default
name: live_view
main_dependency_database: default
main_dependency_table: view_source_tb
view_type: Live
Row 2:
──────
database: default
name: materialized_view
main_dependency_database: default
main_dependency_table: view_source_tb
view_type: Materialized
```
[Original article](https://clickhouse.tech/docs/en/operations/system-tables/views) <!--hide-->

View File

@ -247,7 +247,7 @@ void MaterializedMySQLSyncThread::assertMySQLAvailable()
{
try
{
checkMySQLVariables(pool.get(), getContext()->getSettingsRef());
checkMySQLVariables(pool.get(/* wait_timeout= */ UINT64_MAX), getContext()->getSettingsRef());
}
catch (const mysqlxx::ConnectionFailed & e)
{
@ -729,7 +729,7 @@ void MaterializedMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPt
{
/// Some behaviors(such as changing the value of "binlog_checksum") rotate the binlog file.
/// To ensure that the synchronization continues, we need to handle these events
metadata.fetchMasterVariablesValue(pool.get());
metadata.fetchMasterVariablesValue(pool.get(/* wait_timeout= */ UINT64_MAX));
client.setBinlogChecksum(metadata.binlog_checksum);
}
else if (receive_event->header.type != HEARTBEAT_EVENT)

View File

@ -61,10 +61,8 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
connection_info,
getContext(),
is_attach,
settings->materialized_postgresql_max_block_size.value,
settings->materialized_postgresql_allow_automatic_update,
/* is_materialized_postgresql_database = */ true,
settings->materialized_postgresql_tables_list.value);
*settings,
/* is_materialized_postgresql_database = */ true);
postgres::Connection connection(connection_info);
NameSet tables_to_replicate;

View File

@ -41,7 +41,7 @@ public:
return name;
}
size_t getNumberOfArguments() const override { return 4; }
size_t getNumberOfArguments() const override { return 3; }
bool useDefaultImplementationForConstants() const override { return true; }

View File

@ -41,7 +41,7 @@ public:
return name;
}
size_t getNumberOfArguments() const override { return 4; }
size_t getNumberOfArguments() const override { return 3; }
bool useDefaultImplementationForConstants() const override { return true; }

View File

@ -617,12 +617,6 @@ Dependencies DatabaseCatalog::getDependencies(const StorageID & from) const
return Dependencies(iter->second.begin(), iter->second.end());
}
ViewDependencies DatabaseCatalog::getViewDependencies() const
{
std::lock_guard lock{databases_mutex};
return ViewDependencies(view_dependencies.begin(), view_dependencies.end());
}
void
DatabaseCatalog::updateDependency(const StorageID & old_from, const StorageID & old_where, const StorageID & new_from,
const StorageID & new_where)

View File

@ -175,7 +175,6 @@ public:
void addDependency(const StorageID & from, const StorageID & where);
void removeDependency(const StorageID & from, const StorageID & where);
Dependencies getDependencies(const StorageID & from) const;
ViewDependencies getViewDependencies() const;
/// For Materialized and Live View
void updateDependency(const StorageID & old_from, const StorageID & old_where,const StorageID & new_from, const StorageID & new_where);

View File

@ -402,8 +402,8 @@ void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) c
+ data_types[set_type_idx]->getName() + " on the right", ErrorCodes::TYPE_MISMATCH);
}
MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_)
: indexes_mapping(std::move(index_mapping_))
MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_)
: has_all_keys(set_elements.size() == indexes_mapping_.size()), indexes_mapping(std::move(indexes_mapping_))
{
std::sort(indexes_mapping.begin(), indexes_mapping.end(),
[](const KeyTuplePositionMapping & l, const KeyTuplePositionMapping & r)
@ -548,11 +548,11 @@ BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges,
break;
}
}
if (one_element_range)
if (one_element_range && has_all_keys)
{
/// Here we know that there is one element in range.
/// The main difference with the normal case is that we can definitely say that
/// condition in this range always TRUE (can_be_false = 0) xor always FALSE (can_be_true = 0).
/// condition in this range is always TRUE (can_be_false = 0) or always FALSE (can_be_true = 0).
/// Check if it's an empty range
if (!left_included || !right_included)

View File

@ -208,7 +208,7 @@ public:
std::vector<FunctionBasePtr> functions;
};
MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_);
MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_);
size_t size() const { return ordered_set.at(0)->size(); }
@ -217,6 +217,8 @@ public:
BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const;
private:
// If all arguments in tuple are key columns, we can optimize NOT IN when there is only one element.
bool has_all_keys;
Columns ordered_set;
std::vector<KeyTuplePositionMapping> indexes_mapping;

View File

@ -26,7 +26,8 @@ NamesAndTypesList TextLogElement::getNamesAndTypes()
{"Notice", static_cast<Int8>(Message::PRIO_NOTICE)},
{"Information", static_cast<Int8>(Message::PRIO_INFORMATION)},
{"Debug", static_cast<Int8>(Message::PRIO_DEBUG)},
{"Trace", static_cast<Int8>(Message::PRIO_TRACE)}
{"Trace", static_cast<Int8>(Message::PRIO_TRACE)},
{"Test", static_cast<Int8>(Message::PRIO_TEST)},
});
return

View File

@ -17,6 +17,7 @@ class ASTStorage;
#define LIST_OF_MYSQL_SETTINGS(M) \
M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \
M(UInt64, connection_max_tries, 3, "Number of retries for pool with failover", 0) \
M(UInt64, connection_wait_timeout, 5, "Timeout (in seconds) for waiting for free connection (in case of there is already connection_pool_size active connections), 0 - do not wait.", 0) \
M(Bool, connection_auto_close, true, "Auto-close connection after query execution, i.e. disable connection reuse.", 0) \
DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)

View File

@ -625,9 +625,8 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
tryLogCurrentException(__PRETTY_FUNCTION__);
return false;
}
catch (const pqxx::broken_connection & e)
catch (const pqxx::broken_connection &)
{
LOG_ERROR(log, "Connection error: {}", e.what());
connection->tryUpdateConnection();
return false;
}
@ -641,6 +640,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
if (error_message.find("out of relcache_callback_list slots") == std::string::npos)
tryLogCurrentException(__PRETTY_FUNCTION__);
connection->tryUpdateConnection();
return false;
}
catch (const pqxx::conversion_error & e)

View File

@ -17,6 +17,8 @@ namespace DB
M(UInt64, materialized_postgresql_max_block_size, 65536, "Number of row collected before flushing data into table.", 0) \
M(String, materialized_postgresql_tables_list, "", "List of tables for MaterializedPostgreSQL database engine", 0) \
M(Bool, materialized_postgresql_allow_automatic_update, false, "Allow to reload table in the background, when schema changes are detected", 0) \
M(String, materialized_postgresql_replication_slot, "", "A user-created replication slot", 0) \
M(String, materialized_postgresql_snapshot, "", "User provided snapshot in case he manages replication slots himself", 0) \
DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS)

View File

@ -32,24 +32,28 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
const size_t max_block_size_,
bool allow_automatic_update_,
bool is_materialized_postgresql_database_,
const String tables_list_)
const MaterializedPostgreSQLSettings & replication_settings,
bool is_materialized_postgresql_database_)
: log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
, context(context_)
, is_attach(is_attach_)
, remote_database_name(remote_database_name_)
, current_database_name(current_database_name_)
, connection_info(connection_info_)
, max_block_size(max_block_size_)
, allow_automatic_update(allow_automatic_update_)
, max_block_size(replication_settings.materialized_postgresql_max_block_size)
, allow_automatic_update(replication_settings.materialized_postgresql_allow_automatic_update)
, is_materialized_postgresql_database(is_materialized_postgresql_database_)
, tables_list(tables_list_)
, tables_list(replication_settings.materialized_postgresql_tables_list)
, user_provided_snapshot(replication_settings.materialized_postgresql_snapshot)
, connection(std::make_shared<postgres::Connection>(connection_info_))
, milliseconds_to_wait(RESCHEDULE_MS)
{
replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier);
replication_slot = replication_settings.materialized_postgresql_replication_slot;
if (replication_slot.empty())
{
user_managed_slot = false;
replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier);
}
publication_name = fmt::format("{}_ch_publication", replication_identifier);
startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); });
@ -121,7 +125,20 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
auto initial_sync = [&]()
{
createReplicationSlot(tx, start_lsn, snapshot_name);
LOG_TRACE(log, "Starting tables sync load");
if (user_managed_slot)
{
if (user_provided_snapshot.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Using a user-defined replication slot must be provided with a snapshot from EXPORT SNAPSHOT when the slot is created."
"Pass it to `materialized_postgresql_snapshot` setting");
snapshot_name = user_provided_snapshot;
}
else
{
createReplicationSlot(tx, start_lsn, snapshot_name);
}
for (const auto & [table_name, storage] : materialized_storages)
{
@ -147,12 +164,17 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// Recreation of a replication slot imposes reloading of all tables.
if (!isReplicationSlotExist(tx, start_lsn, /* temporary */false))
{
if (user_managed_slot)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Having replication slot `{}` from settings, but it does not exist", replication_slot);
initial_sync();
}
/// Always drop replication slot if it is CREATE query and not ATTACH.
else if (!is_attach || new_publication)
{
dropReplicationSlot(tx);
if (!user_managed_slot)
dropReplicationSlot(tx);
initial_sync();
}
/// Synchronization and initial load already took place - do not create any new tables, just fetch StoragePtr's
@ -376,6 +398,8 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction &
void PostgreSQLReplicationHandler::createReplicationSlot(
pqxx::nontransaction & tx, String & start_lsn, String & snapshot_name, bool temporary)
{
assert(temporary || !user_managed_slot);
String query_str, slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
@ -401,6 +425,8 @@ void PostgreSQLReplicationHandler::createReplicationSlot(
void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx, bool temporary)
{
assert(temporary || !user_managed_slot);
std::string slot_name;
if (temporary)
slot_name = replication_slot + "_tmp";
@ -433,14 +459,17 @@ void PostgreSQLReplicationHandler::shutdownFinal()
connection->execWithRetry([&](pqxx::nontransaction & tx)
{
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false))
dropReplicationSlot(tx, /* temporary */false);
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
});
if (user_managed_slot)
return;
connection->execWithRetry([&](pqxx::nontransaction & tx)
{
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true))
dropReplicationSlot(tx, /* temporary */true);
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false))
dropReplicationSlot(tx, /* temporary */false);
});
}
catch (Exception & e)

View File

@ -1,6 +1,7 @@
#pragma once
#include "MaterializedPostgreSQLConsumer.h"
#include "MaterializedPostgreSQLSettings.h"
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
#include <Core/PostgreSQL/Utils.h>
@ -25,10 +26,8 @@ public:
const postgres::ConnectionInfo & connection_info_,
ContextPtr context_,
bool is_attach_,
const size_t max_block_size_,
bool allow_automatic_update_,
bool is_materialized_postgresql_database_,
const String tables_list = "");
const MaterializedPostgreSQLSettings & replication_settings,
bool is_materialized_postgresql_database_);
/// Activate task to be run from a separate thread: wait until connection is available and call startReplication().
void startup();
@ -108,6 +107,9 @@ private:
/// A coma-separated list of tables, which are going to be replicated for database engine. By default, a whole database is replicated.
String tables_list;
bool user_managed_slot = true;
String user_provided_snapshot;
String replication_slot, publication_name;
/// Shared between replication_consumer and replication_handler, but never accessed concurrently.

View File

@ -64,6 +64,8 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
setInMemoryMetadata(storage_metadata);
String replication_identifier = remote_database_name + "_" + remote_table_name_;
replication_settings->materialized_postgresql_tables_list = remote_table_name_;
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
replication_identifier,
remote_database_name,
@ -71,8 +73,8 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
connection_info,
getContext(),
is_attach,
replication_settings->materialized_postgresql_max_block_size.value,
/* allow_automatic_update */ false, /* is_materialized_postgresql_database */false);
*replication_settings,
/* is_materialized_postgresql_database */false);
if (!is_attach)
{

View File

@ -267,11 +267,15 @@ void registerStorageMySQL(StorageFactory & factory)
throw Exception("connection_pool_size cannot be zero.", ErrorCodes::BAD_ARGUMENTS);
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
mysqlxx::PoolWithFailover pool(remote_database, addresses,
username, password,
mysqlxx::PoolWithFailover pool(
remote_database,
addresses,
username,
password,
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
mysql_settings.connection_pool_size,
mysql_settings.connection_max_tries);
mysql_settings.connection_max_tries,
mysql_settings.connection_wait_timeout);
bool replace_query = false;
std::string on_duplicate_clause;

View File

@ -1,68 +0,0 @@
#include <Storages/System/StorageSystemViews.h>
#include <DataTypes/DataTypeString.h>
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/QueryViewsLog.h>
#include <DataTypes/DataTypeEnum.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/LiveView/StorageLiveView.h>
namespace DB
{
class Context;
NamesAndTypesList StorageSystemViews::getNamesAndTypes()
{
auto view_type_datatype = std::make_shared<DataTypeEnum8>(DataTypeEnum8::Values{
{"Default", static_cast<Int8>(QueryViewsLogElement::ViewType::DEFAULT)},
{"Materialized", static_cast<Int8>(QueryViewsLogElement::ViewType::MATERIALIZED)},
{"Live", static_cast<Int8>(QueryViewsLogElement::ViewType::LIVE)}});
return {
{"database", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"main_dependency_database", std::make_shared<DataTypeString>()},
{"main_dependency_table", std::make_shared<DataTypeString>()},
{"view_type", std::move(view_type_datatype)},
};
}
void StorageSystemViews::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
const auto access = context->getAccess();
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
for (const auto & [table_id, view_ids] : DatabaseCatalog::instance().getViewDependencies())
{
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, table_id.database_name);
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, table_id.database_name, table_id.table_name))
continue;
size_t col_num;
for (const auto & view_id : view_ids)
{
auto view_ptr = DatabaseCatalog::instance().getTable(view_id, context);
QueryViewsLogElement::ViewType type = QueryViewsLogElement::ViewType::DEFAULT;
if (typeid_cast<const StorageMaterializedView *>(view_ptr.get()))
{
type = QueryViewsLogElement::ViewType::MATERIALIZED;
}
else if (typeid_cast<const StorageLiveView *>(view_ptr.get()))
{
type = QueryViewsLogElement::ViewType::LIVE;
}
col_num = 0;
res_columns[col_num++]->insert(view_id.database_name);
res_columns[col_num++]->insert(view_id.table_name);
res_columns[col_num++]->insert(table_id.database_name);
res_columns[col_num++]->insert(table_id.table_name);
res_columns[col_num++]->insert(type);
}
}
}
}

View File

@ -1,24 +0,0 @@
#pragma once
#include <common/shared_ptr_helper.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class StorageSystemViews final : public shared_ptr_helper<StorageSystemViews>, public IStorageSystemOneBlock<StorageSystemViews>
{
friend struct shared_ptr_helper<StorageSystemViews>;
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
public:
std::string getName() const override { return "SystemViews"; }
static NamesAndTypesList getNamesAndTypes();
};
}

View File

@ -44,7 +44,6 @@
#include <Storages/System/StorageSystemTableEngines.h>
#include <Storages/System/StorageSystemTableFunctions.h>
#include <Storages/System/StorageSystemTables.h>
#include <Storages/System/StorageSystemViews.h>
#include <Storages/System/StorageSystemZooKeeper.h>
#include <Storages/System/StorageSystemContributors.h>
#include <Storages/System/StorageSystemErrors.h>
@ -96,7 +95,6 @@ void attachSystemTablesLocal(IDatabase & system_database)
attach<StorageSystemZeros>(system_database, "zeros_mt", true);
attach<StorageSystemDatabases>(system_database, "databases");
attach<StorageSystemTables>(system_database, "tables");
attach<StorageSystemViews>(system_database, "views");
attach<StorageSystemColumns>(system_database, "columns");
attach<StorageSystemFunctions>(system_database, "functions");
attach<StorageSystemEvents>(system_database, "events");

View File

@ -31,18 +31,33 @@ postgres_table_template_3 = """
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
"""
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database'):
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
if database == True:
conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
else:
conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)
if replication:
conn_string += " replication='database'"
conn = psycopg2.connect(conn_string)
if auto_commit:
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute('CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT'.format(slot_name))
result = cursor.fetchall()
print(result[0][0]) # slot name
print(result[0][1]) # start lsn
print(result[0][2]) # snapshot
return result[0][2]
def drop_replication_slot(conn, slot_name='user_slot'):
cursor = conn.cursor()
cursor.execute("select pg_drop_replication_slot('{}')".format(slot_name))
def create_postgres_db(cursor, name='postgres_database'):
cursor.execute("CREATE DATABASE {}".format(name))
@ -941,6 +956,34 @@ def test_quoting(started_cluster):
drop_materialized_db()
def test_user_managed_slots(started_cluster):
conn = get_postgres_conn(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
database=True)
cursor = conn.cursor()
table_name = 'test_table'
create_postgres_table(cursor, table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))
slot_name = 'user_slot'
replication_connection = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
database=True, replication=True, auto_commit=True)
snapshot = create_replication_slot(replication_connection, slot_name=slot_name)
create_materialized_db(ip=started_cluster.postgres_ip,
port=started_cluster.postgres_port,
settings=["materialized_postgresql_replication_slot = '{}'".format(slot_name),
"materialized_postgresql_snapshot = '{}'".format(snapshot)])
check_tables_are_synchronized(table_name);
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name))
check_tables_are_synchronized(table_name);
instance.restart_clickhouse()
instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(20000, 10000)".format(table_name))
check_tables_are_synchronized(table_name);
drop_postgres_table(cursor, table_name)
drop_materialized_db()
drop_replication_slot(replication_connection, slot_name)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -3,7 +3,10 @@ from contextlib import contextmanager
## sudo -H pip install PyMySQL
import pymysql.cursors
import pytest
import time
import threading
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
@ -319,6 +322,51 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
conn.close()
# Check that limited connection_wait_timeout (via connection_pool_size=1) will throw.
def test_settings_connection_wait_timeout(started_cluster):
table_name = 'test_settings_connection_wait_timeout'
node1.query(f'DROP TABLE IF EXISTS {table_name}')
wait_timeout = 2
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node1.query('''
CREATE TABLE {}
(
id UInt32,
name String,
age UInt32,
money UInt32
)
ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse')
SETTINGS connection_wait_timeout={}, connection_pool_size=1
'''.format(table_name, table_name, wait_timeout)
)
node1.query("INSERT INTO {} (id, name) SELECT number, concat('name_', toString(number)) from numbers(10) ".format(table_name))
def worker():
node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name))
worker_thread = threading.Thread(target=worker)
worker_thread.start()
# ensure that first query started in worker_thread
time.sleep(1)
started = time.time()
with pytest.raises(QueryRuntimeException, match=r"Exception: mysqlxx::Pool is full \(connection_wait_timeout is exceeded\)"):
node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name))
ended = time.time()
assert (ended - started) >= wait_timeout
worker_thread.join()
drop_mysql_table(conn, table_name)
conn.close()
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in list(cluster.instances.items()):

View File

@ -4,3 +4,5 @@
7 107
8 108
9 109
1970-01-01 1 one
1970-01-01 3 three

View File

@ -8,3 +8,18 @@ set max_rows_to_read = 5;
select * from test1 where i not in (1,2,3,4,5) order by i;
drop table test1;
drop table if exists t1;
drop table if exists t2;
create table t1 (date Date, a Float64, b String) Engine=MergeTree ORDER BY date;
create table t2 (date Date, a Float64, b String) Engine=MergeTree ORDER BY date;
insert into t1(a, b) values (1, 'one'), (2, 'two');
insert into t2(a, b) values (2, 'two'), (3, 'three');
select date, a, b from t1 where (date, a, b) NOT IN (select date,a,b from t2);
select date, a, b from t2 where (date, a, b) NOT IN (select date,a,b from t1);
drop table t1;
drop table t2;

View File

@ -1 +0,0 @@
02015_db materialized_view 02015_db view_source_tb Materialized

View File

@ -1,14 +0,0 @@
DROP DATABASE IF EXISTS 02015_db;
CREATE DATABASE IF NOT EXISTS 02015_db;
DROP TABLE IF EXISTS 02015_db.view_source_tb;
CREATE TABLE IF NOT EXISTS 02015_db.view_source_tb (a UInt8, s String) ENGINE = MergeTree() ORDER BY a;
DROP TABLE IF EXISTS 02015_db.materialized_view;
CREATE MATERIALIZED VIEW IF NOT EXISTS 02015_db.materialized_view ENGINE = ReplacingMergeTree() ORDER BY a AS SELECT * FROM 02015_db.view_source_tb;
SELECT * FROM system.views WHERE database='02015_db' and name = 'materialized_view';
DROP TABLE IF EXISTS 02015_db.materialized_view;
DROP TABLE IF EXISTS 02015_db.view_source_tb;
DROP DATABASE IF EXISTS 02015_db;

View File

@ -512,7 +512,6 @@
"01532_execute_merges_on_single_replica", /// static zk path
"01530_drop_database_atomic_sync", /// creates database
"02001_add_default_database_to_system_users", ///create user
"02002_row_level_filter_bug", ///create user
"02015_system_views"
"02002_row_level_filter_bug" ///create user
]
}