Merge pull request #22697 from TCeason/feature/add_MaterializeMySQL_sync_bytes_judgment

Add MySQL read history data bytes judgment
This commit is contained in:
Kseniia Sumarokova 2021-04-20 10:26:42 +03:00 committed by GitHub
commit b43656bf89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 259 additions and 117 deletions

View File

@ -446,6 +446,8 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \ M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \ M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \ M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \
M(UInt64, external_storage_max_read_rows, 0, "Limit maximum number of rows when table with external engine should flush history data. Now supported only for MySQL table engine, database engine, dictionary and MaterializeMySQL. If equal to 0, this setting is disabled", 0) \
M(UInt64, external_storage_max_read_bytes, 0, "Limit maximum number of bytes when table with external engine should flush history data. Now supported only for MySQL table engine, database engine, dictionary and MaterializeMySQL. If equal to 0, this setting is disabled", 0) \
\ \
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \ /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\ \

View File

@ -198,7 +198,7 @@ ASTPtr DatabaseConnectionMySQL::getCreateDatabaseQuery() const
void DatabaseConnectionMySQL::fetchTablesIntoLocalCache(ContextPtr local_context) const void DatabaseConnectionMySQL::fetchTablesIntoLocalCache(ContextPtr local_context) const
{ {
const auto & tables_with_modification_time = fetchTablesWithModificationTime(); const auto & tables_with_modification_time = fetchTablesWithModificationTime(local_context);
destroyLocalCacheExtraTables(tables_with_modification_time); destroyLocalCacheExtraTables(tables_with_modification_time);
fetchLatestTablesStructureIntoCache(tables_with_modification_time, local_context); fetchLatestTablesStructureIntoCache(tables_with_modification_time, local_context);
@ -252,7 +252,7 @@ void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache(
} }
} }
std::map<String, UInt64> DatabaseConnectionMySQL::fetchTablesWithModificationTime() const std::map<String, UInt64> DatabaseConnectionMySQL::fetchTablesWithModificationTime(ContextPtr local_context) const
{ {
Block tables_status_sample_block Block tables_status_sample_block
{ {
@ -268,7 +268,8 @@ std::map<String, UInt64> DatabaseConnectionMySQL::fetchTablesWithModificationTim
" WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql; " WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql;
std::map<String, UInt64> tables_with_modification_time; std::map<String, UInt64> tables_with_modification_time;
MySQLBlockInputStream result(mysql_pool.get(), query.str(), tables_status_sample_block, DEFAULT_BLOCK_SIZE); StreamSettings mysql_input_stream_settings(local_context->getSettingsRef());
MySQLBlockInputStream result(mysql_pool.get(), query.str(), tables_status_sample_block, mysql_input_stream_settings);
while (Block block = result.read()) while (Block block = result.read())
{ {
@ -292,7 +293,7 @@ DatabaseConnectionMySQL::fetchTablesColumnsList(const std::vector<String> & tabl
mysql_pool, mysql_pool,
database_name_in_mysql, database_name_in_mysql,
tables_name, tables_name,
settings.external_table_functions_use_nulls, settings,
database_settings->mysql_datatypes_support_level); database_settings->mysql_datatypes_support_level);
} }

View File

@ -108,7 +108,7 @@ private:
void fetchTablesIntoLocalCache(ContextPtr context) const; void fetchTablesIntoLocalCache(ContextPtr context) const;
std::map<String, UInt64> fetchTablesWithModificationTime() const; std::map<String, UInt64> fetchTablesWithModificationTime(ContextPtr local_context) const;
std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name, ContextPtr context) const; std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name, ContextPtr context) const;

View File

@ -44,7 +44,7 @@ std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::PoolWithFailover & pool, mysqlxx::PoolWithFailover & pool,
const String & database_name, const String & database_name,
const std::vector<String> & tables_name, const std::vector<String> & tables_name,
bool external_table_functions_use_nulls, const Settings & settings,
MultiEnum<MySQLDataTypesSupport> type_support) MultiEnum<MySQLDataTypesSupport> type_support)
{ {
std::map<String, NamesAndTypesList> tables_and_columns; std::map<String, NamesAndTypesList> tables_and_columns;
@ -78,7 +78,8 @@ std::map<String, NamesAndTypesList> fetchTablesColumnsList(
" WHERE TABLE_SCHEMA = " << quote << database_name " WHERE TABLE_SCHEMA = " << quote << database_name
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION"; << " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
MySQLBlockInputStream result(pool.get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE); StreamSettings mysql_input_stream_settings(settings);
MySQLBlockInputStream result(pool.get(), query.str(), tables_columns_sample_block, mysql_input_stream_settings);
while (Block block = result.read()) while (Block block = result.read())
{ {
const auto & table_name_col = *block.getByPosition(0).column; const auto & table_name_col = *block.getByPosition(0).column;
@ -99,7 +100,7 @@ std::map<String, NamesAndTypesList> fetchTablesColumnsList(
convertMySQLDataType( convertMySQLDataType(
type_support, type_support,
column_type_col[i].safeGet<String>(), column_type_col[i].safeGet<String>(),
external_table_functions_use_nulls && is_nullable_col[i].safeGet<UInt64>(), settings.external_table_functions_use_nulls && is_nullable_col[i].safeGet<UInt64>(),
is_unsigned_col[i].safeGet<UInt64>(), is_unsigned_col[i].safeGet<UInt64>(),
char_max_length_col[i].safeGet<UInt64>(), char_max_length_col[i].safeGet<UInt64>(),
precision_col[i].safeGet<UInt64>(), precision_col[i].safeGet<UInt64>(),

View File

@ -12,6 +12,7 @@
#include <map> #include <map>
#include <vector> #include <vector>
#include <Core/Settings.h>
namespace DB namespace DB
{ {
@ -20,7 +21,7 @@ std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::PoolWithFailover & pool, mysqlxx::PoolWithFailover & pool,
const String & database_name, const String & database_name,
const std::vector<String> & tables_name, const std::vector<String> & tables_name,
bool external_table_functions_use_nulls, const Settings & settings,
MultiEnum<MySQLDataTypesSupport> type_support); MultiEnum<MySQLDataTypesSupport> type_support);
} }

View File

@ -24,7 +24,8 @@ namespace ErrorCodes
} }
static std::unordered_map<String, String> fetchTablesCreateQuery( static std::unordered_map<String, String> fetchTablesCreateQuery(
const mysqlxx::PoolWithFailover::Entry & connection, const String & database_name, const std::vector<String> & fetch_tables) const mysqlxx::PoolWithFailover::Entry & connection, const String & database_name,
const std::vector<String> & fetch_tables, const Settings & global_settings)
{ {
std::unordered_map<String, String> tables_create_query; std::unordered_map<String, String> tables_create_query;
for (const auto & fetch_table_name : fetch_tables) for (const auto & fetch_table_name : fetch_tables)
@ -34,9 +35,10 @@ static std::unordered_map<String, String> fetchTablesCreateQuery(
{std::make_shared<DataTypeString>(), "Create Table"}, {std::make_shared<DataTypeString>(), "Create Table"},
}; };
StreamSettings mysql_input_stream_settings(global_settings, false, true);
MySQLBlockInputStream show_create_table( MySQLBlockInputStream show_create_table(
connection, "SHOW CREATE TABLE " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(fetch_table_name), connection, "SHOW CREATE TABLE " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(fetch_table_name),
show_create_table_header, DEFAULT_BLOCK_SIZE, false, true); show_create_table_header, mysql_input_stream_settings);
Block create_query_block = show_create_table.read(); Block create_query_block = show_create_table.read();
if (!create_query_block || create_query_block.rows() != 1) if (!create_query_block || create_query_block.rows() != 1)
@ -49,13 +51,14 @@ static std::unordered_map<String, String> fetchTablesCreateQuery(
} }
static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database) static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database, const Settings & global_settings)
{ {
Block header{{std::make_shared<DataTypeString>(), "table_name"}}; Block header{{std::make_shared<DataTypeString>(), "table_name"}};
String query = "SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE != 'VIEW' AND TABLE_SCHEMA = " + quoteString(database); String query = "SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE != 'VIEW' AND TABLE_SCHEMA = " + quoteString(database);
std::vector<String> tables_in_db; std::vector<String> tables_in_db;
MySQLBlockInputStream input(connection, query, header, DEFAULT_BLOCK_SIZE); StreamSettings mysql_input_stream_settings(global_settings);
MySQLBlockInputStream input(connection, query, header, mysql_input_stream_settings);
while (Block block = input.read()) while (Block block = input.read())
{ {
@ -77,7 +80,8 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c
{std::make_shared<DataTypeString>(), "Executed_Gtid_Set"}, {std::make_shared<DataTypeString>(), "Executed_Gtid_Set"},
}; };
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE, false, true); StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, mysql_input_stream_settings);
Block master_status = input.read(); Block master_status = input.read();
if (!master_status || master_status.rows() != 1) if (!master_status || master_status.rows() != 1)
@ -99,7 +103,8 @@ void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailo
}; };
const String & fetch_query = "SHOW VARIABLES WHERE Variable_name = 'binlog_checksum'"; const String & fetch_query = "SHOW VARIABLES WHERE Variable_name = 'binlog_checksum'";
MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, DEFAULT_BLOCK_SIZE, false, true); StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, mysql_input_stream_settings);
while (Block variables_block = variables_input.read()) while (Block variables_block = variables_input.read())
{ {
@ -114,7 +119,7 @@ void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailo
} }
} }
static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & connection, WriteBuffer & out) static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & connection, const Settings & global_settings, WriteBuffer & out)
{ {
Block sync_user_privs_header Block sync_user_privs_header
{ {
@ -122,7 +127,8 @@ static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & conne
}; };
String grants_query, sub_privs; String grants_query, sub_privs;
MySQLBlockInputStream input(connection, "SHOW GRANTS FOR CURRENT_USER();", sync_user_privs_header, DEFAULT_BLOCK_SIZE); StreamSettings mysql_input_stream_settings(global_settings);
MySQLBlockInputStream input(connection, "SHOW GRANTS FOR CURRENT_USER();", sync_user_privs_header, mysql_input_stream_settings);
while (Block block = input.read()) while (Block block = input.read())
{ {
for (size_t index = 0; index < block.rows(); ++index) for (size_t index = 0; index < block.rows(); ++index)
@ -146,11 +152,11 @@ static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & conne
return false; return false;
} }
static void checkSyncUserPriv(const mysqlxx::PoolWithFailover::Entry & connection) static void checkSyncUserPriv(const mysqlxx::PoolWithFailover::Entry & connection, const Settings & global_settings)
{ {
WriteBufferFromOwnString out; WriteBufferFromOwnString out;
if (!checkSyncUserPrivImpl(connection, out)) if (!checkSyncUserPrivImpl(connection, global_settings, out))
throw Exception("MySQL SYNC USER ACCESS ERR: mysql sync user needs " throw Exception("MySQL SYNC USER ACCESS ERR: mysql sync user needs "
"at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' " "at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' "
"and SELECT PRIVILEGE on MySQL Database." "and SELECT PRIVILEGE on MySQL Database."
@ -167,7 +173,8 @@ bool MaterializeMetadata::checkBinlogFileExists(const mysqlxx::PoolWithFailover:
{std::make_shared<DataTypeUInt64>(), "File_size"} {std::make_shared<DataTypeUInt64>(), "File_size"}
}; };
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", logs_header, DEFAULT_BLOCK_SIZE, false, true); StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", logs_header, mysql_input_stream_settings);
while (Block block = input.read()) while (Block block = input.read())
{ {
@ -222,7 +229,7 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio
commitMetadata(std::move(fun), persistent_tmp_path, persistent_path); commitMetadata(std::move(fun), persistent_tmp_path, persistent_path);
} }
MaterializeMetadata::MaterializeMetadata(const String & path_) : persistent_path(path_) MaterializeMetadata::MaterializeMetadata(const String & path_, const Settings & settings_) : persistent_path(path_), settings(settings_)
{ {
if (Poco::File(persistent_path).exists()) if (Poco::File(persistent_path).exists())
{ {
@ -244,7 +251,7 @@ void MaterializeMetadata::startReplication(
mysqlxx::PoolWithFailover::Entry & connection, const String & database, mysqlxx::PoolWithFailover::Entry & connection, const String & database,
bool & opened_transaction, std::unordered_map<String, String> & need_dumping_tables) bool & opened_transaction, std::unordered_map<String, String> & need_dumping_tables)
{ {
checkSyncUserPriv(connection); checkSyncUserPriv(connection, settings);
if (checkBinlogFileExists(connection)) if (checkBinlogFileExists(connection))
return; return;
@ -263,7 +270,7 @@ void MaterializeMetadata::startReplication(
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute(); connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
opened_transaction = true; opened_transaction = true;
need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database)); need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database, settings), settings);
connection->query("UNLOCK TABLES;").execute(); connection->query("UNLOCK TABLES;").execute();
} }
catch (...) catch (...)

View File

@ -10,6 +10,7 @@
#include <Core/MySQL/MySQLReplication.h> #include <Core/MySQL/MySQLReplication.h>
#include <mysqlxx/Connection.h> #include <mysqlxx/Connection.h>
#include <mysqlxx/PoolWithFailover.h> #include <mysqlxx/PoolWithFailover.h>
#include <Interpreters/Context.h>
namespace DB namespace DB
{ {
@ -25,6 +26,7 @@ namespace DB
struct MaterializeMetadata struct MaterializeMetadata
{ {
const String persistent_path; const String persistent_path;
const Settings settings;
String binlog_file; String binlog_file;
UInt64 binlog_position; UInt64 binlog_position;
@ -50,7 +52,7 @@ struct MaterializeMetadata
bool & opened_transaction, bool & opened_transaction,
std::unordered_map<String, String> & need_dumping_tables); std::unordered_map<String, String> & need_dumping_tables);
MaterializeMetadata(const String & path_); MaterializeMetadata(const String & path_, const Settings & settings_);
}; };
} }

View File

@ -90,7 +90,7 @@ MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread()
} }
} }
static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection) static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection, const Settings & settings)
{ {
Block variables_header{ Block variables_header{
{std::make_shared<DataTypeString>(), "Variable_name"}, {std::make_shared<DataTypeString>(), "Variable_name"},
@ -104,11 +104,9 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection)
"OR (Variable_name = 'default_authentication_plugin' AND upper(Value) = 'MYSQL_NATIVE_PASSWORD') " "OR (Variable_name = 'default_authentication_plugin' AND upper(Value) = 'MYSQL_NATIVE_PASSWORD') "
"OR (Variable_name = 'log_bin_use_v1_row_events' AND upper(Value) = 'OFF');"; "OR (Variable_name = 'log_bin_use_v1_row_events' AND upper(Value) = 'OFF');";
MySQLBlockInputStream variables_input(connection, check_query, variables_header, DEFAULT_BLOCK_SIZE, false, true); StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream variables_input(connection, check_query, variables_header, mysql_input_stream_settings);
Block variables_block = variables_input.read();
if (!variables_block || variables_block.rows() != 5)
{
std::unordered_map<String, String> variables_error_message{ std::unordered_map<String, String> variables_error_message{
{"log_bin", "log_bin = 'ON'"}, {"log_bin", "log_bin = 'ON'"},
{"binlog_format", "binlog_format='ROW'"}, {"binlog_format", "binlog_format='ROW'"},
@ -117,6 +115,8 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection)
{"log_bin_use_v1_row_events", "log_bin_use_v1_row_events='OFF'"} {"log_bin_use_v1_row_events", "log_bin_use_v1_row_events='OFF'"}
}; };
while (Block variables_block = variables_input.read())
{
ColumnPtr variable_name_column = variables_block.getByName("Variable_name").column; ColumnPtr variable_name_column = variables_block.getByName("Variable_name").column;
for (size_t index = 0; index < variables_block.rows(); ++index) for (size_t index = 0; index < variables_block.rows(); ++index)
@ -126,7 +126,10 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection)
if (error_message_it != variables_error_message.end()) if (error_message_it != variables_error_message.end())
variables_error_message.erase(error_message_it); variables_error_message.erase(error_message_it);
} }
}
if (!variables_error_message.empty())
{
bool first = true; bool first = true;
WriteBufferFromOwnString error_message; WriteBufferFromOwnString error_message;
error_message << "Illegal MySQL variables, the MaterializeMySQL engine requires "; error_message << "Illegal MySQL variables, the MaterializeMySQL engine requires ";
@ -167,7 +170,7 @@ void MaterializeMySQLSyncThread::synchronization()
try try
{ {
MaterializeMetadata metadata( MaterializeMetadata metadata(
DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata"); DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata", getContext()->getSettingsRef());
bool need_reconnect = true; bool need_reconnect = true;
Stopwatch watch; Stopwatch watch;
@ -240,7 +243,7 @@ void MaterializeMySQLSyncThread::assertMySQLAvailable()
{ {
try try
{ {
checkMySQLVariables(pool.get()); checkMySQLVariables(pool.get(), getContext()->getSettingsRef());
} }
catch (const mysqlxx::ConnectionFailed & e) catch (const mysqlxx::ConnectionFailed & e)
{ {
@ -326,9 +329,10 @@ static inline void dumpDataForTables(
tryToExecuteQuery(query_prefix + " " + iterator->second, query_context, database_name, comment); /// create table. tryToExecuteQuery(query_prefix + " " + iterator->second, query_context, database_name, comment); /// create table.
auto out = std::make_shared<CountingBlockOutputStream>(getTableOutput(database_name, table_name, query_context)); auto out = std::make_shared<CountingBlockOutputStream>(getTableOutput(database_name, table_name, query_context));
StreamSettings mysql_input_stream_settings(context->getSettingsRef());
MySQLBlockInputStream input( MySQLBlockInputStream input(
connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name),
out->getHeader(), DEFAULT_BLOCK_SIZE); out->getHeader(), mysql_input_stream_settings);
Stopwatch watch; Stopwatch watch;
copyData(input, *out, is_cancelled); copyData(input, *out, is_cancelled);
@ -375,7 +379,7 @@ bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metad
opened_transaction = false; opened_transaction = false;
checkMySQLVariables(connection); checkMySQLVariables(connection, getContext()->getSettingsRef());
std::unordered_map<String, String> need_dumping_tables; std::unordered_map<String, String> need_dumping_tables;
metadata.startReplication(connection, mysql_database_name, opened_transaction, need_dumping_tables); metadata.startReplication(connection, mysql_database_name, opened_transaction, need_dumping_tables);

View File

@ -4,9 +4,15 @@
#include "DictionarySourceFactory.h" #include "DictionarySourceFactory.h"
#include "DictionaryStructure.h" #include "DictionaryStructure.h"
#include "registerDictionaries.h" #include "registerDictionaries.h"
#include <Core/Settings.h>
#include <Interpreters/Context.h>
namespace DB namespace DB
{ {
[[maybe_unused]]
static const size_t default_num_tries_on_connection_loss = 3;
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int SUPPORT_IS_DISABLED; extern const int SUPPORT_IS_DISABLED;
@ -14,20 +20,20 @@ namespace ErrorCodes
void registerDictionarySourceMysql(DictionarySourceFactory & factory) void registerDictionarySourceMysql(DictionarySourceFactory & factory)
{ {
auto create_table_source = [=](const DictionaryStructure & dict_struct, auto create_table_source = [=]([[maybe_unused]] const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config, [[maybe_unused]] const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, [[maybe_unused]] const std::string & config_prefix,
Block & sample_block, [[maybe_unused]] Block & sample_block,
ContextPtr /* context */, [[maybe_unused]] ContextPtr context,
const std::string & /* default_database */, const std::string & /* default_database */,
bool /* check_config */) -> DictionarySourcePtr { bool /* check_config */) -> DictionarySourcePtr {
#if USE_MYSQL #if USE_MYSQL
return std::make_unique<MySQLDictionarySource>(dict_struct, config, config_prefix + ".mysql", sample_block); StreamSettings mysql_input_stream_settings(context->getSettingsRef()
, config.getBool(config_prefix + ".mysql.close_connection", false) || config.getBool(config_prefix + ".mysql.share_connection", false)
, false
, config.getBool(config_prefix + ".mysql.fail_on_connection_loss", false) ? 1 : default_num_tries_on_connection_loss);
return std::make_unique<MySQLDictionarySource>(dict_struct, config, config_prefix + ".mysql", sample_block, mysql_input_stream_settings);
#else #else
(void)dict_struct;
(void)config;
(void)config_prefix;
(void)sample_block;
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support."); "Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support.");
#endif #endif
@ -45,22 +51,21 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
# include <IO/WriteHelpers.h> # include <IO/WriteHelpers.h>
# include <common/LocalDateTime.h> # include <common/LocalDateTime.h>
# include <common/logger_useful.h> # include <common/logger_useful.h>
# include <Formats/MySQLBlockInputStream.h>
# include "readInvalidateQuery.h" # include "readInvalidateQuery.h"
# include <mysqlxx/Exception.h> # include <mysqlxx/Exception.h>
# include <mysqlxx/PoolFactory.h> # include <mysqlxx/PoolFactory.h>
# include <Core/Settings.h>
namespace DB namespace DB
{ {
static const UInt64 max_block_size = 8192;
static const size_t default_num_tries_on_connection_loss = 3;
MySQLDictionarySource::MySQLDictionarySource( MySQLDictionarySource::MySQLDictionarySource(
const DictionaryStructure & dict_struct_, const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, const std::string & config_prefix,
const Block & sample_block_) const Block & sample_block_,
const StreamSettings & settings_)
: log(&Poco::Logger::get("MySQLDictionarySource")) : log(&Poco::Logger::get("MySQLDictionarySource"))
, update_time{std::chrono::system_clock::from_time_t(0)} , update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_} , dict_struct{dict_struct_}
@ -74,10 +79,7 @@ MySQLDictionarySource::MySQLDictionarySource(
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks} , query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{query_builder.composeLoadAllQuery()} , load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")} , invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, close_connection( , settings(settings_)
config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false))
, max_tries_for_mysql_block_input_stream(
config.getBool(config_prefix + ".fail_on_connection_loss", false) ? 1 : default_num_tries_on_connection_loss)
{ {
} }
@ -98,8 +100,7 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
, last_modification{other.last_modification} , last_modification{other.last_modification}
, invalidate_query{other.invalidate_query} , invalidate_query{other.invalidate_query}
, invalidate_query_response{other.invalidate_query_response} , invalidate_query_response{other.invalidate_query_response}
, close_connection{other.close_connection} , settings(other.settings)
, max_tries_for_mysql_block_input_stream{other.max_tries_for_mysql_block_input_stream}
{ {
} }
@ -122,7 +123,7 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
BlockInputStreamPtr MySQLDictionarySource::loadFromQuery(const String & query) BlockInputStreamPtr MySQLDictionarySource::loadFromQuery(const String & query)
{ {
return std::make_shared<MySQLWithFailoverBlockInputStream>( return std::make_shared<MySQLWithFailoverBlockInputStream>(
pool, query, sample_block, max_block_size, close_connection, false, max_tries_for_mysql_block_input_stream); pool, query, sample_block, settings);
} }
BlockInputStreamPtr MySQLDictionarySource::loadAll() BlockInputStreamPtr MySQLDictionarySource::loadAll()
@ -245,7 +246,7 @@ LocalDateTime MySQLDictionarySource::getLastModification(mysqlxx::Pool::Entry &
++fetched_rows; ++fetched_rows;
} }
if (close_connection && allow_connection_closure) if (settings.auto_close && allow_connection_closure)
{ {
connection.disconnect(); connection.disconnect();
} }
@ -269,7 +270,7 @@ std::string MySQLDictionarySource::doInvalidateQuery(const std::string & request
Block invalidate_sample_block; Block invalidate_sample_block;
ColumnPtr column(ColumnString::create()); ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block")); invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
MySQLBlockInputStream block_input_stream(pool->get(), request, invalidate_sample_block, 1, close_connection); MySQLBlockInputStream block_input_stream(pool->get(), request, invalidate_sample_block, settings);
return readInvalidateQuery(block_input_stream); return readInvalidateQuery(block_input_stream);
} }

View File

@ -12,7 +12,7 @@
# include "DictionaryStructure.h" # include "DictionaryStructure.h"
# include "ExternalQueryBuilder.h" # include "ExternalQueryBuilder.h"
# include "IDictionarySource.h" # include "IDictionarySource.h"
# include <Formats/MySQLBlockInputStream.h>
namespace Poco namespace Poco
{ {
@ -35,7 +35,8 @@ public:
const DictionaryStructure & dict_struct_, const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config, const Poco::Util::AbstractConfiguration & config,
const String & config_prefix, const String & config_prefix,
const Block & sample_block_); const Block & sample_block_,
const StreamSettings & settings_);
/// copy-constructor is provided in order to support cloneability /// copy-constructor is provided in order to support cloneability
MySQLDictionarySource(const MySQLDictionarySource & other); MySQLDictionarySource(const MySQLDictionarySource & other);
@ -87,8 +88,7 @@ private:
LocalDateTime last_modification; LocalDateTime last_modification;
std::string invalidate_query; std::string invalidate_query;
mutable std::string invalidate_query_response; mutable std::string invalidate_query_response;
const bool close_connection; const StreamSettings settings;
const size_t max_tries_for_mysql_block_input_stream;
}; };
} }

View File

@ -30,6 +30,15 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
} }
StreamSettings::StreamSettings(const Settings & settings, bool auto_close_, bool fetch_by_name_, size_t max_retry_)
: max_read_mysql_row_nums((settings.external_storage_max_read_rows) ? settings.external_storage_max_read_rows : settings.max_block_size)
, max_read_mysql_bytes_size(settings.external_storage_max_read_bytes)
, auto_close(auto_close_)
, fetch_by_name(fetch_by_name_)
, default_num_tries_on_connection_loss(max_retry_)
{
}
MySQLBlockInputStream::Connection::Connection( MySQLBlockInputStream::Connection::Connection(
const mysqlxx::PoolWithFailover::Entry & entry_, const mysqlxx::PoolWithFailover::Entry & entry_,
const std::string & query_str) const std::string & query_str)
@ -44,29 +53,19 @@ MySQLBlockInputStream::MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry, const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str, const std::string & query_str,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_, const StreamSettings & settings_)
const bool auto_close_,
const bool fetch_by_name_)
: log(&Poco::Logger::get("MySQLBlockInputStream")) : log(&Poco::Logger::get("MySQLBlockInputStream"))
, connection{std::make_unique<Connection>(entry, query_str)} , connection{std::make_unique<Connection>(entry, query_str)}
, max_block_size{max_block_size_} , settings{std::make_unique<StreamSettings>(settings_)}
, auto_close{auto_close_}
, fetch_by_name(fetch_by_name_)
{ {
description.init(sample_block); description.init(sample_block);
initPositionMappingFromQueryResultStructure(); initPositionMappingFromQueryResultStructure();
} }
/// For descendant MySQLWithFailoverBlockInputStream /// For descendant MySQLWithFailoverBlockInputStream
MySQLBlockInputStream::MySQLBlockInputStream( MySQLBlockInputStream::MySQLBlockInputStream(const Block &sample_block_, const StreamSettings & settings_)
const Block & sample_block_,
UInt64 max_block_size_,
bool auto_close_,
bool fetch_by_name_)
: log(&Poco::Logger::get("MySQLBlockInputStream")) : log(&Poco::Logger::get("MySQLBlockInputStream"))
, max_block_size(max_block_size_) , settings(std::make_unique<StreamSettings>(settings_))
, auto_close(auto_close_)
, fetch_by_name(fetch_by_name_)
{ {
description.init(sample_block_); description.init(sample_block_);
} }
@ -76,14 +75,10 @@ MySQLWithFailoverBlockInputStream::MySQLWithFailoverBlockInputStream(
mysqlxx::PoolWithFailoverPtr pool_, mysqlxx::PoolWithFailoverPtr pool_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block_, const Block & sample_block_,
const UInt64 max_block_size_, const StreamSettings & settings_)
const bool auto_close_, : MySQLBlockInputStream(sample_block_, settings_)
const bool fetch_by_name_, , pool(pool_)
const size_t max_tries_) , query_str(query_str_)
: MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_, fetch_by_name_)
, pool(pool_)
, query_str(query_str_)
, max_tries(max_tries_)
{ {
} }
@ -101,12 +96,12 @@ void MySQLWithFailoverBlockInputStream::readPrefix()
} }
catch (const mysqlxx::ConnectionLost & ecl) /// There are two retriable failures: CR_SERVER_GONE_ERROR, CR_SERVER_LOST catch (const mysqlxx::ConnectionLost & ecl) /// There are two retriable failures: CR_SERVER_GONE_ERROR, CR_SERVER_LOST
{ {
LOG_WARNING(log, "Failed connection ({}/{}). Trying to reconnect... (Info: {})", count_connect_attempts, max_tries, ecl.displayText()); LOG_WARNING(log, "Failed connection ({}/{}). Trying to reconnect... (Info: {})", count_connect_attempts, settings->default_num_tries_on_connection_loss, ecl.displayText());
} }
if (++count_connect_attempts > max_tries) if (++count_connect_attempts > settings->default_num_tries_on_connection_loss)
{ {
LOG_ERROR(log, "Failed to create connection to MySQL. ({}/{})", count_connect_attempts, max_tries); LOG_ERROR(log, "Failed to create connection to MySQL. ({}/{})", count_connect_attempts, settings->default_num_tries_on_connection_loss);
throw; throw;
} }
} }
@ -118,45 +113,57 @@ namespace
{ {
using ValueType = ExternalResultDescription::ValueType; using ValueType = ExternalResultDescription::ValueType;
void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value) void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size)
{ {
switch (type) switch (type)
{ {
case ValueType::vtUInt8: case ValueType::vtUInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(value.getUInt()); assert_cast<ColumnUInt8 &>(column).insertValue(value.getUInt());
read_bytes_size += 1;
break; break;
case ValueType::vtUInt16: case ValueType::vtUInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(value.getUInt()); assert_cast<ColumnUInt16 &>(column).insertValue(value.getUInt());
read_bytes_size += 2;
break; break;
case ValueType::vtUInt32: case ValueType::vtUInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(value.getUInt()); assert_cast<ColumnUInt32 &>(column).insertValue(value.getUInt());
read_bytes_size += 4;
break; break;
case ValueType::vtUInt64: case ValueType::vtUInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt()); assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt());
read_bytes_size += 8;
break; break;
case ValueType::vtInt8: case ValueType::vtInt8:
assert_cast<ColumnInt8 &>(column).insertValue(value.getInt()); assert_cast<ColumnInt8 &>(column).insertValue(value.getInt());
read_bytes_size += 1;
break; break;
case ValueType::vtInt16: case ValueType::vtInt16:
assert_cast<ColumnInt16 &>(column).insertValue(value.getInt()); assert_cast<ColumnInt16 &>(column).insertValue(value.getInt());
read_bytes_size += 2;
break; break;
case ValueType::vtInt32: case ValueType::vtInt32:
assert_cast<ColumnInt32 &>(column).insertValue(value.getInt()); assert_cast<ColumnInt32 &>(column).insertValue(value.getInt());
read_bytes_size += 4;
break; break;
case ValueType::vtInt64: case ValueType::vtInt64:
assert_cast<ColumnInt64 &>(column).insertValue(value.getInt()); assert_cast<ColumnInt64 &>(column).insertValue(value.getInt());
read_bytes_size += 8;
break; break;
case ValueType::vtFloat32: case ValueType::vtFloat32:
assert_cast<ColumnFloat32 &>(column).insertValue(value.getDouble()); assert_cast<ColumnFloat32 &>(column).insertValue(value.getDouble());
read_bytes_size += 4;
break; break;
case ValueType::vtFloat64: case ValueType::vtFloat64:
assert_cast<ColumnFloat64 &>(column).insertValue(value.getDouble()); assert_cast<ColumnFloat64 &>(column).insertValue(value.getDouble());
read_bytes_size += 8;
break; break;
case ValueType::vtString: case ValueType::vtString:
assert_cast<ColumnString &>(column).insertData(value.data(), value.size()); assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
read_bytes_size += assert_cast<ColumnString &>(column).byteSize();
break; break;
case ValueType::vtDate: case ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16(value.getDate().getDayNum())); assert_cast<ColumnUInt16 &>(column).insertValue(UInt16(value.getDate().getDayNum()));
read_bytes_size += 2;
break; break;
case ValueType::vtDateTime: case ValueType::vtDateTime:
{ {
@ -166,10 +173,12 @@ namespace
if (time < 0) if (time < 0)
time = 0; time = 0;
assert_cast<ColumnUInt32 &>(column).insertValue(time); assert_cast<ColumnUInt32 &>(column).insertValue(time);
read_bytes_size += 4;
break; break;
} }
case ValueType::vtUUID: case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size())); assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
read_bytes_size += assert_cast<ColumnUInt128 &>(column).byteSize();
break; break;
case ValueType::vtDateTime64:[[fallthrough]]; case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]]; case ValueType::vtDecimal32: [[fallthrough]];
@ -179,10 +188,12 @@ namespace
{ {
ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0); ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
data_type.getDefaultSerialization()->deserializeWholeText(column, buffer, FormatSettings{}); data_type.getDefaultSerialization()->deserializeWholeText(column, buffer, FormatSettings{});
read_bytes_size += column.sizeOfValueIfFixed();
break; break;
} }
case ValueType::vtFixedString: case ValueType::vtFixedString:
assert_cast<ColumnFixedString &>(column).insertData(value.data(), value.size()); assert_cast<ColumnFixedString &>(column).insertData(value.data(), value.size());
read_bytes_size += column.sizeOfValueIfFixed();
break; break;
default: default:
throw Exception("Unsupported value type", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Unsupported value type", ErrorCodes::NOT_IMPLEMENTED);
@ -198,7 +209,7 @@ Block MySQLBlockInputStream::readImpl()
auto row = connection->result.fetch(); auto row = connection->result.fetch();
if (!row) if (!row)
{ {
if (auto_close) if (settings->auto_close)
connection->entry.disconnect(); connection->entry.disconnect();
return {}; return {};
@ -209,6 +220,8 @@ Block MySQLBlockInputStream::readImpl()
columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty(); columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
size_t num_rows = 0; size_t num_rows = 0;
size_t read_bytes_size = 0;
while (row) while (row)
{ {
for (size_t index = 0; index < position_mapping.size(); ++index) for (size_t index = 0; index < position_mapping.size(); ++index)
@ -224,12 +237,12 @@ Block MySQLBlockInputStream::readImpl()
{ {
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[index]); ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[index]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type); const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value); insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size);
column_nullable.getNullMapData().emplace_back(false); column_nullable.getNullMapData().emplace_back(false);
} }
else else
{ {
insertValue(*sample.type, *columns[index], description.types[index].first, value); insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size);
} }
} }
else else
@ -245,7 +258,7 @@ Block MySQLBlockInputStream::readImpl()
} }
++num_rows; ++num_rows;
if (num_rows == max_block_size) if (num_rows == settings->max_read_mysql_row_nums || (settings->max_read_mysql_bytes_size && read_bytes_size >= settings->max_read_mysql_bytes_size))
break; break;
row = connection->result.fetch(); row = connection->result.fetch();
@ -257,7 +270,7 @@ void MySQLBlockInputStream::initPositionMappingFromQueryResultStructure()
{ {
position_mapping.resize(description.sample_block.columns()); position_mapping.resize(description.sample_block.columns());
if (!fetch_by_name) if (!settings->fetch_by_name)
{ {
if (description.sample_block.columns() != connection->result.getNumFields()) if (description.sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while " throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "

View File

@ -6,11 +6,24 @@
#include <mysqlxx/PoolWithFailover.h> #include <mysqlxx/PoolWithFailover.h>
#include <mysqlxx/Query.h> #include <mysqlxx/Query.h>
#include <Core/ExternalResultDescription.h> #include <Core/ExternalResultDescription.h>
#include <Core/Settings.h>
namespace DB namespace DB
{ {
struct StreamSettings
{
/// Check if setting is enabled, otherwise use common `max_block_size` setting.
size_t max_read_mysql_row_nums;
size_t max_read_mysql_bytes_size;
bool auto_close;
bool fetch_by_name;
size_t default_num_tries_on_connection_loss;
StreamSettings(const Settings & settings, bool auto_close_ = false, bool fetch_by_name_ = false, size_t max_retry_ = 5);
};
/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining /// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining
class MySQLBlockInputStream : public IBlockInputStream class MySQLBlockInputStream : public IBlockInputStream
{ {
@ -19,16 +32,14 @@ public:
const mysqlxx::PoolWithFailover::Entry & entry, const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str, const std::string & query_str,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_, const StreamSettings & settings_);
const bool auto_close_ = false,
const bool fetch_by_name_ = false);
String getName() const override { return "MySQL"; } String getName() const override { return "MySQL"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); } Block getHeader() const override { return description.sample_block.cloneEmpty(); }
protected: protected:
MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_, bool fetch_by_name_); MySQLBlockInputStream(const Block & sample_block_, const StreamSettings & settings);
Block readImpl() override; Block readImpl() override;
void initPositionMappingFromQueryResultStructure(); void initPositionMappingFromQueryResultStructure();
@ -44,9 +55,7 @@ protected:
Poco::Logger * log; Poco::Logger * log;
std::unique_ptr<Connection> connection; std::unique_ptr<Connection> connection;
const UInt64 max_block_size; const std::unique_ptr<StreamSettings> settings;
const bool auto_close;
const bool fetch_by_name;
std::vector<size_t> position_mapping; std::vector<size_t> position_mapping;
ExternalResultDescription description; ExternalResultDescription description;
}; };
@ -57,23 +66,18 @@ protected:
class MySQLWithFailoverBlockInputStream final : public MySQLBlockInputStream class MySQLWithFailoverBlockInputStream final : public MySQLBlockInputStream
{ {
public: public:
static constexpr inline auto MAX_TRIES_MYSQL_CONNECT = 5;
MySQLWithFailoverBlockInputStream( MySQLWithFailoverBlockInputStream(
mysqlxx::PoolWithFailoverPtr pool_, mysqlxx::PoolWithFailoverPtr pool_,
const std::string & query_str_, const std::string & query_str_,
const Block & sample_block_, const Block & sample_block_,
const UInt64 max_block_size_, const StreamSettings & settings_);
const bool auto_close_ = false,
const bool fetch_by_name_ = false,
const size_t max_tries_ = MAX_TRIES_MYSQL_CONNECT);
private: private:
void readPrefix() override; void readPrefix() override;
mysqlxx::PoolWithFailoverPtr pool; mysqlxx::PoolWithFailoverPtr pool;
std::string query_str; std::string query_str;
size_t max_tries;
}; };
} }

View File

@ -71,7 +71,7 @@ Pipe StorageMySQL::read(
SelectQueryInfo & query_info_, SelectQueryInfo & query_info_,
ContextPtr context_, ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/, QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size_, size_t /*max_block_size*/,
unsigned) unsigned)
{ {
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID()); metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());
@ -95,8 +95,10 @@ Pipe StorageMySQL::read(
sample_block.insert({ column_data.type, column_data.name }); sample_block.insert({ column_data.type, column_data.name });
} }
StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), true, false);
return Pipe(std::make_shared<SourceFromInputStream>( return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, max_block_size_, /* auto_close = */ true))); std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, mysql_input_stream_settings)));
} }

View File

@ -79,7 +79,7 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const
{ {
const auto & settings = context->getSettingsRef(); const auto & settings = context->getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(*pool, remote_database_name, {remote_table_name}, settings.external_table_functions_use_nulls, settings.mysql_datatypes_support_level); const auto tables_and_columns = fetchTablesColumnsList(*pool, remote_database_name, {remote_table_name}, settings, settings.mysql_datatypes_support_level);
const auto columns = tables_and_columns.find(remote_table_name); const auto columns = tables_and_columns.find(remote_table_name);
if (columns == tables_and_columns.end()) if (columns == tables_and_columns.end())

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<default_database_engine>Atomic</default_database_engine>
<external_storage_max_read_rows>1</external_storage_max_read_rows>
<external_storage_max_read_bytes>0</external_storage_max_read_bytes>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
</default>
</users>
</yandex>

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<default_database_engine>Atomic</default_database_engine>
<external_storage_max_read_rows>0</external_storage_max_read_rows>
<external_storage_max_read_bytes>1</external_storage_max_read_bytes>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
</default>
</users>
</yandex>

View File

@ -842,3 +842,19 @@ def system_tables_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE TABLE system_tables_test.test (id int NOT NULL PRIMARY KEY) ENGINE=InnoDB") mysql_node.query("CREATE TABLE system_tables_test.test (id int NOT NULL PRIMARY KEY) ENGINE=InnoDB")
clickhouse_node.query("CREATE DATABASE system_tables_test ENGINE = MaterializeMySQL('{}:3306', 'system_tables_test', 'root', 'clickhouse')".format(service_name)) clickhouse_node.query("CREATE DATABASE system_tables_test ENGINE = MaterializeMySQL('{}:3306', 'system_tables_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT partition_key, sorting_key, primary_key FROM system.tables WHERE database = 'system_tables_test' AND name = 'test'", "intDiv(id, 4294967)\tid\tid\n") check_query(clickhouse_node, "SELECT partition_key, sorting_key, primary_key FROM system.tables WHERE database = 'system_tables_test' AND name = 'test'", "intDiv(id, 4294967)\tid\tid\n")
def mysql_settings_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database")
mysql_node.query("CREATE DATABASE test_database")
mysql_node.query("CREATE TABLE test_database.a (id INT(11) NOT NULL PRIMARY KEY, value VARCHAR(255))")
mysql_node.query("INSERT INTO test_database.a VALUES(1, 'foo')")
mysql_node.query("INSERT INTO test_database.a VALUES(2, 'bar')")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT COUNT() FROM test_database.a FORMAT TSV", "2\n")
assert clickhouse_node.query("SELECT COUNT(DISTINCT blockNumber()) FROM test_database.a FORMAT TSV") == "2\n"
clickhouse_node.query("DROP DATABASE test_database")
mysql_node.query("DROP DATABASE test_database")

View File

@ -16,7 +16,8 @@ cluster = ClickHouseCluster(__file__)
node_db_ordinary = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=False, stay_alive=True) node_db_ordinary = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=False, stay_alive=True)
node_db_atomic = cluster.add_instance('node2', user_configs=["configs/users_db_atomic.xml"], with_mysql=False, stay_alive=True) node_db_atomic = cluster.add_instance('node2', user_configs=["configs/users_db_atomic.xml"], with_mysql=False, stay_alive=True)
node_disable_bytes_settings = cluster.add_instance('node3', user_configs=["configs/users_disable_bytes_settings.xml"], with_mysql=False, stay_alive=True)
node_disable_rows_settings = cluster.add_instance('node4', user_configs=["configs/users_disable_rows_settings.xml"], with_mysql=False, stay_alive=True)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def started_cluster(): def started_cluster():
@ -289,5 +290,12 @@ def test_multi_table_update(started_cluster, started_mysql_8_0, started_mysql_5_
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary]) @pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_system_tables_table(started_cluster, started_mysql_8_0, clickhouse_node): def test_system_tables_table(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_8_0, "mysql8_0") materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_disable_bytes_settings, node_disable_rows_settings])
def test_mysql_settings(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.mysql_settings_test(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.mysql_settings_test(clickhouse_node, started_mysql_8_0, "mysql8_0")

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<max_block_size>2</max_block_size>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
</default>
</users>
</yandex>

View File

@ -9,6 +9,7 @@ cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True) node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_mysql_cluster=True) node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_mysql_cluster=True)
node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users.xml'], with_mysql=True)
create_table_sql_template = """ create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` ( CREATE TABLE `clickhouse`.`{}` (
@ -260,6 +261,25 @@ def test_mysql_distributed(started_cluster):
assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n') assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')
def test_external_settings(started_cluster):
table_name = 'test_external_settings'
conn = get_mysql_conn()
create_mysql_table(conn, table_name)
node3.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');
'''.format(table_name, table_name))
node3.query(
"INSERT INTO {}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(100) ".format(
table_name))
assert node3.query("SELECT count() FROM {}".format(table_name)).rstrip() == '100'
assert node3.query("SELECT sum(money) FROM {}".format(table_name)).rstrip() == '300'
node3.query("select value from system.settings where name = 'max_block_size' FORMAT TSV") == "2\n"
node3.query("select value from system.settings where name = 'external_storage_max_read_rows' FORMAT TSV") == "0\n"
assert node3.query("SELECT COUNT(DISTINCT blockNumber()) FROM {} FORMAT TSV".format(table_name)) == '50\n'
conn.close()
if __name__ == '__main__': if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster: with contextmanager(started_cluster)() as cluster:
for name, instance in list(cluster.instances.items()): for name, instance in list(cluster.instances.items()):