Очередь Майкла и Скотта — различия между версиями

Материал из Викиконспекты
Перейти к: навигация, поиск
(Реализация push)
(Реализация push)
Строка 102: Строка 102:
 
     '''while''' ('''true'''): <font color=green>//CAS-цикл</font>
 
     '''while''' ('''true'''): <font color=green>//CAS-цикл</font>
 
         tail = T.get()
 
         tail = T.get()
         if ('''CAS'''(tail.next, '''null''', newTail)):
+
         '''if''' ('''CAS'''(tail.next, '''null''', newTail)):
             <font color=green>
+
             <font color=green>/*
            /*
+
             Если T указывает на последний добавленный элемент и
             Если получилось добавить ещё один элемент в хвост,  
+
            получилось добавить ещё один элемент в хвост,  
 
             пробуем передвинуть T. Если не получилось передвинуть T,
 
             пробуем передвинуть T. Если не получилось передвинуть T,
 
             значит, другой поток сделал это за нас, завершаем работу.
 
             значит, другой поток сделал это за нас, завершаем работу.
             если получилось - то мы сами передвинули T, завершаем работу
+
             Если получилось - то мы сами передвинули T, завершаем работу
             */
+
             */</font>
            </font>
 
 
             '''CAS'''(T, tail, newTail)
 
             '''CAS'''(T, tail, newTail)
 
             '''return'''
 
             '''return'''
          
+
         else:
 
+
            <font color=green>/*
 
+
            Если T - не последний добавленный элемент элемент, то передвигаем T на последний элемент
val newTail = Node(x, AtomicReference(null))
+
            Если этого сделать не получилось, значит, это сделал другой поток.
 
+
            Если получилось - значит, наш поток передвинул T на текущий последний элемент.
        while (true) {
+
            В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь новой вершины.
            val tail = T.get()
+
            */</font>
            if (tail.next.compareAndSet(null, newTail)) {
+
            '''CAS'''(T, tail, tail.next.get())
                /*
 
                Если T - последний добавленный элемент, добавляем ещё один элемент в хвост и
 
                пробуем передвинуть T. Если не получилось передвинуть T,
 
                значит, другой поток сделал это за нас, завершаем работу
 
                */
 
                T.compareAndSet(tail, newTail)
 
                return
 
            } else {
 
                /*
 
                Если T - не последний элемент, то передвигаем T на последний элемент
 
                Если этого сделать не получилось, значит, это сделал другой поток
 
                В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь
 
                */
 
                T.compareAndSet(tail, tail.next.get())
 
            }
 
        }
 
  
 
==Примечания==
 
==Примечания==

Версия 01:39, 2 октября 2018

Эта статья находится в разработке!

Очередь Майкла и Скотта (Michael-Scott Queue) - алгоритм построения lock-free очереди. Впервые был предложен Maged M. Michael и Michael L. Scot в статье [1].

Структура очереди

Очередь моделируется с помощью односвязного списка. Каждый элемент списка ([math]Node[/math]) содержит ссылку на хранимые в нём данные и указатель на следующий элемент списка (который можно менять атомарно).

case class Node(val data: Int, val next: AtomicReference<Node>)

Если узел [math]node[/math] является последним в списке, то его [math]next[/math] указывает на [math]null[/math].

Сама очередь состоит из двух указателей: на голову [math]H[/math] и на хвост [math]T[/math], которые можно менять атомарно. Удаление из очереди происходит со стороны головы, добавление - со стороны хвоста.

Узел списка, на который указывает [math]H[/math], является фиктивным (dummy). Данные, хранимые в этом узле, не имеют значения. Изначально очередь состоит из одного dummy-элемента, на который указывают [math]T[/math] и [math]H[/math].

class Queue
    dummy = new Node(null, new AtomicReference<Node>(null))
    head = new AtomicReference<Node>(dummy)
    tail = new AtomicReference<Node>(dummy)

// TODO; картинка

Будем поддерживать следующий инвариант: в нашей очереди [math]H[/math] указывает на узел, находящийся не правее узла, на который указывает [math]T[/math]

Идея реализации

Удаление элемента

Для удаления элемента необходимо переместить указатель [math]H[/math] на следующую в списке вершину.

def pop(): Int
    if (H.next == null):
        throw new EmptyException()
    H = H.next
    return H.data //H - новый фиктивный элемент

Добавление элемента

Создадим новый узел списка, и добавим его в конец очереди.

def push(x: Int):
    newTail = new Node(x, new AtomicReference<Node>(null))
    T.next = newTail //Добавление новой вершины в очередь
    T = T.next //Изменение хвоста списка

Многопоточная реализация

Будем при всех изменениях указателей на вершины списка использовать [math]CAS[/math] (то есть при изменении [math]T[/math], [math]H[/math], и [math]T.next[/math])

Удаление элемента

Для удаления элемента необходимо переместить указатель [math]H[/math] на следующую в списке вершину.

def pop(): Int
    while (true): //Поток пытается в CAS - цикле поменять указатель на H, пока не получится
        head = H.get()
        if (head.next == null):
            throw new EmptyException()
        newHead = head.next.get()
        if (CAS(H, head, nextHead)):
            return newHead.data

Добавление элемента

Создадим новый узел списка, и добавим его в конец очереди.

def push(x: Int):
    newTail = new Node(x, new AtomicReference<Node>(null))
    while (true): //Поток пытается в CAS - цикле поменять T.next, пока не получится
        tail = T.get()
        curTail = tail.next
        if (CAS(curTail, null, newTail)): //Поток пытается добавить элемент в конец очереди
            break
   while (true): //Поток пытается в CAS - цикле поменять указатель на T, пока не получится
        tail = T.get()
        nextTail = tail.next.get()
        if (CAS(T, tail, nextTail)):
            break

При данной реализации мы сталкиваемся со следующей проблемой

Описание проблемы

Рассмотрим ситуацию, при которой два потока [math]A[/math] и [math]B[/math] добавляют в очередь элементы [math]elem[/math] и [math]elem'[/math]. Рассмотрим следующую последовательность действий:

  1. Поток [math]A[/math] добавляет в очередь новую вершину, изменяя [math]T.next[/math], но не успевает изменить [math]T[/math] так, чтобы он указывал на только что добавленную вершину.
  2. Планировщик операционной системы усыпляет поток [math]A[/math].
  3. Поток [math]B[/math] собирается добавить новую вершину в очередь, но не может этого сделать, так как постоянно проваливает операцию [math]CAS(T.next, null, newTail)[/math] (T.next не указывает на [math]null[/math], так как поток [math]A[/math] на шаге [math]1[/math] добавил в очередь новую вершину, но не передвинул [math]T[/math])
  4. Поток [math]B[/math] не сможет добавить в очередь новую вершину (а следовательно, завершить операцию [math]push[/math]), до тех пор, пока планировщик операционной системы не разбудит поток [math]A[/math], и поток [math]A[/math] не завершит добавление (то есть не передвинет [math]T[/math] на вершину, добавленную на шаге [math]1[/math].)

Следовательно, у такой очереди нет гарантии прогресса, и этот алгоритм не lock-free.

Корректная lock-free реализация

Основная идея

Нельзя выполнить добавление элемента в очередь и перемещение [math]T[/math] атомарно. В таком случае, пусть остальные потоки помогают перенести указатель на хвост очереди. Если поток видит непустой [math]T.next[/math] (то есть если он провалил [math]CAS(tail.next, null, newTail)[/math]), то он должен помочь перенести [math]T[/math], то есть выполнить [math]CAS(T, tail, tail.next.get())[/math] однократно. Если [math]CAS[/math] выполнен успешно, то хвост перемещён успешно (а значит, наш поток должен вернуться к добавлению нового элемента). Если же он выполнен неудачно, то это значит, что [math]T[/math] уже не указывает на [math]tail[/math], а значит, другой поток уже успешно переместил хвост (а значит, наш поток должен вернуться к добавлению нового элемента).

Реализация [math]push[/math]

def push(x: Int):
    newTail = new Node(x, new AtomicReference<Node>(null))
    while (true): //CAS-цикл
        tail = T.get()
        if (CAS(tail.next, null, newTail)):
            /*
            Если T указывает на последний добавленный элемент и 
            получилось добавить ещё один элемент в хвост, 
            пробуем передвинуть T. Если не получилось передвинуть T,
            значит, другой поток сделал это за нас, завершаем работу.
            Если получилось - то мы сами передвинули T, завершаем работу
            */
            CAS(T, tail, newTail)
            return
        else:
            /*
            Если T - не последний добавленный элемент элемент, то передвигаем T на последний элемент
            Если этого сделать не получилось, значит, это сделал другой поток.
            Если получилось - значит, наш поток передвинул T на текущий последний элемент.
            В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь новой вершины.
            */
            CAS(T, tail, tail.next.get())

Примечания

Источники информации

  • Maurice Herliny & Nir Shavit - The Art of Multiprocessor programming, стр 230