implement StorageGenerate

This commit is contained in:
Yatsishin Ilya 2020-02-26 17:12:07 +03:00
parent 7b462e2070
commit d2ab30ec48
8 changed files with 219 additions and 98 deletions

View File

@ -1,8 +1,12 @@
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageGenerate.h>
#include <Storages/StorageFactory.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Pipe.h>
#include <Parsers/ASTLiteral.h>
#include <Core/Block.h>
#include <Storages/StorageValues.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
@ -16,39 +20,24 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnTuple.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Common/SipHash.h>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionRandom.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include "registerTableFunctions.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int BAD_TYPE_OF_FIELD;
extern const int LOGICAL_ERROR;
extern const int DATABASE_ACCESS_DENIED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
}
void fillColumnWithRandomData(IColumn & column, DataTypePtr type, UInt64 limit,
UInt64 max_array_length, UInt64 max_string_length, UInt64 random_seed)
{
TypeIndex idx = type->getTypeId();
if (!random_seed)
random_seed = randomSeed();
(void) max_string_length;
switch (idx)
@ -328,7 +317,7 @@ void fillColumnWithRandomData(IColumn & column, DataTypePtr type, UInt64 limit,
data[i] = x;
}
}
break;
break;
case TypeIndex::UUID:
{
auto & data = typeid_cast<ColumnVector<UInt128> &>(column).getData();
@ -342,7 +331,7 @@ void fillColumnWithRandomData(IColumn & column, DataTypePtr type, UInt64 limit,
data[i] = x;
}
}
break;
break;
case TypeIndex::Array:
{
auto & column_array = typeid_cast<ColumnArray &>(column);
@ -380,23 +369,23 @@ void fillColumnWithRandomData(IColumn & column, DataTypePtr type, UInt64 limit,
case TypeIndex::Interval:
throw Exception("Type 'Interval' can not be stored in a table.", ErrorCodes::LOGICAL_ERROR);
case TypeIndex::Nullable:
{
auto & column_nullable = typeid_cast<ColumnNullable &>(column);
auto nested_type = typeid_cast<const DataTypeNullable *>(type.get())->getNestedType();
auto & null_map = column_nullable.getNullMapData();
IColumn & nested_column = column_nullable.getNestedColumn();
fillColumnWithRandomData(nested_column, nested_type, limit, max_array_length, max_string_length, random_seed);
pcg32 generator(random_seed);
null_map.resize(limit);
for (UInt64 i = 0; i < limit; ++i)
{
auto & column_nullable = typeid_cast<ColumnNullable &>(column);
auto nested_type = typeid_cast<const DataTypeNullable *>(type.get())->getNestedType();
auto & null_map = column_nullable.getNullMapData();
IColumn & nested_column = column_nullable.getNestedColumn();
fillColumnWithRandomData(nested_column, nested_type, limit, max_array_length, max_string_length, random_seed);
pcg32 generator(random_seed);
null_map.resize(limit);
for (UInt64 i = 0; i < limit; ++i)
{
null_map[i] = generator() < 1024;
}
break;
null_map[i] = generator() < 1024;
}
break;
}
case TypeIndex::Function:
throw Exception("Type 'Funclion' can not be stored in a table.", ErrorCodes::LOGICAL_ERROR);
case TypeIndex::AggregateFunction:
@ -406,72 +395,85 @@ void fillColumnWithRandomData(IColumn & column, DataTypePtr type, UInt64 limit,
}
}
StoragePtr TableFunctionRandom::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
StorageGenerate::StorageGenerate(const StorageID & table_id_, const ColumnsDescription & columns_,
UInt64 max_array_length_, UInt64 max_string_length_, UInt64 random_seed_)
: IStorage(table_id_), max_array_length(max_array_length_), max_string_length(max_string_length_), random_seed(random_seed_)
{
ASTs & args_func = ast_function->children;
setColumns(columns_);
}
if (args_func.size() != 1)
throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::LOGICAL_ERROR);
ASTs & args = args_func.at(0)->children;
if (args.size() < 2)
throw Exception("Table function '" + getName() + "' requires at least two arguments: "\
" structure, limit(, max_array_length, max_string_length, random_seed).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (args.size() > 5)
throw Exception("Table function '" + getName() + "' requires at most five arguments: "\
" structure, limit, max_array_length, max_string_length, random_seed.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
/// Parsing first argument as table structure and creating a sample block
std::string structure = args[0]->as<ASTLiteral &>().value.safeGet<String>();
UInt64 limit = 1;
UInt64 max_array_length = 10;
UInt64 max_string_length = 10;
UInt64 random_seed = 0; // zero for random
/// Parsing second argument if present
if (args.size() >= 2)
limit = args[1]->as<ASTLiteral &>().value.safeGet<UInt64>();
if (!limit)
throw Exception("Table function '" + getName() + "' limit should not be 0.", ErrorCodes::BAD_ARGUMENTS);
if (args.size() >= 3)
max_array_length = args[1]->as<ASTLiteral &>().value.safeGet<UInt64>();
if (args.size() >= 4)
max_string_length = args[1]->as<ASTLiteral &>().value.safeGet<UInt64>();
if (args.size() == 5)
random_seed = args[1]->as<ASTLiteral &>().value.safeGet<UInt64>();
ColumnsDescription columns = parseColumnsListFromString(structure, context);
Block res_block;
for (const auto & name_type : columns.getOrdinary())
/// Registers the "Generate" table engine in the storage factory.
///
/// Engine arguments (all literals):
///   structure          - String, required
///   max_array_length   - UInt64, optional, defaults to 10
///   max_string_length  - UInt64, optional, defaults to 10
///   random_seed        - UInt64, optional, defaults to 0; a zero seed is replaced with randomSeed()
///
/// The creator throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH when no arguments or more than four are given.
void registerStorageGenerate(StorageFactory & factory)
{
    factory.registerStorage("Generate", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        if (engine_args.empty())
            throw Exception("Storage Generate requires at least one argument: "
                " structure(, max_array_length, max_string_length, random_seed).",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        /// Only four arguments are parsed below. The previous bound of five let an extra
        /// argument through unnoticed and, in that case, also skipped reading the seed.
        if (engine_args.size() > 4)
            throw Exception("Storage Generate requires at most four arguments: "
                " structure, max_array_length, max_string_length, random_seed.",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        /// Parsing first argument as table structure.
        /// The parsed string is not used further in this function: the column list passed to
        /// the storage comes from args.columns. The call is kept so a non-String argument is rejected.
        std::string structure = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();

        UInt64 max_array_length_ = 10;
        UInt64 max_string_length_ = 10;
        UInt64 random_seed_ = 0; // zero for random

        /// Parsing the optional arguments if present
        if (engine_args.size() >= 2)
            max_array_length_ = engine_args[1]->as<ASTLiteral &>().value.safeGet<UInt64>();

        if (engine_args.size() >= 3)
            max_string_length_ = engine_args[2]->as<ASTLiteral &>().value.safeGet<UInt64>();

        if (engine_args.size() == 4)
            random_seed_ = engine_args[3]->as<ASTLiteral &>().value.safeGet<UInt64>();

        /// do not use predefined seed
        if (!random_seed_)
            random_seed_ = randomSeed();

        return StorageGenerate::create(args.table_id, args.columns, max_array_length_, max_string_length_, random_seed_);
    });
}
Pipes StorageGenerate::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned /*num_streams*/)
{
check(column_names, true);
Pipes pipes;
const ColumnsDescription & columns_ = getColumns();
for (const auto & name : column_names)
{
const auto & name_type = columns_.get(name);
MutableColumnPtr column = name_type.type->createColumn();
res_block.insert({std::move(column), name_type.type, name_type.name});
}
for (auto & ctn : res_block.getColumnsWithTypeAndName())
{
fillColumnWithRandomData(ctn.column->assumeMutableRef(), ctn.type, limit, max_array_length, max_string_length, random_seed);
fillColumnWithRandomData(ctn.column->assumeMutableRef(), ctn.type, max_block_size, max_array_length, max_string_length, random_seed);
}
auto res = StorageValues::create(StorageID(getDatabaseName(), table_name), columns, res_block);
res->startup();
return res;
}
void registerTableFunctionRandom(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionRandom>(TableFunctionFactory::CaseInsensitive);
Chunk chunk(res_block.getColumns(), res_block.rows());
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(res_block.cloneEmpty(), std::move(chunk)));
return pipes;
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
namespace DB
{
/** Storage that generates random data for a given schema.
  *
  * Instances are created through ext::shared_ptr_helper (the constructor is protected),
  * i.e. via StorageGenerate::create(...).
  */
class StorageGenerate : public ext::shared_ptr_helper<StorageGenerate>, public IStorage
{
    friend struct ext::shared_ptr_helper<StorageGenerate>;

public:
    /// Engine name.
    std::string getName() const override { return "Generate"; }

    Pipes read(
        const Names & column_names,
        const SelectQueryInfo & query_info,
        const Context & context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        unsigned num_streams) override;

protected:
    StorageGenerate(
        const StorageID & table_id_,
        const ColumnsDescription & columns_,
        UInt64 max_array_length_,
        UInt64 max_string_length_,
        UInt64 random_seed_);

private:
    /// NOTE(review): appears to be populated by read(); as a member it persists between
    /// read() calls - confirm that this is intended.
    Block res_block;

    /// Generation parameters; the in-class defaults are overwritten by the constructor arguments.
    UInt64 max_array_length = 10;
    UInt64 max_string_length = 10;
    UInt64 random_seed = 0;
};
}

View File

@ -29,8 +29,9 @@ void registerStorages()
registerStorageView(factory);
registerStorageMaterializedView(factory);
registerStorageLiveView(factory);
registerStorageGenerate(factory);
#if USE_AWS_S3
#if USE_AWS_S3
registerStorageS3(factory);
#endif

View File

@ -23,6 +23,7 @@ void registerStorageJoin(StorageFactory & factory);
void registerStorageView(StorageFactory & factory);
void registerStorageMaterializedView(StorageFactory & factory);
void registerStorageLiveView(StorageFactory & factory);
void registerStorageGenerate(StorageFactory & factory);
#if USE_AWS_S3
void registerStorageS3(StorageFactory & factory);

View File

@ -0,0 +1,80 @@
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Core/Block.h>
#include <Storages/StorageGenerate.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionGenerate.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include "registerTableFunctions.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int BAD_TYPE_OF_FIELD;
extern const int LOGICAL_ERROR;
}
/// Creates a StorageGenerate from the arguments of the table function call.
///
/// Arguments: structure[, max_array_length[, max_string_length[, random_seed]]].
/// The structure string is parsed into a column list; omitted numeric arguments
/// fall back to 10, 10 and 0 respectively.
/// Throws LOGICAL_ERROR when the function node does not have exactly one child and
/// NUMBER_OF_ARGUMENTS_DOESNT_MATCH when the argument count is outside [1, 4].
StoragePtr TableFunctionGenerate::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
    ASTs & function_children = ast_function->children;
    if (function_children.size() != 1)
        throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::LOGICAL_ERROR);

    ASTs & arguments = function_children.at(0)->children;
    const size_t argument_count = arguments.size();

    if (argument_count == 0)
        throw Exception("Table function '" + getName() + "' requires at least one argument: "
            " structure(, max_array_length, max_string_length, random_seed).",
            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    if (argument_count > 4)
        throw Exception("Table function '" + getName() + "' requires at most four arguments: "
            " structure, max_array_length, max_string_length, random_seed.",
            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    /// Reads the argument at `position` as a UInt64 literal.
    const auto literal_as_uint64 = [&arguments](size_t position) -> UInt64
    {
        return arguments[position]->as<ASTLiteral &>().value.safeGet<UInt64>();
    };

    /// The first argument is the table structure; the rest are optional and read in order.
    const std::string structure = arguments[0]->as<ASTLiteral &>().value.safeGet<String>();
    const UInt64 max_array_length = argument_count >= 2 ? literal_as_uint64(1) : UInt64(10);
    const UInt64 max_string_length = argument_count >= 3 ? literal_as_uint64(2) : UInt64(10);
    const UInt64 random_seed = argument_count == 4 ? literal_as_uint64(3) : UInt64(0); // zero for random

    const ColumnsDescription columns = parseColumnsListFromString(structure, context);

    auto storage = StorageGenerate::create(
        StorageID(getDatabaseName(), table_name), columns, max_array_length, max_string_length, random_seed);
    storage->startup();
    return storage;
}
/// Registers TableFunctionGenerate in the given table function factory,
/// passing TableFunctionFactory::CaseInsensitive as the registration mode.
void registerTableFunctionGenerate(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionGenerate>(TableFunctionFactory::CaseInsensitive);
}
}

View File

@ -7,7 +7,7 @@ namespace DB
/* generate(structure[, max_array_length, max_string_length, random_seed]) - creates a temporary storage filling columns with random data
 * generate is a case-insensitive table function
 */
class TableFunctionRandom : public ITableFunction
class TableFunctionGenerate : public ITableFunction
{
public:
static constexpr auto name = "generate";

View File

@ -15,7 +15,7 @@ void registerTableFunctions()
registerTableFunctionURL(factory);
registerTableFunctionValues(factory);
registerTableFunctionInput(factory);
registerTableFunctionRandom(factory);
registerTableFunctionGenerate(factory);
#if USE_AWS_S3
registerTableFunctionS3(factory);

View File

@ -12,7 +12,7 @@ void registerTableFunctionFile(TableFunctionFactory & factory);
void registerTableFunctionURL(TableFunctionFactory & factory);
void registerTableFunctionValues(TableFunctionFactory & factory);
void registerTableFunctionInput(TableFunctionFactory & factory);
void registerTableFunctionRandom(TableFunctionFactory & factory);
void registerTableFunctionGenerate(TableFunctionFactory & factory);
#if USE_AWS_S3
void registerTableFunctionS3(TableFunctionFactory & factory);