add supportsIndexForIn && supportsParallelInsert, support parallel reading

sundy-li 2020-10-19 01:09:00 +08:00
parent b9e663af25
commit f95a66b584
5 changed files with 170 additions and 66 deletions


@ -6,6 +6,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
@ -18,6 +19,8 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Interpreters/Context.h>
#include <Interpreters/Set.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/TreeRewriter.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnString.h>
@ -36,12 +39,13 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int SYSTEM_ERROR;
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
// Returns true when the condition restricts the primary key to a finite set of values, which are appended to res.
static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, FieldVector & res)
static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, const PreparedSets & sets, FieldVector & res)
{
const auto * function = elem->as<ASTFunction>();
if (!function)
@ -50,7 +54,7 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, F
if (function->name == "and")
{
for (const auto & child : function->arguments->children)
if (traverseASTFilter(primary_key, child, res))
if (traverseASTFilter(primary_key, child, sets, res))
return true;
return false;
}
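// For a conjunction it is enough to find one conjunct that constrains the
// primary key: the remaining conjuncts can only narrow the result, so the
// keys collected here are a superset of the rows that actually match.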
@ -73,6 +77,27 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, F
if (ident->name != primary_key)
return false;
if (function->name == "in" && ((value->as<ASTSubquery>() || value->as<ASTIdentifier>())))
{
auto set_it = sets.find(PreparedSetKey::forSubquery(*value));
if (set_it == sets.end())
return false;
SetPtr prepared_set = set_it->second;
if (!prepared_set->hasExplicitSetElements())
return false;
prepared_set->checkColumnsNumber(1);
const auto & set_column = *prepared_set->getSetElements()[0];
for (size_t row = 0; row < set_column.size(); ++row)
{
res.push_back(set_column[row]);
}
return true;
}
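// The set behind `key IN (subquery)` is built by the interpreter before the
// read starts and is looked up here via PreparedSetKey::forSubquery(). It is
// only usable when its elements were materialized into columns, hence the
// hasExplicitSetElements() check above.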
if (const auto * literal = value->as<ASTLiteral>())
{
if (function->name == "equals")
@ -80,14 +105,17 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, F
res.push_back(literal->value);
return true;
}
else if (function->name == "in" && literal->value.getType() == Field::Types::Tuple)
else if (function->name == "in")
{
auto tuple = literal->value.safeGet<Tuple>();
for (const auto & f : tuple)
if (literal->value.getType() == Field::Types::Tuple)
{
res.push_back(f);
auto tuple = literal->value.safeGet<Tuple>();
for (const auto & f : tuple)
{
res.push_back(f);
}
return true;
}
return true;
}
else return false;
}
@ -99,19 +127,104 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, F
/** Retrieve from the query a condition of the form `key = 'xxx'` or `key IN ('xxx', ...)` from conjunctions in the WHERE clause.
* TODO: support key LIKE search
*/
static std::pair<FieldVector, bool> getFilterKeys(const String & primary_key, const ASTPtr & query)
static std::pair<FieldVector, bool> getFilterKeys(const String & primary_key, const SelectQueryInfo & query_info)
{
const auto & select = query->as<ASTSelectQuery &>();
const auto & select = query_info.query->as<ASTSelectQuery &>();
if (!select.where())
{
return std::make_pair(FieldVector{}, true);
}
FieldVector res;
auto matched_keys = traverseASTFilter(primary_key, select.where(), res);
auto matched_keys = traverseASTFilter(primary_key, select.where(), query_info.sets, res);
return std::make_pair(res, !matched_keys);
}
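// Example: `WHERE key IN ('a', 'b') AND x > 0` yields ({'a', 'b'}, false),
// while a missing WHERE clause or a condition that does not restrict the
// primary key yields ({}, true), telling the caller to fall back to a full scan.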
class EmbeddedRocksdbSource : public SourceWithProgress
{
public:
EmbeddedRocksdbSource(
const StorageEmbeddedRocksdb & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const FieldVector & keys_,
const size_t start_,
const size_t end_,
const size_t rocksdb_batch_read_size_)
: SourceWithProgress(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, start(start_)
, end(end_)
, rocksdb_batch_read_size(rocksdb_batch_read_size_)
{
// slice the keys
if (end > start)
{
keys.resize(end - start);
std::copy(keys_.begin() + start, keys_.begin() + end, keys.begin());
}
}
String getName() const override
{
return storage.getName();
}
Chunk generate() override
{
if (processed_keys >= keys.size() || (start == end))
return {};
std::vector<rocksdb::Slice> slices_keys;
slices_keys.reserve(keys.size());
std::vector<String> values;
std::vector<WriteBufferFromOwnString> wbs(keys.size());
const auto & sample_block = metadata_snapshot->getSampleBlock();
const auto & key_column = sample_block.getByName(storage.primary_key);
auto columns = sample_block.cloneEmptyColumns();
size_t primary_key_pos = sample_block.getPositionByName(storage.primary_key);
for (size_t i = processed_keys; i < std::min(keys.size(), processed_keys + size_t(rocksdb_batch_read_size)); ++i)
{
key_column.type->serializeBinary(keys[i], wbs[i]);
auto str_ref = wbs[i].stringRef();
slices_keys.emplace_back(str_ref.data, str_ref.size);
}
auto statuses = storage.rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
for (size_t i = 0; i < statuses.size(); ++i)
{
if (statuses[i].ok())
{
ReadBufferFromString key_buffer(slices_keys[i]);
ReadBufferFromString value_buffer(values[i]);
for (const auto [idx, column_type] : ext::enumerate(sample_block.getColumnsWithTypeAndName()))
{
column_type.type->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer);
}
}
}
processed_keys += rocksdb_batch_read_size;
UInt64 num_rows = columns.at(0)->size();
return Chunk(std::move(columns), num_rows);
}
private:
const StorageEmbeddedRocksdb & storage;
const StorageMetadataPtr metadata_snapshot;
const size_t start;
const size_t end;
const size_t rocksdb_batch_read_size;
FieldVector keys;
size_t processed_keys = 0;
};
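// Each source owns a disjoint [start, end) slice of the matched keys and
// drains it in MultiGet() batches of rocksdb_batch_read_size rows per
// generate() call, so several streams can read their slices in parallel.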
class EmbeddedRocksdbBlockOutputStream : public IBlockOutputStream
{
public:
@ -206,19 +319,19 @@ Pipe StorageEmbeddedRocksdb::read(
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/)
unsigned num_streams)
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
Block sample_block = metadata_snapshot->getSampleBlock();
MutableColumns columns = sample_block.cloneEmptyColumns();
size_t primary_key_pos = sample_block.getPositionByName(primary_key);
FieldVector keys;
bool all_scan = false;
std::tie(keys, all_scan) = getFilterKeys(primary_key, query_info.query);
// TODO pipeline
std::tie(keys, all_scan) = getFilterKeys(primary_key, query_info);
if (all_scan)
{
MutableColumns columns = sample_block.cloneEmptyColumns();
auto it = std::unique_ptr<rocksdb::Iterator>(rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next())
{
@ -227,49 +340,47 @@ Pipe StorageEmbeddedRocksdb::read(
for (const auto [idx, column_type] : ext::enumerate(sample_block.getColumnsWithTypeAndName()))
{
if (column_type.name == primary_key)
column_type.type->deserializeBinary(*columns[idx], key_buffer);
else
column_type.type->deserializeBinary(*columns[idx], value_buffer);
column_type.type->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer);
}
}
assert(it->status().ok());
if (!it->status().ok())
{
throw Exception("Engine " + getName() + " got error while seeking key value datas: " + it->status().ToString(),
ErrorCodes::LOGICAL_ERROR);
}
UInt64 num_rows = columns.at(0)->size();
Chunk chunk(std::move(columns), num_rows);
return Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk)));
}
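// A full scan is still emitted as a single chunk through one stream; only
// point lookups by primary key are parallelized below.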
else
{
std::vector<rocksdb::Slice> slices_keys;
std::vector<String> values;
std::vector<WriteBufferFromOwnString> wbs(keys.size());
if (keys.empty())
return {};
for (size_t i = 0; i < keys.size(); ++i)
Pipes pipes;
size_t start = 0;
size_t end;
const size_t num_threads = std::min(size_t(num_streams), keys.size());
const size_t batch_per_size = ceil(keys.size() * 1.0 / num_threads);
// TODO: make rocksdb_batch_read_size a user-tunable setting
static constexpr size_t rocksdb_batch_read_size = 81920;
for (size_t t = 0; t < num_threads; ++t)
{
sample_block.getByName(primary_key).type->serializeBinary(keys[i], wbs[i]);
auto str_ref = wbs[i].stringRef();
slices_keys.emplace_back(str_ref.data, str_ref.size);
}
if (start >= keys.size())
start = end = 0;
else
end = start + batch_per_size > keys.size() ? keys.size() : start + batch_per_size;
auto statuses = rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
for (size_t i = 0; i < statuses.size(); ++i)
{
if (statuses[i].ok())
{
ReadBufferFromString key_buffer(slices_keys[i]);
ReadBufferFromString value_buffer(values[i]);
for (const auto [idx, column_type] : ext::enumerate(sample_block.getColumnsWithTypeAndName()))
{
if (column_type.name == primary_key)
column_type.type->deserializeBinary(*columns[idx], key_buffer);
else
column_type.type->deserializeBinary(*columns[idx], value_buffer);
}
}
pipes.emplace_back(
std::make_shared<EmbeddedRocksdbSource>(*this, metadata_snapshot, keys, start, end, rocksdb_batch_read_size));
start += batch_per_size;
}
return Pipe::unitePipes(std::move(pipes));
}
UInt64 num_rows = columns.at(0)->size();
Chunk chunk(std::move(columns), num_rows);
return Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk)));
}
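A minimal standalone sketch (not part of this change) of the slicing arithmetic used above: the matched keys are divided into num_threads contiguous slices of ceil(keys / threads) elements, and any stream whose start would fall past the last key receives an empty [0, 0) slice. The helper name keySlice is hypothetical.

#include <algorithm>
#include <cstddef>
#include <utility>

// Hypothetical helper: returns the [start, end) slice of `total` keys assigned to stream `t`.
static std::pair<size_t, size_t> keySlice(size_t total, size_t num_threads, size_t t)
{
    const size_t batch_per_size = (total + num_threads - 1) / num_threads; // ceil division
    const size_t start = t * batch_per_size;
    if (start >= total)
        return {0, 0}; // no keys left for this stream
    return {start, std::min(start + batch_per_size, total)};
}

// keySlice(10, 4, 0) == {0, 3}; keySlice(10, 4, 3) == {9, 10}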
BlockOutputStreamPtr StorageEmbeddedRocksdb::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
@ -280,7 +391,7 @@ BlockOutputStreamPtr StorageEmbeddedRocksdb::write(const ASTPtr & /*query*/, con
static StoragePtr create(const StorageFactory::Arguments & args)
{
// TODO RocksdbSettings
// TODO custom RocksdbSettings
if (!args.engine_args.empty())
throw Exception(
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",


@ -18,6 +18,7 @@ class Context;
class StorageEmbeddedRocksdb final : public ext::shared_ptr_helper<StorageEmbeddedRocksdb>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageEmbeddedRocksdb>;
friend class EmbeddedRocksdbSource;
friend class EmbeddedRocksdbBlockOutputStream;
public:
std::string getName() const override { return "EmbeddedRocksdb"; }
@ -34,6 +35,13 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
bool supportsParallelInsert() const override { return true; }
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(const ASTPtr & node, const Context & /*query_context*/, const StorageMetadataPtr & /*metadata_snapshot*/) const override
{
return node->getColumnName() == primary_key;
}
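// Reporting supportsIndexForIn() makes the analyzer prepare sets for IN
// predicates with materialized elements whenever mayBenefitFromIndexForIn()
// approves the left-hand column, which is what lets read() turn
// `key IN (subquery)` into a plain list of keys.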
protected:
StorageEmbeddedRocksdb(const StorageID & table_id_,
const String & relative_data_path_,


@ -1,18 +0,0 @@
<test max_ignored_relative_change="0.4">
<preconditions>
<table_exists>hits_100m_single</table_exists>
</preconditions>
<settings>
<max_memory_usage>30000000000</max_memory_usage>
</settings>
<create_query>CREATE TABLE ma_kv (key String, value AggregateFunction(groupBitmap, UInt64)) EmbeddedRocksdb primary key(key)</create_query>
<fill_query> INSERT INTO ma_kv SELECT concat('CodeVersion=', CodeVersion) as key, bitmapBuild(groupArray(UserID)) AS value FROM hits_100m_single GROUP BY key</fill_query>
<query>SELECT groupBitmapOr(value) FROM ma_kv WHERE key IN ('CodeVersion=1657', 'CodeVersion=1', 'CodeVersion=275')</query>
<drop_query>DROP TABLE IF EXISTS ma_kv</drop_query>
</test>


@ -5,3 +5,4 @@
1
1
1
1


@ -21,10 +21,12 @@ INSERT INTO test_memory SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2),
SELECT A.a - B.a, A.b - B.b, A.c - B.c, A.d - B.d, A.e - B.e FROM ( SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM test) A ANY LEFT JOIN (SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM test_memory) B USING a ORDER BY a;
CREATE TEMPORARY TABLE keys AS SELECT * FROM numbers(1000);
SET max_rows_to_read = 2;
SELECT dummy == (1,1.2) FROM test WHERE k IN (1, 3);
SELECT k == 4 FROM test WHERE k = 4;
SELECT k == 4 FROM test WHERE k IN (SELECT toUInt32(number) FROM keys WHERE number = 4);
SELECT k, value FROM test WHERE k = 0 OR value > 0; -- { serverError 158 }
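-- Error 158 is TOO_MANY_ROWS: the disjunct on `value` cannot be answered from
-- the primary key alone, so the query degrades to a full scan and trips the
-- max_rows_to_read = 2 limit set above.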
TRUNCATE TABLE test;