2016-07-19 10:57:57 +00:00
|
|
|
#include <DB/DataTypes/IDataType.h>
|
|
|
|
#include <DB/DataTypes/DataTypeNested.h>
|
|
|
|
#include <DB/DataTypes/DataTypeArray.h>
|
2016-07-21 16:22:24 +00:00
|
|
|
#include <DB/DataTypes/DataTypeNullable.h>
|
2016-07-19 10:57:57 +00:00
|
|
|
#include <DB/Common/escapeForFileName.h>
|
2016-11-20 12:43:20 +00:00
|
|
|
#include <DB/IO/CachedCompressedReadBuffer.h>
|
|
|
|
#include <DB/IO/CompressedReadBufferFromFile.h>
|
|
|
|
#include <DB/Columns/ColumnArray.h>
|
2016-07-19 10:57:57 +00:00
|
|
|
#include <DB/Interpreters/evaluateMissingDefaults.h>
|
2016-11-20 12:43:20 +00:00
|
|
|
#include <DB/Storages/MergeTree/MergeTreeReader.h>
|
2016-12-10 04:51:36 +00:00
|
|
|
#include <DB/Columns/ColumnNullable.h>
|
2017-01-21 04:24:28 +00:00
|
|
|
#include <Poco/File.h>
|
2016-07-19 10:57:57 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
2016-12-10 04:51:36 +00:00
|
|
|
/// Offsets columns of array columns, keyed by the name of the nested table the arrays belong to.
using OffsetColumns = std::map<std::string, ColumnPtr>;
|
2016-07-21 16:22:24 +00:00
|
|
|
|
2016-12-10 04:51:36 +00:00
|
|
|
/// Extension of the file that stores the serialized values of a column.
constexpr auto DATA_FILE_EXTENSION = ".bin";

/// Extension of the file that stores the null map of a nullable column.
constexpr auto NULL_MAP_EXTENSION = ".null.bin";

/// Tells whether a stream with the given file extension is a null map stream.
bool isNullStream(const std::string & extension)
{
    return 0 == extension.compare(NULL_MAP_EXTENSION);
}
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Error codes used in this file. These are declarations only; the values are defined elsewhere.
/// Every code referenced below is declared here so that the file does not depend on
/// declarations that happen to be pulled in by other headers.
namespace ErrorCodes
{
    extern const int NOT_FOUND_EXPECTED_DATA_PART;  /// constructor: part path does not exist
    extern const int MEMORY_LIMIT_EXCEEDED;         /// readRange: not treated as a broken part
    extern const int CORRUPTED_DATA;                /// Stream::loadMarks: marks file has unexpected size
    extern const int CANNOT_READ_ALL_DATA;          /// Stream::loadMarks: marks file could not be read fully
    extern const int ARGUMENT_OUT_OF_BOUND;         /// Stream::seekToMark: extended diagnostics
    extern const int LOGICAL_ERROR;                 /// fillMissingColumnsImpl: empty block passed
}
|
2016-07-21 16:22:24 +00:00
|
|
|
|
|
|
|
|
2016-11-20 12:43:20 +00:00
|
|
|
/// Defaulted destructor, defined out of line in this translation unit.
/// NOTE(review): presumably kept here so that members holding Stream objects are destroyed
/// where Stream is a complete type - confirm against the header.
MergeTreeReader::~MergeTreeReader() = default;
|
2016-07-19 10:57:57 +00:00
|
|
|
|
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/** Creates a reader for the given columns of one data part located at `path`.
  *
  * Checks that `path` exists and opens the streams of every requested column (see addStream).
  * If anything goes wrong, the part is reported to the storage as broken and the exception
  * is rethrown. Throws NOT_FOUND_EXPECTED_DATA_PART when `path` does not exist.
  */
MergeTreeReader::MergeTreeReader(const String & path,
    const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns,
    UncompressedCache * uncompressed_cache, MarkCache * mark_cache, bool save_marks_in_cache,
    MergeTreeData & storage, const MarkRanges & all_mark_ranges,
    size_t aio_threshold, size_t max_read_buffer_size, const ValueSizeMap & avg_value_size_hints,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback,
    clockid_t clock_type)
    : avg_value_size_hints(avg_value_size_hints)
    , path(path)
    , data_part(data_part)
    , columns(columns)
    , uncompressed_cache(uncompressed_cache)
    , mark_cache(mark_cache)
    , save_marks_in_cache(save_marks_in_cache)
    , storage(storage)
    , all_mark_ranges(all_mark_ranges)
    , aio_threshold(aio_threshold)
    , max_read_buffer_size(max_read_buffer_size)
{
    try
    {
        const Poco::File part_location(path);
        if (!part_location.exists())
            throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);

        /// Open the streams needed to read every requested column.
        for (const auto & name_and_type : columns)
            addStream(name_and_type.name, *name_and_type.type, all_mark_ranges, profile_callback, clock_type);
    }
    catch (...)
    {
        /// Any failure while opening the part is reported to the storage before propagating.
        storage.reportBrokenPart(data_part->name);
        throw;
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Gives read-only access to the per-column average value size hints kept by this reader
/// (they are updated in readData).
const MergeTreeReader::ValueSizeMap & MergeTreeReader::getAvgValueSizeHints() const
{
    return this->avg_value_size_hints;
}
|
|
|
|
|
|
|
|
|
|
|
|
void MergeTreeReader::readRange(size_t from_mark, size_t to_mark, Block & res)
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
size_t max_rows_to_read = (to_mark - from_mark) * storage.index_granularity;
|
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// Pointers to offset columns that are common to the nested data structure columns.
|
|
|
|
/// If append is true, then the value will be equal to nullptr and will be used only to
|
|
|
|
/// check that the offsets column has been already read.
|
2016-07-19 10:57:57 +00:00
|
|
|
OffsetColumns offset_columns;
|
|
|
|
|
|
|
|
for (const NameAndTypePair & it : columns)
|
|
|
|
{
|
|
|
|
if (streams.end() == streams.find(it.name))
|
|
|
|
continue;
|
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// The column is already present in the block so we will append the values to the end.
|
2016-07-19 10:57:57 +00:00
|
|
|
bool append = res.has(it.name);
|
|
|
|
|
|
|
|
ColumnWithTypeAndName column;
|
|
|
|
column.name = it.name;
|
|
|
|
column.type = it.type;
|
|
|
|
if (append)
|
|
|
|
column.column = res.getByName(column.name).column;
|
|
|
|
|
|
|
|
bool read_offsets = true;
|
|
|
|
|
2016-07-21 16:22:24 +00:00
|
|
|
const IDataType * observed_type;
|
2016-08-24 00:39:38 +00:00
|
|
|
bool is_nullable;
|
|
|
|
|
2016-07-21 16:22:24 +00:00
|
|
|
if (column.type.get()->isNullable())
|
|
|
|
{
|
2016-08-10 19:12:29 +00:00
|
|
|
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(*column.type);
|
2016-07-21 16:22:24 +00:00
|
|
|
observed_type = nullable_type.getNestedType().get();
|
2016-08-24 00:39:38 +00:00
|
|
|
is_nullable = true;
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
else
|
2016-08-24 00:39:38 +00:00
|
|
|
{
|
2016-07-21 16:22:24 +00:00
|
|
|
observed_type = column.type.get();
|
2016-08-24 00:39:38 +00:00
|
|
|
is_nullable = false;
|
|
|
|
}
|
2016-07-21 16:22:24 +00:00
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// For nested data structures collect pointers to offset columns.
|
2016-07-21 16:22:24 +00:00
|
|
|
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(observed_type))
|
2016-07-19 10:57:57 +00:00
|
|
|
{
|
|
|
|
String name = DataTypeNested::extractNestedTableName(column.name);
|
|
|
|
|
|
|
|
if (offset_columns.count(name) == 0)
|
|
|
|
offset_columns[name] = append ? nullptr : std::make_shared<ColumnArray::ColumnOffsets_t>();
|
|
|
|
else
|
2017-01-24 17:25:47 +00:00
|
|
|
read_offsets = false; /// offsets have already been read on the previous iteration
|
2016-07-19 10:57:57 +00:00
|
|
|
|
|
|
|
if (!append)
|
2016-08-24 00:39:38 +00:00
|
|
|
{
|
2016-07-19 10:57:57 +00:00
|
|
|
column.column = std::make_shared<ColumnArray>(type_arr->getNestedType()->createColumn(), offset_columns[name]);
|
2016-08-24 00:39:38 +00:00
|
|
|
if (is_nullable)
|
|
|
|
column.column = std::make_shared<ColumnNullable>(column.column, std::make_shared<ColumnUInt8>());
|
|
|
|
}
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
else if (!append)
|
|
|
|
column.column = column.type->createColumn();
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2017-01-24 20:44:12 +00:00
|
|
|
readData(column.name, *column.type, *column.column, from_mark, max_rows_to_read, 0, read_offsets);
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
2017-01-24 17:25:47 +00:00
|
|
|
/// Better diagnostics.
|
2016-07-19 10:57:57 +00:00
|
|
|
e.addMessage("(while reading column " + column.name + ")");
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!append && column.column->size())
|
2016-08-05 15:44:19 +00:00
|
|
|
res.insert(std::move(column));
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
2017-01-24 20:44:12 +00:00
|
|
|
|
|
|
|
cur_mark_idx = to_mark;
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
|
|
|
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
|
|
|
|
storage.reportBrokenPart(data_part->name);
|
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// Better diagnostics.
|
2016-07-19 10:57:57 +00:00
|
|
|
e.addMessage("(while reading from part " + path + " from mark " + toString(from_mark) + " to " + toString(to_mark) + ")");
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
storage.reportBrokenPart(data_part->name);
|
|
|
|
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Adds the requested columns that are absent from `res`; the block is reordered according to
/// `ordered_names` when a column had to be added or when `always_reorder` is set.
/// All the work is done by fillMissingColumnsImpl.
void MergeTreeReader::fillMissingColumns(Block & res, const Names & ordered_names, const bool always_reorder)
{
    this->fillMissingColumnsImpl(res, ordered_names, always_reorder);
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Same as fillMissingColumns, but the block is always reordered according to `ordered_names`.
void MergeTreeReader::fillMissingColumnsAndReorder(Block & res, const Names & ordered_names)
{
    const bool always_reorder = true;
    this->fillMissingColumnsImpl(res, ordered_names, always_reorder);
}
|
|
|
|
|
|
|
|
|
2016-11-20 12:43:20 +00:00
|
|
|
/** Opens the file `path_prefix_ + extension_` for reading.
  *
  * The read buffer size is derived from the mark ranges that are going to be read
  * (capped by max_read_buffer_size); the amount of data to read is estimated only when
  * aio_threshold is non-zero. A cached compressed buffer is created when an uncompressed
  * cache is given, otherwise a plain compressed buffer reading from the file is used.
  * Marks are loaded lazily by getMark, so they are touched only where the ranges require it.
  */
MergeTreeReader::Stream::Stream(
    const String & path_prefix_, const String & extension_, size_t marks_count_,
    const MarkRanges & all_mark_ranges,
    MarkCache * mark_cache_, bool save_marks_in_cache_,
    UncompressedCache * uncompressed_cache,
    size_t aio_threshold, size_t max_read_buffer_size,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
    : path_prefix(path_prefix_), extension(extension_), marks_count(marks_count_)
    , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
{
    /// Compute the size of the buffer.
    size_t max_mark_range = 0;

    for (const auto & range : all_mark_ranges)
    {
        size_t right = range.end;
        /// NOTE: if we are reading the whole file, then right == marks_count
        /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.

        /// If the end of range is inside the block, we will need to read it too.
        if (right < marks_count && getMark(right).offset_in_decompressed_block > 0)
        {
            /// Skip all marks that point into the same compressed block as the end of the range.
            const auto end_offset = getMark(range.end).offset_in_compressed_file;
            while (right < marks_count && getMark(right).offset_in_compressed_file == end_offset)
                ++right;
        }

        /// If there are no marks after the end of range, just use max_read_buffer_size
        const bool no_marks_after_range = right >= marks_count
            || (right + 1 == marks_count
                && getMark(right).offset_in_compressed_file == getMark(range.end).offset_in_compressed_file);

        if (no_marks_after_range)
        {
            max_mark_range = max_read_buffer_size;
            break;
        }

        const size_t range_size_in_file
            = getMark(right).offset_in_compressed_file - getMark(range.begin).offset_in_compressed_file;
        max_mark_range = std::max(max_mark_range, range_size_in_file);
    }

    const size_t buffer_size = std::min(max_read_buffer_size, max_mark_range);

    /// Estimate size of the data to be read.
    size_t estimated_size = 0;
    if (aio_threshold > 0)
    {
        for (const auto & mark_range : all_mark_ranges)
        {
            size_t offset_begin = 0;
            if (mark_range.begin > 0)
                offset_begin = getMark(mark_range.begin).offset_in_compressed_file;

            /// A range that reaches the last mark is read up to the end of the file.
            size_t offset_end;
            if (mark_range.end < marks_count)
                offset_end = getMark(mark_range.end).offset_in_compressed_file;
            else
                offset_end = Poco::File(path_prefix + extension).getSize();

            if (offset_end > offset_begin)
                estimated_size += offset_end - offset_begin;
        }
    }

    /// Initialize the objects that shall be used to perform read operations.
    const String data_path = path_prefix + extension;

    if (uncompressed_cache)
    {
        auto buffer = std::make_unique<CachedCompressedReadBuffer>(
            data_path, uncompressed_cache, estimated_size, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        cached_buffer = std::move(buffer);
        data_buffer = cached_buffer.get();
    }
    else
    {
        auto buffer = std::make_unique<CompressedReadBufferFromFile>(
            data_path, estimated_size, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        non_cached_buffer = std::move(buffer);
        data_buffer = non_cached_buffer.get();
    }
}
|
|
|
|
|
2016-11-03 12:00:44 +00:00
|
|
|
std::unique_ptr<MergeTreeReader::Stream> MergeTreeReader::Stream::createEmptyPtr()
|
|
|
|
{
|
2016-12-10 06:10:29 +00:00
|
|
|
std::unique_ptr<Stream> res(new Stream);
|
2016-11-03 12:00:44 +00:00
|
|
|
res->is_empty = true;
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
2017-01-19 11:40:06 +00:00
|
|
|
/// Returns the mark with the given index, loading the marks on first use.
/// The index is not range-checked here.
const MarkInCompressedFile & MergeTreeReader::Stream::getMark(size_t index)
{
    if (!marks)
        loadMarks();

    const MarksInCompressedFile & loaded_marks = *marks;
    return loaded_marks[index];
}
|
2016-11-20 12:43:20 +00:00
|
|
|
|
2017-01-19 11:40:06 +00:00
|
|
|
void MergeTreeReader::Stream::loadMarks()
|
2016-11-20 12:43:20 +00:00
|
|
|
{
|
2016-12-10 04:51:36 +00:00
|
|
|
std::string path;
|
|
|
|
|
2017-01-19 11:40:06 +00:00
|
|
|
if (isNullStream(extension))
|
2017-01-04 04:15:38 +00:00
|
|
|
path = path_prefix + ".null.mrk";
|
2016-12-10 04:51:36 +00:00
|
|
|
else
|
|
|
|
path = path_prefix + ".mrk";
|
2016-11-20 12:43:20 +00:00
|
|
|
|
|
|
|
UInt128 key;
|
2017-01-19 11:40:06 +00:00
|
|
|
if (mark_cache)
|
2016-11-20 12:43:20 +00:00
|
|
|
{
|
2017-01-19 11:40:06 +00:00
|
|
|
key = mark_cache->hash(path);
|
|
|
|
marks = mark_cache->get(key);
|
2016-11-20 12:43:20 +00:00
|
|
|
if (marks)
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2016-11-22 20:55:45 +00:00
|
|
|
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
|
|
|
|
TemporarilyDisableMemoryTracker temporarily_disable_memory_tracker;
|
|
|
|
|
2016-11-20 12:43:20 +00:00
|
|
|
size_t file_size = Poco::File(path).getSize();
|
2017-01-19 11:40:06 +00:00
|
|
|
size_t expected_file_size = sizeof(MarkInCompressedFile) * marks_count;
|
|
|
|
if (expected_file_size != file_size)
|
|
|
|
throw Exception(
|
|
|
|
"bad size of marks file `" + path + "':" + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
|
|
|
|
ErrorCodes::CORRUPTED_DATA);
|
2016-11-20 12:43:20 +00:00
|
|
|
|
2017-01-19 11:40:06 +00:00
|
|
|
marks = std::make_shared<MarksInCompressedFile>(marks_count);
|
2016-11-20 12:43:20 +00:00
|
|
|
|
|
|
|
/// Read directly to marks.
|
|
|
|
ReadBufferFromFile buffer(path, file_size, -1, reinterpret_cast<char *>(marks->data()));
|
|
|
|
|
|
|
|
if (buffer.eof() || buffer.buffer().size() != file_size)
|
|
|
|
throw Exception("Cannot read all marks from file " + path, ErrorCodes::CANNOT_READ_ALL_DATA);
|
|
|
|
|
2017-01-19 11:40:06 +00:00
|
|
|
if (mark_cache && save_marks_in_cache)
|
|
|
|
mark_cache->set(key, marks);
|
2016-11-20 12:43:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void MergeTreeReader::Stream::seekToMark(size_t index)
|
|
|
|
{
|
2017-01-19 11:40:06 +00:00
|
|
|
MarkInCompressedFile mark = getMark(index);
|
2016-11-20 12:43:20 +00:00
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
if (cached_buffer)
|
|
|
|
cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
|
|
|
if (non_cached_buffer)
|
|
|
|
non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
2017-01-24 17:25:47 +00:00
|
|
|
/// Better diagnostics.
|
2016-11-20 12:43:20 +00:00
|
|
|
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
|
|
|
|
e.addMessage("(while seeking to mark " + toString(index)
|
|
|
|
+ " of column " + path_prefix + "; offsets are: "
|
|
|
|
+ toString(mark.offset_in_compressed_file) + " "
|
|
|
|
+ toString(mark.offset_in_decompressed_block) + ")");
|
|
|
|
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2016-07-19 10:57:57 +00:00
|
|
|
void MergeTreeReader::addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges,
|
2016-11-20 12:43:20 +00:00
|
|
|
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type,
|
|
|
|
size_t level)
|
2016-07-19 10:57:57 +00:00
|
|
|
{
|
|
|
|
String escaped_column_name = escapeForFileName(name);
|
|
|
|
|
2016-11-03 12:00:44 +00:00
|
|
|
const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type);
|
2016-12-12 07:25:31 +00:00
|
|
|
bool data_file_exists = Poco::File(path + escaped_column_name + DATA_FILE_EXTENSION).exists();
|
2016-11-03 12:00:44 +00:00
|
|
|
bool is_column_of_nested_type = type_arr && level == 0 && DataTypeNested::extractNestedTableName(name) != name;
|
|
|
|
|
|
|
|
/** If data file is missing then we will not try to open it.
|
|
|
|
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
|
|
|
|
* But we should try to load offset data for array columns of Nested subtable (their data will be filled by default value).
|
2016-07-19 10:57:57 +00:00
|
|
|
*/
|
2016-11-03 12:00:44 +00:00
|
|
|
if (!data_file_exists && !is_column_of_nested_type)
|
2016-07-19 10:57:57 +00:00
|
|
|
return;
|
|
|
|
|
2016-07-21 16:22:24 +00:00
|
|
|
if (type.isNullable())
|
2016-07-19 10:57:57 +00:00
|
|
|
{
|
2016-08-10 19:12:29 +00:00
|
|
|
/// First create the stream that handles the null map of the given column.
|
2016-07-21 16:22:24 +00:00
|
|
|
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
|
2016-08-10 19:12:29 +00:00
|
|
|
const IDataType & nested_type = *nullable_type.getNestedType();
|
|
|
|
|
|
|
|
std::string filename = name + NULL_MAP_EXTENSION;
|
|
|
|
|
|
|
|
streams.emplace(filename, std::make_unique<Stream>(
|
2017-01-19 11:40:06 +00:00
|
|
|
path + escaped_column_name, NULL_MAP_EXTENSION, data_part->size,
|
|
|
|
all_mark_ranges, mark_cache, save_marks_in_cache,
|
|
|
|
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
|
2016-08-10 19:12:29 +00:00
|
|
|
|
|
|
|
/// Then create the stream that handles the data of the given column.
|
2016-07-21 16:22:24 +00:00
|
|
|
addStream(name, nested_type, all_mark_ranges, profile_callback, clock_type, level);
|
|
|
|
}
|
2017-01-24 17:25:47 +00:00
|
|
|
/// For arrays separate streams for sizes are used.
|
2016-12-10 04:51:36 +00:00
|
|
|
else if (type_arr)
|
2016-07-21 16:22:24 +00:00
|
|
|
{
|
2016-07-19 10:57:57 +00:00
|
|
|
String size_name = DataTypeNested::extractNestedTableName(name)
|
|
|
|
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
|
|
|
|
String escaped_size_name = escapeForFileName(DataTypeNested::extractNestedTableName(name))
|
|
|
|
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
|
2016-12-12 07:25:31 +00:00
|
|
|
String size_path = path + escaped_size_name + DATA_FILE_EXTENSION;
|
2016-11-03 12:00:44 +00:00
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// We have neither offsets nor data -> skipping, default values will be filled after
|
2016-11-03 12:00:44 +00:00
|
|
|
if (!data_file_exists && !Poco::File(size_path).exists())
|
|
|
|
return;
|
2016-07-19 10:57:57 +00:00
|
|
|
|
|
|
|
if (!streams.count(size_name))
|
|
|
|
streams.emplace(size_name, std::make_unique<Stream>(
|
2017-01-19 11:40:06 +00:00
|
|
|
path + escaped_size_name, DATA_FILE_EXTENSION, data_part->size,
|
|
|
|
all_mark_ranges, mark_cache, save_marks_in_cache,
|
|
|
|
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
|
2016-07-19 10:57:57 +00:00
|
|
|
|
2016-11-03 12:00:44 +00:00
|
|
|
if (data_file_exists)
|
|
|
|
addStream(name, *type_arr->getNestedType(), all_mark_ranges, profile_callback, clock_type, level + 1);
|
|
|
|
else
|
|
|
|
streams.emplace(name, Stream::createEmptyPtr());
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
streams.emplace(name, std::make_unique<Stream>(
|
2017-01-19 11:40:06 +00:00
|
|
|
path + escaped_column_name, DATA_FILE_EXTENSION, data_part->size,
|
|
|
|
all_mark_ranges, mark_cache, save_marks_in_cache,
|
|
|
|
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-01-19 11:40:06 +00:00
|
|
|
void MergeTreeReader::readData(
|
|
|
|
const String & name, const IDataType & type, IColumn & column,
|
2017-01-24 20:44:12 +00:00
|
|
|
size_t from_mark, size_t max_rows_to_read,
|
2017-01-19 11:40:06 +00:00
|
|
|
size_t level, bool read_offsets)
|
2016-07-19 10:57:57 +00:00
|
|
|
{
|
2016-07-21 16:22:24 +00:00
|
|
|
if (type.isNullable())
|
|
|
|
{
|
2016-08-10 19:12:29 +00:00
|
|
|
/// First read from the null map.
|
2016-07-21 16:22:24 +00:00
|
|
|
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
|
2016-08-10 19:12:29 +00:00
|
|
|
const IDataType & nested_type = *nullable_type.getNestedType();
|
2016-07-21 16:22:24 +00:00
|
|
|
|
|
|
|
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column);
|
2016-08-10 19:12:29 +00:00
|
|
|
IColumn & nested_col = *nullable_col.getNestedColumn();
|
2016-07-21 16:22:24 +00:00
|
|
|
|
2016-08-10 19:12:29 +00:00
|
|
|
std::string filename = name + NULL_MAP_EXTENSION;
|
|
|
|
|
|
|
|
Stream & stream = *(streams.at(filename));
|
2017-01-24 20:44:12 +00:00
|
|
|
if (from_mark != cur_mark_idx)
|
|
|
|
stream.seekToMark(from_mark);
|
2017-01-02 23:08:09 +00:00
|
|
|
IColumn & col8 = nullable_col.getNullMapConcreteColumn();
|
2017-01-02 22:47:28 +00:00
|
|
|
DataTypeUInt8{}.deserializeBinaryBulk(col8, *stream.data_buffer, max_rows_to_read, 0);
|
2016-08-10 19:12:29 +00:00
|
|
|
|
|
|
|
/// Then read data.
|
2017-01-24 20:44:12 +00:00
|
|
|
readData(name, nested_type, nested_col, from_mark, max_rows_to_read, level, read_offsets);
|
2016-07-21 16:22:24 +00:00
|
|
|
}
|
|
|
|
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
|
2016-07-19 10:57:57 +00:00
|
|
|
{
|
2017-01-24 17:25:47 +00:00
|
|
|
/// For arrays the sizes must be deserialized first, then the values.
|
2016-07-19 10:57:57 +00:00
|
|
|
if (read_offsets)
|
|
|
|
{
|
|
|
|
Stream & stream = *streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)];
|
2017-01-24 20:44:12 +00:00
|
|
|
if (from_mark != cur_mark_idx)
|
|
|
|
stream.seekToMark(from_mark);
|
2016-07-19 10:57:57 +00:00
|
|
|
type_arr->deserializeOffsets(
|
|
|
|
column,
|
|
|
|
*stream.data_buffer,
|
|
|
|
max_rows_to_read);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (column.size())
|
|
|
|
{
|
|
|
|
ColumnArray & array = typeid_cast<ColumnArray &>(column);
|
|
|
|
const size_t required_internal_size = array.getOffsets()[column.size() - 1];
|
|
|
|
|
|
|
|
if (required_internal_size)
|
|
|
|
{
|
|
|
|
readData(
|
|
|
|
name,
|
|
|
|
*type_arr->getNestedType(),
|
|
|
|
array.getData(),
|
2017-01-24 20:44:12 +00:00
|
|
|
from_mark, required_internal_size - array.getData().size(),
|
2016-07-19 10:57:57 +00:00
|
|
|
level + 1);
|
|
|
|
|
|
|
|
size_t read_internal_size = array.getData().size();
|
2016-11-03 12:00:44 +00:00
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// Fix for erroneously written empty files with array data.
|
|
|
|
/// This can happen after ALTER that adds new columns to nested data structures.
|
2016-07-19 10:57:57 +00:00
|
|
|
if (required_internal_size != read_internal_size)
|
|
|
|
{
|
|
|
|
if (read_internal_size != 0)
|
|
|
|
LOG_ERROR((&Logger::get("MergeTreeReader")),
|
|
|
|
"Internal size of array " + name + " doesn't match offsets: corrupted data, filling with default values.");
|
|
|
|
|
|
|
|
array.getDataPtr() = dynamic_cast<IColumnConst &>(
|
|
|
|
*type_arr->getNestedType()->createConstColumn(
|
|
|
|
required_internal_size,
|
|
|
|
type_arr->getNestedType()->getDefault())).convertToFullColumn();
|
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// NOTE: we could zero this column so that it won't get added to the block
|
|
|
|
/// and later be recreated with more correct default values (from the table definition).
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
Stream & stream = *streams[name];
|
2016-11-03 12:00:44 +00:00
|
|
|
|
|
|
|
/// It means that data column of array column will be empty, and it will be replaced by const data column
|
2016-12-10 06:10:29 +00:00
|
|
|
if (stream.isEmpty())
|
2016-11-03 12:00:44 +00:00
|
|
|
return;
|
|
|
|
|
2016-07-19 10:57:57 +00:00
|
|
|
double & avg_value_size_hint = avg_value_size_hints[name];
|
2017-01-24 20:44:12 +00:00
|
|
|
if (from_mark != cur_mark_idx)
|
|
|
|
stream.seekToMark(from_mark);
|
2017-01-02 22:47:28 +00:00
|
|
|
type.deserializeBinaryBulk(column, *stream.data_buffer, max_rows_to_read, avg_value_size_hint);
|
2016-07-19 10:57:57 +00:00
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// Calculate the average value size hint.
|
2016-07-19 10:57:57 +00:00
|
|
|
size_t column_size = column.size();
|
|
|
|
if (column_size)
|
|
|
|
{
|
|
|
|
double current_avg_value_size = static_cast<double>(column.byteSize()) / column_size;
|
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// Heuristic is chosen so that avg_value_size_hint increases rapidly but decreases slowly.
|
2016-07-19 10:57:57 +00:00
|
|
|
if (current_avg_value_size > avg_value_size_hint)
|
|
|
|
avg_value_size_hint = current_avg_value_size;
|
|
|
|
else if (current_avg_value_size * 2 < avg_value_size_hint)
|
|
|
|
avg_value_size_hint = (current_avg_value_size + avg_value_size_hint * 3) / 4;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/** Adds to `res` the requested columns that are absent from it.
  *
  * Columns that have a default expression in the storage are computed by evaluateMissingDefaults;
  * the others are filled with the default value of their type. A missing array column of a
  * nested data structure reuses the offsets of an array of the same nested table that is
  * already in the block, so that the arrays have the correct lengths.
  *
  * The block is rebuilt in the order given by `ordered_names` if `always_reorder` is set
  * or if at least one column was missing.
  *
  * Throws LOGICAL_ERROR if `res` is empty.
  */
void MergeTreeReader::fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder)
{
    if (!res)
        throw Exception("Empty block passed to fillMissingColumnsImpl", ErrorCodes::LOGICAL_ERROR);

    try
    {
        /// For a missing column of a nested data structure we must create not a column of empty
        /// arrays, but a column of arrays of correct length.
        /// TODO: If for some nested data structure only missing columns were selected, the arrays in these columns will be empty,
        /// even if the offsets for this nested structure are present in the current part. This can be fixed.
        /// NOTE: Similar, but slightly different code is present in Block::addDefaults.

        /// First, collect offset columns for all arrays in the block.
        OffsetColumns offset_columns;
        for (size_t i = 0; i < res.columns(); ++i)
        {
            const ColumnWithTypeAndName & column = res.safeGetByPosition(i);

            /// For a nullable column look at the column wrapped inside it.
            IColumn * observed_column;
            if (column.column->isNullable())
            {
                ColumnNullable & nullable_col = static_cast<ColumnNullable &>(*(column.column));
                observed_column = nullable_col.getNestedColumn().get();
            }
            else
                observed_column = column.column.get();

            if (const ColumnArray * array = typeid_cast<const ColumnArray *>(observed_column))
            {
                /// The key is always derived from the name of the column in the block: the lookup
                /// for missing columns below is done by block column name as well, and a column
                /// object is created without any knowledge of that name.
                String offsets_name = DataTypeNested::extractNestedTableName(column.name);
                auto & offsets_column = offset_columns[offsets_name];

                /// If for some reason multiple offsets columns are present for the same nested data structure,
                /// choose the one that is not empty.
                if (!offsets_column || offsets_column->empty())
                    offsets_column = array->getOffsetsColumn();
            }
        }

        auto should_evaluate_defaults = false;
        auto should_sort = always_reorder;

        for (const auto & requested_column : columns)
        {
            /// insert default values only for columns without default expressions
            if (!res.has(requested_column.name))
            {
                should_sort = true;
                if (storage.column_defaults.count(requested_column.name) != 0)
                {
                    should_evaluate_defaults = true;
                    continue;
                }

                ColumnWithTypeAndName column_to_add;
                column_to_add.name = requested_column.name;
                column_to_add.type = requested_column.type;

                String offsets_name = DataTypeNested::extractNestedTableName(column_to_add.name);
                if (offset_columns.count(offsets_name))
                {
                    ColumnPtr offsets_column = offset_columns[offsets_name];
                    DataTypePtr nested_type = typeid_cast<DataTypeArray &>(*column_to_add.type).getNestedType();

                    /// The last offset is the total number of elements in all arrays.
                    size_t nested_rows = offsets_column->empty() ? 0
                        : typeid_cast<ColumnUInt64 &>(*offsets_column).getData().back();

                    ColumnPtr nested_column = dynamic_cast<IColumnConst &>(*nested_type->createConstColumn(
                        nested_rows, nested_type->getDefault())).convertToFullColumn();

                    column_to_add.column = std::make_shared<ColumnArray>(nested_column, offsets_column);
                }
                else
                {
                    /// We must turn a constant column into a full column because the interpreter could infer that it is constant everywhere
                    /// but in some blocks (from other parts) it can be a full column.
                    column_to_add.column = dynamic_cast<IColumnConst &>(*column_to_add.type->createConstColumn(
                        res.rows(), column_to_add.type->getDefault())).convertToFullColumn();
                }

                res.insert(std::move(column_to_add));
            }
        }

        /// evaluate defaulted columns if necessary
        if (should_evaluate_defaults)
            evaluateMissingDefaults(res, columns, storage.column_defaults, storage.context);

        /// sort columns to ensure consistent order among all blocks
        if (should_sort)
        {
            Block ordered_block;

            for (const auto & name : ordered_names)
                if (res.has(name))
                    ordered_block.insert(res.getByName(name));

            std::swap(res, ordered_block);
        }
    }
    catch (Exception & e)
    {
        /// Better diagnostics.
        e.addMessage("(while reading from part " + path + ")");
        throw;
    }
}
|
|
|
|
|
|
|
|
}
|