From 2311cda3349e0638901b57e89ff050344b6ab03b Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 10 Aug 2020 22:32:05 +0800 Subject: [PATCH] ISSUES-4006 convert _sign & _version to materialized column --- .../AddingVersionsBlockOutputStream.cpp | 57 -------------- .../AddingVersionsBlockOutputStream.h | 36 --------- src/Databases/MySQL/MaterializeMetadata.cpp | 2 +- src/Databases/MySQL/MaterializeMetadata.h | 2 +- .../MySQL/MaterializeMySQLSyncThread.cpp | 77 +++++++++++++------ .../MySQL/InterpretersMySQLDDLQuery.cpp | 16 +++- src/Storages/StorageMaterializeMySQL.cpp | 23 +----- src/Storages/StorageMaterializeMySQL.h | 1 - 8 files changed, 73 insertions(+), 141 deletions(-) delete mode 100644 src/DataStreams/AddingVersionsBlockOutputStream.cpp delete mode 100644 src/DataStreams/AddingVersionsBlockOutputStream.h diff --git a/src/DataStreams/AddingVersionsBlockOutputStream.cpp b/src/DataStreams/AddingVersionsBlockOutputStream.cpp deleted file mode 100644 index b94c858b38e..00000000000 --- a/src/DataStreams/AddingVersionsBlockOutputStream.cpp +++ /dev/null @@ -1,57 +0,0 @@ -#include - -#include -#include - -namespace DB -{ - -void AddingVersionsBlockOutputStream::writePrefix() -{ - output->writePrefix(); -} - -void AddingVersionsBlockOutputStream::writeSuffix() -{ - output->writeSuffix(); -} - -void AddingVersionsBlockOutputStream::flush() -{ - output->flush(); -} - -void AddingVersionsBlockOutputStream::write(const Block & block) -{ - Block res; - size_t rows = block.rows(); - - for (size_t index = 0; index < block.columns(); ++index) - res.insert(block.getByPosition(index)); - - DataTypePtr sign_type = std::make_shared(); - DataTypePtr version_type = std::make_shared(); - - ColumnPtr sign_column = sign_type->createColumnConst(rows, Field(Int8(1)))->convertToFullColumnIfConst(); - ColumnPtr version_column = version_type->createColumnConst(rows, Field(UInt64(++version)))->convertToFullColumnIfConst(); - - Block header = output->getHeader(); - res.insert({sign_column, sign_type, header.getByPosition(header.columns() - 2).name}); - res.insert({version_column, version_type, header.getByPosition(header.columns() - 1).name}); - output->write(res); - - written_rows += block.rows(); - written_bytes += block.bytes(); -} -Block AddingVersionsBlockOutputStream::getHeader() const -{ - Block res; - Block header = output->getHeader(); - - for (size_t index = 0; index < header.columns() - 2; ++index) - res.insert(header.getByPosition(index)); - - return res; -} - -} diff --git a/src/DataStreams/AddingVersionsBlockOutputStream.h b/src/DataStreams/AddingVersionsBlockOutputStream.h deleted file mode 100644 index 4edc0ac13c7..00000000000 --- a/src/DataStreams/AddingVersionsBlockOutputStream.h +++ /dev/null @@ -1,36 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - -class AddingVersionsBlockOutputStream : public IBlockOutputStream -{ -public: - AddingVersionsBlockOutputStream(size_t & version_, const BlockOutputStreamPtr & output_) - : version(version_), output(output_) - { - } - - Block getHeader() const override; - - void write(const Block & block) override; - - void flush() override; - - void writePrefix() override; - void writeSuffix() override; - -private: - size_t & version; - BlockOutputStreamPtr output; - - std::atomic written_rows{0}, written_bytes{0}; - -public: - size_t getWrittenRows() { return written_rows; } - size_t getWrittenBytes() { return written_bytes; } -}; - -} diff --git a/src/Databases/MySQL/MaterializeMetadata.cpp b/src/Databases/MySQL/MaterializeMetadata.cpp index 80484c130d3..3818761cc11 100644 --- a/src/Databases/MySQL/MaterializeMetadata.cpp +++ b/src/Databases/MySQL/MaterializeMetadata.cpp @@ -80,7 +80,7 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c if (!master_status || master_status.rows() != 1) throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR); - version = 0; + version = 1; binlog_file = (*master_status.getByPosition(0).column)[0].safeGet(); binlog_position = (*master_status.getByPosition(1).column)[0].safeGet(); binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet(); diff --git a/src/Databases/MySQL/MaterializeMetadata.h b/src/Databases/MySQL/MaterializeMetadata.h index 045dab5b84c..3fcae3dcf1c 100644 --- a/src/Databases/MySQL/MaterializeMetadata.h +++ b/src/Databases/MySQL/MaterializeMetadata.h @@ -32,7 +32,7 @@ struct MaterializeMetadata String binlog_ignore_db; String executed_gtid_set; - size_t version = 0; + size_t version = 1; std::unordered_map need_dumping_tables; void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection); diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index b53d84189ab..ebe93f527d3 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -9,7 +9,7 @@ # include # include # include -# include +# include # include # include # include @@ -37,20 +37,28 @@ namespace ErrorCodes static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync"; -static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & database, const String & comment) +static Context createQueryContext(const Context & global_context) +{ + Settings new_query_settings = global_context.getSettings(); + new_query_settings.insert_allow_materialized_columns = true; + + Context query_context(global_context); + query_context.setSettings(new_query_settings); + CurrentThread::QueryScope query_scope(query_context); + + query_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + query_context.setCurrentQueryId(""); // generate random query_id + return query_context; +} + +static BlockIO tryToExecuteQuery(const String & query_to_execute, Context & query_context, const String & database, const String & comment) { try { - Context context(context_); - CurrentThread::QueryScope query_scope(context); - if (!database.empty()) - context.setCurrentDatabase(database); + query_context.setCurrentDatabase(database); - context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; - context.setCurrentQueryId(""); // generate random query_id - - return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true); + return executeQuery("/*" + comment + "*/ " + query_to_execute, query_context, true); } catch (...) { @@ -216,16 +224,35 @@ static inline void cleanOutdatedTables(const String & database_name, const Conte for (auto iterator = clean_database->getTablesIterator(context); iterator->isValid(); iterator->next()) { + Context query_context = createQueryContext(context); String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data"; String table_name = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(iterator->name()); - tryToExecuteQuery(" DROP TABLE " + table_name, context, database_name, comment); + tryToExecuteQuery(" DROP TABLE " + table_name, query_context, database_name, comment); } } -static inline BlockOutputStreamPtr getTableOutput(const String & database_name, const String & table_name, const Context & context) +static inline BlockOutputStreamPtr getTableOutput(const String & database_name, const String & table_name, Context & query_context, bool insert_materialized = false) { + const StoragePtr & storage = DatabaseCatalog::instance().getTable(StorageID(database_name, table_name), query_context); + + std::stringstream insert_columns_str; + const StorageInMemoryMetadata & storage_metadata = storage->getInMemoryMetadata(); + const ColumnsDescription & storage_columns = storage_metadata.getColumns(); + const NamesAndTypesList & insert_columns_names = insert_materialized ? storage_columns.getAllPhysical() : storage_columns.getOrdinary(); + + + for (auto iterator = insert_columns_names.begin(); iterator != insert_columns_names.end(); ++iterator) + { + if (iterator != insert_columns_names.begin()) + insert_columns_str << ", "; + + insert_columns_str << iterator->name; + } + + String comment = "Materialize MySQL step 1: execute dump data"; - BlockIO res = tryToExecuteQuery("INSERT INTO " + backQuoteIfNeed(table_name) + " VALUES", context, database_name, comment); + BlockIO res = tryToExecuteQuery("INSERT INTO " + backQuoteIfNeed(table_name) + "(" + insert_columns_str.str() + ")" + " VALUES", + query_context, database_name, comment); if (!res.out) throw Exception("LOGICAL ERROR: It is a bug.", ErrorCodes::LOGICAL_ERROR); @@ -242,21 +269,23 @@ static inline void dumpDataForTables( for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator) { const auto & table_name = iterator->first; + Context query_context = createQueryContext(context); String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data"; - tryToExecuteQuery(query_prefix + " " + iterator->second, context, database_name, comment); /// create table. + tryToExecuteQuery(query_prefix + " " + iterator->second, query_context, database_name, comment); /// create table. - auto out = std::make_shared(master_info.version, getTableOutput(database_name, table_name, context)); + auto out = std::make_shared(getTableOutput(database_name, table_name, query_context)); MySQLBlockInputStream input( connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), out->getHeader(), DEFAULT_BLOCK_SIZE); Stopwatch watch; copyData(input, *out, is_cancelled); + const Progress & progress = out->getProgress(); LOG_INFO(&Poco::Logger::get("MaterializeMySQLSyncThread(" + database_name + ")"), - "Materialize MySQL step 1: dump {}, {} rows, {} in {} sec., {} rows/sec., {}/sec.", - table_name, out->getWrittenRows(), ReadableSize(out->getWrittenBytes()), watch.elapsedSeconds(), - static_cast(out->getWrittenRows() / watch.elapsedSeconds()), - ReadableSize(out->getWrittenRows() / watch.elapsedSeconds())); + "Materialize MySQL step 1: dump {}, {} rows, {} in {} sec., {} rows/sec., {}/sec." + , table_name, formatReadableQuantity(progress.written_rows), formatReadableSizeWithBinarySuffix(progress.written_bytes) + , watch.elapsedSeconds(), formatReadableQuantity(static_cast(progress.written_rows / watch.elapsedSeconds())) + , formatReadableSizeWithBinarySuffix(static_cast(progress.written_bytes / watch.elapsedSeconds()))); } } @@ -564,9 +593,10 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr try { + Context query_context = createQueryContext(global_context); String comment = "Materialize MySQL step 2: execute MySQL DDL for sync data"; String event_database = query_event.schema == mysql_database_name ? database_name : ""; - tryToExecuteQuery(query_prefix + query_event.query, global_context, event_database, comment); + tryToExecuteQuery(query_prefix + query_event.query, query_context, event_database, comment); } catch (Exception & exception) { @@ -616,8 +646,9 @@ void MaterializeMySQLSyncThread::Buffers::commit(const Context & context) { for (auto & table_name_and_buffer : data) { + Context query_context = createQueryContext(context); OneBlockInputStream input(table_name_and_buffer.second->first); - BlockOutputStreamPtr out = getTableOutput(database, table_name_and_buffer.first, context); + BlockOutputStreamPtr out = getTableOutput(database, table_name_and_buffer.first, query_context, true); copyData(input, *out); } @@ -640,11 +671,11 @@ MaterializeMySQLSyncThread::Buffers::BufferAndSortingColumnsPtr MaterializeMySQL const auto & iterator = data.find(table_name); if (iterator == data.end()) { - StoragePtr storage = getDatabase(database).tryGetTable(table_name, context); + StoragePtr storage = DatabaseCatalog::instance().getTable(StorageID(database, table_name), context); const StorageInMemoryMetadata & metadata = storage->getInMemoryMetadata(); BufferAndSortingColumnsPtr & buffer_and_soring_columns = data.try_emplace( - table_name, std::make_shared(metadata.getSampleBlockNonMaterialized(), std::vector{})).first->second; + table_name, std::make_shared(metadata.getSampleBlock(), std::vector{})).first->second; Names required_for_sorting_key = metadata.getColumnsRequiredForSortingKey(); diff --git a/src/Interpreters/MySQL/InterpretersMySQLDDLQuery.cpp b/src/Interpreters/MySQL/InterpretersMySQLDDLQuery.cpp index 31eb3324b94..7bf659e1939 100644 --- a/src/Interpreters/MySQL/InterpretersMySQLDDLQuery.cpp +++ b/src/Interpreters/MySQL/InterpretersMySQLDDLQuery.cpp @@ -339,12 +339,24 @@ ASTs InterpreterCreateImpl::getRewrittenQueries( auto columns = std::make_shared(); + const auto & create_materialized_column_declaration = [&](const String & name, const String & type, const auto & default_value) + { + const auto column_declaration = std::make_shared(); + column_declaration->name = name; + column_declaration->type = makeASTFunction(type); + column_declaration->default_specifier = "MATERIALIZED"; + column_declaration->default_expression = std::make_shared(default_value); + column_declaration->children.emplace_back(column_declaration->type); + column_declaration->children.emplace_back(column_declaration->default_expression); + return column_declaration; + }; + /// Add _sign and _version column. String sign_column_name = getUniqueColumnName(columns_name_and_type, "_sign"); String version_column_name = getUniqueColumnName(columns_name_and_type, "_version"); - columns_name_and_type.emplace_back(NameAndTypePair{sign_column_name, std::make_shared()}); - columns_name_and_type.emplace_back(NameAndTypePair{version_column_name, std::make_shared()}); columns->set(columns->columns, InterpreterCreateQuery::formatColumns(columns_name_and_type)); + columns->columns->children.emplace_back(create_materialized_column_declaration(sign_column_name, "Int8", UInt64(1))); + columns->columns->children.emplace_back(create_materialized_column_declaration(version_column_name, "UInt64", UInt64(1))); auto storage = std::make_shared(); diff --git a/src/Storages/StorageMaterializeMySQL.cpp b/src/Storages/StorageMaterializeMySQL.cpp index a855fccef56..fc898ba0702 100644 --- a/src/Storages/StorageMaterializeMySQL.cpp +++ b/src/Storages/StorageMaterializeMySQL.cpp @@ -22,16 +22,9 @@ namespace DB StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_) : IStorage(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_) { - ColumnsDescription columns_desc; - const auto & nested_memory_metadata = nested_storage->getInMemoryMetadata(); - const ColumnsDescription & nested_columns_desc = nested_memory_metadata.getColumns(); - - auto iterator = nested_columns_desc.begin(); - for (size_t index = 0; index < nested_columns_desc.size() - 2; ++index, ++iterator) - columns_desc.add(*iterator); - + auto nested_memory_metadata = nested_storage->getInMemoryMetadata(); StorageInMemoryMetadata in_memory_metadata; - in_memory_metadata.setColumns(columns_desc); + in_memory_metadata.setColumns(nested_memory_metadata.getColumns()); setInMemoryMetadata(in_memory_metadata); } @@ -106,17 +99,7 @@ NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const { /// If the background synchronization thread has exception. database->rethrowExceptionIfNeed(); - - NamesAndTypesList virtuals; - Block nested_header = nested_storage->getInMemoryMetadata().getSampleBlockNonMaterialized(); - ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2); - ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1); - virtuals.emplace_back(NameAndTypePair(sign_column.name, sign_column.type)); - virtuals.emplace_back(NameAndTypePair(version_column.name, version_column.type)); - - auto nested_virtuals = nested_storage->getVirtuals(); - virtuals.insert(virtuals.end(), nested_virtuals.begin(), nested_virtuals.end()); - return virtuals; + return nested_storage->getVirtuals(); } } diff --git a/src/Storages/StorageMaterializeMySQL.h b/src/Storages/StorageMaterializeMySQL.h index 39eca3191f1..d3e132844ee 100644 --- a/src/Storages/StorageMaterializeMySQL.h +++ b/src/Storages/StorageMaterializeMySQL.h @@ -19,7 +19,6 @@ public: bool supportsFinal() const override { return nested_storage->supportsFinal(); } bool supportsSampling() const override { return nested_storage->supportsSampling(); } - StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_); Pipes read(