Merge branch 'master' into ast

commit 4f0859a5fa
chertus, 2019-10-21 15:08:05 +03:00
178 changed files with 2232 additions and 533 deletions


@ -1,2 +0,0 @@
- regExp: ".*\\.md$"
labels: ["documentation", "pr-documentation"]

.github/labeler.keywords.yml (vendored, new file, 1 line)

@ -0,0 +1 @@
pr-feature: "New Feature"

.github/labeler.yml (vendored, new file, 23 lines)

@ -0,0 +1,23 @@
# Build changes
pr-build:
- "**/CMakeLists.txt"
# Documentation PRs
documentation:
- "**/*.md"
- "docs/**/*"
pr-documentation:
- "**/*.md"
- "docs/**/*"
# Component labels
comp-mutations:
- "**/*Mutation*"
comp-matview:
- "**/*MaterializedView*"
comp-skipidx:
- "**/*Indices*"
comp-kafka:
- "dbms/src/Storages/Kafka/**/*"
- "dbms/tests/integration/test_storage_kafka/**/*"
- "utils/kafka/**/*"


@ -1,9 +0,0 @@
workflow "Main workflow" {
resolves = ["Label PR"]
on = "pull_request"
}
action "Label PR" {
uses = "decathlon/pull-request-labeler-action@v1.0.0"
secrets = ["GITHUB_TOKEN"]
}

.github/stale.yml (vendored, new file, 67 lines)

@ -0,0 +1,67 @@
# Configuration for probot-stale - https://github.com/probot/stale
# Number of days of inactivity before an Issue or Pull Request becomes stale
daysUntilStale: 45
# Number of days of inactivity before an Issue or Pull Request with the stale label is closed.
# Set to false to disable. If disabled, issues still need to be closed manually, but will remain marked as stale.
daysUntilClose: 30
# Only issues or pull requests with all of these labels are checked for staleness. Defaults to `[]` (disabled)
onlyLabels: []
# Issues or Pull Requests with these labels will never be considered stale. Set to `[]` to disable
exemptLabels:
- bug
- feature
- memory
- performance
- prio-crit
- prio-major
- st-accepted
- st-in-progress
- st-waiting-for-fix
# Set to true to ignore issues in a project (defaults to false)
exemptProjects: false
# Set to true to ignore issues in a milestone (defaults to false)
exemptMilestones: false
# Set to true to ignore issues with an assignee (defaults to false)
exemptAssignees: false
# Label to use when marking as stale
staleLabel: stale
# Comment to post when marking as stale. Set to `false` to disable
markComment: >
This issue has been automatically marked as stale because it has not had
recent activity. It will be closed if no further activity occurs. Thank you
for your contributions.
# Comment to post when removing the stale label.
# unmarkComment: >
# Your comment here.
# Comment to post when closing a stale Issue or Pull Request.
# closeComment: >
# Your comment here.
# Limit the number of actions per hour, from 1-30. Default is 30
limitPerRun: 30
# Limit to only `issues` or `pulls`
# only: issues
# Optionally, specify configuration settings that are specific to just 'issues' or 'pulls':
pulls:
daysUntilStale: 365
markComment: >
This pull request has been automatically marked as stale because it has not had
any activity for over a year. It will be closed if no further activity occurs. Thank you
for your contributions.
# issues:
# exemptLabels:
# - confirmed

.github/workflows/labeler.yml (vendored, new file, 11 lines)

@ -0,0 +1,11 @@
name: "Pull Request Labeler"
on:
pull_request
jobs:
by-filename:
runs-on: ubuntu-latest
steps:
- uses: "actions/labeler@v2"
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"

.potato.yml (new file, 27 lines)

@ -0,0 +1,27 @@
# This is the configuration file with settings for Potato.
# Potato is an internal Yandex technology that allows us to sync internal [Yandex.Tracker](https://yandex.com/tracker/) and GitHub.
# For all PRs where documentation is needed, just add the 'pr-feature' label and we will include it in documentation sprints.
# The project name.
name: clickhouse
# Object handlers defines which handlers we use.
handlers:
# The handler for creating a Yandex.Tracker issue.
- name: issue-create
params:
triggers:
# The trigger for creating the Yandex.Tracker issue. When the specified event occurs, it transfers PR data to Yandex.Tracker.
github:pullRequest:labeled:
data:
# The Yandex.Tracker queue to create the issue in. Each issue in Tracker belongs to one of the project queues.
queue: CLICKHOUSEDOCS
# The issue title.
summary: '[Potato] Pull Request #{{pullRequest.number}}'
# The issue description.
description: >
{{pullRequest.description}}
Link to the Pull Request: {{pullRequest.webUrl}}
# The condition for creating the Yandex.Tracker issue.
condition: eventPayload.labels.filter(label => ['pr-feature'].includes(label.name)).length


@ -13,8 +13,6 @@ ClickHouse is an open-source column-oriented database management system that all
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [ClickHouse Meetup in Hong Kong](https://www.meetup.com/Hong-Kong-Machine-Learning-Meetup/events/263580542/) on October 17.
* [ClickHouse Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20.
* [ClickHouse Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27.
* [ClickHouse Meetup in Tokyo](https://clickhouse.connpass.com/event/147001/) on November 14.
* [ClickHouse Meetup in Istanbul](https://www.eventbrite.com/e/clickhouse-meetup-istanbul-create-blazing-fast-experiences-w-clickhouse-tickets-73101120419) on November 19.


@ -62,6 +62,7 @@ set(SRCS
)
add_library(rdkafka ${SRCS})
target_compile_options(rdkafka PRIVATE -fno-sanitize=undefined)
target_include_directories(rdkafka SYSTEM PUBLIC include)
target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR}) # Because weird logic with "include_next" is used.
target_include_directories(rdkafka SYSTEM PRIVATE ${ZSTD_INCLUDE_DIR}/common) # Because wrong path to "zstd_errors.h" is used.


@ -13,6 +13,7 @@
// machines.
#include "murmurhash2.h"
#include <cstring>
// Platform-specific functions and macros
// Microsoft Visual Studio
@ -48,7 +49,8 @@ uint32_t MurmurHash2(const void * key, int len, uint32_t seed)
while (len >= 4)
{
uint32_t k = *reinterpret_cast<const uint32_t *>(data);
uint32_t k;
memcpy(&k, data, sizeof(k));
k *= m;
k ^= k >> r;
k *= m;
@ -418,4 +420,4 @@ uint32_t MurmurHashAligned2(const void * key, int len, uint32_t seed)
return h;
}
}
}

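The change above replaces a reinterpret_cast load with memcpy. A minimal sketch of why, assuming the intent is avoiding undefined behaviour on unaligned or type-punned reads (the helper name is illustrative):

#include <cstdint>
#include <cstring>

/// Dereferencing a cast pointer is undefined behaviour when the bytes are
/// unaligned or belong to an object of another type; memcpy expresses the
/// same 4-byte load in a well-defined way, and optimizing compilers still
/// emit a single load instruction for it.
inline uint32_t load_u32(const unsigned char * p)
{
    uint32_t v;
    std::memcpy(&v, p, sizeof(v));
    return v;
}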

@ -7,6 +7,7 @@
// non-native version will be less than optimal.
#include "murmurhash3.h"
#include <cstring>
//-----------------------------------------------------------------------------
// Platform-specific functions and macros
@ -53,7 +54,9 @@ inline uint64_t rotl64 ( uint64_t x, int8_t r )
FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i )
{
return p[i];
uint32_t res;
memcpy(&res, p + i, sizeof(res));
return res;
}
FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, int i )


@ -274,15 +274,24 @@ private:
pcg64 generator(randomSeed());
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
for (size_t i = 0; i < concurrency; ++i)
try
{
EntryPtrs connection_entries;
connection_entries.reserve(connections.size());
for (size_t i = 0; i < concurrency; ++i)
{
EntryPtrs connection_entries;
connection_entries.reserve(connections.size());
for (const auto & connection : connections)
connection_entries.emplace_back(std::make_shared<Entry>(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
for (const auto & connection : connections)
connection_entries.emplace_back(std::make_shared<Entry>(
connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
pool.schedule(std::bind(&Benchmark::thread, this, connection_entries));
pool.scheduleOrThrowOnError(std::bind(&Benchmark::thread, this, connection_entries));
}
}
catch (...)
{
pool.wait();
throw;
}
InterruptListener interrupt_listener;

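The hunk above wraps job scheduling in try/catch so that jobs already handed to the pool are waited for before the exception escapes. A minimal sketch of the pattern, with an illustrative Job alias and helper (only scheduleOrThrowOnError and wait come from this diff):

#include <functional>
#include <vector>

using Job = std::function<void()>;

/// If scheduling fails partway, already-running jobs may still reference
/// objects on this thread's stack, so wait for them before unwinding.
template <typename Pool>
void scheduleAll(Pool & pool, const std::vector<Job> & jobs)
{
    try
    {
        for (const auto & job : jobs)
            pool.scheduleOrThrowOnError(job);
    }
    catch (...)
    {
        pool.wait();
        throw;
    }
    pool.wait();
}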

@ -895,7 +895,7 @@ public:
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.schedule([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
thread_pool.wait();
@ -2038,7 +2038,7 @@ protected:
ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));
for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.schedule([=] { do_for_shard(shard_index); });
thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); });
thread_pool.wait();
}


@ -565,7 +565,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
auto executor = pipeline.execute();
std::atomic_bool exception = false;
pool.schedule([&]()
pool.scheduleOrThrowOnError([&]()
{
/// ThreadStatus thread_status;


@ -0,0 +1,39 @@
#include <AggregateFunctions/AggregateFunctionOrFill.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
namespace DB
{
template <bool UseNull>
class AggregateFunctionCombinatorOrFill final : public IAggregateFunctionCombinator
{
public:
String getName() const override
{
if constexpr (UseNull)
return "OrNull";
else
return "OrDefault";
}
AggregateFunctionPtr transformAggregateFunction(
const AggregateFunctionPtr & nested_function,
const DataTypes & arguments,
const Array & params) const override
{
return std::make_shared<AggregateFunctionOrFill<UseNull>>(
nested_function,
arguments,
params);
}
};
void registerAggregateFunctionCombinatorOrFill(AggregateFunctionCombinatorFactory & factory)
{
factory.registerCombinator(std::make_shared<AggregateFunctionCombinatorOrFill<false>>());
factory.registerCombinator(std::make_shared<AggregateFunctionCombinatorOrFill<true>>());
}
}


@ -0,0 +1,179 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnNullable.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
/**
* -OrDefault and -OrNull combinators for aggregate functions.
* If there are no input values, return NULL or a default value, accordingly.
* Use a single additional byte of data after the nested function data:
* 0 means there was no input, 1 means there was some.
*/
template <bool UseNull>
class AggregateFunctionOrFill final : public IAggregateFunctionHelper<AggregateFunctionOrFill<UseNull>>
{
private:
AggregateFunctionPtr nested_function;
size_t size_of_data;
DataTypePtr inner_type;
bool inner_nullable;
public:
AggregateFunctionOrFill(AggregateFunctionPtr nested_function_, const DataTypes & arguments, const Array & params)
: IAggregateFunctionHelper<AggregateFunctionOrFill>{arguments, params}
, nested_function{nested_function_}
, size_of_data {nested_function->sizeOfData()}
, inner_type {nested_function->getReturnType()}
, inner_nullable {inner_type->isNullable()}
{
// nothing
}
String getName() const override
{
if constexpr (UseNull)
return nested_function->getName() + "OrNull";
else
return nested_function->getName() + "OrDefault";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
bool isState() const override
{
return nested_function->isState();
}
bool allocatesMemoryInArena() const override
{
return nested_function->allocatesMemoryInArena();
}
bool hasTrivialDestructor() const override
{
return nested_function->hasTrivialDestructor();
}
size_t sizeOfData() const override
{
return size_of_data + sizeof(char);
}
size_t alignOfData() const override
{
return nested_function->alignOfData();
}
void create(AggregateDataPtr place) const override
{
nested_function->create(place);
place[size_of_data] = 0;
}
void destroy(AggregateDataPtr place) const noexcept override
{
nested_function->destroy(place);
}
void add(
AggregateDataPtr place,
const IColumn ** columns,
size_t row_num,
Arena * arena) const override
{
nested_function->add(place, columns, row_num, arena);
place[size_of_data] = 1;
}
void merge(
AggregateDataPtr place,
ConstAggregateDataPtr rhs,
Arena * arena) const override
{
nested_function->merge(place, rhs, arena);
}
void serialize(
ConstAggregateDataPtr place,
WriteBuffer & buf) const override
{
nested_function->serialize(place, buf);
}
void deserialize(
AggregateDataPtr place,
ReadBuffer & buf,
Arena * arena) const override
{
nested_function->deserialize(place, buf, arena);
}
DataTypePtr getReturnType() const override
{
if constexpr (UseNull)
{
// -OrNull
if (inner_nullable)
return inner_type;
return std::make_shared<DataTypeNullable>(inner_type);
}
else
{
// -OrDefault
return inner_type;
}
}
void insertResultInto(
ConstAggregateDataPtr place,
IColumn & to) const override
{
if (place[size_of_data])
{
if constexpr (UseNull)
{
// -OrNull
if (inner_nullable)
nested_function->insertResultInto(place, to);
else
{
ColumnNullable & col = typeid_cast<ColumnNullable &>(to);
col.getNullMapColumn().insertDefault();
nested_function->insertResultInto(place, col.getNestedColumn());
}
}
else
{
// -OrDefault
nested_function->insertResultInto(place, to);
}
}
else
to.insertDefault();
}
};
}

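The header comment above describes the state layout: the nested function's data followed by a single flag byte. A sketch of that layout outside the ClickHouse class hierarchy (names and the helper struct are illustrative):

#include <cstddef>
#include <cstring>

/// State = [nested data ...][flag]; the flag stays 0 until the first add().
struct OrFillLayout
{
    size_t nested_size;  /// nested_function->sizeOfData() in the real code

    size_t sizeOfData() const { return nested_size + sizeof(char); }

    void create(char * place) const
    {
        std::memset(place, 0, nested_size);  /// stands in for nested create()
        place[nested_size] = 0;              /// no input seen yet
    }

    void add(char * place) const { place[nested_size] = 1; }

    /// insertResultInto() checks this flag: NULL or default if never set.
    bool hasInput(const char * place) const { return place[nested_size] != 0; }
};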

@ -29,8 +29,8 @@ private:
size_t step;
size_t total;
size_t aod;
size_t sod;
size_t align_of_data;
size_t size_of_data;
public:
AggregateFunctionResample(
@ -47,8 +47,8 @@ public:
, end{end_}
, step{step_}
, total{0}
, aod{nested_function->alignOfData()}
, sod{(nested_function->sizeOfData() + aod - 1) / aod * aod}
, align_of_data{nested_function->alignOfData()}
, size_of_data{(nested_function->sizeOfData() + align_of_data - 1) / align_of_data * align_of_data}
{
// notice: argument types have been checked before
if (step == 0)
@ -94,24 +94,24 @@ public:
size_t sizeOfData() const override
{
return total * sod;
return total * size_of_data;
}
size_t alignOfData() const override
{
return aod;
return align_of_data;
}
void create(AggregateDataPtr place) const override
{
for (size_t i = 0; i < total; ++i)
nested_function->create(place + i * sod);
nested_function->create(place + i * size_of_data);
}
void destroy(AggregateDataPtr place) const noexcept override
{
for (size_t i = 0; i < total; ++i)
nested_function->destroy(place + i * sod);
nested_function->destroy(place + i * size_of_data);
}
void add(
@ -132,7 +132,7 @@ public:
size_t pos = (key - begin) / step;
nested_function->add(place + pos * sod, columns, row_num, arena);
nested_function->add(place + pos * size_of_data, columns, row_num, arena);
}
void merge(
@ -141,7 +141,7 @@ public:
Arena * arena) const override
{
for (size_t i = 0; i < total; ++i)
nested_function->merge(place + i * sod, rhs + i * sod, arena);
nested_function->merge(place + i * size_of_data, rhs + i * size_of_data, arena);
}
void serialize(
@ -149,7 +149,7 @@ public:
WriteBuffer & buf) const override
{
for (size_t i = 0; i < total; ++i)
nested_function->serialize(place + i * sod, buf);
nested_function->serialize(place + i * size_of_data, buf);
}
void deserialize(
@ -158,7 +158,7 @@ public:
Arena * arena) const override
{
for (size_t i = 0; i < total; ++i)
nested_function->deserialize(place + i * sod, buf, arena);
nested_function->deserialize(place + i * size_of_data, buf, arena);
}
DataTypePtr getReturnType() const override
@ -174,7 +174,7 @@ public:
auto & col_offsets = assert_cast<ColumnArray::ColumnOffsets &>(col.getOffsetsColumn());
for (size_t i = 0; i < total; ++i)
nested_function->insertResultInto(place + i * sod, col.getData());
nested_function->insertResultInto(place + i * size_of_data, col.getData());
col_offsets.getData().push_back(col.getData().size());
}

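The renamed size_of_data member is the nested state size rounded up to its alignment, so that place + i * size_of_data stays correctly aligned for every slot. The rounding formula from the constructor, with worked examples:

#include <cstddef>

constexpr size_t roundUpToAlign(size_t size, size_t align)
{
    return (size + align - 1) / align * align;
}

static_assert(roundUpToAlign(24, 16) == 32, "24-byte state, 16-byte alignment");
static_assert(roundUpToAlign(32, 16) == 32, "already aligned: unchanged");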

@ -42,6 +42,7 @@ void registerAggregateFunctionCombinatorForEach(AggregateFunctionCombinatorFacto
void registerAggregateFunctionCombinatorState(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorOrFill(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctions()
@ -88,6 +89,7 @@ void registerAggregateFunctions()
registerAggregateFunctionCombinatorState(factory);
registerAggregateFunctionCombinatorMerge(factory);
registerAggregateFunctionCombinatorNull(factory);
registerAggregateFunctionCombinatorOrFill(factory);
registerAggregateFunctionCombinatorResample(factory);
}
}


@ -94,16 +94,17 @@ MutableColumnPtr ColumnTuple::cloneResized(size_t new_size) const
Field ColumnTuple::operator[](size_t n) const
{
return Tuple{ext::map<TupleBackend>(columns, [n] (const auto & column) { return (*column)[n]; })};
return ext::map<Tuple>(columns, [n] (const auto & column) { return (*column)[n]; });
}
void ColumnTuple::get(size_t n, Field & res) const
{
const size_t tuple_size = columns.size();
res = Tuple(TupleBackend(tuple_size));
TupleBackend & res_arr = DB::get<Tuple &>(res).toUnderType();
Tuple tuple(tuple_size);
for (const auto i : ext::range(0, tuple_size))
columns[i]->get(n, res_arr[i]);
columns[i]->get(n, tuple[i]);
res = tuple;
}
StringRef ColumnTuple::getDataAt(size_t) const
@ -118,7 +119,7 @@ void ColumnTuple::insertData(const char *, size_t)
void ColumnTuple::insert(const Field & x)
{
const TupleBackend & tuple = DB::get<const Tuple &>(x).toUnderType();
auto & tuple = DB::get<const Tuple &>(x);
const size_t tuple_size = columns.size();
if (tuple.size() != tuple_size)
@ -352,14 +353,14 @@ void ColumnTuple::getExtremes(Field & min, Field & max) const
{
const size_t tuple_size = columns.size();
min = Tuple(TupleBackend(tuple_size));
max = Tuple(TupleBackend(tuple_size));
auto & min_backend = min.get<Tuple &>().toUnderType();
auto & max_backend = max.get<Tuple &>().toUnderType();
Tuple min_tuple(tuple_size);
Tuple max_tuple(tuple_size);
for (const auto i : ext::range(0, tuple_size))
columns[i]->getExtremes(min_backend[i], max_backend[i]);
columns[i]->getExtremes(min_tuple[i], max_tuple[i]);
min = min_tuple;
max = max_tuple;
}
void ColumnTuple::forEachSubcolumn(ColumnCallback callback)


@ -72,9 +72,8 @@ String FieldVisitorDump::operator() (const Array & x) const
return wb.str();
}
String FieldVisitorDump::operator() (const Tuple & x_def) const
String FieldVisitorDump::operator() (const Tuple & x) const
{
auto & x = x_def.toUnderType();
WriteBufferFromOwnString wb;
wb << "Tuple_(";
@ -149,9 +148,8 @@ String FieldVisitorToString::operator() (const Array & x) const
return wb.str();
}
String FieldVisitorToString::operator() (const Tuple & x_def) const
String FieldVisitorToString::operator() (const Tuple & x) const
{
auto & x = x_def.toUnderType();
WriteBufferFromOwnString wb;
wb << '(';
@ -211,6 +209,16 @@ void FieldVisitorHash::operator() (const String & x) const
hash.update(x.data(), x.size());
}
void FieldVisitorHash::operator() (const Tuple & x) const
{
UInt8 type = Field::Types::Tuple;
hash.update(type);
hash.update(x.size());
for (const auto & elem : x)
applyVisitor(*this, elem);
}
void FieldVisitorHash::operator() (const Array & x) const
{
UInt8 type = Field::Types::Array;

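The new Tuple overload of FieldVisitorHash mixes in a type tag and the element count before hashing the elements. A sketch of why both matter, with std::hash standing in for the SipHash state the real visitor updates:

#include <cstdint>
#include <functional>
#include <vector>

/// Without the tag, a Tuple and an Array with equal elements would hash
/// identically; without the size, differently-split element streams could
/// collide once several composite values are hashed in sequence.
uint64_t hashComposite(uint8_t type_tag, const std::vector<uint64_t> & elems)
{
    uint64_t h = std::hash<uint64_t>{}(type_tag);
    h = h * 31 + std::hash<uint64_t>{}(elems.size());
    for (uint64_t e : elems)
        h = h * 31 + std::hash<uint64_t>{}(e);
    return h;
}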

@ -231,6 +231,7 @@ public:
void operator() (const Float64 & x) const;
void operator() (const String & x) const;
void operator() (const Array & x) const;
void operator() (const Tuple & x) const;
void operator() (const DecimalField<Decimal32> & x) const;
void operator() (const DecimalField<Decimal64> & x) const;
void operator() (const DecimalField<Decimal128> & x) const;
@ -479,6 +480,7 @@ public:
bool operator() (Null &) const { throw Exception("Cannot sum Nulls", ErrorCodes::LOGICAL_ERROR); }
bool operator() (String &) const { throw Exception("Cannot sum Strings", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Array &) const { throw Exception("Cannot sum Arrays", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Tuple &) const { throw Exception("Cannot sum Tuples", ErrorCodes::LOGICAL_ERROR); }
bool operator() (UInt128 &) const { throw Exception("Cannot sum UUIDs", ErrorCodes::LOGICAL_ERROR); }
bool operator() (AggregateFunctionStateData &) const { throw Exception("Cannot sum AggregateFunctionStates", ErrorCodes::LOGICAL_ERROR); }


@ -1,5 +1,10 @@
#pragma once
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wreserved-id-macro"
#endif
#define __msan_unpoison(X, Y)
#define __msan_test_shadow(X, Y) (false)
#define __msan_print_shadow(X, Y)
@ -11,3 +16,7 @@
# include <sanitizer/msan_interface.h>
# endif
#endif
#ifdef __clang__
#pragma clang diagnostic pop
#endif


@ -121,13 +121,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
}
template <typename Thread>
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, int priority)
{
scheduleImpl<void>(std::move(job), priority, std::nullopt);
}
template <typename Thread>
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds)
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept
{
return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
}


@ -36,18 +36,23 @@ public:
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
/// If any thread has thrown an exception, the first exception will be rethrown from this method,
/// and the exception will be cleared.
/// Also throws an exception if it cannot create a thread.
/// Priority: greater is higher.
void schedule(Job job, int priority = 0);
/// NOTE: Probably you should call wait() if an exception was thrown. If some previously scheduled jobs use objects
/// located on the stack of the current thread, the stack must not be unwound until all those jobs have finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in its own destructor.
void scheduleOrThrowOnError(Job job, int priority = 0);
/// Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0);
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0) noexcept;
/// Wait for specified amount of time and schedule a job or throw an exception.
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitary order.
/// You may call schedule and wait many times in arbitrary order.
/// If any thread has thrown an exception, the first exception will be rethrown from this method,
/// and the exception will be cleared.
void wait();
@ -140,7 +145,7 @@ public:
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
: state(std::make_shared<Poco::Event>())
{
/// NOTE: If this will throw an exception, the descructor won't be called.
/// NOTE: If this will throw an exception, the destructor won't be called.
GlobalThreadPool::instance().scheduleOrThrow([
state = state,
func = std::forward<Function>(func),

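The renamed API leaves three scheduling flavours with distinct failure behaviour, as the comments above describe. A sketch of how a caller picks between them, assuming only the signatures declared in this header:

#include <atomic>

void submit(ThreadPool & pool, std::atomic<int> & counter)
{
    auto job = [&counter] { ++counter; };

    /// Throws on failure; also rethrows the first exception of earlier jobs.
    pool.scheduleOrThrowOnError(job);

    /// Never throws; returns false if the job could not be queued in time.
    if (!pool.trySchedule(job, /*priority*/ 0, /*wait_microseconds*/ 1000))
    {
        /// Queue full: the caller decides how to degrade.
    }

    /// Waits up to the given time, then throws instead of returning false.
    pool.scheduleOrThrow(job, 0, 1000);

    pool.wait();
}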

@ -24,7 +24,7 @@ std::vector<std::string> getMultipleValuesFromConfig(const Poco::Util::AbstractC
{
std::vector<std::string> values;
for (const auto & key : DB::getMultipleKeysFromConfig(config, root, name))
values.emplace_back(config.getString(key));
values.emplace_back(config.getString(root.empty() ? key : root + "." + key));
return values;
}


@ -0,0 +1,26 @@
#include <Common/getMultipleKeysFromConfig.h>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>
#include <gtest/gtest.h>
using namespace DB;
TEST(Common, getMultipleValuesFromConfig)
{
std::istringstream xml_isteam(R"END(<?xml version="1.0"?>
<yandex>
<first_level>
<second_level>0</second_level>
<second_level>1</second_level>
<second_level>2</second_level>
<second_level>3</second_level>
</first_level>
</yandex>)END");
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(xml_isteam);
std::vector<std::string> answer = getMultipleValuesFromConfig(*config, "first_level", "second_level");
std::vector<std::string> right_answer = {"0", "1", "2", "3"};
EXPECT_EQ(answer, right_answer);
}


@ -21,14 +21,14 @@ TEST(ThreadPool, ConcurrentWait)
ThreadPool pool(num_threads);
for (size_t i = 0; i < num_jobs; ++i)
pool.schedule(worker);
pool.scheduleOrThrowOnError(worker);
constexpr size_t num_waiting_threads = 4;
ThreadPool waiting_pool(num_waiting_threads);
for (size_t i = 0; i < num_waiting_threads; ++i)
waiting_pool.schedule([&pool]{ pool.wait(); });
waiting_pool.scheduleOrThrowOnError([&pool] { pool.wait(); });
waiting_pool.wait();
}


@ -30,11 +30,11 @@ TEST(ThreadPool, GlobalFull1)
ThreadPool pool(num_jobs);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
pool.scheduleOrThrowOnError(func);
for (size_t i = capacity; i < num_jobs; ++i)
{
EXPECT_THROW(pool.schedule(func), DB::Exception);
EXPECT_THROW(pool.scheduleOrThrowOnError(func), DB::Exception);
++counter;
}
@ -67,10 +67,10 @@ TEST(ThreadPool, GlobalFull2)
ThreadPool pool(capacity, 0, capacity);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
pool.scheduleOrThrowOnError(func);
ThreadPool another_pool(1);
EXPECT_THROW(another_pool.schedule(func), DB::Exception);
EXPECT_THROW(another_pool.scheduleOrThrowOnError(func), DB::Exception);
++counter;
@ -79,7 +79,7 @@ TEST(ThreadPool, GlobalFull2)
global_pool.wait();
for (size_t i = 0; i < capacity; ++i)
another_pool.schedule([&] { ++counter; });
another_pool.scheduleOrThrowOnError([&] { ++counter; });
another_pool.wait();
EXPECT_EQ(counter, capacity * 2 + 1);


@ -14,7 +14,7 @@ int test()
std::atomic<int> counter{0};
for (size_t i = 0; i < 10; ++i)
pool.schedule([&]{ ++counter; });
pool.scheduleOrThrowOnError([&]{ ++counter; });
pool.wait();
return counter;


@ -14,7 +14,7 @@ TEST(ThreadPool, Loop)
size_t threads = 16;
ThreadPool pool(threads);
for (size_t j = 0; j < threads; ++j)
pool.schedule([&]{ ++res; });
pool.scheduleOrThrowOnError([&] { ++res; });
pool.wait();
}


@ -9,12 +9,12 @@ bool check()
{
ThreadPool pool(10);
pool.schedule([]{ throw std::runtime_error("Hello, world!"); });
pool.scheduleOrThrowOnError([] { throw std::runtime_error("Hello, world!"); });
try
{
for (size_t i = 0; i < 100; ++i)
pool.schedule([]{}); /// An exception will be rethrown from this method.
pool.scheduleOrThrowOnError([] {}); /// An exception will be rethrown from this method.
}
catch (const std::runtime_error &)
{


@ -37,8 +37,8 @@ int main(int, char **)
ThreadPool tp(8);
for (size_t i = 0; i < n; ++i)
{
tp.schedule(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.schedule(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
tp.scheduleOrThrowOnError(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.scheduleOrThrowOnError(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
}
tp.wait();


@ -284,7 +284,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -338,7 +338,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate12,
pool.scheduleOrThrowOnError(std::bind(aggregate12,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -397,7 +397,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -473,7 +473,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate2,
pool.scheduleOrThrowOnError(std::bind(aggregate2,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -499,7 +499,7 @@ int main(int argc, char ** argv)
watch.restart();
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
pool.schedule(std::bind(merge2,
pool.scheduleOrThrowOnError(std::bind(merge2,
maps.data(), num_threads, i));
pool.wait();
@ -527,7 +527,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate22,
pool.scheduleOrThrowOnError(std::bind(aggregate22,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -553,7 +553,7 @@ int main(int argc, char ** argv)
watch.restart();
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
pool.schedule(std::bind(merge2, maps.data(), num_threads, i));
pool.scheduleOrThrowOnError(std::bind(merge2, maps.data(), num_threads, i));
pool.wait();
@ -592,7 +592,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate3,
pool.scheduleOrThrowOnError(std::bind(aggregate3,
std::ref(local_maps[i]),
std::ref(global_map),
std::ref(mutex),
@ -658,7 +658,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate33,
pool.scheduleOrThrowOnError(std::bind(aggregate33,
std::ref(local_maps[i]),
std::ref(global_map),
std::ref(mutex),
@ -727,7 +727,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate4,
pool.scheduleOrThrowOnError(std::bind(aggregate4,
std::ref(local_maps[i]),
std::ref(global_map),
mutexes.data(),
@ -797,7 +797,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate5,
pool.scheduleOrThrowOnError(std::bind(aggregate5,
std::ref(local_maps[i]),
std::ref(global_map),
data.begin() + (data.size() * i) / num_threads,
@ -860,7 +860,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));


@ -42,7 +42,7 @@ struct AggregateIndependent
auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
auto & map = *results[i];
pool.schedule([&, begin, end]()
pool.scheduleOrThrowOnError([&, begin, end]()
{
for (auto it = begin; it != end; ++it)
{
@ -85,7 +85,7 @@ struct AggregateIndependentWithSequentialKeysOptimization
auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
auto & map = *results[i];
pool.schedule([&, begin, end]()
pool.scheduleOrThrowOnError([&, begin, end]()
{
typename Map::LookupResult place = nullptr;
Key prev_key {};
@ -180,7 +180,7 @@ struct MergeParallelForTwoLevelTable
ThreadPool & pool)
{
for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket)
pool.schedule([&, bucket, num_maps]
pool.scheduleOrThrowOnError([&, bucket, num_maps]
{
std::vector<typename Map::Impl *> section(num_maps);
for (size_t i = 0; i < num_maps; ++i)


@ -66,7 +66,7 @@ int main(int argc, char ** argv)
test(n, "Create and destroy ThreadPool each iteration", []
{
ThreadPool tp(1);
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
@ -90,7 +90,7 @@ int main(int argc, char ** argv)
test(n, "Schedule job for Threadpool each iteration", [&tp]
{
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
}
@ -100,7 +100,7 @@ int main(int argc, char ** argv)
test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp]
{
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
}


@ -152,9 +152,8 @@ namespace DB
buf.write(res.data(), res.size());
}
void readBinary(Tuple & x_def, ReadBuffer & buf)
void readBinary(Tuple & x, ReadBuffer & buf)
{
auto & x = x_def.toUnderType();
size_t size;
DB::readBinary(size, buf);
@ -231,9 +230,8 @@ namespace DB
}
}
void writeBinary(const Tuple & x_def, WriteBuffer & buf)
void writeBinary(const Tuple & x, WriteBuffer & buf)
{
auto & x = x_def.toUnderType();
const size_t size = x.size();
DB::writeBinary(size, buf);
@ -292,7 +290,12 @@ namespace DB
void writeText(const Tuple & x, WriteBuffer & buf)
{
DB::String res = applyVisitor(DB::FieldVisitorToString(), DB::Field(x));
writeFieldText(DB::Field(x), buf);
}
void writeFieldText(const Field & x, WriteBuffer & buf)
{
DB::String res = applyVisitor(DB::FieldVisitorToString(), x);
buf.write(res.data(), res.size());
}


@ -34,9 +34,23 @@ template <typename T>
using NearestFieldType = typename NearestFieldTypeImpl<T>::Type;
class Field;
using Array = std::vector<Field>;
using TupleBackend = std::vector<Field>;
STRONG_TYPEDEF(TupleBackend, Tuple) /// Array and Tuple are different types with equal representation inside Field.
using FieldVector = std::vector<Field>;
/// Array and Tuple use the same storage type -- FieldVector, but we declare
/// distinct types for them, so that the caller can choose whether it wants to
/// construct a Field of Array or a Tuple type. An alternative approach would be
/// to construct both of these types from FieldVector, and have the caller
/// specify the desired Field type explicitly.
#define DEFINE_FIELD_VECTOR(X) \
struct X : public FieldVector \
{ \
using FieldVector::FieldVector; \
}
DEFINE_FIELD_VECTOR(Array);
DEFINE_FIELD_VECTOR(Tuple);
#undef DEFINE_FIELD_VECTOR
struct AggregateFunctionStateData
{
@ -457,7 +471,6 @@ private:
void createConcrete(T && x)
{
using UnqualifiedType = std::decay_t<T>;
which = TypeToEnum<UnqualifiedType>::value;
// In both Field and PODArray, small types may be stored as wider types,
// e.g. char is stored as UInt64. Field can return this extended value
@ -466,6 +479,7 @@ private:
// nominal type.
using StorageType = NearestFieldType<UnqualifiedType>;
new (&storage) StorageType(std::forward<T>(x));
which = TypeToEnum<UnqualifiedType>::value;
}
/// Assuming same types.
@ -748,5 +762,7 @@ void writeBinary(const Tuple & x, WriteBuffer & buf);
void writeText(const Tuple & x, WriteBuffer & buf);
void writeFieldText(const Field & x, WriteBuffer & buf);
[[noreturn]] inline void writeQuoted(const Tuple &, WriteBuffer &) { throw Exception("Cannot write Tuple quoted.", ErrorCodes::NOT_IMPLEMENTED); }
}

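As the new comment explains, Array and Tuple share the FieldVector representation but must stay distinct types so the caller can choose which kind of Field to construct. A self-contained sketch of the same trick with a stand-in element type:

#include <string>
#include <vector>

struct Elem { int v = 0; };               /// stands in for DB::Field
using ElemVector = std::vector<Elem>;

struct Array : public ElemVector { using ElemVector::ElemVector; };
struct Tuple : public ElemVector { using ElemVector::ElemVector; };

/// Identical storage, but overload resolution now tells them apart --
/// exactly what Field needs in order to set its `which` tag correctly.
std::string kindOf(const Array &) { return "Array"; }
std::string kindOf(const Tuple &) { return "Tuple"; }

The related createConcrete() change in the same hunk moves the `which = ...` assignment after the placement new, so a throwing StorageType constructor cannot leave `which` claiming a type whose storage was never built.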

@ -176,8 +176,9 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow, CSVWithNames, TSVWithNames and TSKV formats).") \
M(SettingBool, input_format_with_names_use_header, false, "For TSVWithNames and CSVWithNames input formats this controls whether format parser is to assume that column data appear in the input exactly as they are specified in the header.") \
M(SettingBool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).") \
M(SettingBool, input_format_defaults_for_omitted_fields, true, "For input data calculate default expressions for omitted fields (it works for JSONEachRow format).") \
M(SettingBool, input_format_null_as_default, false, "For CSV format initialize null fields with default values if data type of this field is not nullable") \
M(SettingBool, input_format_defaults_for_omitted_fields, true, "For input data calculate default expressions for omitted fields (it works for JSONEachRow, CSV and TSV formats).") \
M(SettingBool, input_format_tsv_empty_as_default, false, "Treat empty fields in TSV input as default values.") \
M(SettingBool, input_format_null_as_default, false, "For text input formats initialize null fields with default values if data type of this field is not nullable") \
\
M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \
M(SettingBool, input_format_values_deduce_templates_of_expressions, false, "For Values format: if field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.") \
@ -202,8 +203,8 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, fsync_metadata, 1, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.") \
\
M(SettingUInt64, input_format_allow_errors_num, 0, "Maximum absolute amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \
M(SettingFloat, input_format_allow_errors_ratio, 0, "Maximum relative amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \
M(SettingUInt64, input_format_allow_errors_num, 0, "Maximum absolute amount of errors while reading text formats (like CSV, TSV). In case of error, if at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \
M(SettingFloat, input_format_allow_errors_ratio, 0, "Maximum relative amount of errors while reading text formats (like CSV, TSV). In case of error, if at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \
\
M(SettingBool, join_use_nulls, 0, "Use NULLs for non-joined rows of outer JOINs for types that can be inside Nullable. If false, use default value of corresponding columns data type.") \
\


@ -36,7 +36,7 @@ void AsynchronousBlockInputStream::next()
{
ready.reset();
pool.schedule([this, thread_group = CurrentThread::getGroup()] ()
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};


@ -168,21 +168,28 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
else
{
size_t num_children = children.size();
for (size_t i = 0; i < num_children; ++i)
try
{
auto & child = children[i];
auto thread_group = CurrentThread::getGroup();
reading_pool->schedule([&child, thread_group]
for (size_t i = 0; i < num_children; ++i)
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
});
}
auto & child = children[i];
auto thread_group = CurrentThread::getGroup();
reading_pool->scheduleOrThrowOnError([&child, thread_group]
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
});
}
}
catch (...)
{
reading_pool->wait();
throw;
}
reading_pool->wait();
}
@ -194,7 +201,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
*/
for (size_t i = 0; i < merging_threads; ++i)
pool.schedule([this, thread_group = CurrentThread::getGroup()] () { mergeThread(thread_group); });
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]() { mergeThread(thread_group); });
}
}
@ -475,22 +482,29 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
}
else
{
for (auto & input : inputs)
try
{
if (need_that_input(input))
for (auto & input : inputs)
{
auto thread_group = CurrentThread::getGroup();
reading_pool->schedule([&input, &read_from_input, thread_group]
if (need_that_input(input))
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);
});
auto thread_group = CurrentThread::getGroup();
reading_pool->scheduleOrThrowOnError([&input, &read_from_input, thread_group]
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);
});
}
}
}
catch (...)
{
reading_pool->wait();
throw;
}
reading_pool->wait();
}


@ -129,7 +129,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
for (size_t view_num = 0; view_num < views.size(); ++view_num)
{
auto thread_group = CurrentThread::getGroup();
pool.schedule([=]
pool.scheduleOrThrowOnError([=]
{
setThreadName("PushingToViews");
if (thread_group)


@ -156,11 +156,11 @@ void DataTypeNullable::serializeBinary(const IColumn & column, size_t row_num, W
nested_data_type->serializeBinary(col.getNestedColumn(), row_num, ostr);
}
/// Deserialize value into ColumnNullable.
/// We need to insert into both the nested column and the null byte map, or, in case of an exception, insert nothing at all.
template <typename CheckForNull, typename DeserializeNested>
static void safeDeserialize(
IColumn & column,
template <typename ReturnType = void, typename CheckForNull, typename DeserializeNested, typename std::enable_if_t<std::is_same_v<ReturnType, void>, ReturnType>* = nullptr>
static ReturnType safeDeserialize(
IColumn & column, const IDataType & /*nested_data_type*/,
CheckForNull && check_for_null, DeserializeNested && deserialize_nested)
{
ColumnNullable & col = assert_cast<ColumnNullable &>(column);
@ -185,10 +185,26 @@ static void safeDeserialize(
}
}
/// Deserialize value into non-nullable column. In case of NULL, insert default value and return false.
template <typename ReturnType = void, typename CheckForNull, typename DeserializeNested, typename std::enable_if_t<std::is_same_v<ReturnType, bool>, ReturnType>* = nullptr>
static ReturnType safeDeserialize(
IColumn & column, const IDataType & nested_data_type,
CheckForNull && check_for_null, DeserializeNested && deserialize_nested)
{
assert(!dynamic_cast<ColumnNullable *>(&column));
assert(!dynamic_cast<const DataTypeNullable *>(&nested_data_type));
bool insert_default = check_for_null();
if (insert_default)
nested_data_type.insertDefaultInto(column);
else
deserialize_nested(column);
return !insert_default;
}
void DataTypeNullable::deserializeBinary(IColumn & column, ReadBuffer & istr) const
{
safeDeserialize(column,
safeDeserialize(column, *nested_data_type,
[&istr] { bool is_null = 0; readBinary(is_null, istr); return is_null; },
[this, &istr] (IColumn & nested) { nested_data_type->deserializeBinary(nested, istr); });
}
@ -206,6 +222,13 @@ void DataTypeNullable::serializeTextEscaped(const IColumn & column, size_t row_n
void DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
deserializeTextEscaped<void>(column, istr, settings, nested_data_type);
}
template<typename ReturnType>
ReturnType DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings,
const DataTypePtr & nested_data_type)
{
/// A little tricky, because we cannot distinguish NULL from the first character alone.
@ -215,9 +238,9 @@ void DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer & ist
/// This is not null, surely.
if (*istr.position() != '\\')
{
safeDeserialize(column,
return safeDeserialize<ReturnType>(column, *nested_data_type,
[] { return false; },
[this, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsTextEscaped(nested, istr, settings); });
[&nested_data_type, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsTextEscaped(nested, istr, settings); });
}
else
{
@ -227,7 +250,7 @@ void DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer & ist
if (istr.eof())
throw Exception("Unexpected end of stream, while parsing value of Nullable type, after backslash", ErrorCodes::CANNOT_READ_ALL_DATA);
safeDeserialize(column,
return safeDeserialize<ReturnType>(column, *nested_data_type,
[&istr]
{
if (*istr.position() == 'N')
@ -237,7 +260,7 @@ void DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer & ist
}
return false;
},
[this, &istr, &settings] (IColumn & nested)
[&nested_data_type, &istr, &settings] (IColumn & nested)
{
if (istr.position() != istr.buffer().begin())
{
@ -275,15 +298,22 @@ void DataTypeNullable::serializeTextQuoted(const IColumn & column, size_t row_nu
void DataTypeNullable::deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
safeDeserialize(column,
deserializeTextQuoted<void>(column, istr, settings, nested_data_type);
}
template<typename ReturnType>
ReturnType DataTypeNullable::deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings & settings,
const DataTypePtr & nested_data_type)
{
return safeDeserialize<ReturnType>(column, *nested_data_type,
[&istr] { return checkStringByFirstCharacterAndAssertTheRestCaseInsensitive("NULL", istr); },
[this, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsTextQuoted(nested, istr, settings); });
[&nested_data_type, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsTextQuoted(nested, istr, settings); });
}
void DataTypeNullable::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
safeDeserialize(column,
safeDeserialize(column, *nested_data_type,
[&istr] { return checkStringByFirstCharacterAndAssertTheRestCaseInsensitive("NULL", istr); },
[this, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsWholeText(nested, istr, settings); });
}
@ -300,6 +330,13 @@ void DataTypeNullable::serializeTextCSV(const IColumn & column, size_t row_num,
}
void DataTypeNullable::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
deserializeTextCSV<void>(column, istr, settings, nested_data_type);
}
template<typename ReturnType>
ReturnType DataTypeNullable::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings,
const DataTypePtr & nested_data_type)
{
constexpr char const * null_literal = "NULL";
constexpr size_t len = 4;
@ -331,7 +368,7 @@ void DataTypeNullable::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
return false;
};
auto deserialize_nested = [this, &settings, &istr, &null_prefix_len] (IColumn & nested)
auto deserialize_nested = [&nested_data_type, &settings, &istr, &null_prefix_len] (IColumn & nested)
{
if (likely(!null_prefix_len))
nested_data_type->deserializeAsTextCSV(nested, istr, settings);
@ -357,8 +394,8 @@ void DataTypeNullable::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
WriteBufferFromOwnString parsed_value;
nested_data_type->serializeAsTextCSV(nested, nested.size() - 1, parsed_value, settings);
throw DB::Exception("Error while parsing \"" + std::string(null_literal, null_prefix_len)
+ std::string(istr.position(), std::min(size_t{10}, istr.available())) + "\" as " + getName()
+ " at position " + std::to_string(istr.count()) + ": expected \"NULL\" or " + nested_data_type->getName()
+ std::string(istr.position(), std::min(size_t{10}, istr.available())) + "\" as Nullable(" + nested_data_type->getName()
+ ") at position " + std::to_string(istr.count()) + ": expected \"NULL\" or " + nested_data_type->getName()
+ ", got \"" + std::string(null_literal, buf.count()) + "\", which was deserialized as \""
+ parsed_value.str() + "\". It seems that input data is ill-formatted.",
ErrorCodes::CANNOT_READ_ALL_DATA);
@ -366,7 +403,7 @@ void DataTypeNullable::deserializeTextCSV(IColumn & column, ReadBuffer & istr, c
}
};
safeDeserialize(column, check_for_null, deserialize_nested);
return safeDeserialize<ReturnType>(column, *nested_data_type, check_for_null, deserialize_nested);
}
void DataTypeNullable::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
@ -397,9 +434,16 @@ void DataTypeNullable::serializeTextJSON(const IColumn & column, size_t row_num,
void DataTypeNullable::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
safeDeserialize(column,
deserializeTextJSON<void>(column, istr, settings, nested_data_type);
}
template<typename ReturnType>
ReturnType DataTypeNullable::deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings & settings,
const DataTypePtr & nested_data_type)
{
return safeDeserialize<ReturnType>(column, *nested_data_type,
[&istr] { return checkStringByFirstCharacterAndAssertTheRest("null", istr); },
[this, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsTextJSON(nested, istr, settings); });
[&nested_data_type, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsTextJSON(nested, istr, settings); });
}
void DataTypeNullable::serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
@ -492,4 +536,10 @@ DataTypePtr removeNullable(const DataTypePtr & type)
return type;
}
template bool DataTypeNullable::deserializeTextEscaped<bool>(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const DataTypePtr & nested);
template bool DataTypeNullable::deserializeTextQuoted<bool>(IColumn & column, ReadBuffer & istr, const FormatSettings &, const DataTypePtr & nested);
template bool DataTypeNullable::deserializeTextCSV<bool>(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const DataTypePtr & nested);
template bool DataTypeNullable::deserializeTextJSON<bool>(IColumn & column, ReadBuffer & istr, const FormatSettings &, const DataTypePtr & nested);
}

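The two safeDeserialize overloads above are selected by ReturnType via enable_if: the void flavour deserializes into a real ColumnNullable, while the bool flavour deserializes into a non-nullable column and reports whether a value (rather than a NULL) was read. A minimal sketch of that dispatch, detached from the column types:

#include <type_traits>

template <typename ReturnType = void,
          std::enable_if_t<std::is_same_v<ReturnType, void>, int> = 0>
ReturnType readValue(bool is_null)
{
    /// void flavour: record NULL in a null map (elided), always consume input.
    (void)is_null;
}

template <typename ReturnType,
          std::enable_if_t<std::is_same_v<ReturnType, bool>, int> = 0>
ReturnType readValue(bool is_null)
{
    /// bool flavour: insert a default on NULL, tell the caller what happened.
    return !is_null;
}

Here readValue(x) picks the void overload, while readValue<bool>(x) picks the bool one, mirroring the explicit template instantiations at the bottom of the file.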

@ -100,6 +100,17 @@ public:
const DataTypePtr & getNestedType() const { return nested_data_type; }
/// If ReturnType is bool, check for NULL and deserialize value into non-nullable column (and return true) or insert default value of nested type (and return false)
/// If ReturnType is void, deserialize Nullable(T)
template <typename ReturnType = bool>
static ReturnType deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const DataTypePtr & nested);
template <typename ReturnType = bool>
static ReturnType deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &, const DataTypePtr & nested);
template <typename ReturnType = bool>
static ReturnType deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const DataTypePtr & nested);
template <typename ReturnType = bool>
static ReturnType deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &, const DataTypePtr & nested);
private:
DataTypePtr nested_data_type;
};


@ -101,7 +101,7 @@ static inline const IColumn & extractElementColumn(const IColumn & column, size_
void DataTypeTuple::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
const auto & tuple = get<const Tuple &>(field).toUnderType();
const auto & tuple = get<const Tuple &>(field);
for (const auto idx_elem : ext::enumerate(elems))
idx_elem.second->serializeBinary(tuple[idx_elem.first], ostr);
}
@ -109,10 +109,12 @@ void DataTypeTuple::serializeBinary(const Field & field, WriteBuffer & ostr) con
void DataTypeTuple::deserializeBinary(Field & field, ReadBuffer & istr) const
{
const size_t size = elems.size();
field = Tuple(TupleBackend(size));
TupleBackend & tuple = get<Tuple &>(field).toUnderType();
Tuple tuple(size);
for (const auto i : ext::range(0, size))
elems[i]->deserializeBinary(tuple[i], istr);
field = tuple;
}
void DataTypeTuple::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
@ -447,7 +449,7 @@ MutableColumnPtr DataTypeTuple::createColumn() const
Field DataTypeTuple::getDefault() const
{
return Tuple(ext::map<TupleBackend>(elems, [] (const DataTypePtr & elem) { return elem->getDefault(); }));
return Tuple(ext::map<Tuple>(elems, [] (const DataTypePtr & elem) { return elem->getDefault(); }));
}
void DataTypeTuple::insertDefaultInto(IColumn & column) const


@ -90,9 +90,8 @@ DataTypePtr FieldToDataType::operator() (const Array & x) const
}
DataTypePtr FieldToDataType::operator() (const Tuple & x) const
DataTypePtr FieldToDataType::operator() (const Tuple & tuple) const
{
auto & tuple = static_cast<const TupleBackend &>(x);
if (tuple.empty())
throw Exception("Cannot infer type of an empty tuple", ErrorCodes::EMPTY_DATA_PASSED);


@ -141,7 +141,7 @@ void DatabaseOrdinary::loadTables(
for (const auto & file_name : file_names)
{
pool.schedule([&]() { loadOneTable(file_name); });
pool.scheduleOrThrowOnError([&]() { loadOneTable(file_name); });
}
pool.wait();
@ -174,11 +174,16 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
}
};
for (const auto & table : tables)
try
{
thread_pool.schedule([&]() { startupOneTable(table.second); });
for (const auto & table : tables)
thread_pool.scheduleOrThrowOnError([&]() { startupOneTable(table.second); });
}
catch (...)
{
thread_pool.wait();
throw;
}
thread_pool.wait();
}


@ -40,7 +40,7 @@ static FormatSettings getInputFormatSetting(const Settings & settings)
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields;
format_settings.csv.null_as_default = settings.input_format_null_as_default;
format_settings.null_as_default = settings.input_format_null_as_default;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
@ -53,6 +53,7 @@ static FormatSettings getInputFormatSetting(const Settings & settings)
format_settings.template_settings.resultset_format = settings.format_template_resultset;
format_settings.template_settings.row_format = settings.format_template_row;
format_settings.template_settings.row_between_delimiter = settings.format_template_rows_between_delimiter;
format_settings.tsv.empty_as_default = settings.input_format_tsv_empty_as_default;
return format_settings;
}


@ -29,7 +29,6 @@ struct FormatSettings
bool allow_double_quotes = true;
bool unquoted_null_literal_as_null = false;
bool empty_as_default = false;
bool null_as_default = false;
};
CSV csv;
@ -61,10 +60,18 @@ struct FormatSettings
Template template_settings;
struct TSV
{
bool empty_as_default = false;
};
TSV tsv;
bool skip_unknown_fields = false;
bool with_names_use_header = false;
bool write_statistics = true;
bool import_nested_json = false;
bool null_as_default = false;
enum class DateTimeInputFormat
{


@ -82,13 +82,18 @@ DataTypePtr FunctionBuilderJoinGet::getReturnTypeImpl(const ColumnsWithTypeAndNa
}
void FunctionJoinGet::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/)
void FunctionJoinGet::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
{
auto & ctn = block.getByPosition(arguments[2]);
auto ctn = block.getByPosition(arguments[2]);
if (isColumnConst(*ctn.column))
ctn.column = ctn.column->cloneResized(1);
ctn.name = ""; // make sure the key name never collide with the join columns
Block key_block = {ctn};
join->joinGet(key_block, attr_name);
block.getByPosition(result) = key_block.getByPosition(1);
auto & result_ctn = key_block.getByPosition(1);
if (isColumnConst(*ctn.column))
result_ctn.column = ColumnConst::create(result_ctn.column, input_rows_count);
block.getByPosition(result) = result_ctn;
}
void registerFunctionJoinGet(FunctionFactory & factory)

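The fix above handles constant key columns: the key is shrunk to a single row before the lookup, and a constant result is re-expanded to input_rows_count afterwards. A hedged sketch of that shrink/re-expand idea (the helper is hypothetical, not the ClickHouse API):

#include <cstddef>
#include <vector>

/// Evaluate f once for a constant input, then materialize the single result
/// rows times -- the role ColumnConst::create plays in the real code.
template <typename T, typename F>
std::vector<T> mapConstant(const T & constant_value, size_t rows, F && f)
{
    T result = f(constant_value);
    return std::vector<T>(rows, result);
}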

@ -41,7 +41,7 @@ private:
swapBuffers();
/// The data will be written in a separate stream.
pool.schedule([this] { thread(); });
pool.scheduleOrThrowOnError([this] { thread(); });
}
public:

View File

@ -89,7 +89,7 @@ struct ConnectionTimeouts
const auto & settings = context.getSettingsRef();
const auto & config = context.getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout, http_keep_alive_timeout);
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout, settings.tcp_keep_alive_timeout, http_keep_alive_timeout);
}
};

View File

@ -1158,7 +1158,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
tasks[bucket] = std::packaged_task<Block()>(std::bind(converter, bucket, CurrentThread::getGroup()));
if (thread_pool)
thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
thread_pool->scheduleOrThrowOnError([bucket, &tasks] { tasks[bucket](); });
else
tasks[bucket]();
}
@ -1614,7 +1614,7 @@ private:
if (max_scheduled_bucket_num >= NUM_BUCKETS)
return;
parallel_merge_data->pool.schedule(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
parallel_merge_data->pool.scheduleOrThrowOnError(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
max_scheduled_bucket_num, CurrentThread::getGroup()));
}
@ -1968,7 +1968,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
auto task = std::bind(merge_bucket, bucket, aggregates_pool, CurrentThread::getGroup());
if (thread_pool)
thread_pool->schedule(task);
thread_pool->scheduleOrThrowOnError(task);
else
task();
}
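
The packaged_task pattern in these hunks keeps task ownership with the caller: the task object (and therefore its future) lives in the tasks container, while the pool thread merely invokes it. A compact sketch, with plain std::thread standing in for the pool:

#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main()
{
    const size_t num_buckets = 4;
    std::vector<std::packaged_task<int()>> tasks(num_buckets);
    std::vector<std::thread> workers;

    for (size_t bucket = 0; bucket < num_buckets; ++bucket)
    {
        /// The caller keeps the task (and thus the result) alive...
        tasks[bucket] = std::packaged_task<int()>([bucket] { return int(bucket * bucket); });
        /// ...while the worker thread only invokes it, like thread_pool->scheduleOrThrowOnError above.
        workers.emplace_back([bucket, &tasks] { tasks[bucket](); });
    }

    for (auto & worker : workers)
        worker.join();
    for (auto & task : tasks)
        std::cout << task.get_future().get() << '\n';
}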

View File

@ -203,6 +203,10 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns)
for (const auto & column : columns)
{
/// Do not include virtual columns
if (column.is_virtual)
continue;
const auto column_declaration = std::make_shared<ASTColumnDeclaration>();
ASTPtr column_declaration_ptr{column_declaration};

View File

@ -2083,19 +2083,8 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr so
});
}
if (pipeline.hasMoreThanOneStream())
{
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<AsynchronousBlockInputStream>(stream);
});
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
pipeline.firstStream() = std::make_shared<MergingSortedBlockInputStream>(
pipeline.streams, sorting_info->prefix_order_descr,
settings.max_block_size, limit_for_merging);
pipeline.streams.resize(1);
}
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
executeMergeSorted(pipeline, sorting_info->prefix_order_descr, limit_for_merging);
if (need_finish_sorting)
{
@ -2217,12 +2206,20 @@ void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
SortDescription order_descr = getSortDescription(query, context);
UInt64 limit = getLimitForSorting(query, context);
const Settings & settings = context.getSettingsRef();
/// If there are several streams, then we merge them into one
if (pipeline.hasMoreThanOneStream())
{
unifyStreams(pipeline, pipeline.firstStream()->getHeader());
executeMergeSorted(pipeline, order_descr, limit);
}
}
void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit)
{
if (pipeline.hasMoreThanOneStream())
{
const Settings & settings = context.getSettingsRef();
/** MergingSortedBlockInputStream reads the sources sequentially.
* To make the data on the remote servers prepared in parallel, we wrap it in AsynchronousBlockInputStream.
@ -2232,8 +2229,8 @@ void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
stream = std::make_shared<AsynchronousBlockInputStream>(stream);
});
/// Merge the sorted sources into one sorted source.
pipeline.firstStream() = std::make_shared<MergingSortedBlockInputStream>(pipeline.streams, order_descr, settings.max_block_size, limit);
pipeline.firstStream() = std::make_shared<MergingSortedBlockInputStream>(
pipeline.streams, sort_description, settings.max_block_size, limit);
pipeline.streams.resize(1);
}
}
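
The comment above is the key to this refactoring: MergingSortedBlockInputStream pulls its inputs one at a time, so without the asynchronous wrapper a slow (remote) source would serialize the whole merge. A self-contained sketch of the idea, with std::async keeping one value in flight per source (toy interfaces, not the real IBlockInputStream):

#include <future>
#include <iostream>
#include <optional>
#include <vector>

/// Hypothetical slow source: imagine read() hits a remote server.
struct Source
{
    std::vector<int> values;   /// already sorted
    size_t pos = 0;
    std::optional<int> read() { return pos < values.size() ? std::optional<int>(values[pos++]) : std::nullopt; }
};

/// Wrapper in the spirit of AsynchronousBlockInputStream: always keeps
/// the next value being prepared in the background.
struct AsyncSource
{
    Source inner;
    std::future<std::optional<int>> pending;

    explicit AsyncSource(Source s) : inner(std::move(s)) { schedule(); }
    void schedule() { pending = std::async(std::launch::async, [this] { return inner.read(); }); }

    std::optional<int> read()
    {
        auto value = pending.get();   /// fetched while the merger was busy elsewhere
        schedule();                   /// immediately start preparing the next one
        return value;
    }
};

int main()
{
    AsyncSource a(Source{{1, 4, 7}});
    AsyncSource b(Source{{2, 5, 8}});
    std::vector<AsyncSource *> sources{&a, &b};

    /// Sequential k-way merge: pulls one source at a time, but every source
    /// is already preparing its next value in parallel.
    std::vector<std::optional<int>> heads;
    for (auto * source : sources)
        heads.push_back(source->read());

    while (true)
    {
        size_t best = heads.size();
        for (size_t i = 0; i < heads.size(); ++i)
            if (heads[i] && (best == heads.size() || *heads[i] < *heads[best]))
                best = i;
        if (best == heads.size())
            break;
        std::cout << *heads[best] << '\n';
        heads[best] = sources[best]->read();
    }
}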
@ -2244,15 +2241,20 @@ void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline)
SortDescription order_descr = getSortDescription(query, context);
UInt64 limit = getLimitForSorting(query, context);
const Settings & settings = context.getSettingsRef();
executeMergeSorted(pipeline, order_descr, limit);
}
void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit)
{
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
const Settings & settings = context.getSettingsRef();
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
order_descr,
sort_description,
settings.max_block_size, limit);
pipeline.addPipe({ std::move(transform) });
@ -2615,13 +2617,29 @@ void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline, SubqueriesForSets & subqueries_for_sets)
{
executeUnion(pipeline, {});
/// Merge streams to one. Use MergeSorting if data was read in sorted order, Union otherwise.
if (query_info.sorting_info)
{
if (pipeline.stream_with_non_joined_data)
throw Exception("Using read in order optimization, but has stream with non-joined data in pipeline", ErrorCodes::LOGICAL_ERROR);
executeMergeSorted(pipeline, query_info.sorting_info->prefix_order_descr, 0);
}
else
executeUnion(pipeline, {});
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(), subqueries_for_sets, context);
}
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, SubqueriesForSets & subqueries_for_sets)
{
if (query_info.sorting_info)
{
if (pipeline.hasDelayedStream())
throw Exception("Using read in order optimization, but has delayed stream in pipeline", ErrorCodes::LOGICAL_ERROR);
executeMergeSorted(pipeline, query_info.sorting_info->prefix_order_descr, 0);
}
const Settings & settings = context.getSettingsRef();
auto creating_sets = std::make_shared<CreatingSetsTransform>(

View File

@ -214,6 +214,7 @@ private:
void executeDistinct(Pipeline & pipeline, bool before_order, Names columns);
void executeExtremes(Pipeline & pipeline);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
@ -231,6 +232,7 @@ private:
void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns);
void executeExtremes(QueryPipeline & pipeline);
void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
/// Add ConvertingBlockInputStream to specified header.
void unifyStreams(Pipeline & pipeline, Block header);

View File

@ -327,7 +327,7 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
ThreadPool pool(std::min(size_t(getNumberOfPhysicalCPUCores()), replica_names.size()));
for (auto & table : replica_names)
pool.schedule([&] () { tryRestartReplica(table.first, table.second, system_context); });
pool.scheduleOrThrowOnError([&]() { tryRestartReplica(table.first, table.second, system_context); });
pool.wait();
}

View File

@ -90,6 +90,8 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
if (!is_unlimited_query && max_size && processes.size() >= max_size)
{
if (queue_max_wait_ms)
LOG_WARNING(&Logger::get("ProcessList"), "Too many simultaneous queries, will wait " << queue_max_wait_ms << " ms.");
if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms), [&]{ return processes.size() < max_size; }))
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
}
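
The queueing logic above is a standard bounded-admission scheme: if the list is full, wait on a condition variable for up to queue_max_wait_ms for a slot to free up, and only then give up. A minimal sketch of that scheme (illustrative names, not the real ProcessList interface):

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>

class BoundedAdmission
{
    std::mutex mutex;
    std::condition_variable have_space;
    size_t running = 0;
    const size_t max_size;

public:
    explicit BoundedAdmission(size_t max_size_) : max_size(max_size_) {}

    void enter(std::chrono::milliseconds queue_max_wait)
    {
        std::unique_lock lock(mutex);
        if (running >= max_size)
        {
            /// wait_for() with a predicate re-checks the condition on every wakeup and
            /// returns false only if the timeout expires with the predicate still false.
            if (queue_max_wait.count() == 0
                || !have_space.wait_for(lock, queue_max_wait, [&] { return running < max_size; }))
                throw std::runtime_error("Too many simultaneous queries");
        }
        ++running;
    }

    void leave()
    {
        {
            std::lock_guard lock(mutex);
            --running;
        }
        have_space.notify_one();
    }
};

int main()
{
    BoundedAdmission admission(1);
    admission.enter(std::chrono::milliseconds(100));   /// first query is admitted immediately
    admission.leave();
}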

View File

@ -246,7 +246,7 @@ void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & co
else if (const auto * func = elem->as<ASTFunction>())
{
Field function_result;
const TupleBackend * tuple = nullptr;
const Tuple * tuple = nullptr;
if (func->name != "tuple")
{
if (!tuple_type)
@ -257,7 +257,7 @@ void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & co
throw Exception("Invalid type of set. Expected tuple, got " + String(function_result.getTypeName()),
ErrorCodes::INCORRECT_ELEMENT_OF_SET);
tuple = &function_result.get<Tuple>().toUnderType();
tuple = &function_result.get<Tuple>();
}
size_t tuple_size = tuple ? tuple->size() : func->arguments->children.size();

View File

@ -248,7 +248,7 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
{
if (src.getType() == Field::Types::Tuple)
{
const TupleBackend & src_tuple = src.get<Tuple>();
const auto & src_tuple = src.get<Tuple>();
size_t src_tuple_size = src_tuple.size();
size_t dst_tuple_size = type_tuple->getElements().size();
@ -256,7 +256,7 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
throw Exception("Bad size of tuple in IN or VALUES section. Expected size: "
+ toString(dst_tuple_size) + ", actual size: " + toString(src_tuple_size), ErrorCodes::TYPE_MISMATCH);
TupleBackend res(dst_tuple_size);
Tuple res(dst_tuple_size);
bool have_unconvertible_element = false;
for (size_t i = 0; i < dst_tuple_size; ++i)
{

View File

@ -15,6 +15,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/typeid_cast.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
namespace DB
@ -31,6 +32,9 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
{
NamesAndTypesList source_columns = {{ "_dummy", std::make_shared<DataTypeUInt8>() }};
auto ast = node->clone();
ReplaceQueryParameterVisitor param_visitor(context.getQueryParameters());
param_visitor.visit(ast);
String name = ast->getColumnName();
auto syntax_result = SyntaxAnalyzer(context).analyze(ast, source_columns);
ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer(ast, syntax_result, context).getConstActions();
@ -42,8 +46,6 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
if (!block_with_constants || block_with_constants.rows() == 0)
throw Exception("Logical error: empty block after evaluation of constant expression for IN, VALUES or LIMIT", ErrorCodes::LOGICAL_ERROR);
String name = node->getColumnName();
if (!block_with_constants.has(name))
throw Exception("Element of set in IN, VALUES or LIMIT is not a constant expression (result column not found): " + name, ErrorCodes::BAD_ARGUMENTS);

View File

@ -140,7 +140,7 @@ try
size_t num_threads = 2;
ThreadPool pool(num_threads);
for (size_t i = 0; i < num_threads; ++i)
pool.schedule([i]() { do_io(i); });
pool.scheduleOrThrowOnError([i]() { do_io(i); });
pool.wait();
test_perf();

View File

@ -85,7 +85,7 @@ namespace DB
// active_processors.insert(current_processor);
// }
//
// pool.schedule([processor = current_processor, &watch, this]
// pool.scheduleOrThrowOnError([processor = current_processor, &watch, this]
// {
// processor->work();
// {

View File

@ -46,6 +46,7 @@ Chunk IRowInputFormat::generate()
size_t prev_rows = total_rows;
///auto chunk_missing_values = std::make_unique<ChunkMissingValues>();
block_missing_values.clear();
try
{

View File

@ -77,7 +77,6 @@ protected:
private:
Params params;
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
size_t total_rows = 0;
size_t num_errors = 0;

View File

@ -24,12 +24,17 @@ CSVRowInputFormat::CSVRowInputFormat(const Block & header_, ReadBuffer & in_, co
, with_names(with_names_)
, format_settings(format_settings_)
{
const String bad_delimiters = " \t\"'.UL";
if (bad_delimiters.find(format_settings.csv.delimiter) != String::npos)
throw Exception(String("CSV format may not work correctly with delimiter '") + format_settings.csv.delimiter +
"'. Try to use CustomSeparated format instead.", ErrorCodes::BAD_ARGUMENTS);
auto & sample = getPort().getHeader();
size_t num_columns = sample.columns();
data_types.resize(num_columns);
column_indexes_by_names.reserve(num_columns);
column_idx_to_nullable_column_idx.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
@ -37,16 +42,6 @@ CSVRowInputFormat::CSVRowInputFormat(const Block & header_, ReadBuffer & in_, co
data_types[i] = column_info.type;
column_indexes_by_names.emplace(column_info.name, i);
/// If input_format_null_as_default=1 we need ColumnNullable of type DataTypeNullable(nested_type)
/// to parse value as nullable before inserting it in corresponding column of not-nullable type.
/// Constructing temporary column for each row is slow, so we prepare it here
if (format_settings_.csv.null_as_default && !column_info.type->isNullable() && column_info.type->canBeInsideNullable())
{
column_idx_to_nullable_column_idx[i] = nullable_columns.size();
nullable_types.emplace_back(std::make_shared<DataTypeNullable>(column_info.type));
nullable_columns.emplace_back(nullable_types.back()->createColumn());
}
}
}
@ -220,6 +215,7 @@ bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext
/// it doesn't have to check it.
bool have_default_columns = have_always_default_columns;
ext.read_columns.assign(read_columns.size(), true);
const auto delimiter = format_settings.csv.delimiter;
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
{
@ -229,9 +225,8 @@ bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext
if (table_column)
{
skipWhitespacesAndTabs(in);
read_columns[*table_column] = readField(*columns[*table_column], data_types[*table_column],
is_last_file_column, *table_column);
if (!read_columns[*table_column])
ext.read_columns[*table_column] = readField(*columns[*table_column], data_types[*table_column], is_last_file_column);
if (!ext.read_columns[*table_column])
have_default_columns = true;
skipWhitespacesAndTabs(in);
}
@ -258,9 +253,9 @@ bool CSVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ext
/// value, we do not have to use the default value specified by
/// the data type, and can just use IColumn::insertDefault().
columns[i]->insertDefault();
ext.read_columns[i] = false;
}
}
ext.read_columns = read_columns;
}
return true;
@ -365,8 +360,7 @@ void CSVRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, IColumn &
if (column_indexes_for_input_fields[file_column])
{
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
if (!readField(column, type, is_last_file_column, *column_indexes_for_input_fields[file_column]))
column.insertDefault();
readField(column, type, is_last_file_column);
}
else
{
@ -378,12 +372,14 @@ void CSVRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, IColumn &
skipWhitespacesAndTabs(in);
}
bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx)
bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column)
{
const bool at_delimiter = !in.eof() && *in.position() == format_settings.csv.delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (in.eof() || *in.position() == '\n' || *in.position() == '\r');
/// Note: Tuples are serialized in CSV as separate columns, but with empty_as_default or null_as_default
/// only one empty or NULL column will be expected
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
@ -393,20 +389,13 @@ bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bo
/// commas, which might also be used as delimiters. However,
/// they do not contain empty unquoted fields, so this check
/// works for tuples as well.
column.insertDefault();
return false;
}
else if (column_idx_to_nullable_column_idx[column_idx])
else if (format_settings.null_as_default && !type->isNullable())
{
/// If value is null but type is not nullable then use default value instead.
const size_t nullable_idx = *column_idx_to_nullable_column_idx[column_idx];
auto & tmp_col = *nullable_columns[nullable_idx];
nullable_types[nullable_idx]->deserializeAsTextCSV(tmp_col, in, format_settings);
Field value = tmp_col[0];
tmp_col.popBack(1); /// do not store copy of values in memory
if (value.isNull())
return false;
column.insert(value);
return true;
return DataTypeNullable::deserializeTextCSV(column, in, format_settings, type);
}
else
{

View File

@ -61,12 +61,7 @@ private:
return *pos != '\n' && *pos != '\r' && *pos != format_settings.csv.delimiter;
}
/// For setting input_format_null_as_default
DataTypes nullable_types;
MutableColumns nullable_columns;
OptionalIndexes column_idx_to_nullable_column_idx;
bool readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx);
bool readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column);
};
}
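
The removed code and its replacement implement the same contract from two directions: with input_format_null_as_default=1, a NULL read for a non-nullable column is parsed as if the column were Nullable, the column's default value is inserted instead, and readField() returns false so the row extension records the substitution. A toy sketch of that contract on a TSV-escaped field:

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

/// Parse a field as if it were Nullable; on NULL, insert the column default
/// and report false, mirroring what readField() returns through read_columns.
/// Toy types: a column is a vector<int> and "\N" is the escaped NULL literal.
bool parseWithNullAsDefault(std::istream & in, std::vector<int> & column)
{
    std::string token;
    in >> token;
    if (token == "\\N")
    {
        column.push_back(0);   /// default value of the non-nullable column
        return false;
    }
    column.push_back(std::stoi(token));
    return true;
}

int main()
{
    std::istringstream input("1 \\N 3");
    std::vector<int> column;
    for (int i = 0; i < 3; ++i)
        parseWithNullAsDefault(input, column);
    for (int v : column)
        std::cout << v << ' ';   /// prints: 1 0 3
}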

View File

@ -81,12 +81,11 @@ static Field convertNodeToField(const capnp::DynamicValue::Reader & value)
auto structValue = value.as<capnp::DynamicStruct>();
const auto & fields = structValue.getSchema().getFields();
Field field = Tuple(TupleBackend(fields.size()));
TupleBackend & tuple = get<Tuple &>(field).toUnderType();
Tuple tuple(fields.size());
for (auto i : kj::indices(fields))
tuple[i] = convertNodeToField(structValue.get(fields[i]));
return field;
return tuple;
}
case capnp::DynamicValue::CAPABILITY:
throw Exception("CAPABILITY type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
@ -271,7 +270,7 @@ bool CapnProtoRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
// Populate array with a single tuple elements
for (size_t off = 0; off < size; ++off)
{
const TupleBackend & tuple = DB::get<const Tuple &>(collected[off]).toUnderType();
const auto & tuple = DB::get<const Tuple &>(collected[off]);
flattened[off] = tuple[column_index];
}
auto & col = columns[action.columns[column_index]];

View File

@ -151,7 +151,7 @@ private:
{
info.special_parser.is_array = true;
info.type = applyVisitor(FieldToDataType(), info.literal->value);
auto nested_type = dynamic_cast<const DataTypeArray &>(*info.type).getNestedType();
auto nested_type = assert_cast<const DataTypeArray &>(*info.type).getNestedType();
/// It can be Array(Nullable(nested_type))
bool array_of_nullable = false;
@ -212,9 +212,9 @@ private:
/// E.g. template of "position('some string', 'other string') != 0" is
/// ["position", "(", DataTypeString, ",", DataTypeString, ")", "!=", DataTypeUInt64]
ConstantExpressionTemplate::TemplateStructure::TemplateStructure(LiteralsInfo & replaced_literals, TokenIterator expression_begin, TokenIterator expression_end,
ASTPtr & expression, const IDataType & result_type, const Context & context)
ASTPtr & expression, const IDataType & result_type, bool null_as_default_, const Context & context)
{
null_as_default = null_as_default_;
std::sort(replaced_literals.begin(), replaced_literals.end(), [](const LiteralInfo & a, const LiteralInfo & b)
{
@ -252,16 +252,17 @@ ConstantExpressionTemplate::TemplateStructure::TemplateStructure(LiteralsInfo &
++prev_end;
}
addNodesToCastResult(result_type, expression);
result_column_name = expression->getColumnName();
addNodesToCastResult(result_type, expression, null_as_default);
auto syntax_result = SyntaxAnalyzer(context).analyze(expression, literals.getNamesAndTypesList());
result_column_name = expression->getColumnName();
actions_on_literals = ExpressionAnalyzer(expression, syntax_result, context).getActions(false);
}
size_t ConstantExpressionTemplate::TemplateStructure::getTemplateHash(const ASTPtr & expression,
const LiteralsInfo & replaced_literals,
const DataTypePtr & result_column_type,
bool null_as_default,
const String & salt)
{
/// TODO distinguish expressions with the same AST and different tokens (e.g. "CAST(expr, 'Type')" and "CAST(expr AS Type)")
@ -272,6 +273,7 @@ size_t ConstantExpressionTemplate::TemplateStructure::getTemplateHash(const ASTP
for (const auto & info : replaced_literals)
hash_state.update(info.type->getName());
hash_state.update(null_as_default);
/// Allows distinguishing the expression in the last column in Values format
hash_state.update(salt);
@ -288,6 +290,7 @@ size_t ConstantExpressionTemplate::TemplateStructure::getTemplateHash(const ASTP
ConstantExpressionTemplate::TemplateStructurePtr
ConstantExpressionTemplate::Cache::getFromCacheOrConstruct(const DataTypePtr & result_column_type,
bool null_as_default,
TokenIterator expression_begin,
TokenIterator expression_end,
const ASTPtr & expression_,
@ -298,17 +301,18 @@ ConstantExpressionTemplate::Cache::getFromCacheOrConstruct(const DataTypePtr & r
TemplateStructurePtr res;
ASTPtr expression = expression_->clone();
ReplaceLiteralsVisitor visitor(context);
visitor.visit(expression, result_column_type->isNullable());
visitor.visit(expression, result_column_type->isNullable() || null_as_default);
ReplaceQueryParameterVisitor param_visitor(context.getQueryParameters());
param_visitor.visit(expression);
size_t template_hash = TemplateStructure::getTemplateHash(expression, visitor.replaced_literals, result_column_type, salt);
size_t template_hash = TemplateStructure::getTemplateHash(expression, visitor.replaced_literals, result_column_type, null_as_default, salt);
auto iter = cache.find(template_hash);
if (iter == cache.end())
{
if (max_size <= cache.size())
cache.clear();
res = std::make_shared<TemplateStructure>(visitor.replaced_literals, expression_begin, expression_end, expression, *result_column_type, context);
res = std::make_shared<TemplateStructure>(visitor.replaced_literals, expression_begin, expression_end,
expression, *result_column_type, null_as_default, context);
cache.insert({template_hash, res});
if (found_in_cache)
*found_in_cache = false;
@ -416,7 +420,7 @@ bool ConstantExpressionTemplate::parseLiteralAndAssertType(ReadBuffer & istr, co
const Field & array = ast->as<ASTLiteral &>().value;
auto array_type = applyVisitor(FieldToDataType(), array);
auto nested_type = dynamic_cast<const DataTypeArray &>(*array_type).getNestedType();
auto nested_type = assert_cast<const DataTypeArray &>(*array_type).getNestedType();
if (type_info.is_nullable)
if (auto nullable = dynamic_cast<const DataTypeNullable *>(nested_type.get()))
nested_type = nullable->getNestedType();
@ -488,7 +492,7 @@ bool ConstantExpressionTemplate::parseLiteralAndAssertType(ReadBuffer & istr, co
}
}
ColumnPtr ConstantExpressionTemplate::evaluateAll()
ColumnPtr ConstantExpressionTemplate::evaluateAll(BlockMissingValues & nulls, size_t column_idx, size_t offset)
{
Block evaluated = structure->literals.cloneWithColumns(std::move(columns));
columns = structure->literals.cloneEmptyColumns();
@ -506,23 +510,40 @@ ColumnPtr ConstantExpressionTemplate::evaluateAll()
ErrorCodes::LOGICAL_ERROR);
rows_count = 0;
return evaluated.getByName(structure->result_column_name).column->convertToFullColumnIfConst();
ColumnPtr res = evaluated.getByName(structure->result_column_name).column->convertToFullColumnIfConst();
if (!structure->null_as_default)
return res;
/// Extract column with evaluated expression and mask for NULLs
auto & tuple = assert_cast<const ColumnTuple &>(*res);
if (tuple.tupleSize() != 2)
throw Exception("Invalid tuple size, it's a bug", ErrorCodes::LOGICAL_ERROR);
auto & is_null = assert_cast<const ColumnUInt8 &>(tuple.getColumn(1));
for (size_t i = 0; i < is_null.size(); ++i)
if (is_null.getUInt(i))
nulls.setBit(column_idx, offset + i);
return tuple.getColumnPtr(0);
}
void ConstantExpressionTemplate::TemplateStructure::addNodesToCastResult(const IDataType & result_column_type, ASTPtr & expr)
void ConstantExpressionTemplate::TemplateStructure::addNodesToCastResult(const IDataType & result_column_type, ASTPtr & expr, bool null_as_default)
{
auto result_type = std::make_shared<ASTLiteral>(result_column_type.getName());
/// Replace "expr" with "CAST(expr, 'TypeName')"
/// or with "(CAST(assumeNotNull(expr as _expression), 'TypeName'), isNull(_expression))" if null_as_default is true
if (null_as_default)
{
expr->setAlias("_expression");
expr = makeASTFunction("assumeNotNull", std::move(expr));
}
auto arguments = std::make_shared<ASTExpressionList>();
arguments->children.push_back(std::move(expr));
arguments->children.push_back(std::move(result_type));
expr = makeASTFunction("CAST", std::move(expr), std::make_shared<ASTLiteral>(result_column_type.getName()));
auto cast = std::make_shared<ASTFunction>();
cast->name = "CAST";
cast->arguments = std::move(arguments);
cast->children.push_back(cast->arguments);
expr = std::move(cast);
if (null_as_default)
{
auto is_null = makeASTFunction("isNull", std::make_shared<ASTIdentifier>("_expression"));
expr = makeASTFunction("tuple", std::move(expr), std::move(is_null));
}
}
}
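
Concretely, with null_as_default enabled and a target column of type UInt8, an input expression such as 1 + x is rewritten into tuple(CAST(assumeNotNull(1 + x AS _expression), 'UInt8'), isNull(_expression)). When the batch is evaluated, evaluateAll() splits that tuple back apart: element 0 carries the values (assumeNotNull turns a NULL into the nested type's default), and element 1 is the per-row null mask that gets copied into BlockMissingValues. The expression 1 + x is only an illustration; any constant expression parsed by the template takes the same route.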

View File

@ -21,10 +21,11 @@ class ConstantExpressionTemplate : boost::noncopyable
struct TemplateStructure : boost::noncopyable
{
TemplateStructure(LiteralsInfo & replaced_literals, TokenIterator expression_begin, TokenIterator expression_end,
ASTPtr & expr, const IDataType & result_type, const Context & context);
ASTPtr & expr, const IDataType & result_type, bool null_as_default_, const Context & context);
static void addNodesToCastResult(const IDataType & result_column_type, ASTPtr & expr);
static size_t getTemplateHash(const ASTPtr & expression, const LiteralsInfo & replaced_literals, const DataTypePtr & result_column_type, const String & salt);
static void addNodesToCastResult(const IDataType & result_column_type, ASTPtr & expr, bool null_as_default);
static size_t getTemplateHash(const ASTPtr & expression, const LiteralsInfo & replaced_literals,
const DataTypePtr & result_column_type, bool null_as_default, const String & salt);
String result_column_name;
@ -35,6 +36,7 @@ class ConstantExpressionTemplate : boost::noncopyable
ExpressionActionsPtr actions_on_literals;
std::vector<SpecialParserType> special_parser;
bool null_as_default;
};
public:
@ -50,6 +52,7 @@ public:
/// Deduce template of expression of type result_column_type and add it to cache (or use template from cache)
TemplateStructurePtr getFromCacheOrConstruct(const DataTypePtr & result_column_type,
bool null_as_default,
TokenIterator expression_begin,
TokenIterator expression_end,
const ASTPtr & expression_,
@ -65,8 +68,9 @@ public:
/// and parse literals into temporary columns
bool parseExpression(ReadBuffer & istr, const FormatSettings & settings);
/// Evaluate batch of expressions that were parsed using the template
ColumnPtr evaluateAll();
/// Evaluate batch of expressions that were parsed using the template.
/// If the template was deduced with null_as_default == true, set bits in nulls for NULL values in column_idx, starting from offset.
ColumnPtr evaluateAll(BlockMissingValues & nulls, size_t column_idx, size_t offset = 0);
size_t rowsCount() const { return rows_count; }

View File

@ -3,6 +3,7 @@
#include <Processors/Formats/Impl/JSONEachRowRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
@ -129,21 +130,23 @@ void JSONEachRowRowInputFormat::skipUnknownField(const StringRef & name_ref)
void JSONEachRowRowInputFormat::readField(size_t index, MutableColumns & columns)
{
if (read_columns[index])
if (seen_columns[index])
throw Exception("Duplicate field found while parsing JSONEachRow format: " + columnName(index), ErrorCodes::INCORRECT_DATA);
try
{
auto & header = getPort().getHeader();
header.getByPosition(index).type->deserializeAsTextJSON(*columns[index], in, format_settings);
seen_columns[index] = read_columns[index] = true;
const auto & type = getPort().getHeader().getByPosition(index).type;
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = DataTypeNullable::deserializeTextJSON(*columns[index], in, format_settings, type);
else
type->deserializeAsTextJSON(*columns[index], in, format_settings);
}
catch (Exception & e)
{
e.addMessage("(while read the value of key " + columnName(index) + ")");
throw;
}
read_columns[index] = true;
}
inline bool JSONEachRowRowInputFormat::advanceToNextKey(size_t key_index)
@ -230,8 +233,8 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
size_t num_columns = columns.size();
/// Set of columns for which the values were read. The rest will be filled with default values.
read_columns.assign(num_columns, false);
seen_columns.assign(num_columns, false);
nested_prefix_length = 0;
readJSONObject(columns);
@ -239,7 +242,7 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
auto & header = getPort().getHeader();
/// Fill non-visited columns with the default values.
for (size_t i = 0; i < num_columns; ++i)
if (!read_columns[i])
if (!seen_columns[i])
header.getByPosition(i).type->insertDefaultInto(*columns[i]);
/// return info about defaults set

View File

@ -55,7 +55,12 @@ private:
/// the nested column names are 'n.i' and 'n.s' and the nested prefix is 'n.'
size_t nested_prefix_length = 0;
/// Set of columns for which the values were read. The rest will be filled with default values.
std::vector<UInt8> read_columns;
/// Set of columns which have already been met in the row. An exception is thrown if there is more than one column with the same name.
std::vector<UInt8> seen_columns;
/// These sets may be different, because if null_as_default=1 read_columns[i] will be false and seen_columns[i] will be true
/// for a row like {..., "non-nullable column name" : null, ...}
/// Hash table match `field name -> position in the block`. NOTE You can use perfect hash map.
using NameMap = HashMap<StringRef, size_t, StringRefHash>;
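
A concrete row makes the distinction clear: given {"a": 1, "b": null} where b has a non-nullable type and input_format_null_as_default=1, readField() leaves seen_columns[b] = true, so a second "b" key in the same object still raises the duplicate-field exception, but read_columns[b] = false, because the NULL was replaced with the column default, and that is what gets reported through RowReadExtension. A column that never appears in the object ends up false in both sets and is filled with its default after the row is read.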

View File

@ -1,6 +1,7 @@
#include <IO/ReadHelpers.h>
#include <Processors/Formats/Impl/TSKVRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
@ -98,6 +99,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
/// Set of columns for which the values were read. The rest will be filled with default values.
read_columns.assign(num_columns, false);
seen_columns.assign(num_columns, false);
if (unlikely(*in.position() == '\n'))
{
@ -131,12 +133,15 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
{
index = *lookupResultGetMapped(it);
if (read_columns[index])
if (seen_columns[index])
throw Exception("Duplicate field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
read_columns[index] = true;
header.getByPosition(index).type->deserializeAsTextEscaped(*columns[index], in, format_settings);
seen_columns[index] = read_columns[index] = true;
const auto & type = getPort().getHeader().getByPosition(index).type;
if (format_settings.null_as_default && !type->isNullable())
read_columns[index] = DataTypeNullable::deserializeTextEscaped(*columns[index], in, format_settings, type);
else
header.getByPosition(index).type->deserializeAsTextEscaped(*columns[index], in, format_settings);
}
}
else
@ -166,7 +171,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
if (index >= 0)
{
columns[index]->popBack(1);
read_columns[index] = false;
seen_columns[index] = read_columns[index] = false;
}
throw Exception("Found garbage after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
@ -176,7 +181,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
/// Fill in the not met columns with default values.
for (size_t i = 0; i < num_columns; ++i)
if (!read_columns[i])
if (!seen_columns[i])
header.getByPosition(i).type->insertDefaultInto(*columns[i]);
/// return info about defaults set

View File

@ -41,7 +41,12 @@ private:
using NameMap = HashMap<StringRef, size_t, StringRefHash>;
NameMap name_map;
/// Set of columns for which the values were read. The rest will be filled with default values.
std::vector<UInt8> read_columns;
/// Set of columns which have already been met in the row. An exception is thrown if there is more than one column with the same name.
std::vector<UInt8> seen_columns;
/// These sets may be different, because if null_as_default=1 read_columns[i] will be false and seen_columns[i] will be true
/// for a row like ..., non-nullable column name=\N, ...
};
}

View File

@ -6,6 +6,7 @@
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
@ -117,9 +118,10 @@ void TabSeparatedRowInputFormat::fillUnreadColumnsWithDefaults(MutableColumns &
}
for (const auto column_index : columns_to_fill_with_default_values)
{
data_types[column_index]->insertDefaultInto(*columns[column_index]);
row_read_extension.read_columns = read_columns;
row_read_extension.read_columns[column_index] = false;
}
}
@ -174,12 +176,15 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
updateDiagnosticInfo();
ext.read_columns.assign(read_columns.size(), true);
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
{
const auto & column_index = column_indexes_for_input_fields[file_column];
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
if (column_index)
{
data_types[*column_index]->deserializeAsTextEscaped(*columns[*column_index], in, format_settings);
const auto & type = data_types[*column_index];
ext.read_columns[*column_index] = readField(*columns[*column_index], type, is_last_file_column);
}
else
{
@ -206,6 +211,22 @@ bool TabSeparatedRowInputFormat::readRow(MutableColumns & columns, RowReadExtens
return true;
}
bool TabSeparatedRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column)
{
const bool at_delimiter = !is_last_file_column && !in.eof() && *in.position() == '\t';
const bool at_last_column_line_end = is_last_file_column && (in.eof() || *in.position() == '\n');
if (format_settings.tsv.empty_as_default && (at_delimiter || at_last_column_line_end))
{
column.insertDefault();
return false;
}
else if (format_settings.null_as_default && !type->isNullable())
return DataTypeNullable::deserializeTextEscaped(column, in, format_settings, type);
type->deserializeAsTextEscaped(column, in, format_settings);
return true;
}
bool TabSeparatedRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out)
{
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
@ -303,7 +324,10 @@ void TabSeparatedRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, I
{
prev_pos = in.position();
if (column_indexes_for_input_fields[file_column])
type->deserializeAsTextEscaped(column, in, format_settings);
{
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
readField(column, type, is_last_file_column);
}
else
{
NullSink null_sink;

View File

@ -41,6 +41,8 @@ private:
std::vector<UInt8> read_columns;
std::vector<size_t> columns_to_fill_with_default_values;
bool readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column);
void addInputColumn(const String & column_name);
void setupAllColumnsByTableSchema();
void fillUnreadColumnsWithDefaults(MutableColumns & columns, RowReadExtension& ext);

View File

@ -4,6 +4,7 @@
#include <IO/Operators.h>
#include <DataTypes/DataTypeNothing.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
@ -23,7 +24,8 @@ TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, ReadBuffer
ParsedTemplateFormatString format_, ParsedTemplateFormatString row_format_)
: RowInputFormatWithDiagnosticInfo(header_, buf, params_), buf(in_), data_types(header_.getDataTypes()),
settings(settings_), ignore_spaces(ignore_spaces_),
format(std::move(format_)), row_format(std::move(row_format_))
format(std::move(format_)), row_format(std::move(row_format_)),
default_csv_delimiter(settings.csv.delimiter)
{
/// Validate format string for result set
bool has_data = false;
@ -68,6 +70,10 @@ TemplateRowInputFormat::TemplateRowInputFormat(const Block & header_, ReadBuffer
column_in_format[col_idx] = true;
}
}
for (size_t i = 0; i < header_.columns(); ++i)
if (!column_in_format[i])
always_default_columns.push_back(i);
}
void TemplateRowInputFormat::readPrefix()
@ -166,8 +172,7 @@ bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
if (row_format.format_idx_to_column_idx[i])
{
size_t col_idx = *row_format.format_idx_to_column_idx[i];
deserializeField(*data_types[col_idx], *columns[col_idx], row_format.formats[i]);
extra.read_columns[col_idx] = true;
extra.read_columns[col_idx] = deserializeField(data_types[col_idx], *columns[col_idx], i);
}
else
skipField(row_format.formats[i]);
@ -177,30 +182,47 @@ bool TemplateRowInputFormat::readRow(MutableColumns & columns, RowReadExtension
skipSpaces();
assertString(row_format.delimiters.back(), buf);
for (size_t i = 0; i < columns.size(); ++i)
if (!extra.read_columns[i])
data_types[i]->insertDefaultInto(*columns[i]);
for (const auto & idx : always_default_columns)
data_types[idx]->insertDefaultInto(*columns[idx]);
return true;
}
void TemplateRowInputFormat::deserializeField(const IDataType & type, IColumn & column, ColumnFormat col_format)
bool TemplateRowInputFormat::deserializeField(const DataTypePtr & type, IColumn & column, size_t file_column)
{
ColumnFormat col_format = row_format.formats[file_column];
bool read = true;
bool parse_as_nullable = settings.null_as_default && !type->isNullable();
try
{
switch (col_format)
{
case ColumnFormat::Escaped:
type.deserializeAsTextEscaped(column, buf, settings);
if (parse_as_nullable)
read = DataTypeNullable::deserializeTextEscaped(column, buf, settings, type);
else
type->deserializeAsTextEscaped(column, buf, settings);
break;
case ColumnFormat::Quoted:
type.deserializeAsTextQuoted(column, buf, settings);
if (parse_as_nullable)
read = DataTypeNullable::deserializeTextQuoted(column, buf, settings, type);
else
type->deserializeAsTextQuoted(column, buf, settings);
break;
case ColumnFormat::Csv:
type.deserializeAsTextCSV(column, buf, settings);
/// Will read unquoted string until settings.csv.delimiter
settings.csv.delimiter = row_format.delimiters[file_column + 1].empty() ? default_csv_delimiter :
row_format.delimiters[file_column + 1].front();
if (parse_as_nullable)
read = DataTypeNullable::deserializeTextCSV(column, buf, settings, type);
else
type->deserializeAsTextCSV(column, buf, settings);
break;
case ColumnFormat::Json:
type.deserializeAsTextJSON(column, buf, settings);
if (parse_as_nullable)
read = DataTypeNullable::deserializeTextJSON(column, buf, settings, type);
else
type->deserializeAsTextJSON(column, buf, settings);
break;
default:
__builtin_unreachable();
@ -212,6 +234,7 @@ void TemplateRowInputFormat::deserializeField(const IDataType & type, IColumn &
throwUnexpectedEof();
throw;
}
return read;
}
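
To make the delimiter juggling concrete: assuming the usual ${name:Format} row-format syntax, a row format such as ${n:CSV};${s:CSV} means the value for n is parsed with ';' as the effective CSV delimiter, the first character of the delimiter string that follows it, while s, which is followed by an empty delimiter, falls back to default_csv_delimiter taken from format_csv_delimiter. This is also why settings can no longer be const in the header below: the CSV delimiter is rewritten per column while a row is being read.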
void TemplateRowInputFormat::skipField(TemplateRowInputFormat::ColumnFormat col_format)
@ -391,7 +414,7 @@ void TemplateRowInputFormat::tryDeserializeFiled(const DataTypePtr & type, IColu
{
prev_pos = buf.position();
if (row_format.format_idx_to_column_idx[file_column])
deserializeField(*type, column, row_format.formats[file_column]);
deserializeField(type, column, file_column);
else
skipField(row_format.formats[file_column]);
curr_pos = buf.position();

View File

@ -29,7 +29,7 @@ public:
void syncAfterError() override;
private:
void deserializeField(const IDataType & type, IColumn & column, ColumnFormat col_format);
bool deserializeField(const DataTypePtr & type, IColumn & column, size_t file_column);
void skipField(ColumnFormat col_format);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(buf); }
@ -50,13 +50,15 @@ private:
PeekableReadBuffer buf;
DataTypes data_types;
const FormatSettings settings;
FormatSettings settings;
const bool ignore_spaces;
ParsedTemplateFormatString format;
ParsedTemplateFormatString row_format;
size_t format_data_idx;
bool end_of_stream = false;
std::vector<size_t> always_default_columns;
char default_csv_delimiter;
};
}

View File

@ -11,6 +11,7 @@
#include <Common/typeid_cast.h>
#include <common/find_symbols.h>
#include <Parsers/ASTLiteral.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
@ -47,6 +48,7 @@ Chunk ValuesBlockInputFormat::generate()
{
const Block & header = getPort().getHeader();
MutableColumns columns = header.cloneEmptyColumns();
block_missing_values.clear();
for (size_t rows_in_block = 0; rows_in_block < params.max_block_size; ++rows_in_block)
{
@ -55,7 +57,7 @@ Chunk ValuesBlockInputFormat::generate()
skipWhitespaceIfAny(buf);
if (buf.eof() || *buf.position() == ';')
break;
readRow(columns);
readRow(columns, rows_in_block);
if (params.callback)
params.callback();
}
@ -73,10 +75,10 @@ Chunk ValuesBlockInputFormat::generate()
if (!templates[i] || !templates[i]->rowsCount())
continue;
if (columns[i]->empty())
columns[i] = std::move(*templates[i]->evaluateAll()).mutate();
columns[i] = std::move(*templates[i]->evaluateAll(block_missing_values, i)).mutate();
else
{
ColumnPtr evaluated = templates[i]->evaluateAll();
ColumnPtr evaluated = templates[i]->evaluateAll(block_missing_values, i, columns[i]->size());
columns[i]->insertRangeFrom(*evaluated, 0, evaluated->size());
}
}
@ -91,7 +93,7 @@ Chunk ValuesBlockInputFormat::generate()
return Chunk{std::move(columns), rows_in_block};
}
void ValuesBlockInputFormat::readRow(MutableColumns & columns)
void ValuesBlockInputFormat::readRow(MutableColumns & columns, size_t row_num)
{
assertChar('(', buf);
@ -99,17 +101,22 @@ void ValuesBlockInputFormat::readRow(MutableColumns & columns)
{
skipWhitespaceIfAny(buf);
PeekableReadBufferCheckpoint checkpoint{buf};
bool read;
/// Parse the value using the fast streaming parser for literals and the slow SQL parser for expressions.
/// If there is an SQL expression in some row, a template of this expression will be deduced,
/// which makes it possible to parse the following rows much faster
/// if the expressions in the following rows have the same structure
if (parser_type_for_column[column_idx] == ParserType::Streaming)
tryReadValue(*columns[column_idx], column_idx);
read = tryReadValue(*columns[column_idx], column_idx);
else if (parser_type_for_column[column_idx] == ParserType::BatchTemplate)
tryParseExpressionUsingTemplate(columns[column_idx], column_idx);
read = tryParseExpressionUsingTemplate(columns[column_idx], column_idx);
else /// if (parser_type_for_column[column_idx] == ParserType::SingleExpressionEvaluation)
parseExpression(*columns[column_idx], column_idx);
read = parseExpression(*columns[column_idx], column_idx);
if (!read)
block_missing_values.setBit(column_idx, row_num);
/// If read is true, the value may still be missing. The bit mask for these values will be copied from ConstantExpressionTemplate later.
}
skipWhitespaceIfAny(buf);
@ -119,22 +126,22 @@ void ValuesBlockInputFormat::readRow(MutableColumns & columns)
++total_rows;
}
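
The three parser tiers referenced by parser_type_for_column deserve a schematic: start with the fast streaming parser; on the first failure, fall back to the SQL parser and deduce a template; from then on, rows that match the template are parsed cheaply in a batch. A toy sketch of the tier switching (the real template matching and batch evaluation live in ConstantExpressionTemplate):

#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>

enum class ParserType { Streaming, BatchTemplate, SingleExpressionEvaluation };

/// Tier 1: fast path for plain literals.
std::optional<int> tryStreaming(const std::string & s)
{
    try
    {
        size_t pos = 0;
        int value = std::stoi(s, &pos);
        return pos == s.size() ? std::optional<int>(value) : std::nullopt;
    }
    catch (...) { return std::nullopt; }
}

/// Tier 3 stand-in: the full SQL expression parser + evaluation.
int slowParse(const std::string & s)
{
    if (s == "1+1") return 2;
    if (s == "2+2") return 4;
    if (s == "2+3") return 5;
    throw std::runtime_error("cannot parse: " + s);
}

int main()
{
    ParserType parser = ParserType::Streaming;
    for (std::string row : {"1", "2", "1+1", "2+2", "2+3"})
    {
        if (parser == ParserType::Streaming)
        {
            if (auto value = tryStreaming(row))
            {
                std::cout << *value << '\n';
                continue;
            }
            /// First failure: deduce a template from this row and switch tiers.
            parser = ParserType::BatchTemplate;
        }
        /// Tier 2: rows matching the deduced template are parsed cheaply and
        /// evaluated in one batch; the slow parser stands in for both here.
        std::cout << slowParse(row) << '\n';
    }
}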
void ValuesBlockInputFormat::tryParseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx)
bool ValuesBlockInputFormat::tryParseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx)
{
/// Try to parse expression using template if one was successfully deduced while parsing the first row
if (templates[column_idx]->parseExpression(buf, format_settings))
{
++rows_parsed_using_template[column_idx];
return;
return true;
}
/// The expression in the current row does not match the template deduced from the first row.
/// Evaluate the expressions that were parsed using this template.
if (column->empty())
column = std::move(*templates[column_idx]->evaluateAll()).mutate();
column = std::move(*templates[column_idx]->evaluateAll(block_missing_values, column_idx)).mutate();
else
{
ColumnPtr evaluated = templates[column_idx]->evaluateAll();
ColumnPtr evaluated = templates[column_idx]->evaluateAll(block_missing_values, column_idx, column->size());
column->insertRangeFrom(*evaluated, 0, evaluated->size());
}
/// Do not use this template anymore
@ -142,19 +149,25 @@ void ValuesBlockInputFormat::tryParseExpressionUsingTemplate(MutableColumnPtr &
buf.rollbackToCheckpoint();
/// It will deduce a new template or fall back to the slow SQL parser
parseExpression(*column, column_idx);
return parseExpression(*column, column_idx);
}
void ValuesBlockInputFormat::tryReadValue(IColumn & column, size_t column_idx)
bool ValuesBlockInputFormat::tryReadValue(IColumn & column, size_t column_idx)
{
bool rollback_on_exception = false;
try
{
types[column_idx]->deserializeAsTextQuoted(column, buf, format_settings);
bool read = true;
const auto & type = types[column_idx];
if (format_settings.null_as_default && !type->isNullable())
read = DataTypeNullable::deserializeTextQuoted(column, buf, format_settings, type);
else
type->deserializeAsTextQuoted(column, buf, format_settings);
rollback_on_exception = true;
skipWhitespaceIfAny(buf);
assertDelimiterAfterValue(column_idx);
return read;
}
catch (const Exception & e)
{
@ -166,12 +179,11 @@ void ValuesBlockInputFormat::tryReadValue(IColumn & column, size_t column_idx)
/// Switch to SQL parser and don't try to use streaming parser for complex expressions
/// Note: Throwing exceptions for each expression may be very slow because of stacktraces
buf.rollbackToCheckpoint();
parseExpression(column, column_idx);
return parseExpression(column, column_idx);
}
}
void
ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx)
bool ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx)
{
const Block & header = getPort().getHeader();
const IDataType & type = *header.getByPosition(column_idx).type;
@ -223,7 +235,7 @@ ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx)
if (ok)
{
parser_type_for_column[column_idx] = ParserType::Streaming;
return;
return true;
}
else if (rollback_on_exception)
column.popBack(1);
@ -243,7 +255,8 @@ ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx)
bool found_in_cache = false;
const auto & result_type = header.getByPosition(column_idx).type;
const char * delimiter = (column_idx + 1 == num_columns) ? ")" : ",";
auto structure = templates_cache.getFromCacheOrConstruct(result_type, TokenIterator(tokens), token_iterator,
auto structure = templates_cache.getFromCacheOrConstruct(result_type, format_settings.null_as_default,
TokenIterator(tokens), token_iterator,
ast, *context, &found_in_cache, delimiter);
templates[column_idx].emplace(structure);
if (found_in_cache)
@ -256,7 +269,7 @@ ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx)
{
++rows_parsed_using_template[column_idx];
parser_type_for_column[column_idx] = ParserType::BatchTemplate;
return;
return true;
}
}
catch (...)
@ -290,6 +303,11 @@ ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx)
/// Check that we are indeed allowed to insert a NULL.
if (value.isNull() && !type.isNullable())
{
if (format_settings.null_as_default)
{
type.insertDefaultInto(column);
return false;
}
buf.rollbackToCheckpoint();
throw Exception{"Cannot insert NULL value into a column of type '" + type.getName() + "'"
+ " at: " +
@ -298,6 +316,7 @@ ValuesBlockInputFormat::parseExpression(IColumn & column, size_t column_idx)
}
column.insert(value);
return true;
}
/// Can be used in fileSegmentationEngine for parallel parsing of Values

View File

@ -33,6 +33,8 @@ public:
String getName() const override { return "ValuesBlockInputFormat"; }
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
private:
enum class ParserType
{
@ -45,11 +47,11 @@ private:
Chunk generate() override;
void readRow(MutableColumns & columns);
void readRow(MutableColumns & columns, size_t row_num);
void tryParseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx);
ALWAYS_INLINE inline void tryReadValue(IColumn & column, size_t column_idx);
void parseExpression(IColumn & column, size_t column_idx);
bool tryParseExpressionUsingTemplate(MutableColumnPtr & column, size_t column_idx);
ALWAYS_INLINE inline bool tryReadValue(IColumn & column, size_t column_idx);
bool parseExpression(IColumn & column, size_t column_idx);
ALWAYS_INLINE inline void assertDelimiterAfterValue(size_t column_idx);
ALWAYS_INLINE inline bool checkDelimiterAfterValue(size_t column_idx);
@ -81,6 +83,8 @@ private:
ConstantExpressionTemplate::Cache templates_cache;
DataTypes types;
BlockMissingValues block_missing_values;
};
}

View File

@ -88,7 +88,7 @@ public:
void schedule(EventCounter & watch) override
{
active = true;
pool.schedule([&watch, this]
pool.scheduleOrThrowOnError([&watch, this]
{
usleep(sleep_useconds);
current_chunk = generate();

View File

@ -339,11 +339,19 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
per_shard_jobs[current_selector[i]].shard_current_block_permuation.push_back(i);
}
/// Run jobs in parallel for each block and wait for them
finished_jobs_count = 0;
for (size_t shard_index : ext::range(0, shards_info.size()))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->schedule(runWritingJob(job, block));
try
{
/// Run jobs in parallel for each block and wait for them
finished_jobs_count = 0;
for (size_t shard_index : ext::range(0, shards_info.size()))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->scheduleOrThrowOnError(runWritingJob(job, block));
}
catch (...)
{
pool->wait();
throw;
}
try
{
@ -373,17 +381,27 @@ void DistributedBlockOutputStream::writeSuffix()
if (insert_sync && pool)
{
finished_jobs_count = 0;
for (auto & shard_jobs : per_shard_jobs)
for (JobReplica & job : shard_jobs.replicas_jobs)
try
{
for (auto & shard_jobs : per_shard_jobs)
{
if (job.stream)
for (JobReplica & job : shard_jobs.replicas_jobs)
{
pool->schedule([&job] ()
if (job.stream)
{
job.stream->writeSuffix();
});
pool->scheduleOrThrowOnError([&job]()
{
job.stream->writeSuffix();
});
}
}
}
}
catch (...)
{
pool->wait();
throw;
}
try
{

View File

@ -15,6 +15,7 @@
#include <Common/ActionLock.h>
#include <Common/Exception.h>
#include <Common/RWLock.h>
#include <Common/TypePromotion.h>
#include <optional>
#include <shared_mutex>
@ -63,7 +64,7 @@ struct ColumnSize
* - data storage structure (compression, etc.)
* - concurrent access to data (locks, etc.)
*/
class IStorage : public std::enable_shared_from_this<IStorage>
class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromotion<IStorage>
{
public:
IStorage() = default;
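
TypePromotion is the CRTP helper that IAST already uses for its as<Derived>() casts; inheriting it here gives IStorage the same downcast convenience. A minimal sketch of the idea (assumption: the real Common/TypePromotion.h is more elaborate than this):

#include <iostream>

/// CRTP base: any class in the hierarchy can be down-cast with obj.as<Derived>()
/// instead of spelling out dynamic_cast at every call site.
template <typename Base>
struct TypePromotion
{
    template <typename Derived>
    Derived * as() { return dynamic_cast<Derived *>(static_cast<Base *>(this)); }

    template <typename Derived>
    const Derived * as() const { return dynamic_cast<const Derived *>(static_cast<const Base *>(this)); }
};

struct Storage : public TypePromotion<Storage> { virtual ~Storage() = default; };
struct StorageMemory : public Storage {};

int main()
{
    StorageMemory mem;
    Storage & s = mem;
    if (auto * typed = s.as<StorageMemory>())   /// non-null: s really is a StorageMemory
        std::cout << "promoted\n";
}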

View File

@ -273,6 +273,8 @@ bool StorageLiveView::hasColumn(const String & column_name) const
Block StorageLiveView::getHeader() const
{
std::lock_guard lock(sample_block_lock);
if (!sample_block)
{
auto storage = global_context.getTable(select_database_name, select_table_name);
@ -375,7 +377,7 @@ void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, co
{
while (1)
{
std::unique_lock lock(storage->no_users_thread_mutex);
std::unique_lock lock(storage->no_users_thread_wakeup_mutex);
if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; }))
{
storage->no_users_thread_wakeup = false;
@ -421,17 +423,22 @@ void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
if (is_temporary)
{
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
if (shutdown_called)
return;
if (no_users_thread.joinable())
{
{
std::lock_guard lock(no_users_thread_mutex);
std::lock_guard lock(no_users_thread_wakeup_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
}
no_users_thread.join();
}
{
std::lock_guard lock(no_users_thread_mutex);
std::lock_guard lock(no_users_thread_wakeup_mutex);
no_users_thread_wakeup = false;
}
if (!is_dropped)
@ -453,12 +460,15 @@ void StorageLiveView::shutdown()
if (!shutdown_called.compare_exchange_strong(expected, true))
return;
if (no_users_thread.joinable())
{
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
if (no_users_thread.joinable())
{
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
{
std::lock_guard lock(no_users_thread_wakeup_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
}
}
}
}
@ -466,8 +476,12 @@ void StorageLiveView::shutdown()
StorageLiveView::~StorageLiveView()
{
shutdown();
if (no_users_thread.joinable())
no_users_thread.detach();
{
std::lock_guard lock(no_users_thread_mutex);
if (no_users_thread.joinable())
no_users_thread.detach();
}
}
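
The rework above splits one mutex into two: no_users_thread_mutex now guards the std::thread object itself (joinable/join/detach), while no_users_thread_wakeup_mutex guards only the wakeup flag paired with the condition variable, so waking the background thread never races with code that is joining or detaching it. A condensed sketch of the same shutdown shape (the destructor in the hunk detaches; this sketch joins for simplicity):

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

struct Watcher
{
    std::mutex thread_mutex;          /// protects `worker` itself (join/detach/joinable)
    std::thread worker;

    std::mutex wakeup_mutex;          /// protects only `wakeup`
    std::condition_variable wakeup_condition;
    bool wakeup = false;

    void start()
    {
        std::lock_guard lock(thread_mutex);
        worker = std::thread([this]
        {
            std::unique_lock lock(wakeup_mutex);
            wakeup_condition.wait_for(lock, std::chrono::seconds(10), [&] { return wakeup; });
            std::cout << "worker woke up\n";
        });
    }

    void shutdown()
    {
        std::lock_guard thread_lock(thread_mutex);
        if (worker.joinable())
        {
            {
                /// Never touch the flag without the wakeup mutex, but never
                /// hold it while joining: the worker needs it to wake up.
                std::lock_guard lock(wakeup_mutex);
                wakeup = true;
            }
            wakeup_condition.notify_one();
            worker.join();
        }
    }
};

int main()
{
    Watcher w;
    w.start();
    w.shutdown();
}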
void StorageLiveView::drop(TableStructureWriteLockHolder &)
@ -539,11 +553,14 @@ BlockInputStreams StorageLiveView::watch(
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
if (no_users_thread.joinable())
{
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
if (no_users_thread.joinable())
{
std::lock_guard lock(no_users_thread_wakeup_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
}
}
{
@ -567,11 +584,14 @@ BlockInputStreams StorageLiveView::watch(
context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
if (no_users_thread.joinable())
{
std::lock_guard lock(no_users_thread_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
std::lock_guard no_users_thread_lock(no_users_thread_mutex);
if (no_users_thread.joinable())
{
std::lock_guard lock(no_users_thread_wakeup_mutex);
no_users_thread_wakeup = true;
no_users_thread_condition.notify_one();
}
}
{

View File

@ -73,7 +73,7 @@ public:
}
/// No users thread mutex, predicate and wake up condition
void startNoUsersThread(const UInt64 & timeout);
std::mutex no_users_thread_mutex;
std::mutex no_users_thread_wakeup_mutex;
bool no_users_thread_wakeup = false;
std::condition_variable no_users_thread_condition;
/// Get blocks hash
@ -149,6 +149,8 @@ private:
ASTPtr inner_query;
Context & global_context;
bool is_temporary = false;
/// Mutex to protect access to sample block
mutable std::mutex sample_block_lock;
mutable Block sample_block;
/// Mutex for the blocks and ready condition
@ -168,6 +170,7 @@ private:
/// Background thread for temporary tables
/// which drops this table if there are no users
static void noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout);
std::mutex no_users_thread_mutex;
std::thread no_users_thread;
std::atomic<bool> shutdown_called = false;
std::atomic<bool> start_no_users_thread_called = false;

View File

@ -802,7 +802,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (size_t i = 0; i < part_names_with_disks.size(); ++i)
{
pool.schedule([&, i]
pool.scheduleOrThrowOnError([&, i]
{
const auto & part_name = part_names_with_disks[i].first;
const auto part_disk_ptr = part_names_with_disks[i].second;
@ -1155,7 +1155,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
for (const DataPartPtr & part : parts_to_remove)
{
pool.schedule([&]
pool.scheduleOrThrowOnError([&]
{
LOG_DEBUG(log, "Removing part from filesystem " << part->name);
part->remove();
@ -2488,12 +2488,12 @@ void MergeTreeData::throwInsertIfNeeded() const
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
const MergeTreePartInfo & part_info, MergeTreeData::DataPartState state, DataPartsLock & /*lock*/)
{
auto committed_parts_range = getDataPartsStateRange(state);
auto current_state_parts_range = getDataPartsStateRange(state);
/// The part can be covered only by the previous or the next one in data_parts.
auto it = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{state, part_info});
if (it != committed_parts_range.end())
if (it != current_state_parts_range.end())
{
if ((*it)->info == part_info)
return *it;
@ -2501,7 +2501,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
return *it;
}
if (it != committed_parts_range.begin())
if (it != current_state_parts_range.begin())
{
--it;
if ((*it)->info.contains(part_info))
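The rename from `committed_parts_range` to `current_state_parts_range` reflects that the function searches whatever `state` the caller passed, not only Committed parts. The neighbour-checking logic it touches is worth restating: in a container ordered by part info, a part covering `part_info` can only sit at the `lower_bound` position or immediately before it. A simplified sketch, assuming an interval-like `contains` relation in place of the real MergeTreePartInfo ordering:

```cpp
#include <optional>
#include <set>
#include <tuple>

/// Simplified stand-in for MergeTreePartInfo: a [begin, end) block range.
struct PartInfo
{
    int begin = 0;
    int end = 0;

    bool operator<(const PartInfo & rhs) const
    {
        return std::tie(begin, end) < std::tie(rhs.begin, rhs.end);
    }
    bool contains(const PartInfo & rhs) const
    {
        return begin <= rhs.begin && rhs.end <= end;
    }
};

/// The part can be covered only by the element at lower_bound or the one before it.
std::optional<PartInfo> getActiveContainingPart(const std::set<PartInfo> & parts, const PartInfo & info)
{
    auto it = parts.lower_bound(info);
    if (it != parts.end())
    {
        if (it->begin == info.begin && it->end == info.end)
            return *it;          /// exact match
        if (it->contains(info))
            return *it;          /// covered by the next part
    }
    if (it != parts.begin())
    {
        --it;
        if (it->contains(info))
            return *it;          /// covered by the previous part
    }
    return std::nullopt;
}
```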

View File

@ -331,7 +331,7 @@ bool MergeTreeIndexConditionBloomFilter::traverseASTEquals(
if (which.isTuple() && function->name == "tuple")
{
const TupleBackend & tuple = get<const Tuple &>(value_field).toUnderType();
const Tuple & tuple = get<const Tuple &>(value_field);
const auto value_tuple_data_type = typeid_cast<const DataTypeTuple *>(value_type.get());
const ASTs & arguments = typeid_cast<const ASTExpressionList &>(*function->arguments).children;
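This hunk, like the identical change in the Values table function further down, tracks a refactoring of the `Tuple` field type: previously `Tuple` was a strong typedef whose payload had to be unwrapped with `toUnderType()`, and afterwards `get<const Tuple &>` yields the usable array of fields directly. A hedged before/after sketch — the `StrongTypedef` below is an assumption standing in for the real definition:

```cpp
#include <vector>

struct Field {};                        /// stand-in for DB::Field
using FieldVector = std::vector<Field>;

/// Before: Tuple wrapped its storage, so every access paid a toUnderType() call.
template <typename T>
struct StrongTypedef
{
    T value;
    T & toUnderType() { return value; }
    const T & toUnderType() const { return value; }
};
using TupleBefore = StrongTypedef<FieldVector>;

/// After: Tuple is the field vector itself, so call sites shrink to
///     const Tuple & tuple = get<const Tuple &>(value_field);
using TupleAfter = FieldVector;
```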

View File

@ -206,7 +206,7 @@ public:
size_t bloom_filter_hashes;
/// Bloom filter seed.
size_t seed;
/// Fucntion for selecting next token.
/// Function for selecting next token.
std::unique_ptr<ITokenExtractor> token_extractor_func;
};

View File

@ -216,7 +216,7 @@ std::vector<MergeTreeData::AlterDataPartTransactionPtr> StorageMergeTree::prepar
};
if (thread_pool)
thread_pool->schedule(job);
thread_pool->scheduleOrThrowOnError(job);
else
job();
}

View File

@ -77,7 +77,9 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const
MutableColumnPtr database_column_mut = ColumnString::create();
for (const auto & database : databases)
{
if (context.hasDatabaseAccessRights(database.first))
/// A Lazy database cannot contain MergeTree tables,
/// so there is no point loading all of its tables just to filter every one of them out.
if (context.hasDatabaseAccessRights(database.first) && database.second->getEngineName() != "Lazy")
database_column_mut->insert(database.first);
}
block_to_filter.insert(ColumnWithTypeAndName(
@ -101,10 +103,6 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const
String database_name = (*database_column_)[i].get<String>();
const DatabasePtr database = databases.at(database_name);
/// Lazy database can not contain MergeTree tables
if (database->getEngineName() == "Lazy")
continue;
offsets[i] = i ? offsets[i - 1] : 0;
for (auto iterator = database->getIterator(context); iterator->isValid(); iterator->next())
{
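The two hunks above move the Lazy-database check from the per-table loop up into the construction of the database filter column: skipping the database before it is ever iterated means its tables are never loaded at all. Roughly, the shape of the change, as a sketch with simplified types:

```cpp
#include <map>
#include <memory>
#include <string>
#include <vector>

struct DatabaseSketch
{
    std::string engine_name;
    std::vector<std::string> tables;  /// loading these is the expensive part for Lazy

    const std::string & getEngineName() const { return engine_name; }
};

/// Build the list of databases worth scanning for system.parts.
/// Lazy databases cannot contain MergeTree tables, so filtering them out here
/// avoids materializing their tables only to discard every one of them later.
std::vector<std::string> selectDatabases(const std::map<std::string, std::shared_ptr<DatabaseSketch>> & databases)
{
    std::vector<std::string> result;
    for (const auto & [name, database] : databases)
        if (database->getEngineName() != "Lazy")
            result.push_back(name);
    return result;
}
```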

View File

@ -44,7 +44,7 @@ static void parseAndInsertValues(MutableColumns & res_columns, const ASTs & args
{
const auto & [value_field, value_type_ptr] = evaluateConstantExpression(args[i], context);
const DataTypes & value_types_tuple = typeid_cast<const DataTypeTuple *>(value_type_ptr.get())->getElements();
const TupleBackend & value_tuple = value_field.safeGet<Tuple>().toUnderType();
const Tuple & value_tuple = value_field.safeGet<Tuple>();
if (value_tuple.size() != sample_block.columns())
throw Exception("Values size should match with number of columns", ErrorCodes::LOGICAL_ERROR);

View File

@ -54,6 +54,11 @@
<value>toYYYYMM</value>
<value>toYYYYMMDD</value>
<value>toYYYYMMDDhhmmss</value>
<value>timeSlot</value>
<value>toRelativeQuarterNum</value>
<value>toStartOfTenMinutes</value>
<value>toUnixTimestamp</value>
</values>
</substitution>
<substitution>
@ -70,6 +75,7 @@
<value>toDate</value>
<value>toMonday</value>
<value>toStartOfDay</value>
<value>toStartOfMonth</value>
<value>toStartOfQuarter</value>
<value>toStartOfYear</value>
@ -83,18 +89,55 @@
<value>toYYYYMM</value>
<value>toYYYYMMDD</value>
<value>toYYYYMMDDhhmmss</value>
<value>toRelativeQuarterNum</value>
<value>toUnixTimestamp</value>
</values>
</substitution>
<substitution>
<name>time_zone</name>
<values>
<value>UTC</value>
<value>Europe/Moscow</value>
<value>Asia/Kolkata</value>
</values>
</substitution>
<substitution>
<name>binary_function</name>
<values>
<value>lessOrEquals</value>
<value>less</value>
<value>greater</value>
<value>greaterOrEquals</value>
<value>equals</value>
<value>notEquals</value>
<value>plus</value>
<value>minus</value>
<value>addDays</value>
<value>addHours</value>
<value>addMinutes</value>
<value>addMonths</value>
<value>addQuarters</value>
<value>addSeconds</value>
<value>addWeeks</value>
<value>addYears</value>
<value>subtractDays</value>
<value>subtractHours</value>
<value>subtractMinutes</value>
<value>subtractMonths</value>
<value>subtractQuarters</value>
<value>subtractSeconds</value>
<value>subtractWeeks</value>
<value>subtractYears</value>
</values>
</substitution>
</substitutions>
<query>SELECT count() FROM system.numbers WHERE NOT ignore(toDateTime('2017-01-01 00:00:00') + number % 100000000 + rand() % 100000 AS t, {datetime_transform}(t, '{time_zone}'))</query>
<query>SELECT count() FROM system.numbers WHERE NOT ignore(toDate('2017-01-01') + number % 1000 + rand() % 10 AS t, {date_transform}(t))</query>
<query>SELECT count() FROM system.numbers WHERE NOT ignore(toDateTime('2017-01-01 00:00:00') + number % 100000000 + rand() % 100000 AS t, {binary_function}(t, 1))</query>
<query>SELECT count() FROM system.numbers WHERE NOT ignore(toDateTime('2017-01-01 00:00:00') + number % 100000000 + rand() % 100000 AS t, toStartOfInterval(t, INTERVAL 1 month))</query>
</test>
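For readers unfamiliar with the performance-test framework: each `{name}` placeholder in a `<query>` is expanded against the matching `<substitution>` values, so one query line fans out into the full cross product of transforms and time zones. A rough illustration of that expansion — assumed semantics, not the framework's actual code:

```cpp
#include <iostream>
#include <string>
#include <vector>

/// Expand one {placeholder} against its substitution values.
/// Replaces the first occurrence only, which is enough for this sketch.
std::vector<std::string> expand(const std::vector<std::string> & queries,
                                const std::string & placeholder,
                                const std::vector<std::string> & values)
{
    std::vector<std::string> result;
    for (const auto & query : queries)
    {
        auto pos = query.find(placeholder);
        if (pos == std::string::npos)
        {
            result.push_back(query);  /// placeholder absent: keep as-is
            continue;
        }
        for (const auto & value : values)
        {
            std::string expanded = query;
            expanded.replace(pos, placeholder.size(), value);
            result.push_back(expanded);
        }
    }
    return result;
}

int main()
{
    std::vector<std::string> queries = {
        "SELECT count() FROM system.numbers WHERE NOT ignore({datetime_transform}(t, '{time_zone}'))"};
    queries = expand(queries, "{datetime_transform}", {"toYYYYMM", "toUnixTimestamp"});
    queries = expand(queries, "{time_zone}", {"UTC", "Europe/Moscow", "Asia/Kolkata"});
    for (const auto & q : queries)
        std::cout << q << '\n';  /// 2 x 3 = 6 concrete queries
}
```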

View File

@ -4,10 +4,6 @@ Hello "world" 789 2016-01-03
Hello\n world 100 2016-01-04
default 1 2019-06-19
default-eof 1 2019-06-19
0 1 42 2019-07-22
1 world 3 2019-07-23
2 Hello 123 2019-06-19
3 Hello 42 2019-06-19
2016-01-01 01:02:03 1
2016-01-02 01:02:03 2
2017-08-15 13:15:01 3

View File

@ -17,17 +17,6 @@ Hello "world", 789 ,2016-01-03
$CLICKHOUSE_CLIENT --query="SELECT * FROM csv ORDER BY d";
$CLICKHOUSE_CLIENT --query="DROP TABLE csv";
$CLICKHOUSE_CLIENT --query="CREATE TABLE csv (i Int8, s String DEFAULT 'Hello', n UInt64 DEFAULT 42, d Date DEFAULT '2019-06-19') ENGINE = Memory";
echo '\N, 1, \N, "2019-07-22"
1, world, 3, "2019-07-23"
2, \N, 123, \N
3, \N, \N, \N' | $CLICKHOUSE_CLIENT --input_format_null_as_default=1 --query="INSERT INTO csv FORMAT CSV";
$CLICKHOUSE_CLIENT --query="SELECT * FROM csv ORDER BY i";
$CLICKHOUSE_CLIENT --query="DROP TABLE csv";
$CLICKHOUSE_CLIENT --query="CREATE TABLE csv (t DateTime('Europe/Moscow'), s String) ENGINE = Memory";
echo '"2016-01-01 01:02:03","1"

View File

@ -20,3 +20,4 @@ format_template_row = '$CURDIR/00937_template_output_format_row.tmp', \
format_template_rows_between_delimiter = ';\n'";
$CLICKHOUSE_CLIENT --query="DROP TABLE template";
rm $CURDIR/00937_template_output_format_resultset.tmp $CURDIR/00937_template_output_format_row.tmp

View File

@ -50,3 +50,4 @@ $CLICKHOUSE_CLIENT --query="SELECT * FROM template2 ORDER BY n FORMAT CSV";
$CLICKHOUSE_CLIENT --query="DROP TABLE template1";
$CLICKHOUSE_CLIENT --query="DROP TABLE template2";
rm $CURDIR/00938_template_input_format_resultset.tmp $CURDIR/00938_template_input_format_row.tmp

View File

@ -11,7 +11,7 @@ SELECT
hash_of_uncompressed_files,
uncompressed_hash_of_compressed_files
FROM system.parts
WHERE table = 'test_00961';
WHERE table = 'test_00961' and database = currentDatabase();
DROP TABLE test_00961;

View File

@ -1 +1,2 @@
default merge_ab x UInt8 0 0 0 0 0 0 0
default as_kafka x UInt8 0 0 0 0 0 0 0

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS merge_a;
DROP TABLE IF EXISTS merge_b;
DROP TABLE IF EXISTS merge_ab;
DROP TABLE IF EXISTS kafka;
DROP TABLE IF EXISTS as_kafka;
CREATE TABLE merge_a (x UInt8) ENGINE = StripeLog;
CREATE TABLE merge_b (x UInt8) ENGINE = StripeLog;
CREATE TABLE merge_ab AS merge(currentDatabase(), '^merge_[ab]$');
CREATE TABLE kafka (x UInt8)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka',
kafka_topic_list = 'topic',
kafka_group_name = 'group',
kafka_format = 'CSV';
CREATE TABLE as_kafka AS kafka ENGINE = Memory;
SELECT * FROM system.columns WHERE database = currentDatabase() AND table = 'merge_ab';
SELECT * FROM system.columns WHERE database = currentDatabase() AND table = 'as_kafka';
DROP TABLE merge_a;
DROP TABLE merge_b;
DROP TABLE merge_ab;
DROP TABLE kafka;
DROP TABLE as_kafka;

View File

@ -1,13 +0,0 @@
DROP TABLE IF EXISTS merge_a;
DROP TABLE IF EXISTS merge_b;
DROP TABLE IF EXISTS merge_ab;
CREATE TABLE merge_a (x UInt8) ENGINE = StripeLog;
CREATE TABLE merge_b (x UInt8) ENGINE = StripeLog;
CREATE TABLE merge_ab AS merge(currentDatabase(), '^merge_[ab]$');
SELECT * FROM system.columns WHERE database = currentDatabase() AND table = 'merge_ab';
DROP TABLE merge_a;
DROP TABLE merge_b;
DROP TABLE merge_ab;

View File

@ -3,6 +3,7 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
${CLICKHOUSE_CLIENT} -n -q "DROP DATABASE IF EXISTS testlazy"
${CLICKHOUSE_CLIENT} -n -q "
CREATE DATABASE testlazy ENGINE = Lazy(1);
@ -11,6 +12,8 @@ ${CLICKHOUSE_CLIENT} -n -q "
CREATE TABLE testlazy.tlog (a UInt64, b UInt64) ENGINE = TinyLog;
"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM system.parts WHERE database = 'testlazy'";
sleep 1.5
${CLICKHOUSE_CLIENT} -q "

Some files were not shown because too many files have changed in this diff