fix several critical races, fix tests

Sema Checherinda 2022-09-23 00:51:13 +02:00
parent 49539e6862
commit db86cd0f0a
34 changed files with 469 additions and 181 deletions

View File

@ -213,7 +213,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
TableExclusiveLockHolder table_excl_lock;
/// We don't need any lock for ReplicatedMergeTree and for simple MergeTree
/// For the rest of the table types an exclusive lock is needed
if (!table->supportsReplication() && !std::dynamic_pointer_cast<MergeTreeData>(table))
if (!std::dynamic_pointer_cast<MergeTreeData>(table))
table_excl_lock = table->lockExclusively(context_->getCurrentQueryId(), context_->getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();

View File

@ -1686,7 +1686,19 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
scope_guard MergeTreeData::getTemporaryPartDirectoryHolder(const String & part_dir_name)
{
temporary_parts.add(part_dir_name);
bool inserted = temporary_parts.add(part_dir_name);
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary part {} already added", part_dir_name);
return [this, part_dir_name]() { temporary_parts.remove(part_dir_name); };
}
std::optional<scope_guard> MergeTreeData::tryGetTemporaryPartDirectoryHolder(const String & part_dir_name)
{
bool inserted = temporary_parts.add(part_dir_name);
if (!inserted)
return {};
return [this, part_dir_name]() { temporary_parts.remove(part_dir_name); };
}
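
The two holder variants differ only in how a duplicate directory name is reported: the first treats it as LOGICAL_ERROR, the second returns an empty optional so the caller can surface the race as a retryable error. A minimal self-contained sketch of this pattern, with a simplified registry standing in for TemporaryParts and scope_guard (all names here are illustrative, not the real ClickHouse types):

#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
#include <unordered_set>
#include <utility>

class TmpDirRegistry
{
public:
    /// RAII guard: unregisters the directory name on destruction.
    struct Guard
    {
        TmpDirRegistry * registry = nullptr;
        std::string name;

        Guard(TmpDirRegistry & registry_, std::string name_)
            : registry(&registry_), name(std::move(name_)) {}
        Guard(Guard && other) noexcept
            : registry(std::exchange(other.registry, nullptr)), name(std::move(other.name)) {}
        ~Guard() { if (registry) registry->remove(name); }
    };

    /// Throwing variant: a duplicate name is considered a logical error.
    Guard holder(const std::string & name)
    {
        if (!add(name))
            throw std::logic_error("Temporary part " + name + " already added");
        return Guard(*this, name);
    }

    /// Non-throwing variant: the caller decides how to react to the race.
    std::optional<Guard> tryHolder(const std::string & name)
    {
        if (!add(name))
            return {};
        return Guard(*this, name);
    }

private:
    bool add(const std::string & name)
    {
        std::lock_guard lock(mutex);
        return names.emplace(name).second;
    }

    void remove(const std::string & name)
    {
        std::lock_guard lock(mutex);
        names.erase(name);
    }

    std::mutex mutex;
    std::unordered_set<std::string> names;
};
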
@ -1827,6 +1839,8 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
(*it)->assertState({DataPartState::Deleting});
LOG_DEBUG(log, "Finally removing part from memory {}", part->name);
data_parts_indexes.erase(it);
}
}
@ -1922,6 +1936,8 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts, bool
{
get_failed_parts();
LOG_DEBUG(log, "Failed to remove all parts, all count {}, removed {}", parts.size(), part_names_succeed.size());
if (throw_on_error)
throw;
}
@ -2111,6 +2127,7 @@ size_t MergeTreeData::clearEmptyParts()
{
if (part->rows_count != 0)
continue;
LOG_TRACE(log, "Find empty part {}", part->name);
/// Do not try to drop uncommitted parts.
if (!part->version.getCreationTID().isPrehistoric() && !part->version.isVisible(TransactionLog::instance().getLatestSnapshot()))
@ -2120,12 +2137,15 @@ size_t MergeTreeData::clearEmptyParts()
/// Otherwise covered parts resurrect
{
auto lock = lockParts();
if (part->getState() != DataPartState::Active)
continue;
DataPartsVector covered_parts = getCoveredOutdatedParts(part->info, lock);
if (!covered_parts.empty())
continue;
}
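
Checking the part state again after taking lockParts() is what closes the race here: the cheap rows_count filter runs without the lock, so by the time the part is dropped a concurrent query may already have replaced or deactivated it. A small sketch of this check-then-recheck-under-lock pattern, with simplified types that only illustrate the locking discipline (not the real MergeTree interfaces):

#include <cstddef>
#include <mutex>
#include <vector>

enum class State { Active, Outdated };

struct Part { State state = State::Active; size_t rows = 0; };

std::mutex parts_mutex;  /// stands in for the lock taken by lockParts()

/// Select empty parts for dropping. Every candidate found by the cheap,
/// lock-free pre-filter is validated again under the lock before it is used.
std::vector<Part *> selectEmptyParts(std::vector<Part> & parts)
{
    std::vector<Part *> to_drop;
    for (auto & part : parts)
    {
        if (part.rows != 0)  /// pre-filter without the lock
            continue;

        std::lock_guard lock(parts_mutex);
        if (part.state != State::Active)  /// may have changed concurrently
            continue;

        to_drop.push_back(&part);
    }
    return to_drop;
}
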
LOG_TRACE(log, "Will drop empty part {}", part->name);
LOG_INFO(log, "Will drop empty part {}", part->name);
dropPartNoWaitNoThrow(part->name);
++cleared_count;
@ -5258,12 +5278,13 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
part->getDataPartStorage().commitTransaction();
bool commit_to_wal = has_in_memory_parts && settings->in_memory_parts_enable_wal;
MergeTreeData::WriteAheadLogPtr wal;
if (commit_to_wal)
wal = data.getWriteAheadLog();
if (txn || commit_to_wal)
{
MergeTreeData::WriteAheadLogPtr wal;
if (commit_to_wal)
wal = data.getWriteAheadLog();
for (const auto & part : precommitted_parts)
{
if (txn)
@ -5321,6 +5342,9 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
data.modifyPartState(covered_part, DataPartState::Outdated);
data.removePartContributionToColumnAndSecondaryIndexSizes(covered_part);
if (auto part_in_memory = asInMemoryPart(covered_part))
wal->dropPart(part_in_memory->name);
}
reduce_parts += covered_parts.size();
@ -6576,6 +6600,7 @@ bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const
if (canUseAdaptiveGranularity() && !src_part->index_granularity_info.mark_type.adaptive)
return false;
}
return true;
}
@ -7244,13 +7269,13 @@ void MergeTreeData::incrementMergedPartsProfileEvent(MergeTreeDataPartType type)
}
}
MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(MergeTreePartInfo & new_part_info, const MergeTreePartition & partition, const String & new_part_name, const MergeTreeTransactionPtr & txn)
MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
MergeTreePartInfo & new_part_info, const MergeTreePartition & partition, const String & new_part_name,
const MergeTreeTransactionPtr & txn)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
auto settings = getSettings();
constexpr static auto TMP_PREFIX = "tmp_empty_";
auto block = metadata_snapshot->getSampleBlock();
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
setAllObjectsToDummyTupleType(columns);
@ -7267,7 +7292,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(MergeTreePartIn
choosePartType(0, block.rows()),
new_part_info,
createVolumeFromReservation(reservation, volume),
TMP_PREFIX + new_part_name);
EMPTY_PART_TMP_PREFIX + new_part_name);
new_data_part->name = new_part_name;
@ -7292,7 +7317,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(MergeTreePartIn
if (new_data_part->volume->getDisk()->exists(full_path))
{
LOG_WARNING(log, "Removing old temporary directory {}", fullPath(new_data_part->volume->getDisk(), full_path));
new_data_part->volume->getDisk()->removeRecursive(full_path);
throw Exception(ErrorCodes::LOGICAL_ERROR,
"New empty part is about to matirialize but the dirrectory already exist"
", new part {}"
", directory {}",
new_part_name, full_path);
}
const auto disk = new_data_part->volume->getDisk();

View File

@ -918,6 +918,7 @@ public:
using WriteAheadLogPtr = std::shared_ptr<MergeTreeWriteAheadLog>;
WriteAheadLogPtr getWriteAheadLog();
constexpr static auto EMPTY_PART_TMP_PREFIX = "tmp_empty_";
MergeTreeData::MutableDataPartPtr createEmptyPart(MergeTreePartInfo & new_part_info, const MergeTreePartition & partition, const String & new_part_name, const MergeTreeTransactionPtr & txn);
MergeTreeDataFormatVersion format_version;
@ -1029,6 +1030,7 @@ public:
/// Returns an object that protects temporary directory from cleanup
scope_guard getTemporaryPartDirectoryHolder(const String & part_dir_name);
std::optional<scope_guard> tryGetTemporaryPartDirectoryHolder(const String & part_dir_name);
protected:
friend class IMergeTreeDataPart;

View File

@ -15,12 +15,11 @@ bool TemporaryParts::contains(const std::string & basename) const
return parts.contains(basename);
}
void TemporaryParts::add(const std::string & basename)
bool TemporaryParts::add(const std::string & basename)
{
std::lock_guard lock(mutex);
bool inserted = parts.emplace(basename).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary part {} already added", basename);
return inserted;
}
void TemporaryParts::remove(const std::string & basename)

View File

@ -18,7 +18,7 @@ private:
/// NOTE: It is pretty short, so use STL is fine.
std::unordered_set<std::string> parts;
void add(const std::string & basename);
bool add(const std::string & basename);
void remove(const std::string & basename);
friend class MergeTreeData;

View File

@ -53,7 +53,7 @@ namespace ErrorCodes
extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
extern const int ABORTED;
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int SERIALIZATION_ERROR;
}
namespace ActionLocks
@ -1421,12 +1421,12 @@ void StorageMergeTree::dropPartNoWaitNoThrow(const String & part_name)
deduplication_log->dropPart(part->info);
}
LOG_INFO(log, "Removed 1 parts.");
/// Need to destroy part objects before clearing them from filesystem.
part.reset();
clearOldPartsFromFilesystem();
LOG_INFO(log, "Removed 1 part {}.", part_name);
}
/// Else nothing to do, part was removed in some different way
@ -1449,7 +1449,7 @@ RangesWithContinuousBlocks groupByRangesWithContinuousBlocks(DataPartsVector par
}
auto last_part_in_prev_range = result.back().back();
if (last_part_in_prev_range->info.max_block+1 == part->info.min_block)
if (last_part_in_prev_range->info.max_block + 1 == part->info.min_block)
result.back().push_back(part);
else
result.push_back({part});
@ -1458,80 +1458,122 @@ RangesWithContinuousBlocks groupByRangesWithContinuousBlocks(DataPartsVector par
return result;
}
struct PartitionInfo
struct FutureNewEmptyPart
{
MergeTreePartition partition{};
Int64 min_block = std::numeric_limits<Int64>::max();
Int64 max_block = 0;
UInt32 max_level = 0;
Int64 max_mutation = 0;
MergeTreePartInfo part_info;
MergeTreePartition partition;
std::string part_name;
scope_guard tmp_dir_guard;
StorageMergeTree::MutableDataPartPtr data_part;
std::string getDirName() const { return StorageMergeTree::EMPTY_PART_TMP_PREFIX + part_name; }
};
using PartitionInfos = std::unordered_map<String, PartitionInfo>;
using FutureNewEmptyParts = std::vector<FutureNewEmptyPart>;
PartitionInfos collectPartitionInfos(const DataPartsVector & parts)
FutureNewEmptyParts initNewEmptyPartsForPartitions(const DataPartsVector & old_parts)
{
PartitionInfos partition_infos;
for (const auto & part: parts)
using FutureNewEmptyPartsSet = std::unordered_map<String, FutureNewEmptyPart>;
FutureNewEmptyPartsSet new_parts;
for (const auto & old_part: old_parts)
{
const MergeTreePartInfo & part_info = part->info;
const MergeTreePartInfo & part_info = old_part->info;
auto partition_id = part_info.partition_id;
if (partition_infos.contains(partition_id))
if (new_parts.contains(partition_id))
{
auto & partition_info = partition_infos[partition_id];
partition_info.min_block = std::min(partition_info.min_block, part_info.min_block);
partition_info.max_block = std::max(partition_info.max_block, part_info.max_block);
partition_info.max_level = std::max(partition_info.max_level, part_info.level);
partition_info.max_mutation = std::max(partition_info.max_mutation, part_info.mutation);
auto & new_part = new_parts[partition_id];
new_part.part_info.min_block = std::min(new_part.part_info.min_block, part_info.min_block);
new_part.part_info.max_block = std::max(new_part.part_info.max_block, part_info.max_block);
new_part.part_info.level = std::max(new_part.part_info.level, part_info.level + 1);
new_part.part_info.mutation = std::max(new_part.part_info.mutation, part_info.mutation);
}
else
partition_infos[partition_id] = {part->partition, part_info.min_block, part_info.max_block, part_info.level, part_info.mutation};
{
auto & new_part = new_parts[partition_id];
new_part.part_info = MergeTreePartInfo(partition_id, part_info.min_block, part_info.max_block, part_info.level + 1, part_info.mutation);
new_part.partition = old_part->partition;
}
}
return partition_infos;
FutureNewEmptyParts result;
result.reserve(new_parts.size());
for (auto & item : new_parts)
result.push_back(std::move(item.second));
for (auto & new_part : result)
new_part.part_name = new_part.part_info.getPartName();
return result;
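
Per partition, the future empty part has to cover every old part in its range, so the fold keeps the minimal min_block, the maximal max_block, a level one above the maximal seen level, and the maximal mutation. A sketch of that accumulation over (partition_id, part info) pairs, with a simplified descriptor in place of MergeTreePartInfo:

#include <algorithm>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

struct Info { int64_t min_block; int64_t max_block; uint32_t level; int64_t mutation; };

/// Fold (partition_id, part info) pairs into one covering descriptor per partition.
std::unordered_map<std::string, Info> coveringInfos(const std::vector<std::pair<std::string, Info>> & old_parts)
{
    std::unordered_map<std::string, Info> result;
    for (const auto & [partition_id, info] : old_parts)
    {
        auto [it, inserted] = result.try_emplace(
            partition_id, Info{info.min_block, info.max_block, info.level + 1, info.mutation});
        if (inserted)
            continue;

        Info & acc = it->second;
        acc.min_block = std::min(acc.min_block, info.min_block);
        acc.max_block = std::max(acc.max_block, info.max_block);
        acc.level = std::max(acc.level, info.level + 1);  /// one level above all covered parts
        acc.mutation = std::max(acc.mutation, info.mutation);
    }
    return result;
}
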
}
MergeTreeData::MutableDataPartsVector StorageMergeTree::makeCoveringEmptyTmpParts(const DataPartsVector & parts, const MergeTreeTransactionPtr & txn)
Strings getPartsNames(const DataPartsVector & parts)
{
MutableDataPartsVector new_parts;
Strings part_names;
for (const auto & p : parts)
part_names.push_back(p->getNameWithState());
return part_names;
}
Strings getPartsNames(const FutureNewEmptyParts & parts)
{
Strings part_names;
for (const auto & p : parts)
part_names.push_back(p.part_name);
return part_names;
}
FutureNewEmptyParts initCoverageWithNewEmptyParts(const DataPartsVector & parts)
{
FutureNewEmptyParts new_parts;
/// Parts could have gaps in the continuous block numeration due to concurrent insertions.
/// These gaps must not be covered with empty parts, otherwise parts from concurrent insertions would be lost.
/// Here the parts are split into ranges without block gaps.
RangesWithContinuousBlocks continuous_ranges = groupByRangesWithContinuousBlocks(parts);
for (auto& range: continuous_ranges)
for (const auto& range: continuous_ranges)
{
/// Inside each range a new empty part can cover parts from min_block to max_block within each partition
auto partitions = collectPartitionInfos(range);
for (const auto & it: partitions)
{
const auto & partition_id = it.first;
const auto & info = it.second;
auto part_info = MergeTreePartInfo(partition_id, info.min_block, info.max_block, info.max_level+1, info.max_mutation);
auto data_part = createEmptyPart(part_info, info.partition, part_info.getPartName(), txn);
new_parts.push_back(data_part);
}
auto grouped_new_parts = initNewEmptyPartsForPartitions(range);
std::move(grouped_new_parts.begin(), grouped_new_parts.end(), std::back_inserter(new_parts));
}
auto get_part_names = [] (auto & parts_) -> Strings
{
Strings part_names;
for (const auto & p : parts_)
part_names.push_back(p->getNameWithState());
return part_names;
};
LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}",
new_parts.size(), parts.size(),
fmt::join(get_part_names(new_parts), ", "), fmt::join(get_part_names(parts), ", "));
return new_parts;
}
void StorageMergeTree::coverPartsWithEmptyParts(const DataPartsVector & old_parts, const MutableDataPartsVector & new_parts, Transaction & transaction)
StorageMergeTree::MutableDataPartsVector createNewEmptyDataParts(MergeTreeData & data, FutureNewEmptyParts & new_parts, const MergeTreeTransactionPtr & txn)
{
StorageMergeTree::MutableDataPartsVector data_parts;
for (auto & new_part: new_parts)
data_parts.push_back(data.createEmptyPart(new_part.part_info, new_part.partition, new_part.part_name, txn));
return data_parts;
}
void captureTmpDirName(MergeTreeData & data, FutureNewEmptyParts & new_parts)
{
for (auto & new_part : new_parts)
{
auto maybe_guard = data.tryGetTemporaryPartDirectoryHolder(new_part.getDirName());
if (!maybe_guard)
throw Exception(ErrorCodes::SERIALIZATION_ERROR, "Temporary part {} already added", new_part.getDirName());
new_part.tmp_dir_guard = std::move(*maybe_guard);
}
}
StorageMergeTree::MutableDataPartsVector getDataParts(FutureNewEmptyParts & new_parts)
{
StorageMergeTree::MutableDataPartsVector result;
result.reserve(new_parts.size());
for (auto & part : new_parts)
result.push_back(part.data_part);
return result;
}
void StorageMergeTree::coverPartsWithEmptyParts(const DataPartsVector & old_parts, MutableDataPartsVector & new_parts, Transaction & transaction)
{
auto part_lock = lockParts();
@ -1544,14 +1586,14 @@ void StorageMergeTree::coverPartsWithEmptyParts(const DataPartsVector & old_part
changed_parts.push_back(part);
if (!changed_parts.empty())
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED,
throw Exception(ErrorCodes::SERIALIZATION_ERROR,
"Race with concurrent query that modifies parts. {} parts have changed the status, first is {}. Try again later.",
changed_parts.size(), changed_parts.front()->getNameWithState());
}
DataPartsVector covered_parts;
for (auto part: new_parts)
for (auto & part: new_parts)
{
bool no_covering_parts = renameTempPartAndReplaceUnlocked(part, transaction, part_lock, &covered_parts);
bool has_covering_parts = !no_covering_parts;
@ -1569,9 +1611,13 @@ void StorageMergeTree::coverPartsWithEmptyParts(const DataPartsVector & old_part
LOG_INFO(log, "Remove {} parts by covering them with empty {} parts.", covered_parts.size(), new_parts.size());
// Do commit at the same locked scope where new parts was checked
/// Do commit in the same locked scope where the new parts were checked
transaction.commit(&part_lock);
/// Remove covered parts without waiting for old_parts_lifetime seconds.
for (auto & covered_part: covered_parts)
covered_part->remove_time.store(0, std::memory_order_relaxed);
if (deduplication_log)
for (const auto & part: covered_parts)
deduplication_log->dropPart(part->info);
@ -1588,13 +1634,19 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
DataPartsVector parts = getVisibleDataPartsVector(query_context);
auto parts = getVisibleDataPartsVector(query_context);
MutableDataPartsVector new_parts = makeCoveringEmptyTmpParts(parts, txn);
auto new_parts = initCoverageWithNewEmptyParts(parts);
coverPartsWithEmptyParts(parts, new_parts, transaction);
LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}",
new_parts.size(), parts.size(),
fmt::join(getPartsNames(new_parts), ", "), fmt::join(getPartsNames(parts), ", "));
PartLog::addNewParts(query_context, new_parts, watch.elapsed());
captureTmpDirName(*this, new_parts);
auto new_data_parts = createNewEmptyDataParts(*this, new_parts, txn);
coverPartsWithEmptyParts(parts, new_data_parts, transaction);
PartLog::addNewParts(query_context, new_data_parts, watch.elapsed());
LOG_INFO(log, "Truncated table with {} parts by replacing them with new empty {} parts.",
parts.size(), new_parts.size());
@ -1603,6 +1655,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
/// Old part objects need to be destroyed before clearing them from the filesystem.
clearOldMutations(true);
clearOldPartsFromFilesystem();
clearEmptyParts();
}
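
With this refactoring every DROP/DETACH/TRUNCATE path follows the same ordering: compute the covering part names first, reserve the tmp_empty_ directories (failing early with SERIALIZATION_ERROR on a concurrent collision), only then materialize the parts on disk, and finally validate, swap, and commit under a single parts lock. A condensed self-contained model of that ordering; all names below are simplified stand-ins for the real ClickHouse types, steps 1 and 3 are elided to comments, and the guard release on scope exit (handled by scope_guard in the real code) is omitted:

#include <mutex>
#include <stdexcept>
#include <string>
#include <unordered_set>
#include <vector>

/// Stand-in for the SERIALIZATION_ERROR exception used in the diff.
struct SerializationError : std::runtime_error { using std::runtime_error::runtime_error; };

struct FuturePart { std::string name; };

std::mutex parts_mutex;                    /// stands in for lockParts()
std::mutex tmp_dirs_mutex;
std::unordered_set<std::string> tmp_dirs;  /// stands in for TemporaryParts

void dropByCovering(const std::vector<FuturePart> & new_parts)
{
    /// 1. Part names/infos were computed up front; nothing touches the disk yet.

    /// 2. Reserve tmp_empty_<name> directories. A collision means a concurrent
    ///    query is already producing the same covering part: fail early.
    for (const auto & p : new_parts)
    {
        std::lock_guard guard(tmp_dirs_mutex);
        if (!tmp_dirs.emplace("tmp_empty_" + p.name).second)
            throw SerializationError("Temporary part tmp_empty_" + p.name + " already added");
    }

    /// 3. Materialize the empty parts in the reserved directories (elided).

    /// 4. Re-validate the old parts and commit the swap under a single parts lock,
    ///    so no state change can slip in between the check and the commit.
    std::lock_guard guard(parts_mutex);
    /// ... check old parts are still Active, rename new parts in, commit ...
}
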
void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPtr query_context)
@ -1630,21 +1683,27 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
}
{
MutableDataPartsVector new_parts = makeCoveringEmptyTmpParts({part}, txn);
auto new_parts = initCoverageWithNewEmptyParts({part});
coverPartsWithEmptyParts({part}, new_parts, transaction);
LOG_TEST(log, "Made {} empty parts in order to cover {} part.",
fmt::join(getPartsNames(new_parts), ", "), fmt::join(getPartsNames({part}), ", "));
PartLog::addNewParts(query_context, new_parts, watch.elapsed());
captureTmpDirName(*this, new_parts);
auto new_data_parts = createNewEmptyDataParts(*this, new_parts, txn);
coverPartsWithEmptyParts({part}, new_data_parts, transaction);
PartLog::addNewParts(query_context, new_data_parts, watch.elapsed());
const auto * op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "{} {} part by replacing it with new empty {} part.",
op, part->name, new_parts[0]->name);
op, part->name, new_parts[0].part_name);
}
}
/// Old part objects need to be destroyed before clearing them from the filesystem.
clearOldMutations(true);
clearOldPartsFromFilesystem();
clearEmptyParts();
}
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context)
@ -1681,11 +1740,17 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
part->makeCloneInDetached("", metadata_snapshot);
}
MutableDataPartsVector new_parts = makeCoveringEmptyTmpParts(parts, txn);
auto new_parts = initCoverageWithNewEmptyParts(parts);
coverPartsWithEmptyParts(parts, new_parts, transaction);
LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}",
new_parts.size(), parts.size(),
fmt::join(getPartsNames(new_parts), ", "), fmt::join(getPartsNames(parts), ", "));
PartLog::addNewParts(query_context, new_parts, watch.elapsed());
captureTmpDirName(*this, new_parts);
auto new_data_parts = createNewEmptyDataParts(*this, new_parts, txn);
coverPartsWithEmptyParts(parts, new_data_parts, transaction);
PartLog::addNewParts(query_context, new_data_parts, watch.elapsed());
const auto * op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "{} partition with {} parts by replacing them with new empty {} parts",
@ -1695,6 +1760,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
/// Old part objects need to be destroyed before clearing them from the filesystem.
clearOldMutations(true);
clearOldPartsFromFilesystem();
clearEmptyParts();
}
PartitionCommandsResultInfo StorageMergeTree::attachPartition(

View File

@ -170,7 +170,8 @@ private:
bool optimize_skip_merged_partitions = false);
MutableDataPartsVector makeCoveringEmptyTmpParts(const DataPartsVector & parts, const MergeTreeTransactionPtr & txn);
void coverPartsWithEmptyParts(const DataPartsVector & old_parts, const MutableDataPartsVector & new_parts, Transaction & transaction);
void coverPartsWithEmptyParts(const DataPartsVector & old_parts, MutableDataPartsVector & new_parts, Transaction & transaction);
/// Make the part state Outdated and queue it for removal without a timeout
/// If force, stop merges and block them until the part state becomes Outdated. Throw an exception if the part doesn't exist

View File

@ -1,5 +1,5 @@
import time
import logging
from helpers.test_tools import assert_eq_with_retry
def _parse_table_database(table, database):
@ -12,18 +12,13 @@ def _parse_table_database(table, database):
return table, "default"
def wait_for_delete_inactive_parts(node, table, database=None, timeout=60):
def wait_for_delete_inactive_parts(node, table, database=None):
table, database = _parse_table_database(table, database)
inactive_parts_query = (
f"SELECT count() FROM system.parts "
f"WHERE not active AND table = '{table}' AND database = '{database}';"
)
while timeout > 0:
if 0 == int(node.query(inactive_parts_query)):
break
timeout -= 1
time.sleep(1)
assert 0 == int(node.query(inactive_parts_query))
assert_eq_with_retry(node, inactive_parts_query, "0\n")
def wait_for_delete_empty_parts(node, table, database=None, timeout=60):
@ -32,9 +27,5 @@ def wait_for_delete_empty_parts(node, table, database=None, timeout=60):
f"SELECT count() FROM system.parts "
f"WHERE active AND rows = 0 AND table = '{table}' AND database = '{database}'"
)
while timeout > 0:
if 0 == int(node.query(empty_parts_query)):
break
timeout -= 1
time.sleep(1)
assert 0 == int(node.query(empty_parts_query))
assert_eq_with_retry(node, empty_parts_query, "0\n")

View File

@ -32,8 +32,7 @@ def test_numbers_of_detached_parts(started_cluster):
)
Engine=MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY id
SETTINGS old_parts_lifetime=1;
ORDER BY id;
"""
node1.query(query_create)
@ -71,7 +70,6 @@ def test_numbers_of_detached_parts(started_cluster):
# detach some parts and wait until asynchronous metrics notice it
node1.query("ALTER TABLE t DETACH PARTITION '20220901';")
wait_for_delete_inactive_parts(node1, "t")
wait_for_delete_empty_parts(node1, "t")
assert 2 == int(node1.query(query_count_detached_parts))
@ -86,7 +84,6 @@ def test_numbers_of_detached_parts(started_cluster):
# detach the rest parts and wait until asynchronous metrics notice it
node1.query("ALTER TABLE t DETACH PARTITION ALL")
wait_for_delete_inactive_parts(node1, "t")
wait_for_delete_empty_parts(node1, "t")
assert 3 == int(node1.query(query_count_detached_parts))

View File

@ -24,10 +24,10 @@ def started_cluster():
def test_empty_parts_alter_delete(started_cluster):
node1.query(
"CREATE TABLE empty_parts_delete (d Date, key UInt64, value String) \
ENGINE = ReplicatedMergeTree('/clickhouse/tables/empty_parts_delete', 'r1') \
PARTITION BY toYYYYMM(d) ORDER BY key \
SETTINGS old_parts_lifetime = 1"
"CREATE TABLE empty_parts_delete (d Date, key UInt64, value String) "
"ENGINE = ReplicatedMergeTree('/clickhouse/tables/empty_parts_delete', 'r1') "
"PARTITION BY toYYYYMM(d) ORDER BY key "
"SETTINGS old_parts_lifetime = 1"
)
node1.query("INSERT INTO empty_parts_delete VALUES (toDate('2020-10-10'), 1, 'a')")
@ -45,10 +45,10 @@ def test_empty_parts_alter_delete(started_cluster):
def test_empty_parts_summing(started_cluster):
node1.query(
"CREATE TABLE empty_parts_summing (d Date, key UInt64, value Int64) \
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/empty_parts_summing', 'r1') \
PARTITION BY toYYYYMM(d) ORDER BY key \
SETTINGS old_parts_lifetime = 1"
"CREATE TABLE empty_parts_summing (d Date, key UInt64, value Int64) "
"ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/empty_parts_summing', 'r1') "
"PARTITION BY toYYYYMM(d) ORDER BY key "
"SETTINGS old_parts_lifetime = 1"
)
node1.query("INSERT INTO empty_parts_summing VALUES (toDate('2020-10-10'), 1, 1)")

View File

@ -6,7 +6,6 @@ import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.wait_for_helpers import wait_for_delete_inactive_parts
from helpers.wait_for_helpers import wait_for_delete_empty_parts
@ -105,8 +104,8 @@ def create_table(
ORDER BY (dt, id)
SETTINGS
storage_policy='s3',
old_parts_lifetime=1,
index_granularity=512
index_granularity=512,
old_parts_lifetime=1
""".format(
create="ATTACH" if attach else "CREATE",
table_name=table_name,
@ -595,7 +594,6 @@ def test_restore_to_detached(cluster, replicated, db_atomic):
# Detach some partition.
node.query("ALTER TABLE s3.test DETACH PARTITION '2020-01-07'")
wait_for_delete_inactive_parts(node, "s3.test")
wait_for_delete_empty_parts(node, "s3.test")
node.query("ALTER TABLE s3.test FREEZE")

View File

@ -1244,10 +1244,17 @@ def test_concurrent_alter_move_and_drop(start_cluster, name, engine):
def alter_drop(num):
for i in range(num):
partition = random.choice([201903, 201904])
drach = random.choice(["drop", "detach"])
node1.query(
"ALTER TABLE {} {} PARTITION {}".format(name, drach, partition)
)
op = random.choice(["drop", "detach"])
try:
node1.query(
"ALTER TABLE {} {} PARTITION {}".format(name, op, partition)
)
except QueryRuntimeException as e:
if "Code: 650" in e.stderr:
pass
else:
raise e
insert(100)
p = Pool(15)

View File

@ -185,8 +185,7 @@ def attach_check_all_parts_table(started_cluster):
q(
"CREATE TABLE test.attach_partition (n UInt64) "
"ENGINE = MergeTree() "
"PARTITION BY intDiv(n, 8) ORDER BY n "
"SETTINGS old_parts_lifetime = 1"
"PARTITION BY intDiv(n, 8) ORDER BY n"
)
q(
"INSERT INTO test.attach_partition SELECT number FROM system.numbers WHERE number % 2 = 0 LIMIT 8"
@ -234,7 +233,8 @@ def test_attach_check_all_parts(attach_check_all_parts_table):
)
parts = q(
"SElECT name FROM system.parts WHERE table='attach_partition' AND database='test' ORDER BY name"
"SElECT name FROM system.parts "
"WHERE table='attach_partition' AND database='test' AND active ORDER BY name"
)
assert TSV(parts) == TSV("1_2_2_0\n1_4_4_0")
detached = q(
@ -459,15 +459,13 @@ def test_system_detached_parts(drop_detached_parts_table):
def test_detached_part_dir_exists(started_cluster):
q(
"create table detached_part_dir_exists (n int) "
"engine=MergeTree order by n "
"settings old_parts_lifetime = 1"
"engine=MergeTree order by n"
)
q("insert into detached_part_dir_exists select 1") # will create all_1_1_0
q(
"alter table detached_part_dir_exists detach partition id 'all'"
) # will move all_1_1_0 to detached/all_1_1_0 and create all_1_1_1
wait_for_delete_inactive_parts(instance, "detached_part_dir_exists")
wait_for_delete_empty_parts(instance, "detached_part_dir_exists")
q("detach table detached_part_dir_exists")

View File

@ -2,37 +2,59 @@
1 3 bar
2 4 aa
2 5 bb
3 6 qq
3 7 ww
==================
2 6 cc
3 7 qq
3 8 ww
3 9 ee
3 10 rr
1_1_1_0 InMemory 2
2_2_2_0 InMemory 3
3_3_3_0 InMemory 4
^ init ==================
2 4 aa
2 5 bb
3 6 qq
3 7 ww
==================
3 6 qq
3 7 ww
==================
2 6 cc
3 7 qq
3 8 ww
3 9 ee
3 10 rr
2_2_2_0 InMemory 3
3_3_3_0 InMemory 4
^ drop 1 ==================
3 7 qq
3 8 ww
3 9 ee
3 10 rr
3_3_3_0 InMemory 4
^ detach 2 ==================
2 4 aa
2 5 bb
3 6 qq
3 7 ww
1_1_1_1 InMemory
2_2_2_1 InMemory
2_4_4_0 Compact
3_3_3_0 InMemory
==================
2 6 cc
3 7 qq
3 8 ww
3 9 ee
3 10 rr
2_4_4_0 Compact 3
3_3_3_0 InMemory 4
^ attach 2 =================
2 4 aa
2 5 bb
3 6 qq
3 7 ww
==================
2 6 cc
3 7 qq
3 8 ww
3 9 ee
3 10 rr
2_4_4_0 Compact 3
3_3_3_0 InMemory 4
^ detach attach ==================
2 4 aa
2 5 bb
3 6 cc
3 7 dd
t2 2_4_4_0 Compact
t2 3_6_6_0 Compact
t3 3_1_1_0 InMemory
==================
3_1_1_0 InMemory 1
2 6 cc
3 11 tt
3 12 yy
t2 2_4_4_0 Compact 3
t2 3_6_6_0 Compact 2
t3 3_1_1_0 InMemory 2
^ replace ==================
3_1_1_0 InMemory 1 2
^ freeze ==================

View File

@ -9,30 +9,34 @@ CREATE TABLE t2(id UInt32, a UInt64, s String)
SYSTEM STOP MERGES t2;
INSERT INTO t2 VALUES (1, 2, 'foo'), (1, 3, 'bar');
INSERT INTO t2 VALUES (2, 4, 'aa'), (2, 5, 'bb');
INSERT INTO t2 VALUES (3, 6, 'qq'), (3, 7, 'ww');
INSERT INTO t2 VALUES (2, 4, 'aa'), (2, 5, 'bb'), (2, 6, 'cc');
INSERT INTO t2 VALUES (3, 7, 'qq'), (3, 8, 'ww'), (3, 9, 'ee'), (3, 10, 'rr');
SELECT * FROM t2 ORDER BY a;
SELECT '==================';
SELECT name, part_type, rows FROM system.parts WHERE table = 't2' AND active AND database = currentDatabase() ORDER BY name;
SELECT '^ init ==================';
ALTER TABLE t2 DROP PARTITION 1;
SELECT * FROM t2 ORDER BY a;
SELECT '==================';
SELECT name, part_type, rows FROM system.parts WHERE table = 't2' AND active AND database = currentDatabase() ORDER BY name;
SELECT '^ drop 1 ==================';
ALTER TABLE t2 DETACH PARTITION 2;
SELECT * FROM t2 ORDER BY a;
SELECT '==================';
SELECT name, part_type, rows FROM system.parts WHERE table = 't2' AND active AND database = currentDatabase() ORDER BY name;
SELECT '^ detach 2 ==================';
ALTER TABLE t2 ATTACH PARTITION 2;
SELECT * FROM t2 ORDER BY a;
SELECT name, part_type FROM system.parts WHERE table = 't2' AND active AND database = currentDatabase() ORDER BY name;
SELECT '==================';
SELECT name, part_type, rows FROM system.parts WHERE table = 't2' AND active AND database = currentDatabase() ORDER BY name;
SELECT '^ attach 2 =================';
DETACH TABLE t2;
ATTACH TABLE t2;
SELECT * FROM t2 ORDER BY a;
SELECT '==================';
SELECT name, part_type, rows FROM system.parts WHERE table = 't2' AND active AND database = currentDatabase() ORDER BY name;
SELECT '^ detach attach ==================';
DROP TABLE IF EXISTS t3;
@ -40,15 +44,16 @@ CREATE TABLE t3(id UInt32, a UInt64, s String)
ENGINE = MergeTree ORDER BY a PARTITION BY id
SETTINGS min_rows_for_compact_part = 1000, min_rows_for_wide_part = 2000;
INSERT INTO t3 VALUES (3, 6, 'cc'), (3, 7, 'dd');
INSERT INTO t3 VALUES (3, 11, 'tt'), (3, 12, 'yy');
ALTER TABLE t2 REPLACE PARTITION 3 FROM t3;
SELECT * FROM t2 ORDER BY a;
SELECT table, name, part_type FROM system.parts WHERE table = 't2' AND active AND database = currentDatabase() ORDER BY name;
SELECT table, name, part_type FROM system.parts WHERE table = 't3' AND active AND database = currentDatabase() ORDER BY name;
SELECT '==================';
SELECT table, name, part_type, rows FROM system.parts WHERE table = 't2' AND active AND database = currentDatabase() ORDER BY name;
SELECT table, name, part_type, rows FROM system.parts WHERE table = 't3' AND active AND database = currentDatabase() ORDER BY name;
SELECT '^ replace ==================';
ALTER TABLE t3 FREEZE PARTITION 3;
SELECT name, part_type, is_frozen FROM system.parts WHERE table = 't3' AND active AND database = currentDatabase() ORDER BY name;
SELECT name, part_type, is_frozen, rows FROM system.parts WHERE table = 't3' AND active AND database = currentDatabase() ORDER BY name;
SELECT '^ freeze ==================';
DROP TABLE t2;
DROP TABLE t3;

View File

@ -8,16 +8,19 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
# shellcheck source=./transactions.lib
. "$CURDIR"/transactions.lib
# shellcheck source=./parts.lib
. "$CURDIR"/parts.lib
set -e
# https://github.com/ept/hermitage
$CLICKHOUSE_CLIENT -q "drop table if exists test"
$CLICKHOUSE_CLIENT -q "create table test (id int, value int) engine=MergeTree order by id"
$CLICKHOUSE_CLIENT -q "create table test (id int, value int) engine=MergeTree order by id SETTINGS old_parts_lifetime=1"
function reset_table()
{
$CLICKHOUSE_CLIENT -q "truncate table test;"
wait_for_delete_empty_parts "test"
$CLICKHOUSE_CLIENT -q "insert into test (id, value) values (1, 10);"
$CLICKHOUSE_CLIENT -q "insert into test (id, value) values (2, 20);"
}
@ -109,6 +112,7 @@ tx_wait 12
tx_wait 13
$CLICKHOUSE_CLIENT -q "select 16, * from test order by id"
# PMP write
reset_table
tx 14 "begin transaction"

View File

@ -14,7 +14,7 @@ tx5 4 6 all_7_7_0_8
tx5 5 2 all_1_1_0_8
tx5 5 5 all_10_10_0
tx5 5 6 all_7_7_0_8
PART_IS_TEMPORARILY_LOCKED
SERIALIZATION_ERROR
tx6 6 2 all_1_1_0_11
tx6 6 6 all_7_7_0_11
tx7 7 20 all_1_1_0_13

View File

@ -45,7 +45,7 @@ tx 6 "begin transaction"
tx 6 "alter table mt delete where n%2=1"
tx 6 "alter table mt drop part 'all_10_10_0_11'"
tx 5 "select 5, n, _part from mt order by n"
tx 5 "alter table mt drop partition id 'all'" | grep -Eo "PART_IS_TEMPORARILY_LOCKED" | uniq
tx 5 "alter table mt drop partition id 'all'" | grep -Eo "SERIALIZATION_ERROR" | uniq
tx 6 "select 6, n, _part from mt order by n"
tx 5 "rollback"
tx 6 "insert into mt values (8)"

View File

@ -32,7 +32,7 @@ function thread_insert()
# NOTE
# ALTER PARTITION query stops merges,
# but parts could be deleted (PART_IS_TEMPORARILY_LOCKED) if some merge was assigned (and committed) between BEGIN and ALTER.
# but parts could be deleted (SERIALIZATION_ERROR) if some merge was assigned (and committed) between BEGIN and ALTER.
function thread_partition_src_to_dst()
{
set -e
@ -49,14 +49,14 @@ function thread_partition_src_to_dst()
SELECT throwIf((SELECT (count(), sum(n)) FROM merge(currentDatabase(), '') WHERE type=3) != ($count + 1, $sum + $i)) FORMAT Null;
COMMIT;" 2>&1) ||:
echo "$out" | grep -Fv "PART_IS_TEMPORARILY_LOCKED" | grep -F "Received from " && $CLICKHOUSE_CLIENT --multiquery --query "
echo "$out" | grep -Fv "SERIALIZATION_ERROR" | grep -F "Received from " && $CLICKHOUSE_CLIENT --multiquery --query "
begin transaction;
set transaction snapshot 3;
select $i, 'src', type, n, _part from src order by type, n;
select $i, 'dst', type, n, _part from dst order by type, n;
rollback" ||:
echo "$out" | grep -Fa "PART_IS_TEMPORARILY_LOCKED" >/dev/null || count=$((count+1))
echo "$out" | grep -Fa "PART_IS_TEMPORARILY_LOCKED" >/dev/null || sum=$((sum+i))
echo "$out" | grep -Fa "SERIALIZATION_ERROR" >/dev/null || count=$((count+1))
echo "$out" | grep -Fa "SERIALIZATION_ERROR" >/dev/null || sum=$((sum+i))
done
}

View File

@ -15,7 +15,7 @@ tx3 8 3
tx3 8 5
tx3 8 7
tx3 8 9
PART_IS_TEMPORARILY_LOCKED
SERIALIZATION_ERROR
INVALID_TRANSACTION
tx4 9 8

View File

@ -42,7 +42,7 @@ tx 3 "insert into mt values (9)"
tx 4 "insert into mt values (8)"
tx 4 "select 7, n from mt order by n"
tx 3 "select 8, n from mt order by n"
tx 3 "alter table mt drop partition id 'all'" | grep -Eo "PART_IS_TEMPORARILY_LOCKED" | uniq
tx 3 "alter table mt drop partition id 'all'" | grep -Eo "SERIALIZATION_ERROR" | uniq
tx 3 "insert into mt values (10)" | grep -Eo "INVALID_TRANSACTION" | uniq
tx 4 "select 9, n from mt order by n"
tx 3 "rollback"

View File

@ -11,10 +11,8 @@ all_2_2_0
0
2
all_1_1_0
all_2_2_1
all_3_3_0
all_4_4_1
-- resume merges --
0
2
all_1_4_2
all_1_3_1

View File

@ -9,6 +9,6 @@ all_2_2_0 1
1 Active
2 Outdated
# truncate
1 Active
3 Outdated
HAVE PARTS Active
HAVE PARTS Outdated
# drop

View File

@ -31,9 +31,11 @@ OPTIMIZE TABLE data_01660 FINAL;
SELECT count(), _state FROM system.parts WHERE database = currentDatabase() AND table = 'data_01660' GROUP BY _state ORDER BY _state;
-- TRUNCATE does not remove parts instantly
-- Empty active parts are cleared by an async process
-- Inactive parts are cleared by an async process as well
SELECT '# truncate';
TRUNCATE data_01660;
SELECT count(), _state FROM system.parts WHERE database = currentDatabase() AND table = 'data_01660' GROUP BY _state;
SELECT if (count() > 0, 'HAVE PARTS', 'NO PARTS'), _state FROM system.parts WHERE database = currentDatabase() AND table = 'data_01660' GROUP BY _state;
-- But DROP does
SELECT '# drop';

View File

@ -15,7 +15,6 @@ ${CLICKHOUSE_CLIENT} -q '
)
ENGINE = MergeTree
ORDER BY key
SETTINGS old_parts_lifetime = 1
'
${CLICKHOUSE_CLIENT} -q 'INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(1000000)'

View File

@ -17,7 +17,7 @@ ${CLICKHOUSE_CLIENT} -q '
ENGINE = MergeTree()
ORDER BY tuple()
TTL event_time + INTERVAL 3 MONTH
SETTINGS min_bytes_for_wide_part = 0, old_parts_lifetime = 1, materialize_ttl_recalculate_only = true, max_number_of_merges_with_ttl_in_pool = 100
SETTINGS old_parts_lifetime = 1, min_bytes_for_wide_part = 0, materialize_ttl_recalculate_only = true, max_number_of_merges_with_ttl_in_pool = 100
'
${CLICKHOUSE_CLIENT} -q "INSERT INTO t_part_log_has_merge_type_table VALUES (now(), 1, 'username1');"

View File

@ -4,7 +4,7 @@ all_1_4_1 0
all_5_5_0 1
all_6_6_1 0
concurrent_drop_part_before
PART_IS_TEMPORARILY_LOCKED
SERIALIZATION_ERROR
INVALID_TRANSACTION
1
3
@ -15,10 +15,14 @@ concurrent_drop_part_after
NO_SUCH_DATA_PART
INVALID_TRANSACTION
all_1_3_1 0
NewPart all_1_1_0
NewPart all_1_3_1
NewPart all_2_2_0
NewPart all_3_3_0
concurrent_delete
tx41 41 3
tx41 41 3
PART_IS_TEMPORARILY_LOCKED
SERIALIZATION_ERROR
tx42 42 1
2
4

View File

@ -53,7 +53,7 @@ function concurrent_drop_part_before()
tx 11 "begin transaction"
tx 22 "begin transaction"
tx 22 "alter table tt drop part 'all_2_2_0'"
tx 11 "truncate table tt" | grep -Eo "PART_IS_TEMPORARILY_LOCKED" | uniq
tx 11 "truncate table tt" | grep -Eo "SERIALIZATION_ERROR" | uniq
tx 11 "commit" | grep -Eo "INVALID_TRANSACTION" | uniq
tx 22 "commit"
@ -82,6 +82,7 @@ function concurrent_drop_part_after()
$CLICKHOUSE_CLIENT -q "select name, rows from system.parts
where table='drop_part_after_table' and database=currentDatabase() and active
order by name"
$CLICKHOUSE_CLIENT -q "system flush logs"
$CLICKHOUSE_CLIENT -q "select event_type, part_name from system.part_log
where table='drop_part_after_table' and database=currentDatabase()
order by part_name"
@ -100,7 +101,7 @@ function concurrent_delete()
tx 42 "begin transaction"
tx 42 "alter table tt delete where n%2=1"
tx 41 "select 41, count() from tt"
tx 41 "truncate table tt" | grep -Eo "PART_IS_TEMPORARILY_LOCKED" | uniq
tx 41 "truncate table tt" | grep -Eo "SERIALIZATION_ERROR" | uniq
tx 42 "select 42, count() from tt"
tx 41 "rollback"
tx 42 "insert into tt values (4)"

View File

@ -0,0 +1,4 @@
=== adaptive granularity: table one -; table two + ===
=== adaptive granularity: table one -; table two - ===
=== adaptive granularity: table one +; table two + ===
=== adaptive granularity: table one +; table two - ===

View File

@ -0,0 +1,81 @@
SELECT '=== adaptive granularity: table one -; table two + ===';
DROP TABLE IF EXISTS table_one;
CREATE TABLE table_one (id UInt64, value UInt64)
ENGINE = MergeTree
PARTITION BY id
ORDER BY value
SETTINGS index_granularity = 8192, index_granularity_bytes = 0, min_bytes_for_wide_part = 100;
DROP TABLE IF EXISTS table_two;
CREATE TABLE table_two (id UInt64, value UInt64)
ENGINE = MergeTree
PARTITION BY id
ORDER BY value
SETTINGS index_granularity = 8192, index_granularity_bytes = 1024, min_bytes_for_wide_part = 100;
INSERT INTO table_one SELECT intDiv(number, 10), number FROM numbers(100);
ALTER TABLE table_two REPLACE PARTITION 0 FROM table_one;
SELECT '=== adaptive granularity: table one -; table two - ===';
DROP TABLE IF EXISTS table_one;
CREATE TABLE table_one (id UInt64, value UInt64)
ENGINE = MergeTree
PARTITION BY id
ORDER BY value
SETTINGS index_granularity = 8192, index_granularity_bytes = 0, min_bytes_for_wide_part = 100;
DROP TABLE IF EXISTS table_two;
CREATE TABLE table_two (id UInt64, value UInt64)
ENGINE = MergeTree
PARTITION BY id
ORDER BY value
SETTINGS index_granularity = 8192, index_granularity_bytes = 0, min_bytes_for_wide_part = 100;
INSERT INTO table_one SELECT intDiv(number, 10), number FROM numbers(100);
ALTER TABLE table_two REPLACE PARTITION 0 FROM table_one;
SELECT '=== adaptive granularity: table one +; table two + ===';
DROP TABLE IF EXISTS table_one;
CREATE TABLE table_one (id UInt64, value UInt64)
ENGINE = MergeTree
PARTITION BY id
ORDER BY value
SETTINGS index_granularity = 8192, index_granularity_bytes = 1024, min_bytes_for_wide_part = 100;
DROP TABLE IF EXISTS table_two;
CREATE TABLE table_two (id UInt64, value UInt64)
ENGINE = MergeTree
PARTITION BY id
ORDER BY value
SETTINGS index_granularity = 8192, index_granularity_bytes = 1024, min_bytes_for_wide_part = 100;
INSERT INTO table_one SELECT intDiv(number, 10), number FROM numbers(100);
ALTER TABLE table_two REPLACE PARTITION 0 FROM table_one;
SELECT '=== adaptive granularity: table one +; table two - ===';
DROP TABLE IF EXISTS table_one;
CREATE TABLE table_one (id UInt64, value UInt64)
ENGINE = MergeTree
PARTITION BY id
ORDER BY value
SETTINGS index_granularity = 8192, index_granularity_bytes = 1024, min_bytes_for_wide_part = 100;
DROP TABLE IF EXISTS table_two;
CREATE TABLE table_two (id UInt64, value UInt64)
ENGINE = MergeTree
PARTITION BY id
ORDER BY value
SETTINGS index_granularity = 8192, index_granularity_bytes = 0, min_bytes_for_wide_part = 100;
INSERT INTO table_one SELECT intDiv(number, 10), number FROM numbers(100);
ALTER TABLE table_two REPLACE PARTITION 0 FROM table_one; -- { serverError 36 }

View File

@ -0,0 +1,15 @@
init state
30
0_1_1_0 InMemory 10
1_2_2_0 InMemory 10
2_3_3_0 InMemory 10
drop part 0
20
0_1_1_1 InMemory 0
1_2_2_0 InMemory 10
2_3_3_0 InMemory 10
detach table
attach table
20
1_2_2_0 InMemory 10
2_3_3_0 InMemory 10

View File

@ -0,0 +1,40 @@
DROP TABLE IF EXISTS tt;
CREATE TABLE tt
(
`id` UInt64,
`value` UInt64
)
ENGINE = MergeTree
PARTITION BY id
ORDER BY value
SETTINGS min_bytes_for_wide_part=1000, min_bytes_for_compact_part=900;
SELECT 'init state';
INSERT INTO tt SELECT intDiv(number, 10), number FROM numbers(30);
SELECT count() FROM tt;
SELECT name, part_type, rows FROM system.parts
WHERE table='tt' AND database=currentDatabase();
SELECT 'drop part 0';
ALTER TABLE tt DROP PARTITION 0;
SELECT count() FROM tt;
SELECT name, part_type, rows FROM system.parts
WHERE table='tt' AND database=currentDatabase();
SELECT 'detach table';
DETACH TABLE tt;
SELECT name, part_type, rows FROM system.parts
WHERE table='tt' AND database=currentDatabase();
SELECT 'attach table';
ATTACH TABLE tt;
SELECT count() FROM tt;
SELECT name, part_type, rows FROM system.parts
WHERE table='tt' AND database=currentDatabase();

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
function wait_for_delete_empty_parts()
{
local table=$1
local database=${2:-$CLICKHOUSE_DATABASE}
local timeout=${3:-20}
while [[ $timeout -gt 0 ]]
do
res=$(${CLICKHOUSE_CLIENT} --query="SELECT count() FROM system.parts WHERE database='$database' AND table='$table' AND active AND rows=0")
[[ $res -eq 0 ]] && return 0
sleep 1
timeout=$((timeout - 1))
done
echo "Timed out while waiting for delete empty parts!" >&2
return 2
}

View File

@ -32,7 +32,12 @@ ALTER TABLE test.hits ATTACH PARTITION 201403;
DROP TABLE IF EXISTS hits_copy;
CREATE TABLE hits_copy (`WatchID` UInt64, `JavaEnable` UInt8, `Title` String, `GoodEvent` Int16, `EventTime` DateTime, `EventDate` Date, `CounterID` UInt32, `ClientIP` UInt32, `ClientIP6` FixedString(16), `RegionID` UInt32, `UserID` UInt64, `CounterClass` Int8, `OS` UInt8, `UserAgent` UInt8, `URL` String, `Referer` String, `URLDomain` String, `RefererDomain` String, `Refresh` UInt8, `IsRobot` UInt8, `RefererCategories` Array(UInt16), `URLCategories` Array(UInt16), `URLRegions` Array(UInt32), `RefererRegions` Array(UInt32), `ResolutionWidth` UInt16, `ResolutionHeight` UInt16, `ResolutionDepth` UInt8, `FlashMajor` UInt8, `FlashMinor` UInt8, `FlashMinor2` String, `NetMajor` UInt8, `NetMinor` UInt8, `UserAgentMajor` UInt16, `UserAgentMinor` FixedString(2), `CookieEnable` UInt8, `JavascriptEnable` UInt8, `IsMobile` UInt8, `MobilePhone` UInt8, `MobilePhoneModel` String, `Params` String, `IPNetworkID` UInt32, `TraficSourceID` Int8, `SearchEngineID` UInt16, `SearchPhrase` String, `AdvEngineID` UInt8, `IsArtifical` UInt8, `WindowClientWidth` UInt16, `WindowClientHeight` UInt16, `ClientTimeZone` Int16, `ClientEventTime` DateTime, `SilverlightVersion1` UInt8, `SilverlightVersion2` UInt8, `SilverlightVersion3` UInt32, `SilverlightVersion4` UInt16, `PageCharset` String, `CodeVersion` UInt32, `IsLink` UInt8, `IsDownload` UInt8, `IsNotBounce` UInt8, `FUniqID` UInt64, `HID` UInt32, `IsOldCounter` UInt8, `IsEvent` UInt8, `IsParameter` UInt8, `DontCountHits` UInt8, `WithHash` UInt8, `HitColor` FixedString(1), `UTCEventTime` DateTime, `Age` UInt8, `Sex` UInt8, `Income` UInt8, `Interests` UInt16, `Robotness` UInt8, `GeneralInterests` Array(UInt16), `RemoteIP` UInt32, `RemoteIP6` FixedString(16), `WindowName` Int32, `OpenerName` Int32, `HistoryLength` Int16, `BrowserLanguage` FixedString(2), `BrowserCountry` FixedString(2), `SocialNetwork` String, `SocialAction` String, `HTTPError` UInt16, `SendTiming` Int32, `DNSTiming` Int32, `ConnectTiming` Int32, `ResponseStartTiming` Int32, `ResponseEndTiming` Int32, `FetchTiming` Int32, `RedirectTiming` Int32, `DOMInteractiveTiming` Int32, `DOMContentLoadedTiming` Int32, `DOMCompleteTiming` Int32, `LoadEventStartTiming` Int32, `LoadEventEndTiming` Int32, `NSToDOMContentLoadedTiming` Int32, `FirstPaintTiming` Int32, `RedirectCount` Int8, `SocialSourceNetworkID` UInt8, `SocialSourcePage` String, `ParamPrice` Int64, `ParamOrderID` String, `ParamCurrency` FixedString(3), `ParamCurrencyID` UInt16, `GoalsReached` Array(UInt32), `OpenstatServiceName` String, `OpenstatCampaignID` String, `OpenstatAdID` String, `OpenstatSourceID` String, `UTMSource` String, `UTMMedium` String, `UTMCampaign` String, `UTMContent` String, `UTMTerm` String, `FromTag` String, `HasGCLID` UInt8, `RefererHash` UInt64, `URLHash` UInt64, `CLID` UInt32, `YCLID` UInt64, `ShareService` String, `ShareURL` String, `ShareTitle` String, `ParsedParams.Key1` Array(String), `ParsedParams.Key2` Array(String), `ParsedParams.Key3` Array(String), `ParsedParams.Key4` Array(String), `ParsedParams.Key5` Array(String), `ParsedParams.ValueDouble` Array(Float64), `IslandID` FixedString(16), `RequestNum` UInt32, `RequestTry` UInt8) ENGINE = MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity=8192, index_granularity_bytes=0, min_bytes_for_wide_part = 0;
CREATE TABLE hits_copy (`WatchID` UInt64, `JavaEnable` UInt8, `Title` String, `GoodEvent` Int16, `EventTime` DateTime, `EventDate` Date, `CounterID` UInt32, `ClientIP` UInt32, `ClientIP6` FixedString(16), `RegionID` UInt32, `UserID` UInt64, `CounterClass` Int8, `OS` UInt8, `UserAgent` UInt8, `URL` String, `Referer` String, `URLDomain` String, `RefererDomain` String, `Refresh` UInt8, `IsRobot` UInt8, `RefererCategories` Array(UInt16), `URLCategories` Array(UInt16), `URLRegions` Array(UInt32), `RefererRegions` Array(UInt32), `ResolutionWidth` UInt16, `ResolutionHeight` UInt16, `ResolutionDepth` UInt8, `FlashMajor` UInt8, `FlashMinor` UInt8, `FlashMinor2` String, `NetMajor` UInt8, `NetMinor` UInt8, `UserAgentMajor` UInt16, `UserAgentMinor` FixedString(2), `CookieEnable` UInt8, `JavascriptEnable` UInt8, `IsMobile` UInt8, `MobilePhone` UInt8, `MobilePhoneModel` String, `Params` String, `IPNetworkID` UInt32, `TraficSourceID` Int8, `SearchEngineID` UInt16, `SearchPhrase` String, `AdvEngineID` UInt8, `IsArtifical` UInt8, `WindowClientWidth` UInt16, `WindowClientHeight` UInt16, `ClientTimeZone` Int16, `ClientEventTime` DateTime, `SilverlightVersion1` UInt8, `SilverlightVersion2` UInt8, `SilverlightVersion3` UInt32, `SilverlightVersion4` UInt16, `PageCharset` String, `CodeVersion` UInt32, `IsLink` UInt8, `IsDownload` UInt8, `IsNotBounce` UInt8, `FUniqID` UInt64, `HID` UInt32, `IsOldCounter` UInt8, `IsEvent` UInt8, `IsParameter` UInt8, `DontCountHits` UInt8, `WithHash` UInt8, `HitColor` FixedString(1), `UTCEventTime` DateTime, `Age` UInt8, `Sex` UInt8, `Income` UInt8, `Interests` UInt16, `Robotness` UInt8, `GeneralInterests` Array(UInt16), `RemoteIP` UInt32, `RemoteIP6` FixedString(16), `WindowName` Int32, `OpenerName` Int32, `HistoryLength` Int16, `BrowserLanguage` FixedString(2), `BrowserCountry` FixedString(2), `SocialNetwork` String, `SocialAction` String, `HTTPError` UInt16, `SendTiming` Int32, `DNSTiming` Int32, `ConnectTiming` Int32, `ResponseStartTiming` Int32, `ResponseEndTiming` Int32, `FetchTiming` Int32, `RedirectTiming` Int32, `DOMInteractiveTiming` Int32, `DOMContentLoadedTiming` Int32, `DOMCompleteTiming` Int32, `LoadEventStartTiming` Int32, `LoadEventEndTiming` Int32, `NSToDOMContentLoadedTiming` Int32, `FirstPaintTiming` Int32, `RedirectCount` Int8, `SocialSourceNetworkID` UInt8, `SocialSourcePage` String, `ParamPrice` Int64, `ParamOrderID` String, `ParamCurrency` FixedString(3), `ParamCurrencyID` UInt16, `GoalsReached` Array(UInt32), `OpenstatServiceName` String, `OpenstatCampaignID` String, `OpenstatAdID` String, `OpenstatSourceID` String, `UTMSource` String, `UTMMedium` String, `UTMCampaign` String, `UTMContent` String, `UTMTerm` String, `FromTag` String, `HasGCLID` UInt8, `RefererHash` UInt64, `URLHash` UInt64, `CLID` UInt32, `YCLID` UInt64, `ShareService` String, `ShareURL` String, `ShareTitle` String, `ParsedParams.Key1` Array(String), `ParsedParams.Key2` Array(String), `ParsedParams.Key3` Array(String), `ParsedParams.Key4` Array(String), `ParsedParams.Key5` Array(String), `ParsedParams.ValueDouble` Array(Float64), `IslandID` FixedString(16), `RequestNum` UInt32, `RequestTry` UInt8)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS index_granularity=8192, min_bytes_for_wide_part = 0;
ALTER TABLE hits_copy REPLACE PARTITION 201403 FROM test.hits;