Merge branch 'master' of github.com:yandex/ClickHouse

Commit 37e683c2fe by BayoNet, 2019-08-20 15:08:35 +03:00
93 changed files with 1445 additions and 337 deletions


@@ -350,7 +350,7 @@ It allows to set commit mode: after every batch of messages is handled, or after
* Renamed functions `leastSqr` to `simpleLinearRegression`, `LinearRegression` to `linearRegression`, `LogisticRegression` to `logisticRegression`. [#5391](https://github.com/yandex/ClickHouse/pull/5391) ([Nikolai Kochetov](https://github.com/KochetovNicolai))
### Performance Improvements
-* Paralellize processing of parts in alter modify query. [#4639](https://github.com/yandex/ClickHouse/pull/4639) ([Ivan Kush](https://github.com/IvanKush))
+* Parallelize processing of parts of non-replicated MergeTree tables in ALTER MODIFY query. [#4639](https://github.com/yandex/ClickHouse/pull/4639) ([Ivan Kush](https://github.com/IvanKush))
* Optimizations in regular expressions extraction. [#5193](https://github.com/yandex/ClickHouse/pull/5193) [#5191](https://github.com/yandex/ClickHouse/pull/5191) ([Danila Kutenin](https://github.com/danlark1))
* Do not add right join key column to join result if it's used only in join on section. [#5260](https://github.com/yandex/ClickHouse/pull/5260) ([Artem Zuikov](https://github.com/4ertus2))
* Freeze the Kafka buffer after first empty response. It avoids multiple invocations of `ReadBuffer::next()` for empty result in some row-parsing streams. [#5283](https://github.com/yandex/ClickHouse/pull/5283) ([Ivan](https://github.com/abyss7))


@@ -5,7 +5,6 @@ set(CLICKHOUSE_ODBC_BRIDGE_SOURCES
    ${CMAKE_CURRENT_SOURCE_DIR}/IdentifierQuoteHandler.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/MainHandler.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/ODBCBlockInputStream.cpp
-   ${CMAKE_CURRENT_SOURCE_DIR}/odbc-bridge.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/ODBCBridge.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/PingHandler.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/validateODBCConnectionString.cpp


@@ -24,6 +24,12 @@ template <typename Value, bool FloatReturn> using FuncQuantilesDeterministic = A
template <typename Value, bool _> using FuncQuantileExact = AggregateFunctionQuantile<Value, QuantileExact<Value>, NameQuantileExact, false, void, false>;
template <typename Value, bool _> using FuncQuantilesExact = AggregateFunctionQuantile<Value, QuantileExact<Value>, NameQuantilesExact, false, void, true>;
+template <typename Value, bool _> using FuncQuantileExactExclusive = AggregateFunctionQuantile<Value, QuantileExactExclusive<Value>, NameQuantileExactExclusive, false, Float64, false>;
+template <typename Value, bool _> using FuncQuantilesExactExclusive = AggregateFunctionQuantile<Value, QuantileExactExclusive<Value>, NameQuantilesExactExclusive, false, Float64, true>;
+template <typename Value, bool _> using FuncQuantileExactInclusive = AggregateFunctionQuantile<Value, QuantileExactInclusive<Value>, NameQuantileExactInclusive, false, Float64, false>;
+template <typename Value, bool _> using FuncQuantilesExactInclusive = AggregateFunctionQuantile<Value, QuantileExactInclusive<Value>, NameQuantilesExactInclusive, false, Float64, true>;
template <typename Value, bool _> using FuncQuantileExactWeighted = AggregateFunctionQuantile<Value, QuantileExactWeighted<Value>, NameQuantileExactWeighted, true, void, false>;
template <typename Value, bool _> using FuncQuantilesExactWeighted = AggregateFunctionQuantile<Value, QuantileExactWeighted<Value>, NameQuantilesExactWeighted, true, void, true>;

@@ -92,6 +98,12 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
    factory.registerFunction(NameQuantileExact::name, createAggregateFunctionQuantile<FuncQuantileExact>);
    factory.registerFunction(NameQuantilesExact::name, createAggregateFunctionQuantile<FuncQuantilesExact>);
+   factory.registerFunction(NameQuantileExactExclusive::name, createAggregateFunctionQuantile<FuncQuantileExactExclusive>);
+   factory.registerFunction(NameQuantilesExactExclusive::name, createAggregateFunctionQuantile<FuncQuantilesExactExclusive>);
+   factory.registerFunction(NameQuantileExactInclusive::name, createAggregateFunctionQuantile<FuncQuantileExactInclusive>);
+   factory.registerFunction(NameQuantilesExactInclusive::name, createAggregateFunctionQuantile<FuncQuantilesExactInclusive>);
    factory.registerFunction(NameQuantileExactWeighted::name, createAggregateFunctionQuantile<FuncQuantileExactWeighted>);
    factory.registerFunction(NameQuantilesExactWeighted::name, createAggregateFunctionQuantile<FuncQuantilesExactWeighted>);


@@ -199,8 +199,15 @@ struct NameQuantileDeterministic { static constexpr auto name = "quantileDetermi
struct NameQuantilesDeterministic { static constexpr auto name = "quantilesDeterministic"; };
struct NameQuantileExact { static constexpr auto name = "quantileExact"; };
-struct NameQuantileExactWeighted { static constexpr auto name = "quantileExactWeighted"; };
struct NameQuantilesExact { static constexpr auto name = "quantilesExact"; };
+struct NameQuantileExactExclusive { static constexpr auto name = "quantileExactExclusive"; };
+struct NameQuantilesExactExclusive { static constexpr auto name = "quantilesExactExclusive"; };
+struct NameQuantileExactInclusive { static constexpr auto name = "quantileExactInclusive"; };
+struct NameQuantilesExactInclusive { static constexpr auto name = "quantilesExactInclusive"; };
+struct NameQuantileExactWeighted { static constexpr auto name = "quantileExactWeighted"; };
struct NameQuantilesExactWeighted { static constexpr auto name = "quantilesExactWeighted"; };
struct NameQuantileTiming { static constexpr auto name = "quantileTiming"; };


@@ -17,8 +17,8 @@ namespace
template <template <typename> class Data>
AggregateFunctionPtr createAggregateFunctionWindowFunnel(const std::string & name, const DataTypes & arguments, const Array & params)
{
-   if (params.size() != 1)
-       throw Exception{"Aggregate function " + name + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
+   if (params.size() < 1)
+       throw Exception{"Aggregate function " + name + " requires at least one parameter: <window>, [option, [option, ...]]", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
    if (arguments.size() < 2)
        throw Exception("Aggregate function " + name + " requires one timestamp argument and at least one event condition.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);


@@ -139,6 +139,7 @@ class AggregateFunctionWindowFunnel final
private:
    UInt64 window;
    UInt8 events_size;
+   UInt8 strict;
    // Loop through the entire events_list, update the event timestamp value

@@ -165,6 +166,10 @@
        if (event_idx == 0)
            events_timestamp[0] = timestamp;
+       else if (strict && events_timestamp[event_idx] >= 0)
+       {
+           return event_idx + 1;
+       }
        else if (events_timestamp[event_idx - 1] >= 0 && timestamp <= events_timestamp[event_idx - 1] + window)
        {
            events_timestamp[event_idx] = events_timestamp[event_idx - 1];

@@ -191,8 +196,17 @@ public:
    {
        events_size = arguments.size() - 1;
        window = params.at(0).safeGet<UInt64>();
+       strict = 0;
+       for (size_t i = 1; i < params.size(); ++i)
+       {
+           String option = params.at(i).safeGet<String>();
+           if (option.compare("strict") == 0)
+               strict = 1;
+           else
+               throw Exception{"Aggregate function " + getName() + " doesn't support a parameter: " + option, ErrorCodes::BAD_ARGUMENTS};
+       }
    }

    DataTypePtr getReturnType() const override
    {
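To make the new behaviour easier to follow, here is a compact restatement of the update rule above as a standalone function. This is an illustrative sketch only, not the real class: the helper types, the early return on a completed chain, and the final fall-back scan are assumptions based on how the visible lines read.

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Sketch: funnel level for events sorted by time, mirroring the branch added above.
// In strict mode, a condition that has already fired caps the funnel at its level.
static uint8_t funnelLevel(const std::vector<std::pair<uint64_t, size_t>> & sorted_events,
                           size_t events_size, uint64_t window, bool strict)
{
    std::vector<int64_t> events_timestamp(events_size, -1);
    for (const auto & [timestamp, event_idx] : sorted_events)
    {
        if (event_idx == 0)
            events_timestamp[0] = timestamp;
        else if (strict && events_timestamp[event_idx] >= 0)
            return event_idx + 1;                               // repeated condition: stop here
        else if (events_timestamp[event_idx - 1] >= 0
                 && timestamp <= static_cast<uint64_t>(events_timestamp[event_idx - 1]) + window)
        {
            events_timestamp[event_idx] = events_timestamp[event_idx - 1];
            if (event_idx + 1 == events_size)
                return events_size;                             // whole chain matched (assumed early exit)
        }
    }
    for (size_t i = events_timestamp.size(); i > 0; --i)        // deepest level reached so far
        if (events_timestamp[i - 1] >= 0)
            return i;
    return 0;
}
```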


@@ -14,6 +14,7 @@ namespace DB
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
+   extern const int BAD_ARGUMENTS;
}

/** Calculates quantile by collecting all values into array

@@ -106,16 +107,134 @@ struct QuantileExact
            result[i] = Value();
        }
    }
-
-   /// The same, but in the case of an empty state, NaN is returned.
-   Float64 getFloat(Float64) const
-   {
-       throw Exception("Method getFloat is not implemented for QuantileExact", ErrorCodes::NOT_IMPLEMENTED);
-   }
-
-   void getManyFloat(const Float64 *, const size_t *, size_t, Float64 *) const
-   {
-       throw Exception("Method getManyFloat is not implemented for QuantileExact", ErrorCodes::NOT_IMPLEMENTED);
-   }
};

+/// QuantileExactExclusive is equivalent to Excel PERCENTILE.EXC, R-6, SAS-4, SciPy-(0,0)
+template <typename Value>
+struct QuantileExactExclusive : public QuantileExact<Value>
+{
+   using QuantileExact<Value>::array;
+
+   /// Get the value of the `level` quantile. The level must be between 0 and 1 excluding bounds.
+   Float64 getFloat(Float64 level)
+   {
+       if (!array.empty())
+       {
+           if (level == 0. || level == 1.)
+               throw Exception("QuantileExactExclusive cannot interpolate for the percentiles 1 and 0", ErrorCodes::BAD_ARGUMENTS);
+           Float64 h = level * (array.size() + 1);
+           auto n = static_cast<size_t>(h);
+           if (n >= array.size())
+               return array[array.size() - 1];
+           else if (n < 1)
+               return array[0];
+           std::nth_element(array.begin(), array.begin() + n - 1, array.end());
+           auto nth_element = std::min_element(array.begin() + n, array.end());
+           return array[n - 1] + (h - n) * (*nth_element - array[n - 1]);
+       }
+       return std::numeric_limits<Float64>::quiet_NaN();
+   }
+
+   void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result)
+   {
+       if (!array.empty())
+       {
+           size_t prev_n = 0;
+           for (size_t i = 0; i < size; ++i)
+           {
+               auto level = levels[indices[i]];
+               if (level == 0. || level == 1.)
+                   throw Exception("QuantileExactExclusive cannot interpolate for the percentiles 1 and 0", ErrorCodes::BAD_ARGUMENTS);
+               Float64 h = level * (array.size() + 1);
+               auto n = static_cast<size_t>(h);
+               if (n >= array.size())
+                   result[indices[i]] = array[array.size() - 1];
+               else if (n < 1)
+                   result[indices[i]] = array[0];
+               else
+               {
+                   std::nth_element(array.begin() + prev_n, array.begin() + n - 1, array.end());
+                   auto nth_element = std::min_element(array.begin() + n, array.end());
+                   result[indices[i]] = array[n - 1] + (h - n) * (*nth_element - array[n - 1]);
+                   prev_n = n - 1;
+               }
+           }
+       }
+       else
+       {
+           for (size_t i = 0; i < size; ++i)
+               result[i] = std::numeric_limits<Float64>::quiet_NaN();
+       }
+   }
+};
+
+/// QuantileExactInclusive is equivalent to Excel PERCENTILE and PERCENTILE.INC, R-7, SciPy-(1,1)
+template <typename Value>
+struct QuantileExactInclusive : public QuantileExact<Value>
+{
+   using QuantileExact<Value>::array;
+
+   /// Get the value of the `level` quantile. The level must be between 0 and 1 including bounds.
+   Float64 getFloat(Float64 level)
+   {
+       if (!array.empty())
+       {
+           Float64 h = level * (array.size() - 1) + 1;
+           auto n = static_cast<size_t>(h);
+           if (n >= array.size())
+               return array[array.size() - 1];
+           else if (n < 1)
+               return array[0];
+           std::nth_element(array.begin(), array.begin() + n - 1, array.end());
+           auto nth_element = std::min_element(array.begin() + n, array.end());
+           return array[n - 1] + (h - n) * (*nth_element - array[n - 1]);
+       }
+       return std::numeric_limits<Float64>::quiet_NaN();
+   }
+
+   void getManyFloat(const Float64 * levels, const size_t * indices, size_t size, Float64 * result)
+   {
+       if (!array.empty())
+       {
+           size_t prev_n = 0;
+           for (size_t i = 0; i < size; ++i)
+           {
+               auto level = levels[indices[i]];
+               Float64 h = level * (array.size() - 1) + 1;
+               auto n = static_cast<size_t>(h);
+               if (n >= array.size())
+                   result[indices[i]] = array[array.size() - 1];
+               else if (n < 1)
+                   result[indices[i]] = array[0];
+               else
+               {
+                   std::nth_element(array.begin() + prev_n, array.begin() + n - 1, array.end());
+                   auto nth_element = std::min_element(array.begin() + n, array.end());
+                   result[indices[i]] = array[n - 1] + (h - n) * (*nth_element - array[n - 1]);
+                   prev_n = n - 1;
+               }
+           }
+       }
+       else
+       {
+           for (size_t i = 0; i < size; ++i)
+               result[i] = std::numeric_limits<Float64>::quiet_NaN();
+       }
+   }
+};
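For orientation, the interpolation that both new structs implement can be written compactly. This is a summary of the code above in my own notation, not text from the commit; x_(k) denotes the k-th smallest of the N collected values.

```latex
% h is the fractional rank; n = floor(h), clamped to [1, N] as in the code above.
% Exclusive (Excel PERCENTILE.EXC, R-6):  h = p (N + 1)
% Inclusive (Excel PERCENTILE.INC, R-7):  h = p (N - 1) + 1
\[
    Q(p) = x_{(n)} + (h - n)\,\bigl(x_{(n+1)} - x_{(n)}\bigr), \qquad n = \lfloor h \rfloor .
\]
```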


@@ -99,7 +99,7 @@ struct Settings : public SettingsCollection<Settings>
    M(SettingUInt64, parallel_replicas_count, 0, "") \
    M(SettingUInt64, parallel_replica_offset, 0, "") \
    \
-   M(SettingBool, skip_unavailable_shards, false, "Silently skip unavailable shards.") \
+   M(SettingBool, skip_unavailable_shards, false, "If 1, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.") \
    \
    M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.") \
    M(SettingBool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.") \

@@ -212,7 +212,7 @@ struct Settings : public SettingsCollection<Settings>
    M(SettingUInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.") \
    M(SettingInt64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite.") \
    M(SettingMilliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.") \
-   M(SettingMilliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from streaming storages.") \
+   M(SettingMilliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from/to streaming storages.") \
    M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \
    M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \
    M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \


@@ -1,45 +0,0 @@
-#include <Core/Block.h>
-#include <Formats/BlockOutputStreamFromRowOutputStream.h>
-namespace DB
-{
-BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_)
-    : row_output(row_output_), header(header_) {}
-void BlockOutputStreamFromRowOutputStream::write(const Block & block)
-{
-    size_t rows = block.rows();
-    for (size_t i = 0; i < rows; ++i)
-    {
-        if (!first_row)
-            row_output->writeRowBetweenDelimiter();
-        first_row = false;
-        row_output->write(block, i);
-    }
-}
-void BlockOutputStreamFromRowOutputStream::setRowsBeforeLimit(size_t rows_before_limit)
-{
-    row_output->setRowsBeforeLimit(rows_before_limit);
-}
-void BlockOutputStreamFromRowOutputStream::setTotals(const Block & totals)
-{
-    row_output->setTotals(totals);
-}
-void BlockOutputStreamFromRowOutputStream::setExtremes(const Block & extremes)
-{
-    row_output->setExtremes(extremes);
-}
-void BlockOutputStreamFromRowOutputStream::onProgress(const Progress & progress)
-{
-    row_output->onProgress(progress);
-}
-}


@@ -1,38 +0,0 @@
-#pragma once
-#include <DataStreams/IBlockOutputStream.h>
-#include <Formats/IRowOutputStream.h>
-namespace DB
-{
-/** Transforms a stream to write data by rows to a stream to write data by blocks.
-  * For example, to write a text dump.
-  */
-class BlockOutputStreamFromRowOutputStream : public IBlockOutputStream
-{
-public:
-    BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_);
-    Block getHeader() const override { return header; }
-    void write(const Block & block) override;
-    void writePrefix() override { row_output->writePrefix(); }
-    void writeSuffix() override { row_output->writeSuffix(); }
-    void flush() override { row_output->flush(); }
-    void setRowsBeforeLimit(size_t rows_before_limit) override;
-    void setTotals(const Block & totals) override;
-    void setExtremes(const Block & extremes) override;
-    void onProgress(const Progress & progress) override;
-    String getContentType() const override { return row_output->getContentType(); }
-private:
-    RowOutputStreamPtr row_output;
-    Block header;
-    bool first_row = true;
-};
-}


@@ -348,10 +348,9 @@ bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumn
    const auto & current_column_type = data_types[table_column];
    const bool is_last_file_column =
        file_column + 1 == column_indexes_for_input_fields.size();
-   const bool at_delimiter = *istr.position() == delimiter;
+   const bool at_delimiter = !istr.eof() && *istr.position() == delimiter;
    const bool at_last_column_line_end = is_last_file_column
-       && (*istr.position() == '\n' || *istr.position() == '\r'
-       || istr.eof());
+       && (istr.eof() || *istr.position() == '\n' || *istr.position() == '\r');
    out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
        << "name: " << header.safeGetByPosition(table_column).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(table_column).name.size(), ' ')

@@ -514,10 +513,9 @@ void CSVRowInputStream::updateDiagnosticInfo()
bool CSVRowInputStream::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx)
{
-   const bool at_delimiter = *istr.position() == format_settings.csv.delimiter;
+   const bool at_delimiter = !istr.eof() && *istr.position() == format_settings.csv.delimiter;
    const bool at_last_column_line_end = is_last_file_column
-       && (*istr.position() == '\n' || *istr.position() == '\r'
-       || istr.eof());
+       && (istr.eof() || *istr.position() == '\n' || *istr.position() == '\r');
    if (format_settings.csv.empty_as_default
        && (at_delimiter || at_last_column_line_end))


@@ -100,7 +100,8 @@ BlockInputStreamPtr FormatFactory::getInput(
}

-BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer & buf, const Block & sample, const Context & context) const
+BlockOutputStreamPtr FormatFactory::getOutput(
+    const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
{
    if (name == "PrettyCompactMonoBlock")
    {

@@ -124,14 +125,14 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer &
        const Settings & settings = context.getSettingsRef();
        FormatSettings format_settings = getOutputFormatSetting(settings);

        /** Materialization is needed, because formats can use the functions `IDataType`,
          * which only work with full columns.
          */
        return std::make_shared<MaterializingBlockOutputStream>(
-           output_getter(buf, sample, context, format_settings), sample);
+           output_getter(buf, sample, context, callback, format_settings), sample);
    }

-   auto format = getOutputFormat(name, buf, sample, context);
+   auto format = getOutputFormat(name, buf, sample, context, callback);
    return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}

@@ -165,7 +166,8 @@ InputFormatPtr FormatFactory::getInputFormat(
}

-OutputFormatPtr FormatFactory::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample, const Context & context) const
+OutputFormatPtr FormatFactory::getOutputFormat(
+    const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
{
    const auto & output_getter = getCreators(name).output_processor_creator;
    if (!output_getter)

@@ -177,7 +179,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(const String & name, WriteBuffer
    /** TODO: Materialization is needed, because formats can use the functions `IDataType`,
      * which only work with full columns.
      */
-   return output_getter(buf, sample, context, format_settings);
+   return output_getter(buf, sample, context, callback, format_settings);
}


@@ -41,6 +41,10 @@ public:
    /// It's initial purpose was to extract payload for virtual columns from Kafka Consumer ReadBuffer.
    using ReadCallback = std::function<void()>;

+   /// This callback allows to perform some additional actions after writing a single row.
+   /// It's initial purpose was to flush Kafka message for each row.
+   using WriteCallback = std::function<void()>;

private:
    using InputCreator = std::function<BlockInputStreamPtr(
        ReadBuffer & buf,

@@ -55,6 +59,7 @@ private:
        WriteBuffer & buf,
        const Block & sample,
        const Context & context,
+       WriteCallback callback,
        const FormatSettings & settings)>;

    using InputProcessorCreator = std::function<InputFormatPtr(

@@ -68,6 +73,7 @@ private:
        WriteBuffer & buf,
        const Block & sample,
        const Context & context,
+       WriteCallback callback,
        const FormatSettings & settings)>;

    struct Creators

@@ -91,7 +97,7 @@ public:
        ReadCallback callback = {}) const;

    BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
-       const Block & sample, const Context & context) const;
+       const Block & sample, const Context & context, WriteCallback callback = {}) const;

    InputFormatPtr getInputFormat(
        const String & name,

@@ -102,8 +108,8 @@ public:
        UInt64 rows_portion_size = 0,
        ReadCallback callback = {}) const;

-   OutputFormatPtr getOutputFormat(const String & name, WriteBuffer & buf,
-       const Block & sample, const Context & context) const;
+   OutputFormatPtr getOutputFormat(
+       const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const;

    /// Register format by its name.
    void registerInputFormat(const String & name, InputCreator input_creator);
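A minimal caller-side sketch of the extended interface follows. This is my own illustration, not code from the commit; the format name, file path and the flush-per-row policy are assumptions. The trailing `WriteCallback` is invoked by `IRowOutputFormat` after every written row, which is what lets a sink such as the Kafka writer emit one message per row.

```cpp
#include <Formats/FormatFactory.h>
#include <IO/WriteBufferFromFile.h>

// Hypothetical caller: ask the factory for an output stream and pass a
// WriteCallback so the sink can react after every single row.
void writeBlockRowByRow(const DB::Block & block, const DB::Context & context)
{
    DB::WriteBufferFromFile buf("/tmp/rows.jsonl");
    auto out = DB::FormatFactory::instance().getOutput(
        "JSONEachRow", buf, block.cloneEmpty(), context,
        [&buf] { buf.next(); });            // fired after each row is written
    out->writePrefix();
    out->write(block);
    out->writeSuffix();
    buf.next();
}
```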


@@ -27,6 +27,7 @@ void registerOutputFormatNative(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings &)
    {
        return std::make_shared<NativeBlockOutputStream>(buf, 0, sample);


@@ -11,6 +11,7 @@ void registerOutputFormatNull(FormatFactory & factory)
        WriteBuffer &,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings &)
    {
        return std::make_shared<NullBlockOutputStream>(sample);


@@ -14,7 +14,6 @@
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
-#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/Formats/Impl/TabSeparatedRowOutputFormat.h>

@@ -47,7 +46,7 @@ try
    RowInputStreamPtr row_input = std::make_shared<TabSeparatedRowInputStream>(in_buf, sample, false, false, format_settings);
    BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
-   BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, format_settings));
+   BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, []{}, format_settings));
    copyData(block_input, *block_output);
}


@@ -11,7 +11,6 @@
#include <Formats/TabSeparatedRowInputStream.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
-#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <DataStreams/copyData.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>

@@ -44,7 +43,7 @@ try
    BlockInputStreamFromRowInputStream block_input(row_input, sample, DEFAULT_INSERT_BLOCK_SIZE, 0, []{}, format_settings);
    BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(
-       std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, format_settings));
+       std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [] {}, format_settings));
    copyData(block_input, *block_output);
    return 0;


@@ -11,7 +11,7 @@
namespace DB
{
-/// Stores data in memory chunks, size of cunks are exponentially increasing during write
+/// Stores data in memory chunks, size of chunks are exponentially increasing during write
/// Written data could be reread after write
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost::noncopyable, private Allocator<false>
{


@@ -35,7 +35,7 @@
      */
    inline void next()
    {
-       if (!offset())
+       if (!offset() && available())
            return;
        bytes += offset();


@@ -320,13 +320,7 @@ ColumnPtr Set::execute(const Block & block, bool negative) const
        return res;
    }

-   if (data_types.size() != num_key_columns)
-   {
-       std::stringstream message;
-       message << "Number of columns in section IN doesn't match. "
-           << num_key_columns << " at left, " << data_types.size() << " at right.";
-       throw Exception(message.str(), ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
-   }
+   checkColumnsNumber(num_key_columns);

    /// Remember the columns we will work with. Also check that the data types are correct.
    ColumnRawPtrs key_columns;

@@ -337,11 +331,7 @@ ColumnPtr Set::execute(const Block & block, bool negative) const
    for (size_t i = 0; i < num_key_columns; ++i)
    {
-       if (!removeNullable(data_types[i])->equals(*removeNullable(block.safeGetByPosition(i).type)))
-           throw Exception("Types of column " + toString(i + 1) + " in section IN don't match: "
-               + data_types[i]->getName() + " on the right, " + block.safeGetByPosition(i).type->getName() +
-               " on the left.", ErrorCodes::TYPE_MISMATCH);
+       checkTypesEqual(i, block.safeGetByPosition(i).type);
        materialized_columns.emplace_back(block.safeGetByPosition(i).column->convertToFullColumnIfConst());
        key_columns.emplace_back() = materialized_columns.back().get();
    }

@@ -421,6 +411,24 @@ void Set::executeOrdinary(
    }
}

+void Set::checkColumnsNumber(size_t num_key_columns) const
+{
+    if (data_types.size() != num_key_columns)
+    {
+        std::stringstream message;
+        message << "Number of columns in section IN doesn't match. "
+            << num_key_columns << " at left, " << data_types.size() << " at right.";
+        throw Exception(message.str(), ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
+    }
+}
+
+void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const
+{
+    if (!removeNullable(data_types[set_type_idx])->equals(*removeNullable(other_type)))
+        throw Exception("Types of column " + toString(set_type_idx + 1) + " in section IN don't match: "
+            + data_types[set_type_idx]->getName() + " on the right, " + other_type->getName() +
+            " on the left.", ErrorCodes::TYPE_MISMATCH);
+}

MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_)
    : indexes_mapping(std::move(index_mapping_))


@@ -70,6 +70,9 @@ public:
    bool hasExplicitSetElements() const { return fill_set_elements; }
    Columns getSetElements() const { return { set_elements.begin(), set_elements.end() }; }

+   void checkColumnsNumber(size_t num_key_columns) const;
+   void checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) const;

private:
    size_t keys_size = 0;
    Sizes key_sizes;


@@ -20,6 +20,9 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
        first_row = false;
        write(columns, row);
+       if (write_single_row_callback)
+           write_single_row_callback();
    }
}

@@ -96,6 +99,3 @@ void IRowOutputFormat::writeTotals(const DB::Columns & columns, size_t row_num)
}
}


@@ -1,8 +1,10 @@
#pragma once

-#include <string>
+#include <Formats/FormatFactory.h>
#include <Processors/Formats/IOutputFormat.h>
+#include <string>

namespace DB
{

@@ -22,8 +24,8 @@ protected:
    void finalize() override;

public:
-   IRowOutputFormat(const Block & header, WriteBuffer & out_)
-       : IOutputFormat(header, out_), types(header.getDataTypes())
+   IRowOutputFormat(const Block & header, WriteBuffer & out_, FormatFactory::WriteCallback callback)
+       : IOutputFormat(header, out_), types(header.getDataTypes()), write_single_row_callback(callback)
    {
    }

@@ -57,6 +59,9 @@ private:
    bool prefix_written = false;
    bool suffix_written = false;

+   // Callback used to indicate that another row is written.
+   FormatFactory::WriteCallback write_single_row_callback;

    void writePrefixIfNot()
    {
        if (!prefix_written)

@@ -76,5 +81,3 @@ private:
};
}


@@ -9,8 +9,8 @@
namespace DB
{

-BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_)
-    : IRowOutputFormat(header, out_), with_names(with_names_), with_types(with_types_)
+BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback)
+    : IRowOutputFormat(header, out_, callback), with_names(with_names_), with_types(with_types_)
{
}

@@ -53,18 +53,20 @@ void registerOutputFormatProcessorRowBinary(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback callback,
        const FormatSettings &)
    {
-       return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false);
+       return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, callback);
    });

    factory.registerOutputFormatProcessor("RowBinaryWithNamesAndTypes", [](
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback callback,
        const FormatSettings &)
    {
-       return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true);
+       return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true, callback);
    });
}


@@ -17,7 +17,7 @@ class WriteBuffer;
class BinaryRowOutputFormat: public IRowOutputFormat
{
public:
-   BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_);
+   BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback);

    String getName() const override { return "BinaryRowOutputFormat"; }

@@ -32,4 +32,3 @@ protected:
};
}


@@ -349,10 +349,9 @@ bool OPTIMIZE(1) CSVRowInputFormat::parseRowAndPrintDiagnosticInfo(MutableColumn
    const auto & current_column_type = data_types[table_column];
    const bool is_last_file_column =
        file_column + 1 == column_indexes_for_input_fields.size();
-   const bool at_delimiter = *in.position() == delimiter;
+   const bool at_delimiter = !in.eof() && *in.position() == delimiter;
    const bool at_last_column_line_end = is_last_file_column
-       && (*in.position() == '\n' || *in.position() == '\r'
-       || in.eof());
+       && (in.eof() || *in.position() == '\n' || *in.position() == '\r');
    auto & header = getPort().getHeader();
    out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')

@@ -516,10 +515,9 @@ void CSVRowInputFormat::updateDiagnosticInfo()
bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx)
{
-   const bool at_delimiter = *in.position() == format_settings.csv.delimiter;
+   const bool at_delimiter = !in.eof() && *in.position() == format_settings.csv.delimiter;
    const bool at_last_column_line_end = is_last_file_column
-       && (*in.position() == '\n' || *in.position() == '\r'
-       || in.eof());
+       && (in.eof() || *in.position() == '\n' || *in.position() == '\r');
    if (format_settings.csv.empty_as_default
        && (at_delimiter || at_last_column_line_end))


@@ -8,8 +8,8 @@ namespace DB
{

-CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const FormatSettings & format_settings_)
-    : IRowOutputFormat(header_, out_), with_names(with_names_), format_settings(format_settings_)
+CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
+    : IRowOutputFormat(header_, out_, callback), with_names(with_names_), format_settings(format_settings_)
{
    auto & sample = getPort(PortKind::Main).getHeader();
    size_t columns = sample.columns();

@@ -77,9 +77,10 @@ void registerOutputFormatProcessorCSV(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback callback,
        const FormatSettings & format_settings)
    {
-       return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, format_settings);
+       return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, callback, format_settings);
    });
}
}


@@ -20,7 +20,7 @@ public:
    /** with_names - output in the first line a header with column names
      * with_types - output in the next line header with the names of the types
      */
-   CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const FormatSettings & format_settings_);
+   CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);

    String getName() const override { return "CSVRowOutputFormat"; }

@@ -45,4 +45,3 @@ protected:
};
}


@@ -8,8 +8,8 @@ namespace DB
{

JSONCompactRowOutputFormat::JSONCompactRowOutputFormat(
-    WriteBuffer & out_, const Block & header, const FormatSettings & settings_)
-    : JSONRowOutputFormat(out_, header, settings_)
+    WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
+    : JSONRowOutputFormat(out_, header, callback, settings_)
{
}

@@ -81,9 +81,10 @@ void registerOutputFormatProcessorJSONCompact(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback callback,
        const FormatSettings & format_settings)
    {
-       return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, format_settings);
+       return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, callback, format_settings);
    });
}


@@ -16,7 +16,7 @@ struct FormatSettings;
class JSONCompactRowOutputFormat : public JSONRowOutputFormat
{
public:
-   JSONCompactRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & settings_);
+   JSONCompactRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_);

    String getName() const override { return "JSONCompactRowOutputFormat"; }


@@ -8,8 +8,8 @@ namespace DB
{

-JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
-    : IRowOutputFormat(header_, out_), settings(settings_)
+JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
+    : IRowOutputFormat(header_, out_, callback), settings(settings_)
{
    auto & sample = getPort(PortKind::Main).getHeader();
    size_t columns = sample.columns();

@@ -57,9 +57,10 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback callback,
        const FormatSettings & format_settings)
    {
-       return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, format_settings);
+       return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings);
    });
}


@@ -15,7 +15,7 @@ namespace DB
class JSONEachRowRowOutputFormat : public IRowOutputFormat
{
public:
-   JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_);
+   JSONEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_);

    String getName() const override { return "JSONEachRowRowOutputFormat"; }

@@ -37,4 +37,3 @@ private:
};
}


@@ -2,14 +2,13 @@
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
#include <Formats/FormatFactory.h>
-#include <Formats/BlockOutputStreamFromRowOutputStream.h>

namespace DB
{

-JSONRowOutputFormat::JSONRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & settings_)
-    : IRowOutputFormat(header, out_), settings(settings_)
+JSONRowOutputFormat::JSONRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
+    : IRowOutputFormat(header, out_, callback), settings(settings_)
{
    auto & sample = getPort(PortKind::Main).getHeader();
    NamesAndTypesList columns(sample.getNamesAndTypesList());

@@ -248,9 +247,10 @@ void registerOutputFormatProcessorJSON(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback callback,
        const FormatSettings & format_settings)
    {
-       return std::make_shared<JSONRowOutputFormat>(buf, sample, format_settings);
+       return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings);
    });
}


@@ -16,7 +16,7 @@ namespace DB
class JSONRowOutputFormat : public IRowOutputFormat
{
public:
-   JSONRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & settings_);
+   JSONRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_);

    String getName() const override { return "JSONRowOutputFormat"; }

@@ -81,4 +81,3 @@ protected:
};
}


@@ -107,10 +107,12 @@ void MySQLOutputFormat::flush()
void registerOutputFormatProcessorMySQLWrite(FormatFactory & factory)
{
    factory.registerOutputFormatProcessor(
-       "MySQLWire", [](WriteBuffer & buf, const Block & sample, const Context & context, const FormatSettings & settings)
-       {
-           return std::make_shared<MySQLOutputFormat>(buf, sample, context, settings);
-       });
+       "MySQLWire",
+       [](WriteBuffer & buf,
+          const Block & sample,
+          const Context & context,
+          FormatFactory::WriteCallback,
+          const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, context, settings); });
}
}


@@ -40,4 +40,3 @@ private:
};
}


@@ -161,6 +161,7 @@ void registerOutputFormatProcessorNative(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings &)
    {
        return std::make_shared<NativeOutputFormatFromNativeBlockOutputStream>(sample, buf);


@@ -22,6 +22,7 @@ void registerOutputFormatProcessorNull(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings &)
    {
        return std::make_shared<NullOutputFormat>(sample, buf);


@@ -107,7 +107,7 @@ void ODBCDriver2BlockOutputFormat::writePrefix()
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory)
{
    factory.registerOutputFormatProcessor(
-       "ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const Context &, const FormatSettings & format_settings)
+       "ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const Context &, FormatFactory::WriteCallback, const FormatSettings & format_settings)
        {
            return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
        });


@@ -70,6 +70,7 @@ void registerOutputFormatProcessorODBCDriver(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings & format_settings)
    {
        return std::make_shared<ODBCDriverBlockOutputFormat>(buf, sample, format_settings);


@@ -423,7 +423,12 @@ void ParquetBlockOutputFormat::finalize()
void registerOutputFormatProcessorParquet(FormatFactory & factory)
{
    factory.registerOutputFormatProcessor(
-       "Parquet", [](WriteBuffer & buf, const Block & sample, const Context & /*context*/, const FormatSettings & format_settings)
+       "Parquet",
+       [](WriteBuffer & buf,
+          const Block & sample,
+          const Context & /*context*/,
+          FormatFactory::WriteCallback,
+          const FormatSettings & format_settings)
        {
            auto impl = std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
            /// TODO


@@ -261,6 +261,7 @@ void registerOutputFormatProcessorPretty(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings & format_settings)
    {
        return std::make_shared<PrettyBlockOutputFormat>(buf, sample, format_settings);

@@ -270,6 +271,7 @@ void registerOutputFormatProcessorPretty(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings & format_settings)
    {
        FormatSettings changed_settings = format_settings;


@@ -134,6 +134,7 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings & format_settings)
    {
        return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, format_settings);

@@ -143,6 +144,7 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings & format_settings)
    {
        FormatSettings changed_settings = format_settings;


@@ -103,6 +103,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings & format_settings)
    {
        return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, format_settings);

@@ -112,6 +113,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory)
        WriteBuffer & buf,
        const Block & sample,
        const Context &,
+       FormatFactory::WriteCallback,
        const FormatSettings & format_settings)
    {
        FormatSettings changed_settings = format_settings;


@@ -23,8 +23,9 @@ namespace ErrorCodes
ProtobufRowOutputFormat::ProtobufRowOutputFormat(
    WriteBuffer & out_,
    const Block & header,
+   FormatFactory::WriteCallback callback,
    const FormatSchemaInfo & format_schema)
-   : IRowOutputFormat(header, out_)
+   : IRowOutputFormat(header, out_, callback)
    , data_types(header.getDataTypes())
    , writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
{

@@ -46,9 +47,14 @@ void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
{
    factory.registerOutputFormatProcessor(
-       "Protobuf", [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &)
+       "Protobuf",
+       [](WriteBuffer & buf,
+          const Block & header,
+          const Context & context,
+          FormatFactory::WriteCallback callback,
+          const FormatSettings &)
        {
-           return std::make_shared<ProtobufRowOutputFormat>(buf, header, FormatSchemaInfo(context, "Protobuf"));
+           return std::make_shared<ProtobufRowOutputFormat>(buf, header, callback, FormatSchemaInfo(context, "Protobuf"));
        });
}


@@ -35,6 +35,7 @@ public:
    ProtobufRowOutputFormat(
        WriteBuffer & out_,
        const Block & header,
+       FormatFactory::WriteCallback callback,
        const FormatSchemaInfo & format_schema);

    String getName() const override { return "ProtobufRowOutputFormat"; }


@ -8,8 +8,8 @@
namespace DB namespace DB
{ {
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_) TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, format_settings_) : TabSeparatedRowOutputFormat(out_, header, false, false, callback, format_settings_)
{ {
auto & sample = getPort(PortKind::Main).getHeader(); auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList()); NamesAndTypesList columns(sample.getNamesAndTypesList());
@ -46,9 +46,10 @@ void registerOutputFormatProcessorTSKV(FormatFactory & factory)
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<TSKVRowOutputFormat>(buf, sample, settings); return std::make_shared<TSKVRowOutputFormat>(buf, sample, callback, settings);
}); });
} }


@ -14,7 +14,7 @@ namespace DB
class TSKVRowOutputFormat: public TabSeparatedRowOutputFormat class TSKVRowOutputFormat: public TabSeparatedRowOutputFormat
{ {
public: public:
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings); TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & format_settings);
String getName() const override { return "TSKVRowOutputFormat"; } String getName() const override { return "TSKVRowOutputFormat"; }
@ -27,4 +27,3 @@ protected:
}; };
} }


@ -13,8 +13,16 @@ namespace DB
class TabSeparatedRawRowOutputFormat : public TabSeparatedRowOutputFormat class TabSeparatedRawRowOutputFormat : public TabSeparatedRowOutputFormat
{ {
public: public:
TabSeparatedRawRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings_) TabSeparatedRawRowOutputFormat(
: TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, format_settings_) {} WriteBuffer & out_,
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, callback, format_settings_)
{
}
String getName() const override { return "TabSeparatedRawRowOutputFormat"; } String getName() const override { return "TabSeparatedRawRowOutputFormat"; }
@ -25,4 +33,3 @@ public:
}; };
} }


@ -6,10 +6,14 @@
namespace DB namespace DB
{ {
TabSeparatedRowOutputFormat::TabSeparatedRowOutputFormat( TabSeparatedRowOutputFormat::TabSeparatedRowOutputFormat(
WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings_) WriteBuffer & out_,
: IRowOutputFormat(header_, out_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_) const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
{ {
} }
@ -75,9 +79,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, settings); return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, callback, settings);
}); });
} }
@ -87,9 +92,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, settings); return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, callback, settings);
}); });
} }
@ -99,9 +105,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, settings); return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, callback, settings);
}); });
} }
@ -111,9 +118,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, settings); return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, callback, settings);
}); });
} }
} }


@ -18,7 +18,13 @@ public:
/** with_names - output in the first line a header with column names /** with_names - output in the first line a header with column names
* with_types - output the next line header with the names of the types * with_types - output the next line header with the names of the types
*/ */
TabSeparatedRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings_); TabSeparatedRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const FormatSettings & format_settings_);
String getName() const override { return "TabSeparatedRowOutputFormat"; } String getName() const override { return "TabSeparatedRowOutputFormat"; }
@ -40,4 +46,3 @@ protected:
}; };
} }


@ -10,8 +10,8 @@ namespace DB
{ {
ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_) ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_) : IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
{ {
} }
@ -47,9 +47,10 @@ void registerOutputFormatProcessorValues(FormatFactory & factory)
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<ValuesRowOutputFormat>(buf, sample, settings); return std::make_shared<ValuesRowOutputFormat>(buf, sample, callback, settings);
}); });
} }


@ -15,7 +15,7 @@ class WriteBuffer;
class ValuesRowOutputFormat : public IRowOutputFormat class ValuesRowOutputFormat : public IRowOutputFormat
{ {
public: public:
ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_); ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
String getName() const override { return "ValuesRowOutputFormat"; } String getName() const override { return "ValuesRowOutputFormat"; }
@ -30,4 +30,3 @@ private:
}; };
} }


@ -11,8 +11,8 @@ namespace DB
{ {
VerticalRowOutputFormat::VerticalRowOutputFormat( VerticalRowOutputFormat::VerticalRowOutputFormat(
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_) WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_) : IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
{ {
auto & sample = getPort(PortKind::Main).getHeader(); auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns(); size_t columns = sample.columns();
@ -169,9 +169,10 @@ void registerOutputFormatProcessorVertical(FormatFactory & factory)
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<VerticalRowOutputFormat>(buf, sample, settings); return std::make_shared<VerticalRowOutputFormat>(buf, sample, callback, settings);
}); });
} }


@ -18,7 +18,7 @@ class Context;
class VerticalRowOutputFormat : public IRowOutputFormat class VerticalRowOutputFormat : public IRowOutputFormat
{ {
public: public:
VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_); VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
String getName() const override { return "VerticalRowOutputFormat"; } String getName() const override { return "VerticalRowOutputFormat"; }
@ -50,4 +50,3 @@ protected:
}; };
} }


@ -7,8 +7,8 @@
namespace DB namespace DB
{ {
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_) XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_), format_settings(format_settings_) : IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
{ {
auto & sample = getPort(PortKind::Main).getHeader(); auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList()); NamesAndTypesList columns(sample.getNamesAndTypesList());
@ -246,9 +246,10 @@ void registerOutputFormatProcessorXML(FormatFactory & factory)
WriteBuffer & buf, WriteBuffer & buf,
const Block & sample, const Block & sample,
const Context &, const Context &,
FormatFactory::WriteCallback callback,
const FormatSettings & settings) const FormatSettings & settings)
{ {
return std::make_shared<XMLRowOutputFormat>(buf, sample, settings); return std::make_shared<XMLRowOutputFormat>(buf, sample, callback, settings);
}); });
} }


@ -16,7 +16,7 @@ namespace DB
class XMLRowOutputFormat : public IRowOutputFormat class XMLRowOutputFormat : public IRowOutputFormat
{ {
public: public:
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_); XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
String getName() const override { return "XMLRowOutputFormat"; } String getName() const override { return "XMLRowOutputFormat"; }
@ -75,4 +75,3 @@ protected:
}; };
} }


@ -33,7 +33,7 @@ KafkaBlockInputStream::~KafkaBlockInputStream()
buffer->reset(); buffer->reset();
} }
storage.pushBuffer(buffer); storage.pushReadBuffer(buffer);
} }
Block KafkaBlockInputStream::getHeader() const Block KafkaBlockInputStream::getHeader() const
@ -43,11 +43,12 @@ Block KafkaBlockInputStream::getHeader() const
void KafkaBlockInputStream::readPrefixImpl() void KafkaBlockInputStream::readPrefixImpl()
{ {
buffer = storage.tryClaimBuffer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds()); auto timeout = std::chrono::milliseconds(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
buffer = storage.popReadBuffer(timeout);
claimed = !!buffer; claimed = !!buffer;
if (!buffer) if (!buffer)
buffer = storage.createBuffer(); return;
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.getTopics()); buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.getTopics());
@ -80,6 +81,9 @@ void KafkaBlockInputStream::readPrefixImpl()
Block KafkaBlockInputStream::readImpl() Block KafkaBlockInputStream::readImpl()
{ {
if (!buffer)
return Block();
Block block = children.back()->read(); Block block = children.back()->read();
if (!block) if (!block)
return block; return block;
@ -99,6 +103,9 @@ Block KafkaBlockInputStream::readImpl()
void KafkaBlockInputStream::readSuffixImpl() void KafkaBlockInputStream::readSuffixImpl()
{ {
if (!buffer)
return;
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit(); buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();
broken = false; broken = false;


@ -27,7 +27,7 @@ private:
Names column_names; Names column_names;
UInt64 max_block_size; UInt64 max_block_size;
BufferPtr buffer; ConsumerBufferPtr buffer;
MutableColumns virtual_columns; MutableColumns virtual_columns;
bool broken = true, claimed = false; bool broken = true, claimed = false;
}; };


@ -0,0 +1,54 @@
#include "KafkaBlockOutputStream.h"
#include <Formats/FormatFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_CREATE_IO_BUFFER;
}
KafkaBlockOutputStream::KafkaBlockOutputStream(StorageKafka & storage_, const Context & context_) : storage(storage_), context(context_)
{
}
KafkaBlockOutputStream::~KafkaBlockOutputStream()
{
}
Block KafkaBlockOutputStream::getHeader() const
{
return storage.getSampleBlockNonMaterialized();
}
void KafkaBlockOutputStream::writePrefix()
{
buffer = storage.createWriteBuffer();
if (!buffer)
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), context, [this]{ buffer->count_row(); });
}
void KafkaBlockOutputStream::write(const Block & block)
{
child->write(block);
}
void KafkaBlockOutputStream::writeSuffix()
{
child->writeSuffix();
flush();
}
void KafkaBlockOutputStream::flush()
{
if (buffer)
buffer->flush();
}
}


@ -0,0 +1,31 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Storages/Kafka/StorageKafka.h>
namespace DB
{
class KafkaBlockOutputStream : public IBlockOutputStream
{
public:
explicit KafkaBlockOutputStream(StorageKafka & storage_, const Context & context_);
~KafkaBlockOutputStream() override;
Block getHeader() const override;
void writePrefix() override;
void write(const Block & block) override;
void writeSuffix() override;
void flush() override;
private:
StorageKafka & storage;
Context context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;
};
}


@ -10,7 +10,7 @@
namespace DB namespace DB
{ {
using BufferPtr = std::shared_ptr<DelimitedReadBuffer>; using ConsumerBufferPtr = std::shared_ptr<DelimitedReadBuffer>;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>; using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class ReadBufferFromKafkaConsumer : public ReadBuffer class ReadBufferFromKafkaConsumer : public ReadBuffer


@ -16,6 +16,7 @@
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Storages/Kafka/KafkaSettings.h> #include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/KafkaBlockInputStream.h> #include <Storages/Kafka/KafkaBlockInputStream.h>
#include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h> #include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp> #include <boost/algorithm/string/replace.hpp>
@ -106,7 +107,7 @@ StorageKafka::StorageKafka(
, skip_broken(skip_broken_) , skip_broken(skip_broken_)
, intermediate_commit(intermediate_commit_) , intermediate_commit(intermediate_commit_)
{ {
task = global_context.getSchedulePool().createTask(log->name(), [this]{ streamThread(); }); task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
task->deactivate(); task->deactivate();
} }
@ -140,14 +141,21 @@ BlockInputStreams StorageKafka::read(
} }
BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const Context & context)
{
if (topics.size() > 1)
throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<KafkaBlockOutputStream>(*this, context);
}
void StorageKafka::startup() void StorageKafka::startup()
{ {
for (size_t i = 0; i < num_consumers; ++i) for (size_t i = 0; i < num_consumers; ++i)
{ {
// Make buffer available
try try
{ {
pushBuffer(createBuffer()); pushReadBuffer(createReadBuffer());
++num_created_consumers; ++num_created_consumers;
} }
catch (const cppkafka::Exception &) catch (const cppkafka::Exception &)
@ -169,7 +177,7 @@ void StorageKafka::shutdown()
// Close all consumers // Close all consumers
for (size_t i = 0; i < num_created_consumers; ++i) for (size_t i = 0; i < num_created_consumers; ++i)
{ {
auto buffer = claimBuffer(); auto buffer = popReadBuffer();
// FIXME: not sure if we really close consumers here, and if we really need to close them here. // FIXME: not sure if we really close consumers here, and if we really need to close them here.
} }
@ -193,10 +201,70 @@ void StorageKafka::updateDependencies()
} }
BufferPtr StorageKafka::createBuffer() void StorageKafka::pushReadBuffer(ConsumerBufferPtr buffer)
{ {
std::lock_guard lock(mutex);
buffers.push_back(buffer);
semaphore.set();
}
ConsumerBufferPtr StorageKafka::popReadBuffer()
{
return popReadBuffer(std::chrono::milliseconds::zero());
}
ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout)
{
// Wait for the first free buffer
if (timeout == std::chrono::milliseconds::zero())
semaphore.wait();
else
{
if (!semaphore.tryWait(timeout.count()))
return nullptr;
}
// Take the first available buffer from the list
std::lock_guard lock(mutex);
auto buffer = buffers.back();
buffers.pop_back();
return buffer;
}
ProducerBufferPtr StorageKafka::createWriteBuffer()
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
// TODO: fill required settings
updateConfiguration(conf);
auto producer = std::make_shared<cppkafka::Producer>(conf);
const Settings & settings = global_context.getSettingsRef();
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
return std::make_shared<WriteBufferToKafkaProducer>(
producer, topics[0], row_delimiter ? std::optional<char>{row_delimiter} : std::optional<char>(), 1, 1024, std::chrono::milliseconds(poll_timeout));
}
ConsumerBufferPtr StorageKafka::createReadBuffer()
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
updateConfiguration(conf);
// Create a consumer and subscribe to topics // Create a consumer and subscribe to topics
auto consumer = std::make_shared<cppkafka::Consumer>(createConsumerConfiguration()); auto consumer = std::make_shared<cppkafka::Consumer>(conf);
// Limit the number of batched messages to allow early cancellations // Limit the number of batched messages to allow early cancellations
const Settings & settings = global_context.getSettingsRef(); const Settings & settings = global_context.getSettingsRef();
@ -209,61 +277,9 @@ BufferPtr StorageKafka::createBuffer()
std::make_unique<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit), row_delimiter); std::make_unique<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit), row_delimiter);
} }
BufferPtr StorageKafka::claimBuffer()
void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
{ {
return tryClaimBuffer(-1L);
}
BufferPtr StorageKafka::tryClaimBuffer(long wait_ms)
{
// Wait for the first free buffer
if (wait_ms >= 0)
{
if (!semaphore.tryWait(wait_ms))
return nullptr;
}
else
semaphore.wait();
// Take the first available buffer from the list
std::lock_guard lock(mutex);
auto buffer = buffers.back();
buffers.pop_back();
return buffer;
}
void StorageKafka::pushBuffer(BufferPtr buffer)
{
std::lock_guard lock(mutex);
buffers.push_back(buffer);
semaphore.set();
}
cppkafka::Configuration StorageKafka::createConsumerConfiguration()
{
cppkafka::Configuration conf;
LOG_TRACE(log, "Setting brokers: " << brokers);
conf.set("metadata.broker.list", brokers);
LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
// If no offset stored for this group, read all messages from the start
conf.set("auto.offset.reset", "smallest");
// We manually commit offsets after a stream successfully finished
conf.set("enable.auto.commit", "false");
// Ignore EOF messages
conf.set("enable.partition.eof", "false");
// for debug logs inside rdkafka
// conf.set("debug", "consumer,cgrp,topic,fetch");
// Update consumer configuration from the configuration // Update consumer configuration from the configuration
const auto & config = global_context.getConfigRef(); const auto & config = global_context.getConfigRef();
if (config.has(CONFIG_PREFIX)) if (config.has(CONFIG_PREFIX))
@ -276,8 +292,6 @@ cppkafka::Configuration StorageKafka::createConsumerConfiguration()
if (config.has(topic_config_key)) if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key); loadFromConfig(conf, config, topic_config_key);
} }
return conf;
} }
bool StorageKafka::checkDependencies(const String & current_database_name, const String & current_table_name) bool StorageKafka::checkDependencies(const String & current_database_name, const String & current_table_name)
@ -307,7 +321,7 @@ bool StorageKafka::checkDependencies(const String & current_database_name, const
return true; return true;
} }
void StorageKafka::streamThread() void StorageKafka::threadFunc()
{ {
try try
{ {


@ -5,11 +5,11 @@
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h> #include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Poco/Event.h> #include <Storages/Kafka/WriteBufferToKafkaProducer.h>
#include <Poco/Semaphore.h> #include <Poco/Semaphore.h>
#include <ext/shared_ptr_helper.h> #include <ext/shared_ptr_helper.h>
#include <cppkafka/cppkafka.h>
#include <mutex> #include <mutex>
namespace DB namespace DB
@ -36,14 +36,20 @@ public:
size_t max_block_size, size_t max_block_size,
unsigned num_streams) override; unsigned num_streams) override;
BlockOutputStreamPtr write(
const ASTPtr & query,
const Context & context
) override;
void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name) override; void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name) override;
void updateDependencies() override; void updateDependencies() override;
BufferPtr createBuffer(); void pushReadBuffer(ConsumerBufferPtr buf);
BufferPtr claimBuffer(); ConsumerBufferPtr popReadBuffer();
BufferPtr tryClaimBuffer(long wait_ms); ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout);
void pushBuffer(BufferPtr buf);
ProducerBufferPtr createWriteBuffer();
const auto & getTopics() const { return topics; } const auto & getTopics() const { return topics; }
const auto & getFormatName() const { return format_name; } const auto & getFormatName() const { return format_name; }
@ -84,7 +90,7 @@ private:
// Consumer list // Consumer list
Poco::Semaphore semaphore; Poco::Semaphore semaphore;
std::mutex mutex; std::mutex mutex;
std::vector<BufferPtr> buffers; /// available buffers for Kafka consumers std::vector<ConsumerBufferPtr> buffers; /// available buffers for Kafka consumers
size_t skip_broken; size_t skip_broken;
@ -94,9 +100,12 @@ private:
BackgroundSchedulePool::TaskHolder task; BackgroundSchedulePool::TaskHolder task;
std::atomic<bool> stream_cancelled{false}; std::atomic<bool> stream_cancelled{false};
cppkafka::Configuration createConsumerConfiguration(); ConsumerBufferPtr createReadBuffer();
void streamThread(); // Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & conf);
void threadFunc();
bool streamToViews(); bool streamToViews();
bool checkDependencies(const String & database_name, const String & table_name); bool checkDependencies(const String & database_name, const String & table_name);
}; };


@ -0,0 +1,90 @@
#include "WriteBufferToKafkaProducer.h"
namespace DB
{
WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
ProducerPtr producer_,
const std::string & topic_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_,
std::chrono::milliseconds poll_timeout)
: WriteBuffer(nullptr, 0)
, producer(producer_)
, topic(topic_)
, delim(delimiter)
, max_rows(rows_per_message)
, chunk_size(chunk_size_)
, timeout(poll_timeout)
{
}
WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
{
assert(rows == 0 && chunks.empty());
}
void WriteBufferToKafkaProducer::count_row()
{
if (++rows % max_rows == 0)
{
std::string payload;
payload.reserve((chunks.size() - 1) * chunk_size + offset());
for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
payload.append(*i);
int trunk_delim = delim && chunks.back()[offset() - 1] == delim ? 1 : 0;
payload.append(chunks.back(), 0, offset() - trunk_delim);
while (true)
{
try
{
producer->produce(cppkafka::MessageBuilder(topic).payload(payload));
}
catch (cppkafka::HandleException & e)
{
if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
producer->poll(timeout);
continue;
}
throw e;
}
break;
}
rows = 0;
chunks.clear();
set(nullptr, 0);
}
}
void WriteBufferToKafkaProducer::flush()
{
// For unknown reason we may hit some internal timeout when inserting for the first time.
while (true)
{
try
{
producer->flush(timeout);
}
catch (cppkafka::HandleException & e)
{
if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
continue;
throw e;
}
break;
}
}
void WriteBufferToKafkaProducer::nextImpl()
{
chunks.push_back(std::string());
chunks.back().resize(chunk_size);
set(chunks.back().data(), chunk_size);
}
}


@ -0,0 +1,46 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <cppkafka/cppkafka.h>
#include <list>
namespace DB
{
class WriteBufferToKafkaProducer;
using ProducerBufferPtr = std::shared_ptr<WriteBufferToKafkaProducer>;
using ProducerPtr = std::shared_ptr<cppkafka::Producer>;
class WriteBufferToKafkaProducer : public WriteBuffer
{
public:
WriteBufferToKafkaProducer(
ProducerPtr producer_,
const std::string & topic_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_,
std::chrono::milliseconds poll_timeout);
~WriteBufferToKafkaProducer() override;
void count_row();
void flush();
private:
void nextImpl() override;
ProducerPtr producer;
const std::string topic;
const std::optional<char> delim;
const size_t max_rows;
const size_t chunk_size;
const std::chrono::milliseconds timeout;
size_t rows = 0;
std::list<std::string> chunks;
};
}


@ -546,11 +546,13 @@ bool KeyCondition::tryPrepareSetIndex(
} }
}; };
size_t left_args_count = 1;
const auto * left_arg_tuple = left_arg->as<ASTFunction>(); const auto * left_arg_tuple = left_arg->as<ASTFunction>();
if (left_arg_tuple && left_arg_tuple->name == "tuple") if (left_arg_tuple && left_arg_tuple->name == "tuple")
{ {
const auto & tuple_elements = left_arg_tuple->arguments->children; const auto & tuple_elements = left_arg_tuple->arguments->children;
for (size_t i = 0; i < tuple_elements.size(); ++i) left_args_count = tuple_elements.size();
for (size_t i = 0; i < left_args_count; ++i)
get_key_tuple_position_mapping(tuple_elements[i], i); get_key_tuple_position_mapping(tuple_elements[i], i);
} }
else else
@ -577,6 +579,10 @@ bool KeyCondition::tryPrepareSetIndex(
if (!prepared_set->hasExplicitSetElements()) if (!prepared_set->hasExplicitSetElements())
return false; return false;
prepared_set->checkColumnsNumber(left_args_count);
for (size_t i = 0; i < indexes_mapping.size(); ++i)
prepared_set->checkTypesEqual(indexes_mapping[i].tuple_index, removeLowCardinality(data_types[i]));
out.set_index = std::make_shared<MergeTreeSetIndex>(prepared_set->getSetElements(), std::move(indexes_mapping)); out.set_index = std::make_shared<MergeTreeSetIndex>(prepared_set->getSetElements(), std::move(indexes_mapping));
return true; return true;


@ -176,6 +176,22 @@ IColumn::Selector createSelector(const ClusterPtr cluster, const ColumnWithTypeA
throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH}; throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
} }
std::string makeFormattedListOfShards(const ClusterPtr & cluster)
{
std::ostringstream os;
bool head = true;
os << "[";
for (const auto & shard_info : cluster->getShardsInfo())
{
(head ? os : os << ", ") << shard_info.shard_num;
head = false;
}
os << "]";
return os.str();
}
} }
@ -312,10 +328,23 @@ BlockInputStreams StorageDistributed::read(
if (settings.optimize_skip_unused_shards) if (settings.optimize_skip_unused_shards)
{ {
auto smaller_cluster = skipUnusedShards(cluster, query_info); if (has_sharding_key)
{
auto smaller_cluster = skipUnusedShards(cluster, query_info);
if (smaller_cluster) if (smaller_cluster)
cluster = smaller_cluster; {
cluster = smaller_cluster;
LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
"Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
" " << makeFormattedListOfShards(cluster));
}
else
{
LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
"Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster");
}
}
} }
return ClusterProxy::executeQuery( return ClusterProxy::executeQuery(
@ -488,15 +517,32 @@ void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
} }
/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible /// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
/// using constraints from "WHERE" condition, otherwise returns `nullptr` /// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info) ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info)
{ {
if (!has_sharding_key)
{
throw Exception("Internal error: cannot determine shards of a distributed table if no sharding expression is supplied", ErrorCodes::LOGICAL_ERROR);
}
const auto & select = query_info.query->as<ASTSelectQuery &>(); const auto & select = query_info.query->as<ASTSelectQuery &>();
if (!select.where() || !sharding_key_expr) if (!select.prewhere() && !select.where())
{
return nullptr; return nullptr;
}
const auto & blocks = evaluateExpressionOverConstantCondition(select.where(), sharding_key_expr); ASTPtr condition_ast;
if (select.prewhere() && select.where())
{
condition_ast = makeASTFunction("and", select.prewhere()->clone(), select.where()->clone());
}
else
{
condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
}
const auto blocks = evaluateExpressionOverConstantCondition(condition_ast, sharding_key_expr);
// Can't get definite answer if we can skip any shards // Can't get definite answer if we can skip any shards
if (!blocks) if (!blocks)


@ -1956,10 +1956,37 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
} }
/// Add to the queue jobs to receive all the active parts that the reference/master replica has. /// Add to the queue jobs to receive all the active parts that the reference/master replica has.
Strings parts = zookeeper->getChildren(source_path + "/parts"); Strings source_replica_parts = zookeeper->getChildren(source_path + "/parts");
ActiveDataPartSet active_parts_set(format_version, parts); ActiveDataPartSet active_parts_set(format_version, source_replica_parts);
Strings active_parts = active_parts_set.getParts(); Strings active_parts = active_parts_set.getParts();
/// Remove local parts if source replica does not have them, because such parts will never be fetched by other replicas.
Strings local_parts_in_zk = zookeeper->getChildren(replica_path + "/parts");
Strings parts_to_remove_from_zk;
for (const auto & part : local_parts_in_zk)
{
if (active_parts_set.getContainingPart(part).empty())
{
queue.remove(zookeeper, part);
parts_to_remove_from_zk.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part " << part << ". Removing it from ZooKeeper.");
}
}
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_from_zk);
auto local_active_parts = getDataParts();
DataPartsVector parts_to_remove_from_working_set;
for (const auto & part : local_active_parts)
{
if (active_parts_set.getContainingPart(part->name).empty())
{
parts_to_remove_from_working_set.emplace_back(part);
LOG_WARNING(log, "Source replica does not have part " << part->name << ". Removing it from working set.");
}
}
removePartsFromWorkingSet(parts_to_remove_from_working_set, true);
for (const String & name : active_parts) for (const String & name : active_parts)
{ {
LogEntry log_entry; LogEntry log_entry;


@ -148,10 +148,9 @@ StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const
StoragesInfo StoragesInfoStream::next() StoragesInfo StoragesInfoStream::next()
{ {
StoragesInfo info;
while (next_row < rows) while (next_row < rows)
{ {
StoragesInfo info;
info.database = (*database_column)[next_row].get<String>(); info.database = (*database_column)[next_row].get<String>();
info.table = (*table_column)[next_row].get<String>(); info.table = (*table_column)[next_row].get<String>();
@ -198,10 +197,10 @@ StoragesInfo StoragesInfoStream::next()
if (!info.data) if (!info.data)
throw Exception("Unknown engine " + info.engine, ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown engine " + info.engine, ErrorCodes::LOGICAL_ERROR);
break; return info;
} }
return info; return {};
} }
BlockInputStreams StorageSystemPartsBase::read( BlockInputStreams StorageSystemPartsBase::read(


@ -0,0 +1,19 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>shard_0</default_database>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<default_database>shard_0</default_database>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>


@ -0,0 +1,60 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
def fill_nodes(nodes, shard):
for node in nodes:
node.query(
'''
CREATE DATABASE test;
CREATE TABLE test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}')
ORDER BY id PARTITION BY toYYYYMM(date)
SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
'''.format(shard=shard, replica=node.name))
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
fill_nodes([node1, node2], 1)
yield cluster
except Exception as ex:
print ex
finally:
cluster.shutdown()
def test_inconsistent_parts_if_drop_while_replica_not_active(start_cluster):
with PartitionManager() as pm:
# insert into all replicas
for i in range(50):
node1.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(i))
assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", node1.query("SELECT count(*) FROM test_table"))
# disable network on the first replica
pm.partition_instances(node1, node2)
pm.drop_instance_zk_connections(node1)
# drop all parts on the second replica
node2.query_with_retry("ALTER TABLE test_table DROP PARTITION 201908")
assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", "0")
# insert into the second replica
# DROP_RANGE will be removed from the replication log and the first replica will be lost
for i in range(50):
node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(50 + i))
# the first replica will be cloned from the second
pm.heal_all()
assert_eq_with_retry(node1, "SELECT count(*) FROM test_table", node2.query("SELECT count(*) FROM test_table"))


@ -1,14 +1,17 @@
import os.path as p import os.path as p
import random
import threading
import time import time
import pytest import pytest
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
import json import json
import subprocess import subprocess
import kafka.errors import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
from google.protobuf.internal.encoder import _VarintBytes from google.protobuf.internal.encoder import _VarintBytes
""" """
@ -69,6 +72,17 @@ def kafka_produce(topic, messages, timestamp=None):
print ("Produced {} messages for topic {}".format(len(messages), topic)) print ("Produced {} messages for topic {}".format(len(messages), topic))
def kafka_consume(topic):
consumer = KafkaConsumer(bootstrap_servers="localhost:9092", auto_offset_reset="earliest")
consumer.subscribe(topics=(topic))
for toppar, messages in consumer.poll(5000).items():
if toppar.topic == topic:
for message in messages:
yield message.value
consumer.unsubscribe()
consumer.close()
def kafka_produce_protobuf_messages(topic, start_index, num_messages): def kafka_produce_protobuf_messages(topic, start_index, num_messages):
data = '' data = ''
for i in range(start_index, start_index + num_messages): for i in range(start_index, start_index + num_messages):
@ -490,6 +504,105 @@ def test_kafka_virtual_columns_with_materialized_view(kafka_cluster):
kafka_check_result(result, True, 'test_kafka_virtual2.reference') kafka_check_result(result, True, 'test_kafka_virtual2.reference')
def test_kafka_insert(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'insert1',
kafka_group_name = 'insert1',
kafka_format = 'TSV',
kafka_row_delimiter = '\\n';
''')
values = []
for i in range(50):
values.append("({i}, {i})".format(i=i))
values = ','.join(values)
while True:
try:
instance.query("INSERT INTO test.kafka VALUES {}".format(values))
break
except QueryRuntimeException as e:
if 'Local: Timed out.' in str(e):
continue
else:
raise
messages = []
while True:
messages.extend(kafka_consume('insert1'))
if len(messages) == 50:
break
result = '\n'.join(messages)
kafka_check_result(result, True)
def test_kafka_produce_consume(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'insert2',
kafka_group_name = 'insert2',
kafka_format = 'TSV',
kafka_row_delimiter = '\\n';
''')
messages_num = 10000
def insert():
values = []
for i in range(messages_num):
values.append("({i}, {i})".format(i=i))
values = ','.join(values)
while True:
try:
instance.query("INSERT INTO test.kafka VALUES {}".format(values))
break
except QueryRuntimeException as e:
if 'Local: Timed out.' in str(e):
continue
else:
raise
threads = []
threads_num = 16
for _ in range(threads_num):
threads.append(threading.Thread(target=insert))
for thread in threads:
time.sleep(random.uniform(0, 1))
thread.start()
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.consumer;
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.kafka;
''')
while True:
result = instance.query('SELECT count() FROM test.view')
time.sleep(1)
if int(result) == messages_num * threads_num:
break
instance.query('''
DROP TABLE test.consumer;
DROP TABLE test.view;
''')
for thread in threads:
thread.join()
assert int(result) == messages_num * threads_num, 'ClickHouse lost some messages: {}'.format(result)
if __name__ == '__main__': if __name__ == '__main__':
cluster.start() cluster.start()
raw_input("Cluster created, press any key to destroy...") raw_input("Cluster created, press any key to destroy...")


@ -16,3 +16,5 @@
1 1
1 1
1 1
1
1


@ -9,7 +9,6 @@ select 3 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 10
select 4 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1008) from funnel_test; select 4 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1008) from funnel_test;
select 1 = windowFunnel(1)(timestamp, event = 1000) from funnel_test; select 1 = windowFunnel(1)(timestamp, event = 1000) from funnel_test;
select 3 = windowFunnel(2)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test; select 3 = windowFunnel(2)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test;
select 4 = windowFunnel(3)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test; select 4 = windowFunnel(3)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test;
@ -39,6 +38,16 @@ select 1 = windowFunnel(10000)(timestamp, event = 1008, event = 1001) from funne
select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test_u64; select 5 = windowFunnel(4)(timestamp, event = 1003, event = 1004, event = 1005, event = 1006, event = 1007) from funnel_test_u64;
select 4 = windowFunnel(4)(timestamp, event <= 1007, event >= 1002, event <= 1006, event >= 1004) from funnel_test_u64; select 4 = windowFunnel(4)(timestamp, event <= 1007, event >= 1002, event <= 1006, event >= 1004) from funnel_test_u64;
drop table if exists funnel_test_strict;
create table funnel_test_strict (timestamp UInt32, event UInt32) engine=Memory;
insert into funnel_test_strict values (00,1000),(10,1001),(20,1002),(30,1003),(40,1004),(50,1005),(51,1005),(60,1006),(70,1007),(80,1008);
select 6 = windowFunnel(10000, 'strict')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict;
select 7 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004, event = 1005, event = 1006) from funnel_test_strict;
drop table funnel_test; drop table funnel_test;
drop table funnel_test2; drop table funnel_test2;
drop table funnel_test_u64; drop table funnel_test_u64;
drop table funnel_test_strict;


@ -0,0 +1,15 @@
OK
OK
1
OK
0
1
4
4
2
4
OK
OK
OK
OK
OK


@ -0,0 +1,100 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS distributed_00754;"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS mergetree_00754;"
${CLICKHOUSE_CLIENT} --query "
CREATE TABLE mergetree_00754 (a Int64, b Int64, c String) ENGINE = MergeTree ORDER BY (a, b);
"
${CLICKHOUSE_CLIENT} --query "
CREATE TABLE distributed_00754 AS mergetree_00754
ENGINE = Distributed(test_unavailable_shard, ${CLICKHOUSE_DATABASE}, mergetree_00754, jumpConsistentHash(a+b, 2));
"
${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (0, 0, 'Hello');"
${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (1, 0, 'World');"
${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (0, 1, 'Hello');"
${CLICKHOUSE_CLIENT} --query "INSERT INTO mergetree_00754 VALUES (1, 1, 'World');"
# Should fail because the second shard is unavailable
${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM distributed_00754;" 2>&1 \
| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
# Should fail without setting `optimize_skip_unused_shards` = 1
${CLICKHOUSE_CLIENT} --query "SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0;" 2>&1 \
| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
# Should pass now
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0;
"
# Should still fail because of matching unavailable shard
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 2 AND b = 2;
" 2>&1 \
| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
# Try more complex expressions for constant folding - all should pass.
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 1 AND a = 0 WHERE b = 0;
"
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 1 WHERE b = 1 AND length(c) = 5;
"
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a IN (0, 1) AND b IN (0, 1) WHERE c LIKE '%l%';
"
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a IN (0, 1) WHERE b IN (0, 1) AND c LIKE '%l%';
"
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0 OR a = 1 AND b = 1 WHERE c LIKE '%l%';
"
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE (a = 0 OR a = 1) WHERE (b = 0 OR b = 1);
"
# These should fail.
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b <= 1;
" 2>&1 \
| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 WHERE c LIKE '%l%';
" 2>&1 \
| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 OR a = 1 AND b = 0;
" 2>&1 \
| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0 OR a = 2 AND b = 2;
" 2>&1 \
| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'
${CLICKHOUSE_CLIENT} -n --query="
SET optimize_skip_unused_shards = 1;
SELECT count(*) FROM distributed_00754 PREWHERE a = 0 AND b = 0 OR c LIKE '%l%';
" 2>&1 \
| fgrep -q "All connection tries failed" && echo 'OK' || echo 'FAIL'


@ -0,0 +1,6 @@
[249.25,499.5,749.75,899.9,949.9499999999999,989.99,998.999]
[249.75,499.5,749.25,899.1,949.05,989.01,998.001]
[250,500,750,900,950,990,999]
599.6
599.4
600


@ -0,0 +1,12 @@
DROP TABLE IF EXISTS num;
CREATE TABLE num AS numbers(1000);
SELECT quantilesExactExclusive(0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999)(x) FROM (SELECT number AS x FROM num);
SELECT quantilesExactInclusive(0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999)(x) FROM (SELECT number AS x FROM num);
SELECT quantilesExact(0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.999)(x) FROM (SELECT number AS x FROM num);
SELECT quantileExactExclusive(0.6)(x) FROM (SELECT number AS x FROM num);
SELECT quantileExactInclusive(0.6)(x) FROM (SELECT number AS x FROM num);
SELECT quantileExact(0.6)(x) FROM (SELECT number AS x FROM num);
DROP TABLE num;


@ -0,0 +1,7 @@
OK1
OK2
OK3
OK4
OK5
2019-08-11 world
2019-08-12 hello


@ -0,0 +1,21 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS bug";
$CLICKHOUSE_CLIENT --query="CREATE TABLE bug (d Date, s String) ENGINE = MergeTree(d, s, 8192)";
$CLICKHOUSE_CLIENT --query="INSERT INTO bug VALUES ('2019-08-09', 'hello'), ('2019-08-10', 'world'), ('2019-08-11', 'world'), ('2019-08-12', 'hello')";
#SET force_primary_key = 1;
$CLICKHOUSE_CLIENT --query="SELECT * FROM bug WHERE (s, d) IN (SELECT (s, max(d)) FROM bug GROUP BY s) ORDER BY d" 2>&1 | grep "Number of columns in section IN doesn't match" > /dev/null && echo "OK1";
$CLICKHOUSE_CLIENT --query="SELECT * FROM bug WHERE (s, d, s) IN (SELECT s, max(d) FROM bug GROUP BY s)" 2>&1 | grep "Number of columns in section IN doesn't match" > /dev/null && echo "OK2";
$CLICKHOUSE_CLIENT --query="SELECT * FROM bug WHERE (s, d) IN (SELECT s, max(d), s FROM bug GROUP BY s)" 2>&1 | grep "Number of columns in section IN doesn't match" > /dev/null && echo "OK3";
$CLICKHOUSE_CLIENT --query="SELECT * FROM bug WHERE (s, toDateTime(d)) IN (SELECT s, max(d) FROM bug GROUP BY s)" 2>&1 | grep "Types of column 2 in section IN don't match" > /dev/null && echo "OK4";
$CLICKHOUSE_CLIENT --query="SELECT * FROM bug WHERE (s, d) IN (SELECT s, toDateTime(max(d)) FROM bug GROUP BY s)" 2>&1 | grep "Types of column 2 in section IN don't match" > /dev/null && echo "OK5";
$CLICKHOUSE_CLIENT --query="SELECT * FROM bug WHERE (s, d) IN (SELECT s, max(d) FROM bug GROUP BY s) ORDER BY d";
$CLICKHOUSE_CLIENT --query="DROP TABLE bug";


@ -1,9 +1,6 @@
# Formats for input and output data {#formats} # Formats for input and output data {#formats}
ClickHouse can accept and return data in various formats. A format supported for input can be used to parse the data provided to `INSERT`s, to perform `SELECT`s from a file-backed table such as File, URL or HDFS, or to read an external dictionary. A format supported for output can be used to arrange the results of a `SELECT`, and to perform `INSERT`s into a file-backed table.
The supported formats are: The supported formats are:
@ -388,7 +385,7 @@ Unlike the [JSON](#json) format, there is no substitution of invalid UTF-8 seque
### Usage of Nested Structures {#jsoneachrow-nested} ### Usage of Nested Structures {#jsoneachrow-nested}
If you have a table with the [Nested](../data_types/nested_data_structures/nested.md) data type columns, you can insert JSON data having the same structure. Enable this functionality with the [input_format_import_nested_json](../operations/settings/settings.md#settings-input_format_import_nested_json) setting. If you have a table with [Nested](../data_types/nested_data_structures/nested.md) data type columns, you can insert JSON data with the same structure. Enable this feature with the [input_format_import_nested_json](../operations/settings/settings.md#settings-input_format_import_nested_json) setting.
For example, consider the following table: For example, consider the following table:
@ -396,13 +393,13 @@ For example, consider the following table:
CREATE TABLE json_each_row_nested (n Nested (s String, i Int32) ) ENGINE = Memory CREATE TABLE json_each_row_nested (n Nested (s String, i Int32) ) ENGINE = Memory
``` ```
As you can find in the `Nested` data type description, ClickHouse treats each component of the nested structure as a separate column, `n.s` and `n.i` for our table. So you can insert the data the following way: As you can see in the `Nested` data type description, ClickHouse treats each component of the nested structure as a separate column (`n.s` and `n.i` for our table). You can insert data in the following way:
```sql ```sql
INSERT INTO json_each_row_nested FORMAT JSONEachRow {"n.s": ["abc", "def"], "n.i": [1, 23]} INSERT INTO json_each_row_nested FORMAT JSONEachRow {"n.s": ["abc", "def"], "n.i": [1, 23]}
``` ```
To insert data as hierarchical JSON object set [input_format_import_nested_json=1](../operations/settings/settings.md#settings-input_format_import_nested_json). To insert data as a hierarchical JSON object, set [input_format_import_nested_json=1](../operations/settings/settings.md#settings-input_format_import_nested_json).
```json ```json
{ {
@ -413,7 +410,7 @@ To insert data as hierarchical JSON object set [input_format_import_nested_json=
} }
``` ```
Without this setting ClickHouse throws the exception. Without this setting, ClickHouse throws an exception.
```sql ```sql
SELECT name, value FROM system.settings WHERE name = 'input_format_import_nested_json' SELECT name, value FROM system.settings WHERE name = 'input_format_import_nested_json'


@ -555,11 +555,9 @@ If the table doesn't exist, ClickHouse will create it. If the structure of the q
``` ```
## remote_servers ## remote_servers {#server_settings_remote_servers}
Configuration of clusters used by the Distributed table engine. Configuration of clusters used by the [Distributed](../../operations/table_engines/distributed.md) table engine and by the `cluster` table function.
For more information, see the section "[Table engines/Distributed](../../operations/table_engines/distributed.md)".
**Example** **Example**
@ -569,6 +567,9 @@ For more information, see the section "[Table engines/Distributed](../../operati
For the value of the `incl` attribute, see the section "[Configuration files](../configuration_files.md#configuration_files)". For the value of the `incl` attribute, see the section "[Configuration files](../configuration_files.md#configuration_files)".
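For illustration only (the cluster name must match one defined in `remote_servers`; `test_cluster` here is just an assumed example), a configured cluster can also be queried directly through the `cluster` table function, without creating a Distributed table:

```sql
-- Ask every shard of the configured cluster for its host name; one row per shard that answers.
SELECT hostName(), count() FROM cluster('test_cluster', system.one) GROUP BY hostName()
```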
**See Also**
- [skip_unavailable_shards](../settings/settings.md#settings-skip_unavailable_shards)
## timezone ## timezone


@ -238,7 +238,7 @@ Default value: 0.
## input_format_import_nested_json {#settings-input_format_import_nested_json} ## input_format_import_nested_json {#settings-input_format_import_nested_json}
Enables or disables inserting of JSON data with nested objects. Enables or disables the insertion of JSON data with nested objects.
Supported formats: Supported formats:
@ -275,7 +275,7 @@ Default value: 1.
## date_time_input_format {#settings-date_time_input_format} ## date_time_input_format {#settings-date_time_input_format}
Enables or disables extended parsing of date and time formatted strings. Allows to choose a parser of text representation of date and time.
The setting doesn't apply to [date and time functions](../../query_language/functions/date_time_functions.md). The setting doesn't apply to [date and time functions](../../query_language/functions/date_time_functions.md).
@ -283,11 +283,13 @@ Possible values:
- `'best_effort'` — Enables extended parsing. - `'best_effort'` — Enables extended parsing.
ClickHouse can parse the basic format `YYYY-MM-DD HH:MM:SS` and all the [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) date and time formats. For example, `'2018-06-08T01:02:03.000Z'`. ClickHouse can parse the basic `YYYY-MM-DD HH:MM:SS` format and all [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) date and time formats. For example, `'2018-06-08T01:02:03.000Z'`.
- `'basic'` — Use basic parser. - `'basic'` — Use basic parser.
ClickHouse can parse only the basic format. ClickHouse can parse only the basic `YYYY-MM-DD HH:MM:SS` format. For example, `'2019-08-20 10:18:56'`.
Default value: `'basic'`.
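As a rough sketch of the difference (the table below is hypothetical), the basic parser accepts only the `YYYY-MM-DD HH:MM:SS` literal, while `best_effort` also accepts the ISO 8601 form:

```sql
CREATE TABLE dt_example (t DateTime) ENGINE = Memory;

SET date_time_input_format = 'basic';
INSERT INTO dt_example VALUES ('2019-08-20 10:18:56');       -- parsed by the basic parser

SET date_time_input_format = 'best_effort';
INSERT INTO dt_example VALUES ('2018-06-08T01:02:03.000Z');  -- parsed only with extended parsing
```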
**See Also** **See Also**
@ -834,3 +836,29 @@ Possible values:
Default value: `uniqExact`.
## skip_unavailable_shards {#settings-skip_unavailable_shards}
Enables or disables silent skipping of:
- A node, if its name cannot be resolved through DNS.
When skipping is disabled, ClickHouse requires that all the nodes in the [cluster configuration](../server_settings/settings.md#server_settings_remote_servers) are resolvable through DNS. Otherwise, ClickHouse throws an exception when trying to perform a query on the cluster.
If skipping is enabled, ClickHouse considers unresolved nodes as unavailable and tries to resolve them at every connection attempt. Such behavior creates the risk of an incorrect cluster configuration, because a user can specify a wrong node name and ClickHouse doesn't report it. However, this can be useful in systems with dynamic DNS, for example, [Kubernetes](https://kubernetes.io), where nodes can be unresolvable during downtime, and this is not an error.
- A shard, if there are no available replicas of the shard.
When skipping is disabled, ClickHouse throws an exception.
When skipping is enabled, ClickHouse returns a partial answer and doesn't report issues with node availability.
Possible values:
- 1 — skipping enabled.
- 0 — skipping disabled.
Default value: 0.
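As a sketch of typical usage (the `distributed_table` name is an assumption), the setting can be passed per query:
```sql
SELECT count()
FROM distributed_table
SETTINGS skip_unavailable_shards = 1
```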
[Original article](https://clickhouse.yandex/docs/en/operations/settings/settings/) <!--hide-->


@ -2,17 +2,26 @@
Buffers the data to write in RAM, periodically flushing it to another table. During the read operation, data is read from the buffer and the other table simultaneously.
```sql
Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
```
Engine parameters:
- `database` — Database name. Instead of the database name, you can use a constant expression that returns a string.
- `table` Table to flush data to.
- `num_layers` Parallelism layer. Physically, the table will be represented as 'num_layers' of independent buffers. Recommended value: 16.
- `min_time`, `max_time`, `min_rows`, `max_rows`, `min_bytes`, and `max_bytes` Conditions for flushing data from the buffer.
Data is flushed from the buffer and written to the destination table if all the `min*` conditions or at least one `max*` condition are met.
- `min_time`, `max_time` — Condition for the time in seconds from the moment of the first write to the buffer.
- `min_rows`, `max_rows` Condition for the number of rows in the buffer.
- `min_bytes`, `max_bytes` Condition for the number of bytes in the buffer.
During the write operation, data is inserted to a `num_layers` number of random buffers. Or, if the data part to insert is large enough (greater than `max_rows` or `max_bytes`), it is written directly to the destination table, omitting the buffer.
The conditions for flushing the data are calculated separately for each of the `num_layers` buffers. For example, if `num_layers = 16` and `max_bytes = 100000000`, the maximum RAM consumption is 1.6 GB.
Example:
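A sketch of a possible definition, using illustrative database and table names and threshold values in the parameter order given above:
```sql
CREATE TABLE merge.hits_buffer AS merge.hits
ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)
```
With such a definition, data inserted into `merge.hits_buffer` is kept in 16 in-memory buffers and flushed to `merge.hits` as soon as all the `min*` thresholds or any single `max*` threshold is reached.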


@ -172,6 +172,7 @@ The number of columns in the primary key is not explicitly limited. Depending on
- Improve the performance of an index.
If the primary key is `(a, b)`, then adding another column `c` will improve the performance if the following conditions are met:
- There are queries with a condition on column `c`.
- Long data ranges (several times longer than the `index_granularity`) with identical values for `(a, b)` are common. In other words, adding another column allows you to skip quite long data ranges. A sketch of such a table definition follows this list.
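A sketch of what such a key extension looks like in a table definition; all names and types here are illustrative:
```sql
CREATE TABLE example_events
(
    a UInt32,
    b UInt32,
    c UInt32,
    value String
)
ENGINE = MergeTree()
ORDER BY (a, b, c)
```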


@ -681,11 +681,11 @@ groupArrayMovingSum(numbers_for_summing)
```
groupArrayMovingSum(numbers_for_summing)
groupArrayMovingSum(window_size)(numbers_for_summing)
```
The function can take the window size as a parameter. If left unspecified, the function takes the window size equal to the number of rows in the column.
**Parameters**
- `numbers_for_summing` — [Expression](../syntax.md#syntax-expressions) resulting in a numeric data type value.
- `window_size` — Size of the calculation window.
**Returned values**
@ -750,11 +750,11 @@ groupArrayMovingAvg(numbers_for_summing)
```
groupArrayMovingAvg(numbers_for_summing)
groupArrayMovingAvg(window_size)(numbers_for_summing)
```
The function can take the window size as a parameter. If left unspecified, the function takes the window size equal to the number of rows in the column.
**Parameters**
- `numbers_for_summing` — [Expression](../syntax.md#syntax-expressions) resulting in a numeric data type value.
- `window_size` — Size of the calculation window.
**Returned values**


@ -377,9 +377,68 @@ CREATE TABLE IF NOT EXISTS example_table
Unlike the [JSON](#json) format, for `JSONEachRow` ClickHouse does not replace invalid UTF-8 sequences. Values are escaped the same way as for the `JSON` format.
!!! note "Note"
    An arbitrary set of bytes can be output in the strings. Use the `JSONEachRow` format if you are sure that the data in the table can be represented as JSON without losing any information.
### Usage of Nested Structures {#jsoneachrow-nested}
If you have a table with columns of the [Nested](../data_types/nested_data_structures/nested.md) data type, you can insert JSON data that has the same structure. Enable this feature with the [input_format_import_nested_json](../operations/settings/settings.md#settings-input_format_import_nested_json) setting.
For example, consider the following table:
```sql
CREATE TABLE json_each_row_nested (n Nested (s String, i Int32) ) ENGINE = Memory
```
As shown in the `Nested` data type description, ClickHouse treats each component of the nested structure as a separate column (`n.s` and `n.i` for our table). You can insert data in the following way:
```sql
INSERT INTO json_each_row_nested FORMAT JSONEachRow {"n.s": ["abc", "def"], "n.i": [1, 23]}
```
To insert data as a hierarchical JSON object, set [input_format_import_nested_json=1](../operations/settings/settings.md#settings-input_format_import_nested_json).
```json
{
"n": {
"s": ["abc", "def"],
"i": [1, 23]
}
}
```
Without this setting, ClickHouse throws an exception.
```sql
SELECT name, value FROM system.settings WHERE name = 'input_format_import_nested_json'
```
```text
┌─name────────────────────────────┬─value─┐
│ input_format_import_nested_json │ 0 │
└─────────────────────────────────┴───────┘
```
```sql
INSERT INTO json_each_row_nested FORMAT JSONEachRow {"n": {"s": ["abc", "def"], "i": [1, 23]}}
```
```text
Code: 117. DB::Exception: Unknown field found while parsing JSONEachRow format: n: (at row 1)
```
```sql
SET input_format_import_nested_json=1
INSERT INTO json_each_row_nested FORMAT JSONEachRow {"n": {"s": ["abc", "def"], "i": [1, 23]}}
SELECT * FROM json_each_row_nested
```
```text
┌─n.s───────────┬─n.i────┐
│ ['abc','def'] │ [1,23] │
└───────────────┴────────┘
```
## Native {#native}
The most efficient format. Data is written and read in blocks in binary form. For each block, the number of rows, number of columns, column names and types, and the parts of the columns in this block are recorded one after another. In other words, this format is "columnar": it does not convert columns to rows. This is the format used in the native interface for interaction between servers, for the command-line client, and for C++ clients.
@ -399,7 +458,7 @@ CREATE TABLE IF NOT EXISTS example_table
[NULL](../query_language/syntax.md) is output as `ᴺᵁᴸᴸ`.
```sql
SELECT * FROM t_null
```
```
@ -410,7 +469,7 @@ SELECT * FROM t_null
In the `Pretty*` formats, strings are output without escaping. An example for the [PrettyCompact](#prettycompact) format:
```sql
SELECT 'String with \'quotes\' and \t character' AS Escaping_test
```
@ -425,7 +484,7 @@ SELECT 'String with \'quotes\' and \t character' AS Escaping_test
The `Pretty` format supports outputting total values (when using WITH TOTALS) and extreme values (when 'extremes' is set to 1). In these cases, the total values and the extreme values are output after the main data, in separate tables. Example (shown for the [PrettyCompact](#prettycompact) format):
```sql
SELECT EventDate, count() AS c FROM test.hits GROUP BY EventDate WITH TOTALS ORDER BY EventDate FORMAT PrettyCompact
```
@ -516,7 +575,7 @@ Array представлены как длина в формате varint (unsig
Example:
```sql
SELECT * FROM t_null FORMAT Vertical
```
```
@ -528,7 +587,7 @@ y: ᴺᵁᴸᴸ
In the `Vertical` format, strings are output without escaping. For example:
```sql
SELECT 'string with \'quotes\' and \t with some special \n characters' AS test FORMAT Vertical
```


@ -233,7 +233,26 @@ Ok.
- 0 — Disabled.
- 1 — Enabled.
Default value: 0.
## input_format_import_nested_json {#settings-input_format_import_nested_json}
Enables or disables the insertion of JSON data with nested objects.
Supported formats:
- [JSONEachRow](../../interfaces/formats.md#jsoneachrow)
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 0.
**See Also**
- [Usage of Nested Structures](../../interfaces/formats.md#jsoneachrow-nested) with the `JSONEachRow` format.
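A minimal usage sketch, reusing the `json_each_row_nested` table from the formats documentation:
```sql
SET input_format_import_nested_json = 1
INSERT INTO json_each_row_nested FORMAT JSONEachRow {"n": {"s": ["abc", "def"], "i": [1, 23]}}
```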
## input_format_with_names_use_header {#settings-input_format_with_names_use_header}
@ -253,6 +272,29 @@ Ok.
Default value: 1.
## date_time_input_format {#settings-date_time_input_format}
Allows choosing a parser for the text representation of date and time when processing input.
The setting doesn't apply to [date and time functions](../../query_language/functions/date_time_functions.md).
Possible values:
- `'best_effort'` — Enables extended parsing.
ClickHouse can parse the basic `YYYY-MM-DD HH:MM:SS` format and all [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) date and time formats. For example, `'2018-06-08T01:02:03.000Z'`.
- `'basic'` — Use the basic parser.
ClickHouse can parse only the basic `YYYY-MM-DD HH:MM:SS` format. For example, `'2019-08-20 10:18:56'`.
Default value: `'basic'`.
**See Also**
- [DateTime data type.](../../data_types/datetime.md)
- [Functions for working with dates and times.](../../query_language/functions/date_time_functions.md)
## join_default_strictness {#settings-join_default_strictness}
Sets the default strictness for [JOIN](../../query_language/select.md#select-join).


@ -87,7 +87,7 @@ anyHeavy(column)
**Arguments**
- `column` — The column name.
**Example**
@ -681,6 +681,154 @@ uniqExact(x[, ...])
- Default value for substituting into empty positions.
- Length of the resulting array. For example, if you want to get arrays of the same size for all aggregate keys. When using this parameter, the default value must be specified.
## groupArrayMovingSum {#agg_function-grouparraymovingsum}
Calculates the moving sum of input values.
```
groupArrayMovingSum(numbers_for_summing)
groupArrayMovingSum(window_size)(numbers_for_summing)
```
The function can take the window size as a parameter. If left unspecified, the function takes the window size equal to the number of rows in the column.
**Parameters**
- `numbers_for_summing` — [Expression](../syntax.md#syntax-expressions) resulting in a numeric data type value.
- `window_size` — Size of the calculation window.
**Returned values**
- Array of the same size and type as the input data.
**Example**
The source table:
```sql
CREATE TABLE t
(
`int` UInt8,
`float` Float32,
`dec` Decimal32(2)
)
ENGINE = TinyLog
```
```text
┌─int─┬─float─┬──dec─┐
│ 1 │ 1.1 │ 1.10 │
│ 2 │ 2.2 │ 2.20 │
│ 4 │ 4.4 │ 4.40 │
│ 7 │ 7.77 │ 7.77 │
└─────┴───────┴──────┘
```
The queries:
```sql
SELECT
groupArrayMovingSum(int) AS I,
groupArrayMovingSum(float) AS F,
groupArrayMovingSum(dec) AS D
FROM t
```
```text
┌─I──────────┬─F───────────────────────────────┬─D──────────────────────┐
│ [1,3,7,14] │ [1.1,3.3000002,7.7000003,15.47] │ [1.10,3.30,7.70,15.47] │
└────────────┴─────────────────────────────────┴────────────────────────┘
```
```sql
SELECT
groupArrayMovingSum(2)(int) AS I,
groupArrayMovingSum(2)(float) AS F,
groupArrayMovingSum(2)(dec) AS D
FROM t
```
```text
┌─I──────────┬─F───────────────────────────────┬─D──────────────────────┐
│ [1,3,6,11] │ [1.1,3.3000002,6.6000004,12.17] │ [1.10,3.30,6.60,12.17] │
└────────────┴─────────────────────────────────┴────────────────────────┘
```
## groupArrayMovingAvg {#agg_function-grouparraymovingavg}
Calculates the moving average of input values.
```
groupArrayMovingAvg(numbers_for_summing)
groupArrayMovingAvg(window_size)(numbers_for_summing)
```
The function can take the window size as a parameter. If left unspecified, the function takes the window size equal to the number of rows in the column.
**Parameters**
- `numbers_for_summing` — [Expression](../syntax.md#syntax-expressions) resulting in a numeric data type value.
- `window_size` — Size of the calculation window.
**Returned values**
- Array of the same size and type as the input data.
The function uses [rounding towards zero](https://ru.wikipedia.org/wiki/Округление#Методы). It truncates the decimal places insignificant for the resulting data type.
**Example**
The source table:
```sql
CREATE TABLE t
(
`int` UInt8,
`float` Float32,
`dec` Decimal32(2)
)
ENGINE = TinyLog
```
```text
┌─int─┬─float─┬──dec─┐
│ 1 │ 1.1 │ 1.10 │
│ 2 │ 2.2 │ 2.20 │
│ 4 │ 4.4 │ 4.40 │
│ 7 │ 7.77 │ 7.77 │
└─────┴───────┴──────┘
```
The queries:
```sql
SELECT
groupArrayMovingAvg(int) AS I,
groupArrayMovingAvg(float) AS F,
groupArrayMovingAvg(dec) AS D
FROM t
```
```text
┌─I─────────┬─F───────────────────────────────────┬─D─────────────────────┐
│ [0,0,1,3] │ [0.275,0.82500005,1.9250001,3.8675] │ [0.27,0.82,1.92,3.86] │
└───────────┴─────────────────────────────────────┴───────────────────────┘
```
```sql
SELECT
groupArrayMovingAvg(2)(int) AS I,
groupArrayMovingAvg(2)(float) AS F,
groupArrayMovingAvg(2)(dec) AS D
FROM t
```
```text
┌─I─────────┬─F────────────────────────────────┬─D─────────────────────┐
│ [0,1,3,5] │ [0.55,1.6500001,3.3000002,6.085] │ [0.55,1.65,3.30,6.08] │
└───────────┴──────────────────────────────────┴───────────────────────┘
```
## groupUniqArray(x), groupUniqArray(max_size)(x)
Creates an array from different argument values. Memory consumption is the same as for the `uniqExact` function.


@ -12,7 +12,7 @@ namespace ext
*
* Downsides:
* - your class cannot be final;
* - awful compilation error messages;
* - bad code navigation.
* - different dynamic type of created object, you cannot use typeid.
*/


@ -26,6 +26,7 @@ def main():
'bootstrap_servers': f'{args.server}:{args.port}',
'client_id': args.client,
'group_id': args.group,
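# Start reading from the earliest available offset when the consumer group has no committed offset yet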
'auto_offset_reset': 'earliest',
}
client = kafka.KafkaConsumer(**config)