8087324: Use semaphores when starting and stopping GC task threads
authorstefank
Mon, 29 Jun 2015 11:11:12 +0200
changeset 32361 3de9a2e1e2d0
parent 32360 86790204fc23
child 32362 a487889ff52d
child 32363 6d92a74efac3
8087324: Use semaphores when starting and stopping GC task threads Reviewed-by: jmasa, sjohanss
hotspot/src/share/vm/gc/shared/genCollectedHeap.hpp
hotspot/src/share/vm/gc/shared/workgroup.cpp
hotspot/src/share/vm/gc/shared/workgroup.hpp
hotspot/src/share/vm/runtime/globals.hpp
--- a/hotspot/src/share/vm/gc/shared/genCollectedHeap.hpp	Mon Jun 29 11:09:39 2015 +0200
+++ b/hotspot/src/share/vm/gc/shared/genCollectedHeap.hpp	Mon Jun 29 11:11:12 2015 +0200
@@ -30,9 +30,9 @@
 #include "gc/shared/collectorPolicy.hpp"
 #include "gc/shared/generation.hpp"
 
-class WorkGang;
 class StrongRootsScope;
 class SubTasksDone;
+class WorkGang;
 
 // A "GenCollectedHeap" is a CollectedHeap that uses generational
 // collection.  It has two generations, young and old.
--- a/hotspot/src/share/vm/gc/shared/workgroup.cpp	Mon Jun 29 11:09:39 2015 +0200
+++ b/hotspot/src/share/vm/gc/shared/workgroup.cpp	Mon Jun 29 11:11:12 2015 +0200
@@ -28,6 +28,8 @@
 #include "memory/allocation.inline.hpp"
 #include "runtime/atomic.inline.hpp"
 #include "runtime/os.hpp"
+#include "runtime/semaphore.hpp"
+#include "runtime/thread.inline.hpp"
 
 // Definitions of WorkGang methods.
 
@@ -96,87 +98,170 @@
   }
 }
 
+// WorkGang dispatcher implemented with semaphores.
+//
+// Semaphores don't require the worker threads to re-claim the lock when they wake up.
+// This helps lowering the latency when starting and stopping the worker threads.
+class SemaphoreGangTaskDispatcher : public GangTaskDispatcher {
+  // The task currently being dispatched to the GangWorkers.
+  AbstractGangTask* _task;
+
+  volatile uint _started;
+  volatile uint _not_finished;
+
+  // Semaphore used to start the GangWorkers.
+  Semaphore* _start_semaphore;
+  // Semaphore used to notify the coordinator that all workers are done.
+  Semaphore* _end_semaphore;
+
+public:
+  SemaphoreGangTaskDispatcher() :
+      _task(NULL),
+      _started(0),
+      _not_finished(0),
+      _start_semaphore(new Semaphore()),
+      _end_semaphore(new Semaphore())
+{ }
+
+  ~SemaphoreGangTaskDispatcher() {
+    delete _start_semaphore;
+    delete _end_semaphore;
+  }
+
+  void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) {
+    // No workers are allowed to read the state variables until they have been signaled.
+    _task         = task;
+    _not_finished = num_workers;
+
+    // Dispatch 'num_workers' number of tasks.
+    _start_semaphore->signal(num_workers);
+
+    // Wait for the last worker to signal the coordinator.
+    _end_semaphore->wait();
+
+    // No workers are allowed to read the state variables after the coordinator has been signaled.
+    assert(_not_finished == 0, err_msg("%d not finished workers?", _not_finished));
+    _task    = NULL;
+    _started = 0;
+
+  }
+
+  WorkData worker_wait_for_task() {
+    // Wait for the coordinator to dispatch a task.
+    _start_semaphore->wait();
+
+    uint num_started = (uint) Atomic::add(1, (volatile jint*)&_started);
+
+    // Subtract one to get a zero-indexed worker id.
+    uint worker_id = num_started - 1;
+
+    return WorkData(_task, worker_id);
+  }
+
+  void worker_done_with_task() {
+    // Mark that the worker is done with the task.
+    // The worker is not allowed to read the state variables after this line.
+    uint not_finished = (uint) Atomic::add(-1, (volatile jint*)&_not_finished);
+
+    // The last worker signals to the coordinator that all work is completed.
+    if (not_finished == 0) {
+      _end_semaphore->signal();
+    }
+  }
+};
+
+class MutexGangTaskDispatcher : public GangTaskDispatcher {
+  AbstractGangTask* _task;
+
+  volatile uint _started;
+  volatile uint _finished;
+  volatile uint _num_workers;
+
+  Monitor* _monitor;
+
+ public:
+  MutexGangTaskDispatcher()
+      : _task(NULL),
+        _monitor(new Monitor(Monitor::leaf, "WorkGang dispatcher lock", false, Monitor::_safepoint_check_never)),
+        _started(0),
+        _finished(0),
+        _num_workers(0) {}
+
+  ~MutexGangTaskDispatcher() {
+    delete _monitor;
+  }
+
+  void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) {
+    MutexLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
+
+    _task        = task;
+    _num_workers = num_workers;
+
+    // Tell the workers to get to work.
+    _monitor->notify_all();
+
+    // Wait for them to finish.
+    while (_finished < _num_workers) {
+      _monitor->wait(/* no_safepoint_check */ true);
+    }
+
+    _task        = NULL;
+    _num_workers = 0;
+    _started     = 0;
+    _finished    = 0;
+  }
+
+  WorkData worker_wait_for_task() {
+    MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
+
+    while (_num_workers == 0 || _started == _num_workers) {
+      _monitor->wait(/* no_safepoint_check */ true);
+    }
+
+    _started++;
+
+    // Subtract one to get a zero-indexed worker id.
+    uint worker_id = _started - 1;
+
+    return WorkData(_task, worker_id);
+  }
+
+  void worker_done_with_task() {
+    MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
+
+    _finished++;
+
+    if (_finished == _num_workers) {
+      // This will wake up all workers and not only the coordinator.
+      _monitor->notify_all();
+    }
+  }
+};
+
+static GangTaskDispatcher* create_dispatcher() {
+  if (UseSemaphoreGCThreadsSynchronization) {
+    return new SemaphoreGangTaskDispatcher();
+  }
+
+  return new MutexGangTaskDispatcher();
+}
+
 WorkGang::WorkGang(const char* name,
-                   uint        workers,
-                   bool        are_GC_task_threads,
-                   bool        are_ConcurrentGC_threads) :
+                   uint  workers,
+                   bool  are_GC_task_threads,
+                   bool  are_ConcurrentGC_threads) :
     AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads),
-    _started_workers(0),
-    _finished_workers(0),
-    _sequence_number(0),
-    _task(NULL) {
-
-  // Other initialization.
-  _monitor = new Monitor(/* priority */       Mutex::leaf,
-                         /* name */           "WorkGroup monitor",
-                         /* allow_vm_block */ are_GC_task_threads,
-                                              Monitor::_safepoint_check_sometimes);
-
-  assert(monitor() != NULL, "Failed to allocate monitor");
-}
+    _dispatcher(create_dispatcher())
+{ }
 
 AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) {
   return new GangWorker(this, worker_id);
 }
 
 void WorkGang::run_task(AbstractGangTask* task) {
-  run_task(task, (uint)active_workers());
+  _dispatcher->coordinator_execute_on_workers(task, active_workers());
 }
 
-void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) {
-  // This thread is executed by the VM thread which does not block
-  // on ordinary MutexLocker's.
-  MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
-  if (TraceWorkGang) {
-    tty->print_cr("Running work gang %s task %s", name(), task->name());
-  }
-  // Tell all the workers to run a task.
-  assert(task != NULL, "Running a null task");
-  // Initialize.
-  _task = task;
-  _sequence_number += 1;
-  _started_workers = 0;
-  _finished_workers = 0;
-  // Tell the workers to get to work.
-  monitor()->notify_all();
-  // Wait for them to be finished
-  while (finished_workers() < no_of_parallel_workers) {
-    if (TraceWorkGang) {
-      tty->print_cr("Waiting in work gang %s: %u/%u finished sequence %d",
-                    name(), finished_workers(), no_of_parallel_workers,
-                    _sequence_number);
-    }
-    monitor()->wait(/* no_safepoint_check */ true);
-  }
-  _task = NULL;
-  if (TraceWorkGang) {
-    tty->print_cr("\nFinished work gang %s: %u/%u sequence %d",
-                  name(), finished_workers(), no_of_parallel_workers,
-                  _sequence_number);
-    Thread* me = Thread::current();
-    tty->print_cr("  T: " PTR_FORMAT "  VM_thread: %d", p2i(me), me->is_VM_thread());
-  }
-}
-
-void WorkGang::internal_worker_poll(WorkData* data) const {
-  assert(monitor()->owned_by_self(), "worker_poll is an internal method");
-  assert(data != NULL, "worker data is null");
-  data->set_task(task());
-  data->set_sequence_number(sequence_number());
-}
-
-void WorkGang::internal_note_start() {
-  assert(monitor()->owned_by_self(), "note_finish is an internal method");
-  _started_workers += 1;
-}
-
-void WorkGang::internal_note_finish() {
-  assert(monitor()->owned_by_self(), "note_finish is an internal method");
-  _finished_workers += 1;
-}
-
-// GangWorker methods.
-
 AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) {
   _gang = gang;
   set_id(id);
@@ -218,79 +303,43 @@
   st->cr();
 }
 
+WorkData GangWorker::wait_for_task() {
+  return gang()->dispatcher()->worker_wait_for_task();
+}
+
+void GangWorker::signal_task_done() {
+  gang()->dispatcher()->worker_done_with_task();
+}
+
+void GangWorker::print_task_started(WorkData data) {
+  if (TraceWorkGang) {
+    tty->print_cr("Running work gang %s task %s worker %u", name(), data._task->name(), data._worker_id);
+  }
+}
+
+void GangWorker::print_task_done(WorkData data) {
+  if (TraceWorkGang) {
+    tty->print_cr("\nFinished work gang %s task %s worker %u", name(), data._task->name(), data._worker_id);
+    Thread* me = Thread::current();
+    tty->print_cr("  T: " PTR_FORMAT "  VM_thread: %d", p2i(me), me->is_VM_thread());
+  }
+}
+
+void GangWorker::run_task(WorkData data) {
+  print_task_started(data);
+
+  data._task->work(data._worker_id);
+
+  print_task_done(data);
+}
+
 void GangWorker::loop() {
-  int previous_sequence_number = 0;
-  Monitor* gang_monitor = gang()->monitor();
-  for ( ; ; ) {
-    WorkData data;
-    int part;  // Initialized below.
-    {
-      // Grab the gang mutex.
-      MutexLocker ml(gang_monitor);
-      // Wait for something to do.
-      // Polling outside the while { wait } avoids missed notifies
-      // in the outer loop.
-      gang()->internal_worker_poll(&data);
-      if (TraceWorkGang) {
-        tty->print("Polled outside for work in gang %s worker %u",
-                   gang()->name(), id());
-        tty->print("  sequence: %d (prev: %d)",
-                   data.sequence_number(), previous_sequence_number);
-        if (data.task() != NULL) {
-          tty->print("  task: %s", data.task()->name());
-        } else {
-          tty->print("  task: NULL");
-        }
-        tty->cr();
-      }
-      for ( ; /* break */; ) {
-        // Check for new work.
-        if ((data.task() != NULL) &&
-            (data.sequence_number() != previous_sequence_number)) {
-          if (gang()->needs_more_workers()) {
-            gang()->internal_note_start();
-            gang_monitor->notify_all();
-            part = gang()->started_workers() - 1;
-            break;
-          }
-        }
-        // Nothing to do.
-        gang_monitor->wait(/* no_safepoint_check */ true);
-        gang()->internal_worker_poll(&data);
-        if (TraceWorkGang) {
-          tty->print("Polled inside for work in gang %s worker %u",
-                     gang()->name(), id());
-          tty->print("  sequence: %d (prev: %d)",
-                     data.sequence_number(), previous_sequence_number);
-          if (data.task() != NULL) {
-            tty->print("  task: %s", data.task()->name());
-          } else {
-            tty->print("  task: NULL");
-          }
-          tty->cr();
-        }
-      }
-      // Drop gang mutex.
-    }
-    if (TraceWorkGang) {
-      tty->print("Work for work gang %s id %u task %s part %d",
-                 gang()->name(), id(), data.task()->name(), part);
-    }
-    assert(data.task() != NULL, "Got null task");
-    data.task()->work(part);
-    {
-      if (TraceWorkGang) {
-        tty->print("Finish for work gang %s id %u task %s part %d",
-                   gang()->name(), id(), data.task()->name(), part);
-      }
-      // Grab the gang mutex.
-      MutexLocker ml(gang_monitor);
-      gang()->internal_note_finish();
-      // Tell the gang you are done.
-      gang_monitor->notify_all();
-      // Drop the gang mutex.
-    }
-    previous_sequence_number = data.sequence_number();
+  while (true) {
+    WorkData data = wait_for_task();
+
+    run_task(data);
+
+    signal_task_done();
   }
 }
 
--- a/hotspot/src/share/vm/gc/shared/workgroup.hpp	Mon Jun 29 11:09:39 2015 +0200
+++ b/hotspot/src/share/vm/gc/shared/workgroup.hpp	Mon Jun 29 11:11:12 2015 +0200
@@ -25,7 +25,11 @@
 #ifndef SHARE_VM_GC_SHARED_WORKGROUP_HPP
 #define SHARE_VM_GC_SHARED_WORKGROUP_HPP
 
-#include "runtime/thread.inline.hpp"
+#include "memory/allocation.hpp"
+#include "runtime/globals.hpp"
+#include "runtime/thread.hpp"
+#include "utilities/debug.hpp"
+#include "utilities/globalDefinitions.hpp"
 
 // Task class hierarchy:
 //   AbstractGangTask
@@ -43,8 +47,8 @@
 // Forward declarations of classes defined here
 
 class AbstractGangWorker;
-class GangWorker;
-class WorkData;
+class Semaphore;
+class WorkGang;
 
 // An abstract task to be worked on by a gang.
 // You subclass this to supply your own work() method
@@ -62,6 +66,33 @@
   const char* name() const { return _name; }
 };
 
+struct WorkData {
+  AbstractGangTask* _task;
+  uint              _worker_id;
+  WorkData(AbstractGangTask* task, uint worker_id) : _task(task), _worker_id(worker_id) {}
+};
+
+// Interface to handle the synchronization between the coordinator thread and the worker threads,
+// when a task is dispatched out to the worker threads.
+class GangTaskDispatcher : public CHeapObj<mtGC> {
+ public:
+  virtual ~GangTaskDispatcher() {}
+
+  // Coordinator API.
+
+  // Distributes the task out to num_workers workers.
+  // Returns when the task has been completed by all workers.
+  virtual void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) = 0;
+
+  // Worker API.
+
+  // Waits for a task to become available to the worker.
+  // Returns when the worker has been assigned a task.
+  virtual WorkData worker_wait_for_task() = 0;
+
+  // Signal to the coordinator that the worker is done with the assigned task.
+  virtual void     worker_done_with_task() = 0;
+};
 
 // The work gang is the collection of workers to execute tasks.
 // The number of workers run for a task is "_active_workers"
@@ -91,8 +122,6 @@
       _are_ConcurrentGC_threads(are_ConcurrentGC_threads)
   { }
 
-  virtual AbstractGangWorker* allocate_worker(uint which) = 0;
-
   // Initialize workers in the gang.  Return true if initialization succeeded.
   bool initialize_workers();
 
@@ -131,13 +160,24 @@
   void print_worker_threads() const {
     print_worker_threads_on(tty);
   }
+
+ protected:
+  virtual AbstractGangWorker* allocate_worker(uint which) = 0;
 };
 
 // An class representing a gang of workers.
 class WorkGang: public AbstractWorkGang {
-private:
+  // To get access to the GangTaskDispatcher instance.
+  friend class GangWorker;
+
   // Never deleted.
   ~WorkGang();
+
+  GangTaskDispatcher* const _dispatcher;
+  GangTaskDispatcher* dispatcher() const {
+    return _dispatcher;
+  }
+
 public:
   WorkGang(const char* name,
            uint workers,
@@ -146,84 +186,14 @@
 
   // Run a task, returns when the task is done.
   virtual void run_task(AbstractGangTask* task);
-  void run_task(AbstractGangTask* task, uint no_of_parallel_workers);
-
-  // Return true if more workers should be applied to the task.
-  virtual bool needs_more_workers() const {
-    return _started_workers < _active_workers;
-  }
 
 protected:
-  // The monitor which protects these data,
-  // and notifies of changes in it.
-  Monitor*  _monitor;
-  // The task for this gang.
-  AbstractGangTask* _task;
-  // A sequence number for the current task.
-  int _sequence_number;
-  // The number of started workers.
-  uint _started_workers;
-  // The number of finished workers.
-  uint _finished_workers;
-
-public:
   virtual AbstractGangWorker* allocate_worker(uint which);
-
-  // Accessors for fields
-  Monitor* monitor() const {
-    return _monitor;
-  }
-  AbstractGangTask* task() const {
-    return _task;
-  }
-  int sequence_number() const {
-    return _sequence_number;
-  }
-  uint started_workers() const {
-    return _started_workers;
-  }
-  uint finished_workers() const {
-    return _finished_workers;
-  }
-  // Predicates.
-  bool is_idle() const {
-    return (task() == NULL);
-  }
-  // Return the Ith gang worker.
-  GangWorker* gang_worker(uint i) const;
-
-protected:
-  friend class GangWorker;
-  // Note activation and deactivation of workers.
-  // These methods should only be called with the mutex held.
-  void internal_worker_poll(WorkData* data) const;
-  void internal_note_start();
-  void internal_note_finish();
-};
-
-class WorkData: public StackObj {
-  // This would be a struct, but I want accessor methods.
-private:
-  AbstractGangTask* _task;
-  int               _sequence_number;
-public:
-  // Constructor and destructor
-  WorkData() {
-    _task            = NULL;
-    _sequence_number = 0;
-  }
-  ~WorkData() {
-  }
-  AbstractGangTask* task()               const { return _task; }
-  void set_task(AbstractGangTask* value)       { _task = value; }
-  int sequence_number()                  const { return _sequence_number; }
-  void set_sequence_number(int value)          { _sequence_number = value; }
 };
 
 // Several instances of this class run in parallel as workers for a gang.
 class AbstractGangWorker: public WorkerThread {
 public:
-  // Constructors and destructor.
   AbstractGangWorker(AbstractWorkGang* gang, uint id);
 
   // The only real method: run a task for the gang.
@@ -252,30 +222,16 @@
   virtual void loop();
 
 private:
+  WorkData wait_for_task();
+  void run_task(WorkData work);
+  void signal_task_done();
+
+  void print_task_started(WorkData data);
+  void print_task_done(WorkData data);
+
   WorkGang* gang() const { return (WorkGang*)_gang; }
 };
 
-// Dynamic number of worker threads
-//
-// This type of work gang is used to run different numbers of
-// worker threads at different times.  The
-// number of workers run for a task is "_active_workers"
-// instead of "_total_workers" in a WorkGang.  The method
-// "needs_more_workers()" returns true until "_active_workers"
-// have been started and returns false afterwards.  The
-// implementation of "needs_more_workers()" in WorkGang always
-// returns true so that all workers are started.  The method
-// "loop()" in GangWorker was modified to ask "needs_more_workers()"
-// in its loop to decide if it should start working on a task.
-// A worker in "loop()" waits for notification on the WorkGang
-// monitor and execution of each worker as it checks for work
-// is serialized via the same monitor.  The "needs_more_workers()"
-// call is serialized and additionally the calculation for the
-// "part" (effectively the worker id for executing the task) is
-// serialized to give each worker a unique "part".  Workers that
-// are not needed for this tasks (i.e., "_active_workers" have
-// been started before it, continue to wait for work.
-
 // A class that acts as a synchronisation barrier. Workers enter
 // the barrier and must wait until all other workers have entered
 // before any of them may leave.
--- a/hotspot/src/share/vm/runtime/globals.hpp	Mon Jun 29 11:09:39 2015 +0200
+++ b/hotspot/src/share/vm/runtime/globals.hpp	Mon Jun 29 11:11:12 2015 +0200
@@ -1552,6 +1552,10 @@
   product(uint, ParallelGCThreads, 0,                                       \
           "Number of parallel threads parallel gc will use")                \
                                                                             \
+  diagnostic(bool, UseSemaphoreGCThreadsSynchronization, true,              \
+            "Use semaphore synchronization for the GC Threads, "            \
+            "instead of synchronization based on mutexes")                  \
+                                                                            \
   product(bool, UseDynamicNumberOfGCThreads, false,                         \
           "Dynamically choose the number of parallel threads "              \
           "parallel gc will use")                                           \