src/hotspot/share/jfr/recorder/storage/jfrStorage.cpp
changeset 58863 c16ac7a2eba4
parent 55053 d58e1a447d2b
child 58967 3c2e49d43ba3
--- a/src/hotspot/share/jfr/recorder/storage/jfrStorage.cpp	Wed Oct 30 16:14:56 2019 +0100
+++ b/src/hotspot/share/jfr/recorder/storage/jfrStorage.cpp	Wed Oct 30 19:43:52 2019 +0100
@@ -26,6 +26,7 @@
 #include "jfr/jfrEvents.hpp"
 #include "jfr/jni/jfrJavaSupport.hpp"
 #include "jfr/recorder/jfrRecorder.hpp"
+#include "jfr/recorder/checkpoint/jfrCheckpointManager.hpp"
 #include "jfr/recorder/repository/jfrChunkWriter.hpp"
 #include "jfr/recorder/service/jfrOptionSet.hpp"
 #include "jfr/recorder/service/jfrPostBox.hpp"
@@ -253,6 +254,18 @@
     assert(buffer->empty(), "invariant");
     return true;
   }
+
+  if (buffer->excluded()) {
+    const bool thread_is_excluded = thread->jfr_thread_local()->is_excluded();
+    buffer->reinitialize(thread_is_excluded);
+    assert(buffer->empty(), "invariant");
+    if (!thread_is_excluded) {
+      // state change from exclusion to inclusion requires a thread checkpoint
+      JfrCheckpointManager::write_thread_checkpoint(thread);
+    }
+    return true;
+  }
+
   BufferPtr const promotion_buffer = get_promotion_buffer(unflushed_size, _global_mspace, *this, promotion_retry, thread);
   if (promotion_buffer == NULL) {
     write_data_loss(buffer, thread);
@@ -301,7 +314,7 @@
   assert(buffer != NULL, "invariant");
   assert(buffer->retired(), "invariant");
   const size_t unflushed_size = buffer->unflushed_size();
-  buffer->reinitialize();
+  buffer->concurrent_reinitialization();
   log_registration_failure(unflushed_size);
 }
 
@@ -470,6 +483,7 @@
   assert(t != NULL, "invariant");
   assert(cur != NULL, "invariant");
   assert(cur->lease(), "invariant");
+  assert(!cur->excluded(), "invariant");
   assert(cur_pos != NULL, "invariant");
   assert(native ? t->jfr_thread_local()->native_buffer() == cur : t->jfr_thread_local()->java_buffer() == cur, "invariant");
   assert(t->jfr_thread_local()->shelved_buffer() != NULL, "invariant");
@@ -496,6 +510,9 @@
   // the case for stable thread local buffers; it is not the case for large buffers.
   if (!cur->empty()) {
     flush_regular_buffer(cur, t);
+    if (cur->excluded()) {
+      return cur;
+    }
   }
   assert(t->jfr_thread_local()->shelved_buffer() == NULL, "invariant");
   if (cur->free_size() >= req) {
@@ -584,28 +601,40 @@
 typedef UnBufferedWriteToChunk<JfrBuffer> WriteOperation;
 typedef MutexedWriteOp<WriteOperation> MutexedWriteOperation;
 typedef ConcurrentWriteOp<WriteOperation> ConcurrentWriteOperation;
-typedef ConcurrentWriteOpExcludeRetired<WriteOperation> ThreadLocalConcurrentWriteOperation;
+
+typedef Retired<JfrBuffer, true> NonRetired;
+typedef Excluded<JfrBuffer, true> NonExcluded;
+typedef CompositeOperation<NonRetired, NonExcluded> BufferPredicate;
+typedef PredicatedMutexedWriteOp<WriteOperation, BufferPredicate> ThreadLocalMutexedWriteOperation;
+typedef PredicatedConcurrentWriteOp<WriteOperation, BufferPredicate> ThreadLocalConcurrentWriteOperation;
 
 size_t JfrStorage::write() {
-  const size_t full_size_processed = write_full();
+  const size_t full_elements = write_full();
   WriteOperation wo(_chunkwriter);
-  ThreadLocalConcurrentWriteOperation tlwo(wo);
+  NonRetired nr;
+  NonExcluded ne;
+  BufferPredicate bp(&nr, &ne);
+  ThreadLocalConcurrentWriteOperation tlwo(wo, bp);
   process_full_list(tlwo, _thread_local_mspace);
   ConcurrentWriteOperation cwo(wo);
   process_free_list(cwo, _global_mspace);
-  return full_size_processed + wo.processed();
+  return full_elements + wo.elements();
 }
 
 size_t JfrStorage::write_at_safepoint() {
   assert(SafepointSynchronize::is_at_safepoint(), "invariant");
   WriteOperation wo(_chunkwriter);
   MutexedWriteOperation writer(wo); // mutexed write mode
-  process_full_list(writer, _thread_local_mspace);
+  NonRetired nr;
+  NonExcluded ne;
+  BufferPredicate bp(&nr, &ne);
+  ThreadLocalMutexedWriteOperation tlmwo(wo, bp);
+  process_full_list(tlmwo, _thread_local_mspace);
   assert(_transient_mspace->is_free_empty(), "invariant");
   process_full_list(writer, _transient_mspace);
   assert(_global_mspace->is_full_empty(), "invariant");
   process_free_list(writer, _global_mspace);
-  return wo.processed();
+  return wo.elements();
 }
 
 typedef DiscardOp<DefaultDiscarder<JfrStorage::Buffer> > DiscardOperation;
@@ -613,14 +642,14 @@
 typedef CompositeOperation<MutexedWriteOperation, ReleaseOperation> FullOperation;
 
 size_t JfrStorage::clear() {
-  const size_t full_size_processed = clear_full();
+  const size_t full_elements = clear_full();
   DiscardOperation discarder(concurrent); // concurrent discard mode
   process_full_list(discarder, _thread_local_mspace);
   assert(_transient_mspace->is_free_empty(), "invariant");
   process_full_list(discarder, _transient_mspace);
   assert(_global_mspace->is_full_empty(), "invariant");
   process_free_list(discarder, _global_mspace);
-  return full_size_processed + discarder.processed();
+  return full_elements + discarder.elements();
 }
 
 static void insert_free_age_nodes(JfrStorageAgeMspace* age_mspace, JfrAgeNode* head, JfrAgeNode* tail, size_t count) {
@@ -711,15 +740,25 @@
   ReleaseOperation ro(_transient_mspace, thread);
   FullOperation cmd(&writer, &ro);
   const size_t count = process_full(cmd, control(), _age_mspace);
-  log(count, writer.processed());
-  return writer.processed();
+  if (0 == count) {
+    assert(0 == writer.elements(), "invariant");
+    return 0;
+  }
+  const size_t size = writer.size();
+  log(count, size);
+  return count;
 }
 
 size_t JfrStorage::clear_full() {
   DiscardOperation discarder(mutexed); // a retired buffer implies mutexed access
   const size_t count = process_full(discarder, control(), _age_mspace);
-  log(count, discarder.processed(), true);
-  return discarder.processed();
+  if (0 == count) {
+    assert(0 == discarder.elements(), "invariant");
+    return 0;
+  }
+  const size_t size = discarder.size();
+  log(count, size, true);
+  return count;
 }
 
 static void scavenge_log(size_t count, size_t amount, size_t current) {
@@ -749,6 +788,10 @@
       assert(!t->lease(), "invariant");
       ++_count;
       _amount += t->total_size();
+      if (t->excluded()) {
+        t->clear_excluded();
+      }
+      assert(!t->excluded(), "invariant");
       t->clear_retired();
       t->release();
       _control.decrement_dead();
@@ -767,6 +810,11 @@
   }
   Scavenger<JfrThreadLocalMspace> scavenger(ctrl, _thread_local_mspace);
   process_full_list(scavenger, _thread_local_mspace);
-  scavenge_log(scavenger.processed(), scavenger.amount(), ctrl.dead_count());
-  return scavenger.processed();
+  const size_t count = scavenger.processed();
+  if (0 == count) {
+    assert(0 == scavenger.amount(), "invariant");
+    return 0;
+  }
+  scavenge_log(count, scavenger.amount(), ctrl.dead_count());
+  return count;
 }