/**
* Relational pipes
* Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <set>
#include <regex>
#include <relpipe/writer/typedefs.h>
#include <relpipe/writer/RelationalWriter.h>
#include <relpipe/writer/AttributeMetadata.h>
namespace relpipe {
namespace in {
namespace recfile {
using namespace relpipe::writer;
class RecfileCommand {
private:
/**
 * Kind of a logical line passed from the parser to the handler.
 * The numeric values equal the implicit ones (declaration order).
 */
enum class RecfileLineType {
	/** line whose name was introduced by '%' */
	METADATA = 0,
	/** ordinary "name: value" field; the type the parser resets to after each emitted line */
	DATA = 1,
	/** reported by the parser for a newline met at the start of a line */
	SEPARATOR = 2,
	/** line introduced by '#' */
	COMMENT = 3,
	/** reported once, after the parser has stopped reading the input */
	END = 4,
};
class RecfileHandler {
private:
RelationalWriter* writer;
string_t currentRelationName;
std::vector<AttributeMetadata> currentAttributeMetadata;
std::vector<AttributeMetadata> currentTypeHints;
std::vector<string_t> currentRecord;
std::vector<std::vector<string_t>> currentRecords;
size_t prefetchCount = 1;
bool headerWritten = false;
TypeId findType(string_t attributeName, TypeId defaultType = TypeId::STRING) {
for (AttributeMetadata m : currentTypeHints) if (m.attributeName == attributeName) return m.typeId;
return defaultType;
}
TypeId recType2typeId(string_t recType) {
// TODO: support more types
// boolean is currently unsupported, because NULLs are not implemented yet and recfile booleans might be null
if (recType == L"int") return TypeId::INTEGER;
else return TypeId::STRING;
}
void writeHeader() {
if (headerWritten) return;
if (currentRelationName.size() == 0) currentRelationName = L"recfile";
std::set<string_t> uniqueAttributeNames;
// TODO: add also attribute names from type hints from recfile metadata
for (int i = 0; i < currentRecords.size(); i++) {
std::vector<string_t> record = currentRecords[i];
for (int j = 0; j < record.size(); j += 2) {
if (uniqueAttributeNames.insert(record[j]).second) {
currentAttributeMetadata.push_back({record[j], findType(record[j])});
}
}
}
writer->startRelation(currentRelationName, currentAttributeMetadata, true);
headerWritten = true;
}
string_t findValue(std::vector<string_t>& record, TypeId type, string_t& name) {
for (int j = 0; j < record.size(); j += 2) if (record[j] == name) return record[j + 1];
return L""; // TODO: proper empty/null value for given type
}
void writeRecords() {
for (std::vector<string_t> record : currentRecords) {
for (AttributeMetadata a : currentAttributeMetadata) {
writer->writeAttribute(findValue(record, a.typeId, a.attributeName));
}
}
currentRecords.clear();
}
void metadata(const string_t& name, const string_t& value) {
if (name == L"rec") {
currentRelationName = value;
currentAttributeMetadata.clear();
currentTypeHints.clear();
currentRecord.clear();
currentRecords.clear();
headerWritten = false;
} else if (name == L"type") {
std::wsmatch match;
if (regex_search(value, match, std::wregex(L"\\s?(.*)\\s+(.*)\\s?"))) currentTypeHints.push_back({match[1], recType2typeId(match[2])});
} else {
// ignore – other recfile metadata like keys or auto-increments
}
}
void data(const string_t& name, const string_t& value) {
currentRecord.push_back(name);
currentRecord.push_back(value);
}
void comment(const string_t& value) {
// ignore comments
}
void separator() {
if (currentRecord.size()) {
currentRecords.push_back(currentRecord);
currentRecord.clear();
}
if (prefetchCount > 0 && currentRecords.size() >= prefetchCount) {
writeHeader();
writeRecords();
}
}
void end() {
if (currentRecord.size()) currentRecords.push_back(currentRecord);
writeHeader();
writeRecords();
}
public:
RecfileHandler(RelationalWriter* writer) : writer(writer) {
}
virtual ~RecfileHandler() {
}
void logicalLine(RecfileLineType type, const string_t& name = L"", const string_t& value = L"") {
switch (type) {
case RecfileLineType::METADATA: return metadata(name, value);
case RecfileLineType::DATA: return data(name, value);
case RecfileLineType::COMMENT: return comment(value);
case RecfileLineType::SEPARATOR: return separator();
case RecfileLineType::END: return end();
}
}
};
/**
 * States of the character-level recfile parser.
 * The numeric values equal the implicit ones (declaration order);
 * the parser prints the numeric value in its "Unknown ParserState" error message.
 */
enum class ParserState {
	/** at the beginning of a line, before any name character was read */
	START = 0,
	/** reading a field name, up to the ':' */
	NAME = 1,
	/** reading a field value, up to the end of the line */
	VALUE = 2,
	/** just after the newline that followed a value: '+' continues the value */
	VALUE_CONTINUATION = 3,
	/** reading a comment, up to the end of the line */
	COMMENT = 4,
	/** stops the parsing loop */
	END = 5,
};
class RecfileParser {
private:
wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings or are recfiles always in UTF-8?
RecfileHandler& handler;
void emitLogicalLine(RecfileLineType& type, std::stringstream& name, std::stringstream& value) {
handler.logicalLine(type, convertor.from_bytes(name.str()), convertor.from_bytes(value.str()));
name.str("");
name.clear();
value.str("");
value.clear();
type = RecfileLineType::DATA;
}
public:
RecfileParser(RecfileHandler& handler) : handler(handler) {
}
virtual ~RecfileParser() {
}
void parse(std::istream& input) {
ParserState state = ParserState::START;
RecfileLineType type = RecfileLineType::DATA;
std::stringstream name;
std::stringstream value;
char ch;
while (state != ParserState::END && input.good()) {
ch = input.get();
if (input.eof()) continue;
switch (state) {
case ParserState::START:
if (ch == '%') {
type = RecfileLineType::METADATA;
break;
} else if (ch == ' ') {
break;
} else if (ch == '\n') {
handler.logicalLine(RecfileLineType::SEPARATOR);
break;
} else if (ch == '#') {
type = RecfileLineType::COMMENT;
state = ParserState::COMMENT;
if (input.get() != ' ') input.unget();
break;
} // else → name
case ParserState::NAME:
if (ch == ':') {
state = ParserState::VALUE;
if (input.get() != ' ') input.unget();
} else {
name << ch;
}
break;
case ParserState::VALUE:
if (ch == '\n') state = ParserState::VALUE_CONTINUATION;
else if (ch == '\\' && input.peek() == '\n') input.get();
else value << ch;
break;
case ParserState::VALUE_CONTINUATION:
if (ch == '+') {
state = ParserState::VALUE;
if (value.tellp()) value << '\n';
if (input.get() != ' ') input.unget();
} else {
input.unget();
state = ParserState::START;
emitLogicalLine(type, name, value);
}
break;
case ParserState::COMMENT:
if (ch == '\n') {
state = ParserState::START;
emitLogicalLine(type, name, value);
} else {
value << ch;
}
break;
default:
throw RelpipeWriterException(L"Unknown ParserState: " + std::to_wstring((int) state) + L" in RecfileParser."); // TODO: better exception
}
}
if (name.tellp()) emitLogicalLine(type, name, value);
handler.logicalLine(RecfileLineType::END);
}
};
public:
void process(std::istream& input, std::ostream& output) {
unique_ptr<RelationalWriter> writer(Factory::create(output));
RecfileHandler handler(writer.get());
RecfileParser parser(handler);
parser.parse(input);
}
};
}
}
}