// ClickHouse/src/Storages/MergeTree/MergeTreeReaderWide.cpp
// (288 lines, 10 KiB, C++)

2019-10-10 16:30:30 +00:00
#include <Storages/MergeTree/MergeTreeReaderWide.h>
2020-02-25 09:49:45 +00:00
#include <Columns/ColumnArray.h>
2020-02-25 09:49:45 +00:00
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/NestedUtils.h>
2020-02-25 08:53:14 +00:00
#include <Interpreters/inplaceBlockConversions.h>
2020-02-25 09:49:45 +00:00
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
2020-02-25 09:49:45 +00:00
#include <Common/escapeForFileName.h>
2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
2016-07-19 10:57:57 +00:00
namespace DB
{
namespace
{
    /// Extension of the files that hold column data.
    /// It is appended to a stream name both when the file is looked up in the
    /// part checksums and when the stream is opened (see addStreams()).
    constexpr auto DATA_FILE_EXTENSION = ".bin";
}
/// Error codes referenced in this file. They are only declared here (`extern`);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int MEMORY_LIMIT_EXCEEDED;
}
/// Constructs the reader: forwards the common arguments to IMergeTreeReader,
/// remembers the disk of the data part and registers the read streams of every
/// requested column through addStreams().
///
/// `profile_callback_` and `clock_type_` are handed over to addStreams() unchanged.
///
/// If anything throws while the streams are being set up, the part is reported to
/// the storage as broken and the exception is rethrown. The one exception is
/// MEMORY_LIMIT_EXCEEDED, which is rethrown without reporting the part - the same
/// policy readRows() applies.
MergeTreeReaderWide::MergeTreeReaderWide(
    DataPartWidePtr data_part_,
    NamesAndTypesList columns_,
    const StorageMetadataPtr & metadata_snapshot_,
    UncompressedCache * uncompressed_cache_,
    MarkCache * mark_cache_,
    MarkRanges mark_ranges_,
    MergeTreeReaderSettings settings_,
    IMergeTreeDataPart::ValueSizeMap avg_value_size_hints_,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
    clockid_t clock_type_)
    : IMergeTreeReader(
        std::move(data_part_),
        std::move(columns_),
        metadata_snapshot_,
        uncompressed_cache_,
        /// Raw pointers are passed as is: std::move on them is a plain copy.
        mark_cache_,
        std::move(mark_ranges_),
        std::move(settings_),
        std::move(avg_value_size_hints_))
{
    try
    {
        disk = data_part->volume->getDisk();

        for (const NameAndTypePair & column : columns)
        {
            auto column_from_part = getColumnFromPart(column);
            addStreams(column_from_part, profile_callback_, clock_type_);
        }
    }
    catch (const Exception & e)
    {
        /// Hitting the memory limit must not mark the part as broken.
        if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
            storage.reportBrokenPart(data_part->name);
        throw;
    }
    catch (...)
    {
        storage.reportBrokenPart(data_part->name);
        throw;
    }
}
2019-11-13 01:57:45 +00:00
size_t MergeTreeReaderWide::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
2016-07-19 10:57:57 +00:00
{
size_t read_rows = 0;
try
{
2019-09-23 19:22:02 +00:00
size_t num_columns = columns.size();
2020-04-14 19:47:19 +00:00
checkNumberOfColumns(num_columns);
2019-09-23 19:22:02 +00:00
2021-03-09 14:46:52 +00:00
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
2021-10-05 09:11:25 +00:00
std::unordered_set<std::string> prefetched_streams;
2021-08-24 23:38:08 +00:00
if (disk->isRemote() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
2021-07-26 00:34:36 +00:00
{
2021-08-24 23:38:08 +00:00
/// Request reading of data in advance,
/// so if reading can be asynchronous, it will also be performed in parallel for all columns.
auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
2021-07-26 00:34:36 +00:00
{
2021-08-24 23:38:08 +00:00
auto column_from_part = getColumnFromPart(*name_and_type);
try
{
auto & cache = caches[column_from_part.getNameInStorage()];
prefetch(column_from_part, from_mark, continue_reading, current_task_last_mark, cache, prefetched_streams);
2021-08-24 23:38:08 +00:00
}
catch (Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading column " + column_from_part.name + ")");
throw;
}
2021-07-26 00:34:36 +00:00
}
}
2021-08-24 23:38:08 +00:00
auto name_and_type = columns.begin();
2021-10-05 09:11:25 +00:00
2021-07-26 00:34:36 +00:00
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
auto column_from_part = getColumnFromPart(*name_and_type);
const auto & [name, type] = column_from_part;
2019-09-23 19:22:02 +00:00
/// The column is already present in the block so we will append the values to the end.
2019-09-23 19:22:02 +00:00
bool append = res_columns[pos] != nullptr;
if (!append)
2020-02-25 08:53:14 +00:00
res_columns[pos] = type->createColumn();
2020-11-10 17:32:00 +00:00
auto & column = res_columns[pos];
try
{
size_t column_size_before_reading = column->size();
2020-12-22 15:03:48 +00:00
auto & cache = caches[column_from_part.getNameInStorage()];
2021-10-05 09:11:25 +00:00
readData(
column_from_part, column, from_mark, continue_reading, current_task_last_mark,
2021-10-05 09:11:25 +00:00
max_rows_to_read, cache, /* was_prefetched =*/ !prefetched_streams.empty());
/// For elements of Nested, column_size_before_reading may be greater than column size
/// if offsets are not empty and were already read, but elements are empty.
2019-09-23 19:22:02 +00:00
if (!column->empty())
read_rows = std::max(read_rows, column->size() - column_size_before_reading);
}
catch (Exception & e)
{
/// Better diagnostics.
2019-09-23 19:22:02 +00:00
e.addMessage("(while reading column " + name + ")");
throw;
}
2019-09-23 19:22:02 +00:00
if (column->empty())
res_columns[pos] = nullptr;
}
2019-09-23 19:22:02 +00:00
/// NOTE: positions for all streams must be kept in sync.
/// In particular, even if for some streams there are no rows to be read,
/// you must ensure that no seeks are skipped and at this point they all point to to_mark.
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
storage.reportBrokenPart(data_part->name);
/// Better diagnostics.
e.addMessage("(while reading from part " + data_part->getFullPath() + " "
2019-09-23 19:22:02 +00:00
"from mark " + toString(from_mark) + " "
"with max_rows_to_read = " + toString(max_rows_to_read) + ")");
throw;
}
catch (...)
{
storage.reportBrokenPart(data_part->name);
throw;
}
return read_rows;
2016-07-19 10:57:57 +00:00
}
/// Registers a read stream for every substream of `name_and_type` whose data file is
/// listed in the part checksums, and remembers the serialization of the column.
/// `profile_callback` and `clock_type` are forwarded to each created stream.
void MergeTreeReaderWide::addStreams(const NameAndTypePair & name_and_type,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
{
    ISerialization::StreamCallback add_stream = [&] (const ISerialization::SubstreamPath & substream_path)
    {
        const String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);

        /// A stream with this name has already been registered.
        if (streams.count(stream_name))
            return;

        const String data_file_name = stream_name + DATA_FILE_EXTENSION;

        /** If data file is missing then we will not try to open it.
          * It is necessary since it allows to add new column to structure of the table without creating new files for old parts.
          */
        if (!data_part->checksums.files.count(data_file_name))
            return;

        streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
            disk, data_part->getFullRelativePath() + stream_name, DATA_FILE_EXTENSION,
            data_part->getMarksCount(), all_mark_ranges, settings, mark_cache,
            uncompressed_cache, data_part->getFileSizeOrZero(data_file_name),
            &data_part->index_granularity_info,
            profile_callback, clock_type));
    };

    auto serialization = data_part->getSerializationForColumn(name_and_type);
    serialization->enumerateStreams(add_stream);
    serializations.emplace(name_and_type.name, std::move(serialization));
}
2021-07-26 00:34:36 +00:00
/// Looks up the stream that backs `substream_path` of column `name_and_type`, positions it
/// and returns its data buffer.
///
/// Returns nullptr when the substream is already present in `cache` or when no stream with
/// the corresponding file name is registered in `streams`.
///
/// Positioning: the stream is first adjusted for the range that begins at mark 0
/// (if `seek_to_start`) or at `from_mark` (otherwise) and ends at `current_task_last_mark`.
/// Then it is seeked to the start (if `seek_to_start`), or to `from_mark` (if `seek_to_mark`),
/// or left where it is.
static ReadBuffer * getStream(
    bool seek_to_start,
    const ISerialization::SubstreamPath & substream_path,
    MergeTreeReaderWide::FileStreams & streams,
    const NameAndTypePair & name_and_type,
    size_t from_mark, bool seek_to_mark,
    size_t current_task_last_mark,
    ISerialization::SubstreamsCache & cache)
{
    /// If substream have already been read.
    const auto subcolumn_name = ISerialization::getSubcolumnNameForStream(substream_path);
    if (cache.count(subcolumn_name) != 0)
        return nullptr;

    const auto found = streams.find(ISerialization::getFileNameForStream(name_and_type, substream_path));
    if (found == streams.end())
        return nullptr;

    MergeTreeReaderStream & stream = *found->second;

    size_t range_first_mark = from_mark;
    if (seek_to_start)
        range_first_mark = 0;
    stream.adjustForRange(range_first_mark, current_task_last_mark);

    if (seek_to_start)
        stream.seekToStart();
    else if (seek_to_mark)
        stream.seekToMark(from_mark);

    return stream.data_buffer;
}
2021-07-26 00:34:36 +00:00
void MergeTreeReaderWide::prefetch(
const NameAndTypePair & name_and_type,
size_t from_mark,
bool continue_reading,
size_t current_task_last_mark,
2021-10-05 09:11:25 +00:00
ISerialization::SubstreamsCache & cache,
std::unordered_set<std::string> & prefetched_streams)
2021-07-26 00:34:36 +00:00
{
const auto & name = name_and_type.name;
auto & serialization = serializations[name];
2021-07-26 00:34:36 +00:00
serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
2021-10-05 09:11:25 +00:00
String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);
if (!prefetched_streams.count(stream_name))
{
bool seek_to_mark = !continue_reading;
if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache))
2021-10-05 09:11:25 +00:00
buf->prefetch();
prefetched_streams.insert(stream_name);
}
2021-07-26 00:34:36 +00:00
});
}
2021-07-26 00:34:36 +00:00
void MergeTreeReaderWide::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, bool continue_reading, size_t current_task_last_mark,
size_t max_rows_to_read, ISerialization::SubstreamsCache & cache, bool was_prefetched)
2021-07-26 00:34:36 +00:00
{
double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
2021-03-09 14:46:52 +00:00
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
2019-12-12 18:55:19 +00:00
deserialize_settings.avg_value_size_hint = avg_value_size_hint;
2021-03-09 14:46:52 +00:00
const auto & name = name_and_type.name;
2021-07-26 00:34:36 +00:00
auto & serialization = serializations[name];
2021-03-09 14:46:52 +00:00
if (deserialize_binary_bulk_state_map.count(name) == 0)
{
2021-07-26 00:34:36 +00:00
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
return getStream(/* seek_to_start = */true, substream_path, streams, name_and_type, from_mark, /* seek_to_mark = */false, current_task_last_mark, cache);
2021-07-26 00:34:36 +00:00
};
2021-03-09 14:46:52 +00:00
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
}
2021-07-26 00:34:36 +00:00
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
{
2021-10-05 09:11:25 +00:00
bool seek_to_mark = !was_prefetched && !continue_reading;
return getStream(
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
seek_to_mark, current_task_last_mark, cache);
2021-07-26 00:34:36 +00:00
};
2019-12-12 18:55:19 +00:00
deserialize_settings.continuous_reading = continue_reading;
2021-03-09 14:46:52 +00:00
auto & deserialize_state = deserialize_binary_bulk_state_map[name];
2021-07-26 00:34:36 +00:00
serialization->deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, deserialize_settings, deserialize_state, &cache);
2020-11-10 17:32:00 +00:00
IDataType::updateAvgValueSizeHint(*column, avg_value_size_hint);
2016-07-19 10:57:57 +00:00
}
}