--- a/src/StreamletAttributeFinder.h Sat Jan 11 18:13:30 2020 +0100
+++ b/src/StreamletAttributeFinder.h Sat Jan 11 19:10:01 2020 +0100
@@ -50,7 +50,7 @@
protected:
- virtual void writeFieldOfExistingFile(RelationalWriter* writer, const RequestedField& field) override {
+ virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
// TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
if (field.group == RequestedField::GROUP_STREAMLET) {
@@ -73,7 +73,7 @@
static const string_t SCRIPT_PREFIX;
- virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const RequestedField& field) override {
+ virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
if (field.group == RequestedField::GROUP_STREAMLET) {
if (cachedMetadata.count(field.id)) {
@@ -96,7 +96,7 @@
subProcess->write({StreamletMsg::WAITING_FOR_VERSION});
SubProcess::Message versionMessage = subProcess->read();
if (versionMessage.code == StreamletMsg::VERSION_ACCEPTED && versionMessage.parameters[0] == version) {
- subProcess->write({StreamletMsg::RELATION_START});
+ subProcess->write({StreamletMsg::RELATION_START, relationName});
subProcess->write({StreamletMsg::INPUT_ATTRIBUTE_METADATA, L"path", L"string"});
for (string_t alias : field.getAliases()) subProcess->write({StreamletMsg::OUTPUT_ATTRIBUTE_ALIAS, alias});
for (int i = 0; i < field.options.size();) subProcess->write({StreamletMsg::OPTION, field.options[i++], field.options[i++]});