--- a/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp Tue Nov 22 04:47:10 2011 -0500
+++ b/hotspot/src/share/vm/gc_implementation/parNew/parNewGeneration.cpp Tue Aug 09 10:16:01 2011 -0700
@@ -305,7 +305,7 @@
inline ParScanThreadState& thread_state(int i);
- void reset(bool promotion_failed);
+ void reset(int active_workers, bool promotion_failed);
void flush();
#if TASKQUEUE_STATS
@@ -322,6 +322,9 @@
ParallelTaskTerminator& _term;
ParNewGeneration& _gen;
Generation& _next_gen;
+ public:
+ bool is_valid(int id) const { return id < length(); }
+ ParallelTaskTerminator* terminator() { return &_term; }
};
@@ -351,9 +354,9 @@
}
-void ParScanThreadStateSet::reset(bool promotion_failed)
+void ParScanThreadStateSet::reset(int active_threads, bool promotion_failed)
{
- _term.reset_for_reuse();
+ _term.reset_for_reuse(active_threads);
if (promotion_failed) {
for (int i = 0; i < length(); ++i) {
thread_state(i).print_and_clear_promotion_failure_size();
@@ -569,6 +572,24 @@
_state_set(state_set)
{}
+// Reset the terminator for the given number of
+// active threads.
+void ParNewGenTask::set_for_termination(int active_workers) {
+ _state_set->reset(active_workers, _gen->promotion_failed());
+ // Should the heap be passed in? There's only 1 for now so
+ // grab it instead.
+ GenCollectedHeap* gch = GenCollectedHeap::heap();
+ gch->set_n_termination(active_workers);
+}
+
+// The "i" passed to this method is the part of the work for
+// this thread. It is not the worker ID. The "i" is derived
+// from _started_workers which is incremented in internal_note_start()
+// called in GangWorker loop() and which is called under the
+// which is called under the protection of the gang monitor and is
+// called after a task is started. So "i" is based on
+// first-come-first-served.
+
void ParNewGenTask::work(int i) {
GenCollectedHeap* gch = GenCollectedHeap::heap();
// Since this is being done in a separate thread, need new resource
@@ -581,6 +602,8 @@
Generation* old_gen = gch->next_gen(_gen);
ParScanThreadState& par_scan_state = _state_set->thread_state(i);
+ assert(_state_set->is_valid(i), "Should not have been called");
+
par_scan_state.set_young_old_boundary(_young_old_boundary);
par_scan_state.start_strong_roots();
@@ -733,7 +756,9 @@
private:
virtual void work(int i);
-
+ virtual void set_for_termination(int active_workers) {
+ _state_set.terminator()->reset_for_reuse(active_workers);
+ }
private:
ParNewGeneration& _gen;
ProcessTask& _task;
@@ -789,18 +814,20 @@
GenCollectedHeap* gch = GenCollectedHeap::heap();
assert(gch->kind() == CollectedHeap::GenCollectedHeap,
"not a generational heap");
- WorkGang* workers = gch->workers();
+ FlexibleWorkGang* workers = gch->workers();
assert(workers != NULL, "Need parallel worker threads.");
+ _state_set.reset(workers->active_workers(), _generation.promotion_failed());
ParNewRefProcTaskProxy rp_task(task, _generation, *_generation.next_gen(),
_generation.reserved().end(), _state_set);
workers->run_task(&rp_task);
- _state_set.reset(_generation.promotion_failed());
+ _state_set.reset(0 /* bad value in debug if not reset */,
+ _generation.promotion_failed());
}
void ParNewRefProcTaskExecutor::execute(EnqueueTask& task)
{
GenCollectedHeap* gch = GenCollectedHeap::heap();
- WorkGang* workers = gch->workers();
+ FlexibleWorkGang* workers = gch->workers();
assert(workers != NULL, "Need parallel worker threads.");
ParNewRefEnqueueTaskProxy enq_task(task);
workers->run_task(&enq_task);
@@ -856,7 +883,13 @@
assert(gch->kind() == CollectedHeap::GenCollectedHeap,
"not a CMS generational heap");
AdaptiveSizePolicy* size_policy = gch->gen_policy()->size_policy();
- WorkGang* workers = gch->workers();
+ FlexibleWorkGang* workers = gch->workers();
+ assert(workers != NULL, "Need workgang for parallel work");
+ int active_workers =
+ AdaptiveSizePolicy::calc_active_workers(workers->total_workers(),
+ workers->active_workers(),
+ Threads::number_of_non_daemon_threads());
+ workers->set_active_workers(active_workers);
_next_gen = gch->next_gen(this);
assert(_next_gen != NULL,
"This must be the youngest gen, and not the only gen");
@@ -894,13 +927,19 @@
gch->save_marks();
assert(workers != NULL, "Need parallel worker threads.");
- ParallelTaskTerminator _term(workers->total_workers(), task_queues());
- ParScanThreadStateSet thread_state_set(workers->total_workers(),
+ int n_workers = active_workers;
+
+ // Set the correct parallelism (number of queues) in the reference processor
+ ref_processor()->set_active_mt_degree(n_workers);
+
+ // Always set the terminator for the active number of workers
+ // because only those workers go through the termination protocol.
+ ParallelTaskTerminator _term(n_workers, task_queues());
+ ParScanThreadStateSet thread_state_set(workers->active_workers(),
*to(), *this, *_next_gen, *task_queues(),
_overflow_stacks, desired_plab_sz(), _term);
ParNewGenTask tsk(this, _next_gen, reserved().end(), &thread_state_set);
- int n_workers = workers->total_workers();
gch->set_par_threads(n_workers);
gch->rem_set()->prepare_for_younger_refs_iterate(true);
// It turns out that even when we're using 1 thread, doing the work in a
@@ -914,7 +953,8 @@
GenCollectedHeap::StrongRootsScope srs(gch);
tsk.work(0);
}
- thread_state_set.reset(promotion_failed());
+ thread_state_set.reset(0 /* Bad value in debug if not reset */,
+ promotion_failed());
// Process (weak) reference objects found during scavenge.
ReferenceProcessor* rp = ref_processor();
@@ -927,6 +967,8 @@
EvacuateFollowersClosureGeneral evacuate_followers(gch, _level,
&scan_without_gc_barrier, &scan_with_gc_barrier);
rp->setup_policy(clear_all_soft_refs);
+ // Can the mt_degree be set later (at run_task() time would be best)?
+ rp->set_active_mt_degree(active_workers);
if (rp->processing_is_mt()) {
ParNewRefProcTaskExecutor task_executor(*this, thread_state_set);
rp->process_discovered_references(&is_alive, &keep_alive,