Skip to content

java-wait-notify

Posted on:July 7, 2020 at 01:25 PM

背景

java的线程间通信,偶尔会用到wait和notify

实现

注册:

// Register native methods of Object
void java_lang_Object::register_natives(TRAPS) {
  InstanceKlass* obj = vmClasses::Object_klass();
  Method::register_native(obj, vmSymbols::hashCode_name(),
                          vmSymbols::void_int_signature(), (address) &JVM_IHashCode, CHECK);
  Method::register_native(obj, vmSymbols::wait_name(),
                          vmSymbols::long_void_signature(), (address) &JVM_MonitorWait, CHECK);
  Method::register_native(obj, vmSymbols::notify_name(),
                          vmSymbols::void_method_signature(), (address) &JVM_MonitorNotify, CHECK);
  Method::register_native(obj, vmSymbols::notifyAll_name(),
                          vmSymbols::void_method_signature(), (address) &JVM_MonitorNotifyAll, CHECK);
  Method::register_native(obj, vmSymbols::clone_name(),
                          vmSymbols::void_object_signature(), (address) &JVM_Clone, THREAD);
}
// -----------------------------------------------------------------------------
// Wait/Notify/NotifyAll
//
// Note: a subset of changes to ObjectMonitor::wait()
// will need to be replicated in complete_exit
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
  JavaThread* current = THREAD;

  assert(InitDone, "Unexpectedly not initialized");

  CHECK_OWNER();  // Throws IMSE if not owner.

  EventJavaMonitorWait event;

  // check for a pending interrupt
  if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
    // post monitor waited event.  Note that this is past-tense, we are done waiting.
    if (JvmtiExport::should_post_monitor_waited()) {
      // Note: 'false' parameter is passed here because the
      // wait was not timed out due to thread interrupt.
      JvmtiExport::post_monitor_waited(current, this, false);

      // In this short circuit of the monitor wait protocol, the
      // current thread never drops ownership of the monitor and
      // never gets added to the wait queue so the current thread
      // cannot be made the successor. This means that the
      // JVMTI_EVENT_MONITOR_WAITED event handler cannot accidentally
      // consume an unpark() meant for the ParkEvent associated with
      // this ObjectMonitor.
    }
    if (event.should_commit()) {
      post_monitor_wait_event(&event, this, 0, millis, false);
    }
    THROW(vmSymbols::java_lang_InterruptedException());
    return;
  }

  assert(current->_Stalled == 0, "invariant");
  current->_Stalled = intptr_t(this);
  current->set_current_waiting_monitor(this);

  // create a node to be put into the queue
  // Critically, after we reset() the event but prior to park(), we must check
  // for a pending interrupt.
  ObjectWaiter node(current);
  node.TState = ObjectWaiter::TS_WAIT;
  current->_ParkEvent->reset();
  OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag

  // Enter the waiting queue, which is a circular doubly linked list in this case
  // but it could be a priority queue or any data structure.
  // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
  // by the the owner of the monitor *except* in the case where park()
  // returns because of a timeout of interrupt.  Contention is exceptionally rare
  // so we use a simple spin-lock instead of a heavier-weight blocking lock.

  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add");
  AddWaiter(&node);
  Thread::SpinRelease(&_WaitSetLock);

  _Responsible = NULL;

  intx save = _recursions;     // record the old recursion count
  _waiters++;                  // increment the number of waiters
  _recursions = 0;             // set the recursion level to be 1
  exit(current);               // exit the monitor
  guarantee(owner_raw() != current, "invariant");

  // The thread is on the WaitSet list - now park() it.
  // On MP systems it's conceivable that a brief spin before we park
  // could be profitable.
  //
  // TODO-FIXME: change the following logic to a loop of the form
  //   while (!timeout && !interrupted && _notified == 0) park()

  int ret = OS_OK;
  int WasNotified = 0;

  // Need to check interrupt state whilst still _thread_in_vm
  bool interrupted = interruptible && current->is_interrupted(false);

  { // State transition wrappers
    OSThread* osthread = current->osthread();
    OSThreadWaitState osts(osthread, true);

    assert(current->thread_state() == _thread_in_vm, "invariant");

    {
      ClearSuccOnSuspend csos(this);
      ThreadBlockInVMPreprocess<ClearSuccOnSuspend> tbivs(current, csos, true /* allow_suspend */);
      if (interrupted || HAS_PENDING_EXCEPTION) {
        // Intentionally empty
      } else if (node._notified == 0) {
        if (millis <= 0) {
          current->_ParkEvent->park();
        } else {
          ret = current->_ParkEvent->park(millis);
        }
      }
    }

    // Node may be on the WaitSet, the EntryList (or cxq), or in transition
    // from the WaitSet to the EntryList.
    // See if we need to remove Node from the WaitSet.
    // We use double-checked locking to avoid grabbing _WaitSetLock
    // if the thread is not on the wait queue.
    //
    // Note that we don't need a fence before the fetch of TState.
    // In the worst case we'll fetch a old-stale value of TS_WAIT previously
    // written by the is thread. (perhaps the fetch might even be satisfied
    // by a look-aside into the processor's own store buffer, although given
    // the length of the code path between the prior ST and this load that's
    // highly unlikely).  If the following LD fetches a stale TS_WAIT value
    // then we'll acquire the lock and then re-fetch a fresh TState value.
    // That is, we fail toward safety.

    if (node.TState == ObjectWaiter::TS_WAIT) {
      Thread::SpinAcquire(&_WaitSetLock, "WaitSet - unlink");
      if (node.TState == ObjectWaiter::TS_WAIT) {
        DequeueSpecificWaiter(&node);       // unlink from WaitSet
        assert(node._notified == 0, "invariant");
        node.TState = ObjectWaiter::TS_RUN;
      }
      Thread::SpinRelease(&_WaitSetLock);
    }

    // The thread is now either on off-list (TS_RUN),
    // on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).
    // The Node's TState variable is stable from the perspective of this thread.
    // No other threads will asynchronously modify TState.
    guarantee(node.TState != ObjectWaiter::TS_WAIT, "invariant");
    OrderAccess::loadload();
    if (_succ == current) _succ = NULL;
    WasNotified = node._notified;

    // Reentry phase -- reacquire the monitor.
    // re-enter contended monitor after object.wait().
    // retain OBJECT_WAIT state until re-enter successfully completes
    // Thread state is thread_in_vm and oop access is again safe,
    // although the raw address of the object may have changed.
    // (Don't cache naked oops over safepoints, of course).

    // post monitor waited event. Note that this is past-tense, we are done waiting.
    if (JvmtiExport::should_post_monitor_waited()) {
      JvmtiExport::post_monitor_waited(current, this, ret == OS_TIMEOUT);

      if (node._notified != 0 && _succ == current) {
        // In this part of the monitor wait-notify-reenter protocol it
        // is possible (and normal) for another thread to do a fastpath
        // monitor enter-exit while this thread is still trying to get
        // to the reenter portion of the protocol.
        //
        // The ObjectMonitor was notified and the current thread is
        // the successor which also means that an unpark() has already
        // been done. The JVMTI_EVENT_MONITOR_WAITED event handler can
        // consume the unpark() that was done when the successor was
        // set because the same ParkEvent is shared between Java
        // monitors and JVM/TI RawMonitors (for now).
        //
        // We redo the unpark() to ensure forward progress, i.e., we
        // don't want all pending threads hanging (parked) with none
        // entering the unlocked monitor.
        node._event->unpark();
      }
    }

    if (event.should_commit()) {
      post_monitor_wait_event(&event, this, node._notifier_tid, millis, ret == OS_TIMEOUT);
    }

    OrderAccess::fence();

    assert(current->_Stalled != 0, "invariant");
    current->_Stalled = 0;

    assert(owner_raw() != current, "invariant");
    ObjectWaiter::TStates v = node.TState;
    if (v == ObjectWaiter::TS_RUN) {
      enter(current);
    } else {
      guarantee(v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant");
      ReenterI(current, &node);
      node.wait_reenter_end(this);
    }

    // current has reacquired the lock.
    // Lifecycle - the node representing current must not appear on any queues.
    // Node is about to go out-of-scope, but even if it were immortal we wouldn't
    // want residual elements associated with this thread left on any lists.
    guarantee(node.TState == ObjectWaiter::TS_RUN, "invariant");
    assert(owner_raw() == current, "invariant");
    assert(_succ != current, "invariant");
  } // OSThreadWaitState()

  current->set_current_waiting_monitor(NULL);

  guarantee(_recursions == 0, "invariant");
  _recursions = save      // restore the old recursion count
                + JvmtiDeferredUpdates::get_and_reset_relock_count_after_wait(current); //  increased by the deferred relock count
  _waiters--;             // decrement the number of waiters

  // Verify a few postconditions
  assert(owner_raw() == current, "invariant");
  assert(_succ != current, "invariant");
  assert(object()->mark() == markWord::encode(this), "invariant");

  // check if the notification happened
  if (!WasNotified) {
    // no, it could be timeout or Thread.interrupt() or both
    // check for interrupt event, otherwise it is timeout
    if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
      THROW(vmSymbols::java_lang_InterruptedException());
    }
  }

  // NOTE: Spurious wake up will be consider as timeout.
  // Monitor notify has precedence over thread interrupt.
}

wait:

Thread 20 "Thread-0" hit Breakpoint 2, __pthread_cond_wait (cond=0x7ffff0510058, mutex=0x7ffff0510030) at forward.c:121
121     forward.c: No such file or directory.
(gdb) bt
#0  __pthread_cond_wait (cond=0x7ffff0510058, mutex=0x7ffff0510030) at forward.c:121
#1  0x00007ffff6c21713 in os::PlatformEvent::park (this=0x7ffff0510000) at /home/ubuntu/daixiao/jdk/src/hotspot/os/posix/os_posix.cpp:1484
#2  0x00007ffff6bd003c in ObjectMonitor::wait (this=0x7fffac0013b0, millis=0, interruptible=true, __the_thread__=0x7ffff050f5b0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/objectMonitor.cpp:1544
#3  0x00007ffff6e90188 in ObjectSynchronizer::wait (obj=..., millis=0, __the_thread__=0x7ffff050f5b0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/synchronizer.cpp:654
#4  0x00007ffff68298ae in JVM_MonitorWait (env=0x7ffff050f8a8, handle=0x7fffd0df77c0, ms=0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/prims/jvm.cpp:617
#5  0x00007fffe100f68b in ?? ()
#6  0x00000008f7c32db8 in ?? ()
#7  0x00007ffff050f5b0 in ?? ()
#8  0x00007fffd0df7760 in ?? ()
#9  0x00007fffd0df7748 in ?? ()
#10 0x0000000000000000 in ?? ()

notify:

(gdb) bt
#0  __pthread_cond_signal (cond=0x7ffff04f0958) at forward.c:110
#1  0x00007ffff6c21c13 in os::PlatformEvent::unpark (this=0x7ffff04f0900) at /home/ubuntu/daixiao/jdk/src/hotspot/os/posix/os_posix.cpp:1590
#2  0x00007ffff6bcf654 in ObjectMonitor::ExitEpilog (this=0x7fffac0010b0, current=0x7ffff04ef410, Wakee=0x0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/objectMonitor.cpp:1350
#3  0x00007ffff6bcf57b in ObjectMonitor::exit (this=0x7fffac0010b0, current=0x7ffff04ef410, not_suspended=true) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/objectMonitor.cpp:1321
#4  0x00007ffff6bcfe8e in ObjectMonitor::wait (this=0x7fffac0010b0, millis=0, interruptible=true, __the_thread__=0x7ffff04ef410) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/objectMonitor.cpp:1515
#5  0x00007ffff6e90188 in ObjectSynchronizer::wait (obj=..., millis=0, __the_thread__=0x7ffff04ef410) at /home/ubuntu/daixiao/jdk/src/hotspot/share/runtime/synchronizer.cpp:654
#6  0x00007ffff68298ae in JVM_MonitorWait (env=0x7ffff04ef708, handle=0x7fffd0df77c0, ms=0) at /home/ubuntu/daixiao/jdk/src/hotspot/share/prims/jvm.cpp:617
#7  0x00007fffe100f68b in ?? ()
#8  0x00000008f7c32db8 in ?? ()
#9  0x00007ffff04ef410 in ?? ()
#10 0x00007fffd0df7760 in ?? ()
#11 0x00007fffd0df7748 in ?? ()
#12 0x0000000000000000 in ?? ()
void PlatformEvent::park() {       // AKA "down()"
  // Transitions for _event:
  //   -1 => -1 : illegal
  //    1 =>  0 : pass - return immediately
  //    0 => -1 : block; then set _event to 0 before returning

  // Invariant: Only the thread associated with the PlatformEvent
  // may call park().
  assert(_nParked == 0, "invariant");

  int v;

  // atomically decrement _event
  for (;;) {
    v = _event;
    if (Atomic::cmpxchg(&_event, v, v - 1) == v) break;
  }
  guarantee(v >= 0, "invariant");

  if (v == 0) { // Do this the hard way by blocking ...
    int status = pthread_mutex_lock(_mutex);
    assert_status(status == 0, status, "mutex_lock");
    guarantee(_nParked == 0, "invariant");
    ++_nParked;
    while (_event < 0) {
      // OS-level "spurious wakeups" are ignored
      status = pthread_cond_wait(_cond, _mutex);
      assert_status(status == 0 MACOS_ONLY(|| status == ETIMEDOUT),
                    status, "cond_wait");
    }
    --_nParked;

    _event = 0;
    status = pthread_mutex_unlock(_mutex);
    assert_status(status == 0, status, "mutex_unlock");
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other.
    OrderAccess::fence();
  }
  guarantee(_event >= 0, "invariant");
}

demo

#include <stdio.h>
#include <pthread.h>
#include<unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int condition = 0;
int count = 0;
pthread_t thread_id;
int consume( void )
{
   while( 1 )
   {
      pthread_mutex_lock( &mutex );
      while( condition == 0 )
         pthread_cond_wait( &cond, &mutex );
      printf( "Consumed %d\n", count );
      condition = 0;
      pthread_cond_signal( &cond );
      pthread_mutex_unlock( &mutex );
   }

   return( 0 );
}

void*  produce( void * arg )
{
   while( 1 )
   {
      pthread_mutex_lock( &mutex );
      while( condition == 1 )
         pthread_cond_wait( &cond, &mutex );
      printf( "Produced %d\n", count++ );
      condition = 1;
      pthread_cond_signal( &cond );
      pthread_mutex_unlock( &mutex );
   }
   return( 0 );
}

int main( void )
{
   pthread_create( thread_id, NULL, &produce, NULL );
   return consume();
}

相关阅读