diff --git a/base/mysqlxx/Row.cpp b/base/mysqlxx/Row.cpp index aecec46e519..bfc18689403 100644 --- a/base/mysqlxx/Row.cpp +++ b/base/mysqlxx/Row.cpp @@ -21,4 +21,12 @@ Value Row::operator[] (const char * name) const throw Exception(std::string("Unknown column ") + name); } +std::string Row::getFieldName(size_t n) const +{ + if (res->getNumFields() <= n) + throw Exception(std::string("Unknown column position ") + std::to_string(n)); + + return res->getFields()[n].name; +} + } diff --git a/base/mysqlxx/Row.h b/base/mysqlxx/Row.h index a0b88638546..547ee35e52e 100644 --- a/base/mysqlxx/Row.h +++ b/base/mysqlxx/Row.h @@ -79,6 +79,8 @@ public: */ operator private_bool_type() const { return row == nullptr ? nullptr : &Row::row; } + std::string getFieldName(size_t n) const; + private: MYSQL_ROW row{}; ResultBase * res{}; diff --git a/src/Databases/MySQL/MaterializeMetadata.cpp b/src/Databases/MySQL/MaterializeMetadata.cpp index c001955a8ae..591527474ca 100644 --- a/src/Databases/MySQL/MaterializeMetadata.cpp +++ b/src/Databases/MySQL/MaterializeMetadata.cpp @@ -36,7 +36,7 @@ static std::unordered_map fetchTablesCreateQuery( MySQLBlockInputStream show_create_table( connection, "SHOW CREATE TABLE " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(fetch_table_name), - show_create_table_header, DEFAULT_BLOCK_SIZE); + show_create_table_header, DEFAULT_BLOCK_SIZE, false, true); Block create_query_block = show_create_table.read(); if (!create_query_block || create_query_block.rows() != 1) @@ -77,7 +77,7 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c {std::make_shared(), "Executed_Gtid_Set"}, }; - MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE); + MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE, false, true); Block master_status = input.read(); if (!master_status || master_status.rows() != 1) @@ -99,7 +99,7 @@ void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailo }; const String & fetch_query = "SHOW VARIABLES WHERE Variable_name = 'binlog_checksum'"; - MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, DEFAULT_BLOCK_SIZE); + MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, DEFAULT_BLOCK_SIZE, false, true); while (Block variables_block = variables_input.read()) { @@ -114,23 +114,6 @@ void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailo } } -static Block getShowMasterLogHeader(const String & mysql_version) -{ - if (startsWith(mysql_version, "5.")) - { - return Block { - {std::make_shared(), "Log_name"}, - {std::make_shared(), "File_size"} - }; - } - - return Block { - {std::make_shared(), "Log_name"}, - {std::make_shared(), "File_size"}, - {std::make_shared(), "Encrypted"} - }; -} - static bool checkSyncUserPrivImpl(mysqlxx::PoolWithFailover::Entry & connection, WriteBuffer & out) { Block sync_user_privs_header @@ -174,9 +157,14 @@ static void checkSyncUserPriv(mysqlxx::PoolWithFailover::Entry & connection) "But the SYNC USER grant query is: " + out.str(), ErrorCodes::SYNC_MYSQL_USER_ACCESS_ERROR); } -bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const +bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) const { - MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", getShowMasterLogHeader(mysql_version), DEFAULT_BLOCK_SIZE); + Block logs_header { + {std::make_shared(), "Log_name"}, + {std::make_shared(), "File_size"} + }; + + MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", logs_header, DEFAULT_BLOCK_SIZE, false, true); while (Block block = input.read()) { @@ -233,7 +221,7 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio MaterializeMetadata::MaterializeMetadata( mysqlxx::PoolWithFailover::Entry & connection, const String & path_, - const String & database, bool & opened_transaction, const String & mysql_version) + const String & database, bool & opened_transaction) : persistent_path(path_) { checkSyncUserPriv(connection); @@ -251,7 +239,7 @@ MaterializeMetadata::MaterializeMetadata( assertString("\nData Version:\t", in); readIntText(data_version, in); - if (checkBinlogFileExists(connection, mysql_version)) + if (checkBinlogFileExists(connection)) return; } diff --git a/src/Databases/MySQL/MaterializeMetadata.h b/src/Databases/MySQL/MaterializeMetadata.h index 94dfc73e5df..336dc85d899 100644 --- a/src/Databases/MySQL/MaterializeMetadata.h +++ b/src/Databases/MySQL/MaterializeMetadata.h @@ -41,13 +41,13 @@ struct MaterializeMetadata void fetchMasterVariablesValue(const mysqlxx::PoolWithFailover::Entry & connection); - bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const; + bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) const; void transaction(const MySQLReplication::Position & position, const std::function & fun); MaterializeMetadata( mysqlxx::PoolWithFailover::Entry & connection, const String & path - , const String & database, bool & opened_transaction, const String & mysql_version); + , const String & database, bool & opened_transaction); }; } diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index b8f13f4ed18..ceea22091b6 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -93,7 +93,7 @@ MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread() } } -static String checkVariableAndGetVersion(const mysqlxx::Pool::Entry & connection) +static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection) { Block variables_header{ {std::make_shared(), "Variable_name"}, @@ -106,7 +106,7 @@ static String checkVariableAndGetVersion(const mysqlxx::Pool::Entry & connection "OR (Variable_name = 'binlog_row_image' AND upper(Value) = 'FULL') " "OR (Variable_name = 'default_authentication_plugin' AND upper(Value) = 'MYSQL_NATIVE_PASSWORD');"; - MySQLBlockInputStream variables_input(connection, check_query, variables_header, DEFAULT_BLOCK_SIZE); + MySQLBlockInputStream variables_input(connection, check_query, variables_header, DEFAULT_BLOCK_SIZE, false, true); Block variables_block = variables_input.read(); if (!variables_block || variables_block.rows() != 4) @@ -140,15 +140,6 @@ static String checkVariableAndGetVersion(const mysqlxx::Pool::Entry & connection throw Exception(error_message.str(), ErrorCodes::ILLEGAL_MYSQL_VARIABLE); } - - Block version_header{{std::make_shared(), "version"}}; - MySQLBlockInputStream version_input(connection, "SELECT version() AS version;", version_header, DEFAULT_BLOCK_SIZE); - - Block version_block = version_input.read(); - if (!version_block || version_block.rows() != 1) - throw Exception("LOGICAL ERROR: cannot get mysql version.", ErrorCodes::LOGICAL_ERROR); - - return version_block.getByPosition(0).column->getDataAt(0).toString(); } MaterializeMySQLSyncThread::MaterializeMySQLSyncThread( @@ -160,13 +151,13 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread( query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") "; } -void MaterializeMySQLSyncThread::synchronization(const String & mysql_version) +void MaterializeMySQLSyncThread::synchronization() { setThreadName(MYSQL_BACKGROUND_THREAD_NAME); try { - if (std::optional metadata = prepareSynchronized(mysql_version)) + if (std::optional metadata = prepareSynchronized()) { Stopwatch watch; Buffers buffers(database_name); @@ -217,10 +208,8 @@ void MaterializeMySQLSyncThread::startSynchronization() { try { - const auto & mysql_server_version = checkVariableAndGetVersion(pool.get()); - - background_thread_pool = std::make_unique( - [this, mysql_server_version = mysql_server_version]() { synchronization(mysql_server_version); }); + checkMySQLVariables(pool.get()); + background_thread_pool = std::make_unique([this]() { synchronization(); }); } catch (...) { @@ -324,7 +313,7 @@ static inline UInt32 randomNumber() return dist6(rng); } -std::optional MaterializeMySQLSyncThread::prepareSynchronized(const String & mysql_version) +std::optional MaterializeMySQLSyncThread::prepareSynchronized() { bool opened_transaction = false; mysqlxx::PoolWithFailover::Entry connection; @@ -337,8 +326,7 @@ std::optional MaterializeMySQLSyncThread::prepareSynchroniz opened_transaction = false; MaterializeMetadata metadata( - connection, getDatabase(database_name).getMetadataPath() + "/.metadata", - mysql_database_name, opened_transaction, mysql_version); + connection, getDatabase(database_name).getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction); if (!metadata.need_dumping_tables.empty()) { diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.h b/src/Databases/MySQL/MaterializeMySQLSyncThread.h index 323ae5beb80..54f148026ad 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.h +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.h @@ -95,11 +95,11 @@ private: BufferAndSortingColumnsPtr getTableDataBuffer(const String & table, const Context & context); }; - void synchronization(const String & mysql_version); + void synchronization(); bool isCancelled() { return sync_quit.load(std::memory_order_relaxed); } - std::optional prepareSynchronized(const String & mysql_version); + std::optional prepareSynchronized(); void flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata); diff --git a/src/Formats/MySQLBlockInputStream.cpp b/src/Formats/MySQLBlockInputStream.cpp index 73def337240..d38713879e7 100644 --- a/src/Formats/MySQLBlockInputStream.cpp +++ b/src/Formats/MySQLBlockInputStream.cpp @@ -37,12 +37,14 @@ MySQLBlockInputStream::MySQLBlockInputStream( const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_, - const bool auto_close_) + const bool auto_close_, + const bool fetch_by_name_) : connection{std::make_unique(entry, query_str)} , max_block_size{max_block_size_} , auto_close{auto_close_} + , fetch_by_name(fetch_by_name_) { - if (sample_block.columns() != connection->result.getNumFields()) + if (!fetch_by_name && sample_block.columns() != connection->result.getNumFields()) throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while " + toString(sample_block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH}; @@ -132,27 +134,51 @@ Block MySQLBlockInputStream::readImpl() for (const auto i : ext::range(0, columns.size())) columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty(); + if (unlikely(position_mapping.size() != description.sample_block.columns())) + { + position_mapping.resize(description.sample_block.columns()); + + if (!fetch_by_name) + { + for (const auto idx : ext::range(0, row.size())) + position_mapping[idx] = idx; + } + else + { + for (const auto idx : ext::range(0, row.size())) + { + const auto & field_name = row.getFieldName(idx); + if (description.sample_block.has(field_name)) + { + const auto & position = description.sample_block.getPositionByName(field_name); + position_mapping[position] = idx; + } + } + } + } + size_t num_rows = 0; while (row) { - for (const auto idx : ext::range(0, row.size())) + for (size_t index = 0; index < position_mapping.size(); ++index) { - const auto value = row[idx]; - const auto & sample = description.sample_block.getByPosition(idx); + const auto value = row[position_mapping[index]]; + const auto & sample = description.sample_block.getByPosition(index); + if (!value.isNull()) { - if (description.types[idx].second) + if (description.types[index].second) { - ColumnNullable & column_nullable = assert_cast(*columns[idx]); + ColumnNullable & column_nullable = assert_cast(*columns[index]); const auto & data_type = assert_cast(*sample.type); - insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[idx].first, value); + insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value); column_nullable.getNullMapData().emplace_back(0); } else - insertValue(*sample.type, *columns[idx], description.types[idx].first, value); + insertValue(*sample.type, *columns[index], description.types[index].first, value); } else - insertDefaultValue(*columns[idx], *sample.column); + insertDefaultValue(*columns[index], *sample.column); } ++num_rows; @@ -167,9 +193,11 @@ Block MySQLBlockInputStream::readImpl() MySQLBlockInputStream::MySQLBlockInputStream( const Block & sample_block_, UInt64 max_block_size_, - bool auto_close_) + bool auto_close_, + bool fetch_by_name_) : max_block_size(max_block_size_) , auto_close(auto_close_) + , fetch_by_name(fetch_by_name_) { description.init(sample_block_); } @@ -179,8 +207,9 @@ MySQLLazyBlockInputStream::MySQLLazyBlockInputStream( const std::string & query_str_, const Block & sample_block_, const UInt64 max_block_size_, - const bool auto_close_) - : MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_) + const bool auto_close_, + const bool fetch_by_name_) + : MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_, fetch_by_name_) , pool(pool_) , query_str(query_str_) { @@ -189,7 +218,7 @@ MySQLLazyBlockInputStream::MySQLLazyBlockInputStream( void MySQLLazyBlockInputStream::readPrefix() { connection = std::make_unique(pool.get(), query_str); - if (description.sample_block.columns() != connection->result.getNumFields()) + if (!fetch_by_name && description.sample_block.columns() != connection->result.getNumFields()) throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while " + toString(description.sample_block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH}; diff --git a/src/Formats/MySQLBlockInputStream.h b/src/Formats/MySQLBlockInputStream.h index 2eaeb5b8d59..a516e81ac6c 100644 --- a/src/Formats/MySQLBlockInputStream.h +++ b/src/Formats/MySQLBlockInputStream.h @@ -20,14 +20,15 @@ public: const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_, - const bool auto_close_ = false); + const bool auto_close_ = false, + const bool fetch_by_name_ = false); String getName() const override { return "MySQL"; } Block getHeader() const override { return description.sample_block.cloneEmpty(); } protected: - MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_); + MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_, bool fetch_by_name_); Block readImpl() override; struct Connection @@ -43,6 +44,8 @@ protected: const UInt64 max_block_size; const bool auto_close; + const bool fetch_by_name; + std::vector position_mapping; ExternalResultDescription description; }; @@ -56,7 +59,8 @@ public: const std::string & query_str_, const Block & sample_block_, const UInt64 max_block_size_, - const bool auto_close_ = false); + const bool auto_close_ = false, + const bool fetch_by_name_ = false); private: void readPrefix() override;