This commit is contained in:
Yarik Briukhovetskyi 2024-08-27 16:28:18 +02:00 committed by GitHub
parent 8b2db6276c
commit 189cbe25fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 91 additions and 18 deletions

View File

@ -131,10 +131,11 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
else
{
ConfigurationPtr copy_configuration = configuration->clone();
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
auto keys = configuration->getPaths();
String partitioning_path = fs::path(configuration->getNamespace()) / keys[0];
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, partitioning_path, local_context);
if (filter_dag)
{
auto keys = configuration->getPaths();
std::vector<String> paths;
paths.reserve(keys.size());
for (const auto & key : keys)
@ -142,7 +143,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns, local_context);
copy_configuration->setPaths(keys);
}
@ -492,6 +493,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
, virtual_columns(virtual_columns_)
, throw_on_zero_files_match(throw_on_zero_files_match_)
, read_keys(read_keys_)
, local_context(context_)
, file_progress_callback(file_progress_callback_)
{
if (configuration->isNamespaceWithGlobs())
@ -513,7 +515,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
}
recursive = key_with_globs == "/**";
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns))
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, key_with_globs, local_context))
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext());
filter_expr = std::make_shared<ExpressionActions>(std::move(*filter_dag));
@ -588,7 +590,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
for (const auto & object_info : new_batch)
paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns, local_context);
LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size());
}

View File

@ -220,6 +220,7 @@ private:
bool is_finished = false;
bool first_iteration = true;
std::mutex next_mutex;
const ContextPtr local_context;
std::function<void(FileProgress)> file_progress_callback;
};

View File

@ -1140,13 +1140,13 @@ StorageFileSource::FilesIterator::FilesIterator(
{
std::optional<ActionsDAG> filter_dag;
if (!distributed_processing && !archive_info && !files.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, files[0], context_);
if (filter_dag)
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns, context_);
}
}

View File

@ -214,7 +214,10 @@ public:
std::optional<ActionsDAG> filter_dag;
if (!uris.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
{
String partitioning_path = Poco::URI(uris[0]).getPath();
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, partitioning_path, context);
}
if (filter_dag)
{
@ -225,7 +228,7 @@ public:
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns, context);
}
}

View File

@ -46,6 +46,7 @@
#include "Functions/IFunction.h"
#include "Functions/IFunctionAdaptors.h"
#include "Functions/indexHint.h"
#include <IO/ReadBufferFromString.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/makeASTForLogicalFunction.h>
#include <Columns/ColumnSet.h>
@ -197,7 +198,7 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
return desc;
}
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx)
static void addFilterDataToVirtualColumns(Block & block, const String & path, size_t idx, ColumnsWithTypeAndName partitioning_keys, const ContextPtr & context)
{
if (block.has("_path"))
block.getByName("_path").column->assumeMutableRef().insert(path);
@ -214,18 +215,31 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
block.getByName("_file").column->assumeMutableRef().insert(file);
}
for (const auto & item : partitioning_keys)
{
if (block.has(item.name))
{
auto column = block.getByName(item.name).column;
ReadBufferFromString buf(item.column->getDataAt(0).toView());
item.type->getDefaultSerialization()->deserializeWholeText(column->assumeMutableRef(), buf, getFormatSettings(context));
}
}
block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns)
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const String & path, const ContextPtr & context)
{
if (!predicate || virtual_columns.empty())
return {};
Block block;
std::unordered_map<std::string, std::string> keys;
if (context->getSettingsRef().use_hive_partitioning)
keys = parseHivePartitioningKeysAndValues(path);
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || keys.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
@ -233,18 +247,31 @@ std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * pr
return splitFilterDagForAllowedInputs(predicate, &block);
}
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
Block block;
std::unordered_map<std::string, std::string> keys;
ColumnsWithTypeAndName partitioning_columns;
if (context->getSettingsRef().use_hive_partitioning)
keys = parseHivePartitioningKeysAndValues(paths[0]);
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
block.insert({column.type->createColumn(), column.type, column.name});
auto it = keys.find(column.name);
if (it != keys.end())
{
auto c = std::make_shared<DataTypeString>()->createColumn();
c->insert(it->second);
block.insert({column.type->createColumn(), column.type, column.name});
partitioning_columns.push_back({c->getPtr(), column.type, column.name});
}
}
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
for (size_t i = 0; i != paths.size(); ++i)
addPathAndFileToVirtualColumns(block, paths[i], i);
addFilterDataToVirtualColumns(block, paths[i], i, partitioning_columns, context);
filterBlockWithExpression(actions, block);

View File

@ -75,14 +75,14 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(
const std::string & sample_path = "",
std::optional<FormatSettings> format_settings_ = std::nullopt);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const String & path, const ContextPtr & context);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
template <typename T>
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns);
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns, context);
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
if (indexes.size() == sources.size())
return;

View File

@ -0,0 +1,6 @@
1
1
1
1
1
1

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --query_id="test_03231_1" --query "
SELECT countDistinct(_path) FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = 'Elizabeth';
"
${CLICKHOUSE_CLIENT} --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id='test_03231_1';
"
${CLICKHOUSE_CLIENT} --query_id="test_03231_2" --query "
SELECT countDistinct(_path) FROM file('$CURDIR/data_hive/partitioning/identifier=*/email.csv') WHERE identifier = 2070;
"
${CLICKHOUSE_CLIENT} --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id='test_03231_2';
"
${CLICKHOUSE_CLIENT} --query_id="test_03231_3" --query "
SELECT countDistinct(_path) FROM file('$CURDIR/data_hive/partitioning/array=*/sample.parquet') WHERE array = [1,2,3];
"
${CLICKHOUSE_CLIENT} --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id='test_03231_3';
"

View File

@ -0,0 +1,5 @@
_login_email,_identifier,_first_name,_last_name
laura@example.com,2070,Laura,Grey
craig@example.com,4081,Craig,Johnson
mary@example.com,9346,Mary,Jenkins
jamie@example.com,5079,Jamie,Smith
1 _login_email _identifier _first_name _last_name
2 laura@example.com 2070 Laura Grey
3 craig@example.com 4081 Craig Johnson
4 mary@example.com 9346 Mary Jenkins
5 jamie@example.com 5079 Jamie Smith