Merge remote-tracking branch 'origin/master' into fix-parallel-replicas-multiply-result
commit 1678cda8b3

@@ -11,7 +11,7 @@ Inserts data into a table.
**Syntax**

``` sql
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```

You can specify a list of columns to insert using the `(c1, c2, c3)` syntax. You can also use an expression with a column [matcher](../../sql-reference/statements/select/index.md#asterisk) such as `*` and/or [modifiers](../../sql-reference/statements/select/index.md#select-modifiers) such as [APPLY](../../sql-reference/statements/select/index.md#apply-modifier), [EXCEPT](../../sql-reference/statements/select/index.md#except-modifier), [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier).
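For illustration, a minimal sketch of both forms; the `events` table and its columns are hypothetical and exist only for this example:

``` sql
-- Hypothetical table used only for this example.
CREATE TABLE events (id UInt64, name String, created_at DateTime DEFAULT now()) ENGINE = MergeTree ORDER BY id;

-- Explicit column list: created_at receives its DEFAULT value.
INSERT INTO events (id, name) VALUES (1, 'first'), (2, 'second');

-- Column matcher with a modifier: insert into every column except created_at.
INSERT INTO events (* EXCEPT(created_at)) VALUES (3, 'third');
```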

@@ -107,7 +107,7 @@ If table has [constraints](../../sql-reference/statements/create/table.md#constr
**Syntax**

``` sql
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] SELECT ...
```

Columns are mapped according to their position in the SELECT clause. However, their names in the SELECT expression and the table for INSERT may differ. If necessary, type casting is performed.
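As a minimal sketch (the `events` and `staging_events` tables and their columns are assumptions for illustration), the mapping is positional, so the source columns may have different names:

``` sql
-- event_id feeds id and event_label feeds name, purely by position.
INSERT INTO events (id, name)
SELECT event_id, event_label
FROM staging_events
WHERE event_date = today();
```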

@@ -126,7 +126,7 @@ To insert a default value instead of `NULL` into a column with not nullable data
**Syntax**

``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
```

Use the syntax above to insert data from a file, or files, stored on the **client** side. `file_name` and `type` are string literals. Input file [format](../../interfaces/formats.md) must be set in the `FORMAT` clause.
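A short sketch, assuming a `data.csv` file on the machine where clickhouse-client runs (the file and table names are illustrative):

``` sql
-- Executed from clickhouse-client; the file is read on the client side.
INSERT INTO events (id, name) FROM INFILE 'data.csv' FORMAT CSV;
```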

@@ -11,7 +11,7 @@ sidebar_label: INSERT INTO
**Syntax**

``` sql
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```

You can specify a list of columns to insert using the `(c1, c2, c3)` syntax. You can also use an expression with an [asterisk](../../sql-reference/statements/select/index.md#asterisk) and/or modifiers such as [APPLY](../../sql-reference/statements/select/index.md#apply-modifier), [EXCEPT](../../sql-reference/statements/select/index.md#except-modifier), [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier).

@@ -100,7 +100,7 @@ INSERT INTO t FORMAT TabSeparated
**Syntax**

``` sql
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] SELECT ...
```

Columns are mapped according to their position in the SELECT clause. However, their names in the SELECT expression and in the table for INSERT may differ. If necessary, type casting equivalent to the corresponding CAST operator is performed.
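For example, a small sketch (the `visits` and `raw_visits` tables are hypothetical) where the source column has a different name and a narrower type; the value is converted to the target type as with CAST:

``` sql
-- raw_visits.visitor is UInt32; visits.user_id is UInt64.
-- Columns are matched by position and widened to the target type.
INSERT INTO visits (user_id, page)
SELECT visitor, page_url
FROM raw_visits;
```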

@@ -120,7 +120,7 @@ INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
**Syntax**

``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
```

Use this syntax to insert data from a file stored on the **client** side. `file_name` and `type` are string literals. The input file [format](../../interfaces/formats.md) must be set in the `FORMAT` clause.
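A minimal sketch, assuming a gzip-compressed TSV file `visits.tsv.gz` exists on the client machine (file and table names are illustrative):

``` sql
INSERT INTO visits FROM INFILE 'visits.tsv.gz' COMPRESSION 'gzip' FORMAT TabSeparated;
```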

@@ -8,7 +8,7 @@ The INSERT INTO statement is mainly used to add data to the system.
The basic format of the query:

``` sql
INSERT INTO [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] VALUES (v11, v12, v13), (v21, v22, v23), ...
```

You can specify the list of columns to insert in the query, e.g. `[(c1, c2, c3)]`. You can also use an expression with a column [matcher](../../sql-reference/statements/select/index.md#asterisk) such as `*` and/or [modifiers](../../sql-reference/statements/select/index.md#select-modifiers) such as [APPLY](../../sql-reference/statements/select/index.md#apply-modifier), [EXCEPT](../../sql-reference/statements/select/index.md#except-modifier), [REPLACE](../../sql-reference/statements/select/index.md#replace-modifier).
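For instance, a minimal sketch assuming a table `events(id, name, created_at)` (hypothetical); the bare `*` matcher stands for all columns in their declared order:

``` sql
INSERT INTO events (*) VALUES (4, 'fourth', now());
```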

@@ -71,7 +71,7 @@ INSERT INTO [db.]table [(c1, c2, c3)] FORMAT format_name data_set
For example, the following query uses the same input format as INSERT … VALUES above:

``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FORMAT Values (v11, v12, v13), (v21, v22, v23), ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] FORMAT Values (v11, v12, v13), (v21, v22, v23), ...
```

ClickHouse removes all whitespace and one line feed (if there is one) before the data. When writing a query, we recommend putting the data on a new line after the format name (this matters if the data begins with whitespace).

@@ -93,7 +93,7 @@ INSERT INTO t FORMAT TabSeparated
### Inserting the results of `SELECT` {#inserting-the-results-of-select}

``` sql
INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
INSERT INTO [TABLE] [db.]table [(c1, c2, c3)] SELECT ...
```

Columns are mapped by their position in the SELECT clause, even though their names in the SELECT expression and in the INSERT table may differ. If necessary, the corresponding type conversion is performed.

@@ -997,7 +997,9 @@ namespace
{
/// sudo respects limits in /etc/security/limits.conf e.g. open files,
/// that's why we are using it instead of the 'clickhouse su' tool.
command = fmt::format("sudo -u '{}' {}", user, command);
/// by default, sudo resets all the ENV variables, but we should preserve
/// the values /etc/default/clickhouse in /etc/init.d/clickhouse file
command = fmt::format("sudo --preserve-env -u '{}' {}", user, command);
}

fmt::print("Will run {}\n", command);

@@ -105,6 +105,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int FILE_ALREADY_EXISTS;
extern const int USER_SESSION_LIMIT_EXCEEDED;
}

}
@@ -2408,6 +2409,13 @@ void ClientBase::runInteractive()
}
}

if (suggest && suggest->getLastError() == ErrorCodes::USER_SESSION_LIMIT_EXCEEDED)
{
// If a separate connection loading suggestions failed to open a new session,
// use the main session to receive them.
suggest->load(*connection, connection_parameters.timeouts, config().getInt("suggestion_limit"));
}

try
{
if (!processQueryText(input))

@@ -22,9 +22,11 @@ namespace DB
{
namespace ErrorCodes
{
extern const int OK;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int DEADLOCK_AVOIDED;
extern const int USER_SESSION_LIMIT_EXCEEDED;
}

Suggest::Suggest()
@@ -121,21 +123,24 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
}
catch (const Exception & e)
{
last_error = e.code();
if (e.code() == ErrorCodes::DEADLOCK_AVOIDED)
continue;

/// Client can successfully connect to the server and
/// get ErrorCodes::USER_SESSION_LIMIT_EXCEEDED for suggestion connection.

else if (e.code() != ErrorCodes::USER_SESSION_LIMIT_EXCEEDED)
{
/// We should not use std::cerr here, because this method works concurrently with the main thread.
/// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr.
///
/// USER_SESSION_LIMIT_EXCEEDED is ignored here. The client will try to receive
/// suggestions using the main connection later.
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
out.next();
}
}
catch (...)
{
last_error = getCurrentExceptionCode();
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
out.next();
@@ -148,6 +153,21 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
});
}

void Suggest::load(IServerConnection & connection,
const ConnectionTimeouts & timeouts,
Int32 suggestion_limit)
{
try
{
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true));
}
catch (...)
{
std::cerr << "Suggestions loading exception: " << getCurrentExceptionMessage(false, true) << std::endl;
last_error = getCurrentExceptionCode();
}
}

void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
connection.sendQuery(
@@ -176,6 +196,7 @@ void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & t
return;

case Protocol::Server::EndOfStream:
last_error = ErrorCodes::OK;
return;

default:

@@ -7,6 +7,7 @@
#include <Client/LocalConnection.h>
#include <Client/LineReader.h>
#include <IO/ConnectionTimeouts.h>
#include <atomic>
#include <thread>

@@ -28,9 +29,15 @@ public:
template <typename ConnectionType>
void load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit);

void load(IServerConnection & connection,
const ConnectionTimeouts & timeouts,
Int32 suggestion_limit);

/// Older server versions cannot execute the query loading suggestions.
static constexpr int MIN_SERVER_REVISION = DBMS_MIN_PROTOCOL_VERSION_WITH_VIEW_IF_PERMITTED;

int getLastError() const { return last_error.load(); }

private:
void fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query);

@@ -38,6 +45,8 @@ private:

/// Words are fetched asynchronously.
std::thread loading_thread;

std::atomic<int> last_error { -1 };
};

}

@@ -188,7 +188,7 @@ Client::Client(
}
}

LOG_TRACE(log, "API mode: {}", toString(api_mode));
LOG_TRACE(log, "API mode of the S3 client: {}", api_mode);

detect_region = provider_type == ProviderType::AWS && explicit_region == Aws::Region::AWS_GLOBAL;

@@ -60,9 +60,6 @@ public:
/// (When there is a local replica with big delay).
bool lazy = false;
time_t local_delay = 0;

/// Set only if parallel reading from replicas is used.
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator;
};

using Shards = std::vector<Shard>;

@@ -28,7 +28,6 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int LOGICAL_ERROR;
}

namespace ClusterProxy
@@ -299,7 +298,6 @@ void executeQueryWithParallelReplicas(
auto all_replicas_count = std::min(static_cast<size_t>(settings.max_parallel_replicas), new_cluster->getShardCount());
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(all_replicas_count);
auto remote_plan = std::make_unique<QueryPlan>();
auto plans = std::vector<QueryPlanPtr>();

/// This is a little bit weird, but we construct an "empty" coordinator without
/// any specified reading/coordination method (like Default, InOrder, InReverseOrder)
@@ -325,20 +323,7 @@ void executeQueryWithParallelReplicas(
&Poco::Logger::get("ReadFromParallelRemoteReplicasStep"),
query_info.storage_limits);

remote_plan->addStep(std::move(read_from_remote));
remote_plan->addInterpreterContext(context);
plans.emplace_back(std::move(remote_plan));

if (std::all_of(plans.begin(), plans.end(), [](const QueryPlanPtr & plan) { return !plan; }))
throw Exception(ErrorCodes::LOGICAL_ERROR, "No plans were generated for reading from shard. This is a bug");

DataStreams input_streams;
input_streams.reserve(plans.size());
for (const auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());

auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
query_plan.unitePlans(std::move(union_step), std::move(plans));
query_plan.addStep(std::move(read_from_remote));
}

}

@@ -299,6 +299,7 @@ Session::~Session()

if (notified_session_log_about_login)
{
LOG_DEBUG(log, "{} Logout, user_id: {}", toString(auth_id), toString(*user_id));
if (auto session_log = getSessionLog())
{
/// TODO: We have to ensure that the same info is added to the session log on a LoginSuccess event and on the corresponding Logout event.
@@ -320,6 +321,7 @@ AuthenticationType Session::getAuthenticationTypeOrLogInFailure(const String & u
}
catch (const Exception & e)
{
LOG_ERROR(log, "{} Authentication failed with error: {}", toString(auth_id), e.what());
if (auto session_log = getSessionLog())
session_log->addLoginFailure(auth_id, getClientInfo(), user_name, e);

@@ -8435,7 +8435,7 @@ void MergeTreeData::incrementMergedPartsProfileEvent(MergeTreeDataPartType type)
}
}

MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createEmptyPart(
MergeTreePartInfo & new_part_info, const MergeTreePartition & partition, const String & new_part_name,
const MergeTreeTransactionPtr & txn)
{
@@ -8454,6 +8454,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
ReservationPtr reservation = reserveSpacePreferringTTLRules(metadata_snapshot, 0, move_ttl_infos, time(nullptr), 0, true);
VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume);

auto tmp_dir_holder = getTemporaryPartDirectoryHolder(EMPTY_PART_TMP_PREFIX + new_part_name);
auto new_data_part = getDataPartBuilder(new_part_name, data_part_volume, EMPTY_PART_TMP_PREFIX + new_part_name)
.withBytesAndRowsOnDisk(0, 0)
.withPartInfo(new_part_info)
@@ -8513,7 +8514,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
out.finalizePart(new_data_part, sync_on_insert);

new_data_part_storage->precommitTransaction();
return new_data_part;
return std::make_pair(std::move(new_data_part), std::move(tmp_dir_holder));
}

bool MergeTreeData::allowRemoveStaleMovingParts() const

@@ -936,7 +936,9 @@ public:
WriteAheadLogPtr getWriteAheadLog();

constexpr static auto EMPTY_PART_TMP_PREFIX = "tmp_empty_";
MergeTreeData::MutableDataPartPtr createEmptyPart(MergeTreePartInfo & new_part_info, const MergeTreePartition & partition, const String & new_part_name, const MergeTreeTransactionPtr & txn);
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> createEmptyPart(
MergeTreePartInfo & new_part_info, const MergeTreePartition & partition,
const String & new_part_name, const MergeTreeTransactionPtr & txn);

MergeTreeDataFormatVersion format_version;

@@ -691,7 +691,11 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
if (remote_storage_id.hasDatabase())
resolved_remote_storage_id = query_context->resolveStorageID(remote_storage_id);

auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, distributed_storage_snapshot->metadata->getColumns(), distributed_storage_snapshot->object_columns);
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();

auto column_names_and_types = distributed_storage_snapshot->getColumns(get_column_options);

auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, ColumnsDescription{column_names_and_types});
auto table_node = std::make_shared<TableNode>(std::move(storage), query_context);

if (table_expression_modifiers)

@@ -1664,11 +1664,7 @@ struct FutureNewEmptyPart
MergeTreePartition partition;
std::string part_name;

scope_guard tmp_dir_guard;

StorageMergeTree::MutableDataPartPtr data_part;

std::string getDirName() const { return StorageMergeTree::EMPTY_PART_TMP_PREFIX + part_name; }
};

using FutureNewEmptyParts = std::vector<FutureNewEmptyPart>;
@@ -1699,19 +1695,19 @@ FutureNewEmptyParts initCoverageWithNewEmptyParts(const DataPartsVector & old_pa
return future_parts;
}

StorageMergeTree::MutableDataPartsVector createEmptyDataParts(MergeTreeData & data, FutureNewEmptyParts & future_parts, const MergeTreeTransactionPtr & txn)
std::pair<StorageMergeTree::MutableDataPartsVector, std::vector<scope_guard>> createEmptyDataParts(
MergeTreeData & data, FutureNewEmptyParts & future_parts, const MergeTreeTransactionPtr & txn)
{
StorageMergeTree::MutableDataPartsVector data_parts;
std::pair<StorageMergeTree::MutableDataPartsVector, std::vector<scope_guard>> data_parts;
for (auto & part: future_parts)
data_parts.push_back(data.createEmptyPart(part.part_info, part.partition, part.part_name, txn));
{
auto [new_data_part, tmp_dir_holder] = data.createEmptyPart(part.part_info, part.partition, part.part_name, txn);
data_parts.first.emplace_back(std::move(new_data_part));
data_parts.second.emplace_back(std::move(tmp_dir_holder));
}
return data_parts;
}

void captureTmpDirectoryHolders(MergeTreeData & data, FutureNewEmptyParts & future_parts)
{
for (auto & part : future_parts)
part.tmp_dir_guard = data.getTemporaryPartDirectoryHolder(part.getDirName());
}

void StorageMergeTree::renameAndCommitEmptyParts(MutableDataPartsVector & new_parts, Transaction & transaction)
{
@@ -1778,9 +1774,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());

captureTmpDirectoryHolders(*this, future_parts);

auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);

PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
@@ -1839,9 +1833,7 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames({part}), ", "),
transaction.getTID());

captureTmpDirectoryHolders(*this, future_parts);

auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);

PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
@@ -1925,9 +1917,8 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
fmt::join(getPartsNames(future_parts), ", "), fmt::join(getPartsNames(parts), ", "),
transaction.getTID());

captureTmpDirectoryHolders(*this, future_parts);

auto new_data_parts = createEmptyDataParts(*this, future_parts, txn);
auto [new_data_parts, tmp_dir_holders] = createEmptyDataParts(*this, future_parts, txn);
renameAndCommitEmptyParts(new_data_parts, transaction);

PartLog::addNewParts(query_context, PartLog::createPartLogEntries(new_data_parts, watch.elapsed(), profile_events_scope.getSnapshot()));

@@ -9520,7 +9520,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
}
}

MergeTreeData::MutableDataPartPtr new_data_part = createEmptyPart(new_part_info, partition, lost_part_name, NO_TRANSACTION_PTR);
auto [new_data_part, tmp_dir_holder] = createEmptyPart(new_part_info, partition, lost_part_name, NO_TRANSACTION_PTR);
new_data_part->setName(lost_part_name);

try

@@ -279,7 +279,7 @@ class PRInfo:
"user_orgs": self.user_orgs,
}

def has_changes_in_documentation(self):
def has_changes_in_documentation(self) -> bool:
# If the list wasn't built yet the best we can do is to
# assume that there were changes.
if self.changed_files is None or not self.changed_files:
@@ -287,10 +287,9 @@ class PRInfo:

for f in self.changed_files:
_, ext = os.path.splitext(f)
path_in_docs = "docs" in f
path_in_website = "website" in f
path_in_docs = f.startswith("docs/")
if (
ext in DIFF_IN_DOCUMENTATION_EXT and (path_in_docs or path_in_website)
ext in DIFF_IN_DOCUMENTATION_EXT and path_in_docs
) or "docker/docs" in f:
return True
return False

@@ -137,17 +137,20 @@ def main():
if pr_labels_to_remove:
remove_labels(gh, pr_info, pr_labels_to_remove)

if FEATURE_LABEL in pr_info.labels:
print(f"The '{FEATURE_LABEL}' in the labels, expect the 'Docs Check' status")
if FEATURE_LABEL in pr_info.labels and not pr_info.has_changes_in_documentation():
print(
f"The '{FEATURE_LABEL}' in the labels, "
"but there's no changed documentation"
)
post_commit_status( # do not pass pr_info here intentionally
commit,
"pending",
"failure",
NotSet,
f"expect adding docs for {FEATURE_LABEL}",
DOCS_NAME,
pr_info,
)
elif not description_error:
set_mergeable_check(commit, "skipped")
sys.exit(1)

if description_error:
print(
@@ -173,6 +176,7 @@ def main():
)
sys.exit(1)

set_mergeable_check(commit, "skipped")
ci_report_url = create_ci_report(pr_info, [])
if not can_run:
print("::notice ::Cannot run")

@@ -64,6 +64,7 @@ NEED_RERUN_WORKFLOWS = {
"DocsCheck",
"MasterCI",
"NightlyBuilds",
"PublishedReleaseCI",
"PullRequestCI",
"ReleaseBranchCI",
}

@@ -91,5 +91,6 @@
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_http_named_session",
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_grpc",
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_tcp_and_others",
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_setting_in_query"
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_setting_in_query",
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_client_suggestions_load"
]

@@ -10,6 +10,7 @@ import threading
from helpers.cluster import ClickHouseCluster, run_and_check
from helpers.test_tools import assert_logs_contain_with_retry

from helpers.uclient import client, prompt

MAX_SESSIONS_FOR_USER = 2
POSTGRES_SERVER_PORT = 5433
@@ -209,3 +210,36 @@ def test_profile_max_sessions_for_user_tcp_and_others(started_cluster):

def test_profile_max_sessions_for_user_setting_in_query(started_cluster):
instance.query_and_get_error("SET max_sessions_for_user = 10")


def test_profile_max_sessions_for_user_client_suggestions_connection(started_cluster):
command_text = f"{started_cluster.get_client_cmd()} --host {instance.ip_address} --port 9000 -u {TEST_USER} --password {TEST_PASSWORD}"
command_text_without_suggestions = command_text + " --disable_suggestion"

# Launch client1 without suggestions to avoid a race condition:
# Client1 opens a session.
# Client1 opens a session for the suggestion connection.
# Client2 fails to open a session and gets the USER_SESSION_LIMIT_EXCEEDED error.
#
# Expected order:
# Client1 opens a session.
# Client2 opens a session.
# Client2 fails to open a session for suggestions with USER_SESSION_LIMIT_EXCEEDED (no error printed).
# Client3 fails to open a session.
# Client1 executes the query.
# Client2 loads suggestions from the server using the main connection and executes a query.
with client(
name="client1>", log=None, command=command_text_without_suggestions
) as client1:
client1.expect(prompt)
with client(name="client2>", log=None, command=command_text) as client2:
client2.expect(prompt)
with client(name="client3>", log=None, command=command_text) as client3:
client3.expect("USER_SESSION_LIMIT_EXCEEDED")

client1.send("SELECT 'CLIENT_1_SELECT' FORMAT CSV")
client1.expect("CLIENT_1_SELECT")
client1.expect(prompt)
client2.send("SELECT 'CLIENT_2_SELECT' FORMAT CSV")
client2.expect("CLIENT_2_SELECT")
client2.expect(prompt)

@@ -9,10 +9,10 @@ create view logs as select * from system.text_log where now() - toIntervalMinute

-- Check that we don't have too many messages formatted with fmt::runtime or strings concatenation.
-- 0.001 threshold should be always enough, the value was about 0.00025
select 'runtime messages', max2(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.001) from logs;
select 'runtime messages', greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.001) from logs;

-- Check the same for exceptions. The value was 0.03
select 'runtime exceptions', max2(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.05) from logs where message like '%DB::Exception%';
select 'runtime exceptions', greatest(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.05) from logs where message like '%DB::Exception%';

-- FIXME some of the following messages are not informative and it has to be fixed
create temporary table known_short_messages (s String) as select * from (select
@@ -36,7 +36,7 @@ create temporary table known_short_messages (s String) as select * from (select
'Database {} does not exist', 'Dictionary ({}) not found', 'Unknown table function {}',
'Unknown format {}', 'Unknown explain kind ''{}''', 'Unknown setting {}', 'Unknown input format {}',
'Unknown identifier: ''{}''', 'User name is empty', 'Expected function, got: {}',
'Attempt to read after eof', 'String size is too big ({}), maximum: {}', 'API mode: {}',
'Attempt to read after eof', 'String size is too big ({}), maximum: {}',
'Processed: {}%', 'Creating {}: {}', 'Table {}.{} doesn''t exist', 'Invalid cache key hex: {}',
'User has been dropped', 'Illegal type {} of argument of function {}. Should be DateTime or DateTime64'
] as arr) array join arr;

@@ -118,7 +118,6 @@ ExpressionTransform
MergingAggregatedBucketTransform × 4
Resize 1 → 4
GroupingAggregatedTransform 3 → 1
(Union)
(ReadFromRemoteParallelReplicas)
select a, count() from pr_t group by a order by a limit 5 offset 500;
500 1000

@@ -0,0 +1,5 @@
drop table if exists data_01072;
drop table if exists dist_01072;
create table data_01072 (key Int) Engine=MergeTree() ORDER BY key;
create table dist_01072 (key Int) Engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01072, key);
select * from dist_01072 where key=0 and _part='0';