7127792: Add the ability to change an existing PeriodicTask's execution interval
author: rbackman
date:   Thu, 04 Oct 2012 14:55:57 +0200
changeset: 14390:bd0d881cf1c5
parent:    14389:655c5b5f7486
child:     14391:df0a1573d5bd
Summary: Enables dynamic enrollment / disenrollment from the PeriodicTasks in WatcherThread.
Reviewed-by: dholmes, mgronlun
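
As context for the diff below, an illustrative sketch (not part of this changeset) of what dynamic enrollment permits: a task can now be enrolled after VM startup, and can disenroll itself from within its own task() method, which the new locking in real_time_tick() makes safe. The OneShotTask class and its interval are hypothetical; PeriodicTask, enroll(), and disenroll() are the real APIs this patch modifies.

// Hypothetical one-shot task (illustrative only; not part of this patch).
class OneShotTask : public PeriodicTask {
 public:
  OneShotTask(size_t interval_ms) : PeriodicTask(interval_ms) {}
  virtual void task() {
    // ... perform the deferred work once ...
    // Safe under this patch: disenroll() sees PeriodicTask_lock already
    // held by the WatcherThread and skips re-acquiring it.
    disenroll();
  }
};

// Usage, now legal even long after the WatcherThread has started:
//   OneShotTask* t = new OneShotTask(1000 /* ms */);
//   t->enroll();   // starts the WatcherThread on demand if none is running
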
hotspot/src/share/vm/runtime/mutexLocker.cpp
hotspot/src/share/vm/runtime/mutexLocker.hpp
hotspot/src/share/vm/runtime/task.cpp
hotspot/src/share/vm/runtime/task.hpp
hotspot/src/share/vm/runtime/thread.cpp
hotspot/src/share/vm/runtime/thread.hpp
--- a/hotspot/src/share/vm/runtime/mutexLocker.cpp	Mon Nov 05 13:55:31 2012 -0800
+++ b/hotspot/src/share/vm/runtime/mutexLocker.cpp	Thu Oct 04 14:55:57 2012 +0200
@@ -140,6 +140,7 @@
 Monitor* JfrMsg_lock                  = NULL;
 Mutex*   JfrBuffer_lock               = NULL;
 Mutex*   JfrStream_lock               = NULL;
+Monitor* PeriodicTask_lock            = NULL;
 
 #define MAX_NUM_MUTEX 128
 static Monitor * _mutex_array[MAX_NUM_MUTEX];
@@ -285,6 +286,7 @@
   def(JfrMsg_lock                  , Monitor, nonleaf+2,   true);
   def(JfrBuffer_lock               , Mutex,   nonleaf+3,   true);
   def(JfrStream_lock               , Mutex,   nonleaf+4,   true);
+  def(PeriodicTask_lock            , Monitor, nonleaf+5,   true);
 }
 
 GCMutexLocker::GCMutexLocker(Monitor * mutex) {
--- a/hotspot/src/share/vm/runtime/mutexLocker.hpp	Mon Nov 05 13:55:31 2012 -0800
+++ b/hotspot/src/share/vm/runtime/mutexLocker.hpp	Thu Oct 04 14:55:57 2012 +0200
@@ -142,6 +142,7 @@
 extern Monitor* JfrMsg_lock;                     // protects JFR messaging
 extern Mutex*   JfrBuffer_lock;                  // protects JFR buffer operations
 extern Mutex*   JfrStream_lock;                  // protects JFR stream access
+extern Monitor* PeriodicTask_lock;               // protects the periodic task structure
 
 // A MutexLocker provides mutual exclusion with respect to a given mutex
 // for the scope which contains the locker.  The lock is an OS lock, not
--- a/hotspot/src/share/vm/runtime/task.cpp	Mon Nov 05 13:55:31 2012 -0800
+++ b/hotspot/src/share/vm/runtime/task.cpp	Thu Oct 04 14:55:57 2012 +0200
@@ -61,7 +61,7 @@
 }
 #endif
 
-void PeriodicTask::real_time_tick(size_t delay_time) {
+void PeriodicTask::real_time_tick(int delay_time) {
 #ifndef PRODUCT
   if (ProfilerCheckIntervals) {
     _ticks++;
@@ -73,19 +73,39 @@
     _intervalHistogram[ms]++;
   }
 #endif
-  int orig_num_tasks = _num_tasks;
-  for(int index = 0; index < _num_tasks; index++) {
-    _tasks[index]->execute_if_pending(delay_time);
-    if (_num_tasks < orig_num_tasks) { // task dis-enrolled itself
-      index--;  // re-do current slot as it has changed
-      orig_num_tasks = _num_tasks;
+
+  {
+    MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
+    int orig_num_tasks = _num_tasks;
+
+    for(int index = 0; index < _num_tasks; index++) {
+      _tasks[index]->execute_if_pending(delay_time);
+      if (_num_tasks < orig_num_tasks) { // task dis-enrolled itself
+        index--;  // re-do current slot as it has changed
+        orig_num_tasks = _num_tasks;
+      }
     }
   }
 }
 
+int PeriodicTask::time_to_wait() {
+  MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
+                     NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
+
+  if (_num_tasks == 0) {
+    return 0; // sleep until shutdown or a task is enrolled
+  }
+
+  int delay = _tasks[0]->time_to_next_interval();
+  for (int index = 1; index < _num_tasks; index++) {
+    delay = MIN2(delay, _tasks[index]->time_to_next_interval());
+  }
+  return delay;
+}
+
 
 PeriodicTask::PeriodicTask(size_t interval_time) :
-  _counter(0), _interval(interval_time) {
+  _counter(0), _interval((int) interval_time) {
   // Sanity check the interval time
   assert(_interval >= PeriodicTask::min_interval &&
          _interval <= PeriodicTask::max_interval &&
@@ -94,33 +114,40 @@
 }
 
 PeriodicTask::~PeriodicTask() {
-  if (is_enrolled())
-    disenroll();
-}
-
-bool PeriodicTask::is_enrolled() const {
-  for(int index = 0; index < _num_tasks; index++)
-    if (_tasks[index] == this) return true;
-  return false;
+  disenroll();
 }
 
 void PeriodicTask::enroll() {
-  assert(WatcherThread::watcher_thread() == NULL, "dynamic enrollment of tasks not yet supported");
+  MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
+                     NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
 
-  if (_num_tasks == PeriodicTask::max_tasks)
+  if (_num_tasks == PeriodicTask::max_tasks) {
     fatal("Overflow in PeriodicTask table");
+  }
   _tasks[_num_tasks++] = this;
+
+  WatcherThread* thread = WatcherThread::watcher_thread();
+  if (thread) {
+    thread->unpark();
+  } else {
+    WatcherThread::start();
+  }
 }
 
 void PeriodicTask::disenroll() {
-  assert(WatcherThread::watcher_thread() == NULL ||
-         Thread::current() == WatcherThread::watcher_thread(),
-         "dynamic disenrollment currently only handled from WatcherThread from within task() method");
+  MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
+                     NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
 
   int index;
-  for(index = 0; index < _num_tasks && _tasks[index] != this; index++);
-  if (index == _num_tasks) return;
+  for(index = 0; index < _num_tasks && _tasks[index] != this; index++)
+    ;
+
+  if (index == _num_tasks) {
+    return;
+  }
+
   _num_tasks--;
+
   for (; index < _num_tasks; index++) {
     _tasks[index] = _tasks[index+1];
   }
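
The recurring `PeriodicTask_lock->owned_by_self() ? NULL : PeriodicTask_lock` expression above exploits the fact that MutexLockerEx treats a NULL mutex as a no-op; it makes enroll(), disenroll(), and time_to_wait() callable whether or not the caller already holds the lock (for example, disenroll() invoked from task() while real_time_tick() holds it). A minimal sketch of the idiom, with a hypothetical function name:

// Illustrative only. MutexLockerEx(NULL, ...) locks nothing, so the guard
// below acquires PeriodicTask_lock only if this thread does not own it yet,
// avoiding self-deadlock without needing a recursive mutex.
static void hypothetical_task_list_op() {
  MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
                     NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
  // ... read or mutate the shared _tasks[] / _num_tasks state ...
}   // ml releases the lock only if this frame actually acquired it
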
--- a/hotspot/src/share/vm/runtime/task.hpp	Mon Nov 05 13:55:31 2012 -0800
+++ b/hotspot/src/share/vm/runtime/task.hpp	Thu Oct 04 14:55:57 2012 +0200
@@ -49,12 +49,12 @@
   static int num_tasks()   { return _num_tasks; }
 
  private:
-  size_t _counter;
-  const size_t _interval;
+  int _counter;
+  const int _interval;
 
   static int _num_tasks;
   static PeriodicTask* _tasks[PeriodicTask::max_tasks];
-  static void real_time_tick(size_t delay_time);
+  static void real_time_tick(int delay_time);
 
 #ifndef PRODUCT
   static elapsedTimer _timer;                      // measures time between ticks
@@ -69,51 +69,36 @@
   PeriodicTask(size_t interval_time); // interval is in milliseconds of elapsed time
   ~PeriodicTask();
 
-  // Tells whether is enrolled
-  bool is_enrolled() const;
-
   // Make the task active
-  // NOTE: this may only be called before the WatcherThread has been started
+  // For dynamic enrollment at time T, the task will execute somewhere
+  // between T and T + interval_time.
   void enroll();
 
   // Make the task inactive
-  // NOTE: this may only be called either while the WatcherThread is
-  // inactive or by a task from within its task() method. One-shot or
-  // several-shot tasks may be implemented this way.
   void disenroll();
 
-  void execute_if_pending(size_t delay_time) {
-    _counter += delay_time;
-    if (_counter >= _interval) {
+  void execute_if_pending(int delay_time) {
+    // make sure we don't overflow
+    jlong tmp = (jlong) _counter + (jlong) delay_time;
+
+    if (tmp >= (jlong) _interval) {
       _counter = 0;
       task();
+    } else {
+      _counter += delay_time;
     }
   }
 
   // Returns how long (time in milliseconds) before the next time we should
   // execute this task.
-  size_t time_to_next_interval() const {
+  int time_to_next_interval() const {
     assert(_interval > _counter,  "task counter greater than interval?");
     return _interval - _counter;
   }
 
   // Calculate when the next periodic task will fire.
   // Called by the WatcherThread's run method.
-  // This assumes that periodic tasks aren't entering the system
-  // dynamically, except for during startup.
-  static size_t time_to_wait() {
-    if (_num_tasks == 0) {
-      // Don't wait any more; shut down the thread since we don't
-      // currently support dynamic enrollment.
-      return 0;
-    }
-
-    size_t delay = _tasks[0]->time_to_next_interval();
-    for (int index = 1; index < _num_tasks; index++) {
-      delay = MIN2(delay, _tasks[index]->time_to_next_interval());
-    }
-    return delay;
-  }
+  static int time_to_wait();
 
   // The task to perform at each period
   virtual void task() = 0;
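
The rewritten execute_if_pending() above widens the counter/delay sum to jlong before comparing, so a counter near INT_MAX cannot wrap. A standalone illustration of the hazard (assumption: long long as a stand-in for HotSpot's jlong):

#include <limits.h>

typedef long long jlong;  // stand-in for HotSpot's 64-bit jlong

// Returns whether the accumulated delay has reached the interval.
static bool interval_elapsed(int counter, int delay_time, int interval) {
  jlong tmp = (jlong) counter + (jlong) delay_time;  // exact in 64 bits
  return tmp >= (jlong) interval;
}

// interval_elapsed(INT_MAX - 10, 100, 50) correctly yields true; the naive
// 32-bit sum 'counter + delay_time' would overflow (undefined behavior,
// typically wrapping negative) and report false.
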
--- a/hotspot/src/share/vm/runtime/thread.cpp	Mon Nov 05 13:55:31 2012 -0800
+++ b/hotspot/src/share/vm/runtime/thread.cpp	Thu Oct 04 14:55:57 2012 +0200
@@ -1217,6 +1217,7 @@
 // timer interrupts exists on the platform.
 
 WatcherThread* WatcherThread::_watcher_thread   = NULL;
+bool WatcherThread::_startable = false;
 volatile bool  WatcherThread::_should_terminate = false;
 
 WatcherThread::WatcherThread() : Thread() {
@@ -1237,6 +1238,55 @@
   }
 }
 
+int WatcherThread::sleep() const {
+  MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
+
+  // remaining will be zero if there are no tasks,
+  // causing the WatcherThread to sleep until a task is
+  // enrolled
+  int remaining = PeriodicTask::time_to_wait();
+  int time_slept = 0;
+
+  // we expect this to time out - we only ever get unparked when
+  // we should terminate or when a new task has been enrolled
+  OSThreadWaitState osts(this->osthread(), false /* not Object.wait() */);
+
+  jlong time_before_loop = os::javaTimeNanos();
+
+  for (;;) {
+    bool timedout = PeriodicTask_lock->wait(Mutex::_no_safepoint_check_flag, remaining);
+    jlong now = os::javaTimeNanos();
+
+    if (remaining == 0) {
+      // if we didn't have any tasks we could have waited for a long time;
+      // consider the time slept zero and reset time_before_loop
+      time_slept = 0;
+      time_before_loop = now;
+    } else {
+      // need to recalculate since we might have new tasks in _tasks
+      time_slept = (int) ((now - time_before_loop) / 1000000);
+    }
+
+    // Change to task list or spurious wakeup of some kind
+    if (timedout || _should_terminate) {
+      break;
+    }
+
+    remaining = PeriodicTask::time_to_wait();
+    if (remaining == 0) {
+      // Last task was just disenrolled so loop around and wait until
+      // another task gets enrolled
+      continue;
+    }
+
+    remaining -= time_slept;
+    if (remaining <= 0)
+      break;
+  }
+
+  return time_slept;
+}
+
 void WatcherThread::run() {
   assert(this == watcher_thread(), "just checking");
 
@@ -1249,26 +1299,7 @@
 
     // Calculate how long it'll be until the next PeriodicTask work
     // should be done, and sleep that amount of time.
-    size_t time_to_wait = PeriodicTask::time_to_wait();
-
-    // we expect this to timeout - we only ever get unparked when
-    // we should terminate
-    {
-      OSThreadWaitState osts(this->osthread(), false /* not Object.wait() */);
-
-      jlong prev_time = os::javaTimeNanos();
-      for (;;) {
-        int res= _SleepEvent->park(time_to_wait);
-        if (res == OS_TIMEOUT || _should_terminate)
-          break;
-        // spurious wakeup of some kind
-        jlong now = os::javaTimeNanos();
-        time_to_wait -= (now - prev_time) / 1000000;
-        if (time_to_wait <= 0)
-          break;
-        prev_time = now;
-      }
-    }
+    int time_waited = sleep();
 
     if (is_error_reported()) {
       // A fatal error has happened, the error handler(VMError::report_and_die)
@@ -1298,13 +1329,7 @@
       }
     }
 
-    PeriodicTask::real_time_tick(time_to_wait);
-
-    // If we have no more tasks left due to dynamic disenrollment,
-    // shut down the thread since we don't currently support dynamic enrollment
-    if (PeriodicTask::num_tasks() == 0) {
-      _should_terminate = true;
-    }
+    PeriodicTask::real_time_tick(time_waited);
   }
 
   // Signal that it is terminated
@@ -1319,22 +1344,33 @@
 }
 
 void WatcherThread::start() {
-  if (watcher_thread() == NULL) {
+  assert(PeriodicTask_lock->owned_by_self(), "PeriodicTask_lock required");
+
+  if (watcher_thread() == NULL && _startable) {
     _should_terminate = false;
     // Create the single instance of WatcherThread
     new WatcherThread();
   }
 }
 
+void WatcherThread::make_startable() {
+  assert(PeriodicTask_lock->owned_by_self(), "PeriodicTask_lock required");
+  _startable = true;
+}
+
 void WatcherThread::stop() {
+  {
+    MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
+    _should_terminate = true;
+    OrderAccess::fence();  // ensure WatcherThread sees update in main loop
+
+    WatcherThread* watcher = watcher_thread();
+    if (watcher != NULL)
+      watcher->unpark();
+  }
+
   // it is ok to take late safepoints here, if needed
   MutexLocker mu(Terminator_lock);
-  _should_terminate = true;
-  OrderAccess::fence();  // ensure WatcherThread sees update in main loop
-
-  Thread* watcher = watcher_thread();
-  if (watcher != NULL)
-    watcher->_SleepEvent->unpark();
 
   while(watcher_thread() != NULL) {
     // This wait should make safepoint checks, wait without a timeout,
@@ -1352,6 +1388,11 @@
   }
 }
 
+void WatcherThread::unpark() {
+  MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ? NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
+  PeriodicTask_lock->notify();
+}
+
 void WatcherThread::print_on(outputStream* st) const {
   st->print("\"%s\" ", name());
   Thread::print_on(st);
@@ -3658,12 +3699,18 @@
     }
   }
 
-  // Start up the WatcherThread if there are any periodic tasks
-  // NOTE:  All PeriodicTasks should be registered by now. If they
-  //   aren't, late joiners might appear to start slowly (we might
-  //   take a while to process their first tick).
-  if (PeriodicTask::num_tasks() > 0) {
-    WatcherThread::start();
+  {
+    MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
+    // Make sure the watcher thread can be started by WatcherThread::start()
+    // or by dynamic enrollment.
+    WatcherThread::make_startable();
+    // Start up the WatcherThread if there are any periodic tasks
+    // NOTE:  All PeriodicTasks should be registered by now. If they
+    //   aren't, late joiners might appear to start slowly (we might
+    //   take a while to process their first tick).
+    if (PeriodicTask::num_tasks() > 0) {
+      WatcherThread::start();
+    }
   }
 
   // Give os specific code one last chance to start
--- a/hotspot/src/share/vm/runtime/thread.hpp	Mon Nov 05 13:55:31 2012 -0800
+++ b/hotspot/src/share/vm/runtime/thread.hpp	Thu Oct 04 14:55:57 2012 +0200
@@ -722,6 +722,7 @@
  private:
   static WatcherThread* _watcher_thread;
 
+  static bool _startable;
   volatile static bool _should_terminate; // updated without holding lock
  public:
   enum SomeConstants {
@@ -738,6 +739,7 @@
   char* name() const { return (char*)"VM Periodic Task Thread"; }
   void print_on(outputStream* st) const;
   void print() const { print_on(tty); }
+  void unpark();
 
   // Returns the single instance of WatcherThread
   static WatcherThread* watcher_thread()         { return _watcher_thread; }
@@ -745,6 +747,12 @@
   // Create and start the single instance of WatcherThread, or stop it on shutdown
   static void start();
   static void stop();
+  // Only allow start once the VM is sufficiently initialized;
+  // otherwise the first task to enroll would trigger the start
+  static void make_startable();
+
+ private:
+  int sleep() const;
 };
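
The heart of the new WatcherThread::sleep() above is a timed wait on PeriodicTask_lock that re-derives its deadline whenever it is notified early (a task was enrolled, or the thread must terminate). A standalone sketch of the same pattern in portable C++11 (assumptions: std::condition_variable instead of HotSpot's Monitor, and time_to_wait_ms() standing in for PeriodicTask::time_to_wait()):

#include <chrono>
#include <condition_variable>
#include <mutex>

static std::mutex              task_mutex;        // stands in for PeriodicTask_lock
static std::condition_variable task_cv;           // notified by enroll()/stop()
static bool                    should_terminate = false;

// Stand-in for PeriodicTask::time_to_wait(); 0 means "no tasks enrolled".
static int time_to_wait_ms() { return 50; /* would scan the task list */ }

// Returns how long (ms) was actually slept, like WatcherThread::sleep().
static int watcher_sleep() {
  std::unique_lock<std::mutex> lock(task_mutex);
  auto start = std::chrono::steady_clock::now();
  int remaining = time_to_wait_ms();

  for (;;) {
    if (remaining == 0) {
      task_cv.wait(lock);            // no tasks: park until something changes
      start = std::chrono::steady_clock::now();   // discard the idle time
    } else if (task_cv.wait_for(lock, std::chrono::milliseconds(remaining))
               == std::cv_status::timeout) {
      break;                         // full interval elapsed: time to tick
    }
    if (should_terminate) break;
    remaining = time_to_wait_ms();   // task list may have changed under notify
    if (remaining == 0) continue;    // last task disenrolled: park again
    int slept = (int) std::chrono::duration_cast<std::chrono::milliseconds>(
                    std::chrono::steady_clock::now() - start).count();
    remaining -= slept;              // charge the time already slept this round
    if (remaining <= 0) break;
  }
  return (int) std::chrono::duration_cast<std::chrono::milliseconds>(
             std::chrono::steady_clock::now() - start).count();
}
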