#include "StorageMySQL.h" #if USE_MYSQL #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int BAD_ARGUMENTS; } static String backQuoteMySQL(const String & x) { String res(x.size(), '\0'); { WriteBufferFromString wb(res); writeBackQuotedStringMySQL(x, wb); } return res; } StorageMySQL::StorageMySQL( const StorageID & table_id_, mysqlxx::PoolWithFailover && pool_, const std::string & remote_database_name_, const std::string & remote_table_name_, const bool replace_query_, const std::string & on_duplicate_clause_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const String & comment, ContextPtr context_, const MySQLSettings & mysql_settings_) : IStorage(table_id_) , WithContext(context_->getGlobalContext()) , remote_database_name(remote_database_name_) , remote_table_name(remote_table_name_) , replace_query{replace_query_} , on_duplicate_clause{on_duplicate_clause_} , mysql_settings(mysql_settings_) , pool(std::make_shared(pool_)) , log(&Poco::Logger::get("StorageMySQL (" + table_id_.table_name + ")")) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); storage_metadata.setConstraints(constraints_); storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); } Pipe StorageMySQL::read( const Names & column_names_, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info_, ContextPtr context_, QueryProcessingStage::Enum /*processed_stage*/, size_t /*max_block_size*/, unsigned) { metadata_snapshot->check(column_names_, getVirtuals(), getStorageID()); String query = transformQueryForExternalDatabase( query_info_, metadata_snapshot->getColumns().getOrdinary(), IdentifierQuotingStyle::BackticksMySQL, remote_database_name, remote_table_name, context_); LOG_TRACE(log, "Query: {}", query); Block sample_block; for (const String & column_name : column_names_) { auto column_data = metadata_snapshot->getColumns().getPhysical(column_name); WhichDataType which(column_data.type); /// Convert enum to string. if (which.isEnum()) column_data.type = std::make_shared(); sample_block.insert({ column_data.type, column_data.name }); } StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), mysql_settings.connection_auto_close); return Pipe(std::make_shared(pool, query, sample_block, mysql_input_stream_settings)); } class StorageMySQLSink : public SinkToStorage { public: explicit StorageMySQLSink( const StorageMySQL & storage_, const StorageMetadataPtr & metadata_snapshot_, const std::string & remote_database_name_, const std::string & remote_table_name_, const mysqlxx::PoolWithFailover::Entry & entry_, const size_t & mysql_max_rows_to_insert) : SinkToStorage(metadata_snapshot_->getSampleBlock()) , storage{storage_} , metadata_snapshot{metadata_snapshot_} , remote_database_name{remote_database_name_} , remote_table_name{remote_table_name_} , entry{entry_} , max_batch_rows{mysql_max_rows_to_insert} { } String getName() const override { return "StorageMySQLSink"; } void consume(Chunk chunk) override { auto block = getHeader().cloneWithColumns(chunk.detachColumns()); auto blocks = splitBlocks(block, max_batch_rows); mysqlxx::Transaction trans(entry); try { for (const Block & batch_data : blocks) { writeBlockData(batch_data); } trans.commit(); } catch (...) { trans.rollback(); throw; } } void writeBlockData(const Block & block) { WriteBufferFromOwnString sqlbuf; sqlbuf << (storage.replace_query ? "REPLACE" : "INSERT") << " INTO "; if (!remote_database_name.empty()) sqlbuf << backQuoteMySQL(remote_database_name) << "."; sqlbuf << backQuoteMySQL(remote_table_name); sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES "; auto writer = FormatFactory::instance().getOutputFormat("Values", sqlbuf, metadata_snapshot->getSampleBlock(), storage.getContext()); writer->write(block); if (!storage.on_duplicate_clause.empty()) sqlbuf << " ON DUPLICATE KEY " << storage.on_duplicate_clause; sqlbuf << ";"; auto query = this->entry->query(sqlbuf.str()); query.execute(); } Blocks splitBlocks(const Block & block, const size_t & max_rows) const { /// Avoid Excessive copy when block is small enough if (block.rows() <= max_rows) return Blocks{std::move(block)}; const size_t split_block_size = ceil(block.rows() * 1.0 / max_rows); Blocks split_blocks(split_block_size); for (size_t idx = 0; idx < split_block_size; ++idx) split_blocks[idx] = block.cloneEmpty(); const size_t columns = block.columns(); const size_t rows = block.rows(); size_t offsets = 0; UInt64 limits = max_batch_rows; for (size_t idx = 0; idx < split_block_size; ++idx) { /// For last batch, limits should be the remain size if (idx == split_block_size - 1) limits = rows - offsets; for (size_t col_idx = 0; col_idx < columns; ++col_idx) { split_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offsets, limits); } offsets += max_batch_rows; } return split_blocks; } static std::string dumpNamesWithBackQuote(const Block & block) { WriteBufferFromOwnString out; for (auto it = block.begin(); it != block.end(); ++it) { if (it != block.begin()) out << ", "; out << backQuoteMySQL(it->name); } return out.str(); } private: const StorageMySQL & storage; StorageMetadataPtr metadata_snapshot; std::string remote_database_name; std::string remote_table_name; mysqlxx::PoolWithFailover::Entry entry; size_t max_batch_rows; }; SinkToStoragePtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) { return std::make_shared( *this, metadata_snapshot, remote_database_name, remote_table_name, pool->get(), local_context->getSettingsRef().mysql_max_rows_to_insert); } StorageMySQLConfiguration StorageMySQL::getConfiguration(ASTs engine_args, ContextPtr context_) { StorageMySQLConfiguration configuration; if (auto named_collection = getExternalDataSourceConfiguration(engine_args, context_)) { auto [common_configuration, storage_specific_args] = named_collection.value(); configuration.set(common_configuration); configuration.addresses = {std::make_pair(configuration.host, configuration.port)}; for (const auto & [arg_name, arg_value] : storage_specific_args) { if (arg_name == "replace_query") configuration.replace_query = arg_value->as()->value.safeGet(); else if (arg_name == "on_duplicate_clause") configuration.on_duplicate_clause = arg_value->as()->value.safeGet(); else throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected key-value argument." "Got: {}, but expected one of:" "host, port, username, password, database, table, replace_query, on_duplicate_clause.", arg_name); } } else { if (engine_args.size() < 5 || engine_args.size() > 7) throw Exception( "Storage MySQL requires 5-7 parameters: MySQL('host:port' (or 'addresses_pattern'), database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (auto & engine_arg : engine_args) engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context_); const auto & host_port = engine_args[0]->as().value.safeGet(); size_t max_addresses = context_->getSettingsRef().glob_expansion_max_elements; configuration.addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306); configuration.database = engine_args[1]->as().value.safeGet(); configuration.table = engine_args[2]->as().value.safeGet(); configuration.username = engine_args[3]->as().value.safeGet(); configuration.password = engine_args[4]->as().value.safeGet(); if (engine_args.size() >= 6) configuration.replace_query = engine_args[5]->as().value.safeGet(); if (engine_args.size() == 7) configuration.on_duplicate_clause = engine_args[6]->as().value.safeGet(); } if (configuration.replace_query && !configuration.on_duplicate_clause.empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them"); return configuration; } void registerStorageMySQL(StorageFactory & factory) { factory.registerStorage("MySQL", [](const StorageFactory::Arguments & args) { auto configuration = StorageMySQL::getConfiguration(args.engine_args, args.getLocalContext()); MySQLSettings mysql_settings; /// TODO: move some arguments from the arguments to the SETTINGS. if (args.storage_def->settings) mysql_settings.loadFromQuery(*args.storage_def); if (!mysql_settings.connection_pool_size) throw Exception("connection_pool_size cannot be zero.", ErrorCodes::BAD_ARGUMENTS); mysqlxx::PoolWithFailover pool( configuration.database, configuration.addresses, configuration.username, configuration.password, MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS, mysql_settings.connection_pool_size, mysql_settings.connection_max_tries, mysql_settings.connection_wait_timeout); return StorageMySQL::create( args.table_id, std::move(pool), configuration.database, configuration.table, configuration.replace_query, configuration.on_duplicate_clause, args.columns, args.constraints, args.comment, args.getContext(), mysql_settings); }, { .supports_settings = true, .source_access_type = AccessType::MYSQL, }); } } #endif