2017-12-25 18:58:39 +00:00
|
|
|
#include <DataTypes/NestedUtils.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <DataTypes/DataTypeArray.h>
|
|
|
|
#include <Common/escapeForFileName.h>
|
2017-04-08 01:32:05 +00:00
|
|
|
#include <Common/MemoryTracker.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/CachedCompressedReadBuffer.h>
|
|
|
|
#include <IO/CompressedReadBufferFromFile.h>
|
|
|
|
#include <Columns/ColumnArray.h>
|
|
|
|
#include <Interpreters/evaluateMissingDefaults.h>
|
|
|
|
#include <Storages/MergeTree/MergeTreeReader.h>
|
2017-07-13 20:58:19 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2017-01-21 04:24:28 +00:00
|
|
|
#include <Poco/File.h>
|
2016-07-19 10:57:57 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
using OffsetColumns = std::map<std::string, ColumnPtr>;
|
2016-07-21 16:22:24 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
constexpr auto DATA_FILE_EXTENSION = ".bin";
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Error codes used in this translation unit. These are declarations only (`extern`);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NOT_FOUND_EXPECTED_DATA_PART;
    extern const int MEMORY_LIMIT_EXCEEDED;
    extern const int ARGUMENT_OUT_OF_BOUND;
    /// Both codes below are thrown by MergeTreeReader::Stream::loadMarks; declare them here
    /// so this file does not depend on another header happening to declare them.
    extern const int CORRUPTED_DATA;
    extern const int CANNOT_READ_ALL_DATA;
}
|
2016-07-21 16:22:24 +00:00
|
|
|
|
|
|
|
|
2016-11-20 12:43:20 +00:00
|
|
|
/// Defaulted destructor, defined out-of-line in this translation unit.
/// NOTE(review): presumably kept here so that the members' types (e.g. Stream) are complete
/// at the point of destruction - confirm against the header.
MergeTreeReader::~MergeTreeReader() = default;
|
2016-07-19 10:57:57 +00:00
|
|
|
|
|
|
|
|
2017-01-24 17:25:47 +00:00
|
|
|
/// Creates a reader for the part located at `path`.
/// Verifies that `path` exists and then opens the streams of every requested column
/// (see addStreams). If any of this throws, the part is reported to the storage as broken
/// and the exception is rethrown unchanged.
MergeTreeReader::MergeTreeReader(const String & path,
    const MergeTreeData::DataPartPtr & data_part, const NamesAndTypesList & columns,
    UncompressedCache * uncompressed_cache, MarkCache * mark_cache, bool save_marks_in_cache,
    const MergeTreeData & storage, const MarkRanges & all_mark_ranges,
    size_t aio_threshold, size_t max_read_buffer_size, const ValueSizeMap & avg_value_size_hints,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback,
    clockid_t clock_type)
    : avg_value_size_hints(avg_value_size_hints), path(path), data_part(data_part), columns(columns)
    , uncompressed_cache(uncompressed_cache), mark_cache(mark_cache), save_marks_in_cache(save_marks_in_cache), storage(storage)
    , all_mark_ranges(all_mark_ranges), aio_threshold(aio_threshold), max_read_buffer_size(max_read_buffer_size), index_granularity(storage.index_granularity)
{
    try
    {
        Poco::File part_location(path);
        if (!part_location.exists())
            throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);

        /// One call per requested column; addStreams itself enumerates the column's substreams.
        for (const auto & requested : columns)
            addStreams(requested.name, *requested.type, all_mark_ranges, profile_callback, clock_type);
    }
    catch (...)
    {
        storage.reportBrokenPart(data_part->name);
        throw;
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Gives read-only access to the per-column average value size hints kept by this reader.
/// The hints are updated by readData after each read.
const MergeTreeReader::ValueSizeMap & MergeTreeReader::getAvgValueSizeHints() const
{
    return avg_value_size_hints;
}
|
|
|
|
|
|
|
|
|
2017-07-11 09:32:39 +00:00
|
|
|
size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res)
|
2016-07-19 10:57:57 +00:00
|
|
|
{
|
2017-06-16 20:11:02 +00:00
|
|
|
size_t read_rows = 0;
|
2017-04-01 07:20:54 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
/// Pointers to offset columns that are common to the nested data structure columns.
|
|
|
|
/// If append is true, then the value will be equal to nullptr and will be used only to
|
|
|
|
/// check that the offsets column has been already read.
|
|
|
|
OffsetColumns offset_columns;
|
|
|
|
|
2017-12-25 21:57:29 +00:00
|
|
|
for (const NameAndTypePair & it : columns)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
/// The column is already present in the block so we will append the values to the end.
|
|
|
|
bool append = res.has(it.name);
|
2017-12-15 21:11:24 +00:00
|
|
|
if (!append)
|
|
|
|
res.insert(ColumnWithTypeAndName(it.type->createColumn(), it.type, it.name));
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-12-18 05:37:20 +00:00
|
|
|
/// To keep offsets shared. TODO Very dangerous. Get rid of this.
|
|
|
|
MutableColumnPtr column = res.getByName(it.name).column->assumeMutable();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
bool read_offsets = true;
|
|
|
|
|
|
|
|
/// For nested data structures collect pointers to offset columns.
|
2017-12-15 21:11:24 +00:00
|
|
|
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(it.type.get()))
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-12-25 18:58:39 +00:00
|
|
|
String name = Nested::extractTableName(it.name);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-12-15 21:11:24 +00:00
|
|
|
auto it_inserted = offset_columns.emplace(name, nullptr);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-12-17 07:15:13 +00:00
|
|
|
/// offsets have already been read on the previous iteration and we don't need to read it again
|
2017-12-15 21:11:24 +00:00
|
|
|
if (!it_inserted.second)
|
|
|
|
read_offsets = false;
|
2017-12-17 07:15:13 +00:00
|
|
|
|
|
|
|
/// need to create new offsets
|
|
|
|
if (it_inserted.second && !append)
|
|
|
|
it_inserted.first->second = ColumnArray::ColumnOffsets::create();
|
|
|
|
|
|
|
|
/// share offsets in all elements of nested structure
|
|
|
|
if (!append)
|
2018-03-20 14:17:09 +00:00
|
|
|
column = ColumnArray::create(type_arr->getNestedType()->createColumn(),
|
|
|
|
it_inserted.first->second)->assumeMutable();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2017-12-15 21:11:24 +00:00
|
|
|
size_t column_size_before_reading = column->size();
|
2017-11-21 02:23:41 +00:00
|
|
|
|
2017-12-15 21:11:24 +00:00
|
|
|
readData(it.name, *it.type, *column, from_mark, continue_reading, max_rows_to_read, read_offsets);
|
2017-11-21 02:23:41 +00:00
|
|
|
|
|
|
|
/// For elements of Nested, column_size_before_reading may be greater than column size
|
|
|
|
/// if offsets are not empty and were already read, but elements are empty.
|
2017-12-15 21:11:24 +00:00
|
|
|
if (column->size())
|
|
|
|
read_rows = std::max(read_rows, column->size() - column_size_before_reading);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
|
|
|
/// Better diagnostics.
|
2017-12-15 21:11:24 +00:00
|
|
|
e.addMessage("(while reading column " + it.name + ")");
|
2017-04-01 07:20:54 +00:00
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
2017-12-15 21:11:24 +00:00
|
|
|
if (column->size())
|
|
|
|
res.getByName(it.name).column = std::move(column);
|
2017-12-17 05:21:04 +00:00
|
|
|
else
|
|
|
|
res.erase(it.name);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// NOTE: positions for all streams must be kept in sync. In particular, even if for some streams there are no rows to be read,
|
|
|
|
/// you must ensure that no seeks are skipped and at this point they all point to to_mark.
|
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
|
|
|
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
|
|
|
|
storage.reportBrokenPart(data_part->name);
|
|
|
|
|
|
|
|
/// Better diagnostics.
|
2017-06-14 10:50:22 +00:00
|
|
|
e.addMessage("(while reading from part " + path + " from mark " + toString(from_mark) + " with max_rows_to_read = " + toString(max_rows_to_read) + ")");
|
2017-04-01 07:20:54 +00:00
|
|
|
throw;
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
storage.reportBrokenPart(data_part->name);
|
|
|
|
|
|
|
|
throw;
|
|
|
|
}
|
2017-06-16 20:11:02 +00:00
|
|
|
|
|
|
|
return read_rows;
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2016-11-20 12:43:20 +00:00
|
|
|
/// Opens the data file `path_prefix_ + extension_` for reading.
///
/// The read buffer size is derived from the widest requested mark range (capped by
/// `max_read_buffer_size`); when `aio_threshold` is non-zero the total amount of compressed data
/// to be read is estimated as well. Depending on whether `uncompressed_cache` is given, either a
/// CachedCompressedReadBuffer or a CompressedReadBufferFromFile is created and exposed through `data_buffer`.
MergeTreeReader::Stream::Stream(
    const String & path_prefix_, const String & extension_, size_t marks_count_,
    const MarkRanges & all_mark_ranges,
    MarkCache * mark_cache_, bool save_marks_in_cache_,
    UncompressedCache * uncompressed_cache,
    size_t aio_threshold, size_t max_read_buffer_size,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
    : path_prefix(path_prefix_), extension(extension_), marks_count(marks_count_)
    , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
{
    /// Position in the compressed file recorded for a mark. Goes through getMark, so the marks
    /// are loaded on the first call - it must only be invoked where the original logic needs a mark.
    auto compressed_offset = [this](size_t mark_index)
    {
        return getMark(mark_index).offset_in_compressed_file;
    };

    /// Compute the size of the buffer.
    size_t widest_range = 0;

    for (const auto & range : all_mark_ranges)
    {
        const size_t range_end = range.end;
        size_t right = range_end;
        /// NOTE: if we are reading the whole file, then right == marks_count
        /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.

        /// If the end of range is inside the block, we will need to read it too.
        if (right < marks_count && getMark(right).offset_in_decompressed_block > 0)
        {
            while (right < marks_count && compressed_offset(right) == compressed_offset(range_end))
                ++right;
        }

        /// If there are no marks after the end of range, just use max_read_buffer_size
        const bool nothing_after_range = right >= marks_count
            || (right + 1 == marks_count && compressed_offset(right) == compressed_offset(range_end));

        if (nothing_after_range)
        {
            widest_range = max_read_buffer_size;
            break;
        }

        widest_range = std::max(widest_range, compressed_offset(right) - compressed_offset(range.begin));
    }

    /// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
    /// For example: part has single dictionary and all marks point to the same position.
    if (widest_range == 0)
        widest_range = max_read_buffer_size;

    const size_t buffer_size = std::min(max_read_buffer_size, widest_range);

    /// Estimate size of the data to be read.
    size_t estimated_size = 0;
    if (aio_threshold > 0)
    {
        for (const auto & range : all_mark_ranges)
        {
            size_t begin_offset = 0;
            if (range.begin > 0)
                begin_offset = compressed_offset(range.begin);

            /// A range that reaches past the last mark extends to the end of the file.
            size_t end_offset;
            if (range.end < marks_count)
                end_offset = compressed_offset(range.end);
            else
                end_offset = Poco::File(path_prefix + extension).getSize();

            if (end_offset > begin_offset)
                estimated_size += end_offset - begin_offset;
        }
    }

    /// Initialize the objects that shall be used to perform read operations.
    if (uncompressed_cache)
    {
        auto cached = std::make_unique<CachedCompressedReadBuffer>(
            path_prefix + extension, uncompressed_cache, estimated_size, aio_threshold, buffer_size);

        if (profile_callback)
            cached->setProfileCallback(profile_callback, clock_type);

        cached_buffer = std::move(cached);
        data_buffer = cached_buffer.get();
    }
    else
    {
        auto plain = std::make_unique<CompressedReadBufferFromFile>(
            path_prefix + extension, estimated_size, aio_threshold, buffer_size);

        if (profile_callback)
            plain->setProfileCallback(profile_callback, clock_type);

        non_cached_buffer = std::move(plain);
        data_buffer = non_cached_buffer.get();
    }
}
|
|
|
|
|
2016-11-03 12:00:44 +00:00
|
|
|
|
2017-01-19 11:40:06 +00:00
|
|
|
/// Returns the mark with the given index, loading the marks first if that has not happened yet.
/// `index` is not range-checked here.
const MarkInCompressedFile & MergeTreeReader::Stream::getMark(size_t index)
{
    const bool marks_loaded = static_cast<bool>(marks);
    if (!marks_loaded)
        loadMarks();

    return (*marks)[index];
}
|
2016-11-20 12:43:20 +00:00
|
|
|
|
2017-11-21 02:25:36 +00:00
|
|
|
|
2017-01-19 11:40:06 +00:00
|
|
|
void MergeTreeReader::Stream::loadMarks()
|
2016-11-20 12:43:20 +00:00
|
|
|
{
|
2017-11-20 02:15:15 +00:00
|
|
|
std::string path = path_prefix + ".mrk";
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
auto load = [&]() -> MarkCache::MappedPtr
|
|
|
|
{
|
|
|
|
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
|
2018-05-31 15:54:08 +00:00
|
|
|
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
size_t file_size = Poco::File(path).getSize();
|
|
|
|
size_t expected_file_size = sizeof(MarkInCompressedFile) * marks_count;
|
|
|
|
if (expected_file_size != file_size)
|
|
|
|
throw Exception(
|
2018-11-26 00:56:50 +00:00
|
|
|
"bad size of marks file `" + path + "':" + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
|
2017-11-20 02:15:15 +00:00
|
|
|
ErrorCodes::CORRUPTED_DATA);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
auto res = std::make_shared<MarksInCompressedFile>(marks_count);
|
|
|
|
|
|
|
|
/// Read directly to marks.
|
|
|
|
ReadBufferFromFile buffer(path, file_size, -1, reinterpret_cast<char *>(res->data()));
|
|
|
|
|
|
|
|
if (buffer.eof() || buffer.buffer().size() != file_size)
|
|
|
|
throw Exception("Cannot read all marks from file " + path, ErrorCodes::CANNOT_READ_ALL_DATA);
|
|
|
|
|
|
|
|
return res;
|
|
|
|
};
|
|
|
|
|
|
|
|
if (mark_cache)
|
|
|
|
{
|
|
|
|
auto key = mark_cache->hash(path);
|
|
|
|
if (save_marks_in_cache)
|
|
|
|
{
|
|
|
|
marks = mark_cache->getOrSet(key, load);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
marks = mark_cache->get(key);
|
|
|
|
if (!marks)
|
|
|
|
marks = load();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
marks = load();
|
|
|
|
|
|
|
|
if (!marks)
|
|
|
|
throw Exception("Failed to load marks: " + path, ErrorCodes::LOGICAL_ERROR);
|
2016-11-20 12:43:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void MergeTreeReader::Stream::seekToMark(size_t index)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
MarkInCompressedFile mark = getMark(index);
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
if (cached_buffer)
|
|
|
|
cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
|
|
|
if (non_cached_buffer)
|
|
|
|
non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
|
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
|
|
|
/// Better diagnostics.
|
|
|
|
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
|
|
|
|
e.addMessage("(while seeking to mark " + toString(index)
|
|
|
|
+ " of column " + path_prefix + "; offsets are: "
|
|
|
|
+ toString(mark.offset_in_compressed_file) + " "
|
|
|
|
+ toString(mark.offset_in_decompressed_block) + ")");
|
|
|
|
|
|
|
|
throw;
|
|
|
|
}
|
2016-11-20 12:43:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2018-07-09 18:19:03 +00:00
|
|
|
void MergeTreeReader::Stream::seekToStart()
|
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
if (cached_buffer)
|
|
|
|
cached_buffer->seek(0, 0);
|
|
|
|
if (non_cached_buffer)
|
|
|
|
non_cached_buffer->seek(0, 0);
|
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
|
|
|
/// Better diagnostics.
|
|
|
|
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
|
|
|
|
e.addMessage("(while seeking to start of column " + path_prefix + ")");
|
|
|
|
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-11-20 02:15:15 +00:00
|
|
|
void MergeTreeReader::addStreams(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges,
|
2017-08-07 07:31:16 +00:00
|
|
|
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
|
2016-07-19 10:57:57 +00:00
|
|
|
{
|
2017-08-07 07:31:16 +00:00
|
|
|
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-08-07 07:31:16 +00:00
|
|
|
String stream_name = IDataType::getFileNameForStream(name, substream_path);
|
2017-11-20 02:15:15 +00:00
|
|
|
|
2017-08-07 07:31:16 +00:00
|
|
|
if (streams.count(stream_name))
|
|
|
|
return;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-08-07 07:31:16 +00:00
|
|
|
bool data_file_exists = Poco::File(path + stream_name + DATA_FILE_EXTENSION).exists();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-08-07 07:31:16 +00:00
|
|
|
/** If data file is missing then we will not try to open it.
|
|
|
|
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
|
|
|
|
*/
|
2017-11-21 02:23:41 +00:00
|
|
|
if (!data_file_exists)
|
2017-04-01 07:20:54 +00:00
|
|
|
return;
|
|
|
|
|
2017-11-20 02:15:15 +00:00
|
|
|
streams.emplace(stream_name, std::make_unique<Stream>(
|
2017-11-19 23:16:18 +00:00
|
|
|
path + stream_name, DATA_FILE_EXTENSION, data_part->marks_count,
|
2017-04-01 07:20:54 +00:00
|
|
|
all_mark_ranges, mark_cache, save_marks_in_cache,
|
|
|
|
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
|
2017-08-07 07:31:16 +00:00
|
|
|
};
|
|
|
|
|
2018-06-07 18:14:37 +00:00
|
|
|
IDataType::SubstreamPath path;
|
|
|
|
type.enumerateStreams(callback, path);
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-01-19 11:40:06 +00:00
|
|
|
void MergeTreeReader::readData(
|
2017-06-05 20:43:23 +00:00
|
|
|
const String & name, const IDataType & type, IColumn & column,
|
2017-07-11 09:32:39 +00:00
|
|
|
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
|
2017-08-07 07:31:16 +00:00
|
|
|
bool with_offsets)
|
2016-07-19 10:57:57 +00:00
|
|
|
{
|
2018-06-07 18:14:37 +00:00
|
|
|
auto get_stream_getter = [&](bool stream_for_prefix) -> IDataType::InputStreamGetter
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2018-08-09 09:28:52 +00:00
|
|
|
return [&, stream_for_prefix](const IDataType::SubstreamPath & path) -> ReadBuffer *
|
2018-06-07 18:14:37 +00:00
|
|
|
{
|
|
|
|
/// If offsets for arrays have already been read.
|
|
|
|
if (!with_offsets && path.size() == 1 && path[0].type == IDataType::Substream::ArraySizes)
|
|
|
|
return nullptr;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-06-07 18:14:37 +00:00
|
|
|
String stream_name = IDataType::getFileNameForStream(name, path);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-06-07 18:14:37 +00:00
|
|
|
auto it = streams.find(stream_name);
|
|
|
|
if (it == streams.end())
|
|
|
|
return nullptr;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-06-07 18:14:37 +00:00
|
|
|
Stream & stream = *it->second;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-07-09 18:19:03 +00:00
|
|
|
if (stream_for_prefix)
|
|
|
|
{
|
|
|
|
stream.seekToStart();
|
|
|
|
continue_reading = false;
|
|
|
|
}
|
|
|
|
else if (!continue_reading)
|
2018-06-07 18:14:37 +00:00
|
|
|
stream.seekToMark(from_mark);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-06-07 18:14:37 +00:00
|
|
|
return stream.data_buffer;
|
|
|
|
};
|
2017-08-07 07:31:16 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
double & avg_value_size_hint = avg_value_size_hints[name];
|
2018-06-07 18:14:37 +00:00
|
|
|
IDataType::DeserializeBinaryBulkSettings settings;
|
|
|
|
settings.avg_value_size_hint = avg_value_size_hint;
|
2018-05-21 16:21:15 +00:00
|
|
|
|
|
|
|
if (deserialize_binary_bulk_state_map.count(name) == 0)
|
2018-06-07 18:14:37 +00:00
|
|
|
{
|
|
|
|
settings.getter = get_stream_getter(true);
|
|
|
|
type.deserializeBinaryBulkStatePrefix(settings, deserialize_binary_bulk_state_map[name]);
|
|
|
|
}
|
2018-05-21 16:21:15 +00:00
|
|
|
|
2018-06-07 18:14:37 +00:00
|
|
|
settings.getter = get_stream_getter(false);
|
2018-08-21 12:31:09 +00:00
|
|
|
settings.continuous_reading = continue_reading;
|
2018-05-21 16:21:15 +00:00
|
|
|
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
|
2018-06-07 18:14:37 +00:00
|
|
|
type.deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, settings, deserialize_state);
|
2017-08-07 07:31:16 +00:00
|
|
|
IDataType::updateAvgValueSizeHint(column, avg_value_size_hint);
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-11-28 23:31:21 +00:00
|
|
|
static bool arrayHasNoElementsRead(const IColumn & column)
|
|
|
|
{
|
|
|
|
const ColumnArray * column_array = typeid_cast<const ColumnArray *>(&column);
|
|
|
|
|
|
|
|
if (!column_array)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
size_t size = column_array->size();
|
|
|
|
if (!size)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
size_t data_size = column_array->getData().size();
|
|
|
|
if (data_size)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
size_t last_offset = column_array->getOffsets()[size - 1];
|
|
|
|
return last_offset != 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2018-10-03 13:55:21 +00:00
|
|
|
/// Adds to `res` the requested columns that are not in it and have no default expression,
/// filled with default values.
///
/// `should_reorder` is set when at least one requested column was missing;
/// `should_evaluate_missing_defaults` is set when at least one missing column has a default
/// expression (such columns are not added here). Array columns whose elements were not read
/// (see arrayHasNoElementsRead) are treated as missing.
/// `num_rows` is the size used for missing columns that do not belong to a known nested structure.
void MergeTreeReader::fillMissingColumns(Block & res, bool & should_reorder, bool & should_evaluate_missing_defaults, size_t num_rows)
{
    try
    {
        /// For a missing column of a nested data structure we must create not a column of empty
        /// arrays, but a column of arrays of correct length.

        /// First, collect offset columns for all arrays in the block.
        OffsetColumns offsets_by_table;
        for (size_t pos = 0; pos < res.columns(); ++pos)
        {
            const auto & existing = res.safeGetByPosition(pos);

            const auto * array = typeid_cast<const ColumnArray *>(existing.column.get());
            if (!array)
                continue;

            auto & offsets = offsets_by_table[Nested::extractTableName(existing.name)];

            /// If for some reason multiple offsets columns are present for the same nested data structure,
            /// choose the one that is not empty.
            if (!offsets || offsets->empty())
                offsets = array->getOffsetsPtr();
        }

        should_evaluate_missing_defaults = false;
        should_reorder = false;

        /// insert default values only for columns without default expressions
        for (const auto & requested : columns)
        {
            const String & column_name = requested.name;

            bool present = res.has(column_name);
            if (present && arrayHasNoElementsRead(*res.getByName(column_name).column))
            {
                res.erase(column_name);
                present = false;
            }

            if (present)
                continue;

            should_reorder = true;

            /// Columns with a default expression are left to evaluateMissingDefaults.
            if (storage.getColumns().defaults.count(column_name) != 0)
            {
                should_evaluate_missing_defaults = true;
                continue;
            }

            ColumnWithTypeAndName column_to_add;
            column_to_add.name = column_name;
            column_to_add.type = requested.type;

            const auto known_offsets = offsets_by_table.find(Nested::extractTableName(column_to_add.name));
            if (known_offsets != offsets_by_table.end())
            {
                /// Part of a nested structure already in the block: reuse its offsets so that
                /// the arrays get the correct lengths.
                ColumnPtr offsets_column = known_offsets->second;
                DataTypePtr nested_type = typeid_cast<const DataTypeArray &>(*column_to_add.type).getNestedType();

                size_t nested_rows = 0;
                if (!offsets_column->empty())
                    nested_rows = typeid_cast<const ColumnUInt64 &>(*offsets_column).getData().back();

                ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst();

                column_to_add.column = ColumnArray::create(nested_column, offsets_column);
            }
            else
            {
                /// We must turn a constant column into a full column because the interpreter could infer that it is constant everywhere
                /// but in some blocks (from other parts) it can be a full column.
                column_to_add.column = column_to_add.type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
            }

            res.insert(std::move(column_to_add));
        }
    }
    catch (Exception & e)
    {
        /// Better diagnostics.
        e.addMessage("(while reading from part " + path + ")");
        throw;
    }
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-04-16 12:21:36 +00:00
|
|
|
/// Rebuilds `res` so that its columns follow the order of `ordered_names`; names that are not
/// in the block are skipped. If `filter_name` is given and names a column of `res` that did not
/// make it into the reordered block, that column is appended at the end.
/// Columns of `res` not mentioned in either place are dropped.
void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names, const String * filter_name)
{
    try
    {
        Block reordered;

        for (const auto & column_name : ordered_names)
        {
            if (!res.has(column_name))
                continue;

            reordered.insert(res.getByName(column_name));
        }

        const bool keep_filter_column = filter_name && !reordered.has(*filter_name) && res.has(*filter_name);
        if (keep_filter_column)
            reordered.insert(res.getByName(*filter_name));

        std::swap(res, reordered);
    }
    catch (Exception & e)
    {
        /// Better diagnostics.
        e.addMessage("(while reading from part " + path + ")");
        throw;
    }
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-02-22 12:43:57 +00:00
|
|
|
/// Forwards to DB::evaluateMissingDefaults with the requested columns, the storage's column
/// defaults and the storage context. A DB::Exception is annotated with the part path and rethrown.
void MergeTreeReader::evaluateMissingDefaults(Block & res)
{
    try
    {
        const auto & column_defaults = storage.getColumns().defaults;
        DB::evaluateMissingDefaults(res, columns, column_defaults, storage.context);
    }
    catch (Exception & e)
    {
        /// Better diagnostics.
        e.addMessage("(while reading from part " + path + ")");
        throw;
    }
}
|
|
|
|
|
|
|
|
}
|