Merge branch 'master' into recompression_in_background
This commit is contained in: commit 034f1a895d
@@ -36,8 +36,9 @@ color_good='#b0d050'

header_template = """
<!DOCTYPE html>
-<html>
-<style>
+<html lang="en">
+<link rel="preload" as="font" href="https://yastatic.net/adv-www/_/sUYVCPUAQE7ExrvMS7FoISoO83s.woff2" type="font/woff2" crossorigin="anonymous"/>
+<style>
@font-face {{
font-family:'Yandex Sans Display Web';
src:url(https://yastatic.net/adv-www/_/H63jN0veW07XQUIA2317lr9UIm8.eot);
@@ -48,7 +49,8 @@ header_template = """
url(https://yastatic.net/adv-www/_/lF_KG5g4tpQNlYIgA0e77fBSZ5s.svg#YandexSansDisplayWeb-Regular) format('svg');
font-weight:400;
font-style:normal;
-font-stretch:normal
+font-stretch:normal;
+font-display: swap;
}}

body {{
@@ -579,6 +581,7 @@ if args.report == 'main':
print(t)

print("""
</div>
<p class="links">
<a href="all-queries.html">All queries</a>
<a href="compare.log">Log</a>
@@ -696,6 +699,7 @@ elif args.report == 'all-queries':
print(t)

print("""
</div>
<p class="links">
<a href="report.html">Main report</a>
<a href="compare.log">Log</a>
@@ -13,21 +13,9 @@ Each functional test sends one or multiple queries to the running ClickHouse ser

Tests are located in `queries` directory. There are two subdirectories: `stateless` and `stateful`. Stateless tests run queries without any preloaded test data - they often create small synthetic datasets on the fly, within the test itself. Stateful tests require preloaded test data from Yandex.Metrica and not available to general public. We tend to use only `stateless` tests and avoid adding new `stateful` tests.

-Each test can be one of two types: `.sql` and `.sh`. `.sql` test is the simple SQL script that is piped to `clickhouse-client --multiquery --testmode`. `.sh` test is a script that is run by itself.
+Each test can be one of two types: `.sql` and `.sh`. `.sql` test is the simple SQL script that is piped to `clickhouse-client --multiquery --testmode`. `.sh` test is a script that is run by itself. SQL tests are generally preferable to `.sh` tests. You should use `.sh` tests only when you have to test some feature that cannot be exercised from pure SQL, such as piping some input data into `clickhouse-client` or testing `clickhouse-local`.

-To run all tests, use `clickhouse-test` tool. Look `--help` for the list of possible options. You can simply run all tests or run subset of tests filtered by substring in test name: `./clickhouse-test substring`.

-The most simple way to invoke functional tests is to copy `clickhouse-client` to `/usr/bin/`, run `clickhouse-server` and then run `./clickhouse-test` from its own directory.

-To add new test, create a `.sql` or `.sh` file in `queries/0_stateless` directory, check it manually and then generate `.reference` file in the following way: `clickhouse-client -n --testmode < 00000_test.sql > 00000_test.reference` or `./00000_test.sh > ./00000_test.reference`.

-Tests should use (create, drop, etc) only tables in `test` database that is assumed to be created beforehand; also tests can use temporary tables.

-If you want to use distributed queries in functional tests, you can leverage `remote` table function with `127.0.0.{1..2}` addresses for the server to query itself; or you can use predefined test clusters in server configuration file like `test_shard_localhost`.

-Some tests are marked with `zookeeper`, `shard` or `long` in their names. `zookeeper` is for tests that are using ZooKeeper. `shard` is for tests that requires server to listen `127.0.0.*`; `distributed` or `global` have the same meaning. `long` is for tests that run slightly longer that one second. You can disable these groups of tests using `--no-zookeeper`, `--no-shard` and `--no-long` options, respectively.

-### Running a particular test locally {#functional-test-locally}
+### Running a Test Locally {#functional-test-locally}

Start the ClickHouse server locally, listening on the default port (9000). To
run, for example, the test `01428_hash_set_nan_key`, change to the repository
@@ -37,7 +25,32 @@ folder and run the following command:
PATH=$PATH:<path to clickhouse-client> tests/clickhouse-test 01428_hash_set_nan_key
```

-For more options, see `tests/clickhouse-test --help`.
+For more options, see `tests/clickhouse-test --help`. You can simply run all tests or run a subset of tests filtered by a substring in the test name: `./clickhouse-test substring`. There are also options to run tests in parallel or in randomized order.

+### Adding a New Test

+To add a new test, create a `.sql` or `.sh` file in the `queries/0_stateless` directory, check it manually, and then generate the `.reference` file in the following way: `clickhouse-client -n --testmode < 00000_test.sql > 00000_test.reference` or `./00000_test.sh > ./00000_test.reference`.
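As a minimal sketch, assuming a hypothetical file name and trivial queries, a stateless `.sql` test and its generated `.reference` could look like this:

```sql
-- queries/0_stateless/01234_simple_arithmetic.sql (hypothetical name; pick the next free numeric prefix)
SELECT 1 + 1;
SELECT arrayJoin([3, 1, 2]) AS x ORDER BY x;

-- The generated 01234_simple_arithmetic.reference would then hold the exact
-- TabSeparated output of these queries: the line "2", followed by "1", "2", "3".
```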

+Tests should use (create, drop, etc) only tables in the `test` database, which is assumed to be created beforehand; tests can also use temporary tables.

+### Choosing the Test Name

+The name of the test starts with a five-digit prefix followed by a descriptive name, such as `00422_hash_function_constexpr.sql`. To choose the prefix, find the largest prefix already present in the directory and increment it by one. In the meantime, some other tests might be added with the same numeric prefix, but this is OK and doesn't lead to any problems; you don't have to change it later.

+Some tests are marked with `zookeeper`, `shard` or `long` in their names. `zookeeper` is for tests that use ZooKeeper. `shard` is for tests that require the server to listen on `127.0.0.*`; `distributed` or `global` have the same meaning. `long` is for tests that run slightly longer than one second. You can disable these groups of tests using the `--no-zookeeper`, `--no-shard` and `--no-long` options, respectively. Make sure to add a proper prefix to your test name if it needs ZooKeeper or distributed queries.

+### Checking for an Error that Must Occur

+Sometimes you want to test that a server error occurs for an incorrect query. We support special annotations for this in SQL tests, in the following form:
+```
+select x; -- { serverError 49 }
+```
+This test ensures that the server returns an error with code 49 about the unknown column `x`. If there is no error, or the error is different, the test will fail. If you want to ensure that an error occurs on the client side, use the `clientError` annotation instead.
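A client-side counterpart, as a hedged sketch (assuming error code 62 is `SYNTAX_ERROR` and that `clickhouse-client` rejects the malformed query while parsing it, before anything reaches the server):

```sql
-- The unbalanced parenthesis cannot be parsed by the client,
-- so the expected failure is declared with clientError rather than serverError.
select (1; -- { clientError 62 }
```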

+### Testing a Distributed Query

+If you want to use distributed queries in functional tests, you can leverage the `remote` table function with `127.0.0.{1..2}` addresses so that the server queries itself; or you can use predefined test clusters in the server configuration file, such as `test_shard_localhost`. Remember to add the words `shard` or `distributed` to the test name, so that it is run in CI in the correct configurations, where the server is configured to support distributed queries.
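For illustration, a sketch of such a query, assuming the loopback pattern from above and the `test_shard_localhost` cluster from the test server configuration:

```sql
-- The server queries itself as two "replicas"; a test containing this query
-- should carry `shard` or `distributed` in its file name.
SELECT count() FROM remote('127.0.0.{1,2}', system.one);

-- The same idea via a predefined test cluster:
SELECT count() FROM cluster('test_shard_localhost', system.one);
```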


## Known Bugs {#known-bugs}

@@ -106,7 +106,7 @@ CREATE TABLE table_with_asterisk (name String, value UInt32) ENGINE = HDFS('hdfs
Create table with files named `file000`, `file001`, … , `file999`:

``` sql
-CREARE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
+CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
```

## Virtual Columns {#virtual-columns}
@@ -352,7 +352,7 @@ void ColumnLowCardinality::updatePermutation(bool reverse, size_t limit, int nan
auto new_first = first;
for (auto j = first + 1; j < last; ++j)
{
-if (compareAt(new_first, j, *this, nan_direction_hint) != 0)
+if (compareAt(res[new_first], res[j], *this, nan_direction_hint) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
@@ -376,7 +376,7 @@ void ColumnLowCardinality::updatePermutation(bool reverse, size_t limit, int nan
auto new_first = first;
for (auto j = first + 1; j < limit; ++j)
{
-if (getDictionary().compareAt(getIndexes().getUInt(new_first), getIndexes().getUInt(j), getDictionary(), nan_direction_hint) != 0)
+if (getDictionary().compareAt(getIndexes().getUInt(res[new_first]), getIndexes().getUInt(res[j]), getDictionary(), nan_direction_hint) != 0)
{
if (j - new_first > 1)
new_ranges.emplace_back(new_first, j);
@@ -387,7 +387,7 @@ void ColumnLowCardinality::updatePermutation(bool reverse, size_t limit, int nan
auto new_last = limit;
for (auto j = limit; j < last; ++j)
{
-if (getDictionary().compareAt(getIndexes().getUInt(new_first), getIndexes().getUInt(j), getDictionary(), nan_direction_hint) == 0)
+if (getDictionary().compareAt(getIndexes().getUInt(res[new_first]), getIndexes().getUInt(res[j]), getDictionary(), nan_direction_hint) == 0)
{
std::swap(res[new_last], res[j]);
++new_last;
@@ -306,6 +306,9 @@ struct ContextShared

mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper.

+mutable std::mutex auxiliary_zookeepers_mutex;
+mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.

String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
String interserver_io_user;
@@ -1446,6 +1449,24 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper;
}

+zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
+{
+std::lock_guard lock(shared->auxiliary_zookeepers_mutex);

+auto zookeeper = shared->auxiliary_zookeepers.find(name);
+if (zookeeper == shared->auxiliary_zookeepers.end())
+{
+if (!getConfigRef().has("auxiliary_zookeepers." + name))
+throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown auxiliary ZooKeeper name '{}'. If it's required it can be added to the section <auxiliary_zookeepers> in config.xml", name);

+zookeeper->second = std::make_shared<zkutil::ZooKeeper>(getConfigRef(), "auxiliary_zookeepers." + name);
+}
+else if (zookeeper->second->expired())
+zookeeper->second = zookeeper->second->startNewSession();

+return zookeeper->second;
+}

void Context::resetZooKeeper() const
{
std::lock_guard lock(shared->zookeeper_mutex);
@@ -474,6 +474,8 @@ public:
/// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
/// If no ZooKeeper configured, throws an exception.
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
+/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
+std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
/// Has ready or expired ZooKeeper
bool hasZooKeeper() const;
/// Reset current zookeeper session. Do not create a new one.
@@ -586,8 +586,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
TableLockHolder &,
time_t time_of_merge,
const ReservationPtr & space_reservation,
-bool deduplicate,
-bool force_ttl)
+bool deduplicate)
{
static const String TMP_PREFIX = "tmp_merge_";

@@ -635,7 +634,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
new_data_part->partition.assign(future_part.getPartition());
new_data_part->is_temp = true;

-bool need_remove_expired_values = force_ttl;
+bool need_remove_expired_values = false;
for (const auto & part : parts)
new_data_part->ttl_infos.update(part->ttl_infos);

@@ -810,7 +809,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, Names());

if (need_remove_expired_values)
-merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, force_ttl);
+merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, false);


if (metadata_snapshot->hasSecondaryIndices())
@@ -111,8 +111,7 @@ public:
TableLockHolder & table_lock_holder,
time_t time_of_merge,
const ReservationPtr & space_reservation,
-bool deduplicate,
-bool force_ttl);
+bool deduplicate);

/// Mutate a single data part with the specified commands. Will create and return a temporary part.
MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart(
@@ -1552,7 +1552,10 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
continue;
}

-stack.emplace_back(check_order[1].begin, check_order[1].end);
+if (may_be_true_in_range(check_order[1]))
+stack.emplace_back(check_order[1].begin, check_order[1].end);
+else
+break; // No mark range would suffice
}
}

@@ -82,10 +82,6 @@ struct ReplicatedMergeTreeLogEntryData
String column_name;
String index_name;

-/// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts
-/// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query.
-bool force_ttl = false;

/// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory.
bool detach = false;

@@ -724,13 +724,9 @@ bool StorageMergeTree::merge(

try
{
-/// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts
-/// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query.
-bool force_ttl = (final && metadata_snapshot->hasAnyTTL());

new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr),
-merging_tagger->reserved_space, deduplicate, force_ttl);
+merging_tagger->reserved_space, deduplicate);

merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
write_part_log({});
@@ -113,6 +113,7 @@ namespace ErrorCodes
extern const int ALL_REPLICAS_LOST;
extern const int REPLICA_STATUS_CHANGED;
extern const int CANNOT_ASSIGN_ALTER;
+extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}

namespace ActionLocks
@@ -1396,8 +1397,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
{
part = merger_mutator.mergePartsToTemporaryPart(
future_merged_part, metadata_snapshot, *merge_entry,
-table_lock, entry.create_time, reserved_space, entry.deduplicate,
-entry.force_ttl);
+table_lock, entry.create_time, reserved_space, entry.deduplicate);

merger_mutator.renameMergedTemporaryPart(part, parts, &transaction);

@@ -2497,8 +2497,6 @@ void StorageReplicatedMergeTree::mergeSelectingTask()

const auto storage_settings_ptr = getSettings();
const bool deduplicate = false; /// TODO: read deduplicate option from table config
-const bool force_ttl = false;

CreateMergeEntryResult create_result = CreateMergeEntryResult::Other;

try
@@ -2536,7 +2534,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred, nullptr))
{
create_result = createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
-future_merged_part.name, future_merged_part.type, deduplicate, force_ttl, nullptr, merge_pred.getVersion());
+future_merged_part.name, future_merged_part.type, deduplicate, nullptr, merge_pred.getVersion());
}
/// If there are many mutations in queue, it may happen, that we cannot enqueue enough merges to merge all new parts
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
@@ -2618,7 +2616,6 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
const String & merged_name,
const MergeTreeDataPartType & merged_part_type,
bool deduplicate,
-bool force_ttl,
ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version)
{
@@ -2653,7 +2650,6 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.new_part_name = merged_name;
entry.new_part_type = merged_part_type;
entry.deduplicate = deduplicate;
-entry.force_ttl = force_ttl;
entry.create_time = time(nullptr);

for (const auto & part : parts)
@@ -3121,8 +3117,9 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)


bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot,
-const String & source_replica_path, bool to_detached, size_t quorum)
+const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_)
{
+auto zookeeper = zookeeper_ ? zookeeper_ : getZooKeeper();
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);

if (auto part = getPartIfExists(part_info, {IMergeTreeDataPart::State::Outdated, IMergeTreeDataPart::State::Deleting}))
@@ -3182,7 +3179,6 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
source_part_checksums.computeTotalChecksums(source_part->checksums);

MinimalisticDataPartChecksums desired_checksums;
-auto zookeeper = getZooKeeper();
String part_path = source_replica_path + "/parts/" + part_name;
String part_znode = zookeeper->get(part_path);
if (!part_znode.empty())
@@ -3212,7 +3208,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
}
else
{
-ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
+ReplicatedMergeTreeAddress address(zookeeper->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
@@ -3265,6 +3261,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
}
else
{
+// The fetched part is valuable and should not be cleaned like a temp part.
+part->is_temp = false;
part->renameTo("detached/" + part_name, true);
}
}
@@ -3529,7 +3527,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,

bool StorageReplicatedMergeTree::optimize(
const ASTPtr &,
-const StorageMetadataPtr & metadata_snapshot,
+const StorageMetadataPtr &,
const ASTPtr & partition,
bool final,
bool deduplicate,
@@ -3553,7 +3551,6 @@ bool StorageReplicatedMergeTree::optimize(
return false;
};

-bool force_ttl = (final && metadata_snapshot->hasAnyTTL());
const auto storage_settings_ptr = getSettings();

if (!partition && final)
@@ -3586,7 +3583,7 @@ bool StorageReplicatedMergeTree::optimize(
ReplicatedMergeTreeLogEntryData merge_entry;
CreateMergeEntryResult create_result = createLogEntryToMergeParts(
zookeeper, future_merged_part.parts,
-future_merged_part.name, future_merged_part.type, deduplicate, force_ttl,
+future_merged_part.name, future_merged_part.type, deduplicate,
&merge_entry, can_merge.getVersion());

if (create_result == CreateMergeEntryResult::MissingPart)
@@ -3641,7 +3638,7 @@ bool StorageReplicatedMergeTree::optimize(
ReplicatedMergeTreeLogEntryData merge_entry;
CreateMergeEntryResult create_result = createLogEntryToMergeParts(
zookeeper, future_merged_part.parts,
-future_merged_part.name, future_merged_part.type, deduplicate, force_ttl,
+future_merged_part.name, future_merged_part.type, deduplicate,
&merge_entry, can_merge.getVersion());

if (create_result == CreateMergeEntryResult::MissingPart)
@@ -4568,13 +4565,36 @@ void StorageReplicatedMergeTree::fetchPartition(
const String & from_,
const Context & query_context)
{
+String from = from_;
+if (from.empty())
+throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

String partition_id = getPartitionIDFromQuery(partition, query_context);

-String from = from_;
+zkutil::ZooKeeperPtr zookeeper;
+if (from[0] != '/')
+{
+auto delimiter = from.find(':');
+if (delimiter == String::npos)
+throw Exception("Zookeeper path should start with '/' or '<auxiliary_zookeeper_name>:/'", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+auto auxiliary_zookeeper_name = from.substr(0, delimiter);
+from = from.substr(delimiter + 1, String::npos);
+if (from.empty())
+throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+zookeeper = global_context.getAuxiliaryZooKeeper(auxiliary_zookeeper_name);

+LOG_INFO(log, "Will fetch partition {} from shard {} (auxiliary zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
+}
+else
+{
+zookeeper = getZooKeeper();

+LOG_INFO(log, "Will fetch partition {} from shard {}", partition_id, from_);
+}

if (from.back() == '/')
from.resize(from.size() - 1);

-LOG_INFO(log, "Will fetch partition {} from shard {}", partition_id, from_);

/** Let's check that there is no such partition in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a partition may appear a little later.
@@ -4597,7 +4617,6 @@ void StorageReplicatedMergeTree::fetchPartition(
String best_replica;

{
-auto zookeeper = getZooKeeper();

/// List of replicas of source shard.
replicas = zookeeper->getChildren(from + "/replicas");
@@ -4666,7 +4685,7 @@ void StorageReplicatedMergeTree::fetchPartition(
if (try_no >= query_context.getSettings().max_fetch_partition_retries_count)
throw Exception("Too many retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS);

-Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts");
+Strings parts = zookeeper->getChildren(best_replica_path + "/parts");
ActiveDataPartSet active_parts_set(format_version, parts);
Strings parts_to_fetch;

@@ -4706,7 +4725,7 @@ void StorageReplicatedMergeTree::fetchPartition(
{
try
{
-fetchPart(part, metadata_snapshot, best_replica_path, true, 0);
+fetchPart(part, metadata_snapshot, best_replica_path, true, 0, zookeeper);
}
catch (const DB::Exception & e)
{
@@ -449,7 +449,6 @@ private:
const String & merged_name,
const MergeTreeDataPartType & merged_part_type,
bool deduplicate,
-bool force_ttl,
ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version);

@@ -478,7 +477,13 @@ private:
* If quorum != 0, then the node for tracking the quorum is updated.
* Returns false if part is already fetching right now.
*/
-bool fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot, const String & replica_path, bool to_detached, size_t quorum);
+bool fetchPart(
+const String & part_name,
+const StorageMetadataPtr & metadata_snapshot,
+const String & replica_path,
+bool to_detached,
+size_t quorum,
+zkutil::ZooKeeper::Ptr zookeeper_ = nullptr);

/// Required only to avoid races between executeLogEntry and fetchPartition
std::unordered_set<String> currently_fetching_parts;
@@ -0,0 +1,28 @@
<yandex>
    <zookeeper>
        <node index="1">
            <host>zoo1</host>
            <port>2181</port>
        </node>
        <node index="2">
            <host>zoo2</host>
            <port>2181</port>
        </node>
        <node index="3">
            <host>zoo3</host>
            <port>2181</port>
        </node>
    </zookeeper>
    <auxiliary_zookeepers>
        <zookeeper2>
            <node index="1">
                <host>zoo1</host>
                <port>2181</port>
            </node>
            <node index="2">
                <host>zoo2</host>
                <port>2181</port>
            </node>
        </zookeeper2>
    </auxiliary_zookeepers>
</yandex>
@@ -0,0 +1,42 @@
from __future__ import print_function
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
import helpers
import pytest


cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/zookeeper_config.xml"], with_zookeeper=True)



@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()

        yield cluster
    finally:
        cluster.shutdown()


def test_fetch_part_from_allowed_zookeeper(start_cluster):
    node.query(
        "CREATE TABLE simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') ORDER BY tuple() PARTITION BY date;"
    )
    node.query("INSERT INTO simple VALUES ('2020-08-27', 1)")

    node.query(
        "CREATE TABLE simple2 (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/simple', 'node') ORDER BY tuple() PARTITION BY date;"
    )
    node.query(
        "ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper2:/clickhouse/tables/0/simple';"
    )
    node.query("ALTER TABLE simple2 ATTACH PARTITION '2020-08-27';")

    with pytest.raises(QueryRuntimeException):
        node.query(
            "ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper:/clickhouse/tables/0/simple';"
        )

    assert node.query("SELECT id FROM simple2").strip() == "1"
@@ -0,0 +1,31 @@
green 2020-08-21 18:46:08
green 2020-08-21 18:46:07
green 2020-08-21 18:46:06
red 2020-08-21 18:46:08
red 2020-08-21 18:46:07
red 2020-08-21 18:46:06


green 2020-08-21 18:46:08.000
green 2020-08-21 18:46:07.000
green 2020-08-21 18:46:06.000
red 2020-08-21 18:46:08.000
red 2020-08-21 18:46:07.000
red 2020-08-21 18:46:06.000

------cast to String----

green 2020-08-21 18:46:08
green 2020-08-21 18:46:07
green 2020-08-21 18:46:06
red 2020-08-21 18:46:08
red 2020-08-21 18:46:07
red 2020-08-21 18:46:06


green 2020-08-21 18:46:08.000
green 2020-08-21 18:46:07.000
green 2020-08-21 18:46:06.000
red 2020-08-21 18:46:08.000
red 2020-08-21 18:46:07.000
red 2020-08-21 18:46:06.000
@@ -0,0 +1,41 @@
drop table if exists order_test1;

create table order_test1
(
    timestamp DateTime64(3),
    color LowCardinality(String)
) engine = MergeTree() ORDER BY tuple();

insert into order_test1 values ('2020-08-21 18:46:08.000','red')('2020-08-21 18:46:08.000','green');
insert into order_test1 values ('2020-08-21 18:46:07.000','red')('2020-08-21 18:46:07.000','green');
insert into order_test1 values ('2020-08-21 18:46:06.000','red')('2020-08-21 18:46:06.000','green');

SELECT color, toDateTime(timestamp) AS second
FROM order_test1
GROUP BY color, second
ORDER BY color ASC, second DESC;

select '';
select '';

SELECT color, timestamp
FROM order_test1
GROUP BY color, timestamp
ORDER BY color ASC, timestamp DESC;

select '';
select '------cast to String----';
select '';

SELECT cast(color,'String') color, toDateTime(timestamp) AS second
FROM order_test1
GROUP BY color, second
ORDER BY color ASC, second DESC;

select '';
select '';

SELECT cast(color,'String') color, timestamp
FROM order_test1
GROUP BY color, timestamp
ORDER BY color ASC, timestamp DESC;
@@ -0,0 +1,12 @@
DROP TABLE IF EXISTS pk;

CREATE TABLE pk (d Date DEFAULT '2000-01-01', x DateTime, y UInt64, z UInt64) ENGINE = MergeTree() PARTITION BY d ORDER BY (toStartOfMinute(x), y, z) SETTINGS index_granularity_bytes=19, min_index_granularity_bytes=1, write_final_mark = 0; -- one row granule

INSERT INTO pk (x, y, z) VALUES (1, 11, 1235), (2, 11, 4395), (3, 22, 3545), (4, 22, 6984), (5, 33, 4596), (61, 11, 4563), (62, 11, 4578), (63, 11, 3572), (64, 22, 5786), (65, 22, 5786), (66, 22, 2791), (67, 22, 2791), (121, 33, 2791), (122, 33, 2791), (123, 33, 1235), (124, 44, 4935), (125, 44, 4578), (126, 55, 5786), (127, 55, 2791), (128, 55, 1235);

SET max_block_size = 1;
SET max_rows_to_read = 5;

SELECT toUInt32(x), y, z FROM pk WHERE (x >= toDateTime(100000)) AND (x <= toDateTime(3));

DROP TABLE IF EXISTS pk;