src/hotspot/share/gc/shared/satbMarkQueue.cpp
changeset 55498 e64383344f14
parent 54970 76d3d96a8bc2
child 58059 baa4dd528de0
--- a/src/hotspot/share/gc/shared/satbMarkQueue.cpp	Wed Jun 26 09:06:32 2019 -0400
+++ b/src/hotspot/share/gc/shared/satbMarkQueue.cpp	Wed Jun 26 13:18:38 2019 -0400
@@ -28,12 +28,15 @@
 #include "logging/log.hpp"
 #include "memory/allocation.inline.hpp"
 #include "oops/oop.inline.hpp"
+#include "runtime/atomic.hpp"
 #include "runtime/mutexLocker.hpp"
+#include "runtime/orderAccess.hpp"
 #include "runtime/os.hpp"
 #include "runtime/safepoint.hpp"
 #include "runtime/thread.hpp"
 #include "runtime/threadSMR.hpp"
 #include "runtime/vmThread.hpp"
+#include "utilities/globalCounter.inline.hpp"
 
 SATBMarkQueue::SATBMarkQueue(SATBMarkQueueSet* qset) :
   // SATB queues are only active during marking cycles. We create
@@ -107,15 +110,66 @@
 
 SATBMarkQueueSet::SATBMarkQueueSet() :
   PtrQueueSet(),
+  _list(),
+  _count_and_process_flag(0),
+  _process_completed_buffers_threshold(SIZE_MAX),
   _buffer_enqueue_threshold(0)
 {}
 
-void SATBMarkQueueSet::initialize(Monitor* cbl_mon,
-                                  BufferNode::Allocator* allocator,
+SATBMarkQueueSet::~SATBMarkQueueSet() {
+  abandon_completed_buffers();
+}
+
+// _count_and_process_flag has flag in least significant bit, count in
+// remaining bits.  _process_completed_buffers_threshold is scaled
+// accordingly, with the lsbit set, so a _count_and_process_flag value
+// is directly comparable with the recorded threshold value.  The
+// process flag is set whenever the count exceeds the threshold, and
+// remains set until the count is reduced to zero.
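+//
+// For example, with a requested threshold of 10 buffers the flag is
+// first set when the count reaches 11, and it stays set until the
+// count drops back to zero.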
+
+// Increment count.  If count > threshold, set flag, else maintain flag.
+static void increment_count(volatile size_t* cfptr, size_t threshold) {
+  size_t old;
+  size_t value = Atomic::load(cfptr);
+  do {
+    old = value;
+    value += 2;
+    assert(value > old, "overflow");
+    if (value > threshold) value |= 1;
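+    // cmpxchg returns the value it observed at *cfptr; if that equals
+    // old the update was installed, otherwise retry from the fresh value.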
+    value = Atomic::cmpxchg(value, cfptr, old);
+  } while (value != old);
+}
+
+// Decrement count.  If count == 0, clear flag, else maintain flag.
+static void decrement_count(volatile size_t* cfptr) {
+  size_t old;
+  size_t value = Atomic::load(cfptr);
+  do {
+    assert((value >> 1) != 0, "underflow");
+    old = value;
+    value -= 2;
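+    // A result of 0 or 1 means the count is now zero, so clear the
+    // flag as well.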
+    if (value <= 1) value = 0;
+    value = Atomic::cmpxchg(value, cfptr, old);
+  } while (value != old);
+}
+
+// Scale the requested threshold to align with the count field.  If
+// scaling overflows, just use the max value.  Set the process flag
+// bit so the comparison in increment_count is exact.
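+// For example, scale_threshold(10) == ((10 << 1) | 1) == 21; SIZE_MAX
+// is already odd, so the saturated case is unchanged by the final OR.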
+static size_t scale_threshold(size_t value) {
+  size_t scaled_value = value << 1;
+  if ((scaled_value >> 1) != value) {
+    scaled_value = SIZE_MAX;
+  }
+  return scaled_value | 1;
+}
+
+void SATBMarkQueueSet::initialize(BufferNode::Allocator* allocator,
                                   size_t process_completed_buffers_threshold,
                                   uint buffer_enqueue_threshold_percentage) {
-  PtrQueueSet::initialize(cbl_mon, allocator);
-  set_process_completed_buffers_threshold(process_completed_buffers_threshold);
+  PtrQueueSet::initialize(allocator);
+  _process_completed_buffers_threshold =
+    scale_threshold(process_completed_buffers_threshold);
   assert(buffer_size() != 0, "buffer size not initialized");
   // Minimum threshold of 1 ensures enqueuing of completely full buffers.
   size_t size = buffer_size();
@@ -207,6 +261,38 @@
   }
 }
 
+// SATB buffer life-cycle - Per-thread queues obtain buffers from the
+// qset's buffer allocator, fill them, and push them onto the qset's
+// list.  The GC concurrently pops buffers from the qset, processes
+// them, and returns them to the buffer allocator for re-use.  Both
+// the allocator and the qset use lock-free stacks.  The ABA problem
+// is solved by having both allocation pops and GC pops performed
+// within GlobalCounter critical sections, while the return of buffers
+// to the allocator performs a GlobalCounter synchronize before
+// pushing onto the allocator's list.
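+//
+// Without that synchronize, a pop could read a node's next pointer,
+// stall while the node is popped, recycled, and pushed again, and
+// then install the stale next value with a successful compare-and-swap
+// (the classic ABA failure).  The synchronize delays reuse of a buffer
+// until every critical section that might have observed it has exited.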
+
+void SATBMarkQueueSet::enqueue_completed_buffer(BufferNode* node) {
+  assert(node != NULL, "precondition");
+  // Increment the count and update the flag appropriately.  Done
+  // before pushing the buffer, so the count is always at least the
+  // actual number in the list, and decrement never underflows.
+  increment_count(&_count_and_process_flag, _process_completed_buffers_threshold);
+  _list.push(*node);
+}
+
+BufferNode* SATBMarkQueueSet::get_completed_buffer() {
+  BufferNode* node;
+  {
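+    // Pop inside a GlobalCounter critical section so the node cannot
+    // be recycled while the pop examines it (see the life-cycle
+    // comment above).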
+    GlobalCounter::CriticalSection cs(Thread::current());
+    node = _list.pop();
+  }
+  if (node != NULL) {
+    // Got a buffer, so decrement the count and update the flag
+    // appropriately.
+    decrement_count(&_count_and_process_flag);
+  }
+  return node;
+}
+
 #ifndef PRODUCT
 // Helpful for debugging
 
@@ -219,7 +305,7 @@
   tty->cr();
   tty->print_cr("SATB BUFFERS [%s]", msg);
 
-  BufferNode* nd = completed_buffers_head();
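+  // Walks the lock-free list without synchronization; only used for
+  // debugging.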
+  BufferNode* nd = _list.top();
   int i = 0;
   while (nd != NULL) {
     void** buf = BufferNode::make_buffer_from_node(nd);
@@ -248,6 +334,17 @@
 }
 #endif // PRODUCT
 
+void SATBMarkQueueSet::abandon_completed_buffers() {
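+  // Called from the destructor or at a safepoint (see
+  // abandon_partial_marking); no enqueue is expected to run
+  // concurrently.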
+  Atomic::store(size_t(0), &_count_and_process_flag);
+  BufferNode* buffers_to_delete = _list.pop_all();
+  while (buffers_to_delete != NULL) {
+    BufferNode* bn = buffers_to_delete;
+    buffers_to_delete = bn->next();
+    bn->set_next(NULL);
+    deallocate_buffer(bn);
+  }
+}
+
 void SATBMarkQueueSet::abandon_partial_marking() {
   assert(SafepointSynchronize::is_at_safepoint(), "Must be at safepoint.");
   abandon_completed_buffers();