Merge branch 'master' into add-compression-sorts-optimization

Igor Markelov 2024-05-30 13:24:42 +00:00
commit 3eb2dcec38
84 changed files with 838 additions and 292 deletions

View File

@ -57,6 +57,18 @@ SELECT toTypeName(from), hex(from) FROM hits LIMIT 1;
└──────────────────┴───────────┘
```
IPv4 addresses can be directly compared to IPv6 addresses:
```sql
SELECT toIPv4('127.0.0.1') = toIPv6('::ffff:127.0.0.1');
```
```text
┌─equals(toIPv4('127.0.0.1'), toIPv6('::ffff:127.0.0.1'))─┐
│ 1 │
└─────────────────────────────────────────────────────────┘
```
**See Also**
- [Functions for Working with IPv4 and IPv6 Addresses](../functions/ip-address-functions.md)

View File

@ -57,6 +57,19 @@ SELECT toTypeName(from), hex(from) FROM hits LIMIT 1;
└──────────────────┴──────────────────────────────────┘
```
IPv6 addresses can be directly compared to IPv4 addresses:
```sql
SELECT toIPv4('127.0.0.1') = toIPv6('::ffff:127.0.0.1');
```
```text
┌─equals(toIPv4('127.0.0.1'), toIPv6('::ffff:127.0.0.1'))─┐
│ 1 │
└─────────────────────────────────────────────────────────┘
```
**See Also**
- [Functions for Working with IPv4 and IPv6 Addresses](../functions/ip-address-functions.md)

View File

@ -0,0 +1,55 @@
# loop
The `loop` table function continuously returns the results of a query in an infinite loop.
**Syntax**
``` sql
SELECT ... FROM loop(database, table);
SELECT ... FROM loop(database.table);
SELECT ... FROM loop(table);
SELECT ... FROM loop(other_table_function(...));
```
**Parameters**
- `database` — database name.
- `table` — table name.
- `other_table_function(...)` — another table function.
Example: `SELECT * FROM loop(numbers(10));`, where `other_table_function(...)` is `numbers(10)`.
**Returned Value**
The query results, returned in an infinite loop.
**Examples**
Selecting data from ClickHouse:
``` sql
SELECT * FROM loop(test_database, test_table);
SELECT * FROM loop(test_database.test_table);
SELECT * FROM loop(test_table);
```
Or using another table function:
``` sql
SELECT * FROM loop(numbers(3)) LIMIT 7;
┌─number─┐
1. │ 0 │
2. │ 1 │
3. │ 2 │
└────────┘
┌─number─┐
4. │ 0 │
5. │ 1 │
6. │ 2 │
└────────┘
┌─number─┐
7. │ 0 │
└────────┘
```
``` sql
SELECT * FROM loop(mysql('localhost:3306', 'test', 'test', 'user', 'password'));
...
```
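Because `loop` never finishes on its own, queries over it are normally bounded with `LIMIT` (the `max_block_size` setting can also be adjusted, as in the tests for this function). A usage sketch based on the examples above:
``` sql
SELECT * FROM loop(numbers(3)) LIMIT 10;
SELECT * FROM loop(numbers(3)) LIMIT 10 SETTINGS max_block_size = 1;
```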

View File

@ -10,6 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int KEEPER_EXCEPTION;
}
@ -441,7 +442,7 @@ void ReconfigCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient
new_members = query->args[1].safeGet<String>();
break;
default:
UNREACHABLE();
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected operation: {}", operation);
}
auto response = client->zookeeper->reconfig(joining, leaving, new_members);

View File

@ -155,8 +155,8 @@ auto instructionFailToString(InstructionFail fail)
ret("AVX2");
case InstructionFail::AVX512:
ret("AVX512");
#undef ret
}
UNREACHABLE();
}

View File

@ -144,8 +144,7 @@ AccessEntityPtr deserializeAccessEntity(const String & definition, const String
catch (Exception & e)
{
e.addMessage("Could not parse " + file_path);
e.rethrow();
UNREACHABLE();
throw;
}
}

View File

@ -258,7 +258,7 @@ namespace
case TABLE_LEVEL: return AccessFlags::allFlagsGrantableOnTableLevel();
case COLUMN_LEVEL: return AccessFlags::allFlagsGrantableOnColumnLevel();
}
UNREACHABLE();
chassert(false);
}
}

View File

@ -257,8 +257,7 @@ std::vector<UUID> IAccessStorage::insert(const std::vector<AccessEntityPtr> & mu
}
e.addMessage("After successfully inserting {}/{}: {}", successfully_inserted.size(), multiple_entities.size(), successfully_inserted_str);
}
e.rethrow();
UNREACHABLE();
throw;
}
}
@ -361,8 +360,7 @@ std::vector<UUID> IAccessStorage::remove(const std::vector<UUID> & ids, bool thr
}
e.addMessage("After successfully removing {}/{}: {}", removed_names.size(), ids.size(), removed_names_str);
}
e.rethrow();
UNREACHABLE();
throw;
}
}
@ -458,8 +456,7 @@ std::vector<UUID> IAccessStorage::update(const std::vector<UUID> & ids, const Up
}
e.addMessage("After successfully updating {}/{}: {}", names_of_updated.size(), ids.size(), names_of_updated_str);
}
e.rethrow();
UNREACHABLE();
throw;
}
}

View File

@ -60,14 +60,13 @@ struct GroupArrayTrait
template <typename Trait>
constexpr const char * getNameByTrait()
{
if (Trait::last)
if constexpr (Trait::last)
return "groupArrayLast";
if (Trait::sampler == Sampler::NONE)
return "groupArray";
else if (Trait::sampler == Sampler::RNG)
return "groupArraySample";
UNREACHABLE();
switch (Trait::sampler)
{
case Sampler::NONE: return "groupArray";
case Sampler::RNG: return "groupArraySample";
}
}
template <typename T>

View File

@ -414,7 +414,6 @@ public:
break;
return (i == events_size) ? base - i : unmatched_idx;
}
UNREACHABLE();
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override

View File

@ -463,7 +463,6 @@ public:
return "sumWithOverflow";
else if constexpr (Type == AggregateFunctionTypeSumKahan)
return "sumKahan";
UNREACHABLE();
}
explicit AggregateFunctionSum(const DataTypes & argument_types_)

View File

@ -127,6 +127,9 @@
M(DestroyAggregatesThreads, "Number of threads in the thread pool for destroy aggregate states.") \
M(DestroyAggregatesThreadsActive, "Number of threads in the thread pool for destroy aggregate states running a task.") \
M(DestroyAggregatesThreadsScheduled, "Number of queued or active jobs in the thread pool for destroy aggregate states.") \
M(ConcurrentHashJoinPoolThreads, "Number of threads in the thread pool for concurrent hash join.") \
M(ConcurrentHashJoinPoolThreadsActive, "Number of threads in the thread pool for concurrent hash join running a task.") \
M(ConcurrentHashJoinPoolThreadsScheduled, "Number of queued or active jobs in the thread pool for concurrent hash join.") \
M(HashedDictionaryThreads, "Number of threads in the HashedDictionary thread pool.") \
M(HashedDictionaryThreadsActive, "Number of threads in the HashedDictionary thread pool running a task.") \
M(HashedDictionaryThreadsScheduled, "Number of queued or active jobs in the HashedDictionary thread pool.") \

View File

@ -41,7 +41,6 @@ UInt8 getDayOfWeek(const cctz::civil_day & date)
case cctz::weekday::saturday: return 6;
case cctz::weekday::sunday: return 7;
}
UNREACHABLE();
}
inline cctz::time_point<cctz::seconds> lookupTz(const cctz::time_zone & cctz_time_zone, const cctz::civil_day & date)

View File

@ -34,8 +34,6 @@ Int64 IntervalKind::toAvgNanoseconds() const
default:
return toAvgSeconds() * NANOSECONDS_PER_SECOND;
}
UNREACHABLE();
}
Int32 IntervalKind::toAvgSeconds() const
@ -54,7 +52,6 @@ Int32 IntervalKind::toAvgSeconds() const
case IntervalKind::Kind::Quarter: return 7889238; /// Exactly 1/4 of a year.
case IntervalKind::Kind::Year: return 31556952; /// The average length of a Gregorian year is equal to 365.2425 days
}
UNREACHABLE();
}
Float64 IntervalKind::toSeconds() const
@ -80,7 +77,6 @@ Float64 IntervalKind::toSeconds() const
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not possible to get precise number of seconds in non-precise interval");
}
UNREACHABLE();
}
bool IntervalKind::isFixedLength() const
@ -99,7 +95,6 @@ bool IntervalKind::isFixedLength() const
case IntervalKind::Kind::Quarter:
case IntervalKind::Kind::Year: return false;
}
UNREACHABLE();
}
IntervalKind IntervalKind::fromAvgSeconds(Int64 num_seconds)
@ -141,7 +136,6 @@ const char * IntervalKind::toKeyword() const
case IntervalKind::Kind::Quarter: return "QUARTER";
case IntervalKind::Kind::Year: return "YEAR";
}
UNREACHABLE();
}
@ -161,7 +155,6 @@ const char * IntervalKind::toLowercasedKeyword() const
case IntervalKind::Kind::Quarter: return "quarter";
case IntervalKind::Kind::Year: return "year";
}
UNREACHABLE();
}
@ -192,7 +185,6 @@ const char * IntervalKind::toDateDiffUnit() const
case IntervalKind::Kind::Year:
return "year";
}
UNREACHABLE();
}
@ -223,7 +215,6 @@ const char * IntervalKind::toNameOfFunctionToIntervalDataType() const
case IntervalKind::Kind::Year:
return "toIntervalYear";
}
UNREACHABLE();
}
@ -257,7 +248,6 @@ const char * IntervalKind::toNameOfFunctionExtractTimePart() const
case IntervalKind::Kind::Year:
return "toYear";
}
UNREACHABLE();
}

View File

@ -54,8 +54,6 @@ String toString(TargetArch arch)
case TargetArch::AMXTILE: return "amxtile";
case TargetArch::AMXINT8: return "amxint8";
}
UNREACHABLE();
}
}

View File

@ -75,7 +75,6 @@ const char * TasksStatsCounters::metricsProviderString(MetricsProvider provider)
case MetricsProvider::Netlink:
return "netlink";
}
UNREACHABLE();
}
bool TasksStatsCounters::checkIfAvailable()

View File

@ -146,8 +146,6 @@ const char * errorMessage(Error code)
case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored";
case Error::ZNOTREADONLY: return "State-changing request is passed to read-only server";
}
UNREACHABLE();
}
bool isHardwareError(Error zk_return_code)

View File

@ -466,7 +466,6 @@ void CompressionCodecDeflateQpl::doDecompressData(const char * source, UInt32 so
sw_codec->doDecompressData(source, source_size, dest, uncompressed_size);
return;
}
UNREACHABLE();
}
void CompressionCodecDeflateQpl::flushAsynchronousDecompressRequests()

View File

@ -21,6 +21,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/** NOTE DoubleDelta is surprisingly bad name. The only excuse is that it comes from an academic paper.
* Most people will think that "double delta" is just applying delta transform twice.
* But in fact it is something more than applying delta transform twice.
@ -142,9 +147,9 @@ namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER;
extern const int LOGICAL_ERROR;
}
namespace
@ -163,9 +168,8 @@ inline Int64 getMaxValueForByteSize(Int8 byte_size)
case sizeof(UInt64):
return std::numeric_limits<Int64>::max();
default:
assert(false && "only 1, 2, 4 and 8 data sizes are supported");
throw Exception(ErrorCodes::LOGICAL_ERROR, "only 1, 2, 4 and 8 data sizes are supported");
}
UNREACHABLE();
}
struct WriteSpec

View File

@ -5,6 +5,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ClusterUpdateActions joiningToClusterUpdates(const ClusterConfigPtr & cfg, std::string_view joining)
{
ClusterUpdateActions out;
@ -79,7 +85,7 @@ String serializeClusterConfig(const ClusterConfigPtr & cfg, const ClusterUpdateA
new_config.emplace_back(RaftServerConfig{*cfg->get_server(priority->id)});
}
else
UNREACHABLE();
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected update");
}
for (const auto & item : cfg->get_servers())

View File

@ -990,7 +990,7 @@ KeeperServer::ConfigUpdateState KeeperServer::applyConfigUpdate(
raft_instance->set_priority(update->id, update->priority, /*broadcast on live leader*/true);
return Accepted;
}
UNREACHABLE();
std::unreachable();
}
ClusterUpdateActions KeeperServer::getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config)

View File

@ -667,8 +667,6 @@ public:
case Types::AggregateFunctionState: return f(field.template get<AggregateFunctionStateData>());
case Types::CustomType: return f(field.template get<CustomType>());
}
UNREACHABLE();
}
String dump() const;

View File

@ -36,7 +36,6 @@ String ISerialization::kindToString(Kind kind)
case Kind::SPARSE:
return "Sparse";
}
UNREACHABLE();
}
ISerialization::Kind ISerialization::stringToKind(const String & str)

View File

@ -17,7 +17,6 @@ std::string toString(MetadataStorageTransactionState state)
case MetadataStorageTransactionState::PARTIALLY_ROLLED_BACK:
return "PARTIALLY_ROLLED_BACK";
}
UNREACHABLE();
}
}

View File

@ -112,7 +112,6 @@ DiskPtr VolumeJBOD::getDisk(size_t /* index */) const
return disks_by_size.top().disk;
}
}
UNREACHABLE();
}
ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
@ -164,7 +163,6 @@ ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
return reservation;
}
}
UNREACHABLE();
}
bool VolumeJBOD::areMergesAvoided() const

View File

@ -62,7 +62,6 @@ String escapingRuleToString(FormatSettings::EscapingRule escaping_rule)
case FormatSettings::EscapingRule::Raw:
return "Raw";
}
UNREACHABLE();
}
void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule escaping_rule, const FormatSettings & format_settings)

View File

@ -1176,8 +1176,7 @@ public:
/// You can compare the date, datetime, or datetime64 and an enumeration with a constant string.
|| ((left.isDate() || left.isDate32() || left.isDateTime() || left.isDateTime64()) && (right.isDate() || right.isDate32() || right.isDateTime() || right.isDateTime64()) && left.idx == right.idx) /// only date vs date, or datetime vs datetime
|| (left.isUUID() && right.isUUID())
|| (left.isIPv4() && right.isIPv4())
|| (left.isIPv6() && right.isIPv6())
|| ((left.isIPv4() || left.isIPv6()) && (right.isIPv4() || right.isIPv6()))
|| (left.isEnum() && right.isEnum() && arguments[0]->getName() == arguments[1]->getName()) /// only equivalent enum type values can be compared against
|| (left_tuple && right_tuple && left_tuple->getElements().size() == right_tuple->getElements().size())
|| (arguments[0]->equals(*arguments[1]))))
@ -1266,6 +1265,8 @@ public:
const bool left_is_float = which_left.isFloat();
const bool right_is_float = which_right.isFloat();
const bool left_is_ipv4 = which_left.isIPv4();
const bool right_is_ipv4 = which_right.isIPv4();
const bool left_is_ipv6 = which_left.isIPv6();
const bool right_is_ipv6 = which_right.isIPv6();
const bool left_is_fixed_string = which_left.isFixedString();
@ -1323,10 +1324,13 @@ public:
{
return res;
}
else if (((left_is_ipv6 && right_is_fixed_string) || (right_is_ipv6 && left_is_fixed_string)) && fixed_string_size == IPV6_BINARY_LENGTH)
else if (
(((left_is_ipv6 && right_is_fixed_string) || (right_is_ipv6 && left_is_fixed_string)) && fixed_string_size == IPV6_BINARY_LENGTH)
|| ((left_is_ipv4 || left_is_ipv6) && (right_is_ipv4 || right_is_ipv6))
)
{
/// Special treatment for FixedString(16) as a binary representation of IPv6 -
/// CAST is customized for this case
/// Special treatment for FixedString(16) as a binary representation of IPv6 and for comparing IPv4 & IPv6 values -
/// CAST is customized for these cases
ColumnPtr left_column = left_is_ipv6 ?
col_with_type_and_name_left.column : castColumn(col_with_type_and_name_left, right_type);
ColumnPtr right_column = right_is_ipv6 ?
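In effect, when an IPv4 value is compared with an IPv6 value, the IPv4 side is cast to the IPv6 type before the comparison. A sketch of the equivalent explicit cast, consistent with the documentation examples above (the result values in comments are assumptions):
```sql
-- Casting the IPv4 side to IPv6 yields the IPv4-mapped address, which then compares equal:
SELECT CAST(toIPv4('127.0.0.1') AS IPv6);                 -- ::ffff:127.0.0.1
SELECT toIPv4('127.0.0.1') = toIPv6('::ffff:127.0.0.1');  -- 1
```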

View File

@ -149,8 +149,6 @@ struct IntegerRoundingComputation
return x;
}
}
UNREACHABLE();
}
static ALWAYS_INLINE T compute(T x, T scale)
@ -163,8 +161,6 @@ struct IntegerRoundingComputation
case ScaleMode::Negative:
return computeImpl(x, scale);
}
UNREACHABLE();
}
static ALWAYS_INLINE void compute(const T * __restrict in, size_t scale, T * __restrict out) requires std::integral<T>
@ -247,8 +243,6 @@ inline float roundWithMode(float x, RoundingMode mode)
case RoundingMode::Ceil: return ceilf(x);
case RoundingMode::Trunc: return truncf(x);
}
UNREACHABLE();
}
inline double roundWithMode(double x, RoundingMode mode)
@ -260,8 +254,6 @@ inline double roundWithMode(double x, RoundingMode mode)
case RoundingMode::Ceil: return ceil(x);
case RoundingMode::Trunc: return trunc(x);
}
UNREACHABLE();
}
template <typename T>

View File

@ -232,7 +232,6 @@ struct TimeWindowImpl<TUMBLE>
default:
throw Exception(ErrorCodes::SYNTAX_ERROR, "Fraction seconds are unsupported by windows yet");
}
UNREACHABLE();
}
template <typename ToType, IntervalKind::Kind unit>
@ -422,7 +421,6 @@ struct TimeWindowImpl<HOP>
default:
throw Exception(ErrorCodes::SYNTAX_ERROR, "Fraction seconds are unsupported by windows yet");
}
UNREACHABLE();
}
template <typename ToType, IntervalKind::Kind kind>

View File

@ -381,8 +381,6 @@ bool PointInPolygonWithGrid<CoordinateType>::contains(CoordinateType x, Coordina
case CellType::complexPolygon:
return boost::geometry::within(Point(x, y), polygons[cell.index_of_inner_polygon]);
}
UNREACHABLE();
}

View File

@ -35,7 +35,6 @@ namespace
case UserDefinedSQLObjectType::Function:
return "function_";
}
UNREACHABLE();
}
constexpr std::string_view sql_extension = ".sql";

View File

@ -52,7 +52,6 @@ std::string toContentEncodingName(CompressionMethod method)
case CompressionMethod::None:
return "";
}
UNREACHABLE();
}
CompressionMethod chooseHTTPCompressionMethod(const std::string & list)

View File

@ -88,7 +88,6 @@ public:
case Status::TOO_LARGE_COMPRESSED_BLOCK:
return "TOO_LARGE_COMPRESSED_BLOCK";
}
UNREACHABLE();
}
explicit HadoopSnappyReadBuffer(

View File

@ -117,8 +117,6 @@ size_t AggregatedDataVariants::size() const
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
size_t AggregatedDataVariants::sizeWithoutOverflowRow() const
@ -136,8 +134,6 @@ size_t AggregatedDataVariants::sizeWithoutOverflowRow() const
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
const char * AggregatedDataVariants::getMethodName() const
@ -155,8 +151,6 @@ const char * AggregatedDataVariants::getMethodName() const
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
bool AggregatedDataVariants::isTwoLevel() const
@ -174,8 +168,6 @@ bool AggregatedDataVariants::isTwoLevel() const
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
bool AggregatedDataVariants::isConvertibleToTwoLevel() const

View File

@ -799,7 +799,6 @@ String FileSegment::stateToString(FileSegment::State state)
case FileSegment::State::DETACHED:
return "DETACHED";
}
UNREACHABLE();
}
bool FileSegment::assertCorrectness() const

View File

@ -309,7 +309,6 @@ ComparisonGraphCompareResult ComparisonGraph<Node>::pathToCompareResult(Path pat
case Path::GREATER: return inverse ? ComparisonGraphCompareResult::LESS : ComparisonGraphCompareResult::GREATER;
case Path::GREATER_OR_EQUAL: return inverse ? ComparisonGraphCompareResult::LESS_OR_EQUAL : ComparisonGraphCompareResult::GREATER_OR_EQUAL;
}
UNREACHABLE();
}
template <ComparisonGraphNodeType Node>

View File

@ -1,10 +1,9 @@
#include <memory>
#include <mutex>
#include <Columns/ColumnSparse.h>
#include <Columns/FilterDescription.h>
#include <Columns/IColumn.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Interpreters/ConcurrentHashJoin.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
@ -15,10 +14,20 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/parseQuery.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Common/WeakHash.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace CurrentMetrics
{
extern const Metric ConcurrentHashJoinPoolThreads;
extern const Metric ConcurrentHashJoinPoolThreadsActive;
extern const Metric ConcurrentHashJoinPoolThreadsScheduled;
}
namespace DB
{
@ -36,20 +45,82 @@ static UInt32 toPowerOfTwo(UInt32 x)
return static_cast<UInt32>(1) << (32 - std::countl_zero(x - 1));
}
ConcurrentHashJoin::ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<TableJoin> table_join_, size_t slots_, const Block & right_sample_block, bool any_take_last_row_)
ConcurrentHashJoin::ConcurrentHashJoin(
ContextPtr context_, std::shared_ptr<TableJoin> table_join_, size_t slots_, const Block & right_sample_block, bool any_take_last_row_)
: context(context_)
, table_join(table_join_)
, slots(toPowerOfTwo(std::min<UInt32>(static_cast<UInt32>(slots_), 256)))
, pool(std::make_unique<ThreadPool>(
CurrentMetrics::ConcurrentHashJoinPoolThreads,
CurrentMetrics::ConcurrentHashJoinPoolThreadsActive,
CurrentMetrics::ConcurrentHashJoinPoolThreadsScheduled,
slots))
{
for (size_t i = 0; i < slots; ++i)
{
auto inner_hash_join = std::make_shared<InternalHashJoin>();
hash_joins.resize(slots);
inner_hash_join->data = std::make_unique<HashJoin>(table_join_, right_sample_block, any_take_last_row_, 0, fmt::format("concurrent{}", i));
/// A non-zero `max_joined_block_rows` allows a block to be processed partially and the unprocessed part to be returned.
/// TODO: It's not handled properly in the ConcurrentHashJoin case, so we set it to 0 to disable this feature.
inner_hash_join->data->setMaxJoinedBlockRows(0);
hash_joins.emplace_back(std::move(inner_hash_join));
try
{
for (size_t i = 0; i < slots; ++i)
{
pool->scheduleOrThrow(
[&, idx = i, thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE({
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
});
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("ConcurrentJoin");
auto inner_hash_join = std::make_shared<InternalHashJoin>();
inner_hash_join->data = std::make_unique<HashJoin>(
table_join_, right_sample_block, any_take_last_row_, 0, fmt::format("concurrent{}", idx));
/// A non-zero `max_joined_block_rows` allows a block to be processed partially and the unprocessed part to be returned.
/// TODO: It's not handled properly in the ConcurrentHashJoin case, so we set it to 0 to disable this feature.
inner_hash_join->data->setMaxJoinedBlockRows(0);
hash_joins[idx] = std::move(inner_hash_join);
});
}
pool->wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
pool->wait();
throw;
}
}
ConcurrentHashJoin::~ConcurrentHashJoin()
{
try
{
for (size_t i = 0; i < slots; ++i)
{
// Hash table destruction may be very time-consuming.
// Without the following code, the tables would be destroyed in the current thread (i.e. sequentially).
// `InternalHashJoin` is moved here and will be destroyed in the destructor of the lambda function.
pool->scheduleOrThrow(
[join = std::move(hash_joins[i]), thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE({
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
});
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("ConcurrentJoin");
});
}
pool->wait();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
pool->wait();
}
}

View File

@ -10,6 +10,7 @@
#include <base/defines.h>
#include <base/types.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool_fwd.h>
namespace DB
{
@ -39,7 +40,7 @@ public:
const Block & right_sample_block,
bool any_take_last_row_ = false);
~ConcurrentHashJoin() override = default;
~ConcurrentHashJoin() override;
std::string getName() const override { return "ConcurrentHashJoin"; }
const TableJoin & getTableJoin() const override { return *table_join; }
@ -66,6 +67,7 @@ private:
ContextPtr context;
std::shared_ptr<TableJoin> table_join;
size_t slots;
std::unique_ptr<ThreadPool> pool;
std::vector<std::shared_ptr<InternalHashJoin>> hash_joins;
std::mutex totals_mutex;

View File

@ -705,7 +705,6 @@ namespace
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
}
@ -2641,8 +2640,6 @@ private:
default:
throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys (type: {})", parent.data->type);
}
UNREACHABLE();
}
template <typename Map>

View File

@ -322,8 +322,6 @@ public:
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
size_t getTotalByteCountImpl(Type which) const
@ -338,8 +336,6 @@ public:
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
size_t getBufferSizeInCells(Type which) const
@ -354,8 +350,6 @@ public:
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
/// NOLINTEND(bugprone-macro-parentheses)
};

View File

@ -33,7 +33,6 @@ BlockIO InterpreterTransactionControlQuery::execute()
case ASTTransactionControl::SET_SNAPSHOT:
return executeSetSnapshot(session_context, tcl.snapshot);
}
UNREACHABLE();
}
BlockIO InterpreterTransactionControlQuery::executeBegin(ContextMutablePtr session_context)

View File

@ -41,8 +41,6 @@ size_t SetVariantsTemplate<Variant>::getTotalRowCount() const
APPLY_FOR_SET_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
template <typename Variant>
@ -57,8 +55,6 @@ size_t SetVariantsTemplate<Variant>::getTotalByteCount() const
APPLY_FOR_SET_VARIANTS(M)
#undef M
}
UNREACHABLE();
}
template <typename Variant>

View File

@ -40,8 +40,6 @@ public:
case TableOverride: return "EXPLAIN TABLE OVERRIDE";
case CurrentTransaction: return "EXPLAIN CURRENT TRANSACTION";
}
UNREACHABLE();
}
static ExplainKind fromString(const String & str)

View File

@ -42,7 +42,7 @@ Token quotedString(const char *& pos, const char * const token_begin, const char
continue;
}
UNREACHABLE();
chassert(false);
}
}
@ -538,8 +538,6 @@ const char * getTokenName(TokenType type)
APPLY_FOR_TOKENS(M)
#undef M
}
UNREACHABLE();
}

View File

@ -51,7 +51,7 @@ FilterAnalysisResult analyzeFilter(const QueryTreeNodePtr & filter_expression_no
return result;
}
bool isDeterministicConstant(const ConstantNode & root)
bool canRemoveConstantFromGroupByKey(const ConstantNode & root)
{
const auto & source_expression = root.getSourceExpression();
if (!source_expression)
@ -64,15 +64,20 @@ bool isDeterministicConstant(const ConstantNode & root)
const auto * node = nodes.top();
nodes.pop();
if (node->getNodeType() == QueryTreeNodeType::QUERY)
/// Allow removing constants from scalar subqueries. We send them to all the shards.
continue;
const auto * constant_node = node->as<ConstantNode>();
const auto * function_node = node->as<FunctionNode>();
if (constant_node)
{
if (!isDeterministicConstant(*constant_node))
if (!canRemoveConstantFromGroupByKey(*constant_node))
return false;
}
else if (function_node)
{
/// Do not allow removing constants like `hostName()`
if (!function_node->getFunctionOrThrow()->isDeterministic())
return false;
@ -122,7 +127,7 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
bool is_secondary_query = planner_context->getQueryContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_distributed_query = planner_context->getQueryContext()->isDistributed();
bool check_deterministic_constants = is_secondary_query || is_distributed_query;
bool check_constants_for_group_by_key = is_secondary_query || is_distributed_query;
if (query_node.hasGroupBy())
{
@ -139,7 +144,7 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
const auto * constant_key = grouping_set_key_node->as<ConstantNode>();
group_by_with_constant_keys |= (constant_key != nullptr);
if (constant_key && !aggregates_descriptions.empty() && (!check_deterministic_constants || isDeterministicConstant(*constant_key)))
if (constant_key && !aggregates_descriptions.empty() && (!check_constants_for_group_by_key || canRemoveConstantFromGroupByKey(*constant_key)))
continue;
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions, grouping_set_key_node);
@ -191,7 +196,7 @@ std::optional<AggregationAnalysisResult> analyzeAggregation(const QueryTreeNodeP
const auto * constant_key = group_by_key_node->as<ConstantNode>();
group_by_with_constant_keys |= (constant_key != nullptr);
if (constant_key && !aggregates_descriptions.empty() && (!check_deterministic_constants || isDeterministicConstant(*constant_key)))
if (constant_key && !aggregates_descriptions.empty() && (!check_constants_for_group_by_key || canRemoveConstantFromGroupByKey(*constant_key)))
continue;
auto expression_dag_nodes = actions_visitor.visit(before_aggregation_actions, group_by_key_node);
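The distinction matters for distributed queries: a scalar-subquery constant is evaluated once and sent to every shard, so it can safely be dropped from the GROUP BY key, while a constant such as `hostName()` may evaluate differently on each shard and must be kept. A hedged SQL illustration in the spirit of the tests added below:
```sql
-- Safe to remove from the GROUP BY key: the scalar subquery is computed once and shipped to all shards.
WITH (SELECT dummy AS x FROM system.one) AS y
SELECT y, min(dummy) FROM remote('127.0.0.{1,2}', system.one) GROUP BY y;

-- Not safe to remove: hostName() evaluates to a different value on each shard.
SELECT hostName() AS h, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY h;
```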

View File

@ -657,7 +657,6 @@ DataTypePtr MsgPackSchemaReader::getDataType(const msgpack::object & object)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Msgpack extension type {:x} is not supported", object_ext.type());
}
}
UNREACHABLE();
}
std::optional<DataTypes> MsgPackSchemaReader::readRowAndGetDataTypes()

View File

@ -36,8 +36,6 @@ std::string IProcessor::statusToName(Status status)
case Status::ExpandPipeline:
return "ExpandPipeline";
}
UNREACHABLE();
}
}

View File

@ -0,0 +1,156 @@
#include <Processors/QueryPlan/ReadFromLoopStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Storages/IStorage.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>
#include <Processors/ISource.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_RETRIES_TO_FETCH_PARTS;
}
class PullingPipelineExecutor;
class LoopSource : public ISource
{
public:
LoopSource(
const Names & column_names_,
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr & context_,
QueryProcessingStage::Enum processed_stage_,
StoragePtr inner_storage_,
size_t max_block_size_,
size_t num_streams_)
: ISource(storage_snapshot_->getSampleBlockForColumns(column_names_))
, column_names(column_names_)
, query_info(query_info_)
, storage_snapshot(storage_snapshot_)
, processed_stage(processed_stage_)
, context(context_)
, inner_storage(std::move(inner_storage_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}
String getName() const override { return "Loop"; }
Chunk generate() override
{
while (true)
{
if (!loop)
{
QueryPlan plan;
auto storage_snapshot_ = inner_storage->getStorageSnapshotForQuery(inner_storage->getInMemoryMetadataPtr(), nullptr, context);
inner_storage->read(
plan,
column_names,
storage_snapshot_,
query_info,
context,
processed_stage,
max_block_size,
num_streams);
auto builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context));
QueryPlanResourceHolder resources;
auto pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
query_pipeline = QueryPipeline(std::move(pipe));
executor = std::make_unique<PullingPipelineExecutor>(query_pipeline);
loop = true;
}
Chunk chunk;
if (executor->pull(chunk))
{
if (chunk)
{
retries_count = 0;
return chunk;
}
}
else
{
++retries_count;
if (retries_count > max_retries_count)
throw Exception(ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS, "Too many retries to pull from storage");
loop = false;
executor.reset();
query_pipeline.reset();
}
}
}
private:
const Names column_names;
SelectQueryInfo query_info;
const StorageSnapshotPtr storage_snapshot;
QueryProcessingStage::Enum processed_stage;
ContextPtr context;
StoragePtr inner_storage;
size_t max_block_size;
size_t num_streams;
// Add retries: if inner_storage fails to pull X times in a row, it is better to fail here than to hang.
size_t retries_count = 0;
size_t max_retries_count = 3;
bool loop = false;
QueryPipeline query_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
};
ReadFromLoopStep::ReadFromLoopStep(
const Names & column_names_,
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
const ContextPtr & context_,
QueryProcessingStage::Enum processed_stage_,
StoragePtr inner_storage_,
size_t max_block_size_,
size_t num_streams_)
: SourceStepWithFilter(
DataStream{.header = storage_snapshot_->getSampleBlockForColumns(column_names_)},
column_names_,
query_info_,
storage_snapshot_,
context_)
, column_names(column_names_)
, processed_stage(processed_stage_)
, inner_storage(std::move(inner_storage_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}
Pipe ReadFromLoopStep::makePipe()
{
return Pipe(std::make_shared<LoopSource>(
column_names, query_info, storage_snapshot, context, processed_stage, inner_storage, max_block_size, num_streams));
}
void ReadFromLoopStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto pipe = makePipe();
if (pipe.empty())
{
assert(output_stream != std::nullopt);
pipe = Pipe(std::make_shared<NullSource>(output_stream->header));
}
pipeline.init(std::move(pipe));
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
class ReadFromLoopStep final : public SourceStepWithFilter
{
public:
ReadFromLoopStep(
const Names & column_names_,
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
const ContextPtr & context_,
QueryProcessingStage::Enum processed_stage_,
StoragePtr inner_storage_,
size_t max_block_size_,
size_t num_streams_);
String getName() const override { return "ReadFromLoop"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
private:
Pipe makePipe();
const Names column_names;
QueryProcessingStage::Enum processed_stage;
StoragePtr inner_storage;
size_t max_block_size;
size_t num_streams;
};
}

View File

@ -1136,8 +1136,6 @@ static void addMergingFinal(
return std::make_shared<GraphiteRollupSortedTransform>(header, num_outputs,
sort_description, max_block_size_rows, /*max_block_size_bytes=*/0, merging_params.graphite_params, now);
}
UNREACHABLE();
};
pipe.addTransform(get_merging_processor());
@ -2125,8 +2123,6 @@ static const char * indexTypeToString(ReadFromMergeTree::IndexType type)
case ReadFromMergeTree::IndexType::Skip:
return "Skip";
}
UNREACHABLE();
}
static const char * readTypeToString(ReadFromMergeTree::ReadType type)
@ -2142,8 +2138,6 @@ static const char * readTypeToString(ReadFromMergeTree::ReadType type)
case ReadFromMergeTree::ReadType::ParallelReplicas:
return "Parallel";
}
UNREACHABLE();
}
void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const

View File

@ -86,8 +86,6 @@ static String totalsModeToString(TotalsMode totals_mode, double auto_include_thr
case TotalsMode::AFTER_HAVING_AUTO:
return "after_having_auto threshold " + std::to_string(auto_include_threshold);
}
UNREACHABLE();
}
void TotalsHavingStep::describeActions(FormatSettings & settings) const

View File

@ -67,7 +67,6 @@ static FillColumnDescription::StepFunction getStepFunction(
FOR_EACH_INTERVAL_KIND(DECLARE_CASE)
#undef DECLARE_CASE
}
UNREACHABLE();
}
static bool tryConvertFields(FillColumnDescription & descr, const DataTypePtr & type)

View File

@ -898,8 +898,6 @@ static std::exception_ptr addStorageToException(std::exception_ptr ptr, const St
{
return std::current_exception();
}
UNREACHABLE();
}
void FinalizingViewsTransform::work()

View File

@ -93,7 +93,6 @@ String BackgroundJobsAssignee::toString(Type type)
case Type::Moving:
return "Moving";
}
UNREACHABLE();
}
void BackgroundJobsAssignee::start()

View File

@ -2964,8 +2964,6 @@ String KeyCondition::RPNElement::toString(std::string_view column_name, bool pri
case ALWAYS_TRUE:
return "true";
}
UNREACHABLE();
}

View File

@ -1177,8 +1177,6 @@ String MergeTreeData::MergingParams::getModeName() const
case Graphite: return "Graphite";
case VersionedCollapsing: return "VersionedCollapsing";
}
UNREACHABLE();
}
Int64 MergeTreeData::getMaxBlockNumber() const

View File

@ -361,8 +361,6 @@ Block MergeTreeDataWriter::mergeBlock(
return std::make_shared<GraphiteRollupSortedAlgorithm>(
block, 1, sort_description, block_size + 1, /*block_size_bytes=*/0, merging_params.graphite_params, time(nullptr));
}
UNREACHABLE();
};
auto merging_algorithm = get_merging_algorithm();

View File

@ -616,8 +616,6 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
}
}
}
UNREACHABLE();
}
void PartMovesBetweenShardsOrchestrator::removePins(const Entry & entry, zkutil::ZooKeeperPtr zk)

View File

@ -0,0 +1,49 @@
#include "StorageLoop.h"
#include <Storages/StorageFactory.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromLoopStep.h>
namespace DB
{
namespace ErrorCodes
{
}
StorageLoop::StorageLoop(
const StorageID & table_id_,
StoragePtr inner_storage_)
: IStorage(table_id_)
, inner_storage(std::move(inner_storage_))
{
StorageInMemoryMetadata storage_metadata = inner_storage->getInMemoryMetadata();
setInMemoryMetadata(storage_metadata);
}
void StorageLoop::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
query_info.optimize_trivial_count = false;
query_plan.addStep(std::make_unique<ReadFromLoopStep>(
column_names, query_info, storage_snapshot, context, processed_stage, inner_storage, max_block_size, num_streams
));
}
void registerStorageLoop(StorageFactory & factory)
{
factory.registerStorage("Loop", [](const StorageFactory::Arguments & args)
{
StoragePtr inner_storage;
return std::make_shared<StorageLoop>(args.table_id, inner_storage);
});
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include "config.h"
#include <Storages/IStorage.h>
namespace DB
{
class StorageLoop final : public IStorage
{
public:
StorageLoop(
const StorageID & table_id,
StoragePtr inner_storage_);
std::string getName() const override { return "Loop"; }
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return false; }
private:
StoragePtr inner_storage;
};
}

View File

@ -297,7 +297,6 @@ namespace
CASE_WINDOW_KIND(Year)
#undef CASE_WINDOW_KIND
}
UNREACHABLE();
}
class AddingAggregatedChunkInfoTransform : public ISimpleTransform
@ -920,7 +919,6 @@ UInt32 StorageWindowView::getWindowLowerBound(UInt32 time_sec)
CASE_WINDOW_KIND(Year)
#undef CASE_WINDOW_KIND
}
UNREACHABLE();
}
UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
@ -948,7 +946,6 @@ UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
CASE_WINDOW_KIND(Year)
#undef CASE_WINDOW_KIND
}
UNREACHABLE();
}
void StorageWindowView::addFireSignal(std::set<UInt32> & signals)

View File

@ -25,6 +25,7 @@ void registerStorageLiveView(StorageFactory & factory);
void registerStorageGenerateRandom(StorageFactory & factory);
void registerStorageExecutable(StorageFactory & factory);
void registerStorageWindowView(StorageFactory & factory);
void registerStorageLoop(StorageFactory & factory);
#if USE_RAPIDJSON || USE_SIMDJSON
void registerStorageFuzzJSON(StorageFactory & factory);
#endif
@ -120,6 +121,7 @@ void registerStorages()
registerStorageGenerateRandom(factory);
registerStorageExecutable(factory);
registerStorageWindowView(factory);
registerStorageLoop(factory);
#if USE_RAPIDJSON || USE_SIMDJSON
registerStorageFuzzJSON(factory);
#endif

View File

@ -0,0 +1,156 @@
#include "config.h"
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/Exception.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageLoop.h>
#include "registerTableFunctions.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int UNKNOWN_TABLE;
}
namespace
{
class TableFunctionLoop : public ITableFunction
{
public:
static constexpr auto name = "loop";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Loop"; }
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
// save the inner table function AST
ASTPtr inner_table_function_ast;
// save database and table
std::string loop_database_name;
std::string loop_table_name;
};
}
void TableFunctionLoop::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
const auto & args_func = ast_function->as<ASTFunction &>();
if (!args_func.arguments)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function 'loop' must have arguments.");
auto & args = args_func.arguments->children;
if (args.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "No arguments provided for table function 'loop'");
if (args.size() == 1)
{
if (const auto * id = args[0]->as<ASTIdentifier>())
{
String id_name = id->name();
size_t dot_pos = id_name.find('.');
if (id_name.find('.', dot_pos + 1) != String::npos)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "There are more than one dot");
if (dot_pos != String::npos)
{
loop_database_name = id_name.substr(0, dot_pos);
loop_table_name = id_name.substr(dot_pos + 1);
}
else
{
loop_table_name = id_name;
}
}
else if (const auto * func = args[0]->as<ASTFunction>())
{
inner_table_function_ast = args[0];
}
else
{
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Expected identifier or function for argument 1 of function 'loop', got {}", args[0]->getID());
}
}
// loop(database, table)
else if (args.size() == 2)
{
args[0] = evaluateConstantExpressionForDatabaseName(args[0], context);
args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(args[1], context);
loop_database_name = checkAndGetLiteralArgument<String>(args[0], "database");
loop_table_name = checkAndGetLiteralArgument<String>(args[1], "table");
}
else
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function 'loop' must have 1 or 2 arguments.");
}
}
ColumnsDescription TableFunctionLoop::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const
{
return ColumnsDescription();
}
StoragePtr TableFunctionLoop::executeImpl(
const ASTPtr & /*ast_function*/,
ContextPtr context,
const std::string & table_name,
ColumnsDescription cached_columns,
bool is_insert_query) const
{
StoragePtr storage;
if (!loop_table_name.empty())
{
String database_name = loop_database_name;
if (database_name.empty())
database_name = context->getCurrentDatabase();
auto database = DatabaseCatalog::instance().getDatabase(database_name);
storage = database->tryGetTable(loop_table_name, context);
if (!storage)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table '{}' not found in database '{}'", loop_table_name, database_name);
}
else
{
auto inner_table_function = TableFunctionFactory::instance().get(inner_table_function_ast, context);
storage = inner_table_function->execute(
inner_table_function_ast,
context,
table_name,
std::move(cached_columns),
is_insert_query);
}
auto res = std::make_shared<StorageLoop>(
StorageID(getDatabaseName(), table_name),
storage
);
res->startup();
return res;
}
void registerTableFunctionLoop(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionLoop>(
{.documentation
= {.description=R"(The table function can be used to continuously output query results in an infinite loop.)",
.examples{{"loop", "SELECT * FROM loop((numbers(3)) LIMIT 7", "0"
"1"
"2"
"0"
"1"
"2"
"0"}}
}});
}
}

View File

@ -11,6 +11,7 @@ void registerTableFunctions()
registerTableFunctionMerge(factory);
registerTableFunctionRemote(factory);
registerTableFunctionNumbers(factory);
registerTableFunctionLoop(factory);
registerTableFunctionGenerateSeries(factory);
registerTableFunctionNull(factory);
registerTableFunctionZeros(factory);

View File

@ -8,6 +8,7 @@ class TableFunctionFactory;
void registerTableFunctionMerge(TableFunctionFactory & factory);
void registerTableFunctionRemote(TableFunctionFactory & factory);
void registerTableFunctionNumbers(TableFunctionFactory & factory);
void registerTableFunctionLoop(TableFunctionFactory & factory);
void registerTableFunctionGenerateSeries(TableFunctionFactory & factory);
void registerTableFunctionNull(TableFunctionFactory & factory);
void registerTableFunctionZeros(TableFunctionFactory & factory);

View File

@ -109,12 +109,12 @@ def main():
test_script = jobs_scripts[test_job]
if report_file.exists():
report_file.unlink()
extra_timeout_option = ""
if test_job == JobNames.STATELESS_TEST_RELEASE:
extra_timeout_option = str(3600)
# "bugfix" must be present in checkname, as integration test runner checks this
check_name = f"Validate bugfix: {test_job}"
command = f"python3 {test_script} '{check_name}' {extra_timeout_option} --validate-bugfix --report-to-file {report_file}"
command = (
f"python3 {test_script} '{check_name}' "
f"--validate-bugfix --report-to-file {report_file}"
)
print(f"Going to validate job [{test_job}], command [{command}]")
_ = subprocess.run(
command,

View File

@ -18,6 +18,7 @@ import docker_images_helper
import upload_result_helper
from build_check import get_release_or_pr
from ci_config import CI_CONFIG, Build, CILabels, CIStages, JobNames, StatusNames
from ci_metadata import CiMetadata
from ci_utils import GHActions, is_hex, normalize_string
from clickhouse_helper import (
CiLogsCredentials,
@ -39,22 +40,23 @@ from digest_helper import DockerDigester, JobDigester
from env_helper import (
CI,
GITHUB_JOB_API_URL,
GITHUB_REPOSITORY,
GITHUB_RUN_ID,
GITHUB_RUN_URL,
REPO_COPY,
REPORT_PATH,
S3_BUILDS_BUCKET,
TEMP_PATH,
GITHUB_RUN_ID,
GITHUB_REPOSITORY,
)
from get_robot_token import get_best_robot_token
from git_helper import GIT_PREFIX, Git
from git_helper import Runner as GitRunner
from github_helper import GitHub
from pr_info import PRInfo
from report import ERROR, SUCCESS, BuildResult, JobReport, PENDING
from report import ERROR, FAILURE, PENDING, SUCCESS, BuildResult, JobReport, TestResult
from s3_helper import S3Helper
from ci_metadata import CiMetadata
from stopwatch import Stopwatch
from tee_popen import TeePopen
from version_helper import get_version_from_repo
# pylint: disable=too-many-lines
@ -1867,8 +1869,8 @@ def _run_test(job_name: str, run_command: str) -> int:
run_command or CI_CONFIG.get_job_config(job_name).run_command
), "Run command must be provided as input argument or be configured in job config"
if CI_CONFIG.get_job_config(job_name).timeout:
os.environ["KILL_TIMEOUT"] = str(CI_CONFIG.get_job_config(job_name).timeout)
env = os.environ.copy()
timeout = CI_CONFIG.get_job_config(job_name).timeout or None
if not run_command:
run_command = "/".join(
@ -1879,26 +1881,27 @@ def _run_test(job_name: str, run_command: str) -> int:
print("Use run command from a job config")
else:
print("Use run command from the workflow")
os.environ["CHECK_NAME"] = job_name
env["CHECK_NAME"] = job_name
print(f"Going to start run command [{run_command}]")
process = subprocess.run(
run_command,
stdout=sys.stdout,
stderr=sys.stderr,
text=True,
check=False,
shell=True,
)
stopwatch = Stopwatch()
job_log = Path(TEMP_PATH) / "job_log.txt"
with TeePopen(run_command, job_log, env, timeout) as process:
retcode = process.wait()
if retcode != 0:
print(f"Run action failed for: [{job_name}] with exit code [{retcode}]")
if timeout and process.timeout_exceeded:
print(f"Timeout {timeout} exceeded, dumping the job report")
JobReport(
status=FAILURE,
description=f"Timeout {timeout} exceeded",
test_results=[TestResult.create_check_timeout_expired(timeout)],
start_time=stopwatch.start_time_str,
duration=stopwatch.duration_seconds,
additional_files=[job_log],
).dump()
if process.returncode == 0:
print(f"Run action done for: [{job_name}]")
exit_code = 0
else:
print(
f"Run action failed for: [{job_name}] with exit code [{process.returncode}]"
)
exit_code = process.returncode
return exit_code
print(f"Run action done for: [{job_name}]")
return retcode
def _get_ext_check_name(check_name: str) -> str:

View File

@ -175,8 +175,8 @@ class JobNames(metaclass=WithIter):
COMPATIBILITY_TEST = "Compatibility check (amd64)"
COMPATIBILITY_TEST_ARM = "Compatibility check (aarch64)"
CLCIKBENCH_TEST = "ClickBench (amd64)"
CLCIKBENCH_TEST_ARM = "ClickBench (aarch64)"
CLICKBENCH_TEST = "ClickBench (amd64)"
CLICKBENCH_TEST_ARM = "ClickBench (aarch64)"
LIBFUZZER_TEST = "libFuzzer tests"
@ -472,17 +472,18 @@ compatibility_test_common_params = {
}
stateless_test_common_params = {
"digest": stateless_check_digest,
"run_command": 'functional_test_check.py "$CHECK_NAME" $KILL_TIMEOUT',
"run_command": 'functional_test_check.py "$CHECK_NAME"',
"timeout": 10800,
}
stateful_test_common_params = {
"digest": stateful_check_digest,
"run_command": 'functional_test_check.py "$CHECK_NAME" $KILL_TIMEOUT',
"run_command": 'functional_test_check.py "$CHECK_NAME"',
"timeout": 3600,
}
stress_test_common_params = {
"digest": stress_check_digest,
"run_command": "stress_check.py",
"timeout": 9000,
}
upgrade_test_common_params = {
"digest": upgrade_check_digest,
@ -531,6 +532,7 @@ clickbench_test_params = {
docker=["clickhouse/clickbench"],
),
"run_command": 'clickbench.py "$CHECK_NAME"',
"timeout": 900,
}
install_test_params = JobConfig(
digest=install_check_digest,
@ -1111,6 +1113,7 @@ CI_CONFIG = CIConfig(
exclude_files=[".md"],
docker=["clickhouse/fasttest"],
),
timeout=2400,
),
),
JobNames.STYLE_CHECK: TestConfig(
@ -1123,7 +1126,9 @@ CI_CONFIG = CIConfig(
"",
# we run this check by label - no digest required
job_config=JobConfig(
run_by_label="pr-bugfix", run_command="bugfix_validate_check.py"
run_by_label="pr-bugfix",
run_command="bugfix_validate_check.py",
timeout=900,
),
),
},
@ -1357,10 +1362,10 @@ CI_CONFIG = CIConfig(
Build.PACKAGE_RELEASE, job_config=sqllogic_test_params
),
JobNames.SQLTEST: TestConfig(Build.PACKAGE_RELEASE, job_config=sql_test_params),
JobNames.CLCIKBENCH_TEST: TestConfig(
JobNames.CLICKBENCH_TEST: TestConfig(
Build.PACKAGE_RELEASE, job_config=JobConfig(**clickbench_test_params) # type: ignore
),
JobNames.CLCIKBENCH_TEST_ARM: TestConfig(
JobNames.CLICKBENCH_TEST_ARM: TestConfig(
Build.PACKAGE_AARCH64, job_config=JobConfig(**clickbench_test_params) # type: ignore
),
JobNames.LIBFUZZER_TEST: TestConfig(
@ -1368,7 +1373,7 @@ CI_CONFIG = CIConfig(
job_config=JobConfig(
run_by_label=CILabels.libFuzzer,
timeout=10800,
run_command='libfuzzer_test_check.py "$CHECK_NAME" 10800',
run_command='libfuzzer_test_check.py "$CHECK_NAME"',
),
), # type: ignore
},

View File

@ -1,8 +1,7 @@
from contextlib import contextmanager
import os
import signal
from typing import Any, List, Union, Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator, List, Union
class WithIter(type):
@ -49,14 +48,3 @@ class GHActions:
for line in lines:
print(line)
print("::endgroup::")
def set_job_timeout():
def timeout_handler(_signum, _frame):
print("Timeout expired")
raise TimeoutError("Job's KILL_TIMEOUT expired")
kill_timeout = int(os.getenv("KILL_TIMEOUT", "0"))
assert kill_timeout > 0, "kill timeout must be provided in KILL_TIMEOUT env"
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(kill_timeout)

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
import argparse
import csv
import logging
import os
@ -11,15 +10,7 @@ from typing import Tuple
from docker_images_helper import DockerImage, get_docker_image, pull_image
from env_helper import REPO_COPY, S3_BUILDS_BUCKET, TEMP_PATH
from pr_info import PRInfo
from report import (
ERROR,
FAILURE,
SUCCESS,
JobReport,
TestResult,
TestResults,
read_test_results,
)
from report import ERROR, FAILURE, SUCCESS, JobReport, TestResults, read_test_results
from stopwatch import Stopwatch
from tee_popen import TeePopen
@ -80,30 +71,9 @@ def process_results(result_directory: Path) -> Tuple[str, str, TestResults]:
return state, description, test_results
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="FastTest script",
)
parser.add_argument(
"--timeout",
type=int,
# Fast tests are in most cases done within 10 min, and a 40 min timeout should be sufficient,
# though due to cold cache build time can be much longer
# https://pastila.nl/?146195b6/9bb99293535e3817a9ea82c3f0f7538d.link#5xtClOjkaPLEjSuZ92L2/g==
default=40,
help="Timeout in minutes",
)
args = parser.parse_args()
args.timeout = args.timeout * 60
return args
def main():
logging.basicConfig(level=logging.INFO)
stopwatch = Stopwatch()
args = parse_args()
temp_path = Path(TEMP_PATH)
temp_path.mkdir(parents=True, exist_ok=True)
@ -134,14 +104,10 @@ def main():
logs_path.mkdir(parents=True, exist_ok=True)
run_log_path = logs_path / "run.log"
timeout_expired = False
with TeePopen(run_cmd, run_log_path, timeout=args.timeout) as process:
with TeePopen(run_cmd, run_log_path) as process:
retcode = process.wait()
if process.timeout_exceeded:
logging.info("Timeout expired for command: %s", run_cmd)
timeout_expired = True
elif retcode == 0:
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
@ -175,11 +141,6 @@ def main():
else:
state, description, test_results = process_results(output_path)
if timeout_expired:
test_results.append(TestResult.create_check_timeout_expired(args.timeout))
state = FAILURE
description = test_results[-1].name
JobReport(
description=description,
test_results=test_results,

View File

@ -68,7 +68,6 @@ def get_run_command(
repo_path: Path,
result_path: Path,
server_log_path: Path,
kill_timeout: int,
additional_envs: List[str],
ci_logs_args: str,
image: DockerImage,
@ -86,7 +85,6 @@ def get_run_command(
)
envs = [
f"-e MAX_RUN_TIME={int(0.9 * kill_timeout)}",
# a static link, don't use S3_URL or S3_DOWNLOAD
'-e S3_URL="https://s3.amazonaws.com/clickhouse-datasets"',
]
@ -192,7 +190,6 @@ def process_results(
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("check_name")
parser.add_argument("kill_timeout", type=int)
parser.add_argument(
"--validate-bugfix",
action="store_true",
@ -224,12 +221,7 @@ def main():
assert (
check_name
), "Check name must be provided as an input arg or in CHECK_NAME env"
kill_timeout = args.kill_timeout or int(os.getenv("KILL_TIMEOUT", "0"))
assert (
kill_timeout > 0
), "kill timeout must be provided as an input arg or in KILL_TIMEOUT env"
validate_bugfix_check = args.validate_bugfix
print(f"Runnin check [{check_name}] with timeout [{kill_timeout}]")
flaky_check = "flaky" in check_name.lower()
@ -288,7 +280,6 @@ def main():
repo_path,
result_path,
server_log_path,
kill_timeout,
additional_envs,
ci_logs_args,
docker_image,

View File

@ -1,25 +1,21 @@
#!/usr/bin/env python3
import argparse
import logging
import sys
import subprocess
import sys
from pathlib import Path
from shutil import copy2
from typing import Dict
from build_download_helper import download_builds_filter
from compress_files import compress_fast
from docker_images_helper import DockerImage, pull_image, get_docker_image
from env_helper import CI, REPORT_PATH, TEMP_PATH as TEMP
from report import JobReport, TestResults, TestResult, FAILURE, FAIL, OK, SUCCESS
from docker_images_helper import DockerImage, get_docker_image, pull_image
from env_helper import REPORT_PATH
from env_helper import TEMP_PATH as TEMP
from report import FAIL, FAILURE, OK, SUCCESS, JobReport, TestResult, TestResults
from stopwatch import Stopwatch
from tee_popen import TeePopen
from ci_utils import set_job_timeout
RPM_IMAGE = "clickhouse/install-rpm-test"
DEB_IMAGE = "clickhouse/install-deb-test"
@ -256,9 +252,6 @@ def main():
args = parse_args()
if CI:
set_job_timeout()
TEMP_PATH.mkdir(parents=True, exist_ok=True)
LOGS_PATH.mkdir(parents=True, exist_ok=True)

View File

@ -10,6 +10,7 @@ from typing import Any, List
import boto3 # type: ignore
import requests
from build_download_helper import (
download_build_with_progress,
get_build_name_for_check,
@ -201,7 +202,7 @@ def main():
docker_image = KEEPER_IMAGE_NAME if args.program == "keeper" else SERVER_IMAGE_NAME
if pr_info.is_scheduled or pr_info.is_dispatched:
# get latest clcikhouse by the static link for latest master buit - get its version and provide permanent url for this version to the jepsen
# get latest clickhouse by the static link for latest master built - get its version and provide permanent url for this version to the jepsen
build_url = f"{S3_URL}/{S3_BUILDS_BUCKET}/master/amd64/clickhouse"
download_build_with_progress(build_url, Path(TEMP_PATH) / "clickhouse")
git_runner.run(f"chmod +x {TEMP_PATH}/clickhouse")

View File

@ -46,7 +46,6 @@ def get_run_command(
fuzzers_path: Path,
repo_path: Path,
result_path: Path,
kill_timeout: int,
additional_envs: List[str],
ci_logs_args: str,
image: DockerImage,
@ -59,7 +58,6 @@ def get_run_command(
)
envs = [
f"-e MAX_RUN_TIME={int(0.9 * kill_timeout)}",
# a static link, don't use S3_URL or S3_DOWNLOAD
'-e S3_URL="https://s3.amazonaws.com/clickhouse-datasets"',
]
@ -83,7 +81,6 @@ def get_run_command(
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("check_name")
parser.add_argument("kill_timeout", type=int)
return parser.parse_args()
@ -99,7 +96,6 @@ def main():
args = parse_args()
check_name = args.check_name
kill_timeout = args.kill_timeout
pr_info = PRInfo()
@ -145,7 +141,6 @@ def main():
fuzzers_path,
repo_path,
result_path,
kill_timeout,
additional_envs,
ci_logs_args,
docker_image,

View File

@ -288,7 +288,7 @@ class JobReport:
start_time: str
duration: float
additional_files: Union[Sequence[str], Sequence[Path]]
# clcikhouse version, build job only
# clickhouse version, build job only
version: str = ""
# check name to set in commit status, set if it differs from the job name
check_name: str = ""

View File

@ -9,8 +9,8 @@ from pathlib import Path
from typing import Tuple
from build_download_helper import download_all_deb_packages
from docker_images_helper import DockerImage, pull_image, get_docker_image
from env_helper import REPORT_PATH, TEMP_PATH, REPO_COPY
from docker_images_helper import DockerImage, get_docker_image, pull_image
from env_helper import REPO_COPY, REPORT_PATH, TEMP_PATH
from report import (
ERROR,
FAIL,
@ -72,11 +72,6 @@ def parse_args() -> argparse.Namespace:
required=False,
default="",
)
parser.add_argument(
"--kill-timeout",
required=False,
default=0,
)
return parser.parse_args()
@ -96,10 +91,6 @@ def main():
assert (
check_name
), "Check name must be provided as an input arg or in CHECK_NAME env"
kill_timeout = args.kill_timeout or int(os.getenv("KILL_TIMEOUT", "0"))
assert (
kill_timeout > 0
), "kill timeout must be provided as an input arg or in KILL_TIMEOUT env"
docker_image = pull_image(get_docker_image(IMAGE_NAME))
@ -127,7 +118,7 @@ def main():
)
logging.info("Going to run func tests: %s", run_command)
with TeePopen(run_command, run_log_path, timeout=kill_timeout) as process:
with TeePopen(run_command, run_log_path) as process:
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")

View File

@ -14,7 +14,7 @@ from docker_images_helper import DockerImage, get_docker_image, pull_image
from env_helper import REPO_COPY, REPORT_PATH, TEMP_PATH
from get_robot_token import get_parameter_from_ssm
from pr_info import PRInfo
from report import ERROR, JobReport, TestResult, TestResults, read_test_results
from report import ERROR, JobReport, TestResults, read_test_results
from stopwatch import Stopwatch
from tee_popen import TeePopen
@ -161,14 +161,9 @@ def run_stress_test(docker_image_name: str) -> None:
)
logging.info("Going to run stress test: %s", run_command)
timeout_expired = False
timeout = 60 * 150
with TeePopen(run_command, run_log_path, timeout=timeout) as process:
with TeePopen(run_command, run_log_path) as process:
retcode = process.wait()
if process.timeout_exceeded:
logging.info("Timeout expired for command: %s", run_command)
timeout_expired = True
elif retcode == 0:
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
@ -180,11 +175,6 @@ def run_stress_test(docker_image_name: str) -> None:
result_path, server_log_path, run_log_path
)
if timeout_expired:
test_results.append(TestResult.create_check_timeout_expired(timeout))
state = "failure"
description = test_results[-1].name
JobReport(
description=description,
test_results=test_results,

View File

@ -1223,12 +1223,9 @@ class TestCase:
return FailureReason.S3_STORAGE
elif (
tags
and ("no-s3-storage-with-slow-build" in tags)
and "no-s3-storage-with-slow-build" in tags
and args.s3_storage
and (
BuildFlags.THREAD in args.build_flags
or BuildFlags.DEBUG in args.build_flags
)
and BuildFlags.RELEASE not in args.build_flags
):
return FailureReason.S3_STORAGE

View File

@ -4,3 +4,5 @@ a|x
String, Const(size = 1, String(size = 1))
String, Const(size = 1, String(size = 1))
5128475243952187658
0 0
0 0

View File

@ -10,3 +10,23 @@ select dumpColumnStructure('x') GROUP BY 'x';
select dumpColumnStructure('x');
-- from https://github.com/ClickHouse/ClickHouse/pull/60046
SELECT cityHash64('limit', _CAST(materialize('World'), 'LowCardinality(String)')) FROM system.one GROUP BY GROUPING SETS ('limit');
WITH (
SELECT dummy AS x
FROM system.one
) AS y
SELECT
y,
min(dummy)
FROM remote('127.0.0.{1,2}', system.one)
GROUP BY y;
WITH (
SELECT dummy AS x
FROM system.one
) AS y
SELECT
y,
min(dummy)
FROM remote('127.0.0.{2,3}', system.one)
GROUP BY y;

View File

@ -0,0 +1,65 @@
0
1
2
0
1
2
0
1
2
0
0
1
2
0
1
2
0
1
2
0
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4

View File

@ -0,0 +1,14 @@
-- Tags: no-parallel
SELECT * FROM loop(numbers(3)) LIMIT 10;
SELECT * FROM loop (numbers(3)) LIMIT 10 settings max_block_size = 1;
DROP DATABASE IF EXISTS 03147_db;
CREATE DATABASE IF NOT EXISTS 03147_db;
CREATE TABLE 03147_db.t (n Int8) ENGINE=MergeTree ORDER BY n;
INSERT INTO 03147_db.t SELECT * FROM numbers(10);
USE 03147_db;
SELECT * FROM loop(03147_db.t) LIMIT 15;
SELECT * FROM loop(t) LIMIT 15;
SELECT * FROM loop(03147_db, t) LIMIT 15;

View File

@ -0,0 +1,8 @@
1
1
0
0
0
0
0
0

View File

@ -0,0 +1,11 @@
-- Equal
SELECT toIPv4('127.0.0.1') = toIPv6('::ffff:127.0.0.1');
SELECT toIPv6('::ffff:127.0.0.1') = toIPv4('127.0.0.1');
-- Not equal
SELECT toIPv4('127.0.0.1') = toIPv6('::ffff:127.0.0.2');
SELECT toIPv4('127.0.0.2') = toIPv6('::ffff:127.0.0.1');
SELECT toIPv6('::ffff:127.0.0.1') = toIPv4('127.0.0.2');
SELECT toIPv6('::ffff:127.0.0.2') = toIPv4('127.0.0.1');
SELECT toIPv4('127.0.0.1') = toIPv6('::ffef:127.0.0.1');
SELECT toIPv6('::ffef:127.0.0.1') = toIPv4('127.0.0.1');