mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Merge with new changes
This commit is contained in:
commit
e2a779b315
@ -1,102 +1,21 @@
|
|||||||
#!/bin/bash -e
|
#!/bin/bash -e
|
||||||
|
|
||||||
if [[ -n $1 ]]; then
|
TABLE="hits_100m_obfuscated"
|
||||||
SCALE=$1
|
|
||||||
else
|
|
||||||
SCALE=100
|
|
||||||
fi
|
|
||||||
|
|
||||||
TABLE="hits_${SCALE}m_obfuscated"
|
|
||||||
DATASET="${TABLE}_v1.tar.xz"
|
|
||||||
QUERIES_FILE="queries.sql"
|
QUERIES_FILE="queries.sql"
|
||||||
TRIES=3
|
TRIES=3
|
||||||
|
|
||||||
# Note: on older Ubuntu versions, 'axel' does not support IPv6. If you are using IPv6-only servers on very old Ubuntu, just don't install 'axel'.
|
mkdir -p clickhouse-benchmark
|
||||||
|
pushd clickhouse-benchmark
|
||||||
|
|
||||||
FASTER_DOWNLOAD=wget
|
# Download the binary
|
||||||
if command -v axel >/dev/null; then
|
if [[ ! -x clickhouse ]]; then
|
||||||
FASTER_DOWNLOAD=axel
|
curl https://clickhouse.com/ | sh
|
||||||
else
|
|
||||||
echo "It's recommended to install 'axel' for faster downloads."
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if command -v pixz >/dev/null; then
|
|
||||||
TAR_PARAMS='-Ipixz'
|
|
||||||
else
|
|
||||||
echo "It's recommended to install 'pixz' for faster decompression of the dataset."
|
|
||||||
fi
|
|
||||||
|
|
||||||
mkdir -p clickhouse-benchmark-$SCALE
|
|
||||||
pushd clickhouse-benchmark-$SCALE
|
|
||||||
|
|
||||||
OS=$(uname -s)
|
|
||||||
ARCH=$(uname -m)
|
|
||||||
|
|
||||||
DIR=
|
|
||||||
|
|
||||||
if [ "${OS}" = "Linux" ]
|
|
||||||
then
|
|
||||||
if [ "${ARCH}" = "x86_64" ]
|
|
||||||
then
|
|
||||||
DIR="amd64"
|
|
||||||
elif [ "${ARCH}" = "aarch64" ]
|
|
||||||
then
|
|
||||||
DIR="aarch64"
|
|
||||||
elif [ "${ARCH}" = "powerpc64le" ]
|
|
||||||
then
|
|
||||||
DIR="powerpc64le"
|
|
||||||
fi
|
|
||||||
elif [ "${OS}" = "FreeBSD" ]
|
|
||||||
then
|
|
||||||
if [ "${ARCH}" = "x86_64" ]
|
|
||||||
then
|
|
||||||
DIR="freebsd"
|
|
||||||
elif [ "${ARCH}" = "aarch64" ]
|
|
||||||
then
|
|
||||||
DIR="freebsd-aarch64"
|
|
||||||
elif [ "${ARCH}" = "powerpc64le" ]
|
|
||||||
then
|
|
||||||
DIR="freebsd-powerpc64le"
|
|
||||||
fi
|
|
||||||
elif [ "${OS}" = "Darwin" ]
|
|
||||||
then
|
|
||||||
if [ "${ARCH}" = "x86_64" ]
|
|
||||||
then
|
|
||||||
DIR="macos"
|
|
||||||
elif [ "${ARCH}" = "aarch64" -o "${ARCH}" = "arm64" ]
|
|
||||||
then
|
|
||||||
DIR="macos-aarch64"
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
|
|
||||||
if [ -z "${DIR}" ]
|
|
||||||
then
|
|
||||||
echo "The '${OS}' operating system with the '${ARCH}' architecture is not supported."
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
URL="https://builds.clickhouse.com/master/${DIR}/clickhouse"
|
|
||||||
echo
|
|
||||||
echo "Will download ${URL}"
|
|
||||||
echo
|
|
||||||
curl -O "${URL}" && chmod a+x clickhouse || exit 1
|
|
||||||
echo
|
|
||||||
echo "Successfully downloaded the ClickHouse binary"
|
|
||||||
|
|
||||||
chmod a+x clickhouse
|
|
||||||
|
|
||||||
if [[ ! -f $QUERIES_FILE ]]; then
|
if [[ ! -f $QUERIES_FILE ]]; then
|
||||||
wget "https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/benchmark/clickhouse/$QUERIES_FILE"
|
wget "https://raw.githubusercontent.com/ClickHouse/ClickHouse/master/benchmark/clickhouse/$QUERIES_FILE"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [[ ! -d data ]]; then
|
|
||||||
if [[ ! -f $DATASET ]]; then
|
|
||||||
$FASTER_DOWNLOAD "https://datasets.clickhouse.com/hits/partitions/$DATASET"
|
|
||||||
fi
|
|
||||||
|
|
||||||
tar $TAR_PARAMS --strip-components=1 --directory=. -x -v -f $DATASET
|
|
||||||
fi
|
|
||||||
|
|
||||||
uptime
|
uptime
|
||||||
|
|
||||||
echo "Starting clickhouse-server"
|
echo "Starting clickhouse-server"
|
||||||
@ -114,10 +33,20 @@ echo "Waiting for clickhouse-server to start"
|
|||||||
|
|
||||||
for i in {1..30}; do
|
for i in {1..30}; do
|
||||||
sleep 1
|
sleep 1
|
||||||
./clickhouse client --query "SELECT 'The dataset size is: ', count() FROM $TABLE" 2>/dev/null && break || echo '.'
|
./clickhouse client --query "SELECT 'Ok.'" 2>/dev/null && break || echo -n '.'
|
||||||
if [[ $i == 30 ]]; then exit 1; fi
|
if [[ $i == 30 ]]; then exit 1; fi
|
||||||
done
|
done
|
||||||
|
|
||||||
|
echo "Will download the dataset"
|
||||||
|
./clickhouse client --max_insert_threads $(nproc || 4) --progress --query "
|
||||||
|
CREATE OR REPLACE TABLE ${TABLE} ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime)
|
||||||
|
AS SELECT * FROM url('https://datasets.clickhouse.com/hits/native/hits_100m_obfuscated_{0..255}.native.zst')"
|
||||||
|
|
||||||
|
./clickhouse client --query "SELECT 'The dataset size is: ', count() FROM ${TABLE}"
|
||||||
|
|
||||||
|
echo "Will prepare the dataset"
|
||||||
|
./clickhouse client --query "OPTIMIZE TABLE ${TABLE} FINAL"
|
||||||
|
|
||||||
echo
|
echo
|
||||||
echo "Will perform benchmark. Results:"
|
echo "Will perform benchmark. Results:"
|
||||||
echo
|
echo
|
||||||
@ -133,7 +62,7 @@ cat "$QUERIES_FILE" | sed "s/{table}/${TABLE}/g" | while read query; do
|
|||||||
|
|
||||||
echo -n "["
|
echo -n "["
|
||||||
for i in $(seq 1 $TRIES); do
|
for i in $(seq 1 $TRIES); do
|
||||||
RES=$(./clickhouse client --max_memory_usage 100G --time --format=Null --query="$query" 2>&1 ||:)
|
RES=$(./clickhouse client --time --format=Null --query="$query" 2>&1 ||:)
|
||||||
[[ "$?" == "0" ]] && echo -n "${RES}" || echo -n "null"
|
[[ "$?" == "0" ]] && echo -n "${RES}" || echo -n "null"
|
||||||
[[ "$i" != $TRIES ]] && echo -n ", "
|
[[ "$i" != $TRIES ]] && echo -n ", "
|
||||||
done
|
done
|
||||||
@ -180,10 +109,10 @@ else
|
|||||||
cat /proc/meminfo | grep MemTotal
|
cat /proc/meminfo | grep MemTotal
|
||||||
echo '----RAID Info-------------------'
|
echo '----RAID Info-------------------'
|
||||||
cat /proc/mdstat
|
cat /proc/mdstat
|
||||||
#echo '----PCI-------------------------'
|
|
||||||
#lspci
|
|
||||||
#echo '----All Hardware Info-----------'
|
|
||||||
#lshw
|
|
||||||
echo '--------------------------------'
|
echo '--------------------------------'
|
||||||
fi
|
fi
|
||||||
echo
|
echo
|
||||||
|
|
||||||
|
echo "Instance type from IMDS (if available):"
|
||||||
|
curl --connect-timeout 1 http://169.254.169.254/latest/meta-data/instance-type
|
||||||
|
echo
|
||||||
|
@ -1176,7 +1176,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
|||||||
query_plan.getCurrentDataStream(),
|
query_plan.getCurrentDataStream(),
|
||||||
expressions.prewhere_info->row_level_filter,
|
expressions.prewhere_info->row_level_filter,
|
||||||
expressions.prewhere_info->row_level_column_name,
|
expressions.prewhere_info->row_level_column_name,
|
||||||
false);
|
true);
|
||||||
|
|
||||||
row_level_filter_step->setStepDescription("Row-level security filter (PREWHERE)");
|
row_level_filter_step->setStepDescription("Row-level security filter (PREWHERE)");
|
||||||
query_plan.addStep(std::move(row_level_filter_step));
|
query_plan.addStep(std::move(row_level_filter_step));
|
||||||
|
@ -297,6 +297,11 @@ std::string PrewhereInfo::dump() const
|
|||||||
WriteBufferFromOwnString ss;
|
WriteBufferFromOwnString ss;
|
||||||
ss << "PrewhereDagInfo\n";
|
ss << "PrewhereDagInfo\n";
|
||||||
|
|
||||||
|
if (row_level_filter)
|
||||||
|
{
|
||||||
|
ss << "row_level_filter " << row_level_filter->dumpDAG() << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
if (prewhere_actions)
|
if (prewhere_actions)
|
||||||
{
|
{
|
||||||
ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n";
|
ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n";
|
||||||
|
@ -89,8 +89,6 @@ protected:
|
|||||||
using ColumnPosition = std::optional<size_t>;
|
using ColumnPosition = std::optional<size_t>;
|
||||||
ColumnPosition findColumnForOffsets(const String & column_name) const;
|
ColumnPosition findColumnForOffsets(const String & column_name) const;
|
||||||
|
|
||||||
friend class MergeTreeRangeReader::DelayedStream;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Alter conversions, which must be applied on fly if required
|
/// Alter conversions, which must be applied on fly if required
|
||||||
MergeTreeData::AlterConversions alter_conversions;
|
MergeTreeData::AlterConversions alter_conversions;
|
||||||
|
@ -264,7 +264,7 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
|
|||||||
/// Task is not needed
|
/// Task is not needed
|
||||||
merge_task.reset();
|
merge_task.reset();
|
||||||
|
|
||||||
storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, transaction_ptr.get());
|
storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -115,7 +115,11 @@ void MergePlainMergeTreeTask::prepare()
|
|||||||
void MergePlainMergeTreeTask::finish()
|
void MergePlainMergeTreeTask::finish()
|
||||||
{
|
{
|
||||||
new_part = merge_task->getFuture().get();
|
new_part = merge_task->getFuture().get();
|
||||||
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, nullptr);
|
|
||||||
|
MergeTreeData::Transaction transaction(storage, txn.get());
|
||||||
|
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
|
||||||
|
transaction.commit();
|
||||||
|
|
||||||
write_part_log({});
|
write_part_log({});
|
||||||
storage.incrementMergedPartsProfileEvent(new_part->getType());
|
storage.incrementMergedPartsProfileEvent(new_part->getType());
|
||||||
}
|
}
|
||||||
|
@ -74,14 +74,27 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
|
|||||||
prewhere_actions = std::make_unique<PrewhereExprInfo>();
|
prewhere_actions = std::make_unique<PrewhereExprInfo>();
|
||||||
|
|
||||||
if (prewhere_info->row_level_filter)
|
if (prewhere_info->row_level_filter)
|
||||||
prewhere_actions->row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings);
|
{
|
||||||
|
PrewhereExprStep row_level_filter_step
|
||||||
|
{
|
||||||
|
.actions = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter, actions_settings),
|
||||||
|
.column_name = prewhere_info->row_level_column_name,
|
||||||
|
.remove_column = true,
|
||||||
|
.need_filter = true
|
||||||
|
};
|
||||||
|
|
||||||
prewhere_actions->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);
|
prewhere_actions->steps.emplace_back(std::move(row_level_filter_step));
|
||||||
|
}
|
||||||
|
|
||||||
prewhere_actions->row_level_column_name = prewhere_info->row_level_column_name;
|
PrewhereExprStep prewhere_step
|
||||||
prewhere_actions->prewhere_column_name = prewhere_info->prewhere_column_name;
|
{
|
||||||
prewhere_actions->remove_prewhere_column = prewhere_info->remove_prewhere_column;
|
.actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings),
|
||||||
prewhere_actions->need_filter = prewhere_info->need_filter;
|
.column_name = prewhere_info->prewhere_column_name,
|
||||||
|
.remove_column = prewhere_info->remove_prewhere_column,
|
||||||
|
.need_filter = prewhere_info->need_filter
|
||||||
|
};
|
||||||
|
|
||||||
|
prewhere_actions->steps.emplace_back(std::move(prewhere_step));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,30 +217,78 @@ Chunk MergeTreeBaseSelectProcessor::generate()
|
|||||||
|
|
||||||
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
|
void MergeTreeBaseSelectProcessor::initializeRangeReaders(MergeTreeReadTask & current_task)
|
||||||
{
|
{
|
||||||
|
MergeTreeRangeReader* prev_reader = nullptr;
|
||||||
|
bool last_reader = false;
|
||||||
|
|
||||||
if (prewhere_info)
|
if (prewhere_info)
|
||||||
{
|
{
|
||||||
if (reader->getColumns().empty())
|
if (prewhere_actions->steps.size() != pre_reader_for_step.size())
|
||||||
{
|
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||||
current_task.range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), true, non_const_virtual_column_names);
|
"PREWHERE steps count mismatch, actions: {}, readers: {}",
|
||||||
}
|
prewhere_actions->steps.size(), pre_reader_for_step.size());
|
||||||
else
|
|
||||||
{
|
|
||||||
MergeTreeRangeReader * pre_reader_ptr = nullptr;
|
|
||||||
if (pre_reader != nullptr)
|
|
||||||
{
|
|
||||||
current_task.pre_range_reader = MergeTreeRangeReader(pre_reader.get(), nullptr, prewhere_actions.get(), false, non_const_virtual_column_names);
|
|
||||||
pre_reader_ptr = ¤t_task.pre_range_reader;
|
|
||||||
}
|
|
||||||
|
|
||||||
current_task.range_reader = MergeTreeRangeReader(reader.get(), pre_reader_ptr, nullptr, true, non_const_virtual_column_names);
|
|
||||||
|
for (size_t i = 0; i < prewhere_actions->steps.size(); ++i)
|
||||||
|
{
|
||||||
|
last_reader = reader->getColumns().empty() && (i + 1 == prewhere_actions->steps.size());
|
||||||
|
current_task.pre_range_readers.push_back(
|
||||||
|
MergeTreeRangeReader(pre_reader_for_step[i].get(), prev_reader, &prewhere_actions->steps[i], last_reader, non_const_virtual_column_names));
|
||||||
|
|
||||||
|
prev_reader = ¤t_task.pre_range_readers.back();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!last_reader)
|
||||||
|
{
|
||||||
|
current_task.range_reader = MergeTreeRangeReader(reader.get(), prev_reader, nullptr, true, non_const_virtual_column_names);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
current_task.range_reader = MergeTreeRangeReader(reader.get(), nullptr, nullptr, true, non_const_virtual_column_names);
|
/// If all columns are read by pre_range_readers then move the last pre_range_reader into range_reader
|
||||||
|
current_task.range_reader = std::move(current_task.pre_range_readers.back());
|
||||||
|
current_task.pre_range_readers.pop_back();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static UInt64 estimateNumRows(const MergeTreeReadTask & current_task, UInt64 current_preferred_block_size_bytes,
|
||||||
|
UInt64 current_max_block_size_rows, UInt64 current_preferred_max_column_in_block_size_bytes, double min_filtration_ratio)
|
||||||
|
{
|
||||||
|
const MergeTreeRangeReader & current_reader = current_task.range_reader;
|
||||||
|
|
||||||
|
if (!current_task.size_predictor)
|
||||||
|
return static_cast<size_t>(current_max_block_size_rows);
|
||||||
|
|
||||||
|
/// Calculates number of rows will be read using preferred_block_size_bytes.
|
||||||
|
/// Can't be less than avg_index_granularity.
|
||||||
|
size_t rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes);
|
||||||
|
if (!rows_to_read)
|
||||||
|
return rows_to_read;
|
||||||
|
auto total_row_in_current_granule = current_reader.numRowsInCurrentGranule();
|
||||||
|
rows_to_read = std::max(total_row_in_current_granule, rows_to_read);
|
||||||
|
|
||||||
|
if (current_preferred_max_column_in_block_size_bytes)
|
||||||
|
{
|
||||||
|
/// Calculates number of rows will be read using preferred_max_column_in_block_size_bytes.
|
||||||
|
auto rows_to_read_for_max_size_column
|
||||||
|
= current_task.size_predictor->estimateNumRowsForMaxSizeColumn(current_preferred_max_column_in_block_size_bytes);
|
||||||
|
double filtration_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio);
|
||||||
|
auto rows_to_read_for_max_size_column_with_filtration
|
||||||
|
= static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);
|
||||||
|
|
||||||
|
/// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than current_index_granularity.
|
||||||
|
rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto unread_rows_in_current_granule = current_reader.numPendingRowsInCurrentGranule();
|
||||||
|
if (unread_rows_in_current_granule >= rows_to_read)
|
||||||
|
return rows_to_read;
|
||||||
|
|
||||||
|
const MergeTreeIndexGranularity & index_granularity = current_task.data_part->index_granularity;
|
||||||
|
|
||||||
|
return index_granularity.countMarksForRows(current_reader.currentMark(), rows_to_read, current_reader.numReadRowsInCurrentGranule());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
|
Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
|
||||||
{
|
{
|
||||||
@ -237,45 +298,10 @@ Chunk MergeTreeBaseSelectProcessor::readFromPartImpl()
|
|||||||
const UInt64 current_max_block_size_rows = max_block_size_rows;
|
const UInt64 current_max_block_size_rows = max_block_size_rows;
|
||||||
const UInt64 current_preferred_block_size_bytes = preferred_block_size_bytes;
|
const UInt64 current_preferred_block_size_bytes = preferred_block_size_bytes;
|
||||||
const UInt64 current_preferred_max_column_in_block_size_bytes = preferred_max_column_in_block_size_bytes;
|
const UInt64 current_preferred_max_column_in_block_size_bytes = preferred_max_column_in_block_size_bytes;
|
||||||
const MergeTreeIndexGranularity & index_granularity = task->data_part->index_granularity;
|
|
||||||
const double min_filtration_ratio = 0.00001;
|
const double min_filtration_ratio = 0.00001;
|
||||||
|
|
||||||
auto estimate_num_rows = [current_preferred_block_size_bytes, current_max_block_size_rows,
|
UInt64 recommended_rows = estimateNumRows(*task, current_preferred_block_size_bytes,
|
||||||
&index_granularity, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio](
|
current_max_block_size_rows, current_preferred_max_column_in_block_size_bytes, min_filtration_ratio);
|
||||||
MergeTreeReadTask & current_task, MergeTreeRangeReader & current_reader)
|
|
||||||
{
|
|
||||||
if (!current_task.size_predictor)
|
|
||||||
return static_cast<size_t>(current_max_block_size_rows);
|
|
||||||
|
|
||||||
/// Calculates number of rows will be read using preferred_block_size_bytes.
|
|
||||||
/// Can't be less than avg_index_granularity.
|
|
||||||
size_t rows_to_read = current_task.size_predictor->estimateNumRows(current_preferred_block_size_bytes);
|
|
||||||
if (!rows_to_read)
|
|
||||||
return rows_to_read;
|
|
||||||
auto total_row_in_current_granule = current_reader.numRowsInCurrentGranule();
|
|
||||||
rows_to_read = std::max(total_row_in_current_granule, rows_to_read);
|
|
||||||
|
|
||||||
if (current_preferred_max_column_in_block_size_bytes)
|
|
||||||
{
|
|
||||||
/// Calculates number of rows will be read using preferred_max_column_in_block_size_bytes.
|
|
||||||
auto rows_to_read_for_max_size_column
|
|
||||||
= current_task.size_predictor->estimateNumRowsForMaxSizeColumn(current_preferred_max_column_in_block_size_bytes);
|
|
||||||
double filtration_ratio = std::max(min_filtration_ratio, 1.0 - current_task.size_predictor->filtered_rows_ratio);
|
|
||||||
auto rows_to_read_for_max_size_column_with_filtration
|
|
||||||
= static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);
|
|
||||||
|
|
||||||
/// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than current_index_granularity.
|
|
||||||
rows_to_read = std::min(rows_to_read, rows_to_read_for_max_size_column_with_filtration);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto unread_rows_in_current_granule = current_reader.numPendingRowsInCurrentGranule();
|
|
||||||
if (unread_rows_in_current_granule >= rows_to_read)
|
|
||||||
return rows_to_read;
|
|
||||||
|
|
||||||
return index_granularity.countMarksForRows(current_reader.currentMark(), rows_to_read, current_reader.numReadRowsInCurrentGranule());
|
|
||||||
};
|
|
||||||
|
|
||||||
UInt64 recommended_rows = estimate_num_rows(*task, task->range_reader);
|
|
||||||
UInt64 rows_to_read = std::max(static_cast<UInt64>(1), std::min(current_max_block_size_rows, recommended_rows));
|
UInt64 rows_to_read = std::max(static_cast<UInt64>(1), std::min(current_max_block_size_rows, recommended_rows));
|
||||||
|
|
||||||
auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);
|
auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);
|
||||||
@ -602,9 +628,12 @@ std::unique_ptr<MergeTreeBlockSizePredictor> MergeTreeBaseSelectProcessor::getSi
|
|||||||
const Block & sample_block)
|
const Block & sample_block)
|
||||||
{
|
{
|
||||||
const auto & required_column_names = task_columns.columns.getNames();
|
const auto & required_column_names = task_columns.columns.getNames();
|
||||||
const auto & required_pre_column_names = task_columns.pre_columns.getNames();
|
|
||||||
NameSet complete_column_names(required_column_names.begin(), required_column_names.end());
|
NameSet complete_column_names(required_column_names.begin(), required_column_names.end());
|
||||||
complete_column_names.insert(required_pre_column_names.begin(), required_pre_column_names.end());
|
for (const auto & pre_columns_per_step : task_columns.pre_columns)
|
||||||
|
{
|
||||||
|
const auto & required_pre_column_names = pre_columns_per_step.getNames();
|
||||||
|
complete_column_names.insert(required_pre_column_names.begin(), required_pre_column_names.end());
|
||||||
|
}
|
||||||
|
|
||||||
return std::make_unique<MergeTreeBlockSizePredictor>(
|
return std::make_unique<MergeTreeBlockSizePredictor>(
|
||||||
data_part, Names(complete_column_names.begin(), complete_column_names.end()), sample_block);
|
data_part, Names(complete_column_names.begin(), complete_column_names.end()), sample_block);
|
||||||
|
@ -115,7 +115,7 @@ protected:
|
|||||||
|
|
||||||
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
|
using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
|
||||||
MergeTreeReaderPtr reader;
|
MergeTreeReaderPtr reader;
|
||||||
MergeTreeReaderPtr pre_reader;
|
std::vector<MergeTreeReaderPtr> pre_reader_for_step;
|
||||||
|
|
||||||
MergeTreeReadTaskPtr task;
|
MergeTreeReadTaskPtr task;
|
||||||
|
|
||||||
|
@ -5,6 +5,9 @@
|
|||||||
#include <Common/checkStackSize.h>
|
#include <Common/checkStackSize.h>
|
||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <Columns/ColumnConst.h>
|
#include <Columns/ColumnConst.h>
|
||||||
|
#include <IO/WriteBufferFromString.h>
|
||||||
|
#include <IO/Operators.h>
|
||||||
|
|
||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
|
|
||||||
|
|
||||||
@ -131,12 +134,12 @@ NameSet injectRequiredColumns(
|
|||||||
|
|
||||||
MergeTreeReadTask::MergeTreeReadTask(
|
MergeTreeReadTask::MergeTreeReadTask(
|
||||||
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
|
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
|
||||||
const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
|
const Names & ordered_names_, const NameSet & column_name_set_, const MergeTreeReadTaskColumns & task_columns_,
|
||||||
const NamesAndTypesList & pre_columns_, bool remove_prewhere_column_, bool should_reorder_,
|
bool remove_prewhere_column_,
|
||||||
MergeTreeBlockSizePredictorPtr && size_predictor_)
|
MergeTreeBlockSizePredictorPtr && size_predictor_)
|
||||||
: data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_},
|
: data_part{data_part_}, mark_ranges{mark_ranges_}, part_index_in_query{part_index_in_query_},
|
||||||
ordered_names{ordered_names_}, column_name_set{column_name_set_}, columns{columns_}, pre_columns{pre_columns_},
|
ordered_names{ordered_names_}, column_name_set{column_name_set_}, task_columns{task_columns_},
|
||||||
remove_prewhere_column{remove_prewhere_column_}, should_reorder{should_reorder_}, size_predictor{std::move(size_predictor_)}
|
remove_prewhere_column{remove_prewhere_column_}, size_predictor{std::move(size_predictor_)}
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -276,34 +279,40 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
|||||||
Names pre_column_names;
|
Names pre_column_names;
|
||||||
|
|
||||||
/// inject columns required for defaults evaluation
|
/// inject columns required for defaults evaluation
|
||||||
bool should_reorder = !injectRequiredColumns(
|
injectRequiredColumns(
|
||||||
storage, storage_snapshot, data_part, with_subcolumns, column_names).empty();
|
storage, storage_snapshot, data_part, with_subcolumns, column_names);
|
||||||
|
|
||||||
|
MergeTreeReadTaskColumns result;
|
||||||
|
auto options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects();
|
||||||
|
if (with_subcolumns)
|
||||||
|
options.withSubcolumns();
|
||||||
|
|
||||||
if (prewhere_info)
|
if (prewhere_info)
|
||||||
{
|
{
|
||||||
pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
|
NameSet pre_name_set;
|
||||||
|
|
||||||
|
/// Add column reading steps:
|
||||||
|
/// 1. Columns for row level filter
|
||||||
if (prewhere_info->row_level_filter)
|
if (prewhere_info->row_level_filter)
|
||||||
{
|
{
|
||||||
NameSet names(pre_column_names.begin(), pre_column_names.end());
|
Names row_filter_column_names = prewhere_info->row_level_filter->getRequiredColumnsNames();
|
||||||
|
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, row_filter_column_names));
|
||||||
for (auto & name : prewhere_info->row_level_filter->getRequiredColumnsNames())
|
pre_name_set.insert(row_filter_column_names.begin(), row_filter_column_names.end());
|
||||||
{
|
|
||||||
if (!names.contains(name))
|
|
||||||
pre_column_names.push_back(name);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pre_column_names.empty())
|
/// 2. Columns for prewhere
|
||||||
pre_column_names.push_back(column_names[0]);
|
Names all_pre_column_names = prewhere_info->prewhere_actions->getRequiredColumnsNames();
|
||||||
|
|
||||||
const auto injected_pre_columns = injectRequiredColumns(
|
const auto injected_pre_columns = injectRequiredColumns(
|
||||||
storage, storage_snapshot, data_part, with_subcolumns, pre_column_names);
|
storage, storage_snapshot, data_part, with_subcolumns, all_pre_column_names);
|
||||||
|
|
||||||
if (!injected_pre_columns.empty())
|
for (const auto & name : all_pre_column_names)
|
||||||
should_reorder = true;
|
{
|
||||||
|
if (pre_name_set.contains(name))
|
||||||
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
|
continue;
|
||||||
|
pre_column_names.push_back(name);
|
||||||
|
pre_name_set.insert(name);
|
||||||
|
}
|
||||||
|
|
||||||
Names post_column_names;
|
Names post_column_names;
|
||||||
for (const auto & name : column_names)
|
for (const auto & name : column_names)
|
||||||
@ -313,17 +322,23 @@ MergeTreeReadTaskColumns getReadTaskColumns(
|
|||||||
column_names = post_column_names;
|
column_names = post_column_names;
|
||||||
}
|
}
|
||||||
|
|
||||||
MergeTreeReadTaskColumns result;
|
result.pre_columns.push_back(storage_snapshot->getColumnsByNames(options, pre_column_names));
|
||||||
NamesAndTypesList all_columns;
|
|
||||||
|
|
||||||
auto options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects();
|
/// 3. Rest of the requested columns
|
||||||
if (with_subcolumns)
|
|
||||||
options.withSubcolumns();
|
|
||||||
|
|
||||||
result.pre_columns = storage_snapshot->getColumnsByNames(options, pre_column_names);
|
|
||||||
result.columns = storage_snapshot->getColumnsByNames(options, column_names);
|
result.columns = storage_snapshot->getColumnsByNames(options, column_names);
|
||||||
result.should_reorder = should_reorder;
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
std::string MergeTreeReadTaskColumns::dump() const
|
||||||
|
{
|
||||||
|
WriteBufferFromOwnString s;
|
||||||
|
for (size_t i = 0; i < pre_columns.size(); ++i)
|
||||||
|
{
|
||||||
|
s << "STEP " << i << ": " << pre_columns[i].toString() << "\n";
|
||||||
|
}
|
||||||
|
s << "COLUMNS: " << columns.toString() << "\n";
|
||||||
|
return s.str();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,16 @@ NameSet injectRequiredColumns(
|
|||||||
Names & columns);
|
Names & columns);
|
||||||
|
|
||||||
|
|
||||||
|
struct MergeTreeReadTaskColumns
|
||||||
|
{
|
||||||
|
/// column names to read during WHERE
|
||||||
|
NamesAndTypesList columns;
|
||||||
|
/// column names to read during each PREWHERE step
|
||||||
|
std::vector<NamesAndTypesList> pre_columns;
|
||||||
|
|
||||||
|
std::string dump() const;
|
||||||
|
};
|
||||||
|
|
||||||
/// A batch of work for MergeTreeThreadSelectProcessor
|
/// A batch of work for MergeTreeThreadSelectProcessor
|
||||||
struct MergeTreeReadTask
|
struct MergeTreeReadTask
|
||||||
{
|
{
|
||||||
@ -43,39 +53,27 @@ struct MergeTreeReadTask
|
|||||||
const Names & ordered_names;
|
const Names & ordered_names;
|
||||||
/// used to determine whether column should be filtered during PREWHERE or WHERE
|
/// used to determine whether column should be filtered during PREWHERE or WHERE
|
||||||
const NameSet & column_name_set;
|
const NameSet & column_name_set;
|
||||||
/// column names to read during WHERE
|
/// column names to read during PREWHERE and WHERE
|
||||||
const NamesAndTypesList & columns;
|
const MergeTreeReadTaskColumns & task_columns;
|
||||||
/// column names to read during PREWHERE
|
|
||||||
const NamesAndTypesList & pre_columns;
|
|
||||||
/// should PREWHERE column be returned to requesting side?
|
/// should PREWHERE column be returned to requesting side?
|
||||||
const bool remove_prewhere_column;
|
const bool remove_prewhere_column;
|
||||||
/// resulting block may require reordering in accordance with `ordered_names`
|
|
||||||
const bool should_reorder;
|
|
||||||
/// Used to satisfy preferred_block_size_bytes limitation
|
/// Used to satisfy preferred_block_size_bytes limitation
|
||||||
MergeTreeBlockSizePredictorPtr size_predictor;
|
MergeTreeBlockSizePredictorPtr size_predictor;
|
||||||
/// Used to save current range processing status
|
/// Used to save current range processing status
|
||||||
MergeTreeRangeReader range_reader;
|
MergeTreeRangeReader range_reader;
|
||||||
MergeTreeRangeReader pre_range_reader;
|
/// Range readers for multiple filtering steps: row level security, PREWHERE etc.
|
||||||
|
/// NOTE: we take references to elements and push_back new elements, that's why it is a deque and not a vector
|
||||||
|
std::deque<MergeTreeRangeReader> pre_range_readers;
|
||||||
|
|
||||||
bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
|
bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
|
||||||
|
|
||||||
MergeTreeReadTask(
|
MergeTreeReadTask(
|
||||||
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
|
const MergeTreeData::DataPartPtr & data_part_, const MarkRanges & mark_ranges_, size_t part_index_in_query_,
|
||||||
const Names & ordered_names_, const NameSet & column_name_set_, const NamesAndTypesList & columns_,
|
const Names & ordered_names_, const NameSet & column_name_set_, const MergeTreeReadTaskColumns & task_columns_,
|
||||||
const NamesAndTypesList & pre_columns_, bool remove_prewhere_column_, bool should_reorder_,
|
bool remove_prewhere_column_,
|
||||||
MergeTreeBlockSizePredictorPtr && size_predictor_);
|
MergeTreeBlockSizePredictorPtr && size_predictor_);
|
||||||
};
|
};
|
||||||
|
|
||||||
struct MergeTreeReadTaskColumns
|
|
||||||
{
|
|
||||||
/// column names to read during WHERE
|
|
||||||
NamesAndTypesList columns;
|
|
||||||
/// column names to read during PREWHERE
|
|
||||||
NamesAndTypesList pre_columns;
|
|
||||||
/// resulting block may require reordering in accordance with `ordered_names`
|
|
||||||
bool should_reorder = false;
|
|
||||||
};
|
|
||||||
|
|
||||||
MergeTreeReadTaskColumns getReadTaskColumns(
|
MergeTreeReadTaskColumns getReadTaskColumns(
|
||||||
const MergeTreeData & storage,
|
const MergeTreeData & storage,
|
||||||
const StorageSnapshotPtr & storage_snapshot,
|
const StorageSnapshotPtr & storage_snapshot,
|
||||||
|
@ -96,7 +96,6 @@ namespace ProfileEvents
|
|||||||
extern const Event RejectedInserts;
|
extern const Event RejectedInserts;
|
||||||
extern const Event DelayedInserts;
|
extern const Event DelayedInserts;
|
||||||
extern const Event DelayedInsertsMilliseconds;
|
extern const Event DelayedInsertsMilliseconds;
|
||||||
extern const Event DuplicatedInsertedBlocks;
|
|
||||||
extern const Event InsertedWideParts;
|
extern const Event InsertedWideParts;
|
||||||
extern const Event InsertedCompactParts;
|
extern const Event InsertedCompactParts;
|
||||||
extern const Event InsertedInMemoryParts;
|
extern const Event InsertedInMemoryParts;
|
||||||
@ -2786,22 +2785,14 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
|
|||||||
|
|
||||||
bool MergeTreeData::renameTempPartAndAdd(
|
bool MergeTreeData::renameTempPartAndAdd(
|
||||||
MutableDataPartPtr & part,
|
MutableDataPartPtr & part,
|
||||||
MergeTreeTransaction * txn,
|
Transaction & out_transaction,
|
||||||
SimpleIncrement * increment,
|
DataPartsLock & lock)
|
||||||
Transaction * out_transaction,
|
|
||||||
MergeTreeDeduplicationLog * deduplication_log,
|
|
||||||
std::string_view deduplication_token)
|
|
||||||
{
|
{
|
||||||
if (out_transaction && &out_transaction->data != this)
|
|
||||||
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
|
|
||||||
ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
DataPartsVector covered_parts;
|
DataPartsVector covered_parts;
|
||||||
{
|
|
||||||
auto lock = lockParts();
|
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
|
||||||
if (!renameTempPartAndReplace(part, txn, increment, out_transaction, lock, &covered_parts, deduplication_log, deduplication_token))
|
return false;
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (!covered_parts.empty())
|
if (!covered_parts.empty())
|
||||||
throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size())
|
throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size())
|
||||||
+ " existing part(s) (including " + covered_parts[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
|
+ " existing part(s) (including " + covered_parts[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
|
||||||
@ -2809,29 +2800,10 @@ bool MergeTreeData::renameTempPartAndAdd(
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MergeTreeData::checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const
|
||||||
bool MergeTreeData::renameTempPartAndReplace(
|
|
||||||
MutableDataPartPtr & part,
|
|
||||||
MergeTreeTransaction * txn,
|
|
||||||
SimpleIncrement * increment,
|
|
||||||
Transaction * out_transaction,
|
|
||||||
std::unique_lock<std::mutex> & lock,
|
|
||||||
DataPartsVector * out_covered_parts,
|
|
||||||
MergeTreeDeduplicationLog * deduplication_log,
|
|
||||||
std::string_view deduplication_token)
|
|
||||||
{
|
{
|
||||||
if (out_transaction && &out_transaction->data != this)
|
|
||||||
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
|
|
||||||
ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
if (txn)
|
|
||||||
transactions_enabled.store(true);
|
|
||||||
|
|
||||||
part->assertState({DataPartState::Temporary});
|
part->assertState({DataPartState::Temporary});
|
||||||
|
|
||||||
MergeTreePartInfo part_info = part->info;
|
|
||||||
String part_name;
|
|
||||||
|
|
||||||
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
|
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
|
||||||
{
|
{
|
||||||
if (part->partition.value != existing_part_in_partition->partition.value)
|
if (part->partition.value != existing_part_in_partition->partition.value)
|
||||||
@ -2841,21 +2813,7 @@ bool MergeTreeData::renameTempPartAndReplace(
|
|||||||
ErrorCodes::CORRUPTED_DATA);
|
ErrorCodes::CORRUPTED_DATA);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** It is important that obtaining new block number and adding that block to parts set is done atomically.
|
if (auto it_duplicate = data_parts_by_info.find(part->info); it_duplicate != data_parts_by_info.end())
|
||||||
* Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part.
|
|
||||||
*/
|
|
||||||
if (increment)
|
|
||||||
{
|
|
||||||
part_info.min_block = part_info.max_block = increment->get();
|
|
||||||
part_info.mutation = 0; /// it's equal to min_block by default
|
|
||||||
part_name = part->getNewName(part_info);
|
|
||||||
}
|
|
||||||
else /// Parts from ReplicatedMergeTree already have names
|
|
||||||
part_name = part->name;
|
|
||||||
|
|
||||||
LOG_TRACE(log, "Renaming temporary part {} to {}.", part->data_part_storage->getPartDirectory(), part_name);
|
|
||||||
|
|
||||||
if (auto it_duplicate = data_parts_by_info.find(part_info); it_duplicate != data_parts_by_info.end())
|
|
||||||
{
|
{
|
||||||
String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";
|
String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";
|
||||||
|
|
||||||
@ -2864,93 +2822,51 @@ bool MergeTreeData::renameTempPartAndReplace(
|
|||||||
|
|
||||||
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
|
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename)
|
||||||
|
{
|
||||||
|
part->is_temp = false;
|
||||||
|
part->setState(DataPartState::PreActive);
|
||||||
|
|
||||||
|
if (need_rename)
|
||||||
|
part->renameTo(part->name, true);
|
||||||
|
|
||||||
|
data_parts_indexes.insert(part);
|
||||||
|
out_transaction.precommitted_parts.insert(part);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool MergeTreeData::renameTempPartAndReplaceImpl(
|
||||||
|
MutableDataPartPtr & part,
|
||||||
|
Transaction & out_transaction,
|
||||||
|
DataPartsLock & lock,
|
||||||
|
DataPartsVector * out_covered_parts)
|
||||||
|
{
|
||||||
|
LOG_TRACE(log, "Renaming temporary part {} to {}.", part->data_part_storage->getPartDirectory(), part->name);
|
||||||
|
|
||||||
|
if (&out_transaction.data != this)
|
||||||
|
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
|
||||||
|
ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
checkPartCanBeAddedToTable(part, lock);
|
||||||
|
|
||||||
DataPartPtr covering_part;
|
DataPartPtr covering_part;
|
||||||
DataPartsVector covered_parts = getActivePartsToReplace(part_info, part_name, covering_part, lock);
|
DataPartsVector covered_parts = getActivePartsToReplace(part->info, part->name, covering_part, lock);
|
||||||
|
|
||||||
if (covering_part)
|
if (covering_part)
|
||||||
{
|
{
|
||||||
LOG_WARNING(log, "Tried to add obsolete part {} covered by {}", part_name, covering_part->getNameWithState());
|
LOG_WARNING(log, "Tried to add obsolete part {} covered by {}", part->name, covering_part->getNameWithState());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deduplication log used only from non-replicated MergeTree. Replicated
|
|
||||||
/// tables have their own mechanism. We try to deduplicate at such deep
|
|
||||||
/// level, because only here we know real part name which is required for
|
|
||||||
/// deduplication.
|
|
||||||
if (deduplication_log)
|
|
||||||
{
|
|
||||||
const String block_id = part->getZeroLevelPartBlockID(deduplication_token);
|
|
||||||
auto res = deduplication_log->addPart(block_id, part_info);
|
|
||||||
if (!res.second)
|
|
||||||
{
|
|
||||||
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
|
|
||||||
LOG_INFO(log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartName());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// All checks are passed. Now we can rename the part on disk.
|
/// All checks are passed. Now we can rename the part on disk.
|
||||||
/// So, we maintain the invariant: if a non-temporary part is in the filesystem, then it is in data_parts
|
/// So, we maintain the invariant: if a non-temporary part is in the filesystem, then it is in data_parts
|
||||||
///
|
preparePartForCommit(part, out_transaction, /* need_rename = */ true);
|
||||||
/// If out_transaction is null, we commit the part to the active set immediately, else add it to the transaction.
|
|
||||||
|
|
||||||
part->name = part_name;
|
|
||||||
part->info = part_info;
|
|
||||||
part->is_temp = false;
|
|
||||||
part->setState(DataPartState::PreActive);
|
|
||||||
part->renameTo(part_name, true);
|
|
||||||
|
|
||||||
auto part_it = data_parts_indexes.insert(part).first;
|
|
||||||
|
|
||||||
if (out_transaction)
|
|
||||||
{
|
|
||||||
chassert(out_transaction->txn == txn);
|
|
||||||
out_transaction->precommitted_parts.insert(part);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/// FIXME Transactions: it's not the best place for checking and setting removal_tid,
|
|
||||||
/// because it's too optimistic. We should lock removal_tid of covered parts at the beginning of operation.
|
|
||||||
MergeTreeTransaction::addNewPartAndRemoveCovered(shared_from_this(), part, covered_parts, txn);
|
|
||||||
|
|
||||||
size_t reduce_bytes = 0;
|
|
||||||
size_t reduce_rows = 0;
|
|
||||||
size_t reduce_parts = 0;
|
|
||||||
auto current_time = time(nullptr);
|
|
||||||
for (const DataPartPtr & covered_part : covered_parts)
|
|
||||||
{
|
|
||||||
covered_part->remove_time.store(current_time, std::memory_order_relaxed);
|
|
||||||
modifyPartState(covered_part, DataPartState::Outdated);
|
|
||||||
removePartContributionToColumnAndSecondaryIndexSizes(covered_part);
|
|
||||||
reduce_bytes += covered_part->getBytesOnDisk();
|
|
||||||
reduce_rows += covered_part->rows_count;
|
|
||||||
++reduce_parts;
|
|
||||||
}
|
|
||||||
|
|
||||||
modifyPartState(part_it, DataPartState::Active);
|
|
||||||
addPartContributionToColumnAndSecondaryIndexSizes(part);
|
|
||||||
|
|
||||||
if (covered_parts.empty())
|
|
||||||
updateObjectColumns(*part_it, lock);
|
|
||||||
else
|
|
||||||
resetObjectColumnsFromActiveParts(lock);
|
|
||||||
|
|
||||||
ssize_t diff_bytes = part->getBytesOnDisk() - reduce_bytes;
|
|
||||||
ssize_t diff_rows = part->rows_count - reduce_rows;
|
|
||||||
ssize_t diff_parts = 1 - reduce_parts;
|
|
||||||
increaseDataVolume(diff_bytes, diff_rows, diff_parts);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto part_in_memory = asInMemoryPart(part);
|
|
||||||
if (part_in_memory && getSettings()->in_memory_parts_enable_wal)
|
|
||||||
{
|
|
||||||
auto wal = getWriteAheadLog();
|
|
||||||
wal->addPart(part_in_memory);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (out_covered_parts)
|
if (out_covered_parts)
|
||||||
{
|
{
|
||||||
|
out_covered_parts->reserve(covered_parts.size());
|
||||||
|
|
||||||
for (DataPartPtr & covered_part : covered_parts)
|
for (DataPartPtr & covered_part : covered_parts)
|
||||||
out_covered_parts->emplace_back(std::move(covered_part));
|
out_covered_parts->emplace_back(std::move(covered_part));
|
||||||
}
|
}
|
||||||
@ -2958,24 +2874,26 @@ bool MergeTreeData::renameTempPartAndReplace(
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
|
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplaceUnlocked(
|
||||||
MutableDataPartPtr & part, MergeTreeTransaction * txn, SimpleIncrement * increment,
|
MutableDataPartPtr & part,
|
||||||
Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log)
|
Transaction & out_transaction,
|
||||||
|
DataPartsLock & lock)
|
||||||
{
|
{
|
||||||
if (out_transaction && &out_transaction->data != this)
|
|
||||||
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
|
|
||||||
ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
DataPartsVector covered_parts;
|
DataPartsVector covered_parts;
|
||||||
{
|
renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts);
|
||||||
auto lock = lockParts();
|
|
||||||
renameTempPartAndReplace(part, txn, increment, out_transaction, lock, &covered_parts, deduplication_log);
|
|
||||||
}
|
|
||||||
return covered_parts;
|
return covered_parts;
|
||||||
}
|
}
|
||||||
|
|
||||||
void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const MergeTreeData::DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock)
|
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
|
||||||
|
MutableDataPartPtr & part,
|
||||||
|
Transaction & out_transaction)
|
||||||
|
{
|
||||||
|
auto part_lock = lockParts();
|
||||||
|
return renameTempPartAndReplaceUnlocked(part, out_transaction, part_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const MergeTreeData::DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock)
|
||||||
{
|
{
|
||||||
if (txn)
|
if (txn)
|
||||||
transactions_enabled.store(true);
|
transactions_enabled.store(true);
|
||||||
@ -4879,6 +4797,14 @@ MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
MergeTreeData::Transaction::Transaction(MergeTreeData & data_, MergeTreeTransaction * txn_)
|
||||||
|
: data(data_)
|
||||||
|
, txn(txn_)
|
||||||
|
{
|
||||||
|
if (txn)
|
||||||
|
data.transactions_enabled.store(true);
|
||||||
|
}
|
||||||
|
|
||||||
void MergeTreeData::Transaction::rollbackPartsToTemporaryState()
|
void MergeTreeData::Transaction::rollbackPartsToTemporaryState()
|
||||||
{
|
{
|
||||||
if (!isEmpty())
|
if (!isEmpty())
|
||||||
@ -4922,9 +4848,12 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
|
|||||||
|
|
||||||
if (!isEmpty())
|
if (!isEmpty())
|
||||||
{
|
{
|
||||||
|
auto settings = data.getSettings();
|
||||||
|
MergeTreeData::WriteAheadLogPtr wal;
|
||||||
auto parts_lock = acquired_parts_lock ? MergeTreeData::DataPartsLock() : data.lockParts();
|
auto parts_lock = acquired_parts_lock ? MergeTreeData::DataPartsLock() : data.lockParts();
|
||||||
auto * owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock;
|
auto * owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock;
|
||||||
|
|
||||||
|
|
||||||
if (txn)
|
if (txn)
|
||||||
{
|
{
|
||||||
for (const DataPartPtr & part : precommitted_parts)
|
for (const DataPartPtr & part : precommitted_parts)
|
||||||
@ -4949,6 +4878,15 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
|
|||||||
|
|
||||||
for (const DataPartPtr & part : precommitted_parts)
|
for (const DataPartPtr & part : precommitted_parts)
|
||||||
{
|
{
|
||||||
|
auto part_in_memory = asInMemoryPart(part);
|
||||||
|
if (part_in_memory && settings->in_memory_parts_enable_wal)
|
||||||
|
{
|
||||||
|
if (!wal)
|
||||||
|
wal = data.getWriteAheadLog();
|
||||||
|
|
||||||
|
wal->addPart(part_in_memory);
|
||||||
|
}
|
||||||
|
|
||||||
DataPartPtr covering_part;
|
DataPartPtr covering_part;
|
||||||
DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, *owing_parts_lock);
|
DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, *owing_parts_lock);
|
||||||
if (covering_part)
|
if (covering_part)
|
||||||
|
@ -252,7 +252,7 @@ public:
|
|||||||
class Transaction : private boost::noncopyable
|
class Transaction : private boost::noncopyable
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Transaction(MergeTreeData & data_, MergeTreeTransaction * txn_) : data(data_), txn(txn_) {}
|
Transaction(MergeTreeData & data_, MergeTreeTransaction * txn_);
|
||||||
|
|
||||||
DataPartsVector commit(MergeTreeData::DataPartsLock * acquired_parts_lock = nullptr);
|
DataPartsVector commit(MergeTreeData::DataPartsLock * acquired_parts_lock = nullptr);
|
||||||
|
|
||||||
@ -549,37 +549,24 @@ public:
|
|||||||
|
|
||||||
/// Renames temporary part to a permanent part and adds it to the parts set.
|
/// Renames temporary part to a permanent part and adds it to the parts set.
|
||||||
/// It is assumed that the part does not intersect with existing parts.
|
/// It is assumed that the part does not intersect with existing parts.
|
||||||
/// If increment != nullptr, part index is determined using increment. Otherwise part index remains unchanged.
|
/// Adds the part in the PreActive state (the part will be added to the active set later with out_transaction->commit()).
|
||||||
/// If out_transaction != nullptr, adds the part in the PreActive state (the part will be added to the
|
|
||||||
/// active set later with out_transaction->commit()).
|
|
||||||
/// Else, commits the part immediately.
|
|
||||||
/// Returns true if part was added. Returns false if part is covered by bigger part.
|
/// Returns true if part was added. Returns false if part is covered by bigger part.
|
||||||
bool renameTempPartAndAdd(
|
bool renameTempPartAndAdd(
|
||||||
MutableDataPartPtr & part,
|
MutableDataPartPtr & part,
|
||||||
MergeTreeTransaction * txn,
|
Transaction & transaction,
|
||||||
SimpleIncrement * increment = nullptr,
|
DataPartsLock & lock);
|
||||||
Transaction * out_transaction = nullptr,
|
|
||||||
MergeTreeDeduplicationLog * deduplication_log = nullptr,
|
|
||||||
std::string_view deduplication_token = std::string_view());
|
|
||||||
|
|
||||||
/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
|
/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
|
||||||
/// Returns all parts covered by the added part (in ascending order).
|
/// Returns all parts covered by the added part (in ascending order).
|
||||||
/// If out_transaction == nullptr, marks covered parts as Outdated.
|
|
||||||
DataPartsVector renameTempPartAndReplace(
|
DataPartsVector renameTempPartAndReplace(
|
||||||
MutableDataPartPtr & part, MergeTreeTransaction * txn, SimpleIncrement * increment = nullptr,
|
|
||||||
Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
|
|
||||||
|
|
||||||
/// Low-level version of previous one, doesn't lock mutex
|
|
||||||
/// FIXME Transactions: remove add_to_txn flag, maybe merge MergeTreeTransaction and Transaction
|
|
||||||
bool renameTempPartAndReplace(
|
|
||||||
MutableDataPartPtr & part,
|
MutableDataPartPtr & part,
|
||||||
MergeTreeTransaction * txn,
|
Transaction & out_transaction);
|
||||||
SimpleIncrement * increment,
|
|
||||||
Transaction * out_transaction,
|
/// Unlocked version of previous one. Useful when adding multiple parts with a single lock.
|
||||||
DataPartsLock & lock,
|
DataPartsVector renameTempPartAndReplaceUnlocked(
|
||||||
DataPartsVector * out_covered_parts = nullptr,
|
MutableDataPartPtr & part,
|
||||||
MergeTreeDeduplicationLog * deduplication_log = nullptr,
|
Transaction & out_transaction,
|
||||||
std::string_view deduplication_token = std::string_view());
|
DataPartsLock & lock);
|
||||||
|
|
||||||
/// Remove parts from working set immediately (without wait for background
|
/// Remove parts from working set immediately (without wait for background
|
||||||
/// process). Transfer part state to temporary. Have very limited usage only
|
/// process). Transfer part state to temporary. Have very limited usage only
|
||||||
@ -1251,6 +1238,22 @@ protected:
|
|||||||
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);
|
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
/// Checking that candidate part doesn't break invariants: correct partition and doesn't exist already
|
||||||
|
void checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const;
|
||||||
|
|
||||||
|
/// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes
|
||||||
|
/// in precommitted state and to transaction
|
||||||
|
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename);
|
||||||
|
|
||||||
|
/// Low-level method for preparing parts for commit (in-memory).
|
||||||
|
/// FIXME Merge MergeTreeTransaction and Transaction
|
||||||
|
bool renameTempPartAndReplaceImpl(
|
||||||
|
MutableDataPartPtr & part,
|
||||||
|
Transaction & out_transaction,
|
||||||
|
DataPartsLock & lock,
|
||||||
|
DataPartsVector * out_covered_parts);
|
||||||
|
|
||||||
/// RAII Wrapper for atomic work with currently moving parts
|
/// RAII Wrapper for atomic work with currently moving parts
|
||||||
/// Acquire them in constructor and remove them in destructor
|
/// Acquire them in constructor and remove them in destructor
|
||||||
/// Uses data.currently_moving_parts_mutex
|
/// Uses data.currently_moving_parts_mutex
|
||||||
|
@ -541,7 +541,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
|
|||||||
MergeTreeData::MutableDataPartPtr & new_data_part,
|
MergeTreeData::MutableDataPartPtr & new_data_part,
|
||||||
const MergeTreeData::DataPartsVector & parts,
|
const MergeTreeData::DataPartsVector & parts,
|
||||||
const MergeTreeTransactionPtr & txn,
|
const MergeTreeTransactionPtr & txn,
|
||||||
MergeTreeData::Transaction * out_transaction)
|
MergeTreeData::Transaction & out_transaction)
|
||||||
{
|
{
|
||||||
/// Some of the source parts were possibly created in a transaction, so a non-transactional merge may break isolation.
|
/// Some of the source parts were possibly created in a transaction, so a non-transactional merge may break isolation.
|
||||||
if (data.transactions_enabled.load(std::memory_order_relaxed) && !txn)
|
if (data.transactions_enabled.load(std::memory_order_relaxed) && !txn)
|
||||||
@ -549,7 +549,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
|
|||||||
"but transactions were enabled for this table");
|
"but transactions were enabled for this table");
|
||||||
|
|
||||||
/// Rename new part, add to the set and remove original parts.
|
/// Rename new part, add to the set and remove original parts.
|
||||||
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, txn.get(), nullptr, out_transaction);
|
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction);
|
||||||
|
|
||||||
/// Let's check that all original parts have been deleted and only them.
|
/// Let's check that all original parts have been deleted and only them.
|
||||||
if (replaced_parts.size() != parts.size())
|
if (replaced_parts.size() != parts.size())
|
||||||
|
@ -133,7 +133,7 @@ public:
|
|||||||
MergeTreeData::MutableDataPartPtr & new_data_part,
|
MergeTreeData::MutableDataPartPtr & new_data_part,
|
||||||
const MergeTreeData::DataPartsVector & parts,
|
const MergeTreeData::DataPartsVector & parts,
|
||||||
const MergeTreeTransactionPtr & txn,
|
const MergeTreeTransactionPtr & txn,
|
||||||
MergeTreeData::Transaction * out_transaction = nullptr);
|
MergeTreeData::Transaction & out_transaction);
|
||||||
|
|
||||||
|
|
||||||
/// The approximate amount of disk space needed for merge or mutation. With a surplus.
|
/// The approximate amount of disk space needed for merge or mutation. With a surplus.
|
||||||
|
@ -34,9 +34,9 @@ try
|
|||||||
: getSizePredictor(data_part, task_columns, sample_block);
|
: getSizePredictor(data_part, task_columns, sample_block);
|
||||||
|
|
||||||
task = std::make_unique<MergeTreeReadTask>(
|
task = std::make_unique<MergeTreeReadTask>(
|
||||||
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, task_columns.columns,
|
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set, task_columns,
|
||||||
task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
|
prewhere_info && prewhere_info->remove_prewhere_column,
|
||||||
task_columns.should_reorder, std::move(size_predictor));
|
std::move(size_predictor));
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,8 @@
|
|||||||
#include <Columns/ColumnConst.h>
|
#include <Columns/ColumnConst.h>
|
||||||
#include <Columns/ColumnsCommon.h>
|
#include <Columns/ColumnsCommon.h>
|
||||||
#include <Common/TargetSpecific.h>
|
#include <Common/TargetSpecific.h>
|
||||||
|
#include <IO/WriteBufferFromString.h>
|
||||||
|
#include <IO/Operators.h>
|
||||||
#include <base/range.h>
|
#include <base/range.h>
|
||||||
#include <Interpreters/castColumn.h>
|
#include <Interpreters/castColumn.h>
|
||||||
#include <DataTypes/DataTypeNothing.h>
|
#include <DataTypes/DataTypeNothing.h>
|
||||||
@ -64,7 +66,7 @@ static void filterColumns(Columns & columns, const ColumnPtr & filter)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static size_t getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges)
|
size_t MergeTreeRangeReader::ReadResult::getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges)
|
||||||
{
|
{
|
||||||
size_t current_task_last_mark = 0;
|
size_t current_task_last_mark = 0;
|
||||||
for (const auto & mark_range : ranges)
|
for (const auto & mark_range : ranges)
|
||||||
@ -594,6 +596,7 @@ size_t MergeTreeRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, con
|
|||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Filter size must match total_rows_per_granule
|
||||||
void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)
|
void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)
|
||||||
{
|
{
|
||||||
if (!new_filter && filter)
|
if (!new_filter && filter)
|
||||||
@ -644,7 +647,7 @@ size_t MergeTreeRangeReader::ReadResult::countBytesInResultFilter(const IColumn:
|
|||||||
MergeTreeRangeReader::MergeTreeRangeReader(
|
MergeTreeRangeReader::MergeTreeRangeReader(
|
||||||
IMergeTreeReader * merge_tree_reader_,
|
IMergeTreeReader * merge_tree_reader_,
|
||||||
MergeTreeRangeReader * prev_reader_,
|
MergeTreeRangeReader * prev_reader_,
|
||||||
const PrewhereExprInfo * prewhere_info_,
|
const PrewhereExprStep * prewhere_info_,
|
||||||
bool last_reader_in_chain_,
|
bool last_reader_in_chain_,
|
||||||
const Names & non_const_virtual_column_names_)
|
const Names & non_const_virtual_column_names_)
|
||||||
: merge_tree_reader(merge_tree_reader_)
|
: merge_tree_reader(merge_tree_reader_)
|
||||||
@ -672,17 +675,12 @@ MergeTreeRangeReader::MergeTreeRangeReader(
|
|||||||
|
|
||||||
if (prewhere_info)
|
if (prewhere_info)
|
||||||
{
|
{
|
||||||
if (prewhere_info->row_level_filter)
|
const auto & step = *prewhere_info;
|
||||||
{
|
if (step.actions)
|
||||||
prewhere_info->row_level_filter->execute(sample_block, true);
|
step.actions->execute(sample_block, true);
|
||||||
sample_block.erase(prewhere_info->row_level_column_name);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (prewhere_info->prewhere_actions)
|
if (step.remove_column)
|
||||||
prewhere_info->prewhere_actions->execute(sample_block, true);
|
sample_block.erase(step.column_name);
|
||||||
|
|
||||||
if (prewhere_info->remove_prewhere_column)
|
|
||||||
sample_block.erase(prewhere_info->prewhere_column_name);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -983,11 +981,15 @@ void MergeTreeRangeReader::fillPartOffsetColumn(ReadResult & result, UInt64 lead
|
|||||||
result.columns.emplace_back(std::move(column));
|
result.columns.emplace_back(std::move(column));
|
||||||
}
|
}
|
||||||
|
|
||||||
Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t & num_rows)
|
Columns MergeTreeRangeReader::continueReadingChain(const ReadResult & result, size_t & num_rows)
|
||||||
{
|
{
|
||||||
Columns columns;
|
Columns columns;
|
||||||
num_rows = 0;
|
num_rows = 0;
|
||||||
|
|
||||||
|
/// No columns need to be read at this step? (only more filtering)
|
||||||
|
if (merge_tree_reader->getColumns().empty())
|
||||||
|
return columns;
|
||||||
|
|
||||||
if (result.rowsPerGranule().empty())
|
if (result.rowsPerGranule().empty())
|
||||||
{
|
{
|
||||||
/// If zero rows were read on prev step, then there are no more rows to read.
|
/// If zero rows were read on prev step, then there are no more rows to read.
|
||||||
@ -1001,7 +1003,7 @@ Columns MergeTreeRangeReader::continueReadingChain(ReadResult & result, size_t &
|
|||||||
const auto & rows_per_granule = result.rowsPerGranule();
|
const auto & rows_per_granule = result.rowsPerGranule();
|
||||||
const auto & started_ranges = result.startedRanges();
|
const auto & started_ranges = result.startedRanges();
|
||||||
|
|
||||||
size_t current_task_last_mark = getLastMark(started_ranges);
|
size_t current_task_last_mark = ReadResult::getLastMark(started_ranges);
|
||||||
size_t next_range_to_start = 0;
|
size_t next_range_to_start = 0;
|
||||||
|
|
||||||
auto size = rows_per_granule.size();
|
auto size = rows_per_granule.size();
|
||||||
@ -1039,6 +1041,8 @@ static void checkCombinedFiltersSize(size_t bytes_in_first_filter, size_t second
|
|||||||
"does not match second filter size ({})", bytes_in_first_filter, second_filter_size);
|
"does not match second filter size ({})", bytes_in_first_filter, second_filter_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Second filter size must be equal to number of 1s in the first filter.
|
||||||
|
/// The result size is equal to first filter size.
|
||||||
static ColumnPtr combineFilters(ColumnPtr first, ColumnPtr second)
|
static ColumnPtr combineFilters(ColumnPtr first, ColumnPtr second)
|
||||||
{
|
{
|
||||||
ConstantFilterDescription first_const_descr(*first);
|
ConstantFilterDescription first_const_descr(*first);
|
||||||
@ -1099,13 +1103,17 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
|
|||||||
const auto & header = merge_tree_reader->getColumns();
|
const auto & header = merge_tree_reader->getColumns();
|
||||||
size_t num_columns = header.size();
|
size_t num_columns = header.size();
|
||||||
|
|
||||||
if (result.columns.size() != (num_columns + non_const_virtual_column_names.size()))
|
/// Check that we have columns from previous steps and newly read required columns
|
||||||
throw Exception("Invalid number of columns passed to MergeTreeRangeReader. "
|
if (result.columns.size() < num_columns + non_const_virtual_column_names.size())
|
||||||
"Expected " + toString(num_columns) + ", "
|
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||||
"got " + toString(result.columns.size()), ErrorCodes::LOGICAL_ERROR);
|
"Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}",
|
||||||
|
num_columns, result.columns.size());
|
||||||
|
|
||||||
ColumnPtr filter;
|
/// This filter has the size of total_rows_per_granule. It is applied after reading contiguous chunks from
|
||||||
ColumnPtr row_level_filter;
|
/// the start of each granule.
|
||||||
|
ColumnPtr combined_filter;
|
||||||
|
/// Filter computed at the current step. Its size is equal to num_rows which is <= total_rows_per_granule
|
||||||
|
ColumnPtr current_step_filter;
|
||||||
size_t prewhere_column_pos;
|
size_t prewhere_column_pos;
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -1122,13 +1130,23 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto name_and_type = header.begin(); pos < num_columns; ++pos, ++name_and_type)
|
for (auto name_and_type = header.begin(); name_and_type != header.end() && pos < result.columns.size(); ++pos, ++name_and_type)
|
||||||
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
|
block.insert({result.columns[pos], name_and_type->type, name_and_type->name});
|
||||||
|
|
||||||
for (const auto & column_name : non_const_virtual_column_names)
|
for (const auto & column_name : non_const_virtual_column_names)
|
||||||
{
|
{
|
||||||
|
if (block.has(column_name))
|
||||||
|
continue;
|
||||||
|
|
||||||
if (column_name == "_part_offset")
|
if (column_name == "_part_offset")
|
||||||
|
{
|
||||||
|
if (pos >= result.columns.size())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||||
|
"Invalid number of columns passed to MergeTreeRangeReader. Expected {}, got {}",
|
||||||
|
num_columns, result.columns.size());
|
||||||
|
|
||||||
block.insert({result.columns[pos], std::make_shared<DataTypeUInt64>(), column_name});
|
block.insert({result.columns[pos], std::make_shared<DataTypeUInt64>(), column_name});
|
||||||
|
}
|
||||||
else
|
else
|
||||||
throw Exception("Unexpected non-const virtual column: " + column_name, ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Unexpected non-const virtual column: " + column_name, ErrorCodes::LOGICAL_ERROR);
|
||||||
++pos;
|
++pos;
|
||||||
@ -1137,58 +1155,37 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
|
|||||||
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
|
/// Columns might be projected out. We need to store them here so that default columns can be evaluated later.
|
||||||
result.block_before_prewhere = block;
|
result.block_before_prewhere = block;
|
||||||
|
|
||||||
if (prewhere_info->row_level_filter)
|
if (prewhere_info->actions)
|
||||||
{
|
prewhere_info->actions->execute(block);
|
||||||
prewhere_info->row_level_filter->execute(block);
|
|
||||||
auto row_level_filter_pos = block.getPositionByName(prewhere_info->row_level_column_name);
|
|
||||||
row_level_filter = block.getByPosition(row_level_filter_pos).column;
|
|
||||||
block.erase(row_level_filter_pos);
|
|
||||||
|
|
||||||
auto columns = block.getColumns();
|
prewhere_column_pos = block.getPositionByName(prewhere_info->column_name);
|
||||||
filterColumns(columns, row_level_filter);
|
|
||||||
if (columns.empty())
|
|
||||||
block = block.cloneEmpty();
|
|
||||||
else
|
|
||||||
block.setColumns(columns);
|
|
||||||
}
|
|
||||||
|
|
||||||
prewhere_info->prewhere_actions->execute(block);
|
|
||||||
|
|
||||||
prewhere_column_pos = block.getPositionByName(prewhere_info->prewhere_column_name);
|
|
||||||
|
|
||||||
result.columns.clear();
|
result.columns.clear();
|
||||||
result.columns.reserve(block.columns());
|
result.columns.reserve(block.columns());
|
||||||
for (auto & col : block)
|
for (auto & col : block)
|
||||||
result.columns.emplace_back(std::move(col.column));
|
result.columns.emplace_back(std::move(col.column));
|
||||||
|
|
||||||
filter.swap(result.columns[prewhere_column_pos]);
|
current_step_filter.swap(result.columns[prewhere_column_pos]);
|
||||||
|
combined_filter = current_step_filter;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result.getFilter())
|
if (result.getFilter())
|
||||||
{
|
{
|
||||||
/// TODO: implement for prewhere chain.
|
ColumnPtr prev_filter = result.getFilterHolder();
|
||||||
/// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter.
|
combined_filter = combineFilters(prev_filter, std::move(combined_filter));
|
||||||
throw Exception("MergeTreeRangeReader chain with several prewhere actions in not implemented.",
|
|
||||||
ErrorCodes::LOGICAL_ERROR);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (filter && row_level_filter)
|
result.setFilter(combined_filter);
|
||||||
{
|
|
||||||
row_level_filter = combineFilters(std::move(row_level_filter), filter);
|
|
||||||
result.setFilter(row_level_filter);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
result.setFilter(filter);
|
|
||||||
|
|
||||||
/// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here
|
/// If there is a WHERE, we filter in there, and only optimize IO and shrink columns here
|
||||||
if (!last_reader_in_chain)
|
if (!last_reader_in_chain)
|
||||||
result.optimize(merge_tree_reader->canReadIncompleteGranules(), prewhere_info->row_level_filter == nullptr);
|
result.optimize(merge_tree_reader->canReadIncompleteGranules(), true);
|
||||||
|
|
||||||
/// If we read nothing or filter gets optimized to nothing
|
/// If we read nothing or filter gets optimized to nothing
|
||||||
if (result.totalRowsPerGranule() == 0)
|
if (result.totalRowsPerGranule() == 0)
|
||||||
result.setFilterConstFalse();
|
result.setFilterConstFalse();
|
||||||
/// If we need to filter in PREWHERE
|
/// If we need to filter in PREWHERE
|
||||||
else if (prewhere_info->need_filter || result.need_filter || prewhere_info->row_level_filter)
|
else if (prewhere_info->need_filter || result.need_filter)
|
||||||
{
|
{
|
||||||
/// If there is a filter and without optimized
|
/// If there is a filter and without optimized
|
||||||
if (result.getFilter() && last_reader_in_chain)
|
if (result.getFilter() && last_reader_in_chain)
|
||||||
@ -1208,10 +1205,7 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
|
|||||||
/// filter might be shrunk while columns not
|
/// filter might be shrunk while columns not
|
||||||
const auto * result_filter = result.getFilterOriginal();
|
const auto * result_filter = result.getFilterOriginal();
|
||||||
|
|
||||||
if (row_level_filter)
|
filterColumns(result.columns, current_step_filter);
|
||||||
filterColumns(result.columns, filter);
|
|
||||||
else
|
|
||||||
filterColumns(result.columns, result_filter->getData());
|
|
||||||
|
|
||||||
result.need_filter = true;
|
result.need_filter = true;
|
||||||
|
|
||||||
@ -1234,22 +1228,22 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
|
|||||||
/// Check if the PREWHERE column is needed
|
/// Check if the PREWHERE column is needed
|
||||||
if (!result.columns.empty())
|
if (!result.columns.empty())
|
||||||
{
|
{
|
||||||
if (prewhere_info->remove_prewhere_column)
|
if (prewhere_info->remove_column)
|
||||||
result.columns.erase(result.columns.begin() + prewhere_column_pos);
|
result.columns.erase(result.columns.begin() + prewhere_column_pos);
|
||||||
else
|
else
|
||||||
result.columns[prewhere_column_pos] =
|
result.columns[prewhere_column_pos] =
|
||||||
getSampleBlock().getByName(prewhere_info->prewhere_column_name).type->
|
getSampleBlock().getByName(prewhere_info->column_name).type->
|
||||||
createColumnConst(result.num_rows, 1u)->convertToFullColumnIfConst();
|
createColumnConst(result.num_rows, 1u)->convertToFullColumnIfConst();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/// Filter in WHERE instead
|
/// Filter in WHERE instead
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (prewhere_info->remove_prewhere_column)
|
if (prewhere_info->remove_column)
|
||||||
result.columns.erase(result.columns.begin() + prewhere_column_pos);
|
result.columns.erase(result.columns.begin() + prewhere_column_pos);
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto type = getSampleBlock().getByName(prewhere_info->prewhere_column_name).type;
|
auto type = getSampleBlock().getByName(prewhere_info->column_name).type;
|
||||||
ColumnWithTypeAndName col(result.getFilterHolder()->convertToFullColumnIfConst(), std::make_shared<DataTypeUInt8>(), "");
|
ColumnWithTypeAndName col(result.getFilterHolder()->convertToFullColumnIfConst(), std::make_shared<DataTypeUInt8>(), "");
|
||||||
result.columns[prewhere_column_pos] = castColumn(col, type);
|
result.columns[prewhere_column_pos] = castColumn(col, type);
|
||||||
result.clearFilter(); // Acting as a flag to not filter in PREWHERE
|
result.clearFilter(); // Acting as a flag to not filter in PREWHERE
|
||||||
@ -1257,4 +1251,20 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string PrewhereExprInfo::dump() const
|
||||||
|
{
|
||||||
|
WriteBufferFromOwnString s;
|
||||||
|
|
||||||
|
for (size_t i = 0; i < steps.size(); ++i)
|
||||||
|
{
|
||||||
|
s << "STEP " << i << ":\n"
|
||||||
|
<< " ACTIONS: " << (steps[i].actions ? steps[i].actions->dumpActions() : "nullptr") << "\n"
|
||||||
|
<< " COLUMN: " << steps[i].column_name << "\n"
|
||||||
|
<< " REMOVE_COLUMN: " << steps[i].remove_column << "\n"
|
||||||
|
<< " NEED_FILTER: " << steps[i].need_filter << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.str();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -18,18 +18,20 @@ using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
|
|||||||
class ExpressionActions;
|
class ExpressionActions;
|
||||||
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||||
|
|
||||||
|
struct PrewhereExprStep
|
||||||
|
{
|
||||||
|
ExpressionActionsPtr actions;
|
||||||
|
String column_name;
|
||||||
|
bool remove_column = false;
|
||||||
|
bool need_filter = false;
|
||||||
|
};
|
||||||
|
|
||||||
/// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG
|
/// The same as PrewhereInfo, but with ExpressionActions instead of ActionsDAG
|
||||||
struct PrewhereExprInfo
|
struct PrewhereExprInfo
|
||||||
{
|
{
|
||||||
/// Actions for row level security filter. Applied separately before prewhere_actions.
|
std::vector<PrewhereExprStep> steps;
|
||||||
/// This actions are separate because prewhere condition should not be executed over filtered rows.
|
|
||||||
ExpressionActionsPtr row_level_filter;
|
std::string dump() const;
|
||||||
/// Actions which are executed on block in order to get filter column for prewhere step.
|
|
||||||
ExpressionActionsPtr prewhere_actions;
|
|
||||||
String row_level_column_name;
|
|
||||||
String prewhere_column_name;
|
|
||||||
bool remove_prewhere_column = false;
|
|
||||||
bool need_filter = false;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
|
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
|
||||||
@ -41,7 +43,7 @@ public:
|
|||||||
MergeTreeRangeReader(
|
MergeTreeRangeReader(
|
||||||
IMergeTreeReader * merge_tree_reader_,
|
IMergeTreeReader * merge_tree_reader_,
|
||||||
MergeTreeRangeReader * prev_reader_,
|
MergeTreeRangeReader * prev_reader_,
|
||||||
const PrewhereExprInfo * prewhere_info_,
|
const PrewhereExprStep * prewhere_info_,
|
||||||
bool last_reader_in_chain_,
|
bool last_reader_in_chain_,
|
||||||
const Names & non_const_virtual_column_names);
|
const Names & non_const_virtual_column_names);
|
||||||
|
|
||||||
@ -57,6 +59,7 @@ public:
|
|||||||
bool isCurrentRangeFinished() const;
|
bool isCurrentRangeFinished() const;
|
||||||
bool isInitialized() const { return is_initialized; }
|
bool isInitialized() const { return is_initialized; }
|
||||||
|
|
||||||
|
private:
|
||||||
/// Accumulates sequential read() requests to perform a large read instead of multiple small reads
|
/// Accumulates sequential read() requests to perform a large read instead of multiple small reads
|
||||||
class DelayedStream
|
class DelayedStream
|
||||||
{
|
{
|
||||||
@ -144,10 +147,23 @@ public:
|
|||||||
size_t ceilRowsToCompleteGranules(size_t rows_num) const;
|
size_t ceilRowsToCompleteGranules(size_t rows_num) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
public:
|
||||||
/// Statistics after next reading step.
|
/// Statistics after next reading step.
|
||||||
class ReadResult
|
class ReadResult
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
Columns columns;
|
||||||
|
size_t num_rows = 0;
|
||||||
|
|
||||||
|
/// The number of rows were added to block as a result of reading chain.
|
||||||
|
size_t numReadRows() const { return num_read_rows; }
|
||||||
|
/// The number of bytes read from disk.
|
||||||
|
size_t numBytesRead() const { return num_bytes_read; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
/// Only MergeTreeRangeReader is supposed to access ReadResult internals.
|
||||||
|
friend class MergeTreeRangeReader;
|
||||||
|
|
||||||
using NumRows = std::vector<size_t>;
|
using NumRows = std::vector<size_t>;
|
||||||
|
|
||||||
struct RangeInfo
|
struct RangeInfo
|
||||||
@ -161,13 +177,11 @@ public:
|
|||||||
const RangesInfo & startedRanges() const { return started_ranges; }
|
const RangesInfo & startedRanges() const { return started_ranges; }
|
||||||
const NumRows & rowsPerGranule() const { return rows_per_granule; }
|
const NumRows & rowsPerGranule() const { return rows_per_granule; }
|
||||||
|
|
||||||
|
static size_t getLastMark(const MergeTreeRangeReader::ReadResult::RangesInfo & ranges);
|
||||||
|
|
||||||
/// The number of rows were read at LAST iteration in chain. <= num_added_rows + num_filtered_rows.
|
/// The number of rows were read at LAST iteration in chain. <= num_added_rows + num_filtered_rows.
|
||||||
size_t totalRowsPerGranule() const { return total_rows_per_granule; }
|
size_t totalRowsPerGranule() const { return total_rows_per_granule; }
|
||||||
/// The number of rows were added to block as a result of reading chain.
|
|
||||||
size_t numReadRows() const { return num_read_rows; }
|
|
||||||
size_t numRowsToSkipInLastGranule() const { return num_rows_to_skip_in_last_granule; }
|
size_t numRowsToSkipInLastGranule() const { return num_rows_to_skip_in_last_granule; }
|
||||||
/// The number of bytes read from disk.
|
|
||||||
size_t numBytesRead() const { return num_bytes_read; }
|
|
||||||
/// Filter you need to apply to newly-read columns in order to add them to block.
|
/// Filter you need to apply to newly-read columns in order to add them to block.
|
||||||
const ColumnUInt8 * getFilterOriginal() const { return filter_original ? filter_original : filter; }
|
const ColumnUInt8 * getFilterOriginal() const { return filter_original ? filter_original : filter; }
|
||||||
const ColumnUInt8 * getFilter() const { return filter; }
|
const ColumnUInt8 * getFilter() const { return filter; }
|
||||||
@ -195,13 +209,12 @@ public:
|
|||||||
|
|
||||||
size_t countBytesInResultFilter(const IColumn::Filter & filter);
|
size_t countBytesInResultFilter(const IColumn::Filter & filter);
|
||||||
|
|
||||||
Columns columns;
|
/// If this flag is false than filtering form PREWHERE can be delayed and done in WHERE
|
||||||
size_t num_rows = 0;
|
/// to reduce memory copies and applying heavy filters multiple times
|
||||||
bool need_filter = false;
|
bool need_filter = false;
|
||||||
|
|
||||||
Block block_before_prewhere;
|
Block block_before_prewhere;
|
||||||
|
|
||||||
private:
|
|
||||||
RangesInfo started_ranges;
|
RangesInfo started_ranges;
|
||||||
/// The number of rows read from each granule.
|
/// The number of rows read from each granule.
|
||||||
/// Granule here is not number of rows between two marks
|
/// Granule here is not number of rows between two marks
|
||||||
@ -234,16 +247,15 @@ public:
|
|||||||
const Block & getSampleBlock() const { return sample_block; }
|
const Block & getSampleBlock() const { return sample_block; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
|
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
|
||||||
Columns continueReadingChain(ReadResult & result, size_t & num_rows);
|
Columns continueReadingChain(const ReadResult & result, size_t & num_rows);
|
||||||
void executePrewhereActionsAndFilterColumns(ReadResult & result);
|
void executePrewhereActionsAndFilterColumns(ReadResult & result);
|
||||||
void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);
|
void fillPartOffsetColumn(ReadResult & result, UInt64 leading_begin_part_offset, UInt64 leading_end_part_offset);
|
||||||
|
|
||||||
IMergeTreeReader * merge_tree_reader = nullptr;
|
IMergeTreeReader * merge_tree_reader = nullptr;
|
||||||
const MergeTreeIndexGranularity * index_granularity = nullptr;
|
const MergeTreeIndexGranularity * index_granularity = nullptr;
|
||||||
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
|
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
|
||||||
const PrewhereExprInfo * prewhere_info;
|
const PrewhereExprStep * prewhere_info;
|
||||||
|
|
||||||
Stream stream;
|
Stream stream;
|
||||||
|
|
||||||
|
@ -135,13 +135,15 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(size_t min_marks_to_read, size_t
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto curr_task_size_predictor = !per_part_size_predictor[part_idx] ? nullptr
|
const auto & per_part = per_part_params[part_idx];
|
||||||
: std::make_unique<MergeTreeBlockSizePredictor>(*per_part_size_predictor[part_idx]); /// make a copy
|
|
||||||
|
auto curr_task_size_predictor = !per_part.size_predictor ? nullptr
|
||||||
|
: std::make_unique<MergeTreeBlockSizePredictor>(*per_part.size_predictor); /// make a copy
|
||||||
|
|
||||||
return std::make_unique<MergeTreeReadTask>(
|
return std::make_unique<MergeTreeReadTask>(
|
||||||
part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names,
|
part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names,
|
||||||
per_part_column_name_set[part_idx], per_part_columns[part_idx], per_part_pre_columns[part_idx],
|
per_part.column_name_set, per_part.task_columns,
|
||||||
prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
|
prewhere_info && prewhere_info->remove_prewhere_column, std::move(curr_task_size_predictor));
|
||||||
}
|
}
|
||||||
|
|
||||||
Block MergeTreeReadPool::getHeader() const
|
Block MergeTreeReadPool::getHeader() const
|
||||||
@ -216,15 +218,14 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(const RangesInDataParts &
|
|||||||
auto size_predictor = !predict_block_size_bytes ? nullptr
|
auto size_predictor = !predict_block_size_bytes ? nullptr
|
||||||
: MergeTreeBaseSelectProcessor::getSizePredictor(part.data_part, task_columns, sample_block);
|
: MergeTreeBaseSelectProcessor::getSizePredictor(part.data_part, task_columns, sample_block);
|
||||||
|
|
||||||
per_part_size_predictor.emplace_back(std::move(size_predictor));
|
auto & per_part = per_part_params.emplace_back();
|
||||||
|
|
||||||
|
per_part.size_predictor = std::move(size_predictor);
|
||||||
|
|
||||||
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
|
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
|
||||||
const auto & required_column_names = task_columns.columns.getNames();
|
const auto & required_column_names = task_columns.columns.getNames();
|
||||||
per_part_column_name_set.emplace_back(required_column_names.begin(), required_column_names.end());
|
per_part.column_name_set = {required_column_names.begin(), required_column_names.end()};
|
||||||
|
per_part.task_columns = std::move(task_columns);
|
||||||
per_part_pre_columns.push_back(std::move(task_columns.pre_columns));
|
|
||||||
per_part_columns.push_back(std::move(task_columns.columns));
|
|
||||||
per_part_should_reorder.push_back(task_columns.should_reorder);
|
|
||||||
|
|
||||||
parts_with_idx.push_back({ part.data_part, part.part_index_in_query });
|
parts_with_idx.push_back({ part.data_part, part.part_index_in_query });
|
||||||
}
|
}
|
||||||
|
@ -99,11 +99,16 @@ private:
|
|||||||
const Names column_names;
|
const Names column_names;
|
||||||
bool do_not_steal_tasks;
|
bool do_not_steal_tasks;
|
||||||
bool predict_block_size_bytes;
|
bool predict_block_size_bytes;
|
||||||
std::vector<NameSet> per_part_column_name_set;
|
|
||||||
std::vector<NamesAndTypesList> per_part_columns;
|
struct PerPartParams
|
||||||
std::vector<NamesAndTypesList> per_part_pre_columns;
|
{
|
||||||
std::vector<char> per_part_should_reorder;
|
MergeTreeReadTaskColumns task_columns;
|
||||||
std::vector<MergeTreeBlockSizePredictorPtr> per_part_size_predictor;
|
NameSet column_name_set;
|
||||||
|
MergeTreeBlockSizePredictorPtr size_predictor;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<PerPartParams> per_part_params;
|
||||||
|
|
||||||
PrewhereInfoPtr prewhere_info;
|
PrewhereInfoPtr prewhere_info;
|
||||||
|
|
||||||
struct Part
|
struct Part
|
||||||
|
@ -67,9 +67,12 @@ size_t MergeTreeReaderWide::readRows(
|
|||||||
size_t read_rows = 0;
|
size_t read_rows = 0;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
size_t num_columns = columns.size();
|
size_t num_columns = res_columns.size();
|
||||||
checkNumberOfColumns(num_columns);
|
checkNumberOfColumns(num_columns);
|
||||||
|
|
||||||
|
if (num_columns == 0)
|
||||||
|
return max_rows_to_read;
|
||||||
|
|
||||||
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
|
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
|
||||||
|
|
||||||
std::unordered_set<std::string> prefetched_streams;
|
std::unordered_set<std::string> prefetched_streams;
|
||||||
|
@ -31,8 +31,8 @@ try
|
|||||||
|
|
||||||
task = std::make_unique<MergeTreeReadTask>(
|
task = std::make_unique<MergeTreeReadTask>(
|
||||||
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,
|
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,
|
||||||
task_columns.columns, task_columns.pre_columns, prewhere_info && prewhere_info->remove_prewhere_column,
|
task_columns, prewhere_info && prewhere_info->remove_prewhere_column,
|
||||||
task_columns.should_reorder, std::move(size_predictor));
|
std::move(size_predictor));
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -66,10 +66,16 @@ void MergeTreeSelectProcessor::initializeReaders()
|
|||||||
reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(),
|
reader = data_part->getReader(task_columns.columns, storage_snapshot->getMetadataForQuery(),
|
||||||
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {});
|
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {});
|
||||||
|
|
||||||
if (prewhere_info)
|
pre_reader_for_step.clear();
|
||||||
pre_reader = data_part->getReader(task_columns.pre_columns, storage_snapshot->getMetadataForQuery(),
|
|
||||||
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {});
|
|
||||||
|
|
||||||
|
if (prewhere_info)
|
||||||
|
{
|
||||||
|
for (const auto & pre_columns_for_step : task_columns.pre_columns)
|
||||||
|
{
|
||||||
|
pre_reader_for_step.push_back(data_part->getReader(pre_columns_for_step, storage_snapshot->getMetadataForQuery(),
|
||||||
|
all_mark_ranges, owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings, {}, {}));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -80,7 +86,7 @@ void MergeTreeSelectProcessor::finish()
|
|||||||
* buffers don't waste memory.
|
* buffers don't waste memory.
|
||||||
*/
|
*/
|
||||||
reader.reset();
|
reader.reset();
|
||||||
pre_reader.reset();
|
pre_reader_for_step.clear();
|
||||||
data_part.reset();
|
data_part.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,6 +3,10 @@
|
|||||||
#include <Storages/StorageMergeTree.h>
|
#include <Storages/StorageMergeTree.h>
|
||||||
#include <Interpreters/PartLog.h>
|
#include <Interpreters/PartLog.h>
|
||||||
|
|
||||||
|
namespace ProfileEvents
|
||||||
|
{
|
||||||
|
extern const Event DuplicatedInsertedBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -133,8 +137,41 @@ void MergeTreeSink::finishDelayedChunk()
|
|||||||
|
|
||||||
auto & part = partition.temp_part.part;
|
auto & part = partition.temp_part.part;
|
||||||
|
|
||||||
|
bool added = false;
|
||||||
|
|
||||||
|
{
|
||||||
|
auto lock = storage.lockParts();
|
||||||
|
storage.fillNewPartName(part, lock);
|
||||||
|
|
||||||
|
auto * deduplication_log = storage.getDeduplicationLog();
|
||||||
|
if (deduplication_log)
|
||||||
|
{
|
||||||
|
const String block_id = part->getZeroLevelPartBlockID(partition.block_dedup_token);
|
||||||
|
auto res = deduplication_log->addPart(block_id, part->info);
|
||||||
|
if (!res.second)
|
||||||
|
{
|
||||||
|
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
|
||||||
|
LOG_INFO(storage.log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartName());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MergeTreeData::Transaction transaction(storage, context->getCurrentTransaction().get());
|
||||||
|
added = storage.renameTempPartAndAdd(part, transaction, lock);
|
||||||
|
transaction.commit(&lock);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MergeTreeData::Transaction transaction(storage, context->getCurrentTransaction().get());
|
||||||
|
added = storage.renameTempPartAndAdd(part, transaction, lock);
|
||||||
|
transaction.commit(&lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
|
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
|
||||||
if (storage.renameTempPartAndAdd(part, context->getCurrentTransaction().get(), &storage.increment, nullptr, storage.getDeduplicationLog(), partition.block_dedup_token))
|
if (added)
|
||||||
{
|
{
|
||||||
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns);
|
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns);
|
||||||
storage.incrementInsertedPartsProfileEvent(part->getType());
|
storage.incrementInsertedPartsProfileEvent(part->getType());
|
||||||
|
@ -111,14 +111,20 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
|
|||||||
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
|
owned_uncompressed_cache = storage.getContext()->getUncompressedCache();
|
||||||
owned_mark_cache = storage.getContext()->getMarkCache();
|
owned_mark_cache = storage.getContext()->getMarkCache();
|
||||||
|
|
||||||
reader = task->data_part->getReader(task->columns, metadata_snapshot, task->mark_ranges,
|
reader = task->data_part->getReader(task->task_columns.columns, metadata_snapshot, task->mark_ranges,
|
||||||
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
|
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
|
||||||
IMergeTreeReader::ValueSizeMap{}, profile_callback);
|
IMergeTreeReader::ValueSizeMap{}, profile_callback);
|
||||||
|
|
||||||
|
pre_reader_for_step.clear();
|
||||||
if (prewhere_info)
|
if (prewhere_info)
|
||||||
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, task->mark_ranges,
|
{
|
||||||
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
|
for (const auto & pre_columns_per_step : task->task_columns.pre_columns)
|
||||||
IMergeTreeReader::ValueSizeMap{}, profile_callback);
|
{
|
||||||
|
pre_reader_for_step.push_back(task->data_part->getReader(pre_columns_per_step, metadata_snapshot, task->mark_ranges,
|
||||||
|
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
|
||||||
|
IMergeTreeReader::ValueSizeMap{}, profile_callback));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -126,14 +132,20 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
|
|||||||
if (part_name != last_readed_part_name)
|
if (part_name != last_readed_part_name)
|
||||||
{
|
{
|
||||||
/// retain avg_value_size_hints
|
/// retain avg_value_size_hints
|
||||||
reader = task->data_part->getReader(task->columns, metadata_snapshot, task->mark_ranges,
|
reader = task->data_part->getReader(task->task_columns.columns, metadata_snapshot, task->mark_ranges,
|
||||||
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
|
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
|
||||||
reader->getAvgValueSizeHints(), profile_callback);
|
reader->getAvgValueSizeHints(), profile_callback);
|
||||||
|
|
||||||
|
pre_reader_for_step.clear();
|
||||||
if (prewhere_info)
|
if (prewhere_info)
|
||||||
pre_reader = task->data_part->getReader(task->pre_columns, metadata_snapshot, task->mark_ranges,
|
{
|
||||||
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
|
for (const auto & pre_columns_per_step : task->task_columns.pre_columns)
|
||||||
reader->getAvgValueSizeHints(), profile_callback);
|
{
|
||||||
|
pre_reader_for_step.push_back(task->data_part->getReader(pre_columns_per_step, metadata_snapshot, task->mark_ranges,
|
||||||
|
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings,
|
||||||
|
reader->getAvgValueSizeHints(), profile_callback));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -144,7 +156,7 @@ void MergeTreeThreadSelectProcessor::finalizeNewTask()
|
|||||||
void MergeTreeThreadSelectProcessor::finish()
|
void MergeTreeThreadSelectProcessor::finish()
|
||||||
{
|
{
|
||||||
reader.reset();
|
reader.reset();
|
||||||
pre_reader.reset();
|
pre_reader_for_step.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -171,7 +171,7 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
|
|||||||
{
|
{
|
||||||
new_part = mutate_task->getFuture().get();
|
new_part = mutate_task->getFuture().get();
|
||||||
|
|
||||||
storage.renameTempPartAndReplace(new_part, NO_TRANSACTION_RAW, nullptr, transaction_ptr.get());
|
storage.renameTempPartAndReplace(new_part, *transaction_ptr);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -83,8 +83,12 @@ bool MutatePlainMergeTreeTask::executeStep()
|
|||||||
|
|
||||||
new_part = mutate_task->getFuture().get();
|
new_part = mutate_task->getFuture().get();
|
||||||
|
|
||||||
|
|
||||||
|
MergeTreeData::Transaction transaction(storage, merge_mutate_entry->txn.get());
|
||||||
/// FIXME Transactions: it's too optimistic, better to lock parts before starting transaction
|
/// FIXME Transactions: it's too optimistic, better to lock parts before starting transaction
|
||||||
storage.renameTempPartAndReplace(new_part, merge_mutate_entry->txn.get());
|
storage.renameTempPartAndReplace(new_part, transaction);
|
||||||
|
transaction.commit();
|
||||||
|
|
||||||
storage.updateMutationEntriesErrors(future_part, true, "");
|
storage.updateMutationEntriesErrors(future_part, true, "");
|
||||||
write_part_log({});
|
write_part_log({});
|
||||||
|
|
||||||
|
@ -479,7 +479,8 @@ void ReplicatedMergeTreeSink::commitPart(
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
renamed = storage.renameTempPartAndAdd(part, NO_TRANSACTION_RAW, nullptr, &transaction, nullptr, "");
|
auto lock = storage.lockParts();
|
||||||
|
renamed = storage.renameTempPartAndAdd(part, transaction, lock);
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
{
|
{
|
||||||
|
@ -1540,7 +1540,14 @@ PartitionCommandsResultInfo StorageMergeTree::attachPartition(
|
|||||||
loaded_parts[i]->storeVersionMetadata();
|
loaded_parts[i]->storeVersionMetadata();
|
||||||
|
|
||||||
String old_name = renamed_parts.old_and_new_names[i].old_name;
|
String old_name = renamed_parts.old_and_new_names[i].old_name;
|
||||||
renameTempPartAndAdd(loaded_parts[i], local_context->getCurrentTransaction().get(), &increment);
|
{
|
||||||
|
auto lock = lockParts();
|
||||||
|
MergeTreeData::Transaction transaction(*this, local_context->getCurrentTransaction().get());
|
||||||
|
fillNewPartName(loaded_parts[i], lock);
|
||||||
|
renameTempPartAndAdd(loaded_parts[i], transaction, lock);
|
||||||
|
transaction.commit(&lock);
|
||||||
|
}
|
||||||
|
|
||||||
renamed_parts.old_and_new_names[i].old_name.clear();
|
renamed_parts.old_and_new_names[i].old_name.clear();
|
||||||
|
|
||||||
results.push_back(PartitionCommandResultInfo{
|
results.push_back(PartitionCommandResultInfo{
|
||||||
@ -1612,10 +1619,15 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
|
|||||||
|
|
||||||
auto data_parts_lock = lockParts();
|
auto data_parts_lock = lockParts();
|
||||||
|
|
||||||
|
/** It is important that obtaining new block number and adding that block to parts set is done atomically.
|
||||||
|
* Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part.
|
||||||
|
*/
|
||||||
|
for (auto part : dst_parts)
|
||||||
|
{
|
||||||
|
fillNewPartName(part, data_parts_lock);
|
||||||
|
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock);
|
||||||
|
}
|
||||||
/// Populate transaction
|
/// Populate transaction
|
||||||
for (MutableDataPartPtr & part : dst_parts)
|
|
||||||
renameTempPartAndReplace(part, local_context->getCurrentTransaction().get(), &increment, &transaction, data_parts_lock);
|
|
||||||
|
|
||||||
transaction.commit(&data_parts_lock);
|
transaction.commit(&data_parts_lock);
|
||||||
|
|
||||||
/// If it is REPLACE (not ATTACH), remove all parts whose max_block_number is less than min_block_number of the first new block
|
/// If it is REPLACE (not ATTACH), remove all parts whose max_block_number is less than min_block_number of the first new block
|
||||||
@ -1688,14 +1700,15 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
|
|||||||
auto src_data_parts_lock = lockParts();
|
auto src_data_parts_lock = lockParts();
|
||||||
auto dest_data_parts_lock = dest_table_storage->lockParts();
|
auto dest_data_parts_lock = dest_table_storage->lockParts();
|
||||||
|
|
||||||
std::mutex mutex;
|
for (auto & part : dst_parts)
|
||||||
DataPartsLock lock(mutex);
|
{
|
||||||
|
dest_table_storage->fillNewPartName(part, dest_data_parts_lock);
|
||||||
|
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock);
|
||||||
|
}
|
||||||
|
|
||||||
for (MutableDataPartPtr & part : dst_parts)
|
|
||||||
dest_table_storage->renameTempPartAndReplace(part, local_context->getCurrentTransaction().get(), &dest_table_storage->increment, &transaction, lock);
|
|
||||||
|
|
||||||
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, lock);
|
removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), src_parts, true, src_data_parts_lock);
|
||||||
transaction.commit(&lock);
|
transaction.commit(&src_data_parts_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
clearOldPartsFromFilesystem();
|
clearOldPartsFromFilesystem();
|
||||||
@ -1785,7 +1798,13 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
|
|||||||
void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
|
void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
|
||||||
{
|
{
|
||||||
for (auto part : parts)
|
for (auto part : parts)
|
||||||
renameTempPartAndAdd(part, NO_TRANSACTION_RAW, &increment);
|
{
|
||||||
|
auto lock = lockParts();
|
||||||
|
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
|
||||||
|
fillNewPartName(part, lock);
|
||||||
|
renameTempPartAndAdd(part, transaction, lock);
|
||||||
|
transaction.commit(&lock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1810,4 +1829,11 @@ std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const
|
|||||||
return std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
|
return std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Gives `part` a fresh single-block identity: takes a block number from `increment`,
/// uses it as both min_block and max_block, resets the mutation version to 0 and
/// regenerates the part name from the updated info.
/// The DataPartsLock argument is not used in the body; presumably it exists only so that
/// callers must hold the parts lock while the block number is assigned — TODO confirm.
void StorageMergeTree::fillNewPartName(MutableDataPartPtr & part, DataPartsLock &)
|
||||||
|
{
|
||||||
|
/// The same value obtained from `increment` becomes both ends of the part's block range.
part->info.min_block = part->info.max_block = increment.get();
|
||||||
|
part->info.mutation = 0;
|
||||||
|
/// The name is computed from `info`, so it is regenerated after the fields above change.
part->name = part->getNewName(part->info);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -251,6 +251,8 @@ private:
|
|||||||
/// return any ids.
|
/// return any ids.
|
||||||
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids = nullptr) const;
|
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids = nullptr) const;
|
||||||
|
|
||||||
|
void fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock);
|
||||||
|
|
||||||
void startBackgroundMovesIfNeeded() override;
|
void startBackgroundMovesIfNeeded() override;
|
||||||
|
|
||||||
/// Attaches restored parts to the storage.
|
/// Attaches restored parts to the storage.
|
||||||
|
@ -1657,7 +1657,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
|
|||||||
Transaction transaction(*this, NO_TRANSACTION_RAW);
|
Transaction transaction(*this, NO_TRANSACTION_RAW);
|
||||||
|
|
||||||
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
|
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
|
||||||
renameTempPartAndReplace(part, NO_TRANSACTION_RAW, nullptr, &transaction);
|
renameTempPartAndReplace(part, transaction);
|
||||||
checkPartChecksumsAndCommit(transaction, part);
|
checkPartChecksumsAndCommit(transaction, part);
|
||||||
|
|
||||||
writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */,
|
writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */,
|
||||||
@ -2342,7 +2342,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
Coordination::Requests ops;
|
Coordination::Requests ops;
|
||||||
for (PartDescriptionPtr & part_desc : final_parts)
|
for (PartDescriptionPtr & part_desc : final_parts)
|
||||||
{
|
{
|
||||||
renameTempPartAndReplace(part_desc->res_part, NO_TRANSACTION_RAW, nullptr, &transaction);
|
renameTempPartAndReplace(part_desc->res_part, transaction);
|
||||||
getCommitPartOps(ops, part_desc->res_part);
|
getCommitPartOps(ops, part_desc->res_part);
|
||||||
|
|
||||||
lockSharedData(*part_desc->res_part, false, part_desc->hardlinked_files);
|
lockSharedData(*part_desc->res_part, false, part_desc->hardlinked_files);
|
||||||
@ -4081,7 +4081,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
|
|||||||
if (!to_detached)
|
if (!to_detached)
|
||||||
{
|
{
|
||||||
Transaction transaction(*this, NO_TRANSACTION_RAW);
|
Transaction transaction(*this, NO_TRANSACTION_RAW);
|
||||||
renameTempPartAndReplace(part, NO_TRANSACTION_RAW, nullptr, &transaction);
|
renameTempPartAndReplace(part, transaction);
|
||||||
|
|
||||||
replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files);
|
replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files);
|
||||||
|
|
||||||
@ -6602,9 +6602,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
|||||||
Transaction transaction(*this, NO_TRANSACTION_RAW);
|
Transaction transaction(*this, NO_TRANSACTION_RAW);
|
||||||
{
|
{
|
||||||
auto data_parts_lock = lockParts();
|
auto data_parts_lock = lockParts();
|
||||||
|
for (auto & part : dst_parts)
|
||||||
for (MutableDataPartPtr & part : dst_parts)
|
renameTempPartAndReplaceUnlocked(part, transaction, data_parts_lock);
|
||||||
renameTempPartAndReplace(part, query_context->getCurrentTransaction().get(), nullptr, &transaction, data_parts_lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < dst_parts.size(); ++i)
|
for (size_t i = 0; i < dst_parts.size(); ++i)
|
||||||
@ -6837,11 +6836,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
|||||||
auto src_data_parts_lock = lockParts();
|
auto src_data_parts_lock = lockParts();
|
||||||
auto dest_data_parts_lock = dest_table_storage->lockParts();
|
auto dest_data_parts_lock = dest_table_storage->lockParts();
|
||||||
|
|
||||||
std::mutex mutex;
|
for (auto & part : dst_parts)
|
||||||
DataPartsLock lock(mutex);
|
dest_table_storage->renameTempPartAndReplaceUnlocked(part, transaction, dest_data_parts_lock);
|
||||||
|
|
||||||
for (MutableDataPartPtr & part : dst_parts)
|
|
||||||
dest_table_storage->renameTempPartAndReplace(part, query_context->getCurrentTransaction().get(), nullptr, &transaction, lock);
|
|
||||||
|
|
||||||
for (size_t i = 0; i < dst_parts.size(); ++i)
|
for (size_t i = 0; i < dst_parts.size(); ++i)
|
||||||
dest_table_storage->lockSharedData(*dst_parts[i], false, hardlinked_files_for_parts[i]);
|
dest_table_storage->lockSharedData(*dst_parts[i], false, hardlinked_files_for_parts[i]);
|
||||||
@ -6852,8 +6848,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
|||||||
else
|
else
|
||||||
zkutil::KeeperMultiException::check(code, ops, op_results);
|
zkutil::KeeperMultiException::check(code, ops, op_results);
|
||||||
|
|
||||||
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, lock);
|
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range, src_data_parts_lock);
|
||||||
transaction.commit(&lock);
|
transaction.commit(&src_data_parts_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
|
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
|
||||||
@ -8020,7 +8016,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
|
MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
|
||||||
auto replaced_parts = renameTempPartAndReplace(new_data_part, NO_TRANSACTION_RAW, nullptr, &transaction, nullptr);
|
auto replaced_parts = renameTempPartAndReplace(new_data_part, transaction);
|
||||||
|
|
||||||
if (!replaced_parts.empty())
|
if (!replaced_parts.empty())
|
||||||
{
|
{
|
||||||
|
@ -0,0 +1,11 @@
|
|||||||
|
1000000
|
||||||
|
0
|
||||||
|
0
|
||||||
|
0
|
||||||
|
400000
|
||||||
|
195431
|
||||||
|
195431
|
||||||
|
5923
|
||||||
|
200000
|
||||||
|
200000
|
||||||
|
6061
|
@ -0,0 +1,36 @@
|
|||||||
|
DROP ROW POLICY IF EXISTS test_filter_policy ON test_table;
|
||||||
|
DROP ROW POLICY IF EXISTS test_filter_policy_2 ON test_table;
|
||||||
|
DROP TABLE IF EXISTS test_table;
|
||||||
|
|
||||||
|
CREATE TABLE test_table (`n` UInt64, `s` String)
|
||||||
|
ENGINE = MergeTree
|
||||||
|
PRIMARY KEY n ORDER BY n;
|
||||||
|
|
||||||
|
INSERT INTO test_table SELECT number, concat('some string ', CAST(number, 'String')) FROM numbers(1000000);
|
||||||
|
|
||||||
|
-- Create row policy that doesn't use any column
|
||||||
|
CREATE ROW POLICY test_filter_policy ON test_table USING False TO ALL;
|
||||||
|
|
||||||
|
-- Run queries under the default user so that an always-false row_level_filter, which doesn't require any columns, is added
|
||||||
|
SELECT count(1) FROM test_table;
|
||||||
|
SELECT count(1) FROM test_table PREWHERE (n % 8192) < 4000;
|
||||||
|
SELECT count(1) FROM test_table WHERE (n % 8192) < 4000;
|
||||||
|
SELECT count(1) FROM test_table PREWHERE (n % 8192) < 4000 WHERE (n % 33) == 0;
|
||||||
|
|
||||||
|
-- Add policy for default user that will read a column
|
||||||
|
CREATE ROW POLICY test_filter_policy_2 ON test_table USING (n % 5) >= 3 TO default;
|
||||||
|
|
||||||
|
-- Run queries under the default user; the row policy now needs the same column as PREWHERE and WHERE
|
||||||
|
SELECT count(1) FROM test_table;
|
||||||
|
SELECT count(1) FROM test_table PREWHERE (n % 8192) < 4000;
|
||||||
|
SELECT count(1) FROM test_table WHERE (n % 8192) < 4000;
|
||||||
|
SELECT count(1) FROM test_table PREWHERE (n % 8192) < 4000 WHERE (n % 33) == 0;
|
||||||
|
|
||||||
|
-- Run queries that have division by zero if row level filter isn't applied before prewhere
|
||||||
|
SELECT count(1) FROM test_table PREWHERE 7 / (n % 5) > 2;
|
||||||
|
SELECT count(1) FROM test_table WHERE 7 / (n % 5) > 2;
|
||||||
|
SELECT count(1) FROM test_table PREWHERE 7 / (n % 5) > 2 WHERE (n % 33) == 0;
|
||||||
|
|
||||||
|
DROP TABLE test_table;
|
||||||
|
DROP ROW POLICY test_filter_policy ON test_table;
|
||||||
|
DROP ROW POLICY test_filter_policy_2 ON test_table;
|
Loading…
Reference in New Issue
Block a user