2013-04-24 10:31:32 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2013-05-03 10:20:53 +00:00
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
2013-04-24 10:31:32 +00:00
|
|
|
|
#include <DB/Storages/StorageMergeTree.h>
|
2013-05-03 10:20:53 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/PKCondition.h>
|
2013-04-24 10:31:32 +00:00
|
|
|
|
|
2013-09-08 05:53:10 +00:00
|
|
|
|
#include <DB/IO/CachedCompressedReadBuffer.h>
|
|
|
|
|
|
2013-04-24 10:31:32 +00:00
|
|
|
|
|
|
|
|
|
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
/// Для чтения из одного куска. Для чтения сразу из многих, Storage использует сразу много таких объектов.
|
|
|
|
|
class MergeTreeBlockInputStream : public IProfilingBlockInputStream
|
|
|
|
|
{
|
|
|
|
|
public:
|
|
|
|
|
/// Параметры storage_ и owned_storage разделены, чтобы можно было сделать поток, не владеющий своим storage
|
|
|
|
|
/// (например, поток, сливаящий куски). В таком случае сам storage должен следить, чтобы не удалить данные, пока их читают.
|
|
|
|
|
MergeTreeBlockInputStream(const String & path_, /// Путь к куску
|
2013-09-08 05:53:10 +00:00
|
|
|
|
size_t block_size_, const Names & column_names_,
|
|
|
|
|
StorageMergeTree & storage_, const StorageMergeTree::DataPartPtr & owned_data_part_,
|
|
|
|
|
const MarkRanges & mark_ranges_, StoragePtr owned_storage, bool use_uncompressed_cache_)
|
|
|
|
|
: IProfilingBlockInputStream(owned_storage),
|
|
|
|
|
path(path_), block_size(block_size_), column_names(column_names_),
|
|
|
|
|
storage(storage_), owned_data_part(owned_data_part_),
|
|
|
|
|
mark_ranges(mark_ranges_), use_uncompressed_cache(use_uncompressed_cache_),
|
|
|
|
|
current_range(-1), rows_left_in_current_range(0)
|
2013-04-24 10:31:32 +00:00
|
|
|
|
{
|
|
|
|
|
LOG_TRACE(storage.log, "Reading " << mark_ranges.size() << " ranges from part " << owned_data_part->name
|
|
|
|
|
<< ", up to " << (mark_ranges.back().end - mark_ranges.front().begin) * storage.index_granularity
|
|
|
|
|
<< " rows starting from " << mark_ranges.front().begin * storage.index_granularity);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String getName() const { return "MergeTreeBlockInputStream"; }
|
2013-05-03 10:20:53 +00:00
|
|
|
|
|
|
|
|
|
String getID() const
|
|
|
|
|
{
|
|
|
|
|
std::stringstream res;
|
|
|
|
|
res << "MergeTree(" << owned_storage->getTableName() << ", " << path << ", columns";
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < column_names.size(); ++i)
|
|
|
|
|
res << ", " << column_names[i];
|
|
|
|
|
|
|
|
|
|
res << ", marks";
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < mark_ranges.size(); ++i)
|
|
|
|
|
res << ", " << mark_ranges[i].begin << ", " << mark_ranges[i].end;
|
|
|
|
|
|
|
|
|
|
res << ")";
|
|
|
|
|
return res.str();
|
|
|
|
|
}
|
2013-04-24 10:31:32 +00:00
|
|
|
|
|
|
|
|
|
/// Получает набор диапазонов засечек, вне которых не могут находиться ключи из заданного диапазона.
|
|
|
|
|
static MarkRanges markRangesFromPkRange(const String & path,
|
|
|
|
|
size_t marks_count,
|
|
|
|
|
StorageMergeTree & storage,
|
|
|
|
|
PKCondition & key_condition)
|
|
|
|
|
{
|
|
|
|
|
MarkRanges res;
|
|
|
|
|
|
|
|
|
|
/// Если индекс не используется.
|
|
|
|
|
if (key_condition.alwaysTrue())
|
|
|
|
|
{
|
|
|
|
|
res.push_back(MarkRange(0, marks_count));
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
/// Читаем индекс.
|
|
|
|
|
typedef AutoArray<Row> Index;
|
|
|
|
|
size_t key_size = storage.sort_descr.size();
|
|
|
|
|
Index index(marks_count);
|
|
|
|
|
for (size_t i = 0; i < marks_count; ++i)
|
|
|
|
|
index[i].resize(key_size);
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
String index_path = path + "primary.idx";
|
|
|
|
|
ReadBufferFromFile index_file(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < marks_count; ++i)
|
|
|
|
|
{
|
|
|
|
|
for (size_t j = 0; j < key_size; ++j)
|
|
|
|
|
storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i][j], index_file);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!index_file.eof())
|
|
|
|
|
throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// В стеке всегда будут находиться непересекающиеся подозрительные отрезки, самый левый наверху (back).
|
|
|
|
|
/// На каждом шаге берем левый отрезок и проверяем, подходит ли он.
|
|
|
|
|
/// Если подходит, разбиваем его на более мелкие и кладем их в стек. Если нет - выбрасываем его.
|
|
|
|
|
/// Если отрезок уже длиной в одну засечку, добавляем его в ответ и выбрасываем.
|
|
|
|
|
std::vector<MarkRange> ranges_stack;
|
|
|
|
|
ranges_stack.push_back(MarkRange(0, marks_count));
|
|
|
|
|
while (!ranges_stack.empty())
|
|
|
|
|
{
|
|
|
|
|
MarkRange range = ranges_stack.back();
|
|
|
|
|
ranges_stack.pop_back();
|
|
|
|
|
|
|
|
|
|
bool may_be_true;
|
|
|
|
|
if (range.end == marks_count)
|
|
|
|
|
may_be_true = key_condition.mayBeTrueAfter(index[range.begin]);
|
|
|
|
|
else
|
|
|
|
|
may_be_true = key_condition.mayBeTrueInRange(index[range.begin], index[range.end]);
|
|
|
|
|
|
|
|
|
|
if (!may_be_true)
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
if (range.end == range.begin + 1)
|
|
|
|
|
{
|
|
|
|
|
/// Увидели полезный промежуток между соседними засечками. Либо добавим его к последнему диапазону, либо начнем новый диапазон.
|
|
|
|
|
if (res.empty() || range.begin - res.back().end > storage.min_marks_for_seek)
|
|
|
|
|
{
|
|
|
|
|
res.push_back(range);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
res.back().end = range.end;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
/// Разбиваем отрезок и кладем результат в стек справа налево.
|
|
|
|
|
size_t step = (range.end - range.begin - 1) / storage.settings.coarse_index_granularity + 1;
|
|
|
|
|
size_t end;
|
|
|
|
|
for (end = range.end; end > range.begin + step; end -= step)
|
|
|
|
|
{
|
|
|
|
|
ranges_stack.push_back(MarkRange(end - step, end));
|
|
|
|
|
}
|
|
|
|
|
ranges_stack.push_back(MarkRange(range.begin, end));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
Block readImpl()
|
|
|
|
|
{
|
|
|
|
|
Block res;
|
|
|
|
|
|
|
|
|
|
/// Если нужно, переходим к следующему диапазону.
|
|
|
|
|
if (rows_left_in_current_range == 0)
|
|
|
|
|
{
|
|
|
|
|
++current_range;
|
|
|
|
|
if (static_cast<size_t>(current_range) == mark_ranges.size())
|
|
|
|
|
return res;
|
|
|
|
|
|
|
|
|
|
MarkRange & range = mark_ranges[current_range];
|
|
|
|
|
rows_left_in_current_range = (range.end - range.begin) * storage.index_granularity;
|
|
|
|
|
|
|
|
|
|
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
|
|
|
|
|
addStream(*it, *storage.getDataTypeByName(*it), range.begin);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Сколько строк читать для следующего блока.
|
|
|
|
|
size_t max_rows_to_read = std::min(block_size, rows_left_in_current_range);
|
|
|
|
|
|
|
|
|
|
/** Для некоторых столбцов файлы с данными могут отсутствовать.
|
|
|
|
|
* Это бывает для старых кусков, после добавления новых столбцов в структуру таблицы.
|
|
|
|
|
*/
|
|
|
|
|
bool has_missing_columns = false;
|
|
|
|
|
bool has_normal_columns = false;
|
|
|
|
|
|
2013-07-16 14:55:01 +00:00
|
|
|
|
/// Указатели на столбцы смещений, общие для столбцов из вложенных структур данных
|
|
|
|
|
typedef std::map<std::string, ColumnPtr> OffsetColumns;
|
|
|
|
|
OffsetColumns offset_columns;
|
|
|
|
|
|
2013-04-24 10:31:32 +00:00
|
|
|
|
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
|
|
|
|
|
{
|
|
|
|
|
if (streams.end() == streams.find(*it))
|
|
|
|
|
{
|
|
|
|
|
has_missing_columns = true;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
has_normal_columns = true;
|
|
|
|
|
|
|
|
|
|
ColumnWithNameAndType column;
|
|
|
|
|
column.name = *it;
|
|
|
|
|
column.type = storage.getDataTypeByName(*it);
|
2013-07-16 14:55:01 +00:00
|
|
|
|
|
|
|
|
|
bool read_offsets = true;
|
|
|
|
|
|
|
|
|
|
/// Для вложенных структур запоминаем указатели на столбцы со смещениями
|
|
|
|
|
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&*column.type))
|
|
|
|
|
{
|
|
|
|
|
String name = DataTypeNested::extractNestedTableName(column.name);
|
|
|
|
|
|
|
|
|
|
if (offset_columns.count(name) == 0)
|
|
|
|
|
offset_columns[name] = new ColumnArray::ColumnOffsets_t;
|
|
|
|
|
else
|
|
|
|
|
read_offsets = false; /// на предыдущих итерациях смещения уже считали вызовом readData
|
|
|
|
|
|
|
|
|
|
column.column = new ColumnArray(type_arr->getNestedType()->createColumn(), offset_columns[name]);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
column.column = column.type->createColumn();
|
2013-06-25 12:19:10 +00:00
|
|
|
|
|
|
|
|
|
try
|
|
|
|
|
{
|
2013-07-16 14:55:01 +00:00
|
|
|
|
readData(*it, *column.type, *column.column, max_rows_to_read, 0, read_offsets);
|
2013-06-25 12:19:10 +00:00
|
|
|
|
}
|
2013-11-11 05:35:58 +00:00
|
|
|
|
catch (Exception & e)
|
2013-06-25 12:19:10 +00:00
|
|
|
|
{
|
|
|
|
|
/// Более хорошая диагностика.
|
|
|
|
|
if (e.code() == ErrorCodes::CHECKSUM_DOESNT_MATCH || e.code() == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED)
|
2013-11-11 05:35:58 +00:00
|
|
|
|
e.addMessage("(while reading column " + *it + " from part " + path + ")");
|
|
|
|
|
|
|
|
|
|
throw;
|
2013-06-25 12:19:10 +00:00
|
|
|
|
}
|
2013-04-24 10:31:32 +00:00
|
|
|
|
|
|
|
|
|
if (column.column->size())
|
|
|
|
|
res.insert(column);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (has_missing_columns && !has_normal_columns)
|
|
|
|
|
throw Exception("All requested columns are missing", ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING);
|
|
|
|
|
|
|
|
|
|
if (res)
|
|
|
|
|
{
|
2013-08-27 20:15:22 +00:00
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
rows_left_in_current_range -= res.rows();
|
|
|
|
|
}
|
2013-11-11 05:35:58 +00:00
|
|
|
|
catch (Exception & e)
|
2013-08-27 20:15:22 +00:00
|
|
|
|
{
|
|
|
|
|
/// Более хорошая диагностика.
|
|
|
|
|
if (e.code() == ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH)
|
2013-11-11 05:35:58 +00:00
|
|
|
|
e.addMessage("(while reading from part " + path + ")");
|
|
|
|
|
|
|
|
|
|
throw;
|
2013-08-27 20:15:22 +00:00
|
|
|
|
}
|
2013-04-24 10:31:32 +00:00
|
|
|
|
|
|
|
|
|
/// Заполним столбцы, для которых нет файлов, значениями по-умолчанию.
|
|
|
|
|
if (has_missing_columns)
|
|
|
|
|
{
|
|
|
|
|
size_t pos = 0; /// Позиция, куда надо вставить недостающий столбец.
|
|
|
|
|
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it, ++pos)
|
|
|
|
|
{
|
|
|
|
|
if (streams.end() == streams.find(*it))
|
|
|
|
|
{
|
|
|
|
|
ColumnWithNameAndType column;
|
|
|
|
|
column.name = *it;
|
|
|
|
|
column.type = storage.getDataTypeByName(*it);
|
|
|
|
|
|
|
|
|
|
/** Нужно превратить константный столбец в полноценный, так как в части блоков (из других кусков),
|
|
|
|
|
* он может быть полноценным (а то интерпретатор может посчитать, что он константный везде).
|
|
|
|
|
*/
|
|
|
|
|
column.column = dynamic_cast<IColumnConst &>(*column.type->createConstColumn(
|
|
|
|
|
res.rows(), column.type->getDefault())).convertToFullColumn();
|
|
|
|
|
|
|
|
|
|
res.insert(pos, column);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!res || rows_left_in_current_range == 0)
|
|
|
|
|
{
|
|
|
|
|
rows_left_in_current_range = 0;
|
|
|
|
|
/** Закрываем файлы (ещё до уничтожения объекта).
|
|
|
|
|
* Чтобы при создании многих источников, но одновременном чтении только из нескольких,
|
|
|
|
|
* буферы не висели в памяти.
|
|
|
|
|
*/
|
|
|
|
|
streams.clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
const String path;
|
|
|
|
|
size_t block_size;
|
|
|
|
|
Names column_names;
|
|
|
|
|
StorageMergeTree & storage;
|
|
|
|
|
const StorageMergeTree::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
|
|
|
|
|
MarkRanges mark_ranges; /// В каких диапазонах засечек читать.
|
2013-09-08 05:53:10 +00:00
|
|
|
|
bool use_uncompressed_cache;
|
2013-04-24 10:31:32 +00:00
|
|
|
|
|
|
|
|
|
int current_range; /// Какой из mark_ranges сейчас читаем.
|
|
|
|
|
size_t rows_left_in_current_range; /// Сколько строк уже прочитали из текущего элемента mark_ranges.
|
|
|
|
|
|
|
|
|
|
struct Stream
|
|
|
|
|
{
|
2013-09-08 05:53:10 +00:00
|
|
|
|
Stream(const String & path_prefix, size_t mark_number, UncompressedCache * uncompressed_cache)
|
2013-04-24 10:31:32 +00:00
|
|
|
|
{
|
2013-09-08 05:53:10 +00:00
|
|
|
|
size_t buf_size = std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path_prefix + ".bin").getSize());
|
|
|
|
|
|
|
|
|
|
size_t offset_in_compressed_file = 0;
|
|
|
|
|
size_t offset_in_decompressed_block = 0;
|
|
|
|
|
|
2013-04-24 10:31:32 +00:00
|
|
|
|
if (mark_number)
|
|
|
|
|
{
|
|
|
|
|
/// Прочитаем из файла с засечками смещение в файле с данными.
|
|
|
|
|
ReadBufferFromFile marks(path_prefix + ".mrk", MERGE_TREE_MARK_SIZE);
|
|
|
|
|
marks.seek(mark_number * MERGE_TREE_MARK_SIZE);
|
2013-09-08 05:53:10 +00:00
|
|
|
|
|
2013-04-24 10:31:32 +00:00
|
|
|
|
readIntBinary(offset_in_compressed_file, marks);
|
|
|
|
|
readIntBinary(offset_in_decompressed_block, marks);
|
|
|
|
|
}
|
2013-09-08 05:53:10 +00:00
|
|
|
|
|
|
|
|
|
if (uncompressed_cache)
|
|
|
|
|
{
|
|
|
|
|
compressed = new CachedCompressedReadBuffer(
|
|
|
|
|
path_prefix + ".bin", offset_in_compressed_file, *uncompressed_cache, buf_size);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
plain = new ReadBufferFromFile(path_prefix + ".bin", buf_size);
|
|
|
|
|
compressed = new CompressedReadBuffer(*plain);
|
|
|
|
|
|
|
|
|
|
if (offset_in_compressed_file)
|
|
|
|
|
plain->seek(offset_in_compressed_file);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
compressed->next();
|
|
|
|
|
compressed->position() += offset_in_decompressed_block;
|
2013-04-24 10:31:32 +00:00
|
|
|
|
}
|
2013-09-08 05:53:10 +00:00
|
|
|
|
|
|
|
|
|
SharedPtr<ReadBufferFromFile> plain; /// Может отсутствовать, если используется CachedCompressedReadBuffer.
|
|
|
|
|
SharedPtr<ReadBuffer> compressed; /// Либо CompressedReadBuffer, либо CachedCompressedReadBuffer.
|
2013-04-24 10:31:32 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
|
|
|
|
|
FileStreams streams;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void addStream(const String & name, const IDataType & type, size_t mark_number, size_t level = 0)
|
|
|
|
|
{
|
2013-08-05 17:05:30 +00:00
|
|
|
|
String escaped_column_name = escapeForFileName(name);
|
2013-04-24 10:31:32 +00:00
|
|
|
|
|
|
|
|
|
/** Если файла с данными нет - то не будем пытаться открыть его.
|
|
|
|
|
* Это нужно, чтобы можно было добавлять новые столбцы к структуре таблицы без создания файлов для старых кусков.
|
|
|
|
|
*/
|
|
|
|
|
if (!Poco::File(path + escaped_column_name + ".bin").exists())
|
|
|
|
|
return;
|
2013-09-08 05:53:10 +00:00
|
|
|
|
|
|
|
|
|
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL;
|
2013-04-24 10:31:32 +00:00
|
|
|
|
|
|
|
|
|
/// Для массивов используются отдельные потоки для размеров.
|
2013-04-24 12:57:36 +00:00
|
|
|
|
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
|
|
|
|
|
{
|
2013-08-05 17:05:30 +00:00
|
|
|
|
String size_name = DataTypeNested::extractNestedTableName(name)
|
|
|
|
|
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
|
|
|
|
|
String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name))
|
|
|
|
|
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
|
2013-04-24 12:57:36 +00:00
|
|
|
|
|
|
|
|
|
streams.insert(std::make_pair(size_name, new Stream(
|
2013-09-08 05:53:10 +00:00
|
|
|
|
path + escaped_size_name, mark_number, uncompressed_cache)));
|
2013-04-24 12:57:36 +00:00
|
|
|
|
|
|
|
|
|
addStream(name, *type_arr->getNestedType(), mark_number, level + 1);
|
|
|
|
|
}
|
2013-07-12 13:35:05 +00:00
|
|
|
|
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
|
|
|
|
|
{
|
|
|
|
|
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
|
|
|
|
|
String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
|
|
|
|
|
|
|
|
|
|
streams.insert(std::make_pair(size_name, new Stream(
|
2013-09-08 05:53:10 +00:00
|
|
|
|
path + escaped_size_name, mark_number, uncompressed_cache)));
|
2013-07-12 13:35:05 +00:00
|
|
|
|
|
|
|
|
|
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
|
|
|
|
|
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
|
2013-07-16 14:55:01 +00:00
|
|
|
|
addStream(DataTypeNested::concatenateNestedName(name, it->first), *it->second, mark_number, level + 1);
|
2013-07-12 13:35:05 +00:00
|
|
|
|
}
|
2013-04-24 12:57:36 +00:00
|
|
|
|
else
|
|
|
|
|
streams.insert(std::make_pair(name, new Stream(
|
2013-09-08 05:53:10 +00:00
|
|
|
|
path + escaped_column_name, mark_number, uncompressed_cache)));
|
2013-04-24 10:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
2013-07-16 14:55:01 +00:00
|
|
|
|
void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, size_t level = 0, bool read_offsets = true)
|
2013-04-24 10:31:32 +00:00
|
|
|
|
{
|
|
|
|
|
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
|
|
|
|
|
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
|
|
|
|
|
{
|
2013-07-16 14:55:01 +00:00
|
|
|
|
if (read_offsets)
|
|
|
|
|
{
|
|
|
|
|
type_arr->deserializeOffsets(
|
|
|
|
|
column,
|
2013-09-08 05:53:10 +00:00
|
|
|
|
*streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
|
2013-07-16 14:55:01 +00:00
|
|
|
|
max_rows_to_read);
|
|
|
|
|
}
|
2013-04-24 10:31:32 +00:00
|
|
|
|
|
|
|
|
|
if (column.size())
|
|
|
|
|
readData(
|
|
|
|
|
name,
|
2013-06-25 12:19:10 +00:00
|
|
|
|
*type_arr->getNestedType(),
|
2013-07-12 13:35:05 +00:00
|
|
|
|
dynamic_cast<ColumnArray &>(column).getData(),
|
|
|
|
|
dynamic_cast<const ColumnArray &>(column).getOffsets()[column.size() - 1],
|
|
|
|
|
level + 1);
|
|
|
|
|
}
|
|
|
|
|
else if (const DataTypeNested * type_nested = dynamic_cast<const DataTypeNested *>(&type))
|
|
|
|
|
{
|
|
|
|
|
type_nested->deserializeOffsets(
|
|
|
|
|
column,
|
2013-09-08 05:53:10 +00:00
|
|
|
|
*streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
|
2013-07-12 13:35:05 +00:00
|
|
|
|
max_rows_to_read);
|
|
|
|
|
|
|
|
|
|
if (column.size())
|
|
|
|
|
{
|
|
|
|
|
ColumnNested & column_nested = dynamic_cast<ColumnNested &>(column);
|
|
|
|
|
|
|
|
|
|
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
|
|
|
|
|
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
|
|
|
|
|
{
|
|
|
|
|
readData(
|
2013-07-16 14:55:01 +00:00
|
|
|
|
DataTypeNested::concatenateNestedName(name, it->first),
|
2013-07-12 13:35:05 +00:00
|
|
|
|
*it->second,
|
|
|
|
|
*column_nested.getData()[i],
|
|
|
|
|
column_nested.getOffsets()[column.size() - 1],
|
|
|
|
|
level + 1);
|
|
|
|
|
}
|
|
|
|
|
}
|
2013-04-24 10:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
else
|
2013-09-08 05:53:10 +00:00
|
|
|
|
type.deserializeBinary(column, *streams[name]->compressed, max_rows_to_read);
|
2013-04-24 10:31:32 +00:00
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|