2019-10-10 16:30:30 +00:00
|
|
|
#include <DataTypes/NestedUtils.h>
|
|
|
|
#include <DataTypes/DataTypeArray.h>
|
|
|
|
#include <Common/escapeForFileName.h>
|
|
|
|
#include <Compression/CachedCompressedReadBuffer.h>
|
|
|
|
#include <Columns/ColumnArray.h>
|
2020-02-25 08:53:14 +00:00
|
|
|
#include <Interpreters/inplaceBlockConversions.h>
|
2019-10-10 16:30:30 +00:00
|
|
|
#include <Storages/MergeTree/IMergeTreeReader.h>
|
|
|
|
#include <Common/typeid_cast.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
|
|
|
using OffsetColumns = std::map<std::string, ColumnPtr>;
|
|
|
|
}
|
|
|
|
namespace ErrorCodes
{
    /// Error code used by the column-count sanity checks in this file; defined elsewhere.
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
|
|
|
|
2020-06-17 16:39:58 +00:00
|
|
|
/// Stores the reading context (part, requested columns, caches, settings, mark ranges)
/// and builds `columns_from_part`, a lookup from column name to a pointer to its type
/// as it is stored in the part.
///
/// Note: several initializers read members initialized before them (`data_part`, `storage`),
/// so the order below relies on the member declaration order in the class definition.
IMergeTreeReader::IMergeTreeReader(
    const MergeTreeData::DataPartPtr & data_part_,
    const NamesAndTypesList & columns_,
    const StorageMetadataPtr & metadata_snapshot_,
    UncompressedCache * uncompressed_cache_,
    MarkCache * mark_cache_,
    const MarkRanges & all_mark_ranges_,
    const MergeTreeReaderSettings & settings_,
    const ValueSizeMap & avg_value_size_hints_)
    : data_part(data_part_)
    , avg_value_size_hints(avg_value_size_hints_)
    , columns(columns_)
    , part_columns(data_part->getColumns())
    , uncompressed_cache(uncompressed_cache_)
    , mark_cache(mark_cache_)
    , settings(settings_)
    , storage(data_part_->storage)
    , metadata_snapshot(metadata_snapshot_)
    , all_mark_ranges(all_mark_ranges_)
    , alter_conversions(storage.getAlterConversionsForPart(data_part))
{
    /// In wide parts plain arrays of Nested are converted to subcolumns,
    /// which makes it possible to use the shared offsets column from cache.
    const bool is_wide = isWidePart(data_part);
    if (is_wide)
    {
        columns = Nested::convertToSubcolumns(columns);
        part_columns = Nested::collect(part_columns);
    }

    /// The map keeps pointers into `part_columns`, so it must be filled
    /// only after `part_columns` has reached its final state above.
    for (const auto & part_column : part_columns)
    {
        columns_from_part[part_column.name] = &part_column.type;
    }
}
|
|
|
|
|
|
|
|
/// Destructor is compiler-generated; it is defined here, out of line, rather than in the class body.
IMergeTreeReader::~IMergeTreeReader() = default;
|
|
|
|
|
|
|
|
|
|
|
|
/// Returns a reference to the reader's current average value size hints.
/// The map is initialized from the hints passed to the constructor.
const IMergeTreeReader::ValueSizeMap & IMergeTreeReader::getAvgValueSizeHints() const
{
    const ValueSizeMap & hints = avg_value_size_hints;
    return hints;
}
|
|
|
|
|
2022-02-09 00:18:53 +00:00
|
|
|
/// Fills columns that could not be read from the part and reports whether defaults still need evaluation.
///
/// Delegates to DB::fillMissingColumns() for the requested `columns`, then sets
/// `should_evaluate_missing_defaults` to true iff at least one entry of `res_columns` is still nullptr.
/// If an Exception is thrown, the part path is appended to its message and it is rethrown;
/// when the throw comes from DB::fillMissingColumns(), `should_evaluate_missing_defaults` is not assigned.
void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows) const
{
    try
    {
        DB::fillMissingColumns(res_columns, num_rows, columns, metadata_snapshot);

        const auto is_absent = [](const auto & column) { return column == nullptr; };
        const auto first_absent = std::find_if(res_columns.begin(), res_columns.end(), is_absent);
        should_evaluate_missing_defaults = first_absent != res_columns.end();
    }
    catch (Exception & e)
    {
        /// Better diagnostics.
        const auto part_path = data_part->data_part_storage->getFullPath();
        e.addMessage("(while reading from part " + part_path + ")");
        throw;
    }
}
|
|
|
|
|
2022-02-09 00:18:53 +00:00
|
|
|
/// Evaluates default expressions for requested columns that are still missing in `res_columns`.
///
/// `additional_columns` is a block (taken by value) that is extended with every
/// already-present result column; DB::evaluateMissingDefaults() is then asked for a DAG
/// over that block. If a DAG is returned it is executed in place on the block.
/// Finally every requested column is moved from the block back into `res_columns`, by name.
///
/// Throws LOGICAL_ERROR if `res_columns` does not have exactly one entry per requested column.
/// Any Exception gets the part path appended to its message and is rethrown.
void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns & res_columns) const
{
    try
    {
        size_t num_columns = columns.size();

        /// The message names this function (it previously named fillMissingColumns, a copy-paste slip).
        if (res_columns.size() != num_columns)
            throw Exception("invalid number of columns passed to MergeTreeReader::evaluateMissingDefaults. "
                            "Expected " + toString(num_columns) + ", "
                            "got " + toString(res_columns.size()), ErrorCodes::LOGICAL_ERROR);

        /// Convert columns list to block.
        /// TODO: rewrite with columns interface. It will be possible after changes in ExpressionActions.
        auto name_and_type = columns.begin();
        for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
        {
            /// Missing columns are skipped here; they are expected to be produced by the DAG below.
            if (res_columns[pos] == nullptr)
                continue;

            additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
        }

        auto dag = DB::evaluateMissingDefaults(
            additional_columns, columns, metadata_snapshot->getColumns(), storage.getContext());

        /// A null DAG means there is nothing to execute.
        if (dag)
        {
            dag->addMaterializingOutputActions();
            auto actions = std::make_shared<ExpressionActions>(
                std::move(dag),
                ExpressionActionsSettings::fromSettings(storage.getContext()->getSettingsRef()));
            actions->execute(additional_columns);
        }

        /// Move columns from block.
        /// NOTE(review): getByName() is called for every requested column, so each of them
        /// is presumably present in the block at this point — confirm against the helper.
        name_and_type = columns.begin();
        for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
            res_columns[pos] = std::move(additional_columns.getByName(name_and_type->name).column);
    }
    catch (Exception & e)
    {
        /// Better diagnostics.
        e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + ")");
        throw;
    }
}
|
|
|
|
|
2020-04-08 16:20:52 +00:00
|
|
|
/// Maps a requested column to the name and type under which it is stored in the part.
///
/// The lookup in `columns_from_part` uses the column's name in storage, or its old name
/// if `alter_conversions` reports that the column was renamed.
/// Returns `required_column` unchanged when the part has no such column, or when a subcolumn
/// is requested but the type in the part does not provide it.
/// Otherwise returns a pair built from the key found in the part and the type stored there
/// (plus subcolumn name and type for subcolumns).
NameAndTypePair IMergeTreeReader::getColumnFromPart(const NameAndTypePair & required_column) const
{
    auto storage_name = required_column.getNameInStorage();

    ColumnsFromPart::ConstLookupResult found;
    if (!alter_conversions.isColumnRenamed(storage_name))
    {
        found = columns_from_part.find(storage_name);
    }
    else
    {
        /// The part still stores the column under its pre-rename name.
        String previous_name = alter_conversions.getColumnOldName(storage_name);
        found = columns_from_part.find(previous_name);
    }

    if (found == columns_from_part.end())
        return required_column;

    const DataTypePtr & type_in_part = *found->getMapped();

    /// Ordinary column: take name and type as they are in the part.
    if (!required_column.isSubcolumn())
        return {String(found->getKey()), type_in_part};

    auto subcolumn_name = required_column.getSubcolumnName();
    auto subcolumn_type = type_in_part->tryGetSubcolumnType(subcolumn_name);

    /// The type in the part has no such subcolumn.
    if (!subcolumn_type)
        return required_column;

    return {String(found->getKey()), subcolumn_name, type_in_part, subcolumn_type};
}
|
|
|
|
|
2022-02-09 00:18:53 +00:00
|
|
|
/// Converts the columns that were read to the types of the requested columns.
///
/// Every non-null entry of `res_columns` is put into a temporary block under the requested name
/// but with the type it has in the part (see getColumnFromPart()); DB::performRequiredConversions()
/// is applied to that block, and the columns are then moved back into `res_columns` by name.
///
/// Throws LOGICAL_ERROR if `res_columns` does not have exactly one entry per requested column.
/// Any Exception gets the part path appended to its message and is rethrown.
void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
{
    try
    {
        const size_t num_columns = columns.size();

        if (res_columns.size() != num_columns)
        {
            throw Exception(
                "Invalid number of columns passed to MergeTreeReader::performRequiredConversions. Expected "
                    + toString(num_columns) + ", got " + toString(res_columns.size()),
                ErrorCodes::LOGICAL_ERROR);
        }

        Block copy_block;

        /// Sizes were checked above, so `index` stays within `res_columns` for the whole list.
        size_t index = 0;
        for (const auto & requested : columns)
        {
            const auto & column = res_columns[index];
            ++index;

            if (column == nullptr)
                continue;

            copy_block.insert({column, getColumnFromPart(requested).type, requested.name});
        }

        DB::performRequiredConversions(copy_block, columns, storage.getContext());

        /// Move columns from block.
        /// NOTE(review): getByName() is called for every requested column, so all of them
        /// are presumably expected to be present in the block here — confirm with callers.
        index = 0;
        for (const auto & requested : columns)
        {
            res_columns[index] = std::move(copy_block.getByName(requested.name).column);
            ++index;
        }
    }
    catch (Exception & e)
    {
        /// Better diagnostics.
        const auto part_path = data_part->data_part_storage->getFullPath();
        e.addMessage("(while reading from part " + part_path + ")");
        throw;
    }
}
|
|
|
|
|
2020-06-01 17:52:09 +00:00
|
|
|
/// Looks for an array column of the part that has the same Nested table name as `column_name`.
///
/// Iterates over the part's columns and returns the position (as reported by the part) of the first
/// column whose type is DataTypeArray, which has a position, and for which
/// Nested::extractTableName() gives the same result as for `column_name`.
/// Returns an empty value if there is no such column.
IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const String & column_name) const
{
    String wanted_table = Nested::extractTableName(column_name);

    for (const auto & candidate : data_part->getColumns())
    {
        /// Only array columns are considered.
        if (!typeid_cast<const DataTypeArray *>(candidate.type.get()))
            continue;

        auto position = data_part->getColumnPosition(candidate.getNameInStorage());
        if (!position)
            continue;

        if (Nested::extractTableName(candidate.name) == wanted_table)
            return position;
    }

    return {};
}
|
|
|
|
|
|
|
|
void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
|
2020-04-14 19:47:19 +00:00
|
|
|
{
|
|
|
|
if (num_columns_to_read != columns.size())
|
|
|
|
throw Exception("invalid number of columns passed to MergeTreeReader::readRows. "
|
|
|
|
"Expected " + toString(columns.size()) + ", "
|
|
|
|
"got " + toString(num_columns_to_read), ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
|
|
|
|
2019-10-10 16:30:30 +00:00
|
|
|
}
|