Added support for format TSKV for input [#METR-20081].

This commit is contained in:
Alexey Milovidov 2016-02-18 06:13:52 +03:00
parent 7af87e367f
commit 2f35f6a350
7 changed files with 225 additions and 3 deletions

View File

@ -212,6 +212,7 @@ add_library (dbms
include/DB/DataStreams/JSONRowOutputStream.h
include/DB/DataStreams/XMLRowOutputStream.h
include/DB/DataStreams/TSKVRowOutputStream.h
include/DB/DataStreams/TSKVRowInputStream.h
include/DB/DataStreams/ODBCDriverBlockOutputStream.h
include/DB/DataStreams/MergeSortingBlockInputStream.h
include/DB/DataStreams/ExpressionBlockInputStream.h
@ -680,6 +681,7 @@ add_library (dbms
src/DataStreams/JSONRowOutputStream.cpp
src/DataStreams/XMLRowOutputStream.cpp
src/DataStreams/TSKVRowOutputStream.cpp
src/DataStreams/TSKVRowInputStream.cpp
src/DataStreams/ODBCDriverBlockOutputStream.cpp
src/DataStreams/JSONCompactRowOutputStream.cpp
src/DataStreams/PrettyCompactMonoBlockOutputStream.cpp

View File

@ -0,0 +1,41 @@
#pragma once
#include <DB/Core/Block.h>
#include <DB/DataStreams/IRowInputStream.h>
#include <DB/Common/HashTable/HashMap.h>
namespace DB
{
class ReadBuffer;
/** Поток для чтения данных в формате TSKV.
* TSKV - очень неэффективный формат данных.
* Похож на TSV, но каждое поле записано в виде key=value.
* Поля могут быть перечислены в произвольном порядке (в том числе, в разных строках может быть разный порядок),
* и часть полей может отсутствовать.
* В имени поля может быть заэскейплен знак равенства.
* Также, в качестве дополнительного элемента может присутствовать бесполезный фрагмент tskv - его нужно игнорировать.
*/
class TSKVRowInputStream : public IRowInputStream
{
public:
TSKVRowInputStream(ReadBuffer & istr_, const Block & sample_);
bool read(Block & block) override;
private:
ReadBuffer & istr;
const Block sample;
/// Буфер для прочитанного из потока имени поля. Используется, если его потребовалось скопировать.
String name_buf;
/// Хэш-таблица соответствия имя поля -> позиция в блоке. NOTE Можно использовать perfect hash map.
using NameMap = HashMap<StringRef, size_t, StringRefHash>;
NameMap name_map;
};
}

View File

@ -19,6 +19,7 @@
#include <DB/DataStreams/JSONCompactRowOutputStream.h>
#include <DB/DataStreams/XMLRowOutputStream.h>
#include <DB/DataStreams/TSKVRowOutputStream.h>
#include <DB/DataStreams/TSKVRowInputStream.h>
#include <DB/DataStreams/PrettyCompactMonoBlockOutputStream.h>
#include <DB/DataStreams/ODBCDriverBlockOutputStream.h>
#include <DB/DataStreams/CSVRowInputStream.h>
@ -56,6 +57,8 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
return new BlockInputStreamFromRowInputStream(new CSVRowInputStream(buf, sample, ','), sample, max_block_size);
else if (name == "CSVWithNames")
return new BlockInputStreamFromRowInputStream(new CSVRowInputStream(buf, sample, ',', true), sample, max_block_size);
else if (name == "TSKV")
return new BlockInputStreamFromRowInputStream(new TSKVRowInputStream(buf, sample), sample, max_block_size);
else if (name == "TabSeparatedRaw"
|| name == "BlockTabSeparated"
|| name == "Pretty"
@ -71,7 +74,6 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
|| name == "JSON"
|| name == "JSONCompact"
|| name == "XML"
|| name == "TSKV"
|| name == "ODBCDriver")
throw Exception("Format " + name + " is not suitable for input", ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_INPUT);
else

View File

@ -0,0 +1,160 @@
#include <DB/IO/ReadHelpers.h>
#include <DB/DataStreams/TSKVRowInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int CANNOT_PARSE_ESCAPE_SEQUENCE;
extern const int CANNOT_READ_ALL_DATA;
}
TSKVRowInputStream::TSKVRowInputStream(ReadBuffer & istr_, const Block & sample_)
: istr(istr_), sample(sample_), name_map(sample.columns())
{
size_t columns = sample.columns();
for (size_t i = 0; i < columns; ++i)
name_map[sample.getByPosition(i).name] = i; /// NOTE Можно было бы расположить имена более кэш-локально.
}
/** Прочитать имя поля в формате tskv.
* Вернуть true, если после имени поля идёт знак равенства,
* иначе (поле без значения) вернуть false.
* Ссылка на имя поля будет записана в ref.
* Также может быть использован временный буфер tmp, чтобы скопировать туда имя поля.
* При чтении, пропускает имя и знак равенства после него.
*/
static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp)
{
tmp.clear();
while (!buf.eof())
{
const char * next_pos = find_first_symbols<'\t', '\n', '\\', '='>(buf.position(), buf.buffer().end());
if (next_pos == buf.buffer().end())
{
tmp.append(buf.position(), next_pos - buf.position());
buf.next();
continue;
}
/// Дошли до конца имени.
if (*next_pos != '\\')
{
bool have_value = *next_pos == '=';
if (tmp.empty())
{
/// Данные не нужно копировать, можно ссылаться прямо на внутренность buf.
ref = StringRef(buf.position(), next_pos - buf.position());
buf.position() += next_pos + have_value - buf.position();
}
else
{
/// Копируем данные во временную строку и возвращаем ссылку на неё.
tmp.append(buf.position(), next_pos - buf.position());
buf.position() += next_pos + have_value - buf.position();
ref = StringRef(tmp);
}
return have_value;
}
/// В имени есть эскейп-последовательность.
else
{
tmp.append(buf.position(), next_pos - buf.position());
buf.position() += next_pos + 1 - buf.position();
if (buf.eof())
throw Exception("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
tmp.push_back(parseEscapeSequence(*buf.position()));
++buf.position();
continue;
}
}
throw Exception("Unexpected end of stream while reading key name from TSKV format", ErrorCodes::CANNOT_READ_ALL_DATA);
}
bool TSKVRowInputStream::read(Block & block)
{
if (istr.eof())
return false;
size_t columns = block.columns();
/// Множество столбцов, для которых были считаны значения. Остальные затем заполним значениями по-умолчанию.
/// TODO Возможность предоставить свои DEFAULT-ы.
bool read_columns[columns] = {};
if (unlikely(*istr.position() == '\n'))
{
/// Пустая строка. Допустимо, но непонятно зачем.
++istr.position();
}
else
{
while (true)
{
StringRef name_ref;
bool has_value = readName(istr, name_ref, name_buf);
if (has_value)
{
/// NOTE Возможна оптимизация путём кэширования порядка полей (который почти всегда одинаковый)
/// и быстрой проверки на соответствие следующему ожидаемому полю, вместо поиска в хэш-таблице.
auto it = name_map.find(name_ref);
if (name_map.end() == it)
throw Exception("Unknown field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
size_t index = it->second;
if (read_columns[index])
throw Exception("Duplicate field found while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
read_columns[index] = true;
auto & col = block.unsafeGetByPosition(index);
col.type.get()->deserializeTextEscaped(*col.column.get(), istr);
}
else
{
/// Единственное, что может идти без значения - это фрагмент tskv, который игнорируется.
if (!(name_ref.size == 4 && 0 == memcmp(name_ref.data, "tskv", 4)))
throw Exception("Found field without value while parsing TSKV format: " + name_ref.toString(), ErrorCodes::INCORRECT_DATA);
}
if (istr.eof())
{
throw Exception("Unexpected end of stream after field '" + name_ref.toString() + "' in TSKV format", ErrorCodes::CANNOT_READ_ALL_DATA);
}
else if (*istr.position() == '\t')
{
++istr.position();
continue;
}
else if (*istr.position() == '\n')
{
++istr.position();
break;
}
else
throw Exception("Found garbage after field '" + name_ref.toString() + "' in TSKV format", ErrorCodes::INCORRECT_DATA);
}
}
/// Заполняем не встретившиеся столбцы значениями по-умолчанию.
for (size_t i = 0; i < columns; ++i)
if (!read_columns[i])
block.unsafeGetByPosition(i).column.get()->insertDefault();
return true;
}
}

View File

@ -14,8 +14,6 @@ namespace ErrorCodes
}
using Poco::SharedPtr;
TabSeparatedRowInputStream::TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & sample_, bool with_names_, bool with_types_)
: istr(istr_), sample(sample_), with_names(with_names_), with_types(with_types_)
{

View File

@ -0,0 +1,5 @@
0000-00-00 00:00:00
0000-00-00 00:00:00
0000-00-00 00:00:00 Hello, world abc
custom-service-log 2013-01-01 00:00:00 +0400 multiline\ntext can contain \0 symbol
0000-00-00 00:00:00 def

View File

@ -0,0 +1,14 @@
#!/bin/bash
clickhouse-client --query="DROP TABLE IF EXISTS test.tskv";
clickhouse-client --query="CREATE TABLE test.tskv (tskv_format String, timestamp DateTime, timezone String, text String, binary_data String) ENGINE = Memory";
echo -n 'tskv tskv_format=custom-service-log timestamp=2013-01-01 00:00:00 timezone=+0400 text=multiline\ntext binary_data=can contain \0 symbol
binary_data=abc text=Hello, world
binary_data=def text=
tskv
' | clickhouse-client --query="INSERT INTO test.tskv FORMAT TSKV";
clickhouse-client --query="SELECT * FROM test.tskv ORDER BY binary_data";
clickhouse-client --query="DROP TABLE test.tskv";