This commit is contained in:
Andrey Mironov 2014-11-20 17:41:11 +03:00
parent 83e0c58cba
commit d4d3663cd4
2 changed files with 41 additions and 0 deletions

View File

@ -194,6 +194,28 @@ protected:
progressImpl(Progress(res.rows(), res.bytes()));
pre_reader->fillMissingColumns(res);
/** The column possibly added by pre_reader can be scheduled to be read by reader,
* thus resulting in duplicate read of the same column. To avoid such possibility
* we are removing such column from reader's column list */
if (const auto added_column = pre_reader->getAddedColumn())
{
if (column_name_set.count(added_column->name))
{
for (auto it = columns.begin(); it != columns.end(); ++it)
{
if (it->name == added_column->name)
{
columns.erase(it);
break;
}
}
reader->removeColumn(added_column->name);
column_name_set.erase(added_column->name);
pre_columns.emplace_back(*added_column);
}
}
/// Вычислим выражение в PREWHERE.
prewhere_actions->execute(res);

View File

@ -58,6 +58,22 @@ public:
}
}
const NameAndTypePair * getAddedColumn() const { return added_column; }
void removeColumn(const String & column_name)
{
for (auto it = columns.begin(); it != columns.end(); ++it)
{
if (it->name == column_name)
{
columns.erase(it);
break;
}
}
streams.erase(column_name);
}
/** Если столбцов нет в блоке, добавляет их, если есть - добавляет прочитанные значения к ним в конец.
* Не добавляет столбцы, для которых нет файлов. Чтобы их добавить, нужно вызвать fillMissingColumns.
* В блоке должно быть либо ни одного столбца из columns, либо все, для которых есть файлы. */
@ -176,6 +192,8 @@ public:
addStream(minimum_size_column->name, *minimum_size_column->type, all_mark_ranges);
columns.emplace(std::begin(columns), *minimum_size_column);
added_column = &columns.front();
}
/// Заполняет столбцы, которых нет в блоке, значениями по умолчанию.
@ -366,6 +384,7 @@ private:
bool use_uncompressed_cache;
MergeTreeData & storage;
const MarkRanges & all_mark_ranges;
const NameAndTypePair * added_column = nullptr;
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, size_t level = 0)
{