Merge branch 'master' of github.com:yandex/ClickHouse

commit 45f5f4011a
@@ -263,6 +263,11 @@ protected:
     */
     bool checkTimeLimit();
 
+#ifndef NDEBUG
+    bool read_prefix_is_called = false;
+    bool read_suffix_is_called = false;
+#endif
+
 private:
     bool enabled_extremes = false;
 
@@ -315,10 +320,6 @@ private:
            return;
        }
 
-#ifndef NDEBUG
-    bool read_prefix_is_called = false;
-    bool read_suffix_is_called = false;
-#endif
-
 };
 
 }
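For context: `checkTimeLimit()` is the hook the new Kafka read loop (further down in this diff) calls to stop accumulating rows into a block once the stream's time budget is spent. A minimal SQL sketch of the knob presumably feeding that budget — exactly which limit is wired into `checkTimeLimit()` for streaming reads is an assumption here:

```sql
-- stream_flush_interval_ms bounds how long a streaming source may spend
-- building one block (7500 ms is the documented default); checkTimeLimit()
-- is the per-read enforcement point. (Assumption: this is the limit the
-- Kafka loop below runs against.)
SET stream_flush_interval_ms = 7500;
```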
@@ -57,12 +57,19 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
     }
 }
 
+// also resets a few members of IBlockInputStream (I didn't want to propagate resetParser up there)
 void NativeBlockInputStream::resetParser()
 {
     istr_concrete = nullptr;
     use_index = false;
     header.clear();
     avg_value_size_hints.clear();
+
+#ifndef NDEBUG
+    read_prefix_is_called = false;
+    read_suffix_is_called = false;
+#endif
+
+    is_cancelled.store(false);
+    is_killed.store(false);
 }
 
 void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
@@ -20,6 +20,7 @@ void registerFunctionsJSON(FunctionFactory & factory)
     factory.registerFunction<FunctionJSON<NameJSONExtract, JSONExtractImpl>>();
     factory.registerFunction<FunctionJSON<NameJSONExtractKeysAndValues, JSONExtractKeysAndValuesImpl>>();
     factory.registerFunction<FunctionJSON<NameJSONExtractRaw, JSONExtractRawImpl>>();
+    factory.registerFunction<FunctionJSON<NameJSONExtractArrayRaw, JSONExtractArrayRawImpl>>();
 }
 
 }
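Once registered with the `FunctionFactory`, the new function resolves by name like its siblings; the simplest smoke test (expected outputs taken from the test reference added later in this diff):

```sql
SELECT JSONExtractArrayRaw('[]');                               -- []
SELECT JSONExtractArrayRaw('{"a": "hello", "b": "not_array"}'); -- [] (input is not an array)
```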
@@ -291,6 +291,7 @@ struct NameJSONExtractString { static constexpr auto name{"JSONExtractString"};
 struct NameJSONExtract { static constexpr auto name{"JSONExtract"}; };
 struct NameJSONExtractKeysAndValues { static constexpr auto name{"JSONExtractKeysAndValues"}; };
 struct NameJSONExtractRaw { static constexpr auto name{"JSONExtractRaw"}; };
+struct NameJSONExtractArrayRaw { static constexpr auto name{"JSONExtractArrayRaw"}; };
 
 
 template <typename JSONParser>
@@ -1088,4 +1089,39 @@ private:
     }
 };
 
+template <typename JSONParser>
+class JSONExtractArrayRawImpl
+{
+public:
+    static DataTypePtr getType(const char *, const ColumnsWithTypeAndName &)
+    {
+        return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
+    }
+
+    using Iterator = typename JSONParser::Iterator;
+    static bool addValueToColumn(IColumn & dest, const Iterator & it)
+    {
+        if (!JSONParser::isArray(it))
+        {
+            return false;
+        }
+        ColumnArray & col_res = assert_cast<ColumnArray &>(dest);
+        Iterator array_it = it;
+        size_t size = 0;
+        if (JSONParser::firstArrayElement(array_it))
+        {
+            do
+            {
+                JSONExtractRawImpl<JSONParser>::addValueToColumn(col_res.getData(), array_it);
+                ++size;
+            } while (JSONParser::nextArrayElement(array_it));
+        }
+
+        col_res.getOffsets().push_back(col_res.getOffsets().back() + size);
+        return true;
+    }
+
+    static constexpr size_t num_extra_arguments = 0;
+    static void prepare(const char *, const Block &, const ColumnNumbers &, size_t) {}
+};
 }
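The contract of `addValueToColumn` above, restated in SQL (values and expected results taken from the tests added below): a node that is not an array makes the method return `false`, so the column gets its default, an empty array; for an array, every element is appended to the nested string column through `JSONExtractRawImpl`, and a single offset is pushed per row:

```sql
SELECT JSONExtractArrayRaw('{"a": "hello", "b": "not_array"}');             -- []  (isArray() fails)
SELECT JSONExtractArrayRaw('[1,2,3,4,5,"hello"]');                          -- ['1','2','3','4','5','"hello"']
SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b'); -- ['-100','200','300']
```

Judging by the last expectation, elements are re-serialized by the parser rather than copied byte-for-byte: `200.0` comes back as `'200'`.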
@@ -2,9 +2,7 @@
 #include <Parsers/ASTSystemQuery.h>
 #include <Parsers/CommonParsers.h>
 #include <Parsers/parseIdentifierOrStringLiteral.h>
 #include <Parsers/ExpressionElementParsers.h>
 #include <Parsers/parseDatabaseAndTableName.h>
 #include <Interpreters/evaluateConstantExpression.h>
 
 
 namespace ErrorCodes
@@ -19,7 +17,7 @@ namespace DB
 
 bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
 {
-    if (!ParserKeyword{"SYSTEM"}.ignore(pos))
+    if (!ParserKeyword{"SYSTEM"}.ignore(pos, expected))
         return false;
 
     using Type = ASTSystemQuery::Type;
@@ -30,7 +28,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
     for (int i = static_cast<int>(Type::UNKNOWN) + 1; i < static_cast<int>(Type::END); ++i)
     {
         Type t = static_cast<Type>(i);
-        if (ParserKeyword{ASTSystemQuery::typeToString(t)}.ignore(pos))
+        if (ParserKeyword{ASTSystemQuery::typeToString(t)}.ignore(pos, expected))
        {
            res->type = t;
            found = true;
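The only functional change in this parser is threading `expected` through both `ParserKeyword::ignore()` calls, so a failed parse of a SYSTEM statement can report which keywords would have been accepted. An illustration (the exact error text is version-dependent):

```sql
SYSTEM FLUSH LOGS;  -- parses exactly as before
SYSTEM FLUSHH LOGS; -- now fails with an "Expected one of: ..." style message,
                    -- the candidates having been recorded via `expected`
```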
@@ -6,7 +6,6 @@
 #include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
 #include <Processors/Formats/InputStreamFromInputFormat.h>
 
 
 namespace DB
 {
 KafkaBlockInputStream::KafkaBlockInputStream(
@@ -66,20 +65,8 @@ Block KafkaBlockInputStream::readImpl()
     MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
     MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
 
-    auto read_callback = [&]
-    {
-        virtual_columns[0]->insert(buffer->currentTopic());     // "topic"
-        virtual_columns[1]->insert(buffer->currentKey());       // "key"
-        virtual_columns[2]->insert(buffer->currentOffset());    // "offset"
-        virtual_columns[3]->insert(buffer->currentPartition()); // "partition"
-
-        auto timestamp = buffer->currentTimestamp();
-        if (timestamp)
-            virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
-    };
-
     auto input_format = FormatFactory::instance().getInputFormat(
-        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
+        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);
 
     InputPort port(input_format->getPort().getHeader(), input_format.get());
     connect(input_format->getPort(), port);
@@ -106,13 +93,17 @@ Block KafkaBlockInputStream::readImpl()
            case IProcessor::Status::PortFull:
            {
                auto chunk = port.pull();
-               new_rows = new_rows + chunk.getNumRows();
 
                /// FIXME: materialize MATERIALIZED columns here.
+               // that was returning a bad value before https://github.com/ClickHouse/ClickHouse/pull/8005;
+               // if backported, this should go together with #8005
+               auto chunk_rows = chunk.getNumRows();
+               new_rows += chunk_rows;
 
                auto columns = chunk.detachColumns();
                for (size_t i = 0, s = columns.size(); i < s; ++i)
                {
                    result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
                }
                break;
            }
            case IProcessor::Status::NeedData:
@@ -125,18 +116,55 @@ Block KafkaBlockInputStream::readImpl()
     };
 
     size_t total_rows = 0;
-    while (total_rows < max_block_size)
+
+    while (true)
     {
+        // some formats (like RowBinaryWithNamesAndTypes / CSVWithNames)
+        // throw an exception from readPrefix when the buffer is empty
+        if (buffer->eof())
+            break;
+
         auto new_rows = read_kafka_message();
+
+        auto _topic = buffer->currentTopic();
+        auto _key = buffer->currentKey();
+        auto _offset = buffer->currentOffset();
+        auto _partition = buffer->currentPartition();
+        auto _timestamp_raw = buffer->currentTimestamp();
+        auto _timestamp = _timestamp_raw ? std::chrono::duration_cast<std::chrono::seconds>(_timestamp_raw->get_timestamp()).count()
+                                         : 0;
+        for (size_t i = 0; i < new_rows; ++i)
+        {
+            virtual_columns[0]->insert(_topic);
+            virtual_columns[1]->insert(_key);
+            virtual_columns[2]->insert(_offset);
+            virtual_columns[3]->insert(_partition);
+            if (_timestamp_raw)
+            {
+                virtual_columns[4]->insert(_timestamp);
+            }
+            else
+            {
+                virtual_columns[4]->insertDefault();
+            }
+        }
+
         total_rows = total_rows + new_rows;
         buffer->allowNext();
-        if (!new_rows || !checkTimeLimit())
+        if (!new_rows || total_rows >= max_block_size || !checkTimeLimit())
            break;
     }
 
     if (total_rows == 0)
         return Block();
 
+    /// MATERIALIZED columns could be added here, but I think
+    // they are not needed:
+    // it's misleading to use them here,
+    // as columns 'materialized' that way stay 'ephemeral',
+    // i.e. will not be stored anywhere.
+    // If any extra columns are needed, they can be added at the MV level using DEFAULT.
+
     auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
     auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
 
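For context, the `_topic`, `_key`, `_offset`, `_partition` and `_timestamp` values this loop now fills once per message surface as virtual columns of Kafka tables. A sketch of how a user sees them (broker, topic, and schema are illustrative):

```sql
CREATE TABLE queue (payload String)
ENGINE = Kafka SETTINGS
    kafka_broker_list = 'localhost:9092',  -- illustrative
    kafka_topic_list  = 'events',          -- illustrative
    kafka_group_name  = 'example_group',
    kafka_format      = 'JSONEachRow';

-- When the broker supplies no timestamp, _timestamp takes the column default,
-- matching the insertDefault() branch above.
SELECT _topic, _key, _offset, _partition, _timestamp, payload FROM queue;
```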
@@ -166,3 +166,12 @@ d
 e
 u
 v
+--JSONExtractArrayRaw--
+[]
+[]
+[]
+['[]','[]']
+['-100','200','300']
+['1','2','3','4','5','"hello"']
+['1','2','3']
+['4','5','6']
@@ -182,3 +182,12 @@ SELECT JSONExtractRaw('{"abc":"\\u263a"}', 'abc');
 SELECT '--const/non-const mixed--';
 SELECT JSONExtractString('["a", "b", "c", "d", "e"]', idx) FROM (SELECT arrayJoin([1,2,3,4,5]) AS idx);
 SELECT JSONExtractString(json, 's') FROM (SELECT arrayJoin(['{"s":"u"}', '{"s":"v"}']) AS json);
+
+SELECT '--JSONExtractArrayRaw--';
+SELECT JSONExtractArrayRaw('');
+SELECT JSONExtractArrayRaw('{"a": "hello", "b": "not_array"}');
+SELECT JSONExtractArrayRaw('[]');
+SELECT JSONExtractArrayRaw('[[],[]]');
+SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b');
+SELECT JSONExtractArrayRaw('[1,2,3,4,5,"hello"]');
+SELECT JSONExtractArrayRaw(arrayJoin(JSONExtractArrayRaw('[[1,2,3],[4,5,6]]')));
@@ -206,4 +206,16 @@ Example:
 SELECT JSONExtractRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b') = '[-100, 200.0, 300]'
 ```
 
+## JSONExtractArrayRaw(json[, indices_or_keys]...)
+
+Returns an array with the elements of a JSON array, each represented as an unparsed string.
+
+If the part does not exist or isn't an array, an empty array is returned.
+
+Example:
+
+```sql
+SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, "hello"]}', 'b') = ['-100', '200.0', '"hello"']
+```
+
 [Original article](https://clickhouse.yandex/docs/en/query_language/functions/json_functions/) <!--hide-->
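Because each element comes back as an unparsed string, the result can be fed straight back into JSON functions; the new tests exercise exactly this round trip:

```sql
-- The inner call yields ['[1,2,3]','[4,5,6]']; arrayJoin expands those two
-- raw strings into rows, and the outer call parses each of them again.
SELECT JSONExtractArrayRaw(arrayJoin(JSONExtractArrayRaw('[[1,2,3],[4,5,6]]')));
```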
@@ -199,9 +199,9 @@ SELECT JSONExtractKeysAndValues('{"x": {"a": 5, "b": 7, "c": 11}}', 'x', 'Int8')
 
 ## JSONExtractRaw(json[, indices_or_keys]...)
 
-Returns a part of the JSON.
+Returns a part of the JSON as a string containing the unparsed substring.
 
-If the value does not exist or has the wrong type, an empty string is returned.
+If the value does not exist, an empty string is returned.
 
 Example:
 
@@ -209,4 +209,16 @@ SELECT JSONExtractKeysAndValues('{"x": {"a": 5, "b": 7, "c": 11}}', 'x', 'Int8')
 SELECT JSONExtractRaw('{"a": "hello", "b": [-100, 200.0, 300]}', 'b') = '[-100, 200.0, 300]'
 ```
 
+## JSONExtractArrayRaw(json[, indices_or_keys]...)
+
+Returns an array with the elements of a JSON array, each represented as a string holding the unparsed JSON substring.
+
+If the value does not exist or is not an array, an empty array is returned.
+
+Example:
+
+```sql
+SELECT JSONExtractArrayRaw('{"a": "hello", "b": [-100, 200.0, "hello"]}', 'b') = ['-100', '200.0', '"hello"']
+```
+
 [Original article](https://clickhouse.yandex/docs/ru/query_language/functions/json_functions/) <!--hide-->