Update TrivialBuffer.cpp

This commit is contained in:
alexey-milovidov 2017-04-14 02:52:35 +03:00
parent 1059c87828
commit 84040d8a85

View File

@ -18,18 +18,18 @@
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event TrivialBufferFlush; extern const Event TrivialBufferFlush;
extern const Event TrivialBufferErrorOnFlush; extern const Event TrivialBufferErrorOnFlush;
extern const Event TrivialBufferPassedAllMinThresholds; extern const Event TrivialBufferPassedAllMinThresholds;
extern const Event TrivialBufferPassedTimeMaxThreshold; extern const Event TrivialBufferPassedTimeMaxThreshold;
extern const Event TrivialBufferPassedRowsMaxThreshold; extern const Event TrivialBufferPassedRowsMaxThreshold;
extern const Event TrivialBufferPassedBytesMaxThreshold; extern const Event TrivialBufferPassedBytesMaxThreshold;
} }
namespace CurrentMetrics namespace CurrentMetrics
{ {
extern const Metric TrivialBufferRows; extern const Metric TrivialBufferRows;
extern const Metric TrivialBufferBytes; extern const Metric TrivialBufferBytes;
} }
@ -38,347 +38,347 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int INFINITE_LOOP; extern const int INFINITE_LOOP;
extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE; extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE;
} }
StoragePtr TrivialBuffer::create(const std::string & name_, NamesAndTypesListPtr columns_, StoragePtr TrivialBuffer::create(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_, const ColumnDefaults & column_defaults_,
Context & context_, const size_t num_blocks_to_deduplicate_, Context & context_, const size_t num_blocks_to_deduplicate_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_) const String & destination_database_, const String & destination_table_)
{ {
return make_shared( return make_shared(
name_, columns_, materialized_columns_, alias_columns_, column_defaults_, name_, columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, num_blocks_to_deduplicate_, min_thresholds_, max_thresholds_, context_, num_blocks_to_deduplicate_, min_thresholds_, max_thresholds_,
destination_database_, destination_table_); destination_database_, destination_table_);
} }
TrivialBuffer::TrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_, TrivialBuffer::TrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_, const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_, const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_, const ColumnDefaults & column_defaults_,
Context & context_, const size_t num_blocks_to_deduplicate_, Context & context_, const size_t num_blocks_to_deduplicate_,
const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
const String & destination_database_, const String & destination_table_) const String & destination_database_, const String & destination_table_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, : IStorage{materialized_columns_, alias_columns_, column_defaults_},
name(name_), columns(columns_), context(context_), name(name_), columns(columns_), context(context_),
num_blocks_to_deduplicate(num_blocks_to_deduplicate_), num_blocks_to_deduplicate(num_blocks_to_deduplicate_),
current_hashes(std::make_unique<DeduplicationBuffer>()), current_hashes(std::make_unique<DeduplicationBuffer>()),
previous_hashes(std::make_unique<DeduplicationBuffer>()), previous_hashes(std::make_unique<DeduplicationBuffer>()),
min_thresholds(min_thresholds_), max_thresholds(max_thresholds_), min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
destination_database(destination_database_), destination_table(destination_table_), destination_database(destination_database_), destination_table(destination_table_),
no_destination(destination_database.empty() && destination_table.empty()), no_destination(destination_database.empty() && destination_table.empty()),
log(&Logger::get("TrivialBuffer (" + name + ")")), log(&Logger::get("TrivialBuffer (" + name + ")")),
flush_thread(&TrivialBuffer::flushThread, this) flush_thread(&TrivialBuffer::flushThread, this)
{ {
} }
class TrivialBufferBlockInputStream : public IProfilingBlockInputStream class TrivialBufferBlockInputStream : public IProfilingBlockInputStream
{ {
public: public:
TrivialBufferBlockInputStream(const Names & column_names_, TrivialBuffer & buffer_) TrivialBufferBlockInputStream(const Names & column_names_, TrivialBuffer & buffer_)
: column_names(column_names_.begin(), column_names_.end()), buffer(buffer_) {} : column_names(column_names_.begin(), column_names_.end()), buffer(buffer_) {}
String getName() const { return "TrivialBuffer"; } String getName() const { return "TrivialBuffer"; }
String getID() const String getID() const
{ {
std::stringstream res; std::stringstream res;
res << "TrivialBuffer(" << &buffer; res << "TrivialBuffer(" << &buffer;
for (const auto & name : column_names) for (const auto & name : column_names)
res << ", " << name; res << ", " << name;
res << ")"; res << ")";
return res.str(); return res.str();
} }
protected: protected:
Block readImpl() Block readImpl()
{ {
Block res; Block res;
if (has_been_read) if (has_been_read)
return res; return res;
has_been_read = true; has_been_read = true;
std::lock_guard<std::mutex> lock(buffer.mutex); std::lock_guard<std::mutex> lock(buffer.mutex);
for (auto & block : buffer.data) for (auto & block : buffer.data)
{ {
if (!block.rows()) if (!block.rows())
continue; continue;
for (const auto & name : column_names) for (const auto & name : column_names)
{ {
auto & col = block.getByName(name); auto & col = block.getByName(name);
res.insert(ColumnWithTypeAndName(col.column->clone(), col.type, name)); res.insert(ColumnWithTypeAndName(col.column->clone(), col.type, name));
} }
} }
return res; return res;
} }
private: private:
Names column_names; Names column_names;
TrivialBuffer & buffer; TrivialBuffer & buffer;
bool has_been_read = false; bool has_been_read = false;
}; };
BlockInputStreams TrivialBuffer::read( BlockInputStreams TrivialBuffer::read(
const Names & column_names, const Names & column_names,
ASTPtr query, ASTPtr query,
const Context & context, const Context & context,
const Settings & settings, const Settings & settings,
QueryProcessingStage::Enum & processed_stage, QueryProcessingStage::Enum & processed_stage,
size_t max_block_size, size_t max_block_size,
unsigned threads) unsigned threads)
{ {
processed_stage = QueryProcessingStage::FetchColumns; processed_stage = QueryProcessingStage::FetchColumns;
BlockInputStreams streams; BlockInputStreams streams;
if (!no_destination) if (!no_destination)
{ {
auto destination = context.getTable(destination_database, destination_table); auto destination = context.getTable(destination_database, destination_table);
if (destination.get() == this) if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", throw Exception("Destination table is myself. Read will cause infinite loop.",
ErrorCodes::INFINITE_LOOP); ErrorCodes::INFINITE_LOOP);
/** Отключаем оптимизацию "перенос в PREWHERE", /** Отключаем оптимизацию "перенос в PREWHERE",
* так как Buffer не поддерживает PREWHERE. * так как Buffer не поддерживает PREWHERE.
*/ */
Settings modified_settings = settings; Settings modified_settings = settings;
modified_settings.optimize_move_to_prewhere = false; modified_settings.optimize_move_to_prewhere = false;
streams = destination->read(column_names, query, context, modified_settings, streams = destination->read(column_names, query, context, modified_settings,
processed_stage, max_block_size, threads); processed_stage, max_block_size, threads);
} }
streams.push_back(std::make_shared<TrivialBufferBlockInputStream>(column_names, streams.push_back(std::make_shared<TrivialBufferBlockInputStream>(column_names,
*this)); *this));
/** Если источники из таблицы были обработаны до какой-то не начальной стадии выполнения запроса, /** Если источники из таблицы были обработаны до какой-то не начальной стадии выполнения запроса,
* то тогда источники из буферов надо тоже обернуть в конвейер обработки до той же стадии. * то тогда источники из буферов надо тоже обернуть в конвейер обработки до той же стадии.
*/ */
if (processed_stage > QueryProcessingStage::FetchColumns) if (processed_stage > QueryProcessingStage::FetchColumns)
streams.back() = InterpreterSelectQuery(query, context, processed_stage, 0, streams.back() = InterpreterSelectQuery(query, context, processed_stage, 0,
streams.back()).execute().in; streams.back()).execute().in;
return streams; return streams;
} }
void TrivialBuffer::addBlock(const Block & block) void TrivialBuffer::addBlock(const Block & block)
{ {
SipHash hash; SipHash hash;
block.updateHash(hash); block.updateHash(hash);
HashType block_hash = hash.get64(); HashType block_hash = hash.get64();
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
if (current_hashes->find(block_hash) == current_hashes->end() if (current_hashes->find(block_hash) == current_hashes->end()
&& previous_hashes->find(block_hash) == previous_hashes->end()) && previous_hashes->find(block_hash) == previous_hashes->end())
{ {
if (current_hashes->size() >= num_blocks_to_deduplicate / 2) if (current_hashes->size() >= num_blocks_to_deduplicate / 2)
{ {
previous_hashes = std::move(current_hashes); previous_hashes = std::move(current_hashes);
current_hashes = std::make_unique<DeduplicationBuffer>(); current_hashes = std::make_unique<DeduplicationBuffer>();
} }
current_hashes->insert(block_hash); current_hashes->insert(block_hash);
current_rows += block.rows(); current_rows += block.rows();
current_bytes += block.bytes(); current_bytes += block.bytes();
data.push_back(block); data.push_back(block);
CurrentMetrics::add(CurrentMetrics::TrivialBufferRows, current_rows); CurrentMetrics::add(CurrentMetrics::TrivialBufferRows, current_rows);
CurrentMetrics::add(CurrentMetrics::TrivialBufferBytes, current_bytes); CurrentMetrics::add(CurrentMetrics::TrivialBufferBytes, current_bytes);
} }
else else
{ {
//NOTE: может быть, если нашли этот блок в previous, //NOTE: может быть, если нашли этот блок в previous,
// то надо его хэш перенести в current? // то надо его хэш перенести в current?
/* /*
auto it = previous_hashes->find(block_hash); auto it = previous_hashes->find(block_hash);
if (it != previous_hashes->end()) if (it != previous_hashes->end())
{ {
current_hashes->insert(it); current_hashes->insert(it);
previous_hashes->erase(it); previous_hashes->erase(it);
} }
*/ */
} }
} }
void TrivialBuffer::flush(bool check_thresholds) void TrivialBuffer::flush(bool check_thresholds)
{ {
Block block_to_write; Block block_to_write;
time_t current_time = time(0); time_t current_time = time(0);
time_t time_passed = 0; time_t time_passed = 0;
if (data.empty()) if (data.empty())
return; return;
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
if (first_write_time) if (first_write_time)
time_passed = current_time - first_write_time; time_passed = current_time - first_write_time;
if (check_thresholds) if (check_thresholds)
{ {
if (!checkThresholdsImpl(current_rows, current_bytes, time_passed)) if (!checkThresholdsImpl(current_rows, current_bytes, time_passed))
return; return;
} }
else else
{ {
if (current_rows == 0) if (current_rows == 0)
return; return;
} }
/// Collecting BlockList into single block. /// Collecting BlockList into single block.
block_to_write = data.front().cloneEmpty(); block_to_write = data.front().cloneEmpty();
block_to_write.checkNumberOfRows(); block_to_write.checkNumberOfRows();
for (auto & block : data) for (auto & block : data)
{ {
block.checkNumberOfRows(); block.checkNumberOfRows();
for (size_t column_no = 0, columns = block.columns(); column_no < columns; ++column_no) for (size_t column_no = 0, columns = block.columns(); column_no < columns; ++column_no)
{ {
IColumn & col_to = *block_to_write.safeGetByPosition(column_no).column.get(); IColumn & col_to = *block_to_write.safeGetByPosition(column_no).column.get();
const IColumn & col_from = *block.getByName(col_to.getName()).column.get(); const IColumn & col_from = *block.getByName(col_to.getName()).column.get();
col_to.insertRangeFrom(col_from, 0, block.rows()); col_to.insertRangeFrom(col_from, 0, block.rows());
} }
} }
first_write_time = 0; first_write_time = 0;
ProfileEvents::increment(ProfileEvents::TrivialBufferFlush); ProfileEvents::increment(ProfileEvents::TrivialBufferFlush);
LOG_TRACE(log, "Flushing buffer with " << block_to_write.rows() << " rows, " << block_to_write.bytes() << " bytes, age " << time_passed << " seconds."); LOG_TRACE(log, "Flushing buffer with " << block_to_write.rows() << " rows, " << block_to_write.bytes() << " bytes, age " << time_passed << " seconds.");
if (no_destination) if (no_destination)
return; return;
/** For simplicity, buffer is locked during write. /** For simplicity, buffer is locked during write.
* We could unlock buffer temporary, but it would lead to too much difficulties: * We could unlock buffer temporary, but it would lead to too much difficulties:
* - data, that is written, will not be visible for SELECTs; * - data, that is written, will not be visible for SELECTs;
* - new data could be appended to buffer, and in case of exception, we must merge it with old data, that has not been written; * - new data could be appended to buffer, and in case of exception, we must merge it with old data, that has not been written;
* - this could lead to infinite memory growth. * - this could lead to infinite memory growth.
*/ */
try try
{ {
writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table)); writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table));
data.clear(); data.clear();
CurrentMetrics::sub(CurrentMetrics::TrivialBufferRows, block_to_write.rows()); CurrentMetrics::sub(CurrentMetrics::TrivialBufferRows, block_to_write.rows());
CurrentMetrics::sub(CurrentMetrics::TrivialBufferBytes, block_to_write.bytes()); CurrentMetrics::sub(CurrentMetrics::TrivialBufferBytes, block_to_write.bytes());
} }
catch (...) catch (...)
{ {
ProfileEvents::increment(ProfileEvents::TrivialBufferErrorOnFlush); ProfileEvents::increment(ProfileEvents::TrivialBufferErrorOnFlush);
if (!first_write_time) if (!first_write_time)
first_write_time = current_time; first_write_time = current_time;
/// Через некоторое время будет следующая попытка записать. /// Через некоторое время будет следующая попытка записать.
throw; throw;
} }
} }
} }
class TrivialBufferBlockOutputStream : public IBlockOutputStream class TrivialBufferBlockOutputStream : public IBlockOutputStream
{ {
public: public:
TrivialBufferBlockOutputStream(TrivialBuffer & buffer_) : buffer(buffer_) {} TrivialBufferBlockOutputStream(TrivialBuffer & buffer_) : buffer(buffer_) {}
void write(const Block & block) override void write(const Block & block) override
{ {
if (!block) if (!block)
return; return;
size_t rows = block.rows(); size_t rows = block.rows();
size_t bytes = block.bytes(); size_t bytes = block.bytes();
if (!rows) if (!rows)
return; return;
StoragePtr destination; StoragePtr destination;
if (!buffer.no_destination) if (!buffer.no_destination)
{ {
destination = buffer.context.tryGetTable(buffer.destination_database, destination = buffer.context.tryGetTable(buffer.destination_database,
buffer.destination_table); buffer.destination_table);
if (destination) if (destination)
{ {
if (destination.get() == &buffer) if (destination.get() == &buffer)
throw Exception("Destination table is myself. Write will " throw Exception("Destination table is myself. Write will "
"cause infinite loop.", ErrorCodes::INFINITE_LOOP); "cause infinite loop.", ErrorCodes::INFINITE_LOOP);
/// Проверяем структуру таблицы. /// Проверяем структуру таблицы.
try try
{ {
destination->check(block, true); destination->check(block, true);
} }
catch (Exception & e) catch (Exception & e)
{ {
e.addMessage("(when looking at destination table " e.addMessage("(when looking at destination table "
+ buffer.destination_database + "." + buffer.destination_database + "."
+ buffer.destination_table + ")"); + buffer.destination_table + ")");
throw; throw;
} }
} }
} }
/// Вставляем блок в список блоков. /// Вставляем блок в список блоков.
time_t current_time = time(0); time_t current_time = time(0);
if (buffer.checkThresholds(current_time, rows, bytes)) if (buffer.checkThresholds(current_time, rows, bytes))
{ {
/** Если после вставки в буфер, ограничения будут превышены, /** Если после вставки в буфер, ограничения будут превышены,
* то будем сбрасывать буфер. * то будем сбрасывать буфер.
* Это также защищает от неограниченного потребления оперативки, * Это также защищает от неограниченного потребления оперативки,
* так как в случае невозможности записать в таблицу, * так как в случае невозможности записать в таблицу,
* будет выкинуто исключение, а новые данные не будут добавлены в буфер. * будет выкинуто исключение, а новые данные не будут добавлены в буфер.
*/ */
buffer.flush(true); buffer.flush(true);
} }
if (!buffer.first_write_time) if (!buffer.first_write_time)
buffer.first_write_time = current_time; buffer.first_write_time = current_time;
buffer.addBlock(block); buffer.addBlock(block);
} }
private: private:
TrivialBuffer & buffer; TrivialBuffer & buffer;
}; };
BlockOutputStreamPtr TrivialBuffer::write(ASTPtr query, const Settings & settings) BlockOutputStreamPtr TrivialBuffer::write(ASTPtr query, const Settings & settings)
{ {
return std::make_shared<TrivialBufferBlockOutputStream>(*this); return std::make_shared<TrivialBufferBlockOutputStream>(*this);
} }
void TrivialBuffer::shutdown() void TrivialBuffer::shutdown()
{ {
shutdown_event.set(); shutdown_event.set();
if (flush_thread.joinable()) if (flush_thread.joinable())
flush_thread.join(); flush_thread.join();
try try
{ {
///NOTE: в StorageBuffer здесь используется optimize({}, {}, context.getSettings()). ///NOTE: в StorageBuffer здесь используется optimize({}, {}, context.getSettings()).
/// Зачем? /// Зачем?
flush(false); flush(false);
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
} }
@ -395,162 +395,162 @@ void TrivialBuffer::shutdown()
*/ */
bool TrivialBuffer::optimize(const String & partition, bool final, const Settings & settings) bool TrivialBuffer::optimize(const String & partition, bool final, const Settings & settings)
{ {
if (!partition.empty()) if (!partition.empty())
throw Exception("Partition cannot be specified when optimizing table of type Buffer", throw Exception("Partition cannot be specified when optimizing table of type Buffer",
ErrorCodes::NOT_IMPLEMENTED); ErrorCodes::NOT_IMPLEMENTED);
if (final) if (final)
throw Exception("FINAL cannot be specified when optimizing table of type Buffer", throw Exception("FINAL cannot be specified when optimizing table of type Buffer",
ErrorCodes::NOT_IMPLEMENTED); ErrorCodes::NOT_IMPLEMENTED);
flush(false); flush(false);
return true; return true;
} }
bool TrivialBuffer::checkThresholds(const time_t current_time, const size_t additional_rows, bool TrivialBuffer::checkThresholds(const time_t current_time, const size_t additional_rows,
const size_t additional_bytes) const const size_t additional_bytes) const
{ {
time_t time_passed = 0; time_t time_passed = 0;
if (first_write_time) if (first_write_time)
time_passed = current_time - first_write_time; time_passed = current_time - first_write_time;
size_t rows = current_rows + additional_rows; size_t rows = current_rows + additional_rows;
size_t bytes = current_bytes + additional_bytes; size_t bytes = current_bytes + additional_bytes;
return checkThresholdsImpl(rows, bytes, time_passed); return checkThresholdsImpl(rows, bytes, time_passed);
} }
bool TrivialBuffer::checkThresholdsImpl(const size_t rows, const size_t bytes, bool TrivialBuffer::checkThresholdsImpl(const size_t rows, const size_t bytes,
const time_t time_passed) const const time_t time_passed) const
{ {
if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes) if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
{ {
ProfileEvents::increment(ProfileEvents::TrivialBufferPassedAllMinThresholds); ProfileEvents::increment(ProfileEvents::TrivialBufferPassedAllMinThresholds);
return true; return true;
} }
if (time_passed > max_thresholds.time) if (time_passed > max_thresholds.time)
{ {
ProfileEvents::increment(ProfileEvents::TrivialBufferPassedTimeMaxThreshold); ProfileEvents::increment(ProfileEvents::TrivialBufferPassedTimeMaxThreshold);
return true; return true;
} }
if (rows > max_thresholds.rows) if (rows > max_thresholds.rows)
{ {
ProfileEvents::increment(ProfileEvents::TrivialBufferPassedRowsMaxThreshold); ProfileEvents::increment(ProfileEvents::TrivialBufferPassedRowsMaxThreshold);
return true; return true;
} }
if (bytes > max_thresholds.bytes) if (bytes > max_thresholds.bytes)
{ {
ProfileEvents::increment(ProfileEvents::TrivialBufferPassedBytesMaxThreshold); ProfileEvents::increment(ProfileEvents::TrivialBufferPassedBytesMaxThreshold);
return true; return true;
} }
return false; return false;
} }
void TrivialBuffer::flushThread() void TrivialBuffer::flushThread()
{ {
setThreadName("BufferFlush"); setThreadName("BufferFlush");
do do
{ {
try try
{ {
flush(true); flush(true);
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
} while (!shutdown_event.tryWait(1000)); } while (!shutdown_event.tryWait(1000));
} }
void TrivialBuffer::writeBlockToDestination(const Block & block, StoragePtr table) void TrivialBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
{ {
if (no_destination || !block) if (no_destination || !block)
return; return;
if (!table) if (!table)
{ {
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " doesn't exist. Block of data is discarded."); LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " doesn't exist. Block of data is discarded.");
return; return;
} }
auto insert = std::make_shared<ASTInsertQuery>(); auto insert = std::make_shared<ASTInsertQuery>();
insert->database = destination_database; insert->database = destination_database;
insert->table = destination_table; insert->table = destination_table;
/** Будем вставлять столбцы, являющиеся пересечением множества столбцов таблицы-буфера и подчинённой таблицы. /** Будем вставлять столбцы, являющиеся пересечением множества столбцов таблицы-буфера и подчинённой таблицы.
* Это позволит поддержать часть случаев (но не все), когда структура таблицы не совпадает. * Это позволит поддержать часть случаев (но не все), когда структура таблицы не совпадает.
*/ */
Block structure_of_destination_table = table->getSampleBlock(); Block structure_of_destination_table = table->getSampleBlock();
Names columns_intersection; Names columns_intersection;
columns_intersection.reserve(block.columns()); columns_intersection.reserve(block.columns());
for (size_t i : ext::range(0, structure_of_destination_table.columns())) for (size_t i : ext::range(0, structure_of_destination_table.columns()))
{ {
auto dst_col = structure_of_destination_table.getByPosition(i); auto dst_col = structure_of_destination_table.getByPosition(i);
if (block.has(dst_col.name)) if (block.has(dst_col.name))
{ {
if (block.getByName(dst_col.name).type->getName() != dst_col.type->getName()) if (block.getByName(dst_col.name).type->getName() != dst_col.type->getName())
{ {
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table
<< " have different type of column " << dst_col.name << ". Block of data is discarded."); << " have different type of column " << dst_col.name << ". Block of data is discarded.");
return; return;
} }
columns_intersection.push_back(dst_col.name); columns_intersection.push_back(dst_col.name);
} }
} }
if (columns_intersection.empty()) if (columns_intersection.empty())
{ {
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " have no common columns with block in buffer. Block of data is discarded."); LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " have no common columns with block in buffer. Block of data is discarded.");
return; return;
} }
if (columns_intersection.size() != block.columns()) if (columns_intersection.size() != block.columns())
LOG_WARNING(log, "Not all columns from block in buffer exist in destination table " LOG_WARNING(log, "Not all columns from block in buffer exist in destination table "
<< destination_database << "." << destination_table << ". Some columns are discarded."); << destination_database << "." << destination_table << ". Some columns are discarded.");
auto list_of_columns = std::make_shared<ASTExpressionList>(); auto list_of_columns = std::make_shared<ASTExpressionList>();
insert->columns = list_of_columns; insert->columns = list_of_columns;
list_of_columns->children.reserve(columns_intersection.size()); list_of_columns->children.reserve(columns_intersection.size());
for (const String & column : columns_intersection) for (const String & column : columns_intersection)
list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(StringRange(), column, ASTIdentifier::Column)); list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(StringRange(), column, ASTIdentifier::Column));
InterpreterInsertQuery interpreter{insert, context}; InterpreterInsertQuery interpreter{insert, context};
auto block_io = interpreter.execute(); auto block_io = interpreter.execute();
block_io.out->writePrefix(); block_io.out->writePrefix();
block_io.out->write(block); block_io.out->write(block);
block_io.out->writeSuffix(); block_io.out->writeSuffix();
} }
void TrivialBuffer::alter(const AlterCommands & params, const String & database_name, void TrivialBuffer::alter(const AlterCommands & params, const String & database_name,
const String & table_name, const Context & context) const String & table_name, const Context & context)
{ {
for (const auto & param : params) for (const auto & param : params)
if (param.type == AlterCommand::MODIFY_PRIMARY_KEY) if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
throw Exception("Storage engine " + getName() + " doesn't support primary key.", throw Exception("Storage engine " + getName() + " doesn't support primary key.",
ErrorCodes::NOT_IMPLEMENTED); ErrorCodes::NOT_IMPLEMENTED);
auto lock = lockStructureForAlter(); auto lock = lockStructureForAlter();
/// Чтобы не осталось блоков старой структуры. /// Чтобы не осталось блоков старой структуры.
///NOTE: опять здесь optimize({}, {}, context.getSettings()) ///NOTE: опять здесь optimize({}, {}, context.getSettings())
flush(false); flush(false);
params.apply(*columns, materialized_columns, alias_columns, column_defaults); params.apply(*columns, materialized_columns, alias_columns, column_defaults);
context.getDatabase(database_name)->alterTable( context.getDatabase(database_name)->alterTable(
context, table_name, context, table_name,
*columns, materialized_columns, alias_columns, column_defaults, {}); *columns, materialized_columns, alias_columns, column_defaults, {});
} }
} }