# HG changeset patch # User František Kučera # Date 1580070902 -3600 # Node ID a467e8cbd16b922a6502ae1691d8d73ea15eab08 # Parent 640ba8948d69607e307c9bcc4762b6dfc0c64cdd parallel processing: optimize flush() + detect another protocol violation diff -r 640ba8948d69 -r a467e8cbd16b src/StreamletAttributeFinder.h --- a/src/StreamletAttributeFinder.h Sat Jan 25 21:28:37 2020 +0100 +++ b/src/StreamletAttributeFinder.h Sun Jan 26 21:35:02 2020 +0100 @@ -143,6 +143,7 @@ SubProcess::Message m = subProcess->read(); if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE_METADATA) metadata.push_back({m.parameters[0], writer->toTypeId(m.parameters[1])}); else if (m.code == StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) break; + else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading output attribute metadata. Expected OUTPUT_ATTRIBUTE_METADATA or WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); } cachedMetadata[field.id] = metadata; diff -r 640ba8948d69 -r a467e8cbd16b src/SubProcess.cpp --- a/src/SubProcess.cpp Sat Jan 25 21:28:37 2020 +0100 +++ b/src/SubProcess.cpp Sun Jan 26 21:35:02 2020 +0100 @@ -57,7 +57,6 @@ void write(string_t s) { subInputWriter << convertor.to_bytes(s).c_str(); subInputWriter.put(SEPARATOR); - subInputWriter.flush(); if (subInputWriter.bad()) throw SubProcess::Exception(L"Unable to write to sub-process."); } @@ -65,6 +64,10 @@ write(std::to_wstring(i)); } + void flush() { + subInputWriter.flush(); + } + public: /** @@ -171,6 +174,7 @@ write(m.code); write(m.parameters.size()); for (auto p : m.parameters) write(p); + flush(); } int wait() {