ClickHouse/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.cpp

#include <Storages/MergeTree/MergeTreeSequentialBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int MEMORY_LIMIT_EXCEEDED;
}
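
/// A minimal usage sketch (illustration only, not part of the original source), assuming a
/// MergeTreeData `storage` and a data part `part` are available; `process` is a hypothetical callback:
///
///     auto stream = std::make_shared<MergeTreeSequentialBlockInputStream>(
///         storage, part, storage.getColumns().getNamesOfPhysical(),
///         /* read_with_direct_io = */ false,
///         /* take_column_types_from_storage = */ true,
///         /* quiet = */ false);
///     while (Block block = stream->read())
///         process(block);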

MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
    const MergeTreeData & storage_,
    const MergeTreeData::DataPartPtr & data_part_,
    Names columns_to_read_,
    bool read_with_direct_io_,
    bool take_column_types_from_storage,
    bool quiet)
    : storage(storage_)
    , data_part(data_part_)
    , part_columns_lock(data_part->columns_lock)
    , columns_to_read(columns_to_read_)
    , read_with_direct_io(read_with_direct_io_)
    , mark_cache(storage.global_context.getMarkCache())
{
    if (!quiet)
    {
        std::stringstream message;
        message << "Reading " << data_part->getMarksCount() << " marks from part " << data_part->name
            << ", total " << data_part->rows_count
            << " rows starting from the beginning of the part";

        LOG_TRACE(log, message.rdbuf());
    }
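
    /// Report the approximate number of rows this stream will produce (used for progress accounting).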
    addTotalRowsApprox(data_part->rows_count);

    header = storage.getSampleBlockForColumns(columns_to_read);
    fixHeader(header);

    /// Add columns because we don't want to read empty blocks
    injectRequiredColumns(storage, data_part, columns_to_read);
    NamesAndTypesList columns_for_reader;
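    /// Column types may differ between the table definition and the part (e.g. during an ALTER),
    /// so the caller chooses whether the types come from the storage or from the part itself.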
    if (take_column_types_from_storage)
    {
        const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
        columns_for_reader = physical_columns.addTypes(columns_to_read);
    }
    else
    {
        /// take columns from data_part
        columns_for_reader = data_part->columns.addTypes(columns_to_read);
    }
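
    /// Create a reader over the whole part (a single mark range covering all marks);
    /// the uncompressed cache is not used and marks are not saved in the mark cache.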
    reader = std::make_unique<MergeTreeReader>(
        data_part->getFullPath(), data_part, columns_for_reader, /* uncompressed_cache = */ nullptr,
        mark_cache.get(), /* save_marks_in_cache = */ false, storage,
        MarkRanges{MarkRange(0, data_part->getMarksCount())},
        /* threshold in bytes for using AIO/direct I/O: 1 forces it, max() disables it (this is a hack) */
        read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(),
        DBMS_DEFAULT_BUFFER_SIZE);
}

void MergeTreeSequentialBlockInputStream::fixHeader(Block & header_block) const
{
    /// Types may be different during ALTER (when this stream is used to perform an ALTER).
    for (const auto & name_type : data_part->columns)
    {
        if (header_block.has(name_type.name))
        {
            auto & elem = header_block.getByName(name_type.name);
            if (!elem.type->equals(*name_type.type))
            {
                elem.type = name_type.type;
                elem.column = elem.type->createColumn();
            }
        }
    }
}

Block MergeTreeSequentialBlockInputStream::getHeader() const
{
    return header;
}

Block MergeTreeSequentialBlockInputStream::readImpl()
try
{
    Block res;
    if (!isCancelled() && current_row < data_part->rows_count)
    {
        size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark);
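        /// After the first mark, continue reading from the current position in the files
        /// instead of seeking to the mark.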
        bool continue_reading = (current_mark != 0);

        auto & sample = reader->getColumns();
        Columns columns(sample.size());
        size_t rows_readed = reader->readRows(current_mark, continue_reading, rows_to_read, columns);

        if (rows_readed)
        {
            current_row += rows_readed;
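            /// Advance to the next mark only if the whole mark was read.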
            current_mark += (rows_to_read == rows_readed);
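
            /// Columns that are in the table but absent from this part are filled with default values.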
            bool should_evaluate_missing_defaults = false;
            reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_readed);

            if (should_evaluate_missing_defaults)
                reader->evaluateMissingDefaults({}, columns);

            res = header.cloneEmpty();

            /// Reorder columns and fill result block.
            size_t num_columns = sample.size();
            auto it = sample.begin();
            for (size_t i = 0; i < num_columns; ++i)
            {
                if (res.has(it->name))
                    res.getByName(it->name).column = std::move(columns[i]);

                ++it;
            }

            res.checkNumberOfRows();
        }
    }
    else
    {
        finish();
    }

    return res;
}
catch (...)
{
    /// Suspicion of the broken part. A part is added to the queue for verification.
    if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
        storage.reportBrokenPart(data_part->name);
    throw;
}

void MergeTreeSequentialBlockInputStream::finish()
{
    /** Close the files (before destroying the object).
      * When many sources are created, but simultaneously reading only a few of them,
      * buffers don't waste memory.
      */
    reader.reset();
    part_columns_lock.unlock();
    data_part.reset();
}

MergeTreeSequentialBlockInputStream::~MergeTreeSequentialBlockInputStream() = default;

}