Merge branch 'master' of github.com:ClickHouse/ClickHouse into divanik/add_schema_evolution_concise

This commit is contained in:
divanik 2024-11-14 09:33:23 +00:00
commit 10cb2fa8a9
70 changed files with 1387 additions and 805 deletions

View File

@ -42,7 +42,6 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
Upcoming meetups
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
@ -53,6 +52,7 @@ Upcoming meetups
Recently completed meetups
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
* [Jakarta Meetup](https://www.meetup.com/clickhouse-indonesia-user-group/events/303191359/) - October 1

View File

@ -597,6 +597,30 @@ If number of tables is greater than this value, server will throw an exception.
<max_table_num_to_throw>400</max_table_num_to_throw>
```
## max\_replicated\_table\_num\_to\_throw {#max-replicated-table-num-to-throw}
If number of replicated tables is greater than this value, server will throw an exception. 0 means no limitation. Only count table in Atomic/Ordinary/Replicated/Lazy database engine.
**Example**
```xml
<max_replicated_table_num_to_throw>400</max_replicated_table_num_to_throw>
```
## max\_dictionary\_num\_to\_throw {#max-dictionary-num-to-throw}
If number of dictionaries is greater than this value, server will throw an exception. 0 means no limitation. Only count table in Atomic/Ordinary/Replicated/Lazy database engine.
**Example**
```xml
<max_dictionary_num_to_throw>400</max_dictionary_num_to_throw>
```
## max\_view\_num\_to\_throw {#max-view-num-to-throw}
If number of views is greater than this value, server will throw an exception. 0 means no limitation. Only count table in Atomic/Ordinary/Replicated/Lazy database engine.
**Example**
```xml
<max_view_num_to_throw>400</max_view_num_to_throw>
```
## max\_database\_num\_to\_throw {#max-table-num-to-throw}
If number of _database is greater than this value, server will throw an exception. 0 means no limitation.
Default value: 0

View File

@ -12,9 +12,12 @@
#include <Compression/ParallelCompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Compression/getCompressionCodecForFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Compression/CompressionFactory.h>
#include <Common/TerminalSize.h>
@ -43,29 +46,24 @@ namespace CurrentMetrics
namespace
{
/// Outputs sizes of uncompressed and compressed blocks for compressed file.
/// Outputs method, sizes of uncompressed and compressed blocks for compressed file.
void checkAndWriteHeader(DB::ReadBuffer & in, DB::WriteBuffer & out)
{
while (!in.eof())
{
in.ignore(16); /// checksum
char header[COMPRESSED_BLOCK_HEADER_SIZE];
in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE);
UInt32 size_compressed = unalignedLoad<UInt32>(&header[1]);
UInt32 size_compressed;
UInt32 size_decompressed;
auto codec = DB::getCompressionCodecForFile(in, size_compressed, size_decompressed, true /* skip_to_next_block */);
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw DB::Exception(DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED, "Too large size_compressed. Most likely corrupted data.");
UInt32 size_decompressed = unalignedLoad<UInt32>(&header[5]);
DB::writeText(queryToString(codec->getFullCodecDesc()), out);
DB::writeChar('\t', out);
DB::writeText(size_decompressed, out);
DB::writeChar('\t', out);
DB::writeText(size_compressed, out);
DB::writeChar('\n', out);
in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
}
}

View File

@ -22,6 +22,13 @@ namespace ErrorCodes
namespace
{
/** Due to a lack of proper code review, this code was contributed with a multiplication of template instantiations
* over all pairs of data types, and we deeply regret that.
*
* We cannot remove all combinations, because the binary representation of serialized data has to remain the same,
* but we can partially heal the wound by treating unsigned and signed data types in the same way.
*/
template <typename ValueType, typename TimestampType>
struct AggregationFunctionDeltaSumTimestampData
{
@ -37,23 +44,22 @@ template <typename ValueType, typename TimestampType>
class AggregationFunctionDeltaSumTimestamp final
: public IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
>
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>>
{
public:
AggregationFunctionDeltaSumTimestamp(const DataTypes & arguments, const Array & params)
: IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
>{arguments, params, createResultType()}
{}
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>>{arguments, params, createResultType()}
{
}
AggregationFunctionDeltaSumTimestamp()
: IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumTimestampData<ValueType, TimestampType>,
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>
>{}
{}
AggregationFunctionDeltaSumTimestamp<ValueType, TimestampType>>{}
{
}
bool allocatesMemoryInArena() const override { return false; }
@ -63,8 +69,8 @@ public:
void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
auto value = assert_cast<const ColumnVector<ValueType> &>(*columns[0]).getData()[row_num];
auto ts = assert_cast<const ColumnVector<TimestampType> &>(*columns[1]).getData()[row_num];
auto value = unalignedLoad<ValueType>(columns[0]->getRawData().data() + row_num * sizeof(ValueType));
auto ts = unalignedLoad<TimestampType>(columns[1]->getRawData().data() + row_num * sizeof(TimestampType));
auto & data = this->data(place);
@ -172,10 +178,48 @@ public:
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
assert_cast<ColumnVector<ValueType> &>(to).getData().push_back(this->data(place).sum);
static_cast<ColumnFixedSizeHelper &>(to).template insertRawData<sizeof(ValueType)>(
reinterpret_cast<const char *>(&this->data(place).sum));
}
};
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
IAggregateFunction * createWithTwoTypesSecond(const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(second_type);
if (which.idx == TypeIndex::UInt32) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
if (which.idx == TypeIndex::UInt64) return new AggregateFunctionTemplate<FirstType, UInt64>(args...);
if (which.idx == TypeIndex::Int32) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
if (which.idx == TypeIndex::Int64) return new AggregateFunctionTemplate<FirstType, UInt64>(args...);
if (which.idx == TypeIndex::Float32) return new AggregateFunctionTemplate<FirstType, Float32>(args...);
if (which.idx == TypeIndex::Float64) return new AggregateFunctionTemplate<FirstType, Float64>(args...);
if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate<FirstType, UInt16>(args...);
if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
return nullptr;
}
template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
IAggregateFunction * createWithTwoTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(first_type);
if (which.idx == TypeIndex::UInt8) return createWithTwoTypesSecond<UInt8, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::UInt16) return createWithTwoTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::UInt32) return createWithTwoTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::UInt64) return createWithTwoTypesSecond<UInt64, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Int8) return createWithTwoTypesSecond<UInt8, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Int16) return createWithTwoTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Int32) return createWithTwoTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Int64) return createWithTwoTypesSecond<UInt64, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Float32) return createWithTwoTypesSecond<Float32, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Float64) return createWithTwoTypesSecond<Float64, AggregateFunctionTemplate>(second_type, args...);
return nullptr;
}
AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp(
const String & name,
const DataTypes & arguments,
@ -193,7 +237,7 @@ AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp(
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}, "
"must be Int, Float, Date, DateTime", arguments[1]->getName(), name);
return AggregateFunctionPtr(createWithTwoNumericOrDateTypes<AggregationFunctionDeltaSumTimestamp>(
return AggregateFunctionPtr(createWithTwoTypes<AggregationFunctionDeltaSumTimestamp>(
*arguments[0], *arguments[1], arguments, params));
}
}

View File

@ -184,36 +184,8 @@ static IAggregateFunction * createWithDecimalType(const IDataType & argument_typ
}
/** For template with two arguments.
* This is an extremely dangerous for code bloat - do not use.
*/
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericTypesSecond(const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(second_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<FirstType, TYPE>(args...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<FirstType, Int8>(args...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<FirstType, Int16>(args...);
return nullptr;
}
template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(first_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) \
return createWithTwoNumericTypesSecond<TYPE, AggregateFunctionTemplate>(second_type, args...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8)
return createWithTwoNumericTypesSecond<Int8, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Enum16)
return createWithTwoNumericTypesSecond<Int16, AggregateFunctionTemplate>(second_type, args...);
return nullptr;
}
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoBasicNumericTypesSecond(const IDataType & second_type, TArgs && ... args)
{
@ -237,46 +209,6 @@ static IAggregateFunction * createWithTwoBasicNumericTypes(const IDataType & fir
return nullptr;
}
template <typename FirstType, template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericOrDateTypesSecond(const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(second_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<FirstType, TYPE>(args...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<FirstType, Int8>(args...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<FirstType, Int16>(args...);
/// expects that DataTypeDate based on UInt16, DataTypeDateTime based on UInt32
if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate<FirstType, UInt16>(args...);
if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate<FirstType, UInt32>(args...);
return nullptr;
}
template <template <typename, typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoNumericOrDateTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
{
WhichDataType which(first_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) \
return createWithTwoNumericOrDateTypesSecond<TYPE, AggregateFunctionTemplate>(second_type, args...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8)
return createWithTwoNumericOrDateTypesSecond<Int8, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::Enum16)
return createWithTwoNumericOrDateTypesSecond<Int16, AggregateFunctionTemplate>(second_type, args...);
/// expects that DataTypeDate based on UInt16, DataTypeDateTime based on UInt32
if (which.idx == TypeIndex::Date)
return createWithTwoNumericOrDateTypesSecond<UInt16, AggregateFunctionTemplate>(second_type, args...);
if (which.idx == TypeIndex::DateTime)
return createWithTwoNumericOrDateTypesSecond<UInt32, AggregateFunctionTemplate>(second_type, args...);
return nullptr;
}
template <template <typename> class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithStringType(const IDataType & argument_type, TArgs && ... args)
{

View File

@ -14,12 +14,12 @@ namespace ErrorCodes
BackupConcurrencyCheck::BackupConcurrencyCheck(
const UUID & backup_or_restore_uuid_,
bool is_restore_,
bool on_cluster_,
const String & zookeeper_path_,
bool allow_concurrency_,
BackupConcurrencyCounters & counters_)
: is_restore(is_restore_), backup_or_restore_uuid(backup_or_restore_uuid_), on_cluster(on_cluster_), counters(counters_)
: is_restore(is_restore_), on_cluster(on_cluster_), zookeeper_path(zookeeper_path_), counters(counters_)
{
std::lock_guard lock{counters.mutex};
@ -32,7 +32,7 @@ BackupConcurrencyCheck::BackupConcurrencyCheck(
size_t num_on_cluster_restores = counters.on_cluster_restores.size();
if (on_cluster)
{
if (!counters.on_cluster_restores.contains(backup_or_restore_uuid))
if (!counters.on_cluster_restores.contains(zookeeper_path))
++num_on_cluster_restores;
}
else
@ -47,7 +47,7 @@ BackupConcurrencyCheck::BackupConcurrencyCheck(
size_t num_on_cluster_backups = counters.on_cluster_backups.size();
if (on_cluster)
{
if (!counters.on_cluster_backups.contains(backup_or_restore_uuid))
if (!counters.on_cluster_backups.contains(zookeeper_path))
++num_on_cluster_backups;
}
else
@ -64,9 +64,9 @@ BackupConcurrencyCheck::BackupConcurrencyCheck(
if (on_cluster)
{
if (is_restore)
++counters.on_cluster_restores[backup_or_restore_uuid];
++counters.on_cluster_restores[zookeeper_path];
else
++counters.on_cluster_backups[backup_or_restore_uuid];
++counters.on_cluster_backups[zookeeper_path];
}
else
{
@ -86,7 +86,7 @@ BackupConcurrencyCheck::~BackupConcurrencyCheck()
{
if (is_restore)
{
auto it = counters.on_cluster_restores.find(backup_or_restore_uuid);
auto it = counters.on_cluster_restores.find(zookeeper_path);
if (it != counters.on_cluster_restores.end())
{
if (!--it->second)
@ -95,7 +95,7 @@ BackupConcurrencyCheck::~BackupConcurrencyCheck()
}
else
{
auto it = counters.on_cluster_backups.find(backup_or_restore_uuid);
auto it = counters.on_cluster_backups.find(zookeeper_path);
if (it != counters.on_cluster_backups.end())
{
if (!--it->second)

View File

@ -1,7 +1,8 @@
#pragma once
#include <Core/UUID.h>
#include <base/defines.h>
#include <base/scope_guard.h>
#include <base/types.h>
#include <mutex>
#include <unordered_map>
@ -19,9 +20,9 @@ public:
/// Checks concurrency of a BACKUP operation or a RESTORE operation.
/// Keep a constructed instance of BackupConcurrencyCheck until the operation is done.
BackupConcurrencyCheck(
const UUID & backup_or_restore_uuid_,
bool is_restore_,
bool on_cluster_,
const String & zookeeper_path_,
bool allow_concurrency_,
BackupConcurrencyCounters & counters_);
@ -31,8 +32,8 @@ public:
private:
const bool is_restore;
const UUID backup_or_restore_uuid;
const bool on_cluster;
const String zookeeper_path;
BackupConcurrencyCounters & counters;
};
@ -47,8 +48,8 @@ private:
friend class BackupConcurrencyCheck;
size_t local_backups TSA_GUARDED_BY(mutex) = 0;
size_t local_restores TSA_GUARDED_BY(mutex) = 0;
std::unordered_map<UUID /* backup_uuid */, size_t /* num_refs */> on_cluster_backups TSA_GUARDED_BY(mutex);
std::unordered_map<UUID /* restore_uuid */, size_t /* num_refs */> on_cluster_restores TSA_GUARDED_BY(mutex);
std::unordered_map<String /* zookeeper_path */, size_t /* num_refs */> on_cluster_backups TSA_GUARDED_BY(mutex);
std::unordered_map<String /* zookeeper_path */, size_t /* num_refs */> on_cluster_restores TSA_GUARDED_BY(mutex);
std::mutex mutex;
};

View File

@ -4,31 +4,29 @@
namespace DB
{
BackupCoordinationCleaner::BackupCoordinationCleaner(const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_)
: zookeeper_path(zookeeper_path_), with_retries(with_retries_), log(log_)
BackupCoordinationCleaner::BackupCoordinationCleaner(bool is_restore_, const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_)
: is_restore(is_restore_), zookeeper_path(zookeeper_path_), with_retries(with_retries_), log(log_)
{
}
void BackupCoordinationCleaner::cleanup()
bool BackupCoordinationCleaner::cleanup(bool throw_if_error)
{
tryRemoveAllNodes(/* throw_if_error = */ true, /* retries_kind = */ WithRetries::kNormal);
WithRetries::Kind retries_kind = throw_if_error ? WithRetries::kNormal : WithRetries::kErrorHandling;
return cleanupImpl(throw_if_error, retries_kind);
}
bool BackupCoordinationCleaner::tryCleanupAfterError() noexcept
{
return tryRemoveAllNodes(/* throw_if_error = */ false, /* retries_kind = */ WithRetries::kNormal);
}
bool BackupCoordinationCleaner::tryRemoveAllNodes(bool throw_if_error, WithRetries::Kind retries_kind)
bool BackupCoordinationCleaner::cleanupImpl(bool throw_if_error, WithRetries::Kind retries_kind)
{
{
std::lock_guard lock{mutex};
if (cleanup_result.succeeded)
return true;
if (cleanup_result.exception)
if (succeeded)
{
if (throw_if_error)
std::rethrow_exception(cleanup_result.exception);
LOG_TRACE(log, "Nodes from ZooKeeper are already removed");
return true;
}
if (tried)
{
LOG_INFO(log, "Skipped removing nodes from ZooKeeper because because earlier we failed to do that");
return false;
}
}
@ -44,16 +42,18 @@ bool BackupCoordinationCleaner::tryRemoveAllNodes(bool throw_if_error, WithRetri
});
std::lock_guard lock{mutex};
cleanup_result.succeeded = true;
tried = true;
succeeded = true;
return true;
}
catch (...)
{
LOG_TRACE(log, "Caught exception while removing nodes from ZooKeeper for this restore: {}",
LOG_TRACE(log, "Caught exception while removing nodes from ZooKeeper for this {}: {}",
is_restore ? "restore" : "backup",
getCurrentExceptionMessage(/* with_stacktrace= */ false, /* check_embedded_stacktrace= */ true));
std::lock_guard lock{mutex};
cleanup_result.exception = std::current_exception();
tried = true;
if (throw_if_error)
throw;

View File

@ -12,14 +12,14 @@ namespace DB
class BackupCoordinationCleaner
{
public:
BackupCoordinationCleaner(const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_);
BackupCoordinationCleaner(bool is_restore_, const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_);
void cleanup();
bool tryCleanupAfterError() noexcept;
bool cleanup(bool throw_if_error);
private:
bool tryRemoveAllNodes(bool throw_if_error, WithRetries::Kind retries_kind);
bool cleanupImpl(bool throw_if_error, WithRetries::Kind retries_kind);
const bool is_restore;
const String zookeeper_path;
/// A reference to a field of the parent object which is either BackupCoordinationOnCluster or RestoreCoordinationOnCluster.
@ -27,13 +27,8 @@ private:
const LoggerPtr log;
struct CleanupResult
{
bool succeeded = false;
std::exception_ptr exception;
};
CleanupResult cleanup_result TSA_GUARDED_BY(mutex);
bool tried TSA_GUARDED_BY(mutex) = false;
bool succeeded TSA_GUARDED_BY(mutex) = false;
std::mutex mutex;
};

View File

@ -11,12 +11,11 @@ namespace DB
{
BackupCoordinationLocal::BackupCoordinationLocal(
const UUID & backup_uuid_,
bool is_plain_backup_,
bool allow_concurrent_backup_,
BackupConcurrencyCounters & concurrency_counters_)
: log(getLogger("BackupCoordinationLocal"))
, concurrency_check(backup_uuid_, /* is_restore = */ false, /* on_cluster = */ false, allow_concurrent_backup_, concurrency_counters_)
, concurrency_check(/* is_restore = */ false, /* on_cluster = */ false, /* zookeeper_path = */ "", allow_concurrent_backup_, concurrency_counters_)
, file_infos(is_plain_backup_)
{
}

View File

@ -23,20 +23,19 @@ class BackupCoordinationLocal : public IBackupCoordination
{
public:
explicit BackupCoordinationLocal(
const UUID & backup_uuid_,
bool is_plain_backup_,
bool allow_concurrent_backup_,
BackupConcurrencyCounters & concurrency_counters_);
~BackupCoordinationLocal() override;
void setBackupQueryIsSentToOtherHosts() override {}
bool isBackupQuerySentToOtherHosts() const override { return false; }
Strings setStage(const String &, const String &, bool) override { return {}; }
void setBackupQueryWasSentToOtherHosts() override {}
bool trySetError(std::exception_ptr) override { return true; }
void finish() override {}
bool tryFinishAfterError() noexcept override { return true; }
void waitForOtherHostsToFinish() override {}
bool tryWaitForOtherHostsToFinishAfterError() noexcept override { return true; }
bool setError(std::exception_ptr, bool) override { return true; }
bool waitOtherHostsFinish(bool) const override { return true; }
bool finish(bool) override { return true; }
bool cleanup(bool) override { return true; }
void addReplicatedPartNames(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name,
const std::vector<PartNameAndChecksum> & part_names_and_checksums) override;

View File

@ -184,17 +184,21 @@ BackupCoordinationOnCluster::BackupCoordinationOnCluster(
, plain_backup(is_plain_backup_)
, log(getLogger("BackupCoordinationOnCluster"))
, with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); })
, concurrency_check(backup_uuid_, /* is_restore = */ false, /* on_cluster = */ true, allow_concurrent_backup_, concurrency_counters_)
, stage_sync(/* is_restore = */ false, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_backup_, with_retries, schedule_, process_list_element_, log)
, cleaner(zookeeper_path, with_retries, log)
, cleaner(/* is_restore = */ false, zookeeper_path, with_retries, log)
, stage_sync(/* is_restore = */ false, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_backup_, concurrency_counters_, with_retries, schedule_, process_list_element_, log)
{
try
{
createRootNodes();
}
catch (...)
{
stage_sync.setError(std::current_exception(), /* throw_if_error = */ false);
throw;
}
}
BackupCoordinationOnCluster::~BackupCoordinationOnCluster()
{
tryFinishImpl();
}
BackupCoordinationOnCluster::~BackupCoordinationOnCluster() = default;
void BackupCoordinationOnCluster::createRootNodes()
{
@ -217,69 +221,52 @@ void BackupCoordinationOnCluster::createRootNodes()
});
}
void BackupCoordinationOnCluster::setBackupQueryIsSentToOtherHosts()
{
stage_sync.setQueryIsSentToOtherHosts();
}
bool BackupCoordinationOnCluster::isBackupQuerySentToOtherHosts() const
{
return stage_sync.isQuerySentToOtherHosts();
}
Strings BackupCoordinationOnCluster::setStage(const String & new_stage, const String & message, bool sync)
{
stage_sync.setStage(new_stage, message);
if (!sync)
if (sync)
return stage_sync.waitHostsReachStage(all_hosts_without_initiator, new_stage);
return {};
return stage_sync.waitForHostsToReachStage(new_stage, all_hosts_without_initiator);
}
void BackupCoordinationOnCluster::setBackupQueryWasSentToOtherHosts()
bool BackupCoordinationOnCluster::setError(std::exception_ptr exception, bool throw_if_error)
{
backup_query_was_sent_to_other_hosts = true;
return stage_sync.setError(exception, throw_if_error);
}
bool BackupCoordinationOnCluster::trySetError(std::exception_ptr exception)
bool BackupCoordinationOnCluster::waitOtherHostsFinish(bool throw_if_error) const
{
return stage_sync.trySetError(exception);
return stage_sync.waitOtherHostsFinish(throw_if_error);
}
void BackupCoordinationOnCluster::finish()
bool BackupCoordinationOnCluster::finish(bool throw_if_error)
{
bool other_hosts_also_finished = false;
stage_sync.finish(other_hosts_also_finished);
if ((current_host == kInitiator) && (other_hosts_also_finished || !backup_query_was_sent_to_other_hosts))
cleaner.cleanup();
return stage_sync.finish(throw_if_error);
}
bool BackupCoordinationOnCluster::tryFinishAfterError() noexcept
bool BackupCoordinationOnCluster::cleanup(bool throw_if_error)
{
return tryFinishImpl();
}
bool BackupCoordinationOnCluster::tryFinishImpl() noexcept
{
bool other_hosts_also_finished = false;
if (!stage_sync.tryFinishAfterError(other_hosts_also_finished))
return false;
if ((current_host == kInitiator) && (other_hosts_also_finished || !backup_query_was_sent_to_other_hosts))
/// All the hosts must finish before we remove the coordination nodes.
bool expect_other_hosts_finished = stage_sync.isQuerySentToOtherHosts() || !stage_sync.isErrorSet();
bool all_hosts_finished = stage_sync.finished() && (stage_sync.otherHostsFinished() || !expect_other_hosts_finished);
if (!all_hosts_finished)
{
if (!cleaner.tryCleanupAfterError())
auto unfinished_hosts = expect_other_hosts_finished ? stage_sync.getUnfinishedHosts() : Strings{current_host};
LOG_INFO(log, "Skipping removing nodes from ZooKeeper because hosts {} didn't finish",
BackupCoordinationStageSync::getHostsDesc(unfinished_hosts));
return false;
}
return true;
}
void BackupCoordinationOnCluster::waitForOtherHostsToFinish()
{
if ((current_host != kInitiator) || !backup_query_was_sent_to_other_hosts)
return;
stage_sync.waitForOtherHostsToFinish();
}
bool BackupCoordinationOnCluster::tryWaitForOtherHostsToFinishAfterError() noexcept
{
if (current_host != kInitiator)
return false;
if (!backup_query_was_sent_to_other_hosts)
return true;
return stage_sync.tryWaitForOtherHostsToFinishAfterError();
return cleaner.cleanup(throw_if_error);
}
ZooKeeperRetriesInfo BackupCoordinationOnCluster::getOnClusterInitializationKeeperRetriesInfo() const

View File

@ -1,7 +1,6 @@
#pragma once
#include <Backups/IBackupCoordination.h>
#include <Backups/BackupConcurrencyCheck.h>
#include <Backups/BackupCoordinationCleaner.h>
#include <Backups/BackupCoordinationFileInfos.h>
#include <Backups/BackupCoordinationReplicatedAccess.h>
@ -20,7 +19,7 @@ class BackupCoordinationOnCluster : public IBackupCoordination
{
public:
/// Empty string as the current host is used to mark the initiator of a BACKUP ON CLUSTER query.
static const constexpr std::string_view kInitiator;
static const constexpr std::string_view kInitiator = BackupCoordinationStageSync::kInitiator;
BackupCoordinationOnCluster(
const UUID & backup_uuid_,
@ -37,13 +36,13 @@ public:
~BackupCoordinationOnCluster() override;
void setBackupQueryIsSentToOtherHosts() override;
bool isBackupQuerySentToOtherHosts() const override;
Strings setStage(const String & new_stage, const String & message, bool sync) override;
void setBackupQueryWasSentToOtherHosts() override;
bool trySetError(std::exception_ptr exception) override;
void finish() override;
bool tryFinishAfterError() noexcept override;
void waitForOtherHostsToFinish() override;
bool tryWaitForOtherHostsToFinishAfterError() noexcept override;
bool setError(std::exception_ptr exception, bool throw_if_error) override;
bool waitOtherHostsFinish(bool throw_if_error) const override;
bool finish(bool throw_if_error) override;
bool cleanup(bool throw_if_error) override;
void addReplicatedPartNames(
const String & table_zk_path,
@ -110,11 +109,10 @@ private:
const bool plain_backup;
LoggerPtr const log;
/// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`.
const WithRetries with_retries;
BackupConcurrencyCheck concurrency_check;
BackupCoordinationStageSync stage_sync;
BackupCoordinationCleaner cleaner;
std::atomic<bool> backup_query_was_sent_to_other_hosts = false;
BackupCoordinationStageSync stage_sync;
mutable std::optional<BackupCoordinationReplicatedTables> replicated_tables TSA_GUARDED_BY(replicated_tables_mutex);
mutable std::optional<BackupCoordinationReplicatedAccess> replicated_access TSA_GUARDED_BY(replicated_access_mutex);

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,9 @@
#pragma once
#include <Backups/BackupConcurrencyCheck.h>
#include <Backups/WithRetries.h>
namespace DB
{
@ -9,12 +11,16 @@ namespace DB
class BackupCoordinationStageSync
{
public:
/// Empty string as the current host is used to mark the initiator of a BACKUP ON CLUSTER or RESTORE ON CLUSTER query.
static const constexpr std::string_view kInitiator;
BackupCoordinationStageSync(
bool is_restore_, /// true if this is a RESTORE ON CLUSTER command, false if this is a BACKUP ON CLUSTER command
const String & zookeeper_path_, /// path to the "stage" folder in ZooKeeper
const String & current_host_, /// the current host, or an empty string if it's the initiator of the BACKUP/RESTORE ON CLUSTER command
const Strings & all_hosts_, /// all the hosts (including the initiator and the current host) performing the BACKUP/RESTORE ON CLUSTER command
bool allow_concurrency_, /// whether it's allowed to have concurrent backups or restores.
BackupConcurrencyCounters & concurrency_counters_,
const WithRetries & with_retries_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
QueryStatusPtr process_list_element_,
@ -22,30 +28,37 @@ public:
~BackupCoordinationStageSync();
/// Sets that the BACKUP or RESTORE query was sent to other hosts.
void setQueryIsSentToOtherHosts();
bool isQuerySentToOtherHosts() const;
/// Sets the stage of the current host and signal other hosts if there were other hosts waiting for that.
void setStage(const String & stage, const String & stage_result = {});
/// Waits until all the specified hosts come to the specified stage.
/// The function returns the results which specified hosts set when they came to the required stage.
/// If it doesn't happen before the timeout then the function will stop waiting and throw an exception.
Strings waitForHostsToReachStage(const String & stage_to_wait, const Strings & hosts, std::optional<std::chrono::milliseconds> timeout = {}) const;
/// Waits until all the other hosts finish their work.
/// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
void waitForOtherHostsToFinish() const;
/// Lets other host know that the current host has finished its work.
void finish(bool & other_hosts_also_finished);
/// Waits until specified hosts come to the specified stage.
/// The function returns the results which the specified hosts set when they came to the required stage.
Strings waitHostsReachStage(const Strings & hosts, const String & stage_to_wait) const;
/// Lets other hosts know that the current host has encountered an error.
bool trySetError(std::exception_ptr exception) noexcept;
/// The function returns true if it successfully created the error node or if the error node was found already exist.
bool setError(std::exception_ptr exception, bool throw_if_error);
bool isErrorSet() const;
/// Waits until all the other hosts finish their work (as a part of error-handling process).
/// Doesn't stops waiting if some host encounters an error or gets cancelled.
bool tryWaitForOtherHostsToFinishAfterError() const noexcept;
/// Waits until the hosts other than the current host finish their work. Must be called before finish().
/// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
bool waitOtherHostsFinish(bool throw_if_error) const;
bool otherHostsFinished() const;
/// Lets other host know that the current host has finished its work (as a part of error-handling process).
bool tryFinishAfterError(bool & other_hosts_also_finished) noexcept;
/// Lets other hosts know that the current host has finished its work.
bool finish(bool throw_if_error);
bool finished() const;
/// Returns true if all the hosts have finished.
bool allHostsFinished() const { return finished() && otherHostsFinished(); }
/// Returns a list of the hosts which haven't finished yet.
Strings getUnfinishedHosts() const;
Strings getUnfinishedOtherHosts() const;
/// Returns a printable name of a specific host. For empty host the function returns "initiator".
static String getHostDesc(const String & host);
@ -78,14 +91,17 @@ private:
/// Reads the current state from ZooKeeper without throwing exceptions.
void readCurrentState(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
/// Creates a stage node to let other hosts know we've reached the specified stage.
void createStageNode(const String & stage, const String & stage_result, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
String getStageNodePath(const String & stage) const;
/// Lets other hosts know that the current host has encountered an error.
bool trySetError(const Exception & exception);
void setError(const Exception & exception);
bool setError(const Exception & exception, bool throw_if_error);
void createErrorNode(const Exception & exception, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
/// Deserializes an error stored in the error node.
static std::pair<std::exception_ptr, String> parseErrorNode(const String & error_node_contents);
std::pair<std::exception_ptr, String> parseErrorNode(const String & error_node_contents) const;
/// Reset the `connected` flag for each host.
void resetConnectedFlag();
@ -102,19 +118,27 @@ private:
void cancelQueryIfDisconnectedTooLong();
/// Used by waitForHostsToReachStage() to check if everything is ready to return.
bool checkIfHostsReachStage(const Strings & hosts, const String & stage_to_wait, bool time_is_out, std::optional<std::chrono::milliseconds> timeout, Strings & results) const TSA_REQUIRES(mutex);
bool checkIfHostsReachStage(const Strings & hosts, const String & stage_to_wait, Strings & results) const TSA_REQUIRES(mutex);
/// Creates the 'finish' node.
bool tryFinishImpl();
bool tryFinishImpl(bool & other_hosts_also_finished, bool throw_if_error, WithRetries::Kind retries_kind);
void createFinishNodeAndRemoveAliveNode(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
bool finishImpl(bool throw_if_error, WithRetries::Kind retries_kind);
void createFinishNodeAndRemoveAliveNode(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper, bool throw_if_error);
/// Returns the version used by the initiator.
int getInitiatorVersion() const;
/// Waits until all the other hosts finish their work.
bool tryWaitForOtherHostsToFinishImpl(const String & reason, bool throw_if_error, std::optional<std::chrono::seconds> timeout) const;
bool checkIfOtherHostsFinish(const String & reason, bool throw_if_error, bool time_is_out, std::optional<std::chrono::milliseconds> timeout) const TSA_REQUIRES(mutex);
bool waitOtherHostsFinishImpl(const String & reason, std::optional<std::chrono::seconds> timeout, bool throw_if_error) const;
bool checkIfOtherHostsFinish(const String & reason, std::optional<std::chrono::milliseconds> timeout, bool time_is_out, bool & result, bool throw_if_error) const TSA_REQUIRES(mutex);
/// Returns true if all the hosts have finished.
bool allHostsFinishedNoLock() const TSA_REQUIRES(mutex);
bool finishedNoLock() const TSA_REQUIRES(mutex);
bool otherHostsFinishedNoLock() const TSA_REQUIRES(mutex);
/// Returns a list of the hosts which haven't finished yet.
Strings getUnfinishedHostsNoLock() const TSA_REQUIRES(mutex);
Strings getUnfinishedOtherHostsNoLock() const TSA_REQUIRES(mutex);
const bool is_restore;
const String operation_name;
@ -138,15 +162,16 @@ private:
/// Paths in ZooKeeper.
const std::filesystem::path zookeeper_path;
const String root_zookeeper_path;
const String operation_node_path;
const String operation_zookeeper_path;
const String operation_node_name;
const String stage_node_path;
const String start_node_path;
const String finish_node_path;
const String num_hosts_node_path;
const String error_node_path;
const String alive_node_path;
const String alive_tracker_node_path;
const String error_node_path;
std::optional<BackupConcurrencyCheck> concurrency_check;
std::shared_ptr<Poco::Event> zk_nodes_changed;
@ -176,25 +201,21 @@ private:
{
std::map<String /* host */, HostInfo> hosts; /// std::map because we need to compare states
std::optional<String> host_with_error;
bool cancelled = false;
bool operator ==(const State & other) const;
bool operator !=(const State & other) const;
void merge(const State & other);
};
State state TSA_GUARDED_BY(mutex);
mutable std::condition_variable state_changed;
std::future<void> watching_thread_future;
std::atomic<bool> should_stop_watching_thread = false;
bool should_stop_watching_thread TSA_GUARDED_BY(mutex) = false;
struct FinishResult
{
bool succeeded = false;
std::exception_ptr exception;
bool other_hosts_also_finished = false;
};
FinishResult finish_result TSA_GUARDED_BY(mutex);
bool query_is_sent_to_other_hosts TSA_GUARDED_BY(mutex) = false;
bool tried_to_finish TSA_GUARDED_BY(mutex) = false;
bool tried_to_set_error TSA_GUARDED_BY(mutex) = false;
mutable std::mutex mutex;
};

View File

@ -329,6 +329,7 @@ std::pair<OperationID, BackupStatus> BackupsWorker::start(const ASTPtr & backup_
struct BackupsWorker::BackupStarter
{
BackupsWorker & backups_worker;
LoggerPtr log;
std::shared_ptr<ASTBackupQuery> backup_query;
ContextPtr query_context; /// We have to keep `query_context` until the end of the operation because a pointer to it is stored inside the ThreadGroup we're using.
ContextMutablePtr backup_context;
@ -345,6 +346,7 @@ struct BackupsWorker::BackupStarter
BackupStarter(BackupsWorker & backups_worker_, const ASTPtr & query_, const ContextPtr & context_)
: backups_worker(backups_worker_)
, log(backups_worker.log)
, backup_query(std::static_pointer_cast<ASTBackupQuery>(query_->clone()))
, query_context(context_)
, backup_context(Context::createCopy(query_context))
@ -399,9 +401,20 @@ struct BackupsWorker::BackupStarter
chassert(!backup);
backup = backups_worker.openBackupForWriting(backup_info, backup_settings, backup_coordination, backup_context);
backups_worker.doBackup(
backup, backup_query, backup_id, backup_name_for_logging, backup_settings, backup_coordination, backup_context,
backups_worker.doBackup(backup, backup_query, backup_id, backup_settings, backup_coordination, backup_context,
on_cluster, cluster);
backup_coordination->finish(/* throw_if_error = */ true);
backup.reset();
/// The backup coordination is not needed anymore.
if (!is_internal_backup)
backup_coordination->cleanup(/* throw_if_error = */ true);
backup_coordination.reset();
/// NOTE: setStatus is called after setNumFilesAndSize in order to have actual information in a backup log record
LOG_INFO(log, "{} {} was created successfully", (is_internal_backup ? "Internal backup" : "Backup"), backup_name_for_logging);
backups_worker.setStatus(backup_id, BackupStatus::BACKUP_CREATED);
}
void onException()
@ -416,15 +429,28 @@ struct BackupsWorker::BackupStarter
if (backup && !backup->setIsCorrupted())
should_remove_files_in_backup = false;
if (backup_coordination && backup_coordination->trySetError(std::current_exception()))
{
bool other_hosts_finished = backup_coordination->tryWaitForOtherHostsToFinishAfterError();
bool all_hosts_finished = false;
if (should_remove_files_in_backup && other_hosts_finished)
if (backup_coordination && backup_coordination->setError(std::current_exception(), /* throw_if_error = */ false))
{
bool other_hosts_finished = !is_internal_backup
&& (!backup_coordination->isBackupQuerySentToOtherHosts() || backup_coordination->waitOtherHostsFinish(/* throw_if_error = */ false));
all_hosts_finished = backup_coordination->finish(/* throw_if_error = */ false) && other_hosts_finished;
}
if (!all_hosts_finished)
should_remove_files_in_backup = false;
if (backup && should_remove_files_in_backup)
backup->tryRemoveAllFiles();
backup_coordination->tryFinishAfterError();
}
backup.reset();
if (backup_coordination && all_hosts_finished)
backup_coordination->cleanup(/* throw_if_error = */ false);
backup_coordination.reset();
backups_worker.setStatusSafe(backup_id, getBackupStatusFromCurrentException());
}
@ -497,7 +523,6 @@ void BackupsWorker::doBackup(
BackupMutablePtr backup,
const std::shared_ptr<ASTBackupQuery> & backup_query,
const OperationID & backup_id,
const String & backup_name_for_logging,
const BackupSettings & backup_settings,
std::shared_ptr<IBackupCoordination> backup_coordination,
ContextMutablePtr context,
@ -521,10 +546,10 @@ void BackupsWorker::doBackup(
backup_settings.copySettingsToQuery(*backup_query);
sendQueryToOtherHosts(*backup_query, cluster, backup_settings.shard_num, backup_settings.replica_num,
context, required_access, backup_coordination->getOnClusterInitializationKeeperRetriesInfo());
backup_coordination->setBackupQueryWasSentToOtherHosts();
backup_coordination->setBackupQueryIsSentToOtherHosts();
/// Wait until all the hosts have written their backup entries.
backup_coordination->waitForOtherHostsToFinish();
backup_coordination->waitOtherHostsFinish(/* throw_if_error = */ true);
}
else
{
@ -569,18 +594,8 @@ void BackupsWorker::doBackup(
compressed_size = backup->getCompressedSize();
}
/// Close the backup.
backup.reset();
/// The backup coordination is not needed anymore.
backup_coordination->finish();
/// NOTE: we need to update metadata again after backup->finalizeWriting(), because backup metadata is written there.
setNumFilesAndSize(backup_id, num_files, total_size, num_entries, uncompressed_size, compressed_size, 0, 0);
/// NOTE: setStatus is called after setNumFilesAndSize in order to have actual information in a backup log record
LOG_INFO(log, "{} {} was created successfully", (is_internal_backup ? "Internal backup" : "Backup"), backup_name_for_logging);
setStatus(backup_id, BackupStatus::BACKUP_CREATED);
}
@ -687,6 +702,7 @@ void BackupsWorker::writeBackupEntries(
struct BackupsWorker::RestoreStarter
{
BackupsWorker & backups_worker;
LoggerPtr log;
std::shared_ptr<ASTBackupQuery> restore_query;
ContextPtr query_context; /// We have to keep `query_context` until the end of the operation because a pointer to it is stored inside the ThreadGroup we're using.
ContextMutablePtr restore_context;
@ -702,6 +718,7 @@ struct BackupsWorker::RestoreStarter
RestoreStarter(BackupsWorker & backups_worker_, const ASTPtr & query_, const ContextPtr & context_)
: backups_worker(backups_worker_)
, log(backups_worker.log)
, restore_query(std::static_pointer_cast<ASTBackupQuery>(query_->clone()))
, query_context(context_)
, restore_context(Context::createCopy(query_context))
@ -753,16 +770,17 @@ struct BackupsWorker::RestoreStarter
}
restore_coordination = backups_worker.makeRestoreCoordination(on_cluster, restore_settings, restore_context);
backups_worker.doRestore(
restore_query,
restore_id,
backup_name_for_logging,
backup_info,
restore_settings,
restore_coordination,
restore_context,
on_cluster,
cluster);
backups_worker.doRestore(restore_query, restore_id, backup_info, restore_settings, restore_coordination, restore_context,
on_cluster, cluster);
/// The restore coordination is not needed anymore.
restore_coordination->finish(/* throw_if_error = */ true);
if (!is_internal_restore)
restore_coordination->cleanup(/* throw_if_error = */ true);
restore_coordination.reset();
LOG_INFO(log, "Restored from {} {} successfully", (is_internal_restore ? "internal backup" : "backup"), backup_name_for_logging);
backups_worker.setStatus(restore_id, BackupStatus::RESTORED);
}
void onException()
@ -770,12 +788,16 @@ struct BackupsWorker::RestoreStarter
/// Something bad happened, some data were not restored.
tryLogCurrentException(backups_worker.log, fmt::format("Failed to restore from {} {}", (is_internal_restore ? "internal backup" : "backup"), backup_name_for_logging));
if (restore_coordination && restore_coordination->trySetError(std::current_exception()))
if (restore_coordination && restore_coordination->setError(std::current_exception(), /* throw_if_error = */ false))
{
restore_coordination->tryWaitForOtherHostsToFinishAfterError();
restore_coordination->tryFinishAfterError();
bool other_hosts_finished = !is_internal_restore
&& (!restore_coordination->isRestoreQuerySentToOtherHosts() || restore_coordination->waitOtherHostsFinish(/* throw_if_error = */ false));
if (restore_coordination->finish(/* throw_if_error = */ false) && other_hosts_finished)
restore_coordination->cleanup(/* throw_if_error = */ false);
}
restore_coordination.reset();
backups_worker.setStatusSafe(restore_id, getRestoreStatusFromCurrentException());
}
};
@ -838,7 +860,6 @@ BackupPtr BackupsWorker::openBackupForReading(const BackupInfo & backup_info, co
void BackupsWorker::doRestore(
const std::shared_ptr<ASTBackupQuery> & restore_query,
const OperationID & restore_id,
const String & backup_name_for_logging,
const BackupInfo & backup_info,
RestoreSettings restore_settings,
std::shared_ptr<IRestoreCoordination> restore_coordination,
@ -882,10 +903,10 @@ void BackupsWorker::doRestore(
restore_settings.copySettingsToQuery(*restore_query);
sendQueryToOtherHosts(*restore_query, cluster, restore_settings.shard_num, restore_settings.replica_num,
context, {}, restore_coordination->getOnClusterInitializationKeeperRetriesInfo());
restore_coordination->setRestoreQueryWasSentToOtherHosts();
restore_coordination->setRestoreQueryIsSentToOtherHosts();
/// Wait until all the hosts have done with their restoring work.
restore_coordination->waitForOtherHostsToFinish();
restore_coordination->waitOtherHostsFinish(/* throw_if_error = */ true);
}
else
{
@ -905,12 +926,6 @@ void BackupsWorker::doRestore(
backup, context, getThreadPool(ThreadPoolId::RESTORE), after_task_callback};
restorer.run(RestorerFromBackup::RESTORE);
}
/// The restore coordination is not needed anymore.
restore_coordination->finish();
LOG_INFO(log, "Restored from {} {} successfully", (is_internal_restore ? "internal backup" : "backup"), backup_name_for_logging);
setStatus(restore_id, BackupStatus::RESTORED);
}
@ -943,7 +958,7 @@ BackupsWorker::makeBackupCoordination(bool on_cluster, const BackupSettings & ba
if (!on_cluster)
{
return std::make_shared<BackupCoordinationLocal>(
*backup_settings.backup_uuid, !backup_settings.deduplicate_files, allow_concurrent_backups, *concurrency_counters);
!backup_settings.deduplicate_files, allow_concurrent_backups, *concurrency_counters);
}
bool is_internal_backup = backup_settings.internal;
@ -981,8 +996,7 @@ BackupsWorker::makeRestoreCoordination(bool on_cluster, const RestoreSettings &
{
if (!on_cluster)
{
return std::make_shared<RestoreCoordinationLocal>(
*restore_settings.restore_uuid, allow_concurrent_restores, *concurrency_counters);
return std::make_shared<RestoreCoordinationLocal>(allow_concurrent_restores, *concurrency_counters);
}
bool is_internal_restore = restore_settings.internal;

View File

@ -81,7 +81,6 @@ private:
BackupMutablePtr backup,
const std::shared_ptr<ASTBackupQuery> & backup_query,
const BackupOperationID & backup_id,
const String & backup_name_for_logging,
const BackupSettings & backup_settings,
std::shared_ptr<IBackupCoordination> backup_coordination,
ContextMutablePtr context,
@ -102,7 +101,6 @@ private:
void doRestore(
const std::shared_ptr<ASTBackupQuery> & restore_query,
const BackupOperationID & restore_id,
const String & backup_name_for_logging,
const BackupInfo & backup_info,
RestoreSettings restore_settings,
std::shared_ptr<IRestoreCoordination> restore_coordination,

View File

@ -20,29 +20,27 @@ class IBackupCoordination
public:
virtual ~IBackupCoordination() = default;
/// Sets that the backup query was sent to other hosts.
/// Function waitOtherHostsFinish() will check that to find out if it should really wait or not.
virtual void setBackupQueryIsSentToOtherHosts() = 0;
virtual bool isBackupQuerySentToOtherHosts() const = 0;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual Strings setStage(const String & new_stage, const String & message, bool sync) = 0;
/// Sets that the backup query was sent to other hosts.
/// Function waitForOtherHostsToFinish() will check that to find out if it should really wait or not.
virtual void setBackupQueryWasSentToOtherHosts() = 0;
/// Lets other hosts know that the current host has encountered an error.
virtual bool trySetError(std::exception_ptr exception) = 0;
/// Lets other hosts know that the current host has finished its work.
virtual void finish() = 0;
/// Lets other hosts know that the current host has finished its work (as a part of error-handling process).
virtual bool tryFinishAfterError() noexcept = 0;
/// Returns true if the information is successfully passed so other hosts can read it.
virtual bool setError(std::exception_ptr exception, bool throw_if_error) = 0;
/// Waits until all the other hosts finish their work.
/// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
virtual void waitForOtherHostsToFinish() = 0;
virtual bool waitOtherHostsFinish(bool throw_if_error) const = 0;
/// Waits until all the other hosts finish their work (as a part of error-handling process).
/// Doesn't stops waiting if some host encounters an error or gets cancelled.
virtual bool tryWaitForOtherHostsToFinishAfterError() noexcept = 0;
/// Lets other hosts know that the current host has finished its work.
virtual bool finish(bool throw_if_error) = 0;
/// Removes temporary nodes in ZooKeeper.
virtual bool cleanup(bool throw_if_error) = 0;
struct PartNameAndChecksum
{

View File

@ -18,29 +18,27 @@ class IRestoreCoordination
public:
virtual ~IRestoreCoordination() = default;
/// Sets that the restore query was sent to other hosts.
/// Function waitOtherHostsFinish() will check that to find out if it should really wait or not.
virtual void setRestoreQueryIsSentToOtherHosts() = 0;
virtual bool isRestoreQuerySentToOtherHosts() const = 0;
/// Sets the current stage and waits for other hosts to come to this stage too.
virtual Strings setStage(const String & new_stage, const String & message, bool sync) = 0;
/// Sets that the restore query was sent to other hosts.
/// Function waitForOtherHostsToFinish() will check that to find out if it should really wait or not.
virtual void setRestoreQueryWasSentToOtherHosts() = 0;
/// Lets other hosts know that the current host has encountered an error.
virtual bool trySetError(std::exception_ptr exception) = 0;
/// Lets other hosts know that the current host has finished its work.
virtual void finish() = 0;
/// Lets other hosts know that the current host has finished its work (as a part of error-handling process).
virtual bool tryFinishAfterError() noexcept = 0;
/// Returns true if the information is successfully passed so other hosts can read it.
virtual bool setError(std::exception_ptr exception, bool throw_if_error) = 0;
/// Waits until all the other hosts finish their work.
/// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
virtual void waitForOtherHostsToFinish() = 0;
virtual bool waitOtherHostsFinish(bool throw_if_error) const = 0;
/// Waits until all the other hosts finish their work (as a part of error-handling process).
/// Doesn't stops waiting if some host encounters an error or gets cancelled.
virtual bool tryWaitForOtherHostsToFinishAfterError() noexcept = 0;
/// Lets other hosts know that the current host has finished its work.
virtual bool finish(bool throw_if_error) = 0;
/// Removes temporary nodes in ZooKeeper.
virtual bool cleanup(bool throw_if_error) = 0;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
virtual bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) = 0;

View File

@ -10,9 +10,9 @@ namespace DB
{
RestoreCoordinationLocal::RestoreCoordinationLocal(
const UUID & restore_uuid, bool allow_concurrent_restore_, BackupConcurrencyCounters & concurrency_counters_)
bool allow_concurrent_restore_, BackupConcurrencyCounters & concurrency_counters_)
: log(getLogger("RestoreCoordinationLocal"))
, concurrency_check(restore_uuid, /* is_restore = */ true, /* on_cluster = */ false, allow_concurrent_restore_, concurrency_counters_)
, concurrency_check(/* is_restore = */ true, /* on_cluster = */ false, /* zookeeper_path = */ "", allow_concurrent_restore_, concurrency_counters_)
{
}

View File

@ -17,16 +17,16 @@ class ASTCreateQuery;
class RestoreCoordinationLocal : public IRestoreCoordination
{
public:
RestoreCoordinationLocal(const UUID & restore_uuid_, bool allow_concurrent_restore_, BackupConcurrencyCounters & concurrency_counters_);
RestoreCoordinationLocal(bool allow_concurrent_restore_, BackupConcurrencyCounters & concurrency_counters_);
~RestoreCoordinationLocal() override;
void setRestoreQueryIsSentToOtherHosts() override {}
bool isRestoreQuerySentToOtherHosts() const override { return false; }
Strings setStage(const String &, const String &, bool) override { return {}; }
void setRestoreQueryWasSentToOtherHosts() override {}
bool trySetError(std::exception_ptr) override { return true; }
void finish() override {}
bool tryFinishAfterError() noexcept override { return true; }
void waitForOtherHostsToFinish() override {}
bool tryWaitForOtherHostsToFinishAfterError() noexcept override { return true; }
bool setError(std::exception_ptr, bool) override { return true; }
bool waitOtherHostsFinish(bool) const override { return true; }
bool finish(bool) override { return true; }
bool cleanup(bool) override { return true; }
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;

View File

@ -35,17 +35,21 @@ RestoreCoordinationOnCluster::RestoreCoordinationOnCluster(
, current_host_index(BackupCoordinationOnCluster::findCurrentHostIndex(current_host, all_hosts))
, log(getLogger("RestoreCoordinationOnCluster"))
, with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); })
, concurrency_check(restore_uuid_, /* is_restore = */ true, /* on_cluster = */ true, allow_concurrent_restore_, concurrency_counters_)
, stage_sync(/* is_restore = */ true, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_restore_, with_retries, schedule_, process_list_element_, log)
, cleaner(zookeeper_path, with_retries, log)
, cleaner(/* is_restore = */ true, zookeeper_path, with_retries, log)
, stage_sync(/* is_restore = */ true, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_restore_, concurrency_counters_, with_retries, schedule_, process_list_element_, log)
{
try
{
createRootNodes();
}
catch (...)
{
stage_sync.setError(std::current_exception(), /* throw_if_error = */ false);
throw;
}
}
RestoreCoordinationOnCluster::~RestoreCoordinationOnCluster()
{
tryFinishImpl();
}
RestoreCoordinationOnCluster::~RestoreCoordinationOnCluster() = default;
void RestoreCoordinationOnCluster::createRootNodes()
{
@ -66,69 +70,52 @@ void RestoreCoordinationOnCluster::createRootNodes()
});
}
void RestoreCoordinationOnCluster::setRestoreQueryIsSentToOtherHosts()
{
stage_sync.setQueryIsSentToOtherHosts();
}
bool RestoreCoordinationOnCluster::isRestoreQuerySentToOtherHosts() const
{
return stage_sync.isQuerySentToOtherHosts();
}
Strings RestoreCoordinationOnCluster::setStage(const String & new_stage, const String & message, bool sync)
{
stage_sync.setStage(new_stage, message);
if (!sync)
if (sync)
return stage_sync.waitHostsReachStage(all_hosts_without_initiator, new_stage);
return {};
return stage_sync.waitForHostsToReachStage(new_stage, all_hosts_without_initiator);
}
void RestoreCoordinationOnCluster::setRestoreQueryWasSentToOtherHosts()
bool RestoreCoordinationOnCluster::setError(std::exception_ptr exception, bool throw_if_error)
{
restore_query_was_sent_to_other_hosts = true;
return stage_sync.setError(exception, throw_if_error);
}
bool RestoreCoordinationOnCluster::trySetError(std::exception_ptr exception)
bool RestoreCoordinationOnCluster::waitOtherHostsFinish(bool throw_if_error) const
{
return stage_sync.trySetError(exception);
return stage_sync.waitOtherHostsFinish(throw_if_error);
}
void RestoreCoordinationOnCluster::finish()
bool RestoreCoordinationOnCluster::finish(bool throw_if_error)
{
bool other_hosts_also_finished = false;
stage_sync.finish(other_hosts_also_finished);
if ((current_host == kInitiator) && (other_hosts_also_finished || !restore_query_was_sent_to_other_hosts))
cleaner.cleanup();
return stage_sync.finish(throw_if_error);
}
bool RestoreCoordinationOnCluster::tryFinishAfterError() noexcept
bool RestoreCoordinationOnCluster::cleanup(bool throw_if_error)
{
return tryFinishImpl();
}
bool RestoreCoordinationOnCluster::tryFinishImpl() noexcept
{
bool other_hosts_also_finished = false;
if (!stage_sync.tryFinishAfterError(other_hosts_also_finished))
return false;
if ((current_host == kInitiator) && (other_hosts_also_finished || !restore_query_was_sent_to_other_hosts))
/// All the hosts must finish before we remove the coordination nodes.
bool expect_other_hosts_finished = stage_sync.isQuerySentToOtherHosts() || !stage_sync.isErrorSet();
bool all_hosts_finished = stage_sync.finished() && (stage_sync.otherHostsFinished() || !expect_other_hosts_finished);
if (!all_hosts_finished)
{
if (!cleaner.tryCleanupAfterError())
auto unfinished_hosts = expect_other_hosts_finished ? stage_sync.getUnfinishedHosts() : Strings{current_host};
LOG_INFO(log, "Skipping removing nodes from ZooKeeper because hosts {} didn't finish",
BackupCoordinationStageSync::getHostsDesc(unfinished_hosts));
return false;
}
return true;
}
void RestoreCoordinationOnCluster::waitForOtherHostsToFinish()
{
if ((current_host != kInitiator) || !restore_query_was_sent_to_other_hosts)
return;
stage_sync.waitForOtherHostsToFinish();
}
bool RestoreCoordinationOnCluster::tryWaitForOtherHostsToFinishAfterError() noexcept
{
if (current_host != kInitiator)
return false;
if (!restore_query_was_sent_to_other_hosts)
return true;
return stage_sync.tryWaitForOtherHostsToFinishAfterError();
return cleaner.cleanup(throw_if_error);
}
ZooKeeperRetriesInfo RestoreCoordinationOnCluster::getOnClusterInitializationKeeperRetriesInfo() const

View File

@ -1,7 +1,6 @@
#pragma once
#include <Backups/IRestoreCoordination.h>
#include <Backups/BackupConcurrencyCheck.h>
#include <Backups/BackupCoordinationCleaner.h>
#include <Backups/BackupCoordinationStageSync.h>
#include <Backups/WithRetries.h>
@ -15,7 +14,7 @@ class RestoreCoordinationOnCluster : public IRestoreCoordination
{
public:
/// Empty string as the current host is used to mark the initiator of a RESTORE ON CLUSTER query.
static const constexpr std::string_view kInitiator;
static const constexpr std::string_view kInitiator = BackupCoordinationStageSync::kInitiator;
RestoreCoordinationOnCluster(
const UUID & restore_uuid_,
@ -31,13 +30,13 @@ public:
~RestoreCoordinationOnCluster() override;
void setRestoreQueryIsSentToOtherHosts() override;
bool isRestoreQuerySentToOtherHosts() const override;
Strings setStage(const String & new_stage, const String & message, bool sync) override;
void setRestoreQueryWasSentToOtherHosts() override;
bool trySetError(std::exception_ptr exception) override;
void finish() override;
bool tryFinishAfterError() noexcept override;
void waitForOtherHostsToFinish() override;
bool tryWaitForOtherHostsToFinishAfterError() noexcept override;
bool setError(std::exception_ptr exception, bool throw_if_error) override;
bool waitOtherHostsFinish(bool throw_if_error) const override;
bool finish(bool throw_if_error) override;
bool cleanup(bool throw_if_error) override;
/// Starts creating a table in a replicated database. Returns false if there is another host which is already creating this table.
bool acquireCreatingTableInReplicatedDatabase(const String & database_zk_path, const String & table_name) override;
@ -78,11 +77,10 @@ private:
const size_t current_host_index;
LoggerPtr const log;
/// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`.
const WithRetries with_retries;
BackupConcurrencyCheck concurrency_check;
BackupCoordinationStageSync stage_sync;
BackupCoordinationCleaner cleaner;
std::atomic<bool> restore_query_was_sent_to_other_hosts = false;
BackupCoordinationStageSync stage_sync;
};
}

View File

@ -255,6 +255,7 @@
M(PartsActive, "Active data part, used by current and upcoming SELECTs.") \
M(AttachedDatabase, "Active databases.") \
M(AttachedTable, "Active tables.") \
M(AttachedReplicatedTable, "Active replicated tables.") \
M(AttachedView, "Active views.") \
M(AttachedDictionary, "Active dictionaries.") \
M(PartsOutdated, "Not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes.") \

View File

@ -10,33 +10,50 @@
namespace DB
{
using Checksum = CityHash_v1_0_2::uint128;
CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path)
CompressionCodecPtr
getCompressionCodecForFile(ReadBuffer & read_buffer, UInt32 & size_compressed, UInt32 & size_decompressed, bool skip_to_next_block)
{
auto read_buffer = data_part_storage.readFile(relative_path, {}, std::nullopt, std::nullopt);
read_buffer->ignore(sizeof(Checksum));
read_buffer.ignore(sizeof(Checksum));
UInt8 header_size = ICompressionCodec::getHeaderSize();
size_t starting_bytes = read_buffer.count();
PODArray<char> compressed_buffer;
compressed_buffer.resize(header_size);
read_buffer->readStrict(compressed_buffer.data(), header_size);
read_buffer.readStrict(compressed_buffer.data(), header_size);
uint8_t method = ICompressionCodec::readMethod(compressed_buffer.data());
size_compressed = unalignedLoad<UInt32>(&compressed_buffer[1]);
size_decompressed = unalignedLoad<UInt32>(&compressed_buffer[5]);
if (method == static_cast<uint8_t>(CompressionMethodByte::Multiple))
{
compressed_buffer.resize(1);
read_buffer->readStrict(compressed_buffer.data(), 1);
read_buffer.readStrict(compressed_buffer.data(), 1);
compressed_buffer.resize(1 + compressed_buffer[0]);
read_buffer->readStrict(compressed_buffer.data() + 1, compressed_buffer[0]);
read_buffer.readStrict(compressed_buffer.data() + 1, compressed_buffer[0]);
auto codecs_bytes = CompressionCodecMultiple::getCodecsBytesFromData(compressed_buffer.data());
Codecs codecs;
for (auto byte : codecs_bytes)
codecs.push_back(CompressionCodecFactory::instance().get(byte));
if (skip_to_next_block)
read_buffer.ignore(size_compressed - (read_buffer.count() - starting_bytes));
return std::make_shared<CompressionCodecMultiple>(codecs);
}
if (skip_to_next_block)
read_buffer.ignore(size_compressed - (read_buffer.count() - starting_bytes));
return CompressionCodecFactory::instance().get(method);
}
CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path)
{
auto read_buffer = data_part_storage.readFile(relative_path, {}, std::nullopt, std::nullopt);
UInt32 size_compressed;
UInt32 size_decompressed;
return getCompressionCodecForFile(*read_buffer, size_compressed, size_decompressed, false);
}
}

View File

@ -13,4 +13,8 @@ namespace DB
/// from metadata.
CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path);
/// Same as above which is used by clickhouse-compressor to print compression statistics of each data block.
CompressionCodecPtr
getCompressionCodecForFile(ReadBuffer & read_buffer, UInt32 & size_compressed, UInt32 & size_decompressed, bool skip_to_next_block);
}

View File

@ -131,6 +131,9 @@ namespace DB
DECLARE(UInt64, max_database_num_to_warn, 1000lu, "If the number of databases is greater than this value, the server will create a warning that will displayed to user.", 0) \
DECLARE(UInt64, max_part_num_to_warn, 100000lu, "If the number of parts is greater than this value, the server will create a warning that will displayed to user.", 0) \
DECLARE(UInt64, max_table_num_to_throw, 0lu, "If number of tables is greater than this value, server will throw an exception. 0 means no limitation. View, remote tables, dictionary, system tables are not counted. Only count table in Atomic/Ordinary/Replicated/Lazy database engine.", 0) \
DECLARE(UInt64, max_replicated_table_num_to_throw, 0lu, "If number of replicated tables is greater than this value, server will throw an exception. 0 means no limitation. Only count table in Atomic/Ordinary/Replicated/Lazy database engine.", 0) \
DECLARE(UInt64, max_dictionary_num_to_throw, 0lu, "If number of dictionaries is greater than this value, server will throw an exception. 0 means no limitation. Only count table in Atomic/Ordinary/Replicated/Lazy database engine.", 0) \
DECLARE(UInt64, max_view_num_to_throw, 0lu, "If number of views is greater than this value, server will throw an exception. 0 means no limitation. Only count table in Atomic/Ordinary/Replicated/Lazy database engine.", 0) \
DECLARE(UInt64, max_database_num_to_throw, 0lu, "If number of databases is greater than this value, server will throw an exception. 0 means no limitation.", 0) \
DECLARE(UInt64, max_authentication_methods_per_user, 100, "The maximum number of authentication methods a user can be created with or altered. Changing this setting does not affect existing users. Zero means unlimited", 0) \
DECLARE(UInt64, concurrent_threads_soft_limit_num, 0, "Sets how many concurrent thread can be allocated before applying CPU pressure. Zero means unlimited.", 0) \

View File

@ -3669,6 +3669,11 @@ Given that, for example, dictionaries, can be out of sync across nodes, mutation
</profiles>
```
)", 0) \
DECLARE(Bool, validate_mutation_query, true, R"(
Validate mutation queries before accepting them. Mutations are executed in the background, and running an invalid query will cause mutations to get stuck, requiring manual intervention.
Only change this setting if you encounter a backward-incompatible bug.
)", 0) \
DECLARE(Seconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, R"(
Defines how many seconds a locking request waits before failing.

View File

@ -64,6 +64,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
},
{"24.11",
{
{"validate_mutation_query", false, true, "New setting to validate mutation queries by default."},
{"enable_job_stack_trace", false, true, "Enable by default collecting stack traces from job's scheduling."},
{"allow_suspicious_types_in_group_by", true, false, "Don't allow Variant/Dynamic types in GROUP BY by default"},
{"allow_suspicious_types_in_order_by", true, false, "Don't allow Variant/Dynamic types in ORDER BY by default"},

View File

@ -382,7 +382,8 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
if (!table_storage->isSystemStorage() && !DatabaseCatalog::isPredefinedDatabase(database_name))
{
LOG_TEST(log, "Counting detached table {} to database {}", table_name, database_name);
CurrentMetrics::sub(getAttachedCounterForStorage(table_storage));
for (auto metric : getAttachedCountersForStorage(table_storage))
CurrentMetrics::sub(metric);
}
auto table_id = table_storage->getStorageID();
@ -430,7 +431,8 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
if (!table->isSystemStorage() && !DatabaseCatalog::isPredefinedDatabase(database_name))
{
LOG_TEST(log, "Counting attached table {} to database {}", table_name, database_name);
CurrentMetrics::add(getAttachedCounterForStorage(table));
for (auto metric : getAttachedCountersForStorage(table))
CurrentMetrics::add(metric);
}
}

View File

@ -1033,6 +1033,9 @@ private:
size_t tuple_size,
size_t input_rows_count) const
{
if (0 == tuple_size)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Comparison of zero-sized tuples is not implemented");
ColumnsWithTypeAndName less_columns(tuple_size);
ColumnsWithTypeAndName equal_columns(tuple_size - 1);
ColumnsWithTypeAndName tmp_columns(2);

View File

@ -668,6 +668,9 @@ private:
temporary_columns[0] = arguments[0];
size_t tuple_size = type1.getElements().size();
if (tuple_size == 0)
return ColumnTuple::create(input_rows_count);
Columns tuple_columns(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)

View File

@ -37,7 +37,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3express[\-a-z0-9]+|s3|cos|obs|oss|eos)([.\-][a-z0-9\-.:]+))");
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3express[\-a-z0-9]+|s3|cos|obs|oss-data-acc|oss|eos)([.\-][a-z0-9\-.:]+))");
/// Case when AWS Private Link Interface is being used
/// E.g. (bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/bucket-name/key)
@ -115,7 +115,15 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
&& re2::RE2::FullMatch(uri.getAuthority(), virtual_hosted_style_pattern, &bucket, &name, &endpoint_authority_from_uri))
{
is_virtual_hosted_style = true;
if (name == "oss-data-acc")
{
bucket = bucket.substr(0, bucket.find('.'));
endpoint = uri.getScheme() + "://" + uri.getHost().substr(bucket.length() + 1);
}
else
{
endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;
}
validateBucket(bucket, uri);
if (!uri.getPath().empty())

View File

@ -212,6 +212,22 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
S3::URI uri("https://bucket-test1.oss-cn-beijing-internal.aliyuncs.com/ab-test");
ASSERT_EQ("https://oss-cn-beijing-internal.aliyuncs.com", uri.endpoint);
ASSERT_EQ("bucket-test1", uri.bucket);
ASSERT_EQ("ab-test", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
S3::URI uri("https://bucket-test.cn-beijing-internal.oss-data-acc.aliyuncs.com/ab-test");
ASSERT_EQ("https://cn-beijing-internal.oss-data-acc.aliyuncs.com", uri.endpoint);
ASSERT_EQ("bucket-test", uri.bucket);
ASSERT_EQ("ab-test", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
}
TEST(S3UriTest, versionIdChecks)

View File

@ -98,6 +98,9 @@
namespace CurrentMetrics
{
extern const Metric AttachedTable;
extern const Metric AttachedReplicatedTable;
extern const Metric AttachedDictionary;
extern const Metric AttachedView;
}
namespace DB
@ -145,7 +148,10 @@ namespace ServerSetting
{
extern const ServerSettingsBool ignore_empty_sql_security_in_create_view_query;
extern const ServerSettingsUInt64 max_database_num_to_throw;
extern const ServerSettingsUInt64 max_dictionary_num_to_throw;
extern const ServerSettingsUInt64 max_table_num_to_throw;
extern const ServerSettingsUInt64 max_replicated_table_num_to_throw;
extern const ServerSettingsUInt64 max_view_num_to_throw;
}
namespace ErrorCodes
@ -1912,16 +1918,8 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
}
}
UInt64 table_num_limit = getContext()->getGlobalContext()->getServerSettings()[ServerSetting::max_table_num_to_throw];
if (table_num_limit > 0 && !internal)
{
UInt64 table_count = CurrentMetrics::get(CurrentMetrics::AttachedTable);
if (table_count >= table_num_limit)
throw Exception(ErrorCodes::TOO_MANY_TABLES,
"Too many tables. "
"The limit (server configuration parameter `max_table_num_to_throw`) is set to {}, the current number of tables is {}",
table_num_limit, table_count);
}
if (!internal)
throwIfTooManyEntities(create, res);
database->createTable(getContext(), create.getTable(), res, query_ptr);
@ -1948,6 +1946,30 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
}
void InterpreterCreateQuery::throwIfTooManyEntities(ASTCreateQuery & create, StoragePtr storage) const
{
auto check_and_throw = [&](auto setting, CurrentMetrics::Metric metric, String setting_name, String entity_name)
{
UInt64 num_limit = getContext()->getGlobalContext()->getServerSettings()[setting];
UInt64 attached_count = CurrentMetrics::get(metric);
if (num_limit > 0 && attached_count >= num_limit)
throw Exception(ErrorCodes::TOO_MANY_TABLES,
"Too many {}. "
"The limit (server configuration parameter `{}`) is set to {}, the current number is {}",
entity_name, setting_name, num_limit, attached_count);
};
if (auto * replicated_storage = typeid_cast<StorageReplicatedMergeTree *>(storage.get()))
check_and_throw(ServerSetting::max_replicated_table_num_to_throw, CurrentMetrics::AttachedReplicatedTable, "max_replicated_table_num_to_throw", "replicated tables");
else if (create.is_dictionary)
check_and_throw(ServerSetting::max_dictionary_num_to_throw, CurrentMetrics::AttachedDictionary, "max_dictionary_num_to_throw", "dictionaries");
else if (create.isView())
check_and_throw(ServerSetting::max_view_num_to_throw, CurrentMetrics::AttachedView, "max_view_num_to_throw", "views");
else
check_and_throw(ServerSetting::max_table_num_to_throw, CurrentMetrics::AttachedTable, "max_table_num_to_throw", "tables");
}
BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties, LoadingStrictnessLevel mode)
{

View File

@ -122,6 +122,8 @@ private:
BlockIO executeQueryOnCluster(ASTCreateQuery & create);
void throwIfTooManyEntities(ASTCreateQuery & create, StoragePtr storage) const;
ASTPtr query_ptr;
/// Skip safety threshold when loading tables.

View File

@ -53,6 +53,7 @@ namespace Setting
extern const SettingsBool allow_nondeterministic_mutations;
extern const SettingsUInt64 max_block_size;
extern const SettingsBool use_concurrency_control;
extern const SettingsBool validate_mutation_query;
}
namespace MergeTreeSetting
@ -1386,6 +1387,18 @@ void MutationsInterpreter::validate()
}
}
// Make sure the mutation query is valid
if (context->getSettingsRef()[Setting::validate_mutation_query])
{
if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
prepareQueryAffectedQueryTree(commands, source.getStorage(), context);
else
{
ASTPtr select_query = prepareQueryAffectedAST(commands, source.getStorage(), context);
InterpreterSelectQuery(select_query, context, source.getStorage(), metadata_snapshot);
}
}
QueryPlan plan;
initQueryPlan(stages.front(), plan);

View File

@ -9,6 +9,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSparse.h>
#include <Columns/FilterDescription.h>
#include <DataTypes/DataTypesNumber.h>
@ -184,7 +185,7 @@ void AddingDefaultsTransform::transform(Chunk & chunk)
std::unordered_map<size_t, MutableColumnPtr> mixed_columns;
for (const ColumnWithTypeAndName & column_def : evaluate_block)
for (auto & column_def : evaluate_block)
{
const String & column_name = column_def.name;
@ -199,6 +200,9 @@ void AddingDefaultsTransform::transform(Chunk & chunk)
if (!defaults_mask.empty())
{
column_read.column = recursiveRemoveSparse(column_read.column);
column_def.column = recursiveRemoveSparse(column_def.column);
/// TODO: FixedString
if (isColumnedAsNumber(column_read.type) || isDecimal(column_read.type))
{

View File

@ -2263,18 +2263,18 @@ void IMergeTreeDataPart::checkConsistencyWithProjections(bool require_part_metad
proj_part->checkConsistency(require_part_metadata);
}
void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk()
void IMergeTreeDataPart::calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional<Block> columns_sample)
{
calculateColumnsSizesOnDisk();
calculateColumnsSizesOnDisk(columns_sample);
calculateSecondaryIndicesSizesOnDisk();
}
void IMergeTreeDataPart::calculateColumnsSizesOnDisk()
void IMergeTreeDataPart::calculateColumnsSizesOnDisk(std::optional<Block> columns_sample)
{
if (getColumns().empty() || checksums.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot calculate columns sizes when columns or checksums are not initialized");
calculateEachColumnSizes(columns_sizes, total_columns_size);
calculateEachColumnSizes(columns_sizes, total_columns_size, columns_sample);
}
void IMergeTreeDataPart::calculateSecondaryIndicesSizesOnDisk()
@ -2521,22 +2521,24 @@ ColumnPtr IMergeTreeDataPart::getColumnSample(const NameAndTypePair & column) co
StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr();
StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr);
MergeTreeReaderSettings settings;
settings.can_read_part_without_marks = true;
MergeTreeReaderPtr reader = getReader(
cols,
storage_snapshot_ptr,
MarkRanges{MarkRange(0, 1)},
MarkRanges{MarkRange(0, total_mark)},
/*virtual_fields=*/ {},
/*uncompressed_cache=*/{},
storage.getContext()->getMarkCache().get(),
std::make_shared<AlterConversions>(),
MergeTreeReaderSettings{},
settings,
ValueSizeMap{},
ReadBufferFromFileBase::ProfileCallback{});
Columns result;
result.resize(1);
reader->readRows(0, 1, false, 0, result);
reader->readRows(0, total_mark, false, 0, result);
return result[0];
}

View File

@ -428,7 +428,7 @@ public:
bool shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const;
/// Calculate column and secondary indices sizes on disk.
void calculateColumnsAndSecondaryIndicesSizesOnDisk();
void calculateColumnsAndSecondaryIndicesSizesOnDisk(std::optional<Block> columns_sample = std::nullopt);
std::optional<String> getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const;
@ -633,7 +633,7 @@ protected:
/// Fill each_columns_size and total_size with sizes from columns files on
/// disk using columns and checksums.
virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const = 0;
virtual void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const = 0;
std::optional<String> getRelativePathForDetachedPart(const String & prefix, bool broken) const;
@ -715,7 +715,7 @@ private:
void loadPartitionAndMinMaxIndex();
void calculateColumnsSizesOnDisk();
void calculateColumnsSizesOnDisk(std::optional<Block> columns_sample = std::nullopt);
void calculateSecondaryIndicesSizesOnDisk();

View File

@ -54,6 +54,8 @@ public:
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
virtual Block getColumnsSample() const = 0;
protected:
SerializationPtr getSerialization(const String & column_name) const;

View File

@ -80,7 +80,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartCompactWriter(
}
void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size) const
void MergeTreeDataPartCompact::calculateEachColumnSizes(ColumnSizeByName & /*each_columns_size*/, ColumnSize & total_size, std::optional<Block> /*columns_sample*/) const
{
auto bin_checksum = checksums.files.find(DATA_FILE_NAME_WITH_EXTENSION);
if (bin_checksum != checksums.files.end())

View File

@ -70,7 +70,7 @@ private:
void loadIndexGranularity() override;
/// Compact parts don't support per column size, only total size
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const override;
};
}

View File

@ -82,7 +82,7 @@ MergeTreeDataPartWriterPtr createMergeTreeDataPartWideWriter(
/// Takes into account the fact that several columns can e.g. share their .size substreams.
/// When calculating totals these should be counted only once.
ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
const NameAndTypePair & column, std::unordered_set<String> * processed_substreams) const
const NameAndTypePair & column, std::unordered_set<String> * processed_substreams, std::optional<Block> columns_sample) const
{
ColumnSize size;
if (checksums.empty())
@ -108,7 +108,7 @@ ColumnSize MergeTreeDataPartWide::getColumnSizeImpl(
auto mrk_checksum = checksums.files.find(*stream_name + getMarksFileExtension());
if (mrk_checksum != checksums.files.end())
size.marks += mrk_checksum->second.file_size;
});
}, column.type, columns_sample && columns_sample->has(column.name) ? columns_sample->getByName(column.name).column : getColumnSample(column));
return size;
}
@ -374,12 +374,12 @@ std::optional<String> MergeTreeDataPartWide::getFileNameForColumn(const NameAndT
return filename;
}
void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const
void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const
{
std::unordered_set<String> processed_substreams;
for (const auto & column : columns)
{
ColumnSize size = getColumnSizeImpl(column, &processed_substreams);
ColumnSize size = getColumnSizeImpl(column, &processed_substreams, columns_sample);
each_columns_size[column.name] = size;
total_size.add(size);

View File

@ -64,9 +64,9 @@ private:
/// Loads marks index granularity into memory
void loadIndexGranularity() override;
ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set<String> * processed_substreams) const;
ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set<String> * processed_substreams, std::optional<Block> columns_sample) const;
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size, std::optional<Block> columns_sample) const override;
};

View File

@ -123,6 +123,8 @@ public:
written_offset_columns = written_offset_columns_;
}
Block getColumnsSample() const override { return block_sample; }
protected:
/// Count index_granularity for block and store in `index_granularity`
size_t computeIndexGranularity(const Block & block) const;

View File

@ -172,7 +172,7 @@ size_t MergeTreeReaderWide::readRows(
throw;
}
if (column->empty())
if (column->empty() && max_rows_to_read > 0)
res_columns[pos] = nullptr;
}

View File

@ -209,7 +209,7 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
new_part->index_granularity = writer->getIndexGranularity();
/// Just in case
new_part->index_granularity.shrinkToFitInMemory();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
new_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(writer->getColumnsSample());
/// In mutation, existing_rows_count is already calculated in PartMergerWriter
/// In merge situation, lightweight deleted rows was physically deleted, existing_rows_count equals rows_count

View File

@ -1,10 +1,13 @@
#include <vector>
#include <Storages/Utils.h>
#include <Storages/IStorage.h>
#include <Storages/StorageReplicatedMergeTree.h>
namespace CurrentMetrics
{
extern const Metric AttachedTable;
extern const Metric AttachedReplicatedTable;
extern const Metric AttachedView;
extern const Metric AttachedDictionary;
}
@ -12,17 +15,20 @@ namespace CurrentMetrics
namespace DB
{
CurrentMetrics::Metric getAttachedCounterForStorage(const StoragePtr & storage)
std::vector<CurrentMetrics::Metric> getAttachedCountersForStorage(const StoragePtr & storage)
{
if (storage->isView())
{
return CurrentMetrics::AttachedView;
return {CurrentMetrics::AttachedView};
}
if (storage->isDictionary())
{
return CurrentMetrics::AttachedDictionary;
return {CurrentMetrics::AttachedDictionary};
}
return CurrentMetrics::AttachedTable;
if (auto * replicated_storage = typeid_cast<StorageReplicatedMergeTree *>(storage.get()))
{
return {CurrentMetrics::AttachedTable, CurrentMetrics::AttachedReplicatedTable};
}
return {CurrentMetrics::AttachedTable};
}
}

View File

@ -6,5 +6,5 @@
namespace DB
{
CurrentMetrics::Metric getAttachedCounterForStorage(const StoragePtr & storage);
std::vector<CurrentMetrics::Metric> getAttachedCountersForStorage(const StoragePtr & storage);
}

View File

@ -17,3 +17,9 @@ services:
- /misc/rabbitmq/server-cert.pem:/etc/rabbitmq/server-cert.pem
- /misc/rabbitmq/server-key.pem:/etc/rabbitmq/server-key.pem
- /misc/rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
# https://www.rabbitmq.com/docs/monitoring#health-checks
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 10s
retries: 10
timeout: 2s

View File

@ -251,23 +251,16 @@ def kill_query(
if is_initial_query is not None
else ""
)
old_time = time.monotonic()
node.query(
f"KILL QUERY WHERE (query_kind='{query_kind}') AND (query LIKE '%{id}%'){filter_for_is_initial_query} SYNC"
)
node.query("SYSTEM FLUSH LOGS")
duration = (
int(
node.query(
f"SELECT query_duration_ms FROM system.query_log WHERE query_kind='KillQuery' AND query LIKE '%{id}%' AND type='QueryFinish'"
)
)
/ 1000
)
waited = time.monotonic() - old_time
print(
f"{get_node_name(node)}: Cancelled {operation_name} {id} after {duration} seconds"
f"{get_node_name(node)}: Cancelled {operation_name} {id} after {waited} seconds"
)
if timeout is not None:
assert duration < timeout
assert waited < timeout
# Stops all ZooKeeper servers.
@ -305,7 +298,7 @@ def sleep(seconds):
class NoTrashChecker:
def __init__(self):
self.expect_backups = []
self.expect_unfinished_backups = []
self.allow_unfinished_backups = []
self.expect_errors = []
self.allow_errors = []
self.check_zookeeper = True
@ -373,7 +366,7 @@ class NoTrashChecker:
if unfinished_backups:
print(f"Found unfinished backups: {unfinished_backups}")
assert new_backups == set(self.expect_backups)
assert unfinished_backups == set(self.expect_unfinished_backups)
assert unfinished_backups.difference(self.allow_unfinished_backups) == set()
all_errors = set()
start_time = time.strftime(
@ -641,7 +634,7 @@ def test_long_disconnection_stops_backup():
assert get_status(initiator, backup_id=backup_id) == "CREATING_BACKUP"
assert get_num_system_processes(initiator, backup_id=backup_id) >= 1
no_trash_checker.expect_unfinished_backups = [backup_id]
no_trash_checker.allow_unfinished_backups = [backup_id]
no_trash_checker.allow_errors = [
"FAILED_TO_SYNC_BACKUP_OR_RESTORE",
"KEEPER_EXCEPTION",
@ -674,7 +667,7 @@ def test_long_disconnection_stops_backup():
# A backup is expected to fail, but it isn't expected to fail too soon.
print(f"Backup failed after {time_to_fail} seconds disconnection")
assert time_to_fail > 3
assert time_to_fail < 30
assert time_to_fail < 35
# A backup must NOT be stopped if Zookeeper is disconnected shorter than `failure_after_host_disconnected_for_seconds`.
@ -695,7 +688,7 @@ def test_short_disconnection_doesnt_stop_backup():
backup_id = random_id()
initiator.query(
f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {get_backup_name(backup_id)} SETTINGS id='{backup_id}' ASYNC",
settings={"backup_restore_failure_after_host_disconnected_for_seconds": 6},
settings={"backup_restore_failure_after_host_disconnected_for_seconds": 10},
)
assert get_status(initiator, backup_id=backup_id) == "CREATING_BACKUP"
@ -703,13 +696,13 @@ def test_short_disconnection_doesnt_stop_backup():
# Dropping connection for less than `failure_after_host_disconnected_for_seconds`
with PartitionManager() as pm:
random_sleep(3)
random_sleep(4)
node_to_drop_zk_connection = random_node()
print(
f"Dropping connection between {get_node_name(node_to_drop_zk_connection)} and ZooKeeper"
)
pm.drop_instance_zk_connections(node_to_drop_zk_connection)
random_sleep(3)
random_sleep(4)
print(
f"Restoring connection between {get_node_name(node_to_drop_zk_connection)} and ZooKeeper"
)

View File

@ -27,6 +27,7 @@ REPLICATED_POSTPONE_MUTATION_LOG = (
POSTPONE_MUTATION_LOG = (
"According to exponential backoff policy, do not perform mutations for the part"
)
FAILING_MUTATION_QUERY = "ALTER TABLE test_mutations DELETE WHERE x IN (SELECT throwIf(1)) SETTINGS allow_nondeterministic_mutations = 1"
all_nodes = [node_with_backoff, node_no_backoff]
@ -83,17 +84,13 @@ def test_exponential_backoff_with_merge_tree(started_cluster, node, found_in_log
assert not node.contains_in_log(POSTPONE_MUTATION_LOG)
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
check_logs()
node.query("KILL MUTATION WHERE table='test_mutations'")
# Check that after kill new parts mutations are postponing.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
check_logs()
@ -101,9 +98,7 @@ def test_exponential_backoff_with_merge_tree(started_cluster, node, found_in_log
def test_exponential_backoff_with_replicated_tree(started_cluster):
prepare_cluster(True)
node_with_backoff.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
)
node_with_backoff.query(FAILING_MUTATION_QUERY)
assert node_with_backoff.wait_for_log_line(REPLICATED_POSTPONE_MUTATION_LOG)
assert not node_no_backoff.contains_in_log(REPLICATED_POSTPONE_MUTATION_LOG)
@ -114,7 +109,7 @@ def test_exponential_backoff_create_dependent_table(started_cluster):
# Executing incorrect mutation.
node_with_backoff.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations = 1, validate_mutation_query = 0"
)
# Creating dependent table for mutation.
@ -148,9 +143,7 @@ def test_exponential_backoff_setting_override(started_cluster):
node.query("INSERT INTO test_mutations SELECT * FROM system.numbers LIMIT 10")
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
assert not node.contains_in_log(POSTPONE_MUTATION_LOG)
@ -166,9 +159,7 @@ def test_backoff_clickhouse_restart(started_cluster, replicated_table):
node = node_with_backoff
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
assert node.wait_for_log_line(
REPLICATED_POSTPONE_MUTATION_LOG if replicated_table else POSTPONE_MUTATION_LOG
)
@ -193,9 +184,8 @@ def test_no_backoff_after_killing_mutation(started_cluster, replicated_table):
node = node_with_backoff
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
)
node.query(FAILING_MUTATION_QUERY)
# Executing correct mutation.
node.query("ALTER TABLE test_mutations DELETE WHERE x=1")
assert node.wait_for_log_line(

View File

@ -1,4 +1,20 @@
<clickhouse>
<remote_servers>
<cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
<max_dictionary_num_to_throw>10</max_dictionary_num_to_throw>
<max_table_num_to_throw>10</max_table_num_to_throw>
<max_database_num_to_throw>10</max_database_num_to_throw>
</clickhouse>

View File

@ -0,0 +1,4 @@
<clickhouse>
<max_replicated_table_num_to_throw>5</max_replicated_table_num_to_throw>
</clickhouse>

View File

@ -0,0 +1,4 @@
<clickhouse>
<max_replicated_table_num_to_throw>3</max_replicated_table_num_to_throw>
</clickhouse>

View File

@ -1,11 +1,22 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["config/config.xml"])
node = cluster.add_instance(
"node1",
with_zookeeper=True,
macros={"replica": "r1"},
main_configs=["config/config.xml", "config/config1.xml"],
)
node2 = cluster.add_instance(
"node2",
with_zookeeper=True,
macros={"replica": "r2"},
main_configs=["config/config.xml", "config/config2.xml"],
)
@pytest.fixture(scope="module")
@ -24,10 +35,9 @@ def test_table_db_limit(started_cluster):
for i in range(9):
node.query("create database db{}".format(i))
with pytest.raises(QueryRuntimeException) as exp_info:
node.query("create database db_exp".format(i))
assert "TOO_MANY_DATABASES" in str(exp_info)
assert "TOO_MANY_DATABASES" in node.query_and_get_error(
"create database db_exp".format(i)
)
for i in range(10):
node.query("create table t{} (a Int32) Engine = Log".format(i))
@ -35,13 +45,72 @@ def test_table_db_limit(started_cluster):
# This checks that system tables are not accounted in the number of tables.
node.query("system flush logs")
# Regular tables
for i in range(10):
node.query("drop table t{}".format(i))
for i in range(10):
node.query("create table t{} (a Int32) Engine = Log".format(i))
with pytest.raises(QueryRuntimeException) as exp_info:
node.query("create table default.tx (a Int32) Engine = Log")
assert "TOO_MANY_TABLES" in node.query_and_get_error(
"create table default.tx (a Int32) Engine = Log"
)
assert "TOO_MANY_TABLES" in str(exp_info)
# Dictionaries
for i in range(10):
node.query(
"create dictionary d{} (a Int32) primary key a source(null()) layout(flat()) lifetime(1000)".format(
i
)
)
assert "TOO_MANY_TABLES" in node.query_and_get_error(
"create dictionary dx (a Int32) primary key a source(null()) layout(flat()) lifetime(1000)"
)
# Replicated tables
for i in range(10):
node.query("drop table t{}".format(i))
for i in range(3):
node.query(
"create table t{} on cluster 'cluster' (a Int32) Engine = ReplicatedMergeTree('/clickhouse/tables/t{}', '{{replica}}') order by a".format(
i, i
)
)
# Test limit on other replica
assert "Too many replicated tables" in node2.query_and_get_error(
"create table tx (a Int32) Engine = ReplicatedMergeTree('/clickhouse/tables/tx', '{replica}') order by a"
)
for i in range(3, 5):
node.query(
"create table t{} (a Int32) Engine = ReplicatedMergeTree('/clickhouse/tables/t{}', '{{replica}}') order by a".format(
i, i
)
)
assert "Too many replicated tables" in node.query_and_get_error(
"create table tx (a Int32) Engine = ReplicatedMergeTree('/clickhouse/tables/tx', '{replica}') order by a"
)
# Checks that replicated tables are also counted as regular tables
for i in range(5, 10):
node.query("create table t{} (a Int32) Engine = Log".format(i))
assert "TOO_MANY_TABLES" in node.query_and_get_error(
"create table tx (a Int32) Engine = Log"
)
# Cleanup
for i in range(10):
node.query("drop table t{} sync".format(i))
for i in range(3):
node2.query("drop table t{} sync".format(i))
node.query("system drop replica 'r1' from ZKPATH '/clickhouse/tables/tx'")
node.query("system drop replica 'r2' from ZKPATH '/clickhouse/tables/tx'")
for i in range(9):
node.query("drop database db{}".format(i))
for i in range(10):
node.query("drop dictionary d{}".format(i))

View File

@ -7,5 +7,5 @@ select count() from test_qualify; -- 100
select * from test_qualify qualify row_number() over (order by number) = 50 SETTINGS enable_analyzer = 1; -- 49
select * from test_qualify qualify row_number() over (order by number) = 50 SETTINGS enable_analyzer = 0; -- { serverError NOT_IMPLEMENTED }
delete from test_qualify where number in (select number from test_qualify qualify row_number() over (order by number) = 50); -- { serverError UNFINISHED }
delete from test_qualify where number in (select number from test_qualify qualify row_number() over (order by number) = 50) SETTINGS validate_mutation_query = 0; -- { serverError UNFINISHED }
select count() from test_qualify; -- 100

View File

@ -0,0 +1,23 @@
DROP TABLE IF EXISTS t;
DROP TABLE IF EXISTS t2;
CREATE TABLE t (x Int32) ENGINE = MergeTree ORDER BY x;
CREATE TABLE t2 (x Int32) ENGINE = MergeTree ORDER BY x;
SYSTEM STOP MERGES t;
SET max_insert_block_size = 1;
SET min_insert_block_size_rows = 1;
SET max_block_size = 1;
SET max_parts_to_move = 5;
INSERT INTO t SELECT number from numbers(10);
ALTER TABLE t MOVE PARTITION tuple() TO TABLE t2; -- { serverError TOO_MANY_PARTS }
SET max_parts_to_move = 15;
ALTER TABLE t MOVE PARTITION tuple() TO TABLE t2;
DROP TABLE IF EXISTS t;
DROP TABLE IF EXISTS t2;

View File

@ -0,0 +1,21 @@
DROP TABLE IF EXISTS t;
DROP TABLE IF EXISTS t2;
CREATE TABLE t (x int) ENGINE = MergeTree() ORDER BY ();
DELETE FROM t WHERE y in (SELECT x FROM t); -- { serverError UNKNOWN_IDENTIFIER }
DELETE FROM t WHERE x in (SELECT y FROM t); -- { serverError UNKNOWN_IDENTIFIER }
DELETE FROM t WHERE x IN (SELECT * FROM t2); -- { serverError UNKNOWN_TABLE }
ALTER TABLE t DELETE WHERE x in (SELECT y FROM t); -- { serverError UNKNOWN_IDENTIFIER }
ALTER TABLE t UPDATE x = 1 WHERE x IN (SELECT y FROM t); -- { serverError UNKNOWN_IDENTIFIER }
DELETE FROM t WHERE x IN (SELECT foo FROM bar) SETTINGS validate_mutation_query = 0;
ALTER TABLE t ADD COLUMN y int;
DELETE FROM t WHERE y in (SELECT y FROM t);
CREATE TABLE t2 (x int) ENGINE = MergeTree() ORDER BY ();
DELETE FROM t WHERE x IN (SELECT * FROM t2);
DROP TABLE t;
DROP TABLE t2;

View File

@ -0,0 +1 @@
CODEC(Delta(1), LZ4) 14 48

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo "Hello, World!" | $CLICKHOUSE_COMPRESSOR --codec 'Delta' --codec 'LZ4' | $CLICKHOUSE_COMPRESSOR --stat

View File

@ -0,0 +1 @@
test 10.00 million 352.87 MiB 39.43 MiB 39.45 MiB

View File

@ -0,0 +1,23 @@
-- Tags: no-random-settings
set allow_experimental_dynamic_type = 1;
set allow_experimental_json_type = 1;
drop table if exists test;
create table test (d Dynamic, json JSON) engine=MergeTree order by tuple() settings min_rows_for_wide_part=0, min_bytes_for_wide_part=1;
insert into test select number, '{"a" : 42, "b" : "Hello, World"}' from numbers(10000000);
SELECT
`table`,
formatReadableQuantity(sum(rows)) AS rows,
formatReadableSize(sum(data_uncompressed_bytes)) AS data_size_uncompressed,
formatReadableSize(sum(data_compressed_bytes)) AS data_size_compressed,
formatReadableSize(sum(bytes_on_disk)) AS total_size_on_disk
FROM system.parts
WHERE active AND (database = currentDatabase()) AND (`table` = 'test')
GROUP BY `table`
ORDER BY `table` ASC;
drop table test;

View File

@ -0,0 +1 @@
() 2

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS t0;
CREATE TABLE t0 (c0 Tuple(), c1 int) ENGINE = Memory();
INSERT INTO t0 VALUES ((), 1);
ALTER TABLE t0 UPDATE c0 = (), c1 = 2 WHERE EXISTS (SELECT 1);
SELECT * FROM t0;
DROP TABLE t0;

View File

@ -0,0 +1,4 @@
1 false
2 false
all_1_1_0 Sparse
all_2_2_0 Sparse

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query "
DROP TABLE IF EXISTS test_default_bool;
CREATE TABLE test_default_bool (id Int8, b Bool DEFAULT false)
ENGINE = MergeTree ORDER BY id
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.9;
"
echo 'INSERT INTO test_default_bool FORMAT CSV 1,\N' | $CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" --data-binary @-
echo 'INSERT INTO test_default_bool FORMAT CSV 2,\N' | $CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" --data-binary @-
$CLICKHOUSE_CLIENT --query "
SELECT * FROM test_default_bool ORDER BY id;
SELECT name, serialization_kind FROM system.parts_columns WHERE database = currentDatabase() AND table = 'test_default_bool' AND column = 'b' AND active;
DROP TABLE test_default_bool;
"