Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Alexey Milovidov 2018-07-06 03:27:58 +03:00
commit 5d52be3c7e
23 changed files with 205 additions and 166 deletions

View File

@ -18,9 +18,6 @@ BlockIO InterpreterOptimizeQuery::execute()
{
const ASTOptimizeQuery & ast = typeid_cast<const ASTOptimizeQuery &>(*query_ptr);
if (ast.final && !ast.partition)
throw Exception("FINAL flag for OPTIMIZE query is meaningful only with specified PARTITION", ErrorCodes::BAD_ARGUMENTS);
StoragePtr table = context.getTable(ast.database, ast.table);
auto table_lock = table->lockStructure(true, __PRETTY_FUNCTION__);
table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <common/logger_useful.h>
@ -11,59 +11,59 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
AbandonableLockInZooKeeper::AbandonableLockInZooKeeper(
EphemeralLockInZooKeeper::EphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Requests * precheck_ops)
: zookeeper(&zookeeper_), path_prefix(path_prefix_)
{
String abandonable_path = temp_path + "/abandonable_lock-";
/// The /abandonable_lock- name is for backward compatibility.
String holder_path_prefix = temp_path + "/abandonable_lock-";
/// Let's create a secondary ephemeral node.
if (!precheck_ops || precheck_ops->empty())
{
holder_path = zookeeper->create(abandonable_path, "", zkutil::CreateMode::EphemeralSequential);
holder_path = zookeeper->create(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential);
}
else
{
precheck_ops->emplace_back(zkutil::makeCreateRequest(abandonable_path, "", zkutil::CreateMode::EphemeralSequential));
precheck_ops->emplace_back(zkutil::makeCreateRequest(holder_path_prefix, "", zkutil::CreateMode::EphemeralSequential));
zkutil::Responses op_results = zookeeper->multi(*precheck_ops);
holder_path = dynamic_cast<const zkutil::CreateResponse &>(*op_results.back()).path_created;
}
/// Write the path to the secondary node in the main node.
path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::PersistentSequential);
path = zookeeper->create(path_prefix, holder_path, zkutil::CreateMode::EphemeralSequential);
if (path.size() <= path_prefix.size())
throw Exception("Logical error: name of sequential node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: name of the main node is shorter than prefix.", ErrorCodes::LOGICAL_ERROR);
}
void AbandonableLockInZooKeeper::unlock()
void EphemeralLockInZooKeeper::unlock()
{
checkCreated();
zookeeper->remove(path);
zookeeper->remove(holder_path);
zkutil::Requests ops;
getUnlockOps(ops);
zookeeper->multi(ops);
holder_path = "";
}
void AbandonableLockInZooKeeper::getUnlockOps(zkutil::Requests & ops)
void EphemeralLockInZooKeeper::getUnlockOps(zkutil::Requests & ops)
{
checkCreated();
ops.emplace_back(zkutil::makeRemoveRequest(path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(holder_path, -1));
}
AbandonableLockInZooKeeper::~AbandonableLockInZooKeeper()
EphemeralLockInZooKeeper::~EphemeralLockInZooKeeper()
{
if (!zookeeper || holder_path.empty())
if (!isCreated())
return;
try
{
zookeeper->tryRemove(holder_path);
zookeeper->trySet(path, ""); /// It's not strictly necessary.
unlock();
}
catch (...)
{
tryLogCurrentException("~AbandonableLockInZooKeeper");
tryLogCurrentException("~EphemeralLockInZooKeeper");
}
}

View File

@ -13,33 +13,24 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/** The synchronization is primitive. Works as follows:
* Creates a non-ephemeral incremental node and marks it as locked (LOCKED).
* `unlock()` unlocks it (UNLOCKED).
* When the destructor is called or the session ends in ZooKeeper, it goes into the ABANDONED state.
* (Including when the program is halted).
*/
class AbandonableLockInZooKeeper : public boost::noncopyable
/// A class that is used for locking a block number in a partition.
/// It creates a secondary ephemeral node in `temp_path` and a main ephemeral node with `path_prefix`
/// that references the secondary node. The reasons for this two-level scheme are historical (of course
/// it would be simpler to allocate block numbers for all partitions in one ZK directory).
class EphemeralLockInZooKeeper : public boost::noncopyable
{
public:
enum State
{
UNLOCKED,
LOCKED,
ABANDONED,
};
AbandonableLockInZooKeeper(
EphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Requests * precheck_ops = nullptr);
AbandonableLockInZooKeeper() = default;
EphemeralLockInZooKeeper() = default;
AbandonableLockInZooKeeper(AbandonableLockInZooKeeper && rhs) noexcept
EphemeralLockInZooKeeper(EphemeralLockInZooKeeper && rhs) noexcept
{
*this = std::move(rhs);
}
AbandonableLockInZooKeeper & operator=(AbandonableLockInZooKeeper && rhs) noexcept
EphemeralLockInZooKeeper & operator=(EphemeralLockInZooKeeper && rhs) noexcept
{
zookeeper = rhs.zookeeper;
rhs.zookeeper = nullptr;
@ -82,10 +73,10 @@ public:
void checkCreated() const
{
if (!isCreated())
throw Exception("AbandonableLock is not created", ErrorCodes::LOGICAL_ERROR);
throw Exception("EphemeralLock is not created", ErrorCodes::LOGICAL_ERROR);
}
~AbandonableLockInZooKeeper();
~EphemeralLockInZooKeeper();
private:
zkutil::ZooKeeper * zookeeper = nullptr;
@ -95,8 +86,7 @@ private:
};
/// Acquires block number locks in all partitions. The class is called Ephemeral- instead of Abandonable-
/// because it creates ephemeral block nodes (there is no need to leave abandoned tombstones).
/// Acquires block number locks in all partitions.
class EphemeralLocksInAllPartitions
{
public:

View File

@ -935,12 +935,12 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
* When M > N parts could be replaced?
* - new block was added in ReplicatedMergeTreeBlockOutputStream;
* - it was added to working dataset in memory and renamed on filesystem;
* - but ZooKeeper transaction that add its to reference dataset in ZK and unlocks AbandonableLock is failed;
* - but ZooKeeper transaction that adds it to reference dataset in ZK failed;
* - and it failed due to connection loss, so we don't roll back the working dataset in memory,
* because we don't know if the part was added to ZK or not
* (see ReplicatedMergeTreeBlockOutputStream)
* - then method selectPartsToMerge selects a range and see, that AbandonableLock for this part is abandoned,
* and so, it is possible to merge a range skipping this part.
* - then method selectPartsToMerge selects a range and sees that the EphemeralLock for the block in this part is unlocked,
* and so it is possible to merge a range skipping this part.
* (NOTE: Merging with part that is not in ZK is not possible, see checks in 'createLogEntryToMergeParts'.)
* - and after the merge, this part will be removed in addition to the parts that were merged.
*/

View File

@ -1,5 +1,5 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>

View File

@ -1266,15 +1266,19 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
prev_virtual_parts = queue.virtual_parts;
}
/// Load current quorum status.
auto quorum_last_part_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/last_part");
auto quorum_status_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/status");
/// Load current inserts
std::unordered_set<String> abandonable_lock_holders;
std::unordered_set<String> lock_holder_paths;
for (const String & entry : zookeeper->getChildren(queue.zookeeper_path + "/temp"))
{
if (startsWith(entry, "abandonable_lock-"))
abandonable_lock_holders.insert(queue.zookeeper_path + "/temp/" + entry);
lock_holder_paths.insert(queue.zookeeper_path + "/temp/" + entry);
}
if (!abandonable_lock_holders.empty())
if (!lock_holder_paths.empty())
{
Strings partitions = zookeeper->getChildren(queue.zookeeper_path + "/block_numbers");
std::vector<std::future<zkutil::ListResponse>> lock_futures;
@ -1310,21 +1314,22 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
for (BlockInfo & block : block_infos)
{
zkutil::GetResponse resp = block.contents_future.get();
if (!resp.error && abandonable_lock_holders.count(resp.data))
if (!resp.error && lock_holder_paths.count(resp.data))
committing_blocks[block.partition].insert(block.number);
}
}
queue_.pullLogsToQueue(zookeeper);
/// Load current quorum status.
zookeeper->tryGet(queue.zookeeper_path + "/quorum/last_part", last_quorum_part);
zkutil::GetResponse quorum_last_part_response = quorum_last_part_future.get();
if (!quorum_last_part_response.error)
last_quorum_part = quorum_last_part_response.data;
String quorum_status_str;
if (zookeeper->tryGet(queue.zookeeper_path + "/quorum/status", quorum_status_str))
zkutil::GetResponse quorum_status_response = quorum_status_future.get();
if (!quorum_status_response.error)
{
ReplicatedMergeTreeQuorumEntry quorum_status;
quorum_status.fromString(quorum_status_str);
quorum_status.fromString(quorum_status_response.data);
inprogress_quorum_part = quorum_status.part_name;
}
else
@ -1338,7 +1343,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
/// A sketch of a proof of why this method actually works:
///
/// The trickiest part is to ensure that no new parts will ever appear in the range of blocks between left and right.
/// Inserted parts get their block numbers by acquiring an abandonable lock (see AbandonableLockInZooKeeper.h).
/// Inserted parts get their block numbers by acquiring an ephemeral lock (see EphemeralLockInZooKeeper.h).
/// These block numbers are monotonically increasing in a partition.
///
/// Because there is a window between the moment the inserted part gets its block number and

View File

@ -45,6 +45,11 @@ void MutationCommands::validate(const IStorage & table, const Context & context)
case MutationCommand::DELETE:
{
auto actions = ExpressionAnalyzer(command.predicate, context, {}, all_columns).getActions(true);
/// Try executing the resulting actions on the table sample block to detect malformed queries.
auto table_sample_block = table.getSampleBlock();
actions->execute(table_sample_block);
const ColumnWithTypeAndName & predicate_column = actions->getSampleBlock().getByName(
command.predicate->getColumnName());
checkColumnCanBeUsedAsFilter(predicate_column);

View File

@ -3,8 +3,9 @@
#include <Databases/IDatabase.h>
#include <DataTypes/DataTypeFactory.h>
#include <Storages/StorageDistributed.h>
#include <Storages/VirtualColumnFactory.h>
#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageFactory.h>
@ -316,10 +317,25 @@ void StorageDistributed::truncate(const ASTPtr &)
}
}
namespace
{
/// Virtual columns that StorageDistributed::getColumn() and hasColumn() answer for,
/// mapped from the column name to the name of its data type
/// (getColumn() resolves the type name through DataTypeFactory).
/// The table is only ever looked up, never modified, hence it is `const`.
/// NOTE This is weird. Get rid of this.
const std::map<String, String> virtual_columns =
{
    {"_table", "String"},
    {"_part", "String"},
    {"_part_index", "UInt64"},
    {"_sample_factor", "Float64"},
};
}
NameAndTypePair StorageDistributed::getColumn(const String & column_name) const
{
if (const auto & type = VirtualColumnFactory::tryGetType(column_name))
return { column_name, type };
auto it = virtual_columns.find(column_name);
if (it != virtual_columns.end())
return { it->first, DataTypeFactory::instance().get(it->second) };
return getColumns().getPhysical(column_name);
}
@ -327,7 +343,7 @@ NameAndTypePair StorageDistributed::getColumn(const String & column_name) const
bool StorageDistributed::hasColumn(const String & column_name) const
{
return VirtualColumnFactory::hasColumn(column_name) || getColumns().hasPhysical(column_name);
return virtual_columns.count(column_name) || getColumns().hasPhysical(column_name);
}
void StorageDistributed::createDirectoryMonitors()

View File

@ -11,8 +11,6 @@
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnFactory.h>
#include <Common/typeid_cast.h>

View File

@ -9,7 +9,6 @@
#include <Storages/StorageMerge.h>
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/VirtualColumnFactory.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateConstantExpression.h>
@ -48,22 +47,38 @@ StorageMerge::StorageMerge(
}
/// NOTE Structure of underlying tables as well as their set are not constant,
/// so the results of these methods may become obsolete after the call.
NameAndTypePair StorageMerge::getColumn(const String & column_name) const
{
auto type = VirtualColumnFactory::tryGetType(column_name);
if (type)
return NameAndTypePair(column_name, type);
/// virtual column of the Merge table itself
if (column_name == "_table")
return { column_name, std::make_shared<DataTypeString>() };
/// virtual (and real) columns of the underlying tables
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->getColumn(column_name);
return IStorage::getColumn(column_name);
}
bool StorageMerge::hasColumn(const String & column_name) const
{
return VirtualColumnFactory::hasColumn(column_name) || IStorage::hasColumn(column_name);
if (column_name == "_table")
return true;
auto first_table = getFirstTable([](auto &&) { return true; });
if (first_table)
return first_table->hasColumn(column_name);
return IStorage::hasColumn(column_name);
}
bool StorageMerge::isRemote() const
template <typename F>
StoragePtr StorageMerge::getFirstTable(F && predicate) const
{
auto database = context.getDatabase(source_database);
auto iterator = database->getIterator(context);
@ -73,14 +88,21 @@ bool StorageMerge::isRemote() const
if (table_name_regexp.match(iterator->name()))
{
auto & table = iterator->table();
if (table.get() != this && table->isRemote())
return true;
if (table.get() != this && predicate(table))
return table;
}
iterator->next();
}
return false;
return {};
}
bool StorageMerge::isRemote() const
{
auto first_remote_table = getFirstTable([](const StoragePtr & table) { return table->isRemote(); });
return first_remote_table != nullptr;
}

View File

@ -58,6 +58,9 @@ private:
Block getBlockWithVirtualColumns(const StorageListWithLocks & selected_tables) const;
template <typename F>
StoragePtr getFirstTable(F && predicate) const;
protected:
StorageMerge(
const std::string & name_,

View File

@ -479,11 +479,32 @@ bool StorageMergeTree::optimize(
partition_id = data.getPartitionIDFromQuery(partition, context);
String disable_reason;
if (!merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate, &disable_reason))
if (!partition && final)
{
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
std::unordered_set<String> partition_ids;
for (const MergeTreeData::DataPartPtr & part : data_parts)
partition_ids.emplace(part->info.partition_id);
for (const String & partition_id : partition_ids)
{
if (!merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, true, deduplicate, &disable_reason))
{
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
}
}
}
else
{
if (!merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate, &disable_reason))
{
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
}
}
return true;

View File

@ -106,6 +106,7 @@ namespace ErrorCodes
extern const int INCORRECT_FILE_NAME;
extern const int CANNOT_ASSIGN_OPTIMIZE;
extern const int KEEPER_EXCEPTION;
extern const int BAD_ARGUMENTS;
}
namespace ActionLocks
@ -2062,7 +2063,10 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
{
restarting_thread->wakeup();
return;
}
queue_updating_task->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
@ -2279,14 +2283,20 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
bool deduplicate,
ReplicatedMergeTreeLogEntryData * out_log_entry)
{
bool all_in_zk = true;
std::vector<std::future<zkutil::ExistsResponse>> exists_futures;
exists_futures.reserve(parts.size());
for (const auto & part : parts)
exists_futures.emplace_back(zookeeper->asyncExists(replica_path + "/parts/" + part->name));
bool all_in_zk = true;
for (size_t i = 0; i < parts.size(); ++i)
{
/// If there is no information about part in ZK, we will not merge it.
if (!zookeeper->exists(replica_path + "/parts/" + part->name))
if (exists_futures[i].get().error == ZooKeeperImpl::ZooKeeper::ZNONODE)
{
all_in_zk = false;
const auto & part = parts[i];
if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
{
LOG_WARNING(log, "Part " << part->name << " (that was selected for merge)"
@ -2297,6 +2307,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
}
}
}
if (!all_in_zk)
return false;
@ -2313,16 +2324,6 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
String path_created = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
const String & partition_id = parts[0]->info.partition_id;
for (size_t i = 0; i + 1 < parts.size(); ++i)
{
/// Remove the unnecessary entries about non-existent blocks.
for (Int64 number = parts[i]->info.max_block + 1; number <= parts[i + 1]->info.min_block - 1; ++number)
{
zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + partition_id + "/block-" + padIndex(number));
}
}
if (out_log_entry)
*out_log_entry = entry;
@ -2914,6 +2915,9 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
if (!partition)
{
if (final)
throw Exception("FINAL flag for OPTIMIZE query on Replicated table is meaningful only with specified PARTITION", ErrorCodes::BAD_ARGUMENTS);
selected = merger_mutator.selectPartsToMerge(
future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
}
@ -3404,7 +3408,7 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
}
std::optional<AbandonableLockInZooKeeper>
std::optional<EphemeralLockInZooKeeper>
StorageReplicatedMergeTree::allocateBlockNumber(
const String & partition_id, zkutil::ZooKeeperPtr & zookeeper, const String & zookeeper_block_id_path)
{
@ -3434,11 +3438,11 @@ StorageReplicatedMergeTree::allocateBlockNumber(
zkutil::KeeperMultiException::check(code, ops, responses);
}
AbandonableLockInZooKeeper lock;
EphemeralLockInZooKeeper lock;
/// 2 RTT
try
{
lock = AbandonableLockInZooKeeper(
lock = EphemeralLockInZooKeeper(
partition_path + "/block-", zookeeper_path + "/temp", *zookeeper, &deduplication_check_ops);
}
catch (const zkutil::KeeperMultiException & e)
@ -4375,7 +4379,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
MergeTreeData::MutableDataPartsVector dst_parts;
Strings block_id_paths;
Strings part_checksums;
std::vector<AbandonableLockInZooKeeper> abandonable_locks;
std::vector<EphemeralLockInZooKeeper> ephemeral_locks;
LOG_DEBUG(log, "Cloning " << src_all_parts.size() << " parts");
@ -4431,7 +4435,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
abandonable_locks.emplace_back(std::move(*lock));
ephemeral_locks.emplace_back(std::move(*lock));
block_id_paths.emplace_back(block_id_path);
part_checksums.emplace_back(hash_hex);
}
@ -4472,7 +4476,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
for (size_t i = 0; i < dst_parts.size(); ++i)
{
getCommitPartOps(ops, dst_parts[i], block_id_paths[i]);
abandonable_locks[i].getUnlockOps(ops);
ephemeral_locks[i].getUnlockOps(ops);
if (ops.size() > zkutil::MULTI_BATCH_SIZE)
{
@ -4513,7 +4517,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
String log_znode_path = dynamic_cast<const zkutil::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : abandonable_locks)
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
/// Forcibly remove replaced parts from ZooKeeper

View File

@ -14,7 +14,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAlterThread.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
@ -460,8 +460,9 @@ private:
void updateQuorum(const String & part_name);
/// Creates new block number if block with such block_id does not exist
std::optional<AbandonableLockInZooKeeper> allocateBlockNumber(const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "");
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "");
/** Wait until all replicas, including this, execute the specified action from the log.
* If replicas are added at the same time, it might not wait for the added replica.

View File

@ -1,37 +0,0 @@
#include <Storages/VirtualColumnFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
/// Returns the data type of the virtual column `name`.
/// Throws an Exception with code NO_SUCH_COLUMN_IN_TABLE when tryGetType() does not know the name.
DataTypePtr VirtualColumnFactory::getType(const String & name)
{
    if (auto type = tryGetType(name))
        return type;

    throw Exception("There is no column " + name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}
/// Tells whether `name` is one of the virtual columns known to tryGetType().
bool VirtualColumnFactory::hasColumn(const String & name)
{
    return tryGetType(name) != nullptr;
}
/// Maps the name of a virtual column to its data type.
/// Returns nullptr for any name that is not listed here.
DataTypePtr VirtualColumnFactory::tryGetType(const String & name)
{
    /// Both string-typed virtual columns share one branch.
    if (name == "_table" || name == "_part")
        return std::make_shared<DataTypeString>();

    if (name == "_part_index")
        return std::make_shared<DataTypeUInt64>();

    if (name == "_sample_factor")
        return std::make_shared<DataTypeFloat64>();

    return nullptr;
}
}

View File

@ -1,20 +0,0 @@
#pragma once
#include <DataTypes/IDataType.h>
namespace DB
{
/** Knows the names and types of all possible virtual columns.
* It is necessary for engines that redirect a request to other tables without knowing in advance what virtual columns they contain.
*/
class VirtualColumnFactory
{
public:
/// Returns true if `name` is a known virtual column.
static bool hasColumn(const String & name);
/// Returns the type of the virtual column `name`; throws NO_SUCH_COLUMN_IN_TABLE if there is no such column.
static DataTypePtr getType(const String & name);
/// Returns the type of the virtual column `name`, or nullptr if there is no such column.
static DataTypePtr tryGetType(const String & name);
};
}

View File

@ -4,7 +4,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <ext/scope_guard.h>
#include <pcg_random.hpp>

View File

@ -4,7 +4,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <ext/scope_guard.h>
#include <pcg_random.hpp>
@ -37,17 +37,17 @@ try
Stopwatch total;
Stopwatch stage;
/// Load current inserts
std::unordered_set<String> abandonable_lock_holders;
std::unordered_set<String> lock_holder_paths;
for (const String & entry : zookeeper->getChildren(zookeeper_path + "/temp"))
{
if (startsWith(entry, "abandonable_lock-"))
abandonable_lock_holders.insert(zookeeper_path + "/temp/" + entry);
lock_holder_paths.insert(zookeeper_path + "/temp/" + entry);
}
std::cerr << "Stage 1 (get lock holders): " << abandonable_lock_holders.size()
std::cerr << "Stage 1 (get lock holders): " << lock_holder_paths.size()
<< " lock holders, elapsed: " << stage.elapsedSeconds() << "s." << std::endl;
stage.restart();
if (!abandonable_lock_holders.empty())
if (!lock_holder_paths.empty())
{
Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers");
std::cerr << "Stage 2 (get partitions): " << partitions.size()
@ -86,7 +86,7 @@ try
for (BlockInfo & block : block_infos)
{
zkutil::GetResponse resp = block.contents_future.get();
if (!resp.error && abandonable_lock_holders.count(resp.data))
if (!resp.error && lock_holder_paths.count(resp.data))
{
++total_count;
current_inserts[block.partition].insert(block.number);

View File

@ -134,15 +134,19 @@ def test_insert_multithreaded(started_cluster):
# Sanity check: at least something was inserted
assert runner.total_inserted > 0
for i in range(30): # wait for replication 3 seconds max
all_replicated = False
for i in range(50): # wait for replication 5 seconds max
time.sleep(0.1)
def get_delay(node):
return int(node.query("SELECT absolute_delay FROM system.replicas WHERE table = 'repl_test'").rstrip())
if all([get_delay(n) == 0 for n in nodes]):
all_replicated = True
break
assert all_replicated
actual_inserted = []
for i, node in enumerate(nodes):
actual_inserted.append(int(node.query("SELECT sum(x) FROM repl_test").rstrip()))

View File

@ -1,3 +1,5 @@
Query should fail 1
Query should fail 2
2000-01-01 2 b
2000-01-01 5 e
2000-02-01 2 b

View File

@ -13,17 +13,22 @@ ${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.mutations_r2(d Date, x UInt32, s
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE x = 1"
# Insert some data
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1 VALUES ('2000-01-01', 1, 'a')"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1 VALUES \
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1(d, x, s) VALUES \
('2000-01-01', 1, 'a')"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1(d, x, s) VALUES \
('2000-01-01', 2, 'b'), ('2000-01-01', 3, 'c'), ('2000-01-01', 4, 'd') \
('2000-02-01', 2, 'b'), ('2000-02-01', 3, 'c'), ('2000-02-01', 4, 'd')"
# Try some malformed queries that should fail validation.
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE nonexistent = 0" 2>/dev/null || echo "Query should fail 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE d = '11'" 2>/dev/null || echo "Query should fail 2"
# Delete some values
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE x % 2 = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE s = 'd'"
# Insert more data
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1 VALUES \
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1(d, x, s) VALUES \
('2000-01-01', 5, 'e'), ('2000-02-01', 5, 'e')"
# Wait until all mutations are done.
@ -36,7 +41,6 @@ do
if [[ $i -eq 100 ]]; then
echo "Timed out while waiting for mutations to execute!"
exit 1
fi
done

View File

@ -0,0 +1,7 @@
2000-01-01 1 first 1
2000-01-01 1 first 2
2000-01-01 2 first 2
2000-01-02 1 first 3
2000-01-01 1 first 3
2000-01-01 2 first 2
2000-01-02 1 first 3

View File

@ -0,0 +1,17 @@
-- Exercises OPTIMIZE with and without FINAL on a SummingMergeTree table
-- whose partition key is the tuple (d, x).
DROP TABLE IF EXISTS test.partitioned_by_tuple;
CREATE TABLE test.partitioned_by_tuple (d Date, x UInt8, w String, y UInt8) ENGINE SummingMergeTree (y) PARTITION BY (d, x) ORDER BY (d, x, w);
-- Three separate INSERTs covering three distinct (d, x) values;
-- the last one carries two rows with the same sorting key (d, x, w).
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-02', 1, 'first', 3);
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-01', 2, 'first', 2);
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-01', 1, 'first', 1), ('2000-01-01', 1, 'first', 2);
-- OPTIMIZE without FINAL, then dump the table.
OPTIMIZE TABLE test.partitioned_by_tuple;
SELECT * FROM test.partitioned_by_tuple ORDER BY d, x, w, y;
-- OPTIMIZE ... FINAL without a PARTITION clause, then dump the table again.
OPTIMIZE TABLE test.partitioned_by_tuple FINAL;
SELECT * FROM test.partitioned_by_tuple ORDER BY d, x, w, y;
DROP TABLE test.partitioned_by_tuple;