Merge branch 'master' into enable-analyzer

This commit is contained in:
Dmitry Novik 2024-03-22 15:56:32 +01:00 committed by GitHub
commit df88eb26a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
41 changed files with 1016 additions and 806 deletions

View File

@ -4,7 +4,11 @@ sidebar_position: 50
sidebar_label: MySQL
---
# MySQL
import CloudNotSupportedBadge from '@theme/badges/CloudNotSupportedBadge';
# MySQL Database Engine
<CloudNotSupportedBadge />
Allows to connect to databases on a remote MySQL server and perform `INSERT` and `SELECT` queries to exchange data between ClickHouse and MySQL.

View File

@ -4,7 +4,11 @@ sidebar_position: 138
sidebar_label: MySQL
---
# MySQL
import CloudAvailableBadge from '@theme/badges/CloudAvailableBadge';
# MySQL Table Engine
<CloudAvailableBadge />
The MySQL engine allows you to perform `SELECT` and `INSERT` queries on data that is stored on a remote MySQL server.

View File

@ -21,6 +21,35 @@ The queries to terminate are selected from the system.processes table using the
Examples:
First, you'll need to get the list of incomplete queries. This SQL query provides them according to those running the longest:
List from a single ClickHouse node:
``` sql
SELECT
initial_query_id,
query_id,
formatReadableTimeDelta(elapsed) AS time_delta,
query,
*
FROM system.processes
WHERE query ILIKE 'SELECT%'
ORDER BY time_delta DESC;
```
List from a ClickHouse cluster:
``` sql
SELECT
initial_query_id,
query_id,
formatReadableTimeDelta(elapsed) AS time_delta,
query,
*
FROM clusterAllReplicas(default, system.processes)
WHERE query ILIKE 'SELECT%'
ORDER BY time_delta DESC;
```
Kill the query:
``` sql
-- Forcibly terminates all queries with the specified query_id:
KILL QUERY WHERE query_id='2-857d-4a57-9ee0-327da5d60a90'
@ -44,6 +73,11 @@ A test query (`TEST`) only checks the users rights and displays a list of que
## KILL MUTATION
The presence of long-running or incomplete mutations often indicates that a ClickHouse service is running poorly. The asynchronous nature of mutations can cause them to consume all available resources on a system. You may need to either:
- Pause all new mutations, `INSERT`s , and `SELECT`s and allow the queue of mutations to complete.
- Or manually kill some of these mutations by sending a `KILL` command.
``` sql
KILL MUTATION [ON CLUSTER cluster]
WHERE <where expression to SELECT FROM system.mutations query>
@ -57,6 +91,39 @@ A test query (`TEST`) only checks the users rights and displays a list of mut
Examples:
Get a `count()` of the number of incomplete mutations:
Count of mutations from a single ClickHouse node:
``` sql
SELECT count(*)
FROM system.mutations
WHERE is_done = 0;
```
Count of mutations from a ClickHouse cluster of replicas:
``` sql
SELECT count(*)
FROM clusterAllReplicas('default', system.mutations)
WHERE is_done = 0;
```
Query the list of incomplete mutations:
List of mutations from a single ClickHouse node:
``` sql
SELECT mutation_id, *
FROM system.mutations
WHERE is_done = 0;
```
List of mutations from a ClickHouse cluster:
``` sql
SELECT mutation_id, *
FROM clusterAllReplicas('default', system.mutations)
WHERE is_done = 0;
```
Kill the mutations as needed:
``` sql
-- Cancel and remove all mutations of the single table:
KILL MUTATION WHERE database = 'default' AND table = 'table'

View File

@ -1,10 +1,8 @@
#include "MergeTreeDataPartCompact.h"
#include <DataTypes/NestedUtils.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/MergeTreeReaderCompactSingleBuffer.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Compression/CompressedReadBufferFromFile.h>
namespace DB
@ -41,21 +39,12 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
{
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), alter_conversions);
auto * load_marks_threadpool
= reader_settings.read_settings.load_marks_asynchronously ? &read_info->getContext()->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderCompact>(
read_info,
columns_to_read,
virtual_fields,
storage_snapshot,
uncompressed_cache,
mark_cache,
mark_ranges,
reader_settings,
load_marks_threadpool,
avg_value_size_hints,
profile_callback);
return std::make_unique<MergeTreeReaderCompactSingleBuffer>(
read_info, columns_to_read, virtual_fields,
storage_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
avg_value_size_hints, profile_callback, CLOCK_MONOTONIC_COARSE);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(

View File

@ -13,6 +13,11 @@ namespace DB
class MMappedFileCache;
using MMappedFileCachePtr = std::shared_ptr<MMappedFileCache>;
enum class CompactPartsReadMethod
{
SingleBuffer,
MultiBuffer,
};
struct MergeTreeReaderSettings
{
@ -25,12 +30,20 @@ struct MergeTreeReaderSettings
bool checksum_on_read = true;
/// True if we read in order of sorting key.
bool read_in_order = false;
/// Use one buffer for each column or for all columns while reading from compact.
CompactPartsReadMethod compact_parts_read_method = CompactPartsReadMethod::SingleBuffer;
/// True if we read stream for dictionary of LowCardinality type.
bool is_low_cardinality_dictionary = false;
/// True if data may be compressed by different codecs in one stream.
bool allow_different_codecs = false;
/// Deleted mask is applied to all reads except internal select from mutate some part columns.
bool apply_deleted_mask = true;
/// Put reading task in a common I/O pool, return Async state on prepare()
bool use_asynchronous_read_from_pool = false;
/// If PREWHERE has multiple conditions combined with AND, execute them in separate read/filtering steps.
bool enable_multiple_prewhere_read_steps = false;
/// If true, try to lower size of read buffer according to granule size and compressed block size.
bool adjust_read_buffer_size = true;
};
struct MergeTreeWriterSettings

View File

@ -20,14 +20,23 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
auto context = part->storage.getContext();
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderStream>(
auto marks_loader = std::make_shared<MergeTreeMarksLoader>(
std::make_shared<LoadedMergeTreeDataPartInfoForReader>(part, std::make_shared<AlterConversions>()),
mark_cache,
part->index_granularity_info.getMarksFilePath(index->getFileName()),
marks_count,
part->index_granularity_info,
settings.save_marks_in_cache,
settings.read_settings,
load_marks_threadpool,
/*num_columns_in_mark=*/ 1);
return std::make_unique<MergeTreeReaderStreamSingleColumn>(
part->getDataPartStoragePtr(),
index->getFileName(), extension, marks_count,
all_mark_ranges,
std::move(settings), mark_cache, uncompressed_cache,
part->getFileSizeOrZero(index->getFileName() + extension),
&part->index_granularity_info,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE, false, load_marks_threadpool);
all_mark_ranges, std::move(settings), uncompressed_cache,
part->getFileSizeOrZero(index->getFileName() + extension), std::move(marks_loader),
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE);
}
}

View File

@ -28,6 +28,23 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
MergeTreeMarksGetter::MergeTreeMarksGetter(MarkCache::MappedPtr marks_, size_t num_columns_in_mark_)
: marks(std::move(marks_)), num_columns_in_mark(num_columns_in_mark_)
{
assert(marks);
}
MarkInCompressedFile MergeTreeMarksGetter::getMark(size_t row_index, size_t column_index) const
{
#ifndef NDEBUG
if (column_index >= num_columns_in_mark)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Column index: {} is out of range [0, {})", column_index, num_columns_in_mark);
#endif
return marks->get(row_index * num_columns_in_mark + column_index);
}
MergeTreeMarksLoader::MergeTreeMarksLoader(
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
MarkCache * mark_cache_,
@ -37,58 +54,49 @@ MergeTreeMarksLoader::MergeTreeMarksLoader(
bool save_marks_in_cache_,
const ReadSettings & read_settings_,
ThreadPool * load_marks_threadpool_,
size_t columns_in_mark_)
size_t num_columns_in_mark_)
: data_part_reader(data_part_reader_)
, mark_cache(mark_cache_)
, mrk_path(mrk_path_)
, marks_count(marks_count_)
, index_granularity_info(index_granularity_info_)
, save_marks_in_cache(save_marks_in_cache_)
, columns_in_mark(columns_in_mark_)
, read_settings(read_settings_)
, num_columns_in_mark(num_columns_in_mark_)
, load_marks_threadpool(load_marks_threadpool_)
{
if (load_marks_threadpool)
{
future = loadMarksAsync();
}
}
MergeTreeMarksLoader::~MergeTreeMarksLoader()
{
if (future.valid())
{
future.wait();
}
}
MarkInCompressedFile MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index)
MergeTreeMarksGetterPtr MergeTreeMarksLoader::loadMarks()
{
if (!marks)
std::lock_guard lock(load_mutex);
if (marks)
return std::make_unique<MergeTreeMarksGetter>(marks, num_columns_in_mark);
Stopwatch watch(CLOCK_MONOTONIC);
if (future.valid())
{
Stopwatch watch(CLOCK_MONOTONIC);
if (future.valid())
{
marks = future.get();
future = {};
}
else
{
marks = loadMarks();
}
watch.stop();
ProfileEvents::increment(ProfileEvents::WaitMarksLoadMicroseconds, watch.elapsedMicroseconds());
marks = future.get();
future = {};
}
else
{
marks = loadMarksSync();
}
#ifndef NDEBUG
if (column_index >= columns_in_mark)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column index: {} is out of range [0, {})", column_index, columns_in_mark);
#endif
return marks->get(row_index * columns_in_mark + column_index);
watch.stop();
ProfileEvents::increment(ProfileEvents::WaitMarksLoadMicroseconds, watch.elapsedMicroseconds());
return std::make_unique<MergeTreeMarksGetter>(marks, num_columns_in_mark);
}
@ -100,12 +108,12 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
auto data_part_storage = data_part_reader->getDataPartStorage();
size_t file_size = data_part_storage->getFileSize(mrk_path);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(num_columns_in_mark);
size_t expected_uncompressed_size = mark_size * marks_count;
// We first read the marks into a temporary simple array, then compress them into a more compact
// representation.
PODArray<MarkInCompressedFile> plain_marks(marks_count * columns_in_mark); // temporary
PODArray<MarkInCompressedFile> plain_marks(marks_count * num_columns_in_mark); // temporary
auto full_mark_path = std::string(fs::path(data_part_storage->getFullPath()) / mrk_path);
if (file_size == 0 && marks_count != 0)
@ -159,7 +167,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
size_t granularity;
reader->readStrict(
reinterpret_cast<char *>(plain_marks.data() + i * columns_in_mark), columns_in_mark * sizeof(MarkInCompressedFile));
reinterpret_cast<char *>(plain_marks.data() + i * num_columns_in_mark), num_columns_in_mark * sizeof(MarkInCompressedFile));
readBinaryLittleEndian(granularity, *reader);
}
@ -182,13 +190,13 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
auto res = std::make_shared<MarksInCompressedFile>(plain_marks);
ProfileEvents::increment(ProfileEvents::LoadedMarksCount, marks_count * columns_in_mark);
ProfileEvents::increment(ProfileEvents::LoadedMarksCount, marks_count * num_columns_in_mark);
ProfileEvents::increment(ProfileEvents::LoadedMarksMemoryBytes, res->approximateMemoryUsage());
return res;
}
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksSync()
{
MarkCache::MappedPtr loaded_marks;
@ -227,7 +235,7 @@ std::future<MarkCache::MappedPtr> MergeTreeMarksLoader::loadMarksAsync()
[this]() -> MarkCache::MappedPtr
{
ProfileEvents::increment(ProfileEvents::BackgroundLoadingMarksTasks);
return loadMarks();
return loadMarksSync();
},
*load_marks_threadpool,
"LoadMarksThread");

View File

@ -10,13 +10,33 @@ namespace DB
{
struct MergeTreeIndexGranularityInfo;
using MarksPtr = MarkCache::MappedPtr;
class Threadpool;
/// Class that helps to get marks by indexes.
/// Always immutable and thread safe.
/// Marks can be shared between several threads
/// that read columns from the same file.
class MergeTreeMarksGetter
{
public:
MergeTreeMarksGetter(MarkCache::MappedPtr marks_, size_t num_columns_in_mark_);
MarkInCompressedFile getMark(size_t row_index, size_t column_index) const;
size_t getNumColumns() const { return num_columns_in_mark; }
private:
const MarkCache::MappedPtr marks;
const size_t num_columns_in_mark;
};
using MergeTreeMarksGetterPtr = std::unique_ptr<const MergeTreeMarksGetter>;
/// Class that helps to load marks on demand.
/// Thread safe, but locks while loading marks.
class MergeTreeMarksLoader
{
public:
using MarksPtr = MarkCache::MappedPtr;
MergeTreeMarksLoader(
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
MarkCache * mark_cache_,
@ -26,24 +46,27 @@ public:
bool save_marks_in_cache_,
const ReadSettings & read_settings_,
ThreadPool * load_marks_threadpool_,
size_t columns_in_mark_ = 1);
size_t num_columns_in_mark_);
~MergeTreeMarksLoader();
MarkInCompressedFile getMark(size_t row_index, size_t column_index = 0);
MergeTreeMarksGetterPtr loadMarks();
size_t getNumColumns() const { return num_columns_in_mark; }
private:
MergeTreeDataPartInfoForReaderPtr data_part_reader;
MarkCache * mark_cache = nullptr;
String mrk_path;
size_t marks_count;
const MergeTreeDataPartInfoForReaderPtr data_part_reader;
MarkCache * const mark_cache;
const String mrk_path;
const size_t marks_count;
const MergeTreeIndexGranularityInfo & index_granularity_info;
bool save_marks_in_cache = false;
size_t columns_in_mark;
MarkCache::MappedPtr marks;
ReadSettings read_settings;
const bool save_marks_in_cache;
const ReadSettings read_settings;
const size_t num_columns_in_mark;
MarkCache::MappedPtr loadMarks();
std::mutex load_mutex;
MarkCache::MappedPtr marks;
MarkCache::MappedPtr loadMarksSync();
std::future<MarkCache::MappedPtr> loadMarksAsync();
MarkCache::MappedPtr loadMarksImpl();
@ -51,4 +74,6 @@ private:
ThreadPool * load_marks_threadpool;
};
using MergeTreeMarksLoaderPtr = std::shared_ptr<MergeTreeMarksLoader>;
}

View File

@ -10,10 +10,8 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
extern const int ARGUMENT_OUT_OF_BOUND;
}
MergeTreeReaderCompact::MergeTreeReaderCompact(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
NamesAndTypesList columns_,
@ -23,7 +21,6 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_,
ThreadPool * load_marks_threadpool_,
ValueSizeMap avg_value_size_hints_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
@ -37,91 +34,22 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
mark_ranges_,
settings_,
avg_value_size_hints_)
, marks_loader(
data_part_info_for_read_,
mark_cache,
data_part_info_for_read_->getIndexGranularityInfo().getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part_info_for_read_->getMarksCount(),
data_part_info_for_read_->getIndexGranularityInfo(),
settings.save_marks_in_cache,
settings.read_settings,
load_marks_threadpool_,
data_part_info_for_read_->getColumns().size())
, marks_loader(std::make_shared<MergeTreeMarksLoader>(
data_part_info_for_read_,
mark_cache,
data_part_info_for_read_->getIndexGranularityInfo().getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part_info_for_read_->getMarksCount(),
data_part_info_for_read_->getIndexGranularityInfo(),
settings.save_marks_in_cache,
settings.read_settings,
settings_.read_settings.load_marks_asynchronously
? &data_part_info_for_read_->getContext()->getLoadMarksThreadpool() : nullptr,
data_part_info_for_read_->getColumns().size()))
, profile_callback(profile_callback_)
, clock_type(clock_type_)
{
}
void MergeTreeReaderCompact::initialize()
{
try
{
fillColumnPositions();
/// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
auto buffer_size = getReadBufferSize(*data_part_info_for_read, marks_loader, column_positions, all_mark_ranges);
if (buffer_size)
settings.read_settings = settings.read_settings.adjustBufferSize(buffer_size);
if (!settings.read_settings.local_fs_buffer_size || !settings.read_settings.remote_fs_buffer_size)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read to empty buffer.");
const String path = MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
auto data_part_storage = data_part_info_for_read->getDataPartStorage();
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
std::string(fs::path(data_part_storage->getFullPath()) / path),
[this, path, data_part_storage]()
{
return data_part_storage->readFile(
path,
settings.read_settings,
std::nullopt, std::nullopt);
},
uncompressed_cache,
/* allow_different_codecs = */ true);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);
if (!settings.checksum_on_read)
buffer->disableChecksumming();
cached_buffer = std::move(buffer);
data_buffer = cached_buffer.get();
compressed_data_buffer = cached_buffer.get();
}
else
{
auto buffer =
std::make_unique<CompressedReadBufferFromFile>(
data_part_storage->readFile(
path,
settings.read_settings,
std::nullopt, std::nullopt),
/* allow_different_codecs = */ true);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);
if (!settings.checksum_on_read)
buffer->disableChecksumming();
non_cached_buffer = std::move(buffer);
data_buffer = non_cached_buffer.get();
compressed_data_buffer = non_cached_buffer.get();
}
}
catch (...)
{
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
throw;
}
}
void MergeTreeReaderCompact::fillColumnPositions()
{
size_t columns_num = columns_to_read.size();
@ -150,31 +78,7 @@ void MergeTreeReaderCompact::fillColumnPositions()
/// we have to read its offsets if they exist.
if (!position && is_array)
{
NameAndTypePair column_to_read_with_subcolumns = column_to_read;
auto [name_in_storage, subcolumn_name] = Nested::splitName(column_to_read.name);
/// If it is a part of Nested, we need to get the column from
/// storage metadata which is converted to Nested type with subcolumns.
/// It is required for proper counting of shared streams.
if (!subcolumn_name.empty())
{
/// If column is renamed get the new name from storage metadata.
if (alter_conversions->columnHasNewName(name_in_storage))
name_in_storage = alter_conversions->getColumnNewName(name_in_storage);
if (!storage_columns_with_collected_nested)
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto storage_columns_list = Nested::collect(storage_snapshot->getColumns(options));
storage_columns_with_collected_nested = ColumnsDescription(std::move(storage_columns_list));
}
column_to_read_with_subcolumns = storage_columns_with_collected_nested
->getColumnOrSubcolumn(
GetColumnsOptions::All,
Nested::concatenateName(name_in_storage, subcolumn_name));
}
auto column_to_read_with_subcolumns = getColumnConvertedToSubcolumnOfNested(column_to_read);
auto name_level_for_offsets = findColumnForOffsets(column_to_read_with_subcolumns);
if (name_level_for_offsets.has_value())
@ -191,189 +95,60 @@ void MergeTreeReaderCompact::fillColumnPositions()
}
}
size_t MergeTreeReaderCompact::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
NameAndTypePair MergeTreeReaderCompact::getColumnConvertedToSubcolumnOfNested(const NameAndTypePair & column)
{
if (!initialized)
if (!isArray(column.type))
return column;
/// If it is a part of Nested, we need to get the column from
/// storage metadata which is converted to Nested type with subcolumns.
/// It is required for proper counting of shared streams.
auto [name_in_storage, subcolumn_name] = Nested::splitName(column.name);
if (subcolumn_name.empty())
return column;
/// If column is renamed get the new name from storage metadata.
if (alter_conversions->columnHasNewName(name_in_storage))
name_in_storage = alter_conversions->getColumnNewName(name_in_storage);
if (!storage_columns_with_collected_nested)
{
initialize();
initialized = true;
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto storage_columns_list = Nested::collect(storage_snapshot->getColumns(options));
storage_columns_with_collected_nested = ColumnsDescription(std::move(storage_columns_list));
}
if (continue_reading)
from_mark = next_mark;
size_t read_rows = 0;
size_t num_columns = columns_to_read.size();
checkNumberOfColumns(num_columns);
MutableColumns mutable_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
if (column_positions[i] && res_columns[i] == nullptr)
res_columns[i] = columns_to_read[i].type->createColumn(*serializations[i]);
}
while (read_rows < max_rows_to_read)
{
size_t rows_to_read = data_part_info_for_read->getIndexGranularity().getMarkRows(from_mark);
/// If we need to read multiple subcolumns from a single column in storage,
/// we will read it this column only once and then reuse to extract all subcolumns.
std::unordered_map<String, ColumnPtr> columns_cache_for_subcolumns;
for (size_t pos = 0; pos < num_columns; ++pos)
{
if (!res_columns[pos])
continue;
try
{
auto & column = res_columns[pos];
size_t column_size_before_reading = column->size();
readData(columns_to_read[pos], column, from_mark, current_task_last_mark, *column_positions[pos], rows_to_read, columns_for_offsets[pos], columns_cache_for_subcolumns);
size_t read_rows_in_column = column->size() - column_size_before_reading;
if (read_rows_in_column != rows_to_read)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all data in MergeTreeReaderCompact. Rows read: {}. Rows expected: {}.",
read_rows_in_column, rows_to_read);
}
catch (...)
{
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
/// Better diagnostics.
try
{
rethrow_exception(std::current_exception());
}
catch (Exception & e)
{
e.addMessage(getMessageForDiagnosticOfBrokenPart(from_mark, max_rows_to_read));
}
throw;
}
}
++from_mark;
read_rows += rows_to_read;
}
next_mark = from_mark;
return read_rows;
return storage_columns_with_collected_nested->getColumnOrSubcolumn(
GetColumnsOptions::All,
Nested::concatenateName(name_in_storage, subcolumn_name));
}
void MergeTreeReaderCompact::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, size_t current_task_last_mark, size_t column_position, size_t rows_to_read,
ColumnNameLevel name_level_for_offsets, std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns)
const NameAndTypePair & name_and_type,
ColumnPtr & column,
size_t rows_to_read,
const InputStreamGetter & getter)
{
const auto & [name, type] = name_and_type;
std::optional<NameAndTypePair> column_for_offsets;
if (name_level_for_offsets.has_value())
try
{
const auto & part_columns = data_part_info_for_read->getColumnsDescription();
column_for_offsets = part_columns.getPhysical(name_level_for_offsets->first);
}
const auto [name, type] = name_and_type;
size_t column_size_before_reading = column->size();
adjustUpperBound(current_task_last_mark); /// Must go before seek.
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
deserialize_settings.getter = getter;
deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
if (!isContinuousReading(from_mark, column_position))
seekToMark(from_mark, column_position);
/// If we read only offsets we have to read prefix anyway
/// to preserve correctness of serialization.
auto buffer_getter_for_prefix = [&](const auto &) -> ReadBuffer *
{
return data_buffer;
};
auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer *
{
/// Offset stream from another column could be read, in case of current
/// column does not exists (see findColumnForOffsets() in
/// MergeTreeReaderCompact::fillColumnPositions())
if (name_level_for_offsets.has_value())
if (name_and_type.isSubcolumn())
{
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (!is_offsets)
return nullptr;
const auto & type_in_storage = name_and_type.getTypeInStorage();
const auto & name_in_storage = name_and_type.getNameInStorage();
/// Offset stream can be read only from columns of current level or
/// below (since it is OK to read all parent streams from the
/// alternative).
///
/// Consider the following columns in nested "root":
/// - root.array Array(UInt8) - exists
/// - root.nested_array Array(Array(UInt8)) - does not exists (only_offsets_level=1)
///
/// For root.nested_array it will try to read multiple streams:
/// - offsets (substream_path = {ArraySizes})
/// OK
/// - root.nested_array elements (substream_path = {ArrayElements, ArraySizes})
/// NOT OK - cannot use root.array offsets stream for this
///
/// Here only_offsets_level is the level of the alternative stream,
/// and substream_path.size() is the level of the current stream.
if (name_level_for_offsets->second < ISerialization::getArrayLevel(substream_path))
return nullptr;
}
auto serialization = getSerializationInPart({name_in_storage, type_in_storage});
ColumnPtr temp_column = type_in_storage->createColumn(*serialization);
return data_buffer;
};
ISerialization::DeserializeBinaryBulkStatePtr state;
ISerialization::DeserializeBinaryBulkStatePtr state_for_prefix;
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
bool columns_cache_was_used = false;
if (name_and_type.isSubcolumn())
{
NameAndTypePair name_type_in_storage{name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()};
ColumnPtr temp_column;
auto it = columns_cache_for_subcolumns.find(name_type_in_storage.name);
if (!column_for_offsets && it != columns_cache_for_subcolumns.end())
{
temp_column = it->second;
auto subcolumn = name_type_in_storage.type->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
if (column->empty())
column = IColumn::mutate(subcolumn);
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
columns_cache_was_used = true;
}
else
{
/// In case of reading only offset use the correct serialization for reading of the prefix
auto serialization = getSerializationInPart(name_type_in_storage);
temp_column = name_type_in_storage.type->createColumn(*serialization);
if (column_for_offsets)
{
auto serialization_for_prefix = getSerializationInPart(*column_for_offsets);
deserialize_settings.getter = buffer_getter_for_prefix;
serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix);
}
deserialize_settings.getter = buffer_getter;
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, state, nullptr);
if (!column_for_offsets)
columns_cache_for_subcolumns[name_type_in_storage.name] = temp_column;
auto subcolumn = name_type_in_storage.type->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
serialization->deserializeBinaryBulkWithMultipleStreams(temp_column, rows_to_read, deserialize_settings, deserialize_binary_bulk_state_map[name], nullptr);
auto subcolumn = type_in_storage->getSubcolumn(name_and_type.getSubcolumnName(), temp_column);
/// TODO: Avoid extra copying.
if (column->empty())
@ -381,185 +156,98 @@ void MergeTreeReaderCompact::readData(
else
column->assumeMutable()->insertRangeFrom(*subcolumn, 0, subcolumn->size());
}
}
else
{
/// In case of reading only offsets use the correct serialization for reading the prefix
auto serialization = getSerializationInPart(name_and_type);
if (column_for_offsets)
else
{
auto serialization_for_prefix = getSerializationInPart(*column_for_offsets);
deserialize_settings.getter = buffer_getter_for_prefix;
serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix);
auto serialization = getSerializationInPart(name_and_type);
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, deserialize_binary_bulk_state_map[name], nullptr);
}
deserialize_settings.getter = buffer_getter;
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, state, nullptr);
}
/// The buffer is left in inconsistent state after reading single offsets or using columns cache during subcolumns reading.
if (name_level_for_offsets.has_value() || columns_cache_was_used)
last_read_granule.reset();
else
last_read_granule.emplace(from_mark, column_position);
}
void MergeTreeReaderCompact::prefetchBeginOfRange(Priority priority)
try
{
if (!initialized)
{
initialize();
initialized = true;
}
adjustUpperBound(all_mark_ranges.back().end);
seekToMark(all_mark_ranges.front().begin, 0);
data_buffer->prefetch(priority);
}
catch (...)
{
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
throw;
}
void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
{
MarkInCompressedFile mark = marks_loader.getMark(row_index, column_index);
try
{
compressed_data_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block);
size_t read_rows_in_column = column->size() - column_size_before_reading;
if (read_rows_in_column != rows_to_read)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,
"Cannot read all data in MergeTreeReaderCompact. Rows read: {}. Rows expected: {}.",
read_rows_in_column, rows_to_read);
}
catch (Exception & e)
{
/// Better diagnostics.
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
e.addMessage("(while seeking to mark (" + toString(row_index) + ", " + toString(column_index) + ")");
e.addMessage("(while reading column " + name_and_type.name + ")");
throw;
}
}
void MergeTreeReaderCompact::adjustUpperBound(size_t last_mark)
void MergeTreeReaderCompact::readPrefix(
const NameAndTypePair & name_and_type,
const InputStreamGetter & buffer_getter,
const InputStreamGetter & buffer_getter_for_prefix,
const ColumnNameLevel & name_level_for_offsets)
{
size_t right_offset = 0;
if (last_mark < data_part_info_for_read->getMarksCount()) /// Otherwise read until the end of file
right_offset = marks_loader.getMark(last_mark).offset_in_compressed_file;
if (right_offset == 0)
try
{
/// If already reading till the end of file.
if (last_right_offset && *last_right_offset == 0)
return;
ISerialization::DeserializeBinaryBulkSettings deserialize_settings;
last_right_offset = 0; // Zero value means the end of file.
data_buffer->setReadUntilEnd();
if (name_level_for_offsets.has_value())
{
const auto & part_columns = data_part_info_for_read->getColumnsDescription();
auto column_for_offsets = part_columns.getPhysical(name_level_for_offsets->first);
auto serialization_for_prefix = getSerializationInPart(column_for_offsets);
deserialize_settings.getter = buffer_getter_for_prefix;
ISerialization::DeserializeBinaryBulkStatePtr state_for_prefix;
serialization_for_prefix->deserializeBinaryBulkStatePrefix(deserialize_settings, state_for_prefix);
}
SerializationPtr serialization;
if (name_and_type.isSubcolumn())
serialization = getSerializationInPart({name_and_type.getNameInStorage(), name_and_type.getTypeInStorage()});
else
serialization = getSerializationInPart(name_and_type);
deserialize_settings.getter = buffer_getter;
serialization->deserializeBinaryBulkStatePrefix(deserialize_settings, deserialize_binary_bulk_state_map[name_and_type.name]);
}
else
catch (Exception & e)
{
if (last_right_offset && right_offset <= last_right_offset.value())
return;
last_right_offset = right_offset;
data_buffer->setReadUntilPosition(right_offset);
e.addMessage("(while reading column " + name_and_type.name + ")");
throw;
}
}
bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_position)
void MergeTreeReaderCompact::createColumnsForReading(Columns & res_columns) const
{
if (!last_read_granule)
for (size_t i = 0; i < columns_to_read.size(); ++i)
{
if (column_positions[i] && res_columns[i] == nullptr)
res_columns[i] = columns_to_read[i].type->createColumn(*serializations[i]);
}
}
bool MergeTreeReaderCompact::needSkipStream(size_t column_pos, const ISerialization::SubstreamPath & substream) const
{
/// Offset stream can be read only from columns of current level or
/// below (since it is OK to read all parent streams from the
/// alternative).
///
/// Consider the following columns in nested "root":
/// - root.array Array(UInt8) - exists
/// - root.nested_array Array(Array(UInt8)) - does not exists (only_offsets_level=1)
///
/// For root.nested_array it will try to read multiple streams:
/// - offsets (substream_path = {ArraySizes})
/// OK
/// - root.nested_array elements (substream_path = {ArrayElements, ArraySizes})
/// NOT OK - cannot use root.array offsets stream for this
///
/// Here only_offsets_level is the level of the alternative stream,
/// and substream_path.size() is the level of the current stream.
if (!columns_for_offsets[column_pos])
return false;
const auto & [last_mark, last_column] = *last_read_granule;
return (mark == last_mark && column_position == last_column + 1)
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part_info_for_read->getColumns().size() - 1);
}
namespace
{
/// A simple class that helps to iterate over 2-dim marks of compact parts.
class MarksCounter
{
public:
MarksCounter(size_t rows_num_, size_t columns_num_)
: rows_num(rows_num_), columns_num(columns_num_) {}
struct Iterator
{
size_t row;
size_t column;
MarksCounter * counter;
Iterator(size_t row_, size_t column_, MarksCounter * counter_)
: row(row_), column(column_), counter(counter_) {}
Iterator operator++()
{
if (column + 1 == counter->columns_num)
{
++row;
column = 0;
}
else
{
++column;
}
return *this;
}
bool operator==(const Iterator & other) const { return row == other.row && column == other.column; }
bool operator!=(const Iterator & other) const { return !(*this == other); }
};
Iterator get(size_t row, size_t column) { return Iterator(row, column, this); }
Iterator end() { return get(rows_num, 0); }
private:
size_t rows_num;
size_t columns_num;
};
}
size_t MergeTreeReaderCompact::getReadBufferSize(
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions,
const MarkRanges & mark_ranges)
{
size_t buffer_size = 0;
size_t columns_num = column_positions.size();
size_t file_size = data_part_info_for_reader.getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION);
MarksCounter counter(data_part_info_for_reader.getMarksCount(), data_part_info_for_reader.getColumns().size());
for (const auto & mark_range : mark_ranges)
{
for (size_t mark = mark_range.begin; mark < mark_range.end; ++mark)
{
for (size_t i = 0; i < columns_num; ++i)
{
if (!column_positions[i])
continue;
auto it = counter.get(mark, *column_positions[i]);
size_t cur_offset = marks_loader.getMark(it.row, it.column).offset_in_compressed_file;
while (it != counter.end() && cur_offset == marks_loader.getMark(it.row, it.column).offset_in_compressed_file)
++it;
size_t next_offset = (it == counter.end() ? file_size : marks_loader.getMark(it.row, it.column).offset_in_compressed_file);
buffer_size = std::max(buffer_size, next_offset - cur_offset);
}
}
}
return buffer_size;
bool is_offsets = !substream.empty() && substream.back().type == ISerialization::Substream::ArraySizes;
return !is_offsets || columns_for_offsets[column_pos]->second < ISerialization::getArrayLevel(substream);
}
}

View File

@ -3,7 +3,7 @@
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <IO/ReadBufferFromFileBase.h>
#include <DataTypes/Serializations/ISerialization.h>
namespace DB
{
@ -14,7 +14,7 @@ using DataPartCompactPtr = std::shared_ptr<const MergeTreeDataPartCompact>;
class IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
/// Reader for compact parts
/// Base class of readers for compact parts.
class MergeTreeReaderCompact : public IMergeTreeReader
{
public:
@ -27,31 +27,38 @@ public:
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_,
ThreadPool * load_marks_threadpool_,
ValueSizeMap avg_value_size_hints_ = {},
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = {},
clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);
ValueSizeMap avg_value_size_hints_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_);
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
bool canReadIncompleteGranules() const final { return false; }
bool canReadIncompleteGranules() const override { return false; }
void prefetchBeginOfRange(Priority priority) override;
private:
bool isContinuousReading(size_t mark, size_t column_position);
protected:
void fillColumnPositions();
void initialize();
NameAndTypePair getColumnConvertedToSubcolumnOfNested(const NameAndTypePair & column);
ReadBuffer * data_buffer;
CompressedReadBufferBase * compressed_data_buffer;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
using InputStreamGetter = ISerialization::InputStreamGetter;
MergeTreeMarksLoader marks_loader;
void readData(
const NameAndTypePair & name_and_type,
ColumnPtr & column,
size_t rows_to_read,
const InputStreamGetter & getter);
void readPrefix(
const NameAndTypePair & name_and_type,
const InputStreamGetter & buffer_getter,
const InputStreamGetter & buffer_getter_for_prefix,
const ColumnNameLevel & name_level_for_offsets);
void createColumnsForReading(Columns & res_columns) const;
bool needSkipStream(size_t column_pos, const ISerialization::SubstreamPath & substream) const;
const MergeTreeMarksLoaderPtr marks_loader;
MergeTreeMarksGetterPtr marks_getter;
ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type;
/// Storage columns with collected separate arrays of Nested to columns of Nested type.
/// They maybe be needed for finding offsets of missed Nested columns in parts.
@ -67,32 +74,9 @@ private:
/// Element of the vector is the level of the alternative stream.
std::vector<ColumnNameLevel> columns_for_offsets;
/// For asynchronous reading from remote fs. Same meaning as in MergeTreeReaderStream.
std::optional<size_t> last_right_offset;
/// Mark to read in next 'readRows' call in case,
/// when 'continue_reading' is true.
size_t next_mark = 0;
std::optional<std::pair<size_t, size_t>> last_read_granule;
void seekToMark(size_t row_index, size_t column_index);
void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t from_mark,
size_t current_task_last_mark, size_t column_position,
size_t rows_to_read, ColumnNameLevel name_level_for_offsets, std::unordered_map<String, ColumnPtr> & columns_cache_for_subcolumns);
/// Returns maximal value of granule size in compressed file from @mark_ranges.
/// This value is used as size of read buffer.
static size_t getReadBufferSize(
const IMergeTreeDataPartInfoForReader & data_part_info_for_reader,
MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions,
const MarkRanges & mark_ranges);
/// For asynchronous reading from remote fs.
void adjustUpperBound(size_t last_mark);
ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type;
bool initialized = false;
};
}

View File

@ -0,0 +1,108 @@
#include <Storages/MergeTree/MergeTreeReaderCompactSingleBuffer.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/NestedUtils.h>
namespace DB
{
size_t MergeTreeReaderCompactSingleBuffer::readRows(
size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
try
{
init();
if (continue_reading)
from_mark = next_mark;
size_t read_rows = 0;
size_t num_columns = columns_to_read.size();
checkNumberOfColumns(num_columns);
createColumnsForReading(res_columns);
while (read_rows < max_rows_to_read)
{
size_t rows_to_read = data_part_info_for_read->getIndexGranularity().getMarkRows(from_mark);
for (size_t pos = 0; pos < num_columns; ++pos)
{
if (!res_columns[pos])
continue;
auto & column = res_columns[pos];
stream->adjustRightMark(current_task_last_mark); /// Must go before seek.
stream->seekToMarkAndColumn(from_mark, *column_positions[pos]);
auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer *
{
if (needSkipStream(pos, substream_path))
return nullptr;
return stream->getDataBuffer();
};
/// If we read only offsets we have to read prefix anyway
/// to preserve correctness of serialization.
auto buffer_getter_for_prefix = [&](const auto &) -> ReadBuffer *
{
return stream->getDataBuffer();
};
readPrefix(columns_to_read[pos], buffer_getter, buffer_getter_for_prefix, columns_for_offsets[pos]);
readData(columns_to_read[pos], column, rows_to_read, buffer_getter);
}
++from_mark;
read_rows += rows_to_read;
}
next_mark = from_mark;
return read_rows;
}
catch (...)
{
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
/// Better diagnostics.
try
{
rethrow_exception(std::current_exception());
}
catch (Exception & e)
{
e.addMessage(getMessageForDiagnosticOfBrokenPart(from_mark, max_rows_to_read));
}
throw;
}
void MergeTreeReaderCompactSingleBuffer::init()
try
{
if (initialized)
return;
auto stream_settings = settings;
stream_settings.allow_different_codecs = true;
stream = std::make_unique<MergeTreeReaderStreamAllOfMultipleColumns>(
data_part_info_for_read->getDataPartStorage(), MergeTreeDataPartCompact::DATA_FILE_NAME,
MergeTreeDataPartCompact::DATA_FILE_EXTENSION, data_part_info_for_read->getMarksCount(),
all_mark_ranges, stream_settings,uncompressed_cache,
data_part_info_for_read->getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION),
marks_loader, profile_callback, clock_type);
initialized = true;
}
catch (...)
{
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
throw;
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/MergeTreeReaderStream.h>
namespace DB
{
/// Reader for compact parts, that uses one buffer for
/// all column and doesn't support parallel prefetch of columns.
/// It's suitable for compact parts with small size of stripe.
class MergeTreeReaderCompactSingleBuffer : public MergeTreeReaderCompact
{
public:
template <typename... Args>
explicit MergeTreeReaderCompactSingleBuffer(Args &&... args)
: MergeTreeReaderCompact{std::forward<Args>(args)...}
{
fillColumnPositions();
}
/// Returns the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, size_t current_task_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
private:
void init();
bool initialized = false;
std::unique_ptr<MergeTreeReaderStream> stream;
};
}

View File

@ -16,43 +16,28 @@ namespace ErrorCodes
}
MergeTreeReaderStream::MergeTreeReaderStream(
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,
const MarkRanges & all_mark_ranges_,
const MergeTreeReaderSettings & settings_,
MarkCache * mark_cache_,
UncompressedCache * uncompressed_cache_,
size_t file_size_,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_,
bool is_low_cardinality_dictionary_,
ThreadPool * load_marks_cache_threadpool_)
: settings(settings_)
, profile_callback(profile_callback_)
DataPartStoragePtr data_part_storage_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,
const MarkRanges & all_mark_ranges_,
const MergeTreeReaderSettings & settings_,
UncompressedCache * uncompressed_cache_,
size_t file_size_,
MergeTreeMarksLoaderPtr marks_loader_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
: profile_callback(profile_callback_)
, clock_type(clock_type_)
, all_mark_ranges(all_mark_ranges_)
, file_size(file_size_)
, uncompressed_cache(uncompressed_cache_)
, data_part_storage(data_part_reader_->getDataPartStorage())
, data_part_storage(std::move(data_part_storage_))
, path_prefix(path_prefix_)
, data_file_extension(data_file_extension_)
, is_low_cardinality_dictionary(is_low_cardinality_dictionary_)
, uncompressed_cache(uncompressed_cache_)
, settings(settings_)
, marks_count(marks_count_)
, mark_cache(mark_cache_)
, save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, marks_loader(
data_part_reader_,
mark_cache,
index_granularity_info->getMarksFilePath(path_prefix),
marks_count,
*index_granularity_info,
save_marks_in_cache,
settings.read_settings,
load_marks_cache_threadpool_)
, file_size(file_size_)
, marks_loader(std::move(marks_loader_))
{
}
@ -60,21 +45,12 @@ void MergeTreeReaderStream::init()
{
if (initialized)
return;
initialized = true;
marks_getter = marks_loader->loadMarks();
/// Compute the size of the buffer.
size_t max_mark_range_bytes = 0;
size_t sum_mark_range_bytes = 0;
for (const auto & mark_range : all_mark_ranges)
{
size_t left_mark = mark_range.begin;
size_t right_mark = mark_range.end;
size_t left_offset = left_mark < marks_count ? marks_loader.getMark(left_mark).offset_in_compressed_file : 0;
auto mark_range_bytes = getRightOffset(right_mark) - left_offset;
max_mark_range_bytes = std::max(max_mark_range_bytes, mark_range_bytes);
sum_mark_range_bytes += mark_range_bytes;
}
auto [max_mark_range_bytes, sum_mark_range_bytes] = estimateMarkRangeBytes(all_mark_ranges);
std::optional<size_t> estimated_sum_mark_range_bytes;
if (sum_mark_range_bytes)
@ -83,7 +59,7 @@ void MergeTreeReaderStream::init()
/// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
/// For example: part has single dictionary and all marks point to the same position.
ReadSettings read_settings = settings.read_settings;
if (max_mark_range_bytes != 0)
if (settings.adjust_read_buffer_size && max_mark_range_bytes != 0)
read_settings = read_settings.adjustBufferSize(max_mark_range_bytes);
//// Empty buffer does not makes progress.
@ -102,7 +78,8 @@ void MergeTreeReaderStream::init()
read_settings,
estimated_sum_mark_range_bytes, std::nullopt);
},
uncompressed_cache);
uncompressed_cache,
settings.allow_different_codecs);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);
@ -121,7 +98,7 @@ void MergeTreeReaderStream::init()
path_prefix + data_file_extension,
read_settings,
estimated_sum_mark_range_bytes,
std::nullopt));
std::nullopt), settings.allow_different_codecs);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);
@ -135,99 +112,10 @@ void MergeTreeReaderStream::init()
}
}
size_t MergeTreeReaderStream::getRightOffset(size_t right_mark)
{
/// NOTE: if we are reading the whole file, then right_mark == marks_count
/// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.
/// Special case, can happen in Collapsing/Replacing engines
if (marks_count == 0)
return 0;
assert(right_mark <= marks_count);
if (0 < right_mark && right_mark < marks_count)
{
/// Find the right border of the last mark we need to read.
/// To do that let's find the upper bound of the offset of the last
/// included mark.
if (is_low_cardinality_dictionary)
{
/// In LowCardinality dictionary several consecutive marks can point to the same offset.
///
/// Also, in some cases, when one granule is not-atomically written (which is possible at merges)
/// one granule may require reading of two dictionaries which starts from different marks.
/// The only correct way is to take offset from at least next different granule from the right one.
/// So, that's why we have to read one extra granule to the right,
/// while reading dictionary of LowCardinality.
///
/// Example:
/// Mark 0, points to [0, 8]
/// Mark 1, points to [0, 8]
/// Mark 2, points to [0, 8]
/// Mark 3, points to [0, 8]
/// Mark 4, points to [42336, 2255]
/// Mark 5, points to [42336, 2255] <--- for example need to read until 5
/// Mark 6, points to [42336, 2255] <--- not suitable, because have same offset
/// Mark 7, points to [84995, 7738] <--- next different mark
/// Mark 8, points to [84995, 7738]
/// Mark 9, points to [126531, 8637] <--- what we are looking for
auto indices = collections::range(right_mark, marks_count);
auto next_different_mark = [&](auto lhs, auto rhs)
{
return marks_loader.getMark(lhs).asTuple() < marks_loader.getMark(rhs).asTuple();
};
auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, std::move(next_different_mark));
if (it == indices.end())
return file_size;
right_mark = *it;
}
/// This is a good scenario. The compressed block is finished within the right mark,
/// and previous mark was different.
if (marks_loader.getMark(right_mark).offset_in_decompressed_block == 0
&& marks_loader.getMark(right_mark) != marks_loader.getMark(right_mark - 1))
return marks_loader.getMark(right_mark).offset_in_compressed_file;
/// If right_mark has non-zero offset in decompressed block, we have to
/// read its compressed block in a whole, because it may consist of data from previous granule.
///
/// For example:
/// Mark 6, points to [42336, 2255]
/// Mark 7, points to [84995, 7738] <--- right_mark
/// Mark 8, points to [84995, 7738]
/// Mark 9, points to [126531, 8637] <--- what we are looking for
///
/// Since mark 7 starts from offset in decompressed block 7738,
/// it has some data from mark 6 and we have to read
/// compressed block [84995; 126531 in a whole.
auto indices = collections::range(right_mark, marks_count);
auto next_different_compressed_offset = [&](auto lhs, auto rhs)
{
return marks_loader.getMark(lhs).offset_in_compressed_file < marks_loader.getMark(rhs).offset_in_compressed_file;
};
auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, std::move(next_different_compressed_offset));
if (it != indices.end())
return marks_loader.getMark(*it).offset_in_compressed_file;
}
else if (right_mark == 0)
return marks_loader.getMark(right_mark).offset_in_compressed_file;
return file_size;
}
void MergeTreeReaderStream::seekToMark(size_t index)
void MergeTreeReaderStream::seekToMarkAndColumn(size_t row_index, size_t column_position)
{
init();
MarkInCompressedFile mark = marks_loader.getMark(index);
const auto & mark = marks_getter->getMark(row_index, column_position);
try
{
@ -237,7 +125,7 @@ void MergeTreeReaderStream::seekToMark(size_t index)
{
/// Better diagnostics.
if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND)
e.addMessage("(while seeking to mark " + toString(index)
e.addMessage("(while seeking to mark " + toString(row_index)
+ " of column " + path_prefix + "; offsets are: "
+ toString(mark.offset_in_compressed_file) + " "
+ toString(mark.offset_in_decompressed_block) + ")");
@ -274,6 +162,7 @@ void MergeTreeReaderStream::adjustRightMark(size_t right_mark)
*/
init();
auto right_offset = getRightOffset(right_mark);
if (!right_offset)
{
if (last_right_offset && *last_right_offset == 0)
@ -304,4 +193,276 @@ CompressedReadBufferBase * MergeTreeReaderStream::getCompressedDataBuffer()
return compressed_data_buffer;
}
size_t MergeTreeReaderStreamSingleColumn::getRightOffset(size_t right_mark) const
{
/// NOTE: if we are reading the whole file, then right_mark == marks_count
/// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.
/// Special case, can happen in Collapsing/Replacing engines
if (marks_count == 0)
return 0;
assert(right_mark <= marks_count);
if (right_mark == 0)
return marks_getter->getMark(right_mark, 0).offset_in_compressed_file;
if (right_mark == marks_count)
return file_size;
/// Find the right border of the last mark we need to read.
/// To do that let's find the upper bound of the offset of the last
/// included mark.
if (settings.is_low_cardinality_dictionary)
{
/// In LowCardinality dictionary several consecutive marks can point to the same offset.
///
/// Also, in some cases, when one granule is not-atomically written (which is possible at merges)
/// one granule may require reading of two dictionaries which starts from different marks.
/// The only correct way is to take offset from at least next different granule from the right one.
/// So, that's why we have to read one extra granule to the right,
/// while reading dictionary of LowCardinality.
///
/// Example:
/// Mark 0, points to [0, 8]
/// Mark 1, points to [0, 8]
/// Mark 2, points to [0, 8]
/// Mark 3, points to [0, 8]
/// Mark 4, points to [42336, 2255]
/// Mark 5, points to [42336, 2255] <--- for example need to read until 5
/// Mark 6, points to [42336, 2255] <--- not suitable, because have same offset
/// Mark 7, points to [84995, 7738] <--- next different mark
/// Mark 8, points to [84995, 7738]
/// Mark 9, points to [126531, 8637] <--- what we are looking for
auto indices = collections::range(right_mark, marks_count);
auto next_different_mark = [&](auto lhs, auto rhs)
{
return marks_getter->getMark(lhs, 0).asTuple() < marks_getter->getMark(rhs, 0).asTuple();
};
auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, std::move(next_different_mark));
if (it == indices.end())
return file_size;
right_mark = *it;
}
/// This is a good scenario. The compressed block is finished within the right mark,
/// and previous mark was different.
if (marks_getter->getMark(right_mark, 0).offset_in_decompressed_block == 0
&& marks_getter->getMark(right_mark, 0) != marks_getter->getMark(right_mark - 1, 0))
return marks_getter->getMark(right_mark, 0).offset_in_compressed_file;
/// If right_mark has non-zero offset in decompressed block, we have to
/// read its compressed block in a whole, because it may consist of data from previous granule.
///
/// For example:
/// Mark 6, points to [42336, 2255]
/// Mark 7, points to [84995, 7738] <--- right_mark
/// Mark 8, points to [84995, 7738]
/// Mark 9, points to [126531, 8637] <--- what we are looking for
///
/// Since mark 7 starts from offset in decompressed block 7738,
/// it has some data from mark 6 and we have to read
/// compressed block [84995; 126531 in a whole.
auto indices = collections::range(right_mark, marks_count);
auto next_different_compressed_offset = [&](auto lhs, auto rhs)
{
return marks_getter->getMark(lhs, 0).offset_in_compressed_file < marks_getter->getMark(rhs, 0).offset_in_compressed_file;
};
auto it = std::upper_bound(indices.begin(), indices.end(), right_mark, std::move(next_different_compressed_offset));
if (it != indices.end())
return marks_getter->getMark(*it, 0).offset_in_compressed_file;
return file_size;
}
std::pair<size_t, size_t> MergeTreeReaderStreamSingleColumn::estimateMarkRangeBytes(const MarkRanges & mark_ranges) const
{
assert(marks_getter != nullptr);
size_t max_range_bytes = 0;
size_t sum_range_bytes = 0;
for (const auto & mark_range : mark_ranges)
{
size_t left_mark = mark_range.begin;
size_t right_mark = mark_range.end;
size_t left_offset = left_mark < marks_count ? marks_getter->getMark(left_mark, 0).offset_in_compressed_file : 0;
auto mark_range_bytes = getRightOffset(right_mark) - left_offset;
max_range_bytes = std::max(max_range_bytes, mark_range_bytes);
sum_range_bytes += mark_range_bytes;
}
return {max_range_bytes, sum_range_bytes};
}
size_t MergeTreeReaderStreamMultipleColumns::getRightOffsetOneColumn(size_t right_mark_non_included, size_t column_position) const
{
/// NOTE: if we are reading the whole file, then right_mark == marks_count
/// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.
/// Special case, can happen in Collapsing/Replacing engines
if (marks_count == 0)
return 0;
assert(right_mark_non_included <= marks_count);
if (right_mark_non_included == 0)
return marks_getter->getMark(right_mark_non_included, column_position).offset_in_compressed_file;
size_t right_mark_included = right_mark_non_included - 1;
if (right_mark_non_included != marks_count
&& marks_getter->getMark(right_mark_non_included, column_position).offset_in_decompressed_block != 0)
++right_mark_included;
/// The right bound for case, where there is no smaller suitable mark
/// is the start of the next stripe (in which the next column is written)
/// because each stripe always start from a new compressed block.
const auto & right_mark_in_file = marks_getter->getMark(right_mark_included, column_position);
auto next_stripe_right_mark_in_file = getStartOfNextStripeMark(right_mark_included, column_position);
/// Try to find suitable right mark in current stripe.
for (size_t mark = right_mark_included + 1; mark < marks_count; ++mark)
{
const auto & current_mark = marks_getter->getMark(mark, column_position);
/// We found first mark that starts from the new compressed block.
if (current_mark.offset_in_compressed_file > right_mark_in_file.offset_in_compressed_file)
{
/// If it is in current stripe return it to reduce amount of read data.
if (current_mark < next_stripe_right_mark_in_file)
return current_mark.offset_in_compressed_file;
/// Otherwise return start of new stripe as an upper bound.
break;
}
}
return next_stripe_right_mark_in_file.offset_in_compressed_file;
}
std::pair<size_t, size_t>
MergeTreeReaderStreamMultipleColumns::estimateMarkRangeBytesOneColumn(const MarkRanges & mark_ranges, size_t column_position) const
{
assert(marks_getter != nullptr);
/// As a maximal range we return the maximal size of a whole stripe.
size_t max_range_bytes = 0;
size_t sum_range_bytes = 0;
for (const auto & mark_range : mark_ranges)
{
auto start_of_stripe_mark = marks_getter->getMark(mark_range.begin, column_position);
auto start_of_next_stripe_mark = getStartOfNextStripeMark(mark_range.begin, column_position);
for (size_t mark = mark_range.begin; mark < mark_range.end; ++mark)
{
const auto & current_mark = marks_getter->getMark(mark, column_position);
/// We found a start of new stripe, now update values.
if (current_mark > start_of_next_stripe_mark)
{
auto current_range_bytes = getRightOffsetOneColumn(mark, column_position) - start_of_stripe_mark.offset_in_compressed_file;
max_range_bytes = std::max(max_range_bytes, current_range_bytes);
sum_range_bytes += current_range_bytes;
start_of_stripe_mark = current_mark;
start_of_next_stripe_mark = getStartOfNextStripeMark(mark, column_position);
}
}
auto current_range_bytes = getRightOffsetOneColumn(mark_range.end, column_position) - start_of_stripe_mark.offset_in_compressed_file;
max_range_bytes = std::max(max_range_bytes, current_range_bytes);
sum_range_bytes += current_range_bytes;
}
return {max_range_bytes, sum_range_bytes};
}
MarkInCompressedFile MergeTreeReaderStreamMultipleColumns::getStartOfNextStripeMark(size_t row_index, size_t column_position) const
{
const auto & current_mark = marks_getter->getMark(row_index, column_position);
if (marks_getter->getNumColumns() == 1)
return MarkInCompressedFile{file_size, 0};
if (column_position + 1 == marks_getter->getNumColumns())
{
/**
* In case of the last column (c3), we have the following picture:
* c1 c2 c3
* x x x
* (row_index, 0) -> o x o <- (row_index, column_position)
* x x x
* ------- <- start of new stripe
* what we are -> o x x
* looking for x x x
* x x x
* -------
* So, we need to iterate forward.
*/
size_t mark_index = row_index + 1;
while (mark_index < marks_count && marks_getter->getMark(mark_index, 0) <= current_mark)
++mark_index;
return mark_index == marks_count
? MarkInCompressedFile{file_size, 0}
: marks_getter->getMark(mark_index, 0);
}
/**
* Otherwise, we have the following picture:
* c1 c2 c3
* x x o <- what we are looking for
* (row, column) --> o o <- (row, column + 1)
* x x x
* ------- <- start of new stripe
* So, we need to iterate backward.
*/
ssize_t mark_index = row_index;
while (mark_index >= 0 && marks_getter->getMark(mark_index, column_position + 1) >= current_mark)
--mark_index;
return marks_getter->getMark(mark_index + 1, column_position + 1);
}
size_t MergeTreeReaderStreamOneOfMultipleColumns::getRightOffset(size_t right_mark_non_included) const
{
return getRightOffsetOneColumn(right_mark_non_included, column_position);
}
std::pair<size_t, size_t> MergeTreeReaderStreamOneOfMultipleColumns::estimateMarkRangeBytes(const MarkRanges & mark_ranges) const
{
return estimateMarkRangeBytesOneColumn(mark_ranges, column_position);
}
size_t MergeTreeReaderStreamAllOfMultipleColumns::getRightOffset(size_t right_mark_non_included) const
{
return getRightOffsetOneColumn(right_mark_non_included, marks_loader->getNumColumns() - 1);
}
std::pair<size_t, size_t> MergeTreeReaderStreamAllOfMultipleColumns::estimateMarkRangeBytes(const MarkRanges & mark_ranges) const
{
size_t max_range_bytes = 0;
size_t sum_range_bytes = 0;
for (size_t i = 0; i < marks_getter->getNumColumns(); ++i)
{
auto [current_max, current_sum] = estimateMarkRangeBytesOneColumn(mark_ranges, i);
max_range_bytes = std::max(max_range_bytes, current_max);
sum_range_bytes += current_sum;
}
return {max_range_bytes, sum_range_bytes};
}
}

View File

@ -14,27 +14,31 @@
namespace DB
{
/// Class for reading a single column (or index).
/// Basic and the most low-level class
/// for reading single columns or indexes.
class MergeTreeReaderStream
{
public:
MergeTreeReaderStream(
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
DataPartStoragePtr data_part_storage_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,
const MarkRanges & all_mark_ranges,
const MarkRanges & all_mark_ranges_,
const MergeTreeReaderSettings & settings_,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
UncompressedCache * uncompressed_cache_,
size_t file_size_,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback,
clockid_t clock_type,
bool is_low_cardinality_dictionary_,
ThreadPool * load_marks_cache_threadpool_);
MergeTreeMarksLoaderPtr marks_loader_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_);
void seekToMark(size_t index);
virtual ~MergeTreeReaderStream() = default;
/// Seeks to start of @row_index mark. Column position is implementation defined.
virtual void seekToMark(size_t row_index) = 0;
/// Seeks to exact mark in file.
void seekToMarkAndColumn(size_t row_index, size_t column_position);
void seekToStart();
@ -48,39 +52,111 @@ public:
CompressedReadBufferBase * getCompressedDataBuffer();
private:
void init();
size_t getRightOffset(size_t right_mark);
/// Returns offset in file up to which it's needed to read file to read all rows up to @right_mark mark.
virtual size_t getRightOffset(size_t right_mark) const = 0;
/// Returns estimated max amount of bytes to read among mark ranges (which is used as size for read buffer)
/// and total amount of bytes to read in all mark ranges.
virtual std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) const = 0;
const MergeTreeReaderSettings settings;
const ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type;
const clockid_t clock_type;
const MarkRanges all_mark_ranges;
size_t file_size;
UncompressedCache * uncompressed_cache;
DataPartStoragePtr data_part_storage;
std::string path_prefix;
std::string data_file_extension;
bool is_low_cardinality_dictionary = false;
size_t marks_count;
const DataPartStoragePtr data_part_storage;
const std::string path_prefix;
const std::string data_file_extension;
UncompressedCache * const uncompressed_cache;
ReadBuffer * data_buffer;
CompressedReadBufferBase * compressed_data_buffer;
MarkCache * mark_cache;
bool save_marks_in_cache;
bool initialized = false;
std::optional<size_t> last_right_offset;
const MergeTreeIndexGranularityInfo * index_granularity_info;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
MergeTreeMarksLoader marks_loader;
protected:
void init();
const MergeTreeReaderSettings settings;
const size_t marks_count;
const size_t file_size;
const MergeTreeMarksLoaderPtr marks_loader;
MergeTreeMarksGetterPtr marks_getter;
};
/// Class for reading a single column (or index) from file
/// that contains a single column (for wide parts).
class MergeTreeReaderStreamSingleColumn : public MergeTreeReaderStream
{
public:
template <typename... Args>
explicit MergeTreeReaderStreamSingleColumn(Args &&... args)
: MergeTreeReaderStream{std::forward<Args>(args)...}
{
}
size_t getRightOffset(size_t right_mark_non_included) const override;
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) const override;
void seekToMark(size_t row_index) override { seekToMarkAndColumn(row_index, 0); }
};
/// Base class for reading from file that contains multiple columns.
/// It is used to read from compact parts.
/// See more details about data layout in MergeTreeDataPartCompact.h.
class MergeTreeReaderStreamMultipleColumns : public MergeTreeReaderStream
{
public:
template <typename... Args>
explicit MergeTreeReaderStreamMultipleColumns(Args &&... args)
: MergeTreeReaderStream{std::forward<Args>(args)...}
{
}
protected:
size_t getRightOffsetOneColumn(size_t right_mark_non_included, size_t column_position) const;
std::pair<size_t, size_t> estimateMarkRangeBytesOneColumn(const MarkRanges & mark_ranges, size_t column_position) const;
MarkInCompressedFile getStartOfNextStripeMark(size_t row_index, size_t column_position) const;
};
/// Class for reading a single column from file that contains multiple columns
/// (for parallel reading from compact parts with large stripes).
class MergeTreeReaderStreamOneOfMultipleColumns : public MergeTreeReaderStreamMultipleColumns
{
public:
template <typename... Args>
explicit MergeTreeReaderStreamOneOfMultipleColumns(size_t column_position_, Args &&... args)
: MergeTreeReaderStreamMultipleColumns{std::forward<Args>(args)...}
, column_position(column_position_)
{
}
size_t getRightOffset(size_t right_mark_non_included) const override;
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) const override;
void seekToMark(size_t row_index) override { seekToMarkAndColumn(row_index, column_position); }
private:
const size_t column_position;
};
/// Class for reading multiple columns from file that contains multiple columns
/// (for reading from compact parts with small stripes).
class MergeTreeReaderStreamAllOfMultipleColumns : public MergeTreeReaderStreamMultipleColumns
{
public:
template <typename... Args>
explicit MergeTreeReaderStreamAllOfMultipleColumns(Args &&... args)
: MergeTreeReaderStreamMultipleColumns{std::forward<Args>(args)...}
{
}
size_t getRightOffset(size_t right_mark_non_included) const override;
std::pair<size_t, size_t> estimateMarkRangeBytes(const MarkRanges & mark_ranges) const override;
void seekToMark(size_t row_index) override { seekToMarkAndColumn(row_index, 0); }
};
}

View File

@ -225,18 +225,29 @@ void MergeTreeReaderWide::addStreams(
return;
}
has_any_stream = true;
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
auto context = data_part_info_for_read->getContext();
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
streams.emplace(*stream_name, std::make_unique<MergeTreeReaderStream>(
data_part_info_for_read, *stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache,
auto marks_loader = std::make_shared<MergeTreeMarksLoader>(
data_part_info_for_read,
mark_cache,
data_part_info_for_read->getIndexGranularityInfo().getMarksFilePath(*stream_name),
data_part_info_for_read->getMarksCount(),
data_part_info_for_read->getIndexGranularityInfo(),
settings.save_marks_in_cache,
settings.read_settings,
load_marks_threadpool,
/*num_columns_in_mark=*/ 1);
has_any_stream = true;
auto stream_settings = settings;
stream_settings.is_low_cardinality_dictionary = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
streams.emplace(*stream_name, std::make_unique<MergeTreeReaderStreamSingleColumn>(
data_part_info_for_read->getDataPartStorage(), *stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read->getMarksCount(), all_mark_ranges, stream_settings,
uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(*stream_name + DATA_FILE_EXTENSION),
&data_part_info_for_read->getIndexGranularityInfo(),
profile_callback, clock_type, is_lc_dict, load_marks_threadpool));
std::move(marks_loader), profile_callback, clock_type));
};
serialization->enumerateStreams(callback);

View File

@ -184,9 +184,11 @@ private:
auto & compressed_data = compressed->getData();
auto & uncompressed_data = uncompressed->getData();
auto marks_getter = marks_loader->loadMarks();
for (size_t i = 0; i < num_rows; ++i)
{
auto mark = marks_loader->getMark(i, col_idx);
auto mark = marks_getter->getMark(i, col_idx);
compressed_data[i] = mark.offset_in_compressed_file;
uncompressed_data[i] = mark.offset_in_decompressed_block;

View File

@ -190,7 +190,7 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAGPtr & filter_by_database,
block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared<DataTypeString>(), "engine"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared<DataTypeUInt8>(), "active"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(storage_uuid_column_mut), std::make_shared<DataTypeUUID>(), "storage_uuid"));
block_to_filter.insert(ColumnWithTypeAndName(std::move(storage_uuid_column_mut), std::make_shared<DataTypeUUID>(), "uuid"));
if (rows)
{
@ -203,7 +203,7 @@ StoragesInfoStream::StoragesInfoStream(const ActionsDAGPtr & filter_by_database,
database_column = block_to_filter.getByName("database").column;
table_column = block_to_filter.getByName("table").column;
active_column = block_to_filter.getByName("active").column;
storage_uuid_column = block_to_filter.getByName("storage_uuid").column;
storage_uuid_column = block_to_filter.getByName("uuid").column;
}
class ReadFromSystemPartsBase : public SourceStepWithFilter

View File

@ -385,7 +385,7 @@ upgrade_check_digest = DigestConfig(
docker=["clickhouse/upgrade-check"],
)
integration_check_digest = DigestConfig(
include_paths=["./tests/ci/integration_test_check.py", "./tests/integration"],
include_paths=["./tests/ci/integration_test_check.py", "./tests/integration/"],
exclude_files=[".md"],
docker=IMAGES.copy(),
)

View File

@ -475,7 +475,7 @@ class ClickhouseIntegrationTestsRunner:
return result
@staticmethod
def _update_counters(main_counters, current_counters, broken_tests):
def _update_counters(main_counters, current_counters):
for test in current_counters["PASSED"]:
if test not in main_counters["PASSED"]:
if test in main_counters["FAILED"]:
@ -483,21 +483,14 @@ class ClickhouseIntegrationTestsRunner:
if test in main_counters["BROKEN"]:
main_counters["BROKEN"].remove(test)
if test not in broken_tests:
main_counters["PASSED"].append(test)
else:
main_counters["NOT_FAILED"].append(test)
main_counters["PASSED"].append(test)
for state in ("ERROR", "FAILED"):
for test in current_counters[state]:
if test in main_counters["PASSED"]:
main_counters["PASSED"].remove(test)
if test not in broken_tests:
if test not in main_counters[state]:
main_counters[state].append(test)
else:
if test not in main_counters["BROKEN"]:
main_counters["BROKEN"].append(test)
if test not in main_counters[state]:
main_counters[state].append(test)
for state in ("SKIPPED",):
for test in current_counters[state]:
@ -564,7 +557,6 @@ class ClickhouseIntegrationTestsRunner:
tests_in_group,
num_tries,
num_workers,
broken_tests,
):
try:
return self.run_test_group(
@ -573,7 +565,6 @@ class ClickhouseIntegrationTestsRunner:
tests_in_group,
num_tries,
num_workers,
broken_tests,
)
except Exception as e:
logging.info("Failed to run %s:\n%s", test_group, e)
@ -596,7 +587,6 @@ class ClickhouseIntegrationTestsRunner:
tests_in_group,
num_tries,
num_workers,
broken_tests,
):
counters = {
"ERROR": [],
@ -706,7 +696,7 @@ class ClickhouseIntegrationTestsRunner:
)
times_lines = parse_test_times(info_path)
new_tests_times = get_test_times(times_lines)
self._update_counters(counters, new_counters, broken_tests)
self._update_counters(counters, new_counters)
for test_name, test_time in new_tests_times.items():
tests_times[test_name] = test_time
@ -783,7 +773,7 @@ class ClickhouseIntegrationTestsRunner:
final_retry += 1
logging.info("Running tests for the %s time", i)
counters, tests_times, log_paths = self.try_run_test_group(
repo_path, "bugfix" if should_fail else "flaky", tests_to_run, 1, 1, []
repo_path, "bugfix" if should_fail else "flaky", tests_to_run, 1, 1
)
logs += log_paths
if counters["FAILED"]:
@ -915,20 +905,10 @@ class ClickhouseIntegrationTestsRunner:
logging.info("Shuffling test groups")
random.shuffle(items_to_run)
broken_tests = []
if not self.use_analyzer:
with open(
f"{repo_path}/tests/analyzer_integration_broken_tests.txt",
"r",
encoding="utf-8",
) as f:
broken_tests = f.read().splitlines()
logging.info("Broken tests in the list: %s", len(broken_tests))
for group, tests in items_to_run:
logging.info("Running test group %s containing %s tests", group, len(tests))
group_counters, group_test_times, log_paths = self.try_run_test_group(
repo_path, group, tests, MAX_RETRY, NUM_WORKERS, broken_tests
repo_path, group, tests, MAX_RETRY, NUM_WORKERS
)
total_tests = 0
for counter, value in group_counters.items():

View File

@ -1,7 +1,7 @@
version: '2.3'
services:
mysql_client:
image: mysql:5.7
image: mysql:8.0
restart: always
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: 1

View File

@ -1,8 +1,10 @@
import base64
import errno
from functools import cache
import http.client
import logging
import os
import platform
import stat
import os.path as p
import pprint
@ -4743,3 +4745,8 @@ class ClickHouseKiller(object):
def __exit__(self, exc_type, exc_val, exc_tb):
self.clickhouse_node.start_clickhouse()
@cache
def is_arm():
return any(arch in platform.processor().lower() for arch in ("arm, aarch"))

View File

@ -1,5 +1,5 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", main_configs=["configs/config_with_hosts.xml"])
@ -16,9 +16,11 @@ node5 = cluster.add_instance(
"node5", main_configs=["configs/config_without_allowed_hosts.xml"]
)
node6 = cluster.add_instance("node6", main_configs=["configs/config_for_remote.xml"])
node7 = cluster.add_instance(
"node7", main_configs=["configs/config_for_redirect.xml"], with_hdfs=True
)
if not is_arm():
node7 = cluster.add_instance(
"node7", main_configs=["configs/config_for_redirect.xml"], with_hdfs=True
)
@pytest.fixture(scope="module")
@ -270,6 +272,7 @@ def test_table_function_remote(start_cluster):
)
@pytest.mark.skipif(is_arm(), reason="skip for ARM")
def test_redirect(start_cluster):
hdfs_api = start_cluster.hdfs_api
@ -284,6 +287,7 @@ def test_redirect(start_cluster):
node7.query("DROP TABLE table_test_7_1")
@pytest.mark.skipif(is_arm(), reason="skip for ARM")
def test_HDFS(start_cluster):
assert "not allowed" in node7.query_and_get_error(
"CREATE TABLE table_test_7_2 (word String) ENGINE=HDFS('http://hdfs1:50075/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'CSV')"
@ -293,6 +297,7 @@ def test_HDFS(start_cluster):
)
@pytest.mark.skipif(is_arm(), reason="skip for ARM")
def test_schema_inference(start_cluster):
error = node7.query_and_get_error("desc url('http://test.com`, 'TSVRaw'')")
assert error.find("ReadWriteBufferFromHTTPBase") == -1

View File

@ -1,14 +1,17 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
disk_types = {
"default": "Local",
"disk_s3": "S3",
"disk_hdfs": "HDFS",
"disk_encrypted": "S3",
}
# do not test HDFS on ARM
if not is_arm():
disk_types["disk_hdfs"] = "HDFS"
@pytest.fixture(scope="module")
def cluster():
@ -18,7 +21,7 @@ def cluster():
"node",
main_configs=["configs/storage.xml"],
with_minio=True,
with_hdfs=True,
with_hdfs=not is_arm(),
)
cluster.start()

View File

@ -1,5 +1,5 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
from pyhdfs import HdfsClient
@ -10,6 +10,9 @@ disk_types = {
"disk_encrypted": "S3",
}
if is_arm():
pytestmark = pytest.mark.skip
@pytest.fixture(scope="module")
def cluster():

View File

@ -8,9 +8,13 @@ from confluent_kafka.avro.cached_schema_registry_client import (
CachedSchemaRegistryClient,
)
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
from urllib import parse
# Skip on ARM due to Confluent/Kafka
if is_arm():
pytestmark = pytest.mark.skip
@pytest.fixture(scope="module")
def started_cluster():

View File

@ -3,7 +3,7 @@ import os.path as p
import pytest
import uuid
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
from string import Template
@ -14,6 +14,9 @@ instance = cluster.add_instance(
datasource = "self"
records = 1000
if is_arm():
pytestmark = pytest.mark.skip
@pytest.fixture(scope="module")
def started_cluster():

View File

@ -2,10 +2,14 @@ import time
import logging
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
if is_arm():
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",

View File

@ -1,11 +1,15 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
import helpers.keeper_utils as keeper_utils
from minio.deleteobjects import DeleteObject
import os
if is_arm():
pytestmark = pytest.mark.skip
CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
cluster = ClickHouseCluster(__file__)
node_logs = cluster.add_instance(

View File

@ -1,5 +1,9 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
if is_arm():
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
instance1 = cluster.add_instance(

View File

@ -2,10 +2,13 @@ import logging
import sys
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from pyhdfs import HdfsClient
if is_arm():
pytestmark = pytest.mark.skip
@pytest.fixture(scope="module")
def started_cluster():

View File

@ -6,6 +6,7 @@ from helpers.cluster import (
ClickHouseCluster,
ClickHouseInstance,
get_docker_compose_path,
is_arm,
)
import logging
@ -13,6 +14,10 @@ from . import materialized_with_ddl
DOCKER_COMPOSE_PATH = get_docker_compose_path()
# skip all test on arm due to no arm support in mysql57
if is_arm():
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
mysql_node = None
mysql8_node = None

View File

@ -3,7 +3,7 @@ import time
import os
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.utility import generate_values
from helpers.wait_for_helpers import wait_for_delete_inactive_parts
from helpers.wait_for_helpers import wait_for_delete_empty_parts
@ -16,6 +16,10 @@ CONFIG_PATH = os.path.join(
)
if is_arm():
pytestmark = pytest.mark.skip
def create_table(cluster, table_name, additional_settings=None):
node = cluster.instances["node"]

View File

@ -12,11 +12,16 @@ from typing import Literal
import docker
import pymysql.connections
import pytest
from helpers.cluster import ClickHouseCluster, get_docker_compose_path, run_and_check
from helpers.cluster import (
ClickHouseCluster,
get_docker_compose_path,
run_and_check,
)
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
DOCKER_COMPOSE_PATH = get_docker_compose_path()
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",

View File

@ -1,10 +1,15 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.network import PartitionManager
import threading
import time
# skip all tests in the module on ARM due to HDFS
if is_arm():
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",

View File

@ -2,10 +2,13 @@ import os
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
from pyhdfs import HdfsClient
if is_arm():
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",

View File

@ -22,7 +22,7 @@ import kafka.errors
import pytest
from google.protobuf.internal.encoder import _VarintBytes
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.network import PartitionManager
from helpers.test_tools import TSV
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
@ -40,6 +40,8 @@ from . import kafka_pb2
from . import social_pb2
from . import message_with_repeated_pb2
if is_arm():
pytestmark = pytest.mark.skip
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
# TODO: add test for SELECT LIMIT is working.

View File

@ -3,9 +3,12 @@ import pytest
import os
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
import subprocess
if is_arm():
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",

View File

@ -1,10 +0,0 @@
00000000-0000-0000-0000-000000000000 1231_1_1_0
00000000-0000-0000-0000-000000000000 6666_2_2_0
00000000-0000-0000-0000-000000000000 1231_1_1_0 users
00000000-0000-0000-0000-000000000000 6666_2_2_0 users
00000000-0000-0000-0000-000000000000 1231_1_1_0 users uid
00000000-0000-0000-0000-000000000000 1231_1_1_0 users name
00000000-0000-0000-0000-000000000000 1231_1_1_0 users age
00000000-0000-0000-0000-000000000000 6666_2_2_0 users uid
00000000-0000-0000-0000-000000000000 6666_2_2_0 users name
00000000-0000-0000-0000-000000000000 6666_2_2_0 users age

View File

@ -1,11 +0,0 @@
DROP TABLE IF EXISTS users;
CREATE TABLE users (uid Int16, name String, age Int16) ENGINE=MergeTree ORDER BY uid PARTITION BY uid;
INSERT INTO users VALUES (1231, 'John', 33);
INSERT INTO users VALUES (6666, 'Ksenia', 48);
SELECT uuid, name from system.parts WHERE database = currentDatabase() AND table = 'users';
SELECT uuid, name, table from system.parts WHERE database = currentDatabase() AND table = 'users' AND uuid = '00000000-0000-0000-0000-000000000000';
SELECT uuid, name, table, column from system.parts_columns WHERE database = currentDatabase() AND table = 'users' AND uuid = '00000000-0000-0000-0000-000000000000';
DROP TABLE IF EXISTS users;

View File

@ -161,7 +161,9 @@ ClickHouseNIO
ClickHouseVapor
ClickVisual
ClickableSquare
CloudAvailableBadge
CloudDetails
CloudNotSupportedBadge
CloudStorage
CodeBlock
CodeLLDB