Merge branch 'master' into stress-test

Commit 9c016d4e25 by mergify[bot], 2021-12-02 12:02:44 +00:00, committed by GitHub.
88 changed files with 1135 additions and 853 deletions


@ -1,7 +1,7 @@
name: Cancel
on: # yamllint disable-line rule:truthy
workflow_run:
workflows: ["CIGithubActions", "ReleaseCI"]
workflows: ["CIGithubActions", "ReleaseCI", "DocsCheck", "BackportPR"]
types:
- requested
jobs:
@ -10,4 +10,5 @@ jobs:
steps:
- uses: styfle/cancel-workflow-action@0.9.1
with:
all_but_latest: true
workflow_id: ${{ github.event.workflow.id }}

.github/workflows/docs_check.yml (new file)

@ -0,0 +1,62 @@
name: DocsCheck
on: # yamllint disable-line rule:truthy
pull_request:
types:
- synchronize
- reopened
- opened
branches:
- master
paths:
- 'docs/**'
- 'website/**'
jobs:
CheckLabels:
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code
uses: actions/checkout@v2
- name: Labels check
run: |
cd $GITHUB_WORKSPACE/tests/ci
python3 run_check.py
DockerHubPush:
needs: CheckLabels
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code
uses: actions/checkout@v2
- name: Images check
run: |
cd $GITHUB_WORKSPACE/tests/ci
python3 docker_images_check.py
- name: Upload images files to artifacts
uses: actions/upload-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/docker_images_check/changed_images.json
DocsCheck:
needs: DockerHubPush
runs-on: [self-hosted, func-tester]
steps:
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/docs_check
- name: Check out repository code
uses: actions/checkout@v2
- name: Docs Check
env:
TEMP_PATH: ${{runner.temp}}/docs_check
REPO_COPY: ${{runner.temp}}/docs_check/ClickHouse
run: |
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci
python3 docs_check.py
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH


@ -2,13 +2,14 @@ name: CIGithubActions
on: # yamllint disable-line rule:truthy
pull_request:
types:
- labeled
- unlabeled
- synchronize
- reopened
- opened
branches:
- master
paths-ignore:
- 'docs/**'
- 'website/**'
##########################################################################################
##################################### SMALL CHECKS #######################################
##########################################################################################
@ -60,34 +61,8 @@ jobs:
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
DocsCheck:
needs: DockerHubPush
runs-on: [self-hosted, func-tester]
steps:
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/docs_check
- name: Check out repository code
uses: actions/checkout@v2
- name: Docs Check
env:
TEMP_PATH: ${{runner.temp}}/docs_check
REPO_COPY: ${{runner.temp}}/docs_check/ClickHouse
run: |
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci
python3 docs_check.py
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
FastTest:
needs: DockerHubPush
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Check out repository code
@ -109,8 +84,7 @@ jobs:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
PVSCheck:
needs: DockerHubPush
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
needs: [DockerHubPush, FastTest]
runs-on: [self-hosted, func-tester]
steps:
- name: Check out repository code
@ -134,7 +108,6 @@ jobs:
sudo rm -fr $TEMP_PATH
CompatibilityCheck:
needs: [BuilderDebRelease]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code
@ -161,7 +134,6 @@ jobs:
sudo rm -fr $TEMP_PATH
SplitBuildSmokeTest:
needs: [BuilderDebSplitted]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code
@ -191,7 +163,6 @@ jobs:
#########################################################################################
BuilderDebRelease:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -230,7 +201,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderBinRelease:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -269,7 +239,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderDebAsan:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -308,7 +277,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderDebUBsan:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -347,7 +315,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderDebTsan:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -386,7 +353,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderDebMsan:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -425,7 +391,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderDebDebug:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -467,7 +432,6 @@ jobs:
##########################################################################################
BuilderDebSplitted:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -506,7 +470,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderBinTidy:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -545,7 +508,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderBinDarwin:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -584,7 +546,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderBinAarch64:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -623,7 +584,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderBinFreeBSD:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -662,7 +622,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderBinDarwinAarch64:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -701,7 +660,6 @@ jobs:
sudo rm -fr $TEMP_PATH
BuilderBinPPC64:
needs: [DockerHubPush, FastTest]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Download changed images
@ -930,9 +888,7 @@ jobs:
sudo rm -fr $TEMP_PATH
FunctionalStatelessTestTsan:
needs: [BuilderDebTsan]
# tests can consume more than 60GB of memory,
# so use bigger server
runs-on: [self-hosted, stress-tester]
runs-on: [self-hosted, func-tester]
steps:
- name: Download json reports
uses: actions/download-artifact@v2
@ -1817,7 +1773,6 @@ jobs:
- FunctionalStatefulTestTsan
- FunctionalStatefulTestMsan
- FunctionalStatefulTestUBsan
- DocsCheck
- StressTestDebug
- StressTestAsan
- StressTestTsan


@ -41,30 +41,6 @@ jobs:
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
PVSCheck:
needs: DockerHubPush
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, func-tester]
steps:
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'recursive'
- name: PVS Check
env:
TEMP_PATH: ${{runner.temp}}/pvs_check
REPO_COPY: ${{runner.temp}}/pvs_check/ClickHouse
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci && python3 pvs_check.py
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
CompatibilityCheck:
needs: [BuilderDebRelease]
runs-on: [self-hosted, style-checker]
@ -1699,7 +1675,6 @@ jobs:
- UnitTestsUBsan
- UnitTestsReleaseClang
- SplitBuildSmokeTest
- PVSCheck
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code

contrib/sysroot (submodule)

@ -1 +1 @@
Subproject commit 4ef348b7f30f2ad5b02b266268b3c948e51ad457
Subproject commit 410845187f582c5e6692b53dddbe43efbb728734


@ -234,6 +234,9 @@ function build
time ninja clickhouse-bundle 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/build_log.txt"
if [ "$COPY_CLICKHOUSE_BINARY_TO_OUTPUT" -eq "1" ]; then
cp programs/clickhouse "$FASTTEST_OUTPUT/clickhouse"
strip programs/clickhouse -o "$FASTTEST_OUTPUT/clickhouse-stripped"
gzip "$FASTTEST_OUTPUT/clickhouse-stripped"
fi
ccache --show-stats ||:
)


@ -33,7 +33,7 @@ CREATE TABLE test
`key` String,
`v1` UInt32,
`v2` String,
`v3` Float32,
`v3` Float32
)
ENGINE = EmbeddedRocksDB
PRIMARY KEY key


@ -117,8 +117,10 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
if (sigaddset(&sa.sa_mask, pause_signal))
throwFromErrno("Failed to add signal to mask for query profiler", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
if (sigaction(pause_signal, &sa, previous_handler))
struct sigaction local_previous_handler;
if (sigaction(pause_signal, &sa, &local_previous_handler))
throwFromErrno("Failed to setup signal handler for query profiler", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
previous_handler.emplace(local_previous_handler);
try
{
@ -133,7 +135,8 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
#else
sev._sigev_un._tid = thread_id;
#endif
if (timer_create(clock_type, &sev, &timer_id))
timer_t local_timer_id;
if (timer_create(clock_type, &sev, &local_timer_id))
{
/// In Google Cloud Run, the function "timer_create" is implemented incorrectly as of 2020-01-25.
/// https://mybranch.dev/posts/clickhouse-on-cloud-run/
@ -143,6 +146,7 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
}
timer_id.emplace(local_timer_id);
/// Randomize offset as uniform random value from 0 to period - 1.
/// It will allow to sample short queries even if timer period is large.
@ -154,7 +158,7 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
struct timespec offset{.tv_sec = period_rand / TIMER_PRECISION, .tv_nsec = period_rand % TIMER_PRECISION};
struct itimerspec timer_spec = {.it_interval = interval, .it_value = offset};
if (timer_settime(timer_id, 0, &timer_spec, nullptr))
if (timer_settime(*timer_id, 0, &timer_spec, nullptr))
throwFromErrno("Failed to set thread timer period", ErrorCodes::CANNOT_SET_TIMER_PERIOD);
}
catch (...)
@ -175,10 +179,10 @@ template <typename ProfilerImpl>
void QueryProfilerBase<ProfilerImpl>::tryCleanup()
{
#if USE_UNWIND
if (timer_id != nullptr && timer_delete(timer_id))
if (timer_id.has_value() && timer_delete(*timer_id))
LOG_ERROR(log, "Failed to delete query profiler timer {}", errnoToString(ErrorCodes::CANNOT_DELETE_TIMER));
if (previous_handler != nullptr && sigaction(pause_signal, previous_handler, nullptr))
if (previous_handler.has_value() && sigaction(pause_signal, &*previous_handler, nullptr))
LOG_ERROR(log, "Failed to restore signal handler after query profiler {}", errnoToString(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER));
#endif
}


@ -1,5 +1,6 @@
#pragma once
#include <optional>
#include <base/types.h>
#include <signal.h>
#include <time.h>
@ -40,14 +41,14 @@ private:
#if USE_UNWIND
/// Timer id from timer_create(2)
timer_t timer_id = nullptr;
std::optional<timer_t> timer_id;
#endif
/// Pause signal to interrupt threads to get traces
int pause_signal;
/// Previous signal handler to restore after query profiler exits
struct sigaction * previous_handler = nullptr;
std::optional<struct sigaction> previous_handler;
};
/// Query profiler with timer based on real clock


@ -501,6 +501,9 @@ namespace MySQLReplication
UInt32 mask = 0;
DecimalType res(0);
if (payload.eof())
throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
if ((*payload.position() & 0x80) == 0)
mask = UInt32(-1);


@ -90,6 +90,7 @@ class IColumn;
M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
M(Bool, stream_like_engine_allow_direct_select, false, "Allow direct SELECT query for Kafka, RabbitMQ and FileLog engines. In case there are attached materialized views, SELECT query is not allowed even if this setting is enabled.", 0) \
\
M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
M(Milliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \
@ -591,6 +592,9 @@ class IColumn;
M(DateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.", 0) \
M(DateTimeOutputFormat, date_time_output_format, FormatSettings::DateTimeOutputFormat::Simple, "Method to write DateTime to text output. Possible values: 'simple', 'iso', 'unix_timestamp'.", 0) \
\
M(String, bool_true_representation, "true", "Text to represent bool value in TSV/CSV formats.", 0) \
M(String, bool_false_representation, "false", "Text to represent bool value in TSV/CSV formats.", 0) \
\
M(Bool, input_format_values_interpret_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.", 0) \
M(Bool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \
M(Bool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \
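
A minimal usage sketch for the two new bool_true_representation / bool_false_representation settings, combined with the Bool type this commit registers further below. The table and column names are hypothetical, not part of the change, and the expected output is an assumption based on the new SerializationBool code:

-- hypothetical table used only to illustrate the new settings
CREATE TABLE bool_demo (id UInt32, flag Bool) ENGINE = MergeTree ORDER BY id;
INSERT INTO bool_demo VALUES (1, true), (2, false);
SELECT * FROM bool_demo
SETTINGS bool_true_representation = 'yes', bool_false_representation = 'no'
FORMAT CSV;
-- expected to render the flag column as yes / no instead of true / false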


@ -0,0 +1,21 @@
#include <DataTypes/Serializations/SerializationBool.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeCustom.h>
namespace DB
{
void registerDataTypeDomainBool(DataTypeFactory & factory)
{
factory.registerSimpleDataTypeCustom("Bool", []
{
auto type = DataTypeFactory::instance().get("UInt8");
return std::make_pair(type, std::make_unique<DataTypeCustomDesc>(
std::make_unique<DataTypeCustomFixedName>("Bool"), std::make_unique<SerializationBool>(type->getDefaultSerialization())));
});
factory.registerAlias("bool", "Bool", DataTypeFactory::CaseInsensitive);
factory.registerAlias("boolean", "Bool", DataTypeFactory::CaseInsensitive);
}
}


@ -209,6 +209,7 @@ DataTypeFactory::DataTypeFactory()
registerDataTypeInterval(*this);
registerDataTypeLowCardinality(*this);
registerDataTypeDomainIPv4AndIPv6(*this);
registerDataTypeDomainBool(*this);
registerDataTypeDomainSimpleAggregateFunction(*this);
registerDataTypeDomainGeo(*this);
registerDataTypeMap(*this);


@ -85,6 +85,7 @@ void registerDataTypeNested(DataTypeFactory & factory);
void registerDataTypeInterval(DataTypeFactory & factory);
void registerDataTypeLowCardinality(DataTypeFactory & factory);
void registerDataTypeDomainIPv4AndIPv6(DataTypeFactory & factory);
void registerDataTypeDomainBool(DataTypeFactory & factory);
void registerDataTypeDomainSimpleAggregateFunction(DataTypeFactory & factory);
void registerDataTypeDomainGeo(DataTypeFactory & factory);


@ -57,8 +57,6 @@ void registerDataTypeNumbers(DataTypeFactory & factory)
/// These synonyms are added for compatibility.
factory.registerAlias("TINYINT", "Int8", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BOOL", "Int8", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BOOLEAN", "Int8", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INT1", "Int8", DataTypeFactory::CaseInsensitive); /// MySQL
factory.registerAlias("BYTE", "Int8", DataTypeFactory::CaseInsensitive); /// MS Access
factory.registerAlias("SMALLINT", "Int16", DataTypeFactory::CaseInsensitive);


@ -0,0 +1,169 @@
#include <DataTypes/Serializations/SerializationBool.h>
#include <Columns/ColumnsNumber.h>
#include <Common/Exception.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING;
extern const int ILLEGAL_COLUMN;
}
SerializationBool::SerializationBool(const SerializationPtr &nested_)
: SerializationCustomSimpleText(nested_)
{
}
void SerializationBool::serializeText(const IColumn &column, size_t row_num, WriteBuffer &ostr, const FormatSettings &) const
{
const auto *col = checkAndGetColumn<ColumnUInt8>(&column);
if (!col)
throw Exception("Bool type can only serialize columns of type UInt8." + column.getName(),
ErrorCodes::ILLEGAL_COLUMN);
if (col->getData()[row_num])
ostr.write(str_true, sizeof(str_true) - 1);
else
ostr.write(str_false, sizeof(str_false) - 1);
}
void SerializationBool::deserializeText(IColumn &column, ReadBuffer &istr, const FormatSettings & settings, bool whole) const
{
ColumnUInt8 *col = typeid_cast<ColumnUInt8 *>(&column);
if (!col)
{
throw Exception("Bool type can only deserialize columns of type UInt8." + column.getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
if (!istr.eof())
{
bool value = false;
if (*istr.position() == 't' || *istr.position() == 'f' || *istr.position() == 'T' || *istr.position() == 'F')
readBoolTextWord(value, istr, true);
else if (*istr.position() == '1' || *istr.position() == '0')
readBoolText(value, istr);
else
throw Exception("Invalid boolean value, should be true/false, TRUE/FALSE, 1/0.",
ErrorCodes::CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING);
col->insert(value);
}
else
throw Exception("Expected boolean value but get EOF.", ErrorCodes::CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING);
if (whole && !istr.eof())
throwUnexpectedDataAfterParsedValue(column, istr, settings, "Bool");
}
void SerializationBool::serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
const auto *col = checkAndGetColumn<ColumnUInt8>(&column);
if (!col)
throw Exception("Bool type can only serialize columns of type UInt8." + column.getName(),
ErrorCodes::ILLEGAL_COLUMN);
if (col->getData()[row_num])
{
writeString(settings.bool_true_representation, ostr);
}
else
{
writeString(settings.bool_false_representation, ostr);
}
}
void SerializationBool::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (istr.eof())
throw Exception("Expected boolean value but get EOF.", ErrorCodes::CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING);
String input;
readEscapedString(input, istr);
deserializeFromString(column, input, settings);
}
void SerializationBool::serializeTextJSON(const IColumn &column, size_t row_num, WriteBuffer &ostr, const FormatSettings &settings) const
{
serializeText(column, row_num, ostr, settings);
}
void SerializationBool::deserializeTextJSON(IColumn &column, ReadBuffer &istr, const FormatSettings &) const
{
ColumnUInt8 *col = typeid_cast<ColumnUInt8 *>(&column);
if (!col)
{
throw Exception("Bool type can only deserialize columns of type UInt8." + column.getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
if (!istr.eof())
{
bool value = false;
if (*istr.position() == 't' || *istr.position() == 'f')
readBoolTextWord(value, istr);
else if (*istr.position() == '1' || *istr.position() == '0')
readBoolText(value, istr);
else
throw Exception("Invalid boolean value, should be true/false, 1/0.",
ErrorCodes::CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING);
col->insert(value);
}
else
throw Exception("Expected boolean value but get EOF.", ErrorCodes::CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING);
}
void SerializationBool::serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeTextEscaped(column, row_num, ostr, settings);
}
void SerializationBool::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (istr.eof())
throw Exception("Expected boolean value but get EOF.", ErrorCodes::CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING);
String input;
readCSVString(input, istr, settings.csv);
deserializeFromString(column, input, settings);
}
void SerializationBool::serializeTextRaw(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeTextEscaped(column, row_num, ostr, settings);
}
void SerializationBool::deserializeTextRaw(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
if (istr.eof())
throw Exception("Expected boolean value but get EOF.", ErrorCodes::CANNOT_PARSE_DOMAIN_VALUE_FROM_STRING);
String input;
readString(input, istr);
deserializeFromString(column, input, settings);
}
void SerializationBool::deserializeFromString(IColumn & column, String & input, const FormatSettings & settings)
{
ColumnUInt8 * col = typeid_cast<ColumnUInt8 *>(&column);
if (!col)
{
throw Exception("Bool type can only deserialize columns of type UInt8." + column.getName(), ErrorCodes::ILLEGAL_COLUMN);
}
if (settings.bool_true_representation == input)
{
col->insert(true);
}
else if (settings.bool_false_representation == input)
{
col->insert(false);
}
else
throw Exception("Invalid boolean value, should be " + settings.bool_true_representation + " or " + settings.bool_false_representation + " controlled by setting bool_true_representation and bool_false_representation.", ErrorCodes::ILLEGAL_COLUMN);
}
}


@ -0,0 +1,36 @@
#pragma once
#include <DataTypes/Serializations/SerializationCustomSimpleText.h>
namespace DB
{
class SerializationBool final : public SerializationCustomSimpleText
{
private:
static constexpr char str_true[5] = "true";
static constexpr char str_false[6] = "false";
public:
SerializationBool(const SerializationPtr & nested_);
void serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const override;
void deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings,bool whole) const override;
void serializeTextEscaped(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override;
void deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override;
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextCSV(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override;
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
void serializeTextRaw(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override;
void deserializeTextRaw(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;
protected:
static void deserializeFromString(IColumn & column, String & input, const FormatSettings & settings);
};
}


@ -70,6 +70,8 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.date_time_input_format = settings.date_time_input_format;
format_settings.date_time_output_format = settings.date_time_output_format;
format_settings.bool_true_representation = settings.bool_true_representation;
format_settings.bool_false_representation = settings.bool_false_representation;
format_settings.enable_streaming = settings.output_format_enable_streaming;
format_settings.import_nested_json = settings.input_format_import_nested_json;
format_settings.input_allow_errors_num = settings.input_format_allow_errors_num;


@ -82,6 +82,9 @@ struct FormatSettings
UInt64 output_rows_in_file = 1;
} avro;
String bool_true_representation = "true";
String bool_false_representation = "false";
struct CSV
{
char delimiter = ',';


@ -662,18 +662,13 @@ private:
{
auto return_type = impl.getReturnTypeImpl(arguments);
if (!areTypesEqual(return_type, result_type))
if (!return_type->equals(*result_type))
throw Exception{"Dictionary attribute has different type " + return_type->getName() + " expected " + result_type->getName(),
ErrorCodes::TYPE_MISMATCH};
return impl.executeImpl(arguments, return_type, input_rows_count);
}
static bool areTypesEqual(const DataTypePtr & lhs, const DataTypePtr & rhs)
{
return removeNullable(recursiveRemoveLowCardinality(lhs))->equals(*removeNullable(recursiveRemoveLowCardinality(rhs)));
}
const FunctionDictGetNoType<dictionary_get_function_type> impl;
};


@ -76,7 +76,7 @@ public:
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const double lon = col_lon->getFloat64(row);
const double lat = col_lat->getFloat64(row);


@ -58,7 +58,7 @@ public:
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const int resolution = col_hindex->getUInt(row);
if (resolution > MAX_H3_RES)


@ -63,7 +63,7 @@ public:
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const UInt64 resolution = col_hindex->getUInt(row);
if (resolution > MAX_H3_RES)


@ -55,7 +55,7 @@ public:
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const UInt64 hindex = col_hindex->getUInt(row);


@ -55,7 +55,7 @@ public:
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const UInt64 hindex = col_hindex->getUInt(row);


@ -58,7 +58,7 @@ public:
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const UInt64 resolution = col_hindex->getUInt(row);
if (resolution > MAX_H3_RES)


@ -63,7 +63,7 @@ public:
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const UInt64 hindex_origin = col_hindex_origin->getUInt(row);
const UInt64 hindex_dest = col_hindex_dest->getUInt(row);


@ -55,7 +55,7 @@ public:
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const UInt64 hindex = col_hindex->getUInt(row);


@ -76,7 +76,7 @@ public:
std::vector<H3Index> hindex_vec;
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const UInt64 parent_hindex = col_hindex->getUInt(row);
const UInt8 child_resolution = col_resolution->getUInt(row);


@ -66,7 +66,7 @@ public:
auto & dst_data = dst->getData();
dst_data.resize(input_rows_count);
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const UInt64 hindex = col_hindex->getUInt(row);
const UInt8 resolution = col_resolution->getUInt(row);


@ -73,7 +73,7 @@ public:
std::vector<H3Index> hindex_vec;
for (const auto row : collections::range(0, input_rows_count))
for (size_t row = 0; row < input_rows_count; row++)
{
const H3Index origin_hindex = col_hindex->getUInt(row);
const int k = col_k->getInt(row);


@ -28,6 +28,8 @@ CascadeWriteBuffer::CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, Wri
void CascadeWriteBuffer::nextImpl()
{
if (!curr_buffer)
return;
try
{
curr_buffer->position() = position();


@ -48,6 +48,7 @@ struct Memory;
namespace ErrorCodes
{
extern const int CANNOT_PARSE_DATE;
extern const int CANNOT_PARSE_BOOL;
extern const int CANNOT_PARSE_DATETIME;
extern const int CANNOT_PARSE_UUID;
extern const int CANNOT_READ_ARRAY_FROM_TEXT;
@ -231,20 +232,45 @@ inline void readBoolText(bool & x, ReadBuffer & buf)
x = tmp != '0';
}
inline void readBoolTextWord(bool & x, ReadBuffer & buf)
inline void readBoolTextWord(bool & x, ReadBuffer & buf, bool support_upper_case = false)
{
if (buf.eof())
throwReadAfterEOF();
if (*buf.position() == 't')
switch (*buf.position())
{
assertString("true", buf);
x = true;
}
else
{
assertString("false", buf);
x = false;
case 't':
assertString("true", buf);
x = true;
break;
case 'f':
assertString("false", buf);
x = false;
break;
case 'T':
{
if (support_upper_case)
{
assertString("TRUE", buf);
x = true;
break;
}
else
[[fallthrough]];
}
case 'F':
{
if (support_upper_case)
{
assertString("FALSE", buf);
x = false;
break;
}
else
[[fallthrough]];
}
default:
throw ParsingException("Unexpected Bool value", ErrorCodes::CANNOT_PARSE_BOOL);
}
}


@ -341,6 +341,8 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in)
negative = true;
++in.position();
}
else if (*in.position() == '+')
++in.position();
auto count_after_sign = in.count();


@ -40,7 +40,7 @@ TEST(MySQLCreateRewritten, ColumnsDataType)
{"TINYINT", "Int8"}, {"SMALLINT", "Int16"}, {"MEDIUMINT", "Int32"}, {"INT", "Int32"},
{"INTEGER", "Int32"}, {"BIGINT", "Int64"}, {"FLOAT", "Float32"}, {"DOUBLE", "Float64"},
{"VARCHAR(10)", "String"}, {"CHAR(10)", "String"}, {"Date", "Date"}, {"DateTime", "DateTime"},
{"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Int8"}
{"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"}
};
for (const auto & [test_type, mapped_type] : test_types)
@ -104,7 +104,7 @@ TEST(MySQLCreateRewritten, PartitionPolicy)
{"INTEGER", "Int32", " PARTITION BY intDiv(key, 4294967)"}, {"BIGINT", "Int64", " PARTITION BY intDiv(key, 18446744073709551)"},
{"FLOAT", "Float32", ""}, {"DOUBLE", "Float64", ""}, {"VARCHAR(10)", "String", ""}, {"CHAR(10)", "String", ""},
{"Date", "Date", " PARTITION BY toYYYYMM(key)"}, {"DateTime", "DateTime", " PARTITION BY toYYYYMM(key)"},
{"TIMESTAMP", "DateTime", " PARTITION BY toYYYYMM(key)"}, {"BOOLEAN", "Int8", " PARTITION BY key"}
{"TIMESTAMP", "DateTime", " PARTITION BY toYYYYMM(key)"}, {"BOOLEAN", "Bool", " PARTITION BY key"}
};
for (const auto & [test_type, mapped_type, partition_policy] : test_types)
@ -135,7 +135,7 @@ TEST(MySQLCreateRewritten, OrderbyPolicy)
{"INTEGER", "Int32", " PARTITION BY intDiv(key, 4294967)"}, {"BIGINT", "Int64", " PARTITION BY intDiv(key, 18446744073709551)"},
{"FLOAT", "Float32", ""}, {"DOUBLE", "Float64", ""}, {"VARCHAR(10)", "String", ""}, {"CHAR(10)", "String", ""},
{"Date", "Date", " PARTITION BY toYYYYMM(key)"}, {"DateTime", "DateTime", " PARTITION BY toYYYYMM(key)"},
{"TIMESTAMP", "DateTime", " PARTITION BY toYYYYMM(key)"}, {"BOOLEAN", "Int8", " PARTITION BY key"}
{"TIMESTAMP", "DateTime", " PARTITION BY toYYYYMM(key)"}, {"BOOLEAN", "Bool", " PARTITION BY key"}
};
for (const auto & [test_type, mapped_type, partition_policy] : test_types)


@ -12,7 +12,6 @@ class ASTPartition : public IAST
{
public:
ASTPtr value;
String fields_str; /// The extent of comma-separated partition expression fields without parentheses.
size_t fields_count = 0;
String id;


@ -1523,6 +1523,23 @@ bool ParserNull::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}
bool ParserBool::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
if (ParserKeyword("true").parse(pos, node, expected))
{
node = std::make_shared<ASTLiteral>(true);
return true;
}
else if (ParserKeyword("false").parse(pos, node, expected))
{
node = std::make_shared<ASTLiteral>(false);
return true;
}
else
return false;
}
static bool parseNumber(char * buffer, size_t size, bool negative, int base, Field & res)
{
errno = 0; /// Functions strto* don't clear errno.
@ -1754,6 +1771,7 @@ bool ParserLiteral::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserNull null_p;
ParserNumber num_p;
ParserBool bool_p;
ParserStringLiteral str_p;
if (null_p.parse(pos, node, expected))
@ -1762,6 +1780,9 @@ bool ParserLiteral::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (num_p.parse(pos, node, expected))
return true;
if (bool_p.parse(pos, node, expected))
return true;
if (str_p.parse(pos, node, expected))
return true;
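
With ParserBool now tried by ParserLiteral, unquoted true and false should be accepted wherever a literal is expected; a small sketch (not taken from the commit's tests):

-- boolean literals parsed by the new ParserBool
SELECT true AS t, false AS f, true AND false AS both;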


@ -294,6 +294,14 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Bool literal.
*/
class ParserBool : public IParserBase
{
protected:
const char * getName() const override { return "Bool"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Numeric literal.
*/


@ -35,7 +35,6 @@ bool ParserPartition::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
size_t fields_count;
String fields_str;
const auto * tuple_ast = value->as<ASTFunction>();
bool surrounded_by_parens = false;
@ -58,7 +57,6 @@ bool ParserPartition::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
else
{
fields_count = 1;
fields_str = String(begin->begin, pos->begin - begin->begin);
}
}
else
@ -78,13 +76,10 @@ bool ParserPartition::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
--right_paren;
if (right_paren->type != TokenType::ClosingRoundBracket)
return false;
fields_str = String(left_paren->end, right_paren->begin - left_paren->end);
}
partition->value = value;
partition->children.push_back(value);
partition->fields_str = std::move(fields_str);
partition->fields_count = fields_count;
}


@ -37,6 +37,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SYNTAX_ERROR;
extern const int BAD_ARGUMENTS;
}
@ -342,6 +343,10 @@ ConstantExpressionTemplate::TemplateStructure::TemplateStructure(LiteralsInfo &
auto syntax_result = TreeRewriter(context).analyze(expression, literals.getNamesAndTypesList());
result_column_name = expression->getColumnName();
actions_on_literals = ExpressionAnalyzer(expression, syntax_result, context).getActions(false);
if (actions_on_literals->hasArrayJoin())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Array joins are not allowed in constant expressions for IN, VALUES, LIMIT and similar sections.");
}
size_t ConstantExpressionTemplate::TemplateStructure::getTemplateHash(const ASTPtr & expression,


@ -311,14 +311,11 @@ Pipe StorageFileLog::read(
unsigned /* num_streams */)
{
/// If there are MVs depended on this table, we just forbid reading
if (has_dependent_mv)
{
throw Exception(
ErrorCodes::QUERY_NOT_ALLOWED,
"Can not make `SELECT` query from table {}, because it has attached dependencies. Remove dependent materialized views if "
"needed",
getStorageID().getTableName());
}
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageFileLog with attached materialized views");
std::lock_guard<std::mutex> lock(file_infos_mutex);
if (running_streams)
@ -585,9 +582,9 @@ void StorageFileLog::threadFunc()
if (dependencies_count)
{
has_dependent_mv = true;
auto start_time = std::chrono::steady_clock::now();
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled)
{
@ -629,6 +626,8 @@ void StorageFileLog::threadFunc()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
mv_attached.store(false);
// Wait for attached views
if (!task->stream_cancelled)
{


@ -170,7 +170,7 @@ private:
bool has_new_events = false;
std::condition_variable cv;
bool has_dependent_mv = false;
std::atomic<bool> mv_attached = false;
std::mutex file_infos_mutex;


@ -31,6 +31,7 @@ class ASTStorage;
M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
M(HandleKafkaErrorMode, kafka_handle_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default, stream.", 0) \
M(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \
/** TODO: */
/* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */
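
A hedged sketch of how the new kafka_commit_on_select engine setting combines with the stream_like_engine_allow_direct_select query setting introduced earlier in this commit; broker, topic, group and table names are made up for illustration:

-- hypothetical Kafka table; kafka_commit_on_select makes direct SELECTs commit offsets
CREATE TABLE kafka_demo (message String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'demo_topic',
         kafka_group_name = 'demo_group',
         kafka_format = 'JSONEachRow',
         kafka_commit_on_select = 1;

-- direct SELECT is rejected by default and has to be enabled per query
SELECT * FROM kafka_demo SETTINGS stream_like_engine_allow_direct_select = 1;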


@ -23,7 +23,7 @@ public:
const Names & columns,
Poco::Logger * log_,
size_t max_block_size_,
bool commit_in_suffix = true);
bool commit_in_suffix = false);
~KafkaSource() override;
String getName() const override { return storage.getName(); }


@ -49,6 +49,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int QUERY_NOT_ALLOWED;
}
struct StorageKafkaInterceptors
@ -271,6 +272,12 @@ Pipe StorageKafka::read(
if (num_created_consumers == 0)
return {};
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
Pipes pipes;
pipes.reserve(num_created_consumers);
@ -283,7 +290,7 @@ Pipe StorageKafka::read(
/// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
/// TODO: probably that leads to awful performance.
/// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
pipes.emplace_back(std::make_shared<KafkaSource>(*this, metadata_snapshot, modified_context, column_names, log, 1));
pipes.emplace_back(std::make_shared<KafkaSource>(*this, metadata_snapshot, modified_context, column_names, log, 1, kafka_settings->kafka_commit_on_select));
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
@ -557,6 +564,8 @@ void StorageKafka::threadFunc(size_t idx)
{
auto start_time = std::chrono::steady_clock::now();
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled && num_created_consumers > 0)
{
@ -588,6 +597,8 @@ void StorageKafka::threadFunc(size_t idx)
tryLogCurrentException(__PRETTY_FUNCTION__);
}
mv_attached.store(false);
// Wait for attached views
if (!task->stream_cancelled)
task->holder->scheduleAfter(RESCHEDULE_MS);


@ -90,6 +90,8 @@ private:
const bool intermediate_commit;
const SettingsChanges settings_adjustments;
std::atomic<bool> mv_attached = false;
/// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
/// In this case we still need to be able to shutdown() properly.
size_t num_created_consumers = 0; /// number of actually created consumers.


@ -1283,7 +1283,7 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_sh
}
}
String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix) const
String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached) const
{
String res;
@ -1292,11 +1292,20 @@ String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix) const
* This is done only in the case of `to_detached`, because it is assumed that in this case the exact name does not matter.
* No more than 10 attempts are made so that there are not too many junk directories left.
*/
auto full_relative_path = fs::path(storage.relative_data_path);
if (detached)
full_relative_path /= "detached";
if (detached && parent_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot detach projection");
else if (parent_part)
full_relative_path /= parent_part->relative_path;
for (int try_no = 0; try_no < 10; try_no++)
{
res = (prefix.empty() ? "" : prefix + "_") + name + (try_no ? "_try" + DB::toString(try_no) : "");
if (!volume->getDisk()->exists(fs::path(getFullRelativePath()) / res))
if (!volume->getDisk()->exists(full_relative_path / res))
return res;
LOG_WARNING(storage.log, "Directory {} (to detach to) already exists. Will detach to directory with '_tryN' suffix.", res);
@ -1312,7 +1321,7 @@ String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix)
assert(prefix.empty() || std::find(DetachedPartInfo::DETACH_REASONS.begin(),
DetachedPartInfo::DETACH_REASONS.end(),
prefix) != DetachedPartInfo::DETACH_REASONS.end());
return "detached/" + getRelativePathForPrefix(prefix);
return "detached/" + getRelativePathForPrefix(prefix, /* detached */ true);
}
void IMergeTreeDataPart::renameToDetached(const String & prefix) const


@ -357,7 +357,7 @@ public:
/// Calculate column and secondary indices sizes on disk.
void calculateColumnsAndSecondaryIndicesSizesOnDisk();
String getRelativePathForPrefix(const String & prefix) const;
String getRelativePathForPrefix(const String & prefix, bool detached = false) const;
bool isProjectionPart() const { return parent_part != nullptr; }


@ -26,6 +26,8 @@
#include <Interpreters/inplaceBlockConversions.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTNameTypePair.h>
@ -3646,56 +3648,54 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
/// Re-parse partition key fields using the information about expected field types.
auto metadata_snapshot = getInMemoryMetadataPtr();
size_t fields_count = metadata_snapshot->getPartitionKey().sample_block.columns();
const Block & key_sample_block = metadata_snapshot->getPartitionKey().sample_block;
size_t fields_count = key_sample_block.columns();
if (partition_ast.fields_count != fields_count)
throw Exception(
"Wrong number of fields in the partition expression: " + toString(partition_ast.fields_count) +
", must be: " + toString(fields_count),
ErrorCodes::INVALID_PARTITION_VALUE);
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE,
"Wrong number of fields in the partition expression: {}, must be: {}",
partition_ast.fields_count, fields_count);
if (auto * f = partition_ast.value->as<ASTFunction>())
{
assert(f->name == "tuple");
if (f->arguments && !f->arguments->as<ASTExpressionList>()->children.empty())
{
ASTPtr query = partition_ast.value->clone();
auto syntax_analyzer_result
= TreeRewriter(local_context)
.analyze(query, metadata_snapshot->getPartitionKey().sample_block.getNamesAndTypesList(), {}, {}, false, false);
auto actions = ExpressionAnalyzer(query, syntax_analyzer_result, local_context).getActions(true);
if (actions->hasArrayJoin())
throw Exception("The partition expression cannot contain array joins", ErrorCodes::INVALID_PARTITION_VALUE);
}
}
const FormatSettings format_settings;
Row partition_row(fields_count);
if (fields_count)
if (fields_count == 0)
{
ConcatReadBuffer buf;
buf.appendBuffer(std::make_unique<ReadBufferFromMemory>("(", 1));
buf.appendBuffer(std::make_unique<ReadBufferFromMemory>(partition_ast.fields_str.data(), partition_ast.fields_str.size()));
buf.appendBuffer(std::make_unique<ReadBufferFromMemory>(")", 1));
/// Function tuple(...) requires at least one argument, so empty key is a special case
assert(!partition_ast.fields_count);
assert(typeid_cast<ASTFunction *>(partition_ast.value.get()));
assert(partition_ast.value->as<ASTFunction>()->name == "tuple");
assert(partition_ast.value->as<ASTFunction>()->arguments);
bool empty_tuple = partition_ast.value->as<ASTFunction>()->arguments->children.empty();
if (!empty_tuple)
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE, "Partition key is empty, expected 'tuple()' as partition key");
}
else if (fields_count == 1)
{
ASTPtr partition_value_ast = partition_ast.value;
if (auto * tuple = partition_value_ast->as<ASTFunction>())
{
assert(tuple->name == "tuple");
assert(tuple->arguments);
assert(tuple->arguments->children.size() == 1);
partition_value_ast = tuple->arguments->children[0];
}
/// Simple partition key, need to evaluate and cast
Field partition_key_value = evaluateConstantExpression(partition_value_ast, local_context).first;
partition_row[0] = convertFieldToTypeOrThrow(partition_key_value, *key_sample_block.getByPosition(0).type);
}
else
{
/// Complex key, need to evaluate, untuple and cast
Field partition_key_value = evaluateConstantExpression(partition_ast.value, local_context).first;
if (partition_key_value.getType() != Field::Types::Tuple)
throw Exception(ErrorCodes::INVALID_PARTITION_VALUE,
"Expected tuple for complex partition key, got {}", partition_key_value.getTypeName());
auto input_format = local_context->getInputFormat(
"Values",
buf,
metadata_snapshot->getPartitionKey().sample_block,
local_context->getSettingsRef().max_block_size);
QueryPipeline pipeline(std::move(input_format));
PullingPipelineExecutor executor(pipeline);
Block block;
executor.pull(block);
if (!block || !block.rows())
throw Exception(
"Could not parse partition value: `" + partition_ast.fields_str + "`",
ErrorCodes::INVALID_PARTITION_VALUE);
const Tuple & tuple = partition_key_value.get<Tuple>();
if (tuple.size() != fields_count)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Wrong number of fields in the partition expression: {}, must be: {}", tuple.size(), fields_count);
for (size_t i = 0; i < fields_count; ++i)
block.getByPosition(i).column->get(0, partition_row[i]);
partition_row[i] = convertFieldToTypeOrThrow(tuple[i], *key_sample_block.getByPosition(i).type);
}
MergeTreePartition partition(std::move(partition_row));
@ -3707,11 +3707,10 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value)
{
WriteBufferFromOwnString buf;
writeCString("Parsed partition value: ", buf);
partition.serializeText(*this, buf, format_settings);
writeCString(" doesn't match partition value for an existing part with the same partition ID: ", buf);
writeString(existing_part_in_partition->name, buf);
throw Exception(buf.str(), ErrorCodes::INVALID_PARTITION_VALUE);
partition.serializeText(*this, buf, FormatSettings{});
throw Exception(ErrorCodes::LOGICAL_ERROR, "Parsed partition value: {} "
"doesn't match partition value for an existing part with the same partition ID: {}",
buf.str(), existing_part_in_partition->name);
}
}


@ -20,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_PARTITION_VALUE;
}
namespace
@ -182,6 +183,8 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
/// In case all partition fields are represented by integral types, try to produce a human-readable ID.
/// Otherwise use a hex-encoded hash.
/// NOTE It will work in unexpected way if some partition key column is Nullable:
/// are_all_integral will be false if some value is NULL. Maybe we should fix it.
bool are_all_integral = true;
for (const Field & field : value)
{
@ -228,6 +231,94 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
return result;
}
std::optional<Row> MergeTreePartition::tryParseValueFromID(const String & partition_id, const Block & partition_key_sample)
{
size_t num_keys = partition_key_sample.columns();
Row res;
res.reserve(num_keys);
ReadBufferFromString buf(partition_id);
if (num_keys == 0)
{
checkString("all", buf);
assertEOF(buf);
return res;
}
enum KeyType { DATE, UNSIGNED, SIGNED };
std::vector<KeyType> key_types;
key_types.reserve(num_keys);
for (size_t i = 0; i < num_keys; ++i)
{
auto type = partition_key_sample.getByPosition(i).type;
/// NOTE Sometimes it's possible to parse Nullable key, but easier to ignore it.
if (type->isNullable())
return {};
/// We use Field::Types when serializing partition_id, let's get some Field to check type
Field sample_field = type->getDefault();
if (typeid_cast<const DataTypeDate *>(type.get()))
key_types.emplace_back(DATE);
else if (sample_field.getType() == Field::Types::UInt64)
key_types.emplace_back(UNSIGNED);
else if (sample_field.getType() == Field::Types::Int64)
key_types.emplace_back(SIGNED);
else
return {};
}
/// All columns are numeric, will parse partition value
for (size_t i = 0; i < num_keys; ++i)
{
switch (key_types[i])
{
case DATE:
{
UInt32 date_yyyymmdd;
readText(date_yyyymmdd, buf);
constexpr UInt32 min_yyyymmdd = 10000000;
constexpr UInt32 max_yyyymmdd = 99999999;
if (date_yyyymmdd < min_yyyymmdd || max_yyyymmdd < date_yyyymmdd)
throw Exception(
ErrorCodes::INVALID_PARTITION_VALUE, "Cannot parse partition_id: got unexpected Date: {}", date_yyyymmdd);
UInt32 date = DateLUT::instance().YYYYMMDDToDayNum(date_yyyymmdd);
res.emplace_back(date);
break;
}
case UNSIGNED:
{
UInt64 value;
readText(value, buf);
res.emplace_back(value);
break;
}
case SIGNED:
{
Int64 value;
readText(value, buf);
res.emplace_back(value);
break;
}
}
if (i + 1 != num_keys)
assertChar('-', buf);
}
assertEOF(buf);
String expected_partition_id = MergeTreePartition{res}.getID(partition_key_sample);
if (expected_partition_id != partition_id)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Partition ID was parsed incorrectly: expected {}, got {}",
expected_partition_id, partition_id);
return res;
}
void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffer & out, const FormatSettings & format_settings) const
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();


@ -33,6 +33,8 @@ public:
String getID(const MergeTreeData & storage) const;
String getID(const Block & partition_key_sample) const;
static std::optional<Row> tryParseValueFromID(const String & partition_id, const Block & partition_key_sample);
void serializeText(const MergeTreeData & storage, WriteBuffer & out, const FormatSettings & format_settings) const;
void load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path);


@ -30,6 +30,7 @@ namespace DB
M(Bool, rabbitmq_queue_consume, false, "Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings", 0) \
M(String, rabbitmq_username, "", "RabbitMQ username", 0) \
M(String, rabbitmq_password, "", "RabbitMQ password", 0) \
M(Bool, rabbitmq_commit_on_select, false, "Commit messages when select query is made", 0) \
#define LIST_OF_RABBITMQ_SETTINGS(M) \
RABBITMQ_RELATED_SETTINGS(M) \


@ -18,7 +18,7 @@ public:
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix = true);
bool ack_in_suffix = false);
~RabbitMQSource() override;


@ -45,6 +45,7 @@ namespace ErrorCodes
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE;
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING;
extern const int QUERY_NOT_ALLOWED;
}
namespace ExchangeType
@ -645,11 +646,16 @@ Pipe StorageRabbitMQ::read(
if (num_created_consumers == 0)
return {};
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageRabbitMQ with attached materialized views");
std::lock_guard lock(loop_mutex);
auto sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto modified_context = addSettings(local_context);
auto block_size = getMaxBlockSize();
if (!connection->isConnected())
{
@ -667,7 +673,7 @@ Pipe StorageRabbitMQ::read(
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_source = std::make_shared<RabbitMQSource>(
*this, metadata_snapshot, modified_context, column_names, block_size);
*this, metadata_snapshot, modified_context, column_names, 1, rabbitmq_settings->rabbitmq_commit_on_select);
auto converting_dag = ActionsDAG::makeConvertingActions(
rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(),
@ -938,6 +944,8 @@ void StorageRabbitMQ::streamingToViewsFunc()
initializeBuffers();
auto start_time = std::chrono::steady_clock::now();
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!shutdown_called && num_created_consumers > 0)
{
@ -976,6 +984,8 @@ void StorageRabbitMQ::streamingToViewsFunc()
}
}
mv_attached.store(false);
/// If there is no running select, stop the loop which was
/// activated by previous select.
if (connection->getHandler().loopRunning())


@ -148,6 +148,7 @@ private:
/// For select query we must be aware of the end of streaming
/// to be able to turn off the loop.
std::atomic<size_t> readers_count = 0;
std::atomic<bool> mv_attached = false;
/// In select query we start event loop, but do not stop it
/// after that select is finished. Then in a thread, which

View File

@ -1077,7 +1077,7 @@ Int64 StorageMergeTree::getUpdatedDataVersion(
return part->info.getDataVersion();
}
Int64 StorageMergeTree::getCurrentMutationVersion(
UInt64 StorageMergeTree::getCurrentMutationVersion(
const DataPartPtr & part,
std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock) const
{

View File

@ -192,10 +192,14 @@ private:
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason, TableLockHolder & table_lock_holder, std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock, bool & were_some_mutations_for_some_parts_skipped);
Int64 getCurrentMutationVersion(
/// For current mutations queue, returns maximum version of mutation for a part,
/// with respect to mutations which would not change it.
/// Returns 0 if there is no such mutation in active status.
UInt64 getCurrentMutationVersion(
const DataPartPtr & part,
std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const;
/// Returns the maximum version of a part, with respect to mutations which would not change it.
Int64 getUpdatedDataVersion(
const DataPartPtr & part,
std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const;

View File

@ -7381,13 +7381,23 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
{
auto lock = lockParts();
auto parts_in_partition = getDataPartsPartitionRange(new_part_info.partition_id);
if (parts_in_partition.empty())
if (!parts_in_partition.empty())
{
LOG_WARNING(log, "Empty part {} is not created instead of lost part because there are no parts in partition {} (it's empty), resolve this manually using DROP PARTITION.", lost_part_name, new_part_info.partition_id);
new_data_part->partition = (*parts_in_partition.begin())->partition;
}
else if (auto parsed_partition = MergeTreePartition::tryParseValueFromID(
new_part_info.partition_id,
metadata_snapshot->getPartitionKey().sample_block))
{
new_data_part->partition = MergeTreePartition(*parsed_partition);
}
else
{
LOG_WARNING(log, "Empty part {} is not created instead of lost part because there are no parts in partition {} (it's empty), "
"resolve this manually using DROP/DETACH PARTITION.", lost_part_name, new_part_info.partition_id);
return false;
}
new_data_part->partition = (*parts_in_partition.begin())->partition;
}
new_data_part->minmax_idx = std::move(minmax_idx);

View File

@ -57,7 +57,7 @@ def get_packager_cmd(build_config, packager_path, output_path, build_version, im
cmd += ' --ccache_dir={}'.format(ccache_path)
if 'alien_pkgs' in build_config and build_config['alien_pkgs']:
if pr_info == 0 or 'release' in pr_info.labels:
if pr_info.number == 0 or 'release' in pr_info.labels:
cmd += ' --alien-pkgs rpm tgz'
cmd += ' --docker-image-version={}'.format(image_version)

View File

@ -0,0 +1,13 @@
FROM public.ecr.aws/lambda/python:3.9
# Copy function code
COPY app.py ${LAMBDA_TASK_ROOT}
# Install the function's dependencies using file requirements.txt
# from your project folder.
COPY requirements.txt .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]

View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
import json
import time
import jwt
import requests
import boto3
# https://docs.github.com/en/rest/reference/actions#cancel-a-workflow-run
#
API_URL = 'https://api.github.com/repos/ClickHouse/ClickHouse'
MAX_RETRY = 5
def get_installation_id(jwt_token):
headers = {
"Authorization": f"Bearer {jwt_token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get("https://api.github.com/app/installations", headers=headers)
response.raise_for_status()
data = response.json()
return data[0]['id']
def get_access_token(jwt_token, installation_id):
headers = {
"Authorization": f"Bearer {jwt_token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.post(f"https://api.github.com/app/installations/{installation_id}/access_tokens", headers=headers)
response.raise_for_status()
data = response.json()
return data['token']
def get_key_and_app_from_aws():
secret_name = "clickhouse_github_secret_key"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
)
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
data = json.loads(get_secret_value_response['SecretString'])
return data['clickhouse-app-key'], int(data['clickhouse-app-id'])
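# GitHub App authentication: sign a short-lived JWT with the app's private key,
# then exchange it for an installation access token via the GitHub API.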
def get_token_from_aws():
private_key, app_id = get_key_and_app_from_aws()
payload = {
"iat": int(time.time()) - 60,
"exp": int(time.time()) + (10 * 60),
"iss": app_id,
}
encoded_jwt = jwt.encode(payload, private_key, algorithm="RS256")
installation_id = get_installation_id(encoded_jwt)
return get_access_token(encoded_jwt, installation_id)
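# Plain retry helpers: up to MAX_RETRY attempts with a linearly growing pause (1s, 2s, ...).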
def _exec_get_with_retry(url):
for i in range(MAX_RETRY):
try:
response = requests.get(url)
response.raise_for_status()
return response.json()
except Exception as ex:
print("Got exception executing request", ex)
time.sleep(i + 1)
raise Exception("Cannot execute GET request with retries")
def get_workflows_cancel_urls_for_pull_request(pull_request_event):
head_branch = pull_request_event['head']['ref']
print("PR", pull_request_event['number'], "has head ref", head_branch)
workflows = _exec_get_with_retry(API_URL + f"/actions/runs?branch={head_branch}")
workflows_urls_to_cancel = set([])
for workflow in workflows['workflow_runs']:
if workflow['status'] != 'completed':
print("Workflow", workflow['url'], "not finished, going to be cancelled")
workflows_urls_to_cancel.add(workflow['cancel_url'])
else:
print("Workflow", workflow['url'], "already finished, will not try to cancel")
return workflows_urls_to_cancel
def _exec_post_with_retry(url, token):
headers = {
"Authorization": f"token {token}"
}
for i in range(MAX_RETRY):
try:
response = requests.post(url, headers=headers)
response.raise_for_status()
return response.json()
except Exception as ex:
print("Got exception executing request", ex)
time.sleep(i + 1)
raise Exception("Cannot execute POST request with retry")
def cancel_workflows(urls_to_cancel, token):
for url in urls_to_cancel:
print("Cancelling workflow using url", url)
_exec_post_with_retry(url, token)
print("Workflow cancelled")
def main(event):
token = get_token_from_aws()
event_data = json.loads(event['body'])
print("Got event for PR", event_data['number'])
action = event_data['action']
print("Got action", event_data['action'])
pull_request = event_data['pull_request']
labels = { l['name'] for l in pull_request['labels'] }
print("PR has labels", labels)
if action == 'closed' or 'do not test' in labels:
print("PR merged/closed or manually labeled 'do not test' will kill workflows")
workflows_to_cancel = get_workflows_cancel_urls_for_pull_request(pull_request)
print(f"Found {len(workflows_to_cancel)} workflows to cancel")
cancel_workflows(workflows_to_cancel, token)
else:
print("Nothing to do")
def handler(event, _):
main(event)
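For reference, the handler above receives the GitHub webhook payload wrapped by API Gateway, i.e. the pull_request event serialized into event['body']. A minimal local sketch of the expected shape follows (the PR number, branch name and label are purely illustrative; actually running it requires AWS credentials for the clickhouse_github_secret_key secret and a GitHub App installed on the repository):
# Hypothetical smoke test for the Lambda entry point; all values are illustrative only.
if __name__ == "__main__":
    sample_event = {
        "body": json.dumps({
            "number": 11111,  # illustrative PR number
            "action": "closed",
            "pull_request": {
                "number": 11111,
                "labels": [{"name": "do not test"}],
                "head": {"ref": "some-feature-branch"},
            },
        })
    }
    # Needs AWS credentials (Secrets Manager) and GitHub App access to actually cancel anything.
    handler(sample_event, None)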

View File

@ -0,0 +1,3 @@
requests
PyJWT
cryptography

View File

@ -34,128 +34,6 @@
{
"compiler": "clang-13",
"build-type": "",
"sanitizer": "address",
"package-type": "deb",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13",
"build-type": "",
"sanitizer": "undefined",
"package-type": "deb",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13",
"build-type": "",
"sanitizer": "thread",
"package-type": "deb",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13",
"build-type": "",
"sanitizer": "memory",
"package-type": "deb",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13",
"build-type": "debug",
"sanitizer": "",
"package-type": "deb",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
}
],
"special_build_config": [
{
"compiler": "clang-13",
"build-type": "debug",
"sanitizer": "",
"package-type": "deb",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "enable",
"with_coverage": false
},
{
"compiler": "clang-13",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
"bundled": "bundled",
"splitted": "splitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13-darwin",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13-aarch64",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13-freebsd",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13-darwin-aarch64",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": false
},
{
"compiler": "clang-13-ppc64le",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
"bundled": "bundled",
@ -165,354 +43,6 @@
}
],
"tests_config": {
"Functional stateful tests (address)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "address",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateful tests (thread)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "thread",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateful tests (memory)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "memory",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateful tests (ubsan)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "undefined",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateful tests (debug)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "debug",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateful tests (release)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateful tests (release, DatabaseOrdinary)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateful tests (release, DatabaseReplicated)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateless tests (address)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "address",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateless tests (thread)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "thread",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateless tests (memory)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "memory",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateless tests (ubsan)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "undefined",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateless tests (debug)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "debug",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateless tests (release)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateless tests (release, wide parts enabled)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateless tests (release, DatabaseOrdinary)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Functional stateless tests (release, DatabaseReplicated)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Stress test (address)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "address",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Stress test (thread)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "thread",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Stress test (undefined)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "undefined",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Stress test (memory)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "memory",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Stress test (debug)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "debug",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Integration tests (asan)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "address",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Integration tests (thread)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "thread",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Integration tests (release)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Integration tests (memory)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "memory",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Integration tests flaky check (asan)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "address",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Compatibility check": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Split build smoke test": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "splitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Testflows check": {
"required_build_properties": {
"compiler": "clang-13",
@ -525,138 +55,6 @@
"with_coverage": false
}
},
"Unit tests release gcc": {
"required_build_properties": {
"compiler": "gcc-11",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Unit tests release clang": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Unit tests ASAN": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "address",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Unit tests MSAN": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "memory",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Unit tests TSAN": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "thread",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Unit tests UBSAN": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "undefined",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"AST fuzzer (debug)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "debug",
"sanitizer": "none",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"AST fuzzer (ASan)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "address",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"AST fuzzer (MSan)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "memory",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"AST fuzzer (TSan)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "thread",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"AST fuzzer (UBSan)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "binary",
"build_type": "relwithdebuginfo",
"sanitizer": "undefined",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"Release": {
"required_build_properties": {
"compiler": "clang-13",
@ -669,18 +67,6 @@
"with_coverage": false
}
},
"Functional stateless tests flaky check (address)": {
"required_build_properties": {
"compiler": "clang-13",
"package_type": "deb",
"build_type": "relwithdebuginfo",
"sanitizer": "address",
"bundled": "bundled",
"splitted": "unsplitted",
"clang-tidy": "disable",
"with_coverage": false
}
},
"ClickHouse Keeper Jepsen": {
"required_build_properties": {
"compiler": "clang-13",

View File

@ -143,4 +143,7 @@ if __name__ == "__main__":
# Refuse other checks to run if fast test failed
if state != 'success':
sys.exit(1)
if 'force-tests' in pr_info.labels:
print("'force-tests' enabled, will report success")
else:
sys.exit(1)

View File

@ -179,4 +179,7 @@ if __name__ == "__main__":
ch_helper.insert_events_into(db="gh-data", table="checks", events=prepared_events)
if state != 'success':
sys.exit(1)
if 'force-tests' in pr_info.labels:
print("'force-tests' enabled, will report success")
else:
sys.exit(1)

View File

@ -374,7 +374,7 @@ class TestCase:
else:
# If --database is not specified, we will create temporary database with unique name
# And we will recreate and drop it for each test
def random_str(length=6):
def random_str(length=8):
alphabet = string.ascii_lowercase + string.digits
return ''.join(random.choice(alphabet) for _ in range(length))
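Bumping the random suffix from 6 to 8 characters widens the namespace for the per-test temporary databases; a quick back-of-the-envelope check of the collision space (assuming the alphabet stays at lowercase letters plus digits, i.e. 36 symbols):
import string

alphabet_size = len(string.ascii_lowercase + string.digits)  # 36 symbols
print(alphabet_size ** 6)  # 2176782336 possible 6-character suffixes
print(alphabet_size ** 8)  # 2821109907456 possible 8-character suffixes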

View File

@ -48,6 +48,7 @@ ln -sf $SRC_PATH/users.d/remote_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/session_log_test.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/memory_profiler.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/filelog.xml $DEST_SERVER_PATH/users.d/
# FIXME DataPartsExchange may hang for http_send_timeout seconds
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<stream_like_engine_allow_direct_select>1</stream_like_engine_allow_direct_select>
</default>
</profiles>
</clickhouse>

View File

@ -178,18 +178,20 @@ def test_lost_part_mutation(start_cluster):
def test_lost_last_part(start_cluster):
for node in [node1, node2]:
node.query(
"CREATE TABLE mt3 (id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{}') ORDER BY tuple()".format(node.name))
"CREATE TABLE mt3 (id UInt64, p String) ENGINE ReplicatedMergeTree('/clickhouse/tables/t3', '{}') "
"ORDER BY tuple() PARTITION BY p".format(node.name))
node1.query("SYSTEM STOP MERGES mt3")
node2.query("SYSTEM STOP REPLICATION QUEUES")
for i in range(1):
node1.query("INSERT INTO mt3 VALUES ({})".format(i))
node1.query("INSERT INTO mt3 VALUES ({}, 'x')".format(i))
# actually not important
node1.query("ALTER TABLE mt3 UPDATE id = 777 WHERE 1", settings={"mutations_sync": "0"})
remove_part_from_disk(node1, 'mt3', 'all_0_0_0')
partition_id = node1.query("select partitionId('x')").strip()
remove_part_from_disk(node1, 'mt3', '{}_0_0_0'.format(partition_id))
# other way to detect broken parts
node1.query("CHECK TABLE mt3")
@ -199,13 +201,13 @@ def test_lost_last_part(start_cluster):
for i in range(10):
result = node1.query("SELECT count() FROM system.replication_queue")
assert int(result) <= 1, "Have a lot of entries in queue {}".format(node1.query("SELECT * FROM system.replication_queue FORMAT Vertical"))
if node1.contains_in_log("Cannot create empty part") and node1.contains_in_log("DROP PARTITION"):
if node1.contains_in_log("Cannot create empty part") and node1.contains_in_log("DROP/DETACH PARTITION"):
break
time.sleep(1)
else:
assert False, "Don't have required messages in node1 log"
node1.query("ALTER TABLE mt3 DROP PARTITION ID 'all'")
node1.query("ALTER TABLE mt3 DROP PARTITION ID '{}'".format(partition_id))
assert_eq_with_retry(node1, "SELECT COUNT() FROM mt3", "0")
assert_eq_with_retry(node1, "SELECT COUNT() FROM system.replication_queue", "0")

View File

@ -315,3 +315,19 @@ def test_system_detached_parts(drop_detached_parts_table):
q("alter table sdp_{} attach partition id '{}'".format(i, p))
assert q("select n, x, count() from merge('default', 'sdp_') group by n, x") == "0\t0\t4\n1\t1\t4\n"
def test_detached_part_dir_exists(started_cluster):
q("create table detached_part_dir_exists (n int) engine=MergeTree order by n")
q("insert into detached_part_dir_exists select 1") # will create all_1_1_0
q("alter table detached_part_dir_exists detach partition id 'all'") # will move all_1_1_0 to detached/all_1_1_0
q("detach table detached_part_dir_exists")
q("attach table detached_part_dir_exists")
q("insert into detached_part_dir_exists select 1") # will create all_1_1_0
q("insert into detached_part_dir_exists select 1") # will create all_2_2_0
instance.exec_in_container(['bash', '-c', 'mkdir /var/lib/clickhouse/data/default/detached_part_dir_exists/detached/all_2_2_0'], privileged=True)
instance.exec_in_container(['bash', '-c', 'touch /var/lib/clickhouse/data/default/detached_part_dir_exists/detached/all_2_2_0/file'], privileged=True)
q("alter table detached_part_dir_exists detach partition id 'all'") # directories already exist, but it's ok
assert q("select name from system.detached_parts where table='detached_part_dir_exists' order by name") == \
"all_1_1_0\nall_1_1_0_try1\nall_2_2_0\nall_2_2_0_try1\n"
q("drop table detached_part_dir_exists")

View File

@ -2,24 +2,7 @@
<clickhouse>
<profiles>
<default>
<!--stream_poll_timeout_ms>1</stream_poll_timeout_ms>
<stream_flush_interval_ms>100</stream_flush_interval_ms-->
<stream_like_engine_allow_direct_select>1</stream_like_engine_allow_direct_select>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</clickhouse>

View File

@ -44,6 +44,7 @@ from . import social_pb2
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/kafka.xml', 'configs/named_collection.xml'],
user_configs=['configs/users.xml'],
with_kafka=True,
with_zookeeper=True, # For Replicated Table
macros={"kafka_broker":"kafka1",
@ -236,7 +237,8 @@ kafka_topic_old old
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_old}', '{kafka_group_name_old}', '{kafka_format_json_each_row}', '\\n');
ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_old}', '{kafka_group_name_old}', '{kafka_format_json_each_row}', '\\n')
SETTINGS kafka_commit_on_select = 1;
''')
# Don't insert malformed messages since old settings syntax
@ -268,6 +270,7 @@ def test_kafka_settings_new_syntax(kafka_cluster):
kafka_group_name = '{kafka_group_name_new}',
kafka_format = '{kafka_format_json_each_row}',
kafka_row_delimiter = '\\n',
kafka_commit_on_select = 1,
kafka_client_id = '{kafka_client_id} test 1234',
kafka_skip_broken_messages = 1;
''')
@ -313,6 +316,7 @@ def test_kafka_json_as_string(kafka_cluster):
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_group_name = 'kafka_json_as_string',
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
@ -802,6 +806,7 @@ def test_kafka_issue4116(kafka_cluster):
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'issue4116',
kafka_group_name = 'issue4116',
kafka_commit_on_select = 1,
kafka_format = 'CSV',
kafka_row_delimiter = '\\n',
format_csv_delimiter = '|';
@ -875,6 +880,7 @@ def test_kafka_consumer_hang(kafka_cluster):
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic_name}',
kafka_commit_on_select = 1,
kafka_group_name = '{topic_name}',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 8;
@ -900,12 +906,14 @@ def test_kafka_consumer_hang2(kafka_cluster):
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'consumer_hang2',
kafka_group_name = 'consumer_hang2',
kafka_commit_on_select = 1,
kafka_format = 'JSONEachRow';
CREATE TABLE test.kafka2 (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'consumer_hang2',
kafka_commit_on_select = 1,
kafka_group_name = 'consumer_hang2',
kafka_format = 'JSONEachRow';
''')
@ -944,6 +952,7 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'csv',
kafka_commit_on_select = 1,
kafka_group_name = 'csv',
kafka_format = 'CSV';
''')
@ -968,6 +977,7 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'tsv',
kafka_commit_on_select = 1,
kafka_group_name = 'tsv',
kafka_format = 'TSV';
''')
@ -991,6 +1001,7 @@ def test_kafka_select_empty(kafka_cluster):
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic_name}',
kafka_commit_on_select = 1,
kafka_group_name = '{topic_name}',
kafka_format = 'TSV',
kafka_row_delimiter = '\\n';
@ -1017,6 +1028,7 @@ def test_kafka_json_without_delimiter(kafka_cluster):
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_commit_on_select = 1,
kafka_format = 'JSONEachRow';
''')
@ -1041,6 +1053,7 @@ def test_kafka_protobuf(kafka_cluster):
kafka_topic_list = 'pb',
kafka_group_name = 'pb',
kafka_format = 'Protobuf',
kafka_commit_on_select = 1,
kafka_schema = 'kafka.proto:KeyValuePair';
''')
@ -1069,6 +1082,7 @@ SETTINGS
kafka_topic_list = 'string_field_on_first_position_in_protobuf',
kafka_group_name = 'string_field_on_first_position_in_protobuf',
kafka_format = 'Protobuf',
kafka_commit_on_select = 1,
kafka_schema = 'social:User';
''')
@ -1135,6 +1149,7 @@ def test_kafka_protobuf_no_delimiter(kafka_cluster):
kafka_topic_list = 'pb_no_delimiter',
kafka_group_name = 'pb_no_delimiter',
kafka_format = 'ProtobufSingle',
kafka_commit_on_select = 1,
kafka_schema = 'kafka.proto:KeyValuePair';
''')
@ -1157,6 +1172,7 @@ def test_kafka_protobuf_no_delimiter(kafka_cluster):
kafka_topic_list = 'pb_no_delimiter',
kafka_group_name = 'pb_no_delimiter',
kafka_format = 'ProtobufSingle',
kafka_commit_on_select = 1,
kafka_schema = 'kafka.proto:KeyValuePair';
''')
@ -1487,6 +1503,7 @@ def test_kafka_virtual_columns(kafka_cluster):
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'virt1',
kafka_group_name = 'virt1',
kafka_commit_on_select = 1,
kafka_format = 'JSONEachRow';
''')
@ -1558,6 +1575,7 @@ def test_kafka_insert(kafka_cluster):
kafka_topic_list = 'insert1',
kafka_group_name = 'insert1',
kafka_format = 'TSV',
kafka_commit_on_select = 1,
kafka_row_delimiter = '\\n';
''')
@ -1855,6 +1873,7 @@ def test_kafka_insert_avro(kafka_cluster):
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'avro1',
kafka_group_name = 'avro1',
kafka_commit_on_select = 1,
kafka_format = 'Avro';
''')
@ -2283,6 +2302,7 @@ def test_exception_from_destructor(kafka_cluster):
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'xyz',
kafka_group_name = '',
kafka_commit_on_select = 1,
kafka_format = 'JSONEachRow';
''')
instance.query_and_get_error('''
@ -2581,6 +2601,7 @@ def test_kafka_unavailable(kafka_cluster):
kafka_topic_list = 'test_bad_reschedule',
kafka_group_name = 'test_bad_reschedule',
kafka_format = 'JSONEachRow',
kafka_commit_on_select = 1,
kafka_max_block_size = 1000;
CREATE MATERIALIZED VIEW test.destination_unavailable Engine=Log AS
@ -2650,6 +2671,7 @@ def test_kafka_csv_with_thread_per_consumer(kafka_cluster):
kafka_format = 'CSV',
kafka_row_delimiter = '\\n',
kafka_num_consumers = 4,
kafka_commit_on_select = 1,
kafka_thread_per_consumer = 1;
''')

View File

@ -0,0 +1,8 @@
<?xml version="1.0"?>
<clickhouse>
<profiles>
<default>
<stream_like_engine_allow_direct_select>1</stream_like_engine_allow_direct_select>
</default>
</profiles>
</clickhouse>

View File

@ -22,6 +22,7 @@ import socket
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/kafka.xml'],
user_configs=['configs/users.xml'],
with_kerberized_kafka=True,
clickhouse_path_dir="clickhouse_path")
@ -38,8 +39,8 @@ def get_kafka_producer(port, serializer):
except Exception as e:
errors += [str(e)]
time.sleep(1)
raise Exception("Connection not establised, {}".format(errors))
raise Exception("Connection not establised, {}".format(errors))
def kafka_produce(kafka_cluster, topic, messages, timestamp=None):
logging.debug("kafka_produce server:{}:{} topic:{}".format("localhost", kafka_cluster.kerberized_kafka_port, topic))
@ -78,6 +79,7 @@ def test_kafka_json_as_string(kafka_cluster):
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_commit_on_select = 1,
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
@ -106,6 +108,7 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')

View File

@ -0,0 +1,8 @@
<?xml version="1.0"?>
<clickhouse>
<profiles>
<default>
<stream_like_engine_allow_direct_select>1</stream_like_engine_allow_direct_select>
</default>
</profiles>
</clickhouse>

View File

@ -20,6 +20,7 @@ from . import rabbitmq_pb2
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/rabbitmq.xml', 'configs/macros.xml', 'configs/named_collection.xml'],
user_configs=['configs/users.xml'],
with_rabbitmq=True)
@ -78,6 +79,7 @@ def test_rabbitmq_select(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{}:5672',
rabbitmq_exchange_name = 'select',
rabbitmq_commit_on_select = 1,
rabbitmq_format = 'JSONEachRow',
rabbitmq_row_delimiter = '\\n';
'''.format(rabbitmq_cluster.rabbitmq_host))
@ -113,6 +115,7 @@ def test_rabbitmq_select_empty(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{}:5672',
rabbitmq_exchange_name = 'empty',
rabbitmq_commit_on_select = 1,
rabbitmq_format = 'TSV',
rabbitmq_row_delimiter = '\\n';
'''.format(rabbitmq_cluster.rabbitmq_host))
@ -125,6 +128,7 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster):
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{}:5672',
rabbitmq_commit_on_select = 1,
rabbitmq_exchange_name = 'json',
rabbitmq_format = 'JSONEachRow'
'''.format(rabbitmq_cluster.rabbitmq_host))
@ -167,6 +171,7 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster):
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'csv',
rabbitmq_commit_on_select = 1,
rabbitmq_format = 'CSV',
rabbitmq_row_delimiter = '\\n';
''')
@ -202,6 +207,7 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster):
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'tsv',
rabbitmq_format = 'TSV',
rabbitmq_commit_on_select = 1,
rabbitmq_queue_base = 'tsv',
rabbitmq_row_delimiter = '\\n';
CREATE TABLE test.view (key UInt64, value UInt64)
@ -238,6 +244,7 @@ def test_rabbitmq_macros(rabbitmq_cluster):
CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = '{rabbitmq_host}:{rabbitmq_port}',
rabbitmq_commit_on_select = 1,
rabbitmq_exchange_name = '{rabbitmq_exchange_name}',
rabbitmq_format = '{rabbitmq_format}'
''')

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash
# Tags: long, replica, no-replicated-database
# Tag no-replicated-database: Old syntax is not allowed
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS replicated_optimize1;"
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS replicated_optimize2;"
$CLICKHOUSE_CLIENT -q "CREATE TABLE replicated_optimize1 (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00925/optimize', 'r1', d, k, 8192);"
$CLICKHOUSE_CLIENT -q "CREATE TABLE replicated_optimize2 (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/test_00925/optimize', 'r2', d, k, 8192);"
num_tries=0
while [[ $($CLICKHOUSE_CLIENT -q "SELECT is_leader FROM system.replicas WHERE database=currentDatabase() AND table='replicated_optimize1'") -ne 1 ]]; do
sleep 0.5;
num_tries=$((num_tries+1))
if [ $num_tries -eq 10 ]; then
echo "Replica cannot become leader"
break
fi
done
$CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE replicated_optimize1 FINAL;"
$CLICKHOUSE_CLIENT -q "DROP TABLE replicated_optimize1;"
$CLICKHOUSE_CLIENT -q "DROP TABLE replicated_optimize2;"

View File

@ -1,12 +0,0 @@
-- Tags: long, replica, no-replicated-database
-- Tag no-replicated-database: Old syntax is not allowed
DROP TABLE IF EXISTS replicated_optimize1;
DROP TABLE IF EXISTS replicated_optimize2;
CREATE TABLE replicated_optimize1 (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{database}/test_00925/optimize', 'r1', d, k, 8192);
CREATE TABLE replicated_optimize2 (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{database}/test_00925/optimize', 'r2', d, k, 8192);
OPTIMIZE TABLE replicated_optimize1 FINAL;
DROP TABLE replicated_optimize1;
DROP TABLE replicated_optimize2;

View File

@ -0,0 +1,37 @@
-- Tags: zookeeper
create table rmt1 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '1') order by n partition by toYYYYMMDD(d);
create table rmt2 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '2') order by n partition by toYYYYMMDD(d);
system stop replicated sends rmt1;
insert into rmt1 values (now(), arrayJoin([1, 2])); -- { clientError 36 }
insert into rmt1(n) select * from system.numbers limit arrayJoin([1, 2]); -- { serverError 36 }
insert into rmt1 values (now(), rand());
drop table rmt1;
system sync replica rmt2;
drop table rmt2;
create table rmt1 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '1') order by n partition by tuple();
create table rmt2 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '2') order by n partition by tuple();
system stop replicated sends rmt1;
insert into rmt1 values (now(), rand());
drop table rmt1;
system sync replica rmt2;
drop table rmt2;
create table rmt1 (n UInt8, m Int32, d Date, t DateTime) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '1') order by n partition by (n, m, d, t);
create table rmt2 (n UInt8, m Int32, d Date, t DateTime) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '2') order by n partition by (n, m, d, t);
system stop replicated sends rmt1;
insert into rmt1 values (rand(), rand(), now(), now());
insert into rmt1 values (rand(), rand(), now(), now());
insert into rmt1 values (rand(), rand(), now(), now());
drop table rmt1;
system sync replica rmt2;
drop table rmt2;

View File

@ -1,4 +1,4 @@
CREATE TABLE table_2009_part (`i` Int64, `d` Date, `s` String) ENGINE = MergeTree PARTITION BY toYYYYMM(d) ORDER BY i;
ALTER TABLE table_2009_part ATTACH PARTITION tuple(arrayJoin([0, 1])); -- {serverError 248}
ALTER TABLE table_2009_part ATTACH PARTITION tuple(toYYYYMM(toDate([arrayJoin([arrayJoin([arrayJoin([arrayJoin([3, materialize(NULL), arrayJoin([1025, materialize(NULL), materialize(NULL)]), NULL])])]), materialize(NULL)])], NULL))); -- {serverError 248}
ALTER TABLE table_2009_part ATTACH PARTITION tuple(arrayJoin([0, 1])); -- {serverError 36}
ALTER TABLE table_2009_part ATTACH PARTITION tuple(toYYYYMM(toDate([arrayJoin([arrayJoin([arrayJoin([arrayJoin([3, materialize(NULL), arrayJoin([1025, materialize(NULL), materialize(NULL)]), NULL])])]), materialize(NULL)])], NULL))); -- {serverError 36}

View File

@ -16,7 +16,7 @@ c
a
1
1
1
true
1
a
\N

View File

@ -0,0 +1,120 @@
CREATE TABLE default.bool_test\n(\n `value` Bool,\n `f` String\n)\nENGINE = Memory
false test
true test
false test
true test
false test
true test
false test
true test
false test
true test
{"value":false,"f":"test"}
{"value":true,"f":"test"}
{"value":false,"f":"test"}
{"value":true,"f":"test"}
{"value":false,"f":"test"}
{"value":true,"f":"test"}
{"value":false,"f":"test"}
{"value":true,"f":"test"}
{"value":false,"f":"test"}
{"value":true,"f":"test"}
0 test
1 test
0 test
1 test
0 test
1 test
0 test
1 test
0 test
1 test
true test
true test
true test
true test
true test
False,"test"
False,"test"
False,"test"
False,"test"
False,"test"
False,"test"
True,"test"
True,"test"
True,"test"
True,"test"
True,"test"
True,"test"
False test
False test
False test
False test
False test
False test
True test
True test
True test
True test
True test
True test
No,"test"
No,"test"
No,"test"
No,"test"
No,"test"
No,"test"
No,"test"
Yes,"test"
Yes,"test"
Yes,"test"
Yes,"test"
Yes,"test"
Yes,"test"
Yes,"test"
No test
No test
No test
No test
No test
No test
No test
Yes test
Yes test
Yes test
Yes test
Yes test
Yes test
Yes test
Off,"test"
Off,"test"
Off,"test"
Off,"test"
Off,"test"
Off,"test"
Off,"test"
Off,"test"
On,"test"
On,"test"
On,"test"
On,"test"
On,"test"
On,"test"
On,"test"
On,"test"
Off test
Off test
Off test
Off test
Off test
Off test
Off test
Off test
On test
On test
On test
On test
On test
On test
On test
On test

View File

@ -0,0 +1,38 @@
DROP TABLE IF EXISTS bool_test;
CREATE TABLE bool_test (value Bool,f String) ENGINE = Memory;
-- value column should have type 'Bool'
SHOW CREATE TABLE bool_test;
INSERT INTO bool_test (value,f) VALUES ('false', 'test'), ('true' , 'test'), (0, 'test'), (1, 'test'), ('FALSE', 'test'), ('TRUE', 'test');
INSERT INTO bool_test (value,f) FORMAT JSONEachRow {"value":false,"f":"test"}{"value":true,"f":"test"}{"value":0,"f":"test"}{"value":1,"f":"test"}
SELECT value,f FROM bool_test;
SELECT value,f FROM bool_test FORMAT JSONEachRow;
SELECT toUInt64(value),f FROM bool_test;
SELECT value,f FROM bool_test where value > 0;
set bool_true_representation='True';
set bool_false_representation='False';
INSERT INTO bool_test (value,f) FORMAT CSV True,test
INSERT INTO bool_test (value,f) FORMAT TSV False test
SELECT value,f FROM bool_test order by value FORMAT CSV;
SELECT value,f FROM bool_test order by value FORMAT TSV;
set bool_true_representation='Yes';
set bool_false_representation='No';
INSERT INTO bool_test (value,f) FORMAT CSV Yes,test
INSERT INTO bool_test (value,f) FORMAT TSV No test
SELECT value,f FROM bool_test order by value FORMAT CSV;
SELECT value,f FROM bool_test order by value FORMAT TSV;
set bool_true_representation='On';
set bool_false_representation='Off';
INSERT INTO bool_test (value,f) FORMAT CSV On,test
INSERT INTO bool_test (value,f) FORMAT TSV Off test
SELECT value,f FROM bool_test order by value FORMAT CSV;
SELECT value,f FROM bool_test order by value FORMAT TSV;
DROP TABLE IF EXISTS bool_test;

View File

@ -0,0 +1 @@
Value

View File

@ -0,0 +1,22 @@
DROP TABLE IF EXISTS 02125_test_table;
CREATE TABLE 02125_test_table
(
id UInt64,
value Nullable(String)
)
ENGINE=TinyLog;
INSERT INTO 02125_test_table VALUES (0, 'Value');
DROP DICTIONARY IF EXISTS 02125_test_dictionary;
CREATE DICTIONARY 02125_test_dictionary
(
id UInt64,
value Nullable(String)
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(TABLE '02125_test_table'))
LAYOUT(DIRECT());
SELECT dictGet('02125_test_dictionary', 'value', toUInt64(0));
SELECT dictGetString('02125_test_dictionary', 'value', toUInt64(0)); --{serverError 53}

View File

@ -0,0 +1,6 @@
42.42 42.42
42.42 42.42
42.42 42.42
42.42 42.42
42.42 42.42
42.42 42.42

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
# Tags: no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "select toFloat32('+42.42'), toFloat64('+42.42')"
$CLICKHOUSE_CLIENT -q "drop table if exists test_02127"
$CLICKHOUSE_CLIENT -q "create table test_02127 (x Float32, y Float64) engine=Memory()"
for escaping_rule in Quoted JSON Escaped CSV Raw
do
echo -e "+42.42\t+42.42" | $CLICKHOUSE_CLIENT -q "insert into test_02127 format CustomSeparated settings format_custom_escaping_rule='$escaping_rule'"
done
$CLICKHOUSE_CLIENT -q "select * from test_02127"
$CLICKHOUSE_CLIENT -q "drop table test_02127"