Merge branch 'ClickHouse:master' into arcadia_arrow2

Vitaly Stoyan 2021-07-28 19:57:25 +03:00 committed by GitHub
commit c1f71b2e6e
51 changed files with 432 additions and 418 deletions

View File

@ -4,13 +4,23 @@ QUERIES_FILE="queries.sql"
TABLE=$1
TRIES=3
if [ -x ./clickhouse ]
then
CLICKHOUSE_CLIENT="./clickhouse client"
elif command -v clickhouse-client >/dev/null 2>&1
then
CLICKHOUSE_CLIENT="clickhouse-client"
else
echo "clickhouse-client is not found"
exit 1
fi
cat "$QUERIES_FILE" | sed "s/{table}/${TABLE}/g" | while read query; do
sync
echo 3 | sudo tee /proc/sys/vm/drop_caches >/dev/null
echo -n "["
for i in $(seq 1 $TRIES); do
RES=$(clickhouse-client --time --format=Null --query="$query" 2>&1)
RES=$(${CLICKHOUSE_CLIENT} --time --format=Null --max_memory_usage=100G --query="$query" 2>&1)
[[ "$?" == "0" ]] && echo -n "${RES}" || echo -n "null"
[[ "$i" != $TRIES ]] && echo -n ", "
done

View File

@ -11,8 +11,8 @@ DATASET="${TABLE}_v1.tar.xz"
QUERIES_FILE="queries.sql"
TRIES=3
AMD64_BIN_URL="https://clickhouse-builds.s3.yandex.net/0/e29c4c3cc47ab2a6c4516486c1b77d57e7d42643/clickhouse_build_check/gcc-10_relwithdebuginfo_none_bundled_unsplitted_disable_False_binary/clickhouse"
AARCH64_BIN_URL="https://clickhouse-builds.s3.yandex.net/0/e29c4c3cc47ab2a6c4516486c1b77d57e7d42643/clickhouse_special_build_check/clang-10-aarch64_relwithdebuginfo_none_bundled_unsplitted_disable_False_binary/clickhouse"
AMD64_BIN_URL="https://builds.clickhouse.tech/master/amd64/clickhouse"
AARCH64_BIN_URL="https://builds.clickhouse.tech/master/aarch64/clickhouse"
# Note: on older Ubuntu versions, 'axel' does not support IPv6. If you are using IPv6-only servers on very old Ubuntu, just don't install 'axel'.
@ -89,7 +89,7 @@ cat "$QUERIES_FILE" | sed "s/{table}/${TABLE}/g" | while read query; do
echo -n "["
for i in $(seq 1 $TRIES); do
RES=$(./clickhouse client --max_memory_usage 100000000000 --time --format=Null --query="$query" 2>&1 ||:)
RES=$(./clickhouse client --max_memory_usage 100G --time --format=Null --query="$query" 2>&1 ||:)
[[ "$?" == "0" ]] && echo -n "${RES}" || echo -n "null"
[[ "$i" != $TRIES ]] && echo -n ", "
done

View File

@ -70,7 +70,13 @@ Note that integration of ClickHouse with third-party drivers is not tested. Also
Unit tests are useful when you want to test not the ClickHouse as a whole, but a single isolated library or class. You can enable or disable build of tests with `ENABLE_TESTS` CMake option. Unit tests (and other test programs) are located in `tests` subdirectories across the code. To run unit tests, type `ninja test`. Some tests use `gtest`, but some are just programs that return non-zero exit code on test failure.
It's not necessarily to have unit tests if the code is already covered by functional tests (and functional tests are usually much more simple to use).
It's not necessary to have unit tests if the code is already covered by functional tests (and functional tests are usually much more simple to use).
You can run individual gtest checks by calling the executable directly, for example:
```bash
$ ./src/unit_tests_dbms --gtest_filter=LocalAddress*
```
## Performance Tests {#performance-tests}

View File

@ -14,7 +14,7 @@ You can also use the following database engines:
- [MySQL](../../engines/database-engines/mysql.md)
- [MaterializeMySQL](../../engines/database-engines/materialize-mysql.md)
- [MaterializedMySQL](../../engines/database-engines/materialized-mysql.md)
- [Lazy](../../engines/database-engines/lazy.md)

View File

@ -1,9 +1,9 @@
---
toc_priority: 29
toc_title: MaterializeMySQL
toc_title: MaterializedMySQL
---
# MaterializeMySQL {#materialize-mysql}
# MaterializedMySQL {#materialized-mysql}
**This is an experimental feature that should not be used in production.**
@ -17,7 +17,7 @@ This feature is experimental.
``` sql
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...]
ENGINE = MaterializedMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...]
```
**Engine Parameters**
@ -36,7 +36,7 @@ ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'passwor
- `max_wait_time_when_mysql_unavailable` — Retry interval when MySQL is not available (milliseconds). A negative value disables retry. Default: `1000`.
- `allows_query_when_mysql_lost` — Allow querying a materialized table when MySQL is lost. Default: `0` (`false`).
```
CREATE DATABASE mysql ENGINE = MaterializeMySQL('localhost:3306', 'db', 'user', '***')
CREATE DATABASE mysql ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user', '***')
SETTINGS
allows_query_when_mysql_lost=true,
max_wait_time_when_mysql_unavailable=10000;
@ -51,7 +51,7 @@ For the correct work of `MaterializeMySQL`, there are few mandatory `MySQL`-side
## Virtual columns {#virtual-columns}
When working with the `MaterializeMySQL` database engine, [ReplacingMergeTree](../../engines/table-engines/mergetree-family/replacingmergetree.md) tables are used with virtual `_sign` and `_version` columns.
When working with the `MaterializedMySQL` database engine, [ReplacingMergeTree](../../engines/table-engines/mergetree-family/replacingmergetree.md) tables are used with virtual `_sign` and `_version` columns.
- `_version` — Transaction counter. Type [UInt64](../../sql-reference/data-types/int-uint.md).
- `_sign` — Deletion mark. Type [Int8](../../sql-reference/data-types/int-uint.md). Possible values:
@ -99,7 +99,7 @@ MySQL DDL queries are converted into the corresponding ClickHouse DDL queries ([
### Data Replication {#data-replication}
`MaterializeMySQL` does not support direct `INSERT`, `DELETE` and `UPDATE` queries. However, they are supported in terms of data replication:
`MaterializedMySQL` does not support direct `INSERT`, `DELETE` and `UPDATE` queries. However, they are supported in terms of data replication:
- MySQL `INSERT` query is converted into `INSERT` with `_sign=1`.
@ -107,9 +107,9 @@ MySQL DDL queries are converted into the corresponding ClickHouse DDL queries ([
- MySQL `UPDATE` query is converted into `INSERT` with `_sign=-1` and `INSERT` with `_sign=1`.
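To make the mapping concrete, here is a hedged sketch (the `mysql.test` table and the values are illustrative assumptions, not part of this commit): selecting the virtual columns explicitly, per the behaviour described in the next section, makes both rows produced by a MySQL `UPDATE` visible:

``` sql
-- Illustrative only: inspect the rows written while replicating one UPDATE.
-- Selecting _sign and _version explicitly avoids the implicit filtering.
SELECT id, value, _sign, _version FROM mysql.test ORDER BY _version;
-- Roughly: one row with _sign = -1 (the cancelled old value)
-- and one row with _sign = 1 (the new value).
```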
### Selecting from MaterializeMySQL Tables {#select}
### Selecting from MaterializedMySQL Tables {#select}
`SELECT` query from `MaterializeMySQL` tables has some specifics:
`SELECT` query from `MaterializedMySQL` tables has some specifics:
- If `_version` is not specified in the `SELECT` query, [FINAL](../../sql-reference/statements/select/from.md#select-from-final) modifier is used. So only rows with `MAX(_version)` are selected.
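A minimal sketch of the two query shapes, assuming the `mysql.test` table from the usage example further below:

``` sql
-- _version is not selected: FINAL is applied implicitly, so only the latest
-- version of each row comes back.
SELECT id, value FROM mysql.test;

-- _version is selected: FINAL is not added, so every stored version of a row
-- may be returned.
SELECT id, value, _version FROM mysql.test;
```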
@ -126,10 +126,10 @@ ClickHouse has only one physical order, which is determined by `ORDER BY` clause
**Notes**
- Rows with `_sign=-1` are not deleted physically from the tables.
- Cascade `UPDATE/DELETE` queries are not supported by the `MaterializeMySQL` engine.
- Cascade `UPDATE/DELETE` queries are not supported by the `MaterializedMySQL` engine.
- Replication can be easily broken.
- Manual operations on database and tables are forbidden.
- `MaterializeMySQL` is influenced by [optimize_on_insert](../../operations/settings/settings.md#optimize-on-insert) setting. The data is merged in the corresponding table in the `MaterializeMySQL` database when a table in the MySQL server changes.
- `MaterializedMySQL` is influenced by [optimize_on_insert](../../operations/settings/settings.md#optimize-on-insert) setting. The data is merged in the corresponding table in the `MaterializedMySQL` database when a table in the MySQL server changes.
## Examples of Use {#examples-of-use}
@ -158,7 +158,7 @@ Database in ClickHouse, exchanging data with the MySQL server:
The database and the table created:
``` sql
CREATE DATABASE mysql ENGINE = MaterializeMySQL('localhost:3306', 'db', 'user', '***');
CREATE DATABASE mysql ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user', '***');
SHOW TABLES FROM mysql;
```
@ -193,4 +193,4 @@ SELECT * FROM mysql.test;
└───┴─────┴──────┘
```
[Original article](https://clickhouse.tech/docs/en/engines/database-engines/materialize-mysql/) <!--hide-->
[Original article](https://clickhouse.tech/docs/en/engines/database-engines/materialized-mysql/) <!--hide-->

View File

@ -2927,7 +2927,7 @@ Result:
└─────────────┘
```
Note that this setting influences [Materialized view](../../sql-reference/statements/create/view.md#materialized) and [MaterializeMySQL](../../engines/database-engines/materialize-mysql.md) behaviour.
Note that this setting influences [Materialized view](../../sql-reference/statements/create/view.md#materialized) and [MaterializedMySQL](../../engines/database-engines/materialized-mysql.md) behaviour.
## engine_file_empty_if_not_exists {#engine-file-empty_if-not-exists}

View File

@ -14,7 +14,7 @@ toc_title: "Введение"
- [MySQL](../../engines/database-engines/mysql.md)
- [MaterializeMySQL](../../engines/database-engines/materialize-mysql.md)
- [MaterializedMySQL](../../engines/database-engines/materialized-mysql.md)
- [Lazy](../../engines/database-engines/lazy.md)

View File

@ -1,22 +1,22 @@
---
toc_priority: 29
toc_title: MaterializeMySQL
toc_title: MaterializedMySQL
---
# MaterializeMySQL {#materialize-mysql}
# MaterializedMySQL {#materialized-mysql}
Creates a ClickHouse database with all the tables existing in MySQL and all the data in those tables.
The ClickHouse server works as a MySQL replica. It reads the binlog file and executes DDL and DML queries.
`MaterializeMySQL` is an experimental database engine.
`MaterializedMySQL` is an experimental database engine.
## Creating a Database {#creating-a-database}
``` sql
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...]
ENGINE = MaterializedMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...]
```
**Engine Parameters**
@ -28,7 +28,7 @@ ENGINE = MaterializeMySQL('host:port', ['database' | database], 'user', 'passwor
## Virtual Columns {#virtual-columns}
When working with the `MaterializeMySQL` database engine, [ReplacingMergeTree](../../engines/table-engines/mergetree-family/replacingmergetree.md) tables are used with the virtual `_sign` and `_version` columns.
When working with the `MaterializedMySQL` database engine, [ReplacingMergeTree](../../engines/table-engines/mergetree-family/replacingmergetree.md) tables are used with the virtual `_sign` and `_version` columns.
- `_version` — Transaction counter. Type [UInt64](../../sql-reference/data-types/int-uint.md).
- `_sign` — Deletion mark. Type [Int8](../../sql-reference/data-types/int-uint.md). Possible values:
@ -75,9 +75,9 @@ DDL-запросы в MySQL конвертируются в соответств
- An `UPDATE` query is converted in ClickHouse into an `INSERT` with `_sign=-1` and an `INSERT` with `_sign=1`.
### Selecting from MaterializeMySQL Tables {#select}
### Selecting from MaterializedMySQL Tables {#select}
A `SELECT` query from `MaterializeMySQL` tables has some specifics:
A `SELECT` query from `MaterializedMySQL` tables has some specifics:
- If the `_version` column is not specified explicitly in the `SELECT` query, the [FINAL](../../sql-reference/statements/select/from.md#select-from-final) modifier is used, so only rows with `MAX(_version)` are selected.
@ -94,10 +94,10 @@ DDL-запросы в MySQL конвертируются в соответств
**Notes**
- Rows with `_sign=-1` are not physically deleted from the tables.
- Cascade `UPDATE/DELETE` queries are not supported by the `MaterializeMySQL` engine.
- Cascade `UPDATE/DELETE` queries are not supported by the `MaterializedMySQL` engine.
- Replication can be easily broken.
- Direct data modification operations on `MaterializeMySQL` tables and databases are forbidden.
- `MaterializeMySQL` is influenced by the [optimize_on_insert](../../operations/settings/settings.md#optimize-on-insert) setting. When a table on the MySQL server changes, the data is merged in the corresponding table in the `MaterializeMySQL` database.
- Direct data modification operations on `MaterializedMySQL` tables and databases are forbidden.
- `MaterializedMySQL` is influenced by the [optimize_on_insert](../../operations/settings/settings.md#optimize-on-insert) setting. When a table on the MySQL server changes, the data is merged in the corresponding table in the `MaterializedMySQL` database.
## Examples of Use {#examples-of-use}
@ -126,7 +126,7 @@ mysql> SELECT * FROM test;
The database and the created table:
``` sql
CREATE DATABASE mysql ENGINE = MaterializeMySQL('localhost:3306', 'db', 'user', '***');
CREATE DATABASE mysql ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user', '***');
SHOW TABLES FROM mysql;
```

View File

@ -2777,7 +2777,7 @@ SELECT * FROM test2;
└─────────────┘
```
Note that this setting influences the behaviour of [materialized views](../../sql-reference/statements/create/view.md#materialized) and [MaterializeMySQL](../../engines/database-engines/materialize-mysql.md) databases.
Note that this setting influences the behaviour of [materialized views](../../sql-reference/statements/create/view.md#materialized) and [MaterializedMySQL](../../engines/database-engines/materialized-mysql.md) databases.
## engine_file_empty_if_not_exists {#engine-file-empty_if-not-exists}

View File

@ -6,6 +6,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}

View File

@ -434,7 +434,7 @@ class IColumn;
M(Bool, data_type_default_nullable, false, "Data types without NULL or NOT NULL will make Nullable", 0) \
M(Bool, cast_keep_nullable, false, "CAST operator keep Nullable for result data type", 0) \
M(Bool, alter_partition_verbose_result, false, "Output information about affected parts. Currently works only for FREEZE and ATTACH commands.", 0) \
M(Bool, allow_experimental_database_materialize_mysql, false, "Allow to create database with Engine=MaterializeMySQL(...).", 0) \
M(Bool, allow_experimental_database_materialized_mysql, false, "Allow to create database with Engine=MaterializedMySQL(...).", 0) \
M(Bool, allow_experimental_database_materialized_postgresql, false, "Allow to create database with Engine=MaterializedPostgreSQL(...).", 0) \
M(Bool, system_events_show_zero_values, false, "Include all metrics, even with zero values", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precision are seen as String on ClickHouse's side.", 0) \
@ -471,8 +471,8 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \
M(UInt64, external_storage_max_read_rows, 0, "Limit maximum number of rows when table with external engine should flush history data. Now supported only for MySQL table engine, database engine, dictionary and MaterializeMySQL. If equal to 0, this setting is disabled", 0) \
M(UInt64, external_storage_max_read_bytes, 0, "Limit maximum number of bytes when table with external engine should flush history data. Now supported only for MySQL table engine, database engine, dictionary and MaterializeMySQL. If equal to 0, this setting is disabled", 0) \
M(UInt64, external_storage_max_read_rows, 0, "Limit maximum number of rows when table with external engine should flush history data. Now supported only for MySQL table engine, database engine, dictionary and MaterializedMySQL. If equal to 0, this setting is disabled", 0) \
M(UInt64, external_storage_max_read_bytes, 0, "Limit maximum number of bytes when table with external engine should flush history data. Now supported only for MySQL table engine, database engine, dictionary and MaterializedMySQL. If equal to 0, this setting is disabled", 0) \
M(UnionMode, union_default_mode, UnionMode::Unspecified, "Set default Union Mode in SelectWithUnion query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without Union Mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
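Because the engine stays experimental, the renamed setting has to be switched on before a database can be created with it; a minimal sketch of a client session (connection parameters are placeholders):

``` sql
-- Enable the renamed experimental setting for the current session,
-- then create a database with the new engine name.
SET allow_experimental_database_materialized_mysql = 1;

CREATE DATABASE mysql_replica
ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user', '***');
```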

View File

@ -23,8 +23,8 @@
# include <Core/MySQL/MySQLClient.h>
# include <Databases/MySQL/ConnectionMySQLSettings.h>
# include <Databases/MySQL/DatabaseMySQL.h>
# include <Databases/MySQL/MaterializeMySQLSettings.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Databases/MySQL/MaterializedMySQLSettings.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <mysqlxx/Pool.h>
#endif
@ -103,8 +103,9 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
const String & engine_name = engine_define->engine->name;
const UUID & uuid = create.uuid;
bool engine_may_have_arguments = engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "Lazy" ||
engine_name == "Replicated" || engine_name == "PostgreSQL" || engine_name == "MaterializedPostgreSQL" || engine_name == "SQLite";
bool engine_may_have_arguments = engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL" ||
engine_name == "Lazy" || engine_name == "Replicated" || engine_name == "PostgreSQL" ||
engine_name == "MaterializedPostgreSQL" || engine_name == "SQLite";
if (engine_define->engine->arguments && !engine_may_have_arguments)
throw Exception("Database engine " + engine_name + " cannot have arguments", ErrorCodes::BAD_ARGUMENTS);
@ -127,7 +128,7 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
#if USE_MYSQL
else if (engine_name == "MySQL" || engine_name == "MaterializeMySQL")
else if (engine_name == "MySQL" || engine_name == "MaterializeMySQL" || engine_name == "MaterializedMySQL")
{
const ASTFunction * engine = engine_define->engine;
if (!engine->arguments || engine->arguments->children.size() != 4)
@ -165,17 +166,17 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
auto materialize_mode_settings = std::make_unique<MaterializeMySQLSettings>();
auto materialize_mode_settings = std::make_unique<MaterializedMySQLSettings>();
if (engine_define->settings)
materialize_mode_settings->loadFromQuery(*engine_define);
if (create.uuid == UUIDHelpers::Nil)
return std::make_shared<DatabaseMaterializeMySQL<DatabaseOrdinary>>(
return std::make_shared<DatabaseMaterializedMySQL<DatabaseOrdinary>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
else
return std::make_shared<DatabaseMaterializeMySQL<DatabaseAtomic>>(
return std::make_shared<DatabaseMaterializedMySQL<DatabaseAtomic>>(
context, database_name, metadata_path, uuid, mysql_database_name, std::move(mysql_pool), std::move(client)
, std::move(materialize_mode_settings));
}
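Since the factory above accepts both engine names, existing definitions keep working after the rename; a hedged sketch (placeholder connection parameters, experimental setting assumed to be enabled):

``` sql
-- Both spellings resolve to the same database engine; the old name is kept
-- for backward compatibility with existing metadata and queries.
CREATE DATABASE db_old_name ENGINE = MaterializeMySQL('localhost:3306', 'db', 'user', '***');
CREATE DATABASE db_new_name ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user', '***');
```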

View File

@ -18,9 +18,6 @@ class ASTStorage;
DECLARE_SETTINGS_TRAITS(DatabaseReplicatedSettingsTraits, LIST_OF_DATABASE_REPLICATED_SETTINGS)
/** Settings for the MaterializeMySQL database engine.
* Could be loaded from a CREATE DATABASE query (SETTINGS clause).
*/
struct DatabaseReplicatedSettings : public BaseSettings<DatabaseReplicatedSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);

View File

@ -4,15 +4,15 @@
#if USE_MYSQL
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Interpreters/Context.h>
# include <Databases/DatabaseOrdinary.h>
# include <Databases/DatabaseAtomic.h>
# include <Databases/MySQL/DatabaseMaterializeTablesIterator.h>
# include <Databases/MySQL/MaterializeMySQLSyncThread.h>
# include <Databases/MySQL/DatabaseMaterializedTablesIterator.h>
# include <Databases/MySQL/MaterializedMySQLSyncThread.h>
# include <Parsers/ASTCreateQuery.h>
# include <Storages/StorageMaterializeMySQL.h>
# include <Storages/StorageMaterializedMySQL.h>
# include <Poco/Logger.h>
# include <Common/setThreadName.h>
# include <filesystem>
@ -29,7 +29,7 @@ namespace ErrorCodes
}
template <>
DatabaseMaterializeMySQL<DatabaseOrdinary>::DatabaseMaterializeMySQL(
DatabaseMaterializedMySQL<DatabaseOrdinary>::DatabaseMaterializedMySQL(
ContextPtr context_,
const String & database_name_,
const String & metadata_path_,
@ -37,12 +37,12 @@ DatabaseMaterializeMySQL<DatabaseOrdinary>::DatabaseMaterializeMySQL(
const String & mysql_database_name_,
mysqlxx::Pool && pool_,
MySQLClient && client_,
std::unique_ptr<MaterializeMySQLSettings> settings_)
std::unique_ptr<MaterializedMySQLSettings> settings_)
: DatabaseOrdinary(
database_name_,
metadata_path_,
"data/" + escapeForFileName(database_name_) + "/",
"DatabaseMaterializeMySQL<Ordinary> (" + database_name_ + ")",
"DatabaseMaterializedMySQL<Ordinary> (" + database_name_ + ")",
context_)
, settings(std::move(settings_))
, materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
@ -50,7 +50,7 @@ DatabaseMaterializeMySQL<DatabaseOrdinary>::DatabaseMaterializeMySQL(
}
template <>
DatabaseMaterializeMySQL<DatabaseAtomic>::DatabaseMaterializeMySQL(
DatabaseMaterializedMySQL<DatabaseAtomic>::DatabaseMaterializedMySQL(
ContextPtr context_,
const String & database_name_,
const String & metadata_path_,
@ -58,15 +58,15 @@ DatabaseMaterializeMySQL<DatabaseAtomic>::DatabaseMaterializeMySQL(
const String & mysql_database_name_,
mysqlxx::Pool && pool_,
MySQLClient && client_,
std::unique_ptr<MaterializeMySQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializeMySQL<Atomic> (" + database_name_ + ")", context_)
std::unique_ptr<MaterializedMySQLSettings> settings_)
: DatabaseAtomic(database_name_, metadata_path_, uuid, "DatabaseMaterializedMySQL<Atomic> (" + database_name_ + ")", context_)
, settings(std::move(settings_))
, materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), settings.get())
{
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::rethrowExceptionIfNeed() const
void DatabaseMaterializedMySQL<Base>::rethrowExceptionIfNeed() const
{
std::unique_lock<std::mutex> lock(Base::mutex);
@ -87,14 +87,14 @@ void DatabaseMaterializeMySQL<Base>::rethrowExceptionIfNeed() const
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::setException(const std::exception_ptr & exception_)
void DatabaseMaterializedMySQL<Base>::setException(const std::exception_ptr & exception_)
{
std::unique_lock<std::mutex> lock(Base::mutex);
exception = exception_;
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach)
void DatabaseMaterializedMySQL<Base>::loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach)
{
Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach);
if (!force_attach)
@ -105,59 +105,59 @@ void DatabaseMaterializeMySQL<Base>::loadStoredObjects(ContextMutablePtr context
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query)
void DatabaseMaterializedMySQL<Base>::createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query)
{
assertCalledFromSyncThreadOrDrop("create table");
Base::createTable(context_, name, table, query);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::dropTable(ContextPtr context_, const String & name, bool no_delay)
void DatabaseMaterializedMySQL<Base>::dropTable(ContextPtr context_, const String & name, bool no_delay)
{
assertCalledFromSyncThreadOrDrop("drop table");
Base::dropTable(context_, name, no_delay);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
void DatabaseMaterializedMySQL<Base>::attachTable(const String & name, const StoragePtr & table, const String & relative_table_path)
{
assertCalledFromSyncThreadOrDrop("attach table");
Base::attachTable(name, table, relative_table_path);
}
template<typename Base>
StoragePtr DatabaseMaterializeMySQL<Base>::detachTable(const String & name)
StoragePtr DatabaseMaterializedMySQL<Base>::detachTable(const String & name)
{
assertCalledFromSyncThreadOrDrop("detach table");
return Base::detachTable(name);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary)
void DatabaseMaterializedMySQL<Base>::renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary)
{
assertCalledFromSyncThreadOrDrop("rename table");
if (exchange)
throw Exception("MaterializeMySQL database not support exchange table.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("MaterializedMySQL database not support exchange table.", ErrorCodes::NOT_IMPLEMENTED);
if (dictionary)
throw Exception("MaterializeMySQL database not support rename dictionary.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("MaterializedMySQL database not support rename dictionary.", ErrorCodes::NOT_IMPLEMENTED);
if (to_database.getDatabaseName() != Base::getDatabaseName())
throw Exception("Cannot rename with other database for MaterializeMySQL database.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Cannot rename with other database for MaterializedMySQL database.", ErrorCodes::NOT_IMPLEMENTED);
Base::renameTable(context_, name, *this, to_name, exchange, dictionary);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::alterTable(ContextPtr context_, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
void DatabaseMaterializedMySQL<Base>::alterTable(ContextPtr context_, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
{
assertCalledFromSyncThreadOrDrop("alter table");
Base::alterTable(context_, table_id, metadata);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::drop(ContextPtr context_)
void DatabaseMaterializedMySQL<Base>::drop(ContextPtr context_)
{
/// Remove metadata info
fs::path metadata(Base::getMetadataPath() + "/.metadata");
@ -169,16 +169,16 @@ void DatabaseMaterializeMySQL<Base>::drop(ContextPtr context_)
}
template<typename Base>
StoragePtr DatabaseMaterializeMySQL<Base>::tryGetTable(const String & name, ContextPtr context_) const
StoragePtr DatabaseMaterializedMySQL<Base>::tryGetTable(const String & name, ContextPtr context_) const
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
if (!MaterializedMySQLSyncThread::isMySQLSyncThread())
{
StoragePtr nested_storage = Base::tryGetTable(name, context_);
if (!nested_storage)
return {};
return std::make_shared<StorageMaterializeMySQL>(std::move(nested_storage), this);
return std::make_shared<StorageMaterializedMySQL>(std::move(nested_storage), this);
}
return Base::tryGetTable(name, context_);
@ -186,36 +186,36 @@ StoragePtr DatabaseMaterializeMySQL<Base>::tryGetTable(const String & name, Cont
template <typename Base>
DatabaseTablesIteratorPtr
DatabaseMaterializeMySQL<Base>::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name)
DatabaseMaterializedMySQL<Base>::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name)
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
if (!MaterializedMySQLSyncThread::isMySQLSyncThread())
{
DatabaseTablesIteratorPtr iterator = Base::getTablesIterator(context_, filter_by_table_name);
return std::make_unique<DatabaseMaterializeTablesIterator>(std::move(iterator), this);
return std::make_unique<DatabaseMaterializedTablesIterator>(std::move(iterator), this);
}
return Base::getTablesIterator(context_, filter_by_table_name);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::assertCalledFromSyncThreadOrDrop(const char * method) const
void DatabaseMaterializedMySQL<Base>::assertCalledFromSyncThreadOrDrop(const char * method) const
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread() && started_up)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializeMySQL database not support {}", method);
if (!MaterializedMySQLSyncThread::isMySQLSyncThread() && started_up)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MaterializedMySQL database not support {}", method);
}
template<typename Base>
void DatabaseMaterializeMySQL<Base>::shutdownSynchronizationThread()
void DatabaseMaterializedMySQL<Base>::shutdownSynchronizationThread()
{
materialize_thread.stopSynchronization();
started_up = false;
}
template<typename Database, template<class> class Helper, typename... Args>
auto castToMaterializeMySQLAndCallHelper(Database * database, Args && ... args)
auto castToMaterializedMySQLAndCallHelper(Database * database, Args && ... args)
{
using Ordinary = DatabaseMaterializeMySQL<DatabaseOrdinary>;
using Atomic = DatabaseMaterializeMySQL<DatabaseAtomic>;
using Ordinary = DatabaseMaterializedMySQL<DatabaseOrdinary>;
using Atomic = DatabaseMaterializedMySQL<DatabaseAtomic>;
using ToOrdinary = typename std::conditional_t<std::is_const_v<Database>, const Ordinary *, Ordinary *>;
using ToAtomic = typename std::conditional_t<std::is_const_v<Database>, const Atomic *, Atomic *>;
if (auto * database_materialize = typeid_cast<ToOrdinary>(database))
@ -223,29 +223,29 @@ auto castToMaterializeMySQLAndCallHelper(Database * database, Args && ... args)
if (auto * database_materialize = typeid_cast<ToAtomic>(database))
return (database_materialize->*Helper<Atomic>::v)(std::forward<Args>(args)...);
throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializeMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception("LOGICAL_ERROR: cannot cast to DatabaseMaterializedMySQL, it is a bug.", ErrorCodes::LOGICAL_ERROR);
}
template<typename T> struct HelperSetException { static constexpr auto v = &T::setException; };
void setSynchronizationThreadException(const DatabasePtr & materialize_mysql_db, const std::exception_ptr & exception)
void setSynchronizationThreadException(const DatabasePtr & materialized_mysql_db, const std::exception_ptr & exception)
{
castToMaterializeMySQLAndCallHelper<IDatabase, HelperSetException>(materialize_mysql_db.get(), exception);
castToMaterializedMySQLAndCallHelper<IDatabase, HelperSetException>(materialized_mysql_db.get(), exception);
}
template<typename T> struct HelperStopSync { static constexpr auto v = &T::shutdownSynchronizationThread; };
void stopDatabaseSynchronization(const DatabasePtr & materialize_mysql_db)
void stopDatabaseSynchronization(const DatabasePtr & materialized_mysql_db)
{
castToMaterializeMySQLAndCallHelper<IDatabase, HelperStopSync>(materialize_mysql_db.get());
castToMaterializedMySQLAndCallHelper<IDatabase, HelperStopSync>(materialized_mysql_db.get());
}
template<typename T> struct HelperRethrow { static constexpr auto v = &T::rethrowExceptionIfNeed; };
void rethrowSyncExceptionIfNeed(const IDatabase * materialize_mysql_db)
void rethrowSyncExceptionIfNeed(const IDatabase * materialized_mysql_db)
{
castToMaterializeMySQLAndCallHelper<const IDatabase, HelperRethrow>(materialize_mysql_db);
castToMaterializedMySQLAndCallHelper<const IDatabase, HelperRethrow>(materialized_mysql_db);
}
template class DatabaseMaterializeMySQL<DatabaseOrdinary>;
template class DatabaseMaterializeMySQL<DatabaseAtomic>;
template class DatabaseMaterializedMySQL<DatabaseOrdinary>;
template class DatabaseMaterializedMySQL<DatabaseAtomic>;
}

View File

@ -7,8 +7,8 @@
#include <mysqlxx/Pool.h>
#include <Core/MySQL/MySQLClient.h>
#include <Databases/IDatabase.h>
#include <Databases/MySQL/MaterializeMySQLSettings.h>
#include <Databases/MySQL/MaterializeMySQLSyncThread.h>
#include <Databases/MySQL/MaterializedMySQLSettings.h>
#include <Databases/MySQL/MaterializedMySQLSyncThread.h>
namespace DB
{
@ -18,30 +18,30 @@ namespace DB
* All table structure and data will be written to the local file system
*/
template<typename Base>
class DatabaseMaterializeMySQL : public Base
class DatabaseMaterializedMySQL : public Base
{
public:
DatabaseMaterializeMySQL(
DatabaseMaterializedMySQL(
ContextPtr context, const String & database_name_, const String & metadata_path_, UUID uuid,
const String & mysql_database_name_, mysqlxx::Pool && pool_,
MySQLClient && client_, std::unique_ptr<MaterializeMySQLSettings> settings_);
MySQLClient && client_, std::unique_ptr<MaterializedMySQLSettings> settings_);
void rethrowExceptionIfNeed() const;
void setException(const std::exception_ptr & exception);
protected:
std::unique_ptr<MaterializeMySQLSettings> settings;
std::unique_ptr<MaterializedMySQLSettings> settings;
MaterializeMySQLSyncThread materialize_thread;
MaterializedMySQLSyncThread materialize_thread;
std::exception_ptr exception;
std::atomic_bool started_up{false};
public:
String getEngineName() const override { return "MaterializeMySQL"; }
String getEngineName() const override { return "MaterializedMySQL"; }
void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach) override;
@ -67,13 +67,13 @@ public:
void shutdownSynchronizationThread();
friend class DatabaseMaterializeTablesIterator;
friend class DatabaseMaterializedTablesIterator;
};
void setSynchronizationThreadException(const DatabasePtr & materialize_mysql_db, const std::exception_ptr & exception);
void stopDatabaseSynchronization(const DatabasePtr & materialize_mysql_db);
void rethrowSyncExceptionIfNeed(const IDatabase * materialize_mysql_db);
void setSynchronizationThreadException(const DatabasePtr & materialized_mysql_db, const std::exception_ptr & exception);
void stopDatabaseSynchronization(const DatabasePtr & materialized_mysql_db);
void rethrowSyncExceptionIfNeed(const IDatabase * materialized_mysql_db);
}

View File

@ -1,18 +1,18 @@
#pragma once
#include <Databases/IDatabase.h>
#include <Storages/StorageMaterializeMySQL.h>
#include <Storages/StorageMaterializedMySQL.h>
namespace DB
{
/** MaterializeMySQL database table iterator
/** MaterializedMySQL database table iterator
*
* The iterator returns different storage engine types depending on the visitor.
* When MySQLSync thread accesses, it always returns MergeTree
* Other cases always convert MergeTree to StorageMaterializeMySQL
* Other cases always convert MergeTree to StorageMaterializedMySQL
*/
class DatabaseMaterializeTablesIterator final : public IDatabaseTablesIterator
class DatabaseMaterializedTablesIterator final : public IDatabaseTablesIterator
{
public:
void next() override { nested_iterator->next(); }
@ -23,13 +23,13 @@ public:
const StoragePtr & table() const override
{
StoragePtr storage = std::make_shared<StorageMaterializeMySQL>(nested_iterator->table(), database);
StoragePtr storage = std::make_shared<StorageMaterializedMySQL>(nested_iterator->table(), database);
return tables.emplace_back(storage);
}
UUID uuid() const override { return nested_iterator->uuid(); }
DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, const IDatabase * database_)
DatabaseMaterializedTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, const IDatabase * database_)
: IDatabaseTablesIterator(database_->getDatabaseName()), nested_iterator(std::move(nested_iterator_)), database(database_)
{
}

View File

@ -1,4 +1,4 @@
#include <Databases/MySQL/MaterializeMySQLSettings.h>
#include <Databases/MySQL/MaterializedMySQLSettings.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
@ -11,9 +11,9 @@ namespace ErrorCodes
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(MaterializeMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(MaterializedMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS)
void MaterializeMySQLSettings::loadFromQuery(ASTStorage & storage_def)
void MaterializedMySQLSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{

View File

@ -17,13 +17,13 @@ class ASTStorage;
M(Int64, max_wait_time_when_mysql_unavailable, 1000, "Retry interval when MySQL is not available (milliseconds). Negative value disable retry.", 0) \
M(Bool, allows_query_when_mysql_lost, false, "Allow query materialized table when mysql is lost.", 0) \
DECLARE_SETTINGS_TRAITS(MaterializeMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS)
DECLARE_SETTINGS_TRAITS(MaterializedMySQLSettingsTraits, LIST_OF_MATERIALIZE_MODE_SETTINGS)
/** Settings for the MaterializeMySQL database engine.
/** Settings for the MaterializedMySQL database engine.
* Could be loaded from a CREATE DATABASE query (SETTINGS clause).
*/
struct MaterializeMySQLSettings : public BaseSettings<MaterializeMySQLSettingsTraits>
struct MaterializedMySQLSettings : public BaseSettings<MaterializedMySQLSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};

View File

@ -4,7 +4,7 @@
#if USE_MYSQL
#include <Databases/MySQL/MaterializeMySQLSyncThread.h>
#include <Databases/MySQL/MaterializedMySQLSyncThread.h>
# include <cstdlib>
# include <random>
# include <Columns/ColumnTuple.h>
@ -12,7 +12,7 @@
# include <DataStreams/CountingBlockOutputStream.h>
# include <DataStreams/OneBlockInputStream.h>
# include <DataStreams/copyData.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/ReadBufferFromString.h>
@ -71,14 +71,14 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, ContextMutable
catch (...)
{
tryLogCurrentException(
&Poco::Logger::get("MaterializeMySQLSyncThread(" + database + ")"),
&Poco::Logger::get("MaterializedMySQLSyncThread(" + database + ")"),
"Query " + query_to_execute + " wasn't finished successfully");
throw;
}
}
MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread()
MaterializedMySQLSyncThread::~MaterializedMySQLSyncThread()
{
try
{
@ -129,7 +129,7 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection, const S
{
bool first = true;
WriteBufferFromOwnString error_message;
error_message << "Illegal MySQL variables, the MaterializeMySQL engine requires ";
error_message << "Illegal MySQL variables, the MaterializedMySQL engine requires ";
for (const auto & [variable_name, variable_error_val] : variables_error_message)
{
error_message << (first ? "" : ", ") << variable_name << "='" << variable_error_val << "'";
@ -142,15 +142,15 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection, const S
}
}
MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
MaterializedMySQLSyncThread::MaterializedMySQLSyncThread(
ContextPtr context_,
const String & database_name_,
const String & mysql_database_name_,
mysqlxx::Pool && pool_,
MySQLClient && client_,
MaterializeMySQLSettings * settings_)
MaterializedMySQLSettings * settings_)
: WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("MaterializeMySQLSyncThread"))
, log(&Poco::Logger::get("MaterializedMySQLSyncThread"))
, database_name(database_name_)
, mysql_database_name(mysql_database_name_)
, pool(std::move(pool_))
@ -160,7 +160,7 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") ";
}
void MaterializeMySQLSyncThread::synchronization()
void MaterializedMySQLSyncThread::synchronization()
{
setThreadName(MYSQL_BACKGROUND_THREAD_NAME);
@ -221,7 +221,7 @@ void MaterializeMySQLSyncThread::synchronization()
}
}
void MaterializeMySQLSyncThread::stopSynchronization()
void MaterializedMySQLSyncThread::stopSynchronization()
{
if (!sync_quit && background_thread_pool)
{
@ -231,12 +231,12 @@ void MaterializeMySQLSyncThread::stopSynchronization()
}
}
void MaterializeMySQLSyncThread::startSynchronization()
void MaterializedMySQLSyncThread::startSynchronization()
{
background_thread_pool = std::make_unique<ThreadFromGlobalPool>([this]() { synchronization(); });
}
void MaterializeMySQLSyncThread::assertMySQLAvailable()
void MaterializedMySQLSyncThread::assertMySQLAvailable()
{
try
{
@ -334,7 +334,7 @@ static inline void dumpDataForTables(
Stopwatch watch;
copyData(input, *out, is_cancelled);
const Progress & progress = out->getProgress();
LOG_INFO(&Poco::Logger::get("MaterializeMySQLSyncThread(" + database_name + ")"),
LOG_INFO(&Poco::Logger::get("MaterializedMySQLSyncThread(" + database_name + ")"),
"Materialize MySQL step 1: dump {}, {} rows, {} in {} sec., {} rows/sec., {}/sec."
, table_name, formatReadableQuantity(progress.written_rows), formatReadableSizeWithBinarySuffix(progress.written_bytes)
, watch.elapsedSeconds(), formatReadableQuantity(static_cast<size_t>(progress.written_rows / watch.elapsedSeconds()))
@ -356,7 +356,7 @@ static inline UInt32 randomNumber()
return dist6(rng);
}
bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metadata)
bool MaterializedMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metadata)
{
bool opened_transaction = false;
mysqlxx::PoolWithFailover::Entry connection;
@ -441,7 +441,7 @@ bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metad
return false;
}
void MaterializeMySQLSyncThread::flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata)
void MaterializedMySQLSyncThread::flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata)
{
if (buffers.data.empty())
return;
@ -674,7 +674,7 @@ static inline size_t onUpdateData(const Row & rows_data, Block & buffer, size_t
return buffer.bytes() - prev_bytes;
}
void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr & receive_event, MaterializeMetadata & metadata)
void MaterializedMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr & receive_event, MaterializeMetadata & metadata)
{
if (receive_event->type() == MYSQL_WRITE_ROWS_EVENT)
{
@ -729,7 +729,7 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr
}
}
void MaterializeMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_event)
void MaterializedMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_event)
{
try
{
@ -751,18 +751,18 @@ void MaterializeMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_event
}
}
bool MaterializeMySQLSyncThread::isMySQLSyncThread()
bool MaterializedMySQLSyncThread::isMySQLSyncThread()
{
return getThreadName() == MYSQL_BACKGROUND_THREAD_NAME;
}
void MaterializeMySQLSyncThread::setSynchronizationThreadException(const std::exception_ptr & exception)
void MaterializedMySQLSyncThread::setSynchronizationThreadException(const std::exception_ptr & exception)
{
auto db = DatabaseCatalog::instance().getDatabase(database_name);
DB::setSynchronizationThreadException(db, exception);
}
void MaterializeMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes)
void MaterializedMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_bytes, size_t written_rows, size_t written_bytes)
{
total_blocks_rows += written_rows;
total_blocks_bytes += written_bytes;
@ -770,13 +770,13 @@ void MaterializeMySQLSyncThread::Buffers::add(size_t block_rows, size_t block_by
max_block_bytes = std::max(block_bytes, max_block_bytes);
}
bool MaterializeMySQLSyncThread::Buffers::checkThresholds(size_t check_block_rows, size_t check_block_bytes, size_t check_total_rows, size_t check_total_bytes) const
bool MaterializedMySQLSyncThread::Buffers::checkThresholds(size_t check_block_rows, size_t check_block_bytes, size_t check_total_rows, size_t check_total_bytes) const
{
return max_block_rows >= check_block_rows || max_block_bytes >= check_block_bytes || total_blocks_rows >= check_total_rows
|| total_blocks_bytes >= check_total_bytes;
}
void MaterializeMySQLSyncThread::Buffers::commit(ContextPtr context)
void MaterializedMySQLSyncThread::Buffers::commit(ContextPtr context)
{
try
{
@ -801,7 +801,7 @@ void MaterializeMySQLSyncThread::Buffers::commit(ContextPtr context)
}
}
MaterializeMySQLSyncThread::Buffers::BufferAndSortingColumnsPtr MaterializeMySQLSyncThread::Buffers::getTableDataBuffer(
MaterializedMySQLSyncThread::Buffers::BufferAndSortingColumnsPtr MaterializedMySQLSyncThread::Buffers::getTableDataBuffer(
const String & table_name, ContextPtr context)
{
const auto & iterator = data.find(table_name);

View File

@ -14,7 +14,7 @@
# include <Databases/DatabaseOrdinary.h>
# include <Databases/IDatabase.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Databases/MySQL/MaterializeMySQLSettings.h>
# include <Databases/MySQL/MaterializedMySQLSettings.h>
# include <Parsers/ASTCreateQuery.h>
# include <mysqlxx/Pool.h>
# include <mysqlxx/PoolWithFailover.h>
@ -36,18 +36,18 @@ namespace DB
* real-time pull incremental data:
* We will pull the binlog event of MySQL to parse and execute when the full data synchronization is completed.
*/
class MaterializeMySQLSyncThread : WithContext
class MaterializedMySQLSyncThread : WithContext
{
public:
~MaterializeMySQLSyncThread();
~MaterializedMySQLSyncThread();
MaterializeMySQLSyncThread(
MaterializedMySQLSyncThread(
ContextPtr context,
const String & database_name_,
const String & mysql_database_name_,
mysqlxx::Pool && pool_,
MySQLClient && client_,
MaterializeMySQLSettings * settings_);
MaterializedMySQLSettings * settings_);
void stopSynchronization();
@ -65,7 +65,7 @@ private:
mutable mysqlxx::Pool pool;
mutable MySQLClient client;
MaterializeMySQLSettings * settings;
MaterializedMySQLSettings * settings;
String query_prefix;
// USE MySQL ERROR CODE:

View File

@ -21,12 +21,12 @@ SRCS(
DatabaseReplicatedWorker.cpp
DatabasesCommon.cpp
MySQL/ConnectionMySQLSettings.cpp
MySQL/DatabaseMaterializeMySQL.cpp
MySQL/DatabaseMaterializedMySQL.cpp
MySQL/DatabaseMySQL.cpp
MySQL/FetchTablesColumnsList.cpp
MySQL/MaterializeMetadata.cpp
MySQL/MaterializeMySQLSettings.cpp
MySQL/MaterializeMySQLSyncThread.cpp
MySQL/MaterializedMySQLSettings.cpp
MySQL/MaterializedMySQLSyncThread.cpp
SQLite/DatabaseSQLite.cpp
SQLite/SQLiteUtils.cpp
SQLite/fetchSQLiteTableStructure.cpp

View File

@ -49,7 +49,7 @@ MySQLBlockInputStream::Connection::Connection(
{
}
/// Used in MaterializeMySQL and in doInvalidateQuery for dictionary source.
/// Used in MaterializedMySQL and in doInvalidateQuery for dictionary source.
MySQLBlockInputStream::MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str,

View File

@ -24,8 +24,8 @@
#endif
#if USE_MYSQL
# include <Databases/MySQL/MaterializeMySQLSyncThread.h>
# include <Storages/StorageMaterializeMySQL.h>
# include <Databases/MySQL/MaterializedMySQLSyncThread.h>
# include <Storages/StorageMaterializedMySQL.h>
#endif
#if USE_LIBPQXX
@ -246,11 +246,11 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
#endif
#if USE_MYSQL
/// It's definitely not the best place for this logic, but behaviour must be consistent with DatabaseMaterializeMySQL::tryGetTable(...)
if (db_and_table.first->getEngineName() == "MaterializeMySQL")
/// It's definitely not the best place for this logic, but behaviour must be consistent with DatabaseMaterializedMySQL::tryGetTable(...)
if (db_and_table.first->getEngineName() == "MaterializedMySQL")
{
if (!MaterializeMySQLSyncThread::isMySQLSyncThread())
db_and_table.second = std::make_shared<StorageMaterializeMySQL>(std::move(db_and_table.second), db_and_table.first.get());
if (!MaterializedMySQLSyncThread::isMySQLSyncThread())
db_and_table.second = std::make_shared<StorageMaterializedMySQL>(std::move(db_and_table.second), db_and_table.first.get());
}
#endif
return db_and_table;

View File

@ -164,7 +164,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (!create.attach && fs::exists(metadata_path))
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Metadata directory {} already exists", metadata_path.string());
}
else if (create.storage->engine->name == "MaterializeMySQL")
else if (create.storage->engine->name == "MaterializeMySQL" || create.storage->engine->name == "MaterializedMySQL")
{
/// It creates nested database with Ordinary or Atomic engine depending on UUID in query and default engine setting.
/// Do nothing if it's an internal ATTACH on server startup or short-syntax ATTACH query from user,
@ -204,11 +204,12 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
metadata_path = metadata_path / "metadata" / database_name_escaped;
}
if (create.storage->engine->name == "MaterializeMySQL" && !getContext()->getSettingsRef().allow_experimental_database_materialize_mysql
if ((create.storage->engine->name == "MaterializeMySQL" || create.storage->engine->name == "MaterializedMySQL")
&& !getContext()->getSettingsRef().allow_experimental_database_materialized_mysql
&& !internal)
{
throw Exception("MaterializeMySQL is an experimental database engine. "
"Enable allow_experimental_database_materialize_mysql to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
throw Exception("MaterializedMySQL is an experimental database engine. "
"Enable allow_experimental_database_materialized_mysql to use it.", ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
if (create.storage->engine->name == "Replicated" && !getContext()->getSettingsRef().allow_experimental_database_replicated && !internal)

View File

@ -17,7 +17,7 @@
#endif
#if USE_MYSQL
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
#endif
#if USE_LIBPQXX
@ -315,7 +315,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
throw Exception("DETACH PERMANENTLY is not implemented for databases", ErrorCodes::NOT_IMPLEMENTED);
#if USE_MYSQL
if (database->getEngineName() == "MaterializeMySQL")
if (database->getEngineName() == "MaterializedMySQL")
stopDatabaseSynchronization(database);
#endif
if (auto * replicated = typeid_cast<DatabaseReplicated *>(database.get()))
@ -335,7 +335,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
/// Flush should not be done if shouldBeEmptyOnDetach() == false,
/// since in this case getTablesIterator() may do some additional work,
/// see DatabaseMaterializeMySQL<>::getTablesIterator()
/// see DatabaseMaterializedMySQL<>::getTablesIterator()
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
{
iterator->table()->flush();

View File

@ -70,7 +70,7 @@
#include <Parsers/ASTSystemQuery.h>
#include <Databases/MySQL/MaterializeMySQLSyncThread.h>
#include <Databases/MySQL/MaterializedMySQLSyncThread.h>
#include <Parsers/ASTExternalDDLQuery.h>
#include <Common/ProfileEvents.h>
#include <Common/typeid_cast.h>

View File

@ -1732,7 +1732,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
syntax_analyzer_result->optimize_trivial_count
&& (settings.max_parallel_replicas <= 1)
&& storage
&& storage->getName() != "MaterializeMySQL"
&& storage->getName() != "MaterializedMySQL"
&& !row_policy_filter
&& processing_stage == QueryProcessingStage::FetchColumns
&& query_analyzer->hasAggregation()

View File

@ -28,7 +28,7 @@ static inline ASTPtr tryRewrittenCreateQuery(const String & query, ContextPtr co
context, "test_database", "test_database")[0];
}
static const char MATERIALIZEMYSQL_TABLE_COLUMNS[] = ", `_sign` Int8() MATERIALIZED 1"
static const char MATERIALIZEDMYSQL_TABLE_COLUMNS[] = ", `_sign` Int8() MATERIALIZED 1"
", `_version` UInt64() MATERIALIZED 1"
", INDEX _version _version TYPE minmax GRANULARITY 1";
@ -50,19 +50,19 @@ TEST(MySQLCreateRewritten, ColumnsDataType)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + ")", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
MATERIALIZEDMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
MATERIALIZEDMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " COMMENT 'test_comment' NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` " + mapped_type + " COMMENT 'test_comment'" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
MATERIALIZEDMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
if (Poco::toUpper(test_type).find("INT") != std::string::npos)
@ -70,25 +70,25 @@ TEST(MySQLCreateRewritten, ColumnsDataType)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " UNSIGNED)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
MATERIALIZEDMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " COMMENT 'test_comment' UNSIGNED)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")" + " COMMENT 'test_comment'" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
MATERIALIZEDMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " NOT NULL UNSIGNED)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` U" + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
MATERIALIZEDMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " COMMENT 'test_comment' UNSIGNED NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` U" + mapped_type + " COMMENT 'test_comment'" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
MATERIALIZEDMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
}
}
@ -114,13 +114,13 @@ TEST(MySQLCreateRewritten, PartitionPolicy)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " PRIMARY KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
MATERIALIZEDMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " NOT NULL PRIMARY KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
MATERIALIZEDMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
}
}
@ -145,25 +145,25 @@ TEST(MySQLCreateRewritten, OrderbyPolicy)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " PRIMARY KEY, `key2` " + test_type + " UNIQUE KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` Nullable(" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS +
MATERIALIZEDMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, assumeNotNull(key2))");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " NOT NULL PRIMARY KEY, `key2` " + test_type + " NOT NULL UNIQUE KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
MATERIALIZEDMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " KEY UNIQUE KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
MATERIALIZEDMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + ", `key2` " + test_type + " UNIQUE KEY, PRIMARY KEY(`key`, `key2`))", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
MATERIALIZEDMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
}
}
@ -176,25 +176,25 @@ TEST(MySQLCreateRewritten, RewrittenQueryWithPrimaryKey)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL PRIMARY KEY) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL, PRIMARY KEY (`key`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key_1` int NOT NULL, key_2 INT NOT NULL, PRIMARY KEY (`key_1`, `key_2`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key_1` Int32, `key_2` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key_1, 4294967) ORDER BY (key_1, key_2)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key_1` BIGINT NOT NULL, key_2 INT NOT NULL, PRIMARY KEY (`key_1`, `key_2`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key_1` Int64, `key_2` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key_2, 4294967) ORDER BY (key_1, key_2)");
}
@ -206,7 +206,7 @@ TEST(MySQLCreateRewritten, RewrittenQueryWithPrefixKey)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL PRIMARY KEY, `prefix_key` varchar(200) NOT NULL, KEY prefix_key_index(prefix_key(2))) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `prefix_key` String" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) + ") ENGINE = "
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY (key, prefix_key)");
}
@ -220,7 +220,7 @@ TEST(MySQLCreateRewritten, UniqueKeysConvert)
" id bigint NOT NULL AUTO_INCREMENT, tenant_id bigint NOT NULL, PRIMARY KEY (id), UNIQUE KEY code_id (code, tenant_id), UNIQUE KEY name_id (name, tenant_id))"
" ENGINE=InnoDB AUTO_INCREMENT=100 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`code` String, `name` String, `id` Int64, `tenant_id` Int64" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(id, 18446744073709551) ORDER BY (code, name, tenant_id, id)");
}
@ -232,7 +232,7 @@ TEST(MySQLCreateRewritten, QueryWithColumnComments)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, `test` INT COMMENT 'test_comment')", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(Int32) COMMENT 'test_comment'" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
}
@ -244,16 +244,16 @@ TEST(MySQLCreateRewritten, QueryWithEnum)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, `test` ENUM('a','b','c'))", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(Enum8('a' = 1, 'b' = 2, 'c' = 3))" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, `test` ENUM('a','b','c') NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Enum8('a' = 1, 'b' = 2, 'c' = 3)" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, `test` ENUM('a','b','c') COMMENT 'test_comment')", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(Enum8('a' = 1, 'b' = 2, 'c' = 3)) COMMENT 'test_comment'" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
std::string(MATERIALIZEDMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
}
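In plain terms, the assertions in this file pin down the DDL rewrite rule: a MySQL table definition is mapped to a ClickHouse ReplacingMergeTree with a derived partition key and order key. A minimal sketch of one case above, with the hidden columns elided (the tests splice them in via MATERIALIZEDMYSQL_TABLE_COLUMNS):

```sql
-- MySQL-side DDL fed to tryRewrittenCreateQuery (taken from the test case above)
CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL PRIMARY KEY)
ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Expected ClickHouse-side rewrite; /* ... */ stands for the hidden
-- _sign/_version columns added by MATERIALIZEDMYSQL_TABLE_COLUMNS.
CREATE TABLE test_database.test_table_1 (`key` Int32 /* ... */)
ENGINE = ReplacingMergeTree(_version)
PARTITION BY intDiv(key, 4294967)
ORDER BY tuple(key);
```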

View File

@ -174,9 +174,8 @@ void PullingAsyncPipelineExecutor::cancel()
if (data && !data->is_finished && data->executor)
data->executor->cancel();
/// Finish the lazy format. Otherwise thread.join() may hang.
if (lazy_format && !lazy_format->isFinished())
lazy_format->finish();
/// The following code is needed to rethrow an exception from PipelineExecutor.
/// It could have been thrown from pull(), but we are unlikely to call it again.
/// Join thread here to wait for possible exception.
if (data && data->thread.joinable())

View File

@ -29,7 +29,7 @@ public:
void setRowsBeforeLimit(size_t rows_before_limit) override;
void finish()
void onCancel() override
{
finished_processing = true;
/// Clear the queue in case somebody is waiting for lazy_format to push.

View File

@ -4,7 +4,7 @@
#if USE_MYSQL
#include <Storages/StorageMaterializeMySQL.h>
#include <Storages/StorageMaterializedMySQL.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
@ -21,14 +21,14 @@
#include <Processors/Pipe.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
#include <Databases/MySQL/DatabaseMaterializedMySQL.h>
#include <Storages/ReadFinalForExternalReplicaStorage.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_)
StorageMaterializedMySQL::StorageMaterializedMySQL(const StoragePtr & nested_storage_, const IDatabase * database_)
: StorageProxy(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_)
{
StorageInMemoryMetadata in_memory_metadata;
@ -36,12 +36,12 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_stora
setInMemoryMetadata(in_memory_metadata);
}
bool StorageMaterializeMySQL::needRewriteQueryWithFinal(const Names & column_names) const
bool StorageMaterializedMySQL::needRewriteQueryWithFinal(const Names & column_names) const
{
return needRewriteQueryWithFinalForStorage(column_names, nested_storage);
}
Pipe StorageMaterializeMySQL::read(
Pipe StorageMaterializedMySQL::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info,
@ -57,14 +57,14 @@ Pipe StorageMaterializeMySQL::read(
query_info, context, processed_stage, max_block_size, num_streams);
}
NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const
NamesAndTypesList StorageMaterializedMySQL::getVirtuals() const
{
/// If the background synchronization thread has an exception, rethrow it.
rethrowSyncExceptionIfNeed(database);
return nested_storage->getVirtuals();
}
IStorage::ColumnSizeByName StorageMaterializeMySQL::getColumnSizes() const
IStorage::ColumnSizeByName StorageMaterializedMySQL::getColumnSizes() const
{
auto sizes = nested_storage->getColumnSizes();
auto nested_header = nested_storage->getInMemoryMetadataPtr()->getSampleBlock();

View File

@ -16,13 +16,13 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
class StorageMaterializeMySQL final : public shared_ptr_helper<StorageMaterializeMySQL>, public StorageProxy
class StorageMaterializedMySQL final : public shared_ptr_helper<StorageMaterializedMySQL>, public StorageProxy
{
friend struct shared_ptr_helper<StorageMaterializeMySQL>;
friend struct shared_ptr_helper<StorageMaterializedMySQL>;
public:
String getName() const override { return "MaterializeMySQL"; }
String getName() const override { return "MaterializedMySQL"; }
StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_);
StorageMaterializedMySQL(const StoragePtr & nested_storage_, const IDatabase * database_);
bool needRewriteQueryWithFinal(const Names & column_names) const override;
@ -42,7 +42,7 @@ public:
private:
[[noreturn]] void throwNotAllowed() const
{
throw Exception("This method is not allowed for MaterializeMySQL", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("This method is not allowed for MaterializedMySQL", ErrorCodes::NOT_IMPLEMENTED);
}
StoragePtr nested_storage;
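As a hedged illustration of what that guard means in practice (database and table names are illustrative, and the exact set of blocked methods is defined by the overrides in this header), direct writes and mutations against the replicated tables are expected to be rejected:

```sql
-- Expected to fail with NOT_IMPLEMENTED
-- ("This method is not allowed for MaterializedMySQL"):
-- the tables are read-only replicas of the MySQL source.
ALTER TABLE mysql_replica.orders DELETE WHERE id = 1;
```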

View File

@ -391,7 +391,7 @@ Pipe StorageMerge::createSources(
if (!modified_select.final() && storage->needRewriteQueryWithFinal(real_column_names))
{
/// NOTE: It may not work correctly in some cases, because query was analyzed without final.
/// However, it's needed for MaterializeMySQL and it's unlikely that someone will use it with Merge tables.
/// However, it's needed for MaterializedMySQL and it's unlikely that someone will use it with Merge tables.
modified_select.setFinal();
}
@ -727,10 +727,10 @@ void StorageMerge::convertingSourceStream(
IStorage::ColumnSizeByName StorageMerge::getColumnSizes() const
{
auto first_materialize_mysql = getFirstTable([](const StoragePtr & table) { return table && table->getName() == "MaterializeMySQL"; });
if (!first_materialize_mysql)
auto first_materialized_mysql = getFirstTable([](const StoragePtr & table) { return table && table->getName() == "MaterializedMySQL"; });
if (!first_materialized_mysql)
return {};
return first_materialize_mysql->getColumnSizes();
return first_materialized_mysql->getColumnSizes();
}
void registerStorageMerge(StorageFactory & factory)
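Both hunks concern the scenario sketched below, a Merge table laid over a table that a MaterializedMySQL database maintains (database, table and column names are illustrative):

```sql
-- Assumes mysql_replica is a database with ENGINE = MaterializedMySQL.
CREATE TABLE default.merged_orders
(
    `id` Int64,
    `amount` Int32
)
ENGINE = Merge('mysql_replica', '^orders$');

-- Reads go through StorageMerge; per the first hunk, FINAL is applied
-- implicitly so ReplacingMergeTree versions collapse as on the source table.
SELECT count() FROM default.merged_orders;
```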

View File

@ -4377,12 +4377,6 @@ void StorageReplicatedMergeTree::shutdown()
/// Wait for all of them
std::unique_lock lock(data_parts_exchange_ptr->rwlock);
}
/// We clear all old parts after stopping all background operations. It's
/// important, because background operations can produce temporary parts
/// which will remove themselves in their destructors. If so, we may have
/// a race condition between our remove call and the background process.
clearOldPartsFromFilesystem(true);
}

View File

@ -7,7 +7,7 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageMaterializeMySQL.h>
#include <Storages/StorageMaterializedMySQL.h>
#include <Storages/VirtualColumnUtils.h>
#include <Access/ContextAccess.h>
#include <Databases/IDatabase.h>
@ -124,7 +124,7 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, Conte
String engine_name = storage->getName();
#if USE_MYSQL
if (auto * proxy = dynamic_cast<StorageMaterializeMySQL *>(storage.get()))
if (auto * proxy = dynamic_cast<StorageMaterializedMySQL *>(storage.get()))
{
auto nested = proxy->getNested();
storage.swap(nested);
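That unwrapping is what lets system.parts report the nested ReplacingMergeTree parts for tables owned by a MaterializedMySQL database; a query along these lines (database name illustrative) is the kind of check the integration tests rely on:

```sql
SELECT table, name, rows, active
FROM system.parts
WHERE database = 'mysql_replica';
```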

View File

@ -134,7 +134,7 @@ SRCS(
StorageJoin.cpp
StorageLog.cpp
StorageLogSettings.cpp
StorageMaterializeMySQL.cpp
StorageMaterializedMySQL.cpp
StorageMaterializedView.cpp
StorageMemory.cpp
StorageMerge.cpp

View File

@ -127,60 +127,60 @@
"test_keeper_multinode_simple/test.py::test_simple_replicated_table",
"test_keeper_multinode_simple/test.py::test_watch_on_follower",
"test_limited_replicated_fetches/test.py::test_limited_fetches",
"test_materialize_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_multi_table_update[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_multi_table_update[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_mysql_killed_while_insert_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_mysql_killed_while_insert_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_mysql_killed_while_insert_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_mysql_killed_while_insert_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_mysql_settings[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_mysql_settings[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_network_partition_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_network_partition_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_network_partition_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_network_partition_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_select_without_columns_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_select_without_columns_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_select_without_columns_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_select_without_columns_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_system_parts_table[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_system_parts_table[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_system_tables_table[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_system_tables_table[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_materialize_with_enum[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_materialize_with_enum[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_utf8mb4[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_utf8mb4[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_multi_table_update[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_multi_table_update[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_settings[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_mysql_settings[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_network_partition_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_network_partition_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_network_partition_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_network_partition_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_select_without_columns_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_select_without_columns_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_select_without_columns_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_select_without_columns_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_system_parts_table[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_system_parts_table[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_system_tables_table[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_system_tables_table[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_materialize_with_enum[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_materialize_with_enum[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_utf8mb4[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_utf8mb4[clickhouse_node1]",
"test_parts_delete_zookeeper/test.py::test_merge_doesnt_work_without_zookeeper",
"test_polymorphic_parts/test.py::test_compact_parts_only",
"test_polymorphic_parts/test.py::test_different_part_types_on_replicas[polymorphic_table_compact-Compact]",

View File

@ -3,4 +3,4 @@
# 1. Generate the full list of tests, as in a CI run
./runner ' --setup-plan' | grep '::' | sed 's/ (fixtures used:.*//g' | sed 's/^ *//g' | sed 's/ *$//g' | sort -u > all_tests.txt
# 2. Filter known tests that are currently not run in parallel
cat all_tests.txt | grep '^test_replicated_database\|^test_disabled_mysql_server\|^test_distributed_ddl\|^test_distributed_ddl\|^test_quorum_inserts_parallel\|^test_ddl_worker_non_leader\|^test_consistent_parts_after_clone_replica\|^test_materialize_mysql_database\|^test_atomic_drop_table\|^test_distributed_respect_user_timeouts\|^test_storage_kafka\|^test_replace_partition\|^test_replicated_fetches_timeouts\|^test_system_clusters_actual_information\|^test_delayed_replica_failover\|^test_limited_replicated_fetches\|^test_hedged_requests\|^test_insert_into_distributed\|^test_insert_into_distributed_through_materialized_view\|^test_drop_replica\|^test_attach_without_fetching\|^test_system_replicated_fetches\|^test_cross_replication\|^test_dictionary_allow_read_expired_keys\|^test_dictionary_allow_read_expired_keys\|^test_dictionary_allow_read_expired_keys\|^test_insert_into_distributed_sync_async\|^test_hedged_requests_parallel\|^test_dictionaries_update_field\|^test_broken_part_during_merge\|^test_random_inserts\|^test_reload_clusters_config\|^test_parts_delete_zookeeper\|^test_polymorphic_parts\|^test_keeper_multinode_simple\|^test_https_replication\|^test_storage_kerberized_kafka\|^test_cleanup_dir_after_bad_zk_conn\|^test_system_metrics\|^test_keeper_multinode_blocade_leader' | awk '{$1=$1;print}' | jq -R -n '[inputs] | .' > parallel_skip.json
cat all_tests.txt | grep '^test_replicated_database\|^test_disabled_mysql_server\|^test_distributed_ddl\|^test_distributed_ddl\|^test_quorum_inserts_parallel\|^test_ddl_worker_non_leader\|^test_consistent_parts_after_clone_replica\|^test_materialized_mysql_database\|^test_atomic_drop_table\|^test_distributed_respect_user_timeouts\|^test_storage_kafka\|^test_replace_partition\|^test_replicated_fetches_timeouts\|^test_system_clusters_actual_information\|^test_delayed_replica_failover\|^test_limited_replicated_fetches\|^test_hedged_requests\|^test_insert_into_distributed\|^test_insert_into_distributed_through_materialized_view\|^test_drop_replica\|^test_attach_without_fetching\|^test_system_replicated_fetches\|^test_cross_replication\|^test_dictionary_allow_read_expired_keys\|^test_dictionary_allow_read_expired_keys\|^test_dictionary_allow_read_expired_keys\|^test_insert_into_distributed_sync_async\|^test_hedged_requests_parallel\|^test_dictionaries_update_field\|^test_broken_part_during_merge\|^test_random_inserts\|^test_reload_clusters_config\|^test_parts_delete_zookeeper\|^test_polymorphic_parts\|^test_keeper_multinode_simple\|^test_https_replication\|^test_storage_kerberized_kafka\|^test_cleanup_dir_after_bad_zk_conn\|^test_system_metrics\|^test_keeper_multinode_blocade_leader' | awk '{$1=$1;print}' | jq -R -n '[inputs] | .' > parallel_skip.json

View File

@ -131,60 +131,60 @@
"test_keeper_multinode_simple/test.py::test_simple_replicated_table",
"test_keeper_multinode_simple/test.py::test_watch_on_follower",
"test_limited_replicated_fetches/test.py::test_limited_fetches",
"test_materialize_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_multi_table_update[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_multi_table_update[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_mysql_killed_while_insert_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_mysql_killed_while_insert_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_mysql_killed_while_insert_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_mysql_killed_while_insert_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_mysql_settings[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_mysql_settings[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_network_partition_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_network_partition_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_network_partition_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_network_partition_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_select_without_columns_5_7[atomic]",
"test_materialize_mysql_database/test.py::test_select_without_columns_5_7[ordinary]",
"test_materialize_mysql_database/test.py::test_select_without_columns_8_0[atomic]",
"test_materialize_mysql_database/test.py::test_select_without_columns_8_0[ordinary]",
"test_materialize_mysql_database/test.py::test_system_parts_table[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_system_parts_table[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_system_tables_table[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_system_tables_table[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_materialize_with_enum[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_materialize_with_enum[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_utf8mb4[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_utf8mb4[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_clickhouse_killed_while_insert_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_insert_with_modify_binlog_checksum_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_empty_transaction_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_ddl_with_mysql_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_dml_with_mysql_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_materialize_database_err_sync_user_privs_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_multi_table_update[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_multi_table_update[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_killed_while_insert_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_mysql_kill_sync_thread_restore_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_mysql_settings[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_mysql_settings[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_network_partition_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_network_partition_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_network_partition_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_network_partition_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_select_without_columns_5_7[atomic]",
"test_materialized_mysql_database/test.py::test_select_without_columns_5_7[ordinary]",
"test_materialized_mysql_database/test.py::test_select_without_columns_8_0[atomic]",
"test_materialized_mysql_database/test.py::test_select_without_columns_8_0[ordinary]",
"test_materialized_mysql_database/test.py::test_system_parts_table[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_system_parts_table[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_system_tables_table[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_system_tables_table[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_materialize_with_enum[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_materialize_with_enum[clickhouse_node1]",
"test_materialized_mysql_database/test.py::test_utf8mb4[clickhouse_node0]",
"test_materialized_mysql_database/test.py::test_utf8mb4[clickhouse_node1]",
"test_parts_delete_zookeeper/test.py::test_merge_doesnt_work_without_zookeeper",
"test_polymorphic_parts/test.py::test_compact_parts_only",
"test_polymorphic_parts/test.py::test_different_part_types_on_replicas[polymorphic_table_compact-Compact]",

View File

@ -2,7 +2,7 @@ import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
cluster = ClickHouseCluster(__file__, name="detach")
# Version 21.6.3.14 has incompatible partition id for tables with UUID in partition key.
node_21_6 = cluster.add_instance('node_21_6', image='yandex/clickhouse-server', tag='21.6.3.14', stay_alive=True, with_installed_binary=True)

View File

@ -2,7 +2,7 @@
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<allow_experimental_database_materialized_mysql>1</allow_experimental_database_materialized_mysql>
<allow_introspection_functions>1</allow_introspection_functions>
<optimize_on_insert>0</optimize_on_insert>
<default_database_engine>Ordinary</default_database_engine>
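For context, this is the setting a user must enable before creating the database engine; the per-session equivalent of the profile entry above looks roughly like this (host, database and credentials are illustrative):

```sql
SET allow_experimental_database_materialized_mysql = 1;

CREATE DATABASE mysql_replica
ENGINE = MaterializedMySQL('mysql-host:3306', 'source_db', 'replication_user', 'password');
```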

View File

@ -2,7 +2,7 @@
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<allow_experimental_database_materialized_mysql>1</allow_experimental_database_materialized_mysql>
<default_database_engine>Atomic</default_database_engine>
</default>
</profiles>

View File

@ -2,7 +2,7 @@
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<allow_experimental_database_materialized_mysql>1</allow_experimental_database_materialized_mysql>
<default_database_engine>Atomic</default_database_engine>
<external_storage_max_read_rows>1</external_storage_max_read_rows>
<external_storage_max_read_bytes>0</external_storage_max_read_bytes>

View File

@ -2,7 +2,7 @@
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<allow_experimental_database_materialized_mysql>1</allow_experimental_database_materialized_mysql>
<default_database_engine>Atomic</default_database_engine>
<external_storage_max_read_rows>0</external_storage_max_read_rows>
<external_storage_max_read_bytes>1</external_storage_max_read_bytes>

View File

@ -30,7 +30,7 @@ def check_query(clickhouse_node, query, result_set, retry_count=10, interval_sec
assert clickhouse_node.query(query) == result_set
def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def dml_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_dml")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_dml")
mysql_node.query("CREATE DATABASE test_database_dml DEFAULT CHARACTER SET 'utf8'")
@ -117,7 +117,7 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam
mysql_node.query("DROP DATABASE test_database_dml")
def materialize_mysql_database_with_views(clickhouse_node, mysql_node, service_name):
def materialized_mysql_database_with_views(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
@ -146,7 +146,7 @@ def materialize_mysql_database_with_views(clickhouse_node, mysql_node, service_n
'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00', true);
""")
clickhouse_node.query(
"CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database ENGINE = MaterializedMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(
service_name))
assert "test_database" in clickhouse_node.query("SHOW DATABASES")
@ -156,7 +156,7 @@ def materialize_mysql_database_with_views(clickhouse_node, mysql_node, service_n
mysql_node.query("DROP DATABASE test_database")
def materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, mysql_node, service_name):
def materialized_mysql_database_with_datetime_and_decimal(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_dt")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_dt")
mysql_node.query("CREATE DATABASE test_database_dt DEFAULT CHARACTER SET 'utf8'")
@ -166,7 +166,7 @@ def materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, mysql_
mysql_node.query("INSERT INTO test_database_dt.test_table_1 VALUES(3, '2020-01-01 01:02:03.9999', '2020-01-01 01:02:03.99', -" + ('9' * 35) + "." + ('9' * 30) + ")")
mysql_node.query("INSERT INTO test_database_dt.test_table_1 VALUES(4, '2020-01-01 01:02:03.9999', '2020-01-01 01:02:03.9999', -." + ('0' * 29) + "1)")
clickhouse_node.query("CREATE DATABASE test_database_dt ENGINE = MaterializeMySQL('{}:3306', 'test_database_dt', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE test_database_dt ENGINE = MaterializedMySQL('{}:3306', 'test_database_dt', 'root', 'clickhouse')".format(service_name))
assert "test_database_dt" in clickhouse_node.query("SHOW DATABASES")
check_query(clickhouse_node, "SELECT * FROM test_database_dt.test_table_1 ORDER BY key FORMAT TSV",
@ -190,7 +190,7 @@ def materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, mysql_
mysql_node.query("DROP DATABASE test_database_dt")
def drop_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def drop_table_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_drop")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_drop")
mysql_node.query("CREATE DATABASE test_database_drop DEFAULT CHARACTER SET 'utf8'")
@ -204,7 +204,7 @@ def drop_table_with_materialize_mysql_database(clickhouse_node, mysql_node, serv
# create mapping
clickhouse_node.query(
"CREATE DATABASE test_database_drop ENGINE = MaterializeMySQL('{}:3306', 'test_database_drop', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_drop ENGINE = MaterializedMySQL('{}:3306', 'test_database_drop', 'root', 'clickhouse')".format(
service_name))
assert "test_database_drop" in clickhouse_node.query("SHOW DATABASES")
@ -225,7 +225,7 @@ def drop_table_with_materialize_mysql_database(clickhouse_node, mysql_node, serv
mysql_node.query("DROP DATABASE test_database_drop")
def create_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def create_table_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_create")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_create")
mysql_node.query("CREATE DATABASE test_database_create DEFAULT CHARACTER SET 'utf8'")
@ -236,7 +236,7 @@ def create_table_with_materialize_mysql_database(clickhouse_node, mysql_node, se
# create mapping
clickhouse_node.query(
"CREATE DATABASE test_database_create ENGINE = MaterializeMySQL('{}:3306', 'test_database_create', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_create ENGINE = MaterializedMySQL('{}:3306', 'test_database_create', 'root', 'clickhouse')".format(
service_name))
# Check for pre-existing status
@ -253,7 +253,7 @@ def create_table_with_materialize_mysql_database(clickhouse_node, mysql_node, se
mysql_node.query("DROP DATABASE test_database_create")
def rename_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def rename_table_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_rename")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_rename")
mysql_node.query("CREATE DATABASE test_database_rename DEFAULT CHARACTER SET 'utf8'")
@ -263,7 +263,7 @@ def rename_table_with_materialize_mysql_database(clickhouse_node, mysql_node, se
# create mapping
clickhouse_node.query(
"CREATE DATABASE test_database_rename ENGINE = MaterializeMySQL('{}:3306', 'test_database_rename', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_rename ENGINE = MaterializedMySQL('{}:3306', 'test_database_rename', 'root', 'clickhouse')".format(
service_name))
assert "test_database_rename" in clickhouse_node.query("SHOW DATABASES")
@ -275,7 +275,7 @@ def rename_table_with_materialize_mysql_database(clickhouse_node, mysql_node, se
mysql_node.query("DROP DATABASE test_database_rename")
def alter_add_column_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def alter_add_column_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_add")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_add")
mysql_node.query("CREATE DATABASE test_database_add DEFAULT CHARACTER SET 'utf8'")
@ -289,7 +289,7 @@ def alter_add_column_with_materialize_mysql_database(clickhouse_node, mysql_node
# create mapping
clickhouse_node.query(
"CREATE DATABASE test_database_add ENGINE = MaterializeMySQL('{}:3306', 'test_database_add', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_add ENGINE = MaterializedMySQL('{}:3306', 'test_database_add', 'root', 'clickhouse')".format(
service_name))
assert "test_database_add" in clickhouse_node.query("SHOW DATABASES")
@ -317,7 +317,7 @@ def alter_add_column_with_materialize_mysql_database(clickhouse_node, mysql_node
mysql_node.query("DROP DATABASE test_database_add")
def alter_drop_column_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def alter_drop_column_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_alter_drop")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_alter_drop")
mysql_node.query("CREATE DATABASE test_database_alter_drop DEFAULT CHARACTER SET 'utf8'")
@ -328,7 +328,7 @@ def alter_drop_column_with_materialize_mysql_database(clickhouse_node, mysql_nod
# create mapping
clickhouse_node.query(
"CREATE DATABASE test_database_alter_drop ENGINE = MaterializeMySQL('{}:3306', 'test_database_alter_drop', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_alter_drop ENGINE = MaterializedMySQL('{}:3306', 'test_database_alter_drop', 'root', 'clickhouse')".format(
service_name))
assert "test_database_alter_drop" in clickhouse_node.query("SHOW DATABASES")
@ -351,7 +351,7 @@ def alter_drop_column_with_materialize_mysql_database(clickhouse_node, mysql_nod
mysql_node.query("DROP DATABASE test_database_alter_drop")
def alter_rename_column_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def alter_rename_column_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_alter_rename")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_alter_rename")
mysql_node.query("CREATE DATABASE test_database_alter_rename DEFAULT CHARACTER SET 'utf8'")
@ -364,7 +364,7 @@ def alter_rename_column_with_materialize_mysql_database(clickhouse_node, mysql_n
# create mapping
clickhouse_node.query(
"CREATE DATABASE test_database_alter_rename ENGINE = MaterializeMySQL('{}:3306', 'test_database_alter_rename', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_alter_rename ENGINE = MaterializedMySQL('{}:3306', 'test_database_alter_rename', 'root', 'clickhouse')".format(
service_name))
assert "test_database_alter_rename" in clickhouse_node.query("SHOW DATABASES")
@ -386,7 +386,7 @@ def alter_rename_column_with_materialize_mysql_database(clickhouse_node, mysql_n
mysql_node.query("DROP DATABASE test_database_alter_rename")
def alter_modify_column_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def alter_modify_column_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_alter_modify")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_alter_modify")
mysql_node.query("CREATE DATABASE test_database_alter_modify DEFAULT CHARACTER SET 'utf8'")
@ -399,7 +399,7 @@ def alter_modify_column_with_materialize_mysql_database(clickhouse_node, mysql_n
# create mapping
clickhouse_node.query(
"CREATE DATABASE test_database_alter_modify ENGINE = MaterializeMySQL('{}:3306', 'test_database_alter_modify', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_alter_modify ENGINE = MaterializedMySQL('{}:3306', 'test_database_alter_modify', 'root', 'clickhouse')".format(
service_name))
assert "test_database_alter_modify" in clickhouse_node.query("SHOW DATABASES")
@ -429,10 +429,10 @@ def alter_modify_column_with_materialize_mysql_database(clickhouse_node, mysql_n
# TODO: needs ClickHouse to support ALTER TABLE table_name ADD COLUMN column_name, RENAME COLUMN column_name TO new_column_name;
# def test_mysql_alter_change_column_for_materialize_mysql_database(started_cluster):
# def test_mysql_alter_change_column_for_materialized_mysql_database(started_cluster):
# pass
def alter_rename_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def alter_rename_table_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_rename_table")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_rename_table")
mysql_node.query("CREATE DATABASE test_database_rename_table DEFAULT CHARACTER SET 'utf8'")
@ -444,7 +444,7 @@ def alter_rename_table_with_materialize_mysql_database(clickhouse_node, mysql_no
# create mapping
clickhouse_node.query(
"CREATE DATABASE test_database_rename_table ENGINE = MaterializeMySQL('{}:3306', 'test_database_rename_table', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_rename_table ENGINE = MaterializedMySQL('{}:3306', 'test_database_rename_table', 'root', 'clickhouse')".format(
service_name))
assert "test_database_rename_table" in clickhouse_node.query("SHOW DATABASES")
@ -479,7 +479,7 @@ def query_event_with_empty_transaction(clickhouse_node, mysql_node, service_name
mysql_node.query("INSERT INTO test_database_event.t1(a) VALUES(1)")
clickhouse_node.query(
"CREATE DATABASE test_database_event ENGINE = MaterializeMySQL('{}:3306', 'test_database_event', 'root', 'clickhouse')".format(
"CREATE DATABASE test_database_event ENGINE = MaterializedMySQL('{}:3306', 'test_database_event', 'root', 'clickhouse')".format(
service_name))
# Reject one empty GTID QUERY event with 'BEGIN' and 'COMMIT'
@ -510,7 +510,7 @@ def select_without_columns(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE DATABASE db")
mysql_node.query("CREATE TABLE db.t (a INT PRIMARY KEY, b INT)")
clickhouse_node.query(
"CREATE DATABASE db ENGINE = MaterializeMySQL('{}:3306', 'db', 'root', 'clickhouse') SETTINGS max_flush_data_time = 100000".format(service_name))
"CREATE DATABASE db ENGINE = MaterializedMySQL('{}:3306', 'db', 'root', 'clickhouse') SETTINGS max_flush_data_time = 100000".format(service_name))
check_query(clickhouse_node, "SHOW TABLES FROM db FORMAT TSV", "t\n")
clickhouse_node.query("SYSTEM STOP MERGES db.t")
clickhouse_node.query("CREATE VIEW v AS SELECT * FROM db.t")
@ -548,7 +548,7 @@ def select_without_columns(clickhouse_node, mysql_node, service_name):
def insert_with_modify_binlog_checksum(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE DATABASE test_checksum")
mysql_node.query("CREATE TABLE test_checksum.t (a INT PRIMARY KEY, b varchar(200))")
clickhouse_node.query("CREATE DATABASE test_checksum ENGINE = MaterializeMySQL('{}:3306', 'test_checksum', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE test_checksum ENGINE = MaterializedMySQL('{}:3306', 'test_checksum', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SHOW TABLES FROM test_checksum FORMAT TSV", "t\n")
mysql_node.query("INSERT INTO test_checksum.t VALUES(1, '1111')")
check_query(clickhouse_node, "SELECT * FROM test_checksum.t ORDER BY a FORMAT TSV", "1\t1111\n")
@ -565,7 +565,7 @@ def insert_with_modify_binlog_checksum(clickhouse_node, mysql_node, service_name
mysql_node.query("DROP DATABASE test_checksum")
def err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
def err_sync_user_privs_with_materialized_mysql_database(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP DATABASE IF EXISTS priv_err_db")
mysql_node.query("DROP DATABASE IF EXISTS priv_err_db")
mysql_node.query("CREATE DATABASE priv_err_db DEFAULT CHARACTER SET 'utf8'")
@ -575,7 +575,7 @@ def err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, mysql_n
mysql_node.result("SHOW GRANTS FOR 'test'@'%';")
clickhouse_node.query(
"CREATE DATABASE priv_err_db ENGINE = MaterializeMySQL('{}:3306', 'priv_err_db', 'test', '123')".format(
"CREATE DATABASE priv_err_db ENGINE = MaterializedMySQL('{}:3306', 'priv_err_db', 'test', '123')".format(
service_name))
check_query(clickhouse_node, "SELECT count() FROM priv_err_db.test_table_1 FORMAT TSV", "1\n", 30, 5)
@ -585,7 +585,7 @@ def err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, mysql_n
mysql_node.query("REVOKE REPLICATION SLAVE ON *.* FROM 'test'@'%'")
clickhouse_node.query(
"CREATE DATABASE priv_err_db ENGINE = MaterializeMySQL('{}:3306', 'priv_err_db', 'test', '123')".format(
"CREATE DATABASE priv_err_db ENGINE = MaterializedMySQL('{}:3306', 'priv_err_db', 'test', '123')".format(
service_name))
assert "priv_err_db" in clickhouse_node.query("SHOW DATABASES")
assert "test_table_1" not in clickhouse_node.query("SHOW TABLES FROM priv_err_db")
@ -593,7 +593,7 @@ def err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, mysql_n
mysql_node.query("REVOKE REPLICATION CLIENT, RELOAD ON *.* FROM 'test'@'%'")
clickhouse_node.query(
"CREATE DATABASE priv_err_db ENGINE = MaterializeMySQL('{}:3306', 'priv_err_db', 'test', '123')".format(
"CREATE DATABASE priv_err_db ENGINE = MaterializedMySQL('{}:3306', 'priv_err_db', 'test', '123')".format(
service_name))
assert "priv_err_db" in clickhouse_node.query("SHOW DATABASES")
assert "test_table_1" not in clickhouse_node.query("SHOW TABLES FROM priv_err_db")
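The REVOKE statements in this test strip exactly the privileges the sync user relies on; a minimal MySQL-side grant for a working setup is assumed to look like this (SELECT is included on the assumption that the initial dump needs it; the user name comes from the test):

```sql
GRANT SELECT, RELOAD, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test'@'%';
FLUSH PRIVILEGES;
```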
@ -641,7 +641,7 @@ def network_partition_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE DATABASE test;")
clickhouse_node.query(
"CREATE DATABASE test_database_network ENGINE = MaterializeMySQL('{}:3306', 'test_database_network', 'root', 'clickhouse')".format(service_name))
"CREATE DATABASE test_database_network ENGINE = MaterializedMySQL('{}:3306', 'test_database_network', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT * FROM test_database_network.test_table", '')
with PartitionManager() as pm:
@ -651,7 +651,7 @@ def network_partition_test(clickhouse_node, mysql_node, service_name):
with pytest.raises(QueryRuntimeException) as exception:
clickhouse_node.query(
"CREATE DATABASE test ENGINE = MaterializeMySQL('{}:3306', 'test', 'root', 'clickhouse')".format(service_name))
"CREATE DATABASE test ENGINE = MaterializedMySQL('{}:3306', 'test', 'root', 'clickhouse')".format(service_name))
assert "Can't connect to MySQL server" in str(exception.value)
@ -660,7 +660,7 @@ def network_partition_test(clickhouse_node, mysql_node, service_name):
check_query(clickhouse_node, "SELECT * FROM test_database_network.test_table FORMAT TSV", '1\n')
clickhouse_node.query(
"CREATE DATABASE test ENGINE = MaterializeMySQL('{}:3306', 'test', 'root', 'clickhouse')".format(service_name))
"CREATE DATABASE test ENGINE = MaterializedMySQL('{}:3306', 'test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SHOW TABLES FROM test_database_network FORMAT TSV", "test_table\n")
mysql_node.query("CREATE TABLE test.test ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;")
@ -686,8 +686,8 @@ def mysql_kill_sync_thread_restore_test(clickhouse_node, mysql_node, service_nam
mysql_node.query("CREATE TABLE test_database_auto.test_table ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;")
mysql_node.query("INSERT INTO test_database_auto.test_table VALUES (11)")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse') SETTINGS max_wait_time_when_mysql_unavailable=-1".format(service_name))
clickhouse_node.query("CREATE DATABASE test_database_auto ENGINE = MaterializeMySQL('{}:3306', 'test_database_auto', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializedMySQL('{}:3306', 'test_database', 'root', 'clickhouse') SETTINGS max_wait_time_when_mysql_unavailable=-1".format(service_name))
clickhouse_node.query("CREATE DATABASE test_database_auto ENGINE = MaterializedMySQL('{}:3306', 'test_database_auto', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT * FROM test_database.test_table FORMAT TSV", '1\n')
check_query(clickhouse_node, "SELECT * FROM test_database_auto.test_table FORMAT TSV", '11\n')
@ -737,7 +737,7 @@ def mysql_killed_while_insert(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP DATABASE IF EXISTS kill_mysql_while_insert")
mysql_node.query("CREATE DATABASE kill_mysql_while_insert")
mysql_node.query("CREATE TABLE kill_mysql_while_insert.test ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;")
clickhouse_node.query("CREATE DATABASE kill_mysql_while_insert ENGINE = MaterializeMySQL('{}:3306', 'kill_mysql_while_insert', 'root', 'clickhouse') SETTINGS max_wait_time_when_mysql_unavailable=-1".format(service_name))
clickhouse_node.query("CREATE DATABASE kill_mysql_while_insert ENGINE = MaterializedMySQL('{}:3306', 'kill_mysql_while_insert', 'root', 'clickhouse') SETTINGS max_wait_time_when_mysql_unavailable=-1".format(service_name))
check_query(clickhouse_node, "SHOW TABLES FROM kill_mysql_while_insert FORMAT TSV", 'test\n')
try:
@ -773,7 +773,7 @@ def clickhouse_killed_while_insert(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS kill_clickhouse_while_insert")
mysql_node.query("CREATE DATABASE kill_clickhouse_while_insert")
mysql_node.query("CREATE TABLE kill_clickhouse_while_insert.test ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB;")
clickhouse_node.query("CREATE DATABASE kill_clickhouse_while_insert ENGINE = MaterializeMySQL('{}:3306', 'kill_clickhouse_while_insert', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE kill_clickhouse_while_insert ENGINE = MaterializedMySQL('{}:3306', 'kill_clickhouse_while_insert', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SHOW TABLES FROM kill_clickhouse_while_insert FORMAT TSV", 'test\n')
def insert(num):
@ -802,7 +802,7 @@ def utf8mb4_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE DATABASE utf8mb4_test")
mysql_node.query("CREATE TABLE utf8mb4_test.test (id INT(11) NOT NULL PRIMARY KEY, name VARCHAR(255)) ENGINE=InnoDB DEFAULT CHARACTER SET utf8mb4")
mysql_node.query("INSERT INTO utf8mb4_test.test VALUES(1, '🦄'),(2, '\u2601')")
clickhouse_node.query("CREATE DATABASE utf8mb4_test ENGINE = MaterializeMySQL('{}:3306', 'utf8mb4_test', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE utf8mb4_test ENGINE = MaterializedMySQL('{}:3306', 'utf8mb4_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SHOW TABLES FROM utf8mb4_test FORMAT TSV", "test\n")
check_query(clickhouse_node, "SELECT id, name FROM utf8mb4_test.test ORDER BY id", "1\t\U0001F984\n2\t\u2601\n")
@ -814,7 +814,7 @@ def system_parts_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("INSERT INTO system_parts_test.test VALUES(1),(2),(3)")
def check_active_parts(num):
check_query(clickhouse_node, "SELECT count() FROM system.parts WHERE database = 'system_parts_test' AND table = 'test' AND active = 1", "{}\n".format(num))
clickhouse_node.query("CREATE DATABASE system_parts_test ENGINE = MaterializeMySQL('{}:3306', 'system_parts_test', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE system_parts_test ENGINE = MaterializedMySQL('{}:3306', 'system_parts_test', 'root', 'clickhouse')".format(service_name))
check_active_parts(1)
mysql_node.query("INSERT INTO system_parts_test.test VALUES(4),(5),(6)")
check_active_parts(2)
@ -829,7 +829,7 @@ def multi_table_update_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE TABLE multi_table_update.b (id INT(11) NOT NULL PRIMARY KEY, othervalue VARCHAR(255))")
mysql_node.query("INSERT INTO multi_table_update.a VALUES(1, 'foo')")
mysql_node.query("INSERT INTO multi_table_update.b VALUES(1, 'bar')")
clickhouse_node.query("CREATE DATABASE multi_table_update ENGINE = MaterializeMySQL('{}:3306', 'multi_table_update', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE multi_table_update ENGINE = MaterializedMySQL('{}:3306', 'multi_table_update', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SHOW TABLES FROM multi_table_update", "a\nb\n")
mysql_node.query("UPDATE multi_table_update.a, multi_table_update.b SET value='baz', othervalue='quux' where a.id=b.id")
@ -841,7 +841,7 @@ def system_tables_test(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP DATABASE IF EXISTS system_tables_test")
mysql_node.query("CREATE DATABASE system_tables_test")
mysql_node.query("CREATE TABLE system_tables_test.test (id int NOT NULL PRIMARY KEY) ENGINE=InnoDB")
clickhouse_node.query("CREATE DATABASE system_tables_test ENGINE = MaterializeMySQL('{}:3306', 'system_tables_test', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE system_tables_test ENGINE = MaterializedMySQL('{}:3306', 'system_tables_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT partition_key, sorting_key, primary_key FROM system.tables WHERE database = 'system_tables_test' AND name = 'test'", "intDiv(id, 4294967)\tid\tid\n")
def materialize_with_column_comments_test(clickhouse_node, mysql_node, service_name):
@ -849,7 +849,7 @@ def materialize_with_column_comments_test(clickhouse_node, mysql_node, service_n
clickhouse_node.query("DROP DATABASE IF EXISTS materialize_with_column_comments_test")
mysql_node.query("CREATE DATABASE materialize_with_column_comments_test")
mysql_node.query("CREATE TABLE materialize_with_column_comments_test.test (id int NOT NULL PRIMARY KEY, value VARCHAR(255) COMMENT 'test comment') ENGINE=InnoDB")
clickhouse_node.query("CREATE DATABASE materialize_with_column_comments_test ENGINE = MaterializeMySQL('{}:3306', 'materialize_with_column_comments_test', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE materialize_with_column_comments_test ENGINE = MaterializedMySQL('{}:3306', 'materialize_with_column_comments_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "DESCRIBE TABLE materialize_with_column_comments_test.test", "id\tInt32\t\t\t\t\t\nvalue\tNullable(String)\t\t\ttest comment\t\t\n_sign\tInt8\tMATERIALIZED\t1\t\t\t\n_version\tUInt64\tMATERIALIZED\t1\t\t\t\n")
mysql_node.query("ALTER TABLE materialize_with_column_comments_test.test MODIFY value VARCHAR(255) COMMENT 'comment test'")
check_query(clickhouse_node, "DESCRIBE TABLE materialize_with_column_comments_test.test", "id\tInt32\t\t\t\t\t\nvalue\tNullable(String)\t\t\tcomment test\t\t\n_sign\tInt8\tMATERIALIZED\t1\t\t\t\n_version\tUInt64\tMATERIALIZED\t1\t\t\t\n")
@ -872,7 +872,7 @@ def materialize_with_enum8_test(clickhouse_node, mysql_node, service_name):
enum8_values_with_backslash += "\\\'" + str(enum8_values_count) +"\\\' = " + str(enum8_values_count)
mysql_node.query("CREATE TABLE materialize_with_enum8_test.test (id int NOT NULL PRIMARY KEY, value ENUM(" + enum8_values + ")) ENGINE=InnoDB")
mysql_node.query("INSERT INTO materialize_with_enum8_test.test (id, value) VALUES (1, '1'),(2, '2')")
clickhouse_node.query("CREATE DATABASE materialize_with_enum8_test ENGINE = MaterializeMySQL('{}:3306', 'materialize_with_enum8_test', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE materialize_with_enum8_test ENGINE = MaterializedMySQL('{}:3306', 'materialize_with_enum8_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT value FROM materialize_with_enum8_test.test ORDER BY id", "1\n2\n")
mysql_node.query("INSERT INTO materialize_with_enum8_test.test (id, value) VALUES (3, '127')")
check_query(clickhouse_node, "SELECT value FROM materialize_with_enum8_test.test ORDER BY id", "1\n2\n127\n")
@ -894,7 +894,7 @@ def materialize_with_enum16_test(clickhouse_node, mysql_node, service_name):
enum16_values_with_backslash += "\\\'" + str(enum16_values_count) +"\\\' = " + str(enum16_values_count)
mysql_node.query("CREATE TABLE materialize_with_enum16_test.test (id int NOT NULL PRIMARY KEY, value ENUM(" + enum16_values + ")) ENGINE=InnoDB")
mysql_node.query("INSERT INTO materialize_with_enum16_test.test (id, value) VALUES (1, '1'),(2, '2')")
clickhouse_node.query("CREATE DATABASE materialize_with_enum16_test ENGINE = MaterializeMySQL('{}:3306', 'materialize_with_enum16_test', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE materialize_with_enum16_test ENGINE = MaterializedMySQL('{}:3306', 'materialize_with_enum16_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT value FROM materialize_with_enum16_test.test ORDER BY id", "1\n2\n")
mysql_node.query("INSERT INTO materialize_with_enum16_test.test (id, value) VALUES (3, '500')")
check_query(clickhouse_node, "SELECT value FROM materialize_with_enum16_test.test ORDER BY id", "1\n2\n500\n")
@ -917,7 +917,7 @@ def alter_enum8_to_enum16_test(clickhouse_node, mysql_node, service_name):
enum8_values_with_backslash += "\\\'" + str(enum8_values_count) +"\\\' = " + str(enum8_values_count)
mysql_node.query("CREATE TABLE alter_enum8_to_enum16_test.test (id int NOT NULL PRIMARY KEY, value ENUM(" + enum8_values + ")) ENGINE=InnoDB")
mysql_node.query("INSERT INTO alter_enum8_to_enum16_test.test (id, value) VALUES (1, '1'),(2, '2')")
clickhouse_node.query("CREATE DATABASE alter_enum8_to_enum16_test ENGINE = MaterializeMySQL('{}:3306', 'alter_enum8_to_enum16_test', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE alter_enum8_to_enum16_test ENGINE = MaterializedMySQL('{}:3306', 'alter_enum8_to_enum16_test', 'root', 'clickhouse')".format(service_name))
mysql_node.query("INSERT INTO alter_enum8_to_enum16_test.test (id, value) VALUES (3, '75')")
check_query(clickhouse_node, "SELECT value FROM alter_enum8_to_enum16_test.test ORDER BY id", "1\n2\n75\n")
check_query(clickhouse_node, "DESCRIBE TABLE alter_enum8_to_enum16_test.test", "id\tInt32\t\t\t\t\t\nvalue\tNullable(Enum8(" + enum8_values_with_backslash + "))\t\t\t\t\t\n_sign\tInt8\tMATERIALIZED\t1\t\t\t\n_version\tUInt64\tMATERIALIZED\t1\t\t\t\n")
@ -942,7 +942,7 @@ def move_to_prewhere_and_column_filtering(clickhouse_node, mysql_node, service_n
clickhouse_node.query("DROP DATABASE IF EXISTS cond_on_key_col")
mysql_node.query("DROP DATABASE IF EXISTS cond_on_key_col")
mysql_node.query("CREATE DATABASE cond_on_key_col")
clickhouse_node.query("CREATE DATABASE cond_on_key_col ENGINE = MaterializeMySQL('{}:3306', 'cond_on_key_col', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE cond_on_key_col ENGINE = MaterializedMySQL('{}:3306', 'cond_on_key_col', 'root', 'clickhouse')".format(service_name))
mysql_node.query("create table cond_on_key_col.products (id int primary key, product_id int not null, catalog_id int not null, brand_id int not null, name text)")
mysql_node.query("insert into cond_on_key_col.products (id, name, catalog_id, brand_id, product_id) values (915, 'ertyui', 5287, 15837, 0), (990, 'wer', 1053, 24390, 1), (781, 'qwerty', 1041, 1176, 2);")
mysql_node.query("create table cond_on_key_col.test (id int(11) NOT NULL AUTO_INCREMENT, a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, PRIMARY KEY (id)) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;")
@ -972,7 +972,7 @@ def mysql_settings_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("INSERT INTO test_database.a VALUES(1, 'foo')")
mysql_node.query("INSERT INTO test_database.a VALUES(2, 'bar')")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(service_name))
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializedMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT COUNT() FROM test_database.a FORMAT TSV", "2\n")
assert clickhouse_node.query("SELECT COUNT(DISTINCT blockNumber()) FROM test_database.a FORMAT TSV") == "2\n"

View File

@ -94,40 +94,40 @@ def started_mysql_8_0():
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialize_mysql_database_with_views(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.dml_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialized_mysql_database_with_views(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialized_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.move_to_prewhere_and_column_filtering(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialize_mysql_database_with_views(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.dml_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_mysql_database_with_views(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.move_to_prewhere_and_column_filtering(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.drop_table_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.create_table_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.rename_table_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_add_column_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_drop_column_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
# MySQL 5.7 does not support ALTER TABLE ... RENAME COLUMN
# materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
# materialize_with_ddl.alter_rename_column_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_rename_table_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_modify_column_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
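# Descriptive note (added comment): unlike MySQL 5.7 above, MySQL 8.0 supports ALTER TABLE ... RENAME COLUMN,
# so the rename-column check is included in this test.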
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_add_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.drop_table_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.create_table_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.rename_table_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_add_column_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_drop_column_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_rename_table_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_rename_column_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_modify_column_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
@ -160,12 +160,12 @@ def test_insert_with_modify_binlog_checksum_8_0(started_cluster, started_mysql_8
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_err_sync_user_privs_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.err_sync_user_privs_with_materialized_mysql_database(clickhouse_node, started_mysql_5_7, "mysql57")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_materialize_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.err_sync_user_privs_with_materialized_mysql_database(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [pytest.param(node_db_ordinary, id="ordinary"), pytest.param(node_db_atomic, id="atomic")])
def test_network_partition_5_7(started_cluster, started_mysql_5_7, clickhouse_node):

View File

@ -23,27 +23,27 @@ node5 = cluster.add_instance('node5', macros={'cluster': 'test3'}, main_configs=
all_nodes = [node1, node2, node3, node4, node5]
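# Descriptive note (added comment): prepare_cluster() is called at the start of each test below; it drops the
# replicated test_mutations tables with SYNC so the drop completes before the tables are recreated on
# node1-node4, plus a plain MergeTree copy on node5.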
def prepare_cluster():
for node in all_nodes:
node.query("DROP TABLE IF EXISTS test_mutations SYNC")
for node in [node1, node2, node3, node4]:
node.query("""
CREATE TABLE test_mutations(d Date, x UInt32, i UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}')
ORDER BY x
PARTITION BY toYYYYMM(d)
SETTINGS number_of_free_entries_in_pool_to_execute_mutation=0
""")
node5.query(
"CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE MergeTree() ORDER BY x PARTITION BY toYYYYMM(d)")
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node in all_nodes:
node.query("DROP TABLE IF EXISTS test_mutations")
for node in [node1, node2, node3, node4]:
node.query("""
CREATE TABLE test_mutations(d Date, x UInt32, i UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}')
ORDER BY x
PARTITION BY toYYYYMM(d)
SETTINGS number_of_free_entries_in_pool_to_execute_mutation=0
""")
node5.query(
"CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE MergeTree() ORDER BY x PARTITION BY toYYYYMM(d)")
yield cluster
finally:
@ -165,6 +165,8 @@ def wait_for_mutations(nodes, number_of_mutations):
def test_mutations(started_cluster):
prepare_cluster()
DURATION_SECONDS = 30
nodes = [node1, node2]
@ -212,6 +214,8 @@ def test_mutations(started_cluster):
]
)
def test_mutations_dont_prevent_merges(started_cluster, nodes):
prepare_cluster()
for year in range(2000, 2016):
rows = ''
date_str = '{}-01-{}'.format(year, random.randint(1, 10))

View File

@ -86,7 +86,7 @@ def test_upgrade_while_mutation(start_cluster):
node3.restart_with_latest_version(signal=9)
# checks for readonly
exec_query_with_retry(node3, "OPTIMIZE TABLE mt1", retry_count=60)
exec_query_with_retry(node3, "OPTIMIZE TABLE mt1", sleep_time=5, retry_count=60)
node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"})
# This will delete nothing, but the previous async mutation will finish with this query

View File

@ -181,7 +181,8 @@ std::vector<std::shared_ptr<Coordination::ZooKeeper>> Runner::getConnections()
"", /*identity*/
Poco::Timespan(0, 30000 * 1000),
Poco::Timespan(0, 1000 * 1000),
Poco::Timespan(0, 10000 * 1000)));
Poco::Timespan(0, 10000 * 1000),
nullptr));
}
return zookeepers;