add test with materialized view (mv)

This commit is contained in:
Alexander Tokmakov 2021-06-02 23:03:44 +03:00
parent 02d966dcc9
commit 9a9e95172f
24 changed files with 173 additions and 24 deletions

View File

@ -247,6 +247,7 @@ function configure
cp -a "$FASTTEST_SOURCE/programs/server/"{config,users}.xml "$FASTTEST_DATA"
"$FASTTEST_SOURCE/tests/config/install.sh" "$FASTTEST_DATA" "$FASTTEST_DATA/client-config"
cp -a "$FASTTEST_SOURCE/programs/server/config.d/log_to_console.xml" "$FASTTEST_DATA/config.d"
cp -a "$FASTTEST_SOURCE/tests/config/config.d/transactions.xml" "$FASTTEST_DATA/config.d"
# doesn't support SSL
rm -f "$FASTTEST_DATA/config.d/secure_ports.xml"
}

View File

@ -91,7 +91,8 @@ void VersionMetadata::unlockMaxTID(const TransactionID & tid)
void VersionMetadata::setMinTID(const TransactionID & tid)
{
/// TODO Transactions: initialize it in constructor on part creation and remove this method
assert(!mintid);
/// FIXME ReplicatedMergeTreeBlockOutputStream may add one part multiple times
assert(!mintid || mintid == tid);
const_cast<TransactionID &>(mintid) = tid;
}

View File

@ -29,6 +29,8 @@ BlockIO InterpreterTransactionControlQuery::execute()
case ASTTransactionControl::ROLLBACK:
return executeRollback(session_context);
}
assert(false);
__builtin_unreachable();
}
BlockIO InterpreterTransactionControlQuery::executeBegin(ContextPtr session_context)

View File

@ -54,7 +54,7 @@ void MergeTreeTransaction::addNewPartAndRemoveCovered(const DataPartPtr & new_pa
new_part->storage.getStorageID().getNameForLogs(),
new_part->name);
error_context += ", part_name: {}";
for (auto covered : covered_parts)
for (const auto & covered : covered_parts)
{
covered->versions.lockMaxTID(tid, fmt::format(error_context, covered->name));
if (txn)
@ -79,7 +79,7 @@ bool MergeTreeTransaction::isReadOnly() const
return creating_parts.empty() && removing_parts.empty();
}
void MergeTreeTransaction::beforeCommit()
void MergeTreeTransaction::beforeCommit() const
{
assert(csn == Tx::UnknownCSN);
}

View File

@ -42,7 +42,7 @@ public:
String dumpDescription() const;
private:
void beforeCommit();
void beforeCommit() const;
void afterCommit(CSN assigned_csn) noexcept;
void rollback() noexcept;

View File

@ -63,7 +63,7 @@ MergeTreeTransactionHolder::MergeTreeTransactionHolder(const MergeTreeTransactio
MergeTreeTransactionHolder & MergeTreeTransactionHolder::operator=(const MergeTreeTransactionHolder &)
{
txn = nullptr;
assert(txn == nullptr);
return *this;
}

View File

@ -1061,14 +1061,14 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
part->renameToDetached("");
for (auto it = data_parts_by_state_and_info.begin(); it != data_parts_by_state_and_info.end(); ++it)
for (const auto & part : data_parts_by_state_and_info)
{
/// We do not have version metadata and transactions history for old parts,
/// so let's consider that such parts were created by some ancient transaction
/// and were committed with some prehistoric CSN.
/// TODO Transactions: distinguish "prehistoric" parts from uncommitted parts in case of hard restart
(*it)->versions.setMinTID(Tx::PrehistoricTID);
(*it)->versions.mincsn.store(Tx::PrehistoricCSN, std::memory_order_relaxed);
part->versions.setMinTID(Tx::PrehistoricTID);
part->versions.mincsn.store(Tx::PrehistoricCSN, std::memory_order_relaxed);
}
/// Delete from the set of current parts those parts that are covered by another part (those parts that

View File

@ -387,6 +387,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
const String & partition_id,
bool final,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason,
bool optimize_skip_merged_partitions)
{
@ -417,7 +418,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
while (it != parts.end())
{
/// For the case of one part, we check that it can be merged "with itself".
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, nullptr, out_disable_reason))
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, txn.get(), out_disable_reason))
{
return SelectPartsDecision::CANNOT_SELECT;
}

View File

@ -111,6 +111,7 @@ public:
const String & partition_id,
bool final,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeTransactionPtr & txn,
String * out_disable_reason = nullptr,
bool optimize_skip_merged_partitions = false);

View File

@ -748,7 +748,7 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
{
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_part, disk_space, can_merge, partition_id, final, metadata_snapshot, out_disable_reason, optimize_skip_merged_partitions);
future_part, disk_space, can_merge, partition_id, final, metadata_snapshot, txn, out_disable_reason, optimize_skip_merged_partitions);
auto timeout_ms = getSettings()->lock_acquire_timeout_for_background_operations.totalMilliseconds();
auto timeout = std::chrono::milliseconds(timeout_ms);

View File

@ -4542,7 +4542,7 @@ bool StorageReplicatedMergeTree::optimize(
future_merged_part.uuid = UUIDHelpers::generateV4();
SelectPartsDecision select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, disk_space, can_merge, partition_id, true, metadata_snapshot, nullptr, query_context->getSettingsRef().optimize_skip_merged_partitions);
future_merged_part, disk_space, can_merge, partition_id, true, metadata_snapshot, nullptr, nullptr, query_context->getSettingsRef().optimize_skip_merged_partitions);
if (select_decision != SelectPartsDecision::SELECTED)
break;
@ -4593,7 +4593,7 @@ bool StorageReplicatedMergeTree::optimize(
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
String partition_id = getPartitionIDFromQuery(partition, query_context);
select_decision = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, disk_space, can_merge, partition_id, final, metadata_snapshot, &disable_reason, query_context->getSettingsRef().optimize_skip_merged_partitions);
future_merged_part, disk_space, can_merge, partition_id, final, metadata_snapshot, nullptr, &disable_reason, query_context->getSettingsRef().optimize_skip_merged_partitions);
}
/// If there is nothing to merge then we treat this merge as successful (needed for optimize final optimization)

View File

@ -78,6 +78,7 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{"rows_where_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"visible", std::make_shared<DataTypeUInt8>()},
{"mintid", TransactionID::getDataType()},
{"maxtid", TransactionID::getDataType()},
{"mincsn", std::make_shared<DataTypeUInt64>()},
@ -88,7 +89,7 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
}
void StorageSystemParts::processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
ContextPtr context, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
{
using State = IMergeTreeDataPart::State;
MergeTreeData::DataPartStateVector all_parts_state;
@ -259,6 +260,15 @@ void StorageSystemParts::processNextStorage(
add_ttl_info_map(part->ttl_infos.group_by_ttl);
add_ttl_info_map(part->ttl_infos.rows_where_ttl);
if (columns_mask[src_index++])
{
auto txn = context->getCurrentTransaction();
if (txn)
columns[res_index++]->insert(part->versions.isVisible(*txn));
else
columns[res_index++]->insert(part_state == State::Committed);
}
auto get_tid_as_field = [](const TransactionID & tid) -> Field
{
return Tuple{tid.start_csn, tid.local_tid, tid.host_id};

View File

@ -21,7 +21,7 @@ public:
protected:
explicit StorageSystemParts(const StorageID & table_id_);
void processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
ContextPtr context, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
};
}

View File

@ -268,7 +268,7 @@ Pipe StorageSystemPartsBase::read(
while (StoragesInfo info = stream.next())
{
processNextStorage(res_columns, columns_mask, info, has_state_column);
processNextStorage(context, res_columns, columns_mask, info, has_state_column);
}
if (has_state_column)

View File

@ -76,7 +76,7 @@ protected:
StorageSystemPartsBase(const StorageID & table_id_, NamesAndTypesList && columns_);
virtual void
processNextStorage(MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) = 0;
processNextStorage(ContextPtr context, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) = 0;
};
}

View File

@ -65,7 +65,7 @@ StorageSystemPartsColumns::StorageSystemPartsColumns(const StorageID & table_id_
}
void StorageSystemPartsColumns::processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
ContextPtr, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
{
/// Prepare information about columns in storage.
struct ColumnInfo

View File

@ -23,7 +23,7 @@ public:
protected:
StorageSystemPartsColumns(const StorageID & table_id_);
void processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
ContextPtr context, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
};
}

View File

@ -90,7 +90,7 @@ StorageSystemProjectionParts::StorageSystemProjectionParts(const StorageID & tab
}
void StorageSystemProjectionParts::processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
ContextPtr, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
{
using State = IMergeTreeDataPart::State;
MergeTreeData::DataPartStateVector all_parts_state;

View File

@ -21,6 +21,6 @@ public:
protected:
explicit StorageSystemProjectionParts(const StorageID & table_id_);
void processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
ContextPtr context, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
};
}

View File

@ -73,7 +73,7 @@ StorageSystemProjectionPartsColumns::StorageSystemProjectionPartsColumns(const S
}
void StorageSystemProjectionPartsColumns::processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
ContextPtr, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
{
/// Prepare information about columns in storage.
struct ColumnInfo

View File

@ -23,6 +23,6 @@ public:
protected:
StorageSystemProjectionPartsColumns(const StorageID & table_id_);
void processNextStorage(
MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
ContextPtr context, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column) override;
};
}

View File

@ -0,0 +1,2 @@
275 0 138 136 0
275 0

View File

@ -0,0 +1,131 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS src";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS dst";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS mv";
$CLICKHOUSE_CLIENT --query "CREATE TABLE src (n Int8, m Int8, CONSTRAINT c CHECK xxHash32(n+m) % 8 != 0) ENGINE=MergeTree ORDER BY n PARTITION BY 0 < n";
$CLICKHOUSE_CLIENT --query "CREATE TABLE dst (nm Int16, CONSTRAINT c CHECK xxHash32(nm) % 8 != 0) ENGINE=MergeTree ORDER BY nm";
$CLICKHOUSE_CLIENT --query "CREATE MATERIALIZED VIEW mv TO dst (nm Int16) AS SELECT n*m AS nm FROM src";
$CLICKHOUSE_CLIENT --query "CREATE TABLE tmp (x UInt8, nm Int16) ENGINE=MergeTree ORDER BY (x, nm)"
$CLICKHOUSE_CLIENT --query "INSERT INTO src VALUES (0, 0)"
# some transactions will fail due to constraint
function thread_insert_commit()
{
for i in {1..100}; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES ($i, $1);
SELECT throwIf((SELECT sum(nm) FROM mv) != $(($i * $1))) FORMAT Null;
INSERT INTO src VALUES (-$i, $1);
COMMIT;" 2>&1| grep -Fv "is violated at row" | grep -Fv "Transaction is not in RUNNING state" | grep -F "Received from " ||:
done
}
function thread_insert_rollback()
{
for _ in {1..100}; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES (42, $1);
SELECT throwIf((SELECT count() FROM src WHERE n=42 AND m=$1) != 1) FORMAT Null;
ROLLBACK;"
done
}
# make merges more aggressive
function thread_optimize()
{
trap "exit 0" INT
while true; do
optimize_query="OPTIMIZE TABLE src"
if (( RANDOM % 2 )); then
optimize_query="OPTIMIZE TABLE dst"
fi
if (( RANDOM % 2 )); then
optimize_query="$optimize_query FINAL"
fi
action="COMMIT"
if (( RANDOM % 2 )); then
action="ROLLBACK"
fi
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
$optimize_query;
$action;
"
sleep 0.$RANDOM;
done
}
function thread_select()
{
trap "exit 0" INT
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
SELECT throwIf((SELECT (sum(n), count() % 2) FROM src) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM mv) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM dst) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(nm)) FROM dst)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(n*m)) FROM src)) FORMAT Null;
COMMIT;" || $CLICKHOUSE_CLIENT -q "SELECT 'src', arraySort(groupArray(n*m)) FROM src UNION ALL SELECT 'mv', arraySort(groupArray(nm)) FROM mv"
done
}
function thread_select_insert()
{
trap "exit 0" INT
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
SELECT throwIf((SELECT count() FROM tmp) != 0) FORMAT Null;
INSERT INTO tmp SELECT 1, n*m FROM src;
INSERT INTO tmp SELECT 2, nm FROM mv;
INSERT INTO tmp SELECT 3, nm FROM dst;
INSERT INTO tmp SELECT 4, (*,).1 FROM (SELECT n*m FROM src UNION ALL SELECT nm FROM mv UNION ALL SELECT nm FROM dst);
SELECT throwIf((SELECT countDistinct(x) FROM tmp) != 4) FORMAT Null;
-- now check that all results are the same
SELECT throwIf(1 != (SELECT countDistinct(arr) FROM (SELECT x, arraySort(groupArray(nm)) AS arr FROM tmp WHERE x!=4 GROUP BY x))) FORMAT Null;
SELECT throwIf((SELECT count(), sum(nm) FROM tmp WHERE x=4) != (SELECT count(), sum(nm) FROM tmp WHERE x!=4)) FORMAT Null;
ROLLBACK;" || $CLICKHOUSE_CLIENT -q "SELECT x, arraySort(groupArray(nm)) AS arr FROM tmp GROUP BY x"
done
}
thread_insert_commit 1 & PID_1=$!
thread_insert_commit 2 & PID_2=$!
thread_insert_rollback 3 & PID_3=$!
thread_optimize & PID_4=$!
thread_select & PID_5=$!
thread_select_insert & PID_6=$!
sleep 0.$RANDOM;
thread_select & PID_7=$!
thread_select_insert & PID_8=$!
wait $PID_1 && wait $PID_2 && wait $PID_3
kill -INT $PID_4
kill -INT $PID_5
kill -INT $PID_6
kill -INT $PID_7
kill -INT $PID_8
wait
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM src;
SELECT count(), sum(nm) FROM mv";
$CLICKHOUSE_CLIENT --query "DROP TABLE src";
$CLICKHOUSE_CLIENT --query "DROP TABLE dst";
$CLICKHOUSE_CLIENT --query "DROP TABLE mv";

View File

@ -41,8 +41,8 @@ function thread_select()
SELECT throwIf((SELECT sum(n) FROM mt) != 0) FORMAT Null;
SELECT throwIf((SELECT count() FROM mt) % 2 != 0) FORMAT Null;
SELECT arraySort(groupArray(n)), arraySort(groupArray(m)), arraySort(groupArray(_part)) FROM mt;
COMMIT;" | tee -a ./wtf.log | uniq | wc -l | grep -v "^1$" && $CLICKHOUSE_CLIENT -q "SELECT * FROM system.parts
WHERE database='$CLICKHOUSE_DATABASE' AND table='mt'" ||:;
COMMIT;" | uniq | wc -l | grep -v "^1$" && $CLICKHOUSE_CLIENT -q "SELECT * FROM system.parts
WHERE database='$CLICKHOUSE_DATABASE' AND table='mt'" ||:;
done
}