#include <Storages/MergeTree/MergeTreeReaderWide.h>

#include <Columns/ColumnArray.h>
#include <Columns/ColumnSparse.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNested.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>

namespace DB
{

namespace
{
    using OffsetColumns = std::map<std::string, ColumnPtr>;
    constexpr auto DATA_FILE_EXTENSION = ".bin";
}

namespace ErrorCodes
{
    extern const int MEMORY_LIMIT_EXCEEDED;
}

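/// Reader for data parts in Wide format, where each column (and each substream of its
/// serialization) is stored in its own .bin file. The constructor eagerly creates a
/// MergeTreeReaderStream for every substream of every requested column; if that fails,
/// the part is reported as broken before rethrowing.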
MergeTreeReaderWide::MergeTreeReaderWide(
    DataPartWidePtr data_part_,
    NamesAndTypesList columns_,
    const StorageMetadataPtr & metadata_snapshot_,
    UncompressedCache * uncompressed_cache_,
    MarkCache * mark_cache_,
    MarkRanges mark_ranges_,
    MergeTreeReaderSettings settings_,
    IMergeTreeDataPart::ValueSizeMap avg_value_size_hints_,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
    clockid_t clock_type_)
    : IMergeTreeReader(
        data_part_,
        columns_,
        metadata_snapshot_,
        uncompressed_cache_,
        mark_cache_,
        mark_ranges_,
        settings_,
        avg_value_size_hints_)
{
    try
    {
        for (size_t i = 0; i < columns_to_read.size(); ++i)
            addStreams(columns_to_read[i], serializations[i], profile_callback_, clock_type_);
    }
    catch (...)
    {
        storage.reportBrokenPart(data_part);
        throw;
    }
}

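/// Reads up to max_rows_to_read rows starting at from_mark (or continuing from the current
/// position when continue_reading is set) into res_columns, one entry per requested column.
/// Returns the number of rows actually read; columns that end up empty are reset to nullptr.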
size_t MergeTreeReaderWide::readRows(
    size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
    size_t read_rows = 0;
    try
    {
        size_t num_columns = res_columns.size();
        checkNumberOfColumns(num_columns);

        if (num_columns == 0)
            return max_rows_to_read;

        std::unordered_map<String, ISerialization::SubstreamsCache> caches;

        std::unordered_set<std::string> prefetched_streams;
        if (data_part->data_part_storage->isStoredOnRemoteDisk() ? settings.read_settings.remote_fs_prefetch : settings.read_settings.local_fs_prefetch)
        {
            /// Request reading of data in advance,
            /// so if reading can be asynchronous, it will also be performed in parallel for all columns.
            for (size_t pos = 0; pos < num_columns; ++pos)
            {
                try
                {
                    auto & cache = caches[columns_to_read[pos].getNameInStorage()];
                    prefetch(columns_to_read[pos], serializations[pos], from_mark, continue_reading, current_task_last_mark, cache, prefetched_streams);
                }
                catch (Exception & e)
                {
                    /// Better diagnostics.
                    e.addMessage("(while reading column " + columns_to_read[pos].name + ")");
                    throw;
                }
            }
        }

        for (size_t pos = 0; pos < num_columns; ++pos)
        {
            const auto & column_to_read = columns_to_read[pos];

            /// The column is already present in the block so we will append the values to the end.
            bool append = res_columns[pos] != nullptr;
            if (!append)
                res_columns[pos] = column_to_read.type->createColumn(*serializations[pos]);

            auto & column = res_columns[pos];
            try
            {
                size_t column_size_before_reading = column->size();
                auto & cache = caches[column_to_read.getNameInStorage()];

                readData(
                    column_to_read, serializations[pos], column,
                    from_mark, continue_reading, current_task_last_mark,
                    max_rows_to_read, cache, /* was_prefetched = */ !prefetched_streams.empty());

                /// For elements of Nested, column_size_before_reading may be greater than column size
                /// if offsets are not empty and were already read, but elements are empty.
                if (!column->empty())
                    read_rows = std::max(read_rows, column->size() - column_size_before_reading);
            }
            catch (Exception & e)
            {
                /// Better diagnostics.
                e.addMessage("(while reading column " + column_to_read.name + ")");
                throw;
            }

            if (column->empty())
                res_columns[pos] = nullptr;
        }

        /// NOTE: positions for all streams must be kept in sync.
        /// In particular, even if for some streams there are no rows to be read,
        /// you must ensure that no seeks are skipped and at this point they all point to to_mark.
    }
    catch (Exception & e)
    {
        if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
            storage.reportBrokenPart(data_part);

        /// Better diagnostics.
        e.addMessage("(while reading from part " + data_part->data_part_storage->getFullPath() + " "
                     "from mark " + toString(from_mark) + " "
                     "with max_rows_to_read = " + toString(max_rows_to_read) + ")");
        throw;
    }
    catch (...)
    {
        storage.reportBrokenPart(data_part);

        throw;
    }

    return read_rows;
}

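/// Creates a MergeTreeReaderStream for every substream of the column's serialization
/// (e.g. the offsets and the nested data of an Array). Substreams whose data file is
/// missing from the part's checksums are skipped, which is what makes it possible to
/// add a column to the table without rewriting old parts.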
void MergeTreeReaderWide::addStreams(
    const NameAndTypePair & name_and_type,
    const SerializationPtr & serialization,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback,
    clockid_t clock_type)
{
    ISerialization::StreamCallback callback = [&] (const ISerialization::SubstreamPath & substream_path)
    {
        String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);

        if (streams.contains(stream_name))
            return;

        bool data_file_exists = data_part->checksums.files.contains(stream_name + DATA_FILE_EXTENSION);

        /** If the data file is missing, we will not try to open it.
          * This is necessary because it allows adding a new column to the table structure without creating new files for old parts.
          */
        if (!data_file_exists)
            return;

        bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;

        streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
            data_part->data_part_storage, stream_name, DATA_FILE_EXTENSION,
            data_part->getMarksCount(), all_mark_ranges, settings, mark_cache,
            uncompressed_cache, data_part->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
            &data_part->index_granularity_info,
            profile_callback, clock_type, is_lc_dict));
    };

    serialization->enumerateStreams(callback);
}

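/// Returns the read buffer of the requested substream, positioned according to the seek
/// flags, or nullptr if the substream is already available in the cache or has no stream
/// (e.g. its data file is absent in this part).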
static ReadBuffer * getStream(
    bool seek_to_start,
    const ISerialization::SubstreamPath & substream_path,
    MergeTreeReaderWide::FileStreams & streams,
    const NameAndTypePair & name_and_type,
    size_t from_mark, bool seek_to_mark,
    size_t current_task_last_mark,
    ISerialization::SubstreamsCache & cache)
{
    /// If the substream has already been read.
    if (cache.contains(ISerialization::getSubcolumnNameForStream(substream_path)))
        return nullptr;

    String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);

    auto it = streams.find(stream_name);
    if (it == streams.end())
        return nullptr;

    MergeTreeReaderStream & stream = *it->second;
    stream.adjustRightMark(current_task_last_mark);

    if (seek_to_start)
        stream.seekToStart();
    else if (seek_to_mark)
        stream.seekToMark(from_mark);

    return stream.getDataBuffer();
}

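/// Reads the serialization prefix for the column once and stores the resulting
/// deserialization state in deserialize_binary_bulk_state_map, so that subsequent
/// readData() and prefetch() calls can reuse it.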
void MergeTreeReaderWide::deserializePrefix(
    const SerializationPtr & serialization,
    const NameAndTypePair & name_and_type,
    size_t current_task_last_mark,
    ISerialization::SubstreamsCache & cache)
{
    const auto & name = name_and_type.name;
    if (!deserialize_binary_bulk_state_map.contains(name))
    {
        ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
        deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
        {
            return getStream(/* seek_to_start = */ true, substream_path, streams, name_and_type, 0, /* seek_to_mark = */ false, current_task_last_mark, cache);
        };
        serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name]);
    }
}

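/// Issues prefetch() on the underlying buffers of all substreams of the column that have
/// not been prefetched yet, so that asynchronous reads for different columns can proceed
/// in parallel before the synchronous readData() calls.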
void MergeTreeReaderWide::prefetch(
    const NameAndTypePair & name_and_type,
    const SerializationPtr & serialization,
    size_t from_mark,
    bool continue_reading,
    size_t current_task_last_mark,
    ISerialization::SubstreamsCache & cache,
    std::unordered_set<std::string> & prefetched_streams)
{
    deserializePrefix(serialization, name_and_type, current_task_last_mark, cache);

    serialization->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
    {
        String stream_name = ISerialization::getFileNameForStream(name_and_type, substream_path);

        if (!prefetched_streams.contains(stream_name))
        {
            bool seek_to_mark = !continue_reading;
            if (ReadBuffer * buf = getStream(false, substream_path, streams, name_and_type, from_mark, seek_to_mark, current_task_last_mark, cache))
                buf->prefetch();

            prefetched_streams.insert(stream_name);
        }
    });
}

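/// Deserializes up to max_rows_to_read rows of one column from its substreams into `column`,
/// using the shared substreams cache so that substreams shared between columns (e.g. offsets
/// of a Nested structure) are read only once. Also updates the average value size hint.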
2017-08-07 07:31:16 +00:00
|
|
|
|
2021-07-26 00:34:36 +00:00
|
|
|
void MergeTreeReaderWide::readData(
|
2022-07-27 14:05:16 +00:00
|
|
|
const NameAndTypePair & name_and_type, const SerializationPtr & serialization, ColumnPtr & column,
|
2021-10-15 08:36:26 +00:00
|
|
|
size_t from_mark, bool continue_reading, size_t current_task_last_mark,
|
|
|
|
size_t max_rows_to_read, ISerialization::SubstreamsCache & cache, bool was_prefetched)
|
2021-07-26 00:34:36 +00:00
|
|
|
{
|
2020-09-14 11:22:17 +00:00
|
|
|
double & avg_value_size_hint = avg_value_size_hints[name_and_type.name];
|
2021-03-09 14:46:52 +00:00
|
|
|
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
|
2019-12-12 18:55:19 +00:00
|
|
|
deserialize_settings.avg_value_size_hint = avg_value_size_hint;
|
2018-05-21 16:21:15 +00:00
|
|
|
|
2021-12-22 12:26:16 +00:00
|
|
|
deserializePrefix(serialization, name_and_type, current_task_last_mark, cache);
|
2018-05-21 16:21:15 +00:00
|
|
|
|
2021-07-26 00:34:36 +00:00
|
|
|
deserialize_settings.getter = [&](const ISerialization::SubstreamPath & substream_path)
|
|
|
|
{
|
2021-10-05 09:11:25 +00:00
|
|
|
bool seek_to_mark = !was_prefetched && !continue_reading;
|
|
|
|
|
|
|
|
return getStream(
|
|
|
|
/* seek_to_start = */false, substream_path, streams, name_and_type, from_mark,
|
2021-10-15 08:36:26 +00:00
|
|
|
seek_to_mark, current_task_last_mark, cache);
|
2021-07-26 00:34:36 +00:00
|
|
|
};
|
2019-12-12 18:55:19 +00:00
|
|
|
deserialize_settings.continuous_reading = continue_reading;
|
2022-07-27 14:05:16 +00:00
|
|
|
auto & deserialize_state = deserialize_binary_bulk_state_map[name_and_type.name];
|
2021-03-09 14:46:52 +00:00
|
|
|
|
2021-07-26 00:34:36 +00:00
|
|
|
serialization->deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, deserialize_settings, deserialize_state, &cache);
|
2020-11-10 17:32:00 +00:00
|
|
|
IDataType::updateAvgValueSizeHint(*column, avg_value_size_hint);
|
2016-07-19 10:57:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|