Merge branch 'master' into feature/fix-windowFunnel-inconsistency

This commit is contained in:
HuFuwang 2021-01-10 07:34:05 +08:00
commit 21659e8aa5
25 changed files with 104 additions and 37 deletions

View File

@ -11,6 +11,7 @@ This table contains the following columns (the column type is shown in brackets)
- `supports_sort_order` (UInt8) — Flag that indicates if table engine supports clauses `PARTITION_BY`, `PRIMARY_KEY`, `ORDER_BY` and `SAMPLE_BY`.
- `supports_replication` (UInt8) — Flag that indicates if table engine supports [data replication](../../engines/table-engines/mergetree-family/replication.md).
- `supports_duduplication` (UInt8) — Flag that indicates if table engine supports data deduplication.
- `supports_parallel_insert` (UInt8) — Flag that indicates if table engine supports parallel insert (see [`max_insert_threads`](../../operations/settings/settings.md#settings-max-insert-threads) setting).
Example:
@ -21,11 +22,11 @@ WHERE name in ('Kafka', 'MergeTree', 'ReplicatedCollapsingMergeTree')
```
``` text
┌─name──────────────────────────┬─supports_settings─┬─supports_skipping_indices─┬─supports_sort_order─┬─supports_ttl─┬─supports_replication─┬─supports_deduplication─┐
Kafka │ 1 │ 0 │ 0 │ 0 │ 0 │ 0 │
MergeTree │ 1 │ 1 │ 1 │ 1 │ 0 │ 0 │
│ ReplicatedCollapsingMergeTree │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │
└───────────────────────────────┴───────────────────┴───────────────────────────┴─────────────────────┴──────────────┴──────────────────────┴────────────────────────┘
┌─name──────────────────────────┬─supports_settings─┬─supports_skipping_indices─┬─supports_sort_order─┬─supports_ttl─┬─supports_replication─┬─supports_deduplication─┬─supports_parallel_insert─
MergeTree │ 1 │ 1 │ 1 │ 1 │ 0 │ 0 │ 1 │
Kafka │ 1 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │
│ ReplicatedCollapsingMergeTree │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │
└───────────────────────────────┴───────────────────┴───────────────────────────┴─────────────────────┴──────────────┴──────────────────────┴────────────────────────┴──────────────────────────
```
**See also**

View File

@ -801,7 +801,8 @@ private:
connection->setDefaultDatabase(connection_parameters.default_database);
ReadBufferFromFile in(queries_file);
readStringUntilEOF(text, in);
processMultiQuery(text);
if (!processMultiQuery(text))
break;
}
return;
}
@ -984,7 +985,8 @@ private:
if (query_fuzzer_runs)
{
processWithFuzzing(full_query);
if (!processWithFuzzing(full_query))
return false;
}
else
{
@ -1034,7 +1036,8 @@ private:
}
void processWithFuzzing(const String & text)
/// Returns false when server is not available.
bool processWithFuzzing(const String & text)
{
ASTPtr orig_ast;
@ -1052,7 +1055,7 @@ private:
if (!orig_ast)
{
// Can't continue after a parsing error
return;
return true;
}
// Don't repeat inserts, the tables grow too big. Also don't repeat
@ -1147,7 +1150,7 @@ private:
// Probably the server is dead because we found an assertion
// failure. Fail fast.
fmt::print(stderr, "Lost connection to the server\n");
return;
return false;
}
// The server is still alive so we're going to continue fuzzing.
@ -1173,6 +1176,8 @@ private:
fuzz_base = ast_to_process;
}
}
return true;
}
void processTextAsSingleQuery(const String & text_)

View File

@ -33,7 +33,7 @@ template <typename T>
struct AggregateFunctionWindowFunnelData
{
using TimestampEvent = std::pair<T, UInt8>;
using TimestampEvents = PODArray<TimestampEvent, 64>;
using TimestampEvents = PODArrayWithStackMemory<TimestampEvent, 64>;
using Comparator = ComparePair;
bool sorted = true;

View File

@ -1,5 +1,4 @@
#include "BackgroundSchedulePool.h"
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>

View File

@ -6,7 +6,6 @@
#include <Access/AccessControlManager.h>
#include <common/logger_useful.h>
#include <Common/MemoryTracker.h>
#include <Common/OpenSSLHelpers.h>
#include <ext/scope_guard.h>

View File

@ -120,9 +120,6 @@ public:
/// Returns true if the storage supports deduplication of inserted data blocks.
virtual bool supportsDeduplication() const { return false; }
/// Returns true if the storage supports settings.
virtual bool supportsSettings() const { return false; }
/// Returns true if the blocks shouldn't be pushed to associated views on insert.
virtual bool noPushingToViews() const { return false; }

View File

@ -36,7 +36,6 @@ class StorageKafka final : public ext::shared_ptr_helper<StorageKafka>, public I
public:
std::string getName() const override { return "Kafka"; }
bool supportsSettings() const override { return true; }
bool noPushingToViews() const override { return true; }
void startup() override;

View File

@ -357,7 +357,6 @@ public:
|| merging_params.mode == MergingParams::VersionedCollapsing;
}
bool supportsSettings() const override { return true; }
NamesAndTypesList getVirtuals() const override;
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &, const StorageMetadataPtr & metadata_snapshot) const override;

View File

@ -3,9 +3,9 @@
#include <boost/noncopyable.hpp>
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
#include <Poco/URI.h>
namespace CurrentMetrics
{
extern const Metric ReplicatedFetch;

View File

@ -756,6 +756,7 @@ void registerStorageMergeTree(StorageFactory & factory)
.supports_skipping_indices = true,
.supports_sort_order = true,
.supports_ttl = true,
.supports_parallel_insert = true,
};
factory.registerStorage("MergeTree", create, features);

View File

@ -29,7 +29,6 @@ class StorageRabbitMQ final: public ext::shared_ptr_helper<StorageRabbitMQ>, pub
public:
std::string getName() const override { return "RabbitMQ"; }
bool supportsSettings() const override { return true; }
bool noPushingToViews() const override { return true; }
void startup() override;

View File

@ -367,6 +367,7 @@ void registerStorageEmbeddedRocksDB(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{
.supports_sort_order = true,
.supports_parallel_insert = true,
};
factory.registerStorage("EmbeddedRocksDB", create, features);

View File

@ -996,6 +996,9 @@ void registerStorageBuffer(StorageFactory & factory)
StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
destination_id,
static_cast<bool>(args.local_context.getSettingsRef().insert_allow_materialized_columns));
},
{
.supports_parallel_insert = true,
});
}

View File

@ -1005,6 +1005,7 @@ void registerStorageDistributed(StorageFactory & factory)
args.attach);
},
{
.supports_parallel_insert = true,
.source_access_type = AccessType::REMOTE,
});
}

View File

@ -47,14 +47,20 @@ public:
bool has_force_restore_data_flag;
};
/// Analog of the IStorage::supports*() helpers
/// (But the former cannot be replaced with StorageFeatures due to nesting)
struct StorageFeatures
{
bool supports_settings = false;
bool supports_skipping_indices = false;
bool supports_sort_order = false;
bool supports_ttl = false;
/// See also IStorage::supportsReplication()
bool supports_replication = false;
/// See also IStorage::supportsDeduplication()
bool supports_deduplication = false;
/// See also IStorage::supportsParallelInsert()
bool supports_parallel_insert = false;
AccessType source_access_type = AccessType::NONE;
};
@ -85,6 +91,7 @@ public:
.supports_ttl = false,
.supports_replication = false,
.supports_deduplication = false,
.supports_parallel_insert = false,
.source_access_type = AccessType::NONE,
});

View File

@ -303,6 +303,9 @@ void registerStorageMemory(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageMemory::create(args.table_id, args.columns, args.constraints);
},
{
.supports_parallel_insert = true,
});
}

View File

@ -29,6 +29,9 @@ void registerStorageNull(StorageFactory & factory)
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageNull::create(args.table_id, args.columns, args.constraints);
},
{
.supports_parallel_insert = true,
});
}

View File

@ -25,7 +25,6 @@ public:
bool supportsReplication() const override { return getNested()->supportsReplication(); }
bool supportsParallelInsert() const override { return getNested()->supportsParallelInsert(); }
bool supportsDeduplication() const override { return getNested()->supportsDeduplication(); }
bool supportsSettings() const override { return getNested()->supportsSettings(); }
bool noPushingToViews() const override { return getNested()->noPushingToViews(); }
bool hasEvenlyDistributedRead() const override { return getNested()->hasEvenlyDistributedRead(); }

View File

@ -4958,8 +4958,13 @@ void StorageReplicatedMergeTree::fetchPartition(
const String & from_,
const Context & query_context)
{
String auxiliary_zookeeper_name = extractZooKeeperName(from_);
String from = extractZooKeeperPath(from_);
Macros::MacroExpansionInfo info;
info.expand_special_macros_only = false;
info.table_id = getStorageID();
info.table_id.uuid = UUIDHelpers::Nil;
auto expand_from = query_context.getMacros()->expand(from_, info);
String auxiliary_zookeeper_name = extractZooKeeperName(expand_from);
String from = extractZooKeeperPath(expand_from);
if (from.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

View File

@ -8,26 +8,31 @@ namespace DB
NamesAndTypesList StorageSystemTableEngines::getNamesAndTypes()
{
return {{"name", std::make_shared<DataTypeString>()},
{"supports_settings", std::make_shared<DataTypeUInt8>()},
{"supports_skipping_indices", std::make_shared<DataTypeUInt8>()},
{"supports_sort_order", std::make_shared<DataTypeUInt8>()},
{"supports_ttl", std::make_shared<DataTypeUInt8>()},
{"supports_replication", std::make_shared<DataTypeUInt8>()},
{"supports_deduplication", std::make_shared<DataTypeUInt8>()}};
return {
{"name", std::make_shared<DataTypeString>()},
{"supports_settings", std::make_shared<DataTypeUInt8>()},
{"supports_skipping_indices", std::make_shared<DataTypeUInt8>()},
{"supports_sort_order", std::make_shared<DataTypeUInt8>()},
{"supports_ttl", std::make_shared<DataTypeUInt8>()},
{"supports_replication", std::make_shared<DataTypeUInt8>()},
{"supports_deduplication", std::make_shared<DataTypeUInt8>()},
{"supports_parallel_insert", std::make_shared<DataTypeUInt8>()},
};
}
void StorageSystemTableEngines::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
for (const auto & pair : StorageFactory::instance().getAllStorages())
{
res_columns[0]->insert(pair.first);
res_columns[1]->insert(pair.second.features.supports_settings);
res_columns[2]->insert(pair.second.features.supports_skipping_indices);
res_columns[3]->insert(pair.second.features.supports_sort_order);
res_columns[4]->insert(pair.second.features.supports_ttl);
res_columns[5]->insert(pair.second.features.supports_replication);
res_columns[6]->insert(pair.second.features.supports_deduplication);
int i = 0;
res_columns[i++]->insert(pair.first);
res_columns[i++]->insert(pair.second.features.supports_settings);
res_columns[i++]->insert(pair.second.features.supports_skipping_indices);
res_columns[i++]->insert(pair.second.features.supports_sort_order);
res_columns[i++]->insert(pair.second.features.supports_ttl);
res_columns[i++]->insert(pair.second.features.supports_replication);
res_columns[i++]->insert(pair.second.features.supports_deduplication);
res_columns[i++]->insert(pair.second.features.supports_parallel_insert);
}
}

View File

@ -80,6 +80,24 @@ TEST(TransformQueryForExternalDatabase, InWithSingleElement)
state.context, state.columns);
}
TEST(TransformQueryForExternalDatabase, InWithTable)
{
const State & state = State::instance();
check("SELECT column FROM test.table WHERE 1 IN external_table",
R"(SELECT "column" FROM "test"."table")",
state.context, state.columns);
check("SELECT column FROM test.table WHERE 1 IN (x)",
R"(SELECT "column" FROM "test"."table")",
state.context, state.columns);
check("SELECT column, field, value FROM test.table WHERE column IN (field, value)",
R"(SELECT "column", "field", "value" FROM "test"."table" WHERE "column" IN ("field", "value"))",
state.context, state.columns);
check("SELECT column FROM test.table WHERE column NOT IN hello AND column = 123",
R"(SELECT "column" FROM "test"."table" WHERE ("column" = 123))",
state.context, state.columns);
}
TEST(TransformQueryForExternalDatabase, Like)
{
const State & state = State::instance();

View File

@ -138,6 +138,12 @@ bool isCompatible(const IAST & node)
if (name == "tuple" && function->arguments->children.size() <= 1)
return false;
/// If the right hand side of IN is an identifier (example: x IN table), then it's not compatible.
if ((name == "in" || name == "notIn")
&& (function->arguments->children.size() != 2
|| function->arguments->children[1]->as<ASTIdentifier>()))
return false;
for (const auto & expr : function->arguments->children)
if (!isCompatible(*expr))
return false;

View File

@ -0,0 +1,12 @@
<test>
<create_query>CREATE TABLE action(uid UInt64, event String, time DateTime) ENGINE = MergeTree ORDER BY uid</create_query>
<fill_query>INSERT INTO action SELECT arrayJoin(groupArray(number)), 'a', now() from numbers(1000000)</fill_query>
<fill_query>INSERT INTO action SELECT arrayJoin(groupArray(number)), 'b', now() + INTERVAL 6 hour from numbers(1000000)</fill_query>
<fill_query>INSERT INTO action SELECT arrayJoin(groupArray(number)), 'c', now() + INTERVAL 12 hour from numbers(1000000)</fill_query>
<fill_query>INSERT INTO action SELECT arrayJoin(groupArray(number)), 'd', now() + INTERVAL 18 hour from numbers(1000000)</fill_query>
<query>SELECT level, count() from (select windowFunnel(86400)(time, event='a', event='b', event='c', event='d') level from action group by uid) group by level FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS action</drop_query>
</test>

View File

@ -0,0 +1,4 @@
┌─name──────────────────────────┬─supports_settings─┬─supports_skipping_indices─┬─supports_sort_order─┬─supports_ttl─┬─supports_replication─┬─supports_deduplication─┬─supports_parallel_insert─┐
│ MergeTree │ 1 │ 1 │ 1 │ 1 │ 0 │ 0 │ 1 │
│ ReplicatedCollapsingMergeTree │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │
└───────────────────────────────┴───────────────────┴───────────────────────────┴─────────────────────┴──────────────┴──────────────────────┴────────────────────────┴──────────────────────────┘

View File

@ -0,0 +1 @@
SELECT * FROM system.table_engines WHERE name in ('MergeTree', 'ReplicatedCollapsingMergeTree') FORMAT PrettyCompactNoEscapes;