Merge remote-tracking branch 'origin/master' into pr-right-joins

Igor Nikonov 2024-10-30 00:03:16 +00:00
commit 6d47ef8f22
69 changed files with 1331 additions and 209 deletions

View File

@ -42,31 +42,18 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
Upcoming meetups
* [Jakarta Meetup](https://www.meetup.com/clickhouse-indonesia-user-group/events/303191359/) - October 1
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Oslo Meetup](https://www.meetup.com/open-source-real-time-data-warehouse-real-time-analytics/events/302938622) - October 31
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
* [Amsterdam Meetup](https://www.meetup.com/clickhouse-netherlands-user-group/events/303638814) - December 3
* [New York Meetup](https://www.meetup.com/clickhouse-new-york-user-group/events/304268174) - December 9
Recently completed meetups
* [ClickHouse Guangzhou User Group Meetup](https://mp.weixin.qq.com/s/GSvo-7xUoVzCsuUvlLTpCw) - August 25
* [Seattle Meetup (Statsig)](https://www.meetup.com/clickhouse-seattle-user-group/events/302518075/) - August 27
* [Melbourne Meetup](https://www.meetup.com/clickhouse-australia-user-group/events/302732666/) - August 27
* [Sydney Meetup](https://www.meetup.com/clickhouse-australia-user-group/events/302862966/) - September 5
* [Zurich Meetup](https://www.meetup.com/clickhouse-switzerland-meetup-group/events/302267429/) - September 5
* [San Francisco Meetup (Cloudflare)](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/302540575) - September 5
* [Raleigh Meetup (Deutsche Bank)](https://www.meetup.com/triangletechtalks/events/302723486/) - September 9
* [New York Meetup (Rokt)](https://www.meetup.com/clickhouse-new-york-user-group/events/302575342) - September 10
* [Toronto Meetup (Shopify)](https://www.meetup.com/clickhouse-toronto-user-group/events/301490855/) - September 10
* [Chicago Meetup (Jump Capital)](https://lu.ma/43tvmrfw) - September 12
* [London Meetup](https://www.meetup.com/clickhouse-london-user-group/events/302977267) - September 17
* [Austin Meetup](https://www.meetup.com/clickhouse-austin-user-group/events/302558689/) - September 17
* [Bangalore Meetup](https://www.meetup.com/clickhouse-bangalore-user-group/events/303208274/) - September 18
* [Tel Aviv Meetup](https://www.meetup.com/clickhouse-meetup-israel/events/303095121) - September 22
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
* [Jakarta Meetup](https://www.meetup.com/clickhouse-indonesia-user-group/events/303191359/) - October 1
## Recent Recordings
* **Recent Meetup Videos**: [Meetup Playlist](https://www.youtube.com/playlist?list=PL0Z2YDlm0b3iNDUzpY1S3L_iV4nARda_U) Whenever possible, recordings of the ClickHouse Community Meetups are edited and presented as individual talks. Currently featuring "Modern SQL in 2023", "Fast, Concurrent, and Consistent Asynchronous INSERTS in ClickHouse", and "Full-Text Indices: Design and Experiments"

View File

@ -78,6 +78,10 @@ Specifying privileges you can use asterisk (`*`) instead of a table or a databas
You can also omit the database name, in which case privileges are granted for the current database.
For example, `GRANT SELECT ON * TO john` grants the privilege on all tables in the current database, while `GRANT SELECT ON mytable TO john` grants the privilege only on the `mytable` table in the current database.
:::note
The feature described below is available starting from ClickHouse version 24.10.
:::
You can also put an asterisk at the end of a table or database name. This feature lets you grant privileges on all tables whose names start with a given prefix.
Example: `GRANT SELECT ON db.my_tables* TO john`. This query allows `john` to execute `SELECT` queries over all tables in the `db` database whose names start with the prefix `my_tables`.

View File

@ -207,7 +207,6 @@ namespace ServerSetting
extern const ServerSettingsBool format_alter_operations_with_parentheses;
extern const ServerSettingsUInt64 global_profiler_cpu_time_period_ns;
extern const ServerSettingsUInt64 global_profiler_real_time_period_ns;
extern const ServerSettingsDouble gwp_asan_force_sample_probability;
extern const ServerSettingsUInt64 http_connections_soft_limit;
extern const ServerSettingsUInt64 http_connections_store_limit;
extern const ServerSettingsUInt64 http_connections_warn_limit;
@ -622,7 +621,7 @@ void sanityChecks(Server & server)
#if defined(OS_LINUX)
try
{
const std::unordered_set<std::string> fastClockSources = {
const std::unordered_set<std::string> fast_clock_sources = {
// ARM clock
"arch_sys_counter",
// KVM guest clock
@ -631,7 +630,7 @@ void sanityChecks(Server & server)
"tsc",
};
const char * filename = "/sys/devices/system/clocksource/clocksource0/current_clocksource";
if (!fastClockSources.contains(readLine(filename)))
if (!fast_clock_sources.contains(readLine(filename)))
server.context()->addWarningMessage("Linux is not using a fast clock source. Performance can be degraded. Check " + String(filename));
}
catch (...) // NOLINT(bugprone-empty-catch)
@ -1930,10 +1929,6 @@ try
if (global_context->isServerCompletelyStarted())
CannotAllocateThreadFaultInjector::setFaultProbability(new_server_settings[ServerSetting::cannot_allocate_thread_fault_injection_probability]);
#if USE_GWP_ASAN
GWPAsan::setForceSampleProbability(new_server_settings[ServerSetting::gwp_asan_force_sample_probability]);
#endif
ProfileEvents::increment(ProfileEvents::MainConfigLoads);
/// Must be the last.
@ -2440,7 +2435,6 @@ try
#if USE_GWP_ASAN
GWPAsan::initFinished();
GWPAsan::setForceSampleProbability(server_settings[ServerSetting::gwp_asan_force_sample_probability]);
#endif
try

View File

@ -470,8 +470,7 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
{
if (!need_render_progress && select_into_file && !select_into_file_and_stdout)
error_stream << "\r";
bool toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle", true);
progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), toggle_enabled);
progress_table.writeTable(*tty_buf, progress_table_toggle_on.load(), progress_table_toggle_enabled);
}
}
@ -825,6 +824,9 @@ void ClientBase::initTTYBuffer(ProgressOption progress_option, ProgressOption pr
if (!need_render_progress && !need_render_progress_table)
return;
progress_table_toggle_enabled = getClientConfiguration().getBool("enable-progress-table-toggle");
progress_table_toggle_on = !progress_table_toggle_enabled;
/// If need_render_progress and need_render_progress_table are enabled,
/// use ProgressOption that was set for the progress bar for progress table as well.
ProgressOption progress = progress_option ? progress_option : progress_table_option;
@ -881,7 +883,7 @@ void ClientBase::initTTYBuffer(ProgressOption progress_option, ProgressOption pr
void ClientBase::initKeystrokeInterceptor()
{
if (is_interactive && need_render_progress_table && getClientConfiguration().getBool("enable-progress-table-toggle", true))
if (is_interactive && need_render_progress_table && progress_table_toggle_enabled)
{
keystroke_interceptor = std::make_unique<TerminalKeystrokeInterceptor>(in_fd, error_stream);
keystroke_interceptor->registerCallback(' ', [this]() { progress_table_toggle_on = !progress_table_toggle_on; });
@ -1151,6 +1153,7 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b
if (keystroke_interceptor)
{
progress_table_toggle_on = false;
try
{
keystroke_interceptor->startIntercept();
@ -1446,6 +1449,9 @@ void ClientBase::onProfileEvents(Block & block)
/// Flush all buffers.
void ClientBase::resetOutput()
{
if (need_render_progress_table && tty_buf)
progress_table.clearTableOutput(*tty_buf);
/// Order is important: format, compression, file
if (output_format)

View File

@ -340,6 +340,7 @@ protected:
ProgressTable progress_table;
bool need_render_progress = true;
bool need_render_progress_table = true;
bool progress_table_toggle_enabled = true;
std::atomic_bool progress_table_toggle_on = false;
bool need_render_profile_events = true;
bool written_first_block = false;

View File

@ -180,9 +180,12 @@ void writeWithWidth(Out & out, std::string_view s, size_t width)
template <typename Out>
void writeWithWidthStrict(Out & out, std::string_view s, size_t width)
{
chassert(width != 0);
constexpr std::string_view ellipsis = "…";
if (s.size() > width)
out << s.substr(0, width - 1) << "…";
{
if (width <= ellipsis.size())
out << s.substr(0, width);
else
out << s.substr(0, width - ellipsis.size()) << ellipsis;
}
else
out << s;
}
@ -219,7 +222,9 @@ void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool sho
writeWithWidth(message, COLUMN_EVENT_NAME, column_event_name_width);
writeWithWidth(message, COLUMN_VALUE, COLUMN_VALUE_WIDTH);
writeWithWidth(message, COLUMN_PROGRESS, COLUMN_PROGRESS_WIDTH);
writeWithWidth(message, COLUMN_DOCUMENTATION_NAME, COLUMN_DOCUMENTATION_WIDTH);
auto col_doc_width = getColumnDocumentationWidth(terminal_width);
if (col_doc_width)
writeWithWidth(message, COLUMN_DOCUMENTATION_NAME, col_doc_width);
message << CLEAR_TO_END_OF_LINE;
double elapsed_sec = watch.elapsedSeconds();
@ -257,9 +262,12 @@ void ProgressTable::writeTable(WriteBufferFromFileDescriptor & message, bool sho
writeWithWidth(message, formatReadableValue(value_type, progress) + "/s", COLUMN_PROGRESS_WIDTH);
message << setColorForDocumentation();
const auto * doc = getDocumentation(event_name_to_event.at(name));
writeWithWidthStrict(message, doc, COLUMN_DOCUMENTATION_WIDTH);
if (col_doc_width)
{
message << setColorForDocumentation();
const auto * doc = getDocumentation(event_name_to_event.at(name));
writeWithWidthStrict(message, doc, col_doc_width);
}
message << RESET_COLOR;
message << CLEAR_TO_END_OF_LINE;
@ -372,6 +380,14 @@ size_t ProgressTable::tableSize() const
return metrics.empty() ? 0 : metrics.size() + 1;
}
size_t ProgressTable::getColumnDocumentationWidth(size_t terminal_width) const
{
auto fixed_columns_width = column_event_name_width + COLUMN_VALUE_WIDTH + COLUMN_PROGRESS_WIDTH;
if (terminal_width < fixed_columns_width + COLUMN_DOCUMENTATION_MIN_WIDTH)
return 0;
return terminal_width - fixed_columns_width;
}
ProgressTable::MetricInfo::MetricInfo(ProfileEvents::Type t) : type(t)
{
}

View File

@ -87,6 +87,7 @@ private:
};
size_t tableSize() const;
size_t getColumnDocumentationWidth(size_t terminal_width) const;
using MetricName = String;
@ -110,7 +111,7 @@ private:
static constexpr std::string_view COLUMN_DOCUMENTATION_NAME = "Documentation";
static constexpr size_t COLUMN_VALUE_WIDTH = 20;
static constexpr size_t COLUMN_PROGRESS_WIDTH = 20;
static constexpr size_t COLUMN_DOCUMENTATION_WIDTH = 100;
static constexpr size_t COLUMN_DOCUMENTATION_MIN_WIDTH = COLUMN_DOCUMENTATION_NAME.size();
std::ostream & output_stream;
int in_fd;

View File

@ -369,6 +369,23 @@ void ColumnArray::popBack(size_t n)
offsets_data.resize_assume_reserved(offsets_data.size() - n);
}
ColumnCheckpointPtr ColumnArray::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), getData().getCheckpoint());
}
void ColumnArray::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
getData().updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
}
void ColumnArray::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);
getData().rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
}
int ColumnArray::compareAtImpl(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint, const Collator * collator) const
{
const ColumnArray & rhs = assert_cast<const ColumnArray &>(rhs_);

View File

@ -161,6 +161,10 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(offsets);

View File

@ -1000,6 +1000,56 @@ ColumnPtr ColumnDynamic::compress() const
});
}
void ColumnDynamic::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & nested = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
const auto & variants = variant_column_ptr->getVariants();
size_t old_size = nested.size();
chassert(old_size <= variants.size());
for (size_t i = 0; i < old_size; ++i)
{
variants[i]->updateCheckpoint(*nested[i]);
}
/// If the column has new variants since the last checkpoint, create checkpoints for them.
if (old_size < variants.size())
{
nested.resize(variants.size());
for (size_t i = old_size; i < variants.size(); ++i)
nested[i] = variants[i]->getCheckpoint();
}
checkpoint.size = size();
}
void ColumnDynamic::rollback(const ColumnCheckpoint & checkpoint)
{
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(nested.size() <= variant_column_ptr->getNumVariants());
/// The structure hasn't changed, so we can use generic rollback of Variant column
if (nested.size() == variant_column_ptr->getNumVariants())
{
variant_column_ptr->rollback(checkpoint);
return;
}
/// Manually roll back the internals of the Variant column
variant_column_ptr->getOffsets().resize_assume_reserved(checkpoint.size);
variant_column_ptr->getLocalDiscriminators().resize_assume_reserved(checkpoint.size);
auto & variants = variant_column_ptr->getVariants();
for (size_t i = 0; i < nested.size(); ++i)
variants[i]->rollback(*nested[i]);
/// Keep the structure of the variant as is, but roll back to empty
/// the variants that are not present in the checkpoint.
for (size_t i = nested.size(); i < variants.size(); ++i)
variants[i] = variants[i]->cloneEmpty();
}
String ColumnDynamic::getTypeNameAt(size_t row_num) const
{
const auto & variant_col = getVariantColumn();

View File

@ -304,6 +304,15 @@ public:
variant_column_ptr->protect();
}
ColumnCheckpointPtr getCheckpoint() const override
{
return variant_column_ptr->getCheckpoint();
}
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(variant_column);

View File

@ -312,6 +312,21 @@ void ColumnMap::getExtremes(Field & min, Field & max) const
max = std::move(map_max_value);
}
ColumnCheckpointPtr ColumnMap::getCheckpoint() const
{
return nested->getCheckpoint();
}
void ColumnMap::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
nested->updateCheckpoint(checkpoint);
}
void ColumnMap::rollback(const ColumnCheckpoint & checkpoint)
{
nested->rollback(checkpoint);
}
void ColumnMap::forEachSubcolumn(MutableColumnCallback callback)
{
callback(nested);

View File

@ -102,6 +102,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -302,6 +302,23 @@ void ColumnNullable::popBack(size_t n)
getNullMapColumn().popBack(n);
}
ColumnCheckpointPtr ColumnNullable::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), nested_column->getCheckpoint());
}
void ColumnNullable::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
nested_column->updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
}
void ColumnNullable::rollback(const ColumnCheckpoint & checkpoint)
{
getNullMapData().resize_assume_reserved(checkpoint.size);
nested_column->rollback(*assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested);
}
ColumnPtr ColumnNullable::filter(const Filter & filt, ssize_t result_size_hint) const
{
ColumnPtr filtered_data = getNestedColumn().filter(filt, result_size_hint);

View File

@ -143,6 +143,10 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override
{
callback(nested_column);

View File

@ -30,6 +30,23 @@ const std::shared_ptr<SerializationDynamic> & getDynamicSerialization()
return dynamic_serialization;
}
struct ColumnObjectCheckpoint : public ColumnCheckpoint
{
using CheckpointsMap = std::unordered_map<std::string_view, ColumnCheckpointPtr>;
ColumnObjectCheckpoint(size_t size_, CheckpointsMap typed_paths_, CheckpointsMap dynamic_paths_, ColumnCheckpointPtr shared_data_)
: ColumnCheckpoint(size_)
, typed_paths(std::move(typed_paths_))
, dynamic_paths(std::move(dynamic_paths_))
, shared_data(std::move(shared_data_))
{
}
CheckpointsMap typed_paths;
CheckpointsMap dynamic_paths;
ColumnCheckpointPtr shared_data;
};
}
ColumnObject::ColumnObject(
@ -698,6 +715,69 @@ void ColumnObject::popBack(size_t n)
shared_data->popBack(n);
}
ColumnCheckpointPtr ColumnObject::getCheckpoint() const
{
auto get_checkpoints = [](const auto & columns)
{
ColumnObjectCheckpoint::CheckpointsMap checkpoints;
for (const auto & [name, column] : columns)
checkpoints[name] = column->getCheckpoint();
return checkpoints;
};
return std::make_shared<ColumnObjectCheckpoint>(size(), get_checkpoints(typed_paths), get_checkpoints(dynamic_paths_ptrs), shared_data->getCheckpoint());
}
void ColumnObject::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & object_checkpoint = assert_cast<ColumnObjectCheckpoint &>(checkpoint);
auto update_checkpoints = [&](const auto & columns_map, auto & checkpoints_map)
{
for (const auto & [name, column] : columns_map)
{
auto & nested = checkpoints_map[name];
if (!nested)
nested = column->getCheckpoint();
else
column->updateCheckpoint(*nested);
}
};
checkpoint.size = size();
update_checkpoints(typed_paths, object_checkpoint.typed_paths);
update_checkpoints(dynamic_paths, object_checkpoint.dynamic_paths);
shared_data->updateCheckpoint(*object_checkpoint.shared_data);
}
void ColumnObject::rollback(const ColumnCheckpoint & checkpoint)
{
const auto & object_checkpoint = assert_cast<const ColumnObjectCheckpoint &>(checkpoint);
auto rollback_columns = [&](auto & columns_map, const auto & checkpoints_map)
{
NameSet names_to_remove;
/// Rollback subcolumns and remove paths that were not in checkpoint.
for (auto & [name, column] : columns_map)
{
auto it = checkpoints_map.find(name);
if (it == checkpoints_map.end())
names_to_remove.insert(name);
else
column->rollback(*it->second);
}
for (const auto & name : names_to_remove)
columns_map.erase(name);
};
rollback_columns(typed_paths, object_checkpoint.typed_paths);
rollback_columns(dynamic_paths, object_checkpoint.dynamic_paths);
shared_data->rollback(*object_checkpoint.shared_data);
}
StringRef ColumnObject::serializeValueIntoArena(size_t n, Arena & arena, const char *& begin) const
{
StringRef res(begin, 0);

View File

@ -161,6 +161,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;

View File

@ -308,6 +308,28 @@ void ColumnSparse::popBack(size_t n)
_size = new_size;
}
ColumnCheckpointPtr ColumnSparse::getCheckpoint() const
{
return std::make_shared<ColumnCheckpointWithNested>(size(), values->getCheckpoint());
}
void ColumnSparse::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
values->updateCheckpoint(*assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested);
}
void ColumnSparse::rollback(const ColumnCheckpoint & checkpoint)
{
_size = checkpoint.size;
const auto & nested = *assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested;
chassert(nested.size > 0);
values->rollback(nested);
getOffsetsData().resize_assume_reserved(nested.size - 1);
}
ColumnPtr ColumnSparse::filter(const Filter & filt, ssize_t) const
{
if (_size != filt.size())

View File

@ -149,6 +149,10 @@ public:
ColumnPtr compress() const override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;

View File

@ -240,6 +240,23 @@ ColumnPtr ColumnString::permute(const Permutation & perm, size_t limit) const
return permuteImpl(*this, perm, limit);
}
ColumnCheckpointPtr ColumnString::getCheckpoint() const
{
auto nested = std::make_shared<ColumnCheckpoint>(chars.size());
return std::make_shared<ColumnCheckpointWithNested>(size(), std::move(nested));
}
void ColumnString::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
checkpoint.size = size();
assert_cast<ColumnCheckpointWithNested &>(checkpoint).nested->size = chars.size();
}
void ColumnString::rollback(const ColumnCheckpoint & checkpoint)
{
offsets.resize_assume_reserved(checkpoint.size);
chars.resize_assume_reserved(assert_cast<const ColumnCheckpointWithNested &>(checkpoint).nested->size);
}
void ColumnString::collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const
{

View File

@ -194,6 +194,10 @@ public:
offsets.resize_assume_reserved(offsets.size() - n);
}
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void collectSerializedValueSizes(PaddedPODArray<UInt64> & sizes, const UInt8 * is_null) const override;
StringRef serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const override;

View File

@ -254,6 +254,37 @@ void ColumnTuple::popBack(size_t n)
column->popBack(n);
}
ColumnCheckpointPtr ColumnTuple::getCheckpoint() const
{
ColumnCheckpoints checkpoints;
checkpoints.reserve(columns.size());
for (const auto & column : columns)
checkpoints.push_back(column->getCheckpoint());
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
}
void ColumnTuple::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & checkpoints = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(checkpoints.size() == columns.size());
checkpoint.size = size();
for (size_t i = 0; i < columns.size(); ++i)
columns[i]->updateCheckpoint(*checkpoints[i]);
}
void ColumnTuple::rollback(const ColumnCheckpoint & checkpoint)
{
column_length = checkpoint.size;
const auto & checkpoints = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(columns.size() == checkpoints.size());
for (size_t i = 0; i < columns.size(); ++i)
columns[i]->rollback(*checkpoints[i]);
}
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
if (columns.empty())

View File

@ -118,6 +118,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -739,6 +739,39 @@ void ColumnVariant::popBack(size_t n)
offsets->popBack(n);
}
ColumnCheckpointPtr ColumnVariant::getCheckpoint() const
{
ColumnCheckpoints checkpoints;
checkpoints.reserve(variants.size());
for (const auto & column : variants)
checkpoints.push_back(column->getCheckpoint());
return std::make_shared<ColumnCheckpointWithMultipleNested>(size(), std::move(checkpoints));
}
void ColumnVariant::updateCheckpoint(ColumnCheckpoint & checkpoint) const
{
auto & checkpoints = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(checkpoints.size() == variants.size());
checkpoint.size = size();
for (size_t i = 0; i < variants.size(); ++i)
variants[i]->updateCheckpoint(*checkpoints[i]);
}
void ColumnVariant::rollback(const ColumnCheckpoint & checkpoint)
{
getOffsets().resize_assume_reserved(checkpoint.size);
getLocalDiscriminators().resize_assume_reserved(checkpoint.size);
const auto & checkpoints = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
chassert(variants.size() == checkpoints.size());
for (size_t i = 0; i < variants.size(); ++i)
variants[i]->rollback(*checkpoints[i]);
}
StringRef ColumnVariant::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
/// During any serialization/deserialization we should always use global discriminators.

View File

@ -248,6 +248,9 @@ public:
size_t byteSizeAt(size_t n) const override;
size_t allocatedBytes() const override;
void protect() override;
ColumnCheckpointPtr getCheckpoint() const override;
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
void rollback(const ColumnCheckpoint & checkpoint) override;
void forEachSubcolumn(MutableColumnCallback callback) override;
void forEachSubcolumnRecursively(RecursiveMutableColumnCallback callback) override;
bool structureEquals(const IColumn & rhs) const override;

View File

@ -49,6 +49,40 @@ struct EqualRange
using EqualRanges = std::vector<EqualRange>;
/// A checkpoint that contains the size of a column and all its subcolumns.
/// It can be used to roll a column back to a previous state, for example
/// after failed parsing, when the column may be left in an inconsistent state.
struct ColumnCheckpoint
{
size_t size;
explicit ColumnCheckpoint(size_t size_) : size(size_) {}
virtual ~ColumnCheckpoint() = default;
};
using ColumnCheckpointPtr = std::shared_ptr<ColumnCheckpoint>;
using ColumnCheckpoints = std::vector<ColumnCheckpointPtr>;
struct ColumnCheckpointWithNested : public ColumnCheckpoint
{
ColumnCheckpointWithNested(size_t size_, ColumnCheckpointPtr nested_)
: ColumnCheckpoint(size_), nested(std::move(nested_))
{
}
ColumnCheckpointPtr nested;
};
struct ColumnCheckpointWithMultipleNested : public ColumnCheckpoint
{
ColumnCheckpointWithMultipleNested(size_t size_, ColumnCheckpoints nested_)
: ColumnCheckpoint(size_), nested(std::move(nested_))
{
}
ColumnCheckpoints nested;
};
/// Declares interface to store columns in memory.
class IColumn : public COW<IColumn>
{
@ -509,6 +543,17 @@ public:
/// The operation is slow and performed only for debug builds.
virtual void protect() {}
/// Returns a checkpoint of the column's current state.
virtual ColumnCheckpointPtr getCheckpoint() const { return std::make_shared<ColumnCheckpoint>(size()); }
/// Updates the checkpoint with the current state. It is used to avoid extra allocations in 'getCheckpoint'.
virtual void updateCheckpoint(ColumnCheckpoint & checkpoint) const { checkpoint.size = size(); }
/// Rolls the column back to the checkpoint.
/// Unlike 'popBack', this method should work correctly even if the column is in an invalid state.
/// Sizes of columns in the checkpoint must be less than or equal to the current size.
virtual void rollback(const ColumnCheckpoint & checkpoint) { popBack(size() - checkpoint.size); }
/// If the column contains subcolumns (such as Array, Nullable, etc.), invoke the callback on them.
/// Shallow: doesn't recurse and doesn't invoke the callback for the column itself.
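
The checkpoint API above is exercised by the row-input parsing code later in this diff. As a minimal illustrative sketch (not part of the commit), the intended pattern is: allocate one checkpoint per column up front, refresh the checkpoints in place before each parsing attempt, and roll back on failure. `parseRow` and `readRowsWithRollback` are hypothetical placeholders for the actual row-reading logic.

#include <Columns/IColumn.h>

using namespace DB;

/// Hypothetical parser: may leave `columns` partially filled (inconsistent) on failure.
bool parseRow(MutableColumns & columns);

void readRowsWithRollback(MutableColumns & columns)
{
    /// One checkpoint per column, allocated once.
    ColumnCheckpoints checkpoints(columns.size());
    for (size_t i = 0; i < columns.size(); ++i)
        checkpoints[i] = columns[i]->getCheckpoint();

    while (true)
    {
        /// Refresh checkpoints in place to avoid per-row allocations.
        for (size_t i = 0; i < columns.size(); ++i)
            columns[i]->updateCheckpoint(*checkpoints[i]);

        if (!parseRow(columns))
        {
            /// Restore every column to its last consistent state, even if
            /// the failed row was appended to only some of the columns.
            for (size_t i = 0; i < columns.size(); ++i)
                columns[i]->rollback(*checkpoints[i]);
            break;
        }
    }
}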

View File

@ -920,3 +920,71 @@ TEST(ColumnDynamic, compare)
ASSERT_EQ(column_from->compareAt(3, 2, *column_from, -1), -1);
ASSERT_EQ(column_from->compareAt(3, 4, *column_from, -1), -1);
}
TEST(ColumnDynamic, rollback)
{
auto check_variant = [](const ColumnVariant & column_variant, std::vector<size_t> sizes)
{
ASSERT_EQ(column_variant.getNumVariants(), sizes.size());
size_t num_rows = 0;
for (size_t i = 0; i < sizes.size(); ++i)
{
ASSERT_EQ(column_variant.getVariants()[i]->size(), sizes[i]);
num_rows += sizes[i];
}
ASSERT_EQ(num_rows, column_variant.size());
};
auto check_checkpoint = [&](const ColumnCheckpoint & cp, std::vector<size_t> sizes)
{
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(cp).nested;
size_t num_rows = 0;
for (size_t i = 0; i < nested.size(); ++i)
{
ASSERT_EQ(nested[i]->size, sizes[i]);
num_rows += sizes[i];
}
ASSERT_EQ(num_rows, cp.size);
};
std::vector<std::pair<ColumnCheckpointPtr, std::vector<size_t>>> checkpoints;
auto column = ColumnDynamic::create(2);
auto checkpoint = column->getCheckpoint();
column->insert(Field(42));
column->updateCheckpoint(*checkpoint);
checkpoints.emplace_back(checkpoint, std::vector<size_t>{0, 1, 0});
column->insert(Field("str1"));
column->rollback(*checkpoint);
check_checkpoint(*checkpoint, checkpoints.back().second);
check_variant(column->getVariantColumn(), checkpoints.back().second);
column->insert("str1");
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 1});
column->insert("str2");
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 2});
column->insert(Array({1, 2}));
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{1, 1, 2});
column->insert(Field(42.42));
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{2, 1, 2});
for (const auto & [cp, sizes] : checkpoints)
{
auto column_copy = column->clone();
column_copy->rollback(*cp);
check_checkpoint(*cp, sizes);
check_variant(assert_cast<const ColumnDynamic &>(*column_copy).getVariantColumn(), sizes);
}
}

View File

@ -5,6 +5,7 @@
#include <IO/WriteBufferFromString.h>
#include <Common/Arena.h>
#include "Core/Field.h"
#include <gtest/gtest.h>
using namespace DB;
@ -349,3 +350,65 @@ TEST(ColumnObject, SkipSerializedInArena)
pos = col2->skipSerializedInArena(pos);
ASSERT_EQ(pos, end);
}
TEST(ColumnObject, rollback)
{
auto type = DataTypeFactory::instance().get("JSON(max_dynamic_types=10, max_dynamic_paths=2, a.a UInt32, a.b UInt32)");
auto col = type->createColumn();
auto & col_object = assert_cast<ColumnObject &>(*col);
const auto & typed_paths = col_object.getTypedPaths();
const auto & dynamic_paths = col_object.getDynamicPaths();
const auto & shared_data = col_object.getSharedDataColumn();
auto assert_sizes = [&](size_t size)
{
for (const auto & [name, column] : typed_paths)
ASSERT_EQ(column->size(), size);
for (const auto & [name, column] : dynamic_paths)
ASSERT_EQ(column->size(), size);
ASSERT_EQ(shared_data.size(), size);
};
auto checkpoint = col_object.getCheckpoint();
col_object.insert(Object{{"a.a", Field{1u}}});
col_object.updateCheckpoint(*checkpoint);
col_object.insert(Object{{"a.b", Field{2u}}});
col_object.insert(Object{{"a.a", Field{3u}}});
col_object.rollback(*checkpoint);
assert_sizes(1);
ASSERT_EQ(typed_paths.size(), 2);
ASSERT_EQ(dynamic_paths.size(), 0);
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
ASSERT_EQ((*typed_paths.at("a.b"))[0], Field{0u});
col_object.insert(Object{{"a.c", Field{"ccc"}}});
checkpoint = col_object.getCheckpoint();
col_object.insert(Object{{"a.d", Field{"ddd"}}});
col_object.insert(Object{{"a.e", Field{"eee"}}});
assert_sizes(4);
ASSERT_EQ(typed_paths.size(), 2);
ASSERT_EQ(dynamic_paths.size(), 2);
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"});
ASSERT_EQ((*dynamic_paths.at("a.d"))[2], Field{"ddd"});
col_object.rollback(*checkpoint);
assert_sizes(2);
ASSERT_EQ(typed_paths.size(), 2);
ASSERT_EQ(dynamic_paths.size(), 1);
ASSERT_EQ((*typed_paths.at("a.a"))[0], Field{1u});
ASSERT_EQ((*dynamic_paths.at("a.c"))[1], Field{"ccc"});
}

View File

@ -57,7 +57,7 @@ static bool guarded_alloc_initialized = []
opts.MaxSimultaneousAllocations = 1024;
if (!env_options_raw || !std::string_view{env_options_raw}.contains("SampleRate"))
opts.SampleRate = 10000;
opts.SampleRate = 0;
const char * collect_stacktraces = std::getenv("GWP_ASAN_COLLECT_STACKTRACES"); // NOLINT(concurrency-mt-unsafe)
if (collect_stacktraces && std::string_view{collect_stacktraces} == "1")

View File

@ -8,7 +8,6 @@
#include <Common/thread_local_rng.h>
#include <atomic>
#include <random>
namespace GWPAsan
{
@ -39,14 +38,6 @@ inline bool shouldSample()
return init_finished.load(std::memory_order_relaxed) && GuardedAlloc.shouldSample();
}
inline bool shouldForceSample()
{
if (!init_finished.load(std::memory_order_relaxed))
return false;
std::bernoulli_distribution dist(force_sample_probability.load(std::memory_order_relaxed));
return dist(thread_local_rng);
}
}
#endif

View File

@ -115,11 +115,6 @@ protected:
template <typename ... TAllocatorParams>
void alloc(size_t bytes, TAllocatorParams &&... allocator_params)
{
#if USE_GWP_ASAN
if (unlikely(GWPAsan::shouldForceSample()))
gwp_asan::getThreadLocals()->NextSampleCounter = 1;
#endif
char * allocated = reinterpret_cast<char *>(TAllocator::alloc(bytes, std::forward<TAllocatorParams>(allocator_params)...));
c_start = allocated + pad_left;
@ -149,11 +144,6 @@ protected:
return;
}
#if USE_GWP_ASAN
if (unlikely(GWPAsan::shouldForceSample()))
gwp_asan::getThreadLocals()->NextSampleCounter = 1;
#endif
unprotect();
ptrdiff_t end_diff = c_end - c_start;

View File

@ -184,7 +184,6 @@ namespace DB
DECLARE(String, merge_workload, "default", "Name of workload to be used to access resources for all merges (may be overridden by a merge tree setting)", 0) \
DECLARE(String, mutation_workload, "default", "Name of workload to be used to access resources for all mutations (may be overridden by a merge tree setting)", 0) \
DECLARE(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \
DECLARE(Double, gwp_asan_force_sample_probability, 0.0003, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \
DECLARE(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
DECLARE(UInt64, memory_worker_period_ms, 0, "Tick period of background memory worker which corrects memory tracker memory usages and cleans up unused pages during higher memory usage. If set to 0, default value will be used depending on the memory usage source", 0) \
DECLARE(Bool, disable_insertion_and_mutation, false, "Disable all insert/alter/delete queries. This setting will be enabled if someone needs read-only nodes to prevent insertion and mutation affect reading performance.", 0) \

View File

@ -1085,7 +1085,7 @@ public:
}
auto & col_lc = assert_cast<ColumnLowCardinality &>(column);
auto tmp_nested = col_lc.getDictionary().getNestedColumn()->cloneEmpty();
auto tmp_nested = removeNullable(col_lc.getDictionary().getNestedColumn()->cloneEmpty())->assumeMutable();
if (!nested->insertResultToColumn(*tmp_nested, element, insert_settings, format_settings, error))
return false;

View File

@ -1,27 +1,28 @@
#include <Functions/IFunctionAdaptors.h>
#include <Functions/FunctionDynamicAdaptor.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/SipHash.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnNothing.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnTuple.h>
#include <Core/Block.h>
#include <Core/TypeId.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnNothing.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/Native.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Functions/FunctionHelpers.h>
#include <cstdlib>
#include <memory>
#include <Common/SipHash.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include "config.h"
#include <cstdlib>
#include <memory>
#if USE_EMBEDDED_COMPILER
# include <llvm/IR/IRBuilder.h>
#endif
@ -451,6 +452,7 @@ FunctionBasePtr IFunctionOverloadResolver::build(const ColumnsWithTypeAndName &
/// Use FunctionBaseDynamicAdaptor if default implementation for Dynamic is enabled and we have Dynamic type in arguments.
if (useDefaultImplementationForDynamic())
{
checkNumberOfArguments(arguments.size());
for (const auto & arg : arguments)
{
if (isDynamic(arg.type))

View File

@ -44,16 +44,10 @@ struct Memory : boost::noncopyable, Allocator
char * m_data = nullptr;
size_t alignment = 0;
[[maybe_unused]] bool allow_gwp_asan_force_sample{false};
Memory() = default;
/// If alignment != 0, then allocate memory aligned to specified value.
explicit Memory(size_t size_, size_t alignment_ = 0, bool allow_gwp_asan_force_sample_ = false)
: alignment(alignment_), allow_gwp_asan_force_sample(allow_gwp_asan_force_sample_)
{
alloc(size_);
}
explicit Memory(size_t size_, size_t alignment_ = 0) : alignment(alignment_) { alloc(size_); }
~Memory()
{
@ -133,11 +127,6 @@ private:
ProfileEvents::increment(ProfileEvents::IOBufferAllocs);
ProfileEvents::increment(ProfileEvents::IOBufferAllocBytes, new_capacity);
#if USE_GWP_ASAN
if (unlikely(allow_gwp_asan_force_sample && GWPAsan::shouldForceSample()))
gwp_asan::getThreadLocals()->NextSampleCounter = 1;
#endif
m_data = static_cast<char *>(Allocator::alloc(new_capacity, alignment));
m_capacity = new_capacity;
m_size = new_size;
@ -165,7 +154,7 @@ protected:
public:
/// If non-nullptr 'existing_memory' is passed, then buffer will not create its own memory and will use existing_memory without ownership.
explicit BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE, char * existing_memory = nullptr, size_t alignment = 0)
: Base(nullptr, 0), memory(existing_memory ? 0 : size, alignment, /*allow_gwp_asan_force_sample_=*/true)
: Base(nullptr, 0), memory(existing_memory ? 0 : size, alignment)
{
Base::set(existing_memory ? existing_memory : memory.data(), size);
Base::padded = !existing_memory;

View File

@ -1050,15 +1050,14 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
adding_defaults_transform = std::make_shared<AddingDefaultsTransform>(header, columns, *format, insert_context);
}
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
current_exception = e.displayText();
LOG_ERROR(logger, "Failed parsing for query '{}' with query id {}. {}",
key.query_str, current_entry->query_id, current_exception);
for (const auto & column : result_columns)
if (column->size() > total_rows)
column->popBack(column->size() - total_rows);
for (size_t i = 0; i < result_columns.size(); ++i)
result_columns[i]->rollback(*checkpoints[i]);
current_entry->finish(std::current_exception());
return 0;

View File

@ -22,8 +22,12 @@ StreamingFormatExecutor::StreamingFormatExecutor(
, adding_defaults_transform(std::move(adding_defaults_transform_))
, port(format->getPort().getHeader(), format.get())
, result_columns(header.cloneEmptyColumns())
, checkpoints(result_columns.size())
{
connect(format->getPort(), port);
for (size_t i = 0; i < result_columns.size(); ++i)
checkpoints[i] = result_columns[i]->getCheckpoint();
}
MutableColumns StreamingFormatExecutor::getResultColumns()
@ -53,6 +57,9 @@ size_t StreamingFormatExecutor::execute(ReadBuffer & buffer)
size_t StreamingFormatExecutor::execute()
{
for (size_t i = 0; i < result_columns.size(); ++i)
result_columns[i]->updateCheckpoint(*checkpoints[i]);
try
{
size_t new_rows = 0;
@ -85,19 +92,19 @@ size_t StreamingFormatExecutor::execute()
catch (Exception & e)
{
format->resetParser();
return on_error(result_columns, e);
return on_error(result_columns, checkpoints, e);
}
catch (std::exception & e)
{
format->resetParser();
auto exception = Exception(Exception::CreateFromSTDTag{}, e);
return on_error(result_columns, exception);
return on_error(result_columns, checkpoints, exception);
}
catch (...)
{
format->resetParser();
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknowk exception while executing StreamingFormatExecutor with format {}", format->getName());
return on_error(result_columns, exception);
auto exception = Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception while executing StreamingFormatExecutor with format {}", format->getName());
return on_error(result_columns, checkpoints, exception);
}
}

View File

@ -19,12 +19,12 @@ public:
/// and exception to rethrow it or add context to it.
/// Should return number of new rows, which are added in callback
/// to result columns in comparison to previous call of `execute`.
using ErrorCallback = std::function<size_t(const MutableColumns &, Exception &)>;
using ErrorCallback = std::function<size_t(const MutableColumns &, const ColumnCheckpoints &, Exception &)>;
StreamingFormatExecutor(
const Block & header_,
InputFormatPtr format_,
ErrorCallback on_error_ = [](const MutableColumns &, Exception & e) -> size_t { throw std::move(e); },
ErrorCallback on_error_ = [](const MutableColumns &, const ColumnCheckpoints &, Exception & e) -> size_t { throw std::move(e); },
SimpleTransformPtr adding_defaults_transform_ = nullptr);
/// Returns numbers of new read rows.
@ -50,6 +50,7 @@ private:
InputPort port;
MutableColumns result_columns;
ColumnCheckpoints checkpoints;
};
}

View File

@ -111,6 +111,10 @@ Chunk IRowInputFormat::read()
for (size_t i = 0; i < num_columns; ++i)
columns[i] = header.getByPosition(i).type->createColumn(*serializations[i]);
ColumnCheckpoints checkpoints(columns.size());
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
checkpoints[column_idx] = columns[column_idx]->getCheckpoint();
block_missing_values.clear();
size_t num_rows = 0;
@ -136,6 +140,9 @@ Chunk IRowInputFormat::read()
{
try
{
for (size_t column_idx = 0; column_idx < columns.size(); ++column_idx)
columns[column_idx]->updateCheckpoint(*checkpoints[column_idx]);
info.read_columns.clear();
continue_reading = readRow(columns, info);
@ -199,14 +206,9 @@ Chunk IRowInputFormat::read()
syncAfterError();
/// Truncate all columns in block to initial size (remove values, that was appended to only part of columns).
/// Roll back all columns in the block to their initial size (remove values that were appended to only some of the columns).
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
{
auto & column = columns[column_idx];
if (column->size() > num_rows)
column->popBack(column->size() - num_rows);
}
columns[column_idx]->rollback(*checkpoints[column_idx]);
}
}
}

View File

@ -172,7 +172,7 @@ JSONAsObjectRowInputFormat::JSONAsObjectRowInputFormat(
const auto & type = header_.getByPosition(0).type;
if (!isObject(type) && !isObjectDeprecated(type))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Input format JSONAsObject is only suitable for tables with a single column of type Object/JSON but the column type is {}",
"Input format JSONAsObject is only suitable for tables with a single column of type JSON but the column type is {}",
type->getName());
}
@ -193,8 +193,8 @@ JSONAsObjectExternalSchemaReader::JSONAsObjectExternalSchemaReader(const FormatS
if (!settings.json.allow_deprecated_object_type && !settings.json.allow_json_type)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Cannot infer the data structure in JSONAsObject format because experimental Object/JSON type is not allowed. Set setting "
"allow_experimental_object_type = 1 or allow_experimental_json_type=1 in order to allow it");
"Cannot infer the data structure in JSONAsObject format because experimental JSON type is not allowed. Set setting "
"allow_experimental_json_type = 1 in order to allow it");
}
void registerInputFormatJSONAsString(FormatFactory & factory)

View File

@ -301,6 +301,9 @@ void TCPHandler::runImpl()
{
receiveHello();
if (!default_database.empty())
DatabaseCatalog::instance().assertDatabaseExists(default_database);
/// In interserver mode queries are executed without a session context.
if (!is_interserver_mode)
session->makeSessionContext();

View File

@ -86,21 +86,18 @@ Chunk FileLogSource::generate()
std::optional<String> exception_message;
size_t total_rows = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// We could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already have pushed some rows to result_columns before the exception; we need to fix that.
result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -114,23 +114,20 @@ Chunk KafkaSource::generateImpl()
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already have pushed some rows to result_columns before the exception; we need to fix that.
result_columns[i]->rollback(*checkpoints[i]);
// all data columns will get default value in case of error
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -854,23 +854,20 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
size_t total_rows = 0;
size_t failed_poll_attempts = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already have pushed some rows to result_columns before the exception; we need to fix that.
result_columns[i]->rollback(*checkpoints[i]);
// all data columns will get default value in case of error
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -107,21 +107,18 @@ Chunk NATSSource::generate()
storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1);
std::optional<String> exception_message;
size_t total_rows = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// We could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could already have pushed some rows to result_columns before the exception; we need to fix that.
result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -219,6 +219,108 @@ ObjectStorageQueueMetadata::tryAcquireBucket(const Bucket & bucket, const Proces
return ObjectStorageQueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log);
}
void ObjectStorageQueueMetadata::alterSettings(const SettingsChanges & changes)
{
const fs::path alter_settings_lock_path = zookeeper_path / "alter_settings_lock";
zkutil::EphemeralNodeHolder::Ptr alter_settings_lock;
auto zookeeper = getZooKeeper();
/// We will retry taking alter_settings_lock for the duration of 5 seconds.
/// Do we need to add a setting for this?
const size_t num_tries = 100;
for (size_t i = 0; i < num_tries; ++i)
{
alter_settings_lock = zkutil::EphemeralNodeHolder::tryCreate(alter_settings_lock_path, *zookeeper, toString(getCurrentTime()));
if (alter_settings_lock)
break;
if (i == num_tries - 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to take alter setting lock");
sleepForMilliseconds(50);
}
Coordination::Stat stat;
auto metadata_str = zookeeper->get(fs::path(zookeeper_path) / "metadata", &stat);
auto metadata_from_zk = ObjectStorageQueueTableMetadata::parse(metadata_str);
auto new_table_metadata{table_metadata};
for (const auto & change : changes)
{
if (!ObjectStorageQueueTableMetadata::isStoredInKeeper(change.name))
continue;
if (change.name == "processing_threads_num")
{
const auto value = change.value.safeGet<UInt64>();
if (table_metadata.processing_threads_num == value)
{
LOG_TRACE(log, "Setting `processing_threads_num` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.processing_threads_num = value;
}
else if (change.name == "loading_retries")
{
const auto value = change.value.safeGet<UInt64>();
if (table_metadata.loading_retries == value)
{
LOG_TRACE(log, "Setting `loading_retries` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.loading_retries = value;
}
else if (change.name == "after_processing")
{
const auto value = ObjectStorageQueueTableMetadata::actionFromString(change.value.safeGet<String>());
if (table_metadata.after_processing == value)
{
LOG_TRACE(log, "Setting `after_processing` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.after_processing = value;
}
else if (change.name == "tracked_files_limit")
{
const auto value = change.value.safeGet<UInt64>();
if (table_metadata.tracked_files_limit == value)
{
LOG_TRACE(log, "Setting `tracked_files_limit` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.tracked_files_limit = value;
}
else if (change.name == "tracked_file_ttl_sec")
{
const auto value = change.value.safeGet<UInt64>();
if (table_metadata.tracked_files_ttl_sec == value)
{
LOG_TRACE(log, "Setting `tracked_file_ttl_sec` already equals {}. "
"Will do nothing", value);
continue;
}
new_table_metadata.tracked_files_ttl_sec = value;
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Setting `{}` is not changeable", change.name);
}
}
const auto new_metadata_str = new_table_metadata.toString();
LOG_TRACE(log, "New metadata: {}", new_metadata_str);
const fs::path table_metadata_path = zookeeper_path / "metadata";
zookeeper->set(table_metadata_path, new_metadata_str, stat.version);
table_metadata.syncChangeableSettings(new_table_metadata);
}
ObjectStorageQueueTableMetadata ObjectStorageQueueMetadata::syncWithKeeper(
const fs::path & zookeeper_path,
const ObjectStorageQueueSettings & settings,

View File

@ -9,6 +9,7 @@
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/SettingsChanges.h>
namespace fs = std::filesystem;
namespace Poco { class Logger; }
@ -89,6 +90,8 @@ public:
const ObjectStorageQueueTableMetadata & getTableMetadata() const { return table_metadata; }
ObjectStorageQueueTableMetadata & getTableMetadata() { return table_metadata; }
void alterSettings(const SettingsChanges & changes);
private:
void cleanupThreadFunc();
void cleanupThreadFuncImpl();

View File

@ -381,10 +381,10 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
/// In one zookeeper transaction do the following:
enum RequestType
{
SET_MAX_PROCESSED_PATH = 0,
CHECK_PROCESSING_ID_PATH = 1, /// Optional.
REMOVE_PROCESSING_ID_PATH = 2, /// Optional.
REMOVE_PROCESSING_PATH = 3, /// Optional.
CHECK_PROCESSING_ID_PATH = 0,
REMOVE_PROCESSING_ID_PATH = 1,
REMOVE_PROCESSING_PATH = 2,
SET_MAX_PROCESSED_PATH = 3,
};
const auto zk_client = getZooKeeper();
@ -409,8 +409,18 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
return;
}
bool unexpected_error = false;
if (Coordination::isHardwareError(code))
failure_reason = "Lost connection to keeper";
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
failure_reason = "Version of processing id node changed";
else if (is_request_failed(REMOVE_PROCESSING_PATH))
{
/// Removing the processing_id node should not actually fail,
/// because the previous keeper request just checked that it exists with a certain version.
unexpected_error = true;
failure_reason = "Failed to remove processing id path";
}
else if (is_request_failed(SET_MAX_PROCESSED_PATH))
{
LOG_TRACE(log, "Cannot set file {} as processed. "
@ -418,13 +428,12 @@ void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
"Will retry.", path, code);
continue;
}
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
failure_reason = "Version of processing id node changed";
else if (is_request_failed(REMOVE_PROCESSING_PATH))
failure_reason = "Failed to remove processing path";
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", code);
if (unexpected_error)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}", failure_reason);
LOG_WARNING(log, "Cannot set file {} as processed: {}. Reason: {}", path, code, failure_reason);
return;
}

View File

@ -23,15 +23,15 @@ namespace ErrorCodes
0) \
DECLARE(ObjectStorageQueueAction, after_processing, ObjectStorageQueueAction::KEEP, "Delete or keep file after successful processing", 0) \
DECLARE(String, keeper_path, "", "Zookeeper node path", 0) \
DECLARE(UInt32, loading_retries, 10, "Retry loading up to specified number of times", 0) \
DECLARE(UInt32, processing_threads_num, 1, "Number of processing threads", 0) \
DECLARE(UInt64, loading_retries, 10, "Retry loading up to specified number of times", 0) \
DECLARE(UInt64, processing_threads_num, 1, "Number of processing threads", 0) \
DECLARE(UInt32, enable_logging_to_queue_log, 1, "Enable logging to system table system.(s3/azure_)queue_log", 0) \
DECLARE(String, last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
DECLARE(UInt32, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
DECLARE(UInt32, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
DECLARE(UInt32, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
DECLARE(UInt32, polling_backoff_ms, 1000, "Polling backoff", 0) \
DECLARE(UInt32, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
DECLARE(UInt64, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
DECLARE(UInt64, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
DECLARE(UInt64, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
DECLARE(UInt64, polling_backoff_ms, 1000, "Polling backoff", 0) \
DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
DECLARE(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
@ -112,6 +112,11 @@ ObjectStorageQueueSettings::~ObjectStorageQueueSettings() = default;
OBJECT_STORAGE_QUEUE_SETTINGS_SUPPORTED_TYPES(ObjectStorageQueueSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
void ObjectStorageQueueSettings::applyChanges(const SettingsChanges & changes)
{
impl->applyChanges(changes);
}
void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
@ -156,4 +161,9 @@ void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
}
}
Field ObjectStorageQueueSettings::get(const std::string & name)
{
return impl->get(name);
}
}

View File

@ -12,6 +12,7 @@ class ASTStorage;
struct ObjectStorageQueueSettingsImpl;
struct MutableColumnsAndConstraints;
class StorageObjectStorageQueue;
class SettingsChanges;
/// List of available types supported in ObjectStorageQueueSettings object
#define OBJECT_STORAGE_QUEUE_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
@ -61,6 +62,10 @@ struct ObjectStorageQueueSettings
void loadFromQuery(ASTStorage & storage_def);
void applyChanges(const SettingsChanges & changes);
Field get(const std::string & name);
private:
std::unique_ptr<ObjectStorageQueueSettingsImpl> impl;
};

View File

@ -657,7 +657,7 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
void ObjectStorageQueueSource::applyActionAfterProcessing(const String & path)
{
if (files_metadata->getTableMetadata().after_processing == "delete")
if (files_metadata->getTableMetadata().after_processing == ObjectStorageQueueAction::DELETE)
{
object_storage->removeObject(StoredObject(path));
}

View File

@ -17,11 +17,11 @@ namespace ObjectStorageQueueSetting
extern const ObjectStorageQueueSettingsObjectStorageQueueAction after_processing;
extern const ObjectStorageQueueSettingsUInt32 buckets;
extern const ObjectStorageQueueSettingsString last_processed_path;
extern const ObjectStorageQueueSettingsUInt32 loading_retries;
extern const ObjectStorageQueueSettingsObjectStorageQueueMode mode;
extern const ObjectStorageQueueSettingsUInt32 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt32 tracked_files_limit;
extern const ObjectStorageQueueSettingsUInt32 tracked_file_ttl_sec;
extern const ObjectStorageQueueSettingsUInt64 loading_retries;
extern const ObjectStorageQueueSettingsUInt64 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt64 tracked_files_limit;
extern const ObjectStorageQueueSettingsUInt64 tracked_file_ttl_sec;
}
@ -56,13 +56,13 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
const std::string & format_)
: format_name(format_)
, columns(columns_.toString())
, after_processing(engine_settings[ObjectStorageQueueSetting::after_processing].toString())
, mode(engine_settings[ObjectStorageQueueSetting::mode].toString())
, tracked_files_limit(engine_settings[ObjectStorageQueueSetting::tracked_files_limit])
, tracked_files_ttl_sec(engine_settings[ObjectStorageQueueSetting::tracked_file_ttl_sec])
, buckets(engine_settings[ObjectStorageQueueSetting::buckets])
, last_processed_path(engine_settings[ObjectStorageQueueSetting::last_processed_path])
, after_processing(engine_settings[ObjectStorageQueueSetting::after_processing])
, loading_retries(engine_settings[ObjectStorageQueueSetting::loading_retries])
, tracked_files_limit(engine_settings[ObjectStorageQueueSetting::tracked_files_limit])
, tracked_files_ttl_sec(engine_settings[ObjectStorageQueueSetting::tracked_file_ttl_sec])
{
processing_threads_num_changed = engine_settings[ObjectStorageQueueSetting::processing_threads_num].changed;
if (!processing_threads_num_changed && engine_settings[ObjectStorageQueueSetting::processing_threads_num] <= 1)
@ -74,16 +74,16 @@ ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
String ObjectStorageQueueTableMetadata::toString() const
{
Poco::JSON::Object json;
json.set("after_processing", after_processing);
json.set("after_processing", actionToString(after_processing.load()));
json.set("mode", mode);
json.set("tracked_files_limit", tracked_files_limit);
json.set("tracked_files_ttl_sec", tracked_files_ttl_sec);
json.set("processing_threads_num", processing_threads_num);
json.set("tracked_files_limit", tracked_files_limit.load());
json.set("tracked_files_ttl_sec", tracked_files_ttl_sec.load());
json.set("processing_threads_num", processing_threads_num.load());
json.set("buckets", buckets);
json.set("format_name", format_name);
json.set("columns", columns);
json.set("last_processed_file", last_processed_path);
json.set("loading_retries", loading_retries);
json.set("loading_retries", loading_retries.load());
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
@ -91,6 +91,26 @@ String ObjectStorageQueueTableMetadata::toString() const
return oss.str();
}
ObjectStorageQueueAction ObjectStorageQueueTableMetadata::actionFromString(const std::string & action)
{
if (action == "keep")
return ObjectStorageQueueAction::KEEP;
if (action == "delete")
return ObjectStorageQueueAction::DELETE;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected ObjectStorageQueue action: {}", action);
}
std::string ObjectStorageQueueTableMetadata::actionToString(ObjectStorageQueueAction action)
{
switch (action)
{
case ObjectStorageQueueAction::DELETE:
return "delete";
case ObjectStorageQueueAction::KEEP:
return "keep";
}
}
ObjectStorageQueueMode ObjectStorageQueueTableMetadata::getMode() const
{
return modeFromString(mode);
@ -115,14 +135,14 @@ static auto getOrDefault(
ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(const Poco::JSON::Object::Ptr & json)
: format_name(json->getValue<String>("format_name"))
, columns(json->getValue<String>("columns"))
, after_processing(json->getValue<String>("after_processing"))
, mode(json->getValue<String>("mode"))
, tracked_files_limit(getOrDefault(json, "tracked_files_limit", "s3queue_", 0))
, tracked_files_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "", getOrDefault(json, "tracked_file_ttl_sec", "s3queue_", 0)))
, buckets(getOrDefault(json, "buckets", "", 0))
, last_processed_path(getOrDefault<String>(json, "last_processed_file", "s3queue_", ""))
, after_processing(actionFromString(json->getValue<String>("after_processing")))
, loading_retries(getOrDefault(json, "loading_retries", "", 10))
, processing_threads_num(getOrDefault(json, "processing_threads_num", "s3queue_", 1))
, tracked_files_limit(getOrDefault(json, "tracked_files_limit", "s3queue_", 0))
, tracked_files_ttl_sec(getOrDefault(json, "tracked_files_ttl_sec", "", getOrDefault(json, "tracked_file_ttl_sec", "s3queue_", 0)))
{
validateMode(mode);
}
@ -148,7 +168,7 @@ void ObjectStorageQueueTableMetadata::adjustFromKeeper(const ObjectStorageQueueT
else
LOG_TRACE(log, "{}", message);
processing_threads_num = from_zk.processing_threads_num;
processing_threads_num = from_zk.processing_threads_num.load();
}
}
@ -164,8 +184,8 @@ void ObjectStorageQueueTableMetadata::checkImmutableFieldsEquals(const ObjectSto
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs "
"in action after processing. Stored in ZooKeeper: {}, local: {}",
DB::toString(from_zk.after_processing),
DB::toString(after_processing));
DB::toString(from_zk.after_processing.load()),
DB::toString(after_processing.load()));
if (mode != from_zk.mode)
throw Exception(

View File

@ -19,17 +19,19 @@ class ReadBuffer;
*/
struct ObjectStorageQueueTableMetadata
{
/// Non-changeable settings.
const String format_name;
const String columns;
const String after_processing;
const String mode;
const UInt32 tracked_files_limit;
const UInt32 tracked_files_ttl_sec;
const UInt32 buckets;
const String last_processed_path;
const UInt32 loading_retries;
/// Changeable settings.
std::atomic<ObjectStorageQueueAction> after_processing;
std::atomic<UInt64> loading_retries;
std::atomic<UInt64> processing_threads_num;
std::atomic<UInt64> tracked_files_limit;
std::atomic<UInt64> tracked_files_ttl_sec;
UInt32 processing_threads_num; /// Can be changed from keeper.
bool processing_threads_num_changed = false;
ObjectStorageQueueTableMetadata(
@ -37,10 +39,36 @@ struct ObjectStorageQueueTableMetadata
const ColumnsDescription & columns_,
const std::string & format_);
ObjectStorageQueueTableMetadata(const ObjectStorageQueueTableMetadata & other)
: format_name(other.format_name)
, columns(other.columns)
, mode(other.mode)
, buckets(other.buckets)
, last_processed_path(other.last_processed_path)
, after_processing(other.after_processing.load())
, loading_retries(other.loading_retries.load())
, processing_threads_num(other.processing_threads_num.load())
, tracked_files_limit(other.tracked_files_limit.load())
, tracked_files_ttl_sec(other.tracked_files_ttl_sec.load())
{
}
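/// Copy the changeable (atomic) settings from another metadata instance.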
void syncChangeableSettings(const ObjectStorageQueueTableMetadata & other)
{
after_processing = other.after_processing.load();
loading_retries = other.loading_retries.load();
processing_threads_num = other.processing_threads_num.load();
tracked_files_limit = other.tracked_files_limit.load();
tracked_files_ttl_sec = other.tracked_files_ttl_sec.load();
}
explicit ObjectStorageQueueTableMetadata(const Poco::JSON::Object::Ptr & json);
static ObjectStorageQueueTableMetadata parse(const String & metadata_str);
static ObjectStorageQueueAction actionFromString(const std::string & action);
static std::string actionToString(ObjectStorageQueueAction action);
String toString() const;
ObjectStorageQueueMode getMode() const;
@ -49,6 +77,25 @@ struct ObjectStorageQueueTableMetadata
void checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
static bool isStoredInKeeper(const std::string & name)
{
static const std::unordered_set<std::string_view> settings_names
{
"format_name",
"columns",
"mode",
"buckets",
"last_processed_path",
"after_processing",
"loading_retries",
"processing_threads_num",
"tracked_files_limit",
"tracked_file_ttl_sec",
"tracked_files_ttl_sec",
};
return settings_names.contains(name);
}
private:
void checkImmutableFieldsEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
};

View File

@ -103,29 +103,46 @@ void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
/// In one zookeeper transaction do the following:
enum RequestType
{
SET_MAX_PROCESSED_PATH = 0,
CHECK_PROCESSING_ID_PATH = 1, /// Optional.
REMOVE_PROCESSING_ID_PATH = 2, /// Optional.
REMOVE_PROCESSING_PATH = 3, /// Optional.
CHECK_PROCESSING_ID_PATH,
REMOVE_PROCESSING_ID_PATH,
REMOVE_PROCESSING_PATH,
SET_PROCESSED_PATH,
};
const auto zk_client = getZooKeeper();
std::string failure_reason;
Coordination::Requests requests;
requests.push_back(
zkutil::makeCreateRequest(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
std::map<RequestType, UInt8> request_index;
if (processing_id_version.has_value())
{
requests.push_back(zkutil::makeCheckRequest(processing_node_id_path, processing_id_version.value()));
requests.push_back(zkutil::makeRemoveRequest(processing_node_id_path, processing_id_version.value()));
requests.push_back(zkutil::makeRemoveRequest(processing_node_path, -1));
/// The order is important:
/// we must check the processing nodes first and set processed_path last.
request_index[CHECK_PROCESSING_ID_PATH] = 0;
request_index[REMOVE_PROCESSING_ID_PATH] = 1;
request_index[REMOVE_PROCESSING_PATH] = 2;
request_index[SET_PROCESSED_PATH] = 3;
}
else
{
request_index[SET_PROCESSED_PATH] = 0;
}
requests.push_back(
zkutil::makeCreateRequest(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto is_request_failed = [&](RequestType type) { return responses[type]->error != Coordination::Error::ZOK; };
auto is_request_failed = [&](RequestType type)
{
if (!request_index.contains(type))
return false;
chassert(request_index[type] < responses.size());
return responses[request_index[type]]->error != Coordination::Error::ZOK;
};
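/// Request types that are not present in request_index (i.e. when there is no processing_id_version)
/// are reported as not failed: only SET_PROCESSED_PATH is part of the transaction in that case.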
const auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
@ -140,18 +157,41 @@ void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
return;
}
bool unexpected_error = false;
std::string failure_reason;
if (Coordination::isHardwareError(code))
{
failure_reason = "Lost connection to keeper";
else if (is_request_failed(SET_MAX_PROCESSED_PATH))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot create a persistent node in /processed since it already exists");
}
else if (is_request_failed(CHECK_PROCESSING_ID_PATH))
{
/// This is normal in case of an expired keeper session.
failure_reason = "Version of processing id node changed";
}
else if (is_request_failed(REMOVE_PROCESSING_ID_PATH))
{
/// Removing the processing_id node should not actually fail,
/// because the previous keeper request in this transaction checked that it exists with the expected version.
unexpected_error = true;
failure_reason = "Failed to remove processing id path";
}
else if (is_request_failed(REMOVE_PROCESSING_PATH))
{
/// This is normal in case of an expired keeper session, as this node is ephemeral.
failure_reason = "Failed to remove processing path";
}
else if (is_request_failed(SET_PROCESSED_PATH))
{
unexpected_error = true;
failure_reason = "Cannot create a persistent node in /processed since it already exists";
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", code);
if (unexpected_error)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}", failure_reason);
LOG_WARNING(log, "Cannot set file {} as processed: {}. Reason: {}", path, code, failure_reason);
}

View File

@ -24,6 +24,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Storages/AlterCommands.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <filesystem>
@ -51,15 +52,15 @@ namespace ObjectStorageQueueSetting
extern const ObjectStorageQueueSettingsUInt64 max_processed_files_before_commit;
extern const ObjectStorageQueueSettingsUInt64 max_processed_rows_before_commit;
extern const ObjectStorageQueueSettingsUInt64 max_processing_time_sec_before_commit;
extern const ObjectStorageQueueSettingsUInt32 polling_min_timeout_ms;
extern const ObjectStorageQueueSettingsUInt32 polling_max_timeout_ms;
extern const ObjectStorageQueueSettingsUInt32 polling_backoff_ms;
extern const ObjectStorageQueueSettingsUInt32 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt64 polling_min_timeout_ms;
extern const ObjectStorageQueueSettingsUInt64 polling_max_timeout_ms;
extern const ObjectStorageQueueSettingsUInt64 polling_backoff_ms;
extern const ObjectStorageQueueSettingsUInt64 processing_threads_num;
extern const ObjectStorageQueueSettingsUInt32 buckets;
extern const ObjectStorageQueueSettingsUInt32 tracked_file_ttl_sec;
extern const ObjectStorageQueueSettingsUInt32 tracked_files_limit;
extern const ObjectStorageQueueSettingsUInt64 tracked_file_ttl_sec;
extern const ObjectStorageQueueSettingsUInt64 tracked_files_limit;
extern const ObjectStorageQueueSettingsString last_processed_path;
extern const ObjectStorageQueueSettingsUInt32 loading_retries;
extern const ObjectStorageQueueSettingsUInt64 loading_retries;
extern const ObjectStorageQueueSettingsObjectStorageQueueAction after_processing;
}
@ -69,6 +70,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int BAD_QUERY_PARAMETER;
extern const int QUERY_NOT_ALLOWED;
extern const int SUPPORT_IS_DISABLED;
}
namespace
@ -353,10 +355,11 @@ void StorageObjectStorageQueue::read(
void ReadFromObjectStorageQueue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
Pipes pipes;
const size_t adjusted_num_streams = storage->getTableMetadata().processing_threads_num;
size_t processing_threads_num = storage->getTableMetadata().processing_threads_num;
createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i)
for (size_t i = 0; i < processing_threads_num; ++i)
pipes.emplace_back(storage->createSource(
i/* processor_id */,
info,
@ -555,6 +558,174 @@ bool StorageObjectStorageQueue::streamToViews()
return total_rows > 0;
}
static const std::unordered_set<std::string_view> changeable_settings_unordered_mode
{
"processing_threads_num",
"loading_retries",
"after_processing",
"tracked_files_limit",
"tracked_file_ttl_sec",
"polling_min_timeout_ms",
"polling_max_timeout_ms",
"polling_backoff_ms",
/// For compatibility.
"s3queue_processing_threads_num",
"s3queue_loading_retries",
"s3queue_after_processing",
"s3queue_tracked_files_limit",
"s3queue_tracked_file_ttl_sec",
"s3queue_polling_min_timeout_ms",
"s3queue_polling_max_timeout_ms",
"s3queue_polling_backoff_ms",
};
static const std::unordered_set<std::string_view> changeable_settings_ordered_mode
{
"loading_retries",
"after_processing",
"polling_min_timeout_ms",
"polling_max_timeout_ms",
"polling_backoff_ms",
/// For compatibility.
"s3queue_loading_retries",
"s3queue_after_processing",
"s3queue_polling_min_timeout_ms",
"s3queue_polling_max_timeout_ms",
"s3queue_polling_backoff_ms",
};
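/// tracked_files_limit and tracked_file_ttl_sec are documented as unordered-mode-only settings,
/// so they are not changeable in ordered mode.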
static bool isSettingChangeable(const std::string & name, ObjectStorageQueueMode mode)
{
if (mode == ObjectStorageQueueMode::UNORDERED)
return changeable_settings_unordered_mode.contains(name);
else
return changeable_settings_ordered_mode.contains(name);
}
void StorageObjectStorageQueue::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::MODIFY_SETTING && command.type != AlterCommand::RESET_SETTING)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only MODIFY SETTING alter is allowed for {}", getName());
}
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
commands.apply(new_metadata, local_context);
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
if (!new_metadata.hasSettingsChanges())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No settings changes");
const auto & new_changes = new_metadata.settings_changes->as<const ASTSetQuery &>().changes;
const auto & old_changes = old_metadata.settings_changes->as<const ASTSetQuery &>().changes;
const auto mode = getTableMetadata().getMode();
for (const auto & changed_setting : new_changes)
{
auto it = std::find_if(
old_changes.begin(), old_changes.end(),
[&](const SettingChange & change) { return change.name == changed_setting.name; });
const bool setting_changed = it == old_changes.end() || it->value != changed_setting.value;
if (setting_changed)
{
if (!isSettingChangeable(changed_setting.name, mode))
{
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Changing setting {} is not allowed for {} mode of {}",
changed_setting.name, magic_enum::enum_name(mode), getName());
}
}
}
}
void StorageObjectStorageQueue::alter(
const AlterCommands & commands,
ContextPtr local_context,
AlterLockHolder &)
{
if (commands.isSettingsAlter())
{
auto table_id = getStorageID();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
const auto & old_settings = old_metadata.settings_changes->as<const ASTSetQuery &>().changes;
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
commands.apply(new_metadata, local_context);
auto new_settings = new_metadata.settings_changes->as<ASTSetQuery &>().changes;
ObjectStorageQueueSettings default_settings;
for (const auto & setting : old_settings)
{
auto it = std::find_if(
new_settings.begin(), new_settings.end(),
[&](const SettingChange & change) { return change.name == setting.name; });
if (it == new_settings.end())
{
/// Setting was reset.
new_settings.push_back(SettingChange(setting.name, default_settings.get(setting.name)));
}
}
SettingsChanges changed_settings;
std::set<std::string> changed_settings_set;
const auto mode = getTableMetadata().getMode();
for (const auto & setting : new_settings)
{
auto it = std::find_if(
old_settings.begin(), old_settings.end(),
[&](const SettingChange & change) { return change.name == setting.name; });
const bool setting_changed = it == old_settings.end() || it->value != setting.value;
if (!setting_changed)
continue;
if (!isSettingChangeable(setting.name, mode))
{
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Changing setting {} is not allowed for {} mode of {}",
setting.name, magic_enum::enum_name(mode), getName());
}
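/// Strip the compatibility "s3queue_" prefix so that both spellings map to the same setting.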
SettingChange result_setting(setting);
if (result_setting.name.starts_with("s3queue_"))
result_setting.name = result_setting.name.substr(std::strlen("s3queue_"));
changed_settings.push_back(result_setting);
auto inserted = changed_settings_set.emplace(result_setting.name).second;
if (!inserted)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting {} is duplicated", setting.name);
}
/// Alter settings which are stored in keeper.
files_metadata->alterSettings(changed_settings);
/// Alter settings which are not stored in keeper.
for (const auto & change : changed_settings)
{
if (change.name == "polling_min_timeout_ms")
polling_min_timeout_ms = change.value.safeGet<UInt64>();
if (change.name == "polling_max_timeout_ms")
polling_max_timeout_ms = change.value.safeGet<UInt64>();
if (change.name == "polling_backoff_ms")
polling_backoff_ms = change.value.safeGet<UInt64>();
}
StorageInMemoryMetadata metadata = getInMemoryMetadata();
metadata.setSettingsChanges(new_metadata.settings_changes);
setInMemoryMetadata(metadata);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
}
}
zkutil::ZooKeeperPtr StorageObjectStorageQueue::getZooKeeper() const
{
return getContext()->getZooKeeper();
@ -582,8 +753,8 @@ ObjectStorageQueueSettings StorageObjectStorageQueue::getSettings() const
settings[ObjectStorageQueueSetting::processing_threads_num] = table_metadata.processing_threads_num;
settings[ObjectStorageQueueSetting::enable_logging_to_queue_log] = enable_logging_to_queue_log;
settings[ObjectStorageQueueSetting::last_processed_path] = table_metadata.last_processed_path;
settings[ObjectStorageQueueSetting::tracked_file_ttl_sec] = 0;
settings[ObjectStorageQueueSetting::tracked_files_limit] = 0;
settings[ObjectStorageQueueSetting::tracked_file_ttl_sec] = table_metadata.tracked_files_ttl_sec;
settings[ObjectStorageQueueSetting::tracked_files_limit] = table_metadata.tracked_files_limit;
settings[ObjectStorageQueueSetting::polling_min_timeout_ms] = polling_min_timeout_ms;
settings[ObjectStorageQueueSetting::polling_max_timeout_ms] = polling_max_timeout_ms;
settings[ObjectStorageQueueSetting::polling_backoff_ms] = polling_backoff_ms;

View File

@ -48,6 +48,13 @@ public:
size_t max_block_size,
size_t num_streams) override;
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const override;
void alter(
const AlterCommands & commands,
ContextPtr local_context,
AlterLockHolder & table_lock_holder) override;
const auto & getFormatName() const { return configuration->format; }
const fs::path & getZooKeeperPath() const { return zk_path; }
@ -65,9 +72,9 @@ private:
const std::string engine_name;
const fs::path zk_path;
const bool enable_logging_to_queue_log;
const UInt32 polling_min_timeout_ms;
const UInt32 polling_max_timeout_ms;
const UInt32 polling_backoff_ms;
UInt64 polling_min_timeout_ms;
UInt64 polling_max_timeout_ms;
UInt64 polling_backoff_ms;
const CommitSettings commit_settings;
std::shared_ptr<ObjectStorageQueueMetadata> files_metadata;

View File

@ -165,21 +165,18 @@ Chunk RabbitMQSource::generateImpl()
std::optional<String> exception_message;
size_t total_rows = 0;
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
{
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
exception_message = e.message();
for (const auto & column : result_columns)
for (size_t i = 0; i < result_columns.size(); ++i)
{
// We could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = column->size();
if (cur_rows > total_rows)
column->popBack(cur_rows - total_rows);
// We could have already pushed some rows to result_columns before the exception, so we need to roll them back.
result_columns[i]->rollback(*checkpoints[i]);
// All data columns will get default value in case of error.
column->insertDefault();
result_columns[i]->insertDefault();
}
return 1;

View File

@ -920,6 +920,7 @@ class SettingsRandomizer:
"optimize_functions_to_subcolumns": lambda: random.randint(0, 1),
"parallel_replicas_local_plan": lambda: random.randint(0, 1),
"output_format_native_write_json_as_string": lambda: random.randint(0, 1),
"enable_vertical_final": lambda: random.randint(0, 1),
}
@staticmethod

View File

@ -1721,6 +1721,7 @@ def test_upgrade(started_cluster):
files_path,
additional_settings={
"keeper_path": keeper_path,
"after_processing": "keep",
},
)
total_values = generate_random_files(
@ -2099,3 +2100,166 @@ def test_processing_threads(started_cluster):
assert node.contains_in_log(
f"StorageS3Queue (default.{table_name}): Using 16 processing threads"
)
def test_alter_settings(started_cluster):
node1 = started_cluster.instances["node1"]
node2 = started_cluster.instances["node2"]
table_name = f"test_alter_settings_{uuid.uuid4().hex[:8]}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 1000
node1.query("DROP DATABASE IF EXISTS r")
node2.query("DROP DATABASE IF EXISTS r")
node1.query(
f"CREATE DATABASE r ENGINE=Replicated('/clickhouse/databases/{table_name}', 'shard1', 'node1')"
)
node2.query(
f"CREATE DATABASE r ENGINE=Replicated('/clickhouse/databases/{table_name}', 'shard1', 'node2')"
)
create_table(
started_cluster,
node1,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"processing_threads_num": 10,
"loading_retries": 20,
},
database_name="r",
)
assert '"processing_threads_num":10' in node1.query(
f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
)
assert '"loading_retries":20' in node1.query(
f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
)
assert '"after_processing":"keep"' in node1.query(
f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
create_mv(node1, f"r.{table_name}", dst_table_name)
create_mv(node2, f"r.{table_name}", dst_table_name)
def get_count():
return int(
node1.query(
f"SELECT count() FROM clusterAllReplicas(cluster, default.{dst_table_name})"
)
)
expected_rows = files_to_generate
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
node1.query(
f"""
ALTER TABLE r.{table_name}
MODIFY SETTING processing_threads_num=5,
loading_retries=10,
after_processing='delete',
tracked_files_limit=50,
tracked_file_ttl_sec=10000,
polling_min_timeout_ms=222,
polling_max_timeout_ms=333,
polling_backoff_ms=111
"""
)
int_settings = {
"processing_threads_num": 5,
"loading_retries": 10,
"tracked_files_ttl_sec": 10000,
"tracked_files_limit": 50,
"polling_min_timeout_ms": 222,
"polling_max_timeout_ms": 333,
"polling_backoff_ms": 111,
}
string_settings = {"after_processing": "delete"}
def with_keeper(setting):
return setting in {
"after_processing",
"loading_retries",
"processing_threads_num",
"tracked_files_limit",
"tracked_files_ttl_sec",
}
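# Settings in this set are stored in the keeper metadata node, so for them we also
# check the value via the system.zookeeper table.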
def check_int_settings(node, settings):
for setting, value in settings.items():
if with_keeper(setting):
assert f'"{setting}":{value}' in node.query(
f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
)
if setting == "tracked_files_ttl_sec":
setting = "tracked_file_ttl_sec"
assert (
str(value)
== node.query(
f"SELECT value FROM system.s3_queue_settings WHERE name = '{setting}' and table = '{table_name}'"
).strip()
)
def check_string_settings(node, settings):
for setting, value in settings.items():
if with_keeper(setting):
assert f'"{setting}":"{value}"' in node.query(
f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
)
assert (
str(value)
== node.query(
f"SELECT value FROM system.s3_queue_settings WHERE name = '{setting}' and table = '{table_name}'"
).strip()
)
for node in [node1, node2]:
check_int_settings(node, int_settings)
check_string_settings(node, string_settings)
node.restart_clickhouse()
check_int_settings(node, int_settings)
check_string_settings(node, string_settings)
node1.query(
f"""
ALTER TABLE r.{table_name} RESET SETTING after_processing, tracked_file_ttl_sec
"""
)
int_settings = {
"processing_threads_num": 5,
"loading_retries": 10,
"tracked_files_ttl_sec": 0,
"tracked_files_limit": 50,
}
string_settings = {"after_processing": "keep"}
for node in [node1, node2]:
check_int_settings(node, int_settings)
check_string_settings(node, string_settings)
node.restart_clickhouse()
assert expected_rows == get_count()
check_int_settings(node, int_settings)
check_string_settings(node, string_settings)

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "
DROP TABLE IF EXISTS async_inserts_native;
CREATE TABLE async_inserts_native (m Map(UInt64, UInt64), v UInt64 MATERIALIZED m[4]) ENGINE = Memory;
"
url="${CLICKHOUSE_URL}&async_insert=1&async_insert_busy_timeout_max_ms=1000&async_insert_busy_timeout_min_ms=1000&wait_for_async_insert=1"
# This test runs inserts with memory_tracker_fault_probability > 0 to trigger the memory limit during insertion.
# If the rollback of columns is wrong in that case, it may produce a LOGICAL_ERROR, which will be caught by termination of the server in debug mode.
for _ in {1..10}; do
${CLICKHOUSE_CLIENT} -q "SELECT (range(number), range(number))::Map(UInt64, UInt64) AS m FROM numbers(1000) FORMAT Native" | \
${CLICKHOUSE_CURL} -sS -X POST "${url}&max_block_size=100&memory_tracker_fault_probability=0.01&query=INSERT+INTO+async_inserts_native+FORMAT+Native" --data-binary @- >/dev/null 2>&1 &
done
wait
${CLICKHOUSE_CLIENT} -q "DROP TABLE async_inserts_native;"

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
DATA_FILE=$CLICKHOUSE_TEST_UNIQUE_NAME.data
$CLICKHOUSE_LOCAL -q "select tuple(1, x'00000000000000000000FFFF0000000000') as x format BSONEachRow" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Tuple(UInt32, IPv6)') settings input_format_allow_errors_num=1"
$CLICKHOUSE_LOCAL -q "select [x'00000000000000000000FFFF00000000', x'00000000000000000000FFFF0000000000'] as x format BSONEachRow" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Array(IPv6)') settings input_format_allow_errors_num=1"
$CLICKHOUSE_LOCAL -q "select map('key1', x'00000000000000000000FFFF00000000', 'key2', x'00000000000000000000FFFF0000000000') as x format BSONEachRow" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "select * from file('$DATA_FILE', BSONEachRow, 'x Map(String, IPv6)') settings input_format_allow_errors_num=1"
rm $DATA_FILE

View File

@ -0,0 +1 @@
128

View File

@ -0,0 +1,2 @@
select JSONExtract('{"a" : 128}', 'a', 'LowCardinality(Nullable(Int128))');

View File

@ -0,0 +1,18 @@
set enable_analyzer=1;
set allow_experimental_json_type=1;
CREATE TABLE t
(
`a` JSON
)
ENGINE = MergeTree()
ORDER BY tuple();
insert into t values ('{"a":1}'), ('{"a":2.0}');
SELECT 1
FROM
(
SELECT 1 AS c0
) AS tx
FULL OUTER JOIN t AS t2 ON equals(t2.a.Float32); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }

View File

@ -0,0 +1,2 @@
UNKNOWN_DATABASE
OK

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
timeout 5 ${CLICKHOUSE_CLIENT_BINARY} --database "nonexistent" 2>&1 | grep -o "UNKNOWN_DATABASE" && echo "OK" || echo "FAIL"