Очередь Майкла и Скотта — различия между версиями

Материал из Викиконспекты
Перейти к: навигация, поиск
(Описание проблемы)
м (rollbackEdits.php mass rollback)
 
(не показаны 32 промежуточные версии 9 участников)
Строка 1: Строка 1:
{{В разработке}}
+
'''Очередь Майкла и Скотта''' (англ. ''Michael-Scott Queue'') - алгоритм построения lock-free очереди. Впервые был предложен Maged M. Michael и  Michael L. Scott <ref>[http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf? Simple, Fast, and Practical Non-Blocking and BlockingConcurrent Queue Algorithms]</ref>.
'''Очередь Майкла и Скотта''' ''(Michael-Scott Queue)'' - алгоритм построения lock-free очереди. Впервые был предложен Maged M. Michael и  Michael L. Scot в статье <ref>[http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf? Simple, Fast, and Practical Non-Blocking and BlockingConcurrent Queue Algorithms]</ref>.
 
 
== Структура очереди ==
 
== Структура очереди ==
  
Очередь моделируется с помощью односвязного списка. Каждый элемент списка (<tex>Node</tex>) содержит ссылку на хранимые в нём данные и указатель на следующий элемент списка (который можно менять атомарно).
+
Очередь построена на односвязном списке. Каждый элемент списка <tex>Node</tex> содержит ссылку на хранимые в нём данные и атомарный указатель на следующий элемент списка.
  
  '''case class''' Node('''val''' data: '''Int''', '''val''' next: AtomicReference<Node>)
+
  '''class''' Node('''val''' data: '''Int''', '''val''' next: AtomicReference<Node>)
  
Если узел <tex>node</tex> является последним в списке, то его <tex>next</tex> указывает на <tex>null</tex>.
+
Если узел <tex>node</tex> является последним в списке, то <tex>node.next</tex> указывает на <tex>null</tex>.
  
Сама очередь состоит из двух указателей: на голову <tex>H</tex> и на хвост <tex>T</tex>, которые можно менять атомарно. Удаление из очереди происходит со стороны головы, добавление - со стороны хвоста.
+
Сама очередь состоит из двух атомарных указателей: <tex>H</tex> на голову и <tex>T</tex> на  хвост. Удаление из очереди происходит со стороны головы, добавление - со стороны хвоста.
  
Узел списка, на который указывает <tex>H</tex>, является фиктивным (''dummy''). Данные, хранимые в этом узле, не имеют значения. Изначально очередь состоит из одного ''dummy''-элемента, на который указывают <tex>T</tex> и <tex>H</tex>.
+
Голова списка является фиктивным элементом ''(dummy)''. Данные, хранимые в этом узле, не имеют значения. Изначально очередь состоит из одного ''dummy''-элемента, на который указывают <tex>T</tex> и <tex>H</tex>.
 +
 
 +
[[Файл:Структура_msqueue.PNG|500px|thumb|center|Структура очереди Майкла и Скотта]]
  
 
  '''class''' Queue
 
  '''class''' Queue
Строка 18: Строка 19:
 
     tail = '''new''' AtomicReference<Node>(dummy)
 
     tail = '''new''' AtomicReference<Node>(dummy)
  
// TODO; картинка
+
Будем поддерживать следующий инвариант: в нашей очереди <tex>H</tex> указывает на узел, находящийся не правее узла, на который указывает <tex>T</tex>.
  
Будем поддерживать следующий инвариант: в нашей очереди <tex>H</tex> указывает на узел, находящийся не правее узла, на который указывает <tex>T</tex>
+
==Однопоточная реализация==
 
 
==Идея реализации==
 
  
 
=== Удаление элемента ===
 
=== Удаление элемента ===
Строка 43: Строка 42:
 
     T = T.next <font color=green>//Изменение хвоста списка</font>
 
     T = T.next <font color=green>//Изменение хвоста списка</font>
  
== Многопоточная реализация ==
+
== Не lock-free многопоточная реализация ==
  
Будем при всех изменениях указателей на вершины списка использовать <tex>CAS</tex> (то есть при изменении <tex>T</tex>, <tex>H</tex>, и <tex>T.next</tex>)
+
Будем при изменении <tex>T</tex>, <tex>H</tex>, и <tex>T.next</tex> использовать <tex>CAS</tex>.
  
 
=== Удаление элемента ===
 
=== Удаление элемента ===
Строка 77: Строка 76:
 
             '''break'''
 
             '''break'''
  
При этом мы сталкиваемся со следующей проблемой
+
При данной реализации мы сталкиваемся со следующей проблемой
  
 
=== Описание проблемы ===
 
=== Описание проблемы ===
  
Рассмотрим ситуацию, при которой два потока <tex>A</tex> и <tex>B</tex> добавляют в очередь элементы <tex>6</tex> и <tex>7</tex>. Рассмотрим следующую последовательность действий:
+
Рассмотрим ситуацию, при которой два потока <tex>A</tex> и <tex>B</tex> добавляют в очередь элементы <tex>elem</tex> и <tex>elem'</tex>. Рассмотрим следующую последовательность действий:
  
 
# Поток <tex>A</tex> добавляет в очередь новую вершину, изменяя <tex>T.next</tex>, но не успевает изменить <tex>T</tex> так, чтобы он указывал на только что добавленную вершину.
 
# Поток <tex>A</tex> добавляет в очередь новую вершину, изменяя <tex>T.next</tex>, но не успевает изменить <tex>T</tex> так, чтобы он указывал на только что добавленную вершину.
Строка 89: Строка 88:
  
 
Следовательно, у такой очереди нет гарантии прогресса, и этот алгоритм не lock-free.
 
Следовательно, у такой очереди нет гарантии прогресса, и этот алгоритм не lock-free.
 +
 +
== Корректная lock-free реализация ==
 +
 +
=== Основная идея ===
 +
 +
Нельзя выполнить добавление элемента в очередь и перемещение <tex>T</tex> атомарно. В таком случае, пусть остальные потоки помогают перенести указатель на хвост очереди. Если поток видит непустой <tex>T.next</tex> (то есть если он провалил <tex>CAS(tail.next, null, newTail)</tex>), то он должен помочь перенести <tex>T</tex>, то есть выполнить <tex>CAS(T, tail, tail.next.get())</tex> однократно. Если <tex>CAS</tex> выполнен успешно, то хвост перемещён успешно (а значит, наш поток должен вернуться к добавлению нового элемента). Если же он выполнен неудачно, то это значит, что <tex>T</tex> уже не указывает на <tex>tail</tex>, а значит, другой поток уже успешно переместил хвост (а значит, наш поток должен вернуться к добавлению нового элемента).
 +
 +
=== Реализация <tex>push</tex> ===
 +
 +
'''def''' push(x: '''Int'''):
 +
    newTail = '''new''' Node(x, '''new''' AtomicReference<Node>(null))
 +
    '''while''' ('''true'''): <font color=green>//CAS-цикл</font>
 +
        tail = T.get()
 +
        '''if''' ('''CAS'''(tail.next, '''null''', newTail)):
 +
            <font color=green>/*
 +
            Если T указывает на последний добавленный элемент и
 +
            получилось добавить ещё один элемент в хвост,
 +
            пробуем передвинуть T. Если не получилось передвинуть T,
 +
            значит, другой поток сделал это за нас, завершаем работу.
 +
            Если получилось - то мы сами передвинули T, завершаем работу
 +
            */</font>
 +
            '''CAS'''(T, tail, newTail)
 +
            '''return'''
 +
        else:
 +
            <font color=green>/*
 +
            Если T - не последний добавленный элемент элемент, то передвигаем T на последний элемент
 +
            Если этого сделать не получилось, значит, это сделал другой поток.
 +
            Если получилось - значит, наш поток передвинул T на текущий последний элемент.
 +
            В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь новой вершины.
 +
            */</font>
 +
            '''CAS'''(T, tail, tail.next.get())
 +
 +
== Корректная реализация <tex>pop</tex> ==
 +
 +
=== Проблема с <tex>pop</tex> ===
 +
 +
Если мы попытаемся воспользоваться написанной выше реализацией метода <tex>pop</tex>, инвариант очереди не будет соблюдён. В силу особенностей реализации метода <tex>push</tex>, в некоторые моменты <tex>T</tex> может указывать не на добавленный последним элемент, а на добавленный предпоследним. В таком случае, с помощью последовательности удалений можно добиться того, что <tex>H</tex> будет указывать на последний добавленный элемент, а <tex>T</tex> - на предпоследний. Таким образом, <tex>H</tex> будет указывать на вершину правее чем та, на которую указывает <tex>T</tex>, то есть инвариант очереди будет нарушен
 +
 +
=== Корректная реализация ===
 +
 +
Основная проблема предыдущей реализации состоит в том, что в методе <tex>pop</tex> при перемещении <tex>H</tex>, мы никак не следили за положением <tex>T</tex>. Эту проблему можно исправить следующим образом: пусть в методе <tex>pop</tex> рабочий поток будет помогать переместить указатель <tex>T</tex> на последний добавленный элемент (аналогично действиям рабочего потока в методе <tex>push</tex>).
 +
 +
Для определения того, указывает ли <tex>T</tex> на последний добавленный элемент, воспользуемся следующим соображением: если <tex>T</tex> указывает на последний добавленный элемент, то <tex>T.get().next == '''null'''</tex>, так как за последним добавленным элементом нет других элементов. В противном случае <tex>T</tex> указывает на предпоследний добавленный элемент, и его надо передвинуть на последний добавленный.
 +
 +
'''def''' pop(): '''Int'''
 +
    '''while''' ('''true'''): <font color=green>//CAS-цикл</font>
 +
        head = H.get() <font color=green>//Сохраняем в локальные переменные текущие голову и хвост, а так же следующий за головным элемент</font>
 +
        tail = T.get()
 +
        nextHead = head.next.get()
 +
        '''if''' (head == tail):
 +
            <font color=green>/*
 +
            Если head и tail совпадают, это ещё не означает, что очередь пуста.
 +
            Возможно, что мы просто не успели подвинуть tail. Если tail.next не null,
 +
            то мы просто не успели подвинуть tail при добавлении.
 +
            */</font>
 +
            '''if''' (nextHead == '''null'''):
 +
                <font color=green>// Следующего элемента нет, очередь пуста</font>
 +
                '''throw''' '''new''' EmptyException()
 +
            '''else''':
 +
                <font color=green>/*
 +
                push не успел подвинуть T, наш поток должен помочь
 +
                tail == head => tail.next == head.next
 +
                */</font>
 +
                '''CAS'''(T, tail, nextHead)
 +
        '''else''':
 +
            <font color=green>// Очередь гарантированно не пуста, следующий элемент существует</font>
 +
            result = nextHead.data
 +
            '''if''' ('''CAS'''(H, head, nextHead)):
 +
                <font color=green>/*
 +
                Если получилось переставить голову, то фиктивным элементом стал
 +
                H.next, результат - данные, которые в нём лежали. Если не получилось -
 +
                возвращаемся в начало метода и пробуем ещё раз
 +
                */</font>
 +
                '''return''' result
  
 
==Примечания==
 
==Примечания==

Текущая версия на 19:08, 4 сентября 2022

Очередь Майкла и Скотта (англ. Michael-Scott Queue) - алгоритм построения lock-free очереди. Впервые был предложен Maged M. Michael и Michael L. Scott [1].

Структура очереди

Очередь построена на односвязном списке. Каждый элемент списка [math]Node[/math] содержит ссылку на хранимые в нём данные и атомарный указатель на следующий элемент списка.

class Node(val data: Int, val next: AtomicReference<Node>)

Если узел [math]node[/math] является последним в списке, то [math]node.next[/math] указывает на [math]null[/math].

Сама очередь состоит из двух атомарных указателей: [math]H[/math] на голову и [math]T[/math] на хвост. Удаление из очереди происходит со стороны головы, добавление - со стороны хвоста.

Голова списка является фиктивным элементом (dummy). Данные, хранимые в этом узле, не имеют значения. Изначально очередь состоит из одного dummy-элемента, на который указывают [math]T[/math] и [math]H[/math].

Структура очереди Майкла и Скотта
class Queue
    dummy = new Node(null, new AtomicReference<Node>(null))
    head = new AtomicReference<Node>(dummy)
    tail = new AtomicReference<Node>(dummy)

Будем поддерживать следующий инвариант: в нашей очереди [math]H[/math] указывает на узел, находящийся не правее узла, на который указывает [math]T[/math].

Однопоточная реализация

Удаление элемента

Для удаления элемента необходимо переместить указатель [math]H[/math] на следующую в списке вершину.

def pop(): Int
    if (H.next == null):
        throw new EmptyException()
    H = H.next
    return H.data //H - новый фиктивный элемент

Добавление элемента

Создадим новый узел списка, и добавим его в конец очереди.

def push(x: Int):
    newTail = new Node(x, new AtomicReference<Node>(null))
    T.next = newTail //Добавление новой вершины в очередь
    T = T.next //Изменение хвоста списка

Не lock-free многопоточная реализация

Будем при изменении [math]T[/math], [math]H[/math], и [math]T.next[/math] использовать [math]CAS[/math].

Удаление элемента

Для удаления элемента необходимо переместить указатель [math]H[/math] на следующую в списке вершину.

def pop(): Int
    while (true): //Поток пытается в CAS - цикле поменять указатель на H, пока не получится
        head = H.get()
        if (head.next == null):
            throw new EmptyException()
        newHead = head.next.get()
        if (CAS(H, head, nextHead)):
            return newHead.data

Добавление элемента

Создадим новый узел списка, и добавим его в конец очереди.

def push(x: Int):
    newTail = new Node(x, new AtomicReference<Node>(null))
    while (true): //Поток пытается в CAS - цикле поменять T.next, пока не получится
        tail = T.get()
        curTail = tail.next
        if (CAS(curTail, null, newTail)): //Поток пытается добавить элемент в конец очереди
            break
   while (true): //Поток пытается в CAS - цикле поменять указатель на T, пока не получится
        tail = T.get()
        nextTail = tail.next.get()
        if (CAS(T, tail, nextTail)):
            break

При данной реализации мы сталкиваемся со следующей проблемой

Описание проблемы

Рассмотрим ситуацию, при которой два потока [math]A[/math] и [math]B[/math] добавляют в очередь элементы [math]elem[/math] и [math]elem'[/math]. Рассмотрим следующую последовательность действий:

  1. Поток [math]A[/math] добавляет в очередь новую вершину, изменяя [math]T.next[/math], но не успевает изменить [math]T[/math] так, чтобы он указывал на только что добавленную вершину.
  2. Планировщик операционной системы усыпляет поток [math]A[/math].
  3. Поток [math]B[/math] собирается добавить новую вершину в очередь, но не может этого сделать, так как постоянно проваливает операцию [math]CAS(T.next, null, newTail)[/math] (T.next не указывает на [math]null[/math], так как поток [math]A[/math] на шаге [math]1[/math] добавил в очередь новую вершину, но не передвинул [math]T[/math])
  4. Поток [math]B[/math] не сможет добавить в очередь новую вершину (а следовательно, завершить операцию [math]push[/math]), до тех пор, пока планировщик операционной системы не разбудит поток [math]A[/math], и поток [math]A[/math] не завершит добавление (то есть не передвинет [math]T[/math] на вершину, добавленную на шаге [math]1[/math].)

Следовательно, у такой очереди нет гарантии прогресса, и этот алгоритм не lock-free.

Корректная lock-free реализация

Основная идея

Нельзя выполнить добавление элемента в очередь и перемещение [math]T[/math] атомарно. В таком случае, пусть остальные потоки помогают перенести указатель на хвост очереди. Если поток видит непустой [math]T.next[/math] (то есть если он провалил [math]CAS(tail.next, null, newTail)[/math]), то он должен помочь перенести [math]T[/math], то есть выполнить [math]CAS(T, tail, tail.next.get())[/math] однократно. Если [math]CAS[/math] выполнен успешно, то хвост перемещён успешно (а значит, наш поток должен вернуться к добавлению нового элемента). Если же он выполнен неудачно, то это значит, что [math]T[/math] уже не указывает на [math]tail[/math], а значит, другой поток уже успешно переместил хвост (а значит, наш поток должен вернуться к добавлению нового элемента).

Реализация [math]push[/math]

def push(x: Int):
    newTail = new Node(x, new AtomicReference<Node>(null))
    while (true): //CAS-цикл
        tail = T.get()
        if (CAS(tail.next, null, newTail)):
            /*
            Если T указывает на последний добавленный элемент и 
            получилось добавить ещё один элемент в хвост, 
            пробуем передвинуть T. Если не получилось передвинуть T,
            значит, другой поток сделал это за нас, завершаем работу.
            Если получилось - то мы сами передвинули T, завершаем работу
            */
            CAS(T, tail, newTail)
            return
        else:
            /*
            Если T - не последний добавленный элемент элемент, то передвигаем T на последний элемент
            Если этого сделать не получилось, значит, это сделал другой поток.
            Если получилось - значит, наш поток передвинул T на текущий последний элемент.
            В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь новой вершины.
            */
            CAS(T, tail, tail.next.get())

Корректная реализация [math]pop[/math]

Проблема с [math]pop[/math]

Если мы попытаемся воспользоваться написанной выше реализацией метода [math]pop[/math], инвариант очереди не будет соблюдён. В силу особенностей реализации метода [math]push[/math], в некоторые моменты [math]T[/math] может указывать не на добавленный последним элемент, а на добавленный предпоследним. В таком случае, с помощью последовательности удалений можно добиться того, что [math]H[/math] будет указывать на последний добавленный элемент, а [math]T[/math] - на предпоследний. Таким образом, [math]H[/math] будет указывать на вершину правее чем та, на которую указывает [math]T[/math], то есть инвариант очереди будет нарушен

Корректная реализация

Основная проблема предыдущей реализации состоит в том, что в методе [math]pop[/math] при перемещении [math]H[/math], мы никак не следили за положением [math]T[/math]. Эту проблему можно исправить следующим образом: пусть в методе [math]pop[/math] рабочий поток будет помогать переместить указатель [math]T[/math] на последний добавленный элемент (аналогично действиям рабочего потока в методе [math]push[/math]).

Для определения того, указывает ли [math]T[/math] на последний добавленный элемент, воспользуемся следующим соображением: если [math]T[/math] указывает на последний добавленный элемент, то [math]T.get().next == '''null'''[/math], так как за последним добавленным элементом нет других элементов. В противном случае [math]T[/math] указывает на предпоследний добавленный элемент, и его надо передвинуть на последний добавленный.

def pop(): Int
    while (true): //CAS-цикл
        head = H.get() //Сохраняем в локальные переменные текущие голову и хвост, а так же следующий за головным элемент
        tail = T.get()
        nextHead = head.next.get()
        if (head == tail):
            /*
            Если head и tail совпадают, это ещё не означает, что очередь пуста.
            Возможно, что мы просто не успели подвинуть tail. Если tail.next не null,
            то мы просто не успели подвинуть tail при добавлении.
            */
            if (nextHead == null):
                // Следующего элемента нет, очередь пуста
                throw new EmptyException()
            else:
                /*
                push не успел подвинуть T, наш поток должен помочь
                tail == head => tail.next == head.next
                */
                CAS(T, tail, nextHead)
       else:
           // Очередь гарантированно не пуста, следующий элемент существует
           result = nextHead.data
           if (CAS(H, head, nextHead)):
               /*
               Если получилось переставить голову, то фиктивным элементом стал
               H.next, результат - данные, которые в нём лежали. Если не получилось - 
               возвращаемся в начало метода и пробуем ещё раз
               */
               return result

Примечания

Источники информации

  • Maurice Herliny & Nir Shavit - The Art of Multiprocessor programming, стр 230