From 7c03b35bc45375b946c877c66765d8c3b21ee512 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Sat, 1 Dec 2018 15:42:56 +0300 Subject: [PATCH] CLICKHOUSE-4127: Fix assertion in debug build. Do reading faster if the structure isn't altered. --- dbms/src/Storages/StorageBuffer.cpp | 82 ++++++++++++++++------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index 626814468d5..c93ef08801c 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -148,56 +149,65 @@ BlockInputStreams StorageBuffer::read( if (destination.get() == this) throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP); - /// Collect columns from the destination tables which can be requested. - /// Find out if there is a struct mismatch and we need to convert read blocks from the destination tables. auto destination_lock = destination->lockStructure(false); - Names columns_intersection; - bool struct_mismatch = false; - for (const String & column_name : column_names) + const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name) { - if (destination->hasColumn(column_name)) + return destination->hasColumn(column_name) && + destination->getColumn(column_name).type->equals(*getColumn(column_name).type); + }); + + if (dst_has_same_structure) + { + /// The destination table has the same structure of the requested columns and we can simply read blocks from there. + streams_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams); + } + else + { + /// There is a struct mismatch and we need to convert read blocks from the destination table. + const Block header = getSampleBlock(); + Names columns_intersection = column_names; + Block header_after_adding_defaults = header; + for (const String & column_name : column_names) { - columns_intersection.emplace_back(column_name); - if (!destination->getColumn(column_name).type->equals(*getColumn(column_name).type)) + if (!destination->hasColumn(column_name)) + { + LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) + << " doesn't have column " << backQuoteIfNeed(column_name) << ". The default values are used."); + boost::range::remove_erase(columns_intersection, column_name); + continue; + } + const auto & dst_col = destination->getColumn(column_name); + const auto & col = getColumn(column_name); + if (!dst_col.type->equals(*col.type)) { LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) << " has different type of column " << backQuoteIfNeed(column_name) << " (" - << destination->getColumn(column_name).type->getName() << " != " << getColumn(column_name).type->getName() - << "). Data from destination table is converted."); - struct_mismatch = true; + << dst_col.type->getName() << " != " << col.type->getName() << "). Data from destination table are converted."); + header_after_adding_defaults.getByName(column_name) = ColumnWithTypeAndName(dst_col.type, column_name); } } + + if (columns_intersection.empty()) + { + LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) + << " has no common columns with block in buffer. Block of data is skipped."); + } else { - LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) - << " doesn't have column " << backQuoteIfNeed(column_name) << ". The default values are used."); - struct_mismatch = true; + streams_from_dst = destination->read(columns_intersection, query_info, context, processed_stage, max_block_size, num_streams); + for (auto & stream : streams_from_dst) + { + stream = std::make_shared( + stream, header_after_adding_defaults, getColumns().defaults, context); + stream = std::make_shared( + context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name); + } } } - if (columns_intersection.empty()) - LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) - << " has no common columns with block in buffer. Block of data is skipped."); - else - { - streams_from_dst = destination->read(columns_intersection, query_info, context, processed_stage, max_block_size, num_streams); - for (auto & stream : streams_from_dst) - stream->addTableLock(destination_lock); - } - - if (struct_mismatch && !streams_from_dst.empty()) - { - /// Add streams to convert read blocks from the destination table. - auto header = getSampleBlock(); - for (auto & stream_from_dst : streams_from_dst) - { - stream_from_dst = std::make_shared( - stream_from_dst, header, getColumns().defaults, context); - stream_from_dst = std::make_shared( - context, stream_from_dst, header, ConvertingBlockInputStream::MatchColumnsMode::Name); - } - } + for (auto & stream : streams_from_dst) + stream->addTableLock(destination_lock); } BlockInputStreams streams_from_buffers;