Merge branch 'master' into try-fix-keeper-map

This commit is contained in:
Antonio Andelic 2023-01-12 15:19:51 +00:00
commit dc4262f338
48 changed files with 724 additions and 334 deletions

2
contrib/azure vendored

@ -1 +1 @@
Subproject commit ef75afc075fc71fbcd8fe28dcda3794ae265fd1c
Subproject commit ea8c3044f43f5afa7016d2d580ed201f495d7e94

View File

@ -5,12 +5,18 @@ FROM ubuntu:22.04
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
RUN apt-get update --yes && env DEBIAN_FRONTEND=noninteractive apt-get install wget unzip git default-jdk maven python3 --yes --no-install-recommends
RUN wget https://github.com/sqlancer/sqlancer/archive/master.zip -O /sqlancer.zip
RUN apt-get update --yes && \
env DEBIAN_FRONTEND=noninteractive apt-get install wget git default-jdk maven python3 --yes --no-install-recommends && \
apt-get clean
# We need to get the repository's HEAD each time despite, so we invalidate layers' cache
ARG CACHE_INVALIDATOR=0
RUN mkdir /sqlancer && \
cd /sqlancer && \
unzip /sqlancer.zip
RUN cd /sqlancer/sqlancer-master && mvn package -DskipTests
wget -q -O- https://github.com/sqlancer/sqlancer/archive/master.tar.gz | \
tar zx -C /sqlancer && \
cd /sqlancer/sqlancer-master && \
mvn package -DskipTests && \
rm -r /root/.m2
COPY run.sh /
COPY process_sqlancer_result.py /

View File

@ -593,7 +593,7 @@ clickhouse-local --structure "test String, res String" -q "SELECT 'failure', tes
[ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
# Core dumps
find . -type f -name 'core.*' | while read core; do
find . -type f -maxdepth 1 -name 'core.*' | while read core; do
zstd --threads=0 $core
mv $core.zst /test_output/
done

View File

@ -207,16 +207,9 @@ Converts a date or date with time to a UInt8 number containing the number of the
Aliases: `DAYOFMONTH`, `DAY`.
## toDayOfWeek(date\[,mode\])
## toDayOfWeek
Converts a date or date with time to a UInt8 number containing the number of the day of the week. The two-argument form of toDayOfWeek() enables you to specify whether the week starts on Monday or Sunday, and whether the return value should be in the range from 0 to 6 or from 1-7. If the mode argument is ommited, the default mode is 0.
| Mode | First day of week | Range |
|------|-------------------|------------------------------------------------|
| 0 | Monday | 1-7, Monday = 1, Tuesday = 2, ..., Sunday = 7 |
| 1 | Monday | 0-6, Monday = 0, Tuesday = 1, ..., Sunday = 6 |
| 2 | Sunday | 0-6, Sunday = 0, Monday = 1, ..., Saturday = 6 |
| 3 | Sunday | 1-7, Sunday = 1, Monday = 2, ..., Saturday = 7 |
Converts a date or date with time to a UInt8 number containing the number of the day of the week (Monday is 1, and Sunday is 7).
Alias: `DAYOFWEEK`.

View File

@ -606,5 +606,10 @@ if (ENABLE_TESTS)
target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::yaml_cpp)
endif()
if (TARGET ch_contrib::azure_sdk)
target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::azure_sdk)
endif()
add_check(unit_tests_dbms)
endif ()

View File

@ -39,15 +39,6 @@ enum class WeekModeFlag : UInt8
};
using YearWeek = std::pair<UInt16, UInt8>;
/// Modes for toDayOfWeek() function.
enum class WeekDayMode
{
WeekStartsMonday1 = 0,
WeekStartsMonday0 = 1,
WeekStartsSunday0 = 2,
WeekStartsSunday1 = 3
};
/** Lookup table to conversion of time to date, and to month / year / day of week / day of month and so on.
* First time was implemented for OLAPServer, that needed to do billions of such transformations.
*/
@ -628,25 +619,9 @@ public:
template <typename DateOrTime>
inline Int16 toYear(DateOrTime v) const { return lut[toLUTIndex(v)].year; }
/// 1-based, starts on Monday
template <typename DateOrTime>
inline UInt8 toDayOfWeek(DateOrTime v) const { return lut[toLUTIndex(v)].day_of_week; }
template <typename DateOrTime>
inline UInt8 toDayOfWeek(DateOrTime v, UInt8 week_day_mode) const
{
WeekDayMode mode = check_week_day_mode(week_day_mode);
UInt8 res = toDayOfWeek(v);
bool start_from_sunday = (mode == WeekDayMode::WeekStartsSunday0 || mode == WeekDayMode::WeekStartsSunday1);
bool zero_based = (mode == WeekDayMode::WeekStartsMonday0 || mode == WeekDayMode::WeekStartsSunday0);
if (start_from_sunday)
res = res % 7 + 1;
if (zero_based)
--res;
return res;
}
template <typename DateOrTime>
inline UInt8 toDayOfMonth(DateOrTime v) const { return lut[toLUTIndex(v)].day_of_month; }
@ -869,13 +844,6 @@ public:
return week_format;
}
/// Check and change mode to effective.
inline WeekDayMode check_week_day_mode(UInt8 mode) const /// NOLINT
{
return static_cast<WeekDayMode>(mode & 3);
}
/** Calculate weekday from d.
* Returns 0 for monday, 1 for tuesday...
*/

View File

@ -126,6 +126,10 @@ ExternalTable::ExternalTable(const boost::program_options::variables_map & exter
void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream)
{
/// After finishing this function we will be ready to receive the next file, for this we clear all the information received.
/// We should use SCOPE_EXIT because read_buffer should be reset correctly if there will be an exception.
SCOPE_EXIT(clear());
const Settings & settings = getContext()->getSettingsRef();
if (settings.http_max_multipart_form_data_size)
@ -167,9 +171,6 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
CompletedPipelineExecutor executor(pipeline);
executor.execute();
/// We are ready to receive the next file, for this we clear all the information received
clear();
}
}

View File

@ -0,0 +1,25 @@
#include <string>
#include <vector>
#include <Common/logger_useful.h>
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <azure/storage/blobs.hpp>
#include <azure/storage/common/internal/xml_wrapper.hpp>
#include <gtest/gtest.h>
TEST(AzureXMLWrapper, TestLeak)
{
std::string str = "<hello>world</hello>";
Azure::Storage::_internal::XmlReader reader(str.c_str(), str.length());
Azure::Storage::_internal::XmlReader reader2(std::move(reader));
Azure::Storage::_internal::XmlReader reader3 = std::move(reader2);
reader3.Read();
}
#endif

View File

@ -7,49 +7,29 @@
namespace fs = std::filesystem;
template <typename T>
DB::DiskPtr createDisk();
template <>
DB::DiskPtr createDisk<DB::DiskLocal>()
DB::DiskPtr createDisk()
{
fs::create_directory("tmp/");
return std::make_shared<DB::DiskLocal>("local_disk", "tmp/", 0);
}
template <typename T>
void destroyDisk(DB::DiskPtr & disk)
{
disk.reset();
}
template <>
void destroyDisk<DB::DiskLocal>(DB::DiskPtr & disk)
{
disk.reset();
fs::remove_all("tmp/");
}
template <typename T>
class DiskTest : public testing::Test
{
public:
void SetUp() override { disk = createDisk<T>(); }
void TearDown() override { destroyDisk<T>(disk); }
void SetUp() override { disk = createDisk(); }
void TearDown() override { destroyDisk(disk); }
DB::DiskPtr disk;
};
using DiskImplementations = testing::Types<DB::DiskLocal>;
TYPED_TEST_SUITE(DiskTest, DiskImplementations);
TYPED_TEST(DiskTest, createDirectories)
TEST_F(DiskTest, createDirectories)
{
this->disk->createDirectories("test_dir1/");
EXPECT_TRUE(this->disk->isDirectory("test_dir1/"));
@ -59,7 +39,7 @@ TYPED_TEST(DiskTest, createDirectories)
}
TYPED_TEST(DiskTest, writeFile)
TEST_F(DiskTest, writeFile)
{
{
std::unique_ptr<DB::WriteBuffer> out = this->disk->writeFile("test_file");
@ -77,7 +57,7 @@ TYPED_TEST(DiskTest, writeFile)
}
TYPED_TEST(DiskTest, readFile)
TEST_F(DiskTest, readFile)
{
{
std::unique_ptr<DB::WriteBuffer> out = this->disk->writeFile("test_file");
@ -112,7 +92,7 @@ TYPED_TEST(DiskTest, readFile)
}
TYPED_TEST(DiskTest, iterateDirectory)
TEST_F(DiskTest, iterateDirectory)
{
this->disk->createDirectories("test_dir/nested_dir/");

View File

@ -3,14 +3,6 @@
#include <Disks/DiskLocal.h>
#include <Disks/IDisk.h>
template <typename T>
DB::DiskPtr createDisk();
template <>
DB::DiskPtr createDisk<DB::DiskLocal>();
template <typename T>
void destroyDisk(DB::DiskPtr & disk);
template <>
void destroyDisk<DB::DiskLocal>(DB::DiskPtr & disk);

View File

@ -3,7 +3,7 @@
#include <Disks/IDisk.h>
TEST(DiskTest, parentPath)
TEST(DiskPathTest, parentPath)
{
EXPECT_EQ("", DB::parentPath("test_dir/"));
EXPECT_EQ("test_dir/", DB::parentPath("test_dir/nested_dir/"));
@ -11,7 +11,7 @@ TEST(DiskTest, parentPath)
}
TEST(DiskTest, fileName)
TEST(DiskPathTest, fileName)
{
EXPECT_EQ("test_file", DB::fileName("test_file"));
EXPECT_EQ("nested_file", DB::fileName("test_dir/nested_file"));

View File

@ -786,21 +786,21 @@ struct ToDayOfWeekImpl
{
static constexpr auto name = "toDayOfWeek";
static inline UInt8 execute(Int64 t, UInt8 week_day_mode, const DateLUTImpl & time_zone)
static inline UInt8 execute(Int64 t, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfWeek(t, week_day_mode);
return time_zone.toDayOfWeek(t);
}
static inline UInt8 execute(UInt32 t, UInt8 week_day_mode, const DateLUTImpl & time_zone)
static inline UInt8 execute(UInt32 t, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfWeek(t, week_day_mode);
return time_zone.toDayOfWeek(t);
}
static inline UInt8 execute(Int32 d, UInt8 week_day_mode, const DateLUTImpl & time_zone)
static inline UInt8 execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfWeek(ExtendedDayNum(d), week_day_mode);
return time_zone.toDayOfWeek(ExtendedDayNum(d));
}
static inline UInt8 execute(UInt16 d, UInt8 week_day_mode, const DateLUTImpl & time_zone)
static inline UInt8 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toDayOfWeek(DayNum(d), week_day_mode);
return time_zone.toDayOfWeek(DayNum(d));
}
using FactorTransform = ToMondayImpl;

View File

@ -225,8 +225,8 @@ public:
}
else if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeWeekNumImpl<ResultPrecision::Extended>>>)
{
auto x_day_of_week = TransformDateTime64<ToDayOfWeekImpl>(transform_x.getScaleMultiplier()).execute(x, 0, timezone_x);
auto y_day_of_week = TransformDateTime64<ToDayOfWeekImpl>(transform_y.getScaleMultiplier()).execute(y, 0, timezone_y);
auto x_day_of_week = TransformDateTime64<ToDayOfWeekImpl>(transform_x.getScaleMultiplier()).execute(x, timezone_x);
auto y_day_of_week = TransformDateTime64<ToDayOfWeekImpl>(transform_y.getScaleMultiplier()).execute(y, timezone_y);
if ((x_day_of_week > y_day_of_week)
|| ((x_day_of_week == y_day_of_week) && (a_comp.time.hour > b_comp.time.hour))
|| ((a_comp.time.hour == b_comp.time.hour) && ((a_comp.time.minute > b_comp.time.minute)

View File

@ -276,7 +276,7 @@ private:
{
static inline void write(WriteBuffer & buffer, Time source, const DateLUTImpl & timezone)
{
const auto day = ToDayOfWeekImpl::execute(source, 0, timezone);
const auto day = ToDayOfWeekImpl::execute(source, timezone);
static constexpr std::string_view day_names[] =
{
"Monday",

View File

@ -344,13 +344,13 @@ private:
static size_t mysqlDayOfWeek(char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
{
*dest = '0' + ToDayOfWeekImpl::execute(source, 0, timezone);
*dest = '0' + ToDayOfWeekImpl::execute(source, timezone);
return 1;
}
static size_t mysqlDayOfWeek0To6(char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
{
auto day = ToDayOfWeekImpl::execute(source, 0, timezone);
auto day = ToDayOfWeekImpl::execute(source, timezone);
*dest = '0' + (day == 7 ? 0 : day);
return 1;
}
@ -499,13 +499,13 @@ private:
static size_t jodaDayOfWeek1Based(size_t min_represent_digits, char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
{
auto week_day = ToDayOfWeekImpl::execute(source, 0, timezone);
auto week_day = ToDayOfWeekImpl::execute(source, timezone);
return writeNumberWithPadding(dest, week_day, min_represent_digits);
}
static size_t jodaDayOfWeekText(size_t min_represent_digits, char * dest, Time source, UInt64, UInt32, const DateLUTImpl & timezone)
{
auto week_day = ToDayOfWeekImpl::execute(source, 0, timezone);
auto week_day = ToDayOfWeekImpl::execute(source, timezone);
if (week_day == 7)
week_day = 0;

View File

@ -1,12 +1,13 @@
#include <Functions/FunctionFactory.h>
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionDateOrDateTimeToSomething.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionCustomWeekToSomething.h>
namespace DB
{
using FunctionToDayOfWeek = FunctionCustomWeekToSomething<DataTypeUInt8, ToDayOfWeekImpl>;
using FunctionToDayOfWeek = FunctionDateOrDateTimeToSomething<DataTypeUInt8, ToDayOfWeekImpl>;
REGISTER_FUNCTION(ToDayOfWeek)
{

View File

@ -886,20 +886,20 @@ public:
const auto & lhs = lhs_block.getByPosition(i);
const auto & rhs = rhs_block.getByPosition(i);
if (lhs.name != rhs.name)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Block structure mismatch: [{}] != [{}]",
lhs_block.dumpStructure(), rhs_block.dumpStructure());
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Block structure mismatch: [{}] != [{}] ({} != {})",
lhs_block.dumpStructure(), rhs_block.dumpStructure(), lhs.name, rhs.name);
const auto & ltype = recursiveRemoveLowCardinality(lhs.type);
const auto & rtype = recursiveRemoveLowCardinality(rhs.type);
if (!ltype->equals(*rtype))
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Block structure mismatch: [{}] != [{}]",
lhs_block.dumpStructure(), rhs_block.dumpStructure());
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Block structure mismatch: [{}] != [{}] ({} != {})",
lhs_block.dumpStructure(), rhs_block.dumpStructure(), ltype->getName(), rtype->getName());
const auto & lcol = recursiveRemoveLowCardinality(lhs.column);
const auto & rcol = recursiveRemoveLowCardinality(rhs.column);
if (lcol->getDataType() != rcol->getDataType())
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Block structure mismatch: [{}] != [{}]",
lhs_block.dumpStructure(), rhs_block.dumpStructure());
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Block structure mismatch: [{}] != [{}] ({} != {})",
lhs_block.dumpStructure(), rhs_block.dumpStructure(), lcol->getDataType(), rcol->getDataType());
}
}

View File

@ -458,16 +458,6 @@ TableJoin::createConvertingActions(
LOG_DEBUG(&Poco::Logger::get("TableJoin"), "{} JOIN converting actions: empty", side);
return;
}
auto format_cols = [](const auto & cols) -> std::string
{
std::vector<std::string> str_cols;
str_cols.reserve(cols.size());
for (const auto & col : cols)
str_cols.push_back(fmt::format("'{}': {}", col.name, col.type->getName()));
return fmt::format("[{}]", fmt::join(str_cols, ", "));
};
LOG_DEBUG(&Poco::Logger::get("TableJoin"), "{} JOIN converting actions: {} -> {}",
side, format_cols(dag->getRequiredColumns()), format_cols(dag->getResultColumns()));
};
log_actions("Left", left_converting_actions);
log_actions("Right", right_converting_actions);

View File

@ -0,0 +1,66 @@
#include <Planner/CollectColumnIdentifiers.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/ColumnNode.h>
#include <Planner/PlannerContext.h>
namespace DB
{
namespace
{
class CollectTopLevelColumnIdentifiersVisitor : public InDepthQueryTreeVisitor<CollectTopLevelColumnIdentifiersVisitor, true>
{
public:
explicit CollectTopLevelColumnIdentifiersVisitor(const PlannerContextPtr & planner_context_, ColumnIdentifierSet & used_identifiers_)
: used_identifiers(used_identifiers_)
, planner_context(planner_context_)
{}
static bool needChildVisit(VisitQueryTreeNodeType &, VisitQueryTreeNodeType & child)
{
const auto & node_type = child->getNodeType();
return node_type != QueryTreeNodeType::TABLE
&& node_type != QueryTreeNodeType::TABLE_FUNCTION
&& node_type != QueryTreeNodeType::QUERY
&& node_type != QueryTreeNodeType::UNION
&& node_type != QueryTreeNodeType::JOIN
&& node_type != QueryTreeNodeType::ARRAY_JOIN;
}
void visitImpl(const QueryTreeNodePtr & node)
{
if (node->getNodeType() != QueryTreeNodeType::COLUMN)
return;
const auto * column_identifier = planner_context->getColumnNodeIdentifierOrNull(node);
if (!column_identifier)
return;
used_identifiers.insert(*column_identifier);
}
ColumnIdentifierSet & used_identifiers;
const PlannerContextPtr & planner_context;
};
}
void collectTopLevelColumnIdentifiers(const QueryTreeNodePtr & node, const PlannerContextPtr & planner_context, ColumnIdentifierSet & out)
{
CollectTopLevelColumnIdentifiersVisitor visitor(planner_context, out);
visitor.visit(node);
}
ColumnIdentifierSet collectTopLevelColumnIdentifiers(const QueryTreeNodePtr & node, const PlannerContextPtr & planner_context)
{
ColumnIdentifierSet out;
collectTopLevelColumnIdentifiers(node, planner_context, out);
return out;
}
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <Analyzer/IQueryTreeNode.h>
#include <Planner/Planner.h>
#include <Planner/TableExpressionData.h>
namespace DB
{
/** Collect all top level column identifiers from query tree node.
* Top level column identifiers are in the SELECT list or GROUP BY/ORDER BY/WHERE/HAVING clause, but not in child nodes of join tree.
* For example, in the following query:
* SELECT sum(b) FROM (SELECT x AS a, y AS b FROM t) AS t1 JOIN t2 ON t1.a = t2.key GROUP BY t2.y
* The top level column identifiers are: `t1.b`, `t2.y`
*
* There is precondition that table expression data is collected in planner context.
*/
ColumnIdentifierSet collectTopLevelColumnIdentifiers(const QueryTreeNodePtr & node, const PlannerContextPtr & planner_context);
void collectTopLevelColumnIdentifiers(const QueryTreeNodePtr & node, const PlannerContextPtr & planner_context, ColumnIdentifierSet & out);
}

View File

@ -64,6 +64,7 @@
#include <Planner/CollectTableExpressionData.h>
#include <Planner/PlannerJoinTree.h>
#include <Planner/PlannerExpressionAnalysis.h>
#include <Planner/CollectColumnIdentifiers.h>
namespace DB
{
@ -374,7 +375,9 @@ void Planner::buildQueryPlanIfNeeded()
collectSets(query_tree, *planner_context);
query_plan = buildQueryPlanForJoinTreeNode(query_node.getJoinTree(), select_query_info, select_query_options, planner_context);
auto top_level_identifiers = collectTopLevelColumnIdentifiers(query_tree, planner_context);
query_plan = buildQueryPlanForJoinTreeNode(query_node.getJoinTree(), select_query_info, select_query_options, top_level_identifiers, planner_context);
auto expression_analysis_result = buildExpressionAnalysisResult(query_tree, query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(), planner_context);
if (expression_analysis_result.hasWhere())

View File

@ -33,6 +33,7 @@
#include <Interpreters/HashJoin.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Planner/CollectColumnIdentifiers.h>
#include <Planner/Planner.h>
#include <Planner/PlannerJoins.h>
#include <Planner/PlannerActionsVisitor.h>
@ -262,19 +263,25 @@ QueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expression,
QueryPlan buildQueryPlanForJoinNode(QueryTreeNodePtr join_tree_node,
SelectQueryInfo & select_query_info,
const SelectQueryOptions & select_query_options,
const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context)
{
auto & join_node = join_tree_node->as<JoinNode &>();
ColumnIdentifierSet current_scope_columns = outer_scope_columns;
collectTopLevelColumnIdentifiers(join_tree_node, planner_context, current_scope_columns);
auto left_plan = buildQueryPlanForJoinTreeNode(join_node.getLeftTableExpression(),
select_query_info,
select_query_options,
current_scope_columns,
planner_context);
auto left_plan_output_columns = left_plan.getCurrentDataStream().header.getColumnsWithTypeAndName();
auto right_plan = buildQueryPlanForJoinTreeNode(join_node.getRightTableExpression(),
select_query_info,
select_query_options,
current_scope_columns,
planner_context);
auto right_plan_output_columns = right_plan.getCurrentDataStream().header.getColumnsWithTypeAndName();
@ -641,6 +648,7 @@ QueryPlan buildQueryPlanForJoinNode(QueryTreeNodePtr join_tree_node,
size_t max_block_size = query_context->getSettingsRef().max_block_size;
size_t max_streams = query_context->getSettingsRef().max_threads;
JoinPipelineType join_pipeline_type = join_algorithm->pipelineType();
auto join_step = std::make_unique<JoinStep>(
left_plan.getCurrentDataStream(),
right_plan.getCurrentDataStream(),
@ -649,7 +657,7 @@ QueryPlan buildQueryPlanForJoinNode(QueryTreeNodePtr join_tree_node,
max_streams,
false /*optimize_read_in_order*/);
join_step->setStepDescription(fmt::format("JOIN {}", JoinPipelineType::FillRightFirst));
join_step->setStepDescription(fmt::format("JOIN {}", join_pipeline_type));
std::vector<QueryPlanPtr> plans;
plans.emplace_back(std::make_unique<QueryPlan>(std::move(left_plan)));
@ -664,8 +672,13 @@ QueryPlan buildQueryPlanForJoinNode(QueryTreeNodePtr join_tree_node,
for (auto & output : drop_unused_columns_after_join_actions_dag->getOutputs())
{
if (updated_outputs_names.contains(output->result_name) || !planner_context->getGlobalPlannerContext()->hasColumnIdentifier(output->result_name))
const auto & global_planner_context = planner_context->getGlobalPlannerContext();
if (updated_outputs_names.contains(output->result_name)
|| !global_planner_context->hasColumnIdentifier(output->result_name)
|| !outer_scope_columns.contains(output->result_name))
{
continue;
}
updated_outputs.push_back(output);
updated_outputs_names.insert(output->result_name);
@ -683,6 +696,7 @@ QueryPlan buildQueryPlanForJoinNode(QueryTreeNodePtr join_tree_node,
QueryPlan buildQueryPlanForArrayJoinNode(QueryTreeNodePtr table_expression,
SelectQueryInfo & select_query_info,
const SelectQueryOptions & select_query_options,
const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context)
{
auto & array_join_node = table_expression->as<ArrayJoinNode &>();
@ -690,6 +704,7 @@ QueryPlan buildQueryPlanForArrayJoinNode(QueryTreeNodePtr table_expression,
auto plan = buildQueryPlanForJoinTreeNode(array_join_node.getTableExpression(),
select_query_info,
select_query_options,
outer_scope_columns,
planner_context);
auto plan_output_columns = plan.getCurrentDataStream().header.getColumnsWithTypeAndName();
@ -729,6 +744,7 @@ QueryPlan buildQueryPlanForArrayJoinNode(QueryTreeNodePtr table_expression,
QueryPlan buildQueryPlanForJoinTreeNode(QueryTreeNodePtr join_tree_node,
SelectQueryInfo & select_query_info,
const SelectQueryOptions & select_query_options,
const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context)
{
auto join_tree_node_type = join_tree_node->getNodeType();
@ -747,11 +763,11 @@ QueryPlan buildQueryPlanForJoinTreeNode(QueryTreeNodePtr join_tree_node,
}
case QueryTreeNodeType::JOIN:
{
return buildQueryPlanForJoinNode(join_tree_node, select_query_info, select_query_options, planner_context);
return buildQueryPlanForJoinNode(join_tree_node, select_query_info, select_query_options, outer_scope_columns, planner_context);
}
case QueryTreeNodeType::ARRAY_JOIN:
{
return buildQueryPlanForArrayJoinNode(join_tree_node, select_query_info, select_query_options, planner_context);
return buildQueryPlanForArrayJoinNode(join_tree_node, select_query_info, select_query_options, outer_scope_columns, planner_context);
}
default:
{

View File

@ -15,6 +15,7 @@ namespace DB
QueryPlan buildQueryPlanForJoinTreeNode(QueryTreeNodePtr join_tree_node,
SelectQueryInfo & select_query_info,
const SelectQueryOptions & select_query_options,
const ColumnIdentifierSet & outer_scope_columns,
PlannerContextPtr & planner_context);
}

View File

@ -13,6 +13,7 @@ namespace ErrorCodes
using ColumnIdentifier = std::string;
using ColumnIdentifiers = std::vector<ColumnIdentifier>;
using ColumnIdentifierSet = std::unordered_set<ColumnIdentifier>;
/** Table expression data is created for each table expression that take part in query.
* Table expression data has information about columns that participate in query, their name to identifier mapping,

View File

@ -3780,7 +3780,7 @@ std::pair<size_t, size_t> MergeTreeData::getMaxPartsCountAndSizeForPartition() c
}
size_t MergeTreeData::getMaxInactivePartsCountForPartition() const
size_t MergeTreeData::getMaxOutdatedPartsCountForPartition() const
{
return getMaxPartsCountAndSizeForPartitionWithState(DataPartState::Outdated).first;
}
@ -3801,70 +3801,102 @@ std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
}
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const
{
const auto settings = getSettings();
const auto & query_settings = query_context->getSettingsRef();
const size_t parts_count_in_total = getPartsCount();
/// check if have too many parts in total
if (parts_count_in_total >= settings->max_parts_in_total)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception("Too many parts (" + toString(parts_count_in_total) + ") in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.", ErrorCodes::TOO_MANY_PARTS);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({}) in all partitions in total. This indicates wrong choice of partition key. The threshold can be modified "
"with 'max_parts_in_total' setting in <merge_tree> element in config.xml or with per-table setting.",
toString(parts_count_in_total));
}
auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
ssize_t k_inactive = -1;
if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
size_t outdated_parts_over_threshold = 0;
{
size_t inactive_parts_count_in_partition = getMaxInactivePartsCountForPartition();
if (settings->inactive_parts_to_throw_insert > 0 && inactive_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
size_t outdated_parts_count_in_partition = 0;
if (settings->inactive_parts_to_throw_insert > 0 || settings->inactive_parts_to_delay_insert > 0)
outdated_parts_count_in_partition = getMaxOutdatedPartsCountForPartition();
if (settings->inactive_parts_to_throw_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_throw_insert)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many inactive parts ({}). Parts cleaning are processing significantly slower than inserts",
inactive_parts_count_in_partition);
outdated_parts_count_in_partition);
}
k_inactive = static_cast<ssize_t>(inactive_parts_count_in_partition) - static_cast<ssize_t>(settings->inactive_parts_to_delay_insert);
if (settings->inactive_parts_to_delay_insert > 0 && outdated_parts_count_in_partition >= settings->inactive_parts_to_delay_insert)
outdated_parts_over_threshold = outdated_parts_count_in_partition - settings->inactive_parts_to_delay_insert + 1;
}
auto parts_to_delay_insert = query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
auto parts_to_throw_insert = query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
auto [parts_count_in_partition, size_of_partition] = getMaxPartsCountAndSizeForPartition();
size_t average_part_size = parts_count_in_partition ? size_of_partition / parts_count_in_partition : 0;
bool parts_are_large_enough_in_average = settings->max_avg_part_size_for_too_many_parts
&& average_part_size > settings->max_avg_part_size_for_too_many_parts;
if (parts_count_in_partition >= parts_to_throw_insert && !parts_are_large_enough_in_average)
const auto active_parts_to_delay_insert
= query_settings.parts_to_delay_insert ? query_settings.parts_to_delay_insert : settings->parts_to_delay_insert;
const auto active_parts_to_throw_insert
= query_settings.parts_to_throw_insert ? query_settings.parts_to_throw_insert : settings->parts_to_throw_insert;
size_t active_parts_over_threshold = 0;
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
parts_count_in_partition, ReadableSize(average_part_size));
bool parts_are_large_enough_in_average
= settings->max_avg_part_size_for_too_many_parts && average_part_size > settings->max_avg_part_size_for_too_many_parts;
if (parts_count_in_partition >= active_parts_to_throw_insert && !parts_are_large_enough_in_average)
{
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({} with average size of {}). Merges are processing significantly slower than inserts",
parts_count_in_partition,
ReadableSize(average_part_size));
}
if (active_parts_to_delay_insert > 0 && parts_count_in_partition >= active_parts_to_delay_insert
&& !parts_are_large_enough_in_average)
/// if parts_count == parts_to_delay_insert -> we're 1 part over threshold
active_parts_over_threshold = parts_count_in_partition - active_parts_to_delay_insert + 1;
}
if (k_inactive < 0 && (parts_count_in_partition < parts_to_delay_insert || parts_are_large_enough_in_average))
/// no need for delay
if (!active_parts_over_threshold && !outdated_parts_over_threshold)
return;
const ssize_t k_active = ssize_t(parts_count_in_partition) - ssize_t(parts_to_delay_insert);
size_t max_k;
size_t k;
if (k_active > k_inactive)
UInt64 delay_milliseconds = 0;
{
max_k = parts_to_throw_insert - parts_to_delay_insert;
k = k_active + 1;
}
else
{
max_k = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert;
k = k_inactive + 1;
}
size_t parts_over_threshold = 0;
size_t allowed_parts_over_threshold = 1;
const bool use_active_parts_threshold = (active_parts_over_threshold >= outdated_parts_over_threshold);
if (use_active_parts_threshold)
{
parts_over_threshold = active_parts_over_threshold;
allowed_parts_over_threshold = active_parts_to_throw_insert - active_parts_to_delay_insert;
}
else
{
parts_over_threshold = outdated_parts_over_threshold;
allowed_parts_over_threshold = outdated_parts_over_threshold; /// if throw threshold is not set, will use max delay
if (settings->inactive_parts_to_throw_insert > 0)
allowed_parts_over_threshold = settings->inactive_parts_to_throw_insert - settings->inactive_parts_to_delay_insert;
}
const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000);
/// min() as a save guard here
const UInt64 delay_milliseconds
= std::min(max_delay_milliseconds, static_cast<UInt64>(::pow(max_delay_milliseconds, static_cast<double>(k) / max_k)));
if (allowed_parts_over_threshold == 0 || parts_over_threshold > allowed_parts_over_threshold) [[unlikely]]
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect calculation of {} parts over threshold: allowed_parts_over_threshold={}, parts_over_threshold={}",
(use_active_parts_threshold ? "active" : "inactive"),
allowed_parts_over_threshold,
parts_over_threshold);
const UInt64 max_delay_milliseconds = (settings->max_delay_to_insert > 0 ? settings->max_delay_to_insert * 1000 : 1000);
double delay_factor = static_cast<double>(parts_over_threshold) / allowed_parts_over_threshold;
const UInt64 min_delay_milliseconds = settings->min_delay_to_insert_ms;
delay_milliseconds = std::max(min_delay_milliseconds, static_cast<UInt64>(max_delay_milliseconds * delay_factor));
}
ProfileEvents::increment(ProfileEvents::DelayedInserts);
ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay_milliseconds);

View File

@ -535,7 +535,7 @@ public:
std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartition() const;
size_t getMaxInactivePartsCountForPartition() const;
size_t getMaxOutdatedPartsCountForPartition() const;
/// Get min value of part->info.getDataVersion() for all active parts.
/// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
@ -555,7 +555,7 @@ public:
/// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertOrThrowIfNeeded(Poco::Event * until, ContextPtr query_context) const;
void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
/// Renames temporary part to a permanent part and adds it to the parts set.
/// It is assumed that the part does not intersect with existing parts.

View File

@ -5,6 +5,7 @@
#include <Interpreters/Context.h>
#include <Common/FieldVisitors.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeIPv4andIPv6.h>
#include <DataTypes/DataTypeTuple.h>
#include <Columns/ColumnTuple.h>
#include <Common/SipHash.h>
@ -93,9 +94,7 @@ namespace
}
void operator() (const IPv6 & x) const
{
UInt8 type = Field::Types::IPv6;
hash.update(type);
hash.update(x);
return operator()(String(reinterpret_cast<const char *>(&x), 16));
}
void operator() (const Float64 & x) const
{
@ -213,7 +212,7 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
bool are_all_integral = true;
for (const Field & field : value)
{
if (field.getType() != Field::Types::UInt64 && field.getType() != Field::Types::Int64)
if (field.getType() != Field::Types::UInt64 && field.getType() != Field::Types::Int64 && field.getType() != Field::Types::IPv4)
{
are_all_integral = false;
break;
@ -232,6 +231,8 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
if (typeid_cast<const DataTypeDate *>(partition_key_sample.getByPosition(i).type.get()))
result += toString(DateLUT::instance().toNumYYYYMMDD(DayNum(value[i].safeGet<UInt64>())));
else if (typeid_cast<const DataTypeIPv4 *>(partition_key_sample.getByPosition(i).type.get()))
result += toString(value[i].get<IPv4>().toUnderType());
else
result += applyVisitor(to_string_visitor, value[i]);

View File

@ -68,12 +68,13 @@ struct Settings;
M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \
M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
M(UInt64, inactive_parts_to_delay_insert, 0, "If table contains at least that many inactive parts in single partition, artificially slow down insert into table.", 0) \
M(UInt64, parts_to_throw_insert, 300, "If more than this number active parts in single partition, throw 'Too many parts ...' exception.", 0) \
M(UInt64, inactive_parts_to_throw_insert, 0, "If more than this number inactive parts in single partition, throw 'Too many inactive parts ...' exception.", 0) \
M(UInt64, max_avg_part_size_for_too_many_parts, 10ULL * 1024 * 1024 * 1024, "The 'too many parts' check according to 'parts_to_delay_insert' and 'parts_to_throw_insert' will be active only if the average part size (in the relevant partition) is not larger than the specified threshold. If it is larger than the specified threshold, the INSERTs will be neither delayed or rejected. This allows to have hundreds of terabytes in a single table on a single server if the parts are successfully merged to larger parts. This does not affect the thresholds on inactive parts or total parts.", 0) \
M(UInt64, max_delay_to_insert, 1, "Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
\
/* Part removal settings. */ \

View File

@ -229,11 +229,13 @@ HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join,
return join_clone;
}
void StorageJoin::insertBlock(const Block & block, ContextPtr context)
{
Block block_to_insert = block;
convertRightBlock(block_to_insert);
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
join->addJoinedBlock(block, true);
join->addJoinedBlock(block_to_insert, true);
}
size_t StorageJoin::getSize(ContextPtr context) const
@ -265,6 +267,16 @@ ColumnWithTypeAndName StorageJoin::joinGet(const Block & block, const Block & bl
return join->joinGet(block, block_with_columns_to_add);
}
void StorageJoin::convertRightBlock(Block & block) const
{
bool need_covert = use_nulls && isLeftOrFull(kind);
if (!need_covert)
return;
for (auto & col : block)
JoinCommon::convertColumnToNullable(col);
}
void registerStorageJoin(StorageFactory & factory)
{
auto creator_fn = [](const StorageFactory::Arguments & args)

View File

@ -77,9 +77,7 @@ public:
{
auto metadata_snapshot = getInMemoryMetadataPtr();
Block block = metadata_snapshot->getSampleBlock();
if (use_nulls && isLeftOrFull(kind))
for (auto & col : block)
JoinCommon::convertColumnToNullable(col);
convertRightBlock(block);
return block;
}
@ -108,6 +106,8 @@ private:
void finishInsert() override {}
size_t getSize(ContextPtr context) const override;
RWLockImpl::LockHolder tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const;
void convertRightBlock(Block & block) const;
};
}

View File

@ -60,7 +60,7 @@ static UInt64 calculateTotalSizeOnDisk(const DiskPtr & disk, const String & from
}
Pipe StorageSystemDetachedParts::read(
const Names & /* column_names */,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
@ -68,37 +68,63 @@ Pipe StorageSystemDetachedParts::read(
const size_t /*max_block_size*/,
const size_t /*num_streams*/)
{
storage_snapshot->check(column_names);
StoragesInfoStream stream(query_info, context);
/// Create the result.
Block block = storage_snapshot->metadata->getSampleBlock();
MutableColumns new_columns = block.cloneEmptyColumns();
NameSet names_set(column_names.begin(), column_names.end());
std::vector<UInt8> columns_mask(block.columns());
Block header;
for (size_t i = 0; i < block.columns(); ++i)
{
if (names_set.contains(block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(block.getByPosition(i));
}
}
MutableColumns new_columns = header.cloneEmptyColumns();
while (StoragesInfo info = stream.next())
{
const auto parts = info.data->getDetachedParts();
for (const auto & p : parts)
{
size_t i = 0;
size_t src_index = 0, res_index = 0;
String detached_part_path = fs::path(MergeTreeData::DETACHED_DIR_NAME) / p.dir_name;
new_columns[i++]->insert(info.database);
new_columns[i++]->insert(info.table);
new_columns[i++]->insert(p.valid_name ? p.partition_id : Field());
new_columns[i++]->insert(p.dir_name);
new_columns[i++]->insert(calculateTotalSizeOnDisk(p.disk, fs::path(info.data->getRelativeDataPath()) / detached_part_path));
new_columns[i++]->insert(p.disk->getName());
new_columns[i++]->insert((fs::path(info.data->getFullPathOnDisk(p.disk)) / detached_part_path).string());
new_columns[i++]->insert(p.valid_name ? p.prefix : Field());
new_columns[i++]->insert(p.valid_name ? p.min_block : Field());
new_columns[i++]->insert(p.valid_name ? p.max_block : Field());
new_columns[i++]->insert(p.valid_name ? p.level : Field());
if (columns_mask[src_index++])
new_columns[res_index++]->insert(info.database);
if (columns_mask[src_index++])
new_columns[res_index++]->insert(info.table);
if (columns_mask[src_index++])
new_columns[res_index++]->insert(p.valid_name ? p.partition_id : Field());
if (columns_mask[src_index++])
new_columns[res_index++]->insert(p.dir_name);
if (columns_mask[src_index++])
new_columns[res_index++]->insert(calculateTotalSizeOnDisk(p.disk, fs::path(info.data->getRelativeDataPath()) / detached_part_path));
if (columns_mask[src_index++])
new_columns[res_index++]->insert(p.disk->getName());
if (columns_mask[src_index++])
new_columns[res_index++]->insert((fs::path(info.data->getFullPathOnDisk(p.disk)) / detached_part_path).string());
if (columns_mask[src_index++])
new_columns[res_index++]->insert(p.valid_name ? p.prefix : Field());
if (columns_mask[src_index++])
new_columns[res_index++]->insert(p.valid_name ? p.min_block : Field());
if (columns_mask[src_index++])
new_columns[res_index++]->insert(p.valid_name ? p.max_block : Field());
if (columns_mask[src_index++])
new_columns[res_index++]->insert(p.valid_name ? p.level : Field());
}
}
UInt64 num_rows = new_columns.at(0)->size();
Chunk chunk(std::move(new_columns), num_rows);
return Pipe(std::make_shared<SourceFromSingleChunk>(std::move(block), std::move(chunk)));
return Pipe(std::make_shared<SourceFromSingleChunk>(std::move(header), std::move(chunk)));
}
}

View File

@ -39,21 +39,20 @@ DB::StoragePtr createStorage(DB::DiskPtr & disk)
return table;
}
template <typename T>
class StorageLogTest : public testing::Test
{
public:
void SetUp() override
{
disk = createDisk<T>();
disk = createDisk();
table = createStorage(disk);
}
void TearDown() override
{
table->flushAndShutdown();
destroyDisk<T>(disk);
destroyDisk(disk);
}
const DB::DiskPtr & getDisk() { return disk; }
@ -65,9 +64,6 @@ private:
};
using DiskImplementations = testing::Types<DB::DiskLocal>;
TYPED_TEST_SUITE(StorageLogTest, DiskImplementations);
// Returns data written to table in Values format.
std::string writeData(int rows, DB::StoragePtr & table, const DB::ContextPtr context)
{
@ -153,7 +149,7 @@ std::string readData(DB::StoragePtr & table, const DB::ContextPtr context)
return out_buf.str();
}
TYPED_TEST(StorageLogTest, testReadWrite)
TEST_F(StorageLogTest, testReadWrite)
{
using namespace DB;
const auto & context_holder = getContext();

View File

@ -42,10 +42,10 @@ def cluster():
# For inserts there is no guarantee that retries will not result in duplicates.
# But it is better to retry anyway because 'Connection was closed by the server' error
# happens in fact only for inserts because reads already have build-in retries in code.
def azure_query(node, query, try_num=3):
def azure_query(node, query, try_num=3, settings={}):
for i in range(try_num):
try:
return node.query(query)
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response"
@ -80,7 +80,7 @@ def create_table(node, table_name, **additional_settings):
ORDER BY (dt, id)
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""
node.query(f"DROP TABLE IF EXISTS {table_name}")
azure_query(node, f"DROP TABLE IF EXISTS {table_name}")
azure_query(node, create_table_statement)
assert (
azure_query(node, f"SELECT COUNT(*) FROM {table_name} FORMAT Values") == "(0)"
@ -230,9 +230,9 @@ def test_alter_table_columns(cluster):
f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096, -1)}",
)
node.query(f"ALTER TABLE {TABLE_NAME} ADD COLUMN col1 UInt64 DEFAULT 1")
azure_query(node, f"ALTER TABLE {TABLE_NAME} ADD COLUMN col1 UInt64 DEFAULT 1")
# To ensure parts have been merged
node.query(f"OPTIMIZE TABLE {TABLE_NAME}")
azure_query(node, f"OPTIMIZE TABLE {TABLE_NAME}")
assert (
azure_query(node, f"SELECT sum(col1) FROM {TABLE_NAME} FORMAT Values")
@ -245,7 +245,8 @@ def test_alter_table_columns(cluster):
== "(4096)"
)
node.query(
azure_query(
node,
f"ALTER TABLE {TABLE_NAME} MODIFY COLUMN col1 String",
settings={"mutations_sync": 2},
)
@ -271,26 +272,27 @@ def test_attach_detach_partition(cluster):
== "(8192)"
)
node.query(f"ALTER TABLE {TABLE_NAME} DETACH PARTITION '2020-01-03'")
azure_query(node, f"ALTER TABLE {TABLE_NAME} DETACH PARTITION '2020-01-03'")
assert (
azure_query(node, f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values")
== "(4096)"
)
node.query(f"ALTER TABLE {TABLE_NAME} ATTACH PARTITION '2020-01-03'")
azure_query(node, f"ALTER TABLE {TABLE_NAME} ATTACH PARTITION '2020-01-03'")
assert (
azure_query(node, f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values")
== "(8192)"
)
node.query(f"ALTER TABLE {TABLE_NAME} DROP PARTITION '2020-01-03'")
azure_query(node, f"ALTER TABLE {TABLE_NAME} DROP PARTITION '2020-01-03'")
assert (
azure_query(node, f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values")
== "(4096)"
)
node.query(f"ALTER TABLE {TABLE_NAME} DETACH PARTITION '2020-01-04'")
node.query(
azure_query(node, f"ALTER TABLE {TABLE_NAME} DETACH PARTITION '2020-01-04'")
azure_query(
node,
f"ALTER TABLE {TABLE_NAME} DROP DETACHED PARTITION '2020-01-04'",
settings={"allow_drop_detached": 1},
)
@ -314,16 +316,18 @@ def test_move_partition_to_another_disk(cluster):
== "(8192)"
)
node.query(
f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-04' TO DISK '{LOCAL_DISK}'"
azure_query(
node,
f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-04' TO DISK '{LOCAL_DISK}'",
)
assert (
azure_query(node, f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values")
== "(8192)"
)
node.query(
f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-04' TO DISK '{AZURE_BLOB_STORAGE_DISK}'"
azure_query(
node,
f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-04' TO DISK '{AZURE_BLOB_STORAGE_DISK}'",
)
assert (
azure_query(node, f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values")
@ -344,14 +348,14 @@ def test_table_manipulations(cluster):
f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 4096)}"
)
node.query(f"RENAME TABLE {TABLE_NAME} TO {renamed_table}")
azure_query(node, f"RENAME TABLE {TABLE_NAME} TO {renamed_table}")
assert (
azure_query(node, f"SELECT count(*) FROM {renamed_table} FORMAT Values")
== "(8192)"
)
node.query(f"RENAME TABLE {renamed_table} TO {TABLE_NAME}")
assert node.query(f"CHECK TABLE {TABLE_NAME} FORMAT Values") == "(1)"
azure_query(node, f"RENAME TABLE {renamed_table} TO {TABLE_NAME}")
assert azure_query(node, f"CHECK TABLE {TABLE_NAME} FORMAT Values") == "(1)"
node.query(f"DETACH TABLE {TABLE_NAME}")
node.query(f"ATTACH TABLE {TABLE_NAME}")
@ -360,7 +364,7 @@ def test_table_manipulations(cluster):
== "(8192)"
)
node.query(f"TRUNCATE TABLE {TABLE_NAME}")
azure_query(node, f"TRUNCATE TABLE {TABLE_NAME}")
assert (
azure_query(node, f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values") == "(0)"
)
@ -395,11 +399,13 @@ def test_move_replace_partition_to_another_table(cluster):
create_table(node, table_clone_name)
node.query(
f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-03' TO TABLE {table_clone_name}"
azure_query(
node,
f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-03' TO TABLE {table_clone_name}",
)
node.query(
f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-05' TO TABLE {table_clone_name}"
azure_query(
node,
f"ALTER TABLE {TABLE_NAME} MOVE PARTITION '2020-01-05' TO TABLE {table_clone_name}",
)
assert azure_query(node, f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
assert (
@ -428,11 +434,13 @@ def test_move_replace_partition_to_another_table(cluster):
== "(1024)"
)
node.query(
f"ALTER TABLE {TABLE_NAME} REPLACE PARTITION '2020-01-03' FROM {table_clone_name}"
azure_query(
node,
f"ALTER TABLE {TABLE_NAME} REPLACE PARTITION '2020-01-03' FROM {table_clone_name}",
)
node.query(
f"ALTER TABLE {TABLE_NAME} REPLACE PARTITION '2020-01-05' FROM {table_clone_name}"
azure_query(
node,
f"ALTER TABLE {TABLE_NAME} REPLACE PARTITION '2020-01-05' FROM {table_clone_name}",
)
assert azure_query(node, f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
assert (
@ -448,16 +456,16 @@ def test_move_replace_partition_to_another_table(cluster):
== "(512)"
)
node.query(f"DROP TABLE {table_clone_name} NO DELAY")
azure_query(node, f"DROP TABLE {table_clone_name} NO DELAY")
assert azure_query(node, f"SELECT sum(id) FROM {TABLE_NAME} FORMAT Values") == "(0)"
assert (
azure_query(node, f"SELECT count(*) FROM {TABLE_NAME} FORMAT Values")
== "(1024)"
)
node.query(f"ALTER TABLE {TABLE_NAME} FREEZE")
azure_query(node, f"ALTER TABLE {TABLE_NAME} FREEZE")
node.query(f"DROP TABLE {TABLE_NAME} NO DELAY")
azure_query(node, f"DROP TABLE {TABLE_NAME} NO DELAY")
def test_freeze_unfreeze(cluster):
@ -470,20 +478,21 @@ def test_freeze_unfreeze(cluster):
azure_query(
node, f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}"
)
node.query(f"ALTER TABLE {TABLE_NAME} FREEZE WITH NAME '{backup1}'")
azure_query(node, f"ALTER TABLE {TABLE_NAME} FREEZE WITH NAME '{backup1}'")
azure_query(
node, f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-04', 4096)}"
)
node.query(f"ALTER TABLE {TABLE_NAME} FREEZE WITH NAME '{backup2}'")
azure_query(node, f"ALTER TABLE {TABLE_NAME} FREEZE WITH NAME '{backup2}'")
azure_query(node, f"TRUNCATE TABLE {TABLE_NAME}")
# Unfreeze single partition from backup1.
node.query(
f"ALTER TABLE {TABLE_NAME} UNFREEZE PARTITION '2020-01-03' WITH NAME '{backup1}'"
azure_query(
node,
f"ALTER TABLE {TABLE_NAME} UNFREEZE PARTITION '2020-01-03' WITH NAME '{backup1}'",
)
# Unfreeze all partitions from backup2.
node.query(f"ALTER TABLE {TABLE_NAME} UNFREEZE WITH NAME '{backup2}'")
azure_query(node, f"ALTER TABLE {TABLE_NAME} UNFREEZE WITH NAME '{backup2}'")
def test_apply_new_settings(cluster):
@ -524,8 +533,8 @@ def test_big_insert(cluster):
node,
f"INSERT INTO {TABLE_NAME} {check_query}",
)
assert azure_query(node, f"SELECT * FROM {TABLE_NAME} ORDER BY id") == node.query(
check_query
assert azure_query(node, f"SELECT * FROM {TABLE_NAME} ORDER BY id") == azure_query(
node, check_query
)
blob_container_client = cluster.blob_service_client.get_container_client(

View File

@ -96,6 +96,7 @@ def test_rename_replicated(started_cluster, entity):
node2.query_with_retry(
f"ALTER {entity.keyword} {entity.name} {entity.options} RENAME TO {entity.name}2"
)
node1.query("SYSTEM RELOAD USERS")
node1.query(f"DROP {entity.keyword} {entity.name}2 {entity.options}")

View File

@ -120,11 +120,11 @@ SELECT toDayOfMonth(toDateTime(1412106600), 'Pacific/Pitcairn');
/* toDayOfWeek */
SELECT 'toDayOfWeek';
SELECT toDayOfWeek(toDateTime(1412106600), 0, 'Asia/Istanbul');
SELECT toDayOfWeek(toDateTime(1412106600), 0, 'Europe/Paris');
SELECT toDayOfWeek(toDateTime(1412106600), 0, 'Europe/London');
SELECT toDayOfWeek(toDateTime(1412106600), 0, 'Asia/Tokyo');
SELECT toDayOfWeek(toDateTime(1412106600), 0, 'Pacific/Pitcairn');
SELECT toDayOfWeek(toDateTime(1412106600), 'Asia/Istanbul');
SELECT toDayOfWeek(toDateTime(1412106600), 'Europe/Paris');
SELECT toDayOfWeek(toDateTime(1412106600), 'Europe/London');
SELECT toDayOfWeek(toDateTime(1412106600), 'Asia/Tokyo');
SELECT toDayOfWeek(toDateTime(1412106600), 'Pacific/Pitcairn');
/* toHour */

View File

@ -7,14 +7,14 @@ import sys
import argparse
# Create SQL statement to verify dateTime64 is accepted as argument to functions taking DateTime.
FUNCTIONS = """
FUNCTIONS="""
toTimeZone(N, 'UTC')
toYear(N, 'Asia/Istanbul')
toQuarter(N, 'Asia/Istanbul')
toMonth(N, 'Asia/Istanbul')
toDayOfYear(N, 'Asia/Istanbul')
toDayOfMonth(N, 'Asia/Istanbul')
toDayOfWeek(N, 0, 'Asia/Istanbul')
toDayOfWeek(N, 'Asia/Istanbul')
toHour(N, 'Asia/Istanbul')
toMinute(N, 'Asia/Istanbul')
toSecond(N, 'Asia/Istanbul')
@ -90,51 +90,68 @@ formatDateTime(N, '%C %d %D %e %F %H %I %j %m %M %p %R %S %T %u %V %w %y %Y %%',
extra_ops = [
# With same type:
(
["N {op} N"],
['N {op} N'],
{
"op": [
"- ", # does not work, but should it?
"+ ", # does not work, but should it?
"!=",
"==", # equality and inequality supposed to take sub-second part in account
"< ",
"<=",
"> ",
">=",
'op':
[
'- ', # does not work, but should it?
'+ ', # does not work, but should it?
'!=', '==', # equality and inequality supposed to take sub-second part in account
'< ',
'<=',
'> ',
'>='
]
},
}
),
# With other DateTime types:
(
["N {op} {arg}", "{arg} {op} N"],
[
'N {op} {arg}',
'{arg} {op} N'
],
{
"op": [
"-", # does not work, but should it?
"!=",
"==",
'op':
[
'-', # does not work, but should it?
'!=', '==',
# these are naturally expected to work, but they don't:
"< ",
"<=",
"> ",
">=",
'< ',
'<=',
'> ',
'>='
],
"arg": ["DT", "D", "DT64"],
},
'arg': ['DT', 'D', 'DT64'],
}
),
# With arithmetic types
(
["N {op} {arg}", "{arg} {op} N"],
[
'N {op} {arg}',
'{arg} {op} N'
],
{
"op": ["+ ", "- ", "==", "!=", "< ", "<=", "> ", ">="],
"arg": [
"toUInt8(1)",
"toInt8(-1)",
"toUInt16(1)",
"toInt16(-1)",
"toUInt32(1)",
"toInt32(-1)",
"toUInt64(1)",
"toInt64(-1)",
'op':
[
'+ ',
'- ',
'==',
'!=',
'< ',
'<=',
'> ',
'>='
],
'arg':
[
'toUInt8(1)',
'toInt8(-1)',
'toUInt16(1)',
'toInt16(-1)',
'toUInt32(1)',
'toInt32(-1)',
'toUInt64(1)',
'toInt64(-1)'
],
},
),
@ -150,17 +167,14 @@ for funcs, args in extra_ops:
# filter out empty lines and commented out lines
COMMENTED_OUT_LINE_RE = re.compile(r"^\s*#")
FUNCTIONS = list(
[f for f in FUNCTIONS if len(f) != 0 and COMMENTED_OUT_LINE_RE.match(f) == None]
)
TYPES = ["D", "DT", "DT64"]
FUNCTIONS = list([f for f in FUNCTIONS if len(f) != 0 and COMMENTED_OUT_LINE_RE.match(f) == None])
TYPES = ['D', 'DT', 'DT64']
def escape_string(s):
if sys.version_info[0] > 2:
return s.encode("unicode_escape").decode("utf-8").replace("'", "\\'")
return s.encode('unicode_escape').decode('utf-8').replace("'", "\\'")
else:
return s.encode("string-escape").decode("utf-8")
return s.encode('string-escape').decode('utf-8')
def execute_functions_for_types(functions, types):
@ -172,39 +186,18 @@ def execute_functions_for_types(functions, types):
WITH \
toDateTime64('2019-09-16 19:20:11.234', 3, 'Europe/Minsk') as DT64, \
toDateTime('2019-09-16 19:20:11', 'Europe/Minsk') as DT, \
toDate('2019-09-16') as D, {X} as N".format(
X=dt
)
print(
(
"""{prologue} SELECT toTypeName(r), {func} as r FORMAT CSV;""".format(
prologue=prologue, func=func
)
)
)
toDate('2019-09-16') as D, {X} as N".format(X=dt)
print(("""{prologue} SELECT toTypeName(r), {func} as r FORMAT CSV;""".format(prologue=prologue, func=func)))
print("""SELECT '------------------------------------------';""")
def main():
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--functions_re",
type=re.compile,
help="RE to enable functions",
default=None,
)
parser.add_argument(
"--types_re",
type=lambda s: re.compile("^(" + s + ")$"),
help="RE to enable types, supported types: " + ",".join(TYPES),
default=None,
)
parser.add_argument(
"--list_functions",
action="store_true",
help="List all functions to be tested and exit",
)
parser.add_argument('--functions_re', type=re.compile, help="RE to enable functions", default=None)
parser.add_argument('--types_re',
type=lambda s: re.compile('^(' + s + ')$'),
help="RE to enable types, supported types: " + ",".join(TYPES), default=None)
parser.add_argument('--list_functions', action='store_true', help="List all functions to be tested and exit")
return parser.parse_args()
args = parse_args()
@ -230,6 +223,5 @@ def main():
execute_functions_for_types(functions, types)
if __name__ == "__main__":
if __name__ == '__main__':
exit(main())

View File

@ -28,7 +28,7 @@ SELECT toDayOfMonth(N, \'Asia/Istanbul\')
"UInt8",16
"UInt8",16
------------------------------------------
SELECT toDayOfWeek(N, 0, \'Asia/Istanbul\')
SELECT toDayOfWeek(N, \'Asia/Istanbul\')
"UInt8",1
"UInt8",1
"UInt8",1

View File

@ -1,4 +1,4 @@
-- Tags: no-replicated-database
-- Tags: no-replicated-database, no-asan, no-tsan, no-msan, no-ubsan
SET max_memory_usage = '50M';
SELECT cityHash64(rand() % 1000) as n, groupBitmapState(number) FROM numbers_mt(2000000000) GROUP BY n FORMAT Null; -- { serverError 241 }
SET max_memory_usage = '100M';
SELECT cityHash64(rand() % 1000) as n, groupBitmapState(number) FROM numbers_mt(200000000) GROUP BY n FORMAT Null; -- { serverError 241 }

View File

@ -0,0 +1,156 @@
-- { echoOn }
EXPLAIN PLAN header = 1
SELECT count() FROM a JOIN b ON b.b1 = a.a1 JOIN c ON c.c1 = b.b1 JOIN d ON d.d1 = c.c1 GROUP BY a.a2
;
Expression ((Project names + Projection))
Header: count() UInt64
Aggregating
Header: default.a.a2_4 String
count() UInt64
Expression ((Before GROUP BY + DROP unused columns after JOIN))
Header: default.a.a2_4 String
Join (JOIN FillRightFirst)
Header: default.a.a2_4 String
default.c.c1_2 UInt64
default.d.d1_3 UInt64
Expression ((JOIN actions + DROP unused columns after JOIN))
Header: default.a.a2_4 String
default.c.c1_2 UInt64
Join (JOIN FillRightFirst)
Header: default.a.a2_4 String
default.b.b1_0 UInt64
default.c.c1_2 UInt64
Expression ((JOIN actions + DROP unused columns after JOIN))
Header: default.a.a2_4 String
default.b.b1_0 UInt64
Join (JOIN FillRightFirst)
Header: default.a.a2_4 String
default.a.a1_1 UInt64
default.b.b1_0 UInt64
Expression ((JOIN actions + Change column names to column identifiers))
Header: default.a.a2_4 String
default.a.a1_1 UInt64
ReadFromStorage (Memory)
Header: a2 String
a1 UInt64
Expression ((JOIN actions + Change column names to column identifiers))
Header: default.b.b1_0 UInt64
ReadFromStorage (Memory)
Header: b1 UInt64
Expression ((JOIN actions + Change column names to column identifiers))
Header: default.c.c1_2 UInt64
ReadFromStorage (Memory)
Header: c1 UInt64
Expression ((JOIN actions + Change column names to column identifiers))
Header: default.d.d1_3 UInt64
ReadFromStorage (Memory)
Header: d1 UInt64
EXPLAIN PLAN header = 1
SELECT a.a2, d.d2 FROM a JOIN b USING (k) JOIN c USING (k) JOIN d USING (k)
;
Expression ((Project names + (Projection + DROP unused columns after JOIN)))
Header: a2 String
d2 String
Join (JOIN FillRightFirst)
Header: default.a.k_2 UInt64
default.a.a2_0 String
default.d.d2_1 String
default.d.k_5 UInt64
Expression (DROP unused columns after JOIN)
Header: default.a.k_2 UInt64
default.a.a2_0 String
Join (JOIN FillRightFirst)
Header: default.a.k_2 UInt64
default.a.a2_0 String
default.c.k_4 UInt64
Expression (DROP unused columns after JOIN)
Header: default.a.k_2 UInt64
default.a.a2_0 String
Join (JOIN FillRightFirst)
Header: default.a.k_2 UInt64
default.a.a2_0 String
default.b.k_3 UInt64
Expression (Change column names to column identifiers)
Header: default.a.k_2 UInt64
default.a.a2_0 String
ReadFromStorage (Memory)
Header: k UInt64
a2 String
Expression (Change column names to column identifiers)
Header: default.b.k_3 UInt64
ReadFromStorage (Memory)
Header: k UInt64
Expression (Change column names to column identifiers)
Header: default.c.k_4 UInt64
ReadFromStorage (Memory)
Header: k UInt64
Expression (Change column names to column identifiers)
Header: default.d.k_5 UInt64
default.d.d2_1 String
ReadFromStorage (Memory)
Header: k UInt64
d2 String
EXPLAIN PLAN header = 1
SELECT b.bx FROM a
JOIN (SELECT b1, b2 || 'x' AS bx FROM b ) AS b ON b.b1 = a.a1
JOIN c ON c.c1 = b.b1
JOIN (SELECT number AS d1 from numbers(10)) AS d ON d.d1 = c.c1
WHERE c.c2 != '' ORDER BY a.a2
;
Expression (Project names)
Header: bx String
Sorting (Sorting for ORDER BY)
Header: default.a.a2_6 String
b.bx_0 String
Expression ((Before ORDER BY + (Projection + )))
Header: default.a.a2_6 String
b.bx_0 String
Join (JOIN FillRightFirst)
Header: default.a.a2_6 String
b.bx_0 String
default.c.c2_5 String
default.c.c1_3 UInt64
d.d1_4 UInt64
Filter (( + (JOIN actions + DROP unused columns after JOIN)))
Header: default.a.a2_6 String
b.bx_0 String
default.c.c2_5 String
default.c.c1_3 UInt64
Join (JOIN FillRightFirst)
Header: default.a.a2_6 String
b.bx_0 String
b.b1_1 UInt64
default.c.c2_5 String
default.c.c1_3 UInt64
Expression ((JOIN actions + DROP unused columns after JOIN))
Header: default.a.a2_6 String
b.bx_0 String
b.b1_1 UInt64
Join (JOIN FillRightFirst)
Header: default.a.a2_6 String
default.a.a1_2 UInt64
b.bx_0 String
b.b1_1 UInt64
Expression ((JOIN actions + Change column names to column identifiers))
Header: default.a.a2_6 String
default.a.a1_2 UInt64
ReadFromStorage (Memory)
Header: a2 String
a1 UInt64
Expression ((JOIN actions + (Change column names to column identifiers + (Project names + (Projection + Change column names to column identifiers)))))
Header: b.b1_1 UInt64
b.bx_0 String
ReadFromStorage (Memory)
Header: b2 String
b1 UInt64
Expression ((JOIN actions + Change column names to column identifiers))
Header: default.c.c2_5 String
default.c.c1_3 UInt64
ReadFromStorage (Memory)
Header: c2 String
c1 UInt64
Expression ((JOIN actions + (Change column names to column identifiers + (Project names + (Projection + Change column names to column identifiers)))))
Header: d.d1_4 UInt64
ReadFromStorage (SystemNumbers)
Header: number UInt64

View File

@ -0,0 +1,43 @@
DROP TABLE IF EXISTS a;
DROP TABLE IF EXISTS b;
DROP TABLE IF EXISTS c;
DROP TABLE IF EXISTS d;
CREATE TABLE a (k UInt64, a1 UInt64, a2 String) ENGINE = Memory;
INSERT INTO a VALUES (1, 1, 'a'), (2, 2, 'b'), (3, 3, 'c');
CREATE TABLE b (k UInt64, b1 UInt64, b2 String) ENGINE = Memory;
INSERT INTO b VALUES (1, 1, 'a'), (2, 2, 'b'), (3, 3, 'c');
CREATE TABLE c (k UInt64, c1 UInt64, c2 String) ENGINE = Memory;
INSERT INTO c VALUES (1, 1, 'a'), (2, 2, 'b'), (3, 3, 'c');
CREATE TABLE d (k UInt64, d1 UInt64, d2 String) ENGINE = Memory;
INSERT INTO d VALUES (1, 1, 'a'), (2, 2, 'b'), (3, 3, 'c');
SET allow_experimental_analyzer = 1;
-- { echoOn }
EXPLAIN PLAN header = 1
SELECT count() FROM a JOIN b ON b.b1 = a.a1 JOIN c ON c.c1 = b.b1 JOIN d ON d.d1 = c.c1 GROUP BY a.a2
;
EXPLAIN PLAN header = 1
SELECT a.a2, d.d2 FROM a JOIN b USING (k) JOIN c USING (k) JOIN d USING (k)
;
EXPLAIN PLAN header = 1
SELECT b.bx FROM a
JOIN (SELECT b1, b2 || 'x' AS bx FROM b ) AS b ON b.b1 = a.a1
JOIN c ON c.c1 = b.b1
JOIN (SELECT number AS d1 from numbers(10)) AS d ON d.d1 = c.c1
WHERE c.c2 != '' ORDER BY a.a2
;
-- { echoOff }
DROP TABLE IF EXISTS a;
DROP TABLE IF EXISTS b;
DROP TABLE IF EXISTS c;
DROP TABLE IF EXISTS d;

View File

@ -0,0 +1,6 @@
0
300
500
750
1000
TOO_MANY_PARTS

View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_02521_insert_delay"
# Create MergeTree with settings which allow to insert maximum 5 parts, on 6th it'll throw TOO_MANY_PARTS
$CLICKHOUSE_CLIENT -q "CREATE TABLE test_02521_insert_delay (key UInt32, value String) Engine=MergeTree() ORDER BY tuple() SETTINGS parts_to_delay_insert=1, parts_to_throw_insert=5, max_delay_to_insert=1, min_delay_to_insert_ms=300"
$CLICKHOUSE_CLIENT -q "SYSTEM STOP MERGES test_02521_insert_delay"
# Every delay is increased by max_delay_to_insert*1000/(parts_to_throw_insert - parts_to_delay_insert + 1), here it's 250ms
# 0-indexed INSERT - no delay, 1-indexed INSERT - 300ms instead of 250ms due to min_delay_to_insert_ms
for i in {0..4}
do
query_id="${CLICKHOUSE_DATABASE}_02521_${i}_$RANDOM$RANDOM"
$CLICKHOUSE_CLIENT --query_id="$query_id" -q "INSERT INTO test_02521_insert_delay SELECT number, toString(number) FROM numbers(${i}, 1)"
$CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "select ProfileEvents['DelayedInsertsMilliseconds'] as delay from system.query_log where event_date >= yesterday() and query_id = {query_id:String} order by delay desc limit 1"
done
$CLICKHOUSE_CLIENT -q "INSERT INTO test_02521_insert_delay VALUES(0, 'This query throws error')" 2>&1 | grep -o 'TOO_MANY_PARTS' | head -n 1
$CLICKHOUSE_CLIENT -q "DROP TABLE test_02521_insert_delay"

View File

@ -1,7 +0,0 @@
1 7
1 7
0 6
1 0
2 1
1 7
0 6

View File

@ -1,10 +0,0 @@
with toDate('2023-01-09') as date_mon, date_mon - 1 as date_sun select toDayOfWeek(date_mon), toDayOfWeek(date_sun);
with toDate('2023-01-09') as date_mon, date_mon - 1 as date_sun select toDayOfWeek(date_mon, 0), toDayOfWeek(date_sun, 0);
with toDate('2023-01-09') as date_mon, date_mon - 1 as date_sun select toDayOfWeek(date_mon, 1), toDayOfWeek(date_sun, 1);
with toDate('2023-01-09') as date_mon, date_mon - 1 as date_sun select toDayOfWeek(date_mon, 2), toDayOfWeek(date_sun, 2);
with toDate('2023-01-09') as date_mon, date_mon - 1 as date_sun select toDayOfWeek(date_mon, 3), toDayOfWeek(date_sun, 3);
with toDate('2023-01-09') as date_mon, date_mon - 1 as date_sun select toDayOfWeek(date_mon, 4), toDayOfWeek(date_sun, 4);
with toDate('2023-01-09') as date_mon, date_mon - 1 as date_sun select toDayOfWeek(date_mon, 5), toDayOfWeek(date_sun, 5);
select toDayOfWeek(today(), -1); -- { serverError 43 }

View File

@ -0,0 +1,2 @@
1.2.3.4 ::ffff:1.2.3.4 16909060_1_1_0
1.2.3.4 ::ffff:1.2.3.4 1334d7cc23ffb5a5c0262304b3313426_1_1_0

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS ip_part_test;
CREATE TABLE ip_part_test ( ipv4 IPv4, ipv6 IPv6 ) ENGINE = MergeTree PARTITION BY ipv4 ORDER BY ipv4 AS SELECT '1.2.3.4', '::ffff:1.2.3.4';
SELECT *, _part FROM ip_part_test;
DROP TABLE IF EXISTS ip_part_test;
CREATE TABLE ip_part_test ( ipv4 IPv4, ipv6 IPv6 ) ENGINE = MergeTree PARTITION BY ipv6 ORDER BY ipv6 AS SELECT '1.2.3.4', '::ffff:1.2.3.4';
SELECT *, _part FROM ip_part_test;
DROP TABLE IF EXISTS ip_part_test;

View File

@ -0,0 +1,3 @@
3 \N 3
2 2 2
1 1 1

View File

@ -0,0 +1,18 @@
SET allow_suspicious_low_cardinality_types = 1;
DROP TABLE IF EXISTS t1__fuzz_8;
DROP TABLE IF EXISTS full_join__fuzz_4;
CREATE TABLE t1__fuzz_8 (`x` LowCardinality(UInt32), `str` Nullable(Int16)) ENGINE = Memory;
INSERT INTO t1__fuzz_8 VALUES (1, 1), (2, 2);
CREATE TABLE full_join__fuzz_4 (`x` LowCardinality(UInt32), `s` LowCardinality(String)) ENGINE = Join(`ALL`, FULL, x) SETTINGS join_use_nulls = 1;
INSERT INTO full_join__fuzz_4 VALUES (1, '1'), (2, '2'), (3, '3');
SET join_use_nulls = 1;
SELECT * FROM t1__fuzz_8 FULL OUTER JOIN full_join__fuzz_4 USING (x) ORDER BY x DESC, str ASC, s ASC NULLS LAST;
DROP TABLE IF EXISTS t1__fuzz_8;
DROP TABLE IF EXISTS full_join__fuzz_4;