mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Fix rocksdb with analyzer.
This commit is contained in:
parent
1267246785
commit
34fdb8a7c6
@ -1,5 +1,7 @@
|
||||
#include <Storages/KVStorageUtils.h>
|
||||
|
||||
#include <Columns/ColumnSet.h>
|
||||
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
@ -121,6 +123,121 @@ bool traverseASTFilter(
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool traverseDAGFilter(
|
||||
const std::string & primary_key, const DataTypePtr & primary_key_type, const ActionsDAG::Node * elem, const ContextPtr & context, FieldVectorPtr & res)
|
||||
{
|
||||
if (elem->type == ActionsDAG::ActionType::ALIAS)
|
||||
return traverseDAGFilter(primary_key, primary_key_type, elem->children.at(0), context, res);
|
||||
|
||||
if (elem->type != ActionsDAG::ActionType::FUNCTION)
|
||||
return false;
|
||||
|
||||
auto func_name = elem->function_base->getName();
|
||||
|
||||
if (func_name == "and")
|
||||
{
|
||||
// one child has the key filter condition is ok
|
||||
for (const auto * child : elem->children)
|
||||
if (traverseDAGFilter(primary_key, primary_key_type, child, context, res))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
else if (func_name == "or")
|
||||
{
|
||||
// make sure every child has the key filter condition
|
||||
for (const auto * child : elem->children)
|
||||
if (!traverseDAGFilter(primary_key, primary_key_type, child, context, res))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
else if (func_name == "equals" || func_name == "in")
|
||||
{
|
||||
if (elem->children.size() != 2)
|
||||
return false;
|
||||
|
||||
if (func_name == "in")
|
||||
{
|
||||
const auto * key = elem->children.at(0);
|
||||
while (key->type == ActionsDAG::ActionType::ALIAS)
|
||||
key = key->children.at(0);
|
||||
|
||||
if (key->type != ActionsDAG::ActionType::INPUT)
|
||||
return false;
|
||||
|
||||
if (key->result_name != primary_key)
|
||||
return false;
|
||||
|
||||
const auto * value = elem->children.at(1);
|
||||
if (value->type != ActionsDAG::ActionType::COLUMN)
|
||||
return false;
|
||||
|
||||
const IColumn * value_col = value->column.get();
|
||||
if (const auto * col_const = typeid_cast<const ColumnConst *>(value_col))
|
||||
value_col = &col_const->getDataColumn();
|
||||
|
||||
const auto * col_set = typeid_cast<const ColumnSet *>(value_col);
|
||||
if (!col_set)
|
||||
return false;
|
||||
|
||||
auto future_set = col_set->getData();
|
||||
future_set->buildOrderedSetInplace(context);
|
||||
|
||||
auto set = future_set->get();
|
||||
if (!set)
|
||||
return false;
|
||||
|
||||
if (!set->hasExplicitSetElements())
|
||||
return false;
|
||||
|
||||
set->checkColumnsNumber(1);
|
||||
const auto & set_column = *set->getSetElements()[0];
|
||||
|
||||
if (set_column.getDataType() != primary_key_type->getTypeId())
|
||||
return false;
|
||||
|
||||
for (size_t row = 0; row < set_column.size(); ++row)
|
||||
res->push_back(set_column[row]);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto * key = elem->children.at(0);
|
||||
while (key->type == ActionsDAG::ActionType::ALIAS)
|
||||
key = key->children.at(0);
|
||||
|
||||
if (key->type != ActionsDAG::ActionType::INPUT)
|
||||
return false;
|
||||
|
||||
if (key->result_name != primary_key)
|
||||
return false;
|
||||
|
||||
const auto * value = elem->children.at(1);
|
||||
if (value->type != ActionsDAG::ActionType::COLUMN)
|
||||
return false;
|
||||
|
||||
auto converted_field = convertFieldToType((*value->column)[0], *primary_key_type);
|
||||
if (!converted_field.isNull())
|
||||
res->push_back(converted_field);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
const String & primary_key, const DataTypePtr & primary_key_type, const ActionDAGNodes & filter_nodes, const ContextPtr & context)
|
||||
{
|
||||
if (filter_nodes.nodes.empty())
|
||||
return {{}, true};
|
||||
|
||||
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
|
||||
const auto * predicate = filter_actions_dag->getOutputs().at(0);
|
||||
|
||||
FieldVectorPtr res = std::make_shared<FieldVector>();
|
||||
auto matched_keys = traverseDAGFilter(primary_key, primary_key_type, predicate, context, res);
|
||||
return std::make_pair(res, !matched_keys);
|
||||
}
|
||||
|
||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
|
@ -21,6 +21,9 @@ using DataTypePtr = std::shared_ptr<const IDataType>;
|
||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
const std::string & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context);
|
||||
|
||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
const String & primary_key, const DataTypePtr & primary_key_type, const ActionDAGNodes & filter_nodes, const ContextPtr & context);
|
||||
|
||||
template <typename K, typename V>
|
||||
void fillColumns(const K & key, const V & value, size_t key_pos, const Block & header, MutableColumns & columns)
|
||||
{
|
||||
|
@ -20,6 +20,8 @@
|
||||
#include <Interpreters/MutationsInterpreter.h>
|
||||
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
@ -415,7 +417,46 @@ void StorageEmbeddedRocksDB::initDB()
|
||||
}
|
||||
}
|
||||
|
||||
Pipe StorageEmbeddedRocksDB::read(
|
||||
class ReadFromEmbeddedRocksDB : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "ReadFromEmbeddedRocksDB"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
void applyFilters() override;
|
||||
|
||||
ReadFromEmbeddedRocksDB(
|
||||
Block sample_block,
|
||||
StorageSnapshotPtr storage_snapshot_,
|
||||
const StorageEmbeddedRocksDB & storage_,
|
||||
SelectQueryInfo query_info_,
|
||||
ContextPtr context_,
|
||||
size_t max_block_size_,
|
||||
size_t num_streams_)
|
||||
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
|
||||
, storage_snapshot(std::move(storage_snapshot_))
|
||||
, storage(storage_)
|
||||
, query_info(std::move(query_info_))
|
||||
, context(std::move(context_))
|
||||
, max_block_size(max_block_size_)
|
||||
, num_streams(num_streams_)
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
StorageSnapshotPtr storage_snapshot;
|
||||
const StorageEmbeddedRocksDB & storage;
|
||||
SelectQueryInfo query_info;
|
||||
ContextPtr context;
|
||||
|
||||
size_t max_block_size;
|
||||
size_t num_streams;
|
||||
|
||||
FieldVectorPtr keys;
|
||||
bool all_scan = false;
|
||||
};
|
||||
|
||||
void StorageEmbeddedRocksDB::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -425,23 +466,36 @@ Pipe StorageEmbeddedRocksDB::read(
|
||||
size_t num_streams)
|
||||
{
|
||||
storage_snapshot->check(column_names);
|
||||
|
||||
FieldVectorPtr keys;
|
||||
bool all_scan = false;
|
||||
|
||||
Block sample_block = storage_snapshot->metadata->getSampleBlock();
|
||||
auto primary_key_data_type = sample_block.getByName(primary_key).type;
|
||||
std::tie(keys, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info, context_);
|
||||
|
||||
auto reading = std::make_unique<ReadFromEmbeddedRocksDB>(
|
||||
std::move(sample_block),
|
||||
storage_snapshot,
|
||||
*this,
|
||||
query_info,
|
||||
context_,
|
||||
max_block_size,
|
||||
num_streams);
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
void ReadFromEmbeddedRocksDB::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
const auto & sample_block = getOutputStream().header;
|
||||
|
||||
if (all_scan)
|
||||
{
|
||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
|
||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(storage.rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
|
||||
iterator->SeekToFirst();
|
||||
return Pipe(std::make_shared<EmbeddedRocksDBSource>(*this, sample_block, std::move(iterator), max_block_size));
|
||||
auto source = std::make_shared<EmbeddedRocksDBSource>(storage, sample_block, std::move(iterator), max_block_size);
|
||||
source->setStorageLimits(query_info.storage_limits);
|
||||
pipeline.init(Pipe(std::move(source)));
|
||||
}
|
||||
else
|
||||
{
|
||||
if (keys->empty())
|
||||
return {};
|
||||
return;
|
||||
|
||||
::sort(keys->begin(), keys->end());
|
||||
keys->erase(std::unique(keys->begin(), keys->end()), keys->end());
|
||||
@ -459,13 +513,22 @@ Pipe StorageEmbeddedRocksDB::read(
|
||||
size_t begin = num_keys * thread_idx / num_threads;
|
||||
size_t end = num_keys * (thread_idx + 1) / num_threads;
|
||||
|
||||
pipes.emplace_back(std::make_shared<EmbeddedRocksDBSource>(
|
||||
*this, sample_block, keys, keys->begin() + begin, keys->begin() + end, max_block_size));
|
||||
auto source = std::make_shared<EmbeddedRocksDBSource>(
|
||||
storage, sample_block, keys, keys->begin() + begin, keys->begin() + end, max_block_size);
|
||||
source->setStorageLimits(query_info.storage_limits);
|
||||
pipes.emplace_back(std::move(source));
|
||||
}
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
pipeline.init(Pipe::unitePipes(std::move(pipes)));
|
||||
}
|
||||
}
|
||||
|
||||
void ReadFromEmbeddedRocksDB::applyFilters()
|
||||
{
|
||||
const auto & sample_block = getOutputStream().header;
|
||||
auto primary_key_data_type = sample_block.getByName(storage.primary_key).type;
|
||||
std::tie(keys, all_scan) = getFilterKeys(storage.primary_key, primary_key_data_type, filter_nodes, context);
|
||||
}
|
||||
|
||||
SinkToStoragePtr StorageEmbeddedRocksDB::write(
|
||||
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/, bool /*async_insert*/)
|
||||
{
|
||||
|
@ -26,6 +26,7 @@ class Context;
|
||||
class StorageEmbeddedRocksDB final : public IStorage, public IKeyValueEntity, WithContext
|
||||
{
|
||||
friend class EmbeddedRocksDBSink;
|
||||
friend class ReadFromEmbeddedRocksDB;
|
||||
public:
|
||||
StorageEmbeddedRocksDB(const StorageID & table_id_,
|
||||
const String & relative_data_path_,
|
||||
@ -39,7 +40,8 @@ public:
|
||||
|
||||
std::string getName() const override { return "EmbeddedRocksDB"; }
|
||||
|
||||
Pipe read(
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
|
@ -14,7 +14,6 @@
|
||||
01268_shard_avgweighted
|
||||
01455_shard_leaf_max_rows_bytes_to_read
|
||||
01495_subqueries_in_with_statement
|
||||
01504_rocksdb
|
||||
01560_merge_distributed_join
|
||||
01584_distributed_buffer_cannot_find_column
|
||||
01586_columns_pruning
|
||||
@ -39,7 +38,6 @@
|
||||
02352_grouby_shadows_arg
|
||||
02354_annoy
|
||||
02366_union_decimal_conversion
|
||||
02375_rocksdb_with_filters
|
||||
02402_merge_engine_with_view
|
||||
02404_memory_bound_merging
|
||||
02426_orc_bug
|
||||
|
Loading…
Reference in New Issue
Block a user