Merge pull request #287 from ludv1x/METR-23783

[WIP] Vertical merge optimization, fix and perf tests.
alexey-milovidov 2016-12-22 17:40:01 +04:00 committed by GitHub
commit 690ab98b91
9 changed files with 269 additions and 45 deletions

View File

@@ -21,10 +21,10 @@ struct RowSourcePart
setSkipFlag(flag);
}
/// is equal to getSourceNum() if flag is false
size_t getData() const { return data; }
/// Data is equal to getSourceNum() if flag is false
UInt8 getData() const { return data; }
size_t getSourceNum()const { return data & MASK_NUMBER; }
size_t getSourceNum() const { return data & MASK_NUMBER; }
/// In CollapsingMergeTree case the flag means "skip this row"
bool getSkipFlag() const { return (data & MASK_FLAG) != 0; }
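Note: getData() now returns the raw UInt8 instead of size_t, so callers can compare source number and skip flag with a single byte comparison (the run-length scan in readImpl below relies on this). A minimal sketch of the packing this implies — the mask values are outside the visible hunk, so the 0x80/0x7F split is an assumption:

#include <cstddef>
#include <cstdint>

using UInt8 = uint8_t;

/// Assumed layout: skip flag in the top bit, source part number in the low seven bits.
static constexpr UInt8 MASK_FLAG   = 0x80;
static constexpr UInt8 MASK_NUMBER = 0x7F;

struct RowSourcePartSketch
{
    UInt8 data = 0;

    void setSourceNum(size_t num) { data = (data & MASK_FLAG) | static_cast<UInt8>(num & MASK_NUMBER); }
    void setSkipFlag(bool flag) { data = flag ? (data | MASK_FLAG) : (data & MASK_NUMBER); }

    UInt8 getData() const { return data; }                        /// number and flag together
    size_t getSourceNum() const { return data & MASK_NUMBER; }
    bool getSkipFlag() const { return (data & MASK_FLAG) != 0; }
};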
@@ -58,7 +58,7 @@ class ColumnGathererStream : public IProfilingBlockInputStream
{
public:
ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & row_source_, size_t block_size_ = DEFAULT_MERGE_BLOCK_SIZE);
const MergedRowSources & row_source_, size_t block_preferred_size_ = DEFAULT_MERGE_BLOCK_SIZE);
String getName() const override { return "ColumnGatherer"; }
@@ -95,10 +95,12 @@ private:
}
};
void fetchNewBlock(Source & source, size_t source_num);
std::vector<Source> sources;
size_t pos_global_start = 0;
size_t block_size;
size_t block_preferred_size;
Logger * log = &Logger::get("ColumnGathererStream");
};

View File

@@ -137,7 +137,7 @@ public:
private:
MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const;
size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc) const;
private:
MergeTreeData & data;

View File

@@ -101,6 +101,9 @@ struct MergeTreeSettings
/// Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm
size_t vertical_merge_algorithm_min_rows_to_activate = 16 * DEFAULT_MERGE_BLOCK_SIZE;
/// Minimal amount of non-PK columns to activate Vertical merge algorithm
size_t vertical_merge_algorithm_min_columns_to_activate = 11;
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
@@ -137,6 +140,7 @@ struct MergeTreeSettings
SET_SIZE_T(min_absolute_delay_to_close);
SET_SIZE_T(enable_vertical_merge_algorithm);
SET_SIZE_T(vertical_merge_algorithm_min_rows_to_activate);
SET_SIZE_T(vertical_merge_algorithm_min_columns_to_activate);
#undef SET_SIZE_T
#undef SET_DOUBLE
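The SET_SIZE_T definition lies outside this hunk; a hypothetical expansion (an assumption, shown only to illustrate how the new threshold becomes configurable) would read the value from the server config and keep the compiled-in default when the key is absent:

/// Hypothetical expansion of SET_SIZE_T — the real macro is not part of this diff.
#define SET_SIZE_T(NAME) \
    NAME = config.getUInt64(config_elem + "." #NAME, NAME)

With that, the setting can be overridden via a conf.d fragment such as
<yandex><merge_tree><vertical_merge_algorithm_min_columns_to_activate>11</vertical_merge_algorithm_min_columns_to_activate></merge_tree></yandex>,
which is the same kind of override the perf script below writes for enable_vertical_merge_algorithm.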

View File

@@ -14,8 +14,8 @@ namespace ErrorCodes
}
ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const String & column_name_,
const MergedRowSources & row_source_, size_t block_size_)
: name(column_name_), row_source(row_source_), block_size(block_size_)
const MergedRowSources & row_source_, size_t block_preferred_size_)
: name(column_name_), row_source(row_source_), block_preferred_size(block_preferred_size_)
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
@@ -70,47 +70,45 @@ Block ColumnGathererStream::readImpl()
Block block_res{column.cloneEmpty()};
IColumn & column_res = *block_res.unsafeGetByPosition(0).column;
size_t pos_global_finish = std::min(pos_global_start + block_size, row_source.size());
size_t curr_block_size = pos_global_finish - pos_global_start;
column_res.reserve(curr_block_size);
size_t global_size = row_source.size();
size_t curr_block_preferred_size = std::min(global_size - pos_global_start, block_preferred_size);
column_res.reserve(curr_block_preferred_size);
for (size_t pos_global = pos_global_start; pos_global < pos_global_finish;)
size_t pos_global = pos_global_start;
while (pos_global < global_size && column_res.size() < curr_block_preferred_size)
{
auto source_id = row_source[pos_global].getSourceNum();
bool skip = row_source[pos_global].getSkipFlag();
Source & source = sources[source_id];
auto source_data = row_source[pos_global].getData();
bool source_skip = row_source[pos_global].getSkipFlag();
auto source_num = row_source[pos_global].getSourceNum();
Source & source = sources[source_num];
if (source.pos >= source.size) /// Fetch new block
if (source.pos >= source.size) /// Fetch new block from source_num part
{
try
{
source.block = children[source_id]->read();
source.update(name);
}
catch (Exception & e)
{
e.addMessage("Cannot fetch required block. Stream " + children[source_id]->getID() + ", part " + toString(source_id));
throw;
}
if (0 == source.size)
{
throw Exception("Fetched block is empty. Stream " + children[source_id]->getID() + ", part " + toString(source_id),
ErrorCodes::RECEIVED_EMPTY_DATA);
}
fetchNewBlock(source, source_num);
}
/// Consecutive optimization. TODO: precompute lengths
size_t len = 1;
size_t max_len = std::min(pos_global_finish - pos_global, source.size - source.pos); // interval should be in the same block
for (; len < max_len && row_source[pos_global].getData() == row_source[pos_global + len].getData(); ++len);
size_t max_len = std::min(global_size - pos_global, source.size - source.pos); // interval should be in the same block
for (; len < max_len && source_data == row_source[pos_global + len].getData(); ++len);
if (!skip)
if (!source_skip)
{
if (column_res.size() == 0 && source.pos == 0 && curr_block_size == len && source.size == len)
/// Whole block could be produced via copying pointer from current block
if (source.pos == 0 && source.size == len)
{
// Whole block could be produced via copying pointer from current block
/// If current block already contains data, return it. We will be here again on next read() iteration.
if (column_res.size() != 0)
break;
block_res.unsafeGetByPosition(0).column = source.block.getByName(name).column;
source.pos += len;
pos_global += len;
break;
}
else if (len == 1)
{
column_res.insertFrom(*source.column, source.pos);
}
else
{
@@ -121,13 +119,33 @@ Block ColumnGathererStream::readImpl()
source.pos += len;
pos_global += len;
}
pos_global_start = pos_global_finish;
pos_global_start = pos_global;
return block_res;
}
void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
{
try
{
source.block = children[source_num]->read();
source.update(name);
}
catch (Exception & e)
{
e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getID() + ", part " + toString(source_num));
throw;
}
if (0 == source.size)
{
throw Exception("Fetched block is empty. Stream " + children[source_num]->getID() + ", part " + toString(source_num),
ErrorCodes::RECEIVED_EMPTY_DATA);
}
}
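Two points in the rewritten loop deserve a note. First, block_preferred_size is a soft target: whole consecutive runs are appended until the result block reaches the preferred size, and the pointer-copy fast path may return an entire source block regardless of it. Second, comparing getData() bytes rather than getSourceNum() groups only rows that agree on both source number and skip flag, so a run can be emitted with one ranged insert. A condensed sketch of one iteration, error paths elided (insertRangeFrom is assumed here as the ranged IColumn insert hidden behind the hunk boundary):

/// Condensed sketch of one iteration of the gathering loop.
size_t len = 1;
size_t max_len = std::min(global_size - pos_global, source.size - source.pos);
while (len < max_len && source_data == row_source[pos_global + len].getData())
    ++len;

if (!source_skip)
{
    if (len == 1)
        column_res.insertFrom(*source.column, source.pos);
    else
        column_res.insertRangeFrom(*source.column, source.pos, len);
}

source.pos += len;
pos_global += len;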
void ColumnGathererStream::readSuffixImpl()
{
const BlockStreamProfileInfo & profile_info = getProfileInfo();

View File

@@ -512,7 +512,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
MergedRowSources merged_rows_sources;
MergedRowSources * merged_rows_sources_ptr = &merged_rows_sources;
MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, merged_rows_sources);
MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, gathering_columns, merged_rows_sources);
LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
@@ -679,8 +679,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
column_part_streams[part_num] = std::move(column_part_stream);
}
/// Block size must match the block size of column_part_stream to enable fast gathering via copying of the column pointer
ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_MERGE_BLOCK_SIZE);
ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_BLOCK_SIZE);
MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written);
column_to.writePrefix();
@@ -736,8 +735,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const
const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound,
const NamesAndTypesList & gathering_columns, MergedRowSources & rows_sources_to_alloc) const
{
if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0)
return MergeAlgorithm::Horizontal;
@@ -746,7 +745,7 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
data.merging_params.mode == MergeTreeData::MergingParams::Collapsing;
bool enough_ordinary_cols = data.getColumnNamesList().size() > data.getSortDescription().size();
bool enough_ordinary_cols = gathering_columns.size() >= data.context.getMergeTreeSettings().vertical_merge_algorithm_min_columns_to_activate;
bool enough_total_rows = sum_rows_upper_bound >= data.context.getMergeTreeSettings().vertical_merge_algorithm_min_rows_to_activate;
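The return statement is outside the visible hunk; presumably the final decision combines the flags above roughly as follows (merge_mode_is_supported is a hypothetical name for the Ordinary/Collapsing condition shown above; the enable_vertical_merge_algorithm gate already returned early):

bool use_vertical = merge_mode_is_supported && enough_ordinary_cols && enough_total_rows;
return use_vertical ? MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;

Note the column check now counts gathering (non-PK) columns against vertical_merge_algorithm_min_columns_to_activate instead of merely requiring that at least one non-PK column exists.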

View File

@@ -0,0 +1,8 @@
#!/bin/bash
set -e
# Prepend a 0-based id column to each input line,
# e.g.: tail -n +2 ontime.csv | ./add_id_to_csv > ontime_id.csv
i=0
while IFS= read -r line; do
echo "$i,$line"
i=$((i+1))
done

View File

@@ -0,0 +1 @@
Year UInt16, Quarter UInt8, Month UInt8, DayofMonth UInt8, DayOfWeek UInt8, FlightDate Date, UniqueCarrier FixedString(7), AirlineID Int32, Carrier FixedString(2), TailNum String, FlightNum String, OriginAirportID Int32, OriginAirportSeqID Int32, OriginCityMarketID Int32, Origin FixedString(5), OriginCityName String, OriginState FixedString(2), OriginStateFips String, OriginStateName String, OriginWac Int32, DestAirportID Int32, DestAirportSeqID Int32, DestCityMarketID Int32, Dest FixedString(5), DestCityName String, DestState FixedString(2), DestStateFips String, DestStateName String, DestWac Int32, CRSDepTime Int32, DepTime Int32, DepDelay Int32, DepDelayMinutes Int32, DepDel15 Int32, DepartureDelayGroups String, DepTimeBlk String, TaxiOut Int32, WheelsOff Int32, WheelsOn Int32, TaxiIn Int32, CRSArrTime Int32, ArrTime Int32, ArrDelay Int32, ArrDelayMinutes Int32, ArrDel15 Int32, ArrivalDelayGroups Int32, ArrTimeBlk String, Cancelled UInt8, CancellationCode FixedString(1), Diverted UInt8, CRSElapsedTime Int32, ActualElapsedTime Int32, AirTime Int32, Flights Int32, Distance Int32, DistanceGroup UInt8, CarrierDelay Int32, WeatherDelay Int32, NASDelay Int32, SecurityDelay Int32, LateAircraftDelay Int32, FirstDepTime String, TotalAddGTime String, LongestAddGTime String, DivAirportLandings String, DivReachedDest String, DivActualElapsedTime String, DivArrDelay String, DivDistance String, Div1Airport String, Div1AirportID Int32, Div1AirportSeqID Int32, Div1WheelsOn String, Div1TotalGTime String, Div1LongestGTime String, Div1WheelsOff String, Div1TailNum String, Div2Airport String, Div2AirportID Int32, Div2AirportSeqID Int32, Div2WheelsOn String, Div2TotalGTime String, Div2LongestGTime String, Div2WheelsOff String, Div2TailNum String, Div3Airport String, Div3AirportID Int32, Div3AirportSeqID Int32, Div3WheelsOn String, Div3TotalGTime String, Div3LongestGTime String, Div3WheelsOff String, Div3TailNum String, Div4Airport String, Div4AirportID Int32, Div4AirportSeqID Int32, Div4WheelsOn String, Div4TotalGTime String, Div4LongestGTime String, Div4WheelsOff String, Div4TailNum String, Div5Airport String, Div5AirportID Int32, Div5AirportSeqID Int32, Div5WheelsOn String, Div5TotalGTime String, Div5LongestGTime String, Div5WheelsOff String, Div5TailNum String

View File

@@ -0,0 +1,183 @@
#!/bin/bash
set -e
SOURCE_RAW=/opt/ontime/ontime.csv
#SOURCE=/opt/ontime/ontime9M_id.csv
#SOURCE=/opt/ontime/ontime9M_id.Native
SOURCE=/opt/ontime/ontime_id.csv
if [[ ! -f $SOURCE ]]; then
echo "Inserting id field from $SOURCE_RAW to $SOURCE ..."
tail -n +2 "$SOURCE_RAW" | ./add_id_to_csv > $SOURCE
fi
STRUCT="id UInt32, "`cat ontime.struct`
COLUMNS=`echo "$STRUCT" | tr " " "\n" | awk 'NR % 2 == 1' | tr "\n" "," | head -c -1`
ACTIVE_COLUMNS=11
ST_OPTIONS=""
#ST_OPTIONS="--max_threads=1 --background_pool_size=1" # uncomment to reduce std. dev. of measurements
db="test"
table_name="ontime"
table="test.ontime"
function read_src_data {
clickhouse-local --file "$SOURCE" --input-format CSV --structure "$STRUCT" -of Native --query "$@" 2>/dev/null
#clickhouse-local --file "$SOURCE" --input-format Native --structure "$STRUCT" -of Native --query "$@" 2>/dev/null
}
function set_vertical_alg {
echo "<yandex><merge_tree><enable_vertical_merge_algorithm>$1</enable_vertical_merge_algorithm></merge_tree></yandex>" | sudo tee /etc/clickhouse-server/conf.d/enable_vertical_merge_algorithm.xml >/dev/null
echo "<yandex><merge_tree><vertical_merge_algorithm_min_rows_to_activate>0</vertical_merge_algorithm_min_rows_to_activate></merge_tree></yandex>" | sudo tee /etc/clickhouse-server/conf.d/vertical_merge_algorithm_min_rows_to_activate.xml >/dev/null
}
function set_and_restart {
sudo service clickhouse-server stop 1>/dev/null
set_vertical_alg $1
sudo service clickhouse-server start 1>/dev/null
./wait_clickhouse_server
}
function get_n_columns {
echo $1 | cut -d ',' -f -$2
}
function parts_stat {
clickhouse-client --query "SELECT count() as parts, round(avg(marks), 2) AS marks_avg, min(marks) AS marks_min, max(marks) AS marks_max FROM system.parts WHERE active AND table='$table_name' AND database='$db' FORMAT TSKV"
}
function parts_count {
clickhouse-client --query "SELECT count() FROM system.parts WHERE active AND table='$table_name' AND database='$db'"
}
function merges_count {
clickhouse-client --query "SELECT count() FROM system.merges WHERE table='$table_name' AND database='$db'"
}
function wait_merges {
while [[ $(merges_count) -ne 0 ]]; do sleep 1; done
}
function drop_cache {
sudo sh -c 'echo 3 >/proc/sys/vm/drop_caches'
}
function get_last_merge_info {
cat /var/log/clickhouse-server/clickhouse-server.log | grep "(Merger): Merging" | tail -1 | cut -d " " -f 12-
}
function get_last_merge_time {
cat /var/log/clickhouse-server/clickhouse-server.log | grep "(Merger): Merge sorted" | tail -1 | cut -d " " -f 21
}
function total_merge_time_from_log {
cat /var/log/clickhouse-server/clickhouse-server.log | grep "(Merger): Merge sorted" | cut -d " " -f 21 | clickhouse-local -S "d Float64" --query "SELECT round(sum(d), 3) FROM table" 2>/dev/null
}
function get_max_clickhouse_server_memory {
cat /proc/`cat /var/run/clickhouse-server/clickhouse-server.pid`/status | grep VmPeak | awk '{ print $2/1024 }' #MiB
}
function optimize_rounds {
[[ -z $1 ]] && NUM_ROUNDS=29 || NUM_ROUNDS=$1
drop_cache
echo "OPTIMIZE before: $(parts_stat)"
t_optimize_total=0
for i in $(seq 1 $NUM_ROUNDS); do
local t=`clickhouse-client --time ${ST_OPTIONS} --query "OPTIMIZE TABLE $table" 2>&1`
#echo "$t $(get_last_merge_time) $(get_last_merge_info)"
t_optimize_total=`echo "$t_optimize_total + $t" | bc -l`
done
echo "OPTIMIZE after : $(parts_stat)"
echo "OPTIMIZE time : $t_optimize_total"
}
function run_case {
case_func="case_$1"
sudo service clickhouse-server stop 1>/dev/null
sudo rm -f /var/log/clickhouse-server/clickhouse-server.log
sudo service clickhouse-server start 1>/dev/null
./wait_clickhouse_server
clickhouse-client --query "DROP TABLE IF EXISTS $table"
drop_cache
$case_func 1>&2
wait_merges
t_merges=$(total_merge_time_from_log)
echo "After INSERT: $(parts_stat)"
#echo "Merges time : $t_merges"
#optimize_rounds
#optimize_times="$optimize_times $t_optimize_total"
insert_times="$insert_times $t_insert"
merges_times="$merges_times $t_merges"
clickhouse-client --query "DROP TABLE IF EXISTS $table"
}
function run_cases {
cur_struct=$(get_n_columns "$STRUCT" $ACTIVE_COLUMNS)
cur_columns=$(get_n_columns "$COLUMNS" $ACTIVE_COLUMNS)
t_insert=0
insert_times=""
merges_times=""
optimize_times=""
run_case 1
run_case 2
run_case 3
echo "INSERT times : $insert_times"
echo "Merges times : $merges_times"
#echo "OPTIMIZE times: $optimize_times"
}
function case_1 {
echo "Case #1. Trivial. All parts not intersected by PK."
clickhouse-client --query "CREATE TABLE $table ($cur_struct) ENGINE = MergeTree(FlightDate, (FlightDate), 8192)"
t_insert=`read_src_data "SELECT $cur_columns FROM table" | clickhouse-client --time ${ST_OPTIONS} --query "INSERT INTO $table FORMAT Native" 2>&1`
}
function case_2 {
echo "Case #2. Strong mixture. Each new (merged) row comes from new part."
clickhouse-client --query "CREATE TABLE $table ($cur_struct) ENGINE = MergeTree(FlightDate, (intHash32(id), FlightDate), 8192)"
t_insert=`read_src_data "SELECT $cur_columns FROM table" | clickhouse-client --time ${ST_OPTIONS} --query "INSERT INTO $table FORMAT Native" 2>&1`
}
function case_3 {
echo "Case #3. Chunked mixture. Merged row with dozens of its neighbors come from the same part."
clickhouse-client --query "CREATE TABLE $table ($cur_struct) ENGINE = MergeTree(FlightDate, (bitAnd(id, 15), intHash32(bitShiftRight(id, 4))), 8192)"
t_insert=`read_src_data "SELECT $cur_columns FROM table" | clickhouse-client --time ${ST_OPTIONS} --query "INSERT INTO $table FORMAT Native" 2>&1`
}
[[ $(whoami) != "root" ]] && echo "Run script as root"
echo "### Vertical ###"
set_and_restart 1
run_cases
vertical_optimize_times="$optimize_times"
vertical_merges_times="$merges_times"
echo
echo "### Horizontal ###"
set_and_restart 0
run_cases
horizontal_optimize_times="$optimize_times"
horizontal_merges_times="$merges_times"
echo
echo "#V" "Merges:" ${vertical_merges_times} #"Optimitze:" ${vertical_optimize_times}
echo "#H" "Merges:" ${horizontal_merges_times} #"Optimitze:" ${horizontal_optimize_times}

View File

@@ -0,0 +1,9 @@
#!/bin/bash
for i in $(seq 1 100); do
clickhouse-client --query "SELECT * FROM system.one" 1>/dev/null 2>/dev/null;
[[ $? -eq 0 ]] && break
[[ $(pgrep -l clickhouse-s | wc -l) -lt 1 ]] && echo "Error: clickhouse-server is not running" && exit 1
sleep 1
done