ISSUES-4006 convert _sign & _version to materialized column

This commit is contained in:
zhang2014 2020-08-10 22:32:05 +08:00
parent 90b7628496
commit 2311cda334
8 changed files with 73 additions and 141 deletions

View File

@ -1,57 +0,0 @@
#include <DataStreams/AddingVersionsBlockOutputStream.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
/// Forward the prefix marker to the wrapped output stream unchanged.
void AddingVersionsBlockOutputStream::writePrefix()
{
output->writePrefix();
}
/// Forward the suffix marker to the wrapped output stream unchanged.
void AddingVersionsBlockOutputStream::writeSuffix()
{
output->writeSuffix();
}
/// Forward the flush request to the wrapped output stream unchanged.
void AddingVersionsBlockOutputStream::flush()
{
output->flush();
}
/// Write `block` to the wrapped stream with two extra columns appended:
/// a sign column (Int8, constant 1 for every row) and a version column
/// (UInt64, filled with the shared counter, pre-incremented once per block
/// so each written block carries a distinct, monotonically increasing value).
/// The names for the two appended columns are taken from the LAST TWO columns
/// of the wrapped stream's header, so the wrapped header is assumed to end
/// with (sign, version) — see getHeader() below, which strips those same two.
void AddingVersionsBlockOutputStream::write(const Block & block)
{
Block res;
size_t rows = block.rows();
/// Copy all incoming columns as-is; only the two synthetic ones are added.
for (size_t index = 0; index < block.columns(); ++index)
res.insert(block.getByPosition(index));
DataTypePtr sign_type = std::make_shared<DataTypeInt8>();
DataTypePtr version_type = std::make_shared<DataTypeUInt64>();
/// createColumnConst + convertToFullColumnIfConst materializes a full column
/// holding the same value in all `rows` positions.
ColumnPtr sign_column = sign_type->createColumnConst(rows, Field(Int8(1)))->convertToFullColumnIfConst();
ColumnPtr version_column = version_type->createColumnConst(rows, Field(UInt64(++version)))->convertToFullColumnIfConst();
Block header = output->getHeader();
res.insert({sign_column, sign_type, header.getByPosition(header.columns() - 2).name});
res.insert({version_column, version_type, header.getByPosition(header.columns() - 1).name});
output->write(res);
/// Progress accounting reflects the caller-supplied block, not the widened one.
written_rows += block.rows();
written_bytes += block.bytes();
}
/// Header visible to producers: the wrapped stream's header minus its trailing
/// two (sign, version) columns, which this stream fills in itself.
Block AddingVersionsBlockOutputStream::getHeader() const
{
Block res;
Block header = output->getHeader();
for (size_t index = 0; index < header.columns() - 2; ++index)
res.insert(header.getByPosition(index));
return res;
}
}

View File

@ -1,36 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
/// Output-stream decorator that appends a constant sign column and an
/// auto-incrementing version column to every block before handing it to
/// the wrapped stream. Used so the writer does not have to produce the
/// bookkeeping columns itself.
class AddingVersionsBlockOutputStream : public IBlockOutputStream
{
public:
/// version_ is held BY REFERENCE: the counter is shared with (and mutated
/// for) the caller, so successive streams continue the same sequence.
AddingVersionsBlockOutputStream(size_t & version_, const BlockOutputStreamPtr & output_)
: version(version_), output(output_)
{
}
Block getHeader() const override;
void write(const Block & block) override;
void flush() override;
void writePrefix() override;
void writeSuffix() override;
private:
size_t & version;            /// Shared version counter, incremented once per written block.
BlockOutputStreamPtr output; /// Wrapped downstream stream that receives the widened blocks.
/// Cumulative totals of the blocks passed to write() (input side, before widening).
std::atomic<size_t> written_rows{0}, written_bytes{0};
public:
size_t getWrittenRows() { return written_rows; }
size_t getWrittenBytes() { return written_bytes; }
};
}

View File

@ -80,7 +80,7 @@ void MaterializeMetadata::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & c
if (!master_status || master_status.rows() != 1)
throw Exception("Unable to get master status from MySQL.", ErrorCodes::LOGICAL_ERROR);
version = 0;
version = 1;
binlog_file = (*master_status.getByPosition(0).column)[0].safeGet<String>();
binlog_position = (*master_status.getByPosition(1).column)[0].safeGet<UInt64>();
binlog_do_db = (*master_status.getByPosition(2).column)[0].safeGet<String>();

View File

@ -32,7 +32,7 @@ struct MaterializeMetadata
String binlog_ignore_db;
String executed_gtid_set;
size_t version = 0;
size_t version = 1;
std::unordered_map<String, String> need_dumping_tables;
void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection);

View File

@ -9,7 +9,7 @@
# include <cstdlib>
# include <random>
# include <Columns/ColumnTuple.h>
# include <DataStreams/AddingVersionsBlockOutputStream.h>
# include <DataStreams/CountingBlockOutputStream.h>
# include <DataStreams/OneBlockInputStream.h>
# include <DataStreams/copyData.h>
# include <Databases/MySQL/DatabaseMaterializeMySQL.h>
@ -37,20 +37,28 @@ namespace ErrorCodes
static constexpr auto MYSQL_BACKGROUND_THREAD_NAME = "MySQLDBSync";
static BlockIO tryToExecuteQuery(const String & query_to_execute, const Context & context_, const String & database, const String & comment)
/// Build a per-query Context derived from the global one, configured for the
/// internal MaterializeMySQL sync queries:
///  - insert_allow_materialized_columns is enabled so INSERTs may explicitly
///    target the MATERIALIZED _sign/_version columns;
///  - the query is marked SECONDARY_QUERY (internally generated, not a client);
///  - an empty current query id makes the server generate a random one.
static Context createQueryContext(const Context & global_context)
{
Settings new_query_settings = global_context.getSettings();
new_query_settings.insert_allow_materialized_columns = true;
Context query_context(global_context);
query_context.setSettings(new_query_settings);
/// NOTE(review): query_scope is a local RAII object destroyed when this
/// function returns, while query_context is returned by value and used
/// afterwards — confirm the thread-status attachment is meant to end here.
CurrentThread::QueryScope query_scope(query_context);
query_context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context.setCurrentQueryId(""); // generate random query_id
return query_context;
}
static BlockIO tryToExecuteQuery(const String & query_to_execute, Context & query_context, const String & database, const String & comment)
{
try
{
Context context(context_);
CurrentThread::QueryScope query_scope(context);
if (!database.empty())
context.setCurrentDatabase(database);
query_context.setCurrentDatabase(database);
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
return executeQuery("/*" + comment + "*/ " + query_to_execute, context, true);
return executeQuery("/*" + comment + "*/ " + query_to_execute, query_context, true);
}
catch (...)
{
@ -216,16 +224,35 @@ static inline void cleanOutdatedTables(const String & database_name, const Conte
for (auto iterator = clean_database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
Context query_context = createQueryContext(context);
String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data";
String table_name = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(iterator->name());
tryToExecuteQuery(" DROP TABLE " + table_name, context, database_name, comment);
tryToExecuteQuery(" DROP TABLE " + table_name, query_context, database_name, comment);
}
}
static inline BlockOutputStreamPtr getTableOutput(const String & database_name, const String & table_name, const Context & context)
static inline BlockOutputStreamPtr getTableOutput(const String & database_name, const String & table_name, Context & query_context, bool insert_materialized = false)
{
const StoragePtr & storage = DatabaseCatalog::instance().getTable(StorageID(database_name, table_name), query_context);
std::stringstream insert_columns_str;
const StorageInMemoryMetadata & storage_metadata = storage->getInMemoryMetadata();
const ColumnsDescription & storage_columns = storage_metadata.getColumns();
const NamesAndTypesList & insert_columns_names = insert_materialized ? storage_columns.getAllPhysical() : storage_columns.getOrdinary();
for (auto iterator = insert_columns_names.begin(); iterator != insert_columns_names.end(); ++iterator)
{
if (iterator != insert_columns_names.begin())
insert_columns_str << ", ";
insert_columns_str << iterator->name;
}
String comment = "Materialize MySQL step 1: execute dump data";
BlockIO res = tryToExecuteQuery("INSERT INTO " + backQuoteIfNeed(table_name) + " VALUES", context, database_name, comment);
BlockIO res = tryToExecuteQuery("INSERT INTO " + backQuoteIfNeed(table_name) + "(" + insert_columns_str.str() + ")" + " VALUES",
query_context, database_name, comment);
if (!res.out)
throw Exception("LOGICAL ERROR: It is a bug.", ErrorCodes::LOGICAL_ERROR);
@ -242,21 +269,23 @@ static inline void dumpDataForTables(
for (; iterator != master_info.need_dumping_tables.end() && !is_cancelled(); ++iterator)
{
const auto & table_name = iterator->first;
Context query_context = createQueryContext(context);
String comment = "Materialize MySQL step 1: execute MySQL DDL for dump data";
tryToExecuteQuery(query_prefix + " " + iterator->second, context, database_name, comment); /// create table.
tryToExecuteQuery(query_prefix + " " + iterator->second, query_context, database_name, comment); /// create table.
auto out = std::make_shared<AddingVersionsBlockOutputStream>(master_info.version, getTableOutput(database_name, table_name, context));
auto out = std::make_shared<CountingBlockOutputStream>(getTableOutput(database_name, table_name, query_context));
MySQLBlockInputStream input(
connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name),
out->getHeader(), DEFAULT_BLOCK_SIZE);
Stopwatch watch;
copyData(input, *out, is_cancelled);
const Progress & progress = out->getProgress();
LOG_INFO(&Poco::Logger::get("MaterializeMySQLSyncThread(" + database_name + ")"),
"Materialize MySQL step 1: dump {}, {} rows, {} in {} sec., {} rows/sec., {}/sec.",
table_name, out->getWrittenRows(), ReadableSize(out->getWrittenBytes()), watch.elapsedSeconds(),
static_cast<size_t>(out->getWrittenRows() / watch.elapsedSeconds()),
ReadableSize(out->getWrittenRows() / watch.elapsedSeconds()));
"Materialize MySQL step 1: dump {}, {} rows, {} in {} sec., {} rows/sec., {}/sec."
, table_name, formatReadableQuantity(progress.written_rows), formatReadableSizeWithBinarySuffix(progress.written_bytes)
, watch.elapsedSeconds(), formatReadableQuantity(static_cast<size_t>(progress.written_rows / watch.elapsedSeconds()))
, formatReadableSizeWithBinarySuffix(static_cast<size_t>(progress.written_bytes / watch.elapsedSeconds())));
}
}
@ -564,9 +593,10 @@ void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr
try
{
Context query_context = createQueryContext(global_context);
String comment = "Materialize MySQL step 2: execute MySQL DDL for sync data";
String event_database = query_event.schema == mysql_database_name ? database_name : "";
tryToExecuteQuery(query_prefix + query_event.query, global_context, event_database, comment);
tryToExecuteQuery(query_prefix + query_event.query, query_context, event_database, comment);
}
catch (Exception & exception)
{
@ -616,8 +646,9 @@ void MaterializeMySQLSyncThread::Buffers::commit(const Context & context)
{
for (auto & table_name_and_buffer : data)
{
Context query_context = createQueryContext(context);
OneBlockInputStream input(table_name_and_buffer.second->first);
BlockOutputStreamPtr out = getTableOutput(database, table_name_and_buffer.first, context);
BlockOutputStreamPtr out = getTableOutput(database, table_name_and_buffer.first, query_context, true);
copyData(input, *out);
}
@ -640,11 +671,11 @@ MaterializeMySQLSyncThread::Buffers::BufferAndSortingColumnsPtr MaterializeMySQL
const auto & iterator = data.find(table_name);
if (iterator == data.end())
{
StoragePtr storage = getDatabase(database).tryGetTable(table_name, context);
StoragePtr storage = DatabaseCatalog::instance().getTable(StorageID(database, table_name), context);
const StorageInMemoryMetadata & metadata = storage->getInMemoryMetadata();
BufferAndSortingColumnsPtr & buffer_and_soring_columns = data.try_emplace(
table_name, std::make_shared<BufferAndSortingColumns>(metadata.getSampleBlockNonMaterialized(), std::vector<size_t>{})).first->second;
table_name, std::make_shared<BufferAndSortingColumns>(metadata.getSampleBlock(), std::vector<size_t>{})).first->second;
Names required_for_sorting_key = metadata.getColumnsRequiredForSortingKey();

View File

@ -339,12 +339,24 @@ ASTs InterpreterCreateImpl::getRewrittenQueries(
auto columns = std::make_shared<ASTColumns>();
const auto & create_materialized_column_declaration = [&](const String & name, const String & type, const auto & default_value)
{
const auto column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = name;
column_declaration->type = makeASTFunction(type);
column_declaration->default_specifier = "MATERIALIZED";
column_declaration->default_expression = std::make_shared<ASTLiteral>(default_value);
column_declaration->children.emplace_back(column_declaration->type);
column_declaration->children.emplace_back(column_declaration->default_expression);
return column_declaration;
};
/// Add _sign and _version column.
String sign_column_name = getUniqueColumnName(columns_name_and_type, "_sign");
String version_column_name = getUniqueColumnName(columns_name_and_type, "_version");
columns_name_and_type.emplace_back(NameAndTypePair{sign_column_name, std::make_shared<DataTypeInt8>()});
columns_name_and_type.emplace_back(NameAndTypePair{version_column_name, std::make_shared<DataTypeUInt64>()});
columns->set(columns->columns, InterpreterCreateQuery::formatColumns(columns_name_and_type));
columns->columns->children.emplace_back(create_materialized_column_declaration(sign_column_name, "Int8", UInt64(1)));
columns->columns->children.emplace_back(create_materialized_column_declaration(version_column_name, "UInt64", UInt64(1)));
auto storage = std::make_shared<ASTStorage>();

View File

@ -22,16 +22,9 @@ namespace DB
StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_)
: IStorage(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_)
{
ColumnsDescription columns_desc;
const auto & nested_memory_metadata = nested_storage->getInMemoryMetadata();
const ColumnsDescription & nested_columns_desc = nested_memory_metadata.getColumns();
auto iterator = nested_columns_desc.begin();
for (size_t index = 0; index < nested_columns_desc.size() - 2; ++index, ++iterator)
columns_desc.add(*iterator);
auto nested_memory_metadata = nested_storage->getInMemoryMetadata();
StorageInMemoryMetadata in_memory_metadata;
in_memory_metadata.setColumns(columns_desc);
in_memory_metadata.setColumns(nested_memory_metadata.getColumns());
setInMemoryMetadata(in_memory_metadata);
}
@ -106,17 +99,7 @@ NamesAndTypesList StorageMaterializeMySQL::getVirtuals() const
{
/// If the background synchronization thread has exception.
database->rethrowExceptionIfNeed();
NamesAndTypesList virtuals;
Block nested_header = nested_storage->getInMemoryMetadata().getSampleBlockNonMaterialized();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
virtuals.emplace_back(NameAndTypePair(sign_column.name, sign_column.type));
virtuals.emplace_back(NameAndTypePair(version_column.name, version_column.type));
auto nested_virtuals = nested_storage->getVirtuals();
virtuals.insert(virtuals.end(), nested_virtuals.begin(), nested_virtuals.end());
return virtuals;
return nested_storage->getVirtuals();
}
}

View File

@ -19,7 +19,6 @@ public:
bool supportsFinal() const override { return nested_storage->supportsFinal(); }
bool supportsSampling() const override { return nested_storage->supportsSampling(); }
StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_);
Pipes read(