Merge branch 'master' into fix-direct-io

This commit is contained in:
alesapin 2020-06-09 12:39:51 +03:00
commit 30b02b20e5
7 changed files with 107 additions and 49 deletions

View File

@ -6,6 +6,7 @@
#include <IO/WriteBufferFromArena.h> #include <IO/WriteBufferFromArena.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Common/FieldVisitors.h>
#include <Common/SipHash.h> #include <Common/SipHash.h>
#include <Common/AlignedBuffer.h> #include <Common/AlignedBuffer.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
@ -27,6 +28,51 @@ namespace ErrorCodes
} }
static std::string getTypeString(const AggregateFunctionPtr & func)
{
WriteBufferFromOwnString stream;
stream << "AggregateFunction(" << func->getName();
const auto & parameters = func->getParameters();
const auto & argument_types = func->getArgumentTypes();
if (!parameters.empty())
{
stream << '(';
for (size_t i = 0; i < parameters.size(); ++i)
{
if (i)
stream << ", ";
stream << applyVisitor(FieldVisitorToString(), parameters[i]);
}
stream << ')';
}
for (const auto & argument_type : argument_types)
stream << ", " << argument_type->getName();
stream << ')';
return stream.str();
}
ColumnAggregateFunction::ColumnAggregateFunction(const AggregateFunctionPtr & func_)
: func(func_), type_string(getTypeString(func))
{
}
ColumnAggregateFunction::ColumnAggregateFunction(const AggregateFunctionPtr & func_, const ConstArenas & arenas_)
: foreign_arenas(arenas_), func(func_), type_string(getTypeString(func))
{
}
void ColumnAggregateFunction::set(const AggregateFunctionPtr & func_)
{
func = func_;
type_string = getTypeString(func);
}
ColumnAggregateFunction::~ColumnAggregateFunction() ColumnAggregateFunction::~ColumnAggregateFunction()
{ {
if (!func->hasTrivialDestructor() && !src) if (!func->hasTrivialDestructor() && !src)
@ -336,15 +382,10 @@ MutableColumnPtr ColumnAggregateFunction::cloneEmpty() const
return create(func); return create(func);
} }
String ColumnAggregateFunction::getTypeString() const
{
return DataTypeAggregateFunction(func, func->getArgumentTypes(), func->getParameters()).getName();
}
Field ColumnAggregateFunction::operator[](size_t n) const Field ColumnAggregateFunction::operator[](size_t n) const
{ {
Field field = AggregateFunctionStateData(); Field field = AggregateFunctionStateData();
field.get<AggregateFunctionStateData &>().name = getTypeString(); field.get<AggregateFunctionStateData &>().name = type_string;
{ {
WriteBufferFromString buffer(field.get<AggregateFunctionStateData &>().data); WriteBufferFromString buffer(field.get<AggregateFunctionStateData &>().data);
func->serialize(data[n], buffer); func->serialize(data[n], buffer);
@ -355,7 +396,7 @@ Field ColumnAggregateFunction::operator[](size_t n) const
void ColumnAggregateFunction::get(size_t n, Field & res) const void ColumnAggregateFunction::get(size_t n, Field & res) const
{ {
res = AggregateFunctionStateData(); res = AggregateFunctionStateData();
res.get<AggregateFunctionStateData &>().name = getTypeString(); res.get<AggregateFunctionStateData &>().name = type_string;
{ {
WriteBufferFromString buffer(res.get<AggregateFunctionStateData &>().data); WriteBufferFromString buffer(res.get<AggregateFunctionStateData &>().data);
func->serialize(data[n], buffer); func->serialize(data[n], buffer);
@ -425,8 +466,6 @@ static void pushBackAndCreateState(ColumnAggregateFunction::Container & data, Ar
void ColumnAggregateFunction::insert(const Field & x) void ColumnAggregateFunction::insert(const Field & x)
{ {
String type_string = getTypeString();
if (x.getType() != Field::Types::AggregateFunctionState) if (x.getType() != Field::Types::AggregateFunctionState)
throw Exception(String("Inserting field of type ") + x.getTypeName() + " into ColumnAggregateFunction. " throw Exception(String("Inserting field of type ") + x.getTypeName() + " into ColumnAggregateFunction. "
"Expected " + Field::Types::toString(Field::Types::AggregateFunctionState), ErrorCodes::LOGICAL_ERROR); "Expected " + Field::Types::toString(Field::Types::AggregateFunctionState), ErrorCodes::LOGICAL_ERROR);
@ -564,7 +603,7 @@ void ColumnAggregateFunction::getExtremes(Field & min, Field & max) const
AggregateDataPtr place = place_buffer.data(); AggregateDataPtr place = place_buffer.data();
AggregateFunctionStateData serialized; AggregateFunctionStateData serialized;
serialized.name = getTypeString(); serialized.name = type_string;
func->create(place); func->create(place);
try try

View File

@ -74,6 +74,9 @@ private:
/// Array of pointers to aggregation states, that are placed in arenas. /// Array of pointers to aggregation states, that are placed in arenas.
Container data; Container data;
/// Name of the type to distinguish different aggregation states.
String type_string;
ColumnAggregateFunction() {} ColumnAggregateFunction() {}
/// Create a new column that has another column as a source. /// Create a new column that has another column as a source.
@ -84,29 +87,17 @@ private:
/// but ownership of different elements cannot be mixed by different columns. /// but ownership of different elements cannot be mixed by different columns.
void ensureOwnership(); void ensureOwnership();
ColumnAggregateFunction(const AggregateFunctionPtr & func_) ColumnAggregateFunction(const AggregateFunctionPtr & func_);
: func(func_)
{
}
ColumnAggregateFunction(const AggregateFunctionPtr & func_, ColumnAggregateFunction(const AggregateFunctionPtr & func_,
const ConstArenas & arenas_) const ConstArenas & arenas_);
: foreign_arenas(arenas_), func(func_)
{
}
ColumnAggregateFunction(const ColumnAggregateFunction & src_); ColumnAggregateFunction(const ColumnAggregateFunction & src_);
String getTypeString() const;
public: public:
~ColumnAggregateFunction() override; ~ColumnAggregateFunction() override;
void set(const AggregateFunctionPtr & func_) void set(const AggregateFunctionPtr & func_);
{
func = func_;
}
AggregateFunctionPtr getAggregateFunction() { return func; } AggregateFunctionPtr getAggregateFunction() { return func; }
AggregateFunctionPtr getAggregateFunction() const { return func; } AggregateFunctionPtr getAggregateFunction() const { return func; }

View File

@ -14,6 +14,8 @@
#include <Formats/ProtobufWriter.h> #include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeAggregateFunction.h> #include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <AggregateFunctions/AggregateFunctionFactory.h> #include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
@ -36,25 +38,25 @@ namespace ErrorCodes
std::string DataTypeAggregateFunction::doGetName() const std::string DataTypeAggregateFunction::doGetName() const
{ {
std::stringstream stream; WriteBufferFromOwnString stream;
stream << "AggregateFunction(" << function->getName(); stream << "AggregateFunction(" << function->getName();
if (!parameters.empty()) if (!parameters.empty())
{ {
stream << "("; stream << '(';
for (size_t i = 0; i < parameters.size(); ++i) for (size_t i = 0; i < parameters.size(); ++i)
{ {
if (i) if (i)
stream << ", "; stream << ", ";
stream << applyVisitor(DB::FieldVisitorToString(), parameters[i]); stream << applyVisitor(DB::FieldVisitorToString(), parameters[i]);
} }
stream << ")"; stream << ')';
} }
for (const auto & argument_type : argument_types) for (const auto & argument_type : argument_types)
stream << ", " << argument_type->getName(); stream << ", " << argument_type->getName();
stream << ")"; stream << ')';
return stream.str(); return stream.str();
} }

View File

@ -1,5 +1,6 @@
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Parsers/ParserCreateQuery.h> #include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterCreateQuery.h> #include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <TableFunctions/parseColumnsListForTableFunction.h> #include <TableFunctions/parseColumnsListForTableFunction.h>
@ -11,27 +12,20 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int SYNTAX_ERROR;
} }
ColumnsDescription parseColumnsListFromString(const std::string & structure, const Context & context) ColumnsDescription parseColumnsListFromString(const std::string & structure, const Context & context)
{ {
Expected expected;
Tokens tokens(structure.c_str(), structure.c_str() + structure.size());
IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth);
ParserColumnDeclarationList parser; ParserColumnDeclarationList parser;
ASTPtr columns_list_raw; const Settings & settings = context.getSettingsRef();
if (!parser.parse(token_iterator, columns_list_raw, expected)) ASTPtr columns_list_raw = parseQuery(parser, structure, "columns declaration list", settings.max_query_size, settings.max_parser_depth);
throw Exception("Cannot parse columns declaration list.", ErrorCodes::SYNTAX_ERROR);
auto * columns_list = dynamic_cast<ASTExpressionList *>(columns_list_raw.get()); auto * columns_list = dynamic_cast<ASTExpressionList *>(columns_list_raw.get());
if (!columns_list) if (!columns_list)
throw Exception("Could not cast AST to ASTExpressionList", ErrorCodes::LOGICAL_ERROR); throw Exception("Could not cast AST to ASTExpressionList", ErrorCodes::LOGICAL_ERROR);
return InterpreterCreateQuery::getColumnsDescription(*columns_list, context, !context.getSettingsRef().allow_suspicious_codecs); return InterpreterCreateQuery::getColumnsDescription(*columns_list, context, !settings.allow_suspicious_codecs);
} }
} }

View File

@ -0,0 +1,30 @@
<test>
<create_query>
CREATE TABLE test(
t UInt64,
q1 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
q2 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
q3 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
q4 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
q5 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64)
) ENGINE=SummingMergeTree()
ORDER BY t
</create_query>
<create_query>
INSERT INTO test
SELECT
number / 10 as t,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q1,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q2,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q3,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q4,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q5
FROM numbers(1000 * 1000)
GROUP BY t
</create_query>
<query>OPTIMIZE TABLE test FINAL</query>
<drop_query>DROP TABLE test</drop_query>
</test>

View File

@ -33,11 +33,11 @@ LIMIT 10;
SELECT '-'; SELECT '-';
SELECT SELECT
toTypeName(i)s toTypeName(i)s
FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200)))') FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200))')
LIMIT 1; LIMIT 1;
SELECT SELECT
i i
FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200)))', 1, 10, 10) FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200))', 1, 10, 10)
LIMIT 10; LIMIT 10;
SELECT '-'; SELECT '-';
SELECT SELECT

View File

@ -17,6 +17,7 @@ function read_numbers_func()
function show_processes_func() function show_processes_func()
{ {
while true; do
sleep 0.1; sleep 0.1;
# These two system metrics for the generating query above are guaranteed to be nonzero when ProcFS is mounted at /proc # These two system metrics for the generating query above are guaranteed to be nonzero when ProcFS is mounted at /proc
@ -24,7 +25,8 @@ function show_processes_func()
SELECT count() > 0 FROM system.processes\ SELECT count() > 0 FROM system.processes\
WHERE has(ProfileEvents.Names, 'OSCPUVirtualTimeMicroseconds') AND has(ProfileEvents.Names, 'OSReadChars')\ WHERE has(ProfileEvents.Names, 'OSCPUVirtualTimeMicroseconds') AND has(ProfileEvents.Names, 'OSReadChars')\
SETTINGS max_threads = 1 SETTINGS max_threads = 1
"; " | grep '1' && break;
done
} }