Merge remote-tracking branch 'upstream/master' into fix25

This commit is contained in:
proller 2019-08-20 18:43:40 +03:00
commit 78e7beb160
199 changed files with 5476 additions and 631 deletions

View File

@ -1,3 +1,54 @@
## ClickHouse release 19.13.2.19, 2019-08-14
### New Feature
* Sampling profiler on query level. [Example](https://gist.github.com/alexey-milovidov/92758583dd41c24c360fdb8d6a4da194). [#4247](https://github.com/yandex/ClickHouse/issues/4247) ([laplab](https://github.com/laplab)) [#6124](https://github.com/yandex/ClickHouse/pull/6124) ([alexey-milovidov](https://github.com/alexey-milovidov)) [#6250](https://github.com/yandex/ClickHouse/pull/6250) [#6283](https://github.com/yandex/ClickHouse/pull/6283) [#6386](https://github.com/yandex/ClickHouse/pull/6386)
* Allow specifying a list of columns with the `COLUMNS('regexp')` expression, which works like a more sophisticated variant of the `*` asterisk. [#5951](https://github.com/yandex/ClickHouse/pull/5951) ([mfridental](https://github.com/mfridental)), ([alexey-milovidov](https://github.com/alexey-milovidov))
* `CREATE TABLE AS table_function()` is now possible. [#6057](https://github.com/yandex/ClickHouse/pull/6057) ([dimarub2000](https://github.com/dimarub2000))
* Adam optimizer for stochastic gradient descent is used by default in the `stochasticLinearRegression()` and `stochasticLogisticRegression()` aggregate functions, because it shows good quality with almost no tuning. [#6000](https://github.com/yandex/ClickHouse/pull/6000) ([Quid37](https://github.com/Quid37))
* Added functions for working with custom week numbers. [#5212](https://github.com/yandex/ClickHouse/pull/5212) ([Andy Yang](https://github.com/andyyzh))
* `RENAME` queries now work with all storages. [#5953](https://github.com/yandex/ClickHouse/pull/5953) ([Ivan](https://github.com/abyss7))
* The client now receives logs from the server at any desired level by setting `send_logs_level`, regardless of the log level specified in the server settings. [#5964](https://github.com/yandex/ClickHouse/pull/5964) ([Nikita Mikhaylov](https://github.com/nikitamikhaylov))
### Experimental features
* New query processing pipeline. Use the `experimental_use_processors=1` option to enable it. Use at your own risk. [#4914](https://github.com/yandex/ClickHouse/pull/4914) ([Nikolai Kochetov](https://github.com/KochetovNicolai))
### Bug Fix
* Kafka integration has been fixed in this version.
* Fixed `DoubleDelta` encoding of `Int64` for large `DoubleDelta` values, improved `DoubleDelta` encoding for random data for `Int32`. [#5998](https://github.com/yandex/ClickHouse/pull/5998) ([Vasily Nemkov](https://github.com/Enmk))
* Fixed overestimation of `max_rows_to_read` if the setting `merge_tree_uniform_read_distribution` is set to 0. [#6019](https://github.com/yandex/ClickHouse/pull/6019) ([alexey-milovidov](https://github.com/alexey-milovidov))
### Improvement
* The setting `input_format_defaults_for_omitted_fields` is enabled by default. It enables calculation of complex default expressions for omitted fields in `JSONEachRow` and `CSV*` formats. It should be the expected behaviour but may lead to negligible performance difference or subtle incompatibilities. [#6043](https://github.com/yandex/ClickHouse/pull/6043) ([Artem Zuikov](https://github.com/4ertus2)), [#5625](https://github.com/yandex/ClickHouse/pull/5625) ([akuzm](https://github.com/akuzm))
* Throw an exception if a `config.d` file doesn't have the same root element as the main config file. [#6123](https://github.com/yandex/ClickHouse/pull/6123) ([dimarub2000](https://github.com/dimarub2000))
### Performance Improvement
* Optimize `count()`. Now it uses the smallest column (if possible). [#6028](https://github.com/yandex/ClickHouse/pull/6028) ([Amos Bird](https://github.com/amosbird))
### Build/Testing/Packaging Improvement
* Report memory usage in performance tests. [#5899](https://github.com/yandex/ClickHouse/pull/5899) ([akuzm](https://github.com/akuzm))
* Fix build with external `libcxx` [#6010](https://github.com/yandex/ClickHouse/pull/6010) ([Ivan](https://github.com/abyss7))
* Fix shared build with `rdkafka` library [#6101](https://github.com/yandex/ClickHouse/pull/6101) ([Ivan](https://github.com/abyss7))
## ClickHouse release 19.11.7.40, 2019-08-14
### Bug Fix
* Kafka integration has been fixed in this version.
* Fix segfault when using `arrayReduce` for constant arguments. [#6326](https://github.com/yandex/ClickHouse/pull/6326) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed `toFloat()` monotonicity. [#6374](https://github.com/yandex/ClickHouse/pull/6374) ([dimarub2000](https://github.com/dimarub2000))
* Fix segfault with enabled `optimize_skip_unused_shards` and missing sharding key. [#6384](https://github.com/yandex/ClickHouse/pull/6384) ([CurtizJ](https://github.com/CurtizJ))
* Fixed logic of `arrayEnumerateUniqRanked` function. [#6423](https://github.com/yandex/ClickHouse/pull/6423) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Removed extra verbose logging from MySQL handler. [#6389](https://github.com/yandex/ClickHouse/pull/6389) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fix wrong behavior and possible segfaults in `topK` and `topKWeighted` aggregate functions. [#6404](https://github.com/yandex/ClickHouse/pull/6404) ([CurtizJ](https://github.com/CurtizJ))
* Do not expose virtual columns in `system.columns` table. This is required for backward compatibility. [#6406](https://github.com/yandex/ClickHouse/pull/6406) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fix bug with memory allocation for string fields in complex key cache dictionary. [#6447](https://github.com/yandex/ClickHouse/pull/6447) ([alesapin](https://github.com/alesapin))
* Fix bug with enabling adaptive granularity when creating new replica for `Replicated*MergeTree` table. [#6452](https://github.com/yandex/ClickHouse/pull/6452) ([alesapin](https://github.com/alesapin))
* Fix infinite loop when reading Kafka messages. [#6354](https://github.com/yandex/ClickHouse/pull/6354) ([abyss7](https://github.com/abyss7))
* Fixed the possibility of a fabricated query causing a server crash due to stack overflow in the SQL parser, and the possibility of stack overflow in `Merge` and `Distributed` tables. [#6433](https://github.com/yandex/ClickHouse/pull/6433) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed Gorilla encoding error on small sequences. [#6444](https://github.com/yandex/ClickHouse/pull/6444) ([Enmk](https://github.com/Enmk))
### Improvement
* Allow user to override `poll_interval` and `idle_connection_timeout` settings on connection. [#6230](https://github.com/yandex/ClickHouse/pull/6230) ([alexey-milovidov](https://github.com/alexey-milovidov))
## ClickHouse release 19.11.5.28, 2019-08-05
### Bug Fix
@ -299,7 +350,7 @@ It allows to set commit mode: after every batch of messages is handled, or after
* Renamed functions `leastSqr` to `simpleLinearRegression`, `LinearRegression` to `linearRegression`, `LogisticRegression` to `logisticRegression`. [#5391](https://github.com/yandex/ClickHouse/pull/5391) ([Nikolai Kochetov](https://github.com/KochetovNicolai))
### Performance Improvements
* Parallelize processing of parts in alter modify query. [#4639](https://github.com/yandex/ClickHouse/pull/4639) ([Ivan Kush](https://github.com/IvanKush))
* Parallelize processing of parts of non-replicated MergeTree tables in ALTER MODIFY query. [#4639](https://github.com/yandex/ClickHouse/pull/4639) ([Ivan Kush](https://github.com/IvanKush))
* Optimizations in regular expressions extraction. [#5193](https://github.com/yandex/ClickHouse/pull/5193) [#5191](https://github.com/yandex/ClickHouse/pull/5191) ([Danila Kutenin](https://github.com/danlark1))
* Do not add right join key column to join result if it's used only in join on section. [#5260](https://github.com/yandex/ClickHouse/pull/5260) ([Artem Zuikov](https://github.com/4ertus2))
* Freeze the Kafka buffer after first empty response. It avoids multiple invocations of `ReadBuffer::next()` for empty result in some row-parsing streams. [#5283](https://github.com/yandex/ClickHouse/pull/5283) ([Ivan](https://github.com/abyss7))

View File

@ -5,7 +5,6 @@ set(CLICKHOUSE_ODBC_BRIDGE_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/IdentifierQuoteHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/MainHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ODBCBlockInputStream.cpp
${CMAKE_CURRENT_SOURCE_DIR}/odbc-bridge.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ODBCBridge.cpp
${CMAKE_CURRENT_SOURCE_DIR}/PingHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/validateODBCConnectionString.cpp

View File

@ -24,6 +24,12 @@ template <typename Value, bool FloatReturn> using FuncQuantilesDeterministic = A
template <typename Value, bool _> using FuncQuantileExact = AggregateFunctionQuantile<Value, QuantileExact<Value>, NameQuantileExact, false, void, false>;
template <typename Value, bool _> using FuncQuantilesExact = AggregateFunctionQuantile<Value, QuantileExact<Value>, NameQuantilesExact, false, void, true>;
template <typename Value, bool _> using FuncQuantileExactExclusive = AggregateFunctionQuantile<Value, QuantileExactExclusive<Value>, NameQuantileExactExclusive, false, Float64, false>;
template <typename Value, bool _> using FuncQuantilesExactExclusive = AggregateFunctionQuantile<Value, QuantileExactExclusive<Value>, NameQuantilesExactExclusive, false, Float64, true>;
template <typename Value, bool _> using FuncQuantileExactInclusive = AggregateFunctionQuantile<Value, QuantileExactInclusive<Value>, NameQuantileExactInclusive, false, Float64, false>;
template <typename Value, bool _> using FuncQuantilesExactInclusive = AggregateFunctionQuantile<Value, QuantileExactInclusive<Value>, NameQuantilesExactInclusive, false, Float64, true>;
template <typename Value, bool _> using FuncQuantileExactWeighted = AggregateFunctionQuantile<Value, QuantileExactWeighted<Value>, NameQuantileExactWeighted, true, void, false>;
template <typename Value, bool _> using FuncQuantilesExactWeighted = AggregateFunctionQuantile<Value, QuantileExactWeighted<Value>, NameQuantilesExactWeighted, true, void, true>;
@ -92,6 +98,12 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
factory.registerFunction(NameQuantileExact::name, createAggregateFunctionQuantile<FuncQuantileExact>);
factory.registerFunction(NameQuantilesExact::name, createAggregateFunctionQuantile<FuncQuantilesExact>);
factory.registerFunction(NameQuantileExactExclusive::name, createAggregateFunctionQuantile<FuncQuantileExactExclusive>);
factory.registerFunction(NameQuantilesExactExclusive::name, createAggregateFunctionQuantile<FuncQuantilesExactExclusive>);
factory.registerFunction(NameQuantileExactInclusive::name, createAggregateFunctionQuantile<FuncQuantileExactInclusive>);
factory.registerFunction(NameQuantilesExactInclusive::name, createAggregateFunctionQuantile<FuncQuantilesExactInclusive>);
factory.registerFunction(NameQuantileExactWeighted::name, createAggregateFunctionQuantile<FuncQuantileExactWeighted>);
factory.registerFunction(NameQuantilesExactWeighted::name, createAggregateFunctionQuantile<FuncQuantilesExactWeighted>);

View File

@ -199,8 +199,15 @@ struct NameQuantileDeterministic { static constexpr auto name = "quantileDetermi
struct NameQuantilesDeterministic { static constexpr auto name = "quantilesDeterministic"; };
struct NameQuantileExact { static constexpr auto name = "quantileExact"; };
struct NameQuantileExactWeighted { static constexpr auto name = "quantileExactWeighted"; };
struct NameQuantilesExact { static constexpr auto name = "quantilesExact"; };
struct NameQuantileExactExclusive { static constexpr auto name = "quantileExactExclusive"; };
struct NameQuantilesExactExclusive { static constexpr auto name = "quantilesExactExclusive"; };
struct NameQuantileExactInclusive { static constexpr auto name = "quantileExactInclusive"; };
struct NameQuantilesExactInclusive { static constexpr auto name = "quantilesExactInclusive"; };
struct NameQuantileExactWeighted { static constexpr auto name = "quantileExactWeighted"; };
struct NameQuantilesExactWeighted { static constexpr auto name = "quantilesExactWeighted"; };
struct NameQuantileTiming { static constexpr auto name = "quantileTiming"; };

View File

@ -17,8 +17,8 @@ namespace
template <template <typename> class Data>
AggregateFunctionPtr createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes & arguments, const Array & params)
{
if (params.size() != 1)
throw Exception{"Aggregate function " + name + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
if (params.size() < 1)
throw Exception{"Aggregate function " + name + " requires at least one parameter: <window>, [option, [option, ...]]", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
if (arguments.size() < 2)
throw Exception("Aggregate function " + name + " requires one timestamp argument and at least one event condition.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

View File

@ -139,6 +139,7 @@ class AggregateFunctionWindowFunnel final
private:
UInt64 window;
UInt8 events_size;
UInt8 strict;
// Loop through the entire events_list, update the event timestamp value
@ -165,6 +166,10 @@ private:
if (event_idx == 0)
events_timestamp[0] = timestamp;
else if (strict && events_timestamp[event_idx] >= 0)
{
return event_idx + 1;
}
else if (events_timestamp[event_idx - 1] >= 0 && timestamp <= events_timestamp[event_idx - 1] + window)
{
events_timestamp[event_idx] = events_timestamp[event_idx - 1];
@ -191,8 +196,17 @@ public:
{
events_size = arguments.size() - 1;
window = params.at(0).safeGet<UInt64>();
}
strict = 0;
for (size_t i = 1; i < params.size(); ++i)
{
String option = params.at(i).safeGet<String>();
if (option.compare("strict") == 0)
strict = 1;
else
throw Exception{"Aggregate function " + getName() + " doesn't support a parameter: " + option, ErrorCodes::BAD_ARGUMENTS};
}
}
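The constructor above parses the optional `strict` parameter permitted by the earlier validation hunk. A simplified, self-contained sketch of the chain-matching rule from this diff (names here are illustrative; the real aggregate updates per-step timestamps incrementally during merge rather than sorting a materialized event list):

```cpp
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Simplified sketch of the windowFunnel matching rule shown above:
// events are (timestamp, step) pairs; returns the deepest step reached where
// each step follows step-1 within `window`. With `strict`, matching freezes
// as soon as an already-matched step occurs again.
uint32_t windowFunnelSketch(std::vector<std::pair<uint64_t, uint32_t>> events,
                            uint64_t window, size_t steps, bool strict)
{
    std::sort(events.begin(), events.end());
    // events_timestamp[i] holds the chain's step-0 timestamp once step i is
    // reached, or -1 while step i is still unmatched (mirrors the diff above).
    std::vector<int64_t> events_timestamp(steps, -1);
    for (const auto & [timestamp, step] : events)
    {
        if (step >= steps)
            continue;  // ignore out-of-range steps in this sketch
        if (step == 0)
            events_timestamp[0] = static_cast<int64_t>(timestamp);
        else if (strict && events_timestamp[step] >= 0)
            return step + 1;  // strict mode: a repeated step ends the funnel here
        else if (events_timestamp[step - 1] >= 0
                 && timestamp <= static_cast<uint64_t>(events_timestamp[step - 1]) + window)
            events_timestamp[step] = events_timestamp[step - 1];
    }
    for (size_t i = steps; i > 0; --i)
        if (events_timestamp[i - 1] >= 0)
            return static_cast<uint32_t>(i);
    return 0;
}
```

Without `strict`, a repeated step simply re-propagates the chain timestamp; with `strict`, the funnel level is fixed at the point of repetition.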
DataTypePtr getReturnType() const override
{

View File

@ -14,6 +14,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
}
/** Calculates quantile by collecting all values into array
@ -106,16 +107,134 @@ struct QuantileExact
result[i] = Value();
}
}
/// The same, but in the case of an empty state, NaN is returned.
Float64 getFloat(Float64) const
{
throw Exception("Method getFloat is not implemented for QuantileExact", ErrorCodes::NOT_IMPLEMENTED);
}
void getManyFloat(const Float64 *, const size_t *, size_t, Float64 *) const
{
throw Exception("Method getManyFloat is not implemented for QuantileExact", ErrorCodes::NOT_IMPLEMENTED);
}
};
/// QuantileExactExclusive is equivalent to Excel PERCENTILE.EXC, R-6, SAS-4, SciPy-(0,0)
template <typename Value>
struct QuantileExactExclusive : public QuantileExact<Value>
{
using QuantileExact<Value>::array;
/// Get the value of the `level` quantile. The level must be between 0 and 1 excluding bounds.
Float64 getFloat(Float64 level)
{
if (!array.empty())
{
if (level == 0. || level == 1.)
throw Exception("QuantileExactExclusive cannot interpolate for the percentiles 1 and 0", ErrorCodes::BAD_ARGUMENTS);
Float64 h = level * (array.size() + 1);
auto n = static_cast<size_t>(h);
if (n >= array.size())
return array[array.size() - 1];
else if (n < 1)
return array[0];
std::nth_element(array.begin(), array.begin() + n - 1, array.end());
auto nth_element = std::min_element(array.begin() + n, array.end());
return array[n - 1] + (h - n) * (*nth_element - array[n - 1]);
}
return std::numeric_limits<Float64>::quiet_NaN();
}
void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result)
{
if (!array.empty())
{
size_t prev_n = 0;
for (size_t i = 0; i < size; ++i)
{
auto level = levels[indices[i]];
if (level == 0. || level == 1.)
throw Exception("QuantileExactExclusive cannot interpolate for the percentiles 1 and 0", ErrorCodes::BAD_ARGUMENTS);
Float64 h = level * (array.size() + 1);
auto n = static_cast<size_t>(h);
if (n >= array.size())
result[indices[i]] = array[array.size() - 1];
else if (n < 1)
result[indices[i]] = array[0];
else
{
std::nth_element(array.begin() + prev_n, array.begin() + n - 1, array.end());
auto nth_element = std::min_element(array.begin() + n, array.end());
result[indices[i]] = array[n - 1] + (h - n) * (*nth_element - array[n - 1]);
prev_n = n - 1;
}
}
}
else
{
for (size_t i = 0; i < size; ++i)
result[i] = std::numeric_limits<Float64>::quiet_NaN();
}
}
};
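The struct above implements the exclusive interpolation with rank `h = level * (n + 1)`. A standalone sketch of the same formula over a plain vector, using the same `nth_element`/`min_element` trick to find two adjacent order statistics without a full sort (names here are illustrative, not ClickHouse APIs):

```cpp
#include <algorithm>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Exclusive percentile (Excel PERCENTILE.EXC / R-6): rank h = level * (n + 1),
// linear interpolation between the floor(h)-th and next order statistics.
// Illustrative sketch only; not the ClickHouse implementation.
double percentileExclusive(std::vector<double> values, double level)
{
    if (values.empty() || level <= 0. || level >= 1.)
        throw std::invalid_argument("level must be in (0, 1) and data non-empty");

    double h = level * (values.size() + 1);
    auto n = static_cast<std::size_t>(h);

    if (n >= values.size())
        return *std::max_element(values.begin(), values.end());
    if (n < 1)
        return *std::min_element(values.begin(), values.end());

    // nth_element places the (n-1)-th order statistic and partitions the rest,
    // so the minimum of the tail is exactly the next order statistic.
    std::nth_element(values.begin(), values.begin() + n - 1, values.end());
    double lower = values[n - 1];
    double upper = *std::min_element(values.begin() + n, values.end());
    return lower + (h - n) * (upper - lower);
}
```

For `{15, 20, 35, 40, 50}` at level 0.25, `h = 1.5`, interpolating halfway between 15 and 20 gives 17.5, matching Excel's PERCENTILE.EXC.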
/// QuantileExactInclusive is equivalent to Excel PERCENTILE and PERCENTILE.INC, R-7, SciPy-(1,1)
template <typename Value>
struct QuantileExactInclusive : public QuantileExact<Value>
{
using QuantileExact<Value>::array;
/// Get the value of the `level` quantile. The level must be between 0 and 1 including bounds.
Float64 getFloat(Float64 level)
{
if (!array.empty())
{
Float64 h = level * (array.size() - 1) + 1;
auto n = static_cast<size_t>(h);
if (n >= array.size())
return array[array.size() - 1];
else if (n < 1)
return array[0];
std::nth_element(array.begin(), array.begin() + n - 1, array.end());
auto nth_element = std::min_element(array.begin() + n, array.end());
return array[n - 1] + (h - n) * (*nth_element - array[n - 1]);
}
return std::numeric_limits<Float64>::quiet_NaN();
}
void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result)
{
if (!array.empty())
{
size_t prev_n = 0;
for (size_t i = 0; i < size; ++i)
{
auto level = levels[indices[i]];
Float64 h = level * (array.size() - 1) + 1;
auto n = static_cast<size_t>(h);
if (n >= array.size())
result[indices[i]] = array[array.size() - 1];
else if (n < 1)
result[indices[i]] = array[0];
else
{
std::nth_element(array.begin() + prev_n, array.begin() + n - 1, array.end());
auto nth_element = std::min_element(array.begin() + n, array.end());
result[indices[i]] = array[n - 1] + (h - n) * (*nth_element - array[n - 1]);
prev_n = n - 1;
}
}
}
else
{
for (size_t i = 0; i < size; ++i)
result[i] = std::numeric_limits<Float64>::quiet_NaN();
}
}
};

View File

@ -443,6 +443,7 @@ namespace ErrorCodes
extern const int INSECURE_PATH = 466;
extern const int CANNOT_PARSE_BOOL = 467;
extern const int CANNOT_PTHREAD_ATTR = 468;
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW = 469;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -1,7 +1,8 @@
#pragma once
#include <tuple>
#include <sstream>
#include <iomanip>
#include <city.h>
#include <Core/Types.h>
@ -33,6 +34,13 @@ struct UInt128
auto tuple() const { return std::tie(high, low); }
String toHexString() const
{
std::ostringstream os;
os << std::hex << std::setfill('0') << std::setw(16) << high << std::setw(16) << low;
return String(os.str());
}
bool inline operator== (const UInt128 rhs) const { return tuple() == rhs.tuple(); }
bool inline operator!= (const UInt128 rhs) const { return tuple() != rhs.tuple(); }
bool inline operator< (const UInt128 rhs) const { return tuple() < rhs.tuple(); }
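A note on hex-formatting a 128-bit value as above: `std::setw` is consumed by a single insertion, so it must be re-applied before each 64-bit half, while `std::setfill` and `std::hex` are sticky. A standalone sketch (not ClickHouse code):

```cpp
#include <cstdint>
#include <iomanip>
#include <sstream>
#include <string>

// Format a 128-bit value (two 64-bit halves) as 32 zero-padded hex digits.
// std::setw resets after one insertion, so it is set before each half;
// std::setfill('0') and std::hex remain in effect for the whole stream.
std::string toHexString128(uint64_t high, uint64_t low)
{
    std::ostringstream os;
    os << std::hex << std::setfill('0')
       << std::setw(16) << high
       << std::setw(16) << low;
    return os.str();
}
```

Applying `setw(16)` only once would leave the low half unpadded, producing fewer than 32 digits for small values.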

View File

@ -144,7 +144,8 @@ private:
using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;
using BlocksPtr = std::shared_ptr<Blocks>;
using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>;
/// Compare number of columns, data types, column types, column names, and values of constant columns.
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);

View File

@ -32,7 +32,11 @@
*/
#define DEFAULT_MERGE_BLOCK_SIZE 8192
#define DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC 5
#define DEFAULT_TEMPORARY_LIVE_CHANNEL_TIMEOUT_SEC 15
#define DEFAULT_ALTER_LIVE_CHANNEL_WAIT_MS 10000
#define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160)
#define DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC 15
#define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 1024
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
/// each period reduces the error counter by 2 times

View File

@ -99,7 +99,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, parallel_replicas_count, 0, "") \
M(SettingUInt64, parallel_replica_offset, 0, "") \
\
M(SettingBool, skip_unavailable_shards, false, "Silently skip unavailable shards.") \
M(SettingBool, skip_unavailable_shards, false, "If 1, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.") \
\
M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.") \
M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.") \
@ -212,7 +212,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.") \
M(SettingInt64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite.") \
M(SettingMilliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.") \
M(SettingMilliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from streaming storages.") \
M(SettingMilliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from/to streaming storages.") \
M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \
M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, allow materialized columns in INSERT.") \
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \
@ -345,6 +345,12 @@ struct Settings : public SettingsCollection<Settings>
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13") \
\
M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.") \
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.") \
M(SettingSeconds, temporary_live_channel_timeout, DEFAULT_TEMPORARY_LIVE_CHANNEL_TIMEOUT_SEC, "Timeout after which temporary live channel is deleted.") \
M(SettingMilliseconds, alter_channel_wait_ms, DEFAULT_ALTER_LIVE_CHANNEL_WAIT_MS, "The wait time for alter channel request.") \
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)

View File

@ -0,0 +1,51 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** A stream of blocks from a shared vector of blocks
*/
class BlocksBlockInputStream : public IBlockInputStream
{
public:
/// Acquires shared ownership of the blocks vector
BlocksBlockInputStream(const std::shared_ptr<BlocksPtr> & blocks_ptr_, Block header_)
: blocks(*blocks_ptr_), it((*blocks_ptr_)->begin()), end((*blocks_ptr_)->end()), header(std::move(header_)) {}
String getName() const override { return "Blocks"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override
{
if (it == end)
return Block();
Block res = *it;
++it;
return res;
}
private:
BlocksPtr blocks;
Blocks::iterator it;
const Blocks::iterator end;
Block header;
};
}
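The `readImpl` above signals end-of-stream with a default-constructed `Block`, while the `shared_ptr` keeps the underlying vector alive for the stream's lifetime. A minimal generic sketch of that pull pattern (stand-in types, not `DB::Block`):

```cpp
#include <memory>
#include <utility>
#include <vector>

// Minimal sketch of the pull-based stream pattern above: iterate a shared
// vector, returning a default-constructed element to signal end-of-stream.
template <typename T>
class VectorStream
{
public:
    /// Acquires shared ownership, as BlocksBlockInputStream does.
    explicit VectorStream(std::shared_ptr<std::vector<T>> data_)
        : data(std::move(data_)), it(data->begin()), end(data->end()) {}

    // Returns the next element, or T() when the stream is exhausted.
    T read()
    {
        if (it == end)
            return T();
        return *it++;
    }

private:
    std::shared_ptr<std::vector<T>> data;  // shared ownership keeps data alive
    typename std::vector<T>::iterator it;
    const typename std::vector<T>::iterator end;
};
```

Consumers simply loop until they receive the sentinel value, mirroring how block streams loop until an empty `Block`.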

View File

@ -0,0 +1,222 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <limits>
#include <Common/ConcurrentBoundedQueue.h>
#include <Poco/Condition.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageLiveView.h>
namespace DB
{
/** Implements LIVE VIEW table WATCH input stream.
* Keeps the stream alive by outputting blocks with no rows
* at the period specified by the heartbeat interval.
*/
class LiveViewBlockInputStream : public IBlockInputStream
{
using NonBlockingResult = std::pair<Block, bool>;
public:
~LiveViewBlockInputStream() override
{
/// Start the storage's "no users" thread
/// if we are the last active user
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
storage->startNoUsersThread(temporary_live_view_timeout_sec);
}
LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
std::shared_ptr<BlocksPtr> blocks_ptr_,
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
std::shared_ptr<bool> active_ptr_,
const bool has_limit_, const UInt64 limit_,
const UInt64 heartbeat_interval_sec_,
const UInt64 temporary_live_view_timeout_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
{
/// grab active pointer
active = active_ptr.lock();
}
String getName() const override { return "LiveViewBlockInputStream"; }
void cancel(bool kill) override
{
if (isCancelled() || storage->is_dropped)
return;
IBlockInputStream::cancel(kill);
Poco::FastMutex::ScopedLock lock(storage->mutex);
storage->condition.broadcast();
}
Block getHeader() const override { return storage->getHeader(); }
void refresh()
{
if (active && blocks && it == end)
it = blocks->begin();
}
void suspend()
{
active.reset();
}
void resume()
{
active = active_ptr.lock();
{
if (!blocks || blocks.get() != (*blocks_ptr).get())
blocks = (*blocks_ptr);
}
it = blocks->begin();
begin = blocks->begin();
end = blocks->end();
}
NonBlockingResult tryRead()
{
return tryRead_(false);
}
protected:
Block readImpl() override
{
/// try reading
return tryRead_(true).first;
}
/** The tryRead method attempts to read a block in either blocking
* or non-blocking mode. If blocking is set to false,
* the method returns an empty block with the flag set to false
* to indicate that it would have to block to get the next block.
*/
NonBlockingResult tryRead_(bool blocking)
{
Block res;
if (has_limit && num_updates == static_cast<Int64>(limit))
{
return { Block(), true };
}
/// If blocks were never assigned get blocks
if (!blocks)
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
if (!active)
return { Block(), false };
blocks = (*blocks_ptr);
it = blocks->begin();
begin = blocks->begin();
end = blocks->end();
}
if (isCancelled() || storage->is_dropped)
{
return { Block(), true };
}
if (it == end)
{
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
if (!active)
return { Block(), false };
/// If we are done iterating over our blocks
/// and there are new blocks available then get them
if (blocks.get() != (*blocks_ptr).get())
{
blocks = (*blocks_ptr);
it = blocks->begin();
begin = blocks->begin();
end = blocks->end();
}
/// No new blocks available, wait for new ones
else
{
if (!blocking)
{
return { Block(), false };
}
if (!end_of_blocks)
{
end_of_blocks = true;
return { getHeader(), true };
}
while (true)
{
UInt64 timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
bool signaled = storage->condition.tryWait(storage->mutex, std::max(static_cast<UInt64>(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec)) / 1000);
if (isCancelled() || storage->is_dropped)
{
return { Block(), true };
}
if (signaled)
{
break;
}
else
{
// heartbeat
last_event_timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
return { getHeader(), true };
}
}
}
}
return tryRead_(blocking);
}
res = *it;
++it;
if (it == end)
{
end_of_blocks = false;
num_updates += 1;
}
last_event_timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
return { res, true };
}
private:
std::shared_ptr<StorageLiveView> storage;
std::shared_ptr<BlocksPtr> blocks_ptr;
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
std::weak_ptr<bool> active_ptr;
std::shared_ptr<bool> active;
BlocksPtr blocks;
BlocksMetadataPtr blocks_metadata;
Blocks::iterator it;
Blocks::iterator end;
Blocks::iterator begin;
const bool has_limit;
const UInt64 limit;
Int64 num_updates = -1;
bool end_of_blocks = false;
UInt64 heartbeat_interval_usec;
UInt64 temporary_live_view_timeout_sec;
UInt64 last_event_timestamp_usec = 0;
Poco::Timestamp timestamp;
};
}
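One subtlety in the heartbeat wait above: `heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec)` is unsigned arithmetic, so once the elapsed time exceeds the interval the subtraction wraps to a huge value, and `std::max` against an unsigned zero cannot catch that. A wrap-safe sketch of the remaining-wait computation (illustrative, not the committed code):

```cpp
#include <cstdint>

// Remaining wait before the next heartbeat, in milliseconds.
// Comparing before subtracting avoids unsigned wrap-around: once the
// elapsed time exceeds the interval, the remaining wait is simply zero.
uint64_t remainingHeartbeatWaitMs(uint64_t heartbeat_interval_usec,
                                  uint64_t last_event_timestamp_usec,
                                  uint64_t now_usec)
{
    uint64_t elapsed_usec = now_usec - last_event_timestamp_usec;
    uint64_t remaining_usec =
        elapsed_usec >= heartbeat_interval_usec ? 0 : heartbeat_interval_usec - elapsed_usec;
    return remaining_usec / 1000;
}
```

With a 15-second interval and 20 seconds elapsed, this clamps to 0 instead of waiting for roughly `UINT64_MAX` microseconds.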

View File

@ -0,0 +1,243 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <limits>
#include <Common/ConcurrentBoundedQueue.h>
#include <Poco/Condition.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageLiveView.h>
namespace DB
{
/** Implements LIVE VIEW table WATCH EVENTS input stream.
* Keeps the stream alive by outputting blocks with no rows
* at the period specified by the heartbeat interval.
*/
class LiveViewEventsBlockInputStream : public IBlockInputStream
{
using NonBlockingResult = std::pair<Block, bool>;
public:
~LiveViewEventsBlockInputStream() override
{
/// Start the storage's "no users" thread
/// if we are the last active user
if (!storage->is_dropped && blocks_ptr.use_count() < 3)
storage->startNoUsersThread(temporary_live_view_timeout_sec);
}
/// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
/// and LIMIT 0 just returns data without waiting for any updates
LiveViewEventsBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
std::shared_ptr<BlocksPtr> blocks_ptr_,
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
std::shared_ptr<bool> active_ptr_,
const bool has_limit_, const UInt64 limit_,
const UInt64 heartbeat_interval_sec_,
const UInt64 temporary_live_view_timeout_sec_)
: storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)), blocks_metadata_ptr(std::move(blocks_metadata_ptr_)), active_ptr(std::move(active_ptr_)), has_limit(has_limit_), limit(limit_), heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000), temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
{
/// grab active pointer
active = active_ptr.lock();
}
String getName() const override { return "LiveViewEventsBlockInputStream"; }
void cancel(bool kill) override
{
if (isCancelled() || storage->is_dropped)
return;
IBlockInputStream::cancel(kill);
Poco::FastMutex::ScopedLock lock(storage->mutex);
storage->condition.broadcast();
}
Block getHeader() const override
{
return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "version")};
}
void refresh()
{
if (active && blocks && it == end)
it = blocks->begin();
}
void suspend()
{
active.reset();
}
void resume()
{
active = active_ptr.lock();
{
if (!blocks || blocks.get() != (*blocks_ptr).get())
{
blocks = (*blocks_ptr);
blocks_metadata = (*blocks_metadata_ptr);
}
}
it = blocks->begin();
begin = blocks->begin();
end = blocks->end();
}
NonBlockingResult tryRead()
{
return tryRead_(false);
}
Block getEventBlock()
{
Block res{
ColumnWithTypeAndName(
DataTypeUInt64().createColumnConst(1, blocks_metadata->version)->convertToFullColumnIfConst(),
std::make_shared<DataTypeUInt64>(),
"version")
};
return res;
}
protected:
Block readImpl() override
{
/// try reading
return tryRead_(true).first;
}
/** The tryRead method attempts to read a block in either blocking
 * or non-blocking mode. If blocking is set to false,
 * the method returns an empty block with the flag set to false
 * to indicate that it would have to block to get the next block.
 */
NonBlockingResult tryRead_(bool blocking)
{
if (has_limit && num_updates == static_cast<Int64>(limit))
{
return { Block(), true };
}
/// If blocks were never assigned, get them
if (!blocks)
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
if (!active)
return { Block(), false };
blocks = (*blocks_ptr);
blocks_metadata = (*blocks_metadata_ptr);
it = blocks->begin();
begin = blocks->begin();
end = blocks->end();
}
if (isCancelled() || storage->is_dropped)
{
return { Block(), true };
}
if (it == end)
{
{
Poco::FastMutex::ScopedLock lock(storage->mutex);
if (!active)
return { Block(), false };
/// If we are done iterating over our blocks
/// and new blocks are available, get them
if (blocks.get() != (*blocks_ptr).get())
{
blocks = (*blocks_ptr);
blocks_metadata = (*blocks_metadata_ptr);
it = blocks->begin();
begin = blocks->begin();
end = blocks->end();
}
/// No new blocks are available, wait for them
else
{
if (!blocking)
{
return { Block(), false };
}
if (!end_of_blocks)
{
end_of_blocks = true;
return { getHeader(), true };
}
while (true)
{
UInt64 timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
bool signaled = storage->condition.tryWait(storage->mutex, std::max(static_cast<UInt64>(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec)) / 1000);
if (isCancelled() || storage->is_dropped)
{
return { Block(), true };
}
if (signaled)
{
break;
}
else
{
// repeat the event block as a heartbeat
last_event_timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
return { getHeader(), true };
}
}
}
}
return tryRead_(blocking);
}
// move right to the end
it = end;
if (it == end)
{
end_of_blocks = false;
num_updates += 1;
}
last_event_timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
return { getEventBlock(), true };
}
private:
std::shared_ptr<StorageLiveView> storage;
std::shared_ptr<BlocksPtr> blocks_ptr;
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
std::weak_ptr<bool> active_ptr;
std::shared_ptr<bool> active;
BlocksPtr blocks;
BlocksMetadataPtr blocks_metadata;
Blocks::iterator it;
Blocks::iterator end;
Blocks::iterator begin;
const bool has_limit;
const UInt64 limit;
Int64 num_updates = -1;
bool end_of_blocks = false;
UInt64 heartbeat_interval_usec;
UInt64 temporary_live_view_timeout_sec;
UInt64 last_event_timestamp_usec = 0;
Poco::Timestamp timestamp;
};
}

View File

@ -12,6 +12,7 @@
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/StorageValues.h>
#include <Storages/StorageLiveView.h>
namespace DB
{
@ -47,17 +48,30 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
for (const auto & database_table : dependencies)
{
auto dependent_table = context.getTable(database_table.first, database_table.second);
auto & materialized_view = dynamic_cast<const StorageMaterializedView &>(*dependent_table);
StoragePtr inner_table = materialized_view.getTargetTable();
auto query = materialized_view.getInnerQuery();
ASTPtr query;
BlockOutputStreamPtr out;
if (auto * materialized_view = dynamic_cast<const StorageMaterializedView *>(dependent_table.get()))
{
StoragePtr inner_table = materialized_view->getTargetTable();
query = materialized_view->getInnerQuery();
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->database = inner_table->getDatabaseName();
insert->table = inner_table->getTableName();
ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, *views_context);
BlockIO io = interpreter.execute();
views.emplace_back(ViewInfo{query, database_table.first, database_table.second, io.out});
out = io.out;
}
else if (dynamic_cast<const StorageLiveView *>(dependent_table.get()))
out = std::make_shared<PushingToViewsBlockOutputStream>(
database_table.first, database_table.second, dependent_table, *views_context, ASTPtr(), true);
else
out = std::make_shared<PushingToViewsBlockOutputStream>(
database_table.first, database_table.second, dependent_table, *views_context, ASTPtr());
views.emplace_back(ViewInfo{std::move(query), database_table.first, database_table.second, std::move(out)});
}
}
@ -90,10 +104,18 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
*/
Nested::validateArraySizes(block);
if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get()))
{
BlockOutputStreamPtr output_ = std::make_shared<LiveViewBlockOutputStream>(*live_view);
StorageLiveView::writeIntoLiveView(*live_view, block, context, output_);
}
else
{
if (output)
/// TODO: to support virtual and alias columns inside MVs, we should return here the inserted block extended
/// with additional columns directly from storage and pass it to MVs instead of raw block.
output->write(block);
}
/// Don't process materialized views if this block is duplicate
if (replicated_output && replicated_output->lastBlockIsDuplicate())
@ -179,21 +201,30 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
auto & view = views[view_num];
try
{
BlockInputStreamPtr in;
if (view.query)
{
/// We create a table with the same name as original table and the same alias columns,
/// but it will contain single block (that is INSERT-ed into main table).
/// InterpreterSelectQuery will do processing of alias columns.
Context local_context = *views_context;
local_context.addViewSource(StorageValues::create(storage->getDatabaseName(), storage->getTableName(), storage->getColumns(), block));
local_context.addViewSource(
StorageValues::create(storage->getDatabaseName(), storage->getTableName(), storage->getColumns(),
block));
InterpreterSelectQuery select(view.query, local_context, SelectQueryOptions());
in = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
BlockInputStreamPtr in = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
in = std::make_shared<SquashingBlockInputStream>(
in, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes);
in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
}
else
in = std::make_shared<OneBlockInputStream>(block);
in->readPrefix();

View File

@ -5,7 +5,7 @@
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageLiveView.h>
namespace DB
{

View File

@ -28,6 +28,8 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
break;
to.write(block);
if (!block.rows())
to.flush();
progress(block);
}

View File

@ -41,7 +41,7 @@ String getTableDefinitionFromCreateQuery(const ASTPtr & query)
create.replace_view = false;
/// For views it is necessary to save the SELECT query itself, for the rest - on the contrary
if (!create.is_view && !create.is_materialized_view)
if (!create.is_view && !create.is_materialized_view && !create.is_live_view)
create.select = nullptr;
create.format = nullptr;

View File

@ -1,45 +0,0 @@
#include <Core/Block.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
{
BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_)
: row_output(row_output_), header(header_) {}
void BlockOutputStreamFromRowOutputStream::write(const Block & block)
{
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
{
if (!first_row)
row_output->writeRowBetweenDelimiter();
first_row = false;
row_output->write(block, i);
}
}
void BlockOutputStreamFromRowOutputStream::setRowsBeforeLimit(size_t rows_before_limit)
{
row_output->setRowsBeforeLimit(rows_before_limit);
}
void BlockOutputStreamFromRowOutputStream::setTotals(const Block & totals)
{
row_output->setTotals(totals);
}
void BlockOutputStreamFromRowOutputStream::setExtremes(const Block & extremes)
{
row_output->setExtremes(extremes);
}
void BlockOutputStreamFromRowOutputStream::onProgress(const Progress & progress)
{
row_output->onProgress(progress);
}
}

View File

@ -1,38 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Formats/IRowOutputStream.h>
namespace DB
{
/** Transforms a stream to write data by rows to a stream to write data by blocks.
* For example, to write a text dump.
*/
class BlockOutputStreamFromRowOutputStream : public IBlockOutputStream
{
public:
BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writePrefix() override { row_output->writePrefix(); }
void writeSuffix() override { row_output->writeSuffix(); }
void flush() override { row_output->flush(); }
void setRowsBeforeLimit(size_t rows_before_limit) override;
void setTotals(const Block & totals) override;
void setExtremes(const Block & extremes) override;
void onProgress(const Progress & progress) override;
String getContentType() const override { return row_output->getContentType(); }
private:
RowOutputStreamPtr row_output;
Block header;
bool first_row = true;
};
}

View File

@ -348,10 +348,9 @@ bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumn
const auto & current_column_type = data_types[table_column];
const bool is_last_file_column =
file_column + 1 == column_indexes_for_input_fields.size();
const bool at_delimiter = *istr.position() == delimiter;
const bool at_delimiter = !istr.eof() && *istr.position() == delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r'
|| istr.eof());
&& (istr.eof() || *istr.position() == '\n' || *istr.position() == '\r');
out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
<< "name: " << header.safeGetByPosition(table_column).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(table_column).name.size(), ' ')
@ -514,10 +513,9 @@ void CSVRowInputStream::updateDiagnosticInfo()
bool CSVRowInputStream::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx)
{
const bool at_delimiter = *istr.position() == format_settings.csv.delimiter;
const bool at_delimiter = !istr.eof() && *istr.position() == format_settings.csv.delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r'
|| istr.eof());
&& (istr.eof() || *istr.position() == '\n' || *istr.position() == '\r');
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))

View File

@ -100,7 +100,8 @@ BlockInputStreamPtr FormatFactory::getInput(
}
BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & buf, const Block & sample, const Context & context) const
BlockOutputStreamPtr FormatFactory::getOutput(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
{
if (name == "PrettyCompactMonoBlock")
{
@ -128,10 +129,10 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer &
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(
output_getter(buf, sample, context, format_settings), sample);
output_getter(buf, sample, context, callback, format_settings), sample);
}
auto format = getOutputFormat(name, buf, sample, context);
auto format = getOutputFormat(name, buf, sample, context, callback);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
@ -165,7 +166,8 @@ InputFormatPtr FormatFactory::getInputFormat(
}
OutputFormatPtr FormatFactory::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample, const Context & context) const
OutputFormatPtr FormatFactory::getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
{
const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
@ -177,7 +179,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(const String & name, WriteBuffer
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return output_getter(buf, sample, context, format_settings);
return output_getter(buf, sample, context, callback, format_settings);
}
@ -250,6 +252,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory);
void registerOutputFormatProcessorVertical(FormatFactory & factory);
void registerOutputFormatProcessorJSON(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatProcessorXML(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
@ -266,6 +269,8 @@ FormatFactory::FormatFactory()
registerInputFormatTabSeparated(*this);
registerInputFormatCSV(*this);
registerOutputFormatProcessorJSONEachRowWithProgress(*this);
registerInputFormatProcessorNative(*this);
registerOutputFormatProcessorNative(*this);
registerInputFormatProcessorRowBinary(*this);

View File

@ -41,6 +41,10 @@ public:
/// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
using ReadCallback = std::function<void()>;
/// This callback allows to perform some additional actions after writing a single row.
/// It's initial purpose was to flush Kafka message for each row.
using WriteCallback = std::function<void()>;
private:
using InputCreator = std::function<BlockInputStreamPtr(
ReadBuffer & buf,
@ -55,6 +59,7 @@ private:
WriteBuffer & buf,
const Block & sample,
const Context & context,
WriteCallback callback,
const FormatSettings & settings)>;
using InputProcessorCreator = std::function<InputFormatPtr(
@ -68,6 +73,7 @@ private:
WriteBuffer & buf,
const Block & sample,
const Context & context,
WriteCallback callback,
const FormatSettings & settings)>;
struct Creators
@ -91,7 +97,7 @@ public:
ReadCallback callback = {}) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const;
const Block & sample, const Context & context, WriteCallback callback = {}) const;
InputFormatPtr getInputFormat(
const String & name,
@ -102,8 +108,8 @@ public:
UInt64 rows_portion_size = 0,
ReadCallback callback = {}) const;
OutputFormatPtr getOutputFormat(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context) const;
OutputFormatPtr getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const;
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);

View File

@ -27,6 +27,7 @@ void registerOutputFormatNative(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{
return std::make_shared<NativeBlockOutputStream>(buf, 0, sample);

View File

@ -11,6 +11,7 @@ void registerOutputFormatNull(FormatFactory & factory)
WriteBuffer &,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{
return std::make_shared<NullBlockOutputStream>(sample);

View File

@ -14,7 +14,6 @@
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h>
@ -47,7 +46,7 @@ try
RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, format_settings));
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, []{}, format_settings));
copyData(block_input, *block_output);
}

View File

@ -11,7 +11,6 @@
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
@ -44,7 +43,7 @@ try
BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, format_settings));
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [] {}, format_settings));
copyData(block_input, *block_output);
return 0;

View File

@ -11,7 +11,7 @@
namespace DB
{
/// Stores data in memory chunks, size of cunks are exponentially increasing during write
/// Stores data in memory chunks, size of chunks are exponentially increasing during write
/// Written data could be reread after write
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost::noncopyable, private Allocator<false>
{

View File

@ -35,7 +35,7 @@ public:
*/
inline void next()
{
if (!offset())
if (!offset() && available())
return;
bytes += offset();

View File

@ -117,6 +117,9 @@ void WriteBufferValidUTF8::nextImpl()
memory[i] = p[i];
working_buffer = Buffer(&memory[cnt], memory.data() + memory.size());
/// Propagate next() to the output buffer
output_buffer.next();
}

View File

@ -400,6 +400,10 @@ public:
return count;
}
#if !__clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
#endif
bool hasCurrentlyLoadedObjects() const
{
std::lock_guard lock{mutex};
@ -408,6 +412,9 @@ public:
return true;
return false;
}
#if !__clang__
#pragma GCC diagnostic pop
#endif
/// Starts loading of a specified object.
void load(const String & name)

View File

@ -8,7 +8,9 @@
#include <Storages/AlterCommands.h>
#include <Storages/MutationCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/LiveViewCommands.h>
#include <Common/typeid_cast.h>
#include <Storages/StorageLiveView.h>
#include <algorithm>
@ -48,6 +50,7 @@ BlockIO InterpreterAlterQuery::execute()
AlterCommands alter_commands;
PartitionCommands partition_commands;
MutationCommands mutation_commands;
LiveViewCommands live_view_commands;
for (ASTAlterCommand * command_ast : alter.command_list->commands)
{
if (auto alter_command = AlterCommand::parse(command_ast))
@ -56,6 +59,8 @@ BlockIO InterpreterAlterQuery::execute()
partition_commands.emplace_back(std::move(*partition_command));
else if (auto mut_command = MutationCommand::parse(command_ast))
mutation_commands.emplace_back(std::move(*mut_command));
else if (auto live_view_command = LiveViewCommand::parse(command_ast))
live_view_commands.emplace_back(std::move(*live_view_command));
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
@ -72,6 +77,21 @@ BlockIO InterpreterAlterQuery::execute()
table->alterPartition(query_ptr, partition_commands, context);
}
if (!live_view_commands.empty())
{
live_view_commands.validate(*table);
for (const LiveViewCommand & command : live_view_commands)
{
auto live_view = std::dynamic_pointer_cast<StorageLiveView>(table);
switch (command.type)
{
case LiveViewCommand::REFRESH:
live_view->refresh(context);
break;
}
}
}
if (!alter_commands.empty())
{
auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId());

View File

@ -442,7 +442,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
return;
}
if (create.temporary)
if (create.temporary && !create.is_live_view)
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Memory";
@ -465,6 +465,11 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
"Cannot CREATE a table AS " + as_database_name + "." + as_table_name + ", it is a View",
ErrorCodes::INCORRECT_QUERY);
if (as_create.is_live_view)
throw Exception(
"Cannot CREATE a table AS " + as_database_name + "." + as_table_name + ", it is a Live View",
ErrorCodes::INCORRECT_QUERY);
create.set(create.storage, as_create.storage->ptr());
}
}
@ -482,7 +487,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
}
/// Temporary tables are created out of databases.
if (create.temporary && !create.database.empty())
if (create.temporary && !create.database.empty() && !create.is_live_view)
throw Exception("Temporary tables cannot be inside a database. You should not specify a database for a temporary table.",
ErrorCodes::BAD_DATABASE_FOR_TEMPORARY_TABLE);
@ -505,7 +510,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (create.to_database.empty())
create.to_database = current_database;
if (create.select && (create.is_view || create.is_materialized_view))
if (create.select && (create.is_view || create.is_materialized_view || create.is_live_view))
{
AddDefaultDatabaseVisitor visitor(current_database);
visitor.visit(*create.select);
@ -565,7 +570,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
String data_path;
DatabasePtr database;
if (!create.temporary)
if (!create.temporary || create.is_live_view)
{
database = context.getDatabase(database_name);
data_path = database->getDataPath();
@ -611,7 +616,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
false);
}
if (create.temporary)
if (create.temporary && !create.is_live_view)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
else
database->createTable(context, table_name, res, query_ptr);
@ -630,7 +635,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// If the query is a CREATE SELECT, insert the data into the table.
if (create.select && !create.attach
&& !create.is_view && (!create.is_materialized_view || create.is_populate))
&& !create.is_view && !create.is_live_view && (!create.is_materialized_view || create.is_populate))
{
auto insert = std::make_shared<ASTInsertQuery>();

View File

@ -14,6 +14,7 @@
#include <Parsers/ASTUseQuery.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/TablePropertiesQueriesASTs.h>
#include <Parsers/ASTWatchQuery.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterCheckQuery.h>
@ -35,6 +36,7 @@
#include <Interpreters/InterpreterShowTablesQuery.h>
#include <Interpreters/InterpreterSystemQuery.h>
#include <Interpreters/InterpreterUseQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Parsers/ASTSystemQuery.h>
@ -173,6 +175,10 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context &
throwIfNoAccess(context);
return std::make_unique<InterpreterSystemQuery>(query, context);
}
else if (query->as<ASTWatchQuery>())
{
return std::make_unique<InterpreterWatchQuery>(query, context);
}
else
throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
}

View File

@ -0,0 +1,108 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <Core/Settings.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTWatchQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_STORAGE;
extern const int UNKNOWN_TABLE;
extern const int TOO_MANY_COLUMNS;
}
BlockInputStreamPtr InterpreterWatchQuery::executeImpl()
{
return std::make_shared<OneBlockInputStream>(Block());
}
BlockIO InterpreterWatchQuery::execute()
{
BlockIO res;
const ASTWatchQuery & query = typeid_cast<const ASTWatchQuery &>(*query_ptr);
String database;
String table;
/// Get database
if (!query.database.empty())
database = query.database;
else
database = context.getCurrentDatabase();
/// Get table
table = query.table;
/// Get storage
storage = context.tryGetTable(database, table);
if (!storage)
throw Exception("Table " + backQuoteIfNeed(database) + "." +
backQuoteIfNeed(table) + " doesn't exist.",
ErrorCodes::UNKNOWN_TABLE);
/// List of columns to read to execute the query.
Names required_columns = storage->getColumns().getNamesOfPhysical();
/// Get context settings for this query
const Settings & settings = context.getSettingsRef();
/// Limitation on the number of columns to read.
if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
throw Exception("Limit for number of columns to read exceeded. "
"Requested: " + std::to_string(required_columns.size())
+ ", maximum: " + settings.max_columns_to_read.toString(),
ErrorCodes::TOO_MANY_COLUMNS);
size_t max_block_size = settings.max_block_size;
size_t max_streams = 1;
/// Define query info
SelectQueryInfo query_info;
query_info.query = query_ptr;
/// From stage
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
QueryProcessingStage::Enum to_stage = QueryProcessingStage::Complete;
/// Watch storage
streams = storage->watch(required_columns, query_info, context, from_stage, max_block_size, max_streams);
/// Constraints on the result, the quota on the result, and also callback for progress.
if (IBlockInputStream * stream = dynamic_cast<IBlockInputStream *>(streams[0].get()))
{
/// Constraints apply only to the final result.
if (to_stage == QueryProcessingStage::Complete)
{
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_CURRENT;
limits.size_limits.max_rows = settings.max_result_rows;
limits.size_limits.max_bytes = settings.max_result_bytes;
limits.size_limits.overflow_mode = settings.result_overflow_mode;
stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
}
res.in = streams[0];
return res;
}
}

View File

@ -0,0 +1,50 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/IInterpreter.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
namespace DB
{
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
using StoragePtr = std::shared_ptr<IStorage>;
class InterpreterWatchQuery : public IInterpreter
{
public:
InterpreterWatchQuery(const ASTPtr & query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {}
BlockIO execute() override;
private:
ASTPtr query_ptr;
Context & context;
BlockInputStreamPtr executeImpl();
/// Table from where to read data, if not subquery.
StoragePtr storage;
/// Streams of read data
BlockInputStreams streams;
};
}

View File

@ -320,13 +320,7 @@ ColumnPtr Set::execute(const Block & block, bool negative) const
return res;
}
if (data_types.size() != num_key_columns)
{
std::stringstream message;
message << "Number of columns in section IN doesn't match. "
<< num_key_columns << " at left, " << data_types.size() << " at right.";
throw Exception(message.str(), ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
}
checkColumnsNumber(num_key_columns);
/// Remember the columns we will work with. Also check that the data types are correct.
ColumnRawPtrs key_columns;
@ -337,11 +331,7 @@ ColumnPtr Set::execute(const Block & block, bool negative) const
for (size_t i = 0; i < num_key_columns; ++i)
{
if (!removeNullable(data_types[i])->equals(*removeNullable(block.safeGetByPosition(i).type)))
throw Exception("Types of column " + toString(i + 1) + " in section IN don't match: "
+ data_types[i]->getName() + " on the right, " + block.safeGetByPosition(i).type->getName() +
" on the left.", ErrorCodes::TYPE_MISMATCH);
checkTypesEqual(i, block.safeGetByPosition(i).type);
materialized_columns.emplace_back(block.safeGetByPosition(i).column->convertToFullColumnIfConst());
key_columns.emplace_back() = materialized_columns.back().get();
}
@ -421,6 +411,24 @@ void Set::executeOrdinary(
}
}
void Set::checkColumnsNumber(size_t num_key_columns) const
{
if (data_types.size() != num_key_columns)
{
std::stringstream message;
message << "Number of columns in section IN doesn't match. "
<< num_key_columns << " at left, " << data_types.size() << " at right.";
throw Exception(message.str(), ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
}
}
void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const
{
if (!removeNullable(data_types[set_type_idx])->equals(*removeNullable(other_type)))
throw Exception("Types of column " + toString(set_type_idx + 1) + " in section IN don't match: "
+ data_types[set_type_idx]->getName() + " on the right, " + other_type->getName() +
" on the left.", ErrorCodes::TYPE_MISMATCH);
}
MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_)
: indexes_mapping(std::move(index_mapping_))


@@ -70,6 +70,9 @@ public:
bool hasExplicitSetElements() const { return fill_set_elements; }
Columns getSetElements() const { return { set_elements.begin(), set_elements.end() }; }
void checkColumnsNumber(size_t num_key_columns) const;
void checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const;
private:
size_t keys_size = 0;
Sizes key_sizes;

dbms/src/NOTICE Normal file

@@ -0,0 +1,41 @@
--
The following notice shall be applied to the files listed below.
Some modifications Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Common/ErrorCodes.cpp
Common/UInt128.h
Core/Block.h
Core/Defines.h
Core/Settings.h
DataStreams/PushingToViewsBlockOutputStream.cpp
DataStreams/PushingToViewsBlockOutputStream.h
DataStreams/copyData.cpp
Databases/DatabasesCommon.cpp
IO/WriteBufferValidUTF8.cpp
Interpreters/InterpreterAlterQuery.cpp
Interpreters/InterpreterCreateQuery.cpp
Interpreters/InterpreterFactory.cpp
Parsers/ASTAlterQuery.cpp
Parsers/ASTAlterQuery.h
Parsers/ASTCreateQuery.cpp
Parsers/ASTCreateQuery.h
Parsers/ParserAlterQuery.cpp
Parsers/ParserAlterQuery.h
Parsers/ParserCreateQuery.cpp
Parsers/ParserCreateQuery.h
Parsers/ParserQueryWithOutput.cpp
Storages/IStorage.h
Storages/StorageFactory.cpp
Storages/registerStorages.cpp
--


@@ -45,6 +45,11 @@ ASTPtr ASTAlterCommand::clone() const
res->ttl = ttl->clone();
res->children.push_back(res->ttl);
}
if (values)
{
res->values = values->clone();
res->children.push_back(res->values);
}
return res;
}
@@ -200,6 +205,46 @@ void ASTAlterCommand::formatImpl(
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY TTL " << (settings.hilite ? hilite_none : "");
ttl->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::LIVE_VIEW_REFRESH)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "REFRESH " << (settings.hilite ? hilite_none : "");
}
else if (type == ASTAlterCommand::LIVE_CHANNEL_ADD)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ADD " << (settings.hilite ? hilite_none : "");
values->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::LIVE_CHANNEL_DROP)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "DROP " << (settings.hilite ? hilite_none : "");
values->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::LIVE_CHANNEL_MODIFY)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "MODIFY " << (settings.hilite ? hilite_none : "");
values->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::LIVE_CHANNEL_SUSPEND)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "SUSPEND " << (settings.hilite ? hilite_none : "");
values->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::LIVE_CHANNEL_RESUME)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "RESUME " << (settings.hilite ? hilite_none : "");
values->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::LIVE_CHANNEL_REFRESH)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "REFRESH " << (settings.hilite ? hilite_none : "");
values->formatImpl(settings, state, frame);
}
else
throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
}
@@ -252,6 +297,11 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
if (is_live_view)
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER LIVE VIEW " << (settings.hilite ? hilite_none : "");
else if (is_live_channel)
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER LIVE CHANNEL " << (settings.hilite ? hilite_none : "");
else
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER TABLE " << (settings.hilite ? hilite_none : "");
if (!table.empty())


@@ -15,6 +15,15 @@ namespace DB
* MODIFY COLUMN col_name type,
* DROP PARTITION partition,
* COMMENT_COLUMN col_name 'comment',
* ALTER LIVE VIEW [db.]name_type
* REFRESH
* ALTER CHANNEL [db.]name_type
* ADD live_view,...
* DROP live_view,...
* SUSPEND live_view,...
* RESUME live_view,...
* REFRESH live_view,...
* MODIFY live_view,...
*/
class ASTAlterCommand : public IAST
@@ -44,6 +53,15 @@ public:
UPDATE,
NO_TYPE,
LIVE_VIEW_REFRESH,
LIVE_CHANNEL_ADD,
LIVE_CHANNEL_DROP,
LIVE_CHANNEL_SUSPEND,
LIVE_CHANNEL_RESUME,
LIVE_CHANNEL_REFRESH,
LIVE_CHANNEL_MODIFY
};
Type type = NO_TYPE;
@@ -91,6 +109,10 @@ public:
/// For MODIFY TTL query
ASTPtr ttl;
/** In ALTER CHANNEL, ADD, DROP, SUSPEND, RESUME, REFRESH, MODIFY queries, the list of live views is stored here
*/
ASTPtr values;
bool detach = false; /// true for DETACH PARTITION
bool part = false; /// true for ATTACH PART
@@ -147,6 +169,9 @@ protected:
class ASTAlterQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCluster
{
public:
bool is_live_view{false}; /// true for ALTER LIVE VIEW
bool is_live_channel{false}; /// true for ALTER LIVE CHANNEL
ASTAlterCommandList * command_list = nullptr;
String getID(char) const override;


@@ -173,6 +173,8 @@ ASTPtr ASTCreateQuery::clone() const
res->set(res->storage, storage->clone());
if (select)
res->set(res->select, select->clone());
if (tables)
res->set(res->tables, tables->clone());
cloneOutputOptions(*res);
@@ -204,6 +206,11 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
what = "VIEW";
if (is_materialized_view)
what = "MATERIALIZED VIEW";
if (is_live_view)
what = "LIVE VIEW";
if (is_live_channel)
what = "LIVE CHANNEL";
settings.ostr
<< (settings.hilite ? hilite_keyword : "")
@@ -257,6 +264,12 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS" << settings.nl_or_ws << (settings.hilite ? hilite_none : "");
select->formatImpl(settings, state, frame);
}
if (tables)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH " << (settings.hilite ? hilite_none : "");
tables->formatImpl(settings, state, frame);
}
}
}


@@ -55,9 +55,12 @@ public:
bool if_not_exists{false};
bool is_view{false};
bool is_materialized_view{false};
bool is_live_view{false};
bool is_live_channel{false};
bool is_populate{false};
bool replace_view{false}; /// CREATE OR REPLACE VIEW
ASTColumns * columns_list = nullptr;
ASTExpressionList *tables = nullptr;
String to_database; /// For CREATE MATERIALIZED VIEW mv TO table.
String to_table;
ASTStorage * storage = nullptr;


@@ -0,0 +1,59 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Parsers/ASTQueryWithTableAndOutput.h>
namespace DB
{
class ASTWatchQuery : public ASTQueryWithTableAndOutput
{
public:
ASTPtr limit_length;
bool is_watch_events;
ASTWatchQuery() = default;
String getID(char) const override { return "WatchQuery_" + database + "_" + table; }
ASTPtr clone() const override
{
std::shared_ptr<ASTWatchQuery> res = std::make_shared<ASTWatchQuery>(*this);
res->children.clear();
cloneOutputOptions(*res);
return res;
}
protected:
void formatQueryImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override
{
std::string indent_str = s.one_line ? "" : std::string(4 * frame.indent, ' ');
s.ostr << (s.hilite ? hilite_keyword : "") << "WATCH" << " " << (s.hilite ? hilite_none : "")
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
if (is_watch_events)
{
s.ostr << " " << (s.hilite ? hilite_keyword : "") << "EVENTS" << (s.hilite ? hilite_none : "");
}
if (limit_length)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT " << (s.hilite ? hilite_none : "");
limit_length->formatImpl(s, state, frame);
}
}
};
}


@@ -34,6 +34,13 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_clear_index("CLEAR INDEX");
ParserKeyword s_materialize_index("MATERIALIZE INDEX");
ParserKeyword s_add("ADD");
ParserKeyword s_drop("DROP");
ParserKeyword s_suspend("SUSPEND");
ParserKeyword s_resume("RESUME");
ParserKeyword s_refresh("REFRESH");
ParserKeyword s_modify("MODIFY");
ParserKeyword s_attach_partition("ATTACH PARTITION");
ParserKeyword s_detach_partition("DETACH PARTITION");
ParserKeyword s_drop_partition("DROP PARTITION");
@@ -65,7 +72,66 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserList parser_assignment_list(
std::make_unique<ParserAssignment>(), std::make_unique<ParserToken>(TokenType::Comma),
/* allow_empty = */ false);
ParserNameList values_p;
if (is_live_view)
{
if (s_refresh.ignore(pos, expected))
{
command->type = ASTAlterCommand::LIVE_VIEW_REFRESH;
}
else
return false;
}
else if (is_live_channel)
{
if (s_add.ignore(pos, expected))
{
if (!values_p.parse(pos, command->values, expected))
return false;
command->type = ASTAlterCommand::LIVE_CHANNEL_ADD;
}
else if (s_drop.ignore(pos, expected))
{
if (!values_p.parse(pos, command->values, expected))
return false;
command->type = ASTAlterCommand::LIVE_CHANNEL_DROP;
}
else if (s_suspend.ignore(pos, expected))
{
if (!values_p.parse(pos, command->values, expected))
return false;
command->type = ASTAlterCommand::LIVE_CHANNEL_SUSPEND;
}
else if (s_resume.ignore(pos, expected))
{
if (!values_p.parse(pos, command->values, expected))
return false;
command->type = ASTAlterCommand::LIVE_CHANNEL_RESUME;
}
else if (s_refresh.ignore(pos, expected))
{
if (!values_p.parse(pos, command->values, expected))
return false;
command->type = ASTAlterCommand::LIVE_CHANNEL_REFRESH;
}
else if (s_modify.ignore(pos, expected))
{
if (!values_p.parse(pos, command->values, expected))
return false;
command->type = ASTAlterCommand::LIVE_CHANNEL_MODIFY;
}
else
return false;
}
else
{
if (s_add_column.ignore(pos, expected))
{
if (s_if_not_exists.ignore(pos, expected))
@@ -327,6 +393,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
}
else
return false;
}
if (command->col_decl)
command->children.push_back(command->col_decl);
@@ -340,6 +407,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->children.push_back(command->predicate);
if (command->update_assignments)
command->children.push_back(command->update_assignments);
if (command->values)
command->children.push_back(command->values);
if (command->comment)
command->children.push_back(command->comment);
if (command->ttl)
@@ -355,7 +424,7 @@ bool ParserAlterCommandList::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
node = command_list;
ParserToken s_comma(TokenType::Comma);
ParserAlterCommand p_command;
ParserAlterCommand p_command(is_live_view, is_live_channel);
do
{
@@ -404,8 +473,30 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
node = query;
ParserKeyword s_alter_table("ALTER TABLE");
ParserKeyword s_alter_live_view("ALTER LIVE VIEW");
ParserKeyword s_alter_live_channel("ALTER LIVE CHANNEL");
bool is_live_view = false;
bool is_live_channel = false;
if (!s_alter_table.ignore(pos, expected))
{
if (!s_alter_live_view.ignore(pos, expected))
{
if (!s_alter_live_channel.ignore(pos, expected))
return false;
else
is_live_channel = true;
}
else
is_live_view = true;
}
if (is_live_view)
query->is_live_view = true;
if (is_live_channel)
query->is_live_channel = true;
if (!parseDatabaseAndTableName(pos, expected, query->database, query->table))
return false;
@@ -418,7 +509,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
query->cluster = cluster_str;
ParserAlterCommandList p_command_list;
ParserAlterCommandList p_command_list(is_live_view, is_live_channel);
ASTPtr command_list;
if (!p_command_list.parse(pos, command_list, expected))
return false;
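The dispatch above tries `ALTER TABLE`, then `ALTER LIVE VIEW`, then `ALTER LIVE CHANNEL`, and records which flavour matched in two flags. A self-contained sketch of that leading-keyword dispatch (hypothetical helper, not the real parser classes):

```cpp
#include <cassert>
#include <string>

// Mimics ParserAlterQuery's prefix dispatch: at most one of the three
// prefixes matches; the flags record which ALTER flavour follows.
struct AlterKind
{
    bool ok = false;
    bool is_live_view = false;
    bool is_live_channel = false;
};

AlterKind classifyAlter(const std::string & query)
{
    auto starts_with = [&](const std::string & prefix) { return query.rfind(prefix, 0) == 0; };
    AlterKind kind;
    if (starts_with("ALTER TABLE "))
        kind.ok = true;
    else if (starts_with("ALTER LIVE VIEW "))
        kind.ok = kind.is_live_view = true;
    else if (starts_with("ALTER LIVE CHANNEL "))
        kind.ok = kind.is_live_channel = true;
    return kind;  // kind.ok == false means "not an ALTER statement we recognise"
}
```

The flags then select which command set is legal, just as `ParserAlterCommandList`/`ParserAlterCommand` receive `is_live_view`/`is_live_channel` in the diff.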


@@ -19,6 +19,15 @@ namespace DB
* [FREEZE [PARTITION] [WITH NAME name]]
* [DELETE WHERE ...]
* [UPDATE col_name = expr, ... WHERE ...]
* ALTER LIVE VIEW [db.name]
* [REFRESH]
* ALTER LIVE CHANNEL [db.name] [ON CLUSTER cluster]
* [ADD live_view, ...]
* [DROP live_view, ...]
* [SUSPEND live_view, ...]
* [RESUME live_view, ...]
* [REFRESH live_view, ...]
* [MODIFY live_view, ...]
*/
class ParserAlterQuery : public IParserBase
@@ -34,6 +43,12 @@ class ParserAlterCommandList : public IParserBase
protected:
const char * getName() const { return "a list of ALTER commands"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
public:
bool is_live_view;
bool is_live_channel;
ParserAlterCommandList(bool is_live_view_ = false, bool is_live_channel_ = false) : is_live_view(is_live_view_), is_live_channel(is_live_channel_) {}
};
@@ -42,6 +57,12 @@ class ParserAlterCommand : public IParserBase
protected:
const char * getName() const { return "ALTER command"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
public:
bool is_live_view;
bool is_live_channel;
ParserAlterCommand(bool is_live_view_ = false, bool is_live_channel_ = false) : is_live_view(is_live_view_), is_live_channel(is_live_channel_) {}
};


@@ -94,6 +94,12 @@ bool ParserColumnDeclarationList::parseImpl(Pos & pos, ASTPtr & node, Expected &
.parse(pos, node, expected);
}
bool ParserNameList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
return ParserList(std::make_unique<ParserCompoundIdentifier>(), std::make_unique<ParserToken>(TokenType::Comma), false)
.parse(pos, node, expected);
}
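`ParserNameList` above is just a `ParserList` of compound identifiers separated by commas, used for LIVE CHANNEL's list of live views. A toy sketch of the splitting it performs (illustrative only; the real parser works on a token stream):

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Splits "db1.lv1, lv2, db2.lv3" into trimmed names, the shape
// ParserNameList produces for a LIVE CHANNEL's list of live views.
std::vector<std::string> parseNameList(const std::string & text)
{
    std::vector<std::string> names;
    std::istringstream in(text);
    std::string item;
    while (std::getline(in, item, ','))
    {
        size_t begin = item.find_first_not_of(" \t");
        size_t end = item.find_last_not_of(" \t");
        if (begin == std::string::npos)
            return {};  // empty element: reject, like ParserList with allow_empty = false
        names.push_back(item.substr(begin, end - begin + 1));
    }
    return names;
}
```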
bool ParserIndexDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_type("TYPE");
@@ -309,7 +315,10 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_as("AS");
ParserKeyword s_view("VIEW");
ParserKeyword s_with("WITH");
ParserKeyword s_materialized("MATERIALIZED");
ParserKeyword s_live("LIVE");
ParserKeyword s_channel("CHANNEL");
ParserKeyword s_populate("POPULATE");
ParserKeyword s_or_replace("OR REPLACE");
ParserToken s_dot(TokenType::Dot);
@@ -320,6 +329,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserColumnsOrIndicesDeclarationList columns_or_indices_p;
ParserSelectWithUnionQuery select_p;
ParserFunction table_function_p;
ParserNameList names_p;
ASTPtr database;
ASTPtr table;
@@ -331,11 +341,15 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr as_table;
ASTPtr as_table_function;
ASTPtr select;
ASTPtr tables;
String cluster_str;
bool attach = false;
bool if_not_exists = false;
bool is_view = false;
bool is_materialized_view = false;
bool is_live_view = false;
bool is_live_channel = false;
bool is_populate = false;
bool is_temporary = false;
bool replace_view = false;
@@ -431,6 +445,79 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
}
}
else if (s_live.ignore(pos, expected))
{
if (s_channel.ignore(pos, expected))
is_live_channel = true;
else if (s_view.ignore(pos, expected))
is_live_view = true;
else
return false;
if (s_if_not_exists.ignore(pos, expected))
if_not_exists = true;
if (!name_p.parse(pos, table, expected))
return false;
if (s_dot.ignore(pos, expected))
{
database = table;
if (!name_p.parse(pos, table, expected))
return false;
}
if (ParserKeyword{"ON"}.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
if (!is_live_channel)
{
// TO [db.]table
if (ParserKeyword{"TO"}.ignore(pos, expected))
{
if (!name_p.parse(pos, to_table, expected))
return false;
if (s_dot.ignore(pos, expected))
{
to_database = to_table;
if (!name_p.parse(pos, to_table, expected))
return false;
}
}
}
/// Optional - a list of columns can be specified. It must fully comply with SELECT.
if (s_lparen.ignore(pos, expected))
{
if (!columns_or_indices_p.parse(pos, columns_list, expected))
return false;
if (!s_rparen.ignore(pos, expected))
return false;
}
if (is_live_channel)
{
if (s_with.ignore(pos, expected))
{
if (!names_p.parse(pos, tables, expected))
return false;
}
}
else
{
/// AS SELECT ...
if (!s_as.ignore(pos, expected))
return false;
if (!select_p.parse(pos, select, expected))
return false;
}
}
else if (is_temporary)
return false;
else if (s_database.ignore(pos, expected))
@@ -538,6 +625,8 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->if_not_exists = if_not_exists;
query->is_view = is_view;
query->is_materialized_view = is_materialized_view;
query->is_live_view = is_live_view;
query->is_live_channel = is_live_channel;
query->is_populate = is_populate;
query->temporary = is_temporary;
query->replace_view = replace_view;
@@ -551,6 +640,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->set(query->columns_list, columns_list);
query->set(query->storage, storage);
query->set(query->tables, tables);
tryGetIdentifierNameInto(as_database, query->as_database);
tryGetIdentifierNameInto(as_table, query->as_table);


@@ -91,6 +91,14 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
/** List of table names. */
class ParserNameList : public IParserBase
{
protected:
const char * getName() const { return "name list"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
template <typename NameParser>
class IParserColumnDeclaration : public IParserBase
@@ -300,7 +308,7 @@ protected:
* CREATE|ATTACH DATABASE db [ENGINE = engine]
*
* Or:
* CREATE [OR REPLACE]|ATTACH [MATERIALIZED] VIEW [IF NOT EXISTS] [db.]name [TO [db.]name] [ENGINE = engine] [POPULATE] AS SELECT ...
* CREATE [OR REPLACE]|ATTACH [[MATERIALIZED] VIEW] | [[TEMPORARY] LIVE [CHANNEL] | [VIEW]] [IF NOT EXISTS] [db.]name [TO [db.]name] [ENGINE = engine] [POPULATE] AS SELECT ...
*/
class ParserCreateQuery : public IParserBase
{


@@ -11,6 +11,7 @@
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserKillQueryQuery.h>
#include <Parsers/ParserOptimizeQuery.h>
#include <Parsers/ParserWatchQuery.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ASTExplainQuery.h>
@@ -32,6 +33,7 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
ParserCheckQuery check_p;
ParserOptimizeQuery optimize_p;
ParserKillQueryQuery kill_query_p;
ParserWatchQuery watch_p;
ASTPtr query;
@@ -57,7 +59,8 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|| drop_p.parse(pos, query, expected)
|| check_p.parse(pos, query, expected)
|| kill_query_p.parse(pos, query, expected)
|| optimize_p.parse(pos, query, expected);
|| optimize_p.parse(pos, query, expected)
|| watch_p.parse(pos, query, expected);
if (!parsed)
return false;


@@ -0,0 +1,77 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ParserWatchQuery.h>
#include <Parsers/ExpressionElementParsers.h>
namespace DB
{
bool ParserWatchQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_watch("WATCH");
ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p;
ParserKeyword s_events("EVENTS");
ParserKeyword s_limit("LIMIT");
ASTPtr database;
ASTPtr table;
auto query = std::make_shared<ASTWatchQuery>();
if (!s_watch.ignore(pos, expected))
{
return false;
}
if (!name_p.parse(pos, table, expected))
return false;
if (s_dot.ignore(pos, expected))
{
database = table;
if (!name_p.parse(pos, table, expected))
return false;
}
/// EVENTS
if (s_events.ignore(pos, expected))
{
query->is_watch_events = true;
}
/// LIMIT length
if (s_limit.ignore(pos, expected))
{
ParserNumber num;
if (!num.parse(pos, query->limit_length, expected))
return false;
}
if (database)
query->database = getIdentifierName(database);
if (table)
query->table = getIdentifierName(table);
node = query;
return true;
}
}
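The WATCH grammar handled above is small enough to sketch end-to-end. A toy parse of `WATCH [db.]table [EVENTS] [LIMIT n]` in the same branch order as `ParserWatchQuery::parseImpl` (illustrative only; real parsing goes through `IParserBase` on a token stream):

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Parses "WATCH [db.]table [EVENTS] [LIMIT n]" into its parts.
struct WatchQuerySketch
{
    std::string database;
    std::string table;
    bool is_watch_events = false;
    long limit = -1;
    bool ok = false;
};

WatchQuerySketch parseWatch(const std::string & text)
{
    WatchQuerySketch q;
    std::istringstream in(text);
    std::string word;
    if (!(in >> word) || word != "WATCH")
        return q;
    std::string name;
    if (!(in >> name))
        return q;
    auto dot = name.find('.');
    if (dot != std::string::npos)  // optional db. qualifier
    {
        q.database = name.substr(0, dot);
        q.table = name.substr(dot + 1);
    }
    else
        q.table = name;
    while (in >> word)
    {
        if (word == "EVENTS")
            q.is_watch_events = true;
        else if (word == "LIMIT" && (in >> q.limit))
            continue;
        else
            return q;  // unexpected token: leave ok == false
    }
    q.ok = true;
    return q;
}
```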


@@ -0,0 +1,30 @@
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/** Query like this:
* WATCH [db.]table EVENTS
*/
class ParserWatchQuery : public IParserBase
{
protected:
const char * getName() const { return "WATCH query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
}


@@ -20,6 +20,9 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
first_row = false;
write(columns, row);
if (write_single_row_callback)
write_single_row_callback();
}
}
@@ -96,6 +99,3 @@ void IRowOutputFormat::writeTotals(const DB::Columns & columns, size_t row_num)
}
}


@@ -1,8 +1,10 @@
#pragma once
#include <string>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
#include <string>
namespace DB
{
@@ -22,8 +24,8 @@ protected:
void finalize() override;
public:
IRowOutputFormat(const Block & header, WriteBuffer & out_)
: IOutputFormat(header, out_), types(header.getDataTypes())
IRowOutputFormat(const Block & header, WriteBuffer & out_, FormatFactory::WriteCallback callback)
: IOutputFormat(header, out_), types(header.getDataTypes()), write_single_row_callback(callback)
{
}
@@ -57,6 +59,9 @@ private:
bool prefix_written = false;
bool suffix_written = false;
// Callback used to indicate that another row is written.
FormatFactory::WriteCallback write_single_row_callback;
void writePrefixIfNot()
{
if (!prefix_written)
@@ -76,5 +81,3 @@ private:
};
}
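The `write_single_row_callback` threaded through the constructors above is a plain observer hook: the format notifies the caller after each row it writes. A stripped-down sketch of that pattern (hypothetical `RowWriter`, not the ClickHouse class):

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// A minimal row writer that invokes an optional callback after each row,
// like IRowOutputFormat does with FormatFactory::WriteCallback.
class RowWriter
{
public:
    using WriteCallback = std::function<void()>;

    explicit RowWriter(WriteCallback callback = {})
        : write_single_row_callback(std::move(callback)) {}

    void writeRow(const std::string & row)
    {
        output.push_back(row);
        if (write_single_row_callback)  // null callback means "no per-row notification"
            write_single_row_callback();
    }

    std::vector<std::string> output;

private:
    WriteCallback write_single_row_callback;
};
```

Passing the callback through the constructor is why every registered format factory lambda in the diff gains a `FormatFactory::WriteCallback callback` parameter.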


@@ -9,8 +9,8 @@
namespace DB
{
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_)
: IRowOutputFormat(header, out_), with_names(with_names_), with_types(with_types_)
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback)
: IRowOutputFormat(header, out_, callback), with_names(with_names_), with_types(with_types_)
{
}
@@ -53,18 +53,20 @@ void registerOutputFormatProcessorRowBinary(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, callback);
});
factory.registerOutputFormatProcessor("RowBinaryWithNamesAndTypes", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true, callback);
});
}


@@ -17,7 +17,7 @@ class WriteBuffer;
class BinaryRowOutputFormat: public IRowOutputFormat
{
public:
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_);
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback);
String getName() const override { return "BinaryRowOutputFormat"; }
@@ -32,4 +32,3 @@ protected:
};
}


@@ -349,10 +349,9 @@ bool OPTIMIZE(1) CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumn
const auto & current_column_type = data_types[table_column];
const bool is_last_file_column =
file_column + 1 == column_indexes_for_input_fields.size();
const bool at_delimiter = *in.position() == delimiter;
const bool at_delimiter = !in.eof() && *in.position() == delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*in.position() == '\n' || *in.position() == '\r'
|| in.eof());
&& (in.eof() || *in.position() == '\n' || *in.position() == '\r');
auto & header = getPort().getHeader();
out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
@@ -516,10 +515,9 @@ void CSVRowInputFormat::updateDiagnosticInfo()
bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx)
{
const bool at_delimiter = *in.position() == format_settings.csv.delimiter;
const bool at_delimiter = !in.eof() && *in.position() == format_settings.csv.delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*in.position() == '\n' || *in.position() == '\r'
|| in.eof());
&& (in.eof() || *in.position() == '\n' || *in.position() == '\r');
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
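The two CSV hunks above reorder the conditions so `in.eof()` is tested before `*in.position()` is dereferenced; with the old order, an exhausted buffer was read past its end. A sketch of the corrected predicates (toy buffer, hypothetical names):

```cpp
#include <cassert>
#include <string>

// Toy stand-in for ReadBuffer: eof() must be consulted before the current
// character is touched, exactly the ordering the fix above enforces.
struct ToyBuffer
{
    std::string data;
    size_t pos = 0;
    bool eof() const { return pos >= data.size(); }
    char current() const { return data[pos]; }  // only valid when !eof()
};

bool atDelimiter(const ToyBuffer & in, char delimiter)
{
    // Short-circuit: never touch current() once the buffer is exhausted.
    return !in.eof() && in.current() == delimiter;
}

bool atLastColumnLineEnd(const ToyBuffer & in, bool is_last_file_column)
{
    return is_last_file_column
        && (in.eof() || in.current() == '\n' || in.current() == '\r');
}
```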


@@ -8,8 +8,8 @@ namespace DB
{
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), with_names(with_names_), format_settings(format_settings_)
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), with_names(with_names_), format_settings(format_settings_)
{
auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
@@ -77,9 +77,10 @@ void registerOutputFormatProcessorCSV(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, format_settings);
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, callback, format_settings);
});
}
}


@@ -20,7 +20,7 @@ public:
/** with_names - output in the first line a header with column names
* with_types - output in the next line header with the names of the types
*/
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const FormatSettings & format_settings_);
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
String getName() const override { return "CSVRowOutputFormat"; }
@@ -45,4 +45,3 @@ protected:
};
}


@@ -8,8 +8,8 @@ namespace DB
{
JSONCompactRowOutputFormat::JSONCompactRowOutputFormat(
WriteBuffer & out_, const Block & header, const FormatSettings & settings_)
: JSONRowOutputFormat(out_, header, settings_)
WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: JSONRowOutputFormat(out_, header, callback, settings_)
{
}
@@ -81,9 +81,10 @@ void registerOutputFormatProcessorJSONCompact(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, format_settings);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, callback, format_settings);
});
}


@@ -16,7 +16,7 @@ struct FormatSettings;
class JSONCompactRowOutputFormat : public JSONRowOutputFormat
{
public:
JSONCompactRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & settings_);
JSONCompactRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
String getName() const override { return "JSONCompactRowOutputFormat"; }


@@ -8,8 +8,8 @@ namespace DB
{
JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_), settings(settings_)
JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, callback), settings(settings_)
{
auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
@@ -57,9 +57,10 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, format_settings);
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings);
});
}


@@ -15,7 +15,7 @@ namespace DB
class JSONEachRowRowOutputFormat : public IRowOutputFormat
{
public:
JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_);
JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
String getName() const override { return "JSONEachRowRowOutputFormat"; }
@@ -29,12 +29,12 @@ protected:
void consumeTotals(Chunk) override {}
void consumeExtremes(Chunk) override {}
private:
size_t field_number = 0;
private:
Names fields;
FormatSettings settings;
};
}


@@ -0,0 +1,44 @@
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
void JSONEachRowWithProgressRowOutputFormat::writeRowStartDelimiter()
{
writeCString("{\"row\":{", out);
}
void JSONEachRowWithProgressRowOutputFormat::writeRowEndDelimiter()
{
writeCString("}}\n", out);
field_number = 0;
}
void JSONEachRowWithProgressRowOutputFormat::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
writeCString("{\"progress\":", out);
progress.writeJSON(out);
writeCString("}\n", out);
}
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("JSONEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, callback, format_settings);
});
}
}


@@ -0,0 +1,20 @@
#pragma once
#include <Processors/Formats/Impl/JSONEachRowRowOutputFormat.h>
namespace DB
{
class JSONEachRowWithProgressRowOutputFormat : public JSONEachRowRowOutputFormat
{
public:
using JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void onProgress(const Progress & value) override;
private:
Progress progress;
};
}


@@ -2,14 +2,13 @@
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
{
JSONRowOutputFormat::JSONRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & settings_)
: IRowOutputFormat(header, out_), settings(settings_)
JSONRowOutputFormat::JSONRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: IRowOutputFormat(header, out_, callback), settings(settings_)
{
auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
@@ -248,9 +247,10 @@ void registerOutputFormatProcessorJSON(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, format_settings);
return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings);
});
}


@@ -16,7 +16,7 @@ namespace DB
class JSONRowOutputFormat : public IRowOutputFormat
{
public:
JSONRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & settings_);
JSONRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
String getName() const override { return "JSONRowOutputFormat"; }
@@ -81,4 +81,3 @@ protected:
};
}


@@ -107,10 +107,12 @@ void MySQLOutputFormat::flush()
void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"MySQLWire", [](WriteBuffer & buf, const Block & sample, const Context & context, const FormatSettings & settings)
{
return std::make_shared<MySQLOutputFormat>(buf, sample, context, settings);
});
"MySQLWire",
[](WriteBuffer & buf,
const Block & sample,
const Context & context,
FormatFactory::WriteCallback,
const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, context, settings); });
}
}


@@ -40,4 +40,3 @@ private:
};
}


@@ -161,6 +161,7 @@ void registerOutputFormatProcessorNative(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{
return std::make_shared<NativeOutputFormatFromNativeBlockOutputStream>(sample, buf);


@@ -22,6 +22,7 @@ void registerOutputFormatProcessorNull(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings &)
{
return std::make_shared<NullOutputFormat>(sample, buf);


@@ -107,7 +107,7 @@ void ODBCDriver2BlockOutputFormat::writePrefix()
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const Context &, const FormatSettings & format_settings)
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const Context &, FormatFactory::WriteCallback, const FormatSettings & format_settings)
{
return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
});


@@ -70,6 +70,7 @@ void registerOutputFormatProcessorODBCDriver(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
return std::make_shared<ODBCDriverBlockOutputFormat>(buf, sample, format_settings);


@@ -423,7 +423,12 @@ void ParquetBlockOutputFormat::finalize()
void registerOutputFormatProcessorParquet(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"Parquet", [](WriteBuffer & buf, const Block & sample, const Context & /*context*/, const FormatSettings & format_settings)
"Parquet",
[](WriteBuffer & buf,
const Block & sample,
const Context & /*context*/,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
auto impl = std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
/// TODO


@@ -261,6 +261,7 @@ void registerOutputFormatProcessorPretty(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
return std::make_shared<PrettyBlockOutputFormat>(buf, sample, format_settings);
@@ -270,6 +271,7 @@ void registerOutputFormatProcessorPretty(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;


@@ -134,6 +134,7 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, format_settings);
@@ -143,6 +144,7 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;


@@ -103,6 +103,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, format_settings);
@@ -112,6 +113,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;


@@ -23,8 +23,9 @@ namespace ErrorCodes
ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSchemaInfo & format_schema)
: IRowOutputFormat(header, out_)
: IRowOutputFormat(header, out_, callback)
, data_types(header.getDataTypes())
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
{
@@ -46,9 +47,14 @@ void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"Protobuf", [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &)
"Protobuf",
[](WriteBuffer & buf,
const Block & header,
const Context & context,
FormatFactory::WriteCallback callback,
const FormatSettings &)
{
return std::make_shared<ProtobufRowOutputFormat>(buf, header, FormatSchemaInfo(context, "Protobuf"));
return std::make_shared<ProtobufRowOutputFormat>(buf, header, callback, FormatSchemaInfo(context, "Protobuf"));
});
}


@@ -35,6 +35,7 @@ public:
ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSchemaInfo & format_schema);
String getName() const override { return "ProtobufRowOutputFormat"; }


@@ -8,8 +8,8 @@
namespace DB
{
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, format_settings_)
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, callback, format_settings_)
{
auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
@@ -46,9 +46,10 @@ void registerOutputFormatProcessorTSKV(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<TSKVRowOutputFormat>(buf, sample, settings);
return std::make_shared<TSKVRowOutputFormat>(buf, sample, callback, settings);
});
}


@@ -14,7 +14,7 @@ namespace DB
class TSKVRowOutputFormat: public TabSeparatedRowOutputFormat
{
public:
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings);
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & format_settings);
String getName() const override { return "TSKVRowOutputFormat"; }
@@ -27,4 +27,3 @@ protected:
};
}


@@ -13,8 +13,16 @@ namespace DB
class TabSeparatedRawRowOutputFormat : public TabSeparatedRowOutputFormat
{
public:
TabSeparatedRawRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, format_settings_) {}
TabSeparatedRawRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, callback, format_settings_)
{
}
String getName() const override { return "TabSeparatedRawRowOutputFormat"; }
@@ -25,4 +33,3 @@ public:
};
}


@@ -6,10 +6,14 @@
namespace DB
{
TabSeparatedRowOutputFormat::TabSeparatedRowOutputFormat(
WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
WriteBuffer & out_,
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
{
}
@@ -75,9 +79,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, callback, settings);
});
}
@@ -87,9 +92,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, settings);
return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, callback, settings);
});
}
@@ -99,9 +105,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, callback, settings);
});
}
@@ -111,9 +118,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, callback, settings);
});
}
}


@@ -18,7 +18,13 @@ public:
/** with_names - output in the first line a header with column names
* with_types - output the next line header with the names of the types
*/
TabSeparatedRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings_);
TabSeparatedRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings_);
String getName() const override { return "TabSeparatedRowOutputFormat"; }
@@ -40,4 +46,3 @@ protected:
};
}


@@ -10,8 +10,8 @@ namespace DB
{
ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_)
ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
{
}
@@ -47,9 +47,10 @@ void registerOutputFormatProcessorValues(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<ValuesRowOutputFormat>(buf, sample, settings);
return std::make_shared<ValuesRowOutputFormat>(buf, sample, callback, settings);
});
}


@@ -15,7 +15,7 @@ class WriteBuffer;
class ValuesRowOutputFormat : public IRowOutputFormat
{
public:
ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
String getName() const override { return "ValuesRowOutputFormat"; }
@@ -30,4 +30,3 @@ private:
};
}


@@ -11,8 +11,8 @@ namespace DB
{
VerticalRowOutputFormat::VerticalRowOutputFormat(
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_)
WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
{
auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
@@ -169,9 +169,10 @@ void registerOutputFormatProcessorVertical(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<VerticalRowOutputFormat>(buf, sample, settings);
return std::make_shared<VerticalRowOutputFormat>(buf, sample, callback, settings);
});
}


@@ -18,7 +18,7 @@ class Context;
class VerticalRowOutputFormat : public IRowOutputFormat
{
public:
VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
String getName() const override { return "VerticalRowOutputFormat"; }
@@ -50,4 +50,3 @@ protected:
};
}


@@ -7,8 +7,8 @@
namespace DB
{
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_)
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
{
auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
@@ -246,9 +246,10 @@ void registerOutputFormatProcessorXML(FormatFactory & factory)
WriteBuffer & buf,
const Block & sample,
const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<XMLRowOutputFormat>(buf, sample, settings);
return std::make_shared<XMLRowOutputFormat>(buf, sample, callback, settings);
});
}


@@ -16,7 +16,7 @@ namespace DB
class XMLRowOutputFormat : public IRowOutputFormat
{
public:
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
String getName() const override { return "XMLRowOutputFormat"; }
@@ -75,4 +75,3 @@ protected:
};
}


@@ -76,7 +76,7 @@ public:
/// The name of the table.
virtual std::string getTableName() const = 0;
virtual std::string getDatabaseName() const = 0;
virtual std::string getDatabaseName() const { return {}; }
/// Returns true if the storage receives data from a remote server or servers.
virtual bool isRemote() const { return false; }
@@ -102,7 +102,8 @@ public:
virtual ColumnSizeByName getColumnSizes() const { return {}; }
public: /// thread-unsafe part. lockStructure must be acquired
const ColumnsDescription & getColumns() const; /// returns combined set of columns
virtual const ColumnsDescription & getColumns() const; /// returns combined set of columns
virtual void setColumns(ColumnsDescription columns_); /// sets only real columns, possibly overwrites virtual ones.
const ColumnsDescription & getVirtuals() const;
const IndicesDescription & getIndices() const;
@@ -132,7 +133,6 @@ public: /// thread-unsafe part. lockStructure must be acquired
void check(const Block & block, bool need_all = false) const;
protected: /// still thread-unsafe part.
void setColumns(ColumnsDescription columns_); /// sets only real columns, possibly overwrites virtual ones.
void setIndices(IndicesDescription indices_);
/// Returns whether the column is virtual - by default all columns are real.
@@ -172,6 +172,36 @@ public:
*/
virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const { return QueryProcessingStage::FetchColumns; }
/** Watch live changes to the table.
* Accepts a list of columns to read, as well as a description of the query,
* from which information can be extracted about how to retrieve data
* (indexes, locks, etc.)
* Returns a stream with which you can read data sequentially
* or multiple streams for parallel data reading.
* The `processed_stage` info is also updated to indicate up to what stage the query was processed.
* (Normally, the function only reads the columns from the list, but in other cases,
* for example, the request can be partially processed on a remote server.)
*
* context contains settings for one query.
* Usually Storage does not care about these settings, since they are used in the interpreter.
* But, for example, for distributed query processing, the settings are passed to the remote server.
*
* num_streams - a recommendation, how many streams to return,
* if the storage can return a different number of streams.
*
* It is guaranteed that the structure of the table will not change over the lifetime of the returned streams (that is, there will not be ALTER, RENAME and DROP).
*/
virtual BlockInputStreams watch(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
throw Exception("Method watch is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Read a set of columns from the table.
* Accepts a list of columns to read, as well as a description of the query,
* from which information can be extracted about how to retrieve data
@@ -296,7 +326,7 @@ public:
return {};
}
bool is_dropped{false};
std::atomic<bool> is_dropped{false};
/// Does table support index for IN sections
virtual bool supportsIndexForIn() const { return false; }

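The new `watch` method above is a default-throwing virtual: storages that do not support live watching inherit the throwing default, and live-view-style storages override it. A minimal sketch of that dispatch pattern (simplified stand-in types, not the real `IStorage` signature):

```cpp
#include <stdexcept>
#include <string>

// Base class with a default-throwing virtual, mirroring IStorage::watch.
struct Storage
{
    virtual ~Storage() = default;
    virtual std::string getName() const = 0;

    // Default: watching is unsupported, report which storage was asked.
    virtual std::string watch()
    {
        throw std::runtime_error("Method watch is not supported by storage " + getName());
    }
};

// An ordinary storage keeps the throwing default.
struct PlainStorage : Storage
{
    std::string getName() const override { return "Plain"; }
};

// A live-view-style storage overrides watch to return a stream.
struct LiveStorage : Storage
{
    std::string getName() const override { return "Live"; }
    std::string watch() override { return "stream"; }
};
```

Callers can thus invoke `watch` through the base interface and get a clear `NOT_IMPLEMENTED`-style error from storages that opted out.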

@@ -33,7 +33,7 @@ KafkaBlockInputStream::~KafkaBlockInputStream()
buffer->reset();
}
storage.pushBuffer(buffer);
storage.pushReadBuffer(buffer);
}
Block KafkaBlockInputStream::getHeader() const
@@ -43,11 +43,12 @@ Block KafkaBlockInputStream::getHeader() const
void KafkaBlockInputStream::readPrefixImpl()
{
buffer = storage.tryClaimBuffer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
auto timeout = std::chrono::milliseconds(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
buffer = storage.popReadBuffer(timeout);
claimed = !!buffer;
if (!buffer)
buffer = storage.createBuffer();
return;
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.getTopics());
@@ -80,6 +81,9 @@ void KafkaBlockInputStream::readPrefixImpl()
Block KafkaBlockInputStream::readImpl()
{
if (!buffer)
return Block();
Block block = children.back()->read();
if (!block)
return block;
@@ -99,6 +103,9 @@ Block KafkaBlockInputStream::readImpl()
void KafkaBlockInputStream::readSuffixImpl()
{
if (!buffer)
return;
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();
broken = false;


@@ -27,7 +27,7 @@ private:
Names column_names;
UInt64 max_block_size;
BufferPtr buffer;
ConsumerBufferPtr buffer;
MutableColumns virtual_columns;
bool broken = true, claimed = false;
};


@@ -0,0 +1,54 @@
#include "KafkaBlockOutputStream.h"
#include <Formats/FormatFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_CREATE_IO_BUFFER;
}
KafkaBlockOutputStream::KafkaBlockOutputStream(StorageKafka & storage_, const Context & context_) : storage(storage_), context(context_)
{
}
KafkaBlockOutputStream::~KafkaBlockOutputStream()
{
}
Block KafkaBlockOutputStream::getHeader() const
{
return storage.getSampleBlockNonMaterialized();
}
void KafkaBlockOutputStream::writePrefix()
{
buffer = storage.createWriteBuffer();
if (!buffer)
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), context, [this]{ buffer->count_row(); });
}
void KafkaBlockOutputStream::write(const Block & block)
{
child->write(block);
}
void KafkaBlockOutputStream::writeSuffix()
{
child->writeSuffix();
flush();
}
void KafkaBlockOutputStream::flush()
{
if (buffer)
buffer->flush();
}
}


@@ -0,0 +1,31 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Storages/Kafka/StorageKafka.h>
namespace DB
{
class KafkaBlockOutputStream : public IBlockOutputStream
{
public:
explicit KafkaBlockOutputStream(StorageKafka & storage_, const Context & context_);
~KafkaBlockOutputStream() override;
Block getHeader() const override;
void writePrefix() override;
void write(const Block & block) override;
void writeSuffix() override;
void flush() override;
private:
StorageKafka & storage;
Context context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
};
}


@@ -10,7 +10,7 @@
namespace DB
{
using BufferPtr = std::shared_ptr<DelimitedReadBuffer>;
using ConsumerBufferPtr = std::shared_ptr<DelimitedReadBuffer>;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class ReadBufferFromKafkaConsumer : public ReadBuffer


@@ -16,6 +16,7 @@
#include <Parsers/ASTLiteral.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/KafkaBlockInputStream.h>
#include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp>
@@ -106,7 +107,7 @@ StorageKafka::StorageKafka(
, skip_broken(skip_broken_)
, intermediate_commit(intermediate_commit_)
{
task = global_context.getSchedulePool().createTask(log->name(), [this]{ streamThread(); });
task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
task->deactivate();
}
@@ -140,14 +141,21 @@ BlockInputStreams StorageKafka::read(
}
BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const Context & context)
{
if (topics.size() > 1)
throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<KafkaBlockOutputStream>(*this, context);
}
void StorageKafka::startup()
{
for (size_t i = 0; i < num_consumers; ++i)
{
// Make buffer available
try
{
pushBuffer(createBuffer());
pushReadBuffer(createReadBuffer());
++num_created_consumers;
}
catch (const cppkafka::Exception &)
@@ -169,7 +177,7 @@ void StorageKafka::shutdown()
// Close all consumers
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto buffer = claimBuffer();
auto buffer = popReadBuffer();
// FIXME: not sure if we really close consumers here, and if we really need to close them here.
}
@@ -193,10 +201,70 @@ void StorageKafka::updateDependencies()
}
BufferPtr StorageKafka::createBuffer()
void StorageKafka::pushReadBuffer(ConsumerBufferPtr buffer)
{
std::lock_guard lock(mutex);
buffers.push_back(buffer);
semaphore.set();
}
ConsumerBufferPtr StorageKafka::popReadBuffer()
{
return popReadBuffer(std::chrono::milliseconds::zero());
}
ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout)
{
// Wait for the first free buffer
if (timeout == std::chrono::milliseconds::zero())
semaphore.wait();
else
{
if (!semaphore.tryWait(timeout.count()))
return nullptr;
}
// Take the first available buffer from the list
std::lock_guard lock(mutex);
auto buffer = buffers.back();
buffers.pop_back();
return buffer;
}
ProducerBufferPtr StorageKafka::createWriteBuffer()
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
// TODO: fill required settings
updateConfiguration(conf);
auto producer = std::make_shared<cppkafka::Producer>(conf);
const Settings & settings = global_context.getSettingsRef();
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
return std::make_shared<WriteBufferToKafkaProducer>(
producer, topics[0], row_delimiter ? std::optional<char>{row_delimiter} : std::optional<char>(), 1, 1024, std::chrono::milliseconds(poll_timeout));
}
ConsumerBufferPtr StorageKafka::createReadBuffer()
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
updateConfiguration(conf);
// Create a consumer and subscribe to topics
auto consumer = std::make_shared<cppkafka::Consumer>(createConsumerConfiguration());
auto consumer = std::make_shared<cppkafka::Consumer>(conf);
// Limit the number of batched messages to allow early cancellations
const Settings & settings = global_context.getSettingsRef();
@@ -209,61 +277,9 @@ BufferPtr StorageKafka::createBuffer()
std::make_unique<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit), row_delimiter);
}
BufferPtr StorageKafka::claimBuffer()
void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
{
return tryClaimBuffer(-1L);
}
BufferPtr StorageKafka::tryClaimBuffer(long wait_ms)
{
// Wait for the first free buffer
if (wait_ms >= 0)
{
if (!semaphore.tryWait(wait_ms))
return nullptr;
}
else
semaphore.wait();
// Take the first available buffer from the list
std::lock_guard lock(mutex);
auto buffer = buffers.back();
buffers.pop_back();
return buffer;
}
void StorageKafka::pushBuffer(BufferPtr buffer)
{
std::lock_guard lock(mutex);
buffers.push_back(buffer);
semaphore.set();
}
cppkafka::Configuration StorageKafka::createConsumerConfiguration()
{
cppkafka::Configuration conf;
LOG_TRACE(log, "Setting brokers: " << brokers);
conf.set("metadata.broker.list", brokers);
LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
// If no offset stored for this group, read all messages from the start
conf.set("auto.offset.reset", "smallest");
// We manually commit offsets after a stream successfully finished
conf.set("enable.auto.commit", "false");
// Ignore EOF messages
conf.set("enable.partition.eof", "false");
// for debug logs inside rdkafka
// conf.set("debug", "consumer,cgrp,topic,fetch");
// Update consumer configuration from the configuration
const auto & config = global_context.getConfigRef();
if (config.has(CONFIG_PREFIX))
@@ -276,8 +292,6 @@ cppkafka::Configuration StorageKafka::createConsumerConfiguration()
if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key);
}
return conf;
}
bool StorageKafka::checkDependencies(const String & current_database_name, const String & current_table_name)
@@ -307,7 +321,7 @@ bool StorageKafka::checkDependencies(const String & current_database_name, const
return true;
}
void StorageKafka::streamThread()
void StorageKafka::threadFunc()
{
try
{

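The new `pushReadBuffer`/`popReadBuffer` pair above implements a small semaphore-guarded pool of consumer buffers: `push` returns a buffer and signals a waiter, `pop` takes one, optionally giving up after a timeout. A self-contained sketch of that pooling pattern, using `std::condition_variable` in place of `Poco::Semaphore` (an assumed simplification):

```cpp
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>

// Minimal stand-in for a Kafka consumer read buffer.
struct ConsumerBuffer { int id; };
using ConsumerBufferPtr = std::shared_ptr<ConsumerBuffer>;

class BufferPool
{
public:
    // Return a buffer to the pool and wake one waiter
    // (cf. StorageKafka::pushReadBuffer).
    void push(ConsumerBufferPtr buf)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            buffers.push_back(std::move(buf));
        }
        cv.notify_one();
    }

    // Take a buffer, waiting up to `timeout`; returns nullptr on timeout
    // (cf. StorageKafka::popReadBuffer with a timeout).
    ConsumerBufferPtr pop(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(mutex);
        if (!cv.wait_for(lock, timeout, [this] { return !buffers.empty(); }))
            return nullptr;
        auto buf = buffers.back();  // take the most recently returned buffer
        buffers.pop_back();
        return buf;
    }

private:
    std::mutex mutex;
    std::condition_variable cv;
    std::vector<ConsumerBufferPtr> buffers;
};
```

The timeout path is what lets `KafkaBlockInputStream::readPrefixImpl` give up quickly (returning no buffer) instead of blocking when all consumers are busy.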

@@ -5,11 +5,11 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Poco/Event.h>
#include <Storages/Kafka/WriteBufferToKafkaProducer.h>
#include <Poco/Semaphore.h>
#include <ext/shared_ptr_helper.h>
#include <cppkafka/cppkafka.h>
#include <mutex>
namespace DB
@@ -36,14 +36,20 @@ public:
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(
const ASTPtr & query,
const Context & context
) override;
void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name) override;
void updateDependencies() override;
BufferPtr createBuffer();
BufferPtr claimBuffer();
BufferPtr tryClaimBuffer(long wait_ms);
void pushBuffer(BufferPtr buf);
void pushReadBuffer(ConsumerBufferPtr buf);
ConsumerBufferPtr popReadBuffer();
ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout);
ProducerBufferPtr createWriteBuffer();
const auto & getTopics() const { return topics; }
const auto & getFormatName() const { return format_name; }
@@ -84,7 +90,7 @@ private:
// Consumer list
Poco::Semaphore semaphore;
std::mutex mutex;
std::vector<BufferPtr> buffers; /// available buffers for Kafka consumers
std::vector<ConsumerBufferPtr> buffers; /// available buffers for Kafka consumers
size_t skip_broken;
@@ -94,9 +100,12 @@ private:
BackgroundSchedulePool::TaskHolder task;
std::atomic<bool> stream_cancelled{false};
cppkafka::Configuration createConsumerConfiguration();
ConsumerBufferPtr createReadBuffer();
void streamThread();
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & conf);
void threadFunc();
bool streamToViews();
bool checkDependencies(const String & database_name, const String & table_name);
};


@@ -0,0 +1,90 @@
#include "WriteBufferToKafkaProducer.h"
namespace DB
{
WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
ProducerPtr producer_,
const std::string & topic_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_,
std::chrono::milliseconds poll_timeout)
: WriteBuffer(nullptr, 0)
, producer(producer_)
, topic(topic_)
, delim(delimiter)
, max_rows(rows_per_message)
, chunk_size(chunk_size_)
, timeout(poll_timeout)
{
}
WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
{
assert(rows == 0 && chunks.empty());
}
void WriteBufferToKafkaProducer::count_row()
{
if (++rows % max_rows == 0)
{
std::string payload;
payload.reserve((chunks.size() - 1) * chunk_size + offset());
for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
payload.append(*i);
int trunk_delim = delim && chunks.back()[offset() - 1] == delim ? 1 : 0;
payload.append(chunks.back(), 0, offset() - trunk_delim);
while (true)
{
try
{
producer->produce(cppkafka::MessageBuilder(topic).payload(payload));
}
catch (cppkafka::HandleException & e)
{
if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
producer->poll(timeout);
continue;
}
throw e;
}
break;
}
rows = 0;
chunks.clear();
set(nullptr, 0);
}
}
void WriteBufferToKafkaProducer::flush()
{
// For unknown reason we may hit some internal timeout when inserting for the first time.
while (true)
{
try
{
producer->flush(timeout);
}
catch (cppkafka::HandleException & e)
{
if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
continue;
throw e;
}
break;
}
}
void WriteBufferToKafkaProducer::nextImpl()
{
chunks.push_back(std::string());
chunks.back().resize(chunk_size);
set(chunks.back().data(), chunk_size);
}
}

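The core of `WriteBufferToKafkaProducer::count_row` above is a batching scheme: rows accumulate in chunked storage, and every `max_rows` rows the chunks are concatenated into one message payload handed to the producer. A simplified sketch of just that batching logic (strings stand in for the real chunk buffers, and `produced` stands in for the librdkafka produce call):

```cpp
#include <list>
#include <string>
#include <vector>

// Sketch of the row-batching in WriteBufferToKafkaProducer::count_row:
// every `max_rows` rows, the accumulated chunks become one message.
class BatchingWriter
{
public:
    explicit BatchingWriter(size_t max_rows_) : max_rows(max_rows_) {}

    void writeRow(const std::string & row)
    {
        chunks.push_back(row);
        if (++rows % max_rows == 0)
        {
            // Concatenate all chunks into a single payload,
            // as count_row does before calling producer->produce().
            std::string payload;
            for (const auto & chunk : chunks)
                payload += chunk;
            produced.push_back(payload);

            rows = 0;
            chunks.clear();
        }
    }

    std::vector<std::string> produced;  // messages "sent" to the producer

private:
    const size_t max_rows;
    size_t rows = 0;
    std::list<std::string> chunks;
};
```

The real implementation adds two details omitted here: it trims a trailing row delimiter from the last chunk, and it retries `produce` in a loop, polling the producer while the librdkafka queue reports `RD_KAFKA_RESP_ERR__QUEUE_FULL`.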

@@ -0,0 +1,46 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <cppkafka/cppkafka.h>
#include <list>
namespace DB
{
class WriteBufferToKafkaProducer;
using ProducerBufferPtr = std::shared_ptr<WriteBufferToKafkaProducer>;
using ProducerPtr = std::shared_ptr<cppkafka::Producer>;
class WriteBufferToKafkaProducer : public WriteBuffer
{
public:
WriteBufferToKafkaProducer(
ProducerPtr producer_,
const std::string & topic_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_,
std::chrono::milliseconds poll_timeout);
~WriteBufferToKafkaProducer() override;
void count_row();
void flush();
private:
void nextImpl() override;
ProducerPtr producer;
const std::string topic;
const std::optional<char> delim;
const size_t max_rows;
const size_t chunk_size;
const std::chrono::milliseconds timeout;
size_t rows = 0;
std::list<std::string> chunks;
};
}

Some files were not shown because too many files have changed in this diff.