equal
deleted
inserted
replaced
93 } |
93 } |
94 } |
94 } |
95 } |
95 } |
96 |
96 |
97 virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override { |
97 virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override { |
98 // TODO: parallelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock) |
|
99 if (field.group == RequestedField::GROUP_STREAMLET) { |
98 if (field.group == RequestedField::GROUP_STREAMLET) { |
100 for (auto metadata : cachedMetadata[field.id]) { |
99 for (auto metadata : cachedMetadata[field.id]) { |
101 SubProcess::Message m = subProcesses[field.id]->read(); |
100 SubProcess::Message m = subProcesses[field.id]->read(); |
102 if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]); |
101 if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]); |
103 else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); |
102 else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); |
104 } |
103 } |
105 |
104 |
106 SubProcess::Message m = subProcesses[field.id]->read(); |
105 SubProcess::Message m = subProcesses[field.id]->read(); |
107 if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); |
106 if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); |
108 // TODO: generic protocol violation error messages / method for checking responses |
107 // FIXME: generic protocol violation error messages / method for checking responses |
109 } |
108 } |
110 } |
109 } |
111 |
110 |
112 public: |
111 public: |
113 |
112 |