liuzhiyu123 发表于 2012-8-8 13:41:32

OSG源码解读 —— OpenThreads库

本帖最后由 liuzhiyu123 于 2012-8-10 14:02 编辑

HandleHolder 类:
这个类封装了HANDLE 句柄。实现了一些对句柄的操作的函数,比如赋值、获取、关闭句柄等等。

Mutex类:
使用的是临界区进行资源的管理。
使用了Win32MutexPrivateData (在Windows系统中)这个类对临界区 CRITICAL_SECTION 进行封装,然后在Mutex 这个类中保存了一个Win32MutexPrivateData 的成员对象,实现了对临界区 CRITICAL_SECTION 的进入lock()和退出 unlock()。

Condition类:
条件量接口类。它依赖于某个Mutex互斥体,互斥体加锁时阻塞所在的线程,解锁或者超过时限则释放此线程,允许其继续运行。
封装了事件内核对象和信号量内核对象的类 Win32ConditionPrivateData。在Condition类中有一个Win32ConditionPrivateData 的类的数据成员,Condition的操作都是对这个数据成员的操作。
Win32ConditionPrivateData 这个类封装了一个 long waiters_,表示等待这个条件(可进入)的线程的数量,一个信号量sema句柄,一个事件内核对象的句柄waiters_done_(表示所有等待者都已经唤醒),和一个是否全部唤醒等待线程的bool was_broadcast 变量。

template <class M> class ReverseScopedLock
{
    private:
      M& m_lock;
      ReverseScopedLock(const ReverseScopedLock&); // prevent copy
      ReverseScopedLock& operator=(const ReverseScopedLock&); // prevent assign
    public:
      explicit ReverseScopedLock(M& m):m_lock(m) {m_lock.unlock();}
      ~ReverseScopedLock(){m_lock.lock();}
};
//在作用域内给这个Mutex解锁,这样别的线程就可以访问,在出了作用域之后加锁

DWORD OpenThreads::cooperativeWait(HANDLE waitHandle, unsigned long timeout){
    Thread* current = Thread::CurrentThread();
    DWORD dwResult ;
    if(current)
    {
      HANDLE cancelHandle = static_cast<Win32ThreadPrivateData*>(current->getImplementation())->cancelEvent.get();
                //获得当前线程的结束内核信号量
      HANDLE handleSet = {waitHandle, cancelHandle};
                //组成一个等待队列
      dwResult = WaitForMultipleObjects(2,handleSet,FALSE,timeout);

      if(dwResult == WAIT_OBJECT_0 + 1 ) throw Win32ThreadCanceled();
                //如果等待的结果是第二个HANDLE授信,也就是当前的线程结束,将抛出一个线程结束异常
    }
    Else //如果当前线程不存在,则一直等待,直到返回结果
    {
      dwResult = WaitForSingleObject(waitHandle,timeout);
    }

    return dwResult;//返回结果
}

    Win32ConditionPrivateData ()
      :waiters_(0),
         sema_(CreateSemaphore(NULL,0,0x7fffffff,NULL)),//设置当前可用的最大资源数都为0,所以线程进来的时候都是被阻塞的
         waiters_done_(CreateEvent(NULL,FALSE,FALSE,NULL)),//表示线程唤醒工作是否完成,在完成之后自动变为未授信状态,留给下一次使用
         was_broadcast_(0)
    {
    }
inline int wait (Mutex& external_mutex, long timeout_ms) // 可能阻塞一个或者多个线程,唤醒的时候一个用signal 多个用broadcast
    {
      // Prevent race conditions on the <waiters_> count.
      InterlockedIncrement(&waiters_);//增加等待的线程的数量

      int result = 0;

      ReverseScopedLock<Mutex> lock(external_mutex);//这个Mutex的作用是退出临界区,这样其他外界的操作函数就可以访问它,参照Block中的block()函数

      // wait in timeslices, giving testCancel() a change to
      // exit the thread if requested.
      try {
            DWORD dwResult = cooperativeWait(sema_.get(), timeout_ms);
                        //等待这个返回结果,可能会抛出异常(当前线程结束)

            if(dwResult != WAIT_OBJECT_0)
                result = (int)dwResult;//如果返回的不是这个mutex授信,可能是超时WAIT_TIMEOUT 则进行结果赋值
      }
      catch(...){//如果当前的线程结束,则会抛出异常,到这里来执行
            // thread is canceled in cooperative wait , do cleanup
            long w = InterlockedDecrement(&waiters_);
            int last_waiter = was_broadcast_ && w == 0;

            if (last_waiter)SetEvent(waiters_done_.get());
            // rethrow
            throw;
      }

      // We're ready to return, so there's one less waiter.
      long w = InterlockedDecrement(&waiters_);
      int last_waiter = was_broadcast_ && w == 0; //表示当前挂起线程是否全部被唤醒

      if (result != -1 && last_waiter)
            SetEvent(waiters_done_.get());//如果全部被唤醒,则waiter_done处于授信状态,broadcast可以结束。

      return result;
}

inline int signal()//执行释放一个,当前的一个线程被唤醒,阻塞一个线程的时候用这个函数唤醒(阻塞,强制一个线程在某个执行点上等待,直到满足继续运行的条件为止)
{
      long w = InterlockedGet(&waiters_);
      int have_waiters = w > 0;//获得当前等待的线程的数目

      int result = 0;

      if (have_waiters)
      {
            if( !ReleaseSemaphore(sema_.get(),1,NULL) )//释放一个信号量,满足一个线程执行
                result = -1;
      }
      return result;
}

    inline int broadcast ()//同步,使同一进程的多个线程可以协调工作,例如让它们都在指定的执行点等待对方,直到全员到齐之后才开始同不运行。
    {
      int have_waiters = 0;
      long w = InterlockedGet(&waiters_);

      if (w > 0)
      {
          // we are broadcasting.
          was_broadcast_ = 1;
          have_waiters = 1;
      }

      int result = 0;
      if (have_waiters)
      {
            // Wake up all the waiters.
            ReleaseSemaphore(sema_.get(), w, NULL);//唤醒所有等待的线程

            cooperativeWait(waiters_done_.get(), INFINITE);//等待所有唤醒的线程被唤醒操作执行完毕

            //end of broadcasting
            was_broadcast_ = 0;
      }
      return result;
    }


Atomic 类:
原子操作类,使用互锁函数进行操作。有两个实现类:Atomic和 AtomicPtr,分别实现了对数的原子操作和对指针的原子操作,分别封装了一个long 类型和一个 void* 类型的volatile 变量(volatile 指明一种特殊的变量,它的值可能随时变化)。对这个变量还有一定的操作函数实现。

在Atomic这个类中,实现了对数据的增加、减少、AND、OR等操作的原子操作的封装。
_InterlockedAnd 、 _InterlockedOr 等等。

在AtomicPtr这个类中,实现了对指针的交换、获取等原子操作。
InterlockedCompareExchangePointer((PVOID volatile*)&_ptr, (PVOID)ptrNew, (PVOID)ptrOld)原子操作。比较_ptr 的值和 ptrOld 的值,如果_ptr 的值与 ptrOld 的值相等,这个ptrNew的值存储的就是 ptrOld ,否则没有其他行为执行。

Block类:
暂停一个线程,等待别的线程去唤醒这个线程。
      inline bool block()
      {
            ScopedLock<OpenThreads::Mutex> mutlock(_mut);//首先对一个Mutex加锁
            if( !_released )
            {
                return _cond.wait(&_mut)==0; //在wait中对这个Mutex解锁,这样release函数就能访问到这个临界区,从而对这个Block解锁
            }
            else
            {
                return true;
            }
      }

      inline void release()
      {
            ScopedLock<OpenThreads::Mutex> mutlock(_mut);//在block之后可以访问到这个变量,所以可以向下进行
            if (!_released)
            {
                _released = true;
                _cond.broadcast();//释放全部block的线程
            }
      }

BlockCount 类:
与Block 的功能相似,需要外界的线程调用complete一定次数,才能释放这个线程。
      inline void block()
      {
            OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut);
            if (_currentCount)
                _cond.wait(&_mut);
      }//阻塞当前执行这个block 函数的线程,包括主进程

      inline void completed()
      {
            OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut);
            if (_currentCount>0)
            {
                --_currentCount;

                if (_currentCount==0)
                {
                  // osg::notify(osg::NOTICE)<<"Released"<<std::endl;
                  _cond.broadcast();
                }
            }
      } //执行一次complete,直到_currentCount 为0的时候,释放这个被阻塞的线程



      inline void reset()
      {
            OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut);
            if (_currentCount!=_blockCount)
            {
                if (_blockCount==0) _cond.broadcast();//释放上一次被阻塞的线程
                _currentCount = _blockCount;//新的开始计数
            }
      }//重置函数,在每次新使用这个类的时候,都要调用reset这个函数,才能发挥作用

      inline void release()
      {
            OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut);
            if (_currentCount)
            {
                _currentCount = 0;
                _cond.broadcast();
            }
      }//直接释放被阻塞的线程

每次调用Block之前,都要进行reset()函数,已恢复Block的功能,进行下面的工作

Barrier 类:
Win32BarrierPrivateData 这个类封装了Barrier中的一些数据
volatile int       maxcnt;          // number of threads to wait for
volatile int       cnt;             // number of waiting threads

可以阻塞多个线程,并同时释放这些线程。
void Barrier::block(unsigned int numThreads) // numThreads的默认值为0,如果不为0则从新设定最大的阻塞值
{
    Win32BarrierPrivateData *pd =
      static_cast<Win32BarrierPrivateData*>(_prvData);

    if(numThreads != 0) pd->maxcnt = numThreads;//从新设定阻塞值
    int my_phase;

    ScopedLock<Mutex> lock(pd->lock);//防止同时操作
    if( _valid )
    {
      my_phase = pd->phase;
      ++pd->cnt;

      if (pd->cnt == pd->maxcnt) {             // I am the last one
            pd->cnt = 0;                         // reset for next use
            pd->phase = 1 - my_phase;            // toggle phase
            pd->cond.broadcast();
      }else{
            while (pd->phase == my_phase ) {
                pd->cond.wait(&pd->lock);//释放这个Mutex,这样别的线程执行block()函数的时候就不会被阻塞,如果没有达到预定的数目,就会再这里等待,直到释放
            }
      }
    }
}


Thread 类:
需要从写run() 函数,这个是线程的执行函数执行的内容
Win32ThreadPrivateData 这个类封装了Thread需要的一些参数 TLS等等
    size_t stackSize;
    bool isRunning;
    Block threadStartedBlock;
intcancelMode; // 0 - deffered (default) 1-asynch 2-disabled
0:延迟结束线程 这样会使结束事件内核对象有效,通知外面的函数
1:立即结束线程
2:不能结束线程
    bool detached;
    Thread::ThreadPriority threadPriority;
    Thread::ThreadPolicy threadPolicy;
    HandleHolder tid;
    int uniqueId;
int cpunum;

public:
    HandleHolder cancelEvent; //结束事件内核信号量
    struct TlsHolder{ // thread local storage slot
      DWORD getId()
      {
            if (!initialized) {
                ID = TlsAlloc();
                initialized = true;
            }
            return ID;
      }
      TlsHolder() : ID(TLS_OUT_OF_INDEXES), initialized(false) {} //默认初始化TLS
      ~TlsHolder(){
            if (initialized)
                TlsFree(ID);
            ID = TLS_OUT_OF_INDEXES;
      }
    private:
      DWORD ID;
      bool initialized;
    };
static TlsHolder TLS; //申请TLS

ReadWriteMutex 类:
表示读写锁
      virtual int readLock()
      {
            OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_readCountMutex);
            int result = 0;
            if (_readCount==0)
            {
                result = _readWriteMutex.lock();
            }
            ++_readCount;
            return result;
      }

      virtual int readUnlock()
      {
            OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_readCountMutex);
            int result = 0;
            if (_readCount>0)
            {
                --_readCount;
                if (_readCount==0)
                {
                  result = _readWriteMutex.unlock();
                }
            }
            return result;
      }
//readcount表示当前需要加读锁的数目,在ReadLock的时候直到读得数目为0才能加锁(因为还有别的需要读取,所以不能在给被人加锁),在ReadUnlock的时候直到读数目为0才能解锁(因为直到所有读取的全部完成,才能释放这个Mutex)


线程的操作类中一般都有一个Mutex的类数据成员,防止在多个线程操作这个类对象的时候,造成数据的污染,使用了这个Mutex在每个操作之前,这就保证了每一个时刻都只有一个线程在操作这个数据,防止数据的污染。


好久之前的了,还有很多,有时间整理出来大家共享:D

liuzhiyu123 发表于 2012-8-8 13:45:36

这几天网速慢如蜗牛,格式暂时没整理

heye 发表于 2012-8-11 17:07:06

Gooogle 发表于 2012-8-11 21:42:34

不错,改天也写一个分享一下~:victory:

xubaolong 发表于 2012-8-12 19:51:35

顶一个先

liuzhiyu123 发表于 2012-8-13 08:03:04

:)

木子匕 发表于 2012-8-16 17:16:04

我表示有的头晕

tianxiao888 发表于 2012-8-29 11:50:19

支持一下~~~~~~~

liuzhiyu123 发表于 2012-8-29 12:18:12

tianxiao888 发表于 2012-8-29 11:50 static/image/common/back.gif
支持一下~~~~~~~

:lol

lxl010431 发表于 2013-1-22 15:39:34

很好的资源!

hshsh 发表于 2013-1-24 21:56:00

改天仔细看看

bigboy 发表于 2013-1-26 12:50:56

坚决支持

hunter_wwq 发表于 2013-5-16 16:25:08

顶:victory:

ainikuangbiao 发表于 2013-5-29 12:34:24

多谢多谢,期待再出佳作!

xiang_521 发表于 2013-11-20 18:03:41

探究的很深,向前辈们学习。

寻灿者 发表于 2013-11-21 17:50:46

好!

hanml 发表于 2013-12-5 15:11:49

好帖、好帖

foeming 发表于 2015-8-25 11:28:45

看了好几遍了,谢谢!

laksmi 发表于 2016-4-8 11:21:32

受益匪浅
页: [1]
查看完整版本: OSG源码解读 —— OpenThreads库