Изменения

Перейти к: навигация, поиск

Очередь Майкла и Скотта

11 921 байт добавлено, 02:42, 13 января 2019
убрала лишнюю переменную, которая, вероятно, осталась после копипасты
{{В разработке}}'''Очередь Майкла и Скотта''' (англ. ''(Michael-Scott Queue)'' ) - алгоритм построения lock-free очереди. Впервые был предложен Maged M. Michael и Michael L. Scot в статье Scott <ref>[http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf? Simple, Fast, and Practical Non-Blocking and BlockingConcurrent Queue Algorithms]</ref>.
== Структура очереди ==
Очередь моделируется с помощью односвязного спискапостроена на односвязном списке. Каждый элемент списка (<tex>Node</tex>) содержит ссылку на хранимые в нём данные и атомарный указатель на следующий элемент списка (который можно менять атомарно).
'''case class''' Node<'''T'''>('''val''' data: '''TInt''', '''val''' next: AtomicReference<Node<'''T'''>>)
Если узел <tex>node</tex> является последним в списке, то его <tex>node.next</tex> указывает на <tex>null</tex>.
Сама очередь состоит из двух атомарных указателей: на голову <tex>H</tex> на голову и на хвост <tex>T</tex>, которые можно менять атомарнона хвост. Удаление из очереди происходит со стороны головы, добавление - со стороны хвоста.
Узел Голова списка, на который указывает <tex>H</tex>, является фиктивным (элементом ''(dummy)''). Данные, хранимые в этом узле, не имеют значения. Изначально очередь состоит из одного ''dummy''-элемента, на который указывают <tex>T</tex> и <tex>H</tex>.
'''class''' Queue<'''T'''> dummy = new Node<'''T'''>(null, new AtomicReference<Node>(null)) head = new AtomicReference<Node<'''T'''>>(dummy) tail = new AtomicReference<Node<'''T'''>>(dummy)[[Файл:Структура_msqueue.PNG|500px|thumb|center|Структура очереди Майкла и Скотта]]
// TODO; картинка '''class''' Queue dummy = '''new''' Node(null, '''new''' AtomicReference<Node>(null)) head = '''new''' AtomicReference<Node>(dummy) tail = '''new''' AtomicReference<Node>(dummy)
Будем поддерживать следующий инвариант: в нашей очереди <tex>H</tex> указывает на узел, находящийся не правее узла, на который указывает <tex>T</tex>.
==Идея реализацииОднопоточная реализация==
=== Удаление элемента ===
Для удаления элемента необходимо переместить указатель <tex>H</tex> на следующую в списке вершину.
'''def''' pop(): '''TInt'''
if (H.next == null):
'''throw''' ''' new ''' EmptyException()
H = H.next
'''return ''' H.data <font color=green>//H - новый фиктивный элемент</font>
=== Добавление элемента ===
 
Создадим новый узел списка, и добавим его в конец очереди.
 
'''def''' push(x: '''Int'''):
newTail = '''new''' Node(x, new AtomicReference<Node>(null))
T.next = newTail <font color=green>//Добавление новой вершины в очередь</font>
T = T.next <font color=green>//Изменение хвоста списка</font>
 
== Не lock-free многопоточная реализация ==
 
Будем при изменении <tex>T</tex>, <tex>H</tex>, и <tex>T.next</tex> использовать <tex>CAS</tex>.
 
=== Удаление элемента ===
 
Для удаления элемента необходимо переместить указатель <tex>H</tex> на следующую в списке вершину.
 
'''def''' pop(): '''Int'''
'''while''' ('''true'''): <font color=green>//Поток пытается в CAS - цикле поменять указатель на H, пока не получится</font>
head = H.get()
if (head.next == null):
'''throw''' '''new''' EmptyException()
newHead = head.next.get()
if ('''CAS'''(H, head, nextHead)):
'''return''' newHead.data
 
=== Добавление элемента ===
 
Создадим новый узел списка, и добавим его в конец очереди.
 
'''def''' push(x: '''Int'''):
newTail = '''new''' Node(x, '''new''' AtomicReference<Node>(null))
'''while''' ('''true'''): <font color=green>//Поток пытается в CAS - цикле поменять T.next, пока не получится</font>
tail = T.get()
curTail = tail.next
if ('''CAS'''(curTail, null, newTail)): <font color=green>//Поток пытается добавить элемент в конец очереди</font>
'''break'''
'''while''' ('''true'''): <font color=green>//Поток пытается в CAS - цикле поменять указатель на T, пока не получится</font>
tail = T.get()
nextTail = tail.next.get()
if ('''CAS'''(T, tail, nextTail)):
'''break'''
 
При данной реализации мы сталкиваемся со следующей проблемой
 
=== Описание проблемы ===
 
Рассмотрим ситуацию, при которой два потока <tex>A</tex> и <tex>B</tex> добавляют в очередь элементы <tex>elem</tex> и <tex>elem'</tex>. Рассмотрим следующую последовательность действий:
 
# Поток <tex>A</tex> добавляет в очередь новую вершину, изменяя <tex>T.next</tex>, но не успевает изменить <tex>T</tex> так, чтобы он указывал на только что добавленную вершину.
# Планировщик операционной системы усыпляет поток <tex>A</tex>.
# Поток <tex>B</tex> собирается добавить новую вершину в очередь, но не может этого сделать, так как постоянно проваливает операцию <tex>CAS(T.next, null, newTail)</tex> (T.next не указывает на <tex>null</tex>, так как поток <tex>A</tex> на шаге <tex>1</tex> добавил в очередь новую вершину, но не передвинул <tex>T</tex>)
# Поток <tex>B</tex> не сможет добавить в очередь новую вершину (а следовательно, завершить операцию <tex>push</tex>), до тех пор, пока планировщик операционной системы не разбудит поток <tex>A</tex>, и поток <tex>A</tex> не завершит добавление (то есть не передвинет <tex>T</tex> на вершину, добавленную на шаге <tex>1</tex>.)
 
Следовательно, у такой очереди нет гарантии прогресса, и этот алгоритм не lock-free.
 
== Корректная lock-free реализация ==
 
=== Основная идея ===
 
Нельзя выполнить добавление элемента в очередь и перемещение <tex>T</tex> атомарно. В таком случае, пусть остальные потоки помогают перенести указатель на хвост очереди. Если поток видит непустой <tex>T.next</tex> (то есть если он провалил <tex>CAS(tail.next, null, newTail)</tex>), то он должен помочь перенести <tex>T</tex>, то есть выполнить <tex>CAS(T, tail, tail.next.get())</tex> однократно. Если <tex>CAS</tex> выполнен успешно, то хвост перемещён успешно (а значит, наш поток должен вернуться к добавлению нового элемента). Если же он выполнен неудачно, то это значит, что <tex>T</tex> уже не указывает на <tex>tail</tex>, а значит, другой поток уже успешно переместил хвост (а значит, наш поток должен вернуться к добавлению нового элемента).
 
=== Реализация <tex>push</tex> ===
 
'''def''' push(x: '''Int'''):
newTail = '''new''' Node(x, '''new''' AtomicReference<Node>(null))
'''while''' ('''true'''): <font color=green>//CAS-цикл</font>
tail = T.get()
'''if''' ('''CAS'''(tail.next, '''null''', newTail)):
<font color=green>/*
Если T указывает на последний добавленный элемент и
получилось добавить ещё один элемент в хвост,
пробуем передвинуть T. Если не получилось передвинуть T,
значит, другой поток сделал это за нас, завершаем работу.
Если получилось - то мы сами передвинули T, завершаем работу
*/</font>
'''CAS'''(T, tail, newTail)
'''return'''
else:
<font color=green>/*
Если T - не последний добавленный элемент элемент, то передвигаем T на последний элемент
Если этого сделать не получилось, значит, это сделал другой поток.
Если получилось - значит, наш поток передвинул T на текущий последний элемент.
В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь новой вершины.
*/</font>
'''CAS'''(T, tail, tail.next.get())
 
== Корректная реализация <tex>pop</tex> ==
 
=== Проблема с <tex>pop</tex> ===
 
Если мы попытаемся воспользоваться написанной выше реализацией метода <tex>pop</tex>, инвариант очереди не будет соблюдён. В силу особенностей реализации метода <tex>push</tex>, в некоторые моменты <tex>T</tex> может указывать не на добавленный последним элемент, а на добавленный предпоследним. В таком случае, с помощью последовательности удалений можно добиться того, что <tex>H</tex> будет указывать на последний добавленный элемент, а <tex>T</tex> - на предпоследний. Таким образом, <tex>H</tex> будет указывать на вершину правее чем та, на которую указывает <tex>T</tex>, то есть инвариант очереди будет нарушен
 
=== Корректная реализация ===
 
Основная проблема предыдущей реализации состоит в том, что в методе <tex>pop</tex> при перемещении <tex>H</tex>, мы никак не следили за положением <tex>T</tex>. Эту проблему можно исправить следующим образом: пусть в методе <tex>pop</tex> рабочий поток будет помогать переместить указатель <tex>T</tex> на последний добавленный элемент (аналогично действиям рабочего потока в методе <tex>push</tex>).
 
Для определения того, указывает ли <tex>T</tex> на последний добавленный элемент, воспользуемся следующим соображением: если <tex>T</tex> указывает на последний добавленный элемент, то <tex>T.get().next == '''null'''</tex>, так как за последним добавленным элементом нет других элементов. В противном случае <tex>T</tex> указывает на предпоследний добавленный элемент, и его надо передвинуть на последний добавленный.
 
'''def''' pop(): '''Int'''
'''while''' ('''true'''): <font color=green>//CAS-цикл</font>
head = H.get() <font color=green>//Сохраняем в локальные переменные текущие голову и хвост, а так же следующий за головным элемент</font>
tail = T.get()
nextHead = head.next.get()
'''if''' (head == tail):
<font color=green>/*
Если head и tail совпадают, это ещё не означает, что очередь пуста.
Возможно, что мы просто не успели подвинуть tail. Если tail.next не null,
то мы просто не успели подвинуть tail при добавлении.
*/</font>
'''if''' (nextHead == '''null'''):
<font color=green>// Следующего элемента нет, очередь пуста</font>
'''throw''' '''new''' EmptyException()
'''else''':
<font color=green>/*
push не успел подвинуть T, наш поток должен помочь
tail == head => tail.next == head.next
*/</font>
'''else''':
<font color=green>// Очередь гарантированно не пуста, следующий элемент существует</font>
result = nextHead.data
'''if''' ('''CAS'''(H, head, nextHead)):
<font color=green>/*
Если получилось переставить голову, то фиктивным элементом стал
H.next, результат - данные, которые в нём лежали. Если не получилось -
возвращаемся в начало метода и пробуем ещё раз
*/</font>
'''return''' result
==Примечания==
Анонимный участник

Навигация