Fixing more tests.

This commit is contained in:
Nikolai Kochetov 2023-05-26 19:25:33 +00:00
parent b5b261b22c
commit 6ed7a3b73f
15 changed files with 127 additions and 45 deletions

View File

@ -1420,8 +1420,12 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
return data.prepared_sets->addFromStorage(set_key, storage_set->getSet());
}
if (auto tmp_table = data.getContext()->findExternalTable(table_id.getShortName()))
// std::cerr << ".... checking for " << identifier->getColumnName() << std::endl;
if (auto tmp_table = data.getContext()->findExternalTable(identifier->getColumnName()))
{
external_table_set = tmp_table->future_set;
// std::cerr << "Found " << reinterpret_cast<const void *>(tmp_table.get()) << " " << reinterpret_cast<const void *>(external_table_set.get()) << std::endl;
}
}
/// We get the stream of blocks for the subquery. Create Set and put it in place of the subquery.

View File

@ -110,7 +110,7 @@ TemporaryTableHolder::TemporaryTableHolder(
}
TemporaryTableHolder::TemporaryTableHolder(TemporaryTableHolder && rhs) noexcept
: WithContext(rhs.context), temporary_tables(rhs.temporary_tables), id(rhs.id)
: WithContext(rhs.context), temporary_tables(rhs.temporary_tables), id(rhs.id), future_set(std::move(rhs.future_set))
{
rhs.id = UUIDHelpers::Nil;
}

View File

@ -30,6 +30,7 @@ namespace DB
namespace ErrorCodes
{
extern const int WRONG_GLOBAL_SUBQUERY;
extern const int LOGICAL_ERROR;
}
class GlobalSubqueriesMatcher
@ -159,6 +160,8 @@ public:
/*create_for_global_subquery*/ true);
StoragePtr external_storage = external_storage_holder->getTable();
// std::cerr << "......... adding external table " << external_table_name << std::endl;
external_tables.emplace(external_table_name, external_storage_holder);
/// We need to materialize external tables immediately because reading from distributed
@ -195,6 +198,7 @@ public:
//std::cerr << reinterpret_cast<const void *>(prepared_sets.get()) << std::endl;
auto future_set = prepared_sets->addFromSubquery(set_key, std::move(subquery_for_set), nullptr);
// std::cerr << "... Future set " << reinterpret_cast<const void *>(external_storage_holder.get()) << " " << reinterpret_cast<const void *>(future_set.get()) << std::endl;
external_storage_holder->future_set = std::move(future_set);
}
else

View File

@ -239,6 +239,8 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
return nullptr;
}
// std::cerr << "... external_table_set " << reinterpret_cast<const void *>(external_table_set.get()) << std::endl;
if (external_table_set)
return set = external_table_set->buildOrderedSetInplace(context);
@ -337,7 +339,7 @@ FutureSetFromTuple::FutureSetFromTuple(Block block, const Settings & settings)
set_key_columns.filter = ColumnUInt8::create(block.rows());
set->initSetElements();
//set->initSetElements();
set->insertFromColumns(columns, set_key_columns);
set->finishInsert();
//block(std::move(block_))
@ -350,6 +352,9 @@ FutureSetFromStorage::FutureSetFromStorage(SetPtr set_) : set(std::move(set_)) {
SetPtr FutureSetFromTuple::buildOrderedSetInplace(const ContextPtr & context)
{
if (set->hasExplicitSetElements())
return set;
const auto & settings = context->getSettingsRef();
auto limits = getSizeLimitsForSet(settings, true);

View File

@ -202,15 +202,14 @@ bool Set::insertFromColumns(const Columns & columns, SetKeyColumns & holder)
if (data.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Method Set::setHeader must be called before Set::insertFromBlock");
ColumnRawPtrs key_columns;
key_columns.reserve(keys_size);
holder.key_columns.reserve(keys_size);
holder.materialized_columns.reserve(keys_size);
/// Remember the columns we will work with
for (size_t i = 0; i < keys_size; ++i)
{
holder.key_columns.emplace_back(columns.at(i)->convertToFullIfNeeded());
key_columns.emplace_back(holder.key_columns.back().get());
holder.materialized_columns.emplace_back(columns.at(i)->convertToFullIfNeeded());
holder.key_columns.emplace_back(holder.materialized_columns.back().get());
}
size_t rows = columns.at(0)->size();
@ -219,7 +218,7 @@ bool Set::insertFromColumns(const Columns & columns, SetKeyColumns & holder)
ConstNullMapPtr null_map{};
ColumnPtr null_map_holder;
if (!transform_null_in)
null_map_holder = extractNestedColumnsAndNullMap(key_columns, null_map);
null_map_holder = extractNestedColumnsAndNullMap(holder.key_columns, null_map);
switch (data.type)
{
@ -227,7 +226,7 @@ bool Set::insertFromColumns(const Columns & columns, SetKeyColumns & holder)
break;
#define M(NAME) \
case SetVariants::Type::NAME: \
insertFromBlockImpl(*data.NAME, key_columns, rows, data, null_map, holder.filter ? &holder.filter->getData() : nullptr); \
insertFromBlockImpl(*data.NAME, holder.key_columns, rows, data, null_map, holder.filter ? &holder.filter->getData() : nullptr); \
break;
APPLY_FOR_SET_VARIANTS(M)
#undef M
@ -445,6 +444,11 @@ void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) c
MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_)
: has_all_keys(set_elements.size() == indexes_mapping_.size()), indexes_mapping(std::move(indexes_mapping_))
{
// std::cerr << "MergeTreeSetIndex::MergeTreeSetIndex "
// << set_elements.size() << ' ' << indexes_mapping.size() << std::endl;
// for (const auto & vv : indexes_mapping)
// std::cerr << vv.key_index << ' ' << vv.tuple_index << std::endl;
::sort(indexes_mapping.begin(), indexes_mapping.end(),
[](const KeyTuplePositionMapping & l, const KeyTuplePositionMapping & r)
{
@ -487,6 +491,7 @@ MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<K
BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types, bool single_point) const
{
size_t tuple_size = indexes_mapping.size();
// std::cerr << "MergeTreeSetIndex::checkInRange " << single_point << ' ' << tuple_size << ' ' << has_all_keys << std::endl;
FieldValues left_point;
FieldValues right_point;

View File

@ -53,7 +53,8 @@ public:
{
//ColumnRawPtrs key_columns;
/// The constant columns to the right of IN are not supported directly. For this, they first materialize.
Columns key_columns;
ColumnRawPtrs key_columns;
Columns materialized_columns;
ColumnPtr null_map_holder;
ColumnUInt8::MutablePtr filter;
};

View File

@ -1174,6 +1174,7 @@ static ActionsDAGPtr buildFilterDAG(
static void buildIndexes(
std::optional<ReadFromMergeTree::Indexes> & indexes,
ActionsDAGPtr filter_actions_dag,
const MergeTreeData & data,
const ContextPtr & context,
const SelectQueryInfo & query_info,
const StorageMetadataPtr & metadata_snapshot)
@ -1196,7 +1197,7 @@ static void buildIndexes(
context,
primary_key_column_names,
primary_key.expression,
array_join_name_set}, {}, false});
array_join_name_set}, {}, {}, {}, false});
}
else
{
@ -1204,7 +1205,22 @@ static void buildIndexes(
query_info,
context,
primary_key_column_names,
primary_key.expression}, {}, false});
primary_key.expression}, {}, {}, {}, false});
}
if (metadata_snapshot->hasPartitionKey())
{
const auto & partition_key = metadata_snapshot->getPartitionKey();
auto minmax_columns_names = data.getMinMaxColumnsNames(partition_key);
auto minmax_expression_actions = data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context));
// minmax_columns_types = data.getMinMaxColumnsTypes(partition_key);
// if (context->getSettingsRef().allow_experimental_analyzer)
indexes->minmax_idx_condition.emplace(filter_actions_dag, context, minmax_columns_names, minmax_expression_actions, NameSet());
// else
// indexes->minmax_idx_condition.emplace(query_info, context, minmax_columns_names, minmax_expression_actions);
indexes->partition_pruner.emplace(metadata_snapshot, filter_actions_dag, context, false /* strict */);
}
indexes->use_skip_indexes = settings.use_skip_indexes;
@ -1250,7 +1266,7 @@ void ReadFromMergeTree::onAddFilterFinish()
if (!filter_nodes.nodes.empty())
{
auto filter_actions_dag = buildFilterDAG(context, prewhere_info, filter_nodes, query_info);
buildIndexes(indexes, filter_actions_dag, context, query_info, metadata_for_reading);
buildIndexes(indexes, filter_actions_dag, data, context, query_info, metadata_for_reading);
}
}
@ -1366,7 +1382,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
// }
if (!indexes)
buildIndexes(indexes, query_info.filter_actions_dag, context, query_info, metadata_snapshot);
buildIndexes(indexes, query_info.filter_actions_dag, data, context, query_info, metadata_snapshot);
if (settings.force_primary_key && indexes->key_condition.alwaysUnknownOrTrue())
{
@ -1386,11 +1402,12 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl(
try
{
MergeTreeDataSelectExecutor::filterPartsByPartition(
indexes->partition_pruner,
indexes->minmax_idx_condition,
parts,
part_values,
metadata_snapshot_base,
data,
query_info,
context,
max_block_numbers_to_read.get(),
log,

View File

@ -5,6 +5,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/PartitionPruner.h>
namespace DB
{
@ -164,6 +165,8 @@ public:
struct Indexes
{
KeyCondition key_condition;
std::optional<PartitionPruner> partition_pruner;
std::optional<KeyCondition> minmax_idx_condition;
UsefulSkipIndexes skip_indexes;
bool use_skip_indexes;
};

View File

@ -1157,6 +1157,8 @@ bool KeyCondition::tryPrepareSetIndex(
RPNElement & out,
size_t & out_key_column_num)
{
// std::cerr << "::: tryPrepareSetIndex for " << func.getColumnName() << std::endl;
// std::cerr << StackTrace().toString() << std::endl;
const auto & left_arg = func.getArgumentAt(0);
out_key_column_num = 0;
@ -1200,7 +1202,10 @@ bool KeyCondition::tryPrepareSetIndex(
}
if (indexes_mapping.empty())
{
// std::cerr << ".. index mapping is empty\n";
return false;
}
const auto right_arg = func.getArgumentAt(1);
@ -1208,7 +1213,10 @@ bool KeyCondition::tryPrepareSetIndex(
auto future_set = right_arg.tryGetPreparedSet(indexes_mapping, data_types);
if (!future_set)
{
// std::cerr << ".. no future set\n";
return false;
}
// LOG_TRACE(&Poco::Logger::get("KK"), "Found set for {}", right_arg.getColumnName());
@ -1220,13 +1228,21 @@ bool KeyCondition::tryPrepareSetIndex(
auto prepared_set = future_set->get();
if (!prepared_set)
{
// std::cerr << ".. no prepared set\n";
return false;
}
// LOG_TRACE(&Poco::Logger::get("KK"), "Set if ready for {}", right_arg.getColumnName());
/// The index can be prepared if the elements of the set were saved in advance.
if (!prepared_set->hasExplicitSetElements())
{
// std::cerr << ".. no explicit elements\n";
return false;
}
// LOG_TRACE(&Poco::Logger::get("KK"), "Has explicit elements for {}", right_arg.getColumnName());
@ -1235,7 +1251,7 @@ bool KeyCondition::tryPrepareSetIndex(
prepared_set->checkTypesEqual(indexes_mapping[i].tuple_index, data_types[i]);
out.set_index = std::make_shared<MergeTreeSetIndex>(prepared_set->getSetElements(), std::move(indexes_mapping));
// std::cerr << ".. can use\n";
return true;
}

View File

@ -793,38 +793,28 @@ std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPar
}
void MergeTreeDataSelectExecutor::filterPartsByPartition(
std::optional<PartitionPruner> & partition_pruner,
std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
const std::optional<std::unordered_set<String>> & part_values,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
const SelectQueryInfo & query_info,
const ContextPtr & context,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
Poco::Logger * log,
ReadFromMergeTree::IndexStats & index_stats)
{
const Settings & settings = context->getSettingsRef();
std::optional<PartitionPruner> partition_pruner;
std::optional<KeyCondition> minmax_idx_condition;
DataTypes minmax_columns_types;
if (metadata_snapshot->hasPartitionKey())
{
const auto & partition_key = metadata_snapshot->getPartitionKey();
auto minmax_columns_names = data.getMinMaxColumnsNames(partition_key);
auto minmax_expression_actions = data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context));
minmax_columns_types = data.getMinMaxColumnsTypes(partition_key);
if (context->getSettingsRef().allow_experimental_analyzer)
minmax_idx_condition.emplace(query_info.filter_actions_dag, context, minmax_columns_names, minmax_expression_actions, NameSet());
else
minmax_idx_condition.emplace(query_info, context, minmax_columns_names, minmax_expression_actions);
partition_pruner.emplace(metadata_snapshot, query_info, context, false /* strict */);
if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless()))
{
auto minmax_columns_names = data.getMinMaxColumnsNames(partition_key);
throw Exception(ErrorCodes::INDEX_NOT_USED,
"Neither MinMax index by columns ({}) nor partition expr is used and setting 'force_index_by_date' is set",
fmt::join(minmax_columns_names, ", "));
@ -1835,7 +1825,9 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
if (partition_pruner)
{
if (partition_pruner->canBePruned(*part))
auto val = partition_pruner->canBePruned(*part);
// std::cerr << "... part " << part->getNameWithState() << " cbp ? " << val << std::endl;
if (val)
continue;
}

View File

@ -173,11 +173,12 @@ public:
/// Filter parts using minmax index and partition key.
static void filterPartsByPartition(
std::optional<PartitionPruner> & partition_pruner,
std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
const std::optional<std::unordered_set<String>> & part_values,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
const SelectQueryInfo & query_info,
const ContextPtr & context,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
Poco::Logger * log,

View File

@ -31,12 +31,21 @@ namespace
ColumnWithTypeAndName getPreparedSetInfo(const ConstSetPtr & prepared_set)
{
// std::cerr << "====== " << prepared_set->getDataTypes().size() << std::endl;
if (prepared_set->getDataTypes().size() == 1)
return {prepared_set->getSetElements()[0], prepared_set->getElementsTypes()[0], "dummy"};
Columns set_elements;
for (auto & set_element : prepared_set->getSetElements())
{
// std::cerr << set_element->dumpStructure() << std::endl;
set_elements.emplace_back(set_element->convertToFullColumnIfConst());
}
// for (auto & set_element : prepared_set->getElementsTypes())
// {
// // std::cerr << set_element->getName() << std::endl;
// }
return {ColumnTuple::create(set_elements), std::make_shared<DataTypeTuple>(prepared_set->getElementsTypes()), "dummy"};
}
@ -331,6 +340,7 @@ bool MergeTreeIndexConditionBloomFilter::traverseFunction(const RPNBuilderTreeNo
if (prepared_set && prepared_set->hasExplicitSetElements())
{
const auto prepared_info = getPreparedSetInfo(prepared_set);
// std::cerr << "...... " << prepared_info.dumpStructure() << std::endl;
if (traverseTreeIn(function_name, lhs_argument, prepared_set, prepared_info.type, prepared_info.column, out))
maybe_useful = true;
}
@ -377,6 +387,7 @@ bool MergeTreeIndexConditionBloomFilter::traverseTreeIn(
size_t row_size = column->size();
size_t position = header.getPositionByName(key_node_column_name);
const DataTypePtr & index_type = header.getByPosition(position).type;
// std::cerr << "::::: " << ColumnWithTypeAndName{column, type, ""}.dumpStructure() << " -> " << index_type->getName() << std::endl;
const auto & converted_column = castColumn(ColumnWithTypeAndName{column, type, ""}, index_type);
out.predicate.emplace_back(std::make_pair(position, BloomFilterHash::hashWithColumn(index_type, converted_column, 0, row_size)));

View File

@ -24,6 +24,19 @@ PartitionPruner::PartitionPruner(const StorageMetadataPtr & metadata, const Sele
{
}
PartitionPruner::PartitionPruner(const StorageMetadataPtr & metadata, ActionsDAGPtr filter_actions_dag, ContextPtr context, bool strict)
: partition_key(MergeTreePartition::adjustPartitionKey(metadata, context))
, partition_condition(filter_actions_dag, context, partition_key.column_names, partition_key.expression, {}, true /* single_point */, strict)
, useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue())
{
// auto description = getKeyCondition().getDescription();
// std::cerr << ".... " << description.condition << std::endl;
// std::cerr << filter_actions_dag->dumpDAG() << std::endl;
// for (const auto & name : partition_key.column_names)
// std::cerr << ". " << name << std::endl;
// std::cerr << partition_key.expression->dumpActions() << std::endl;
}
bool PartitionPruner::canBePruned(const IMergeTreeDataPart & part)
{
if (part.isEmpty())
@ -39,6 +52,8 @@ bool PartitionPruner::canBePruned(const IMergeTreeDataPart & part)
else
{
const auto & partition_value = part.partition.value;
// for (const auto & val : partition_value)
// std::cerr << val.dump() << std::endl;
std::vector<FieldRef> index_value(partition_value.begin(), partition_value.end());
for (auto & field : index_value)
{
@ -49,6 +64,7 @@ bool PartitionPruner::canBePruned(const IMergeTreeDataPart & part)
is_valid = partition_condition.mayBeTrueInRange(
partition_value.size(), index_value.data(), index_value.data(), partition_key.data_types);
// std::cerr << "may be true " << is_valid << std::endl;
partition_filter_map.emplace(partition_id, is_valid);
if (!is_valid)

View File

@ -14,6 +14,7 @@ class PartitionPruner
{
public:
PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict);
PartitionPruner(const StorageMetadataPtr & metadata, ActionsDAGPtr filter_actions_dag, ContextPtr context, bool strict);
bool canBePruned(const IMergeTreeDataPart & part);

View File

@ -161,6 +161,17 @@ public:
}
};
/// Type of path to be fetched
enum class ZkPathType
{
Exact, /// Fetch all nodes under this path
Prefix, /// Fetch all nodes starting with this prefix, recursively (multiple paths may match prefix)
Recurse, /// Fatch all nodes under this path, recursively
};
/// List of paths to be feched from zookeeper
using Paths = std::deque<std::pair<String, ZkPathType>>;
class ReadFromSystemZooKeeper final : public SourceStepWithFilter
{
public:
@ -170,11 +181,14 @@ public:
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
void onAddFilterFinish() override;
private:
void fillData(MutableColumns & res_columns) const;
void fillData(MutableColumns & res_columns);
std::shared_ptr<const StorageLimitsList> storage_limits;
ContextPtr context;
Paths paths;
};
StorageSystemZooKeeper::StorageSystemZooKeeper(const StorageID & table_id_)
@ -246,17 +260,6 @@ NamesAndTypesList StorageSystemZooKeeper::getNamesAndTypes()
};
}
/// Type of path to be fetched
enum class ZkPathType
{
Exact, /// Fetch all nodes under this path
Prefix, /// Fetch all nodes starting with this prefix, recursively (multiple paths may match prefix)
Recurse, /// Fatch all nodes under this path, recursively
};
/// List of paths to be feched from zookeeper
using Paths = std::deque<std::pair<String, ZkPathType>>;
static String pathCorrected(const String & path)
{
String path_corrected;
@ -421,10 +424,13 @@ static Paths extractPath(const ActionsDAG::NodeRawConstPtrs & filter_nodes, Cont
}
void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns) const
void ReadFromSystemZooKeeper::onAddFilterFinish()
{
Paths paths = extractPath(getFilterNodes().nodes, context, context->getSettingsRef().allow_unrestricted_reads_from_keeper);
paths = extractPath(getFilterNodes().nodes, context, context->getSettingsRef().allow_unrestricted_reads_from_keeper);
}
void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
{
zkutil::ZooKeeperPtr zookeeper = context->getZooKeeper();
if (paths.empty())