Merge branch 'master' into revert-40217

Alexey Milovidov 2022-10-21 05:49:34 +02:00 committed by GitHub
commit 48694def2c
14 changed files with 30 additions and 83 deletions

View File

@@ -49,27 +49,13 @@ When we calculate some function over columns in a block, we add another column w
Blocks are created for every processed chunk of data. Note that for the same type of calculation, the column names and types remain the same for different blocks, and only the column data changes. It is better to split block data from the block header because with small blocks, the overhead of copying shared_ptrs and temporary column-name strings would otherwise be high relative to the data.
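As a rough illustration (a hypothetical, simplified sketch assuming the (column data, data type, column name) triple layout; the real `Block`, `IColumn`, and `IDataType` classes are far richer):

```cpp
// A minimal sketch of the block layout described above. Names are
// borrowed from ClickHouse but the shapes are simplified assumptions.
#include <memory>
#include <string>
#include <vector>

struct IColumn;    // stand-in: the column data for one chunk of rows
struct IDataType;  // stand-in: how to interpret/serialize that data

struct ColumnWithTypeAndName
{
    std::shared_ptr<IColumn> column;   // the data itself (shared, cheap-ish to copy)
    std::shared_ptr<IDataType> type;   // its type
    std::string name;                  // original or computed column name
};

/// A block: a set of such triples for one processed chunk of data.
/// Copying a block copies the shared_ptrs and the name strings -- the
/// per-block overhead the paragraph above refers to.
using Block = std::vector<ColumnWithTypeAndName>;
```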
## Block Streams {#block-streams}
## Processors
Block streams are for processing data. We use streams of blocks to read data from somewhere, perform data transformations, or write data to somewhere. `IBlockInputStream` has the `read` method to fetch the next block while available. `IBlockOutputStream` has the `write` method to push the block somewhere.
Streams are responsible for:
1. Reading or writing to a table. The table just returns a stream for reading or writing blocks.
2. Implementing data formats. For example, if you want to output data to a terminal in `Pretty` format, you create a block output stream where you push blocks, and it formats them.
3. Performing data transformations. Let's say you have `IBlockInputStream` and want to create a filtered stream. You create `FilterBlockInputStream` and initialize it with your stream. Then when you pull a block from `FilterBlockInputStream`, it pulls a block from your stream, filters it, and returns the filtered block to you. Query execution pipelines are represented this way.
There are more sophisticated transformations. For example, when you pull from `AggregatingBlockInputStream`, it reads all data from its source, aggregates it, and then returns a stream of aggregated data for you. Another example: `UnionBlockInputStream` accepts many input sources in the constructor and also a number of threads. It launches multiple threads and reads from multiple sources in parallel.
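A minimal sketch of this pull model, with hypothetical simplified interfaces rather than the real `IBlockInputStream` signatures:

```cpp
// Hypothetical sketch of the pull model: a filtering stream wraps a
// source stream and pulls blocks from it on demand. Names and
// signatures are simplified assumptions, not the real interfaces.
#include <memory>
#include <optional>
#include <utility>
#include <vector>

using Block = std::vector<int>;  // stand-in for a real block of columns

struct IBlockInputStream
{
    virtual ~IBlockInputStream() = default;
    /// Returns the next block, or std::nullopt when the stream is exhausted.
    virtual std::optional<Block> read() = 0;
};

struct FilterBlockInputStream : IBlockInputStream
{
    std::shared_ptr<IBlockInputStream> source;

    explicit FilterBlockInputStream(std::shared_ptr<IBlockInputStream> source_)
        : source(std::move(source_)) {}

    std::optional<Block> read() override
    {
        /// Pull from the nested stream, filter, and return the result.
        while (auto block = source->read())
        {
            Block filtered;
            for (int x : *block)
                if (x % 2 == 0)      // example predicate
                    filtered.push_back(x);
            if (!filtered.empty())
                return filtered;
        }
        return std::nullopt;
    }
};
```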
> Block streams use the “pull” approach to control flow: when you pull a block from the first stream, it consequently pulls the required blocks from nested streams, and the entire execution pipeline will work. Neither “pull” nor “push” is the best solution, because control flow is implicit, and that limits the implementation of various features like simultaneous execution of multiple queries (merging many pipelines together). This limitation could be overcome with coroutines or just running extra threads that wait for each other. We may have more possibilities if we make control flow explicit: if we locate the logic for passing data from one calculation unit to another outside of those calculation units. Read this [article](http://journal.stuffwithstuff.com/2013/01/13/iteration-inside-and-out/) for more thoughts.
We should note that the query execution pipeline creates temporary data at each step. We try to keep block size small enough so that temporary data fits in the CPU cache. With that assumption, writing and reading temporary data is almost free in comparison with other calculations. We could consider an alternative, which is to fuse many operations in the pipeline together. It could make the pipeline as short as possible and remove much of the temporary data, which could be an advantage, but it also has drawbacks. For example, a split pipeline makes it easy to implement caching intermediate data, stealing intermediate data from similar queries running at the same time, and merging pipelines for similar queries.
See the description at [https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/IProcessor.h](https://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/IProcessor.h).
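For contrast with the implicit pull model above, here is a hedged sketch of the explicit-control-flow idea behind processors. The `Status` values and the `run` loop are illustrative assumptions; the real interface in `IProcessor.h` is richer (ports, more statuses, async execution):

```cpp
// Hedged sketch only: the scheduler, not the processors, owns control
// flow, which is what makes features like merging pipelines possible.
#include <memory>
#include <vector>

enum class Status
{
    NeedData,  // cannot proceed until more input arrives
    Ready,     // has everything it needs: call work()
    Finished   // has produced all of its output
};

struct IProcessor
{
    virtual ~IProcessor() = default;
    virtual Status prepare() = 0;  // cheap: report what should happen next
    virtual void work() = 0;       // expensive: do the actual computation
};

/// A trivial single-threaded scheduler: repeatedly ask processors what
/// they need and run the ones that are ready. A real scheduler can
/// interleave processors from many queries across many threads.
void run(std::vector<std::shared_ptr<IProcessor>> & processors)
{
    bool progress = true;
    while (progress)
    {
        progress = false;
        for (auto & p : processors)
        {
            if (p->prepare() == Status::Ready)
            {
                p->work();
                progress = true;
            }
        }
    }
}
```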
## Formats {#formats}
Data formats are implemented with block streams. There are “presentational” formats only suitable for the output of data to the client, such as `Pretty` format, which provides only `IBlockOutputStream`. And there are input/output formats, such as `TabSeparated` or `JSONEachRow`.
There are also row streams: `IRowInputStream` and `IRowOutputStream`. They allow you to pull/push data by individual rows, not by blocks. And they are only needed to simplify the implementation of row-oriented formats. The wrappers `BlockInputStreamFromRowInputStream` and `BlockOutputStreamFromRowOutputStream` allow you to convert row-oriented streams to regular block-oriented streams.
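A hedged sketch of this wrapping idea, with hypothetical simplified types rather than the real signatures: a block-oriented stream built on top of a row-oriented one just accumulates rows until a block is full:

```cpp
// Hypothetical sketch of BlockInputStreamFromRowInputStream-style
// wrapping. Types and the block-size default are assumptions.
#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using Row = std::vector<std::string>;
using Block = std::vector<Row>;

struct IRowInputStream
{
    virtual ~IRowInputStream() = default;
    virtual std::optional<Row> readRow() = 0;  // one row at a time
};

struct BlockInputStreamFromRowInputStream
{
    IRowInputStream & rows;
    size_t max_block_size = 65536;  // a common default block size

    /// Accumulate rows from the row-oriented source into one block.
    std::optional<Block> read()
    {
        Block block;
        while (block.size() < max_block_size)
        {
            auto row = rows.readRow();
            if (!row)
                break;
            block.push_back(std::move(*row));
        }
        if (block.empty())
            return std::nullopt;
        return block;
    }
};
```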
Data formats are implemented with processors.
## I/O {#io}

View File

@@ -19,7 +19,6 @@
{host}
{port}
{user}
{database}
{display_name}
Terminal colors: https://misc.flogisoft.com/bash/tip_colors_and_formatting
See also: https://wiki.hackzine.org/development/misc/readline-color-prompt.html

View File

@@ -53,9 +53,12 @@ String IAggregateFunction::getDescription() const
bool IAggregateFunction::haveEqualArgumentTypes(const IAggregateFunction & rhs) const
{
return std::equal(argument_types.begin(), argument_types.end(),
rhs.argument_types.begin(), rhs.argument_types.end(),
[](const auto & t1, const auto & t2) { return t1->equals(*t2); });
return std::equal(
argument_types.begin(),
argument_types.end(),
rhs.argument_types.begin(),
rhs.argument_types.end(),
[](const auto & t1, const auto & t2) { return t1->equals(*t2); });
}
bool IAggregateFunction::haveSameStateRepresentation(const IAggregateFunction & rhs) const
@@ -67,11 +70,7 @@ bool IAggregateFunction::haveSameStateRepresentation(const IAggregateFunction &
bool IAggregateFunction::haveSameStateRepresentationImpl(const IAggregateFunction & rhs) const
{
bool res = getName() == rhs.getName()
&& parameters == rhs.parameters
&& haveEqualArgumentTypes(rhs);
assert(res == (getStateType()->getName() == rhs.getStateType()->getName()));
return res;
return getStateType()->equals(*rhs.getStateType());
}
}

View File

@@ -1,7 +1,6 @@
#include <Client/ClientBase.h>
#include <iostream>
#include <iomanip>
#include <filesystem>
#include <map>
#include <unordered_map>
@@ -9,7 +8,6 @@
#include "config.h"
#include <Common/DateLUT.h>
#include <Common/LocalDate.h>
#include <Common/MemoryTracker.h>
#include <base/argsToConfig.h>
#include <base/LineReader.h>
@@ -32,7 +30,6 @@
#include <Common/clearPasswordFromCommandLine.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/filesystemHelpers.h>
#include <Common/Config/configReadClient.h>
#include <Common/NetException.h>
#include <Storages/ColumnsDescription.h>
@@ -70,10 +67,10 @@
#include <IO/WriteBufferFromOStream.h>
#include <IO/CompressionMethod.h>
#include <Client/InternalTextLogs.h>
#include <boost/algorithm/string/replace.hpp>
#include <IO/ForkWriteBuffer.h>
#include <Parsers/Kusto/ParserKQLStatement.h>
namespace fs = std::filesystem;
using namespace std::literals;
@@ -1925,7 +1922,7 @@ bool ClientBase::processQueryText(const String & text)
String ClientBase::prompt() const
{
return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default"));
return prompt_by_server_display_name;
}

View File

@@ -178,11 +178,7 @@ public:
func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]() mutable /// mutable is needed to destroy capture
{
SCOPE_EXIT(
{
state->finished = true;
state->event.set();
});
SCOPE_EXIT(state->event.set());
state->thread_id = std::this_thread::get_id();
@@ -217,17 +213,6 @@ public:
~ThreadFromGlobalPoolImpl()
{
/// The problem is that our ThreadFromGlobalPool can actually be finished
/// before we try to join the thread or check whether it is joinable or not.
/// In some places we have code like:
/// if (thread->joinable())
///     thread->join();
/// where join() won't be executed when we call it
/// from the same std::thread, and it will lead to std::abort().
/// So we just do nothing in this case
if (state->finished)
return;
if (initialized())
abort();
}
@@ -267,9 +252,6 @@ protected:
/// The state used in this object and inside the thread job.
Poco::Event event;
/// To allow joining to the same std::thread after finishing
std::atomic<bool> finished{false};
};
std::shared_ptr<State> state;

View File

@@ -331,8 +331,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.", 0) \
M(Float, remerge_sort_lowered_memory_bytes_ratio, 2., "If memory usage after remerge is not reduced by this ratio, remerge will be disabled.", 0) \
\
M(UInt64, max_result_rows, 0, "Limit on result size in rows. Also checked for intermediate data sent from remote servers.", 0) \
M(UInt64, max_result_bytes, 0, "Limit on result size in bytes (uncompressed). Also checked for intermediate data sent from remote servers.", 0) \
M(UInt64, max_result_rows, 0, "Limit on result size in rows. The query will stop after processing a block of data if the threshold is met, but it will not cut the last block of the result, therefore the result size can be larger than the threshold.", 0) \
M(UInt64, max_result_bytes, 0, "Limit on result size in bytes (uncompressed). The query will stop after processing a block of data if the threshold is met, but it will not cut the last block of the result, therefore the result size can be larger than the threshold. Caveats: the result size in memory is taken into account for this threshold. Even if the result size is small, it can reference larger data structures in memory, representing dictionaries of LowCardinality columns, and Arenas of AggregateFunction columns, so the threshold can be exceeded despite the small result size. The setting is fairly low level and should be used with caution.", 0) \
M(OverflowMode, result_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
/* TODO: Check also when merging and finalizing aggregate functions. */ \

View File

@@ -1,39 +1,12 @@
#!/usr/bin/env bash
# Tags: no-fasttest
set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function stress()
{
# We set up a signal handler to make sure we wait for all queries to finish before exiting
CONTINUE=true
handle_interruption()
{
CONTINUE=false
}
trap handle_interruption INT
while $CONTINUE; do
${CLICKHOUSE_CLIENT} --query "CREATE TABLE IF NOT EXISTS table (x UInt8) ENGINE = MergeTree ORDER BY tuple()" 2>/dev/null
${CLICKHOUSE_CLIENT} --query "DROP TABLE table" 2>/dev/null
done
trap - INT
}
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f stress
for _ in {1..5}; do
# Ten seconds are just barely enough to reproduce the issue in most runs.
timeout -s INT 10 bash -c stress &
done
yes 'CREATE TABLE IF NOT EXISTS table (x UInt8) ENGINE = MergeTree ORDER BY tuple();' | head -n 1000 | $CLICKHOUSE_CLIENT --ignore-error -nm 2>/dev/null &
yes 'DROP TABLE table;' | head -n 1000 | $CLICKHOUSE_CLIENT --ignore-error -nm 2>/dev/null &
wait
echo
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table";
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table"

View File

@@ -0,0 +1 @@
1027000000000000000000000000000000000000000000000000000000000000

View File

@@ -0,0 +1 @@
SELECT hex(CAST(x, 'AggregateFunction(sum, Decimal(50, 10))')) FROM (SELECT arrayReduce('sumState', [toDecimal256('0.0000010.000001', 10)]) AS x) GROUP BY x;

View File

@@ -0,0 +1 @@
SELECT count(*) FROM numbers(10) AS a, numbers(11) AS b, numbers(12) AS c;

View File

@@ -0,0 +1,4 @@
1
1
[(3,3)]
1

View File

@@ -0,0 +1,4 @@
select [(toUInt8(3), toUInt8(3))] = [(toInt16(3), toInt16(3))];
select hasAny([(toInt16(3), toInt16(3))],[(toInt16(3), toInt16(3))]);
select arrayFilter(x -> x = (toInt16(3), toInt16(3)), arrayZip([toUInt8(3)], [toUInt8(3)]));
select hasAny([(toUInt8(3), toUInt8(3))],[(toInt16(3), toInt16(3))]);