2020-04-02 16:28:50 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
|
2018-11-28 17:21:27 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
|
2020-05-20 20:16:32 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2018-11-28 15:05:53 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace ErrorCodes
{
    /// Defined elsewhere; compared against the current exception code in generate()
    /// to decide whether the part should be reported as broken.
    extern const int MEMORY_LIMIT_EXCEEDED;
}
|
|
|
|
|
2020-04-02 16:28:50 +00:00
|
|
|
/// Builds a source that reads all marks of `data_part_` sequentially, from mark 0 to the last mark.
///
/// The output header is the sample block for `columns_to_read_` taken from `metadata_snapshot_`.
/// `read_with_direct_io_`            - sets the direct I/O threshold of the read settings to 1.
/// `take_column_types_from_storage`  - take column types from the metadata snapshot instead of the part.
/// `quiet`                           - suppress the debug log line describing what is going to be read.
MergeTreeSequentialSource::MergeTreeSequentialSource(
    const MergeTreeData & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    MergeTreeData::DataPartPtr data_part_,
    Names columns_to_read_,
    bool read_with_direct_io_,
    bool take_column_types_from_storage,
    bool quiet)
    : SourceWithProgress(metadata_snapshot_->getSampleBlockForColumns(columns_to_read_, storage_.getVirtuals(), storage_.getStorageID()))
    , storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , data_part(std::move(data_part_))
    , columns_to_read(std::move(columns_to_read_))
    , read_with_direct_io(read_with_direct_io_)
    , mark_cache(storage.getContext()->getMarkCache())
{
    if (!quiet)
    {
        /// The column name is logged only when a single column is read, to keep the log short.
        if (columns_to_read.size() != 1)
        {
            LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
                data_part->getMarksCount(), data_part->name, data_part->rows_count);
        }
        else
        {
            LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part, column {}",
                data_part->getMarksCount(), data_part->name, data_part->rows_count, columns_to_read.front());
        }
    }

    /// Note, that we don't check setting collaborate_with_coordinator presence, because this source
    /// is only used in background merges.
    addTotalRowsApprox(data_part->rows_count);

    /// Extend the column list so that the reader never produces empty blocks.
    injectRequiredColumns(storage, metadata_snapshot, data_part, columns_to_read);

    /// Column types come either from the table metadata or from the part itself.
    NamesAndTypesList columns_for_reader = take_column_types_from_storage
        ? metadata_snapshot->getColumns().getByNames(ColumnsDescription::AllPhysical, columns_to_read, false)
        : data_part->getColumns().addTypes(columns_to_read);

    ReadSettings read_settings;
    if (read_with_direct_io)
        read_settings.direct_io_threshold = 1;

    MergeTreeReaderSettings reader_settings;
    reader_settings.read_settings = read_settings;
    reader_settings.save_marks_in_cache = false;

    /// A single range covering every mark of the part.
    MarkRanges all_marks{MarkRange(0, data_part->getMarksCount())};

    reader = data_part->getReader(
        columns_for_reader,
        metadata_snapshot,
        all_marks,
        /* uncompressed_cache = */ nullptr,
        mark_cache.get(),
        reader_settings);
}
|
|
|
|
|
2020-04-02 16:28:50 +00:00
|
|
|
Chunk MergeTreeSequentialSource::generate()
|
2018-11-28 15:05:53 +00:00
|
|
|
try
|
|
|
|
{
|
2020-04-22 06:34:20 +00:00
|
|
|
const auto & header = getPort().getHeader();
|
2020-04-02 16:28:50 +00:00
|
|
|
|
2018-11-28 15:05:53 +00:00
|
|
|
if (!isCancelled() && current_row < data_part->rows_count)
|
|
|
|
{
|
2019-03-25 13:55:24 +00:00
|
|
|
size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark);
|
2018-11-28 15:05:53 +00:00
|
|
|
bool continue_reading = (current_mark != 0);
|
|
|
|
|
2020-04-22 06:34:20 +00:00
|
|
|
const auto & sample = reader->getColumns();
|
2019-10-01 16:50:08 +00:00
|
|
|
Columns columns(sample.size());
|
2021-10-15 08:36:26 +00:00
|
|
|
/// TODO: pass stream size instead of zero?
|
|
|
|
size_t rows_read = reader->readRows(current_mark, 0, continue_reading, rows_to_read, columns);
|
2018-11-28 15:05:53 +00:00
|
|
|
|
2020-04-02 16:28:50 +00:00
|
|
|
if (rows_read)
|
2019-10-01 16:50:08 +00:00
|
|
|
{
|
2020-04-02 16:28:50 +00:00
|
|
|
current_row += rows_read;
|
|
|
|
current_mark += (rows_to_read == rows_read);
|
2018-11-29 09:19:42 +00:00
|
|
|
|
2019-10-01 16:50:08 +00:00
|
|
|
bool should_evaluate_missing_defaults = false;
|
2020-04-02 16:28:50 +00:00
|
|
|
reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read);
|
2018-11-28 15:05:53 +00:00
|
|
|
|
2018-11-29 11:55:34 +00:00
|
|
|
if (should_evaluate_missing_defaults)
|
2020-01-16 14:18:09 +00:00
|
|
|
{
|
2019-10-02 11:57:17 +00:00
|
|
|
reader->evaluateMissingDefaults({}, columns);
|
2020-01-16 14:18:09 +00:00
|
|
|
}
|
2018-11-28 15:05:53 +00:00
|
|
|
|
2020-01-15 13:00:08 +00:00
|
|
|
reader->performRequiredConversions(columns);
|
|
|
|
|
2019-10-01 16:50:08 +00:00
|
|
|
/// Reorder columns and fill result block.
|
|
|
|
size_t num_columns = sample.size();
|
2020-04-02 16:28:50 +00:00
|
|
|
Columns res_columns;
|
|
|
|
res_columns.reserve(num_columns);
|
|
|
|
|
2019-10-01 16:50:08 +00:00
|
|
|
auto it = sample.begin();
|
|
|
|
for (size_t i = 0; i < num_columns; ++i)
|
|
|
|
{
|
2020-04-02 16:28:50 +00:00
|
|
|
if (header.has(it->name))
|
|
|
|
res_columns.emplace_back(std::move(columns[i]));
|
2019-10-10 11:20:25 +00:00
|
|
|
|
2019-10-01 16:50:08 +00:00
|
|
|
++it;
|
|
|
|
}
|
|
|
|
|
2020-04-02 16:28:50 +00:00
|
|
|
return Chunk(std::move(res_columns), rows_read);
|
2018-11-29 11:55:34 +00:00
|
|
|
}
|
2018-11-28 15:05:53 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
finish();
|
|
|
|
}
|
|
|
|
|
2020-04-02 16:28:50 +00:00
|
|
|
return {};
|
2018-11-28 15:05:53 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
/// Suspicion of the broken part. A part is added to the queue for verification.
|
|
|
|
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
|
|
|
|
storage.reportBrokenPart(data_part->name);
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
2020-04-02 16:28:50 +00:00
|
|
|
void MergeTreeSequentialSource::finish()
|
2018-11-28 15:05:53 +00:00
|
|
|
{
|
|
|
|
/** Close the files (before destroying the object).
|
|
|
|
* When many sources are created, but simultaneously reading only a few of them,
|
|
|
|
* buffers don't waste memory.
|
|
|
|
*/
|
|
|
|
reader.reset();
|
|
|
|
data_part.reset();
|
|
|
|
}
|
|
|
|
|
2020-04-02 16:28:50 +00:00
|
|
|
/// Defaulted destructor, defined out of line from the class declaration.
MergeTreeSequentialSource::~MergeTreeSequentialSource() = default;
|
2018-11-28 15:05:53 +00:00
|
|
|
|
|
|
|
}
|