# HG changeset patch
# User František Kučera
# Date 1578778852 -3600
# Node ID 926eb93c302fe059fa6e761ef586ec1323c020dc
# Parent 0b9e4af08cc8cca734a4306e72ced218e3810532
streamlets: enable parallel processing

diff -r 0b9e4af08cc8 -r 926eb93c302f src/StreamletAttributeFinder.h
--- a/src/StreamletAttributeFinder.h	Sat Jan 11 22:11:01 2020 +0100
+++ b/src/StreamletAttributeFinder.h	Sat Jan 11 22:40:52 2020 +0100
@@ -50,13 +50,19 @@
 
 protected:
 
-	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
+	void startFile(const fs::path& file, const string& fileRaw, bool exists) override {
+		AttributeFinder::startFile(file, fileRaw, exists);
+		if (exists) {
+			for (auto subProcess : subProcesses) {
+				subProcess.second->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
+				subProcess.second->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
+			}
+		}
+	}
+
+	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
 		// TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
 		if (field.group == RequestedField::GROUP_STREAMLET) {
-
-			subProcesses[field.id]->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull
-			subProcesses[field.id]->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES});
-
 			for (auto metadata : cachedMetadata[field.id]) {
 				SubProcess::Message m = subProcesses[field.id]->read();
 				if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]);