#include "StorageMySQL.h"

#if USE_MYSQL

#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Storages/MySQL/MySQLSettings.h>
#include <Formats/MySQLSource.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
#include <Common/parseAddress.h>
#include <IO/Operators.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTLiteral.h>
#include <mysqlxx/Transaction.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Pipe.h>
#include <Common/parseRemoteDescription.h>
#include <DataStreams/IBlockOutputStream.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
}

/// Returns `x` as written by writeBackQuotedStringMySQL.
/// The write buffer lives in its own scope so that it is destroyed before `res` is returned.
static String backQuoteMySQL(const String & x)
{
    String res(x.size(), '\0');
    {
        WriteBufferFromString wb(res);
        writeBackQuotedStringMySQL(x, wb);
    }
    return res;
}

/// Creates the storage and fills its in-memory metadata (columns, constraints, comment).
/// The storage keeps the global context obtained from `context_`, not `context_` itself.
StorageMySQL::StorageMySQL(
    const StorageID & table_id_,
    mysqlxx::PoolWithFailover && pool_,
    const std::string & remote_database_name_,
    const std::string & remote_table_name_,
    const bool replace_query_,
    const std::string & on_duplicate_clause_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    ContextPtr context_,
    const MySQLSettings & mysql_settings_)
    : IStorage(table_id_)
    , WithContext(context_->getGlobalContext())
    , remote_database_name(remote_database_name_)
    , remote_table_name(remote_table_name_)
    , replace_query{replace_query_}
    , on_duplicate_clause{on_duplicate_clause_}
    , mysql_settings(mysql_settings_)
    , pool(std::make_shared<mysqlxx::PoolWithFailover>(pool_))
{
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    storage_metadata.setConstraints(constraints_);
    storage_metadata.setComment(comment);
    setInMemoryMetadata(storage_metadata);
}


/// Builds the query for the remote table with transformQueryForExternalDatabase (MySQL backtick quoting)
/// and returns a pipe with a single source reading through the connection pool.
/// The header contains the requested physical columns; Enum columns are declared as String.
Pipe StorageMySQL::read(
    const Names & column_names_,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info_,
    ContextPtr context_,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t /*max_block_size*/,
    unsigned)
{
    metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());

    String query = transformQueryForExternalDatabase(
        query_info_,
        metadata_snapshot->getColumns().getOrdinary(),
        IdentifierQuotingStyle::BackticksMySQL,
        remote_database_name,
        remote_table_name,
        context_);

    Block sample_block;
    for (const String & column_name : column_names_)
    {
        auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);

        WhichDataType which(column_data.type);
        /// Convert enum to string.
        if (which.isEnum())
            column_data.type = std::make_shared<DataTypeString>();
        sample_block.insert({ column_data.type, column_data.name });
    }

    StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), mysql_settings.connection_auto_close);
    return Pipe(std::make_shared<MySQLWithFailoverSource>(pool, query, sample_block, mysql_input_stream_settings));
}


/// Sink that writes incoming chunks to the remote MySQL table.
/// Every chunk is split into batches of at most `max_batch_rows` rows; each batch becomes one
/// INSERT/REPLACE statement, and all batches of a chunk are executed inside one transaction.
class StorageMySQLSink : public SinkToStorage
{
public:
    explicit StorageMySQLSink(
        const StorageMySQL & storage_,
        const StorageMetadataPtr & metadata_snapshot_,
        const std::string & remote_database_name_,
        const std::string & remote_table_name_,
        const mysqlxx::PoolWithFailover::Entry & entry_,
        const size_t & mysql_max_rows_to_insert)
        : SinkToStorage(metadata_snapshot_->getSampleBlock())
        , storage{storage_}
        , metadata_snapshot{metadata_snapshot_}
        , remote_database_name{remote_database_name_}
        , remote_table_name{remote_table_name_}
        , entry{entry_}
        , max_batch_rows{mysql_max_rows_to_insert}
    {
    }

    String getName() const override { return "StorageMySQLSink"; }

    /// Writes one chunk: commits if every batch was executed, otherwise rolls back and rethrows.
    void consume(Chunk chunk) override
    {
        auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
        auto blocks = splitBlocks(block, max_batch_rows);
        mysqlxx::Transaction trans(entry);
        try
        {
            for (const Block & batch_data : blocks)
                writeBlockData(batch_data);

            trans.commit();
        }
        catch (...)
        {
            trans.rollback();
            throw;
        }
    }

    /// Builds and executes a single statement of the form
    ///   (REPLACE|INSERT) INTO [`db`.]`table` (`c1`, `c2`, ...) VALUES ... [ ON DUPLICATE KEY <clause>];
    /// The rows are serialized with the "Values" output format.
    void writeBlockData(const Block & block)
    {
        WriteBufferFromOwnString sqlbuf;
        sqlbuf << (storage.replace_query ? "REPLACE" : "INSERT") << " INTO ";
        if (!remote_database_name.empty())
            sqlbuf << backQuoteMySQL(remote_database_name) << ".";
        sqlbuf << backQuoteMySQL(remote_table_name);
        sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES ";

        auto writer = FormatFactory::instance().getOutputStream("Values", sqlbuf, metadata_snapshot->getSampleBlock(), storage.getContext());
        writer->write(block);

        if (!storage.on_duplicate_clause.empty())
            sqlbuf << " ON DUPLICATE KEY " << storage.on_duplicate_clause;

        sqlbuf << ";";

        auto query = this->entry->query(sqlbuf.str());
        query.execute();
    }

    /// Splits `block` into consecutive blocks of at most `max_rows` rows each (the last one may be shorter).
    /// If the block already fits, or if `max_rows` is zero, the block is returned as a single batch:
    /// a zero limit cannot be used as a divisor, so it is treated as "do not split".
    Blocks splitBlocks(const Block & block, const size_t & max_rows) const
    {
        const size_t rows = block.rows();

        /// Avoid excessive copies when the block is small enough.
        if (max_rows == 0 || rows <= max_rows)
            return Blocks{block};

        /// Integer ceil(rows / max_rows); rows > max_rows >= 1 here, so the result is at least 2.
        const size_t split_block_size = rows / max_rows + (rows % max_rows != 0 ? 1 : 0);

        Blocks split_blocks(split_block_size);
        for (size_t idx = 0; idx < split_block_size; ++idx)
            split_blocks[idx] = block.cloneEmpty();

        const size_t columns = block.columns();
        size_t offset = 0;

        for (size_t idx = 0; idx < split_block_size; ++idx)
        {
            /// For the last batch the limit is the number of remaining rows.
            const size_t remaining = rows - offset;
            const size_t limit = remaining < max_rows ? remaining : max_rows;

            for (size_t col_idx = 0; col_idx < columns; ++col_idx)
                split_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offset, limit);

            offset += limit;
        }

        return split_blocks;
    }

    /// Returns the column names of `block`, each quoted with backQuoteMySQL, joined with ", ".
    static std::string dumpNamesWithBackQuote(const Block & block)
    {
        WriteBufferFromOwnString out;
        for (auto it = block.begin(); it != block.end(); ++it)
        {
            if (it != block.begin())
                out << ", ";
            out << backQuoteMySQL(it->name);
        }
        return out.str();
    }

private:
    const StorageMySQL & storage;
    StorageMetadataPtr metadata_snapshot;
    std::string remote_database_name;
    std::string remote_table_name;
    mysqlxx::PoolWithFailover::Entry entry;
    size_t max_batch_rows;
};


/// Creates a sink bound to one connection taken from the pool.
/// The batch size is taken from the `mysql_max_rows_to_insert` setting of the query context.
SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
    return std::make_shared<StorageMySQLSink>(
        *this,
        metadata_snapshot,
        remote_database_name,
        remote_table_name,
        pool->get(),
        local_context->getSettingsRef().mysql_max_rows_to_insert);
}

/// Registers the "MySQL" table engine.
/// Engine arguments, in order: 'host:port' (or addresses pattern), database, table, user, password,
/// and optionally replace_query and on_duplicate_clause (the last two are mutually exclusive).
/// Pool parameters (connection_pool_size, connection_max_tries) come from the SETTINGS clause.
void registerStorageMySQL(StorageFactory & factory)
{
    factory.registerStorage("MySQL", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        if (engine_args.size() < 5 || engine_args.size() > 7)
            throw Exception(
                "Storage MySQL requires 5-7 parameters: MySQL('host:port' (or 'addresses_pattern'), database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        /// Every argument is evaluated to a literal in place, so it can be read as ASTLiteral below.
        for (auto & engine_arg : engine_args)
            engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());

        /// 3306 is the default MySQL port.
        const String & host_port = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
        const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
        const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
        const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
        const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
        size_t max_addresses = args.getContext()->getSettingsRef().glob_expansion_max_elements;

        /// TODO: move some arguments from the arguments to the SETTINGS.
        MySQLSettings mysql_settings;
        if (args.storage_def->settings)
        {
            mysql_settings.loadFromQuery(*args.storage_def);
        }

        if (!mysql_settings.connection_pool_size)
            throw Exception("connection_pool_size cannot be zero.", ErrorCodes::BAD_ARGUMENTS);

        auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
        mysqlxx::PoolWithFailover pool(remote_database, addresses,
            username, password,
            MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
            mysql_settings.connection_pool_size,
            mysql_settings.connection_max_tries);

        bool replace_query = false;
        std::string on_duplicate_clause;
        if (engine_args.size() >= 6)
            replace_query = engine_args[5]->as<ASTLiteral &>().value.safeGet<UInt64>();
        if (engine_args.size() == 7)
            on_duplicate_clause = engine_args[6]->as<ASTLiteral &>().value.safeGet<String>();

        if (replace_query && !on_duplicate_clause.empty())
            throw Exception(
                "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them",
                ErrorCodes::BAD_ARGUMENTS);

        return StorageMySQL::create(
            args.table_id,
            std::move(pool),
            remote_database,
            remote_table,
            replace_query,
            on_duplicate_clause,
            args.columns,
            args.constraints,
            args.comment,
            args.getContext(),
            mysql_settings);
    },
    {
        .supports_settings = true,
        .source_access_type = AccessType::MYSQL,
    });
}

}

#endif