Bug 13916 – Nested std.concurrency.receive doesn't work correctly

Status
NEW
Severity
normal
Priority
P3
Component
phobos
Product
D
Version
D2
Platform
All
OS
All
Creation time
2014-12-30T21:06:35Z
Last change time
2024-12-01T16:23:29Z
Assigned to
No Owner
Creator
yazan.dabain
Moved to GitHub: phobos#9650 →

Comments

Comment #0 by yazan.dabain — 2014-12-30T21:06:35Z
``` import std.concurrency; import std.stdio; // version = working; void child() { int i; while (true) { i = receiveOnly!int(); ownerTid().send(i); ownerTid().send(i); writeln("2 messages sent"); } } void main() { Tid child = spawnLinked(&child); while (true) { int i; child.send(i); receive( // A (int x) { version (working) {} else { writeln("blocking until 2nd message is received"); receiveOnly!int(); // B writeln("2nd message was received"); } } ); version (working) { writeln("blocking until 2nd message is received"); receiveOnly!int(); // C writeln("2nd message was received"); } writeln("one loop"); } } ``` In `version = working`, the program works as expected printing: `2 messages sent blocking until 2nd message is received 2nd message was received one loop 2 messages sent blocking until 2nd message is received 2nd message was received one loop` ... and so on With `version = working` commented out, instead of having the two receives in series (A + C), receive (B) is used in the message handler of receive (A) (i.e receive (B) is nested). Running the program on a multi-core CPU prints: `2 messages sent blocking until 2nd message is received` and it hangs without reaching `2nd message was received`. When running the same program using `taskset 0x01 ./test` (which limits the CPUs that the program can run on to 1) the behavior becomes similar to the working version for a while but it still hangs eventually.
Comment #1 by code — 2015-01-14T22:16:07Z
You'd need to change the onStandardMessage function to remove the value from the list before instead of after dispatching it to the handler. https://github.com/D-Programming-Language/phobos/blob/fc132046bf0dc6efcb1ee40833fd1896cbdd3324/std/concurrency.d#L1956
Comment #2 by greensunny12 — 2018-03-31T17:38:52Z
So I experimented with removing the message from the list before the handler is called, but I always get a LinkTerminated Exception which comes from the child being terminated. The child is in a while(true) loop, so it shouldn't get terminated, but for some reason now its module deconstructor is now immediately called :/ diff --git a/std/concurrency.d b/std/concurrency.d index 0e1b505bc..1c426bcc0 100644 --- a/std/concurrency.d +++ b/std/concurrency.d @@ -1974,8 +1980,9 @@ private enum timedWait = false; } - bool onStandardMsg(ref Message msg) + bool onStandardMsg(ref Message msg, void delegate() beforeSuccess = (){}) { foreach (i, t; Ops) { alias Args = Parameters!(t); @@ -1985,10 +1992,13 @@ private { static if (is(ReturnType!(t) == bool)) { - return msg.map(op); + beforeSuccess(); + auto b = msg.map(op); + return b; } else { + beforeSuccess(); msg.map(op); return true; } @@ -2043,6 +2053,8 @@ private { for (auto range = list[]; !range.empty;) { + import std.stdio; + writefln("range: %s", range.front); // Only the message handler will throw, so if this occurs // we can be certain that the message was handled. scope (failure) @@ -2071,11 +2083,9 @@ private } else { - if (onStandardMsg(range.front)) - { - list.removeAt(range); + if (onStandardMsg(range.front, (){ list.removeAt(range); })) return true; - } + range.popFront(); continue; } ```
Comment #3 by robert.schadek — 2024-12-01T16:23:29Z
THIS ISSUE HAS BEEN MOVED TO GITHUB https://github.com/dlang/phobos/issues/9650 DO NOT COMMENT HERE ANYMORE, NOBODY WILL SEE IT, THIS ISSUE HAS BEEN MOVED TO GITHUB