* fix fasttest and cmake && pipeline for all_scan

* unique the keys
* add inputstream && outputstream
This commit is contained in:
sundy-li 2020-10-20 21:26:09 +08:00
parent c88ede4f97
commit 94cb974f42
10 changed files with 260 additions and 129 deletions

View File

@ -13,9 +13,8 @@ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/rocksdb/CMakeLists.txt")
if (USE_INTERNAL_ROCKSDB_LIBRARY)
message (WARNING "submodule contrib is missing. to fix try run: \n git submodule update --init --recursive")
message(${RECONFIGURE_MESSAGE_LEVEL} "cannot find internal rocksdb")
set (USE_INTERNAL_ROCKSDB_LIBRARY 0)
endif()
set (MISSING_ROCKSDB 1)
set (MISSING_INTERNAL_ROCKSDB 1)
endif ()
if (NOT USE_INTERNAL_ROCKSDB_LIBRARY)
@ -26,7 +25,9 @@ if (NOT USE_INTERNAL_ROCKSDB_LIBRARY)
endif()
endif ()
if (NOT ROCKSDB_LIBRARY AND NOT MISSING_ROCKSDB)
if(ROCKSDB_LIBRARY AND ROCKSDB_INCLUDE_DIR)
set(USE_ROCKSDB 1)
elseif (NOT MISSING_INTERNAL_ROCKSDB)
set (USE_INTERNAL_ROCKSDB_LIBRARY 1)
set (ROCKSDB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rocksdb/include")

View File

@ -270,7 +270,7 @@ TESTS_TO_SKIP=(
00646_url_engine
00974_query_profiler
# Rocksdb is not enabled by default
# In fasttest, ENABLE_LIBRARIES=0, so rocksdb engine is not enabled by default
01504_rocksdb
# Look at DistributedFilesToInsert, so cannot run in parallel.

View File

@ -0,0 +1,65 @@
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/Rocksdb/StorageEmbeddedRocksdb.h>
#include <Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.h>
#include <ext/enumerate.h>
namespace DB
{
/// Error codes used in this translation unit; they are only declared here (`extern`).
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Prepares a reader over `storage_` that produces blocks of at most `max_block_size_` rows.
///
/// The header block is taken from the metadata snapshot and the position of the primary key
/// column inside it is remembered, so that readImpl() can tell the key column from value columns.
/// The RocksDB iterator itself is not created here.
EmbeddedRocksdbBlockInputStream::EmbeddedRocksdbBlockInputStream(
    StorageEmbeddedRocksdb & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    size_t max_block_size_)
    : storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , max_block_size(max_block_size_)
    /// `sample_block` is declared before `primary_key_pos`, so it is already initialized below.
    , sample_block(metadata_snapshot_->getSampleBlock())
    , primary_key_pos(sample_block.getPositionByName(storage_.primary_key))
{
}
/// Reads the next portion of a full scan over the RocksDB instance of the storage.
///
/// The iterator is created and positioned at the first key on the first call and keeps its
/// position between calls. Every call consumes rows until either `max_block_size` rows were
/// read or the iterator is exhausted. For every row the primary key column is deserialized
/// from the RocksDB key and all other columns are deserialized, in header order, from the value.
///
/// Returns an empty Block once a previous call has exhausted the iterator.
/// Throws an Exception with code LOGICAL_ERROR if the iterator reports a non-ok status.
Block EmbeddedRocksdbBlockInputStream::readImpl()
{
    if (finished)
        return {};

    if (!iterator)
    {
        iterator = std::unique_ptr<rocksdb::Iterator>(storage.rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
        iterator->SeekToFirst();
    }

    MutableColumns columns = sample_block.cloneEmptyColumns();

    size_t rows = 0;
    while (iterator->Valid())
    {
        ReadBufferFromString key_buffer(iterator->key());
        ReadBufferFromString value_buffer(iterator->value());

        for (const auto [idx, column_type] : ext::enumerate(sample_block.getColumnsWithTypeAndName()))
        {
            column_type.type->deserializeBinary(*columns[idx], idx == primary_key_pos ? key_buffer : value_buffer);
        }

        /// Step past the row right after it was consumed. Leaving the loop on a full block
        /// without advancing would make the next call read the same row once more
        /// (and never make progress at all when max_block_size is 1).
        iterator->Next();

        ++rows;
        if (rows >= max_block_size)
            break;
    }

    finished = !iterator->Valid();

    /// An iterator also becomes invalid on error, so the status has to be checked explicitly.
    if (!iterator->status().ok())
    {
        throw Exception("Engine " + getName() + " got error while seeking key value datas: " + iterator->status().ToString(),
            ErrorCodes::LOGICAL_ERROR);
    }

    return sample_block.cloneWithColumns(std::move(columns));
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/Rocksdb/StorageEmbeddedRocksdb.h>
#include <rocksdb/db.h>
namespace DB
{
/// Input stream that scans all key-value pairs of an EmbeddedRocksdb storage
/// and returns them as blocks of at most `max_block_size` rows.
class EmbeddedRocksdbBlockInputStream : public IBlockInputStream
{
public:
    EmbeddedRocksdbBlockInputStream(
        StorageEmbeddedRocksdb & storage_,
        const StorageMetadataPtr & metadata_snapshot_,
        size_t max_block_size_);

    /// The stream reports the name of the storage it reads from.
    String getName() const override { return storage.getName(); }

    /// Structure of the produced blocks: the sample block of the metadata snapshot.
    Block getHeader() const override { return sample_block; }

    Block readImpl() override;

private:
    /// NOTE: the order of the members is the order of their initialization; keep it.
    StorageEmbeddedRocksdb & storage;
    StorageMetadataPtr metadata_snapshot;
    const size_t max_block_size;

    Block sample_block;

    /// Created lazily by the first readImpl() call.
    std::unique_ptr<rocksdb::Iterator> iterator;

    /// Position of the primary key column inside `sample_block`.
    size_t primary_key_pos;

    /// Set once the iterator has no more entries.
    bool finished{false};
};
}

View File

@ -0,0 +1,50 @@
#include <Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.h>
#include <IO/WriteBufferFromString.h>
#include <vector>
namespace DB
{
/// Error codes used in this translation unit; they are only declared here (`extern`).
namespace ErrorCodes
{
extern const int SYSTEM_ERROR;
}
/// The header of the stream is the sample block of the metadata snapshot the stream was created with.
Block EmbeddedRocksdbBlockOutputStream::getHeader() const
{
    Block header = metadata_snapshot->getSampleBlock();
    return header;
}
void EmbeddedRocksdbBlockOutputStream::write(const Block & block)
{
metadata_snapshot->check(block, true);
auto rows = block.rows();
WriteBufferFromOwnString wb_key;
WriteBufferFromOwnString wb_value;
rocksdb::WriteBatch batch;
auto columns = metadata_snapshot->getColumns();
for (size_t i = 0; i < rows; i++)
{
wb_key.restart();
wb_value.restart();
for (const auto & col : columns)
{
const auto & type = block.getByName(col.name).type;
const auto & column = block.getByName(col.name).column;
if (col.name == storage.primary_key)
type->serializeBinary(*column, i, wb_key);
else
type->serializeBinary(*column, i, wb_value);
}
batch.Put(wb_key.str(), wb_value.str());
}
auto status = storage.rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok())
throw Exception("Rocksdb write error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR);
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Storages/Rocksdb/StorageEmbeddedRocksdb.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
{
/// Output stream that stores the rows of written blocks in the RocksDB instance
/// of an EmbeddedRocksdb storage.
class EmbeddedRocksdbBlockOutputStream : public IBlockOutputStream
{
public:
    explicit EmbeddedRocksdbBlockOutputStream(
        StorageEmbeddedRocksdb & storage_, const StorageMetadataPtr & metadata_snapshot_)
        : storage(storage_), metadata_snapshot(metadata_snapshot_)
    {
    }

    Block getHeader() const override;
    void write(const Block & block) override;

private:
    /// The storage is held by reference, the metadata snapshot by shared pointer.
    StorageEmbeddedRocksdb & storage;
    StorageMetadataPtr metadata_snapshot;
};
}

View File

@ -3,6 +3,8 @@
#include <DataTypes/DataTypeDateTime.h>
#include <Storages/StorageFactory.h>
#include <Storages/Rocksdb/StorageEmbeddedRocksdb.h>
#include <Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.h>
#include <Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
@ -17,7 +19,7 @@
#include <IO/WriteBufferFromString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/Set.h>
#include <Interpreters/PreparedSets.h>
@ -39,13 +41,12 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int SYSTEM_ERROR;
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
// Returns the keys that may be filtered by the condition
static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, const PreparedSets & sets, FieldVector & res)
static bool traverseASTFilter(const String & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSets & sets, FieldVector & res)
{
const auto * function = elem->as<ASTFunction>();
if (!function)
@ -53,34 +54,50 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, c
if (function->name == "and")
{
// It is enough for one child to contain the key filter condition
for (const auto & child : function->arguments->children)
if (traverseASTFilter(primary_key, child, sets, res))
if (traverseASTFilter(primary_key, primary_key_type, child, sets, res))
return true;
return false;
}
else if (function->name == "or")
{
// make sure every child has the key filter condition
FieldVector child_res;
for (const auto & child : function->arguments->children)
{
if (!traverseASTFilter(primary_key, primary_key_type, child, sets, child_res))
return false;
}
res.insert(res.end(), child_res.begin(), child_res.end());
return true;
}
else if (function->name == "equals" || function->name == "in")
{
const auto & args = function->arguments->as<ASTExpressionList &>();
const ASTIdentifier * ident;
const IAST * value;
if (args.children.size() != 2)
return false;
return false;
const ASTIdentifier * ident;
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
value = args.children.at(1).get();
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
value = args.children.at(0).get();
else
return false;
if (ident->name != primary_key)
return false;
if (function->name == "in" && ((value->as<ASTSubquery>() || value->as<ASTIdentifier>())))
if (function->name == "in")
{
auto set_it = sets.find(PreparedSetKey::forSubquery(*value));
ident = args.children.at(0)->as<ASTIdentifier>();
if (!ident)
return false;
if (ident->name != primary_key)
return false;
value = args.children.at(1).get();
PreparedSetKey set_key;
if ((value->as<ASTSubquery>() || value->as<ASTIdentifier>()))
set_key = PreparedSetKey::forSubquery(*value);
else
set_key = PreparedSetKey::forLiteral(*value, {primary_key_type});
auto set_it = sets.find(set_key);
if (set_it == sets.end())
return false;
SetPtr prepared_set = set_it->second;
@ -89,7 +106,6 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, c
return false;
prepared_set->checkColumnsNumber(1);
const auto & set_column = *prepared_set->getSetElements()[0];
for (size_t row = 0; row < set_column.size(); ++row)
{
@ -97,27 +113,24 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, c
}
return true;
}
if (const auto * literal = value->as<ASTLiteral>())
else
{
if (function->name == "equals")
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
value = args.children.at(1).get();
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
value = args.children.at(0).get();
else
return false;
if (ident->name != primary_key)
return false;
//function->name == "equals"
if (const auto * literal = value->as<ASTLiteral>())
{
res.push_back(literal->value);
return true;
}
else if (function->name == "in")
{
if (literal->value.getType() == Field::Types::Tuple)
{
auto tuple = literal->value.safeGet<Tuple>();
for (const auto & f : tuple)
{
res.push_back(f);
}
return true;
}
}
else return false;
}
}
return false;
@ -127,7 +140,7 @@ static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, c
/** Retrieve from the query a condition of the form `key = 'key'` or `key in ('xxx_')`, from conjunctions in the WHERE clause.
* TODO support key like search
*/
static std::pair<FieldVector, bool> getFilterKeys(const String & primary_key, const SelectQueryInfo & query_info)
static std::pair<FieldVector, bool> getFilterKeys(const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info)
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
if (!select.where())
@ -135,7 +148,7 @@ static std::pair<FieldVector, bool> getFilterKeys(const String & primary_key, co
return std::make_pair(FieldVector{}, true);
}
FieldVector res;
auto matched_keys = traverseASTFilter(primary_key, select.where(), query_info.sets, res);
auto matched_keys = traverseASTFilter(primary_key, primary_key_type, select.where(), query_info.sets, res);
return std::make_pair(res, !matched_keys);
}
@ -149,13 +162,13 @@ public:
const FieldVector & keys_,
const size_t start_,
const size_t end_,
const size_t rocksdb_batch_read_size_)
const size_t max_block_size_)
: SourceWithProgress(metadata_snapshot_->getSampleBlock())
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, start(start_)
, end(end_)
, rocksdb_batch_read_size(rocksdb_batch_read_size_)
, max_block_size(max_block_size_)
{
// slice the keys
if (end > start)
@ -185,7 +198,7 @@ public:
auto columns = sample_block.cloneEmptyColumns();
size_t primary_key_pos = sample_block.getPositionByName(storage.primary_key);
for (size_t i = processed_keys; i < std::min(keys.size(), processed_keys + size_t(rocksdb_batch_read_size)); ++i)
for (size_t i = processed_keys; i < std::min(keys.size(), processed_keys + max_block_size); ++i)
{
key_column.type->serializeBinary(keys[i], wbs[i]);
auto str_ref = wbs[i].stringRef();
@ -206,7 +219,7 @@ public:
}
}
}
processed_keys += rocksdb_batch_read_size;
processed_keys += max_block_size;
UInt64 num_rows = columns.at(0)->size();
return Chunk(std::move(columns), num_rows);
@ -218,62 +231,13 @@ private:
const StorageMetadataPtr metadata_snapshot;
const size_t start;
const size_t end;
const size_t rocksdb_batch_read_size;
const size_t max_block_size;
FieldVector keys;
size_t processed_keys = 0;
};
class EmbeddedRocksdbBlockOutputStream : public IBlockOutputStream
{
public:
explicit EmbeddedRocksdbBlockOutputStream(
StorageEmbeddedRocksdb & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
metadata_snapshot->check(block, true);
auto rows = block.rows();
WriteBufferFromOwnString wb_key;
WriteBufferFromOwnString wb_value;
rocksdb::WriteBatch batch;
auto columns = metadata_snapshot->getColumns();
for (size_t i = 0; i < rows; i++)
{
wb_key.restart();
wb_value.restart();
for (const auto & col : columns)
{
const auto & type = block.getByName(col.name).type;
const auto & column = block.getByName(col.name).column;
if (col.name == storage.primary_key)
type->serializeBinary(*column, i, wb_key);
else
type->serializeBinary(*column, i, wb_value);
}
batch.Put(wb_key.str(), wb_value.str());
}
auto status = storage.rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok())
throw Exception("Rocksdb write error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR);
}
private:
StorageEmbeddedRocksdb & storage;
StorageMetadataPtr metadata_snapshot;
};
StorageEmbeddedRocksdb::StorageEmbeddedRocksdb(const StorageID & table_id_,
const String & relative_data_path_,
const StorageInMemoryMetadata & metadata_,
@ -318,46 +282,31 @@ Pipe StorageEmbeddedRocksdb::read(
const SelectQueryInfo & query_info,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
size_t max_block_size,
unsigned num_streams)
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
Block sample_block = metadata_snapshot->getSampleBlock();
size_t primary_key_pos = sample_block.getPositionByName(primary_key);
FieldVector keys;
bool all_scan = false;
std::tie(keys, all_scan) = getFilterKeys(primary_key, query_info);
auto primary_key_data_type = metadata_snapshot->getSampleBlock().getByName(primary_key).type;
std::tie(keys, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info);
if (all_scan)
{
MutableColumns columns = sample_block.cloneEmptyColumns();
auto it = std::unique_ptr<rocksdb::Iterator>(rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next())
{
ReadBufferFromString key_buffer(it->key());
ReadBufferFromString value_buffer(it->value());
for (const auto [idx, column_type] : ext::enumerate(sample_block.getColumnsWithTypeAndName()))
{
column_type.type->deserializeBinary(*columns[idx], idx == primary_key_pos? key_buffer: value_buffer);
}
}
if (!it->status().ok())
{
throw Exception("Engine " + getName() + " got error while seeking key value datas: " + it->status().ToString(),
ErrorCodes::LOGICAL_ERROR);
}
UInt64 num_rows = columns.at(0)->size();
Chunk chunk(std::move(columns), num_rows);
return Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk)));
auto reader = std::make_shared<EmbeddedRocksdbBlockInputStream>(
*this, metadata_snapshot, max_block_size);
return Pipe(std::make_shared<SourceFromInputStream>(reader));
}
else
{
if (keys.empty())
return {};
std::sort(keys.begin(), keys.end());
auto unique_iter = std::unique(keys.begin(), keys.end());
if (unique_iter != keys.end())
keys.erase(unique_iter, keys.end());
Pipes pipes;
size_t start = 0;
size_t end;
@ -365,9 +314,6 @@ Pipe StorageEmbeddedRocksdb::read(
const size_t num_threads = std::min(size_t(num_streams), keys.size());
const size_t batch_per_size = ceil(keys.size() * 1.0 / num_threads);
// TODO settings
static constexpr size_t rocksdb_batch_read_size = 81920;
for (size_t t = 0; t < num_threads; ++t)
{
if (start >= keys.size())
@ -376,7 +322,7 @@ Pipe StorageEmbeddedRocksdb::read(
end = start + batch_per_size > keys.size() ? keys.size() : start + batch_per_size;
pipes.emplace_back(
std::make_shared<EmbeddedRocksdbSource>(*this, metadata_snapshot, keys, start, end, rocksdb_batch_read_size));
std::make_shared<EmbeddedRocksdbSource>(*this, metadata_snapshot, keys, start, end, max_block_size));
start += batch_per_size;
}
return Pipe::unitePipes(std::move(pipes));
@ -391,7 +337,7 @@ BlockOutputStreamPtr StorageEmbeddedRocksdb::write(const ASTPtr & /*query*/, con
static StoragePtr create(const StorageFactory::Arguments & args)
{
// TODO custom RocksdbSettings
// TODO custom RocksdbSettings, table function
if (!args.engine_args.empty())
throw Exception(
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",

View File

@ -20,6 +20,7 @@ class StorageEmbeddedRocksdb final : public ext::shared_ptr_helper<StorageEmbedd
friend struct ext::shared_ptr_helper<StorageEmbeddedRocksdb>;
friend class EmbeddedRocksdbSource;
friend class EmbeddedRocksdbBlockOutputStream;
friend class EmbeddedRocksdbBlockInputStream;
public:
std::string getName() const override { return "EmbeddedRocksdb"; }

View File

@ -105,6 +105,12 @@ SRCS(
MutationCommands.cpp
PartitionCommands.cpp
ReadInOrderOptimizer.cpp
registerStorages.cpp
Rocksdb/EmbeddedRocksdbBlockInputStream.cpp
Rocksdb/EmbeddedRocksdbBlockOutputStream.cpp
Rocksdb/StorageEmbeddedRocksdb.cpp
SelectQueryDescription.cpp
SetSettings.cpp

View File

@ -24,8 +24,8 @@ SELECT A.a - B.a, A.b - B.b, A.c - B.c, A.d - B.d, A.e - B.e FROM ( SELECT 0 AS
CREATE TEMPORARY TABLE keys AS SELECT * FROM numbers(1000);
SET max_rows_to_read = 2;
SELECT dummy == (1,1.2) FROM test WHERE k IN (1, 3);
SELECT k == 4 FROM test WHERE k = 4;
SELECT dummy == (1,1.2) FROM test WHERE k IN (1, 3) OR k IN (1) OR k IN (3, 1) OR k IN [1] OR k IN [1, 3] ;
SELECT k == 4 FROM test WHERE k = 4 OR k IN [4];
SELECT k == 4 FROM test WHERE k IN (SELECT toUInt32(number) FROM keys WHERE number = 4);
SELECT k, value FROM test WHERE k = 0 OR value > 0; -- { serverError 158 }