ISSUES-16835 try fix miss match header with MySQL SHOW statement

This commit is contained in:
zhang2014 2020-11-24 23:31:43 +08:00
parent 2fb8717dfd
commit a74235397e
8 changed files with 84 additions and 65 deletions

View File

@ -21,4 +21,12 @@ Value Row::operator[] (const char * name) const
throw Exception(std::string("Unknown column ") + name);
}
std::string Row::getFieldName(size_t n) const
{
if (res->getNumFields() <= n)
throw Exception(std::string("Unknown column position ") + std::to_string(n));
return res->getFields()[n].name;
}
}

View File

@ -79,6 +79,8 @@ public:
*/
operator private_bool_type() const { return row == nullptr ? nullptr : &Row::row; }
std::string getFieldName(size_t n) const;
private:
MYSQL_ROW row{};
ResultBase * res{};

View File

@ -36,7 +36,7 @@ static std::unordered_map<String, String> fetchTablesCreateQuery(
MySQLBlockInputStream show_create_table(
connection, "SHOW CREATE TABLE " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(fetch_table_name),
show_create_table_header, DEFAULT_BLOCK_SIZE);
show_create_table_header, DEFAULT_BLOCK_SIZE, false, true);
Block create_query_block = show_create_table.read();
if (!create_query_block || create_query_block.rows() != 1)
@ -77,7 +77,7 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c
{std::make_shared<DataTypeString>(), "Executed_Gtid_Set"},
};
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE);
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE, false, true);
Block master_status = input.read();
if (!master_status || master_status.rows() != 1)
@ -99,7 +99,7 @@ void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailo
};
const String & fetch_query = "SHOW VARIABLES WHERE Variable_name = 'binlog_checksum'";
MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, DEFAULT_BLOCK_SIZE);
MySQLBlockInputStream variables_input(connection, fetch_query, variables_header, DEFAULT_BLOCK_SIZE, false, true);
while (Block variables_block = variables_input.read())
{
@ -114,23 +114,6 @@ void MaterializeMetadata::fetchMasterVariablesValue(const mysqlxx::PoolWithFailo
}
}
static Block getShowMasterLogHeader(const String & mysql_version)
{
if (startsWith(mysql_version, "5."))
{
return Block {
{std::make_shared<DataTypeString>(), "Log_name"},
{std::make_shared<DataTypeUInt64>(), "File_size"}
};
}
return Block {
{std::make_shared<DataTypeString>(), "Log_name"},
{std::make_shared<DataTypeUInt64>(), "File_size"},
{std::make_shared<DataTypeString>(), "Encrypted"}
};
}
static bool checkSyncUserPrivImpl(mysqlxx::PoolWithFailover::Entry & connection, WriteBuffer & out)
{
Block sync_user_privs_header
@ -174,9 +157,14 @@ static void checkSyncUserPriv(mysqlxx::PoolWithFailover::Entry & connection)
"But the SYNC USER grant query is: " + out.str(), ErrorCodes::SYNC_MYSQL_USER_ACCESS_ERROR);
}
bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const
bool MaterializeMetadata::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) const
{
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", getShowMasterLogHeader(mysql_version), DEFAULT_BLOCK_SIZE);
Block logs_header {
{std::make_shared<DataTypeString>(), "Log_name"},
{std::make_shared<DataTypeUInt64>(), "File_size"}
};
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", logs_header, DEFAULT_BLOCK_SIZE, false, true);
while (Block block = input.read())
{
@ -233,7 +221,7 @@ void MaterializeMetadata::transaction(const MySQLReplication::Position & positio
MaterializeMetadata::MaterializeMetadata(
mysqlxx::PoolWithFailover::Entry & connection, const String & path_,
const String & database, bool & opened_transaction, const String & mysql_version)
const String & database, bool & opened_transaction)
: persistent_path(path_)
{
checkSyncUserPriv(connection);
@ -251,7 +239,7 @@ MaterializeMetadata::MaterializeMetadata(
assertString("\nData Version:\t", in);
readIntText(data_version, in);
if (checkBinlogFileExists(connection, mysql_version))
if (checkBinlogFileExists(connection))
return;
}

View File

@ -41,13 +41,13 @@ struct MaterializeMetadata
void fetchMasterVariablesValue(const mysqlxx::PoolWithFailover::Entry & connection);
bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection, const String & mysql_version) const;
bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection) const;
void transaction(const MySQLReplication::Position & position, const std::function<void()> & fun);
MaterializeMetadata(
mysqlxx::PoolWithFailover::Entry & connection, const String & path
, const String & database, bool & opened_transaction, const String & mysql_version);
, const String & database, bool & opened_transaction);
};
}

View File

@ -93,7 +93,7 @@ MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread()
}
}
static String checkVariableAndGetVersion(const mysqlxx::Pool::Entry & connection)
static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection)
{
Block variables_header{
{std::make_shared<DataTypeString>(), "Variable_name"},
@ -106,7 +106,7 @@ static String checkVariableAndGetVersion(const mysqlxx::Pool::Entry & connection
"OR (Variable_name = 'binlog_row_image' AND upper(Value) = 'FULL') "
"OR (Variable_name = 'default_authentication_plugin' AND upper(Value) = 'MYSQL_NATIVE_PASSWORD');";
MySQLBlockInputStream variables_input(connection, check_query, variables_header, DEFAULT_BLOCK_SIZE);
MySQLBlockInputStream variables_input(connection, check_query, variables_header, DEFAULT_BLOCK_SIZE, false, true);
Block variables_block = variables_input.read();
if (!variables_block || variables_block.rows() != 4)
@ -140,15 +140,6 @@ static String checkVariableAndGetVersion(const mysqlxx::Pool::Entry & connection
throw Exception(error_message.str(), ErrorCodes::ILLEGAL_MYSQL_VARIABLE);
}
Block version_header{{std::make_shared<DataTypeString>(), "version"}};
MySQLBlockInputStream version_input(connection, "SELECT version() AS version;", version_header, DEFAULT_BLOCK_SIZE);
Block version_block = version_input.read();
if (!version_block || version_block.rows() != 1)
throw Exception("LOGICAL ERROR: cannot get mysql version.", ErrorCodes::LOGICAL_ERROR);
return version_block.getByPosition(0).column->getDataAt(0).toString();
}
MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
@ -160,13 +151,13 @@ MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
query_prefix = "EXTERNAL DDL FROM MySQL(" + backQuoteIfNeed(database_name) + ", " + backQuoteIfNeed(mysql_database_name) + ") ";
}
void MaterializeMySQLSyncThread::synchronization(const String & mysql_version)
void MaterializeMySQLSyncThread::synchronization()
{
setThreadName(MYSQL_BACKGROUND_THREAD_NAME);
try
{
if (std::optional<MaterializeMetadata> metadata = prepareSynchronized(mysql_version))
if (std::optional<MaterializeMetadata> metadata = prepareSynchronized())
{
Stopwatch watch;
Buffers buffers(database_name);
@ -217,10 +208,8 @@ void MaterializeMySQLSyncThread::startSynchronization()
{
try
{
const auto & mysql_server_version = checkVariableAndGetVersion(pool.get());
background_thread_pool = std::make_unique<ThreadFromGlobalPool>(
[this, mysql_server_version = mysql_server_version]() { synchronization(mysql_server_version); });
checkMySQLVariables(pool.get());
background_thread_pool = std::make_unique<ThreadFromGlobalPool>([this]() { synchronization(); });
}
catch (...)
{
@ -324,7 +313,7 @@ static inline UInt32 randomNumber()
return dist6(rng);
}
std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchronized(const String & mysql_version)
std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchronized()
{
bool opened_transaction = false;
mysqlxx::PoolWithFailover::Entry connection;
@ -337,8 +326,7 @@ std::optional<MaterializeMetadata> MaterializeMySQLSyncThread::prepareSynchroniz
opened_transaction = false;
MaterializeMetadata metadata(
connection, getDatabase(database_name).getMetadataPath() + "/.metadata",
mysql_database_name, opened_transaction, mysql_version);
connection, getDatabase(database_name).getMetadataPath() + "/.metadata", mysql_database_name, opened_transaction);
if (!metadata.need_dumping_tables.empty())
{

View File

@ -95,11 +95,11 @@ private:
BufferAndSortingColumnsPtr getTableDataBuffer(const String & table, const Context & context);
};
void synchronization(const String & mysql_version);
void synchronization();
bool isCancelled() { return sync_quit.load(std::memory_order_relaxed); }
std::optional<MaterializeMetadata> prepareSynchronized(const String & mysql_version);
std::optional<MaterializeMetadata> prepareSynchronized();
void flushBuffersData(Buffers & buffers, MaterializeMetadata & metadata);

View File

@ -37,12 +37,14 @@ MySQLBlockInputStream::MySQLBlockInputStream(
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_,
const bool auto_close_)
const bool auto_close_,
const bool fetch_by_name_)
: connection{std::make_unique<Connection>(entry, query_str)}
, max_block_size{max_block_size_}
, auto_close{auto_close_}
, fetch_by_name(fetch_by_name_)
{
if (sample_block.columns() != connection->result.getNumFields())
if (!fetch_by_name && sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
+ toString(sample_block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
@ -132,27 +134,51 @@ Block MySQLBlockInputStream::readImpl()
for (const auto i : ext::range(0, columns.size()))
columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
size_t num_rows = 0;
while (row)
if (unlikely(position_mapping.size() != description.sample_block.columns()))
{
position_mapping.resize(description.sample_block.columns());
if (!fetch_by_name)
{
for (const auto idx : ext::range(0, row.size()))
position_mapping[idx] = idx;
}
else
{
for (const auto idx : ext::range(0, row.size()))
{
const auto value = row[idx];
const auto & sample = description.sample_block.getByPosition(idx);
const auto & field_name = row.getFieldName(idx);
if (description.sample_block.has(field_name))
{
const auto & position = description.sample_block.getPositionByName(field_name);
position_mapping[position] = idx;
}
}
}
}
size_t num_rows = 0;
while (row)
{
for (size_t index = 0; index < position_mapping.size(); ++index)
{
const auto value = row[position_mapping[index]];
const auto & sample = description.sample_block.getByPosition(index);
if (!value.isNull())
{
if (description.types[idx].second)
if (description.types[index].second)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[index]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[idx].first, value);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*sample.type, *columns[idx], description.types[idx].first, value);
insertValue(*sample.type, *columns[index], description.types[index].first, value);
}
else
insertDefaultValue(*columns[idx], *sample.column);
insertDefaultValue(*columns[index], *sample.column);
}
++num_rows;
@ -167,9 +193,11 @@ Block MySQLBlockInputStream::readImpl()
MySQLBlockInputStream::MySQLBlockInputStream(
const Block & sample_block_,
UInt64 max_block_size_,
bool auto_close_)
bool auto_close_,
bool fetch_by_name_)
: max_block_size(max_block_size_)
, auto_close(auto_close_)
, fetch_by_name(fetch_by_name_)
{
description.init(sample_block_);
}
@ -179,8 +207,9 @@ MySQLLazyBlockInputStream::MySQLLazyBlockInputStream(
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_)
: MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_)
const bool auto_close_,
const bool fetch_by_name_)
: MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_, fetch_by_name_)
, pool(pool_)
, query_str(query_str_)
{
@ -189,7 +218,7 @@ MySQLLazyBlockInputStream::MySQLLazyBlockInputStream(
void MySQLLazyBlockInputStream::readPrefix()
{
connection = std::make_unique<Connection>(pool.get(), query_str);
if (description.sample_block.columns() != connection->result.getNumFields())
if (!fetch_by_name && description.sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
+ toString(description.sample_block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};

View File

@ -20,14 +20,15 @@ public:
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_,
const bool auto_close_ = false);
const bool auto_close_ = false,
const bool fetch_by_name_ = false);
String getName() const override { return "MySQL"; }
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
protected:
MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_);
MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_, bool fetch_by_name_);
Block readImpl() override;
struct Connection
@ -43,6 +44,8 @@ protected:
const UInt64 max_block_size;
const bool auto_close;
const bool fetch_by_name;
std::vector<size_t> position_mapping;
ExternalResultDescription description;
};
@ -56,7 +59,8 @@ public:
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_ = false);
const bool auto_close_ = false,
const bool fetch_by_name_ = false);
private:
void readPrefix() override;