Revert "dbms: allow all requested columns to be missing [#METR-13020]"

This reverts commit e09872e3621f24a9a839ce377e4a317dc2ca1788.
This commit is contained in:
Andrey Mironov 2014-11-25 21:46:58 +03:00
parent c28bb54ed4
commit eefcb3e369
2 changed files with 10 additions and 57 deletions

View File

@ -165,9 +165,9 @@ protected:
injectRequiredColumns(pre_columns);
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL;
reader.reset(new MergeTreeReader(path, owned_data_part, columns, uncompressed_cache, storage, all_mark_ranges));
reader.reset(new MergeTreeReader(path, owned_data_part->name, columns, uncompressed_cache, storage, all_mark_ranges));
if (prewhere_actions)
pre_reader.reset(new MergeTreeReader(path, owned_data_part, pre_columns, uncompressed_cache, storage,
pre_reader.reset(new MergeTreeReader(path, owned_data_part->name, pre_columns, uncompressed_cache, storage,
all_mark_ranges));
}

View File

@ -38,10 +38,9 @@ class MergeTreeReader
typedef std::map<std::string, ColumnPtr> OffsetColumns;
public:
MergeTreeReader(const String & path_, const MergeTreeData::DataPartPtr & data_part, /// Путь к куску
MergeTreeReader(const String & path_, const String & part_name_, /// Путь к куску
const NamesAndTypesList & columns_, bool use_uncompressed_cache_, MergeTreeData & storage_, const MarkRanges & all_mark_ranges)
: path(path_), data_part(data_part), part_name(data_part->name), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_),
all_mark_ranges(all_mark_ranges)
: path(path_), part_name(part_name_), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_)
{
try
{
@ -70,7 +69,7 @@ public:
/** Для некоторых столбцов файлы с данными могут отсутствовать.
* Это бывает для старых кусков, после добавления новых столбцов в структуру таблицы.
*/
auto all_columns_missing = true;
bool has_missing_columns = false;
/// Указатели на столбцы смещений, общие для столбцов из вложенных структур данных
/// Если append, все значения nullptr, и offset_columns используется только для проверки, что столбец смещений уже прочитан.
@ -79,9 +78,10 @@ public:
for (const NameAndTypePair & it : columns)
{
if (streams.end() == streams.find(it.name))
{
has_missing_columns = true;
continue;
all_columns_missing = false;
}
/// Все столбцы уже есть в блоке. Будем добавлять значения в конец.
bool append = res.has(it.name);
@ -116,8 +116,8 @@ public:
res.insert(column);
}
if (all_columns_missing)
addMinimumSizeColumn(from_mark, max_rows_to_read, res);
if (has_missing_columns && !res)
throw Exception("All requested columns are missing", ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING);
}
catch (const Exception & e)
{
@ -136,51 +136,6 @@ public:
}
}
void addMinimumSizeColumn(const size_t from_mark, const size_t max_rows_to_read, Block & res)
{
const auto get_column_size = [this] (const String & name) {
const auto & files = data_part->checksums.files;
const auto escaped_name = escapeForFileName(name);
const auto bin_file_name = escaped_name + ".bin";
const auto mrk_file_name = escaped_name + ".mrk";
return files.find(bin_file_name)->second.file_size + files.find(mrk_file_name)->second.file_size;
};
const auto & storage_columns = storage.getColumnsList();
const NameAndTypePair * minimum_size_column = nullptr;
auto minimum_size = std::numeric_limits<size_t>::max();
for (const auto & column : storage_columns)
{
if (!data_part->hasColumnFiles(column.name))
continue;
const auto size = get_column_size(column.name);
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = &column;
}
}
if (!minimum_size_column)
throw std::logic_error{"could not find a column of minimum size in MergeTree"};
ColumnWithNameAndType column{
minimum_size_column->type->createColumn(),
minimum_size_column->type,
minimum_size_column->name
};
addStream(column.name, *column.type, all_mark_ranges);
columns.emplace(std::begin(columns), *minimum_size_column);
res.insert(column);
readData(column.name, *column.type, *column.column, from_mark, max_rows_to_read, 0, true);
}
/// Заполняет столбцы, которых нет в блоке, значениями по умолчанию.
void fillMissingColumns(Block & res)
{
@ -362,13 +317,11 @@ private:
typedef std::map<std::string, std::unique_ptr<Stream> > FileStreams;
String path;
const MergeTreeData::DataPartPtr & data_part;
String part_name;
FileStreams streams;
NamesAndTypesList columns;
bool use_uncompressed_cache;
MergeTreeData & storage;
const MarkRanges & all_mark_ranges;
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, size_t level = 0)
{