mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 01:00:48 +00:00
CLICKHOUSE-4127: Fix assertion in debug build.
Do reading faster if the structure isn't altered.
This commit is contained in:
parent
c399038e19
commit
7c03b35bc4
@ -1,3 +1,4 @@
|
||||
#include <boost/range/algorithm_ext/erase.hpp>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Interpreters/InterpreterInsertQuery.h>
|
||||
#include <Interpreters/InterpreterAlterQuery.h>
|
||||
@ -148,56 +149,65 @@ BlockInputStreams StorageBuffer::read(
|
||||
if (destination.get() == this)
|
||||
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
|
||||
|
||||
/// Collect columns from the destination tables which can be requested.
|
||||
/// Find out if there is a struct mismatch and we need to convert read blocks from the destination tables.
|
||||
auto destination_lock = destination->lockStructure(false);
|
||||
|
||||
Names columns_intersection;
|
||||
bool struct_mismatch = false;
|
||||
for (const String & column_name : column_names)
|
||||
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name)
|
||||
{
|
||||
if (destination->hasColumn(column_name))
|
||||
return destination->hasColumn(column_name) &&
|
||||
destination->getColumn(column_name).type->equals(*getColumn(column_name).type);
|
||||
});
|
||||
|
||||
if (dst_has_same_structure)
|
||||
{
|
||||
/// The destination table has the same structure of the requested columns and we can simply read blocks from there.
|
||||
streams_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// There is a struct mismatch and we need to convert read blocks from the destination table.
|
||||
const Block header = getSampleBlock();
|
||||
Names columns_intersection = column_names;
|
||||
Block header_after_adding_defaults = header;
|
||||
for (const String & column_name : column_names)
|
||||
{
|
||||
columns_intersection.emplace_back(column_name);
|
||||
if (!destination->getColumn(column_name).type->equals(*getColumn(column_name).type))
|
||||
if (!destination->hasColumn(column_name))
|
||||
{
|
||||
LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
<< " doesn't have column " << backQuoteIfNeed(column_name) << ". The default values are used.");
|
||||
boost::range::remove_erase(columns_intersection, column_name);
|
||||
continue;
|
||||
}
|
||||
const auto & dst_col = destination->getColumn(column_name);
|
||||
const auto & col = getColumn(column_name);
|
||||
if (!dst_col.type->equals(*col.type))
|
||||
{
|
||||
LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
<< " has different type of column " << backQuoteIfNeed(column_name) << " ("
|
||||
<< destination->getColumn(column_name).type->getName() << " != " << getColumn(column_name).type->getName()
|
||||
<< "). Data from destination table is converted.");
|
||||
struct_mismatch = true;
|
||||
<< dst_col.type->getName() << " != " << col.type->getName() << "). Data from destination table are converted.");
|
||||
header_after_adding_defaults.getByName(column_name) = ColumnWithTypeAndName(dst_col.type, column_name);
|
||||
}
|
||||
}
|
||||
|
||||
if (columns_intersection.empty())
|
||||
{
|
||||
LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
<< " has no common columns with block in buffer. Block of data is skipped.");
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
<< " doesn't have column " << backQuoteIfNeed(column_name) << ". The default values are used.");
|
||||
struct_mismatch = true;
|
||||
streams_from_dst = destination->read(columns_intersection, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
for (auto & stream : streams_from_dst)
|
||||
{
|
||||
stream = std::make_shared<AddingDefaultBlockInputStream>(
|
||||
stream, header_after_adding_defaults, getColumns().defaults, context);
|
||||
stream = std::make_shared<ConvertingBlockInputStream>(
|
||||
context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (columns_intersection.empty())
|
||||
LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
|
||||
<< " has no common columns with block in buffer. Block of data is skipped.");
|
||||
else
|
||||
{
|
||||
streams_from_dst = destination->read(columns_intersection, query_info, context, processed_stage, max_block_size, num_streams);
|
||||
for (auto & stream : streams_from_dst)
|
||||
stream->addTableLock(destination_lock);
|
||||
}
|
||||
|
||||
if (struct_mismatch && !streams_from_dst.empty())
|
||||
{
|
||||
/// Add streams to convert read blocks from the destination table.
|
||||
auto header = getSampleBlock();
|
||||
for (auto & stream_from_dst : streams_from_dst)
|
||||
{
|
||||
stream_from_dst = std::make_shared<AddingDefaultBlockInputStream>(
|
||||
stream_from_dst, header, getColumns().defaults, context);
|
||||
stream_from_dst = std::make_shared<ConvertingBlockInputStream>(
|
||||
context, stream_from_dst, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
|
||||
}
|
||||
}
|
||||
for (auto & stream : streams_from_dst)
|
||||
stream->addTableLock(destination_lock);
|
||||
}
|
||||
|
||||
BlockInputStreams streams_from_buffers;
|
||||
|
Loading…
Reference in New Issue
Block a user