From 5d91b4f2fdc962a78f6fb2ca706a71140d265cd9 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Mon, 14 May 2018 19:00:22 +0800 Subject: [PATCH] fix some bugs, fix some code styles --- dbms/src/Storages/StorageMySQL.cpp | 167 ++++++++++-------- dbms/src/Storages/StorageMySQL.h | 2 +- .../src/TableFunctions/TableFunctionMySQL.cpp | 13 +- 3 files changed, 104 insertions(+), 78 deletions(-) diff --git a/dbms/src/Storages/StorageMySQL.cpp b/dbms/src/Storages/StorageMySQL.cpp index 0fb6aa8eca0..cd9e0295de0 100644 --- a/dbms/src/Storages/StorageMySQL.cpp +++ b/dbms/src/Storages/StorageMySQL.cpp @@ -25,6 +25,7 @@ namespace DB namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; } @@ -32,7 +33,7 @@ StorageMySQL::StorageMySQL(const std::string & name, mysqlxx::Pool && pool, const std::string & remote_database_name, const std::string & remote_table_name, - const bool & replace_query, + const bool replace_query, const std::string & on_duplicate_clause, const ColumnsDescription & columns_, const Context & context) @@ -74,12 +75,11 @@ BlockInputStreams StorageMySQL::read( class StorageMySQLBlockOutputStream : public IBlockOutputStream { public: - explicit StorageMySQLBlockOutputStream( - const StorageMySQL & storage, - const std::string & remote_database_name, - const std::string & remote_table_name , - const mysqlxx::PoolWithFailover::Entry & entry, - const size_t & mysql_max_rows_to_insert) + explicit StorageMySQLBlockOutputStream(const StorageMySQL & storage, + const std::string & remote_database_name, + const std::string & remote_table_name , + const mysqlxx::PoolWithFailover::Entry & entry, + const size_t & mysql_max_rows_to_insert) : storage{storage} , remote_database_name{remote_database_name} , remote_table_name{remote_table_name} @@ -88,81 +88,95 @@ public: { } - Block getHeader() const override { return storage.getSampleBlock(); } + Block getHeader() const override { return storage.getSampleBlock(); } - void write(const Block & block) override - { - auto blocks = splitBlocks(block, max_batch_rows); - mysqlxx::Transaction trans(entry); - try - { - for(const Block & batch_data : blocks) - { - writeBlockData(batch_data); - } - trans.commit(); - } - catch(...) - { - trans.rollback(); - throw; - } - } + void write(const Block & block) override + { + auto blocks = splitBlocks(block, max_batch_rows); + mysqlxx::Transaction trans(entry); + try + { + for (const Block & batch_data : blocks) + { + writeBlockData(batch_data); + } + trans.commit(); + } + catch(...) + { + trans.rollback(); + throw; + } + } - void writeBlockData(const Block & block) - { - WriteBufferFromOwnString sqlbuf; - // If both `replace_query` and `on_duplicate_clause` are specified, only use the `on_duplicate_clause`. - sqlbuf << ( (storage.replace_query && storage.on_duplicate_clause.empty()) ? "REPLACE" : "INSERT"); - sqlbuf << " INTO `" << remote_database_name << "`.`" << remote_table_name << "`" - << " ( " << block.dumpNames() << " ) VALUES "; + void writeBlockData(const Block & block) + { + WriteBufferFromOwnString sqlbuf; + sqlbuf << (storage.replace_query ? "REPLACE" : "INSERT") << " INTO "; + sqlbuf << backQuoteIfNeed(remote_database_name) << "." << backQuoteIfNeed(remote_table_name); + sqlbuf << " ( " << dumpNamesWithBackQuote(block) << " ) VALUES "; - auto writer = FormatFactory().getOutput("Values", sqlbuf, storage.getSampleBlock(), storage.context); - writer->write(block); - if(!storage.on_duplicate_clause.empty()) - sqlbuf << " ON DUPLICATE KEY " << storage.on_duplicate_clause; - sqlbuf << ";"; + auto writer = FormatFactory().getOutput("Values", sqlbuf, storage.getSampleBlock(), storage.context); + writer->write(block); - auto query = this->entry->query(sqlbuf.str()); - query.execute(); - } + if (!storage.on_duplicate_clause.empty()) + sqlbuf << " ON DUPLICATE KEY " << storage.on_duplicate_clause; - Blocks splitBlocks(const Block & block, const size_t & max_rows) const - { - // Avoid Excessive copy when block is small enough - if(block.rows() <= max_rows) return Blocks{std::move(block)}; + sqlbuf << ";"; - const size_t splited_block_size = ceil(block.rows() * 1.0 / max_rows); - Blocks splitted_blocks(splited_block_size); + auto query = this->entry->query(sqlbuf.str()); + query.execute(); + } - for (size_t idx = 0; idx < splited_block_size; ++idx) - splitted_blocks[idx] = block.cloneEmpty(); + Blocks splitBlocks(const Block & block, const size_t & max_rows) const + { + /// Avoid Excessive copy when block is small enough + if (block.rows() <= max_rows) + return Blocks{std::move(block)}; - const size_t columns = block.columns(); - const size_t rows = block.rows(); - size_t offsets = 0; - size_t limits = max_batch_rows; - for (size_t idx = 0; idx < splited_block_size; ++idx) - { - // For last batch, limits should be the remain size - if(idx == splited_block_size - 1) limits = rows - offsets; - for(size_t col_idx = 0; col_idx < columns; ++col_idx) - { - splitted_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offsets, limits); - } - offsets += max_batch_rows; - } + const size_t splited_block_size = ceil(block.rows() * 1.0 / max_rows); + Blocks splitted_blocks(splited_block_size); - return splitted_blocks; - } + for (size_t idx = 0; idx < splited_block_size; ++idx) + splitted_blocks[idx] = block.cloneEmpty(); + + const size_t columns = block.columns(); + const size_t rows = block.rows(); + size_t offsets = 0; + size_t limits = max_batch_rows; + for (size_t idx = 0; idx < splited_block_size; ++idx) + { + /// For last batch, limits should be the remain size + if (idx == splited_block_size - 1) limits = rows - offsets; + for (size_t col_idx = 0; col_idx < columns; ++col_idx) + { + splitted_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offsets, limits); + } + offsets += max_batch_rows; + } + + return splitted_blocks; + } + + std::string dumpNamesWithBackQuote(const Block & block) const + { + WriteBufferFromOwnString out; + for (auto it = block.begin(); it != block.end(); ++it) + { + if (it != block.begin()) + out << ", "; + out << backQuoteIfNeed(it->name); + } + return out.str(); + } private: - const StorageMySQL & storage; - std::string remote_database_name; - std::string remote_table_name; - mysqlxx::PoolWithFailover::Entry entry; - size_t max_batch_rows; + const StorageMySQL & storage; + std::string remote_database_name; + std::string remote_table_name; + mysqlxx::PoolWithFailover::Entry entry; + size_t max_batch_rows; }; @@ -180,7 +194,7 @@ void registerStorageMySQL(StorageFactory & factory) if (engine_args.size() < 5 || engine_args.size() > 7) throw Exception( - "Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause' ]).", + "Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (size_t i = 0; i < engine_args.size(); ++i) @@ -198,8 +212,15 @@ void registerStorageMySQL(StorageFactory & factory) bool replace_query = false; std::string on_duplicate_clause; - if(engine_args.size() >= 6) replace_query = static_cast(*engine_args[5]).value.safeGet() > 0; - if(engine_args.size() == 7) on_duplicate_clause = static_cast(*engine_args[6]).value.safeGet(); + if (engine_args.size() >= 6) + replace_query = static_cast(*engine_args[5]).value.safeGet() > 0; + if (engine_args.size() == 7) + on_duplicate_clause = static_cast(*engine_args[6]).value.safeGet(); + + if (replace_query && !on_duplicate_clause.empty()) + throw Exception( + "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them", + ErrorCodes::BAD_ARGUMENTS); return StorageMySQL::create( args.table_name, diff --git a/dbms/src/Storages/StorageMySQL.h b/dbms/src/Storages/StorageMySQL.h index eb9f91fb425..52197d54ae0 100644 --- a/dbms/src/Storages/StorageMySQL.h +++ b/dbms/src/Storages/StorageMySQL.h @@ -24,7 +24,7 @@ public: mysqlxx::Pool && pool, const std::string & remote_database_name, const std::string & remote_table_name, - const bool & replace_query, + const bool replace_query, const std::string & on_duplicate_clause, const ColumnsDescription & columns, const Context & context); diff --git a/dbms/src/TableFunctions/TableFunctionMySQL.cpp b/dbms/src/TableFunctions/TableFunctionMySQL.cpp index 04f61394c63..3201270d0bf 100644 --- a/dbms/src/TableFunctions/TableFunctionMySQL.cpp +++ b/dbms/src/TableFunctions/TableFunctionMySQL.cpp @@ -29,8 +29,8 @@ namespace DB namespace ErrorCodes { - extern const int LOGICAL_ERROR; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS;; } @@ -90,7 +90,7 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co ASTs & args = typeid_cast(*args_func.arguments).children; if (args.size() < 5 || args.size() > 7) - throw Exception("Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause' ]).", + throw Exception("Table function 'mysql' requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (size_t i = 0; i < args.size(); ++i) @@ -104,11 +104,16 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co bool replace_query = false; std::string on_duplicate_clause; - if(args.size() >= 6) + if (args.size() >= 6) replace_query = static_cast(*args[5]).value.safeGet() > 0; - if(args.size() == 7) + if (args.size() == 7) on_duplicate_clause = static_cast(*args[6]).value.safeGet(); + if (replace_query && !on_duplicate_clause.empty()) + throw Exception( + "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them", + ErrorCodes::BAD_ARGUMENTS); + /// 3306 is the default MySQL port number auto parsed_host_port = parseAddress(host_port, 3306);