Fix missing type check in StorageEmbeddedRocksDB

Alexey Milovidov 2021-02-02 10:56:22 +03:00
parent 12be9d51a9
commit 82ab793731
5 changed files with 103 additions and 60 deletions
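
In short: the key filter used to push raw WHERE-clause literals into the lookup list without checking them against the primary key type, so a condition like `key = -123` on a UInt64 key was serialized with the wrong binary format. The commit routes every literal through convertFieldToType (see the hunks below); values with no representation in the key type are dropped, since they cannot match any stored key. A minimal sketch of the resulting semantics, assuming convertFieldToType's behaviour of returning a Null Field for unrepresentable values — the snippet is illustrative, not part of the commit:

    #include <cassert>
    #include <DataTypes/DataTypesNumber.h>
    #include <Interpreters/convertFieldToType.h>

    void exampleTypeCheck()   /// illustrative only, not part of the commit
    {
        using namespace DB;

        // -123 has no UInt64 representation: convertFieldToType() returns
        // a Null Field instead of reinterpreting the bits, so the filter
        // yields no candidate key for `WHERE key = -123` on a UInt64 key.
        Field converted = convertFieldToType(Field(Int64(-123)), DataTypeUInt64());
        assert(converted.isNull());

        // A fully incompatible literal, e.g. `WHERE key = 'Hello'`, makes
        // convertFieldToType() throw TYPE_MISMATCH — the `serverError 53`
        // that the new 01686_rocksdb test expects.
    }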

View File

@@ -319,6 +319,7 @@ function run_tests
         # In fasttest, ENABLE_LIBRARIES=0, so rocksdb engine is not enabled by default
         01504_rocksdb
+        01686_rocksdb
 
         # Look at DistributedFilesToInsert, so cannot run in parallel.
         01460_DistributedFilesToInsert

View File

@@ -24,6 +24,7 @@
 #include <Interpreters/Set.h>
 #include <Interpreters/PreparedSets.h>
 #include <Interpreters/TreeRewriter.h>
+#include <Interpreters/convertFieldToType.h>
 
 #include <Poco/File.h>
 #include <Poco/Path.h>
@@ -44,9 +45,12 @@ namespace ErrorCodes
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
 }
 
+using FieldVectorPtr = std::shared_ptr<FieldVector>;
+
 // Returns true if the condition can be used to filter by primary key; matching key values are collected into res.
-static bool traverseASTFilter(const String & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSets & sets, FieldVector & res)
+static bool traverseASTFilter(
+    const String & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSets & sets, FieldVectorPtr & res)
 {
     const auto * function = elem->as<ASTFunction>();
     if (!function)
@@ -63,13 +67,9 @@ static bool traverseASTFilter(const String & primary_key, const DataTypePtr & pr
     else if (function->name == "or")
     {
         // make sure every child has the key filter condition
-        FieldVector child_res;
         for (const auto & child : function->arguments->children)
-        {
-            if (!traverseASTFilter(primary_key, primary_key_type, child, sets, child_res))
+            if (!traverseASTFilter(primary_key, primary_key_type, child, sets, res))
                 return false;
-        }
-        res.insert(res.end(), child_res.begin(), child_res.end());
         return true;
     }
     else if (function->name == "equals" || function->name == "in")
@@ -108,9 +108,7 @@ static bool traverseASTFilter(const String & primary_key, const DataTypePtr & pr
             prepared_set->checkColumnsNumber(1);
             const auto & set_column = *prepared_set->getSetElements()[0];
             for (size_t row = 0; row < set_column.size(); ++row)
-            {
-                res.push_back(set_column[row]);
-            }
+                res->push_back(set_column[row]);
             return true;
         }
         else
@@ -125,10 +123,12 @@ static bool traverseASTFilter(const String & primary_key, const DataTypePtr & pr
             if (ident->name() != primary_key)
                 return false;
 
-            //function->name == "equals"
+            /// function->name == "equals"
             if (const auto * literal = value->as<ASTLiteral>())
             {
-                res.push_back(literal->value);
+                auto converted_field = convertFieldToType(literal->value, *primary_key_type);
+                if (!converted_field.isNull())
+                    res->push_back(converted_field);
                 return true;
             }
         }
@@ -140,14 +140,14 @@ static bool traverseASTFilter(const String & primary_key, const DataTypePtr & pr
 /** Retrieve from the query a condition of the form `key = 'xxx'` or `key IN ('xxx', ...)` from conjunctions in the WHERE clause.
   * TODO: support `key LIKE` search
   */
-static std::pair<FieldVector, bool> getFilterKeys(const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info)
+static std::pair<FieldVectorPtr, bool> getFilterKeys(
+    const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info)
 {
     const auto & select = query_info.query->as<ASTSelectQuery &>();
     if (!select.where())
-    {
-        return std::make_pair(FieldVector{}, true);
-    }
-    FieldVector res;
+        return {{}, true};
+
+    FieldVectorPtr res = std::make_shared<FieldVector>();
     auto matched_keys = traverseASTFilter(primary_key, primary_key_type, select.where(), query_info.sets, res);
     return std::make_pair(res, !matched_keys);
 }
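
A usage note on the pair returned here, mirroring the read() hunk further down — a sketch using structured bindings rather than the std::tie the file actually uses, with names taken from read():

    // all_scan = !matched_keys: set when traverseASTFilter() cannot prove
    // that the WHERE clause constrains the primary key.
    auto [keys, all_scan] = getFilterKeys(primary_key, primary_key_data_type, query_info);
    if (all_scan)
    {
        // fall back to a full scan over the RocksDB instance
    }
    else if (keys->empty())
    {
        // key is constrained, but no literal survived the type check:
        // the result is empty without touching RocksDB at all
    }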
@@ -159,23 +159,19 @@ public:
     EmbeddedRocksDBSource(
         const StorageEmbeddedRocksDB & storage_,
         const StorageMetadataPtr & metadata_snapshot_,
-        const FieldVector & keys_,
-        const size_t start_,
-        const size_t end_,
+        FieldVectorPtr keys_,
+        FieldVector::const_iterator begin_,
+        FieldVector::const_iterator end_,
         const size_t max_block_size_)
         : SourceWithProgress(metadata_snapshot_->getSampleBlock())
         , storage(storage_)
         , metadata_snapshot(metadata_snapshot_)
-        , start(start_)
+        , keys(std::move(keys_))
+        , begin(begin_)
         , end(end_)
+        , it(begin)
         , max_block_size(max_block_size_)
     {
-        // slice the keys
-        if (end > start)
-        {
-            keys.resize(end - start);
-            std::copy(keys_.begin() + start, keys_.begin() + end, keys.begin());
-        }
     }
 
     String getName() const override
@@ -185,27 +181,34 @@ public:
     Chunk generate() override
     {
-        if (processed_keys >= keys.size() || (start == end))
+        if (it >= end)
             return {};
 
-        std::vector<rocksdb::Slice> slices_keys;
-        slices_keys.reserve(keys.size());
-        std::vector<String> values;
-        std::vector<WriteBufferFromOwnString> wbs(keys.size());
+        size_t num_keys = end - begin;
+
+        std::vector<std::string> serialized_keys(num_keys);
+        std::vector<rocksdb::Slice> slices_keys(num_keys);
 
         const auto & sample_block = metadata_snapshot->getSampleBlock();
         const auto & key_column = sample_block.getByName(storage.primary_key);
         auto columns = sample_block.cloneEmptyColumns();
         size_t primary_key_pos = sample_block.getPositionByName(storage.primary_key);
 
-        for (size_t i = processed_keys; i < std::min(keys.size(), processed_keys + max_block_size); ++i)
+        size_t rows_processed = 0;
+        while (it < end && rows_processed < max_block_size)
         {
-            key_column.type->serializeBinary(keys[i], wbs[i]);
-            auto str_ref = wbs[i].stringRef();
-            slices_keys.emplace_back(str_ref.data, str_ref.size);
+            WriteBufferFromString wb(serialized_keys[rows_processed]);
+            key_column.type->serializeBinary(*it, wb);
+            wb.finalize();
+            slices_keys[rows_processed] = std::move(serialized_keys[rows_processed]);
+
+            ++it;
+            ++rows_processed;
         }
 
+        std::vector<String> values;
         auto statuses = storage.rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
+
         for (size_t i = 0; i < statuses.size(); ++i)
         {
             if (statuses[i].ok())
@@ -221,7 +224,6 @@ public:
                 }
             }
         }
-        processed_keys += max_block_size;
 
         UInt64 num_rows = columns.at(0)->size();
         return Chunk(std::move(columns), num_rows);
@@ -231,12 +233,11 @@ private:
     const StorageEmbeddedRocksDB & storage;
     const StorageMetadataPtr metadata_snapshot;
-    const size_t start;
-    const size_t end;
+
+    FieldVectorPtr keys;
+    FieldVector::const_iterator begin;
+    FieldVector::const_iterator end;
+    FieldVector::const_iterator it;
+
     const size_t max_block_size;
-    FieldVector keys;
-    size_t processed_keys = 0;
 };
@@ -289,7 +290,8 @@ Pipe StorageEmbeddedRocksDB::read(
     unsigned num_streams)
 {
     metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
-    FieldVector keys;
+
+    FieldVectorPtr keys;
     bool all_scan = false;
 
     auto primary_key_data_type = metadata_snapshot->getSampleBlock().getByName(primary_key).type;
@@ -302,37 +304,34 @@ Pipe StorageEmbeddedRocksDB::read(
     }
     else
     {
-        if (keys.empty())
+        if (keys->empty())
             return {};
 
-        std::sort(keys.begin(), keys.end());
-        auto unique_iter = std::unique(keys.begin(), keys.end());
-        if (unique_iter != keys.end())
-            keys.erase(unique_iter, keys.end());
+        std::sort(keys->begin(), keys->end());
+        keys->erase(std::unique(keys->begin(), keys->end()), keys->end());
 
         Pipes pipes;
-        size_t start = 0;
-        size_t end;
-        const size_t num_threads = std::min(size_t(num_streams), keys.size());
-        const size_t batch_per_size = ceil(keys.size() * 1.0 / num_threads);
 
-        for (size_t t = 0; t < num_threads; ++t)
+        size_t num_keys = keys->size();
+        size_t num_threads = std::min(size_t(num_streams), keys->size());
+
+        assert(num_keys <= std::numeric_limits<uint32_t>::max());
+        assert(num_threads <= std::numeric_limits<uint32_t>::max());
+
+        for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx)
         {
-            if (start >= keys.size())
-                start = end = 0;
-            else
-                end = start + batch_per_size > keys.size() ? keys.size() : start + batch_per_size;
+            size_t begin = num_keys * thread_idx / num_threads;
+            size_t end = num_keys * (thread_idx + 1) / num_threads;
 
-            pipes.emplace_back(
-                std::make_shared<EmbeddedRocksDBSource>(*this, metadata_snapshot, keys, start, end, max_block_size));
-            start += batch_per_size;
+            pipes.emplace_back(std::make_shared<EmbeddedRocksDBSource>(
+                *this, metadata_snapshot, keys, keys->begin() + begin, keys->begin() + end, max_block_size));
         }
         return Pipe::unitePipes(std::move(pipes));
     }
 }
-BlockOutputStreamPtr StorageEmbeddedRocksDB::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
+BlockOutputStreamPtr StorageEmbeddedRocksDB::write(
+    const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
 {
     return std::make_shared<EmbeddedRocksDBBlockOutputStream>(*this, metadata_snapshot);
 }
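
One note on the new key partitioning: the old ceil-based batching could hand a trailing thread an empty range (4 keys over 3 threads gives batches of 2, 2, 0), while `num_keys * thread_idx / num_threads` yields contiguous ranges whose sizes differ by at most one. A standalone illustration with made-up numbers:

    #include <cstddef>
    #include <cstdio>

    int main()
    {
        std::size_t num_keys = 10, num_threads = 3;
        for (std::size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx)
        {
            std::size_t begin = num_keys * thread_idx / num_threads;
            std::size_t end = num_keys * (thread_idx + 1) / num_threads;
            std::printf("thread %zu: keys [%zu, %zu)\n", thread_idx, begin, end);
        }
        // prints [0, 3), [3, 6), [6, 10): contiguous, non-overlapping,
        // and never empty since num_threads <= num_keys
    }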

View File

@@ -0,0 +1,15 @@
+123 Hello, world (123)
+--
+--
+123 Hello, world (123)
+4567 Hello, world (4567)
+--
+--
+0 Hello, world (0)
+--
+123 Hello, world (123)
+456 Hello, world (456)
+--
+99 Hello, world (99)
+999 Hello, world (999)
+9999 Hello, world (9999)

View File

@@ -0,0 +1,27 @@
+DROP TABLE IF EXISTS test;
+
+CREATE TABLE test (key UInt64, value String) Engine=EmbeddedRocksDB PRIMARY KEY(key);
+
+INSERT INTO test SELECT number, format('Hello, world ({})', toString(number)) FROM numbers(10000);
+
+SELECT * FROM test WHERE key = 123;
+SELECT '--';
+SELECT * FROM test WHERE key = -123;
+SELECT '--';
+SELECT * FROM test WHERE key = 123 OR key = 4567 ORDER BY key;
+SELECT '--';
+SELECT * FROM test WHERE key = NULL;
+SELECT '--';
+SELECT * FROM test WHERE key = NULL OR key = 0;
+SELECT '--';
+SELECT * FROM test WHERE key IN (123, 456, -123) ORDER BY key;
+SELECT '--';
+
+SELECT * FROM test WHERE key = 'Hello'; -- { serverError 53 }
+
+DETACH TABLE test NO DELAY;
+ATTACH TABLE test;
+
+SELECT * FROM test WHERE key IN (99, 999, 9999, -123) ORDER BY key;
+
+DROP TABLE IF EXISTS test;

View File

@@ -200,3 +200,4 @@
 01676_clickhouse_client_autocomplete
 01671_aggregate_function_group_bitmap_data
 01674_executable_dictionary_implicit_key
+01686_rocksdb