Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-12-04 13:32:13 +00:00)

Commit 6c1a756225: Merge remote-tracking branch 'origin/master' into alternative_keeper
@@ -1,11 +1,11 @@
sudo apt-get install apt-transport-https ca-certificates dirmngr
sudo apt-get install -y apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754

echo "deb https://packages.clickhouse.com/deb stable main/" | sudo tee \
echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get update

sudo apt-get install -y clickhouse-server clickhouse-client

sudo service clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.
clickhouse-client # or "clickhouse-client --password" if you've set up a password.
@@ -1 +0,0 @@
../../../../en/sql-reference/statements/alter/settings-profile.md

docs/zh/sql-reference/statements/alter/settings-profile.md (Normal file, 16 lines)
@@ -0,0 +1,16 @@
---
toc_priority: 48
toc_title: 配置文件设置
---

## 更改配置文件设置 {#alter-settings-profile-statement}

更改配置文件设置。

语法:

``` sql
ALTER SETTINGS PROFILE [IF EXISTS] TO name1 [ON CLUSTER cluster_name1] [RENAME TO new_name1]
    [, name2 [ON CLUSTER cluster_name2] [RENAME TO new_name2] ...]
    [SETTINGS variable [= value] [MIN [=] min_value] [MAX [=] max_value] [READONLY|WRITABLE] | INHERIT 'profile_name'] [,...]
```
@@ -435,6 +435,8 @@ private:
Progress progress;
executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });

executor.sendQuery(ClientInfo::QueryKind::INITIAL_QUERY);

ProfileInfo info;
while (Block block = executor.read())
info.update(block);
@@ -133,7 +133,12 @@ void MultiplexedConnections::sendQuery(
modified_settings.group_by_two_level_threshold_bytes = 0;
}

if (settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas)
bool parallel_reading_from_replicas = settings.max_parallel_replicas > 1
&& settings.allow_experimental_parallel_reading_from_replicas
/// To avoid trying to coordinate with clickhouse-benchmark,
/// since it uses the same code.
&& client_info.query_kind != ClientInfo::QueryKind::INITIAL_QUERY;
if (parallel_reading_from_replicas)
{
client_info.collaborate_with_initiator = true;
client_info.count_participating_replicas = replica_info.all_replicas_count;
@@ -6,6 +6,7 @@
#include <Interpreters/Context.h>
#include <Common/filesystemHelpers.h>
#include <Common/quoteString.h>
#include <Common/renameat2.h>
#include <IO/createReadBufferFromFileBase.h>

#include <fstream>
@@ -325,7 +326,7 @@ DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)

void DiskLocal::moveFile(const String & from_path, const String & to_path)
{
fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
renameNoReplace(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
}

void DiskLocal::replaceFile(const String & from_path, const String & to_path)
@@ -207,7 +207,6 @@ public:

using ShardsInfo = std::vector<ShardInfo>;

String getHashOfAddresses() const { return hash_of_addresses; }
const ShardsInfo & getShardsInfo() const { return shards_info; }
const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; }

@@ -263,7 +262,6 @@ private:
/// Inter-server secret
String secret;

String hash_of_addresses;
/// Description of the cluster shards.
ShardsInfo shards_info;
/// Any remote shard.
@@ -116,7 +116,7 @@ void executeQuery(

const Settings & settings = context->getSettingsRef();

if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);

std::vector<QueryPlanPtr> plans;
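The same off-by-one fix appears again in DistributedSink and StorageDistributed below: the depth counter is bumped before each hop, so the guard has to use `>=` to cap the query at `max_distributed_depth` levels. A minimal standalone sketch of that interaction (illustrative names only, not ClickHouse code):

```cpp
#include <cstddef>
#include <iostream>
#include <stdexcept>

// Illustrative model of the depth guard; "max_distributed_depth" and
// "distributed_depth" mirror the setting and ClientInfo field in the hunk above.
struct QueryHop
{
    size_t max_distributed_depth = 2;
    size_t distributed_depth = 0;   // incremented on every hop before forwarding

    void forward()
    {
        // The new check uses >=, so the query is rejected once it has already
        // passed through max_distributed_depth hops; the old > check allowed
        // one extra level of forwarding.
        if (max_distributed_depth && distributed_depth >= max_distributed_depth)
            throw std::runtime_error("Maximum distributed depth exceeded");
        ++distributed_depth;
    }
};

int main()
{
    QueryHop hop;
    try
    {
        for (int i = 0; i < 5; ++i)
        {
            hop.forward();
            std::cout << "forwarded, depth=" << hop.distributed_depth << '\n';
        }
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';  // thrown on the third attempt with the >= check
    }
}
```

With the old `>` comparison the loop above would forward one extra time before throwing.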
@@ -210,7 +210,7 @@ static Block adaptBlockStructure(const Block & block, const Block & header)
return res;
}

void RemoteQueryExecutor::sendQuery()
void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind)
{
if (sent_query)
return;
@@ -237,13 +237,7 @@ void RemoteQueryExecutor::sendQuery()

auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context->getClientInfo();
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
/// Set initial_query_id to query_id for the clickhouse-benchmark.
///
/// (since first query of clickhouse-benchmark will be issued as SECONDARY_QUERY,
/// due to it executes queries via RemoteBlockInputStream)
if (modified_client_info.initial_query_id.empty())
modified_client_info.initial_query_id = query_id;
modified_client_info.query_kind = query_kind;
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
@@ -83,7 +83,13 @@ public:
~RemoteQueryExecutor();

/// Create connection and send query, external tables and scalars.
void sendQuery();
///
/// @param query_kind - kind of query, usually it is SECONDARY_QUERY,
/// since this is the queries between servers
/// (for which this code was written in general).
/// But clickhouse-benchmark uses the same code,
/// and it should pass INITIAL_QUERY.
void sendQuery(ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::SECONDARY_QUERY);

/// Query is resent to a replica, the query itself can be modified.
std::atomic<bool> resent_query { false };
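A short self-contained sketch of the calling convention this declaration introduces, using simplified stand-ins for ClientInfo::QueryKind and RemoteQueryExecutor (not the real classes): distributed server-to-server calls keep the SECONDARY_QUERY default, while clickhouse-benchmark passes INITIAL_QUERY explicitly, as in the Benchmark.cpp hunk above.

```cpp
#include <iostream>

// Simplified stand-ins, only to illustrate the defaulted-parameter API added here.
enum class QueryKind { INITIAL_QUERY, SECONDARY_QUERY };

struct RemoteQueryExecutorSketch
{
    // Distributed (server-to-server) queries rely on the default;
    // clickhouse-benchmark passes INITIAL_QUERY explicitly so the remote side
    // does not treat it as part of an ongoing distributed query.
    void sendQuery(QueryKind query_kind = QueryKind::SECONDARY_QUERY)
    {
        std::cout << (query_kind == QueryKind::INITIAL_QUERY ? "INITIAL_QUERY" : "SECONDARY_QUERY") << '\n';
    }
};

int main()
{
    RemoteQueryExecutorSketch executor;
    executor.sendQuery();                          // distributed code path, default kind
    executor.sendQuery(QueryKind::INITIAL_QUERY);  // clickhouse-benchmark path
}
```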
@@ -126,7 +126,7 @@ DistributedSink::DistributedSink(
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
{
const auto & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
context->getClientInfo().distributed_depth += 1;
random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
@@ -16,7 +16,7 @@ struct ExternalDataSourceConfiguration
{
String host;
UInt16 port = 0;
String username;
String username = "default";
String password;
String database;
String table;
@@ -116,13 +116,12 @@ public:
, compression_method(compression_method_)
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
, to_read_block(sample_block)
, columns_description(getColumnsDescription(sample_block, source_info))
, text_input_field_names(text_input_field_names_)
, format_settings(getFormatSettings(getContext()))
{
/// Initialize to_read_block, which is used to read data from HDFS.
to_read_block = sample_block;
/// Initialize to_read_block, which is used to read data from HDFS.
for (const auto & name_type : source_info->partition_name_types)
{
to_read_block.erase(name_type.name);
@@ -207,11 +206,17 @@ public:

/// Enrich with partition columns.
auto types = source_info->partition_name_types.getTypes();
auto names = source_info->partition_name_types.getNames();
auto fields = source_info->hive_files[current_idx]->getPartitionValues();
for (size_t i = 0; i < types.size(); ++i)
{
auto column = types[i]->createColumnConst(num_rows, source_info->hive_files[current_idx]->getPartitionValues()[i]);
auto previous_idx = sample_block.getPositionByName(source_info->partition_name_types.getNames()[i]);
columns.insert(columns.begin() + previous_idx, column->convertToFullColumnIfConst());
// Only add the required partition columns. partition columns are not read from readbuffer
// the column must be in sample_block, otherwise sample_block.getPositionByName(names[i]) will throw an exception
if (!sample_block.has(names[i]))
continue;
auto column = types[i]->createColumnConst(num_rows, fields[i]);
auto previous_idx = sample_block.getPositionByName(names[i]);
columns.insert(columns.begin() + previous_idx, column);
}

/// Enrich with virtual columns.
@@ -551,7 +556,34 @@ HiveFilePtr StorageHive::createHiveFileIfNeeded(
}
return hive_file;
}
bool StorageHive::isColumnOriented() const
{
return format_name == "Parquet" || format_name == "ORC";
}

void StorageHive::getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const
{
if (!isColumnOriented())
sample_block = header_block;
UInt32 erased_columns = 0;
for (const auto & column : partition_columns)
{
if (sample_block.has(column))
erased_columns++;
}
if (erased_columns == sample_block.columns())
{
for (size_t i = 0; i < header_block.columns(); ++i)
{
const auto & col = header_block.getByPosition(i);
if (!partition_columns.count(col.name))
{
sample_block.insert(col);
break;
}
}
}
}
Pipe StorageHive::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@@ -617,14 +649,20 @@ Pipe StorageHive::read(
sources_info->table_name = hive_table;
sources_info->hive_metastore_client = hive_metastore_client;
sources_info->partition_name_types = partition_name_types;

const auto & header_block = metadata_snapshot->getSampleBlock();
Block sample_block;
for (const auto & column : column_names)
{
sample_block.insert(header_block.getByName(column));
if (column == "_path")
sources_info->need_path_column = true;
if (column == "_file")
sources_info->need_file_column = true;
}

getActualColumnsToRead(sample_block, header_block, NameSet{partition_names.begin(), partition_names.end()});

if (num_streams > sources_info->hive_files.size())
num_streams = sources_info->hive_files.size();

@@ -636,7 +674,7 @@ Pipe StorageHive::read(
hdfs_namenode_url,
format_name,
compression_method,
metadata_snapshot->getSampleBlock(),
sample_block,
context_,
max_block_size,
text_input_field_names));
@@ -53,6 +53,8 @@ public:

NamesAndTypesList getVirtuals() const override;

bool isColumnOriented() const override;

protected:
friend class StorageHiveSource;
StorageHive(
@@ -88,6 +90,8 @@ private:
HiveFilePtr
createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_);

void getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const;

String hive_metastore_url;

/// Hive database and table
@@ -126,13 +126,9 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (ctx->disk->exists(local_new_part_tmp_path))
throw Exception("Directory " + fullPath(ctx->disk, local_new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);

{
std::lock_guard lock(global_ctx->mutator->tmp_parts_lock);
global_ctx->mutator->tmp_parts.emplace(local_tmp_part_basename);
}
global_ctx->data->temporary_parts.add(local_tmp_part_basename);
SCOPE_EXIT(
std::lock_guard lock(global_ctx->mutator->tmp_parts_lock);
global_ctx->mutator->tmp_parts.erase(local_tmp_part_basename);
global_ctx->data->temporary_parts.remove(local_tmp_part_basename);
);

global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();
@@ -1386,7 +1386,7 @@ static bool isOldPartDirectory(const DiskPtr & disk, const String & directory_pa
}

size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds)
size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds)
{
/// If the method is already called from another thread, then we don't need to do anything.
std::unique_lock lock(clear_old_temporary_directories_mutex, std::defer_lock);
@@ -1418,9 +1418,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMuta
{
if (disk->isDirectory(it->path()) && isOldPartDirectory(disk, it->path(), deadline))
{
if (merger_mutator.hasTemporaryPart(basename))
if (temporary_parts.contains(basename))
{
LOG_WARNING(log, "{} is an active destination for one of merge/mutation (consider increasing temporary_directories_lifetime setting)", full_path);
LOG_WARNING(log, "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
continue;
}
else
@@ -3,30 +3,31 @@
#include <Common/SimpleIncrement.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/DataDestinationType.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Merges/Algorithms/Graphite.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/TemporaryParts.h>
#include <Storages/IndicesDescription.h>
#include <Storages/DataDestinationType.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h>

#include <boost/multi_index_container.hpp>
@@ -566,7 +567,7 @@ public:

/// Delete all directories which names begin with "tmp"
/// Must be called with locked lockForShare() because it's using relative_data_path.
size_t clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds);
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds);

size_t clearEmptyParts();

@@ -906,7 +907,6 @@ public:
mutable std::mutex currently_submerging_emerging_mutex;

protected:

friend class IMergeTreeDataPart;
friend class MergeTreeDataMergerMutator;
friend struct ReplicatedMergeTreeTableMetadata;
@@ -1200,6 +1200,8 @@ private:
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }

TemporaryParts temporary_parts;
};

/// RAII struct to record big parts that are submerging or emerging.
@@ -782,10 +782,4 @@ ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadat
}

bool MergeTreeDataMergerMutator::hasTemporaryPart(const std::string & basename) const
{
std::lock_guard lock(tmp_parts_lock);
return tmp_parts.contains(basename);
}

}
@@ -192,26 +192,6 @@ private:
ITTLMergeSelector::PartitionIdToTTLs next_recompress_ttl_merge_times_by_partition;
/// Performing TTL merges independently for each partition guarantees that
/// there is only a limited number of TTL merges and no partition stores data, that is too stale

public:
/// Returns true if passed part name is active.
/// (is the destination for one of active mutation/merge).
///
/// NOTE: that it accept basename (i.e. dirname), not the path,
/// since later requires canonical form.
bool hasTemporaryPart(const std::string & basename) const;

private:
/// Set of active temporary paths that is used as the destination.
/// List of such paths is required to avoid trying to remove them during cleanup.
///
/// NOTE: It is pretty short, so use STL is fine.
std::unordered_set<std::string> tmp_parts;
/// Lock for "tmp_parts".
///
/// NOTE: mutable is required to mark hasTemporaryPath() const
mutable std::mutex tmp_parts_lock;

};
@@ -64,7 +64,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
/// Both use relative_data_path which changes during rename, so we
/// do it under share lock
storage.clearOldWriteAheadLogs();
storage.clearOldTemporaryDirectories(storage.merger_mutator, storage.getSettings()->temporary_directories_lifetime.totalSeconds());
storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
}

/// This is loose condition: no problem if we actually had lost leadership at this moment
src/Storages/MergeTree/TemporaryParts.cpp (Normal file, 24 lines)
@@ -0,0 +1,24 @@
#include <Storages/MergeTree/TemporaryParts.h>

namespace DB
{

bool TemporaryParts::contains(const std::string & basename) const
{
std::lock_guard lock(mutex);
return parts.contains(basename);
}

void TemporaryParts::add(std::string basename)
{
std::lock_guard lock(mutex);
parts.emplace(std::move(basename));
}

void TemporaryParts::remove(const std::string & basename)
{
std::lock_guard lock(mutex);
parts.erase(basename);
}

}
src/Storages/MergeTree/TemporaryParts.h (Normal file, 33 lines)
@@ -0,0 +1,33 @@
#pragma once

#include <boost/noncopyable.hpp>
#include <mutex>
#include <string>
#include <unordered_set>

namespace DB
{

/// Manages set of active temporary paths that should not be cleaned by background thread.
class TemporaryParts : private boost::noncopyable
{
private:
/// To add const qualifier for contains()
mutable std::mutex mutex;

/// NOTE: It is pretty short, so use STL is fine.
std::unordered_set<std::string> parts;

public:
/// Returns true if passed part name is active.
/// (is the destination for one of active mutation/merge).
///
/// NOTE: that it accept basename (i.e. dirname), not the path,
/// since later requires canonical form.
bool contains(const std::string & basename) const;

void add(std::string basename);
void remove(const std::string & basename);
};

}
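For orientation, a self-contained sketch of the pattern the new class supports, modeled on the MergeTask and cleanup-thread hunks above (the class and scope guard below are simplified stand-ins, not the ClickHouse implementations): a merge registers its tmp_* directory basename, a scope guard unregisters it, and the cleanup thread skips anything still registered.

```cpp
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_set>

// Simplified re-implementation for illustration only; the real class lives in
// src/Storages/MergeTree/TemporaryParts.h.
class TemporaryPartsSketch
{
    mutable std::mutex mutex;
    std::unordered_set<std::string> parts;

public:
    bool contains(const std::string & basename) const
    {
        std::lock_guard lock(mutex);
        return parts.count(basename) > 0;
    }
    void add(std::string basename)
    {
        std::lock_guard lock(mutex);
        parts.emplace(std::move(basename));
    }
    void remove(const std::string & basename)
    {
        std::lock_guard lock(mutex);
        parts.erase(basename);
    }
};

// Minimal scope guard standing in for ClickHouse's SCOPE_EXIT macro.
template <typename F>
struct ScopeGuard
{
    F fn;
    ~ScopeGuard() { fn(); }
};

int main()
{
    TemporaryPartsSketch temporary_parts;
    {
        // A merge/mutation/INSERT registers its tmp_* directory basename...
        temporary_parts.add("tmp_merge_all_1_2_1");
        ScopeGuard guard{[&] { temporary_parts.remove("tmp_merge_all_1_2_1"); }};

        // ...so the cleanup thread leaves it alone while the work is running.
        std::cout << "in use: " << temporary_parts.contains("tmp_merge_all_1_2_1") << '\n';  // 1
    }
    std::cout << "in use: " << temporary_parts.contains("tmp_merge_all_1_2_1") << '\n';      // 0
}
```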
@@ -118,6 +118,7 @@ namespace ErrorCodes
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}

namespace ActionLocks
@@ -705,6 +706,9 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);

std::shared_ptr<StorageDistributed> storage_src;
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
@@ -733,14 +737,34 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
}
}

if (!storage_src || storage_src->getClusterName() != getClusterName())
const Cluster::AddressesWithFailover & src_addresses = storage_src ? storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{};
const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses();
/// Compare addresses instead of cluster name, to handle remote()/cluster().
/// (since for remote()/cluster() the getClusterName() is empty string)
if (src_addresses != dst_addresses)
{
/// The warning should be produced only for root queries,
/// since in case of parallel_distributed_insert_select=1,
/// it will produce warning for the rewritten insert,
/// since destination table is still Distributed there.
if (local_context->getClientInfo().distributed_depth == 0)
{
LOG_WARNING(log,
"Parallel distributed INSERT SELECT is not possible "
"(source cluster={} ({} addresses), destination cluster={} ({} addresses))",
storage_src ? storage_src->getClusterName() : "<not a Distributed table>",
src_addresses.size(),
getClusterName(),
dst_addresses.size());
}
return nullptr;
}

if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
/// Reset table function for INSERT INTO remote()/cluster()
new_query->table_function.reset();
}

const auto & cluster = getCluster();
@@ -757,12 +781,15 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
new_query_str = buf.str();
}

ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;

for (size_t shard_index : collections::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_index];
if (shard_info.isLocal())
{
InterpreterInsertQuery interpreter(new_query, local_context);
InterpreterInsertQuery interpreter(new_query, query_context);
pipelines.emplace_back(std::make_unique<QueryPipelineBuilder>());
pipelines.back()->init(interpreter.execute().pipeline);
}
@@ -776,7 +803,7 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer

/// INSERT SELECT query returns empty block
auto remote_query_executor
= std::make_shared<RemoteQueryExecutor>(shard_info.pool, std::move(connections), new_query_str, Block{}, local_context);
= std::make_shared<RemoteQueryExecutor>(shard_info.pool, std::move(connections), new_query_str, Block{}, query_context);
pipelines.emplace_back(std::make_unique<QueryPipelineBuilder>());
pipelines.back()->init(Pipe(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote)));
pipelines.back()->setSinks([](const Block & header, QueryPipelineBuilder::StreamType) -> ProcessorPtr
@@ -114,8 +114,6 @@ public:
/// Used by InterpreterInsertQuery
std::string getRemoteDatabaseName() const { return remote_database; }
std::string getRemoteTableName() const { return remote_table; }
/// Returns empty string if tables is used by TableFunctionRemote
std::string getClusterName() const { return cluster_name; }
ClusterPtr getCluster() const;

/// Used by InterpreterSystemQuery
@@ -201,6 +199,7 @@ private:
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const;

size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
std::string getClusterName() const { return cluster_name.empty() ? "<remote>" : cluster_name; }

const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }
@@ -108,7 +108,7 @@ void StorageMergeTree::startup()

/// Temporary directories contain incomplete results of merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
clearOldTemporaryDirectories(merger_mutator, 0);
clearOldTemporaryDirectories(0);

/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup_parts.restart();
@@ -1062,7 +1062,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
assignee.scheduleCommonTask(ExecutableLambdaAdapter::create(
[this, share_lock] ()
{
return clearOldTemporaryDirectories(merger_mutator, getSettings()->temporary_directories_lifetime.totalSeconds());
return clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds());
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
scheduled = true;
}
@@ -451,7 +451,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
/// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
/// don't allow to reinitialize them, delete each of them immediately.
clearOldTemporaryDirectories(merger_mutator, 0);
clearOldTemporaryDirectories(0);
clearOldWriteAheadLogs();
}
@@ -231,7 +231,6 @@ CI_CONFIG = {
},
"Stateful tests (aarch64, actions)": {
"required_build": "package_aarch64",
"force_tests": True,
},
"Stateful tests (release, DatabaseOrdinary, actions)": {
"required_build": "package_release",
@@ -259,7 +258,6 @@ CI_CONFIG = {
},
"Stateless tests (aarch64, actions)": {
"required_build": "package_aarch64",
"force_tests": True,
},
"Stateless tests (release, wide parts enabled, actions)": {
"required_build": "package_release",
@@ -1,6 +1,7 @@
#!/usr/bin/env bash
# Tags: no-asan, no-msan, no-fasttest
# Tags: no-asan, no-msan, no-fasttest, no-cpu-aarch64
# Tag no-msan: can't pass because odbc libraries are not instrumented
# Tag no-cpu-aarch64: clickhouse-odbc is not setup for arm

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
@@ -0,0 +1,27 @@
-- { echoOn }
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=1, max_distributed_depth=1; -- { serverError TOO_LARGE_DISTRIBUTED_DEPTH }
select * from dst_02224;
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=1, max_distributed_depth=2;
select * from dst_02224;
1
1
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02224;
1
1
truncate table dst_02224;
insert into function remote('127.{1,2}', currentDatabase(), dst_02224, key)
select * from remote('127.{1,2}', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02224;
1
1
@@ -0,0 +1,34 @@
drop table if exists dst_02224;
drop table if exists src_02224;
create table dst_02224 (key Int) engine=Memory();
create table src_02224 (key Int) engine=Memory();
insert into src_02224 values (1);

-- { echoOn }
truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=1, max_distributed_depth=1; -- { serverError TOO_LARGE_DISTRIBUTED_DEPTH }
select * from dst_02224;

truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=1, max_distributed_depth=2;
select * from dst_02224;

truncate table dst_02224;
insert into function cluster('test_cluster_two_shards', currentDatabase(), dst_02224, key)
select * from cluster('test_cluster_two_shards', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02224;

truncate table dst_02224;
insert into function remote('127.{1,2}', currentDatabase(), dst_02224, key)
select * from remote('127.{1,2}', currentDatabase(), src_02224, key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02224;
-- { echoOff }

drop table src_02224;
drop table dst_02224;
tests/queries/0_stateless/02226_parallel_reading_from_replicas_benchmark.sh (Executable file, 29 lines)
@@ -0,0 +1,29 @@
#!/usr/bin/env bash

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

$CLICKHOUSE_CLIENT -nm -q "
drop table if exists data_02226;
create table data_02226 (key Int) engine=MergeTree() order by key
as select * from numbers(1);
"

# Regression for:
#
# Logical error: 'Coordinator for parallel reading from replicas is not initialized'.
opts=(
--allow_experimental_parallel_reading_from_replicas 1
--max_parallel_replicas 3

--iterations 1
)
$CLICKHOUSE_BENCHMARK --query "select * from remote('127.1', $CLICKHOUSE_DATABASE, data_02226)" "${opts[@]}" >& /dev/null
ret=$?

$CLICKHOUSE_CLIENT -nm -q "
drop table data_02226;
"

exit $ret