Add MySQL read history data bytes judgment

This commit is contained in:
TCeason 2021-04-10 09:37:56 +08:00
parent ae3dc3dfc0
commit 472c131420
18 changed files with 235 additions and 117 deletions

View File

@ -445,6 +445,8 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \
M(UInt64, max_read_mysql_rows, DEFAULT_BLOCK_SIZE, "Limit maximum rows when MaterializeMySQL flush history data. 0 -> Disable it.", 0) \
M(UInt64, max_read_mysql_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Limit maximum bytes when MaterializeMySQL flush history data. 0 -> Disable it.", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -198,7 +198,7 @@ ASTPtr DatabaseConnectionMySQL::getCreateDatabaseQuery() const
void DatabaseConnectionMySQL::fetchTablesIntoLocalCache(ContextPtr local_context) const
{
const auto & tables_with_modification_time = fetchTablesWithModificationTime();
const auto & tables_with_modification_time = fetchTablesWithModificationTime(local_context);
destroyLocalCacheExtraTables(tables_with_modification_time);
fetchLatestTablesStructureIntoCache(tables_with_modification_time, local_context);
@ -252,7 +252,7 @@ void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache(
}
}
std::map<String, UInt64> DatabaseConnectionMySQL::fetchTablesWithModificationTime() const
std::map<String, UInt64> DatabaseConnectionMySQL::fetchTablesWithModificationTime(ContextPtr local_context) const
{
Block tables_status_sample_block
{
@ -268,7 +268,8 @@ std::map<String, UInt64> DatabaseConnectionMySQL::fetchTablesWithModificationTim
" WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql;
std::map<String, UInt64> tables_with_modification_time;
MySQLBlockInputStream result(mysql_pool.get(), query.str(), tables_status_sample_block, DEFAULT_BLOCK_SIZE);
StreamSettings mysql_input_stream_settings(local_context->getSettingsRef());
MySQLBlockInputStream result(mysql_pool.get(), query.str(), tables_status_sample_block, mysql_input_stream_settings);
while (Block block = result.read())
{
@ -292,7 +293,7 @@ DatabaseConnectionMySQL::fetchTablesColumnsList(const std::vector<String> & tabl
mysql_pool,
database_name_in_mysql,
tables_name,
settings.external_table_functions_use_nulls,
settings,
database_settings->mysql_datatypes_support_level);
}

View File

@ -108,7 +108,7 @@ private:
void fetchTablesIntoLocalCache(ContextPtr context) const;
std::map<String, UInt64> fetchTablesWithModificationTime() const;
std::map<String, UInt64> fetchTablesWithModificationTime(ContextPtr local_context) const;
std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name, ContextPtr context) const;

View File

@ -44,7 +44,7 @@ std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::PoolWithFailover & pool,
const String & database_name,
const std::vector<String> & tables_name,
bool external_table_functions_use_nulls,
const Settings & settings,
MultiEnum<MySQLDataTypesSupport> type_support)
{
std::map<String, NamesAndTypesList> tables_and_columns;
@ -78,7 +78,8 @@ std::map<String, NamesAndTypesList> fetchTablesColumnsList(
" WHERE TABLE_SCHEMA = " << quote << database_name
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
MySQLBlockInputStream result(pool.get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE);
StreamSettings mysql_input_stream_settings(settings);
MySQLBlockInputStream result(pool.get(), query.str(), tables_columns_sample_block, mysql_input_stream_settings);
while (Block block = result.read())
{
const auto & table_name_col = *block.getByPosition(0).column;
@ -99,7 +100,7 @@ std::map<String, NamesAndTypesList> fetchTablesColumnsList(
convertMySQLDataType(
type_support,
column_type_col[i].safeGet<String>(),
external_table_functions_use_nulls && is_nullable_col[i].safeGet<UInt64>(),
settings.external_table_functions_use_nulls && is_nullable_col[i].safeGet<UInt64>(),
is_unsigned_col[i].safeGet<UInt64>(),
char_max_length_col[i].safeGet<UInt64>(),
precision_col[i].safeGet<UInt64>(),

View File

@ -12,6 +12,7 @@
#include <map>
#include <vector>
#include <Core/Settings.h>
namespace DB
{
@ -20,7 +21,7 @@ std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::PoolWithFailover & pool,
const String & database_name,
const std::vector<String> & tables_name,
bool external_table_functions_use_nulls,
const Settings & settings,
MultiEnum<MySQLDataTypesSupport> type_support);
}

View File

@ -24,7 +24,8 @@ namespace ErrorCodes
}
static std::unordered_map<String, String> fetchTablesCreateQuery(
const mysqlxx::PoolWithFailover::Entry & connection, const String & database_name, const std::vector<String> & fetch_tables)
const mysqlxx::PoolWithFailover::Entry & connection, const String & database_name,
const std::vector<String> & fetch_tables, const Settings & global_settings)
{
std::unordered_map<String, String> tables_create_query;
for (const auto & fetch_table_name : fetch_tables)
@ -34,9 +35,10 @@ static std::unordered_map<String, String> fetchTablesCreateQuery(
{std::make_shared<DataTypeString>(), "Create Table"},
};
StreamSettings mysql_input_stream_settings(global_settings, false, true);
MySQLBlockInputStream show_create_table(
connection, "SHOW CREATE TABLE " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(fetch_table_name),
show_create_table_header, DEFAULT_BLOCK_SIZE, false, true);
show_create_table_header, mysql_input_stream_settings);
Block create_query_block = show_create_table.read();
if (!create_query_block || create_query_block.rows() != 1)
@ -49,13 +51,14 @@ static std::unordered_map<String, String> fetchTablesCreateQuery(
}
static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database)
static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database, const Settings & global_settings)
{
Block header{{std::make_shared<DataTypeString>(), "table_name"}};
String query = "SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE != 'VIEW' AND TABLE_SCHEMA = " + quoteString(database);
std::vector<String> tables_in_db;
MySQLBlockInputStream input(connection, query, header, DEFAULT_BLOCK_SIZE);
StreamSettings mysql_input_stream_settings(global_settings);
MySQLBlockInputStream input(connection, query, header, mysql_input_stream_settings);
while (Block block = input.read())
{
@ -77,7 +80,8 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c
{std::make_shared<DataTypeString>(), "Executed_Gtid_Set"},
};
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE, false, true);
StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, mysql_input_stream_settings);
Block master_status = input.read();
if (!master_status || master_status.rows() != 1)
@ -99,7 +103,8 @@ void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailo
};
const String & fetch_query = "SHOW VARIABLES WHERE Variable_name = 'binlog_checksum'";
MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, DEFAULT_BLOCK_SIZE, false, true);
StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, mysql_input_stream_settings);
while (Block variables_block = variables_input.read())
{
@ -114,7 +119,7 @@ void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailo
}
}
static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & connection, WriteBuffer & out)
static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & connection, const Settings & global_settings, WriteBuffer & out)
{
Block sync_user_privs_header
{
@ -122,7 +127,8 @@ static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & conne
};
String grants_query, sub_privs;
MySQLBlockInputStream input(connection, "SHOW GRANTS FOR CURRENT_USER();", sync_user_privs_header, DEFAULT_BLOCK_SIZE);
StreamSettings mysql_input_stream_settings(global_settings);
MySQLBlockInputStream input(connection, "SHOW GRANTS FOR CURRENT_USER();", sync_user_privs_header, mysql_input_stream_settings);
while (Block block = input.read())
{
for (size_t index = 0; index < block.rows(); ++index)
@ -146,11 +152,11 @@ static bool checkSyncUserPrivImpl(const mysqlxx::PoolWithFailover::Entry & conne
return false;
}
static void checkSyncUserPriv(const mysqlxx::PoolWithFailover::Entry & connection)
static void checkSyncUserPriv(const mysqlxx::PoolWithFailover::Entry & connection, const Settings & global_settings)
{
WriteBufferFromOwnString out;
if (!checkSyncUserPrivImpl(connection, out))
if (!checkSyncUserPrivImpl(connection, global_settings, out))
throw Exception("MySQL SYNC USER ACCESS ERR: mysql sync user needs "
"at least GLOBAL PRIVILEGES:'RELOAD, REPLICATION SLAVE, REPLICATION CLIENT' "
"and SELECT PRIVILEGE on MySQL Database."
@ -167,7 +173,8 @@ bool MaterializeMetadata::checkBinlogFileExists(const mysqlxx::PoolWithFailover:
{std::make_shared<DataTypeUInt64>(), "File_size"}
};
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", logs_header, DEFAULT_BLOCK_SIZE, false, true);
StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", logs_header, mysql_input_stream_settings);
while (Block block = input.read())
{
@ -222,7 +229,7 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio
commitMetadata(std::move(fun), persistent_tmp_path, persistent_path);
}
MaterializeMetadata::MaterializeMetadata(const String & path_) : persistent_path(path_)
MaterializeMetadata::MaterializeMetadata(const String & path_, const Settings & settings_) : persistent_path(path_), settings(settings_)
{
if (Poco::File(persistent_path).exists())
{
@ -244,7 +251,7 @@ void MaterializeMetadata::startReplication(
mysqlxx::PoolWithFailover::Entry & connection, const String & database,
bool & opened_transaction, std::unordered_map<String, String> & need_dumping_tables)
{
checkSyncUserPriv(connection);
checkSyncUserPriv(connection, settings);
if (checkBinlogFileExists(connection))
return;
@ -263,7 +270,7 @@ void MaterializeMetadata::startReplication(
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
opened_transaction = true;
need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database));
need_dumping_tables = fetchTablesCreateQuery(connection, database, fetchTablesInDB(connection, database, settings), settings);
connection->query("UNLOCK TABLES;").execute();
}
catch (...)

View File

@ -10,6 +10,7 @@
#include <Core/MySQL/MySQLReplication.h>
#include <mysqlxx/Connection.h>
#include <mysqlxx/PoolWithFailover.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -25,6 +26,7 @@ namespace DB
struct MaterializeMetadata
{
const String persistent_path;
const Settings settings;
String binlog_file;
UInt64 binlog_position;
@ -50,7 +52,7 @@ struct MaterializeMetadata
bool & opened_transaction,
std::unordered_map<String, String> & need_dumping_tables);
MaterializeMetadata(const String & path_);
MaterializeMetadata(const String & path_, const Settings & settings_);
};
}

View File

@ -90,7 +90,7 @@ MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread()
}
}
static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection)
static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection, const Settings & settings)
{
Block variables_header{
{std::make_shared<DataTypeString>(), "Variable_name"},
@ -104,19 +104,19 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection)
"OR (Variable_name = 'default_authentication_plugin' AND upper(Value) = 'MYSQL_NATIVE_PASSWORD') "
"OR (Variable_name = 'log_bin_use_v1_row_events' AND upper(Value) = 'OFF');";
MySQLBlockInputStream variables_input(connection, check_query, variables_header, DEFAULT_BLOCK_SIZE, false, true);
StreamSettings mysql_input_stream_settings(settings, false, true);
MySQLBlockInputStream variables_input(connection, check_query, variables_header, mysql_input_stream_settings);
Block variables_block = variables_input.read();
if (!variables_block || variables_block.rows() != 5)
std::unordered_map<String, String> variables_error_message{
{"log_bin", "log_bin = 'ON'"},
{"binlog_format", "binlog_format='ROW'"},
{"binlog_row_image", "binlog_row_image='FULL'"},
{"default_authentication_plugin", "default_authentication_plugin='mysql_native_password'"},
{"log_bin_use_v1_row_events", "log_bin_use_v1_row_events='OFF'"}
};
while (Block variables_block = variables_input.read())
{
std::unordered_map<String, String> variables_error_message{
{"log_bin", "log_bin = 'ON'"},
{"binlog_format", "binlog_format='ROW'"},
{"binlog_row_image", "binlog_row_image='FULL'"},
{"default_authentication_plugin", "default_authentication_plugin='mysql_native_password'"},
{"log_bin_use_v1_row_events", "log_bin_use_v1_row_events='OFF'"}
};
ColumnPtr variable_name_column = variables_block.getByName("Variable_name").column;
for (size_t index = 0; index < variables_block.rows(); ++index)
@ -126,7 +126,10 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection)
if (error_message_it != variables_error_message.end())
variables_error_message.erase(error_message_it);
}
}
if (!variables_error_message.empty())
{
bool first = true;
WriteBufferFromOwnString error_message;
error_message << "Illegal MySQL variables, the MaterializeMySQL engine requires ";
@ -167,7 +170,7 @@ void MaterializeMySQLSyncThread::synchronization()
try
{
MaterializeMetadata metadata(
DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata");
DatabaseCatalog::instance().getDatabase(database_name)->getMetadataPath() + "/.metadata", getContext()->getSettingsRef());
bool need_reconnect = true;
Stopwatch watch;
@ -240,7 +243,7 @@ void MaterializeMySQLSyncThread::assertMySQLAvailable()
{
try
{
checkMySQLVariables(pool.get());
checkMySQLVariables(pool.get(), getContext()->getSettingsRef());
}
catch (const mysqlxx::ConnectionFailed & e)
{
@ -326,9 +329,10 @@ static inline void dumpDataForTables(
tryToExecuteQuery(query_prefix + " " + iterator->second, query_context, database_name, comment); /// create table.
auto out = std::make_shared<CountingBlockOutputStream>(getTableOutput(database_name, table_name, query_context));
StreamSettings mysql_input_stream_settings(context->getSettingsRef());
MySQLBlockInputStream input(
connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name),
out->getHeader(), DEFAULT_BLOCK_SIZE);
out->getHeader(), mysql_input_stream_settings);
Stopwatch watch;
copyData(input, *out, is_cancelled);
@ -375,7 +379,7 @@ bool MaterializeMySQLSyncThread::prepareSynchronized(MaterializeMetadata & metad
opened_transaction = false;
checkMySQLVariables(connection);
checkMySQLVariables(connection, getContext()->getSettingsRef());
std::unordered_map<String, String> need_dumping_tables;
metadata.startReplication(connection, mysql_database_name, opened_transaction, need_dumping_tables);

View File

@ -4,9 +4,15 @@
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include "registerDictionaries.h"
#include <Core/Settings.h>
#include <Interpreters/Context.h>
namespace DB
{
[[maybe_unused]]
static const size_t default_num_tries_on_connection_loss = 3;
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
@ -14,20 +20,20 @@ namespace ErrorCodes
void registerDictionarySourceMysql(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block,
ContextPtr /* context */,
auto create_table_source = [=]([[maybe_unused]] const DictionaryStructure & dict_struct,
[[maybe_unused]] const Poco::Util::AbstractConfiguration & config,
[[maybe_unused]] const std::string & config_prefix,
[[maybe_unused]] Block & sample_block,
[[maybe_unused]] ContextPtr context,
const std::string & /* default_database */,
bool /* check_config */) -> DictionarySourcePtr {
#if USE_MYSQL
return std::make_unique<MySQLDictionarySource>(dict_struct, config, config_prefix + ".mysql", sample_block);
StreamSettings mysql_input_stream_settings(context->getSettingsRef()
, config.getBool(config_prefix + ".mysql.close_connection", false) || config.getBool(config_prefix + ".mysql.share_connection", false)
, false
, config.getBool(config_prefix + ".mysql.fail_on_connection_loss", false) ? 1 : default_num_tries_on_connection_loss);
return std::make_unique<MySQLDictionarySource>(dict_struct, config, config_prefix + ".mysql", sample_block, mysql_input_stream_settings);
#else
(void)dict_struct;
(void)config;
(void)config_prefix;
(void)sample_block;
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support.");
#endif
@ -45,22 +51,21 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
# include <IO/WriteHelpers.h>
# include <common/LocalDateTime.h>
# include <common/logger_useful.h>
# include <Formats/MySQLBlockInputStream.h>
# include "readInvalidateQuery.h"
# include <mysqlxx/Exception.h>
# include <mysqlxx/PoolFactory.h>
# include <Core/Settings.h>
namespace DB
{
static const UInt64 max_block_size = 8192;
static const size_t default_num_tries_on_connection_loss = 3;
MySQLDictionarySource::MySQLDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Block & sample_block_)
const Block & sample_block_,
const StreamSettings & settings_)
: log(&Poco::Logger::get("MySQLDictionarySource"))
, update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
@ -74,10 +79,7 @@ MySQLDictionarySource::MySQLDictionarySource(
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, close_connection(
config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false))
, max_tries_for_mysql_block_input_stream(
config.getBool(config_prefix + ".fail_on_connection_loss", false) ? 1 : default_num_tries_on_connection_loss)
, settings(settings_)
{
}
@ -98,8 +100,7 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
, last_modification{other.last_modification}
, invalidate_query{other.invalidate_query}
, invalidate_query_response{other.invalidate_query_response}
, close_connection{other.close_connection}
, max_tries_for_mysql_block_input_stream{other.max_tries_for_mysql_block_input_stream}
, settings(other.settings)
{
}
@ -122,7 +123,7 @@ std::string MySQLDictionarySource::getUpdateFieldAndDate()
BlockInputStreamPtr MySQLDictionarySource::loadFromQuery(const String & query)
{
return std::make_shared<MySQLWithFailoverBlockInputStream>(
pool, query, sample_block, max_block_size, close_connection, false, max_tries_for_mysql_block_input_stream);
pool, query, sample_block, settings);
}
BlockInputStreamPtr MySQLDictionarySource::loadAll()
@ -245,7 +246,7 @@ LocalDateTime MySQLDictionarySource::getLastModification(mysqlxx::Pool::Entry &
++fetched_rows;
}
if (close_connection && allow_connection_closure)
if (settings.auto_close && allow_connection_closure)
{
connection.disconnect();
}
@ -269,7 +270,7 @@ std::string MySQLDictionarySource::doInvalidateQuery(const std::string & request
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
MySQLBlockInputStream block_input_stream(pool->get(), request, invalidate_sample_block, 1, close_connection);
MySQLBlockInputStream block_input_stream(pool->get(), request, invalidate_sample_block, settings);
return readInvalidateQuery(block_input_stream);
}

View File

@ -12,7 +12,7 @@
# include "DictionaryStructure.h"
# include "ExternalQueryBuilder.h"
# include "IDictionarySource.h"
# include <Formats/MySQLBlockInputStream.h>
namespace Poco
{
@ -35,7 +35,8 @@ public:
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const Block & sample_block_);
const Block & sample_block_,
const StreamSettings & settings_);
/// copy-constructor is provided in order to support cloneability
MySQLDictionarySource(const MySQLDictionarySource & other);
@ -87,8 +88,7 @@ private:
LocalDateTime last_modification;
std::string invalidate_query;
mutable std::string invalidate_query_response;
const bool close_connection;
const size_t max_tries_for_mysql_block_input_stream;
const StreamSettings settings;
};
}

View File

@ -30,6 +30,29 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
/// Builds stream options from query-level settings: only the row and byte
/// limits are taken from `settings`; every other option keeps the default
/// given in the struct declaration.
StreamSettings::StreamSettings(const Settings & settings)
    : max_read_mysql_rows(settings.max_read_mysql_rows)
    , max_read_bytes_size(settings.max_read_mysql_bytes)
{
}
/// Same as the single-argument form, but also lets the caller choose the
/// `auto_close` and `fetch_by_name` flags. The retry count keeps its default.
/// Initializers are listed in member declaration order.
StreamSettings::StreamSettings(const Settings & settings, bool auto_close_, bool fetch_by_name_)
    : max_read_mysql_rows(settings.max_read_mysql_rows)
    , max_read_bytes_size(settings.max_read_mysql_bytes)
    , auto_close(auto_close_)
    , fetch_by_name(fetch_by_name_)
{
}
/// Fully explicit form: row and byte limits come from `settings`, the two flags
/// and the number of tries on connection loss come from the caller.
/// Initializers are listed in member declaration order.
StreamSettings::StreamSettings(const Settings & settings, bool auto_close_, bool fetch_by_name_, size_t max_retry_)
    : max_read_mysql_rows(settings.max_read_mysql_rows)
    , max_read_bytes_size(settings.max_read_mysql_bytes)
    , auto_close(auto_close_)
    , fetch_by_name(fetch_by_name_)
    , default_num_tries_on_connection_loss(max_retry_)
{
}
MySQLBlockInputStream::Connection::Connection(
const mysqlxx::PoolWithFailover::Entry & entry_,
const std::string & query_str)
@ -44,29 +67,19 @@ MySQLBlockInputStream::MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_,
const bool auto_close_,
const bool fetch_by_name_)
const StreamSettings & settings_)
: log(&Poco::Logger::get("MySQLBlockInputStream"))
, connection{std::make_unique<Connection>(entry, query_str)}
, max_block_size{max_block_size_}
, auto_close{auto_close_}
, fetch_by_name(fetch_by_name_)
, settings{std::make_unique<StreamSettings>(settings_)}
{
description.init(sample_block);
initPositionMappingFromQueryResultStructure();
}
/// For descendant MySQLWithFailoverBlockInputStream
MySQLBlockInputStream::MySQLBlockInputStream(
const Block & sample_block_,
UInt64 max_block_size_,
bool auto_close_,
bool fetch_by_name_)
MySQLBlockInputStream::MySQLBlockInputStream(const Block &sample_block_, const StreamSettings & settings_)
: log(&Poco::Logger::get("MySQLBlockInputStream"))
, max_block_size(max_block_size_)
, auto_close(auto_close_)
, fetch_by_name(fetch_by_name_)
, settings(std::make_unique<StreamSettings>(settings_))
{
description.init(sample_block_);
}
@ -76,14 +89,10 @@ MySQLWithFailoverBlockInputStream::MySQLWithFailoverBlockInputStream(
mysqlxx::PoolWithFailoverPtr pool_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_,
const bool fetch_by_name_,
const size_t max_tries_)
: MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_, fetch_by_name_)
, pool(pool_)
, query_str(query_str_)
, max_tries(max_tries_)
const StreamSettings & settings_)
: MySQLBlockInputStream(sample_block_, settings_)
, pool(pool_)
, query_str(query_str_)
{
}
@ -101,12 +110,12 @@ void MySQLWithFailoverBlockInputStream::readPrefix()
}
catch (const mysqlxx::ConnectionLost & ecl) /// There are two retriable failures: CR_SERVER_GONE_ERROR, CR_SERVER_LOST
{
LOG_WARNING(log, "Failed connection ({}/{}). Trying to reconnect... (Info: {})", count_connect_attempts, max_tries, ecl.displayText());
LOG_WARNING(log, "Failed connection ({}/{}). Trying to reconnect... (Info: {})", count_connect_attempts, settings->default_num_tries_on_connection_loss, ecl.displayText());
}
if (++count_connect_attempts > max_tries)
if (++count_connect_attempts > settings->default_num_tries_on_connection_loss)
{
LOG_ERROR(log, "Failed to create connection to MySQL. ({}/{})", count_connect_attempts, max_tries);
LOG_ERROR(log, "Failed to create connection to MySQL. ({}/{})", count_connect_attempts, settings->default_num_tries_on_connection_loss);
throw;
}
}
@ -118,45 +127,57 @@ namespace
{
using ValueType = ExternalResultDescription::ValueType;
void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value)
void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size)
{
switch (type)
{
case ValueType::vtUInt8:
assert_cast<ColumnUInt8 &>(column).insertValue(value.getUInt());
read_bytes_size += 1;
break;
case ValueType::vtUInt16:
assert_cast<ColumnUInt16 &>(column).insertValue(value.getUInt());
read_bytes_size += 2;
break;
case ValueType::vtUInt32:
assert_cast<ColumnUInt32 &>(column).insertValue(value.getUInt());
read_bytes_size += 4;
break;
case ValueType::vtUInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt());
read_bytes_size += 8;
break;
case ValueType::vtInt8:
assert_cast<ColumnInt8 &>(column).insertValue(value.getInt());
read_bytes_size += 1;
break;
case ValueType::vtInt16:
assert_cast<ColumnInt16 &>(column).insertValue(value.getInt());
read_bytes_size += 2;
break;
case ValueType::vtInt32:
assert_cast<ColumnInt32 &>(column).insertValue(value.getInt());
read_bytes_size += 4;
break;
case ValueType::vtInt64:
assert_cast<ColumnInt64 &>(column).insertValue(value.getInt());
read_bytes_size += 8;
break;
case ValueType::vtFloat32:
assert_cast<ColumnFloat32 &>(column).insertValue(value.getDouble());
read_bytes_size += 4;
break;
case ValueType::vtFloat64:
assert_cast<ColumnFloat64 &>(column).insertValue(value.getDouble());
read_bytes_size += 8;
break;
case ValueType::vtString:
assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
read_bytes_size += assert_cast<ColumnString &>(column).byteSize();
break;
case ValueType::vtDate:
assert_cast<ColumnUInt16 &>(column).insertValue(UInt16(value.getDate().getDayNum()));
read_bytes_size += 2;
break;
case ValueType::vtDateTime:
{
@ -166,10 +187,12 @@ namespace
if (time < 0)
time = 0;
assert_cast<ColumnUInt32 &>(column).insertValue(time);
read_bytes_size += 4;
break;
}
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
read_bytes_size += assert_cast<ColumnUInt128 &>(column).byteSize();
break;
case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]];
@ -179,10 +202,12 @@ namespace
{
ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
data_type.getDefaultSerialization()->deserializeWholeText(column, buffer, FormatSettings{});
read_bytes_size += column.sizeOfValueIfFixed();
break;
}
case ValueType::vtFixedString:
assert_cast<ColumnFixedString &>(column).insertData(value.data(), value.size());
read_bytes_size += column.sizeOfValueIfFixed();
break;
default:
throw Exception("Unsupported value type", ErrorCodes::NOT_IMPLEMENTED);
@ -198,7 +223,7 @@ Block MySQLBlockInputStream::readImpl()
auto row = connection->result.fetch();
if (!row)
{
if (auto_close)
if (settings->auto_close)
connection->entry.disconnect();
return {};
@ -209,6 +234,8 @@ Block MySQLBlockInputStream::readImpl()
columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
size_t num_rows = 0;
size_t read_bytes_size = 0;
while (row)
{
for (size_t index = 0; index < position_mapping.size(); ++index)
@ -224,12 +251,12 @@ Block MySQLBlockInputStream::readImpl()
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[index]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size);
column_nullable.getNullMapData().emplace_back(false);
}
else
{
insertValue(*sample.type, *columns[index], description.types[index].first, value);
insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size);
}
}
else
@ -245,7 +272,7 @@ Block MySQLBlockInputStream::readImpl()
}
++num_rows;
if (num_rows == max_block_size)
if (num_rows == settings->max_read_mysql_rows || (settings->max_read_bytes_size && read_bytes_size >= settings->max_read_bytes_size))
break;
row = connection->result.fetch();
@ -257,7 +284,7 @@ void MySQLBlockInputStream::initPositionMappingFromQueryResultStructure()
{
position_mapping.resize(description.sample_block.columns());
if (!fetch_by_name)
if (!settings->fetch_by_name)
{
if (description.sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "

View File

@ -6,11 +6,25 @@
#include <mysqlxx/PoolWithFailover.h>
#include <mysqlxx/Query.h>
#include <Core/ExternalResultDescription.h>
#include <Core/Settings.h>
namespace DB
{
/// Options that control how MySQLBlockInputStream (and its failover variant)
/// reads a query result and cuts it into blocks.
struct StreamSettings
{
/// A block is returned as soon as it holds exactly this many rows.
/// Filled from the `max_read_mysql_rows` setting by every constructor.
size_t max_read_mysql_rows;
/// A block is returned once the bytes counted while inserting values reach
/// this amount; 0 switches the byte check off.
/// Filled from the `max_read_mysql_bytes` setting by every constructor.
size_t max_read_bytes_size;
/// When set, the connection entry is disconnected once the result has no more rows.
bool auto_close = false;
/// When false, the result must contain exactly as many fields as the sample block.
/// NOTE(review): presumably `true` maps result fields to columns by name - confirm
/// in initPositionMappingFromQueryResultStructure.
bool fetch_by_name = false;
/// MySQLWithFailoverBlockInputStream::readPrefix rethrows once its connection
/// attempt counter exceeds this value.
size_t default_num_tries_on_connection_loss = 5;
/// Takes only the row/byte limits from `settings`; other members keep the defaults above.
StreamSettings(const Settings & settings);
/// As above, additionally setting `auto_close` and `fetch_by_name`.
StreamSettings(const Settings & settings, bool auto_close_, bool fetch_by_name_);
/// As above, additionally setting `default_num_tries_on_connection_loss` from `max_retry_`.
StreamSettings(const Settings & settings, bool auto_close_, bool fetch_by_name_, size_t max_retry_);
};
/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining
class MySQLBlockInputStream : public IBlockInputStream
{
@ -19,16 +33,14 @@ public:
const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_,
const bool auto_close_ = false,
const bool fetch_by_name_ = false);
const StreamSettings & settings_);
String getName() const override { return "MySQL"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
protected:
MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_, bool fetch_by_name_);
MySQLBlockInputStream(const Block & sample_block_, const struct StreamSettings & settings);
Block readImpl() override;
void initPositionMappingFromQueryResultStructure();
@ -44,9 +56,7 @@ protected:
Poco::Logger * log;
std::unique_ptr<Connection> connection;
const UInt64 max_block_size;
const bool auto_close;
const bool fetch_by_name;
const std::unique_ptr<StreamSettings> settings;
std::vector<size_t> position_mapping;
ExternalResultDescription description;
};
@ -57,23 +67,18 @@ protected:
class MySQLWithFailoverBlockInputStream final : public MySQLBlockInputStream
{
public:
static constexpr inline auto MAX_TRIES_MYSQL_CONNECT = 5;
MySQLWithFailoverBlockInputStream(
mysqlxx::PoolWithFailoverPtr pool_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_ = false,
const bool fetch_by_name_ = false,
const size_t max_tries_ = MAX_TRIES_MYSQL_CONNECT);
const StreamSettings & settings_);
private:
void readPrefix() override;
mysqlxx::PoolWithFailoverPtr pool;
std::string query_str;
size_t max_tries;
};
}

View File

@ -71,7 +71,7 @@ Pipe StorageMySQL::read(
SelectQueryInfo & query_info_,
ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size_,
size_t /*max_read_mysql_rows*/,
unsigned)
{
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());
@ -95,8 +95,9 @@ Pipe StorageMySQL::read(
sample_block.insert({ column_data.type, column_data.name });
}
StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), true, false);
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, max_block_size_, /* auto_close = */ true)));
std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, mysql_input_stream_settings)));
}

View File

@ -79,7 +79,7 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const
{
const auto & settings = context->getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(*pool, remote_database_name, {remote_table_name}, settings.external_table_functions_use_nulls, settings.mysql_datatypes_support_level);
const auto tables_and_columns = fetchTablesColumnsList(*pool, remote_database_name, {remote_table_name}, settings, settings.mysql_datatypes_support_level);
const auto columns = tables_and_columns.find(remote_table_name);
if (columns == tables_and_columns.end())

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<default_database_engine>Atomic</default_database_engine>
<!-- Limits applied when MaterializeMySQL flushes history data; 0 disables a limit.
     This profile caps rows at 1 and leaves the byte limit disabled. -->
<max_read_mysql_rows>1</max_read_mysql_rows>
<max_read_mysql_bytes>0</max_read_mysql_bytes>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
</default>
</users>
</yandex>

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<default_database_engine>Atomic</default_database_engine>
<!-- Limits applied when MaterializeMySQL flushes history data; 0 disables a limit.
     This profile disables the row limit and caps bytes at 1. -->
<max_read_mysql_rows>0</max_read_mysql_rows>
<max_read_mysql_bytes>1</max_read_mysql_bytes>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
</default>
</users>
</yandex>

View File

@ -842,3 +842,19 @@ def system_tables_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE TABLE system_tables_test.test (id int NOT NULL PRIMARY KEY) ENGINE=InnoDB")
clickhouse_node.query("CREATE DATABASE system_tables_test ENGINE = MaterializeMySQL('{}:3306', 'system_tables_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT partition_key, sorting_key, primary_key FROM system.tables WHERE database = 'system_tables_test' AND name = 'test'", "intDiv(id, 4294967)\tid\tid\n")
def mysql_settings_test(clickhouse_node, mysql_node, service_name):
    """Materialize a two-row MySQL table and check how many blocks it ended up in.

    Creates `test_database.a` in MySQL with two rows, attaches it in ClickHouse
    through the MaterializeMySQL engine, and then verifies that both rows are
    visible and that they carry two distinct `blockNumber()` values. Both
    databases are dropped again at the end.

    :param clickhouse_node: ClickHouse instance the queries are sent to.
    :param mysql_node: MySQL instance that holds the source table.
    :param service_name: host name of the MySQL service, used in the engine arguments.
    """
    # Remove leftovers from earlier runs, MySQL side first.
    for node in (mysql_node, clickhouse_node):
        node.query("DROP DATABASE IF EXISTS test_database")

    # Source schema plus two rows, each inserted by its own statement.
    mysql_setup = (
        "CREATE DATABASE test_database",
        "CREATE TABLE test_database.a (id INT(11) NOT NULL PRIMARY KEY, value VARCHAR(255))",
        "INSERT INTO test_database.a VALUES(1, 'foo')",
        "INSERT INTO test_database.a VALUES(2, 'bar')",
    )
    for statement in mysql_setup:
        mysql_node.query(statement)

    create_materialized = "CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(service_name)
    clickhouse_node.query(create_materialized)

    # Both rows must have arrived ...
    check_query(clickhouse_node, "SELECT COUNT() FROM test_database.a FORMAT TSV", "2\n")

    # ... and they must not share a block.
    distinct_blocks = clickhouse_node.query("SELECT COUNT(DISTINCT blockNumber()) FROM test_database.a FORMAT TSV")
    assert distinct_blocks == "2\n"

    # Tear down, ClickHouse side first.
    clickhouse_node.query("DROP DATABASE test_database")
    mysql_node.query("DROP DATABASE test_database")

View File

@ -16,7 +16,8 @@ cluster = ClickHouseCluster(__file__)
# ClickHouse instances used by the tests in this module; each one loads its own user profile.
node_db_ordinary = cluster.add_instance('node1', user_configs=["configs/users.xml"], with_mysql=False, stay_alive=True)
node_db_atomic = cluster.add_instance('node2', user_configs=["configs/users_db_atomic.xml"], with_mysql=False, stay_alive=True)
# node3 and node4 are the parametrized targets of test_mysql_settings. Judging by the
# file names, each profile switches off one of the two MySQL read limits - verify
# against the XML files themselves.
node_disable_bytes_settings = cluster.add_instance('node3', user_configs=["configs/users_disable_bytes_settings.xml"], with_mysql=False, stay_alive=True)
node_disable_rows_settings = cluster.add_instance('node4', user_configs=["configs/users_disable_rows_settings.xml"], with_mysql=False, stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
@ -289,5 +290,12 @@ def test_multi_table_update(started_cluster, started_mysql_8_0, started_mysql_5_
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_system_tables_table(started_cluster, started_mysql_8_0, clickhouse_node):
def test_system_tables_table(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_disable_bytes_settings, node_disable_rows_settings])
def test_mysql_settings(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
    """Run the MySQL settings scenario against MySQL 5.7 first and MySQL 8.0 second."""
    mysql_targets = (
        (started_mysql_5_7, "mysql1"),
        (started_mysql_8_0, "mysql8_0"),
    )
    for mysql_node, service_name in mysql_targets:
        materialize_with_ddl.mysql_settings_test(clickhouse_node, mysql_node, service_name)