dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-11-06 06:22:52 +00:00
parent 7dd2114223
commit bf568aa887
15 changed files with 187 additions and 18 deletions

View File

@ -5,6 +5,7 @@
#include <DB/Core/Field.h> #include <DB/Core/Field.h>
#include <DB/Core/Exception.h> #include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h> #include <DB/Core/ErrorCodes.h>
#include <DB/Columns/ColumnVector.h>
#include <DB/Columns/IColumn.h> #include <DB/Columns/IColumn.h>
@ -13,10 +14,19 @@ namespace DB
using Poco::SharedPtr; using Poco::SharedPtr;
class IColumnConst : public IColumn
{
public:
bool isConst() const { return true; }
virtual ColumnPtr convertToFullColumn() const = 0;
};
/** шаблон для столбцов-констант (столбцов одинаковых значений). /** шаблон для столбцов-констант (столбцов одинаковых значений).
*/ */
template <typename T> template <typename T>
class ColumnConst : public IColumn class ColumnConst : public IColumnConst
{ {
public: public:
typedef T Type; typedef T Type;
@ -26,7 +36,6 @@ public:
std::string getName() const { return "ColumnConst<" + TypeName<T>::get() + ">"; } std::string getName() const { return "ColumnConst<" + TypeName<T>::get() + ">"; }
bool isNumeric() const { return IsNumber<T>::value; } bool isNumeric() const { return IsNumber<T>::value; }
size_t sizeOfField() const { return sizeof(T); } size_t sizeOfField() const { return sizeof(T); }
bool isConst() const { return true; }
ColumnPtr cloneEmpty() const { return new ColumnConst(0, data); } ColumnPtr cloneEmpty() const { return new ColumnConst(0, data); }
size_t size() const { return s; } size_t size() const { return s; }
Field operator[](size_t n) const { return typename NearestFieldType<T>::Type(data); } Field operator[](size_t n) const { return typename NearestFieldType<T>::Type(data); }
@ -80,7 +89,7 @@ public:
const T & getData() const { return data; } const T & getData() const { return data; }
/** Преобразование из константы в полноценный столбец */ /** Преобразование из константы в полноценный столбец */
// virtual ColumnPtr convertToFullColumn() const = 0; ColumnPtr convertToFullColumn() const;
private: private:
size_t s; size_t s;
@ -90,4 +99,12 @@ private:
typedef ColumnConst<String> ColumnConstString; typedef ColumnConst<String> ColumnConstString;
template <typename T> ColumnPtr ColumnConst<T>::convertToFullColumn() const
{
ColumnVector<T> * res = new ColumnVector<T>;
res->getData().assign(s, data);
return res;
}
} }

View File

@ -21,6 +21,8 @@ public:
size_t max_block_size_ = DEFAULT_BLOCK_SIZE); size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
Block readImpl(); Block readImpl();
void readPrefix() { row_input->readPrefix(); }
void readSuffix() { row_input->readSuffix(); }
String getName() const { return "BlockInputStreamFromRowInputStream"; } String getName() const { return "BlockInputStreamFromRowInputStream"; }

View File

@ -15,6 +15,8 @@ class BlockOutputStreamFromRowOutputStream : public IBlockOutputStream
public: public:
BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_); BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_);
void write(const Block & block); void write(const Block & block);
void writePrefix() { row_output->writePrefix(); }
void writeSuffix() { row_output->writeSuffix(); }
BlockOutputStreamPtr clone() { return new BlockOutputStreamFromRowOutputStream(row_output); } BlockOutputStreamPtr clone() { return new BlockOutputStreamFromRowOutputStream(row_output); }

View File

@ -0,0 +1,48 @@
#pragma once
#include <DB/Columns/ColumnConst.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** Преобразует столбцы-константы в полноценные столбцы ("материализует" их).
*/
class MaterializingBlockInputStream : public IProfilingBlockInputStream
{
public:
MaterializingBlockInputStream(BlockInputStreamPtr input_)
: input(input_)
{
children.push_back(input);
}
Block readImpl()
{
Block res = input->read();
if (!res)
return res;
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
{
ColumnPtr col = res.getByPosition(i).column;
if (col->isConst())
res.getByPosition(i).column = dynamic_cast<IColumnConst &>(*col).convertToFullColumn();
}
return res;
}
String getName() const { return "MaterializingBlockInputStream"; }
BlockInputStreamPtr clone() { return new MaterializingBlockInputStream(input); }
private:
BlockInputStreamPtr input;
};
}

View File

@ -18,15 +18,21 @@ using Poco::SharedPtr;
class TabSeparatedRowInputStream : public IRowInputStream class TabSeparatedRowInputStream : public IRowInputStream
{ {
public: public:
TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & sample_); /** with_names - в первой строке заголовок с именами столбцов
* with_types - на следующей строке заголовок с именами типов
*/
TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & sample_, bool with_names_ = false, bool with_types_ = false);
Row read(); Row read();
void readPrefix();
RowInputStreamPtr clone() { return new TabSeparatedRowInputStream(istr, sample); } RowInputStreamPtr clone() { return new TabSeparatedRowInputStream(istr, sample); }
private: private:
ReadBuffer & istr; ReadBuffer & istr;
const Block & sample; const Block & sample;
bool with_names;
bool with_types;
DataTypes data_types; DataTypes data_types;
}; };

View File

@ -18,17 +18,23 @@ using Poco::SharedPtr;
class TabSeparatedRowOutputStream : public IRowOutputStream class TabSeparatedRowOutputStream : public IRowOutputStream
{ {
public: public:
TabSeparatedRowOutputStream(WriteBuffer & ostr_, const Block & sample_); /** with_names - выводить в первой строке заголовок с именами столбцов
* with_types - выводить на следующей строке заголовок с именами типов
*/
TabSeparatedRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool with_names_ = false, bool with_types_ = false);
void writeField(const Field & field); void writeField(const Field & field);
void writeFieldDelimiter(); void writeFieldDelimiter();
void writeRowEndDelimiter(); void writeRowEndDelimiter();
void writePrefix();
RowOutputStreamPtr clone() { return new TabSeparatedRowOutputStream(ostr, sample); } RowOutputStreamPtr clone() { return new TabSeparatedRowOutputStream(ostr, sample); }
private: private:
WriteBuffer & ostr; WriteBuffer & ostr;
const Block & sample; const Block & sample;
bool with_names;
bool with_types;
DataTypes data_types; DataTypes data_types;
size_t field_number; size_t field_number;
}; };

View File

@ -1,5 +1,4 @@
#ifndef DBMS_DATA_STREAMS_COPY_DATA_H #pragma once
#define DBMS_DATA_STREAMS_COPY_DATA_H
#include <DB/Core/Block.h> #include <DB/Core/Block.h>
#include <DB/DataStreams/IBlockInputStream.h> #include <DB/DataStreams/IBlockInputStream.h>
@ -20,5 +19,3 @@ void copyData(IBlockInputStream & from, IRowOutputStream & to);
void copyData(IRowInputStream & from, IBlockOutputStream & to, const Block & sample); void copyData(IRowInputStream & from, IBlockOutputStream & to, const Block & sample);
} }
#endif

View File

@ -0,0 +1,29 @@
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnConst.h>
namespace DB
{
template <> ColumnPtr ColumnConst<String>::convertToFullColumn() const
{
ColumnString * res = new ColumnString;
ColumnString::Offsets_t & offsets = res->getOffsets();
ColumnUInt8::Container_t & vec = dynamic_cast<ColumnVector<UInt8> &>(res->getData()).getData();
size_t string_size = data.size() + 1;
size_t offset = 0;
offsets.resize(s);
vec.resize(s * string_size);
for (size_t i = 0; i < s; ++i)
{
memcpy(&vec[offset], data.data(), string_size);
offset += string_size;
offsets[i] = offset;
}
return res;
}
}

View File

@ -23,7 +23,6 @@ Block BlockInputStreamFromRowInputStream::readImpl()
{ {
Block res; Block res;
row_input->readPrefix();
for (size_t rows = 0; rows < max_block_size; ++rows) for (size_t rows = 0; rows < max_block_size; ++rows)
{ {
if (rows != 0) if (rows != 0)
@ -43,7 +42,6 @@ Block BlockInputStreamFromRowInputStream::readImpl()
for (size_t i = 0; i < row.size(); ++i) for (size_t i = 0; i < row.size(); ++i)
res.getByPosition(i).column->insert(row[i]); res.getByPosition(i).column->insert(row[i]);
} }
row_input->readSuffix();
return res; return res;
} }

View File

@ -12,7 +12,6 @@ void BlockOutputStreamFromRowOutputStream::write(const Block & block)
size_t rows = block.rows(); size_t rows = block.rows();
size_t columns = block.columns(); size_t columns = block.columns();
row_output->writePrefix();
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
if (i != 0) if (i != 0)
@ -29,7 +28,6 @@ void BlockOutputStreamFromRowOutputStream::write(const Block & block)
row_output->writeRowEndDelimiter(); row_output->writeRowEndDelimiter();
} }
row_output->writeSuffix();
} }
} }

View File

@ -4,6 +4,7 @@
#include <DB/DataStreams/TabSeparatedRowOutputStream.h> #include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/ValuesRowInputStream.h> #include <DB/DataStreams/ValuesRowInputStream.h>
#include <DB/DataStreams/ValuesRowOutputStream.h> #include <DB/DataStreams/ValuesRowOutputStream.h>
#include <DB/DataStreams/TabSeparatedBlockOutputStream.h>
#include <DB/DataStreams/BlockInputStreamFromRowInputStream.h> #include <DB/DataStreams/BlockInputStreamFromRowInputStream.h>
#include <DB/DataStreams/BlockOutputStreamFromRowOutputStream.h> #include <DB/DataStreams/BlockOutputStreamFromRowOutputStream.h>
#include <DB/DataStreams/FormatFactory.h> #include <DB/DataStreams/FormatFactory.h>
@ -19,6 +20,10 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
return new NativeBlockInputStream(buf, data_type_factory); return new NativeBlockInputStream(buf, data_type_factory);
else if (name == "TabSeparated") else if (name == "TabSeparated")
return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample), sample, max_block_size); return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample), sample, max_block_size);
else if (name == "TabSeparatedWithNames")
return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample, true), sample, max_block_size);
else if (name == "TabSeparatedWithNamesAndTypes")
return new BlockInputStreamFromRowInputStream(new TabSeparatedRowInputStream(buf, sample, true, true), sample, max_block_size);
else if (name == "Values") else if (name == "Values")
return new BlockInputStreamFromRowInputStream(new ValuesRowInputStream(buf, sample), sample, max_block_size); return new BlockInputStreamFromRowInputStream(new ValuesRowInputStream(buf, sample), sample, max_block_size);
else else
@ -33,6 +38,12 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer &
return new NativeBlockOutputStream(buf); return new NativeBlockOutputStream(buf);
else if (name == "TabSeparated") else if (name == "TabSeparated")
return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample)); return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample));
else if (name == "TabSeparatedWithNames")
return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample, true));
else if (name == "TabSeparatedWithNamesAndTypes")
return new BlockOutputStreamFromRowOutputStream(new TabSeparatedRowOutputStream(buf, sample, true, true));
else if (name == "BlockTabSeparated")
return new TabSeparatedBlockOutputStream(buf);
else if (name == "Values") else if (name == "Values")
return new BlockOutputStreamFromRowOutputStream(new ValuesRowOutputStream(buf, sample)); return new BlockOutputStreamFromRowOutputStream(new ValuesRowOutputStream(buf, sample));
else else

View File

@ -8,8 +8,8 @@ namespace DB
using Poco::SharedPtr; using Poco::SharedPtr;
TabSeparatedRowInputStream::TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & sample_) TabSeparatedRowInputStream::TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & sample_, bool with_names_, bool with_types_)
: istr(istr_), sample(sample_) : istr(istr_), sample(sample_), with_names(with_names_), with_types(with_types_)
{ {
size_t columns = sample.columns(); size_t columns = sample.columns();
data_types.resize(columns); data_types.resize(columns);
@ -18,6 +18,31 @@ TabSeparatedRowInputStream::TabSeparatedRowInputStream(ReadBuffer & istr_, const
} }
void TabSeparatedRowInputStream::readPrefix()
{
size_t columns = sample.columns();
String tmp;
if (with_names)
{
for (size_t i = 0; i < columns; ++i)
{
readEscapedString(tmp, istr);
assertString(i == columns - 1 ? "\n" : "\t", istr);
}
}
if (with_types)
{
for (size_t i = 0; i < columns; ++i)
{
readEscapedString(tmp, istr);
assertString(i == columns - 1 ? "\n" : "\t", istr);
}
}
}
Row TabSeparatedRowInputStream::read() Row TabSeparatedRowInputStream::read()
{ {
Row res; Row res;

View File

@ -9,8 +9,8 @@ namespace DB
using Poco::SharedPtr; using Poco::SharedPtr;
TabSeparatedRowOutputStream::TabSeparatedRowOutputStream(WriteBuffer & ostr_, const Block & sample_) TabSeparatedRowOutputStream::TabSeparatedRowOutputStream(WriteBuffer & ostr_, const Block & sample_, bool with_names_, bool with_types_)
: ostr(ostr_), sample(sample_), field_number(0) : ostr(ostr_), sample(sample_), with_names(with_names_), with_types(with_types_), field_number(0)
{ {
size_t columns = sample.columns(); size_t columns = sample.columns();
data_types.resize(columns); data_types.resize(columns);
@ -19,6 +19,30 @@ TabSeparatedRowOutputStream::TabSeparatedRowOutputStream(WriteBuffer & ostr_, co
} }
void TabSeparatedRowOutputStream::writePrefix()
{
size_t columns = sample.columns();
if (with_names)
{
for (size_t i = 0; i < columns; ++i)
{
writeEscapedString(sample.getByPosition(i).name, ostr);
writeChar(i == columns - 1 ? '\n' : '\t', ostr);
}
}
if (with_types)
{
for (size_t i = 0; i < columns; ++i)
{
writeEscapedString(sample.getByPosition(i).type->getName(), ostr);
writeChar(i == columns - 1 ? '\n' : '\t', ostr);
}
}
}
void TabSeparatedRowOutputStream::writeField(const Field & field) void TabSeparatedRowOutputStream::writeField(const Field & field)
{ {
data_types[field_number]->serializeTextEscaped(field, ostr); data_types[field_number]->serializeTextEscaped(field, ostr);

View File

@ -6,6 +6,7 @@
#include <DB/IO/WriteBufferFromString.h> #include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteHelpers.h> #include <DB/IO/WriteHelpers.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/copyData.h> #include <DB/DataStreams/copyData.h>
#include <DB/Parsers/ASTCreateQuery.h> #include <DB/Parsers/ASTCreateQuery.h>
@ -167,7 +168,10 @@ StoragePtr InterpreterCreateQuery::execute()
/// Если запрос CREATE SELECT, то вставим в таблицу данные /// Если запрос CREATE SELECT, то вставим в таблицу данные
if (create.select) if (create.select)
copyData(*interpreter_select->execute(), *res->write(query_ptr)); {
BlockInputStreamPtr from = new MaterializingBlockInputStream(interpreter_select->execute());
copyData(*from, *res->write(query_ptr));
}
return res; return res;
} }

View File

@ -1,5 +1,6 @@
#include <DB/IO/ConcatReadBuffer.h> #include <DB/IO/ConcatReadBuffer.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/FormatFactory.h> #include <DB/DataStreams/FormatFactory.h>
#include <DB/DataStreams/copyData.h> #include <DB/DataStreams/copyData.h>
@ -79,6 +80,7 @@ void InterpreterInsertQuery::execute(SharedPtr<ReadBuffer> remaining_data_istr)
{ {
InterpreterSelectQuery interpreter_select(query.select, context, max_block_size); InterpreterSelectQuery interpreter_select(query.select, context, max_block_size);
in = interpreter_select.execute(); in = interpreter_select.execute();
in = new MaterializingBlockInputStream(in);
copyData(*in, *out); copyData(*in, *out);
} }
} }