Очередь Майкла и Скотта — различия между версиями

Материал из Викиконспекты
Перейти к: навигация, поиск
(Реализация push)
м (rollbackEdits.php mass rollback)
 
(не показаны 23 промежуточные версии 8 участников)
Строка 1: Строка 1:
{{В разработке}}
+
'''Очередь Майкла и Скотта''' (англ. ''Michael-Scott Queue'') - алгоритм построения lock-free очереди. Впервые был предложен Maged M. Michael и  Michael L. Scott <ref>[http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf? Simple, Fast, and Practical Non-Blocking and BlockingConcurrent Queue Algorithms]</ref>.
'''Очередь Майкла и Скотта''' ''(Michael-Scott Queue)'' - алгоритм построения lock-free очереди. Впервые был предложен Maged M. Michael и  Michael L. Scot в статье <ref>[http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf? Simple, Fast, and Practical Non-Blocking and BlockingConcurrent Queue Algorithms]</ref>.
 
 
== Структура очереди ==
 
== Структура очереди ==
  
Очередь моделируется с помощью односвязного списка. Каждый элемент списка (<tex>Node</tex>) содержит ссылку на хранимые в нём данные и указатель на следующий элемент списка (который можно менять атомарно).
+
Очередь построена на односвязном списке. Каждый элемент списка <tex>Node</tex> содержит ссылку на хранимые в нём данные и атомарный указатель на следующий элемент списка.
  
  '''case class''' Node('''val''' data: '''Int''', '''val''' next: AtomicReference<Node>)
+
  '''class''' Node('''val''' data: '''Int''', '''val''' next: AtomicReference<Node>)
  
Если узел <tex>node</tex> является последним в списке, то его <tex>next</tex> указывает на <tex>null</tex>.
+
Если узел <tex>node</tex> является последним в списке, то <tex>node.next</tex> указывает на <tex>null</tex>.
  
Сама очередь состоит из двух указателей: на голову <tex>H</tex> и на хвост <tex>T</tex>, которые можно менять атомарно. Удаление из очереди происходит со стороны головы, добавление - со стороны хвоста.
+
Сама очередь состоит из двух атомарных указателей: <tex>H</tex> на голову и <tex>T</tex> на  хвост. Удаление из очереди происходит со стороны головы, добавление - со стороны хвоста.
  
Узел списка, на который указывает <tex>H</tex>, является фиктивным (''dummy''). Данные, хранимые в этом узле, не имеют значения. Изначально очередь состоит из одного ''dummy''-элемента, на который указывают <tex>T</tex> и <tex>H</tex>.
+
Голова списка является фиктивным элементом ''(dummy)''. Данные, хранимые в этом узле, не имеют значения. Изначально очередь состоит из одного ''dummy''-элемента, на который указывают <tex>T</tex> и <tex>H</tex>.
 +
 
 +
[[Файл:Структура_msqueue.PNG|500px|thumb|center|Структура очереди Майкла и Скотта]]
  
 
  '''class''' Queue
 
  '''class''' Queue
Строка 18: Строка 19:
 
     tail = '''new''' AtomicReference<Node>(dummy)
 
     tail = '''new''' AtomicReference<Node>(dummy)
  
// TODO; картинка
+
Будем поддерживать следующий инвариант: в нашей очереди <tex>H</tex> указывает на узел, находящийся не правее узла, на который указывает <tex>T</tex>.
  
Будем поддерживать следующий инвариант: в нашей очереди <tex>H</tex> указывает на узел, находящийся не правее узла, на который указывает <tex>T</tex>
+
==Однопоточная реализация==
 
 
==Идея реализации==
 
  
 
=== Удаление элемента ===
 
=== Удаление элемента ===
Строка 43: Строка 42:
 
     T = T.next <font color=green>//Изменение хвоста списка</font>
 
     T = T.next <font color=green>//Изменение хвоста списка</font>
  
== Многопоточная реализация ==
+
== Не lock-free многопоточная реализация ==
  
Будем при всех изменениях указателей на вершины списка использовать <tex>CAS</tex> (то есть при изменении <tex>T</tex>, <tex>H</tex>, и <tex>T.next</tex>)
+
Будем при изменении <tex>T</tex>, <tex>H</tex>, и <tex>T.next</tex> использовать <tex>CAS</tex>.
  
 
=== Удаление элемента ===
 
=== Удаление элемента ===
Строка 79: Строка 78:
 
При данной реализации мы сталкиваемся со следующей проблемой
 
При данной реализации мы сталкиваемся со следующей проблемой
  
== Описание проблемы ==
+
=== Описание проблемы ===
  
 
Рассмотрим ситуацию, при которой два потока <tex>A</tex> и <tex>B</tex> добавляют в очередь элементы <tex>elem</tex> и <tex>elem'</tex>. Рассмотрим следующую последовательность действий:
 
Рассмотрим ситуацию, при которой два потока <tex>A</tex> и <tex>B</tex> добавляют в очередь элементы <tex>elem</tex> и <tex>elem'</tex>. Рассмотрим следующую последовательность действий:
Строка 102: Строка 101:
 
     '''while''' ('''true'''): <font color=green>//CAS-цикл</font>
 
     '''while''' ('''true'''): <font color=green>//CAS-цикл</font>
 
         tail = T.get()
 
         tail = T.get()
         if ('''CAS'''(tail.next, '''null''', newTail)):
+
         '''if''' ('''CAS'''(tail.next, '''null''', newTail)):
             <font color=green>
+
             <font color=green>/*
            /*
+
             Если T указывает на последний добавленный элемент и
             Если получилось добавить ещё один элемент в хвост,  
+
            получилось добавить ещё один элемент в хвост,  
 
             пробуем передвинуть T. Если не получилось передвинуть T,
 
             пробуем передвинуть T. Если не получилось передвинуть T,
 
             значит, другой поток сделал это за нас, завершаем работу.
 
             значит, другой поток сделал это за нас, завершаем работу.
             если получилось - то мы сами передвинули T, завершаем работу
+
             Если получилось - то мы сами передвинули T, завершаем работу
             */
+
             */</font>
            </font>
 
 
             '''CAS'''(T, tail, newTail)
 
             '''CAS'''(T, tail, newTail)
 
             '''return'''
 
             '''return'''
          
+
         else:
 +
            <font color=green>/*
 +
            Если T - не последний добавленный элемент элемент, то передвигаем T на последний элемент
 +
            Если этого сделать не получилось, значит, это сделал другой поток.
 +
            Если получилось - значит, наш поток передвинул T на текущий последний элемент.
 +
            В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь новой вершины.
 +
            */</font>
 +
            '''CAS'''(T, tail, tail.next.get())
  
 +
== Корректная реализация <tex>pop</tex> ==
  
val newTail = Node(x, AtomicReference(null))
+
=== Проблема с <tex>pop</tex> ===
  
        while (true) {
+
Если мы попытаемся воспользоваться написанной выше реализацией метода <tex>pop</tex>, инвариант очереди не будет соблюдён. В силу особенностей реализации метода <tex>push</tex>, в некоторые моменты <tex>T</tex> может указывать не на добавленный последним элемент, а на добавленный предпоследним. В таком случае, с помощью последовательности удалений можно добиться того, что <tex>H</tex> будет указывать на последний добавленный элемент, а <tex>T</tex> - на предпоследний. Таким образом, <tex>H</tex> будет указывать на вершину правее чем та, на которую указывает <tex>T</tex>, то есть инвариант очереди будет нарушен
            val tail = T.get()
+
 
            if (tail.next.compareAndSet(null, newTail)) {
+
=== Корректная реализация ===
                /*
+
 
                Если T - последний добавленный элемент, добавляем ещё один элемент в хвост и
+
Основная проблема предыдущей реализации состоит в том, что в методе <tex>pop</tex> при перемещении <tex>H</tex>, мы никак не следили за положением <tex>T</tex>. Эту проблему можно исправить следующим образом: пусть в методе <tex>pop</tex> рабочий поток будет помогать переместить указатель <tex>T</tex> на последний добавленный элемент (аналогично действиям рабочего потока в методе <tex>push</tex>).
                пробуем передвинуть T. Если не получилось передвинуть T,
+
 
                значит, другой поток сделал это за нас, завершаем работу
+
Для определения того, указывает ли <tex>T</tex> на последний добавленный элемент, воспользуемся следующим соображением: если <tex>T</tex> указывает на последний добавленный элемент, то <tex>T.get().next == '''null'''</tex>, так как за последним добавленным элементом нет других элементов. В противном случае <tex>T</tex> указывает на предпоследний добавленный элемент, и его надо передвинуть на последний добавленный.
                 */
+
 
                T.compareAndSet(tail, newTail)
+
'''def''' pop(): '''Int'''
                return
+
    '''while''' ('''true'''): <font color=green>//CAS-цикл</font>
             } else {
+
        head = H.get() <font color=green>//Сохраняем в локальные переменные текущие голову и хвост, а так же следующий за головным элемент</font>
                 /*
+
        tail = T.get()
                 Если T - не последний элемент, то передвигаем T на последний элемент
+
        nextHead = head.next.get()
                 Если этого сделать не получилось, значит, это сделал другой поток
+
        '''if''' (head == tail):
                 В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь
+
            <font color=green>/*
                */
+
            Если head и tail совпадают, это ещё не означает, что очередь пуста.
                 T.compareAndSet(tail, tail.next.get())
+
            Возможно, что мы просто не успели подвинуть tail. Если tail.next не null,
            }
+
            то мы просто не успели подвинуть tail при добавлении.
        }
+
            */</font>
 +
            '''if''' (nextHead == '''null'''):
 +
                <font color=green>// Следующего элемента нет, очередь пуста</font>
 +
                '''throw''' '''new''' EmptyException()
 +
            '''else''':
 +
                <font color=green>/*
 +
                push не успел подвинуть T, наш поток должен помочь
 +
                tail == head => tail.next == head.next
 +
                 */</font>
 +
                '''CAS'''(T, tail, nextHead)
 +
        '''else''':
 +
            <font color=green>// Очередь гарантированно не пуста, следующий элемент существует</font>
 +
            result = nextHead.data
 +
             '''if''' ('''CAS'''(H, head, nextHead)):
 +
                 <font color=green>/*
 +
                 Если получилось переставить голову, то фиктивным элементом стал
 +
                 H.next, результат - данные, которые в нём лежали. Если не получилось -
 +
                 возвращаемся в начало метода и пробуем ещё раз
 +
                */</font>
 +
                 '''return''' result
  
 
==Примечания==
 
==Примечания==

Текущая версия на 19:08, 4 сентября 2022

Очередь Майкла и Скотта (англ. Michael-Scott Queue) - алгоритм построения lock-free очереди. Впервые был предложен Maged M. Michael и Michael L. Scott [1].

Структура очереди

Очередь построена на односвязном списке. Каждый элемент списка [math]Node[/math] содержит ссылку на хранимые в нём данные и атомарный указатель на следующий элемент списка.

class Node(val data: Int, val next: AtomicReference<Node>)

Если узел [math]node[/math] является последним в списке, то [math]node.next[/math] указывает на [math]null[/math].

Сама очередь состоит из двух атомарных указателей: [math]H[/math] на голову и [math]T[/math] на хвост. Удаление из очереди происходит со стороны головы, добавление - со стороны хвоста.

Голова списка является фиктивным элементом (dummy). Данные, хранимые в этом узле, не имеют значения. Изначально очередь состоит из одного dummy-элемента, на который указывают [math]T[/math] и [math]H[/math].

Структура очереди Майкла и Скотта
class Queue
    dummy = new Node(null, new AtomicReference<Node>(null))
    head = new AtomicReference<Node>(dummy)
    tail = new AtomicReference<Node>(dummy)

Будем поддерживать следующий инвариант: в нашей очереди [math]H[/math] указывает на узел, находящийся не правее узла, на который указывает [math]T[/math].

Однопоточная реализация

Удаление элемента

Для удаления элемента необходимо переместить указатель [math]H[/math] на следующую в списке вершину.

def pop(): Int
    if (H.next == null):
        throw new EmptyException()
    H = H.next
    return H.data //H - новый фиктивный элемент

Добавление элемента

Создадим новый узел списка, и добавим его в конец очереди.

def push(x: Int):
    newTail = new Node(x, new AtomicReference<Node>(null))
    T.next = newTail //Добавление новой вершины в очередь
    T = T.next //Изменение хвоста списка

Не lock-free многопоточная реализация

Будем при изменении [math]T[/math], [math]H[/math], и [math]T.next[/math] использовать [math]CAS[/math].

Удаление элемента

Для удаления элемента необходимо переместить указатель [math]H[/math] на следующую в списке вершину.

def pop(): Int
    while (true): //Поток пытается в CAS - цикле поменять указатель на H, пока не получится
        head = H.get()
        if (head.next == null):
            throw new EmptyException()
        newHead = head.next.get()
        if (CAS(H, head, nextHead)):
            return newHead.data

Добавление элемента

Создадим новый узел списка, и добавим его в конец очереди.

def push(x: Int):
    newTail = new Node(x, new AtomicReference<Node>(null))
    while (true): //Поток пытается в CAS - цикле поменять T.next, пока не получится
        tail = T.get()
        curTail = tail.next
        if (CAS(curTail, null, newTail)): //Поток пытается добавить элемент в конец очереди
            break
   while (true): //Поток пытается в CAS - цикле поменять указатель на T, пока не получится
        tail = T.get()
        nextTail = tail.next.get()
        if (CAS(T, tail, nextTail)):
            break

При данной реализации мы сталкиваемся со следующей проблемой

Описание проблемы

Рассмотрим ситуацию, при которой два потока [math]A[/math] и [math]B[/math] добавляют в очередь элементы [math]elem[/math] и [math]elem'[/math]. Рассмотрим следующую последовательность действий:

  1. Поток [math]A[/math] добавляет в очередь новую вершину, изменяя [math]T.next[/math], но не успевает изменить [math]T[/math] так, чтобы он указывал на только что добавленную вершину.
  2. Планировщик операционной системы усыпляет поток [math]A[/math].
  3. Поток [math]B[/math] собирается добавить новую вершину в очередь, но не может этого сделать, так как постоянно проваливает операцию [math]CAS(T.next, null, newTail)[/math] (T.next не указывает на [math]null[/math], так как поток [math]A[/math] на шаге [math]1[/math] добавил в очередь новую вершину, но не передвинул [math]T[/math])
  4. Поток [math]B[/math] не сможет добавить в очередь новую вершину (а следовательно, завершить операцию [math]push[/math]), до тех пор, пока планировщик операционной системы не разбудит поток [math]A[/math], и поток [math]A[/math] не завершит добавление (то есть не передвинет [math]T[/math] на вершину, добавленную на шаге [math]1[/math].)

Следовательно, у такой очереди нет гарантии прогресса, и этот алгоритм не lock-free.

Корректная lock-free реализация

Основная идея

Нельзя выполнить добавление элемента в очередь и перемещение [math]T[/math] атомарно. В таком случае, пусть остальные потоки помогают перенести указатель на хвост очереди. Если поток видит непустой [math]T.next[/math] (то есть если он провалил [math]CAS(tail.next, null, newTail)[/math]), то он должен помочь перенести [math]T[/math], то есть выполнить [math]CAS(T, tail, tail.next.get())[/math] однократно. Если [math]CAS[/math] выполнен успешно, то хвост перемещён успешно (а значит, наш поток должен вернуться к добавлению нового элемента). Если же он выполнен неудачно, то это значит, что [math]T[/math] уже не указывает на [math]tail[/math], а значит, другой поток уже успешно переместил хвост (а значит, наш поток должен вернуться к добавлению нового элемента).

Реализация [math]push[/math]

def push(x: Int):
    newTail = new Node(x, new AtomicReference<Node>(null))
    while (true): //CAS-цикл
        tail = T.get()
        if (CAS(tail.next, null, newTail)):
            /*
            Если T указывает на последний добавленный элемент и 
            получилось добавить ещё один элемент в хвост, 
            пробуем передвинуть T. Если не получилось передвинуть T,
            значит, другой поток сделал это за нас, завершаем работу.
            Если получилось - то мы сами передвинули T, завершаем работу
            */
            CAS(T, tail, newTail)
            return
        else:
            /*
            Если T - не последний добавленный элемент элемент, то передвигаем T на последний элемент
            Если этого сделать не получилось, значит, это сделал другой поток.
            Если получилось - значит, наш поток передвинул T на текущий последний элемент.
            В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь новой вершины.
            */
            CAS(T, tail, tail.next.get())

Корректная реализация [math]pop[/math]

Проблема с [math]pop[/math]

Если мы попытаемся воспользоваться написанной выше реализацией метода [math]pop[/math], инвариант очереди не будет соблюдён. В силу особенностей реализации метода [math]push[/math], в некоторые моменты [math]T[/math] может указывать не на добавленный последним элемент, а на добавленный предпоследним. В таком случае, с помощью последовательности удалений можно добиться того, что [math]H[/math] будет указывать на последний добавленный элемент, а [math]T[/math] - на предпоследний. Таким образом, [math]H[/math] будет указывать на вершину правее чем та, на которую указывает [math]T[/math], то есть инвариант очереди будет нарушен

Корректная реализация

Основная проблема предыдущей реализации состоит в том, что в методе [math]pop[/math] при перемещении [math]H[/math], мы никак не следили за положением [math]T[/math]. Эту проблему можно исправить следующим образом: пусть в методе [math]pop[/math] рабочий поток будет помогать переместить указатель [math]T[/math] на последний добавленный элемент (аналогично действиям рабочего потока в методе [math]push[/math]).

Для определения того, указывает ли [math]T[/math] на последний добавленный элемент, воспользуемся следующим соображением: если [math]T[/math] указывает на последний добавленный элемент, то [math]T.get().next == '''null'''[/math], так как за последним добавленным элементом нет других элементов. В противном случае [math]T[/math] указывает на предпоследний добавленный элемент, и его надо передвинуть на последний добавленный.

def pop(): Int
    while (true): //CAS-цикл
        head = H.get() //Сохраняем в локальные переменные текущие голову и хвост, а так же следующий за головным элемент
        tail = T.get()
        nextHead = head.next.get()
        if (head == tail):
            /*
            Если head и tail совпадают, это ещё не означает, что очередь пуста.
            Возможно, что мы просто не успели подвинуть tail. Если tail.next не null,
            то мы просто не успели подвинуть tail при добавлении.
            */
            if (nextHead == null):
                // Следующего элемента нет, очередь пуста
                throw new EmptyException()
            else:
                /*
                push не успел подвинуть T, наш поток должен помочь
                tail == head => tail.next == head.next
                */
                CAS(T, tail, nextHead)
       else:
           // Очередь гарантированно не пуста, следующий элемент существует
           result = nextHead.data
           if (CAS(H, head, nextHead)):
               /*
               Если получилось переставить голову, то фиктивным элементом стал
               H.next, результат - данные, которые в нём лежали. Если не получилось - 
               возвращаемся в начало метода и пробуем ещё раз
               */
               return result

Примечания

Источники информации

  • Maurice Herliny & Nir Shavit - The Art of Multiprocessor programming, стр 230