mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge branch 'master' into fix-02404_memory_bound_merging-with-analyzer
This commit is contained in:
commit
d1c1e52030
@ -335,6 +335,28 @@ const ActionsDAG::Node * ActionsDAG::tryFindInOutputs(const std::string & name)
|
|||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ActionsDAG::NodeRawConstPtrs ActionsDAG::findInOutpus(const Names & names) const
|
||||||
|
{
|
||||||
|
NodeRawConstPtrs required_nodes;
|
||||||
|
required_nodes.reserve(names.size());
|
||||||
|
|
||||||
|
std::unordered_map<std::string_view, const Node *> names_map;
|
||||||
|
for (const auto * node : outputs)
|
||||||
|
names_map[node->result_name] = node;
|
||||||
|
|
||||||
|
for (const auto & name : names)
|
||||||
|
{
|
||||||
|
auto it = names_map.find(name);
|
||||||
|
if (it == names_map.end())
|
||||||
|
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
|
||||||
|
"Unknown column: {}, there are only columns {}", name, dumpDAG());
|
||||||
|
|
||||||
|
required_nodes.push_back(it->second);
|
||||||
|
}
|
||||||
|
|
||||||
|
return required_nodes;
|
||||||
|
}
|
||||||
|
|
||||||
void ActionsDAG::addOrReplaceInOutputs(const Node & node)
|
void ActionsDAG::addOrReplaceInOutputs(const Node & node)
|
||||||
{
|
{
|
||||||
for (auto & output_node : outputs)
|
for (auto & output_node : outputs)
|
||||||
@ -441,23 +463,7 @@ void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_
|
|||||||
|
|
||||||
void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_remove_inputs, bool allow_constant_folding)
|
void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_remove_inputs, bool allow_constant_folding)
|
||||||
{
|
{
|
||||||
NodeRawConstPtrs required_nodes;
|
auto required_nodes = findInOutpus(required_names);
|
||||||
required_nodes.reserve(required_names.size());
|
|
||||||
|
|
||||||
std::unordered_map<std::string_view, const Node *> names_map;
|
|
||||||
for (const auto * node : outputs)
|
|
||||||
names_map[node->result_name] = node;
|
|
||||||
|
|
||||||
for (const auto & name : required_names)
|
|
||||||
{
|
|
||||||
auto it = names_map.find(name);
|
|
||||||
if (it == names_map.end())
|
|
||||||
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
|
|
||||||
"Unknown column: {}, there are only columns {}", name, dumpDAG());
|
|
||||||
|
|
||||||
required_nodes.push_back(it->second);
|
|
||||||
}
|
|
||||||
|
|
||||||
outputs.swap(required_nodes);
|
outputs.swap(required_nodes);
|
||||||
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
|
removeUnusedActions(allow_remove_inputs, allow_constant_folding);
|
||||||
}
|
}
|
||||||
@ -535,6 +541,62 @@ void ActionsDAG::removeUnusedActions(const std::unordered_set<const Node *> & us
|
|||||||
std::erase_if(inputs, [&](const Node * node) { return !visited_nodes.contains(node); });
|
std::erase_if(inputs, [&](const Node * node) { return !visited_nodes.contains(node); });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ActionsDAGPtr ActionsDAG::cloneSubDAG(const NodeRawConstPtrs & outputs, bool remove_aliases)
|
||||||
|
{
|
||||||
|
auto actions = std::make_shared<ActionsDAG>();
|
||||||
|
std::unordered_map<const Node *, Node *> copy_map;
|
||||||
|
|
||||||
|
struct Frame
|
||||||
|
{
|
||||||
|
const Node * node = nullptr;
|
||||||
|
size_t next_child = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::stack<Frame> stack;
|
||||||
|
|
||||||
|
for (const auto * output : outputs)
|
||||||
|
{
|
||||||
|
if (copy_map.contains(output))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
stack.push(Frame{output});
|
||||||
|
while (!stack.empty())
|
||||||
|
{
|
||||||
|
auto & frame = stack.top();
|
||||||
|
const auto & children = frame.node->children;
|
||||||
|
while (frame.next_child < children.size() && copy_map.contains(children[frame.next_child]))
|
||||||
|
++frame.next_child;
|
||||||
|
|
||||||
|
if (frame.next_child < children.size())
|
||||||
|
{
|
||||||
|
stack.push(Frame{children[frame.next_child]});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto & copy_node = copy_map[frame.node];
|
||||||
|
|
||||||
|
if (remove_aliases && frame.node->type == ActionType::ALIAS)
|
||||||
|
copy_node = copy_map[frame.node->children.front()];
|
||||||
|
else
|
||||||
|
copy_node = &actions->nodes.emplace_back(*frame.node);
|
||||||
|
|
||||||
|
if (frame.node->type == ActionType::INPUT)
|
||||||
|
actions->inputs.push_back(copy_node);
|
||||||
|
|
||||||
|
stack.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto & node : actions->nodes)
|
||||||
|
for (auto & child : node.children)
|
||||||
|
child = copy_map[child];
|
||||||
|
|
||||||
|
for (const auto * output : outputs)
|
||||||
|
actions->outputs.push_back(copy_map[output]);
|
||||||
|
|
||||||
|
return actions;
|
||||||
|
}
|
||||||
|
|
||||||
static ColumnWithTypeAndName executeActionForHeader(const ActionsDAG::Node * node, ColumnsWithTypeAndName arguments)
|
static ColumnWithTypeAndName executeActionForHeader(const ActionsDAG::Node * node, ColumnsWithTypeAndName arguments)
|
||||||
{
|
{
|
||||||
ColumnWithTypeAndName res_column;
|
ColumnWithTypeAndName res_column;
|
||||||
|
@ -157,6 +157,9 @@ public:
|
|||||||
/// Same, but return nullptr if node not found.
|
/// Same, but return nullptr if node not found.
|
||||||
const Node * tryFindInOutputs(const std::string & name) const;
|
const Node * tryFindInOutputs(const std::string & name) const;
|
||||||
|
|
||||||
|
/// Same, but for the list of names.
|
||||||
|
NodeRawConstPtrs findInOutpus(const Names & names) const;
|
||||||
|
|
||||||
/// Find first node with the same name in output nodes and replace it.
|
/// Find first node with the same name in output nodes and replace it.
|
||||||
/// If was not found, add node to outputs end.
|
/// If was not found, add node to outputs end.
|
||||||
void addOrReplaceInOutputs(const Node & node);
|
void addOrReplaceInOutputs(const Node & node);
|
||||||
@ -260,6 +263,8 @@ public:
|
|||||||
|
|
||||||
ActionsDAGPtr clone() const;
|
ActionsDAGPtr clone() const;
|
||||||
|
|
||||||
|
static ActionsDAGPtr cloneSubDAG(const NodeRawConstPtrs & outputs, bool remove_aliases);
|
||||||
|
|
||||||
/// Execute actions for header. Input block must have empty columns.
|
/// Execute actions for header. Input block must have empty columns.
|
||||||
/// Result should be equal to the execution of ExpressionActions built from this DAG.
|
/// Result should be equal to the execution of ExpressionActions built from this DAG.
|
||||||
/// Actions are not changed, no expressions are compiled.
|
/// Actions are not changed, no expressions are compiled.
|
||||||
|
@ -95,7 +95,7 @@ bool allOutputsDependsOnlyOnAllowedNodes(
|
|||||||
{
|
{
|
||||||
const auto & match = matches.at(node);
|
const auto & match = matches.at(node);
|
||||||
/// Function could be mapped into its argument. In this case .monotonicity != std::nullopt (see matchTrees)
|
/// Function could be mapped into its argument. In this case .monotonicity != std::nullopt (see matchTrees)
|
||||||
if (match.node && match.node->result_name == node->result_name && !match.monotonicity)
|
if (match.node && !match.monotonicity)
|
||||||
res = irreducible_nodes.contains(match.node);
|
res = irreducible_nodes.contains(match.node);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,9 +155,10 @@ bool isPartitionKeySuitsGroupByKey(
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
/// We are interested only in calculations required to obtain group by keys (and not aggregate function arguments for example).
|
/// We are interested only in calculations required to obtain group by keys (and not aggregate function arguments for example).
|
||||||
group_by_actions->removeUnusedActions(aggregating.getParams().keys);
|
auto key_nodes = group_by_actions->findInOutpus(aggregating.getParams().keys);
|
||||||
|
auto group_by_key_actions = ActionsDAG::cloneSubDAG(key_nodes, /*remove_aliases=*/ true);
|
||||||
|
|
||||||
const auto & gb_key_required_columns = group_by_actions->getRequiredColumnsNames();
|
const auto & gb_key_required_columns = group_by_key_actions->getRequiredColumnsNames();
|
||||||
|
|
||||||
const auto & partition_actions = reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG();
|
const auto & partition_actions = reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG();
|
||||||
|
|
||||||
@ -166,9 +167,9 @@ bool isPartitionKeySuitsGroupByKey(
|
|||||||
if (std::ranges::find(gb_key_required_columns, col) == gb_key_required_columns.end())
|
if (std::ranges::find(gb_key_required_columns, col) == gb_key_required_columns.end())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
const auto irreducibe_nodes = removeInjectiveFunctionsFromResultsRecursively(group_by_actions);
|
const auto irreducibe_nodes = removeInjectiveFunctionsFromResultsRecursively(group_by_key_actions);
|
||||||
|
|
||||||
const auto matches = matchTrees(group_by_actions->getOutputs(), partition_actions);
|
const auto matches = matchTrees(group_by_key_actions->getOutputs(), partition_actions);
|
||||||
|
|
||||||
return allOutputsDependsOnlyOnAllowedNodes(partition_actions, irreducibe_nodes, matches);
|
return allOutputsDependsOnlyOnAllowedNodes(partition_actions, irreducibe_nodes, matches);
|
||||||
}
|
}
|
||||||
@ -206,7 +207,7 @@ size_t tryAggregatePartitionsIndependently(QueryPlan::Node * node, QueryPlan::No
|
|||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
if (!reading->willOutputEachPartitionThroughSeparatePort()
|
if (!reading->willOutputEachPartitionThroughSeparatePort()
|
||||||
&& isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression()->clone(), *aggregating_step))
|
&& isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression(), *aggregating_step))
|
||||||
{
|
{
|
||||||
if (reading->requestOutputEachPartitionThroughSeparatePort())
|
if (reading->requestOutputEachPartitionThroughSeparatePort())
|
||||||
aggregating_step->skipMerging();
|
aggregating_step->skipMerging();
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
#include <Storages/KVStorageUtils.h>
|
#include <Storages/KVStorageUtils.h>
|
||||||
|
|
||||||
|
#include <Columns/ColumnSet.h>
|
||||||
|
|
||||||
#include <Parsers/ASTFunction.h>
|
#include <Parsers/ASTFunction.h>
|
||||||
#include <Parsers/ASTIdentifier.h>
|
#include <Parsers/ASTIdentifier.h>
|
||||||
#include <Parsers/ASTSubquery.h>
|
#include <Parsers/ASTSubquery.h>
|
||||||
@ -121,6 +123,121 @@ bool traverseASTFilter(
|
|||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool traverseDAGFilter(
|
||||||
|
const std::string & primary_key, const DataTypePtr & primary_key_type, const ActionsDAG::Node * elem, const ContextPtr & context, FieldVectorPtr & res)
|
||||||
|
{
|
||||||
|
if (elem->type == ActionsDAG::ActionType::ALIAS)
|
||||||
|
return traverseDAGFilter(primary_key, primary_key_type, elem->children.at(0), context, res);
|
||||||
|
|
||||||
|
if (elem->type != ActionsDAG::ActionType::FUNCTION)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
auto func_name = elem->function_base->getName();
|
||||||
|
|
||||||
|
if (func_name == "and")
|
||||||
|
{
|
||||||
|
// one child has the key filter condition is ok
|
||||||
|
for (const auto * child : elem->children)
|
||||||
|
if (traverseDAGFilter(primary_key, primary_key_type, child, context, res))
|
||||||
|
return true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
else if (func_name == "or")
|
||||||
|
{
|
||||||
|
// make sure every child has the key filter condition
|
||||||
|
for (const auto * child : elem->children)
|
||||||
|
if (!traverseDAGFilter(primary_key, primary_key_type, child, context, res))
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else if (func_name == "equals" || func_name == "in")
|
||||||
|
{
|
||||||
|
if (elem->children.size() != 2)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (func_name == "in")
|
||||||
|
{
|
||||||
|
const auto * key = elem->children.at(0);
|
||||||
|
while (key->type == ActionsDAG::ActionType::ALIAS)
|
||||||
|
key = key->children.at(0);
|
||||||
|
|
||||||
|
if (key->type != ActionsDAG::ActionType::INPUT)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (key->result_name != primary_key)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
const auto * value = elem->children.at(1);
|
||||||
|
if (value->type != ActionsDAG::ActionType::COLUMN)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
const IColumn * value_col = value->column.get();
|
||||||
|
if (const auto * col_const = typeid_cast<const ColumnConst *>(value_col))
|
||||||
|
value_col = &col_const->getDataColumn();
|
||||||
|
|
||||||
|
const auto * col_set = typeid_cast<const ColumnSet *>(value_col);
|
||||||
|
if (!col_set)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
auto future_set = col_set->getData();
|
||||||
|
future_set->buildOrderedSetInplace(context);
|
||||||
|
|
||||||
|
auto set = future_set->get();
|
||||||
|
if (!set)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (!set->hasExplicitSetElements())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
set->checkColumnsNumber(1);
|
||||||
|
const auto & set_column = *set->getSetElements()[0];
|
||||||
|
|
||||||
|
if (set_column.getDataType() != primary_key_type->getTypeId())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
for (size_t row = 0; row < set_column.size(); ++row)
|
||||||
|
res->push_back(set_column[row]);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
const auto * key = elem->children.at(0);
|
||||||
|
while (key->type == ActionsDAG::ActionType::ALIAS)
|
||||||
|
key = key->children.at(0);
|
||||||
|
|
||||||
|
if (key->type != ActionsDAG::ActionType::INPUT)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (key->result_name != primary_key)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
const auto * value = elem->children.at(1);
|
||||||
|
if (value->type != ActionsDAG::ActionType::COLUMN)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
auto converted_field = convertFieldToType((*value->column)[0], *primary_key_type);
|
||||||
|
if (!converted_field.isNull())
|
||||||
|
res->push_back(converted_field);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||||
|
const String & primary_key, const DataTypePtr & primary_key_type, const ActionDAGNodes & filter_nodes, const ContextPtr & context)
|
||||||
|
{
|
||||||
|
if (filter_nodes.nodes.empty())
|
||||||
|
return {{}, true};
|
||||||
|
|
||||||
|
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
|
||||||
|
const auto * predicate = filter_actions_dag->getOutputs().at(0);
|
||||||
|
|
||||||
|
FieldVectorPtr res = std::make_shared<FieldVector>();
|
||||||
|
auto matched_keys = traverseDAGFilter(primary_key, primary_key_type, predicate, context, res);
|
||||||
|
return std::make_pair(res, !matched_keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||||
|
@ -21,6 +21,9 @@ using DataTypePtr = std::shared_ptr<const IDataType>;
|
|||||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||||
const std::string & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context);
|
const std::string & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context);
|
||||||
|
|
||||||
|
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||||
|
const String & primary_key, const DataTypePtr & primary_key_type, const ActionDAGNodes & filter_nodes, const ContextPtr & context);
|
||||||
|
|
||||||
template <typename K, typename V>
|
template <typename K, typename V>
|
||||||
void fillColumns(const K & key, const V & value, size_t key_pos, const Block & header, MutableColumns & columns)
|
void fillColumns(const K & key, const V & value, size_t key_pos, const Block & header, MutableColumns & columns)
|
||||||
{
|
{
|
||||||
|
@ -20,6 +20,9 @@
|
|||||||
#include <Interpreters/MutationsInterpreter.h>
|
#include <Interpreters/MutationsInterpreter.h>
|
||||||
|
|
||||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||||
|
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||||
|
#include <Processors/QueryPlan/QueryPlan.h>
|
||||||
|
#include <Processors/Sources/NullSource.h>
|
||||||
|
|
||||||
#include <Poco/Logger.h>
|
#include <Poco/Logger.h>
|
||||||
#include <Poco/Util/AbstractConfiguration.h>
|
#include <Poco/Util/AbstractConfiguration.h>
|
||||||
@ -440,7 +443,46 @@ void StorageEmbeddedRocksDB::initDB()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Pipe StorageEmbeddedRocksDB::read(
|
class ReadFromEmbeddedRocksDB : public SourceStepWithFilter
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
std::string getName() const override { return "ReadFromEmbeddedRocksDB"; }
|
||||||
|
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||||
|
void applyFilters() override;
|
||||||
|
|
||||||
|
ReadFromEmbeddedRocksDB(
|
||||||
|
Block sample_block,
|
||||||
|
StorageSnapshotPtr storage_snapshot_,
|
||||||
|
const StorageEmbeddedRocksDB & storage_,
|
||||||
|
SelectQueryInfo query_info_,
|
||||||
|
ContextPtr context_,
|
||||||
|
size_t max_block_size_,
|
||||||
|
size_t num_streams_)
|
||||||
|
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
|
||||||
|
, storage_snapshot(std::move(storage_snapshot_))
|
||||||
|
, storage(storage_)
|
||||||
|
, query_info(std::move(query_info_))
|
||||||
|
, context(std::move(context_))
|
||||||
|
, max_block_size(max_block_size_)
|
||||||
|
, num_streams(num_streams_)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
StorageSnapshotPtr storage_snapshot;
|
||||||
|
const StorageEmbeddedRocksDB & storage;
|
||||||
|
SelectQueryInfo query_info;
|
||||||
|
ContextPtr context;
|
||||||
|
|
||||||
|
size_t max_block_size;
|
||||||
|
size_t num_streams;
|
||||||
|
|
||||||
|
FieldVectorPtr keys;
|
||||||
|
bool all_scan = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
void StorageEmbeddedRocksDB::read(
|
||||||
|
QueryPlan & query_plan,
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageSnapshotPtr & storage_snapshot,
|
const StorageSnapshotPtr & storage_snapshot,
|
||||||
SelectQueryInfo & query_info,
|
SelectQueryInfo & query_info,
|
||||||
@ -450,23 +492,39 @@ Pipe StorageEmbeddedRocksDB::read(
|
|||||||
size_t num_streams)
|
size_t num_streams)
|
||||||
{
|
{
|
||||||
storage_snapshot->check(column_names);
|
storage_snapshot->check(column_names);
|
||||||
|
|
||||||
FieldVectorPtr keys;
|
|
||||||
bool all_scan = false;
|
|
||||||
|
|
||||||
Block sample_block = storage_snapshot->metadata->getSampleBlock();
|
Block sample_block = storage_snapshot->metadata->getSampleBlock();
|
||||||
auto primary_key_data_type = sample_block.getByName(primary_key).type;
|
|
||||||
std::tie(keys, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info, context_);
|
auto reading = std::make_unique<ReadFromEmbeddedRocksDB>(
|
||||||
|
std::move(sample_block),
|
||||||
|
storage_snapshot,
|
||||||
|
*this,
|
||||||
|
query_info,
|
||||||
|
context_,
|
||||||
|
max_block_size,
|
||||||
|
num_streams);
|
||||||
|
|
||||||
|
query_plan.addStep(std::move(reading));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReadFromEmbeddedRocksDB::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||||
|
{
|
||||||
|
const auto & sample_block = getOutputStream().header;
|
||||||
|
|
||||||
if (all_scan)
|
if (all_scan)
|
||||||
{
|
{
|
||||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
|
auto iterator = std::unique_ptr<rocksdb::Iterator>(storage.rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
|
||||||
iterator->SeekToFirst();
|
iterator->SeekToFirst();
|
||||||
return Pipe(std::make_shared<EmbeddedRocksDBSource>(*this, sample_block, std::move(iterator), max_block_size));
|
auto source = std::make_shared<EmbeddedRocksDBSource>(storage, sample_block, std::move(iterator), max_block_size);
|
||||||
|
source->setStorageLimits(query_info.storage_limits);
|
||||||
|
pipeline.init(Pipe(std::move(source)));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (keys->empty())
|
if (keys->empty())
|
||||||
return {};
|
{
|
||||||
|
pipeline.init(Pipe(std::make_shared<NullSource>(sample_block)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
::sort(keys->begin(), keys->end());
|
::sort(keys->begin(), keys->end());
|
||||||
keys->erase(std::unique(keys->begin(), keys->end()), keys->end());
|
keys->erase(std::unique(keys->begin(), keys->end()), keys->end());
|
||||||
@ -484,13 +542,22 @@ Pipe StorageEmbeddedRocksDB::read(
|
|||||||
size_t begin = num_keys * thread_idx / num_threads;
|
size_t begin = num_keys * thread_idx / num_threads;
|
||||||
size_t end = num_keys * (thread_idx + 1) / num_threads;
|
size_t end = num_keys * (thread_idx + 1) / num_threads;
|
||||||
|
|
||||||
pipes.emplace_back(std::make_shared<EmbeddedRocksDBSource>(
|
auto source = std::make_shared<EmbeddedRocksDBSource>(
|
||||||
*this, sample_block, keys, keys->begin() + begin, keys->begin() + end, max_block_size));
|
storage, sample_block, keys, keys->begin() + begin, keys->begin() + end, max_block_size);
|
||||||
|
source->setStorageLimits(query_info.storage_limits);
|
||||||
|
pipes.emplace_back(std::move(source));
|
||||||
}
|
}
|
||||||
return Pipe::unitePipes(std::move(pipes));
|
pipeline.init(Pipe::unitePipes(std::move(pipes)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ReadFromEmbeddedRocksDB::applyFilters()
|
||||||
|
{
|
||||||
|
const auto & sample_block = getOutputStream().header;
|
||||||
|
auto primary_key_data_type = sample_block.getByName(storage.primary_key).type;
|
||||||
|
std::tie(keys, all_scan) = getFilterKeys(storage.primary_key, primary_key_data_type, filter_nodes, context);
|
||||||
|
}
|
||||||
|
|
||||||
SinkToStoragePtr StorageEmbeddedRocksDB::write(
|
SinkToStoragePtr StorageEmbeddedRocksDB::write(
|
||||||
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/, bool /*async_insert*/)
|
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/, bool /*async_insert*/)
|
||||||
{
|
{
|
||||||
|
@ -26,6 +26,7 @@ class Context;
|
|||||||
class StorageEmbeddedRocksDB final : public IStorage, public IKeyValueEntity, WithContext
|
class StorageEmbeddedRocksDB final : public IStorage, public IKeyValueEntity, WithContext
|
||||||
{
|
{
|
||||||
friend class EmbeddedRocksDBSink;
|
friend class EmbeddedRocksDBSink;
|
||||||
|
friend class ReadFromEmbeddedRocksDB;
|
||||||
public:
|
public:
|
||||||
StorageEmbeddedRocksDB(const StorageID & table_id_,
|
StorageEmbeddedRocksDB(const StorageID & table_id_,
|
||||||
const String & relative_data_path_,
|
const String & relative_data_path_,
|
||||||
@ -39,7 +40,8 @@ public:
|
|||||||
|
|
||||||
std::string getName() const override { return "EmbeddedRocksDB"; }
|
std::string getName() const override { return "EmbeddedRocksDB"; }
|
||||||
|
|
||||||
Pipe read(
|
void read(
|
||||||
|
QueryPlan & query_plan,
|
||||||
const Names & column_names,
|
const Names & column_names,
|
||||||
const StorageSnapshotPtr & storage_snapshot,
|
const StorageSnapshotPtr & storage_snapshot,
|
||||||
SelectQueryInfo & query_info,
|
SelectQueryInfo & query_info,
|
||||||
|
@ -14,7 +14,6 @@
|
|||||||
01268_shard_avgweighted
|
01268_shard_avgweighted
|
||||||
01455_shard_leaf_max_rows_bytes_to_read
|
01455_shard_leaf_max_rows_bytes_to_read
|
||||||
01495_subqueries_in_with_statement
|
01495_subqueries_in_with_statement
|
||||||
01504_rocksdb
|
|
||||||
01560_merge_distributed_join
|
01560_merge_distributed_join
|
||||||
01584_distributed_buffer_cannot_find_column
|
01584_distributed_buffer_cannot_find_column
|
||||||
01586_columns_pruning
|
01586_columns_pruning
|
||||||
@ -37,14 +36,12 @@
|
|||||||
02345_implicit_transaction
|
02345_implicit_transaction
|
||||||
02352_grouby_shadows_arg
|
02352_grouby_shadows_arg
|
||||||
02354_annoy
|
02354_annoy
|
||||||
02375_rocksdb_with_filters
|
|
||||||
02402_merge_engine_with_view
|
02402_merge_engine_with_view
|
||||||
02426_orc_bug
|
02426_orc_bug
|
||||||
02428_parameterized_view
|
02428_parameterized_view
|
||||||
02458_use_structure_from_insertion_table
|
02458_use_structure_from_insertion_table
|
||||||
02479_race_condition_between_insert_and_droppin_mv
|
02479_race_condition_between_insert_and_droppin_mv
|
||||||
02493_inconsistent_hex_and_binary_number
|
02493_inconsistent_hex_and_binary_number
|
||||||
02521_aggregation_by_partitions
|
|
||||||
02554_fix_grouping_sets_predicate_push_down
|
02554_fix_grouping_sets_predicate_push_down
|
||||||
02575_merge_prewhere_different_default_kind
|
02575_merge_prewhere_different_default_kind
|
||||||
02003_WithMergeableStateAfterAggregationAndLimit_LIMIT_BY_LIMIT_OFFSET
|
02003_WithMergeableStateAfterAggregationAndLimit_LIMIT_BY_LIMIT_OFFSET
|
||||||
|
Loading…
Reference in New Issue
Block a user