src/StreamletAttributeFinder.h
branchv_0
changeset 60 bb7ca5891755
parent 51 841845ccf06d
child 61 640ba8948d69
equal deleted inserted replaced
59:7471529c0d11 60:bb7ca5891755
    93 			}
    93 			}
    94 		}
    94 		}
    95 	}
    95 	}
    96 
    96 
    97 	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
    97 	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
    98 		// TODO: parallelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
       
    99 		if (field.group == RequestedField::GROUP_STREAMLET) {
    98 		if (field.group == RequestedField::GROUP_STREAMLET) {
   100 			for (auto metadata : cachedMetadata[field.id]) {
    99 			for (auto metadata : cachedMetadata[field.id]) {
   101 				SubProcess::Message m = subProcesses[field.id]->read();
   100 				SubProcess::Message m = subProcesses[field.id]->read();
   102 				if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]);
   101 				if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]);
   103 				else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString());
   102 				else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString());
   104 			}
   103 			}
   105 
   104 
   106 			SubProcess::Message m = subProcesses[field.id]->read();
   105 			SubProcess::Message m = subProcesses[field.id]->read();
   107 			if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString());
   106 			if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString());
   108 			// TODO: generic protocol violation error messages / method for checking responses
   107 			// FIXME: generic protocol violation error messages / method for checking responses
   109 		}
   108 		}
   110 	}
   109 	}
   111 
   110 
   112 public:
   111 public:
   113 
   112