Merge branch 'master' into sql-for-workload-management

serxa 2024-10-29 22:18:11 +00:00
commit fe9b323981
120 changed files with 1896 additions and 222 deletions

View File

@ -42,17 +42,18 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
Upcoming meetups
* [Jakarta Meetup](https://www.meetup.com/clickhouse-indonesia-user-group/events/303191359/) - October 1
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Oslo Meetup](https://www.meetup.com/open-source-real-time-data-warehouse-real-time-analytics/events/302938622) - October 31
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
* [New York Meetup](https://www.meetup.com/clickhouse-new-york-user-group/events/304268174) - December 9
Recently completed meetups
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
* [Jakarta Meetup](https://www.meetup.com/clickhouse-indonesia-user-group/events/303191359/) - October 1
* [ClickHouse Guangzhou User Group Meetup](https://mp.weixin.qq.com/s/GSvo-7xUoVzCsuUvlLTpCw) - August 25
* [Seattle Meetup (Statsig)](https://www.meetup.com/clickhouse-seattle-user-group/events/302518075/) - August 27
* [Melbourne Meetup](https://www.meetup.com/clickhouse-australia-user-group/events/302732666/) - August 27

View File

@ -78,6 +78,10 @@ Specifying privileges you can use asterisk (`*`) instead of a table or a databas
Also, you can omit the database name. In this case, privileges are granted for the current database.
For example, `GRANT SELECT ON * TO john` grants the privilege on all tables in the current database, and `GRANT SELECT ON mytable TO john` grants the privilege on the `mytable` table in the current database.
:::note
The feature described below is available starting with ClickHouse version 24.10.
:::
You can also put an asterisk at the end of a table or database name. This feature allows you to grant privileges on all tables whose names start with a given prefix.
Example: `GRANT SELECT ON db.my_tables* TO john`. This query allows `john` to execute `SELECT` queries over all tables in the `db` database whose names start with the prefix `my_tables`.

View File

@ -207,7 +207,6 @@ namespace ServerSetting
extern const ServerSettingsBool format_alter_operations_with_parentheses;
extern const ServerSettingsUInt64 global_profiler_cpu_time_period_ns;
extern const ServerSettingsUInt64 global_profiler_real_time_period_ns;
extern const ServerSettingsDouble gwp_asan_force_sample_probability;
extern const ServerSettingsUInt64 http_connections_soft_limit;
extern const ServerSettingsUInt64 http_connections_store_limit;
extern const ServerSettingsUInt64 http_connections_warn_limit;
@ -622,7 +621,7 @@ void sanityChecks(Server & server)
#if defined(OS_LINUX)
try
{
const std::unordered_set<std::string> fastClockSources = {
const std::unordered_set<std::string> fast_clock_sources = {
// ARM clock
"arch_sys_counter",
// KVM guest clock
@ -631,7 +630,7 @@ void sanityChecks(Server & server)
"tsc",
};
const char * filename = "/sys/devices/system/clocksource/clocksource0/current_clocksource";
if (!fastClockSources.contains(readLine(filename)))
if (!fast_clock_sources.contains(readLine(filename)))
server.context()->addWarningMessage("Linux is not using a fast clock source. Performance can be degraded. Check " + String(filename));
}
catch (...) // NOLINT(bugprone-empty-catch)
@ -1929,10 +1928,6 @@ try
if (global_context->isServerCompletelyStarted())
CannotAllocateThreadFaultInjector::setFaultProbability(new_server_settings[ServerSetting::cannot_allocate_thread_fault_injection_probability]);
#if USE_GWP_ASAN
GWPAsan::setForceSampleProbability(new_server_settings[ServerSetting::gwp_asan_force_sample_probability]);
#endif
ProfileEvents::increment(ProfileEvents::MainConfigLoads);
/// Must be the last.
@ -2441,7 +2436,6 @@ try
#if USE_GWP_ASAN
GWPAsan::initFinished();
GWPAsan::setForceSampleProbability(server_settings[ServerSetting::gwp_asan_force_sample_probability]);
#endif
try

View File

@ -163,6 +163,7 @@ enum class AccessType : uint8_t
M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_DNS_CACHE, "SYSTEM DROP DNS, DROP DNS CACHE, DROP DNS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CONNECTIONS_CACHE, "SYSTEM DROP CONNECTIONS CACHE, DROP CONNECTIONS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_PREWARM_MARK_CACHE, "SYSTEM PREWARM MARK, PREWARM MARK CACHE, PREWARM MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MARK_CACHE, "SYSTEM DROP MARK, DROP MARK CACHE, DROP MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_UNCOMPRESSED_CACHE, "SYSTEM DROP UNCOMPRESSED, DROP UNCOMPRESSED CACHE, DROP UNCOMPRESSED", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \

View File

@ -470,8 +470,7 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
{
if (!need_render_progress && select_into_file && !select_into_file_and_stdout)
error_stream << "\r";
bool toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle", true);
progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), toggle_enabled);
progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), progress_table_toggle_enabled);
}
}
@ -825,6 +824,9 @@ void ClientBase::initTTYBuffer(ProgressOption progress_option, ProgressOption pr
if (!need_render_progress && !need_render_progress_table)
return;
progress_table_toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle");
progress_table_toggle_on = !progress_table_toggle_enabled;
/// If need_render_progress and need_render_progress_table are enabled,
/// use ProgressOption that was set for the progress bar for progress table as well.
ProgressOption progress = progress_option ? progress_option : progress_table_option;
@ -881,7 +883,7 @@ void ClientBase::initTTYBuffer(ProgressOption progress_option, ProgressOption pr
void ClientBase::initKeystrokeInterceptor()
{
if (is_interactive && need_render_progress_table && getClientConfiguration().getBool("enable-progress-table-toggle", true))
if (is_interactive && need_render_progress_table && progress_table_toggle_enabled)
{
keystroke_interceptor = std::make_unique<TerminalKeystrokeInterceptor>(in_fd, error_stream);
keystroke_interceptor->registerCallback(' ', [this]() { progress_table_toggle_on = !progress_table_toggle_on; });
@ -1151,6 +1153,7 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b
if (keystroke_interceptor)
{
progress_table_toggle_on = false;
try
{
keystroke_interceptor->startIntercept();
@ -1446,6 +1449,9 @@ void ClientBase::onProfileEvents(Block & block)
/// Flush all buffers.
void ClientBase::resetOutput()
{
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
/// Order is important: format, compression, file
if (output_format)

View File

@ -340,6 +340,7 @@ protected:
ProgressTable progress_table;
bool need_render_progress = true;
bool need_render_progress_table = true;
bool progress_table_toggle_enabled = true;
std::atomic_bool progress_table_toggle_on = false;
bool need_render_profile_events = true;
bool written_first_block = false;

View File

@ -180,9 +180,12 @@ void writeWithWidth(Out & out, std::string_view s, size_t width)
template <typename Out>
void writeWithWidthStrict(Out & out, std::string_view s, size_t width)
{
chassert(width != 0);
constexpr std::string_view ellipsis = "…";
if (s.size() > width)
out << s.substr(0, width - 1) << "…";
if (width <= ellipsis.size())
out << s.substr(0, width);
else
out << s.substr(0, width - ellipsis.size()) << ellipsis;
else
out << s;
}
@ -219,7 +222,9 @@ void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool sho
writeWithWidth(message, COLUMN_EVENT_NAME, column_event_name_width);
writeWithWidth(message, COLUMN_VALUE, COLUMN_VALUE_WIDTH);
writeWithWidth(message, COLUMN_PROGRESS, COLUMN_PROGRESS_WIDTH);
writeWithWidth(message, COLUMN_DOCUMENTATION_NAME, COLUMN_DOCUMENTATION_WIDTH);
auto col_doc_width = getColumnDocumentationWidth(terminal_width);
if (col_doc_width)
writeWithWidth(message, COLUMN_DOCUMENTATION_NAME, col_doc_width);
message << CLEAR_TO_END_OF_LINE;
double elapsed_sec = watch.elapsedSeconds();
@ -257,9 +262,12 @@ void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool sho
writeWithWidth(message, formatReadableValue(value_type, progress) + "/s", COLUMN_PROGRESS_WIDTH);
message << setColorForDocumentation();
const auto * doc = getDocumentation(event_name_to_event.at(name));
writeWithWidthStrict(message, doc, COLUMN_DOCUMENTATION_WIDTH);
if (col_doc_width)
{
message << setColorForDocumentation();
const auto * doc = getDocumentation(event_name_to_event.at(name));
writeWithWidthStrict(message, doc, col_doc_width);
}
message << RESET_COLOR;
message << CLEAR_TO_END_OF_LINE;
@ -372,6 +380,14 @@ size_t ProgressTable::tableSize() const
return metrics.empty() ? 0 : metrics.size() + 1;
}
size_t ProgressTable::getColumnDocumentationWidth(size_t terminal_width) const
{
auto fixed_columns_width = column_event_name_width + COLUMN_VALUE_WIDTH + COLUMN_PROGRESS_WIDTH;
if (terminal_width < fixed_columns_width + COLUMN_DOCUMENTATION_MIN_WIDTH)
return 0;
return terminal_width - fixed_columns_width;
}
ProgressTable::MetricInfo::MetricInfo(ProfileEvents::Type t) : type(t)
{
}

View File

@ -87,6 +87,7 @@ private:
};
size_t tableSize() const;
size_t getColumnDocumentationWidth(size_t terminal_width) const;
using MetricName = String;
@ -110,7 +111,7 @@ private:
static constexpr std::string_view COLUMN_DOCUMENTATION_NAME = "Documentation";
static constexpr size_t COLUMN_VALUE_WIDTH = 20;
static constexpr size_t COLUMN_PROGRESS_WIDTH = 20;
static constexpr size_t COLUMN_DOCUMENTATION_WIDTH = 100;
static constexpr size_t COLUMN_DOCUMENTATION_MIN_WIDTH = COLUMN_DOCUMENTATION_NAME.size();
std::ostream & output_stream;
int in_fd;

View File

@ -369,6 +369,23 @@ void ColumnArray::popBack(size_t n)
offsets_data.resize_assume_reserved(offsets_data.size() - n);
}
ColumnCheckpointPtr ColumnArray::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), getData().getCheckpoint());
}
void ColumnArray::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
getData().updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
}
void ColumnArray::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);
getData().rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
}
int ColumnArray::compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator) const
{
const ColumnArray & rhs = assert_cast<const ColumnArray &>(rhs_);

View File

@ -161,6 +161,10 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(offsets);

View File

@ -1000,6 +1000,56 @@ ColumnPtr ColumnDynamic::compress() const
});
}
void ColumnDynamic::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & nested = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
const auto & variants = variant_column_ptr->getVariants();
size_t old_size = nested.size();
chassert(old_size <= variants.size());
for (size_t i = 0; i < old_size; ++i)
{
variants[i]->updateCheckpoint(*nested[i]);
}
/// If the column has new variants since the last checkpoint, create checkpoints for them.
if (old_size < variants.size())
{
nested.resize(variants.size());
for (size_t i = old_size; i < variants.size(); ++i)
nested[i] = variants[i]->getCheckpoint();
}
checkpoint.size = size();
}
void ColumnDynamic::rollback(const ColumnCheckpoint & checkpoint)
{
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(nested.size() <= variant_column_ptr->getNumVariants());
/// The structure hasn't changed, so we can use generic rollback of Variant column
if (nested.size() == variant_column_ptr->getNumVariants())
{
variant_column_ptr->rollback(checkpoint);
return;
}
/// Manually rollback internals of Variant column
variant_column_ptr->getOffsets().resize_assume_reserved(checkpoint.size);
variant_column_ptr->getLocalDiscriminators().resize_assume_reserved(checkpoint.size);
auto & variants = variant_column_ptr->getVariants();
for (size_t i = 0; i < nested.size(); ++i)
variants[i]->rollback(*nested[i]);
/// Keep the structure of the variant column as is, but roll back
/// to size 0 the variants that are not present in the checkpoint.
for (size_t i = nested.size(); i < variants.size(); ++i)
variants[i] = variants[i]->cloneEmpty();
}
String ColumnDynamic::getTypeNameAt(size_t row_num) const
{
const auto & variant_col = getVariantColumn();

View File

@ -304,6 +304,15 @@ public:
variant_column_ptr->protect();
}
ColumnCheckpointPtr getCheckpoint() const override
{
return variant_column_ptr->getCheckpoint();
}
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(variant_column);

View File

@ -312,6 +312,21 @@ void ColumnMap::getExtremes(Field & min, Field & max) const
max = std::move(map_max_value);
}
ColumnCheckpointPtr ColumnMap::getCheckpoint() const
{
return nested->getCheckpoint();
}
void ColumnMap::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
nested->updateCheckpoint(checkpoint);
}
void ColumnMap::rollback(const ColumnCheckpoint & checkpoint)
{
nested->rollback(checkpoint);
}
void ColumnMap::forEachSubcolumn(MutableColumnCallback callback)
{
callback(nested);

View File

@ -102,6 +102,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -302,6 +302,23 @@ void ColumnNullable::popBack(size_t n)
getNullMapColumn().popBack(n);
}
ColumnCheckpointPtr ColumnNullable::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), nested_column->getCheckpoint());
}
void ColumnNullable::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
nested_column->updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
}
void ColumnNullable::rollback(const ColumnCheckpoint & checkpoint)
{
getNullMapData().resize_assume_reserved(checkpoint.size);
nested_column->rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
}
ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const
{
ColumnPtr filtered_data = getNestedColumn().filter(filt, result_size_hint);

View File

@ -143,6 +143,10 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(nested_column);

View File

@ -30,6 +30,23 @@ const std::shared_ptr<SerializationDynamic> & getDynamicSerialization()
return dynamic_serialization;
}
struct ColumnObjectCheckpoint : public ColumnCheckpoint
{
using CheckpointsMap = std::unordered_map<std::string_view, ColumnCheckpointPtr>;
ColumnObjectCheckpoint(size_t size_, CheckpointsMap typed_paths_, CheckpointsMap dynamic_paths_, ColumnCheckpointPtr shared_data_)
: ColumnCheckpoint(size_)
, typed_paths(std::move(typed_paths_))
, dynamic_paths(std::move(dynamic_paths_))
, shared_data(std::move(shared_data_))
{
}
CheckpointsMap typed_paths;
CheckpointsMap dynamic_paths;
ColumnCheckpointPtr shared_data;
};
}
ColumnObject::ColumnObject(
@ -698,6 +715,69 @@ void ColumnObject::popBack(size_t n)
shared_data->popBack(n);
}
ColumnCheckpointPtr ColumnObject::getCheckpoint() const
{
auto get_checkpoints = [](const auto & columns)
{
ColumnObjectCheckpoint::CheckpointsMap checkpoints;
for (const auto & [name, column] : columns)
checkpoints[name] = column->getCheckpoint();
return checkpoints;
};
return std::make_shared<ColumnObjectCheckpoint>(size(), get_checkpoints(typed_paths), get_checkpoints(dynamic_paths_ptrs), shared_data->getCheckpoint());
}
void ColumnObject::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & object_checkpoint = assert_cast<ColumnObjectCheckpoint &>(checkpoint);
auto update_checkpoints = [&](const auto & columns_map, auto & checkpoints_map)
{
for (const auto & [name, column] : columns_map)
{
auto & nested = checkpoints_map[name];
if (!nested)
nested = column->getCheckpoint();
else
column->updateCheckpoint(*nested);
}
};
checkpoint.size = size();
update_checkpoints(typed_paths, object_checkpoint.typed_paths);
update_checkpoints(dynamic_paths, object_checkpoint.dynamic_paths);
shared_data->updateCheckpoint(*object_checkpoint.shared_data);
}
void ColumnObject::rollback(const ColumnCheckpoint & checkpoint)
{
const auto & object_checkpoint = assert_cast<const ColumnObjectCheckpoint &>(checkpoint);
auto rollback_columns = [&](auto & columns_map, const auto & checkpoints_map)
{
NameSet names_to_remove;
/// Roll back subcolumns and remove paths that were not in the checkpoint.
for (auto & [name, column] : columns_map)
{
auto it = checkpoints_map.find(name);
if (it == checkpoints_map.end())
names_to_remove.insert(name);
else
column->rollback(*it->second);
}
for (const auto & name : names_to_remove)
columns_map.erase(name);
};
rollback_columns(typed_paths, object_checkpoint.typed_paths);
rollback_columns(dynamic_paths, object_checkpoint.dynamic_paths);
shared_data->rollback(*object_checkpoint.shared_data);
}
StringRef ColumnObject::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
{
StringRef res(begin, 0);

View File

@ -161,6 +161,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;

View File

@ -308,6 +308,28 @@ void ColumnSparse::popBack(size_t n)
_size = new_size;
}
ColumnCheckpointPtr ColumnSparse::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), values->getCheckpoint());
}
void ColumnSparse::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
values->updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
}
void ColumnSparse::rollback(const ColumnCheckpoint & checkpoint)
{
_size = checkpoint.size;
const auto & nested = *assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested;
chassert(nested.size > 0);
values->rollback(nested);
getOffsetsData().resize_assume_reserved(nested.size - 1);
}
ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const
{
if (_size != filt.size())

View File

@ -149,6 +149,10 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;

View File

@ -240,6 +240,23 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
return permuteImpl(*this, perm, limit);
}
ColumnCheckpointPtr ColumnString::getCheckpoint() const
{
auto nested = std::make_shared<ColumnCheckpoint>(chars.size());
return std::make_shared<ColumnCheckpointWithNested>(size(), std::move(nested));
}
void ColumnString::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested->size = chars.size();
}
void ColumnString::rollback(const ColumnCheckpoint & checkpoint)
{
offsets.resize_assume_reserved(checkpoint.size);
chars.resize_assume_reserved(assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested->size);
}
void ColumnString::collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const
{

View File

@ -194,6 +194,10 @@ public:
offsets.resize_assume_reserved(offsets.size() - n);
}
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;

View File

@ -254,6 +254,37 @@ void ColumnTuple::popBack(size_t n)
column->popBack(n);
}
ColumnCheckpointPtr ColumnTuple::getCheckpoint() const
{
ColumnCheckpoints checkpoints;
checkpoints.reserve(columns.size());
for (const auto & column : columns)
checkpoints.push_back(column->getCheckpoint());
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
}
void ColumnTuple::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & checkpoints = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(checkpoints.size() == columns.size());
checkpoint.size = size();
for (size_t i = 0; i < columns.size(); ++i)
columns[i]->updateCheckpoint(*checkpoints[i]);
}
void ColumnTuple::rollback(const ColumnCheckpoint & checkpoint)
{
column_length = checkpoint.size;
const auto & checkpoints = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(columns.size() == checkpoints.size());
for (size_t i = 0; i < columns.size(); ++i)
columns[i]->rollback(*checkpoints[i]);
}
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
if (columns.empty())

View File

@ -118,6 +118,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -739,6 +739,39 @@ void ColumnVariant::popBack(size_t n)
offsets->popBack(n);
}
ColumnCheckpointPtr ColumnVariant::getCheckpoint() const
{
ColumnCheckpoints checkpoints;
checkpoints.reserve(variants.size());
for (const auto & column : variants)
checkpoints.push_back(column->getCheckpoint());
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
}
void ColumnVariant::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & checkpoints = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(checkpoints.size() == variants.size());
checkpoint.size = size();
for (size_t i = 0; i < variants.size(); ++i)
variants[i]->updateCheckpoint(*checkpoints[i]);
}
void ColumnVariant::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);
getLocalDiscriminators().resize_assume_reserved(checkpoint.size);
const auto & checkpoints = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(variants.size() == checkpoints.size());
for (size_t i = 0; i < variants.size(); ++i)
variants[i]->rollback(*checkpoints[i]);
}
StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
/// During any serialization/deserialization we should always use global discriminators.

View File

@ -248,6 +248,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -49,6 +49,40 @@ struct EqualRange
using EqualRanges = std::vector<EqualRange>;
/// A checkpoint that contains the sizes of a column and all its subcolumns.
/// It can be used to roll back the column to a previous state, for example
/// after failed parsing, when the column may be left in an inconsistent state.
struct ColumnCheckpoint
{
size_t size;
explicit ColumnCheckpoint(size_t size_) : size(size_) {}
virtual ~ColumnCheckpoint() = default;
};
using ColumnCheckpointPtr = std::shared_ptr<ColumnCheckpoint>;
using ColumnCheckpoints = std::vector<ColumnCheckpointPtr>;
struct ColumnCheckpointWithNested : public ColumnCheckpoint
{
ColumnCheckpointWithNested(size_t size_, ColumnCheckpointPtr nested_)
: ColumnCheckpoint(size_), nested(std::move(nested_))
{
}
ColumnCheckpointPtr nested;
};
struct ColumnCheckpointWithMultipleNested : public ColumnCheckpoint
{
ColumnCheckpointWithMultipleNested(size_t size_, ColumnCheckpoints nested_)
: ColumnCheckpoint(size_), nested(std::move(nested_))
{
}
ColumnCheckpoints nested;
};
/// Declares interface to store columns in memory.
class IColumn : public COW<IColumn>
{
@ -509,6 +543,17 @@ public:
/// The operation is slow and performed only for debug builds.
virtual void protect() {}
/// Returns a checkpoint of the current state of the column.
virtual ColumnCheckpointPtr getCheckpoint() const { return std::make_shared<ColumnCheckpoint>(size()); }
/// Updates the checkpoint with the current state. It is used to avoid extra allocations in 'getCheckpoint'.
virtual void updateCheckpoint(ColumnCheckpoint & checkpoint) const { checkpoint.size = size(); }
/// Rolls back the column to the checkpoint.
/// Unlike 'popBack', this method should work correctly even if the column is in an invalid state.
/// Sizes of columns in the checkpoint must be less than or equal to the current sizes.
virtual void rollback(const ColumnCheckpoint & checkpoint) { popBack(size() - checkpoint.size); }
/// If the column contains subcolumns (such as Array, Nullable, etc), apply the callback to them.
/// Shallow: doesn't do recursive calls; doesn't call the callback for the column itself.
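To make the intended use of the checkpoint API above concrete, here is a minimal sketch (not part of this patch) of the pattern that IRowInputFormat and StreamingFormatExecutor follow later in this diff; `column`, `has_more_rows` and `parse_row_into` are hypothetical placeholders for a mutable column being filled and a row parser that may throw half-way through a row.

ColumnCheckpointPtr checkpoint = column->getCheckpoint();
while (has_more_rows())
{
    /// Remember the last consistent state before parsing the next row.
    column->updateCheckpoint(*checkpoint);
    try
    {
        parse_row_into(*column); /// may append to only some subcolumns before throwing
    }
    catch (...)
    {
        /// Restore the column and all its subcolumns to the checkpointed sizes.
        column->rollback(*checkpoint);
        throw;
    }
}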

View File

@ -920,3 +920,71 @@ TEST(ColumnDynamic, compare)
ASSERT_EQ(column_from->compareAt(3, 2, *column_from, -1), -1);
ASSERT_EQ(column_from->compareAt(3, 4, *column_from, -1), -1);
}
TEST(ColumnDynamic, rollback)
{
auto check_variant = [](const ColumnVariant & column_variant, std::vector<size_t> sizes)
{
ASSERT_EQ(column_variant.getNumVariants(), sizes.size());
size_t num_rows = 0;
for (size_t i = 0; i < sizes.size(); ++i)
{
ASSERT_EQ(column_variant.getVariants()[i]->size(), sizes[i]);
num_rows += sizes[i];
}
ASSERT_EQ(num_rows, column_variant.size());
};
auto check_checkpoint = [&](const ColumnCheckpoint & cp, std::vector<size_t> sizes)
{
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(cp).nested;
size_t num_rows = 0;
for (size_t i = 0; i < nested.size(); ++i)
{
ASSERT_EQ(nested[i]->size, sizes[i]);
num_rows += sizes[i];
}
ASSERT_EQ(num_rows, cp.size);
};
std::vector<std::pair<ColumnCheckpointPtr, std::vector<size_t>>> checkpoints;
auto column = ColumnDynamic::create(2);
auto checkpoint = column->getCheckpoint();
column->insert(Field(42));
column->updateCheckpoint(*checkpoint);
checkpoints.emplace_back(checkpoint, std::vector<size_t>{0, 1, 0});
column->insert(Field("str1"));
column->rollback(*checkpoint);
check_checkpoint(*checkpoint, checkpoints.back().second);
check_variant(column->getVariantColumn(), checkpoints.back().second);
column->insert("str1");
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 1});
column->insert("str2");
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 2});
column->insert(Array({1, 2}));
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{1, 1, 2});
column->insert(Field(42.42));
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{2, 1, 2});
for (const auto & [cp, sizes] : checkpoints)
{
auto column_copy = column->clone();
column_copy->rollback(*cp);
check_checkpoint(*cp, sizes);
check_variant(assert_cast<const ColumnDynamic &>(*column_copy).getVariantColumn(), sizes);
}
}

View File

@ -5,6 +5,7 @@
#include <IO/WriteBufferFromString.h>
#include <Common/Arena.h>
#include "Core/Field.h"
#include <gtest/gtest.h>
using namespace DB;
@ -349,3 +350,65 @@ TEST(ColumnObject, SkipSerializedInArena)
pos = col2->skipSerializedInArena(pos);
ASSERT_EQ(pos, end);
}
TEST(ColumnObject, rollback)
{
auto type = DataTypeFactory::instance().get("JSON(max_dynamic_types=10, max_dynamic_paths=2, a.a UInt32, a.b UInt32)");
auto col = type->createColumn();
auto & col_object = assert_cast<ColumnObject &>(*col);
const auto & typed_paths = col_object.getTypedPaths();
const auto & dynamic_paths = col_object.getDynamicPaths();
const auto & shared_data = col_object.getSharedDataColumn();
auto assert_sizes = [&](size_t size)
{
for (const auto & [name, column] : typed_paths)
ASSERT_EQ(column->size(), size);
for (const auto & [name, column] : dynamic_paths)
ASSERT_EQ(column->size(), size);
ASSERT_EQ(shared_data.size(), size);
};
auto checkpoint = col_object.getCheckpoint();
col_object.insert(Object{{"a.a", Field{1u}}});
col_object.updateCheckpoint(*checkpoint);
col_object.insert(Object{{"a.b", Field{2u}}});
col_object.insert(Object{{"a.a", Field{3u}}});
col_object.rollback(*checkpoint);
assert_sizes(1);
ASSERT_EQ(typed_paths.size(), 2);
ASSERT_EQ(dynamic_paths.size(), 0);
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
ASSERT_EQ((*typed_paths.at("a.b"))[0], Field{0u});
col_object.insert(Object{{"a.c", Field{"ccc"}}});
checkpoint = col_object.getCheckpoint();
col_object.insert(Object{{"a.d", Field{"ddd"}}});
col_object.insert(Object{{"a.e", Field{"eee"}}});
assert_sizes(4);
ASSERT_EQ(typed_paths.size(), 2);
ASSERT_EQ(dynamic_paths.size(), 2);
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"});
ASSERT_EQ((*dynamic_paths.at("a.d"))[2], Field{"ddd"});
col_object.rollback(*checkpoint);
assert_sizes(2);
ASSERT_EQ(typed_paths.size(), 2);
ASSERT_EQ(dynamic_paths.size(), 1);
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"});
}

View File

@ -57,7 +57,7 @@ static bool guarded_alloc_initialized = []
opts.MaxSimultaneousAllocations = 1024;
if (!env_options_raw || !std::string_view{env_options_raw}.contains("SampleRate"))
opts.SampleRate = 10000;
opts.SampleRate = 0;
const char * collect_stacktraces = std::getenv("GWP_ASAN_COLLECT_STACKTRACES"); // NOLINT(concurrency-mt-unsafe)
if (collect_stacktraces && std::string_view{collect_stacktraces} == "1")

View File

@ -8,7 +8,6 @@
#include <Common/thread_local_rng.h>
#include <atomic>
#include <random>
namespace GWPAsan
{
@ -39,14 +38,6 @@ inline bool shouldSample()
return init_finished.load(std::memory_order_relaxed) && GuardedAlloc.shouldSample();
}
inline bool shouldForceSample()
{
if (!init_finished.load(std::memory_order_relaxed))
return false;
std::bernoulli_distribution dist(force_sample_probability.load(std::memory_order_relaxed));
return dist(thread_local_rng);
}
}
#endif

View File

@ -115,11 +115,6 @@ protected:
template <typename ... TAllocatorParams>
void alloc(size_t bytes, TAllocatorParams &&... allocator_params)
{
#if USE_GWP_ASAN
if (unlikely(GWPAsan::shouldForceSample()))
gwp_asan::getThreadLocals()->NextSampleCounter = 1;
#endif
char * allocated = reinterpret_cast<char *>(TAllocator::alloc(bytes, std::forward<TAllocatorParams>(allocator_params)...));
c_start = allocated + pad_left;
@ -149,11 +144,6 @@ protected:
return;
}
#if USE_GWP_ASAN
if (unlikely(GWPAsan::shouldForceSample()))
gwp_asan::getThreadLocals()->NextSampleCounter = 1;
#endif
unprotect();
ptrdiff_t end_diff = c_end - c_start;

View File

@ -100,6 +100,7 @@ namespace DB
DECLARE(String, mark_cache_policy, DEFAULT_MARK_CACHE_POLICY, "Mark cache policy name.", 0) \
DECLARE(UInt64, mark_cache_size, DEFAULT_MARK_CACHE_MAX_SIZE, "Size of cache for marks (index of MergeTree family of tables).", 0) \
DECLARE(Double, mark_cache_size_ratio, DEFAULT_MARK_CACHE_SIZE_RATIO, "The size of the protected queue in the mark cache relative to the cache's total size.", 0) \
DECLARE(Double, mark_cache_prewarm_ratio, 0.95, "The ratio of total size of mark cache to fill during prewarm.", 0) \
DECLARE(String, index_uncompressed_cache_policy, DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY, "Secondary index uncompressed cache policy name.", 0) \
DECLARE(UInt64, index_uncompressed_cache_size, DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks of secondary indices. Zero means disabled.", 0) \
DECLARE(Double, index_uncompressed_cache_size_ratio, DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO, "The size of the protected queue in the secondary index uncompressed cache relative to the cache's total size.", 0) \
@ -183,7 +184,6 @@ namespace DB
DECLARE(String, merge_workload, "default", "Name of workload to be used to access resources for all merges (may be overridden by a merge tree setting)", 0) \
DECLARE(String, mutation_workload, "default", "Name of workload to be used to access resources for all mutations (may be overridden by a merge tree setting)", 0) \
DECLARE(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \
DECLARE(Double, gwp_asan_force_sample_probability, 0.0003, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \
DECLARE(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
DECLARE(UInt64, memory_worker_period_ms, 0, "Tick period of background memory worker which corrects memory tracker memory usages and cleans up unused pages during higher memory usage. If set to 0, default value will be used depending on the memory usage source", 0) \
DECLARE(Bool, disable_insertion_and_mutation, false, "Disable all insert/alter/delete queries. This setting will be enabled if someone needs read-only nodes to prevent insertion and mutation affect reading performance.", 0) \

View File

@ -108,7 +108,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"input_format_orc_dictionary_as_low_cardinality", false, true, "Treat ORC dictionary encoded columns as LowCardinality columns while reading ORC files"},
{"allow_experimental_refreshable_materialized_view", false, true, "Not experimental anymore"},
{"max_parts_to_move", 0, 1000, "New setting"},
{"hnsw_candidate_list_size_for_search", 0, 0, "New setting"},
{"hnsw_candidate_list_size_for_search", 64, 256, "New setting. Previously, the value was optionally specified in CREATE INDEX and 64 by default."},
{"allow_reorder_prewhere_conditions", false, true, "New setting"},
{"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
{"date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", false, false, "Dynamically trim the trailing zeros of datetime64 values to adjust the output scale to (0, 3, 6), corresponding to 'seconds', 'milliseconds', and 'microseconds'."}

View File

@ -1085,7 +1085,7 @@ public:
}
auto & col_lc = assert_cast<ColumnLowCardinality &>(column);
auto tmp_nested = col_lc.getDictionary().getNestedColumn()->cloneEmpty();
auto tmp_nested = removeNullable(col_lc.getDictionary().getNestedColumn()->cloneEmpty())->assumeMutable();
if (!nested->insertResultToColumn(*tmp_nested, element, insert_settings, format_settings, error))
return false;

View File

@ -119,4 +119,6 @@ private:
std::tuple<const BlockInfo *, size_t> lookUpMark(size_t idx) const;
};
using PlainMarksByName = std::unordered_map<String, std::unique_ptr<MarksInCompressedFile::PlainArray>>;
}

View File

@ -1,27 +1,28 @@
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionDynamicAdaptor.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/SipHash.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnNothing.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnTuple.h>
#include <Core/Block.h>
#include <Core/TypeId.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnNothing.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/Native.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Functions/FunctionHelpers.h>
#include <cstdlib>
#include <memory>
#include <Common/SipHash.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include "config.h"
#include <cstdlib>
#include <memory>
#if USE_EMBEDDED_COMPILER
# include <llvm/IR/IRBuilder.h>
#endif
@ -451,6 +452,7 @@ FunctionBasePtr IFunctionOverloadResolver::build(const ColumnsWithTypeAndName &
/// Use FunctionBaseDynamicAdaptor if default implementation for Dynamic is enabled and we have Dynamic type in arguments.
if (useDefaultImplementationForDynamic())
{
checkNumberOfArguments(arguments.size());
for (const auto & arg : arguments)
{
if (isDynamic(arg.type))

View File

@ -44,16 +44,10 @@ struct Memory : boost::noncopyable, Allocator
char * m_data = nullptr;
size_t alignment = 0;
[[maybe_unused]] bool allow_gwp_asan_force_sample{false};
Memory() = default;
/// If alignment != 0, then allocate memory aligned to specified value.
explicit Memory(size_t size_, size_t alignment_ = 0, bool allow_gwp_asan_force_sample_ = false)
: alignment(alignment_), allow_gwp_asan_force_sample(allow_gwp_asan_force_sample_)
{
alloc(size_);
}
explicit Memory(size_t size_, size_t alignment_ = 0) : alignment(alignment_) { alloc(size_); }
~Memory()
{
@ -133,11 +127,6 @@ private:
ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, new_capacity);
#if USE_GWP_ASAN
if (unlikely(allow_gwp_asan_force_sample && GWPAsan::shouldForceSample()))
gwp_asan::getThreadLocals()->NextSampleCounter = 1;
#endif
m_data = static_cast<char *>(Allocator::alloc(new_capacity, alignment));
m_capacity = new_capacity;
m_size = new_size;
@ -165,7 +154,7 @@ protected:
public:
/// If non-nullptr 'existing_memory' is passed, then buffer will not create its own memory and will use existing_memory without ownership.
explicit BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: Base(nullptr, 0), memory(existing_memory ? 0 : size, alignment, /*allow_gwp_asan_force_sample_=*/true)
: Base(nullptr, 0), memory(existing_memory ? 0 : size, alignment)
{
Base::set(existing_memory ? existing_memory : memory.data(), size);
Base::padded = !existing_memory;

View File

@ -1050,15 +1050,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
}
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
current_exception = e.displayText();
LOG_ERROR(logger, "Failed parsing for query '{}' with query id {}. {}",
key.query_str, current_entry->query_id, current_exception);
for (const auto & column : result_columns)
if (column->size() > total_rows)
column->popBack(column->size() - total_rows);
for (size_t i = 0; i < result_columns.size(); ++i)
result_columns[i]->rollback(*checkpoints[i]);
current_entry->finish(std::current_exception());
return 0;

View File

@ -89,6 +89,9 @@ namespace CurrentMetrics
extern const Metric RestartReplicaThreads;
extern const Metric RestartReplicaThreadsActive;
extern const Metric RestartReplicaThreadsScheduled;
extern const Metric MergeTreePartsLoaderThreads;
extern const Metric MergeTreePartsLoaderThreadsActive;
extern const Metric MergeTreePartsLoaderThreadsScheduled;
}
namespace DB
@ -97,6 +100,7 @@ namespace Setting
{
extern const SettingsSeconds lock_acquire_timeout;
extern const SettingsSeconds receive_timeout;
extern const SettingsMaxThreads max_threads;
}
namespace ServerSetting
@ -359,6 +363,11 @@ BlockIO InterpreterSystemQuery::execute()
HTTPConnectionPools::instance().dropCache();
break;
}
case Type::PREWARM_MARK_CACHE:
{
prewarmMarkCache();
break;
}
case Type::DROP_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->clearMarkCache();
@ -1298,6 +1307,28 @@ RefreshTaskList InterpreterSystemQuery::getRefreshTasks()
return tasks;
}
void InterpreterSystemQuery::prewarmMarkCache()
{
if (table_id.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table is not specified for prewarming marks cache");
getContext()->checkAccess(AccessType::SYSTEM_PREWARM_MARK_CACHE, table_id);
auto table_ptr = DatabaseCatalog::instance().getTable(table_id, getContext());
auto * merge_tree = dynamic_cast<MergeTreeData *>(table_ptr.get());
if (!merge_tree)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Command PREWARM MARK CACHE is supported only for MergeTree table, but got: {}", table_ptr->getName());
ThreadPool pool(
CurrentMetrics::MergeTreePartsLoaderThreads,
CurrentMetrics::MergeTreePartsLoaderThreadsActive,
CurrentMetrics::MergeTreePartsLoaderThreadsScheduled,
getContext()->getSettingsRef()[Setting::max_threads]);
merge_tree->prewarmMarkCache(pool);
}
AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() const
{
@ -1499,6 +1530,11 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_WAIT_LOADING_PARTS, query.getDatabase(), query.getTable());
break;
}
case Type::PREWARM_MARK_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_PREWARM_MARK_CACHE, query.getDatabase(), query.getTable());
break;
}
case Type::SYNC_DATABASE_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_SYNC_DATABASE_REPLICA, query.getDatabase());

View File

@ -82,6 +82,7 @@ private:
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
void startStopAction(StorageActionBlockType action_type, bool start);
void prewarmMarkCache();
void stopReplicatedDDLQueries();
void startReplicatedDDLQueries();

View File

@ -191,6 +191,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::SYNC_REPLICA:
case Type::WAIT_LOADING_PARTS:
case Type::FLUSH_DISTRIBUTED:
case Type::PREWARM_MARK_CACHE:
{
if (table)
{

View File

@ -23,6 +23,7 @@ public:
SUSPEND,
DROP_DNS_CACHE,
DROP_CONNECTIONS_CACHE,
PREWARM_MARK_CACHE,
DROP_MARK_CACHE,
DROP_UNCOMPRESSED_CACHE,
DROP_INDEX_MARK_CACHE,

View File

@ -276,6 +276,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA:
case Type::WAIT_LOADING_PARTS:
case Type::PREWARM_MARK_CACHE:
{
if (!parseQueryWithOnCluster(res, pos, expected))
return false;

View File

@ -6,11 +6,24 @@
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Common/typeid_cast.h>
#include <Core/Settings.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
}
namespace Setting
{
extern const SettingsUInt64 max_query_size;
extern const SettingsUInt64 max_parser_depth;
extern const SettingsUInt64 max_parser_backtracks;
}
bool parseIdentifierOrStringLiteral(IParser::Pos & pos, Expected & expected, String & result)
{
return IParserBase::wrapParseImpl(pos, [&]
@ -54,4 +67,18 @@ bool parseIdentifiersOrStringLiterals(IParser::Pos & pos, Expected & expected, S
return true;
}
std::vector<String> parseIdentifiersOrStringLiterals(const String & str, const Settings & settings)
{
Tokens tokens(str.data(), str.data() + str.size(), settings[Setting::max_query_size]);
IParser::Pos pos(tokens, static_cast<unsigned>(settings[Setting::max_parser_depth]), static_cast<unsigned>(settings[Setting::max_parser_backtracks]));
Expected expected;
std::vector<String> res;
if (!parseIdentifiersOrStringLiterals(pos, expected, res))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse string ('{}') into vector of identifiers", str);
return res;
}
}

View File

@ -7,6 +7,8 @@
namespace DB
{
struct Settings;
/** Parses a name of an object which could be written in the following forms:
* name / `name` / "name" (identifier) or 'name'.
* Note that empty strings are not allowed.
@ -16,4 +18,7 @@ bool parseIdentifierOrStringLiteral(IParser::Pos & pos, Expected & expected, Str
/** Parse a list of identifiers or string literals. */
bool parseIdentifiersOrStringLiterals(IParser::Pos & pos, Expected & expected, Strings & result);
/** Parse a list of identifiers or string literals into vector of strings. */
std::vector<String> parseIdentifiersOrStringLiterals(const String & str, const Settings & settings);
}
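As a rough illustration of the new string-based overload declared above (a sketch, not part of this patch): it tokenizes the whole input string using the parser limits from `settings` and throws CANNOT_PARSE_TEXT when the string cannot be parsed. The call below assumes the list form accepted by `parseIdentifiersOrStringLiterals(pos, expected, result)` is comma-separated, and that a `context` (ContextPtr) is available at the call site.

/// Hypothetical caller: split a user-provided string into a list of names.
std::vector<String> names = parseIdentifiersOrStringLiterals("id, `user name`, 'raw literal'", context->getSettingsRef());
/// Under the comma-separated assumption this yields {"id", "user name", "raw literal"};
/// malformed input throws CANNOT_PARSE_TEXT instead.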

View File

@ -22,8 +22,12 @@ StreamingFormatExecutor::StreamingFormatExecutor(
, adding_defaults_transform(std::move(adding_defaults_transform_))
, port(format->getPort().getHeader(), format.get())
, result_columns(header.cloneEmptyColumns())
, checkpoints(result_columns.size())
{
connect(format->getPort(), port);
for (size_t i = 0; i < result_columns.size(); ++i)
checkpoints[i] = result_columns[i]->getCheckpoint();
}
MutableColumns StreamingFormatExecutor::getResultColumns()
@ -53,6 +57,9 @@ size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
size_t StreamingFormatExecutor::execute()
{
for (size_t i = 0; i < result_columns.size(); ++i)
result_columns[i]->updateCheckpoint(*checkpoints[i]);
try
{
size_t new_rows = 0;
@ -85,19 +92,19 @@ size_t StreamingFormatExecutor::execute()
catch (Exception & e)
{
format->resetParser();
return on_error(result_columns, e);
return on_error(result_columns, checkpoints, e);
}
catch (std::exception & e)
{
format->resetParser();
auto exception = Exception(Exception::CreateFromSTDTag{}, e);
return on_error(result_columns, exception);
return on_error(result_columns, checkpoints, exception);
}
catch (...)
{
format->resetParser();
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknowk exception while executing StreamingFormatExecutor with format {}", format->getName());
return on_error(result_columns, exception);
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while executing StreamingFormatExecutor with format {}", format->getName());
return on_error(result_columns, checkpoints, exception);
}
}

View File

@ -19,12 +19,12 @@ public:
/// and exception to rethrow it or add context to it.
/// Should return number of new rows, which are added in callback
/// to result columns in comparison to previous call of `execute`.
using ErrorCallback = std::function<size_t(const MutableColumns &, Exception &)>;
using ErrorCallback = std::function<size_t(const MutableColumns &, const ColumnCheckpoints &, Exception &)>;
StreamingFormatExecutor(
const Block & header_,
InputFormatPtr format_,
ErrorCallback on_error_ = [](const MutableColumns &, Exception & e) -> size_t { throw std::move(e); },
ErrorCallback on_error_ = [](const MutableColumns &, const ColumnCheckpoints &, Exception & e) -> size_t { throw std::move(e); },
SimpleTransformPtr adding_defaults_transform_ = nullptr);
/// Returns numbers of new read rows.
@ -50,6 +50,7 @@ private:
InputPort port;
MutableColumns result_columns;
ColumnCheckpoints checkpoints;
};
}

View File

@ -111,6 +111,10 @@ Chunk IRowInputFormat::read()
for (size_t i = 0; i < num_columns; ++i)
columns[i] = header.getByPosition(i).type->createColumn(*serializations[i]);
ColumnCheckpoints checkpoints(columns.size());
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
checkpoints[column_idx] = columns[column_idx]->getCheckpoint();
block_missing_values.clear();
size_t num_rows = 0;
@ -136,6 +140,9 @@ Chunk IRowInputFormat::read()
{
try
{
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
columns[column_idx]->updateCheckpoint(*checkpoints[column_idx]);
info.read_columns.clear();
continue_reading = readRow(columns, info);
@ -199,14 +206,9 @@ Chunk IRowInputFormat::read()
syncAfterError();
/// Truncate all columns in block to initial size (remove values, that was appended to only part of columns).
/// Roll back all columns in the block to the initial size (remove values that were appended to only some of the columns).
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
{
auto & column = columns[column_idx];
if (column->size() > num_rows)
column->popBack(column->size() - num_rows);
}
columns[column_idx]->rollback(*checkpoints[column_idx]);
}
}
}

View File

@ -172,7 +172,7 @@ JSONAsObjectRowInputFormat::JSONAsObjectRowInputFormat(
const auto & type = header_.getByPosition(0).type;
if (!isObject(type) && !isObjectDeprecated(type))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Input format JSONAsObject is only suitable for tables with a single column of type Object/JSON but the column type is {}",
"Input format JSONAsObject is only suitable for tables with a single column of type JSON but the column type is {}",
type->getName());
}
@ -193,8 +193,8 @@ JSONAsObjectExternalSchemaReader::JSONAsObjectExternalSchemaReader(const FormatS
if (!settings.json.allow_deprecated_object_type && !settings.json.allow_json_type)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Cannot infer the data structure in JSONAsObject format because experimental Object/JSON type is not allowed. Set setting "
"allow_experimental_object_type = 1 or allow_experimental_json_type=1 in order to allow it");
"Cannot infer the data structure in JSONAsObject format because experimental JSON type is not allowed. Set setting "
"allow_experimental_json_type = 1 in order to allow it");
}
void registerInputFormatJSONAsString(FormatFactory & factory)

View File

@ -301,6 +301,9 @@ void TCPHandler::runImpl()
{
receiveHello();
if (!default_database.empty())
DatabaseCatalog::instance().assertDatabaseExists(default_database);
/// In interserver mode queries are executed without a session context.
if (!is_interserver_mode)
session->makeSessionContext();

View File

@ -86,21 +86,18 @@ Chunk FileLogSource::generate()
std::optional<String> exception_message;
size_t total_rows = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// We could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already push some rows to result_columns before exception, we need to fix it.
result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -114,23 +114,20 @@ Chunk KafkaSource::generateImpl()
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already push some rows to result_columns before exception, we need to fix it.
result_columns[i]->rollback(*checkpoints[i]);
// all data columns will get default value in case of error
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -854,23 +854,20 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already push some rows to result_columns before exception, we need to fix it.
result_columns[i]->rollback(*checkpoints[i]);
// all data columns will get default value in case of error
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -180,6 +180,9 @@ public:
void loadRowsCountFileForUnexpectedPart();
/// Loads marks and saves them into mark cache for specified columns.
virtual void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const = 0;
String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.

View File

@ -91,6 +91,13 @@ Columns IMergeTreeDataPartWriter::releaseIndexColumns()
return result;
}
PlainMarksByName IMergeTreeDataPartWriter::releaseCachedMarks()
{
PlainMarksByName res;
std::swap(cached_marks, res);
return res;
}
SerializationPtr IMergeTreeDataPartWriter::getSerialization(const String & column_name) const
{
auto it = serializations.find(column_name);

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/Statistics/Statistics.h>
#include <Storages/VirtualColumnsDescription.h>
#include <Formats/MarkInCompressedFile.h>
namespace DB
@ -46,6 +47,9 @@ public:
virtual void finish(bool sync) = 0;
Columns releaseIndexColumns();
PlainMarksByName releaseCachedMarks();
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
protected:
@ -69,6 +73,8 @@ protected:
MutableDataPartStoragePtr data_part_storage;
MutableColumns index_columns;
MergeTreeIndexGranularity index_granularity;
/// Marks that will be saved to cache on finish.
PlainMarksByName cached_marks;
};
using MergeTreeDataPartWriterPtr = std::unique_ptr<IMergeTreeDataPartWriter>;

View File

@ -34,6 +34,11 @@ public:
return writer->getIndexGranularity();
}
PlainMarksByName releaseCachedMarks()
{
return writer->releaseCachedMarks();
}
protected:
/// Remove all columns marked expired in data_part. Also, clears checksums

View File

@ -371,6 +371,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log)
{
part = merge_task->getFuture().get();
auto cached_marks = merge_task->releaseCachedMarks();
storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr);
/// Why we reset task here? Because it holds shared pointer to part and tryRemovePartImmediately will
@ -444,6 +445,9 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
finish_callback = [storage_ptr = &storage]() { storage_ptr->merge_selecting_task->schedule(); };
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
addMarksToCache(*part, cached_marks, mark_cache);
write_part_log({});
StorageReplicatedMergeTree::incrementMergedPartsProfileEvent(part->getType());

View File

@ -152,6 +152,12 @@ void MergePlainMergeTreeTask::finish()
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
{
auto marks = merge_task->releaseCachedMarks();
addMarksToCache(*new_part, marks, mark_cache);
}
write_part_log({});
StorageMergeTree::incrementMergedPartsProfileEvent(new_part->getType());
transfer_profile_counters_to_initial_query();
@ -163,7 +169,6 @@ void MergePlainMergeTreeTask::finish()
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
}
}
ContextMutablePtr MergePlainMergeTreeTask::createTaskContext() const

View File

@ -93,6 +93,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsUInt64 vertical_merge_algorithm_min_columns_to_activate;
extern const MergeTreeSettingsUInt64 vertical_merge_algorithm_min_rows_to_activate;
extern const MergeTreeSettingsBool vertical_merge_remote_filesystem_prefetch;
extern const MergeTreeSettingsBool prewarm_mark_cache;
}
namespace ErrorCodes
@ -546,6 +547,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
}
}
bool save_marks_in_cache = (*global_ctx->data->getSettings())[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache();
global_ctx->to = std::make_shared<MergedBlockOutputStream>(
global_ctx->new_data_part,
global_ctx->metadata_snapshot,
@ -555,6 +558,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
ctx->compression_codec,
global_ctx->txn ? global_ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
save_marks_in_cache,
ctx->blocks_are_granules_size,
global_ctx->context->getWriteSettings());
@ -1085,6 +1089,8 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
ctx->executor = std::make_unique<PullingPipelineExecutor>(ctx->column_parts_pipeline);
NamesAndTypesList columns_list = {*ctx->it_name_and_type};
bool save_marks_in_cache = (*global_ctx->data->getSettings())[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache();
ctx->column_to = std::make_unique<MergedColumnOnlyOutputStream>(
global_ctx->new_data_part,
global_ctx->metadata_snapshot,
@ -1093,6 +1099,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
column_pipepline.indexes_to_recalc,
getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
&global_ctx->written_offset_columns,
save_marks_in_cache,
global_ctx->to->getIndexGranularity());
ctx->column_elems_written = 0;
@ -1130,6 +1137,10 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns);
global_ctx->checksums_gathered_columns.add(std::move(changed_checksums));
auto cached_marks = ctx->column_to->releaseCachedMarks();
for (auto & [name, marks] : cached_marks)
global_ctx->cached_marks.emplace(name, std::move(marks));
ctx->delayed_streams.emplace_back(std::move(ctx->column_to));
while (ctx->delayed_streams.size() > ctx->max_delayed_streams)
@ -1276,6 +1287,10 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
else
global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns);
auto cached_marks = global_ctx->to->releaseCachedMarks();
for (auto & [name, marks] : cached_marks)
global_ctx->cached_marks.emplace(name, std::move(marks));
global_ctx->new_data_part->getDataPartStorage().precommitTransaction();
global_ctx->promise.set_value(global_ctx->new_data_part);

View File

@ -5,6 +5,7 @@
#include <Common/ProfileEvents.h>
#include <Common/filesystemHelpers.h>
#include <Formats/MarkInCompressedFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
@ -132,6 +133,13 @@ public:
return nullptr;
}
PlainMarksByName releaseCachedMarks() const
{
PlainMarksByName res;
std::swap(global_ctx->cached_marks, res);
return res;
}
bool execute();
private:
@ -209,6 +217,7 @@ private:
std::promise<MergeTreeData::MutableDataPartPtr> promise{};
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns{};
PlainMarksByName cached_marks;
MergeTreeTransactionPtr txn;
bool need_prefix;

View File

@ -22,6 +22,7 @@
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>
#include <Core/Settings.h>
#include <Core/ServerSettings.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
@ -154,6 +155,7 @@ namespace
namespace DB
{
namespace Setting
{
extern const SettingsBool allow_drop_detached;
@ -229,6 +231,12 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsString storage_policy;
extern const MergeTreeSettingsFloat zero_copy_concurrent_part_removal_max_postpone_ratio;
extern const MergeTreeSettingsUInt64 zero_copy_concurrent_part_removal_max_split_times;
extern const MergeTreeSettingsBool prewarm_mark_cache;
}
namespace ServerSetting
{
extern const ServerSettingsDouble mark_cache_prewarm_ratio;
}
namespace ErrorCodes
@ -2335,6 +2343,55 @@ void MergeTreeData::stopOutdatedAndUnexpectedDataPartsLoadingTask()
}
}
void MergeTreeData::prewarmMarkCache(ThreadPool & pool)
{
if (!(*getSettings())[MergeTreeSetting::prewarm_mark_cache])
return;
auto * mark_cache = getContext()->getMarkCache().get();
if (!mark_cache)
return;
auto metadata_snapshot = getInMemoryMetadataPtr();
auto column_names = getColumnsToPrewarmMarks(*getSettings(), metadata_snapshot->getColumns().getAllPhysical());
if (column_names.empty())
return;
Stopwatch watch;
LOG_TRACE(log, "Prewarming mark cache");
auto data_parts = getDataPartsVectorForInternalUsage();
/// Prewarm the mark cache first for the freshest parts, ordered by
/// the time columns in the partition key (if any) and by modification time.
auto to_tuple = [](const auto & part)
{
return std::make_tuple(part->getMinMaxDate().second, part->getMinMaxTime().second, part->modification_time);
};
std::sort(data_parts.begin(), data_parts.end(), [&to_tuple](const auto & lhs, const auto & rhs)
{
return to_tuple(lhs) > to_tuple(rhs);
});
ThreadPoolCallbackRunnerLocal<void> runner(pool, "PrewarmMarks");
double ratio_to_prewarm = getContext()->getServerSettings()[ServerSetting::mark_cache_prewarm_ratio];
for (const auto & part : data_parts)
{
if (mark_cache->sizeInBytes() >= mark_cache->maxSizeInBytes() * ratio_to_prewarm)
break;
runner([&] { part->loadMarksToCache(column_names, mark_cache); });
}
runner.waitForAllToFinishAndRethrowFirstError();
watch.stop();
LOG_TRACE(log, "Prewarmed mark cache in {} seconds", watch.elapsedSeconds());
}
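prewarmMarkCache above sorts the parts so the freshest ones (by max date/time from the partition key, then modification time) are loaded first, and stops scheduling more parts once the mark cache has grown past mark_cache_prewarm_ratio of its capacity. The following standalone sketch illustrates just that ordering and budget cut-off with hypothetical PartInfo fields and plain std types; it is not the ClickHouse implementation.

// Standalone sketch (std C++ only, hypothetical types): order parts by freshness
// and stop prewarming once a size budget is reached, mirroring the loop above.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <tuple>
#include <vector>

struct PartInfo
{
    int64_t max_date = 0;          // max date covered by the part
    int64_t max_time = 0;          // max time covered by the part
    int64_t modification_time = 0;
    uint64_t marks_bytes = 0;      // how much the part's marks would add to the cache
};

int main()
{
    std::vector<PartInfo> parts = {
        {20240101, 10, 100, 400}, {20240301, 5, 300, 700}, {20240201, 7, 200, 500}};

    // Freshest parts first, using the same kind of ordering key as the comparator above.
    auto to_tuple = [](const PartInfo & p)
    { return std::make_tuple(p.max_date, p.max_time, p.modification_time); };
    std::sort(parts.begin(), parts.end(),
              [&](const PartInfo & l, const PartInfo & r) { return to_tuple(l) > to_tuple(r); });

    const uint64_t max_cache_bytes = 1000;
    const double ratio_to_prewarm = 0.95;   // analogue of mark_cache_prewarm_ratio
    uint64_t cache_bytes = 0;

    for (const auto & part : parts)
    {
        if (cache_bytes >= max_cache_bytes * ratio_to_prewarm)
            break;                          // budget reached, leave the rest cold
        cache_bytes += part.marks_bytes;    // stands in for part->loadMarksToCache(...)
    }

    std::cout << "prewarmed bytes: " << cache_bytes << '\n';
    return 0;
}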
/// Is the part directory old.
/// True if its modification time and the modification time of all files inside it are less than the threshold.
/// (Only files on the first level of nesting are considered).

View File

@ -506,6 +506,9 @@ public:
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts);
/// Prewarm mark cache for the most recent data parts.
void prewarmMarkCache(ThreadPool & pool);
String getLogName() const { return log.loadName(); }
Int64 getMaxBlockNumber() const;

View File

@ -136,6 +136,32 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
loadIndexGranularityImpl(index_granularity, index_granularity_info, columns.size(), getDataPartStorage());
}
void MergeTreeDataPartCompact::loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const
{
if (column_names.empty() || !mark_cache)
return;
auto context = storage.getContext();
auto read_settings = context->getReadSettings();
auto * load_marks_threadpool = read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
auto info_for_read = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), std::make_shared<AlterConversions>());
LOG_TEST(getLogger("MergeTreeDataPartCompact"), "Loading marks into mark cache for columns {} of part {}", toString(column_names), name);
MergeTreeMarksLoader loader(
info_for_read,
mark_cache,
index_granularity_info.getMarksFilePath(DATA_FILE_NAME),
index_granularity.getMarksCount(),
index_granularity_info,
/*save_marks_in_cache=*/ true,
read_settings,
load_marks_threadpool,
columns.size());
loader.loadMarks();
}
bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const
{
if (!getColumnPosition(column.getNameInStorage()))

View File

@ -54,6 +54,8 @@ public:
std::optional<String> getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
~MergeTreeDataPartCompact() override;
protected:

View File

@ -182,6 +182,47 @@ void MergeTreeDataPartWide::loadIndexGranularity()
loadIndexGranularityImpl(index_granularity, index_granularity_info, getDataPartStorage(), *any_column_filename);
}
void MergeTreeDataPartWide::loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const
{
if (column_names.empty() || !mark_cache)
return;
std::vector<std::unique_ptr<MergeTreeMarksLoader>> loaders;
auto context = storage.getContext();
auto read_settings = context->getReadSettings();
auto * load_marks_threadpool = read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
auto info_for_read = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), std::make_shared<AlterConversions>());
LOG_TEST(getLogger("MergeTreeDataPartWide"), "Loading marks into mark cache for columns {} of part {}", toString(column_names), name);
for (const auto & column_name : column_names)
{
auto serialization = getSerialization(column_name);
serialization->enumerateStreams([&](const auto & subpath)
{
auto stream_name = getStreamNameForColumn(column_name, subpath, checksums);
if (!stream_name)
return;
loaders.emplace_back(std::make_unique<MergeTreeMarksLoader>(
info_for_read,
mark_cache,
index_granularity_info.getMarksFilePath(*stream_name),
index_granularity.getMarksCount(),
index_granularity_info,
/*save_marks_in_cache=*/ true,
read_settings,
load_marks_threadpool,
/*num_columns_in_mark=*/ 1));
loaders.back()->startAsyncLoad();
});
}
for (auto & loader : loaders)
loader->loadMarks();
}
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const
{

View File

@ -51,6 +51,8 @@ public:
std::optional<time_t> getColumnModificationTime(const String & column_name) const override;
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include "Formats/MarkInCompressedFile.h"
namespace DB
{
@ -54,6 +55,11 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
marks_source_hashing = std::make_unique<HashingWriteBuffer>(*marks_compressor);
}
if (settings.save_marks_in_cache)
{
cached_marks[MergeTreeDataPartCompact::DATA_FILE_NAME] = std::make_unique<MarksInCompressedFile::PlainArray>();
}
for (const auto & column : columns_list)
{
auto compression = getCodecDescOrDefault(column.name, default_codec);
@ -255,9 +261,12 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
return &result_stream->hashing_buf;
};
MarkInCompressedFile mark{plain_hashing.count(), static_cast<UInt64>(0)};
writeBinaryLittleEndian(mark.offset_in_compressed_file, marks_out);
writeBinaryLittleEndian(mark.offset_in_decompressed_block, marks_out);
writeBinaryLittleEndian(plain_hashing.count(), marks_out);
writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
if (!cached_marks.empty())
cached_marks.begin()->second->push_back(mark);
writeColumnSingleGranule(
block.getByName(name_and_type->name), getSerialization(name_and_type->name),
@ -296,11 +305,17 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(MergeTreeDataPartChecksum
if (with_final_mark && data_written)
{
MarkInCompressedFile mark{plain_hashing.count(), 0};
for (size_t i = 0; i < columns_list.size(); ++i)
{
writeBinaryLittleEndian(plain_hashing.count(), marks_out);
writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
writeBinaryLittleEndian(mark.offset_in_compressed_file, marks_out);
writeBinaryLittleEndian(mark.offset_in_decompressed_block, marks_out);
if (!cached_marks.empty())
cached_marks.begin()->second->push_back(mark);
}
writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
}

View File

@ -8,6 +8,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/parseQuery.h>
#include <Storages/Statistics/Statistics.h>
#include <Storages/MarkCache.h>
namespace DB
{

View File

@ -6,6 +6,8 @@
#include <Common/escapeForFileName.h>
#include <Columns/ColumnSparse.h>
#include <Common/logger_useful.h>
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MarkCache.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
@ -105,6 +107,12 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
indices_to_recalc_, stats_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
{
if (settings.save_marks_in_cache)
{
auto columns_vec = getColumnsToPrewarmMarks(*storage_settings, columns_list);
columns_to_load_marks = NameSet(columns_vec.begin(), columns_vec.end());
}
for (const auto & column : columns_list)
{
auto compression = getCodecDescOrDefault(column.name, default_codec);
@ -198,6 +206,9 @@ void MergeTreeDataPartWriterWide::addStreams(
settings.marks_compress_block_size,
query_write_settings);
if (columns_to_load_marks.contains(name_and_type.name))
cached_marks.emplace(stream_name, std::make_unique<MarksInCompressedFile::PlainArray>());
full_name_to_stream_name.emplace(full_stream_name, stream_name);
stream_name_to_full_name.emplace(stream_name, full_stream_name);
};
@ -366,8 +377,12 @@ void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stre
writeBinaryLittleEndian(stream_with_mark.mark.offset_in_compressed_file, marks_out);
writeBinaryLittleEndian(stream_with_mark.mark.offset_in_decompressed_block, marks_out);
if (settings.can_use_adaptive_granularity)
writeBinaryLittleEndian(rows_in_mark, marks_out);
if (auto it = cached_marks.find(stream_with_mark.stream_name); it != cached_marks.end())
it->second->push_back(stream_with_mark.mark);
}
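In the wide writer above, each mark flushed to the on-disk marks stream is also appended to cached_marks for the streams selected for prewarming, so the marks can later be placed into the mark cache without re-reading the file. A simplified standalone sketch of that dual-write idea follows, using hypothetical Mark/flushMark names rather than the real writer API.

// Standalone sketch (hypothetical simplified types): every mark written to the
// on-disk marks stream is also appended to an in-memory per-stream array, but
// only for streams that were selected for caching.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct Mark
{
    uint64_t offset_in_compressed_file = 0;
    uint64_t offset_in_decompressed_block = 0;
};

using MarksByStream = std::map<std::string, std::vector<Mark>>;

void flushMark(const std::string & stream_name, const Mark & mark,
               std::vector<Mark> & marks_file, MarksByStream & cached_marks)
{
    marks_file.push_back(mark);                      // stands in for writeBinaryLittleEndian(...) to the marks file
    if (auto it = cached_marks.find(stream_name); it != cached_marks.end())
        it->second.push_back(mark);                  // keep a copy only for streams selected for caching
}

int main()
{
    std::vector<Mark> marks_file;
    MarksByStream cached_marks;
    cached_marks["col_a"];                           // only col_a is selected for the mark cache

    flushMark("col_a", {0, 0}, marks_file, cached_marks);
    flushMark("col_b", {128, 16}, marks_file, cached_marks);

    std::cout << "on disk: " << marks_file.size()
              << ", cached for col_a: " << cached_marks["col_a"].size() << '\n';
    return 0;
}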
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
@ -742,7 +757,6 @@ void MergeTreeDataPartWriterWide::fillChecksums(MergeTreeDataPartChecksums & che
fillPrimaryIndexChecksums(checksums);
fillSkipIndicesChecksums(checksums);
fillStatisticsChecksums(checksums);
}
@ -756,7 +770,6 @@ void MergeTreeDataPartWriterWide::finish(bool sync)
finishPrimaryIndexSerialization(sync);
finishSkipIndicesSerialization(sync);
finishStatisticsSerialization(sync);
}

View File

@ -136,6 +136,9 @@ private:
using MarksForColumns = std::unordered_map<String, StreamsWithMarks>;
MarksForColumns last_non_written_marks;
/// Set of columns to put marks in cache during write.
NameSet columns_to_load_marks;
/// How many rows we have already written in the current mark.
/// More than zero when incoming blocks are smaller than their granularity.
size_t rows_written_in_last_mark = 0;

View File

@ -71,10 +71,7 @@ namespace Setting
extern const SettingsString force_data_skipping_indices;
extern const SettingsBool force_index_by_date;
extern const SettingsSeconds lock_acquire_timeout;
extern const SettingsUInt64 max_parser_backtracks;
extern const SettingsUInt64 max_parser_depth;
extern const SettingsInt64 max_partitions_to_read;
extern const SettingsUInt64 max_query_size;
extern const SettingsUInt64 max_threads_for_indexes;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsUInt64 merge_tree_coarse_index_granularity;
@ -640,20 +637,11 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
if (use_skip_indexes && settings[Setting::force_data_skipping_indices].changed)
{
const auto & indices = settings[Setting::force_data_skipping_indices].toString();
Strings forced_indices;
{
Tokens tokens(indices.data(), indices.data() + indices.size(), settings[Setting::max_query_size]);
IParser::Pos pos(
tokens, static_cast<unsigned>(settings[Setting::max_parser_depth]), static_cast<unsigned>(settings[Setting::max_parser_backtracks]));
Expected expected;
if (!parseIdentifiersOrStringLiterals(pos, expected, forced_indices))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse force_data_skipping_indices ('{}')", indices);
}
const auto & indices_str = settings[Setting::force_data_skipping_indices].toString();
auto forced_indices = parseIdentifiersOrStringLiterals(indices_str, settings);
if (forced_indices.empty())
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "No indices parsed from force_data_skipping_indices ('{}')", indices);
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "No indices parsed from force_data_skipping_indices ('{}')", indices_str);
std::unordered_set<std::string> useful_indices_names;
for (const auto & useful_index : skip_indexes.useful_indices)

View File

@ -73,6 +73,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsFloat min_free_disk_ratio_to_perform_insert;
extern const MergeTreeSettingsBool optimize_row_order;
extern const MergeTreeSettingsFloat ratio_of_defaults_for_sparse_serialization;
extern const MergeTreeSettingsBool prewarm_mark_cache;
}
namespace ErrorCodes
@ -684,6 +685,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
bool save_marks_in_cache = (*data_settings)[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache();
auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part,
@ -693,8 +695,9 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
statistics,
compression_codec,
context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID,
false,
false,
/*reset_columns=*/ false,
save_marks_in_cache,
/*blocks_are_granules_size=*/ false,
context->getWriteSettings());
out->writeWithPermutation(block, perm_ptr);
@ -829,6 +832,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
bool save_marks_in_cache = (*data.getSettings())[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache();
auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part,
@ -839,7 +843,10 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
ColumnsStatistics{},
compression_codec,
Tx::PrehistoricTID,
false, false, data.getContext()->getWriteSettings());
/*reset_columns=*/ false,
save_marks_in_cache,
/*blocks_are_granules_size=*/ false,
data.getContext()->getWriteSettings());
out->writeWithPermutation(block, perm_ptr);
auto finalizer = out->finalizePartAsync(new_data_part, false);

View File

@ -34,6 +34,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
const MergeTreeSettingsPtr & storage_settings,
bool can_use_adaptive_granularity_,
bool rewrite_primary_key_,
bool save_marks_in_cache_,
bool blocks_are_granules_size_)
: min_compress_block_size(
(*storage_settings)[MergeTreeSetting::min_compress_block_size] ? (*storage_settings)[MergeTreeSetting::min_compress_block_size] : global_settings[Setting::min_compress_block_size])
@ -46,6 +47,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
, primary_key_compress_block_size((*storage_settings)[MergeTreeSetting::primary_key_compress_block_size])
, can_use_adaptive_granularity(can_use_adaptive_granularity_)
, rewrite_primary_key(rewrite_primary_key_)
, save_marks_in_cache(save_marks_in_cache_)
, blocks_are_granules_size(blocks_are_granules_size_)
, query_write_settings(query_write_settings_)
, low_cardinality_max_dictionary_size(global_settings[Setting::low_cardinality_max_dictionary_size])

View File

@ -61,7 +61,8 @@ struct MergeTreeWriterSettings
const MergeTreeSettingsPtr & storage_settings,
bool can_use_adaptive_granularity_,
bool rewrite_primary_key_,
bool blocks_are_granules_size_ = false);
bool save_marks_in_cache_,
bool blocks_are_granules_size_);
size_t min_compress_block_size;
size_t max_compress_block_size;
@ -75,6 +76,7 @@ struct MergeTreeWriterSettings
bool can_use_adaptive_granularity;
bool rewrite_primary_key;
bool save_marks_in_cache;
bool blocks_are_granules_size;
WriteSettings query_write_settings;

View File

@ -3,10 +3,12 @@
#include <Common/threadPoolCallbackRunner.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
#include <utility>
@ -21,6 +23,11 @@ namespace ProfileEvents
namespace DB
{
namespace MergeTreeSetting
{
extern const MergeTreeSettingsString columns_to_prewarm_mark_cache;
}
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
@ -211,6 +218,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksSync()
if (mark_cache)
{
auto key = MarkCache::hash(fs::path(data_part_storage->getFullPath()) / mrk_path);
if (save_marks_in_cache)
{
auto callback = [this] { return loadMarksImpl(); };
@ -249,4 +257,25 @@ std::future<MarkCache::MappedPtr> MergeTreeMarksLoader::loadMarksAsync()
"LoadMarksThread");
}
void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & cached_marks, MarkCache * mark_cache)
{
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
for (const auto & [stream_name, marks] : cached_marks)
{
auto mark_path = part.index_granularity_info.getMarksFilePath(stream_name);
auto key = MarkCache::hash(fs::path(part.getDataPartStorage().getFullPath()) / mark_path);
mark_cache->set(key, std::make_shared<MarksInCompressedFile>(*marks));
}
}
Names getColumnsToPrewarmMarks(const MergeTreeSettings & settings, const NamesAndTypesList & columns_list)
{
auto columns_str = settings[MergeTreeSetting::columns_to_prewarm_mark_cache].toString();
if (columns_str.empty())
return columns_list.getNames();
return parseIdentifiersOrStringLiterals(columns_str, Context::getGlobalContextInstance()->getSettingsRef());
}
}
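getColumnsToPrewarmMarks above treats an empty columns_to_prewarm_mark_cache setting as "all physical columns" and otherwise parses the configured list. A rough standalone illustration of that fallback behaviour; columnsToPrewarm is a hypothetical helper that splits on commas instead of using ClickHouse's identifier parser.

// Standalone sketch: an empty "columns to prewarm" setting means "all columns",
// otherwise use the parsed list (plain comma splitting, hypothetical helper).
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

std::vector<std::string> columnsToPrewarm(const std::string & setting, const std::vector<std::string> & all_columns)
{
    if (setting.empty())
        return all_columns;                 // empty means: prewarm marks for every column

    std::vector<std::string> result;
    std::istringstream ss(setting);
    for (std::string name; std::getline(ss, name, ',');)
        if (!name.empty())
            result.push_back(name);
    return result;
}

int main()
{
    const std::vector<std::string> all = {"id", "ts", "value"};
    for (const auto & c : columnsToPrewarm("", all))
        std::cout << c << ' ';
    std::cout << '\n';
    for (const auto & c : columnsToPrewarm("id,ts", all))
        std::cout << c << ' ';
    std::cout << '\n';
    return 0;
}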

View File

@ -77,4 +77,13 @@ private:
using MergeTreeMarksLoaderPtr = std::shared_ptr<MergeTreeMarksLoader>;
class IMergeTreeDataPart;
struct MergeTreeSettings;
/// Adds computed marks for part to the marks cache.
void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & cached_marks, MarkCache * mark_cache);
/// Returns the list of columns suitable for prewarming of mark cache according to settings.
Names getColumnsToPrewarmMarks(const MergeTreeSettings & settings, const NamesAndTypesList & columns_list);
}

View File

@ -232,6 +232,8 @@ namespace ErrorCodes
DECLARE(UInt64, primary_key_compress_block_size, 65536, "Primary compress block size, the actual size of the block to compress.", 0) \
DECLARE(Bool, primary_key_lazy_load, true, "Load primary key in memory on first use instead of on table initialization. This can save memory in the presence of a large number of tables.", 0) \
DECLARE(Float, primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns, 0.9f, "If the value of a column of the primary key in data part changes at least in this ratio of times, skip loading next columns in memory. This allows to save memory usage by not loading useless columns of the primary key.", 0) \
DECLARE(Bool, prewarm_mark_cache, false, "If true, the mark cache will be prewarmed by saving marks to the mark cache on inserts, merges, fetches and on server startup", 0) \
DECLARE(String, columns_to_prewarm_mark_cache, "", "List of columns to prewarm the mark cache for (if enabled). Empty means all columns", 0) \
/** Projection settings. */ \
DECLARE(UInt64, max_projections, 25, "The maximum number of merge tree projections.", 0) \
DECLARE(LightweightMutationProjectionMode, lightweight_mutation_projection_mode, LightweightMutationProjectionMode::THROW, "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop projections of this table's relevant parts, or rebuild the projections.", 0) \

View File

@ -243,6 +243,15 @@ void MergeTreeSink::finishDelayedChunk()
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (added)
{
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
{
for (const auto & stream : partition.temp_part.streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*part, marks, mark_cache);
}
}
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, partition.elapsed_ns, counters_snapshot));
StorageMergeTree::incrementInsertedPartsProfileEvent(part->getType());

View File

@ -25,6 +25,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
CompressionCodecPtr default_codec_,
TransactionID tid,
bool reset_columns_,
bool save_marks_in_cache,
bool blocks_are_granules_size,
const WriteSettings & write_settings_,
const MergeTreeIndexGranularity & computed_index_granularity)
@ -39,6 +40,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
storage_settings,
data_part->index_granularity_info.mark_type.adaptive,
/* rewrite_primary_key = */ true,
save_marks_in_cache,
blocks_are_granules_size);
/// TODO: looks like isStoredOnDisk() is always true for MergeTreeDataPart

View File

@ -24,6 +24,7 @@ public:
CompressionCodecPtr default_codec_,
TransactionID tid,
bool reset_columns_ = false,
bool save_marks_in_cache = false,
bool blocks_are_granules_size = false,
const WriteSettings & write_settings = {},
const MergeTreeIndexGranularity & computed_index_granularity = {});

View File

@ -19,6 +19,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const MergeTreeIndices & indices_to_recalc,
const ColumnsStatistics & stats_to_recalc_,
WrittenOffsetColumns * offset_columns_,
bool save_marks_in_cache,
const MergeTreeIndexGranularity & index_granularity,
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true)
@ -30,7 +31,9 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
data_part->storage.getContext()->getWriteSettings(),
storage_settings,
index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(),
/* rewrite_primary_key = */ false);
/* rewrite_primary_key = */ false,
save_marks_in_cache,
/* blocks_are_granules_size = */ false);
writer = createMergeTreeDataPartWriter(
data_part->getType(),

View File

@ -22,6 +22,7 @@ public:
const MergeTreeIndices & indices_to_recalc_,
const ColumnsStatistics & stats_to_recalc_,
WrittenOffsetColumns * offset_columns_ = nullptr,
bool save_marks_in_cache = false,
const MergeTreeIndexGranularity & index_granularity = {},
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);

View File

@ -1623,6 +1623,7 @@ private:
ctx->compression_codec,
ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
/*save_marks_in_cache=*/ false,
/*blocks_are_granules_size=*/ false,
ctx->context->getWriteSettings(),
computed_granularity);
@ -1851,6 +1852,7 @@ private:
std::vector<MergeTreeIndexPtr>(ctx->indices_to_recalc.begin(), ctx->indices_to_recalc.end()),
ColumnsStatistics(ctx->stats_to_recalc.begin(), ctx->stats_to_recalc.end()),
nullptr,
/*save_marks_in_cache=*/ false,
ctx->source_part->index_granularity,
&ctx->source_part->index_granularity_info
);

View File

@ -481,6 +481,17 @@ void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithF
/// Set a special error code if the block is duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
auto * mark_cache = storage.getContext()->getMarkCache().get();
if (!error && mark_cache)
{
for (const auto & stream : partition.temp_part.streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*part, marks, mark_cache);
}
}
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, partition.elapsed_ns, counters_snapshot), ExecutionStatus(error));
StorageReplicatedMergeTree::incrementInsertedPartsProfileEvent(part->getType());
@ -521,8 +532,18 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
{
partition.temp_part.finalize();
auto conflict_block_ids = commitPart(zookeeper, partition.temp_part.part, partition.block_id, delayed_chunk->replicas_num).first;
if (conflict_block_ids.empty())
{
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
{
for (const auto & stream : partition.temp_part.streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*partition.temp_part.part, marks, mark_cache);
}
}
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(
storage.getContext(),

View File

@ -107,21 +107,18 @@ Chunk NATSSource::generate()
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
std::optional<String> exception_message;
size_t total_rows = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// We could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could have already pushed some rows to result_columns before the exception; we need to fix that.
result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get a default value in case of error.
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -219,6 +219,108 @@ ObjectStorageQueueMetadata::tryAcquireBucket(const Bucket & bucket, const Proces
return ObjectStorageQueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log);
}
void ObjectStorageQueueMetadata::alterSettings(const SettingsChanges & changes)
{
const fs::path alter_settings_lock_path = zookeeper_path / "alter_settings_lock";
zkutil::EphemeralNodeHolder::Ptr alter_settings_lock;
auto zookeeper = getZooKeeper();
/// We will retry taking alter_settings_lock for the duration of 5 seconds.
/// Do we need to add a setting for this?
const size_t num_tries = 100;
for (size_t i = 0; i < num_tries; ++i)
{
alter_settings_lock = zkutil::EphemeralNodeHolder::tryCreate(alter_settings_lock_path, *zookeeper, toString(getCurrentTime()));
if (alter_settings_lock)
break;
if (i == num_tries - 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to take alter setting lock");
sleepForMilliseconds(50);
}
Coordination::Stat stat;
auto metadata_str = zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat);
auto metadata_from_zk = ObjectStorageQueueTableMetadata::parse(metadata_str);
auto new_table_metadata{table_metadata};
for (const auto & change : changes)
{
if (!ObjectStorageQueueTableMetadata::isStoredInKeeper(change.name))
continue;
if (change.name == "processing_threads_num")
{
const auto value = change.value.safeGet<UInt64>();
if (table_metadata.processing_threads_num == value)
{
LOG_TRACE(log, "Setting `processing_threads_num` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.processing_threads_num = value;
}
else if (change.name == "loading_retries")
{
const auto value = change.value.safeGet<UInt64>();
if (table_metadata.loading_retries == value)
{
LOG_TRACE(log, "Setting `loading_retries` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.loading_retries = value;
}
else if (change.name == "after_processing")
{
const auto value = ObjectStorageQueueTableMetadata::actionFromString(change.value.safeGet<String>());
if (table_metadata.after_processing == value)
{
LOG_TRACE(log, "Setting `after_processing` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.after_processing = value;
}
else if (change.name == "tracked_files_limit")
{
const auto value = change.value.safeGet<UInt64>();
if (table_metadata.tracked_files_limit == value)
{
LOG_TRACE(log, "Setting `tracked_files_limit` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.tracked_files_limit = value;
}
else if (change.name == "tracked_file_ttl_sec")
{
const auto value = change.value.safeGet<UInt64>();
if (table_metadata.tracked_files_ttl_sec == value)
{
LOG_TRACE(log, "Setting `tracked_file_ttl_sec` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.tracked_files_ttl_sec = value;
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Setting `{}` is not changeable", change.name);
}
}
const auto new_metadata_str = new_table_metadata.toString();
LOG_TRACE(log, "New metadata: {}", new_metadata_str);
const fs::path table_metadata_path = zookeeper_path / "metadata";
zookeeper->set(table_metadata_path, new_metadata_str, stat.version);
table_metadata.syncChangeableSettings(new_table_metadata);
}
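alterSettings above serializes concurrent ALTERs by retrying the creation of an ephemeral alter_settings_lock node (up to 100 tries, 50 ms apart, roughly 5 seconds in total) and then writes the new metadata using the version read earlier. A standalone sketch of just the bounded retry loop follows; try_create_lock is a hypothetical stand-in for zkutil::EphemeralNodeHolder::tryCreate.

// Standalone sketch (std C++ only): bounded retry loop for taking a short-lived
// lock, analogous to the alter_settings_lock acquisition above.
#include <chrono>
#include <cstddef>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <thread>

std::optional<int> try_create_lock(size_t attempt)
{
    // Stand-in for the ephemeral-node creation: pretend it succeeds on the 3rd try.
    if (attempt >= 2)
        return 42;
    return std::nullopt;
}

int main()
{
    const size_t num_tries = 100;       // ~5 seconds total at 50 ms per attempt
    std::optional<int> lock;

    for (size_t i = 0; i < num_tries; ++i)
    {
        lock = try_create_lock(i);
        if (lock)
            break;
        if (i == num_tries - 1)
            throw std::runtime_error("Failed to take alter setting lock");
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    std::cout << "lock acquired: " << *lock << '\n';
    return 0;
}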
ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper(
const fs::path & zookeeper_path,
const ObjectStorageQueueSettings & settings,

View File

@ -9,6 +9,7 @@
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/SettingsChanges.h>
namespace fs = std::filesystem;
namespace Poco { class Logger; }
@ -89,6 +90,8 @@ public:
const ObjectStorageQueueTableMetadata & getTableMetadata() const { return table_metadata; }
ObjectStorageQueueTableMetadata & getTableMetadata() { return table_metadata; }
void alterSettings(const SettingsChanges & changes);
private:
void cleanupThreadFunc();
void cleanupThreadFuncImpl();

View File

@ -381,10 +381,10 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
/// In one zookeeper transaction do the following:
enum RequestType
{
SET_MAX_PROCESSED_PATH = 0,
CHECK_PROCESSING_ID_PATH = 1, /// Optional.
REMOVE_PROCESSING_ID_PATH = 2, /// Optional.
REMOVE_PROCESSING_PATH = 3, /// Optional.
CHECK_PROCESSING_ID_PATH = 0,
REMOVE_PROCESSING_ID_PATH = 1,
REMOVE_PROCESSING_PATH = 2,
SET_MAX_PROCESSED_PATH = 3,
};
const auto zk_client = getZooKeeper();
@ -409,8 +409,18 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
return;
}
bool unexpected_error = false;
if (Coordination::isHardwareError(code))
failure_reason = "Lost connection to keeper";
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
failure_reason = "Version of processing id node changed";
else if (is_request_failed(REMOVE_PROCESSING_PATH))
{
/// Removing the processing_id node should not actually fail,
/// because we just checked in a previous keeper request that it exists and has a certain version.
unexpected_error = true;
failure_reason = "Failed to remove processing id path";
}
else if (is_request_failed(SET_MAX_PROCESSED_PATH))
{
LOG_TRACE(log, "Cannot set file {} as processed. "
@ -418,13 +428,12 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
"Will retry.", path, code);
continue;
}
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
failure_reason = "Version of processing id node changed";
else if (is_request_failed(REMOVE_PROCESSING_PATH))
failure_reason = "Failed to remove processing path";
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", code);
if (unexpected_error)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}", failure_reason);
LOG_WARNING(log, "Cannot set file {} as processed: {}. Reason: {}", path, code, failure_reason);
return;
}

View File

@ -23,15 +23,15 @@ namespace ErrorCodes
0) \
DECLARE(ObjectStorageQueueAction, after_processing, ObjectStorageQueueAction::KEEP, "Delete or keep the file after successful processing", 0) \
DECLARE(String, keeper_path, "", "Zookeeper node path", 0) \
DECLARE(UInt32, loading_retries, 10, "Retry loading up to specified number of times", 0) \
DECLARE(UInt32, processing_threads_num, 1, "Number of processing threads", 0) \
DECLARE(UInt64, loading_retries, 10, "Retry loading up to specified number of times", 0) \
DECLARE(UInt64, processing_threads_num, 1, "Number of processing threads", 0) \
DECLARE(UInt32, enable_logging_to_queue_log, 1, "Enable logging to system table system.(s3/azure_)queue_log", 0) \
DECLARE(String, last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
DECLARE(UInt32, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
DECLARE(UInt32, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
DECLARE(UInt32, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
DECLARE(UInt32, polling_backoff_ms, 1000, "Polling backoff", 0) \
DECLARE(UInt32, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
DECLARE(UInt64, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
DECLARE(UInt64, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
DECLARE(UInt64, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
DECLARE(UInt64, polling_backoff_ms, 1000, "Polling backoff", 0) \
DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
DECLARE(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
@ -112,6 +112,11 @@ ObjectStorageQueueSettings::~ObjectStorageQueueSettings() = default;
OBJECT_STORAGE_QUEUE_SETTINGS_SUPPORTED_TYPES(ObjectStorageQueueSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
void ObjectStorageQueueSettings::applyChanges(const SettingsChanges & changes)
{
impl->applyChanges(changes);
}
void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
@ -156,4 +161,9 @@ void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
}
}
Field ObjectStorageQueueSettings::get(const std::string & name)
{
return impl->get(name);
}
}

View File

@ -12,6 +12,7 @@ class ASTStorage;
struct ObjectStorageQueueSettingsImpl;
struct MutableColumnsAndConstraints;
class StorageObjectStorageQueue;
class SettingsChanges;
/// List of available types supported in ObjectStorageQueueSettings object
#define OBJECT_STORAGE_QUEUE_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
@ -61,6 +62,10 @@ struct ObjectStorageQueueSettings
void loadFromQuery(ASTStorage & storage_def);
void applyChanges(const SettingsChanges & changes);
Field get(const std::string & name);
private:
std::unique_ptr<ObjectStorageQueueSettingsImpl> impl;
};

View File

@ -657,7 +657,7 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
void ObjectStorageQueueSource::applyActionAfterProcessing(const String & path)
{
if (files_metadata->getTableMetadata().after_processing == "delete")
if (files_metadata->getTableMetadata().after_processing == ObjectStorageQueueAction::DELETE)
{
object_storage->removeObject(StoredObject(path));
}

View File

@ -17,11 +17,11 @@ namespace ObjectStorageQueueSetting
extern const ObjectStorageQueueSettingsObjectStorageQueueAction after_processing;
extern const ObjectStorageQueueSettingsUInt32 buckets;
extern const ObjectStorageQueueSettingsString last_processed_path;
extern const ObjectStorageQueueSettingsUInt32 loading_retries;
extern const ObjectStorageQueueSettingsObjectStorageQueueMode mode;
extern const ObjectStorageQueueSettingsUInt32 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt32 tracked_files_limit;
extern const ObjectStorageQueueSettingsUInt32 tracked_file_ttl_sec;
extern const ObjectStorageQueueSettingsUInt64 loading_retries;
extern const ObjectStorageQueueSettingsUInt64 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt64 tracked_files_limit;
extern const ObjectStorageQueueSettingsUInt64 tracked_file_ttl_sec;
}
@ -56,13 +56,13 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
const std::string & format_)
: format_name(format_)
, columns(columns_.toString())
, after_processing(engine_settings[ObjectStorageQueueSetting::after_processing].toString())
, mode(engine_settings[ObjectStorageQueueSetting::mode].toString())
, tracked_files_limit(engine_settings[ObjectStorageQueueSetting::tracked_files_limit])
, tracked_files_ttl_sec(engine_settings[ObjectStorageQueueSetting::tracked_file_ttl_sec])
, buckets(engine_settings[ObjectStorageQueueSetting::buckets])
, last_processed_path(engine_settings[ObjectStorageQueueSetting::last_processed_path])
, after_processing(engine_settings[ObjectStorageQueueSetting::after_processing])
, loading_retries(engine_settings[ObjectStorageQueueSetting::loading_retries])
, tracked_files_limit(engine_settings[ObjectStorageQueueSetting::tracked_files_limit])
, tracked_files_ttl_sec(engine_settings[ObjectStorageQueueSetting::tracked_file_ttl_sec])
{
processing_threads_num_changed = engine_settings[ObjectStorageQueueSetting::processing_threads_num].changed;
if (!processing_threads_num_changed && engine_settings[ObjectStorageQueueSetting::processing_threads_num] <= 1)
@ -74,16 +74,16 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
String ObjectStorageQueueTableMetadata::toString() const
{
Poco::JSON::Object json;
json.set("after_processing", after_processing);
json.set("after_processing", actionToString(after_processing.load()));
json.set("mode", mode);
json.set("tracked_files_limit", tracked_files_limit);
json.set("tracked_files_ttl_sec", tracked_files_ttl_sec);
json.set("processing_threads_num", processing_threads_num);
json.set("tracked_files_limit", tracked_files_limit.load());
json.set("tracked_files_ttl_sec", tracked_files_ttl_sec.load());
json.set("processing_threads_num", processing_threads_num.load());
json.set("buckets", buckets);
json.set("format_name", format_name);
json.set("columns", columns);
json.set("last_processed_file", last_processed_path);
json.set("loading_retries", loading_retries);
json.set("loading_retries", loading_retries.load());
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
@ -91,6 +91,26 @@ String ObjectStorageQueueTableMetadata::toString() const
return oss.str();
}
ObjectStorageQueueAction ObjectStorageQueueTableMetadata::actionFromString(const std::string & action)
{
if (action == "keep")
return ObjectStorageQueueAction::KEEP;
if (action == "delete")
return ObjectStorageQueueAction::DELETE;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected ObjectStorageQueue action: {}", action);
}
std::string ObjectStorageQueueTableMetadata::actionToString(ObjectStorageQueueAction action)
{
switch (action)
{
case ObjectStorageQueueAction::DELETE:
return "delete";
case ObjectStorageQueueAction::KEEP:
return "keep";
}
}
ObjectStorageQueueMode ObjectStorageQueueTableMetadata::getMode() const
{
return modeFromString(mode);
@ -115,14 +135,14 @@ static auto getOrDefault(
ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(const Poco::JSON::Object::Ptr & json)
: format_name(json->getValue<String>("format_name"))
, columns(json->getValue<String>("columns"))
, after_processing(json->getValue<String>("after_processing"))
, mode(json->getValue<String>("mode"))
, tracked_files_limit(getOrDefault(json, "tracked_files_limit", "s3queue_", 0))
, tracked_files_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "", getOrDefault(json, "tracked_file_ttl_sec", "s3queue_", 0)))
, buckets(getOrDefault(json, "buckets", "", 0))
, last_processed_path(getOrDefault<String>(json, "last_processed_file", "s3queue_", ""))
, after_processing(actionFromString(json->getValue<String>("after_processing")))
, loading_retries(getOrDefault(json, "loading_retries", "", 10))
, processing_threads_num(getOrDefault(json, "processing_threads_num", "s3queue_", 1))
, tracked_files_limit(getOrDefault(json, "tracked_files_limit", "s3queue_", 0))
, tracked_files_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "", getOrDefault(json, "tracked_file_ttl_sec", "s3queue_", 0)))
{
validateMode(mode);
}
@ -148,7 +168,7 @@ void ObjectStorageQueueTableMetadata::adjustFromKeeper(const ObjectStorageQueueT
else
LOG_TRACE(log, "{}", message);
processing_threads_num = from_zk.processing_threads_num;
processing_threads_num = from_zk.processing_threads_num.load();
}
}
@ -164,8 +184,8 @@ void ObjectStorageQueueTableMetadata::checkImmutableFieldsEquals(const ObjectSto
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs "
"in action after processing. Stored in ZooKeeper: {}, local: {}",
DB::toString(from_zk.after_processing),
DB::toString(after_processing));
DB::toString(from_zk.after_processing.load()),
DB::toString(after_processing.load()));
if (mode != from_zk.mode)
throw Exception(

View File

@ -19,17 +19,19 @@ class ReadBuffer;
*/
struct ObjectStorageQueueTableMetadata
{
/// Non-changeable settings.
const String format_name;
const String columns;
const String after_processing;
const String mode;
const UInt32 tracked_files_limit;
const UInt32 tracked_files_ttl_sec;
const UInt32 buckets;
const String last_processed_path;
const UInt32 loading_retries;
/// Changeable settings.
std::atomic<ObjectStorageQueueAction> after_processing;
std::atomic<UInt64> loading_retries;
std::atomic<UInt64> processing_threads_num;
std::atomic<UInt64> tracked_files_limit;
std::atomic<UInt64> tracked_files_ttl_sec;
UInt32 processing_threads_num; /// Can be changed from keeper.
bool processing_threads_num_changed = false;
ObjectStorageQueueTableMetadata(
@ -37,10 +39,36 @@ struct ObjectStorageQueueTableMetadata
const ColumnsDescription & columns_,
const std::string & format_);
ObjectStorageQueueTableMetadata(const ObjectStorageQueueTableMetadata & other)
: format_name(other.format_name)
, columns(other.columns)
, mode(other.mode)
, buckets(other.buckets)
, last_processed_path(other.last_processed_path)
, after_processing(other.after_processing.load())
, loading_retries(other.loading_retries.load())
, processing_threads_num(other.processing_threads_num.load())
, tracked_files_limit(other.tracked_files_limit.load())
, tracked_files_ttl_sec(other.tracked_files_ttl_sec.load())
{
}
void syncChangeableSettings(const ObjectStorageQueueTableMetadata & other)
{
after_processing = other.after_processing.load();
loading_retries = other.loading_retries.load();
processing_threads_num = other.processing_threads_num.load();
tracked_files_limit = other.tracked_files_limit.load();
tracked_files_ttl_sec = other.tracked_files_ttl_sec.load();
}
explicit ObjectStorageQueueTableMetadata(const Poco::JSON::Object::Ptr & json);
static ObjectStorageQueueTableMetadata parse(const String & metadata_str);
static ObjectStorageQueueAction actionFromString(const std::string & action);
static std::string actionToString(ObjectStorageQueueAction action);
String toString() const;
ObjectStorageQueueMode getMode() const;
@ -49,6 +77,25 @@ struct ObjectStorageQueueTableMetadata
void checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
static bool isStoredInKeeper(const std::string & name)
{
static const std::unordered_set<std::string_view> settings_names
{
"format_name",
"columns",
"mode",
"buckets",
"last_processed_path",
"after_processing",
"loading_retries",
"processing_threads_num",
"tracked_files_limit",
"tracked_file_ttl_sec",
"tracked_files_ttl_sec",
};
return settings_names.contains(name);
}
private:
void checkImmutableFieldsEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
};
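The struct above splits the table metadata into immutable fields and changeable ones held in std::atomic, which is why it needs a hand-written copy constructor and the syncChangeableSettings helper (atomics are neither copyable nor assignable). A small standalone sketch of that layout with a hypothetical QueueMetadataSketch type:

// Standalone sketch: changeable settings stored as atomics so background readers
// can observe them while an ALTER updates them, copied and synced via load()/store().
#include <atomic>
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>

struct QueueMetadataSketch
{
    // Immutable after construction.
    const std::string mode;

    // Changeable at runtime.
    std::atomic<uint64_t> loading_retries;
    std::atomic<uint64_t> processing_threads_num;

    QueueMetadataSketch(std::string mode_, uint64_t retries, uint64_t threads)
        : mode(std::move(mode_)), loading_retries(retries), processing_threads_num(threads) {}

    // std::atomic is not copyable, so copy member-by-member via load().
    QueueMetadataSketch(const QueueMetadataSketch & other)
        : mode(other.mode)
        , loading_retries(other.loading_retries.load())
        , processing_threads_num(other.processing_threads_num.load())
    {}

    void syncChangeableSettings(const QueueMetadataSketch & other)
    {
        loading_retries = other.loading_retries.load();
        processing_threads_num = other.processing_threads_num.load();
    }
};

int main()
{
    QueueMetadataSketch current("unordered", 10, 1);
    QueueMetadataSketch altered(current);       // working copy for the ALTER
    altered.processing_threads_num = 8;

    current.syncChangeableSettings(altered);    // publish the new values
    std::cout << current.processing_threads_num.load() << '\n';
    return 0;
}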

View File

@ -103,29 +103,46 @@ void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
/// In one zookeeper transaction do the following:
enum RequestType
{
SET_MAX_PROCESSED_PATH = 0,
CHECK_PROCESSING_ID_PATH = 1, /// Optional.
REMOVE_PROCESSING_ID_PATH = 2, /// Optional.
REMOVE_PROCESSING_PATH = 3, /// Optional.
CHECK_PROCESSING_ID_PATH,
REMOVE_PROCESSING_ID_PATH,
REMOVE_PROCESSING_PATH,
SET_PROCESSED_PATH,
};
const auto zk_client = getZooKeeper();
std::string failure_reason;
Coordination::Requests requests;
requests.push_back(
zkutil::makeCreateRequest(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
std::map<RequestType, UInt8> request_index;
if (processing_id_version.has_value())
{
requests.push_back(zkutil::makeCheckRequest(processing_node_id_path, processing_id_version.value()));
requests.push_back(zkutil::makeRemoveRequest(processing_node_id_path, processing_id_version.value()));
requests.push_back(zkutil::makeRemoveRequest(processing_node_path, -1));
/// The order is important:
/// we must check the processing nodes first and set processed_path last.
request_index[CHECK_PROCESSING_ID_PATH] = 0;
request_index[REMOVE_PROCESSING_ID_PATH] = 1;
request_index[REMOVE_PROCESSING_PATH] = 2;
request_index[SET_PROCESSED_PATH] = 3;
}
else
{
request_index[SET_PROCESSED_PATH] = 0;
}
requests.push_back(
zkutil::makeCreateRequest(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto is_request_failed = [&](RequestType type) { return responses[type]->error != Coordination::Error::ZOK; };
auto is_request_failed = [&](RequestType type)
{
if (!request_index.contains(type))
return false;
chassert(request_index[type] < responses.size());
return responses[request_index[type]]->error != Coordination::Error::ZOK;
};
const auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
@ -140,18 +157,41 @@ void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
return;
}
bool unexpected_error = false;
std::string failure_reason;
if (Coordination::isHardwareError(code))
{
failure_reason = "Lost connection to keeper";
else if (is_request_failed(SET_MAX_PROCESSED_PATH))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot create a persistent node in /processed since it already exists");
}
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
{
/// This is normal in case of expired session with keeper.
failure_reason = "Version of processing id node changed";
}
else if (is_request_failed(REMOVE_PROCESSING_ID_PATH))
{
/// Removing the processing_id node should not actually fail,
/// because we just checked in a previous keeper request that it exists and has a certain version.
unexpected_error = true;
failure_reason = "Failed to remove processing id path";
}
else if (is_request_failed(REMOVE_PROCESSING_PATH))
{
/// This is normal in case of expired session with keeper as this node is ephemeral.
failure_reason = "Failed to remove processing path";
}
else if (is_request_failed(SET_PROCESSED_PATH))
{
unexpected_error = true;
failure_reason = "Cannot create a persistent node in /processed since it already exists";
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", code);
if (unexpected_error)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}", failure_reason);
LOG_WARNING(log, "Cannot set file {} as processed: {}. Reason: {}", path, code, failure_reason);
}
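Because the first three requests of the transaction above are optional (they are only added when a processing_id version is known), the code keeps a request_index map from logical request type to its position in the batch and classifies failures through it. A minimal standalone sketch of that indexing pattern with hypothetical types:

// Standalone sketch: map logical request types to their positions in a batch so
// "which request failed?" checks stay correct when optional requests are skipped.
#include <cstddef>
#include <iostream>
#include <map>
#include <vector>

enum RequestType { CHECK_PROCESSING_ID_PATH, REMOVE_PROCESSING_ID_PATH, REMOVE_PROCESSING_PATH, SET_PROCESSED_PATH };

int main()
{
    const bool has_processing_id = false;   // optional requests are skipped in this run

    std::vector<RequestType> requests;
    std::map<RequestType, size_t> request_index;

    if (has_processing_id)
    {
        request_index[CHECK_PROCESSING_ID_PATH] = requests.size(); requests.push_back(CHECK_PROCESSING_ID_PATH);
        request_index[REMOVE_PROCESSING_ID_PATH] = requests.size(); requests.push_back(REMOVE_PROCESSING_ID_PATH);
        request_index[REMOVE_PROCESSING_PATH] = requests.size(); requests.push_back(REMOVE_PROCESSING_PATH);
    }
    request_index[SET_PROCESSED_PATH] = requests.size(); requests.push_back(SET_PROCESSED_PATH);

    std::vector<bool> failed(requests.size(), false);
    failed[request_index[SET_PROCESSED_PATH]] = true;   // pretend the last request failed

    auto is_request_failed = [&](RequestType type)
    {
        auto it = request_index.find(type);
        return it != request_index.end() && failed[it->second];
    };

    std::cout << "CHECK failed: " << is_request_failed(CHECK_PROCESSING_ID_PATH)
              << ", SET failed: " << is_request_failed(SET_PROCESSED_PATH) << '\n';
    return 0;
}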

View File

@ -24,6 +24,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Storages/AlterCommands.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <filesystem>
@ -51,15 +52,15 @@ namespace ObjectStorageQueueSetting
extern const ObjectStorageQueueSettingsUInt64 max_processed_files_before_commit;
extern const ObjectStorageQueueSettingsUInt64 max_processed_rows_before_commit;
extern const ObjectStorageQueueSettingsUInt64 max_processing_time_sec_before_commit;
extern const ObjectStorageQueueSettingsUInt32 polling_min_timeout_ms;
extern const ObjectStorageQueueSettingsUInt32 polling_max_timeout_ms;
extern const ObjectStorageQueueSettingsUInt32 polling_backoff_ms;
extern const ObjectStorageQueueSettingsUInt32 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt64 polling_min_timeout_ms;
extern const ObjectStorageQueueSettingsUInt64 polling_max_timeout_ms;
extern const ObjectStorageQueueSettingsUInt64 polling_backoff_ms;
extern const ObjectStorageQueueSettingsUInt64 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt32 buckets;
extern const ObjectStorageQueueSettingsUInt32 tracked_file_ttl_sec;
extern const ObjectStorageQueueSettingsUInt32 tracked_files_limit;
extern const ObjectStorageQueueSettingsUInt64 tracked_file_ttl_sec;
extern const ObjectStorageQueueSettingsUInt64 tracked_files_limit;
extern const ObjectStorageQueueSettingsString last_processed_path;
extern const ObjectStorageQueueSettingsUInt32 loading_retries;
extern const ObjectStorageQueueSettingsUInt64 loading_retries;
extern const ObjectStorageQueueSettingsObjectStorageQueueAction after_processing;
}
@ -69,6 +70,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int BAD_QUERY_PARAMETER;
extern const int QUERY_NOT_ALLOWED;
extern const int SUPPORT_IS_DISABLED;
}
namespace
@ -353,10 +355,11 @@ void StorageObjectStorageQueue::read(
void ReadFromObjectStorageQueue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
Pipes pipes;
const size_t adjusted_num_streams = storage->getTableMetadata().processing_threads_num;
size_t processing_threads_num = storage->getTableMetadata().processing_threads_num;
createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i)
for (size_t i = 0; i < processing_threads_num; ++i)
pipes.emplace_back(storage->createSource(
i/* processor_id */,
info,
@ -555,6 +558,174 @@ bool StorageObjectStorageQueue::streamToViews()
return total_rows > 0;
}
static const std::unordered_set<std::string_view> changeable_settings_unordered_mode
{
"processing_threads_num",
"loading_retries",
"after_processing",
"tracked_files_limit",
"tracked_file_ttl_sec",
"polling_min_timeout_ms",
"polling_max_timeout_ms",
"polling_backoff_ms",
/// For compatibility.
"s3queue_processing_threads_num",
"s3queue_loading_retries",
"s3queue_after_processing",
"s3queue_tracked_files_limit",
"s3queue_tracked_file_ttl_sec",
"s3queue_polling_min_timeout_ms",
"s3queue_polling_max_timeout_ms",
"s3queue_polling_backoff_ms",
};
static const std::unordered_set<std::string_view> changeable_settings_ordered_mode
{
"loading_retries",
"after_processing",
"polling_min_timeout_ms",
"polling_max_timeout_ms",
"polling_backoff_ms",
/// For compatibility.
"s3queue_loading_retries",
"s3queue_after_processing",
"s3queue_polling_min_timeout_ms",
"s3queue_polling_max_timeout_ms",
"s3queue_polling_backoff_ms",
};
static bool isSettingChangeable(const std::string & name, ObjectStorageQueueMode mode)
{
if (mode == ObjectStorageQueueMode::UNORDERED)
return changeable_settings_unordered_mode.contains(name);
else
return changeable_settings_ordered_mode.contains(name);
}
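
The two allow-lists above define which settings can be altered at runtime, with unordered mode additionally permitting the tracking- and threading-related settings. As a hedged illustration of how the check plays out (the table name `s3queue_table` is hypothetical), the following statements would be accepted or rejected:

```sql
-- Allowed in both modes: retry and polling settings.
ALTER TABLE s3queue_table MODIFY SETTING loading_retries = 5, polling_backoff_ms = 1000;

-- Allowed only in unordered mode: processing threads and file-tracking limits.
ALTER TABLE s3queue_table MODIFY SETTING processing_threads_num = 8, tracked_files_limit = 10000;

-- Not in either allow-list, so rejected with SUPPORT_IS_DISABLED.
ALTER TABLE s3queue_table MODIFY SETTING mode = 'ordered';
```
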
void StorageObjectStorageQueue::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::MODIFY_SETTING && command.type != AlterCommand::RESET_SETTING)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only MODIFY SETTING alter is allowed for {}", getName());
}
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
commands.apply(new_metadata, local_context);
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
if (!new_metadata.hasSettingsChanges())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No settings changes");
const auto & new_changes = new_metadata.settings_changes->as<const ASTSetQuery &>().changes;
const auto & old_changes = old_metadata.settings_changes->as<const ASTSetQuery &>().changes;
const auto mode = getTableMetadata().getMode();
for (const auto & changed_setting : new_changes)
{
auto it = std::find_if(
old_changes.begin(), old_changes.end(),
[&](const SettingChange & change) { return change.name == changed_setting.name; });
const bool setting_changed = it != old_changes.end() && it->value != changed_setting.value;
if (setting_changed)
{
if (!isSettingChangeable(changed_setting.name, mode))
{
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Changing setting {} is not allowed for {} mode of {}",
changed_setting.name, magic_enum::enum_name(mode), getName());
}
}
}
}
void StorageObjectStorageQueue::alter(
const AlterCommands & commands,
ContextPtr local_context,
AlterLockHolder &)
{
if (commands.isSettingsAlter())
{
auto table_id = getStorageID();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
const auto & old_settings = old_metadata.settings_changes->as<const ASTSetQuery &>().changes;
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
commands.apply(new_metadata, local_context);
auto new_settings = new_metadata.settings_changes->as<ASTSetQuery &>().changes;
ObjectStorageQueueSettings default_settings;
for (const auto & setting : old_settings)
{
auto it = std::find_if(
new_settings.begin(), new_settings.end(),
[&](const SettingChange & change) { return change.name == setting.name; });
if (it == new_settings.end())
{
/// Setting was reset.
new_settings.push_back(SettingChange(setting.name, default_settings.get(setting.name)));
}
}
SettingsChanges changed_settings;
std::set<std::string> changed_settings_set;
const auto mode = getTableMetadata().getMode();
for (const auto & setting : new_settings)
{
auto it = std::find_if(
old_settings.begin(), old_settings.end(),
[&](const SettingChange & change) { return change.name == setting.name; });
const bool setting_changed = it == old_settings.end() || it->value != setting.value;
if (!setting_changed)
continue;
if (!isSettingChangeable(setting.name, mode))
{
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Changing setting {} is not allowed for {} mode of {}",
setting.name, magic_enum::enum_name(mode), getName());
}
SettingChange result_setting(setting);
if (result_setting.name.starts_with("s3queue_"))
result_setting.name = result_setting.name.substr(std::strlen("s3queue_"));
changed_settings.push_back(result_setting);
auto inserted = changed_settings_set.emplace(result_setting.name).second;
if (!inserted)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting {} is duplicated", setting.name);
}
/// Alter settings which are stored in keeper.
files_metadata->alterSettings(changed_settings);
/// Alter settings which are not stored in keeper.
for (const auto & change : changed_settings)
{
if (change.name == "polling_min_timeout_ms")
polling_min_timeout_ms = change.value.safeGet<UInt64>();
if (change.name == "polling_max_timeout_ms")
polling_max_timeout_ms = change.value.safeGet<UInt64>();
if (change.name == "polling_backoff_ms")
polling_backoff_ms = change.value.safeGet<UInt64>();
}
StorageInMemoryMetadata metadata = getInMemoryMetadata();
metadata.setSettingsChanges(new_metadata.settings_changes);
setInMemoryMetadata(metadata);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
}
}
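
A minimal usage sketch of the new ALTER path (table name and values are illustrative): judging by the code above, MODIFY SETTING persists the change and routes keeper-stored settings through `files_metadata->alterSettings`, RESET SETTING falls back to the engine default, and legacy `s3queue_`-prefixed aliases are normalized to their unprefixed names.

```sql
-- Adjust polling behaviour at runtime without recreating the table.
ALTER TABLE s3queue_table MODIFY SETTING polling_min_timeout_ms = 500, polling_max_timeout_ms = 20000;

-- A legacy prefixed alias is accepted and stored under the unprefixed name.
ALTER TABLE s3queue_table MODIFY SETTING s3queue_loading_retries = 3;

-- Revert a setting to its engine default.
ALTER TABLE s3queue_table RESET SETTING polling_backoff_ms;
```
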
zkutil::ZooKeeperPtr StorageObjectStorageQueue::getZooKeeper() const
{
return getContext()->getZooKeeper();
@ -582,8 +753,8 @@ ObjectStorageQueueSettings StorageObjectStorageQueue::getSettings() const
settings[ObjectStorageQueueSetting::processing_threads_num] = table_metadata.processing_threads_num;
settings[ObjectStorageQueueSetting::enable_logging_to_queue_log] = enable_logging_to_queue_log;
settings[ObjectStorageQueueSetting::last_processed_path] = table_metadata.last_processed_path;
settings[ObjectStorageQueueSetting::tracked_file_ttl_sec] = 0;
settings[ObjectStorageQueueSetting::tracked_files_limit] = 0;
settings[ObjectStorageQueueSetting::tracked_file_ttl_sec] = table_metadata.tracked_files_ttl_sec;
settings[ObjectStorageQueueSetting::tracked_files_limit] = table_metadata.tracked_files_limit;
settings[ObjectStorageQueueSetting::polling_min_timeout_ms] = polling_min_timeout_ms;
settings[ObjectStorageQueueSetting::polling_max_timeout_ms] = polling_max_timeout_ms;
settings[ObjectStorageQueueSetting::polling_backoff_ms] = polling_backoff_ms;

View File

@ -48,6 +48,13 @@ public:
size_t max_block_size,
size_t num_streams) override;
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const override;
void alter(
const AlterCommands & commands,
ContextPtr local_context,
AlterLockHolder & table_lock_holder) override;
const auto & getFormatName() const { return configuration->format; }
const fs::path & getZooKeeperPath() const { return zk_path; }
@ -65,9 +72,9 @@ private:
const std::string engine_name;
const fs::path zk_path;
const bool enable_logging_to_queue_log;
const UInt32 polling_min_timeout_ms;
const UInt32 polling_max_timeout_ms;
const UInt32 polling_backoff_ms;
UInt64 polling_min_timeout_ms;
UInt64 polling_max_timeout_ms;
UInt64 polling_backoff_ms;
const CommitSettings commit_settings;
std::shared_ptr<ObjectStorageQueueMetadata> files_metadata;

View File

@ -165,21 +165,18 @@ Chunk RabbitMQSource::generateImpl()
std::optional<String> exception_message;
size_t total_rows = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// We could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already push some rows to result_columns before exception, we need to fix it.
result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -38,6 +38,7 @@
#include <Common/MemoryTracker.h>
#include <Common/ProfileEventsScope.h>
#include <Common/escapeForFileName.h>
#include <IO/SharedThreadPools.h>
namespace DB
@ -154,6 +155,7 @@ StorageMergeTree::StorageMergeTree(
loadMutations();
loadDeduplicationLog();
prewarmMarkCache(getActivePartsLoadingThreadPool().get());
}

Some files were not shown because too many files have changed in this diff.