2013-11-26 11:55:11 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2015-04-16 06:12:35 +00:00
|
|
|
|
#include <DB/Storages/MarkCache.h>
|
2014-03-09 17:36:01 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeData.h>
|
2013-11-26 11:55:11 +00:00
|
|
|
|
#include <DB/DataTypes/IDataType.h>
|
|
|
|
|
#include <DB/DataTypes/DataTypeNested.h>
|
2014-03-13 12:48:07 +00:00
|
|
|
|
#include <DB/DataTypes/DataTypeArray.h>
|
2013-11-26 11:55:11 +00:00
|
|
|
|
#include <DB/Core/NamesAndTypes.h>
|
|
|
|
|
#include <DB/Common/escapeForFileName.h>
|
|
|
|
|
#include <DB/IO/CachedCompressedReadBuffer.h>
|
2014-01-15 14:53:20 +00:00
|
|
|
|
#include <DB/IO/CompressedReadBufferFromFile.h>
|
2014-03-13 12:48:07 +00:00
|
|
|
|
#include <DB/Columns/ColumnArray.h>
|
|
|
|
|
#include <DB/Columns/ColumnNested.h>
|
2014-10-23 13:53:16 +00:00
|
|
|
|
#include <DB/Interpreters/evaluateMissingDefaults.h>
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
/** Пара засечек, определяющая диапазон строк в куске. Именно, диапазон имеет вид [begin * index_granularity, end * index_granularity).
|
|
|
|
|
*/
|
|
|
|
|
struct MarkRange
|
|
|
|
|
{
|
|
|
|
|
size_t begin;
|
|
|
|
|
size_t end;
|
|
|
|
|
|
|
|
|
|
MarkRange() {}
|
|
|
|
|
MarkRange(size_t begin_, size_t end_) : begin(begin_), end(end_) {}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
typedef std::vector<MarkRange> MarkRanges;
|
|
|
|
|
|
|
|
|
|
|
2013-11-26 11:55:11 +00:00
|
|
|
|
/** Умеет читать данные между парой засечек из одного куска. При чтении последовательных отрезков не делает лишних seek-ов.
|
|
|
|
|
* При чтении почти последовательных отрезков делает seek-и быстро, не выбрасывая содержимое буфера.
|
|
|
|
|
*/
|
|
|
|
|
class MergeTreeReader
|
|
|
|
|
{
|
2014-03-24 16:10:14 +00:00
|
|
|
|
typedef std::map<std::string, ColumnPtr> OffsetColumns;
|
|
|
|
|
|
2013-11-26 11:55:11 +00:00
|
|
|
|
public:
|
2014-12-14 05:27:39 +00:00
|
|
|
|
MergeTreeReader(const String & path_, /// Путь к куску
|
|
|
|
|
const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns_,
|
2015-04-16 06:12:35 +00:00
|
|
|
|
UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_,
|
|
|
|
|
MergeTreeData & storage_, const MarkRanges & all_mark_ranges,
|
2015-04-12 04:39:20 +00:00
|
|
|
|
size_t aio_threshold_, size_t max_read_buffer_size_)
|
2014-12-14 05:27:39 +00:00
|
|
|
|
: path(path_), data_part(data_part), part_name(data_part->name), columns(columns_),
|
2015-04-16 06:12:35 +00:00
|
|
|
|
uncompressed_cache(uncompressed_cache_), mark_cache(mark_cache_),
|
|
|
|
|
storage(storage_), all_mark_ranges(all_mark_ranges),
|
2015-04-12 04:39:20 +00:00
|
|
|
|
aio_threshold(aio_threshold_), max_read_buffer_size(max_read_buffer_size_)
|
2013-11-26 11:55:11 +00:00
|
|
|
|
{
|
2014-07-23 09:15:41 +00:00
|
|
|
|
try
|
|
|
|
|
{
|
2014-07-23 15:24:45 +00:00
|
|
|
|
if (!Poco::File(path).exists())
|
|
|
|
|
throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
|
|
|
|
|
|
2014-07-23 09:15:41 +00:00
|
|
|
|
for (const NameAndTypePair & column : columns)
|
|
|
|
|
addStream(column.name, *column.type, all_mark_ranges);
|
|
|
|
|
}
|
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
storage.reportBrokenPart(part_name);
|
|
|
|
|
throw;
|
|
|
|
|
}
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Если столбцов нет в блоке, добавляет их, если есть - добавляет прочитанные значения к ним в конец.
|
|
|
|
|
* Не добавляет столбцы, для которых нет файлов. Чтобы их добавить, нужно вызвать fillMissingColumns.
|
2015-04-02 03:08:43 +00:00
|
|
|
|
* В блоке должно быть либо ни одного столбца из columns, либо все, для которых есть файлы.
|
|
|
|
|
*/
|
2013-11-26 11:55:11 +00:00
|
|
|
|
void readRange(size_t from_mark, size_t to_mark, Block & res)
|
|
|
|
|
{
|
2014-07-01 08:50:30 +00:00
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
size_t max_rows_to_read = (to_mark - from_mark) * storage.index_granularity;
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-07-01 08:50:30 +00:00
|
|
|
|
/** Для некоторых столбцов файлы с данными могут отсутствовать.
|
|
|
|
|
* Это бывает для старых кусков, после добавления новых столбцов в структуру таблицы.
|
|
|
|
|
*/
|
2014-11-25 20:55:36 +00:00
|
|
|
|
auto has_missing_columns = false;
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-07-01 08:50:30 +00:00
|
|
|
|
/// Указатели на столбцы смещений, общие для столбцов из вложенных структур данных
|
|
|
|
|
/// Если append, все значения nullptr, и offset_columns используется только для проверки, что столбец смещений уже прочитан.
|
|
|
|
|
OffsetColumns offset_columns;
|
2014-11-25 20:55:34 +00:00
|
|
|
|
const auto read_column = [&] (const NameAndTypePair & it) {
|
2014-07-17 13:41:47 +00:00
|
|
|
|
if (streams.end() == streams.find(it.name))
|
2014-11-25 20:55:36 +00:00
|
|
|
|
{
|
|
|
|
|
has_missing_columns = true;
|
2014-11-25 20:55:34 +00:00
|
|
|
|
return;
|
2014-11-25 20:55:36 +00:00
|
|
|
|
}
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-07-01 08:50:30 +00:00
|
|
|
|
/// Все столбцы уже есть в блоке. Будем добавлять значения в конец.
|
2014-07-17 13:41:47 +00:00
|
|
|
|
bool append = res.has(it.name);
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-07-01 08:50:30 +00:00
|
|
|
|
ColumnWithNameAndType column;
|
2014-07-17 13:41:47 +00:00
|
|
|
|
column.name = it.name;
|
|
|
|
|
column.type = it.type;
|
2014-07-01 08:50:30 +00:00
|
|
|
|
if (append)
|
|
|
|
|
column.column = res.getByName(column.name).column;
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-07-01 08:50:30 +00:00
|
|
|
|
bool read_offsets = true;
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-07-01 08:50:30 +00:00
|
|
|
|
/// Для вложенных структур запоминаем указатели на столбцы со смещениями
|
|
|
|
|
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&*column.type))
|
|
|
|
|
{
|
|
|
|
|
String name = DataTypeNested::extractNestedTableName(column.name);
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-07-01 08:50:30 +00:00
|
|
|
|
if (offset_columns.count(name) == 0)
|
2014-11-25 20:55:34 +00:00
|
|
|
|
offset_columns[name] = append ? nullptr : new ColumnArray::ColumnOffsets_t;
|
2014-07-01 08:50:30 +00:00
|
|
|
|
else
|
|
|
|
|
read_offsets = false; /// на предыдущих итерациях смещения уже считали вызовом readData
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-07-01 08:50:30 +00:00
|
|
|
|
if (!append)
|
|
|
|
|
column.column = new ColumnArray(type_arr->getNestedType()->createColumn(), offset_columns[name]);
|
|
|
|
|
}
|
|
|
|
|
else if (!append)
|
|
|
|
|
column.column = column.type->createColumn();
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
|
|
|
|
readData(column.name, *column.type, *column.column, from_mark, max_rows_to_read, 0, read_offsets);
|
2014-07-01 08:50:30 +00:00
|
|
|
|
|
|
|
|
|
if (!append && column.column->size())
|
|
|
|
|
res.insert(column);
|
2014-11-25 20:55:34 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
for (const NameAndTypePair & it : columns)
|
|
|
|
|
read_column(it);
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-11-25 20:55:36 +00:00
|
|
|
|
if (has_missing_columns && !res)
|
2014-11-25 20:55:34 +00:00
|
|
|
|
{
|
|
|
|
|
addMinimumSizeColumn();
|
|
|
|
|
/// minimum size column is necessarily at list's front
|
|
|
|
|
read_column(columns.front());
|
|
|
|
|
}
|
2014-07-01 08:50:30 +00:00
|
|
|
|
}
|
|
|
|
|
catch (const Exception & e)
|
|
|
|
|
{
|
2015-04-02 03:08:43 +00:00
|
|
|
|
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
|
2015-03-18 19:17:05 +00:00
|
|
|
|
{
|
2014-07-23 09:15:41 +00:00
|
|
|
|
storage.reportBrokenPart(part_name);
|
2015-03-18 19:17:05 +00:00
|
|
|
|
}
|
2014-07-23 09:15:41 +00:00
|
|
|
|
|
2014-07-01 08:50:30 +00:00
|
|
|
|
/// Более хорошая диагностика.
|
2014-10-21 12:11:20 +00:00
|
|
|
|
throw Exception(e.message() + "\n(while reading from part " + path + " from mark " + toString(from_mark) + " to "
|
2014-07-01 08:50:30 +00:00
|
|
|
|
+ toString(to_mark) + ")", e.code());
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
2014-07-23 09:15:41 +00:00
|
|
|
|
catch (...)
|
|
|
|
|
{
|
|
|
|
|
storage.reportBrokenPart(part_name);
|
|
|
|
|
|
|
|
|
|
throw;
|
|
|
|
|
}
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
|
|
|
|
|
2015-04-09 00:37:08 +00:00
|
|
|
|
|
|
|
|
|
/** Добавить столбец минимального размера.
|
|
|
|
|
* Используется в случае, когда ни один столбец не нужен, но нужно хотя бы знать количество строк.
|
|
|
|
|
* Добавляет в columns.
|
|
|
|
|
*/
|
2014-11-25 20:55:34 +00:00
|
|
|
|
void addMinimumSizeColumn()
|
2014-11-25 20:55:23 +00:00
|
|
|
|
{
|
|
|
|
|
const auto get_column_size = [this] (const String & name) {
|
|
|
|
|
const auto & files = data_part->checksums.files;
|
|
|
|
|
|
|
|
|
|
const auto escaped_name = escapeForFileName(name);
|
|
|
|
|
const auto bin_file_name = escaped_name + ".bin";
|
|
|
|
|
const auto mrk_file_name = escaped_name + ".mrk";
|
|
|
|
|
|
|
|
|
|
return files.find(bin_file_name)->second.file_size + files.find(mrk_file_name)->second.file_size;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const auto & storage_columns = storage.getColumnsList();
|
|
|
|
|
const NameAndTypePair * minimum_size_column = nullptr;
|
|
|
|
|
auto minimum_size = std::numeric_limits<size_t>::max();
|
|
|
|
|
|
|
|
|
|
for (const auto & column : storage_columns)
|
|
|
|
|
{
|
|
|
|
|
if (!data_part->hasColumnFiles(column.name))
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
const auto size = get_column_size(column.name);
|
|
|
|
|
if (size < minimum_size)
|
|
|
|
|
{
|
|
|
|
|
minimum_size = size;
|
|
|
|
|
minimum_size_column = &column;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!minimum_size_column)
|
2015-02-16 17:01:38 +00:00
|
|
|
|
throw Exception{
|
|
|
|
|
"could not find a column of minimum size in MergeTree",
|
|
|
|
|
ErrorCodes::LOGICAL_ERROR
|
|
|
|
|
};
|
2014-11-25 20:55:23 +00:00
|
|
|
|
|
2014-11-25 20:55:34 +00:00
|
|
|
|
addStream(minimum_size_column->name, *minimum_size_column->type, all_mark_ranges);
|
2014-11-25 20:55:23 +00:00
|
|
|
|
columns.emplace(std::begin(columns), *minimum_size_column);
|
2014-11-25 20:55:36 +00:00
|
|
|
|
|
2015-04-09 00:37:08 +00:00
|
|
|
|
added_minimum_size_column = &columns.front();
|
2014-11-25 20:55:23 +00:00
|
|
|
|
}
|
|
|
|
|
|
2015-04-02 03:08:43 +00:00
|
|
|
|
|
|
|
|
|
/** Добавляет в блок недостающие столбцы из ordered_names, состоящие из значений по-умолчанию.
|
|
|
|
|
* Недостающие столбцы добавляются в позиции, такие же как в ordered_names.
|
|
|
|
|
* Если был добавлен хотя бы один столбец - то все столбцы в блоке переупорядочиваются как в ordered_names.
|
|
|
|
|
*/
|
2015-02-16 17:01:38 +00:00
|
|
|
|
void fillMissingColumns(Block & res, const Names & ordered_names)
|
2013-11-26 11:55:11 +00:00
|
|
|
|
{
|
2015-04-02 03:08:43 +00:00
|
|
|
|
fillMissingColumnsImpl(res, ordered_names, false);
|
|
|
|
|
}
|
2014-12-04 15:50:48 +00:00
|
|
|
|
|
2015-04-02 03:08:43 +00:00
|
|
|
|
/** То же самое, но всегда переупорядочивает столбцы в блоке, как в ordered_names
|
|
|
|
|
* (даже если не было недостающих столбцов).
|
|
|
|
|
*/
|
|
|
|
|
void fillMissingColumnsAndReorder(Block & res, const Names & ordered_names)
|
|
|
|
|
{
|
|
|
|
|
fillMissingColumnsImpl(res, ordered_names, true);
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
struct Stream
|
|
|
|
|
{
|
2014-02-11 13:30:42 +00:00
|
|
|
|
MarkCache::MappedPtr marks;
|
2014-01-15 14:53:20 +00:00
|
|
|
|
ReadBuffer * data_buffer;
|
|
|
|
|
Poco::SharedPtr<CachedCompressedReadBuffer> cached_buffer;
|
|
|
|
|
Poco::SharedPtr<CompressedReadBufferFromFile> non_cached_buffer;
|
2013-11-26 11:55:11 +00:00
|
|
|
|
std::string path_prefix;
|
2014-04-23 15:34:47 +00:00
|
|
|
|
size_t max_mark_range;
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2015-02-15 12:38:21 +00:00
|
|
|
|
/// Используется в качестве подсказки, чтобы уменьшить количество реаллокаций при создании столбца переменной длины.
|
|
|
|
|
double avg_value_size_hint = 0;
|
|
|
|
|
|
2015-03-31 11:16:34 +00:00
|
|
|
|
Stream(const String & path_prefix_, UncompressedCache * uncompressed_cache, MarkCache * mark_cache, const MarkRanges & all_mark_ranges,
|
2015-04-12 04:39:20 +00:00
|
|
|
|
size_t aio_threshold, size_t max_read_buffer_size)
|
|
|
|
|
: path_prefix(path_prefix_)
|
2014-01-15 14:53:20 +00:00
|
|
|
|
{
|
2014-02-11 13:30:42 +00:00
|
|
|
|
loadMarks(mark_cache);
|
2014-04-23 15:34:47 +00:00
|
|
|
|
size_t max_mark_range = 0;
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < all_mark_ranges.size(); ++i)
|
|
|
|
|
{
|
|
|
|
|
size_t right = all_mark_ranges[i].end;
|
2014-04-25 14:55:27 +00:00
|
|
|
|
|
2014-04-23 15:34:47 +00:00
|
|
|
|
/// Если правая граница лежит внутри блока, то его тоже придется читать.
|
2014-04-25 14:55:27 +00:00
|
|
|
|
if (right < (*marks).size() && (*marks)[right].offset_in_decompressed_block > 0)
|
2014-04-23 15:34:47 +00:00
|
|
|
|
{
|
|
|
|
|
while (right < (*marks).size() && (*marks)[right].offset_in_compressed_file ==
|
|
|
|
|
(*marks)[all_mark_ranges[i].end].offset_in_compressed_file)
|
|
|
|
|
++right;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Если правее засечек нет, просто используем DEFAULT_BUFFER_SIZE
|
2014-04-25 14:55:27 +00:00
|
|
|
|
if (right >= (*marks).size() || (right + 1 == (*marks).size() &&
|
|
|
|
|
(*marks)[right].offset_in_compressed_file == (*marks)[all_mark_ranges[i].end].offset_in_compressed_file))
|
2014-04-23 15:34:47 +00:00
|
|
|
|
{
|
2015-04-12 04:39:20 +00:00
|
|
|
|
max_mark_range = max_read_buffer_size;
|
2014-04-23 15:34:47 +00:00
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
2014-12-25 00:18:16 +00:00
|
|
|
|
max_mark_range = std::max(max_mark_range,
|
|
|
|
|
(*marks)[right].offset_in_compressed_file - (*marks)[all_mark_ranges[i].begin].offset_in_compressed_file);
|
2014-04-23 15:34:47 +00:00
|
|
|
|
}
|
|
|
|
|
|
2015-04-13 15:02:39 +00:00
|
|
|
|
size_t buffer_size = std::min(max_read_buffer_size, max_mark_range);
|
2015-04-14 10:33:00 +00:00
|
|
|
|
|
|
|
|
|
size_t estimated_size = 0;
|
|
|
|
|
if (aio_threshold > 0)
|
|
|
|
|
{
|
|
|
|
|
for (const auto & mark_range : all_mark_ranges)
|
|
|
|
|
{
|
|
|
|
|
size_t offset_begin = (*marks)[mark_range.begin].offset_in_compressed_file;
|
|
|
|
|
|
|
|
|
|
size_t offset_end;
|
|
|
|
|
if (mark_range.end < (*marks).size())
|
|
|
|
|
offset_end = (*marks)[mark_range.end].offset_in_compressed_file;
|
|
|
|
|
else
|
|
|
|
|
offset_end = Poco::File(path_prefix + ".bin").getSize();
|
|
|
|
|
|
|
|
|
|
if (offset_end > 0)
|
|
|
|
|
estimated_size += offset_end - offset_begin;
|
|
|
|
|
}
|
|
|
|
|
}
|
2014-04-23 15:34:47 +00:00
|
|
|
|
|
2014-01-15 14:53:20 +00:00
|
|
|
|
if (uncompressed_cache)
|
|
|
|
|
{
|
2015-04-13 15:02:39 +00:00
|
|
|
|
cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache,
|
|
|
|
|
estimated_size, aio_threshold, buffer_size);
|
2014-01-15 14:53:20 +00:00
|
|
|
|
data_buffer = &*cached_buffer;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2015-04-13 15:02:39 +00:00
|
|
|
|
non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin", estimated_size,
|
|
|
|
|
aio_threshold, buffer_size);
|
2014-01-15 14:53:20 +00:00
|
|
|
|
data_buffer = &*non_cached_buffer;
|
|
|
|
|
}
|
|
|
|
|
}
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-02-11 13:30:42 +00:00
|
|
|
|
void loadMarks(MarkCache * cache)
|
2013-11-26 11:55:11 +00:00
|
|
|
|
{
|
2014-02-11 13:30:42 +00:00
|
|
|
|
std::string path = path_prefix + ".mrk";
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-02-11 13:30:42 +00:00
|
|
|
|
UInt128 key;
|
|
|
|
|
if (cache)
|
2013-11-26 11:55:11 +00:00
|
|
|
|
{
|
2014-02-11 13:30:42 +00:00
|
|
|
|
key = cache->hash(path);
|
|
|
|
|
marks = cache->get(key);
|
|
|
|
|
if (marks)
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
marks.reset(new MarksInCompressedFile);
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-02-11 13:30:42 +00:00
|
|
|
|
ReadBufferFromFile buffer(path);
|
|
|
|
|
while (!buffer.eof())
|
|
|
|
|
{
|
|
|
|
|
MarkInCompressedFile mark;
|
|
|
|
|
readIntBinary(mark.offset_in_compressed_file, buffer);
|
|
|
|
|
readIntBinary(mark.offset_in_decompressed_block, buffer);
|
|
|
|
|
marks->push_back(mark);
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
|
|
|
|
|
2014-02-11 13:30:42 +00:00
|
|
|
|
if (cache)
|
|
|
|
|
cache->set(key, marks);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void seekToMark(size_t index)
|
|
|
|
|
{
|
|
|
|
|
MarkInCompressedFile mark = (*marks)[index];
|
|
|
|
|
|
2013-11-26 11:55:11 +00:00
|
|
|
|
try
|
|
|
|
|
{
|
2014-01-15 14:53:20 +00:00
|
|
|
|
if (cached_buffer)
|
2014-04-23 15:34:47 +00:00
|
|
|
|
{
|
2014-02-11 13:30:42 +00:00
|
|
|
|
cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
2014-04-23 15:34:47 +00:00
|
|
|
|
}
|
2014-01-15 14:53:20 +00:00
|
|
|
|
if (non_cached_buffer)
|
2014-02-11 13:30:42 +00:00
|
|
|
|
non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
|
|
|
|
catch (const Exception & e)
|
|
|
|
|
{
|
|
|
|
|
/// Более хорошая диагностика.
|
|
|
|
|
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
|
2015-04-16 06:12:35 +00:00
|
|
|
|
throw Exception(e.message() + " (while seeking to mark " + toString(index)
|
2014-02-11 13:30:42 +00:00
|
|
|
|
+ " of column " + path_prefix + "; offsets are: "
|
|
|
|
|
+ toString(mark.offset_in_compressed_file) + " "
|
|
|
|
|
+ toString(mark.offset_in_decompressed_block) + ")", e.code());
|
2013-11-26 11:55:11 +00:00
|
|
|
|
else
|
|
|
|
|
throw;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2014-04-22 22:43:55 +00:00
|
|
|
|
typedef std::map<std::string, std::unique_ptr<Stream> > FileStreams;
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
|
|
|
|
String path;
|
2014-11-25 20:55:23 +00:00
|
|
|
|
const MergeTreeData::DataPartPtr & data_part;
|
2014-07-23 09:15:41 +00:00
|
|
|
|
String part_name;
|
2013-11-26 11:55:11 +00:00
|
|
|
|
FileStreams streams;
|
2015-04-09 00:37:08 +00:00
|
|
|
|
|
|
|
|
|
/// Запрашиваемые столбцы. Возможно, с добавлением minimum_size_column.
|
2014-07-17 13:41:47 +00:00
|
|
|
|
NamesAndTypesList columns;
|
2015-04-09 00:37:08 +00:00
|
|
|
|
const NameAndTypePair * added_minimum_size_column = nullptr;
|
|
|
|
|
|
2015-04-16 06:12:35 +00:00
|
|
|
|
UncompressedCache * uncompressed_cache;
|
|
|
|
|
MarkCache * mark_cache;
|
|
|
|
|
|
2014-03-09 17:36:01 +00:00
|
|
|
|
MergeTreeData & storage;
|
2014-11-25 20:55:23 +00:00
|
|
|
|
const MarkRanges & all_mark_ranges;
|
2015-04-09 13:34:16 +00:00
|
|
|
|
size_t aio_threshold;
|
2015-04-12 04:39:20 +00:00
|
|
|
|
size_t max_read_buffer_size;
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-04-23 15:34:47 +00:00
|
|
|
|
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, size_t level = 0)
|
2013-11-26 11:55:11 +00:00
|
|
|
|
{
|
|
|
|
|
String escaped_column_name = escapeForFileName(name);
|
|
|
|
|
|
|
|
|
|
/** Если файла с данными нет - то не будем пытаться открыть его.
|
|
|
|
|
* Это нужно, чтобы можно было добавлять новые столбцы к структуре таблицы без создания файлов для старых кусков.
|
|
|
|
|
*/
|
|
|
|
|
if (!Poco::File(path + escaped_column_name + ".bin").exists())
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
/// Для массивов используются отдельные потоки для размеров.
|
2014-06-26 00:58:14 +00:00
|
|
|
|
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
|
2013-11-26 11:55:11 +00:00
|
|
|
|
{
|
|
|
|
|
String size_name = DataTypeNested::extractNestedTableName(name)
|
|
|
|
|
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
|
|
|
|
|
String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name))
|
|
|
|
|
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
|
|
|
|
|
|
2014-01-15 16:12:48 +00:00
|
|
|
|
if (!streams.count(size_name))
|
2014-04-22 22:43:55 +00:00
|
|
|
|
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(
|
2015-04-12 04:39:20 +00:00
|
|
|
|
path + escaped_size_name, uncompressed_cache, mark_cache, all_mark_ranges, aio_threshold, max_read_buffer_size)));
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2014-04-23 15:34:47 +00:00
|
|
|
|
addStream(name, *type_arr->getNestedType(), all_mark_ranges, level + 1);
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
|
|
|
|
else
|
2015-04-12 04:39:20 +00:00
|
|
|
|
streams[name].reset(new Stream(
|
|
|
|
|
path + escaped_column_name, uncompressed_cache, mark_cache, all_mark_ranges, aio_threshold, max_read_buffer_size));
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
|
|
|
|
|
2015-04-09 00:37:08 +00:00
|
|
|
|
|
2013-11-26 11:55:11 +00:00
|
|
|
|
void readData(const String & name, const IDataType & type, IColumn & column, size_t from_mark, size_t max_rows_to_read,
|
2015-04-09 00:37:08 +00:00
|
|
|
|
size_t level = 0, bool read_offsets = true)
|
2013-11-26 11:55:11 +00:00
|
|
|
|
{
|
|
|
|
|
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
|
2014-06-26 00:58:14 +00:00
|
|
|
|
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
|
2013-11-26 11:55:11 +00:00
|
|
|
|
{
|
|
|
|
|
if (read_offsets)
|
|
|
|
|
{
|
|
|
|
|
Stream & stream = *streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)];
|
|
|
|
|
stream.seekToMark(from_mark);
|
|
|
|
|
type_arr->deserializeOffsets(
|
|
|
|
|
column,
|
|
|
|
|
*stream.data_buffer,
|
|
|
|
|
max_rows_to_read);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (column.size())
|
2013-12-04 10:14:35 +00:00
|
|
|
|
{
|
2014-06-26 00:58:14 +00:00
|
|
|
|
ColumnArray & array = typeid_cast<ColumnArray &>(column);
|
2015-04-09 00:37:08 +00:00
|
|
|
|
const size_t required_internal_size = array.getOffsets()[column.size() - 1];
|
2013-11-26 11:55:11 +00:00
|
|
|
|
|
2015-04-09 00:37:08 +00:00
|
|
|
|
if (required_internal_size)
|
2013-11-26 11:55:11 +00:00
|
|
|
|
{
|
|
|
|
|
readData(
|
2015-04-09 00:37:08 +00:00
|
|
|
|
name,
|
|
|
|
|
*type_arr->getNestedType(),
|
|
|
|
|
array.getData(),
|
2013-11-26 11:55:11 +00:00
|
|
|
|
from_mark,
|
2015-04-09 00:37:08 +00:00
|
|
|
|
required_internal_size - array.getData().size(),
|
2013-11-26 11:55:11 +00:00
|
|
|
|
level + 1);
|
2015-04-09 00:37:08 +00:00
|
|
|
|
|
|
|
|
|
/** Исправление для ошибочно записанных пустых файлов с данными массива.
|
|
|
|
|
* Такое бывает после ALTER с добавлением новых столбцов во вложенную структуру данных.
|
|
|
|
|
*/
|
|
|
|
|
size_t read_internal_size = array.getData().size();
|
|
|
|
|
if (required_internal_size != read_internal_size)
|
|
|
|
|
{
|
|
|
|
|
if (read_internal_size != 0)
|
2015-04-09 02:10:06 +00:00
|
|
|
|
LOG_ERROR((&Logger::get("MergeTreeReader")),
|
|
|
|
|
"Internal size of array " + name + " doesn't match offsets: corrupted data, filling with default values.");
|
2015-04-09 00:37:08 +00:00
|
|
|
|
|
|
|
|
|
array.getDataPtr() = dynamic_cast<IColumnConst &>(
|
|
|
|
|
*type_arr->getNestedType()->createConstColumn(
|
|
|
|
|
required_internal_size,
|
|
|
|
|
type_arr->getNestedType()->getDefault())).convertToFullColumn();
|
|
|
|
|
|
|
|
|
|
/** NOTE Можно было бы занулять этот столбец, чтобы он не добавлялся в блок,
|
|
|
|
|
* а впоследствии создавался с более правильными (из определения таблицы) значениями по-умолчанию.
|
|
|
|
|
*/
|
|
|
|
|
}
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
Stream & stream = *streams[name];
|
|
|
|
|
stream.seekToMark(from_mark);
|
2015-02-15 12:38:21 +00:00
|
|
|
|
type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read, stream.avg_value_size_hint);
|
|
|
|
|
|
|
|
|
|
/// Вычисление подсказки о среднем размере значения.
|
|
|
|
|
size_t column_size = column.size();
|
|
|
|
|
if (column_size)
|
|
|
|
|
{
|
|
|
|
|
double current_avg_value_size = static_cast<double>(column.byteSize()) / column_size;
|
|
|
|
|
|
|
|
|
|
/// Эвристика, чтобы при изменениях, значение avg_value_size_hint быстро росло, но медленно уменьшалось.
|
|
|
|
|
if (current_avg_value_size > stream.avg_value_size_hint)
|
|
|
|
|
stream.avg_value_size_hint = current_avg_value_size;
|
|
|
|
|
else if (current_avg_value_size * 2 < stream.avg_value_size_hint)
|
|
|
|
|
stream.avg_value_size_hint = (current_avg_value_size + stream.avg_value_size_hint * 3) / 4;
|
|
|
|
|
}
|
2013-11-26 11:55:11 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
2015-04-02 03:08:43 +00:00
|
|
|
|
|
2015-04-09 00:37:08 +00:00
|
|
|
|
|
2015-04-02 03:08:43 +00:00
|
|
|
|
void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder)
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
/** Для недостающих столбцов из вложенной структуры нужно создавать не столбец пустых массивов, а столбец массивов
|
|
|
|
|
* правильных длин.
|
|
|
|
|
* TODO: Если для какой-то вложенной структуры были запрошены только отсутствующие столбцы, для них вернутся пустые
|
|
|
|
|
* массивы, даже если в куске есть смещения для этой вложенной структуры. Это можно исправить.
|
2015-04-09 02:10:06 +00:00
|
|
|
|
* NOTE: Похожий код есть в Block::addDefaults, но он немного отличается.
|
2015-04-02 03:08:43 +00:00
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/// Сначала запомним столбцы смещений для всех массивов в блоке.
|
|
|
|
|
OffsetColumns offset_columns;
|
|
|
|
|
for (size_t i = 0; i < res.columns(); ++i)
|
|
|
|
|
{
|
|
|
|
|
const ColumnWithNameAndType & column = res.getByPosition(i);
|
|
|
|
|
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(&*column.column))
|
|
|
|
|
{
|
|
|
|
|
String offsets_name = DataTypeNested::extractNestedTableName(column.name);
|
2015-04-09 00:37:08 +00:00
|
|
|
|
auto & offsets_column = offset_columns[offsets_name];
|
|
|
|
|
|
|
|
|
|
/// Если почему-то есть разные столбцы смещений для одной вложенной структуры, то берём непустой.
|
|
|
|
|
if (!offsets_column || offsets_column->empty())
|
|
|
|
|
offsets_column = array->getOffsetsColumn();
|
2015-04-02 03:08:43 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto should_evaluate_defaults = false;
|
|
|
|
|
auto should_sort = always_reorder;
|
2015-04-09 00:37:08 +00:00
|
|
|
|
|
|
|
|
|
for (const auto & requested_column : columns)
|
2015-04-02 03:08:43 +00:00
|
|
|
|
{
|
|
|
|
|
/// insert default values only for columns without default expressions
|
2015-04-09 00:37:08 +00:00
|
|
|
|
if (!res.has(requested_column.name))
|
2015-04-02 03:08:43 +00:00
|
|
|
|
{
|
|
|
|
|
should_sort = true;
|
2015-04-09 00:37:08 +00:00
|
|
|
|
if (storage.column_defaults.count(requested_column.name) != 0)
|
2015-04-02 03:08:43 +00:00
|
|
|
|
{
|
|
|
|
|
should_evaluate_defaults = true;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
2015-04-09 00:37:08 +00:00
|
|
|
|
ColumnWithNameAndType column_to_add;
|
|
|
|
|
column_to_add.name = requested_column.name;
|
|
|
|
|
column_to_add.type = requested_column.type;
|
2015-04-02 03:08:43 +00:00
|
|
|
|
|
2015-04-09 00:37:08 +00:00
|
|
|
|
String offsets_name = DataTypeNested::extractNestedTableName(column_to_add.name);
|
2015-04-02 03:08:43 +00:00
|
|
|
|
if (offset_columns.count(offsets_name))
|
|
|
|
|
{
|
|
|
|
|
ColumnPtr offsets_column = offset_columns[offsets_name];
|
2015-04-09 00:37:08 +00:00
|
|
|
|
DataTypePtr nested_type = typeid_cast<DataTypeArray &>(*column_to_add.type).getNestedType();
|
2015-04-02 03:08:43 +00:00
|
|
|
|
size_t nested_rows = offsets_column->empty() ? 0
|
|
|
|
|
: typeid_cast<ColumnUInt64 &>(*offsets_column).getData().back();
|
|
|
|
|
|
|
|
|
|
ColumnPtr nested_column = dynamic_cast<IColumnConst &>(*nested_type->createConstColumn(
|
|
|
|
|
nested_rows, nested_type->getDefault())).convertToFullColumn();
|
|
|
|
|
|
2015-04-09 00:37:08 +00:00
|
|
|
|
column_to_add.column = new ColumnArray(nested_column, offsets_column);
|
2015-04-02 03:08:43 +00:00
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
/** Нужно превратить константный столбец в полноценный, так как в части блоков (из других кусков),
|
|
|
|
|
* он может быть полноценным (а то интерпретатор может посчитать, что он константный везде).
|
|
|
|
|
*/
|
2015-04-09 00:37:08 +00:00
|
|
|
|
column_to_add.column = dynamic_cast<IColumnConst &>(*column_to_add.type->createConstColumn(
|
|
|
|
|
res.rows(), column_to_add.type->getDefault())).convertToFullColumn();
|
2015-04-02 03:08:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
2015-04-09 00:37:08 +00:00
|
|
|
|
res.insert(column_to_add);
|
2015-04-02 03:08:43 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// evaluate defaulted columns if necessary
|
|
|
|
|
if (should_evaluate_defaults)
|
|
|
|
|
evaluateMissingDefaults(res, columns, storage.column_defaults, storage.context);
|
|
|
|
|
|
|
|
|
|
/// remove added column to ensure same content among all blocks
|
2015-04-09 00:37:08 +00:00
|
|
|
|
if (added_minimum_size_column)
|
2015-04-02 03:08:43 +00:00
|
|
|
|
{
|
|
|
|
|
res.erase(0);
|
2015-04-09 00:37:08 +00:00
|
|
|
|
streams.erase(added_minimum_size_column->name);
|
2015-04-02 03:08:43 +00:00
|
|
|
|
columns.erase(std::begin(columns));
|
2015-04-09 00:37:08 +00:00
|
|
|
|
added_minimum_size_column = nullptr;
|
2015-04-02 03:08:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// sort columns to ensure consistent order among all blocks
|
|
|
|
|
if (should_sort)
|
|
|
|
|
{
|
|
|
|
|
Block ordered_block;
|
|
|
|
|
|
|
|
|
|
for (const auto & name : ordered_names)
|
|
|
|
|
if (res.has(name))
|
|
|
|
|
ordered_block.insert(res.getByName(name));
|
|
|
|
|
|
|
|
|
|
if (res.columns() != ordered_block.columns())
|
|
|
|
|
throw Exception{
|
|
|
|
|
"Ordered block has different number of columns than original one:\n" +
|
|
|
|
|
ordered_block.dumpNames() + "\nvs.\n" + res.dumpNames(),
|
|
|
|
|
ErrorCodes::LOGICAL_ERROR};
|
|
|
|
|
|
|
|
|
|
std::swap(res, ordered_block);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (const Exception & e)
|
|
|
|
|
{
|
|
|
|
|
/// Более хорошая диагностика.
|
|
|
|
|
throw Exception(e.message() + '\n' + e.getStackTrace().toString()
|
|
|
|
|
+ "\n(while reading from part " + path + ")", e.code());
|
|
|
|
|
}
|
|
|
|
|
}
|
2013-11-26 11:55:11 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|