Merge remote-tracking branch 'upstream/master' into delete-non-evictable-feature

kssenii 2023-06-21 20:52:08 +02:00
commit 78ea4feca0
23 changed files with 812 additions and 462 deletions

View File

@ -1,16 +0,0 @@
---
slug: /en/operations/server-configuration-parameters/
sidebar_position: 54
sidebar_label: Server Configuration Parameters
pagination_next: en/operations/server-configuration-parameters/settings
---
# Server Configuration Parameters
This section contains descriptions of server settings that cannot be changed at the session or query level.
These settings are stored in the `config.xml` file on the ClickHouse server.
Other settings are described in the “[Settings](../../operations/settings/index.md#session-settings-intro)” section.
Before studying the settings, read the [Configuration files](../../operations/configuration-files.md#configuration_files) section and note the use of substitutions (the `incl` and `optional` attributes).

View File

@ -7,6 +7,14 @@ description: This section contains descriptions of server settings that cannot b
# Server Settings
This section contains descriptions of server settings that cannot be changed at the session or query level.
These settings are stored in the `config.xml` file on the ClickHouse server.
Other settings are described in the “[Settings](../../operations/settings/index.md#session-settings-intro)” section.
Before studying the settings, read the [Configuration files](../../operations/configuration-files.md#configuration_files) section and note the use of substitutions (the `incl` and `optional` attributes).
## allow_use_jemalloc_memory
Allows the use of jemalloc memory.
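A hedged example (not part of the original doc) of inspecting this server-level setting at runtime, assuming the `system.server_settings` introspection table is available in this version:
```sql
-- Server settings come from config.xml and cannot be changed with SET;
-- they can be inspected via system.server_settings (assumed available here).
SELECT name, value, description
FROM system.server_settings
WHERE name = 'allow_use_jemalloc_memory';
```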

View File

@ -10,7 +10,7 @@ sidebar_label: SET
SET param = value
```
Assigns `value` to the `param` [setting](../../operations/settings/index.md) for the current session. You cannot change [server settings](../../operations/server-configuration-parameters/index.md) this way.
Assigns `value` to the `param` [setting](../../operations/settings/index.md) for the current session. You cannot change [server settings](../../operations/server-configuration-parameters/settings.md) this way.
You can also set all the values from the specified settings profile in a single query.
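A minimal sketch of both forms described above; `max_threads` and the `web` profile are illustrative names rather than values taken from this page:
```sql
-- Change a single setting for the current session.
SET max_threads = 8;

-- Apply all values from a named settings profile in one statement.
SET profile = 'web';
```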

View File

@ -1,11 +1,7 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
@ -17,24 +13,27 @@ namespace DB
namespace
{
template <typename DataType>
class FunctionEmptyArray : public IFunction
{
private:
String element_type;
public:
static String getNameImpl() { return "emptyArray" + DataType().getName(); }
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionEmptyArray>(); }
static String getNameImpl(const String & element_type) { return "emptyArray" + element_type; }
explicit FunctionEmptyArray(const String & element_type_) : element_type(element_type_) {}
private:
String getName() const override
{
return getNameImpl();
return getNameImpl(element_type);
}
size_t getNumberOfArguments() const override { return 0; }
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataType>());
return std::make_shared<DataTypeArray>(DataTypeFactory::instance().get(element_type));
}
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
@ -42,34 +41,35 @@ private:
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return ColumnArray::create(
DataType().createColumn(),
DataTypeFactory::instance().get(element_type)->createColumn(),
ColumnArray::ColumnOffsets::create(input_rows_count, 0));
}
};
template <typename F>
void registerFunction(FunctionFactory & factory)
void registerFunction(FunctionFactory & factory, const String & element_type)
{
factory.registerFunction<F>(F::getNameImpl());
factory.registerFunction(FunctionEmptyArray::getNameImpl(element_type),
[element_type](ContextPtr){ return std::make_unique<FunctionToOverloadResolverAdaptor>(
std::make_shared<FunctionEmptyArray>(element_type)); });
}
}
REGISTER_FUNCTION(EmptyArray)
{
registerFunction<FunctionEmptyArray<DataTypeUInt8>>(factory);
registerFunction<FunctionEmptyArray<DataTypeUInt16>>(factory);
registerFunction<FunctionEmptyArray<DataTypeUInt32>>(factory);
registerFunction<FunctionEmptyArray<DataTypeUInt64>>(factory);
registerFunction<FunctionEmptyArray<DataTypeInt8>>(factory);
registerFunction<FunctionEmptyArray<DataTypeInt16>>(factory);
registerFunction<FunctionEmptyArray<DataTypeInt32>>(factory);
registerFunction<FunctionEmptyArray<DataTypeInt64>>(factory);
registerFunction<FunctionEmptyArray<DataTypeFloat32>>(factory);
registerFunction<FunctionEmptyArray<DataTypeFloat64>>(factory);
registerFunction<FunctionEmptyArray<DataTypeDate>>(factory);
registerFunction<FunctionEmptyArray<DataTypeDateTime>>(factory);
registerFunction<FunctionEmptyArray<DataTypeString>>(factory);
registerFunction(factory, "UInt8");
registerFunction(factory, "UInt16");
registerFunction(factory, "UInt32");
registerFunction(factory, "UInt64");
registerFunction(factory, "Int8");
registerFunction(factory, "Int16");
registerFunction(factory, "Int32");
registerFunction(factory, "Int64");
registerFunction(factory, "Float32");
registerFunction(factory, "Float64");
registerFunction(factory, "Date");
registerFunction(factory, "DateTime");
registerFunction(factory, "String");
}
}

View File

@ -812,6 +812,11 @@ void FileCache::removeKeyIfExists(const Key & key)
locked_key->removeAllReleasable();
}
void FileCache::removePathIfExists(const String & path)
{
removeKeyIfExists(createKeyForPath(path));
}
void FileCache::removeAllReleasable()
{
assertInitialized();

View File

@ -86,6 +86,9 @@ public:
/// Remove files by `key`. Removes files which might be used at the moment.
void removeKeyIfExists(const Key & key);
/// Removes files by `path`. Removes files which might be used at the moment.
void removePathIfExists(const String & path);
/// Remove all releasable cache entries. Will not remove files which are used at the moment.
void removeAllReleasable();

View File

@ -90,6 +90,15 @@ UInt32 DataPartStorageOnDiskFull::getRefCount(const String & file_name) const
return volume->getDisk()->getRefCount(fs::path(root_path) / part_dir / file_name);
}
std::string DataPartStorageOnDiskFull::getRemotePath(const std::string & file_name) const
{
auto objects = volume->getDisk()->getStorageObjects(fs::path(root_path) / part_dir / file_name);
if (objects.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "One file must be mapped to one object on blob storage in MergeTree tables");
return objects[0].remote_path;
}
String DataPartStorageOnDiskFull::getUniqueId() const
{
auto disk = volume->getDisk();

View File

@ -22,6 +22,7 @@ public:
DataPartStorageIteratorPtr iterate() const override;
size_t getFileSize(const std::string & file_name) const override;
UInt32 getRefCount(const std::string & file_name) const override;
std::string getRemotePath(const std::string & file_name) const override;
String getUniqueId() const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(

View File

@ -111,6 +111,9 @@ public:
virtual size_t getFileSize(const std::string & file_name) const = 0;
virtual UInt32 getRefCount(const std::string & file_name) const = 0;
/// Get path on remote filesystem from file name on local filesystem.
virtual std::string getRemotePath(const std::string & file_name) const = 0;
virtual UInt64 calculateTotalSizeOnDisk() const = 0;
/// Open the file for read and return ReadBufferFromFileBase object.
@ -173,7 +176,6 @@ public:
/// Required to distinguish different copies of the same part on remote FS.
virtual String getUniqueId() const = 0;
/// Represents metadata which is required for fetching of part.
struct ReplicatedFilesDescription
{
@ -284,7 +286,6 @@ public:
bool remove_new_dir_if_exists,
bool fsync_part_dir) = 0;
/// Starts a transaction of mutable operations.
virtual void beginTransaction() = 0;
/// Commits a transaction of mutable operations.

View File

@ -494,7 +494,7 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
ThreadsTasks result_threads_tasks;
for (size_t i = 0, part_idx = 0; i < threads && part_idx < parts_infos.size(); ++i)
{
auto need_marks = min_marks_per_thread;
int64_t need_marks = min_marks_per_thread;
/// Priority is given according to the prefetch number for each thread,
/// e.g. the first task of each thread has the same priority and is greater
@ -515,7 +515,7 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
}
MarkRanges ranges_to_get_from_part;
size_t marks_to_get_from_part = std::min(need_marks, marks_in_part);
size_t marks_to_get_from_part = std::min<size_t>(need_marks, marks_in_part);
/// Split by prefetch step even if !allow_prefetch below. Because it will allow
/// to make a better distribution of tasks which did not fill into memory limit

View File

@ -1,4 +1,4 @@
#include "Storages/MergeTree/IDataPartStorage.h"
#include <Poco/Logger.h>
#include <algorithm>
#include <optional>
@ -8,6 +8,9 @@
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/HashingReadBuffer.h>
#include <Common/CurrentMetrics.h>
@ -45,12 +48,13 @@ bool isNotEnoughMemoryErrorCode(int code)
}
IMergeTreeDataPart::Checksums checkDataPart(
static IMergeTreeDataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part,
const IDataPartStorage & data_part_storage,
const NamesAndTypesList & columns_list,
const MergeTreeDataPartType & part_type,
const NameSet & files_without_checksums,
const ReadSettings & read_settings,
bool require_checksums,
std::function<bool()> is_cancelled)
{
@ -65,7 +69,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
NamesAndTypesList columns_txt;
{
auto buf = data_part_storage.readFile("columns.txt", {}, std::nullopt, std::nullopt);
auto buf = data_part_storage.readFile("columns.txt", read_settings, std::nullopt, std::nullopt);
columns_txt.readText(*buf);
assertEOF(*buf);
}
@ -78,9 +82,9 @@ IMergeTreeDataPart::Checksums checkDataPart(
IMergeTreeDataPart::Checksums checksums_data;
/// This function calculates checksum for both compressed and decompressed contents of compressed file.
auto checksum_compressed_file = [](const IDataPartStorage & data_part_storage_, const String & file_path)
auto checksum_compressed_file = [&read_settings](const IDataPartStorage & data_part_storage_, const String & file_path)
{
auto file_buf = data_part_storage_.readFile(file_path, {}, std::nullopt, std::nullopt);
auto file_buf = data_part_storage_.readFile(file_path, read_settings, std::nullopt, std::nullopt);
HashingReadBuffer compressed_hashing_buf(*file_buf);
CompressedReadBuffer uncompressing_buf(compressed_hashing_buf);
HashingReadBuffer uncompressed_hashing_buf(uncompressing_buf);
@ -98,7 +102,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
if (data_part_storage.exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME))
{
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, {}, std::nullopt, std::nullopt);
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, read_settings, std::nullopt, std::nullopt);
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
}
@ -114,7 +118,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
/// This function calculates only checksum of file content (compressed or uncompressed).
auto checksum_file = [&](const String & file_name)
{
auto file_buf = data_part_storage.readFile(file_name, {}, std::nullopt, std::nullopt);
auto file_buf = data_part_storage.readFile(file_name, read_settings, std::nullopt, std::nullopt);
HashingReadBuffer hashing_buf(*file_buf);
hashing_buf.ignoreAll();
checksums_data.files[file_name] = IMergeTreeDataPart::Checksums::Checksum(hashing_buf.count(), hashing_buf.getHash());
@ -152,7 +156,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
if (require_checksums || data_part_storage.exists("checksums.txt"))
{
auto buf = data_part_storage.readFile("checksums.txt", {}, std::nullopt, std::nullopt);
auto buf = data_part_storage.readFile("checksums.txt", read_settings, std::nullopt, std::nullopt);
checksums_txt.read(*buf);
assertEOF(*buf);
}
@ -202,7 +206,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
projection, *data_part_storage.getProjection(projection_file),
projection->getColumns(), projection->getType(),
projection->getFileNamesWithoutChecksums(),
require_checksums, is_cancelled);
read_settings, require_checksums, is_cancelled);
checksums_data.files[projection_file] = IMergeTreeDataPart::Checksums::Checksum(
projection_checksums.getTotalSizeOnDisk(),
@ -243,14 +247,70 @@ IMergeTreeDataPart::Checksums checkDataPart(
if (auto part_in_memory = asInMemoryPart(data_part))
return checkDataPartInMemory(part_in_memory);
return checkDataPart(
data_part,
data_part->getDataPartStorage(),
data_part->getColumns(),
data_part->getType(),
data_part->getFileNamesWithoutChecksums(),
require_checksums,
is_cancelled);
/// If the check of a part has failed and the part is stored on a disk with a cache,
/// try to drop the cache and check the part once again, because the cache may be
/// broken rather than the part itself (see the CHECK TABLE sketch after this file).
auto drop_cache_and_check = [&]
{
const auto & data_part_storage = data_part->getDataPartStorage();
auto cache_name = data_part_storage.getCacheName();
if (!cache_name)
throw;
LOG_DEBUG(
&Poco::Logger::get("checkDataPart"),
"Will drop cache for data part {} and will check it once again", data_part->name);
auto & cache = *FileCacheFactory::instance().getByName(*cache_name).cache;
for (auto it = data_part_storage.iterate(); it->isValid(); it->next())
{
auto file_name = it->name();
if (!data_part_storage.isDirectory(file_name))
{
auto remote_path = data_part_storage.getRemotePath(file_name);
cache.removePathIfExists(remote_path);
}
}
ReadSettings read_settings;
read_settings.enable_filesystem_cache = false;
return checkDataPart(
data_part,
data_part_storage,
data_part->getColumns(),
data_part->getType(),
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled);
};
try
{
ReadSettings read_settings;
return checkDataPart(
data_part,
data_part->getDataPartStorage(),
data_part->getColumns(),
data_part->getType(),
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled);
}
catch (const Exception & e)
{
if (isNotEnoughMemoryErrorCode(e.code()))
throw;
return drop_cache_and_check();
}
catch (...)
{
return drop_cache_and_check();
}
}
}
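A hedged SQL-level illustration (not part of this commit's code) of how the retry logic above is reached; the table name `s3_test` matches the integration test added later in this diff:
```sql
-- CHECK TABLE re-validates each data part. With this change, if the first pass
-- over a part on a cached disk fails, the part's cache entries are evicted and
-- the check is retried with enable_filesystem_cache disabled.
CHECK TABLE s3_test;
```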

View File

@ -12,15 +12,6 @@ IMergeTreeDataPart::Checksums checkDataPart(
bool require_checksums,
std::function<bool()> is_cancelled = []{ return false; });
IMergeTreeDataPart::Checksums checkDataPart(
const DiskPtr & disk,
const String & full_relative_path,
const NamesAndTypesList & columns_list,
const MergeTreeDataPartType & part_type,
const NameSet & files_without_checksums,
bool require_checksums,
std::function<bool()> is_cancelled = []{ return false; });
bool isNotEnoughMemoryErrorCode(int code);
}

View File

@ -81,6 +81,7 @@
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Storages/IStorageCluster.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
@ -153,7 +154,6 @@ namespace ErrorCodes
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED;
}
namespace ActionLocks
@ -650,264 +650,6 @@ StorageSnapshotPtr StorageDistributed::getStorageSnapshotForQuery(
namespace
{
/// Visitor that collect column source to columns mapping from query and all subqueries
class CollectColumnSourceToColumnsVisitor : public InDepthQueryTreeVisitor<CollectColumnSourceToColumnsVisitor>
{
public:
struct Columns
{
NameSet column_names;
NamesAndTypes columns;
void addColumn(NameAndTypePair column)
{
if (column_names.contains(column.name))
return;
column_names.insert(column.name);
columns.push_back(std::move(column));
}
};
const std::unordered_map<QueryTreeNodePtr, Columns> & getColumnSourceToColumns() const
{
return column_source_to_columns;
}
void visitImpl(QueryTreeNodePtr & node)
{
auto * column_node = node->as<ColumnNode>();
if (!column_node)
return;
auto column_source = column_node->getColumnSourceOrNull();
if (!column_source)
return;
auto it = column_source_to_columns.find(column_source);
if (it == column_source_to_columns.end())
{
auto [insert_it, _] = column_source_to_columns.emplace(column_source, Columns());
it = insert_it;
}
it->second.addColumn(column_node->getColumn());
}
private:
std::unordered_map<QueryTreeNodePtr, Columns> column_source_to_columns;
};
/** Visitor that rewrites IN and JOINs in query and all subqueries according to distributed_product_mode and
* prefer_global_in_and_join settings.
*
* Additionally collects GLOBAL JOIN and GLOBAL IN query nodes.
*
* If distributed_product_mode = deny, then visitor throws exception if there are multiple distributed tables.
* If distributed_product_mode = local, then visitor collects replacement map for tables that must be replaced
* with local tables.
* If distributed_product_mode = global or prefer_global_in_and_join setting is true, then visitor rewrites JOINs and IN functions that
* contain distributed tables to GLOBAL JOINs and GLOBAL IN functions.
* If distributed_product_mode = allow, then visitor does not rewrite query if there are multiple distributed tables.
*/
class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>;
using Base::Base;
explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_)
: Base(context_)
{}
struct InFunctionOrJoin
{
QueryTreeNodePtr query_node;
size_t subquery_depth = 0;
};
const std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> & getReplacementMap() const
{
return replacement_map;
}
const std::vector<InFunctionOrJoin> & getGlobalInOrJoinNodes() const
{
return global_in_or_join_nodes;
}
static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr & child)
{
auto * function_node = parent->as<FunctionNode>();
if (function_node && isNameOfGlobalInFunction(function_node->getFunctionName()))
return false;
auto * join_node = parent->as<JoinNode>();
if (join_node && join_node->getLocality() == JoinLocality::Global && join_node->getRightTableExpression() == child)
return false;
return true;
}
void visitImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
auto * join_node = node->as<JoinNode>();
if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) ||
(join_node && join_node->getLocality() == JoinLocality::Global))
{
InFunctionOrJoin in_function_or_join_entry;
in_function_or_join_entry.query_node = node;
in_function_or_join_entry.subquery_depth = getSubqueryDepth();
global_in_or_join_nodes.push_back(std::move(in_function_or_join_entry));
return;
}
if ((function_node && isNameOfLocalInFunction(function_node->getFunctionName())) ||
(join_node && join_node->getLocality() != JoinLocality::Global))
{
InFunctionOrJoin in_function_or_join_entry;
in_function_or_join_entry.query_node = node;
in_function_or_join_entry.subquery_depth = getSubqueryDepth();
in_function_or_join_stack.push_back(in_function_or_join_entry);
return;
}
if (node->getNodeType() == QueryTreeNodeType::TABLE)
tryRewriteTableNodeIfNeeded(node);
}
void leaveImpl(QueryTreeNodePtr & node)
{
if (!in_function_or_join_stack.empty() && node.get() == in_function_or_join_stack.back().query_node.get())
in_function_or_join_stack.pop_back();
}
private:
void tryRewriteTableNodeIfNeeded(const QueryTreeNodePtr & table_node)
{
const auto & table_node_typed = table_node->as<TableNode &>();
const auto * distributed_storage = typeid_cast<const StorageDistributed *>(table_node_typed.getStorage().get());
if (!distributed_storage)
return;
bool distributed_valid_for_rewrite = distributed_storage->getShardCount() >= 2;
if (!distributed_valid_for_rewrite)
return;
auto distributed_product_mode = getSettings().distributed_product_mode;
if (distributed_product_mode == DistributedProductMode::LOCAL)
{
StorageID remote_storage_id = StorageID{distributed_storage->getRemoteDatabaseName(),
distributed_storage->getRemoteTableName()};
auto resolved_remote_storage_id = getContext()->resolveStorageID(remote_storage_id);
const auto & distributed_storage_columns = table_node_typed.getStorageSnapshot()->metadata->getColumns();
auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, distributed_storage_columns);
auto replacement_table_expression = std::make_shared<TableNode>(std::move(storage), getContext());
replacement_map.emplace(table_node.get(), std::move(replacement_table_expression));
}
else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings().prefer_global_in_and_join) &&
!in_function_or_join_stack.empty())
{
auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get();
if (auto * in_function_to_modify = in_or_join_node_to_modify->as<FunctionNode>())
{
auto global_in_function_name = getGlobalInFunctionNameForLocalInFunctionName(in_function_to_modify->getFunctionName());
auto global_in_function_resolver = FunctionFactory::instance().get(global_in_function_name, getContext());
in_function_to_modify->resolveAsFunction(global_in_function_resolver->build(in_function_to_modify->getArgumentColumns()));
}
else if (auto * join_node_to_modify = in_or_join_node_to_modify->as<JoinNode>())
{
join_node_to_modify->setLocality(JoinLocality::Global);
}
global_in_or_join_nodes.push_back(in_function_or_join_stack.back());
}
else if (distributed_product_mode == DistributedProductMode::ALLOW)
{
return;
}
else if (distributed_product_mode == DistributedProductMode::DENY)
{
throw Exception(ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED,
"Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny'). "
"You may rewrite query to use local tables "
"in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.");
}
}
std::vector<InFunctionOrJoin> in_function_or_join_stack;
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
std::vector<InFunctionOrJoin> global_in_or_join_nodes;
};
/** Execute subquery node and put result in mutable context temporary table.
* Returns table node that is initialized with temporary table storage.
*/
TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
ContextMutablePtr & mutable_context,
size_t subquery_depth)
{
auto subquery_hash = subquery_node->getTreeHash();
String temporary_table_name = fmt::format("_data_{}_{}", subquery_hash.first, subquery_hash.second);
const auto & external_tables = mutable_context->getExternalTables();
auto external_table_it = external_tables.find(temporary_table_name);
if (external_table_it != external_tables.end())
{
auto temporary_table_expression_node = std::make_shared<TableNode>(external_table_it->second, mutable_context);
temporary_table_expression_node->setTemporaryTableName(temporary_table_name);
return temporary_table_expression_node;
}
auto subquery_options = SelectQueryOptions(QueryProcessingStage::Complete, subquery_depth, true /*is_subquery*/);
auto context_copy = Context::createCopy(mutable_context);
updateContextForSubqueryExecution(context_copy);
InterpreterSelectQueryAnalyzer interpreter(subquery_node, context_copy, subquery_options);
auto & query_plan = interpreter.getQueryPlan();
auto sample_block_with_unique_names = query_plan.getCurrentDataStream().header;
makeUniqueColumnNamesInBlock(sample_block_with_unique_names);
if (!blocksHaveEqualStructure(sample_block_with_unique_names, query_plan.getCurrentDataStream().header))
{
auto actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
sample_block_with_unique_names.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), std::move(actions_dag));
query_plan.addStep(std::move(converting_step));
}
Block sample = interpreter.getSampleBlock();
NamesAndTypesList columns = sample.getNamesAndTypesList();
auto external_storage_holder = TemporaryTableHolder(
mutable_context,
ColumnsDescription{columns},
ConstraintsDescription{},
nullptr /*query*/,
true /*create_for_global_subquery*/);
StoragePtr external_storage = external_storage_holder.getTable();
auto temporary_table_expression_node = std::make_shared<TableNode>(external_storage, mutable_context);
temporary_table_expression_node->setTemporaryTableName(temporary_table_name);
auto table_out = external_storage->write({}, external_storage->getInMemoryMetadataPtr(), mutable_context, /*async_insert=*/false);
auto io = interpreter.execute();
io.pipeline.complete(std::move(table_out));
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
mutable_context->addExternalTable(temporary_table_name, std::move(external_storage_holder));
return temporary_table_expression_node;
}
QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
const StorageSnapshotPtr & distributed_storage_snapshot,
const StorageID & remote_storage_id,
@ -963,81 +705,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,
auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression));
CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor;
collect_column_source_to_columns_visitor.visit(query_tree_to_modify);
const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns();
DistributedProductModeRewriteInJoinVisitor visitor(query_info.planner_context->getQueryContext());
visitor.visit(query_tree_to_modify);
auto replacement_map = visitor.getReplacementMap();
const auto & global_in_or_join_nodes = visitor.getGlobalInOrJoinNodes();
for (const auto & global_in_or_join_node : global_in_or_join_nodes)
{
if (auto * join_node = global_in_or_join_node.query_node->as<JoinNode>())
{
auto join_right_table_expression = join_node->getRightTableExpression();
auto join_right_table_expression_node_type = join_right_table_expression->getNodeType();
QueryTreeNodePtr subquery_node;
if (join_right_table_expression_node_type == QueryTreeNodeType::QUERY ||
join_right_table_expression_node_type == QueryTreeNodeType::UNION)
{
subquery_node = join_right_table_expression;
}
else if (join_right_table_expression_node_type == QueryTreeNodeType::TABLE ||
join_right_table_expression_node_type == QueryTreeNodeType::TABLE_FUNCTION)
{
const auto & columns = column_source_to_columns.at(join_right_table_expression).columns;
subquery_node = buildSubqueryToReadColumnsFromTableExpression(columns,
join_right_table_expression,
planner_context->getQueryContext());
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected JOIN right table expression to be table, table function, query or union node. Actual {}",
join_right_table_expression->formatASTForErrorMessage());
}
auto temporary_table_expression_node = executeSubqueryNode(subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
temporary_table_expression_node->setAlias(join_right_table_expression->getAlias());
replacement_map.emplace(join_right_table_expression.get(), std::move(temporary_table_expression_node));
continue;
}
else if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>())
{
auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1);
auto in_function_node_type = in_function_subquery_node->getNodeType();
if (in_function_node_type != QueryTreeNodeType::QUERY && in_function_node_type != QueryTreeNodeType::UNION)
continue;
auto temporary_table_expression_node = executeSubqueryNode(in_function_subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
in_function_subquery_node = std::move(temporary_table_expression_node);
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected global IN or JOIN query node. Actual {}",
global_in_or_join_node.query_node->formatASTForErrorMessage());
}
}
if (!replacement_map.empty())
query_tree_to_modify = query_tree_to_modify->cloneAndReplace(replacement_map);
removeGroupingFunctionSpecializations(query_tree_to_modify);
return query_tree_to_modify;
return buildQueryTreeForShard(query_info, query_tree_to_modify);
}
}

View File

@ -22,6 +22,7 @@
#include <base/sort.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Storages/AlterCommands.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/Freeze.h>
@ -75,20 +76,23 @@
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sinks/EmptySink.h>
#include <Planner/Utils.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/JoinedTables.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/JoinedTables.h>
#include <Backups/BackupEntriesCollector.h>
@ -4835,15 +4839,28 @@ void StorageReplicatedMergeTree::read(
{
auto table_id = getStorageID();
const auto & modified_query_ast = ClusterProxy::rewriteSelectQuery(
local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
ASTPtr modified_query_ast;
Block header;
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
auto modified_query_tree = buildQueryTreeForShard(query_info, query_info.query_tree);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToSelectQuery(modified_query_tree);
}
else
{
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
auto cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
Block header =
InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
header,

View File

@ -435,13 +435,14 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns) const
ZkPathType path_type;
String prefix;
String path_corrected;
std::future<Coordination::ListResponse> future;
String path_part;
};
std::vector<ListTask> list_tasks;
std::unordered_set<String> added;
while (!paths.empty())
{
list_tasks.clear();
std::vector<String> paths_to_list;
while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
{
auto [path, path_type] = std::move(paths.front());
@ -459,75 +460,91 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns) const
task.path_corrected = pathCorrected(path);
task.future = zookeeper->asyncTryGetChildren(task.path_corrected);
paths_to_list.emplace_back(task.path_corrected);
list_tasks.emplace_back(std::move(task));
}
auto list_responses = zookeeper->tryGetChildren(paths_to_list);
for (auto & task : list_tasks)
struct GetTask
{
context->getProcessListElement()->checkTimeLimit();
auto list_result = task.future.get();
size_t list_task_idx; /// Index of 'parent' request in list_tasks
String node; /// Node name
};
std::vector<GetTask> get_tasks;
std::vector<String> paths_to_get;
for (size_t list_task_idx = 0; list_task_idx < list_tasks.size(); ++list_task_idx)
{
auto & list_result = list_responses[list_task_idx];
/// Node can be deleted concurrently. It's Ok, we don't provide any
/// consistency guarantees for system.zookeeper table.
if (list_result.error == Coordination::Error::ZNONODE)
continue;
auto & task = list_tasks[list_task_idx];
context->getProcessListElement()->checkTimeLimit();
Strings nodes = std::move(list_result.names);
String path_part = task.path_corrected;
if (path_part == "/")
path_part.clear();
task.path_part = task.path_corrected;
if (task.path_part == "/")
task.path_part.clear();
if (!task.prefix.empty())
{
// Remove nodes that do not match specified prefix
std::erase_if(nodes, [&task, &path_part] (const String & node)
std::erase_if(nodes, [&task] (const String & node)
{
return (path_part + '/' + node).substr(0, task.prefix.size()) != task.prefix;
return (task.path_part + '/' + node).substr(0, task.prefix.size()) != task.prefix;
});
}
std::vector<std::future<Coordination::GetResponse>> futures;
futures.reserve(nodes.size());
get_tasks.reserve(get_tasks.size() + nodes.size());
for (const String & node : nodes)
futures.push_back(zookeeper->asyncTryGet(path_part + '/' + node));
for (size_t i = 0, size = nodes.size(); i < size; ++i)
{
context->getProcessListElement()->checkTimeLimit();
auto res = futures[i].get();
if (res.error == Coordination::Error::ZNONODE)
continue; /// Node was deleted meanwhile.
paths_to_get.emplace_back(task.path_part + '/' + node);
get_tasks.emplace_back(GetTask{list_task_idx, node});
}
}
// Deduplication
String key = path_part + '/' + nodes[i];
if (auto [it, inserted] = added.emplace(key); !inserted)
continue;
auto get_responses = zookeeper->tryGet(paths_to_get);
const Coordination::Stat & stat = res.stat;
for (size_t i = 0, size = get_tasks.size(); i < size; ++i)
{
auto & res = get_responses[i];
if (res.error == Coordination::Error::ZNONODE)
continue; /// Node was deleted meanwhile.
size_t col_num = 0;
res_columns[col_num++]->insert(nodes[i]);
res_columns[col_num++]->insert(res.data);
res_columns[col_num++]->insert(stat.czxid);
res_columns[col_num++]->insert(stat.mzxid);
res_columns[col_num++]->insert(UInt64(stat.ctime / 1000));
res_columns[col_num++]->insert(UInt64(stat.mtime / 1000));
res_columns[col_num++]->insert(stat.version);
res_columns[col_num++]->insert(stat.cversion);
res_columns[col_num++]->insert(stat.aversion);
res_columns[col_num++]->insert(stat.ephemeralOwner);
res_columns[col_num++]->insert(stat.dataLength);
res_columns[col_num++]->insert(stat.numChildren);
res_columns[col_num++]->insert(stat.pzxid);
res_columns[col_num++]->insert(
task.path); /// This is the original path. In order to process the request, condition in WHERE should be triggered.
auto & get_task = get_tasks[i];
auto & list_task = list_tasks[get_task.list_task_idx];
context->getProcessListElement()->checkTimeLimit();
if (task.path_type != ZkPathType::Exact && res.stat.numChildren > 0)
{
paths.emplace_back(key, ZkPathType::Recurse);
}
// Deduplication
String key = list_task.path_part + '/' + get_task.node;
if (auto [it, inserted] = added.emplace(key); !inserted)
continue;
const Coordination::Stat & stat = res.stat;
size_t col_num = 0;
res_columns[col_num++]->insert(get_task.node);
res_columns[col_num++]->insert(res.data);
res_columns[col_num++]->insert(stat.czxid);
res_columns[col_num++]->insert(stat.mzxid);
res_columns[col_num++]->insert(UInt64(stat.ctime / 1000));
res_columns[col_num++]->insert(UInt64(stat.mtime / 1000));
res_columns[col_num++]->insert(stat.version);
res_columns[col_num++]->insert(stat.cversion);
res_columns[col_num++]->insert(stat.aversion);
res_columns[col_num++]->insert(stat.ephemeralOwner);
res_columns[col_num++]->insert(stat.dataLength);
res_columns[col_num++]->insert(stat.numChildren);
res_columns[col_num++]->insert(stat.pzxid);
res_columns[col_num++]->insert(
list_task.path); /// This is the original path. In order to process the request, condition in WHERE should be triggered.
if (list_task.path_type != ZkPathType::Exact && res.stat.numChildren > 0)
{
paths.emplace_back(key, ZkPathType::Recurse);
}
}
}

View File

@ -0,0 +1,373 @@
#include <Storages/buildQueryTreeForShard.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/IQueryTreeNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/Utils.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Storages/removeGroupingFunctionSpecializations.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageDummy.h>
#include <Planner/Utils.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/ExpressionStep.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED;
}
namespace
{
/// Visitor that collects the column source to columns mapping from the query and all subqueries
class CollectColumnSourceToColumnsVisitor : public InDepthQueryTreeVisitor<CollectColumnSourceToColumnsVisitor>
{
public:
struct Columns
{
NameSet column_names;
NamesAndTypes columns;
void addColumn(NameAndTypePair column)
{
if (column_names.contains(column.name))
return;
column_names.insert(column.name);
columns.push_back(std::move(column));
}
};
const std::unordered_map<QueryTreeNodePtr, Columns> & getColumnSourceToColumns() const
{
return column_source_to_columns;
}
void visitImpl(QueryTreeNodePtr & node)
{
auto * column_node = node->as<ColumnNode>();
if (!column_node)
return;
auto column_source = column_node->getColumnSourceOrNull();
if (!column_source)
return;
auto it = column_source_to_columns.find(column_source);
if (it == column_source_to_columns.end())
{
auto [insert_it, _] = column_source_to_columns.emplace(column_source, Columns());
it = insert_it;
}
it->second.addColumn(column_node->getColumn());
}
private:
std::unordered_map<QueryTreeNodePtr, Columns> column_source_to_columns;
};
/** Visitor that rewrites IN and JOINs in query and all subqueries according to distributed_product_mode and
* prefer_global_in_and_join settings.
*
* Additionally collects GLOBAL JOIN and GLOBAL IN query nodes.
*
* If distributed_product_mode = deny, then visitor throws exception if there are multiple distributed tables.
* If distributed_product_mode = local, then visitor collects replacement map for tables that must be replaced
* with local tables.
* If distributed_product_mode = global or prefer_global_in_and_join setting is true, then visitor rewrites JOINs and IN functions that
* contain distributed tables to GLOBAL JOINs and GLOBAL IN functions.
* If distributed_product_mode = allow, then visitor does not rewrite query if there are multiple distributed tables.
*/
class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>;
using Base::Base;
explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_)
: Base(context_)
{}
struct InFunctionOrJoin
{
QueryTreeNodePtr query_node;
size_t subquery_depth = 0;
};
const std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> & getReplacementMap() const
{
return replacement_map;
}
const std::vector<InFunctionOrJoin> & getGlobalInOrJoinNodes() const
{
return global_in_or_join_nodes;
}
static bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr & child)
{
auto * function_node = parent->as<FunctionNode>();
if (function_node && isNameOfGlobalInFunction(function_node->getFunctionName()))
return false;
auto * join_node = parent->as<JoinNode>();
if (join_node && join_node->getLocality() == JoinLocality::Global && join_node->getRightTableExpression() == child)
return false;
return true;
}
void visitImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
auto * join_node = node->as<JoinNode>();
if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) ||
(join_node && join_node->getLocality() == JoinLocality::Global))
{
InFunctionOrJoin in_function_or_join_entry;
in_function_or_join_entry.query_node = node;
in_function_or_join_entry.subquery_depth = getSubqueryDepth();
global_in_or_join_nodes.push_back(std::move(in_function_or_join_entry));
return;
}
if ((function_node && isNameOfLocalInFunction(function_node->getFunctionName())) ||
(join_node && join_node->getLocality() != JoinLocality::Global))
{
InFunctionOrJoin in_function_or_join_entry;
in_function_or_join_entry.query_node = node;
in_function_or_join_entry.subquery_depth = getSubqueryDepth();
in_function_or_join_stack.push_back(in_function_or_join_entry);
return;
}
if (node->getNodeType() == QueryTreeNodeType::TABLE)
tryRewriteTableNodeIfNeeded(node);
}
void leaveImpl(QueryTreeNodePtr & node)
{
if (!in_function_or_join_stack.empty() && node.get() == in_function_or_join_stack.back().query_node.get())
in_function_or_join_stack.pop_back();
}
private:
void tryRewriteTableNodeIfNeeded(const QueryTreeNodePtr & table_node)
{
const auto & table_node_typed = table_node->as<TableNode &>();
const auto * distributed_storage = typeid_cast<const StorageDistributed *>(table_node_typed.getStorage().get());
if (!distributed_storage)
return;
bool distributed_valid_for_rewrite = distributed_storage->getShardCount() >= 2;
if (!distributed_valid_for_rewrite)
return;
auto distributed_product_mode = getSettings().distributed_product_mode;
if (distributed_product_mode == DistributedProductMode::LOCAL)
{
StorageID remote_storage_id = StorageID{distributed_storage->getRemoteDatabaseName(),
distributed_storage->getRemoteTableName()};
auto resolved_remote_storage_id = getContext()->resolveStorageID(remote_storage_id);
const auto & distributed_storage_columns = table_node_typed.getStorageSnapshot()->metadata->getColumns();
auto storage = std::make_shared<StorageDummy>(resolved_remote_storage_id, distributed_storage_columns);
auto replacement_table_expression = std::make_shared<TableNode>(std::move(storage), getContext());
replacement_map.emplace(table_node.get(), std::move(replacement_table_expression));
}
else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings().prefer_global_in_and_join) &&
!in_function_or_join_stack.empty())
{
auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get();
if (auto * in_function_to_modify = in_or_join_node_to_modify->as<FunctionNode>())
{
auto global_in_function_name = getGlobalInFunctionNameForLocalInFunctionName(in_function_to_modify->getFunctionName());
auto global_in_function_resolver = FunctionFactory::instance().get(global_in_function_name, getContext());
in_function_to_modify->resolveAsFunction(global_in_function_resolver->build(in_function_to_modify->getArgumentColumns()));
}
else if (auto * join_node_to_modify = in_or_join_node_to_modify->as<JoinNode>())
{
join_node_to_modify->setLocality(JoinLocality::Global);
}
global_in_or_join_nodes.push_back(in_function_or_join_stack.back());
}
else if (distributed_product_mode == DistributedProductMode::ALLOW)
{
return;
}
else if (distributed_product_mode == DistributedProductMode::DENY)
{
throw Exception(ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED,
"Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny'). "
"You may rewrite query to use local tables "
"in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.");
}
}
std::vector<InFunctionOrJoin> in_function_or_join_stack;
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
std::vector<InFunctionOrJoin> global_in_or_join_nodes;
};
/** Execute subquery node and put result in mutable context temporary table.
* Returns table node that is initialized with temporary table storage.
*/
TableNodePtr executeSubqueryNode(const QueryTreeNodePtr & subquery_node,
ContextMutablePtr & mutable_context,
size_t subquery_depth)
{
auto subquery_hash = subquery_node->getTreeHash();
String temporary_table_name = fmt::format("_data_{}_{}", subquery_hash.first, subquery_hash.second);
const auto & external_tables = mutable_context->getExternalTables();
auto external_table_it = external_tables.find(temporary_table_name);
if (external_table_it != external_tables.end())
{
auto temporary_table_expression_node = std::make_shared<TableNode>(external_table_it->second, mutable_context);
temporary_table_expression_node->setTemporaryTableName(temporary_table_name);
return temporary_table_expression_node;
}
auto subquery_options = SelectQueryOptions(QueryProcessingStage::Complete, subquery_depth, true /*is_subquery*/);
auto context_copy = Context::createCopy(mutable_context);
updateContextForSubqueryExecution(context_copy);
InterpreterSelectQueryAnalyzer interpreter(subquery_node, context_copy, subquery_options);
auto & query_plan = interpreter.getQueryPlan();
auto sample_block_with_unique_names = query_plan.getCurrentDataStream().header;
makeUniqueColumnNamesInBlock(sample_block_with_unique_names);
if (!blocksHaveEqualStructure(sample_block_with_unique_names, query_plan.getCurrentDataStream().header))
{
auto actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
sample_block_with_unique_names.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), std::move(actions_dag));
query_plan.addStep(std::move(converting_step));
}
Block sample = interpreter.getSampleBlock();
NamesAndTypesList columns = sample.getNamesAndTypesList();
auto external_storage_holder = TemporaryTableHolder(
mutable_context,
ColumnsDescription{columns},
ConstraintsDescription{},
nullptr /*query*/,
true /*create_for_global_subquery*/);
StoragePtr external_storage = external_storage_holder.getTable();
auto temporary_table_expression_node = std::make_shared<TableNode>(external_storage, mutable_context);
temporary_table_expression_node->setTemporaryTableName(temporary_table_name);
auto table_out = external_storage->write({}, external_storage->getInMemoryMetadataPtr(), mutable_context, /*async_insert=*/false);
auto io = interpreter.execute();
io.pipeline.complete(std::move(table_out));
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
mutable_context->addExternalTable(temporary_table_name, std::move(external_storage_holder));
return temporary_table_expression_node;
}
}
QueryTreeNodePtr buildQueryTreeForShard(SelectQueryInfo & query_info, QueryTreeNodePtr query_tree_to_modify)
{
auto & planner_context = query_info.planner_context;
const auto & query_context = planner_context->getQueryContext();
CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor;
collect_column_source_to_columns_visitor.visit(query_tree_to_modify);
const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns();
DistributedProductModeRewriteInJoinVisitor visitor(query_info.planner_context->getQueryContext());
visitor.visit(query_tree_to_modify);
auto replacement_map = visitor.getReplacementMap();
const auto & global_in_or_join_nodes = visitor.getGlobalInOrJoinNodes();
for (const auto & global_in_or_join_node : global_in_or_join_nodes)
{
if (auto * join_node = global_in_or_join_node.query_node->as<JoinNode>())
{
auto join_right_table_expression = join_node->getRightTableExpression();
auto join_right_table_expression_node_type = join_right_table_expression->getNodeType();
QueryTreeNodePtr subquery_node;
if (join_right_table_expression_node_type == QueryTreeNodeType::QUERY ||
join_right_table_expression_node_type == QueryTreeNodeType::UNION)
{
subquery_node = join_right_table_expression;
}
else if (join_right_table_expression_node_type == QueryTreeNodeType::TABLE ||
join_right_table_expression_node_type == QueryTreeNodeType::TABLE_FUNCTION)
{
const auto & columns = column_source_to_columns.at(join_right_table_expression).columns;
subquery_node = buildSubqueryToReadColumnsFromTableExpression(columns,
join_right_table_expression,
planner_context->getQueryContext());
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected JOIN right table expression to be table, table function, query or union node. Actual {}",
join_right_table_expression->formatASTForErrorMessage());
}
auto temporary_table_expression_node = executeSubqueryNode(subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
temporary_table_expression_node->setAlias(join_right_table_expression->getAlias());
replacement_map.emplace(join_right_table_expression.get(), std::move(temporary_table_expression_node));
continue;
}
else if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>())
{
auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1);
auto in_function_node_type = in_function_subquery_node->getNodeType();
if (in_function_node_type != QueryTreeNodeType::QUERY && in_function_node_type != QueryTreeNodeType::UNION)
continue;
auto temporary_table_expression_node = executeSubqueryNode(in_function_subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
in_function_subquery_node = std::move(temporary_table_expression_node);
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expected global IN or JOIN query node. Actual {}",
global_in_or_join_node.query_node->formatASTForErrorMessage());
}
}
if (!replacement_map.empty())
query_tree_to_modify = query_tree_to_modify->cloneAndReplace(replacement_map);
removeGroupingFunctionSpecializations(query_tree_to_modify);
return query_tree_to_modify;
}
}

View File

@ -0,0 +1,15 @@
#pragma once
#include <memory>
namespace DB
{
struct SelectQueryInfo;
class IQueryTreeNode;
using QueryTreeNodePtr = std::shared_ptr<IQueryTreeNode>;
QueryTreeNodePtr buildQueryTreeForShard(SelectQueryInfo & query_info, QueryTreeNodePtr query_tree_to_modify);
}

View File

@ -122,6 +122,5 @@
02703_row_policy_for_database
02721_url_cluster
02534_s3_cluster_insert_select_schema_inference
02764_parallel_replicas_plain_merge_tree
02765_parallel_replicas_final_modifier
02784_parallel_replicas_automatic_disabling

View File

@ -0,0 +1,29 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
</s3>
<s3_cache>
<type>cache</type>
<disk>s3</disk>
<path>/s3_cache/</path>
<max_size>1000000000</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
</s3_cache>
</disks>
<policies>
<s3_cache>
<volumes>
<main>
<disk>s3_cache</disk>
</main>
</volumes>
</s3_cache>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,106 @@
import pytest
import os
import json
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/storage_conf.xml"],
with_minio=True,
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_check_part_with_cache(start_cluster):
if node.is_built_with_sanitizer() or node.is_debug_build():
pytest.skip(
"Skip with debug build and sanitizers. \
This test manually corrupts cache which triggers LOGICAL_ERROR \
and leads to crash with those builds"
)
node.query(
"""
CREATE TABLE s3_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='s3_cache'
"""
)
node.query("SYSTEM STOP MERGES s3_test")
node.query(
"INSERT INTO s3_test VALUES (0, 'data')",
settings={"enable_filesystem_cache_on_write_operations": 1},
)
node.query(
"INSERT INTO s3_test VALUES (1, 'data')",
settings={"enable_filesystem_cache_on_write_operations": 1},
)
def get_cache_path_of_data_file(part_name):
disk_path = node.query(
"SELECT path FROM system.disks WHERE name = 's3_cache'"
).strip("\n")
part_path = node.query(
f"SELECT path FROM system.parts WHERE table = 's3_test' AND name = '{part_name}'"
).strip("\n")
local_data_file_path = os.path.relpath(part_path, disk_path) + "/data.bin"
return node.query(
f"SELECT cache_paths[1] FROM system.remote_data_paths WHERE disk_name = 's3_cache' AND local_path = '{local_data_file_path}'"
).strip("\n")
cache_path = get_cache_path_of_data_file("all_1_1_0")
assert len(cache_path) > 0
node.exec_in_container(
["bash", "-c", f"truncate -s -1 {cache_path}"], privileged=True
)
assert (
node.query(
"SELECT count() FROM s3_test WHERE NOT ignore(*)",
settings={"enable_filesystem_cache": 0},
)
== "2\n"
)
with pytest.raises(Exception):
node.query(
"SELECT count() FROM s3_test WHERE NOT ignore(*)",
settings={"enable_filesystem_cache": 1},
)
assert node.query("CHECK TABLE s3_test") == "1\n"
# Check that cache is removed only for one part after CHECK TABLE
cache_path = get_cache_path_of_data_file("all_1_1_0")
assert len(cache_path) == 0
cache_path = get_cache_path_of_data_file("all_2_2_0")
assert len(cache_path) > 0
assert (
node.query(
"SELECT count() FROM s3_test WHERE NOT ignore(*)",
settings={"enable_filesystem_cache": 1},
)
== "2\n"
)

View File

@ -0,0 +1,12 @@
-8888150036649430454
-2788931093724180887
-75175454385331084
368066018677693974
821735343441964030
2804162938822577320
4357435422797280898
5935810273536892891
7885388429666205427
8124171311239967992
1 1 -- Simple query with analyzer and pure parallel replicas\nSELECT number\nFROM join_inner_table__fuzz_146_replicated\n SETTINGS\n allow_experimental_analyzer = 1,\n max_parallel_replicas = 2,\n cluster_for_parallel_replicas = \'test_cluster_one_shard_three_replicas_localhost\',\n allow_experimental_parallel_reading_from_replicas = 1,\n use_hedged_requests = 0;
0 2 SELECT `join_inner_table__fuzz_146_replicated`.`number` AS `number` FROM `default`.`join_inner_table__fuzz_146_replicated` SETTINGS allow_experimental_analyzer = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = \'test_cluster_one_shard_three_replicas_localhost\', allow_experimental_parallel_reading_from_replicas = 1, use_hedged_requests = 0

View File

@ -0,0 +1,52 @@
-- Tags: zookeeper
CREATE TABLE join_inner_table__fuzz_146_replicated
(
`id` UUID,
`key` String,
`number` Int64,
`value1` String,
`value2` String,
`time` Nullable(Int64)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/join_inner_table__fuzz_146_replicated', '{replica}')
ORDER BY (id, number, key)
SETTINGS index_granularity = 8192;
INSERT INTO join_inner_table__fuzz_146_replicated
SELECT CAST('833c9e22-c245-4eb5-8745-117a9a1f26b1', 'UUID') AS id, CAST(rowNumberInAllBlocks(), 'String') AS key, *
FROM generateRandom('number Int64, value1 String, value2 String, time Int64', 1, 10, 2) LIMIT 10;
-- Simple query with analyzer and pure parallel replicas
SELECT number
FROM join_inner_table__fuzz_146_replicated
SETTINGS
allow_experimental_analyzer = 1,
max_parallel_replicas = 2,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost',
allow_experimental_parallel_reading_from_replicas = 1,
use_hedged_requests = 0;
SYSTEM FLUSH LOGS;
-- There should be 2 different queries
-- The initial query
-- The query sent to each replica (which should appear 2 times as we are setting max_parallel_replicas to 2)
SELECT
is_initial_query,
count() as c, query,
FROM system.query_log
WHERE
event_date >= yesterday()
AND type = 'QueryFinish'
AND initial_query_id =
(
SELECT query_id
FROM system.query_log
WHERE
current_database = currentDatabase()
AND event_date >= yesterday()
AND type = 'QueryFinish'
AND query LIKE '-- Simple query with analyzer and pure parallel replicas%'
)
GROUP BY is_initial_query, query
ORDER BY is_initial_query DESC, c, query;