equal
deleted
inserted
replaced
80 } else { |
80 } else { |
81 throw RelpipeWriterException(L"Missing environment variable RELPIPE_IN_FILESYSTEM_STREAMLET_PATH → unable to find streamlet."); |
81 throw RelpipeWriterException(L"Missing environment variable RELPIPE_IN_FILESYSTEM_STREAMLET_PATH → unable to find streamlet."); |
82 } |
82 } |
83 } |
83 } |
84 |
84 |
|
85 void writeAttribute(RelationalWriter* writer, TypeId typeId, SubProcess::Message* m) { |
|
86 if (m->parameters[1] == L"true") { |
|
87 if (typeId == TypeId::BOOLEAN) writer->writeAttribute(L"false"); |
|
88 else if (typeId == TypeId::INTEGER)writer->writeAttribute(L"0"); |
|
89 else writer->writeAttribute(L""); // TODO: write actual null values (when supported)
|
90 } else { |
|
91 writer->writeAttribute(m->parameters[0]); |
|
92 } |
|
93 } |
|
94 |
85 protected: |
95 protected: |
86 |
96 |
87 void startFile(const fs::path& file, const string& fileRaw, bool exists) override { |
97 void startFile(const fs::path& file, const string& fileRaw, bool exists) override { |
88 AttributeFinder::startFile(file, fileRaw, exists); |
98 AttributeFinder::startFile(file, fileRaw, exists); |
89 if (exists) { |
99 if (exists) { |
96 |
106 |
97 virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override { |
107 virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override { |
98 if (field.group == RequestedField::GROUP_STREAMLET) { |
108 if (field.group == RequestedField::GROUP_STREAMLET) { |
99 for (auto metadata : cachedMetadata[field.id]) { |
109 for (auto metadata : cachedMetadata[field.id]) { |
100 SubProcess::Message m = subProcesses[field.id]->read(); |
110 SubProcess::Message m = subProcesses[field.id]->read(); |
101 if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]); |
111 if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writeAttribute(writer, metadata.typeId, &m); |
102 else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); |
112 else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); |
103 } |
113 } |
104 |
114 |
105 SubProcess::Message m = subProcesses[field.id]->read(); |
115 SubProcess::Message m = subProcesses[field.id]->read(); |
106 if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); |
116 if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); |