ISSUES-4006 fix crash when dump data

This commit is contained in:
zhang2014 2020-06-24 01:45:55 +08:00
parent 1df8bfe4fb
commit 9324c0ee49
2 changed files with 38 additions and 4 deletions

View File

@ -40,7 +40,8 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context
{
LOG_DEBUG(&Poco::Logger::get("MaterializeMySQLSyncThread"), "Try execute query: " + query_to_execute);
Context context = context_;
Context context(context_);
CurrentThread::QueryScope query_scope(context);
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true);
@ -51,7 +52,7 @@ static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context
throw;
}
// LOG_DEBUG(&Logger::get("MaterializeMySQLSyncThread"), "Executed query: " + query_to_execute);
LOG_DEBUG(&Poco::Logger::get("MaterializeMySQLSyncThread"), "Executed query: " + query_to_execute);
}
static inline DatabaseMaterializeMySQL & getDatabase(const String & database_name)
@ -89,7 +90,7 @@ MaterializeMySQLSyncThread::~MaterializeMySQLSyncThread()
MaterializeMySQLSyncThread::MaterializeMySQLSyncThread(
const Context & context, const String & database_name_, const String & mysql_database_name_,
mysqlxx::Pool && pool_, MySQLClient && client_, MaterializeMySQLSettings * settings_)
: log(&Poco::Logger::get("MaterializeMySQLSyncThread")), global_context(context), database_name(database_name_)
: log(&Poco::Logger::get("MaterializeMySQLSyncThread")), global_context(context.getGlobalContext()), database_name(database_name_)
, mysql_database_name(mysql_database_name_), pool(std::move(pool_)), client(std::move(client_)), settings(settings_)
{
/// TODO: 做简单的check, 失败即报错
@ -128,7 +129,10 @@ void MaterializeMySQLSyncThread::synchronization()
std::unique_lock<std::mutex> lock(sync_mutex);
if (binlog_event)
{
binlog_event->dump();
onEvent(buffers, binlog_event, *metadata);
}
if (watch.elapsedMilliseconds() > max_flush_time || buffers.checkThresholds(
settings->max_rows_in_buffer, settings->max_bytes_in_buffer,
@ -136,7 +140,9 @@ void MaterializeMySQLSyncThread::synchronization()
)
{
watch.restart();
flushBuffersData(buffers, *metadata);
if (!buffers.data.empty())
flushBuffersData(buffers, *metadata);
}
}
}
@ -263,6 +269,9 @@ static inline void fillSignAndVersionColumnsData(Block & data, Int8 sign_value,
sign_column_data.emplace_back(sign_value);
version_column_data.emplace_back(version_value);
}
data.getByPosition(data.columns() - 2).column = std::move(sign_mutable_column);
data.getByPosition(data.columns() - 1).column = std::move(version_mutable_column);
}
template <Int8 sign>
@ -275,6 +284,8 @@ static size_t onWriteOrDeleteData(const std::vector<Field> & rows_data, Block &
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
buffer.getByPosition(column).column = std::move(col_to);
}
fillSignAndVersionColumnsData(buffer, sign, version, rows_data.size());
@ -317,6 +328,8 @@ static inline size_t onUpdateData(const std::vector<Field> & rows_data, Block &
col_to->insert(DB::get<const Tuple &>(rows_data[index + 1])[column]);
}
}
buffer.getByPosition(column).column = std::move(col_to);
}
MutableColumnPtr sign_mutable_column = IColumn::mutate(std::move(buffer.getByPosition(buffer.columns() - 2).column));
@ -342,6 +355,8 @@ static inline size_t onUpdateData(const std::vector<Field> & rows_data, Block &
}
}
buffer.getByPosition(buffer.columns() - 2).column = std::move(sign_mutable_column);
buffer.getByPosition(buffer.columns() - 1).column = std::move(version_mutable_column);
return buffer.bytes() - prev_bytes;
}

View File

@ -1,9 +1,14 @@
#include <Storages/StorageMaterializeMySQL.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/Pipe.h>
#include <Processors/Transforms/FilterTransform.h>
namespace DB
{
@ -51,6 +56,20 @@ Pipes StorageMaterializeMySQL::read(
require_columns_name.emplace_back(sign_column.name);
return nested_storage->read(require_columns_name, query_info, context, processed_stage, max_block_size, num_streams);
/*for (auto & pipe : pipes)
{
std::cout << "Pipe Header Structure:" << pipe.getHeader().dumpStructure() << "\n";
ASTPtr expr = makeASTFunction(
"equals", std::make_shared<ASTIdentifier>(sign_column.name), std::make_shared<ASTLiteral>(Field(Int8(1))));
auto syntax = SyntaxAnalyzer(context).analyze(expr, pipe.getHeader().getNamesAndTypesList());
ExpressionActionsPtr expression_actions = ExpressionAnalyzer(expr, syntax, context).getActions(true);
pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), expression_actions, expr->getColumnName(), false));
/// TODO: maybe need remove sign columns
}*/
// return pipes;
}
}