带安全内存回收的无锁双向链表





3.00/5 (2投票s)
这是第二次无锁实现(第一次由 H°akan Sundell 实现),仅需要单字比较并交换原子原语。
引言
在上一篇文章中,我介绍了一个无锁双向链表。在具有垃圾回收的环境中,内存回收是自动的。然而,在没有垃圾回收的情况下,我们需要谨慎地回收内存,因为这可能会导致悬空引用(指向已回收内存单元的引用)。我们使用危险指针(Hazard Pointers)[Michael 2004] 进行安全的内存回收。危险指针方法论的抽象层面是,线程在使用 ABA 易损比较之前,将一个全局(危险)指针设置为共享指针的值。然后,当使用 CAS 将共享指针的值设置为新值时,要确保该新值不是共享指针之前拥有的任何值,并且危险指针也指向该值。这确保了如果一个线程已将危险指针设置为共享指针的值,那么该值就不能用于 ABA 易损比较,因为计算共享指针新值的函数不会返回该值。我们将危险指针方法论应用于无锁双向链表。
背景
双向链表是项目的列表,项目可以位于内存中的任何位置。每个项目包含两个字段。Next 字段指向列表中的下一个项目。Prev 字段指向列表中的上一个项目。这两个字段允许在任一方向上遍历列表。双向链表支持插入和删除操作。这些操作的实现非常简单。但在多线程环境中,最常用的方法是互斥,它存在死锁和优先级反转的风险。这些问题的答案是无锁性。在无锁系统中,当一个线程执行几步操作后,整个系统就会取得进展。根据定义,无锁系统没有死锁。但是存在饥饿的风险。无等待系统是无锁的,并且还可以避免饥饿。欲了解更多信息,您可以阅读我之前的文章。
危险指针
主要的共享数据结构 (HPRecType) 是危险指针 (HP) 结构体的链表。链表被初始化,包含 N 个参与线程中每个线程的一个 HP 数组。危险指针的总数是 N*K,其中 K 是危险指针的最大数量。
struct HPRecType
{
Node *HP[K];
HPRecType *Next;
};
链表数据结构增加了 rlist 数组(已退休列表)和 rcount 数组(已退休计数),以维护每个线程的已退休节点列表。
__declspec(align(InterLockedAlign))struct LinkedList
{
volatile AnnounceOp *announce;//current announcement
Node *first;//first node
Node *end;//end node
list <Node* > rlist[THREADCOUNT];
int rcount[THREADCOUNT];
HPRecType *HeadHPRec;
};
RetireNode 例程负责将已退休的节点插入 rlist 并更新 rcount。每当 rcount 达到阈值 R 时,线程就会使用 Scan 例程扫描危险指针列表。
void RetireNode(HPRecType*HeadHPRec, Node *node,list < Node* > & rlist,int & rcount)
{
rlist.push_back(node);
rcount++;
if(rcount>=THREADCOUNT*K)
{
Scan(HeadHPRec,rlist,rcount);
}
}
Scan 由两部分组成。第一部分涉及扫描 HP 链表中的非空值。每当遇到非空值时,它会被插入到本地列表 plist 中。
Scan 的第二部分涉及查找 rlist 中的每个节点是否与 plist 中的指针匹配。如果查找失败,则该节点被识别为可以释放。否则,它将保留在 rlist 中,直到当前线程下次扫描。
void Scan(HPRecType *head,list<Node*> & rlist,int & rcount)
{
list < Node* > plist;
HPRecType *hprec=head;
while(hprec!=NULL)
{
for(int i=0;i < K;i++)
{
Node *hptr=hprec-> HP[i];
if(hptr!=NULL)
{
plist.push_back(hptr);
}
}
hprec=hprec-> Next;
}
list <Node*> tmplist=rlist;
rlist.clear();
rcount=0;
Node *node=tmplist.back();
tmplist.pop_back();
while(node!=NULL)
{
if(Lookup(plist,node))
{
rlist.push_back(node);
rcount++;
}
else
{
_aligned_free(node);
}
if(tmplist.empty()==false)
{
node=tmplist.back();
tmplist.pop_back();
}
else
{
node=NULL;
}
}
plist.clear();
}
危险指针在以下例程中进行初始化。
void Initialize(LinkedList *l)
{
l->HeadHPRec=new HPRecType;
HPRecType *currHPRec=l->HeadHPRec;
for(int i=0;i<THREADCOUNT;i++)
{
for(int i=0;i<K;i++)
{
currHPRec->HP[i]=0;
}
currHPRec->Next=new HPRecType;
currHPRec=currHPRec->Next;
}
currHPRec->Next=0;
}
危险引用是指在没有进一步安全验证的情况下将被用于访问可能不安全内存的地址,或作为 ABA 易损比较的目标地址或预期值。
首先,我们检查遍历链表以识别危险和危险引用的线程。
for(i=0;i<iterations;i++)
{
Node *p=(Node *)_aligned_malloc(sizeof(struct Node),InterLockedAlign );//new Node;
assert(p);
p->data=i;
do
{
insertCount++;
x=first->rlink;
while(1)
{
if(rand()%2==0)
break;
else
{
if(x->rlink==l.end)
x=first->rlink;
else if(x->rlink!=0)
x=x->rlink;
else
x=first->rlink;
}
}
}while(!Insert(p,x));
}
x=first->rlink;
上述访问是一个访问危险,因为链表中的第二个节点可能已被另一个线程移除并回收。
x->rlink==l.end
上述访问 (x->rlink) 是一个访问危险,因为节点 x 可能已被另一个线程移除并回收。
x=x->rlink;
上述访问 (x->rlink) 是一个访问危险,因为节点 x 可能已被另一个线程移除并回收。
对于每个危险引用,我们在上述例程中插入以下步骤:
将作为引用的目标节点的地址写入一个可用的危险指针。
验证节点是否安全(节点可能已被移除)。如果验证成功,则遵循目标算法的正常控制流。否则,重试。以下是已添加危险指针的代码。
for(i=0;i<iterations;i++)
{
Node *p=(Node *)_aligned_malloc(sizeof(struct Node),InterLockedAlign );//new Node;
assert(p);
p->data=i;
bool inserted=false;
do
{
x=first->rlink;
hPrec->HP[0]=x;
if(first->rlink!=x) continue;
Node *x_rlink=x->rlink;
hPrec->HP[1]=x_rlink;
if(x->rlink!=x_rlink) continue;
bool safe=true;
while(1)
{
if(rand()%2==0)
break;
else
{
Node *x_rlink=x->rlink;
if(x_rlink!=0)
{
hPrec->HP[0]=x_rlink;
if(x->rlink!=x_rlink)
{
safe=false;
break;
}
x=x->rlink;
x_rlink=x->rlink;
hPrec->HP[1]=x_rlink;
if(x->rlink!=x_rlink)
{
safe=false;
break;
}
}
else
{
x=first->rlink;
hPrec->HP[0]=x;
if(first->rlink!=x) {
safe=false;
break;
}
Node *x_rlink=x->rlink;
hPrec->HP[1]=x_rlink;
if(x->rlink!=x_rlink) {
safe=false;
break;
}
}
}
}
if(safe==false)
continue;
inserted=Insert(&l,p,x,hPrec,l.rlist[threadNum],l.rcount[threadNum]);
}while(!inserted);
}
接下来,我们检查 Insert 以识别危险和危险引用。
int Insert(Node *p,Node *x)
{
if(p==0||x==0) return 0;
AnnounceOp *curAnnouncedOp;
AnnounceOp *nextAnnounceOp=(AnnounceOp*)_aligned_malloc(sizeof(struct AnnounceOp),InterLockedAlign );//To announce an insert operation.
assert(nextAnnounceOp);
while(1)
{
curAnnouncedOp=(AnnounceOp *)l.announce;
if(curAnnouncedOp->opName==NONE)
{
try
{
if(l.first==x||l.end==x||l.end->llink==x)
{
_aligned_free(nextAnnounceOp);
return 0;
}
p->llink=x;
p->rlink=x->rlink;
if(p->rlink==0||p->llink==0) goto label;
nextAnnounceOp->opName=INSERT;
nextAnnounceOp->insert.args.p=p;
nextAnnounceOp->insert.args.x=x;
nextAnnounceOp->insert.lv.x_rlink=x->rlink;
if(nextAnnounceOp->insert.lv.x_rlink==0) goto label;
nextAnnounceOp->insert.lv.x_rlink_address=&x->rlink;
nextAnnounceOp->insert.lv.x_rlink_llink=nextAnnounceOp->insert.lv.x_rlink->llink;
if(nextAnnounceOp->insert.lv.x_rlink_llink==0) goto label;
nextAnnounceOp->insert.lv.x_rlink_llink_address=&nextAnnounceOp->insert.lv.x_rlink->llink;
void *v1=reinterpret_cast<void *>(nextAnnounceOp);
void *v2=reinterpret_cast<void *>(curAnnouncedOp);
void *res=(void *)InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(&l.announce),(LONG)v1,(LONG)v2);
if(res==v2)
{
InsertHelper(nextAnnounceOp);
return 1;
}
}
catch(...)
{
printf("Exception in Insert\n");
_aligned_free(nextAnnounceOp);
return 0;
}
}
else if(curAnnouncedOp->opName==INSERT)
{
InsertHelper(curAnnouncedOp);
}
else if(curAnnouncedOp->opName==DELET)
{
DeleteHelper(curAnnouncedOp);
}
}
label:
_aligned_free(nextAnnounceOp);
return 0;
}
curAnnouncedOp=(AnnounceOp *)l->announce;
上述访问 (l->announce) 是一个访问危险,因为该结构可能已被另一个线程移除并回收。
nextAnnounceOp->insert.lv.x_rlink=x->rlink;
上述访问 (x->rlink) 是一个访问危险,因为节点 x 可能已被另一个线程移除并回收。
nextAnnounceOp->insert.lv.x_rlink_address=&x->rlink;
上述访问 (x->rlink) 是一个访问危险,因为节点 x 可能已被另一个线程移除并回收。
nextAnnounceOp->insert.lv.x_rlink_llink=nextAnnounceOp->insert.lv.x_rlink->llink;
上述访问 (x_rlink->llink) 是一个访问危险,因为节点 x 可能已被另一个线程移除并回收。
如果 CAS 成功,nextAnnounceOp 将成为一个危险引用。
上面的代码中的 InterlockedCompareExchange 是 ABA 危险。
以下是已添加危险指针的代码,可确保安全的内存回收和 ABA 防止。
int Insert(LinkedList *l,Node *p,Node *x,HPRecType *hPrec,list<Node*>&rlist,int &rcount)
{
if(p==0||x==0) return 0;
AnnounceOp *curAnnouncedOp;
AnnounceOp *nextAnnounceOp=(AnnounceOp*)_aligned_malloc(sizeof(struct AnnounceOp),InterLockedAlign );
assert(nextAnnounceOp);
while(1)
{
curAnnouncedOp=(AnnounceOp *)l->announce;
hPrec->HP[2]=(Node*)curAnnouncedOp;
if(l->announce!=curAnnouncedOp) continue;
if(curAnnouncedOp->opName==NONE)
{
try
{
if(l->first==x||l->end==x||l->end->llink==x)
{
_aligned_free(nextAnnounceOp);
return 0;
}
p->llink=x;
p->rlink=x->rlink;
if(p->rlink==0||p->llink==0) goto label;
nextAnnounceOp->opName=INSERT;
nextAnnounceOp->insert.args.p=p;
nextAnnounceOp->insert.args.x=x;
nextAnnounceOp->insert.lv.x_rlink=x->rlink;
hPrec->HP[3]=nextAnnounceOp->insert.lv.x_rlink;
nextAnnounceOp->insert.lv.x_rlink_address=&x->rlink;
if(nextAnnounceOp->insert.lv.x_rlink!=x->rlink) goto label;
if(nextAnnounceOp->insert.lv.x_rlink==0) goto label;
nextAnnounceOp->insert.lv.x_rlink_llink=nextAnnounceOp->insert.lv.x_rlink->llink;
hPrec->HP[4]=nextAnnounceOp->insert.lv.x_rlink_llink;
nextAnnounceOp->insert.lv.x_rlink_llink_address=&nextAnnounceOp->insert.lv.x_rlink->llink;
if(nextAnnounceOp->insert.lv.x_rlink->llink!=nextAnnounceOp->insert.lv.x_rlink_llink) goto label;
if(nextAnnounceOp->insert.lv.x_rlink_llink==0) goto label;//node next to node x is unlinked
hPrec->HP[5]=(Node*)nextAnnounceOp;
void *v1=reinterpret_cast<void *>(nextAnnounceOp);
void *v2=reinterpret_cast<void *>(curAnnouncedOp);
void *res=(void *)InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(&l->announce),(LONG)v1,(LONG)v2);
if(res==v2)
{
RetireNode(l->HeadHPRec,(Node*)v2,rlist,rcount);
InsertHelper(l,nextAnnounceOp,rlist,rcount);
return 1;
}
}
catch(...)
{
printf("Exception in Insert\n");
_aligned_free(nextAnnounceOp);
return 0;
}
}
else if(curAnnouncedOp->opName==INSERT)
{
InsertHelper(l,curAnnouncedOp,rlist,rcount);
}
}
label:
_aligned_free(nextAnnounceOp);
return 0;
}
接下来,我们检查 InsertHelper 例程。
void InsertHelper(AnnounceOp *curAnnouncedOp)
{
InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(curAnnouncedOp->insert.lv.x_rlink_address),(LONG)curAnnouncedOp->insert.args.p,(LONG)curAnnouncedOp->insert.lv.x_rlink);
InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(curAnnouncedOp->insert.lv.x_rlink_llink_address),(LONG)curAnnouncedOp->insert.args.p,(LONG)curAnnouncedOp->insert.lv.x_rlink_llink);
AnnounceOp *nextAnnounceOp=(AnnounceOp*)_aligned_malloc(sizeof(struct AnnounceOp),InterLockedAlign );
assert(nextAnnounceOp);
nextAnnounceOp->opName=NONE;
void *v1=reinterpret_cast<void *>(nextAnnounceOp);
void *v2=reinterpret_cast<void *>(curAnnouncedOp);
if(InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(&l.announce),(LONG)v1,(LONG)v2)==(LONG)v2)
{
}
else
{
_aligned_free(nextAnnounceOp);
}
}
InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(curAnnouncedOp->insert.lv.x_rlink_address),(LONG)curAnnouncedOp->insert.args.p,(LONG)curAnnouncedOp->insert.lv.x_rlink);
上述访问 (curAnnouncedOp->insert.lv.x_rlink_address) 也是一个访问危险,因为节点 x 可能已被另一个线程移除并回收。
InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(curAnnouncedOp->insert.lv.x_rlink_llink_address),(LONG)curAnnouncedOp->insert.args.p,(LONG)curAnnouncedOp->insert.lv.x_rlink_llink);
上述访问 (curAnnouncedOp->insert.lv.x_rlink_llink_address) 也是一个访问危险,因为节点 x 的右侧节点可能已被另一个线程移除并回收。
以下是已添加危险指针的代码,可确保安全的内存回收和 ABA 防止。
void InsertHelper(LinkedList *l,AnnounceOp *curAnnouncedOp,list<Node*>&rlist,int &rcount)
{
InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(curAnnouncedOp->insert.lv.x_rlink_address),(LONG)curAnnouncedOp->insert.args.p,(LONG)curAnnouncedOp->insert.lv.x_rlink);
InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(curAnnouncedOp->insert.lv.x_rlink_llink_address),(LONG)curAnnouncedOp->insert.args.p,(LONG)curAnnouncedOp->insert.lv.x_rlink_llink);
AnnounceOp *nextAnnounceOp=(AnnounceOp*)_aligned_malloc(sizeof(struct AnnounceOp),InterLockedAlign );
assert(nextAnnounceOp);
nextAnnounceOp->opName=NONE;
void *v1=reinterpret_cast<void *>(nextAnnounceOp);
void *v2=reinterpret_cast<void *>(curAnnouncedOp);
if(InterlockedCompareExchange(reinterpret_cast<volatile LONG *>(&l->announce),(LONG)v1,(LONG)v2)==(LONG)v2)
{
RetireNode(l->HeadHPRec, (Node*)v2,rlist,rcount);
}
else
{
_aligned_free(nextAnnounceOp);
}
}
我使用 _CrtMemCheckpoint 和 _CrtMemDifference 检查了内存泄漏,没有发现任何问题。
在下一篇文章中,我将把危险指针方法论应用于 delete 例程(该例程负责从双向链表中删除节点)。
参考
- [Michael 2004]Michael, M.M. “Hazard pointers: Safe memory reclamation for lock-free objects,” IEEE Transactions on Parallel and Distributed Systems, vol. 15, no. 8, Aug. 2004。
- A Lock Free Doubly Linked List by Muhammad Sarmad Asghar published in codeproject。