Merge pull request #68963 from yariks5s/hive_partitioning_filtration

Filtering for hive partitioning
This commit is contained in:
Yarik Briukhovetskyi 2024-09-18 22:16:58 +00:00 committed by GitHub
commit 3eb5bc1a0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 153 additions and 31 deletions

View File

@ -151,6 +151,15 @@ Names NamesAndTypesList::getNames() const
return res;
}
NameSet NamesAndTypesList::getNameSet() const
{
NameSet res;
res.reserve(size());
for (const NameAndTypePair & column : *this)
res.insert(column.name);
return res;
}
DataTypes NamesAndTypesList::getTypes() const
{
DataTypes res;

View File

@ -100,6 +100,7 @@ public:
void getDifference(const NamesAndTypesList & rhs, NamesAndTypesList & deleted, NamesAndTypesList & added) const;
Names getNames() const;
NameSet getNameSet() const;
DataTypes getTypes() const;
/// Remove columns which names are not in the `names`.

View File

@ -131,7 +131,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
else
{
ConfigurationPtr copy_configuration = configuration->clone();
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context);
if (filter_dag)
{
auto keys = configuration->getPaths();
@ -142,7 +142,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns, local_context);
copy_configuration->setPaths(keys);
}
@ -489,6 +489,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
, virtual_columns(virtual_columns_)
, throw_on_zero_files_match(throw_on_zero_files_match_)
, read_keys(read_keys_)
, local_context(context_)
, file_progress_callback(file_progress_callback_)
{
if (configuration->isNamespaceWithGlobs())
@ -510,7 +511,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
}
recursive = key_with_globs == "/**";
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns))
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context))
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext());
filter_expr = std::make_shared<ExpressionActions>(std::move(*filter_dag));
@ -585,7 +586,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
for (const auto & object_info : new_batch)
paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns, local_context);
LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size());
}

View File

@ -220,6 +220,7 @@ private:
bool is_finished = false;
bool first_iteration = true;
std::mutex next_mutex;
const ContextPtr local_context;
std::function<void(FileProgress)> file_progress_callback;
};

View File

@ -1141,13 +1141,13 @@ StorageFileSource::FilesIterator::FilesIterator(
{
std::optional<ActionsDAG> filter_dag;
if (!distributed_processing && !archive_info && !files.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context_);
if (filter_dag)
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns, context_);
}
}

View File

@ -227,7 +227,7 @@ public:
std::optional<ActionsDAG> filter_dag;
if (!uris.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context);
if (filter_dag)
{
@ -238,7 +238,7 @@ public:
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns, context);
}
}

View File

@ -1,5 +1,6 @@
#include <memory>
#include <stack>
#include <unordered_set>
#include <Core/NamesAndTypes.h>
#include <Core/TypeId.h>
@ -46,6 +47,7 @@
#include "Functions/IFunction.h"
#include "Functions/IFunctionAdaptors.h"
#include "Functions/indexHint.h"
#include <IO/ReadBufferFromString.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/makeASTForLogicalFunction.h>
#include <Columns/ColumnSet.h>
@ -124,9 +126,18 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
}
}
NamesAndTypesList getCommonVirtualsForFileLikeStorage()
{
return {{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_size", makeNullable(std::make_shared<DataTypeUInt64>())},
{"_time", makeNullable(std::make_shared<DataTypeDateTime>())},
{"_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
NameSet getVirtualNamesForFileLikeStorage()
{
return {"_path", "_file", "_size", "_time", "_etag"};
return getCommonVirtualsForFileLikeStorage().getNameSet();
}
std::unordered_map<std::string, std::string> parseHivePartitioningKeysAndValues(const String & path)
@ -154,8 +165,10 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
{
VirtualColumnsDescription desc;
auto add_virtual = [&](const auto & name, const auto & type)
auto add_virtual = [&](const NameAndTypePair & pair)
{
const auto & name = pair.getNameInStorage();
const auto & type = pair.getTypeInStorage();
if (storage_columns.has(name))
{
if (!context->getSettingsRef().use_hive_partitioning)
@ -172,11 +185,8 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
desc.addEphemeral(name, type, "");
};
add_virtual("_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
add_virtual("_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
for (const auto & item : getCommonVirtualsForFileLikeStorage())
add_virtual(item);
if (context->getSettingsRef().use_hive_partitioning)
{
@ -188,16 +198,16 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
if (type == nullptr)
type = std::make_shared<DataTypeString>();
if (type->canBeInsideLowCardinality())
add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
add_virtual({item.first, std::make_shared<DataTypeLowCardinality>(type)});
else
add_virtual(item.first, type);
add_virtual({item.first, type});
}
}
return desc;
}
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx)
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx, const FormatSettings & format_settings, bool use_hive_partitioning)
{
if (block.has("_path"))
block.getByName("_path").column->assumeMutableRef().insert(path);
@ -214,18 +224,34 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
block.getByName("_file").column->assumeMutableRef().insert(file);
}
if (use_hive_partitioning)
{
auto keys_and_values = parseHivePartitioningKeysAndValues(path);
for (const auto & [key, value] : keys_and_values)
{
if (const auto * column = block.findByName(key))
{
ReadBufferFromString buf(value);
column->type->getDefaultSerialization()->deserializeWholeText(column->column->assumeMutableRef(), buf, format_settings);
}
}
}
block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns)
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
if (!predicate || virtual_columns.empty())
return {};
Block block;
NameSet common_virtuals;
if (context->getSettingsRef().use_hive_partitioning)
common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
@ -233,18 +259,19 @@ std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * pr
return splitFilterDagForAllowedInputs(predicate, &block);
}
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
Block block;
NameSet common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
for (size_t i = 0; i != paths.size(); ++i)
addPathAndFileToVirtualColumns(block, paths[i], i);
addPathAndFileToVirtualColumns(block, paths[i], i, getFormatSettings(context), context->getSettingsRef().use_hive_partitioning);
filterBlockWithExpression(actions, block);

View File

@ -75,14 +75,14 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(
const std::string & sample_path = "",
std::optional<FormatSettings> format_settings_ = std::nullopt);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
template <typename T>
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns);
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns, context);
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
if (indexes.size() == sources.size())
return;

View File

@ -33,8 +33,8 @@ Cross Elizabeth
[1,2,3] 42.42
Array(Int64) LowCardinality(Float64)
101
2070
2070
2071
2071
b
1
1

View File

@ -0,0 +1,6 @@
1
1
1
1
1
1

View File

@ -0,0 +1,72 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
DATA_DIR=$USER_FILES_PATH/$CLICKHOUSE_TEST_UNIQUE_NAME
mkdir -p $DATA_DIR
cp -r $CURDIR/data_hive/ $DATA_DIR
$CLICKHOUSE_CLIENT --query_id="test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = 'Elizabeth' SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
$CLICKHOUSE_CLIENT --query_id="test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/identifier=*/email.csv') WHERE identifier = 2070 SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
$CLICKHOUSE_CLIENT --query_id="test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/array=*/sample.parquet') WHERE array = [1,2,3] SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
rm -rf $DATA_DIR

View File

@ -1 +1 @@
data_hive/partitioning/column0=Elizabeth/sample.parquet
data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet

View File

@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') LIMIT 1;"
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/non_existing_column=*/sample.parquet') LIMIT 1;"

View File

@ -0,0 +1,5 @@
_login_email,_identifier,_first_name,_last_name
laura@example.com,2070,Laura,Grey
craig@example.com,4081,Craig,Johnson
mary@example.com,9346,Mary,Jenkins
jamie@example.com,5079,Jamie,Smith
1 _login_email _identifier _first_name _last_name
2 laura@example.com 2070 Laura Grey
3 craig@example.com 4081 Craig Johnson
4 mary@example.com 9346 Mary Jenkins
5 jamie@example.com 5079 Jamie Smith