diff --git a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp index 4ded25a0b7b..16187585ce4 100644 --- a/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp +++ b/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp @@ -40,7 +40,8 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context { LOG_DEBUG(&Poco::Logger::get("MaterializeMySQLSyncThread"), "Try execute query: " + query_to_execute); - Context context = context_; + Context context(context_); + CurrentThread::QueryScope query_scope(context); context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; context.setCurrentQueryId(""); // generate random query_id return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true); @@ -51,7 +52,7 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context throw; } -// LOG_DEBUG(&Logger::get("MaterializeMySQLSyncThread"), "Executed query: " + query_to_execute); + LOG_DEBUG(&Poco::Logger::get("MaterializeMySQLSyncThread"), "Executed query: " + query_to_execute); } static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name) @@ -89,7 +90,7 @@ MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread() MaterializeMySQLSyncThread::MaterializeMySQLSyncThread( const Context & context, const String & database_name_, const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_, MaterializeMySQLSettings * settings_) - : log(&Poco::Logger::get("MaterializeMySQLSyncThread")), global_context(context), database_name(database_name_) + : log(&Poco::Logger::get("MaterializeMySQLSyncThread")), global_context(context.getGlobalContext()), database_name(database_name_) , mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_) { /// TODO: 做简单的check, 失败即报错 @@ -128,7 +129,10 @@ void MaterializeMySQLSyncThread::synchronization() std::unique_lock lock(sync_mutex); if (binlog_event) + { + binlog_event->dump(); onEvent(buffers, binlog_event, *metadata); + } if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds( settings->max_rows_in_buffer, settings->max_bytes_in_buffer, @@ -136,7 +140,9 @@ void MaterializeMySQLSyncThread::synchronization() ) { watch.restart(); - flushBuffersData(buffers, *metadata); + + if (!buffers.data.empty()) + flushBuffersData(buffers, *metadata); } } } @@ -263,6 +269,9 @@ static inline void fillSignAndVersionColumnsData(Block & data, Int8 sign_value, sign_column_data.emplace_back(sign_value); version_column_data.emplace_back(version_value); } + + data.getByPosition(data.columns() - 2).column = std::move(sign_mutable_column); + data.getByPosition(data.columns() - 1).column = std::move(version_mutable_column); } template @@ -275,6 +284,8 @@ static size_t onWriteOrDeleteData(const std::vector & rows_data, Block & for (size_t index = 0; index < rows_data.size(); ++index) col_to->insert(DB::get(rows_data[index])[column]); + + buffer.getByPosition(column).column = std::move(col_to); } fillSignAndVersionColumnsData(buffer, sign, version, rows_data.size()); @@ -317,6 +328,8 @@ static inline size_t onUpdateData(const std::vector & rows_data, Block & col_to->insert(DB::get(rows_data[index + 1])[column]); } } + + buffer.getByPosition(column).column = std::move(col_to); } MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(buffer.getByPosition(buffer.columns() - 2).column)); @@ -342,6 +355,8 @@ static inline size_t onUpdateData(const std::vector & rows_data, Block & } } + buffer.getByPosition(buffer.columns() - 2).column = std::move(sign_mutable_column); + buffer.getByPosition(buffer.columns() - 1).column = std::move(version_mutable_column); return buffer.bytes() - prev_bytes; } diff --git a/src/Storages/StorageMaterializeMySQL.cpp b/src/Storages/StorageMaterializeMySQL.cpp index 6bcdc9bd3b8..aba9ea061e0 100644 --- a/src/Storages/StorageMaterializeMySQL.cpp +++ b/src/Storages/StorageMaterializeMySQL.cpp @@ -1,9 +1,14 @@ #include +#include #include #include +#include +#include + #include +#include namespace DB { @@ -51,6 +56,20 @@ Pipes StorageMaterializeMySQL::read( require_columns_name.emplace_back(sign_column.name); return nested_storage->read(require_columns_name, query_info, context, processed_stage, max_block_size, num_streams); + + /*for (auto & pipe : pipes) + { + std::cout << "Pipe Header Structure:" << pipe.getHeader().dumpStructure() << "\n"; + ASTPtr expr = makeASTFunction( + "equals", std::make_shared(sign_column.name), std::make_shared(Field(Int8(1)))); + auto syntax = SyntaxAnalyzer(context).analyze(expr, pipe.getHeader().getNamesAndTypesList()); + ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expr, syntax, context).getActions(true); + + pipe.addSimpleTransform(std::make_shared(pipe.getHeader(), expression_actions, expr->getColumnName(), false)); + /// TODO: maybe need remove sign columns + }*/ + +// return pipes; } }