Merge remote-tracking branch 'public/master' into fixed_hints_db_name

yariks5s 2023-08-16 10:17:55 +00:00
commit 85c97f85fb
57 changed files with 528 additions and 359 deletions

View File

@ -12,6 +12,7 @@ ENV \
# install systemd packages
RUN apt-get update && \
apt-get install -y --no-install-recommends \
sudo \
systemd \
&& \
apt-get clean && \

View File

@ -11,7 +11,7 @@ Inserts data into a table.
**Syntax**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
You can specify a list of columns to insert using the `(c1, c2, c3)` syntax. You can also use an expression with a column [matcher](../../sql-reference/statements/select/index.md#asterisk) such as `*` and/or [modifiers](../../sql-reference/statements/select/index.md#select-modifiers) such as [APPLY](../../sql-reference/statements/select/index.md#apply-modifier), [EXCEPT](../../sql-reference/statements/select/index.md#except-modifier), [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier).
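For instance, a minimal sketch of both forms; the table `t` and its columns `a`, `b`, `c` are hypothetical:
``` sql
-- Insert into an explicit subset of columns; omitted columns receive their default values.
INSERT INTO t (a, c) VALUES (1, 3);

-- A matcher with a modifier: insert into every column except b.
INSERT INTO t (* EXCEPT(b)) VALUES (1, 3);
```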
@ -107,7 +107,7 @@ If table has [constraints](../../sql-reference/statements/create/table.md#constr
**Syntax**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] SELECT ...
```
Columns are mapped according to their position in the SELECT clause. However, their names in the SELECT expression and in the table for INSERT may differ. If necessary, type casting is performed.
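A minimal sketch of the positional mapping; the tables `t` and `src` are hypothetical:
``` sql
-- src.s feeds t.a and src.n feeds t.b purely by position;
-- the names need not match, and types are cast when necessary.
INSERT INTO t (a, b) SELECT s, n FROM src;
```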
@ -126,7 +126,7 @@ To insert a default value instead of `NULL` into a column with not nullable data
**Syntax**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
```
Use the syntax above to insert data from a file, or files, stored on the **client** side. `file_name` and `type` are string literals. The input file [format](../../interfaces/formats.md) must be set in the `FORMAT` clause.
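For example, a minimal sketch run from clickhouse-client; the table `t` and the gzip-compressed file `data.csv.gz` are hypothetical:
``` sql
-- The file is read on the client side and sent to the server in CSV format.
INSERT INTO t (a, b, c) FROM INFILE 'data.csv.gz' COMPRESSION 'gzip' FORMAT CSV
```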

View File

@ -11,7 +11,7 @@ sidebar_label: INSERT INTO
**Syntax**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
You can specify a list of columns to insert using the `(c1, c2, c3)` syntax. You can also use an expression with an [asterisk](../../sql-reference/statements/select/index.md#asterisk) and/or modifiers such as [APPLY](../../sql-reference/statements/select/index.md#apply-modifier), [EXCEPT](../../sql-reference/statements/select/index.md#except-modifier), [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier).
@ -100,7 +100,7 @@ INSERT INTO t FORMAT TabSeparated
**Syntax**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] SELECT ...
```
Columns are mapped according to their position in the SELECT clause. However, their names in the SELECT expression and in the table for INSERT may differ. If necessary, type casting equivalent to the corresponding CAST operator is performed.
@ -120,7 +120,7 @@ INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
**Syntax**
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
```
Use this syntax to insert data from a file stored on the **client** side. `file_name` and `type` are string literals. The input file [format](../../interfaces/formats.md) must be set in the `FORMAT` clause.

View File

@ -8,7 +8,7 @@ The INSERT INTO statement is mainly used to add data to the system.
The basic query format:
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```
You can specify the list of columns to insert in the query, e.g. `[(c1, c2, c3)]`. You can also use an expression with a column [matcher](../../sql-reference/statements/select/index.md#asterisk) such as `*` and/or [modifiers](../../sql-reference/statements/select/index.md#select-modifiers) such as [APPLY](../../sql-reference/statements/select/index.md#apply-modifier), [EXCEPT](../../sql-reference/statements/select/index.md#except-modifier), [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier).
@ -71,7 +71,7 @@ INSERT INTO [db.]table [(c1, c2, c3)] FORMAT format_name data_set
For example, the following query uses the same input format as the `INSERT … VALUES` form above:
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FORMAT Values (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FORMAT Values (v11, v12, v13), (v21, v22, v23), ...
```
ClickHouse removes all whitespace characters and one line feed (if present) before the data. So when writing a query, we recommend putting the data on a new line after the format name; this matters if the data begins with whitespace.
@ -93,7 +93,7 @@ INSERT INTO t FORMAT TabSeparated
### Inserting the results of `SELECT` {#inserting-the-results-of-select}
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] SELECT ...
```
Columns are mapped according to their position in the SELECT clause, even though their names in the SELECT expression and in the INSERT may differ. Type casting is performed if necessary.

View File

@ -997,7 +997,9 @@ namespace
{
/// sudo respects limits in /etc/security/limits.conf e.g. open files,
/// that's why we are using it instead of the 'clickhouse su' tool.
command = fmt::format("sudo -u '{}' {}", user, command);
/// By default, sudo resets all environment variables, but we need to preserve
/// the values set from /etc/default/clickhouse by the /etc/init.d/clickhouse script.
command = fmt::format("sudo --preserve-env -u '{}' {}", user, command);
}
fmt::print("Will run {}\n", command);

View File

@ -105,6 +105,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int FILE_ALREADY_EXISTS;
extern const int USER_SESSION_LIMIT_EXCEEDED;
}
}
@ -2408,6 +2409,13 @@ void ClientBase::runInteractive()
}
}
if (suggest && suggest->getLastError() == ErrorCodes::USER_SESSION_LIMIT_EXCEEDED)
{
// If the separate connection used for loading suggestions failed to open a new session,
// fall back to the main session to receive them.
suggest->load(*connection, connection_parameters.timeouts, config().getInt("suggestion_limit"));
}
try
{
if (!processQueryText(input))

View File

@ -22,9 +22,11 @@ namespace DB
{
namespace ErrorCodes
{
extern const int OK;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int DEADLOCK_AVOIDED;
extern const int USER_SESSION_LIMIT_EXCEEDED;
}
Suggest::Suggest()
@ -121,21 +123,24 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
}
catch (const Exception & e)
{
last_error = e.code();
if (e.code() == ErrorCodes::DEADLOCK_AVOIDED)
continue;
/// The client can successfully connect to the server and still
/// get ErrorCodes::USER_SESSION_LIMIT_EXCEEDED for the suggestion connection.
else if (e.code() != ErrorCodes::USER_SESSION_LIMIT_EXCEEDED)
{
/// We should not use std::cerr here, because this method works concurrently with the main thread.
/// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding a data race on std::cerr.
///
/// USER_SESSION_LIMIT_EXCEEDED is ignored here. The client will try to receive
/// suggestions using the main connection later.
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
out.next();
}
}
catch (...)
{
last_error = getCurrentExceptionCode();
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
out.next();
@ -148,6 +153,21 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
});
}
void Suggest::load(IServerConnection & connection,
const ConnectionTimeouts & timeouts,
Int32 suggestion_limit)
{
try
{
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true));
}
catch (...)
{
std::cerr << "Suggestions loading exception: " << getCurrentExceptionMessage(false, true) << std::endl;
last_error = getCurrentExceptionCode();
}
}
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
connection.sendQuery(
@ -176,6 +196,7 @@ void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & t
return;
case Protocol::Server::EndOfStream:
last_error = ErrorCodes::OK;
return;
default:

View File

@ -7,6 +7,7 @@
#include <Client/LocalConnection.h>
#include <Client/LineReader.h>
#include <IO/ConnectionTimeouts.h>
#include <atomic>
#include <thread>
@ -28,9 +29,15 @@ public:
template <typename ConnectionType>
void load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit);
void load(IServerConnection & connection,
const ConnectionTimeouts & timeouts,
Int32 suggestion_limit);
/// Older server versions cannot execute the query loading suggestions.
static constexpr int MIN_SERVER_REVISION = DBMS_MIN_PROTOCOL_VERSION_WITH_VIEW_IF_PERMITTED;
int getLastError() const { return last_error.load(); }
private:
void fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query);
@ -38,6 +45,8 @@ private:
/// Words are fetched asynchronously.
std::thread loading_thread;
std::atomic<int> last_error { -1 };
};
}

View File

@ -152,7 +152,7 @@ void ZooKeeper::init(ZooKeeperArgs args_)
throw KeeperException(code, "/");
if (code == Coordination::Error::ZNONODE)
throw KeeperException("ZooKeeper root doesn't exist. You should create root node " + args.chroot + " before start.", Coordination::Error::ZNONODE);
throw KeeperException(Coordination::Error::ZNONODE, "ZooKeeper root doesn't exist. You should create root node {} before start.", args.chroot);
}
}
@ -491,7 +491,7 @@ std::string ZooKeeper::get(const std::string & path, Coordination::Stat * stat,
if (tryGet(path, res, stat, watch, &code))
return res;
else
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
throw KeeperException(code, "Can't get data for node '{}': node doesn't exist", path);
}
std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
@ -501,7 +501,7 @@ std::string ZooKeeper::getWatch(const std::string & path, Coordination::Stat * s
if (tryGetWatch(path, res, stat, watch_callback, &code))
return res;
else
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
throw KeeperException(code, "Can't get data for node '{}': node doesn't exist", path);
}
bool ZooKeeper::tryGet(

View File

@ -213,7 +213,7 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
};
}
else
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
throw KeeperException(Coordination::Error::ZBADARGUMENTS, "Unknown key {} in config file", key);
}
}

View File

@ -776,8 +776,12 @@ namespace
UInt64 key = 0;
auto * dst = reinterpret_cast<char *>(&key);
const auto ref = cache.from_column->getDataAt(i);
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
if constexpr (std::endian::native == std::endian::big)
dst += sizeof(key) - ref.size;
#pragma clang diagnostic pop
memcpy(dst, ref.data, ref.size);
table[key] = i;

View File

@ -188,7 +188,7 @@ Client::Client(
}
}
LOG_TRACE(log, "API mode: {}", toString(api_mode));
LOG_TRACE(log, "API mode of the S3 client: {}", api_mode);
detect_region = provider_type == ProviderType::AWS && explicit_region == Aws::Region::AWS_GLOBAL;

View File

@ -60,9 +60,6 @@ public:
/// (When there is a local replica with big delay).
bool lazy = false;
time_t local_delay = 0;
/// Set only if parallel reading from replicas is used.
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator;
};
using Shards = std::vector<Shard>;

View File

@ -28,7 +28,6 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int LOGICAL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}
@ -281,7 +280,6 @@ void executeQueryWithParallelReplicas(
auto all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(all_replicas_count);
auto remote_plan = std::make_unique<QueryPlan>();
auto plans = std::vector<QueryPlanPtr>();
/// This is a little bit weird, but we construct an "empty" coordinator without
/// any specified reading/coordination method (like Default, InOrder, InReverseOrder)
@ -309,20 +307,7 @@ void executeQueryWithParallelReplicas(
&Poco::Logger::get("ReadFromParallelRemoteReplicasStep"),
query_info.storage_limits);
remote_plan->addStep(std::move(read_from_remote));
remote_plan->addInterpreterContext(context);
plans.emplace_back(std::move(remote_plan));
if (std::all_of(plans.begin(), plans.end(), [](const QueryPlanPtr & plan) { return !plan; }))
throw Exception(ErrorCodes::LOGICAL_ERROR, "No plans were generated for reading from shard. This is a bug");
DataStreams input_streams;
input_streams.reserve(plans.size());
for (const auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
query_plan.unitePlans(std::move(union_step), std::move(plans));
query_plan.addStep(std::move(read_from_remote));
}
}

View File

@ -184,7 +184,7 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
, context(buildContext(context_, select_query_options_))
, select_query_options(select_query_options_)
, query_tree(query_tree_)
, planner(query_tree_, select_query_options_)
, planner(query_tree_, select_query_options)
{
}

View File

@ -299,6 +299,7 @@ Session::~Session()
if (notified_session_log_about_login)
{
LOG_DEBUG(log, "{} Logout, user_id: {}", toString(auth_id), toString(*user_id));
if (auto session_log = getSessionLog())
{
/// TODO: We have to ensure that the same info is added to the session log on a LoginSuccess event and on the corresponding Logout event.
@ -320,6 +321,7 @@ AuthenticationType Session::getAuthenticationTypeOrLogInFailure(const String & u
}
catch (const Exception & e)
{
LOG_ERROR(log, "{} Authentication failed with error: {}", toString(auth_id), e.what());
if (auto session_log = getSessionLog())
session_log->addLoginFailure(auth_id, getClientInfo(), user_name, e);

View File

@ -45,6 +45,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/InterpreterTransactionControlQuery.h>
#include <Interpreters/NormalizeSelectWithUnionQueryVisitor.h>
@ -1033,6 +1034,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
// InterpreterSelectQueryAnalyzer does not build the QueryPlan in its constructor.
// We need to force it to be built here to check whether the quota should be ignored.
if (auto * interpreter_with_analyzer = dynamic_cast<InterpreterSelectQueryAnalyzer *>(interpreter.get()))
interpreter_with_analyzer->getQueryPlan();
if (!interpreter->ignoreQuota() && !quota_checked)
{
quota = context->getQuota();

View File

@ -1047,7 +1047,7 @@ PlannerContextPtr buildPlannerContext(const QueryTreeNodePtr & query_tree_node,
}
Planner::Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_)
SelectQueryOptions & select_query_options_)
: query_tree(query_tree_)
, select_query_options(select_query_options_)
, planner_context(buildPlannerContext(query_tree, select_query_options, std::make_shared<GlobalPlannerContext>()))
@ -1055,7 +1055,7 @@ Planner::Planner(const QueryTreeNodePtr & query_tree_,
}
Planner::Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
SelectQueryOptions & select_query_options_,
GlobalPlannerContextPtr global_planner_context_)
: query_tree(query_tree_)
, select_query_options(select_query_options_)
@ -1064,7 +1064,7 @@ Planner::Planner(const QueryTreeNodePtr & query_tree_,
}
Planner::Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
SelectQueryOptions & select_query_options_,
PlannerContextPtr planner_context_)
: query_tree(query_tree_)
, select_query_options(select_query_options_)

View File

@ -22,16 +22,16 @@ class Planner
public:
/// Initialize planner with query tree after analysis phase
Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_);
SelectQueryOptions & select_query_options_);
/// Initialize planner with query tree after query analysis phase and global planner context
Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
SelectQueryOptions & select_query_options_,
GlobalPlannerContextPtr global_planner_context_);
/// Initialize planner with query tree after query analysis phase and planner context
Planner(const QueryTreeNodePtr & query_tree_,
const SelectQueryOptions & select_query_options_,
SelectQueryOptions & select_query_options_,
PlannerContextPtr planner_context_);
const QueryPlan & getQueryPlan() const
@ -66,7 +66,7 @@ private:
void buildPlanForQueryNode();
QueryTreeNodePtr query_tree;
SelectQueryOptions select_query_options;
SelectQueryOptions & select_query_options;
PlannerContextPtr planner_context;
QueryPlan query_plan;
StorageLimitsList storage_limits;

View File

@ -113,6 +113,20 @@ void checkAccessRights(const TableNode & table_node, const Names & column_names,
query_context->checkAccess(AccessType::SELECT, storage_id, column_names);
}
bool shouldIgnoreQuotaAndLimits(const TableNode & table_node)
{
const auto & storage_id = table_node.getStorageID();
if (!storage_id.hasDatabase())
return false;
if (storage_id.database_name == DatabaseCatalog::SYSTEM_DATABASE)
{
static const boost::container::flat_set<String> tables_ignoring_quota{"quotas", "quota_limits", "quota_usage", "quotas_usage", "one"};
if (tables_ignoring_quota.count(storage_id.table_name))
return true;
}
return false;
}
NameAndTypePair chooseSmallestColumnToReadFromStorage(const StoragePtr & storage, const StorageSnapshotPtr & storage_snapshot)
{
/** We need to read at least one column to find the number of rows.
@ -828,8 +842,9 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
}
else
{
SelectQueryOptions analyze_query_options = SelectQueryOptions(from_stage).analyze();
Planner planner(select_query_info.query_tree,
SelectQueryOptions(from_stage).analyze(),
analyze_query_options,
select_query_info.planner_context);
planner.buildQueryPlanIfNeeded();
@ -1375,7 +1390,7 @@ JoinTreeQueryPlan buildQueryPlanForArrayJoinNode(const QueryTreeNodePtr & array_
JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
const SelectQueryInfo & select_query_info,
const SelectQueryOptions & select_query_options,
SelectQueryOptions & select_query_options,
const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context)
{
@ -1386,6 +1401,16 @@ JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
std::vector<ColumnIdentifierSet> table_expressions_outer_scope_columns(table_expressions_stack_size);
ColumnIdentifierSet current_outer_scope_columns = outer_scope_columns;
if (is_single_table_expression)
{
auto * table_node = table_expressions_stack[0]->as<TableNode>();
if (table_node && shouldIgnoreQuotaAndLimits(*table_node))
{
select_query_options.ignore_quota = true;
select_query_options.ignore_limits = true;
}
}
/// For each table, table function, query, union table expressions prepare before query plan build
for (size_t i = 0; i < table_expressions_stack_size; ++i)
{

View File

@ -20,7 +20,7 @@ struct JoinTreeQueryPlan
/// Build JOIN TREE query plan for query node
JoinTreeQueryPlan buildJoinTreeQueryPlan(const QueryTreeNodePtr & query_node,
const SelectQueryInfo & select_query_info,
const SelectQueryOptions & select_query_options,
SelectQueryOptions & select_query_options,
const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context);

View File

@ -1780,7 +1780,8 @@ void IMergeTreeDataPart::renameToDetached(const String & prefix)
part_is_probably_removed_from_disk = true;
}
DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/) const
DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/,
const DiskTransactionPtr & disk_transaction) const
{
/// Avoid unneeded duplicates of broken parts if we try to detach the same broken part multiple times.
/// Otherwise it may pollute detached/ with dirs having a _tryN suffix, and we will fail to remove the broken part after 10 attempts.
@ -1795,7 +1796,8 @@ DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix
IDataPartStorage::ClonePartParams params
{
.copy_instead_of_hardlink = isStoredOnRemoteDiskWithZeroCopySupport() && storage.supportsReplication() && storage_settings->allow_remote_fs_zero_copy_replication,
.make_source_readonly = true
.make_source_readonly = true,
.external_transaction = disk_transaction
};
return getDataPartStorage().freeze(
storage.relative_data_path,

View File

@ -371,7 +371,8 @@ public:
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists);
/// Makes clone of a part in detached/ directory via hard links
virtual DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;
virtual DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
const DiskTransactionPtr & disk_transaction) const;
/// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk
MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const;

View File

@ -2619,8 +2619,50 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
if (detached_parts.empty())
return 0;
PartsTemporaryRename renamed_parts(*this, "detached/");
auto get_last_touched_time = [&](const DetachedPartInfo & part_info) -> time_t
{
auto path = fs::path(relative_data_path) / "detached" / part_info.dir_name;
time_t last_change_time = part_info.disk->getLastChanged(path);
time_t last_modification_time = part_info.disk->getLastModified(path).epochTime();
return std::max(last_change_time, last_modification_time);
};
time_t ttl_seconds = getSettings()->merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds;
size_t unfinished_deleting_parts = 0;
time_t current_time = time(nullptr);
for (const auto & part_info : detached_parts)
{
if (!part_info.dir_name.starts_with("deleting_"))
continue;
time_t startup_time = current_time - static_cast<time_t>(Context::getGlobalContextInstance()->getUptimeSeconds());
time_t last_touch_time = get_last_touched_time(part_info);
/// Maybe it's being deleted right now (for example, in ALTER DROP DETACHED)
bool had_restart = last_touch_time < startup_time;
bool ttl_expired = last_touch_time + ttl_seconds <= current_time;
if (!had_restart && !ttl_expired)
continue;
/// We were trying to delete this detached part but did not finish deleting it, probably because the server crashed
LOG_INFO(log, "Removing detached part {} that we failed to remove previously", part_info.dir_name);
try
{
removeDetachedPart(part_info.disk, fs::path(relative_data_path) / "detached" / part_info.dir_name / "", part_info.dir_name);
++unfinished_deleting_parts;
}
catch (...)
{
tryLogCurrentException(log);
}
}
if (!getSettings()->merge_tree_enable_clear_old_broken_detached)
return unfinished_deleting_parts;
const auto full_path = fs::path(relative_data_path) / "detached";
size_t removed_count = 0;
for (const auto & part_info : detached_parts)
{
if (!part_info.valid_name || part_info.prefix.empty())
@ -2635,31 +2677,24 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
if (!can_be_removed_by_timeout)
continue;
time_t current_time = time(nullptr);
ssize_t threshold = current_time - getSettings()->merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds;
auto path = fs::path(relative_data_path) / "detached" / part_info.dir_name;
time_t last_change_time = part_info.disk->getLastChanged(path);
time_t last_modification_time = part_info.disk->getLastModified(path).epochTime();
time_t last_touch_time = std::max(last_change_time, last_modification_time);
ssize_t threshold = current_time - ttl_seconds;
time_t last_touch_time = get_last_touched_time(part_info);
if (last_touch_time == 0 || last_touch_time >= threshold)
continue;
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name, part_info.disk);
}
const String & old_name = part_info.dir_name;
String new_name = "deleting_" + part_info.dir_name;
part_info.disk->moveFile(fs::path(full_path) / old_name, fs::path(full_path) / new_name);
LOG_INFO(log, "Will clean up {} detached parts", renamed_parts.old_and_new_names.size());
renamed_parts.tryRenameAll();
for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
removeDetachedPart(part_info.disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_WARNING(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
old_name.clear();
++removed_count;
}
return renamed_parts.old_and_new_names.size();
LOG_INFO(log, "Cleaned up {} detached parts", removed_count);
return removed_count + unfinished_deleting_parts;
}
size_t MergeTreeData::clearOldWriteAheadLogs()
@ -4035,7 +4070,7 @@ void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLo
void MergeTreeData::outdateUnexpectedPartAndCloneToDetached(const DataPartPtr & part_to_detach)
{
LOG_INFO(log, "Cloning part {} to unexpected_{} and making it obsolete.", part_to_detach->getDataPartStorage().getPartDirectory(), part_to_detach->name);
part_to_detach->makeCloneInDetached("unexpected", getInMemoryMetadataPtr());
part_to_detach->makeCloneInDetached("unexpected", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
DataPartsLock lock = lockParts();
part_to_detach->is_unexpected_local_part = true;
@ -5797,18 +5832,21 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
{
const String source_dir = "detached/";
std::map<String, DiskPtr> name_to_disk;
/// Let's compose a list of parts that should be added.
if (attach_part)
{
const String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
validateDetachedPartName(part_id);
if (temporary_parts.contains(String(DETACHED_DIR_NAME) + "/" + part_id))
{
LOG_WARNING(log, "Will not try to attach part {} because its directory is temporary, "
"probably it's being detached right now", part_id);
}
else
{
auto disk = getDiskForDetachedPart(part_id);
renamed_parts.addPart(part_id, "attaching_" + part_id, disk);
if (MergeTreePartInfo::tryParsePartName(part_id, format_version))
name_to_disk[part_id] = getDiskForDetachedPart(part_id);
}
}
else
{
@ -5825,6 +5863,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (const auto & part_info : detached_parts)
{
if (temporary_parts.contains(String(DETACHED_DIR_NAME) + "/" + part_info.dir_name))
{
LOG_WARNING(log, "Will not try to attach part {} because its directory is temporary, "
"probably it's being detached right now", part_info.dir_name);
continue;
}
LOG_DEBUG(log, "Found part {}", part_info.dir_name);
active_parts.add(part_info.dir_name);
}
@ -5835,6 +5879,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
for (const auto & part_info : detached_parts)
{
const String containing_part = active_parts.getContainingPart(part_info.dir_name);
if (containing_part.empty())
continue;
LOG_DEBUG(log, "Found containing part {} for part {}", containing_part, part_info.dir_name);
@ -8435,7 +8481,7 @@ void MergeTreeData::incrementMergedPartsProfileEvent(MergeTreeDataPartType type)
}
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createEmptyPart(
MergeTreePartInfo & new_part_info, const MergeTreePartition & partition, const String & new_part_name,
const MergeTreeTransactionPtr & txn)
{
@ -8454,6 +8500,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
ReservationPtr reservation = reserveSpacePreferringTTLRules(metadata_snapshot, 0, move_ttl_infos, time(nullptr), 0, true);
VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume);
auto tmp_dir_holder = getTemporaryPartDirectoryHolder(EMPTY_PART_TMP_PREFIX + new_part_name);
auto new_data_part = getDataPartBuilder(new_part_name, data_part_volume, EMPTY_PART_TMP_PREFIX + new_part_name)
.withBytesAndRowsOnDisk(0, 0)
.withPartInfo(new_part_info)
@ -8513,7 +8560,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
out.finalizePart(new_data_part, sync_on_insert);
new_data_part_storage->precommitTransaction();
return new_data_part;
return std::make_pair(std::move(new_data_part), std::move(tmp_dir_holder));
}
bool MergeTreeData::allowRemoveStaleMovingParts() const

View File

@ -936,7 +936,9 @@ public:
WriteAheadLogPtr getWriteAheadLog();
constexpr static auto EMPTY_PART_TMP_PREFIX = "tmp_empty_";
MergeTreeData::MutableDataPartPtr createEmptyPart(MergeTreePartInfo & new_part_info, const MergeTreePartition & partition, const String & new_part_name, const MergeTreeTransactionPtr & txn);
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> createEmptyPart(
MergeTreePartInfo & new_part_info, const MergeTreePartition & partition,
const String & new_part_name, const MergeTreeTransactionPtr & txn);
MergeTreeDataFormatVersion format_version;

View File

@ -17,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int NOT_IMPLEMENTED;
}
MergeTreeDataPartInMemory::MergeTreeDataPartInMemory(
@ -138,8 +139,12 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &
return new_data_part_storage;
}
DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const
DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix,
const StorageMetadataPtr & metadata_snapshot,
const DiskTransactionPtr & disk_transaction) const
{
if (disk_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "InMemory parts are not compatible with disk transactions");
String detached_path = *getRelativePathForDetachedPart(prefix, /* broken */ false);
return flushToDisk(detached_path, metadata_snapshot);
}

View File

@ -42,7 +42,8 @@ public:
bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column.getNameInStorage()); }
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) override;
DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const override;
DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
const DiskTransactionPtr & disk_transaction) const override;
std::optional<time_t> getColumnModificationTime(const String & /* column_name */) const override { return {}; }
MutableDataPartStoragePtr flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const;

View File

@ -149,7 +149,6 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate()
/// do it under a shared lock
cleaned_other += storage.clearOldWriteAheadLogs();
cleaned_part_like += storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
cleaned_part_like += storage.clearOldBrokenPartsFromDetachedDirectory();
}

View File

@ -633,8 +633,8 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
delayed_chunk.reset();
}
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
template<>
bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
/// NOTE: No delay in this case. That's Ok.
auto origin_zookeeper = storage.getZooKeeper();
@ -649,8 +649,13 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData:
try
{
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
commitPart(zookeeper, part, BlockIDsType(), replicas_num, true);
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()));
String block_id = deduplicate ? fmt::format("{}_{}", part->info.partition_id, part->checksums.getTotalChecksumHex()) : "";
bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num, /* writing_existing_part */ true).second;
/// Set a special error code if the block is a duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()), ExecutionStatus(error));
return deduplicated;
}
catch (...)
{

View File

@ -56,7 +56,7 @@ public:
String getName() const override { return "ReplicatedMergeTreeSink"; }
/// For ATTACHing existing data on filesystem.
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
bool writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
/// For proper deduplication in MaterializedViews
bool lastBlockIsDuplicate() const override

View File

@ -691,7 +691,11 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
if (remote_storage_id.hasDatabase())
resolved_remote_storage_id = query_context->resolveStorageID(remote_storage_id);
auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, distributed_storage_snapshot->metadata->getColumns(), distributed_storage_snapshot->object_columns);
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
auto column_names_and_types = distributed_storage_snapshot->getColumns(get_column_options);
auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, ColumnsDescription{column_names_and_types});
auto table_node = std::make_shared<TableNode>(std::move(storage), query_context);
if (table_expression_modifiers)

View File

@ -1379,7 +1379,6 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
cleared_count += clearOldWriteAheadLogs();
cleared_count += clearOldMutations();
cleared_count += clearEmptyParts();
if (getSettings()->merge_tree_enable_clear_old_broken_detached)
cleared_count += clearOldBrokenPartsFromDetachedDirectory();
return cleared_count;
/// TODO maybe take into account number of cleared objects when calculating backoff
@ -1653,11 +1652,7 @@ struct FutureNewEmptyPart
MergeTreePartition partition;
std::string part_name;
scope_guard tmp_dir_guard;
StorageMergeTree::MutableDataPartPtr data_part;
std::string getDirName() const { return StorageMergeTree::EMPTY_PART_TMP_PREFIX + part_name; }
};
using FutureNewEmptyParts = std::vector<FutureNewEmptyPart>;
@ -1688,19 +1683,19 @@ FutureNewEmptyParts initCoverageWithNewEmptyParts(const DataPartsVector & old_pa
return future_parts;
}
StorageMergeTree::MutableDataPartsVector createEmptyDataParts(MergeTreeData & data, FutureNewEmptyParts & future_parts, const MergeTreeTransactionPtr & txn)
std::pair<StorageMergeTree::MutableDataPartsVector, std::vector<scope_guard>> createEmptyDataParts(
MergeTreeData & data, FutureNewEmptyParts & future_parts, const MergeTreeTransactionPtr & txn)
{
StorageMergeTree::MutableDataPartsVector data_parts;
std::pair<StorageMergeTree::MutableDataPartsVector, std::vector<scope_guard>> data_parts;
for (auto & part: future_parts)
data_parts.push_back(data.createEmptyPart(part.part_info, part.partition, part.part_name, txn));
{
auto [new_data_part, tmp_dir_holder] = data.createEmptyPart(part.part_info, part.partition, part.part_name, txn);
data_parts.first.emplace_back(std::move(new_data_part));
data_parts.second.emplace_back(std::move(tmp_dir_holder));
}
return data_parts;
}
void captureTmpDirectoryHolders(MergeTreeData & data, FutureNewEmptyParts & future_parts)
{
for (auto & part : future_parts)
part.tmp_dir_guard = data.getTemporaryPartDirectoryHolder(part.getDirName());
}
void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_parts, Transaction & transaction)
{
@ -1767,9 +1762,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());
captureTmpDirectoryHolders(*this, future_parts);
auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);
PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
@ -1817,8 +1810,10 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
if (detach)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
String part_dir = part->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
{
@ -1828,9 +1823,7 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames({part}), ", "),
transaction.getTID());
captureTmpDirectoryHolders(*this, future_parts);
auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);
PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
@ -1902,8 +1895,10 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
for (const auto & part : parts)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
String part_dir = part->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
}
@ -1914,9 +1909,8 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());
captureTmpDirectoryHolders(*this, future_parts);
auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);
PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
@ -1944,8 +1938,10 @@ void StorageMergeTree::dropPartsImpl(DataPartsVector && parts_to_remove, bool de
/// NOTE: no race with background cleanup until we hold pointers to parts
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
String part_dir = part->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
}

View File

@ -2097,8 +2097,10 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
{
if (auto part_to_detach = part.getPartIfItWasActive())
{
LOG_INFO(log, "Detaching {}", part_to_detach->getDataPartStorage().getPartDirectory());
part_to_detach->makeCloneInDetached("", metadata_snapshot);
String part_dir = part_to_detach->getDataPartStorage().getPartDirectory();
LOG_INFO(log, "Detaching {}", part_dir);
auto holder = getTemporaryPartDirectoryHolder(String(DETACHED_DIR_NAME) + "/" + part_dir);
part_to_detach->makeCloneInDetached("", metadata_snapshot, /*disk_transaction*/ {});
}
}
}
@ -2828,7 +2830,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
for (const auto & part : parts_to_remove_from_working_set)
{
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("clone", metadata_snapshot);
part->makeCloneInDetached("clone", metadata_snapshot, /*disk_transaction*/ {});
}
}
@ -3794,12 +3796,12 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
chassert(!broken_part);
chassert(!storage_init);
part->was_removed_as_broken = true;
part->makeCloneInDetached("broken", getInMemoryMetadataPtr());
part->makeCloneInDetached("broken", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
broken_part = part;
}
else
{
part->makeCloneInDetached("covered-by-broken", getInMemoryMetadataPtr());
part->makeCloneInDetached("covered-by-broken", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
}
detached_parts.push_back(part->name);
}
@ -6133,8 +6135,9 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
/// TODO Allow to use quorum here.
ReplicatedMergeTreeSink output(*this, metadata_snapshot, 0, 0, 0, false, false, false, query_context,
/*is_attach*/true);
ReplicatedMergeTreeSink output(*this, metadata_snapshot, /* quorum */ 0, /* quorum_timeout_ms */ 0, /* max_parts_per_block */ 0,
/* quorum_parallel */ false, query_context->getSettingsRef().insert_deduplicate,
/* majority_quorum */ false, query_context, /*is_attach*/true);
for (size_t i = 0; i < loaded_parts.size(); ++i)
{
@ -9509,7 +9512,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
}
}
MergeTreeData::MutableDataPartPtr new_data_part = createEmptyPart(new_part_info, partition, lost_part_name, NO_TRANSACTION_PTR);
auto [new_data_part, tmp_dir_holder] = createEmptyPart(new_part_info, partition, lost_part_name, NO_TRANSACTION_PTR);
new_data_part->setName(lost_part_name);
try

View File

@ -5,24 +5,6 @@ test_distributed_ddl/test.py::test_default_database[configs_secure]
test_distributed_ddl/test.py::test_on_server_fail[configs]
test_distributed_ddl/test.py::test_on_server_fail[configs_secure]
test_distributed_insert_backward_compatibility/test.py::test_distributed_in_tuple
test_distributed_inter_server_secret/test.py::test_per_user_inline_settings_secure_cluster[default-]
test_distributed_inter_server_secret/test.py::test_per_user_inline_settings_secure_cluster[nopass-]
test_distributed_inter_server_secret/test.py::test_per_user_inline_settings_secure_cluster[pass-foo]
test_distributed_inter_server_secret/test.py::test_per_user_protocol_settings_secure_cluster[default-]
test_distributed_inter_server_secret/test.py::test_per_user_protocol_settings_secure_cluster[nopass-]
test_distributed_inter_server_secret/test.py::test_per_user_protocol_settings_secure_cluster[pass-foo]
test_distributed_inter_server_secret/test.py::test_user_insecure_cluster[default-]
test_distributed_inter_server_secret/test.py::test_user_insecure_cluster[nopass-]
test_distributed_inter_server_secret/test.py::test_user_insecure_cluster[pass-foo]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster[default-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster[nopass-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster[pass-foo]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_from_backward[default-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_from_backward[nopass-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_from_backward[pass-foo]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_with_backward[default-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_with_backward[nopass-]
test_distributed_inter_server_secret/test.py::test_user_secure_cluster_with_backward[pass-foo]
test_distributed_load_balancing/test.py::test_distributed_replica_max_ignored_errors
test_distributed_load_balancing/test.py::test_load_balancing_default
test_distributed_load_balancing/test.py::test_load_balancing_priority_round_robin[dist_priority]
@ -96,22 +78,6 @@ test_executable_table_function/test.py::test_executable_function_input_python
test_settings_profile/test.py::test_show_profiles
test_sql_user_defined_functions_on_cluster/test.py::test_sql_user_defined_functions_on_cluster
test_postgresql_protocol/test.py::test_python_client
test_quota/test.py::test_add_remove_interval
test_quota/test.py::test_add_remove_quota
test_quota/test.py::test_consumption_of_show_clusters
test_quota/test.py::test_consumption_of_show_databases
test_quota/test.py::test_consumption_of_show_privileges
test_quota/test.py::test_consumption_of_show_processlist
test_quota/test.py::test_consumption_of_show_tables
test_quota/test.py::test_dcl_introspection
test_quota/test.py::test_dcl_management
test_quota/test.py::test_exceed_quota
test_quota/test.py::test_query_inserts
test_quota/test.py::test_quota_from_users_xml
test_quota/test.py::test_reload_users_xml_by_timer
test_quota/test.py::test_simpliest_quota
test_quota/test.py::test_tracking_quota
test_quota/test.py::test_users_xml_is_readonly
test_mysql_database_engine/test.py::test_mysql_ddl_for_mysql_database
test_profile_events_s3/test.py::test_profile_events
test_user_defined_object_persistence/test.py::test_persistence
@ -121,22 +87,6 @@ test_select_access_rights/test_main.py::test_alias_columns
test_select_access_rights/test_main.py::test_select_count
test_select_access_rights/test_main.py::test_select_join
test_postgresql_protocol/test.py::test_python_client
test_quota/test.py::test_add_remove_interval
test_quota/test.py::test_add_remove_quota
test_quota/test.py::test_consumption_of_show_clusters
test_quota/test.py::test_consumption_of_show_databases
test_quota/test.py::test_consumption_of_show_privileges
test_quota/test.py::test_consumption_of_show_processlist
test_quota/test.py::test_consumption_of_show_tables
test_quota/test.py::test_dcl_introspection
test_quota/test.py::test_dcl_management
test_quota/test.py::test_exceed_quota
test_quota/test.py::test_query_inserts
test_quota/test.py::test_quota_from_users_xml
test_quota/test.py::test_reload_users_xml_by_timer
test_quota/test.py::test_simpliest_quota
test_quota/test.py::test_tracking_quota
test_quota/test.py::test_users_xml_is_readonly
test_replicating_constants/test.py::test_different_versions
test_merge_tree_s3/test.py::test_heavy_insert_select_check_memory[node]
test_wrong_db_or_table_name/test.py::test_wrong_table_name

View File

@ -145,6 +145,8 @@ def main():
ci_logs_password = os.getenv(
"CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD"
)
if ci_logs_host != "CLICKHOUSE_CI_LOGS_HOST":
subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}' '{main_log_path}'",
shell=True,

View File

@ -394,6 +394,7 @@ def main():
ci_logs_password = os.getenv(
"CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD"
)
if ci_logs_host != "CLICKHOUSE_CI_LOGS_HOST":
subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'",
shell=True,

View File

@ -50,8 +50,19 @@ def prepare_test_scripts():
server_test = r"""#!/bin/bash
set -e
trap "bash -ex /packages/preserve_logs.sh" ERR
test_env='TEST_THE_DEFAULT_PARAMETER=15'
echo "$test_env" >> /etc/default/clickhouse
systemctl start clickhouse-server
clickhouse-client -q 'SELECT version()'"""
clickhouse-client -q 'SELECT version()'
grep "$test_env" /proc/$(cat /var/run/clickhouse-server/clickhouse-server.pid)/environ"""
initd_test = r"""#!/bin/bash
set -e
trap "bash -ex /packages/preserve_logs.sh" ERR
test_env='TEST_THE_DEFAULT_PARAMETER=15'
echo "$test_env" >> /etc/default/clickhouse
/etc/init.d/clickhouse-server start
clickhouse-client -q 'SELECT version()'
grep "$test_env" /proc/$(cat /var/run/clickhouse-server/clickhouse-server.pid)/environ"""
keeper_test = r"""#!/bin/bash
set -e
trap "bash -ex /packages/preserve_logs.sh" ERR
@ -102,6 +113,7 @@ chmod a+rw -R /tests_logs
exit 1
"""
(TEMP_PATH / "server_test.sh").write_text(server_test, encoding="utf-8")
(TEMP_PATH / "initd_test.sh").write_text(initd_test, encoding="utf-8")
(TEMP_PATH / "keeper_test.sh").write_text(keeper_test, encoding="utf-8")
(TEMP_PATH / "binary_test.sh").write_text(binary_test, encoding="utf-8")
(TEMP_PATH / "preserve_logs.sh").write_text(preserve_logs, encoding="utf-8")
@ -112,6 +124,9 @@ def test_install_deb(image: DockerImage) -> TestResults:
"Install server deb": r"""#!/bin/bash -ex
apt-get install /packages/clickhouse-{server,client,common}*deb
bash -ex /packages/server_test.sh""",
"Run server init.d": r"""#!/bin/bash -ex
apt-get install /packages/clickhouse-{server,client,common}*deb
bash -ex /packages/initd_test.sh""",
"Install keeper deb": r"""#!/bin/bash -ex
apt-get install /packages/clickhouse-keeper*deb
bash -ex /packages/keeper_test.sh""",
@ -191,6 +206,9 @@ def test_install(image: DockerImage, tests: Dict[str, str]) -> TestResults:
retcode = process.wait()
if retcode == 0:
status = OK
subprocess.check_call(
f"docker kill -s 9 {container_id}", shell=True
)
break
status = FAIL
@ -198,8 +216,8 @@ def test_install(image: DockerImage, tests: Dict[str, str]) -> TestResults:
archive_path = TEMP_PATH / f"{container_name}-{retry}.tar.gz"
compress_fast(LOGS_PATH, archive_path)
logs.append(archive_path)
subprocess.check_call(f"docker kill -s 9 {container_id}", shell=True)
test_results.append(TestResult(name, status, stopwatch.duration_seconds, logs))
return test_results
@ -276,7 +294,7 @@ def main():
sys.exit(0)
docker_images = {
name: get_image_with_version(REPORTS_PATH, name)
name: get_image_with_version(REPORTS_PATH, name, args.download)
for name in (RPM_IMAGE, DEB_IMAGE)
}
prepare_test_scripts()
@ -293,6 +311,8 @@ def main():
is_match = is_match or path.endswith(".rpm")
if args.tgz:
is_match = is_match or path.endswith(".tgz")
# We don't need debug packages, so let's filter them out
is_match = is_match and "-dbg" not in path
return is_match
download_builds_filter(

View File

@ -279,7 +279,7 @@ class PRInfo:
"user_orgs": self.user_orgs,
}
def has_changes_in_documentation(self):
def has_changes_in_documentation(self) -> bool:
# If the list wasn't built yet, the best we can do is to
# assume that there were changes.
if self.changed_files is None or not self.changed_files:
@ -287,10 +287,9 @@ class PRInfo:
for f in self.changed_files:
_, ext = os.path.splitext(f)
path_in_docs = "docs" in f
path_in_website = "website" in f
path_in_docs = f.startswith("docs/")
if (
ext in DIFF_IN_DOCUMENTATION_EXT and (path_in_docs or path_in_website)
ext in DIFF_IN_DOCUMENTATION_EXT and path_in_docs
) or "docker/docs" in f:
return True
return False

View File

@ -137,17 +137,20 @@ def main():
if pr_labels_to_remove:
remove_labels(gh, pr_info, pr_labels_to_remove)
if FEATURE_LABEL in pr_info.labels:
print(f"The '{FEATURE_LABEL}' in the labels, expect the 'Docs Check' status")
if FEATURE_LABEL in pr_info.labels and not pr_info.has_changes_in_documentation():
print(
f"The '{FEATURE_LABEL}' in the labels, "
"but there's no changed documentation"
)
post_commit_status( # do not pass pr_info here intentionally
commit,
"pending",
"failure",
NotSet,
f"expect adding docs for {FEATURE_LABEL}",
DOCS_NAME,
pr_info,
)
elif not description_error:
set_mergeable_check(commit, "skipped")
sys.exit(1)
if description_error:
print(
@ -173,6 +176,7 @@ def main():
)
sys.exit(1)
set_mergeable_check(commit, "skipped")
ci_report_url = create_ci_report(pr_info, [])
if not can_run:
print("::notice ::Cannot run")

View File

@ -209,6 +209,7 @@ def run_stress_test(docker_image_name):
ci_logs_password = os.getenv(
"CLICKHOUSE_CI_LOGS_PASSWORD", "CLICKHOUSE_CI_LOGS_PASSWORD"
)
if ci_logs_host != "CLICKHOUSE_CI_LOGS_HOST":
subprocess.check_call(
f"sed -i -r -e 's!{ci_logs_host}!CLICKHOUSE_CI_LOGS_HOST!g; s!{ci_logs_password}!CLICKHOUSE_CI_LOGS_PASSWORD!g;' '{run_log_path}'",
shell=True,

View File

@ -64,6 +64,7 @@ NEED_RERUN_WORKFLOWS = {
"DocsCheck",
"MasterCI",
"NightlyBuilds",
"PublishedReleaseCI",
"PullRequestCI",
"ReleaseBranchCI",
}

View File

@ -2152,7 +2152,7 @@ def reportLogStats(args):
print("\n")
query = """
SELECT message_format_string, count(), substr(any(message), 1, 120) AS any_message
SELECT message_format_string, count(), any(message) AS any_message
FROM system.text_log
WHERE (now() - toIntervalMinute(240)) < event_time
AND (message NOT LIKE (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') AS s))

View File

@ -91,5 +91,6 @@
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_http_named_session",
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_grpc",
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_tcp_and_others",
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_setting_in_query"
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_setting_in_query",
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_client_suggestions_load"
]

View File

@ -57,25 +57,28 @@ def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
]
)
node.exec_in_container(["mkdir", f"{path_to_detached}../unexpected_all_42_1337_5"])
for name in [
"unexpected_all_42_1337_5",
"deleting_all_123_456_7",
"covered-by-broken_all_12_34_5",
]:
node.exec_in_container(["mkdir", f"{path_to_detached}../{name}"])
node.exec_in_container(
[
"touch",
"-t",
"1312031429.30",
f"{path_to_detached}../unexpected_all_42_1337_5",
f"{path_to_detached}../{name}",
]
)
result = node.exec_in_container(
["stat", f"{path_to_detached}../unexpected_all_42_1337_5"]
)
result = node.exec_in_container(["stat", f"{path_to_detached}../{name}"])
print(result)
assert "Modify: 2013-12-03" in result
node.exec_in_container(
[
"mv",
f"{path_to_detached}../unexpected_all_42_1337_5",
f"{path_to_detached}unexpected_all_42_1337_5",
f"{path_to_detached}../{name}",
f"{path_to_detached}{name}",
]
)
@ -87,17 +90,20 @@ def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
node.query(f"DETACH TABLE {table}")
node.query(f"ATTACH TABLE {table}")
result = node.exec_in_container(["ls", path_to_detached])
print(result)
assert f"{expect_broken_prefix}_all_3_3_0" in result
assert "all_1_1_0" in result
assert "trash" in result
assert "broken_all_fake" in result
assert "unexpected_all_42_1337_5" in result
time.sleep(15)
assert node.contains_in_log(
"Removed broken detached part unexpected_all_42_1337_5 due to a timeout"
node.wait_for_log_line(
"Removing detached part deleting_all_123_456_7",
timeout=90,
look_behind_lines=1000000,
)
node.wait_for_log_line(
f"Removed broken detached part {expect_broken_prefix}_all_3_3_0 due to a timeout",
timeout=10,
look_behind_lines=1000000,
)
node.wait_for_log_line(
"Removed broken detached part unexpected_all_42_1337_5 due to a timeout",
timeout=10,
look_behind_lines=1000000,
)
result = node.exec_in_container(["ls", path_to_detached])
@ -106,7 +112,16 @@ def remove_broken_detached_part_impl(table, node, expect_broken_prefix):
assert "all_1_1_0" in result
assert "trash" in result
assert "broken_all_fake" in result
assert "covered-by-broken_all_12_34_5" in result
assert "unexpected_all_42_1337_5" not in result
assert "deleting_all_123_456_7" not in result
node.query(
f"ALTER TABLE {table} DROP DETACHED PART 'covered-by-broken_all_12_34_5'",
settings={"allow_drop_detached": 1},
)
result = node.exec_in_container(["ls", path_to_detached])
assert "covered-by-broken_all_12_34_5" not in result
node.query(f"DROP TABLE {table} SYNC")

View File

@ -110,10 +110,6 @@ def start_cluster():
cluster.shutdown()
def query_with_id(node, id_, query, **kwargs):
return node.query("WITH '{}' AS __id {}".format(id_, query), **kwargs)
# @return -- [user, initial_user]
def get_query_user_info(node, query_pattern):
node.query("SYSTEM FLUSH LOGS")
@ -334,7 +330,7 @@ def test_secure_disagree_insert():
@users
def test_user_insecure_cluster(user, password):
id_ = "query-dist_insecure-" + user
query_with_id(n1, id_, "SELECT * FROM dist_insecure", user=user, password=password)
n1.query(f"SELECT *, '{id_}' FROM dist_insecure", user=user, password=password)
assert get_query_user_info(n1, id_) == [
user,
user,
@ -345,7 +341,7 @@ def test_user_insecure_cluster(user, password):
@users
def test_user_secure_cluster(user, password):
id_ = "query-dist_secure-" + user
query_with_id(n1, id_, "SELECT * FROM dist_secure", user=user, password=password)
n1.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(n2, id_) == [user, user]
@ -353,11 +349,9 @@ def test_user_secure_cluster(user, password):
@users
def test_per_user_inline_settings_insecure_cluster(user, password):
id_ = "query-ddl-settings-dist_insecure-" + user
query_with_id(
n1,
id_,
"""
SELECT * FROM dist_insecure
n1.query(
f"""
SELECT *, '{id_}' FROM dist_insecure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
@ -372,11 +366,9 @@ def test_per_user_inline_settings_insecure_cluster(user, password):
@users
def test_per_user_inline_settings_secure_cluster(user, password):
id_ = "query-ddl-settings-dist_secure-" + user
query_with_id(
n1,
id_,
"""
SELECT * FROM dist_secure
n1.query(
f"""
SELECT *, '{id_}' FROM dist_secure
SETTINGS
prefer_localhost_replica=0,
max_memory_usage_for_user=1e9,
@ -393,10 +385,8 @@ def test_per_user_inline_settings_secure_cluster(user, password):
@users
def test_per_user_protocol_settings_insecure_cluster(user, password):
id_ = "query-protocol-settings-dist_insecure-" + user
query_with_id(
n1,
id_,
"SELECT * FROM dist_insecure",
n1.query(
f"SELECT *, '{id_}' FROM dist_insecure",
user=user,
password=password,
settings={
@ -411,10 +401,8 @@ def test_per_user_protocol_settings_insecure_cluster(user, password):
@users
def test_per_user_protocol_settings_secure_cluster(user, password):
id_ = "query-protocol-settings-dist_secure-" + user
query_with_id(
n1,
id_,
"SELECT * FROM dist_secure",
n1.query(
f"SELECT *, '{id_}' FROM dist_secure",
user=user,
password=password,
settings={
@ -431,8 +419,8 @@ def test_per_user_protocol_settings_secure_cluster(user, password):
@users
def test_user_secure_cluster_with_backward(user, password):
id_ = "with-backward-query-dist_secure-" + user
query_with_id(
n1, id_, "SELECT * FROM dist_secure_backward", user=user, password=password
n1.query(
f"SELECT *, '{id_}' FROM dist_secure_backward", user=user, password=password
)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
@ -441,13 +429,7 @@ def test_user_secure_cluster_with_backward(user, password):
@users
def test_user_secure_cluster_from_backward(user, password):
id_ = "from-backward-query-dist_secure-" + user
query_with_id(
backward,
id_,
"SELECT * FROM dist_secure_backward",
user=user,
password=password,
)
backward.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
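
The rewritten tests embed `id_` as a string literal in the SELECT list, so the query text itself carries the marker that `get_query_user_info` later looks up in `system.query_log`. A sketch of what that lookup amounts to (inferred from the helper's `# @return -- [user, initial_user]` contract, not its literal body):

``` bash
# Find the finished query carrying the embedded id and report who ran it.
clickhouse-client -q "
    SELECT user, initial_user
    FROM system.query_log
    WHERE type = 'QueryFinish' AND query LIKE '%query-dist_secure-default%'"
```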

View File

@ -10,6 +10,7 @@ import threading
from helpers.cluster import ClickHouseCluster, run_and_check
from helpers.test_tools import assert_logs_contain_with_retry
from helpers.uclient import client, prompt
MAX_SESSIONS_FOR_USER = 2
POSTGRES_SERVER_PORT = 5433
@ -209,3 +210,36 @@ def test_profile_max_sessions_for_user_tcp_and_others(started_cluster):
def test_profile_max_sessions_for_user_setting_in_query(started_cluster):
instance.query_and_get_error("SET max_sessions_for_user = 10")
def test_profile_max_sessions_for_user_client_suggestions_connection(started_cluster):
command_text = f"{started_cluster.get_client_cmd()} --host {instance.ip_address} --port 9000 -u {TEST_USER} --password {TEST_PASSWORD}"
command_text_without_suggestions = command_text + " --disable_suggestion"
# Launch client1 without suggestions to avoid a race condition:
# Client1 opens a session.
# Client1 opens a second session for the suggestion connection.
# Client2 then fails to open any session and gets the USER_SESSION_LIMIT_EXCEEDED error.
#
# Expected order:
# Client1 opens a session.
# Client2 opens a session.
# Client2 fails to open a session for suggestions with USER_SESSION_LIMIT_EXCEEDED (no error is printed).
# Client3 fails to open a session.
# Client1 executes the query.
# Client2 loads suggestions from the server using the main connection and executes a query.
with client(
name="client1>", log=None, command=command_text_without_suggestions
) as client1:
client1.expect(prompt)
with client(name="client2>", log=None, command=command_text) as client2:
client2.expect(prompt)
with client(name="client3>", log=None, command=command_text) as client3:
client3.expect("USER_SESSION_LIMIT_EXCEEDED")
client1.send("SELECT 'CLIENT_1_SELECT' FORMAT CSV")
client1.expect("CLIENT_1_SELECT")
client1.expect(prompt)
client2.send("SELECT 'CLIENT_2_SELECT' FORMAT CSV")
client2.expect("CLIENT_2_SELECT")
client2.expect(prompt)
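
The `--disable_suggestion` flag matters here because the client normally opens a second connection to load completions, and that connection counts against `max_sessions_for_user` (set to 2 in this test). A minimal single-session invocation, with placeholder host and credentials standing in for the test's `TEST_USER`/`TEST_PASSWORD`:

``` bash
# With suggestions disabled the client holds exactly one session.
clickhouse-client --host 127.0.0.1 --port 9000 -u test_user --password secret \
    --disable_suggestion -q "SELECT 1"
```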

View File

@ -9,10 +9,10 @@ create view logs as select * from system.text_log where now() - toIntervalMinute
-- Check that we don't have too many messages formatted with fmt::runtime or string concatenation.
-- A threshold of 0.001 should always be enough; the observed value was about 0.00025.
select 'runtime messages', max2(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.001) from logs;
select 'runtime messages', greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.001) from logs;
-- Check the same for exceptions; the observed value was 0.03.
select 'runtime exceptions', max2(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.05) from logs where message like '%DB::Exception%';
select 'runtime exceptions', greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.05) from logs where message like '%DB::Exception%';
-- FIXME: some of the following messages are not informative, and this has to be fixed.
create temporary table known_short_messages (s String) as select * from (select
@ -36,7 +36,7 @@ create temporary table known_short_messages (s String) as select * from (select
'Database {} does not exist', 'Dictionary ({}) not found', 'Unknown table function {}',
'Unknown format {}', 'Unknown explain kind ''{}''', 'Unknown setting {}', 'Unknown input format {}',
'Unknown identifier: ''{}''', 'User name is empty', 'Expected function, got: {}',
'Attempt to read after eof', 'String size is too big ({}), maximum: {}', 'API mode: {}',
'Attempt to read after eof', 'String size is too big ({}), maximum: {}',
'Processed: {}%', 'Creating {}: {}', 'Table {}.{} doesn''t exist', 'Invalid cache key hex: {}',
'User has been dropped', 'Illegal type {} of argument of function {}. Should be DateTime or DateTime64'
] as arr) array join arr;
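
The log-formatting check in this commit (see the `reportLogStats` hunk above) turns each `message_format_string` into a `LIKE` pattern by replacing every placeholder matched by `'{[:.0-9dfx]*}'` with `%`. A quick illustration using one of the messages listed here:

``` bash
clickhouse-client -q "SELECT replaceRegexpAll('String size is too big ({}), maximum: {}', '{[:.0-9dfx]*}', '%')"
# Expected output: String size is too big (%), maximum: %
```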

View File

@ -11,26 +11,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query_with_retry
{
local query="$1" && shift
local retry=0
until [ $retry -ge 5 ]
do
local result
result="$($CLICKHOUSE_CLIENT "$@" --query="$query" 2>&1)"
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$((retry + 1))
sleep 3
fi
done
echo "Query '$query' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r1;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst_r2;"

View File

@ -5,22 +5,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl1"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS ttl_repl2"

View File

@ -7,23 +7,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS src;"
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS dst;"

View File

@ -118,7 +118,6 @@ ExpressionTransform
MergingAggregatedBucketTransform × 4
Resize 1 → 4
GroupingAggregatedTransform 3 → 1
(Union)
(ReadFromRemoteParallelReplicas)
select a, count() from pr_t group by a order by a limit 5 offset 500;
500 1000

View File

@ -0,0 +1,4 @@
default begin inserts
default end inserts
20 210
20 210

View File

@ -0,0 +1,76 @@
#!/usr/bin/env bash
# Tags: race, zookeeper, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./replication.lib
. "$CURDIR"/replication.lib
$CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;
CREATE TABLE alter_table0 (a UInt8, b Int16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a;
CREATE TABLE alter_table1 (a UInt8, b Int16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a;
" || exit 1
function thread_detach()
{
while true; do
$CLICKHOUSE_CLIENT -mn -q "ALTER TABLE alter_table$(($RANDOM % 2)) DETACH PARTITION ID 'all'; SELECT sleep($RANDOM / 32000) format Null;" 2>/dev/null ||:
done
}
function thread_attach()
{
while true; do
$CLICKHOUSE_CLIENT -mn -q "ALTER TABLE alter_table$(($RANDOM % 2)) ATTACH PARTITION ID 'all'; SELECT sleep($RANDOM / 32000) format Null;" 2>/dev/null ||:
done
}
function insert()
{
$CLICKHOUSE_CLIENT -q "INSERT INTO alter_table$(($RANDOM % 2)) SELECT $RANDOM, $i" 2>/dev/null
}
thread_detach & PID_1=$!
thread_attach & PID_2=$!
thread_detach & PID_3=$!
thread_attach & PID_4=$!
function do_inserts()
{
for i in {1..20}; do
while ! insert; do $CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'retrying insert $i' FORMAT Null"; done
done
}
$CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'begin inserts'"
do_inserts 2>&1 | grep -Fa "Exception: " | grep -Fv "was cancelled by concurrent ALTER PARTITION"
$CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'end inserts'"
kill -TERM $PID_1 && kill -TERM $PID_2 && kill -TERM $PID_3 && kill -TERM $PID_4
wait
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table0"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
query_with_retry "ALTER TABLE alter_table0 ATTACH PARTITION ID 'all'" 2>/dev/null;
$CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table1 ATTACH PARTITION ID 'all'" 2>/dev/null
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
$CLICKHOUSE_CLIENT -q "ALTER TABLE alter_table1 ATTACH PARTITION ID 'all'"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table0"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
engine=$($CLICKHOUSE_CLIENT -q "SELECT engine FROM system.tables WHERE database=currentDatabase() AND table='alter_table0'")
if [[ "$engine" == "ReplicatedMergeTree" ]]; then
# ReplicatedMergeTree may duplicate data on ATTACH PARTITION (when one replica has a merged part and another replica has source parts only)
$CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE alter_table0 FINAL DEDUPLICATE"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1"
fi
$CLICKHOUSE_CLIENT -q "SELECT count(), sum(b) FROM alter_table0"
$CLICKHOUSE_CLIENT -q "SELECT count(), sum(b) FROM alter_table1"
$CLICKHOUSE_CLIENT -q "DROP TABLE alter_table0"
$CLICKHOUSE_CLIENT -q "DROP TABLE alter_table1"

View File

@ -5,23 +5,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query_with_retry
{
retry=0
until [ $retry -ge 5 ]
do
result=$($CLICKHOUSE_CLIENT $2 --query="$1" 2>&1)
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$(($retry + 1))
sleep 3
fi
done
echo "Query '$1' failed with '$result'"
}
$CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS load_parts_refcounts SYNC;

View File

@ -0,0 +1,5 @@
drop table if exists data_01072;
drop table if exists dist_01072;
create table data_01072 (key Int) Engine=MergeTree() ORDER BY key;
create table dist_01072 (key Int) Engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01072, key);
select * from dist_01072 where key=0 and _part='0';

View File

@ -155,3 +155,23 @@ function random_str()
local n=$1 && shift
tr -cd '[:lower:]' < /dev/urandom | head -c"$n"
}
function query_with_retry
{
local query="$1" && shift
local retry=0
until [ $retry -ge 5 ]
do
local result
result="$($CLICKHOUSE_CLIENT "$@" --query="$query" 2>&1)"
if [ "$?" == 0 ]; then
echo -n "$result"
return
else
retry=$((retry + 1))
sleep 3
fi
done
echo "Query '$query' failed with '$result'"
}
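
With the helper now centralized in shell_config.sh, the tests that previously carried private copies (the deletions above) simply source the config and call it. Usage as seen in the attach/detach race test earlier in this commit:

``` bash
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Retry DDL that can transiently fail, e.g. an ATTACH racing with concurrent merges.
query_with_retry "ALTER TABLE alter_table0 ATTACH PARTITION ID 'all'" 2>/dev/null
```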