Merge branch 'master' into minor-changes-3

This commit is contained in:
Alexey Milovidov 2021-06-15 00:45:45 +03:00
commit 4e982a3ae6
17 changed files with 390 additions and 55 deletions

View File

@ -112,7 +112,7 @@ A hand-written recursive descent parser parses a query. For example, `ParserSele
Interpreters are responsible for creating the query execution pipeline from an `AST`. There are simple interpreters, such as `InterpreterExistsQuery` and `InterpreterDropQuery`, or the more sophisticated `InterpreterSelectQuery`. The query execution pipeline is a combination of block input or output streams. For example, the result of interpreting the `SELECT` query is the `IBlockInputStream` to read the result set from; the result of the INSERT query is the `IBlockOutputStream` to write data for insertion to, and the result of interpreting the `INSERT SELECT` query is the `IBlockInputStream` that returns an empty result set on the first read, but that copies data from `SELECT` to `INSERT` at the same time.
`InterpreterSelectQuery` uses `ExpressionAnalyzer` and `ExpressionActions` machinery for query analysis and transformations. This is where most rule-based query optimizations are done. `ExpressionAnalyzer` is quite messy and should be rewritten: various query transformations and optimizations should be extracted to separate classes to allow modular transformations or query.
`InterpreterSelectQuery` uses `ExpressionAnalyzer` and `ExpressionActions` machinery for query analysis and transformations. This is where most rule-based query optimizations are done. `ExpressionAnalyzer` is quite messy and should be rewritten: various query transformations and optimizations should be extracted to separate classes to allow modular transformations of query.
## Functions {#functions}
@ -169,7 +169,7 @@ There is no global query plan for distributed query execution. Each node has its
`MergeTree` is a family of storage engines that supports indexing by primary key. The primary key can be an arbitrary tuple of columns or expressions. Data in a `MergeTree` table is stored in “parts”. Each part stores data in the primary key order, so data is ordered lexicographically by the primary key tuple. All the table columns are stored in separate `column.bin` files in these parts. The files consist of compressed blocks. Each block is usually from 64 KB to 1 MB of uncompressed data, depending on the average value size. The blocks consist of column values placed contiguously one after the other. Column values are in the same order for each column (the primary key defines the order), so when you iterate by many columns, you get values for the corresponding rows.
The primary key itself is “sparse”. It does not address every single row, but only some ranges of data. A separate `primary.idx` file has the value of the primary key for each N-th row, where N is called `index_granularity` (usually, N = 8192). Also, for each column, we have `column.mrk` files with “marks,” which are offsets to each N-th row in the data file. Each mark is a pair: the offset in the file to the beginning of the compressed block, and the offset in the decompressed block to the beginning of data. Usually, compressed blocks are aligned by marks, and the offset in the decompressed block is zero. Data for `primary.idx` always resides in memory, and data for `column.mrk` files is cached.
The primary key itself is “sparse”. It does not address every single row, but only some ranges of data. A separate `primary.idx` file has the value of the primary key for each N-th row, where N is called `index_granularity` (usually, N = 8192). Also, for each column, we have `column.mrk` files with “marks”, which are offsets to each N-th row in the data file. Each mark is a pair: the offset in the file to the beginning of the compressed block, and the offset in the decompressed block to the beginning of data. Usually, compressed blocks are aligned by marks, and the offset in the decompressed block is zero. Data for `primary.idx` always resides in memory, and data for `column.mrk` files is cached.
When we are going to read something from a part in `MergeTree`, we look at `primary.idx` data and locate ranges that could contain requested data, then look at `column.mrk` data and calculate offsets for where to start reading those ranges. Because of sparseness, excess data may be read. ClickHouse is not suitable for a high load of simple point queries, because the entire range with `index_granularity` rows must be read for each key, and the entire compressed block must be decompressed for each column. We made the index sparse because we must be able to maintain trillions of rows per single server without noticeable memory consumption for the index. Also, because the primary key is sparse, it is not unique: it cannot check the existence of the key in the table at INSERT time. You could have many rows with the same key in a table.

View File

@ -17,7 +17,7 @@ Main features:
- Partitions can be used if the [partitioning key](../../../engines/table-engines/mergetree-family/custom-partitioning-key.md) is specified.
ClickHouse supports certain operations with partitions that are more effective than general operations on the same data with the same result. ClickHouse also automatically cuts off the partition data where the partitioning key is specified in the query.
ClickHouse supports certain operations with partitions that are more efficient than general operations on the same data with the same result. ClickHouse also automatically cuts off the partition data where the partitioning key is specified in the query.
- Data replication support.
@ -83,7 +83,7 @@ For a description of parameters, see the [CREATE query description](../../../sql
Expression must have one `Date` or `DateTime` column as a result. Example:
`TTL date + INTERVAL 1 DAY`
Type of the rule `DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'|GROUP BY` specifies an action to be done with the part if the expression is satisfied (reaches current time): removal of expired rows, moving a part (if expression is satisfied for all rows in a part) to specified disk (`TO DISK 'xxx'`) or to volume (`TO VOLUME 'xxx'`), or aggregating values in expired rows. Default type of the rule is removal (`DELETE`). List of multiple rules can specified, but there should be no more than one `DELETE` rule.
Type of the rule `DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'|GROUP BY` specifies an action to be done with the part if the expression is satisfied (reaches current time): removal of expired rows, moving a part (if expression is satisfied for all rows in a part) to specified disk (`TO DISK 'xxx'`) or to volume (`TO VOLUME 'xxx'`), or aggregating values in expired rows. Default type of the rule is removal (`DELETE`). List of multiple rules can be specified, but there should be no more than one `DELETE` rule.
For more details, see [TTL for columns and tables](#table_engine-mergetree-ttl)
@ -474,7 +474,7 @@ With `WHERE` clause you may specify which of the expired rows to delete or aggre
`GROUP BY` expression must be a prefix of the table primary key.
If a column is not part of the `GROUP BY` expression and is not set explicitely in the `SET` clause, in result row it contains an occasional value from the grouped rows (as if aggregate function `any` is applied to it).
If a column is not part of the `GROUP BY` expression and is not set explicitly in the `SET` clause, in result row it contains an occasional value from the grouped rows (as if aggregate function `any` is applied to it).
**Examples**

View File

@ -96,7 +96,7 @@ SELECT key, sum(value) FROM summtt GROUP BY key
When data are inserted into a table, they are saved as-is. ClickHouse merges the inserted parts of data periodically and this is when rows with the same primary key are summed and replaced with one for each resulting part of data.
ClickHouse can merge the data parts so that different resulting parts of data cat consist rows with the same primary key, i.e. the summation will be incomplete. Therefore (`SELECT`) an aggregate function [sum()](../../../sql-reference/aggregate-functions/reference/sum.md#agg_function-sum) and `GROUP BY` clause should be used in a query as described in the example above.
ClickHouse can merge the data parts so that different resulting parts of data can consist rows with the same primary key, i.e. the summation will be incomplete. Therefore (`SELECT`) an aggregate function [sum()](../../../sql-reference/aggregate-functions/reference/sum.md#agg_function-sum) and `GROUP BY` clause should be used in a query as described in the example above.
### Common Rules for Summation {#common-rules-for-summation}

View File

@ -14,6 +14,7 @@
#endif
#include <chrono>
#include <optional>
#include <set>
#include <vector>

View File

@ -496,6 +496,7 @@ if (ENABLE_TESTS AND USE_GTEST)
# gtest framework has substandard code
target_compile_options(unit_tests_dbms PRIVATE
-Wno-zero-as-null-pointer-constant
-Wno-covered-switch-default
-Wno-undef
-Wno-sign-compare
-Wno-used-but-marked-unused

View File

@ -0,0 +1,65 @@
#include <Common/VersionNumber.h>
#include <cstdlib>
#include <iostream>
namespace DB
{
VersionNumber::VersionNumber(std::string version_string)
{
if (version_string.empty())
return;
char * start = &version_string.front();
char * end = start;
const char * eos = &version_string.back() + 1;
do
{
Int64 value = strtol(start, &end, 10);
components.push_back(value);
start = end + 1;
}
while (start < eos && (end < eos && *end == '.'));
}
std::string VersionNumber::toString() const
{
std::string str;
for (Int64 v : components)
{
if (!str.empty())
str += '.';
str += std::to_string(v);
}
return str;
}
int VersionNumber::compare(const VersionNumber & rhs) const
{
size_t min = std::min(components.size(), rhs.components.size());
for (size_t i = 0; i < min; ++i)
{
if (int d = components[i] - rhs.components[i])
return d;
}
if (components.size() > min)
{
if (components[min] != 0)
return components[min];
else
return 1;
}
else if (rhs.components.size() > min)
{
if (rhs.components[min] != 0)
return -rhs.components[min];
else
return -1;
}
return 0;
}
}

View File

@ -0,0 +1,52 @@
#pragma once
#include <tuple>
#include <string>
#include <vector>
#include <iostream>
#include <common/types.h>
namespace DB
{
/// Simple numeric version representation.
///
/// Based on QVersionNumber.
struct VersionNumber
{
explicit VersionNumber() = default;
VersionNumber(const std::initializer_list<Int64> & init)
: components(init)
{}
VersionNumber(Int64 major, Int64 minor = 0, Int64 patch = 0)
: components{major, minor, patch}
{}
VersionNumber(const std::vector<Int64> & components_)
: components(components_)
{}
/// Parse version number from string.
VersionNumber(std::string version);
/// NOTE: operator<=> can be used once libc++ will be upgraded.
bool operator<(const VersionNumber & rhs) const { return compare(rhs.components) < 0; }
bool operator<=(const VersionNumber & rhs) const { return compare(rhs.components) <= 0; }
bool operator==(const VersionNumber & rhs) const { return compare(rhs.components) == 0; }
bool operator>(const VersionNumber & rhs) const { return compare(rhs.components) > 0; }
bool operator>=(const VersionNumber & rhs) const { return compare(rhs.components) >= 0; }
std::string toString() const;
friend std::ostream & operator<<(std::ostream & os, const VersionNumber & v)
{
return os << v.toString();
}
private:
using Components = std::vector<Int64>;
Components components;
int compare(const VersionNumber & rhs) const;
};
}

View File

@ -1,5 +1,7 @@
#include <Common/renameat2.h>
#include <Common/Exception.h>
#include <Common/VersionNumber.h>
#include <Poco/Environment.h>
#include <filesystem>
#if defined(linux) || defined(__linux) || defined(__linux__)
@ -7,7 +9,6 @@
#include <fcntl.h>
#include <sys/syscall.h>
#include <linux/fs.h>
#include <sys/utsname.h>
#endif
namespace fs = std::filesystem;
@ -27,22 +28,9 @@ namespace ErrorCodes
static bool supportsRenameat2Impl()
{
#if defined(__NR_renameat2)
/// renameat2 is available in linux since 3.15
struct utsname sysinfo;
if (uname(&sysinfo))
return false;
char * point = nullptr;
auto v_major = strtol(sysinfo.release, &point, 10);
errno = 0;
if (errno || *point != '.' || v_major < 3)
return false;
if (3 < v_major)
return true;
errno = 0;
auto v_minor = strtol(point + 1, nullptr, 10);
return !errno && 15 <= v_minor;
VersionNumber renameat2_minimal_version(3, 15, 0);
VersionNumber linux_version(Poco::Environment::osVersion());
return linux_version >= renameat2_minimal_version;
#else
return false;
#endif

View File

@ -0,0 +1,26 @@
#include <Common/VersionNumber.h>
#include <gtest/gtest.h>
using namespace DB;
TEST(VersionNumber, VersionNumber)
{
VersionNumber version(1, 2, 3);
EXPECT_NE(VersionNumber(1, 1, 1), version);
EXPECT_EQ(VersionNumber(1, 2, 3), version);
EXPECT_GE(VersionNumber(1, 2, 3), version);
EXPECT_GT(VersionNumber(1, 2, 4), version);
EXPECT_LE(VersionNumber(1, 2, 3), version);
EXPECT_LT(VersionNumber(1, 2, 2), version);
}
TEST(VersionNumber, fromString)
{
EXPECT_EQ(VersionNumber("1.1.1"), VersionNumber(1, 1, 1));
EXPECT_EQ(VersionNumber("5.5.13prefix"), VersionNumber(5, 5, 13));
EXPECT_GT(VersionNumber("1.1.1.1"), VersionNumber(1, 1, 1));
EXPECT_LT(VersionNumber("1.1"), VersionNumber(1, 1, 0));
EXPECT_LT(VersionNumber("1"), VersionNumber(1, 0, 0));
EXPECT_LT(VersionNumber(""), VersionNumber(0, 0, 0));
}

View File

@ -90,6 +90,7 @@ SRCS(
TraceCollector.cpp
UTF8Helpers.cpp
UnicodeBar.cpp
VersionNumber.cpp
WeakHash.cpp
ZooKeeper/IKeeper.cpp
ZooKeeper/TestKeeper.cpp

View File

@ -1,9 +1,10 @@
#include <Core/SettingsQuirks.h>
#include <Core/Settings.h>
#include <Poco/Environment.h>
#include <Poco/Platform.h>
#include <Common/VersionNumber.h>
#include <common/logger_useful.h>
#ifdef __linux__
#include <linux/version.h>
#include <cstdlib>
/// Detect does epoll_wait with nested epoll fds works correctly.
/// Polling nested epoll fds from epoll_wait is required for async_socket_for_remote and use_hedged_requests.
@ -14,19 +15,21 @@
/// [2]: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=0c54a6a44bf3
bool nestedEpollWorks(Poco::Logger * log)
{
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(5, 5, 0)) && (LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 13))
/// the check is correct since there will be no more 5.5.x releases.
if (Poco::Environment::os() != POCO_OS_LINUX)
return true;
DB::VersionNumber linux_version(Poco::Environment::osVersion());
/// the check is correct since there will be no more 5.5.x releases.
if (linux_version >= DB::VersionNumber{5, 5, 0} && linux_version < DB::VersionNumber{5, 6, 13})
{
if (log)
LOG_WARNING(log, "Nested epoll_wait has some issues on kernels [5.5.0, 5.6.13). You should upgrade it to avoid possible issues.");
return false;
#else
(void)log;
return true;
#endif
}
return true;
}
#else
bool nestedEpollWorks(Poco::Logger *) { return true; }
#endif
namespace DB
{

View File

@ -102,6 +102,7 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
, query_string(queryToString(query_ast_))
, cluster(cluster_)
, insert_sync(insert_sync_)
, allow_materialized(context->getSettingsRef().insert_allow_materialized_columns)
, insert_timeout(insert_timeout_)
, main_table(main_table_)
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
@ -115,7 +116,10 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(
Block DistributedBlockOutputStream::getHeader() const
{
return metadata_snapshot->getSampleBlock();
if (!allow_materialized)
return metadata_snapshot->getSampleBlockNonMaterialized();
else
return metadata_snapshot->getSampleBlock();
}
@ -129,19 +133,21 @@ void DistributedBlockOutputStream::write(const Block & block)
{
Block ordinary_block{ block };
/* They are added by the AddingDefaultBlockOutputStream, and we will get
* different number of columns eventually */
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
if (!allow_materialized)
{
if (ordinary_block.has(col.name))
/* They are added by the AddingDefaultBlockOutputStream, and we will get
* different number of columns eventually */
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
{
ordinary_block.erase(col.name);
LOG_DEBUG(log, "{}: column {} will be removed, because it is MATERIALIZED",
storage.getStorageID().getNameForLogs(), col.name);
if (ordinary_block.has(col.name))
{
ordinary_block.erase(col.name);
LOG_DEBUG(log, "{}: column {} will be removed, because it is MATERIALIZED",
storage.getStorageID().getNameForLogs(), col.name);
}
}
}
if (insert_sync)
writeSync(ordinary_block);
else
@ -375,7 +381,7 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
/// to resolve tables (in InterpreterInsertQuery::getTable())
auto copy_query_ast = query_ast->clone();
InterpreterInsertQuery interp(copy_query_ast, job.local_context);
InterpreterInsertQuery interp(copy_query_ast, job.local_context, allow_materialized);
auto block_io = interp.execute();
job.stream = block_io.out;
@ -611,8 +617,7 @@ void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, size_t sh
void DistributedBlockOutputStream::writeToLocal(const Block & block, size_t repeats)
{
/// Async insert does not support settings forwarding yet whereas sync one supports
InterpreterInsertQuery interp(query_ast, context);
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
auto block_io = interp.execute();

View File

@ -94,6 +94,7 @@ private:
size_t inserted_rows = 0;
bool insert_sync;
bool allow_materialized;
/// Sync-related stuff
UInt64 insert_timeout; // in seconds

View File

@ -29,12 +29,133 @@ namespace
/// It worked this way until 21.5, and we cannot change it,
/// or partition ID will be different in case UUID is used in partition key.
/// (It is not recommended to use UUID as partition key).
class LegacyFieldVisitorHash : public FieldVisitorHash
/// NOTE: The code is intentionally copy-pasted,
/// so when FieldVisitorHash is changed, LegacyFieldVisitorHash will not change.
class LegacyFieldVisitorHash : public StaticVisitor<>
{
private:
SipHash & hash;
public:
using FieldVisitorHash::FieldVisitorHash;
using FieldVisitorHash::operator();
void operator() (const UUID & x) const { FieldVisitorHash::operator()(x.toUnderType()); }
explicit LegacyFieldVisitorHash(SipHash & hash_) : hash(hash_) {}
void operator() (const Null &) const
{
UInt8 type = Field::Types::Null;
hash.update(type);
}
void operator() (const UInt64 & x) const
{
UInt8 type = Field::Types::UInt64;
hash.update(type);
hash.update(x);
}
void operator() (const UInt128 & x) const
{
UInt8 type = Field::Types::UInt128;
hash.update(type);
hash.update(x);
}
void operator() (const UInt256 & x) const
{
UInt8 type = Field::Types::UInt256;
hash.update(type);
hash.update(x);
}
void operator() (const Int64 & x) const
{
UInt8 type = Field::Types::Int64;
hash.update(type);
hash.update(x);
}
void operator() (const Int128 & x) const
{
UInt8 type = Field::Types::Int128;
hash.update(type);
hash.update(x);
}
void operator() (const Int256 & x) const
{
UInt8 type = Field::Types::Int256;
hash.update(type);
hash.update(x);
}
void operator() (const UUID & x) const
{
operator()(x.toUnderType());
}
void operator() (const Float64 & x) const
{
UInt8 type = Field::Types::Float64;
hash.update(type);
hash.update(x);
}
void operator() (const String & x) const
{
UInt8 type = Field::Types::String;
hash.update(type);
hash.update(x.size());
hash.update(x.data(), x.size());
}
void operator() (const Array & x) const
{
UInt8 type = Field::Types::Array;
hash.update(type);
hash.update(x.size());
for (const auto & elem : x)
applyVisitor(*this, elem);
}
void operator() (const Tuple & x) const
{
UInt8 type = Field::Types::Tuple;
hash.update(type);
hash.update(x.size());
for (const auto & elem : x)
applyVisitor(*this, elem);
}
void operator() (const Map & x) const
{
UInt8 type = Field::Types::Map;
hash.update(type);
hash.update(x.size());
for (const auto & elem : x)
applyVisitor(*this, elem);
}
void operator() (const DecimalField<Decimal32> & x) const
{
UInt8 type = Field::Types::Decimal32;
hash.update(type);
hash.update(x.getValue().value);
}
void operator() (const DecimalField<Decimal64> & x) const
{
UInt8 type = Field::Types::Decimal64;
hash.update(type);
hash.update(x.getValue().value);
}
void operator() (const DecimalField<Decimal128> & x) const
{
UInt8 type = Field::Types::Decimal128;
hash.update(type);
hash.update(x.getValue().value);
}
void operator() (const DecimalField<Decimal256> & x) const
{
UInt8 type = Field::Types::Decimal256;
hash.update(type);
hash.update(x.getValue().value);
}
void operator() (const AggregateFunctionStateData & x) const
{
UInt8 type = Field::Types::AggregateFunctionState;
hash.update(type);
hash.update(x.name.size());
hash.update(x.name.data(), x.name.size());
hash.update(x.data.size());
hash.update(x.data.data(), x.data.size());
}
};
}

View File

@ -160,7 +160,7 @@ ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, co
/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
/// the sample block instead.
ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block_non_materialized)
ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block)
{
auto query = std::make_shared<ASTInsertQuery>();
query->table_id = StorageID(database, table);
@ -168,7 +168,7 @@ ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::s
auto columns = std::make_shared<ASTExpressionList>();
query->columns = columns;
query->children.push_back(columns);
for (const auto & col : sample_block_non_materialized)
for (const auto & col : sample_block)
columns->children.push_back(std::make_shared<ASTIdentifier>(col.name));
return query;
@ -648,11 +648,16 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
bool insert_sync = settings.insert_distributed_sync || settings.insert_shard_id || owned_cluster;
auto timeout = settings.insert_distributed_timeout;
Block sample_block;
if (!settings.insert_allow_materialized_columns)
sample_block = metadata_snapshot->getSampleBlockNonMaterialized();
else
sample_block = metadata_snapshot->getSampleBlock();
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedBlockOutputStream>(
local_context, *this, metadata_snapshot,
createInsertToRemoteTableQuery(
remote_database, remote_table, metadata_snapshot->getSampleBlockNonMaterialized()),
createInsertToRemoteTableQuery(remote_database, remote_table, sample_block),
cluster, insert_sync, timeout, StorageID{remote_database, remote_table});
}

View File

@ -1,3 +1,4 @@
insert_allow_materialized_columns=0
insert_distributed_sync=0
2018-08-01
2018-08-01
@ -12,3 +13,18 @@ insert_distributed_sync=1
2018-08-01 2017-08-01
2018-08-01
2018-08-01 2017-08-01
insert_allow_materialized_columns=1
insert_distributed_sync=0
2018-08-01
2018-08-01
2018-08-01 2019-08-01
2018-08-01 2019-08-01
2018-08-01
2018-08-01 2019-08-01
insert_distributed_sync=1
2018-08-01
2018-08-01
2018-08-01 2019-08-01
2018-08-01 2019-08-01
2018-08-01
2018-08-01 2019-08-01

View File

@ -1,6 +1,12 @@
DROP TABLE IF EXISTS local_00952;
DROP TABLE IF EXISTS distributed_00952;
--
-- insert_allow_materialized_columns=0
--
SELECT 'insert_allow_materialized_columns=0';
SET insert_allow_materialized_columns=0;
--
-- insert_distributed_sync=0
--
@ -40,3 +46,47 @@ SELECT date, value FROM local_00952;
DROP TABLE distributed_00952;
DROP TABLE local_00952;
--
-- insert_allow_materialized_columns=1
--
SELECT 'insert_allow_materialized_columns=1';
SET insert_allow_materialized_columns=1;
--
-- insert_distributed_sync=0
--
SELECT 'insert_distributed_sync=0';
SET insert_distributed_sync=0;
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_00952, rand());
INSERT INTO distributed_00952 (date, value) VALUES ('2018-08-01', '2019-08-01');
SYSTEM FLUSH DISTRIBUTED distributed_00952;
SELECT * FROM distributed_00952;
SELECT date, value FROM distributed_00952;
SELECT * FROM local_00952;
SELECT date, value FROM local_00952;
DROP TABLE distributed_00952;
DROP TABLE local_00952;
--
-- insert_distributed_sync=1
--
SELECT 'insert_distributed_sync=1';
SET insert_distributed_sync=1;
CREATE TABLE local_00952 (date Date, value Date MATERIALIZED toDate('2017-08-01')) ENGINE = MergeTree(date, date, 8192);
CREATE TABLE distributed_00952 AS local_00952 ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_00952, rand());
INSERT INTO distributed_00952 (date, value) VALUES ('2018-08-01', '2019-08-01');
SELECT * FROM distributed_00952;
SELECT date, value FROM distributed_00952;
SELECT * FROM local_00952;
SELECT date, value FROM local_00952;
DROP TABLE distributed_00952;
DROP TABLE local_00952;