8224167: Refactor PtrQueue completed buffer processing
author kbarrett
Tue, 21 May 2019 19:19:44 -0400
changeset 54970 76d3d96a8bc2
parent 54969 6bd29804ace0
child 54971 4285b4d13471
8224167: Refactor PtrQueue completed buffer processing
Summary: Add handle_completed_buffer and refactor.
Reviewed-by: tschatzl, shade
src/hotspot/share/gc/g1/g1DirtyCardQueue.cpp
src/hotspot/share/gc/g1/g1DirtyCardQueue.hpp
src/hotspot/share/gc/shared/ptrQueue.cpp
src/hotspot/share/gc/shared/ptrQueue.hpp
src/hotspot/share/gc/shared/satbMarkQueue.cpp
src/hotspot/share/gc/shared/satbMarkQueue.hpp
src/hotspot/share/gc/shenandoah/shenandoahSATBMarkQueueSet.cpp
src/hotspot/share/gc/shenandoah/shenandoahSATBMarkQueueSet.hpp
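At its core, the change replaces the boolean should_enqueue_buffer() hook with a virtual handle_completed_buffer() that each queue type overrides to either keep using the buffer in place or enqueue it and allocate a fresh one. Below is a minimal standalone sketch of the new control flow; handle_zero_index, handle_completed_buffer, and allocate_buffer follow the patch, while ToyQueue, CAPACITY, and the vector-backed completed list are invented scaffolding, not HotSpot code:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <vector>

class PtrQueue {
protected:
  static const size_t CAPACITY = 4;   // toy capacity; HotSpot sizes this per qset
  void** _buf;
  size_t _index;                      // counts down toward 0; 0 means full

  PtrQueue() : _buf(NULL), _index(0) {}

  // The new hook: either process the full buffer and leave it in place
  // (resetting _index), or hand it off and allocate a replacement.
  virtual void handle_completed_buffer() = 0;

  void allocate_buffer() {
    _buf = new void*[CAPACITY];
    _index = CAPACITY;                // reset(): buffer is empty again
  }

  void handle_zero_index() {
    assert(_index == 0);
    if (_buf != NULL) {
      handle_completed_buffer();      // virtual dispatch, was should_enqueue_buffer()
    } else {
      allocate_buffer();              // lazy first allocation
    }
  }

public:
  virtual ~PtrQueue() { delete[] _buf; }

  void enqueue(void* ptr) {
    while (_index == 0) {
      handle_zero_index();            // guarantees room on exit
    }
    _buf[--_index] = ptr;             // buffers fill from the back, as in HotSpot
  }
};

// A toy queue type that always enqueues full buffers onto a completed list.
class ToyQueue : public PtrQueue {
  std::vector<void**> _completed;
protected:
  virtual void handle_completed_buffer() {
    _completed.push_back(_buf);       // enqueue_completed_buffer()
    allocate_buffer();                // caller needs a fresh buffer
  }
public:
  ~ToyQueue() {
    for (size_t i = 0; i < _completed.size(); i++) delete[] _completed[i];
  }
  size_t completed() const { return _completed.size(); }
};

int main() {
  ToyQueue q;
  int dummy;
  for (int i = 0; i < 10; i++) q.enqueue(&dummy);
  printf("completed buffers: %zu\n", q.completed());  // 2 full buffers of 4
  return 0;
}
```

A processing-style subclass (like G1DirtyCardQueue below) would instead try to drain the buffer and call reset() on success, only falling back to enqueue-and-allocate when it must hand the work off.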
--- a/src/hotspot/share/gc/g1/g1DirtyCardQueue.cpp	Tue May 21 14:55:30 2019 -0700
+++ b/src/hotspot/share/gc/g1/g1DirtyCardQueue.cpp	Tue May 21 19:19:44 2019 -0400
@@ -66,8 +66,21 @@
   flush();
 }
 
+void G1DirtyCardQueue::handle_completed_buffer() {
+  assert(_buf != NULL, "precondition");
+  BufferNode* node = BufferNode::make_node_from_buffer(_buf, index());
+  G1DirtyCardQueueSet* dcqs = dirty_card_qset();
+  if (dcqs->process_or_enqueue_completed_buffer(node)) {
+    reset();                    // Buffer fully processed, reset index.
+  } else {
+    allocate_buffer();          // Buffer enqueued, get a new one.
+  }
+}
+
 G1DirtyCardQueueSet::G1DirtyCardQueueSet(bool notify_when_complete) :
   PtrQueueSet(notify_when_complete),
+  _max_completed_buffers(MaxCompletedBuffersUnlimited),
+  _completed_buffers_padding(0),
   _free_ids(NULL),
   _processed_buffers_mut(0),
   _processed_buffers_rs_thread(0),
@@ -136,6 +149,24 @@
   } while (0)
 #endif // ASSERT
 
+bool G1DirtyCardQueueSet::process_or_enqueue_completed_buffer(BufferNode* node) {
+  if (Thread::current()->is_Java_thread()) {
+    // If the number of buffers exceeds the limit, make this Java
+    // thread do the processing itself.  We don't lock to access
+  // buffer count or padding; it is fine to be imprecise here.  Adding
+  // the padding could overflow, in which case we treat the limit as unlimited.
+    size_t max_buffers = max_completed_buffers();
+    size_t limit = max_buffers + completed_buffers_padding();
+    if ((completed_buffers_num() > limit) && (limit >= max_buffers)) {
+      if (mut_process_buffer(node)) {
+        return true;
+      }
+    }
+  }
+  enqueue_completed_buffer(node);
+  return false;
+}
+
 bool G1DirtyCardQueueSet::mut_process_buffer(BufferNode* node) {
   guarantee(_free_ids != NULL, "must be");
 
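The limit check preserved here is intentionally racy and overflow-tolerant: max_completed_buffers() plus the padding can wrap around, and the (limit >= max_buffers) guard detects that wrap, effectively falling back to "no limit". A small self-contained sketch of why the guard works; the zone-like numbers are made up for illustration:

```cpp
#include <cstddef>
#include <cstdio>

int main() {
  const size_t unlimited = ~size_t(0);         // MaxCompletedBuffersUnlimited

  size_t max_buffers = unlimited;              // "no limit" configuration
  size_t padding = 10;
  size_t limit = max_buffers + padding;        // unsigned wrap: limit == 9

  // Wrapped: limit < max_buffers, so (limit >= max_buffers) is false and
  // the mutator never self-processes -- exactly the "unlimited" behavior.
  size_t completed = 1000000;
  bool over_limit = (completed > limit) && (limit >= max_buffers);
  printf("over limit: %s\n", over_limit ? "yes" : "no");  // "no"

  // Normal case: no wrap, the guard holds, and the comparison is meaningful.
  max_buffers = 64; padding = 16;
  limit = max_buffers + padding;               // 80
  over_limit = (completed > limit) && (limit >= max_buffers);
  printf("over limit: %s\n", over_limit ? "yes" : "no");  // "yes"
  return 0;
}
```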
--- a/src/hotspot/share/gc/g1/g1DirtyCardQueue.hpp	Tue May 21 14:55:30 2019 -0700
+++ b/src/hotspot/share/gc/g1/g1DirtyCardQueue.hpp	Tue May 21 19:19:44 2019 -0400
@@ -47,6 +47,9 @@
 
 // A ptrQueue whose elements are addresses of card table entries (dirty cards).
 class G1DirtyCardQueue: public PtrQueue {
+protected:
+  virtual void handle_completed_buffer();
+
 public:
   G1DirtyCardQueue(G1DirtyCardQueueSet* qset);
 
@@ -57,6 +60,8 @@
   // Process queue entries and release resources.
   void flush() { flush_impl(); }
 
+  inline G1DirtyCardQueueSet* dirty_card_qset() const;
+
   // Compiler support.
   static ByteSize byte_offset_of_index() {
     return PtrQueue::byte_offset_of_index<G1DirtyCardQueue>();
@@ -102,6 +107,12 @@
 
   bool mut_process_buffer(BufferNode* node);
 
+  // If the queue contains more buffers than configured here, the
+  // mutator must start doing some of the concurrent refinement work.
+  size_t _max_completed_buffers;
+  size_t _completed_buffers_padding;
+  static const size_t MaxCompletedBuffersUnlimited = ~size_t(0);
+
   G1FreeIdSet* _free_ids;
 
   // The number of completed buffers processed by mutator and rs thread,
@@ -126,6 +137,11 @@
 
   static void handle_zero_index_for_thread(Thread* t);
 
+  // Either process the entire buffer and return true, or enqueue the
+  // buffer and return false.  If the buffer is completely processed,
+  // it can be reused in place.
+  bool process_or_enqueue_completed_buffer(BufferNode* node);
+
   // Apply G1RefineCardConcurrentlyClosure to completed buffers until there are stop_at
   // completed buffers remaining.
   bool refine_completed_buffer_concurrently(uint worker_i, size_t stop_at);
@@ -147,6 +163,20 @@
   // If any threads have partial logs, add them to the global list of logs.
   void concatenate_logs();
 
+  void set_max_completed_buffers(size_t m) {
+    _max_completed_buffers = m;
+  }
+  size_t max_completed_buffers() const {
+    return _max_completed_buffers;
+  }
+
+  void set_completed_buffers_padding(size_t padding) {
+    _completed_buffers_padding = padding;
+  }
+  size_t completed_buffers_padding() const {
+    return _completed_buffers_padding;
+  }
+
   jint processed_buffers_mut() {
     return _processed_buffers_mut;
   }
@@ -156,4 +186,8 @@
 
 };
 
+inline G1DirtyCardQueueSet* G1DirtyCardQueue::dirty_card_qset() const {
+  return static_cast<G1DirtyCardQueueSet*>(qset());
+}
+
 #endif // SHARE_GC_G1_G1DIRTYCARDQUEUE_HPP
--- a/src/hotspot/share/gc/shared/ptrQueue.cpp	Tue May 21 14:55:30 2019 -0700
+++ b/src/hotspot/share/gc/shared/ptrQueue.cpp	Tue May 21 19:19:44 2019 -0400
@@ -62,7 +62,6 @@
   }
 }
 
-
 void PtrQueue::enqueue_known_active(void* ptr) {
   while (_index == 0) {
     handle_zero_index();
@@ -75,6 +74,35 @@
   _buf[index()] = ptr;
 }
 
+void PtrQueue::handle_zero_index() {
+  assert(index() == 0, "precondition");
+
+  if (_buf != NULL) {
+    handle_completed_buffer();
+  } else {
+    // Bootstrapping kludge; lazily initialize capacity.  The initial
+    // thread's queues are constructed before the second phase of the
+    // two-phase initialization of the associated qsets.  As a result,
+    // we can't initialize _capacity_in_bytes in the queue constructor.
+    if (_capacity_in_bytes == 0) {
+      _capacity_in_bytes = index_to_byte_index(qset()->buffer_size());
+    }
+    allocate_buffer();
+  }
+}
+
+void PtrQueue::allocate_buffer() {
+  _buf = qset()->allocate_buffer();
+  reset();
+}
+
+void PtrQueue::enqueue_completed_buffer() {
+  assert(_buf != NULL, "precondition");
+  BufferNode* node = BufferNode::make_node_from_buffer(_buf, index());
+  qset()->enqueue_completed_buffer(node);
+  allocate_buffer();
+}
+
 BufferNode* BufferNode::allocate(size_t size) {
   size_t byte_size = size * sizeof(void*);
   void* data = NEW_C_HEAP_ARRAY(char, buffer_offset() + byte_size, mtGC);
@@ -231,8 +259,6 @@
   _process_completed_buffers_threshold(ProcessCompletedBuffersThresholdNever),
   _process_completed_buffers(false),
   _notify_when_complete(notify_when_complete),
-  _max_completed_buffers(MaxCompletedBuffersUnlimited),
-  _completed_buffers_padding(0),
   _all_active(false)
 {}
 
@@ -258,52 +284,6 @@
   _allocator->release(node);
 }
 
-void PtrQueue::handle_zero_index() {
-  assert(index() == 0, "precondition");
-
-  // This thread records the full buffer and allocates a new one (while
-  // holding the lock if there is one).
-  if (_buf != NULL) {
-    if (!should_enqueue_buffer()) {
-      assert(index() > 0, "the buffer can only be re-used if it's not full");
-      return;
-    }
-
-    BufferNode* node = BufferNode::make_node_from_buffer(_buf, index());
-    if (qset()->process_or_enqueue_completed_buffer(node)) {
-      // Recycle the buffer. No allocation.
-      assert(_buf == BufferNode::make_buffer_from_node(node), "invariant");
-      assert(capacity() == qset()->buffer_size(), "invariant");
-      reset();
-      return;
-    }
-  }
-  // Set capacity in case this is the first allocation.
-  set_capacity(qset()->buffer_size());
-  // Allocate a new buffer.
-  _buf = qset()->allocate_buffer();
-  reset();
-}
-
-bool PtrQueueSet::process_or_enqueue_completed_buffer(BufferNode* node) {
-  if (Thread::current()->is_Java_thread()) {
-    // If the number of buffers exceeds the limit, make this Java
-    // thread do the processing itself.  We don't lock to access
-    // buffer count or padding; it is fine to be imprecise here.  The
-    // add of padding could overflow, which is treated as unlimited.
-    size_t limit = _max_completed_buffers + _completed_buffers_padding;
-    if ((_n_completed_buffers > limit) && (limit >= _max_completed_buffers)) {
-      if (mut_process_buffer(node)) {
-        // Successfully processed; return true to allow buffer reuse.
-        return true;
-      }
-    }
-  }
-  // The buffer will be enqueued. The caller will have to get a new one.
-  enqueue_completed_buffer(node);
-  return false;
-}
-
 void PtrQueueSet::enqueue_completed_buffer(BufferNode* cbn) {
   MutexLocker x(_cbl_mon, Mutex::_no_safepoint_check_flag);
   cbn->set_next(NULL);
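The "bootstrapping kludge" moved into handle_zero_index() handles a construction-order wrinkle: the initial thread's queues exist before the second phase of their qset's two-phase initialization, so the capacity can only be captured at first use. A toy model of that ordering; ToyQSet and ToyQueue are invented stand-ins for the real types:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>

// Toy two-phase qset: constructed first, sized later.
struct ToyQSet {
  size_t _buffer_size;
  ToyQSet() : _buffer_size(0) {}                   // phase 1: size unknown
  void initialize(size_t n) { _buffer_size = n; }  // phase 2: size configured
  size_t buffer_size() const { return _buffer_size; }
};

struct ToyQueue {
  ToyQSet* _qset;
  size_t _capacity;                        // cannot be set in the constructor
  explicit ToyQueue(ToyQSet* qs) : _qset(qs), _capacity(0) {}

  void handle_zero_index() {
    if (_capacity == 0) {                  // bootstrapping kludge
      _capacity = _qset->buffer_size();    // qset is initialized by first use
      assert(_capacity > 0);
    }
    printf("allocating buffer of %zu entries\n", _capacity);
  }
};

int main() {
  ToyQSet qs;
  ToyQueue q(&qs);        // queue exists before the qset is sized
  qs.initialize(256);     // second phase of qset initialization
  q.handle_zero_index();  // first use: capacity picked up lazily
  return 0;
}
```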
--- a/src/hotspot/share/gc/shared/ptrQueue.hpp	Tue May 21 14:55:30 2019 -0700
+++ b/src/hotspot/share/gc/shared/ptrQueue.hpp	Tue May 21 19:19:44 2019 -0400
@@ -71,14 +71,6 @@
     return _capacity_in_bytes;
   }
 
-  void set_capacity(size_t entries) {
-    size_t byte_capacity = index_to_byte_index(entries);
-    assert(_capacity_in_bytes == 0 || _capacity_in_bytes == byte_capacity,
-           "changing capacity " SIZE_FORMAT " -> " SIZE_FORMAT,
-           _capacity_in_bytes, byte_capacity);
-    _capacity_in_bytes = byte_capacity;
-  }
-
   static size_t byte_index_to_index(size_t ind) {
     assert(is_aligned(ind, _element_size), "precondition");
     return ind / _element_size;
@@ -106,11 +98,20 @@
     return byte_index_to_index(capacity_in_bytes());
   }
 
-  PtrQueueSet* qset() { return _qset; }
+  PtrQueueSet* qset() const { return _qset; }
 
   // Process queue entries and release resources.
   void flush_impl();
 
+  // Process (some of) the buffer and leave it in place for further use,
+  // or enqueue the buffer and allocate a new one.
+  virtual void handle_completed_buffer() = 0;
+
+  void allocate_buffer();
+
+  // Enqueue the current buffer in the qset and allocate a new buffer.
+  void enqueue_completed_buffer();
+
   // Initialize this queue to contain a null buffer, and be part of the
   // given PtrQueueSet.
   PtrQueue(PtrQueueSet* qset, bool active = false);
@@ -137,14 +138,6 @@
     else enqueue_known_active(ptr);
   }
 
-  // This method is called when we're doing the zero index handling
-  // and gives a chance to the queues to do any pre-enqueueing
-  // processing they might want to do on the buffer. It should return
-  // true if the buffer should be enqueued, or false if enough
-  // entries were cleared from it so that it can be re-used. It should
-  // not return false if the buffer is still full (otherwise we can
-  // get into an infinite loop).
-  virtual bool should_enqueue_buffer() { return true; }
   void handle_zero_index();
 
   void enqueue_known_active(void* ptr);
@@ -306,7 +299,7 @@
   Monitor* _cbl_mon;  // Protects the fields below.
   BufferNode* _completed_buffers_head;
   BufferNode* _completed_buffers_tail;
-  size_t _n_completed_buffers;
+  volatile size_t _n_completed_buffers;
 
   size_t _process_completed_buffers_threshold;
   volatile bool _process_completed_buffers;
@@ -314,24 +307,11 @@
   // If true, notify_all on _cbl_mon when the threshold is reached.
   bool _notify_when_complete;
 
-  // Maximum number of elements allowed on completed queue: after that,
-  // enqueuer does the work itself.
-  size_t _max_completed_buffers;
-  size_t _completed_buffers_padding;
-
   void assert_completed_buffers_list_len_correct_locked() NOT_DEBUG_RETURN;
 
 protected:
   bool _all_active;
 
-  // A mutator thread does the the work of processing a buffer.
-  // Returns "true" iff the work is complete (and the buffer may be
-  // deallocated).
-  virtual bool mut_process_buffer(BufferNode* node) {
-    ShouldNotReachHere();
-    return false;
-  }
-
   // Create an empty ptr queue set.
   PtrQueueSet(bool notify_when_complete = false);
   ~PtrQueueSet();
@@ -365,9 +345,6 @@
   // return a completed buffer from the list.  Otherwise, return NULL.
   BufferNode* get_completed_buffer(size_t stop_at = 0);
 
-  // To be invoked by the mutator.
-  bool process_or_enqueue_completed_buffer(BufferNode* node);
-
   bool process_completed_buffers() { return _process_completed_buffers; }
   void set_process_completed_buffers(bool x) { _process_completed_buffers = x; }
 
@@ -392,21 +369,6 @@
 
   void merge_bufferlists(PtrQueueSet* src);
 
-  void set_max_completed_buffers(size_t m) {
-    _max_completed_buffers = m;
-  }
-  size_t max_completed_buffers() const {
-    return _max_completed_buffers;
-  }
-  static const size_t MaxCompletedBuffersUnlimited = ~size_t(0);
-
-  void set_completed_buffers_padding(size_t padding) {
-    _completed_buffers_padding = padding;
-  }
-  size_t completed_buffers_padding() const {
-    return _completed_buffers_padding;
-  }
-
   // Notify the consumer if the number of buffers crossed the threshold
   void notify_if_necessary();
 };
--- a/src/hotspot/share/gc/shared/satbMarkQueue.cpp	Tue May 21 14:55:30 2019 -0700
+++ b/src/hotspot/share/gc/shared/satbMarkQueue.cpp	Tue May 21 19:19:44 2019 -0400
@@ -56,7 +56,7 @@
 // retains a small enough collection in the buffer, we can continue to
 // use the buffer as-is, instead of enqueueing and replacing it.
 
-bool SATBMarkQueue::should_enqueue_buffer() {
+void SATBMarkQueue::handle_completed_buffer() {
   // This method should only be called if there is a non-NULL buffer
   // that is full.
   assert(index() == 0, "pre-condition");
@@ -64,15 +64,18 @@
 
   filter();
 
-  SATBMarkQueueSet* satb_qset = static_cast<SATBMarkQueueSet*>(qset());
-  size_t threshold = satb_qset->buffer_enqueue_threshold();
+  size_t threshold = satb_qset()->buffer_enqueue_threshold();
   // Ensure we'll enqueue completely full buffers.
   assert(threshold > 0, "enqueue threshold = 0");
   // Ensure we won't enqueue empty buffers.
   assert(threshold <= capacity(),
          "enqueue threshold " SIZE_FORMAT " exceeds capacity " SIZE_FORMAT,
          threshold, capacity());
-  return index() < threshold;
+
+  if (index() < threshold) {
+    // Buffer is sufficiently full; enqueue and allocate a new one.
+    enqueue_completed_buffer();
+  } // Else continue to accumulate in buffer.
 }
 
 void SATBMarkQueue::apply_closure_and_empty(SATBBufferClosure* cl) {
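For orientation, the enqueue test reads backwards at first glance: index() counts unused slots, so index() < threshold means few free slots remain, i.e. the filtered buffer is still mostly full. A hedged sketch of the decision; deriving the threshold from a percentage is an assumption modeled on G1SATBBufferEnqueueingThresholdPercent, not HotSpot's exact formula:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>

// index counts free slots from the front, so a small index means a
// nearly full buffer.  The percent-based threshold derivation here is
// for illustration only.
static bool enqueue_after_filter(size_t index, size_t capacity, size_t percent) {
  size_t threshold = capacity * percent / 100;
  if (threshold == 0) threshold = 1;        // always enqueue completely full buffers
  assert(threshold <= capacity);            // never enqueue empty buffers
  return index < threshold;                 // mostly full => enqueue
}

int main() {
  const size_t capacity = 256;
  // Filtering cleared most entries (200 free slots): keep accumulating.
  printf("%d\n", enqueue_after_filter(200, capacity, 60));  // 0
  // Filtering kept most entries (10 free slots): enqueue and replace.
  printf("%d\n", enqueue_after_filter(10, capacity, 60));   // 1
  return 0;
}
```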
--- a/src/hotspot/share/gc/shared/satbMarkQueue.hpp	Tue May 21 14:55:30 2019 -0700
+++ b/src/hotspot/share/gc/shared/satbMarkQueue.hpp	Tue May 21 19:19:44 2019 -0400
@@ -54,20 +54,21 @@
   template<typename Filter>
   inline void apply_filter(Filter filter_out);
 
+protected:
+  virtual void handle_completed_buffer();
+
 public:
   SATBMarkQueue(SATBMarkQueueSet* qset);
 
   // Process queue entries and free resources.
   void flush();
 
+  inline SATBMarkQueueSet* satb_qset() const;
+
   // Apply cl to the active part of the buffer.
   // Prerequisite: Must be at a safepoint.
   void apply_closure_and_empty(SATBBufferClosure* cl);
 
-  // Overrides PtrQueue::should_enqueue_buffer(). See the method's
-  // definition for more information.
-  virtual bool should_enqueue_buffer();
-
 #ifndef PRODUCT
   // Helpful for debugging
   void print(const char* name);
@@ -140,8 +141,12 @@
   void abandon_partial_marking();
 };
 
+inline SATBMarkQueueSet* SATBMarkQueue::satb_qset() const {
+  return static_cast<SATBMarkQueueSet*>(qset());
+}
+
 inline void SATBMarkQueue::filter() {
-  static_cast<SATBMarkQueueSet*>(qset())->filter(this);
+  satb_qset()->filter(this);
 }
 
 // Removes entries from the buffer that are no longer needed, as
--- a/src/hotspot/share/gc/shenandoah/shenandoahSATBMarkQueueSet.cpp	Tue May 21 14:55:30 2019 -0700
+++ b/src/hotspot/share/gc/shenandoah/shenandoahSATBMarkQueueSet.cpp	Tue May 21 19:19:44 2019 -0400
@@ -70,19 +70,17 @@
   }
 }
 
-bool ShenandoahSATBMarkQueue::should_enqueue_buffer() {
-  bool should_enqueue = SATBMarkQueue::should_enqueue_buffer();
-  size_t cap = capacity();
-  Thread* t = Thread::current();
-  if (ShenandoahThreadLocalData::is_force_satb_flush(t)) {
-    if (!should_enqueue && cap != index()) {
+void ShenandoahSATBMarkQueue::handle_completed_buffer() {
+  SATBMarkQueue::handle_completed_buffer();
+  if (!is_empty()) {
+    Thread* t = Thread::current();
+    if (ShenandoahThreadLocalData::is_force_satb_flush(t)) {
       // Non-empty buffer is compacted, and we decided not to enqueue it.
       // We still want to know about leftover work in that buffer eventually.
       // This avoids dealing with these leftovers during the final-mark, after
       // the buffers are drained completely. See JDK-8205353 for more discussion.
-      should_enqueue = true;
+      ShenandoahThreadLocalData::set_force_satb_flush(t, false);
+      enqueue_completed_buffer();
     }
-    ShenandoahThreadLocalData::set_force_satb_flush(t, false);
   }
-  return should_enqueue;
 }
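The reshaped override reads: let the base class apply its filter-and-threshold policy first; if that left a compacted, non-empty buffer in place but this thread has a pending forced SATB flush, push the leftovers anyway so final-mark does not have to chase them (see JDK-8205353). A toy model of that flow; the thread-local flag is an invented stand-in for ShenandoahThreadLocalData:

```cpp
#include <cstdio>

// Invented stand-in for ShenandoahThreadLocalData::is_force_satb_flush.
thread_local bool g_force_satb_flush = false;

struct ToySATBQueue {
  bool _kept_nonempty_buffer;           // what the base policy decided
  ToySATBQueue() : _kept_nonempty_buffer(false) {}

  void base_handle_completed_buffer() {
    // Pretend filtering compacted the buffer and the base class chose
    // to keep it in place rather than enqueue it.
    _kept_nonempty_buffer = true;
  }
  bool is_empty() const { return !_kept_nonempty_buffer; }
  void enqueue_completed_buffer() { printf("leftovers enqueued\n"); }

  void handle_completed_buffer() {
    base_handle_completed_buffer();          // base policy runs first
    if (!is_empty() && g_force_satb_flush) {
      g_force_satb_flush = false;            // consume the flush request
      enqueue_completed_buffer();            // surface leftover work now
    }
  }
};

int main() {
  ToySATBQueue q;
  g_force_satb_flush = true;                 // e.g. requested at a flush point
  q.handle_completed_buffer();               // prints "leftovers enqueued"
  return 0;
}
```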
--- a/src/hotspot/share/gc/shenandoah/shenandoahSATBMarkQueueSet.hpp	Tue May 21 14:55:30 2019 -0700
+++ b/src/hotspot/share/gc/shenandoah/shenandoahSATBMarkQueueSet.hpp	Tue May 21 19:19:44 2019 -0400
@@ -30,9 +30,10 @@
 #include "runtime/thread.hpp"
 
 class ShenandoahSATBMarkQueue: public SATBMarkQueue {
+protected:
+  virtual void handle_completed_buffer();
 public:
   ShenandoahSATBMarkQueue(SATBMarkQueueSet* qset) : SATBMarkQueue(qset) {}
-  virtual bool should_enqueue_buffer();
 };
 
 class ShenandoahSATBMarkQueueSet : public SATBMarkQueueSet {