Очередь Майкла и Скотта — различия между версиями
(→Реализация push) |
(→Реализация push) |
||
Строка 102: | Строка 102: | ||
'''while''' ('''true'''): <font color=green>//CAS-цикл</font> | '''while''' ('''true'''): <font color=green>//CAS-цикл</font> | ||
tail = T.get() | tail = T.get() | ||
− | if ('''CAS'''(tail.next, '''null''', newTail)): | + | '''if''' ('''CAS'''(tail.next, '''null''', newTail)): |
− | <font color=green> | + | <font color=green>/* |
− | + | Если T указывает на последний добавленный элемент и | |
− | Если получилось добавить ещё один элемент в хвост, | + | получилось добавить ещё один элемент в хвост, |
пробуем передвинуть T. Если не получилось передвинуть T, | пробуем передвинуть T. Если не получилось передвинуть T, | ||
значит, другой поток сделал это за нас, завершаем работу. | значит, другой поток сделал это за нас, завершаем работу. | ||
− | + | Если получилось - то мы сами передвинули T, завершаем работу | |
− | */ | + | */</font> |
− | |||
'''CAS'''(T, tail, newTail) | '''CAS'''(T, tail, newTail) | ||
'''return''' | '''return''' | ||
− | + | else: | |
− | + | <font color=green>/* | |
− | + | Если T - не последний добавленный элемент элемент, то передвигаем T на последний элемент | |
− | + | Если этого сделать не получилось, значит, это сделал другой поток. | |
− | + | Если получилось - значит, наш поток передвинул T на текущий последний элемент. | |
− | + | В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь новой вершины. | |
− | + | */</font> | |
− | + | '''CAS'''(T, tail, tail.next.get()) | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
==Примечания== | ==Примечания== |
Версия 01:39, 2 октября 2018
Очередь Майкла и Скотта (Michael-Scott Queue) - алгоритм построения lock-free очереди. Впервые был предложен Maged M. Michael и Michael L. Scot в статье [1].
Содержание
Структура очереди
Очередь моделируется с помощью односвязного списка. Каждый элемент списка (
) содержит ссылку на хранимые в нём данные и указатель на следующий элемент списка (который можно менять атомарно).case class Node(val data: Int, val next: AtomicReference<Node>)
Если узел
является последним в списке, то его указывает на .Сама очередь состоит из двух указателей: на голову
и на хвост , которые можно менять атомарно. Удаление из очереди происходит со стороны головы, добавление - со стороны хвоста.Узел списка, на который указывает
, является фиктивным (dummy). Данные, хранимые в этом узле, не имеют значения. Изначально очередь состоит из одного dummy-элемента, на который указывают и .class Queue dummy = new Node(null, new AtomicReference<Node>(null)) head = new AtomicReference<Node>(dummy) tail = new AtomicReference<Node>(dummy)
// TODO; картинка
Будем поддерживать следующий инвариант: в нашей очереди
указывает на узел, находящийся не правее узла, на который указываетИдея реализации
Удаление элемента
Для удаления элемента необходимо переместить указатель
на следующую в списке вершину.def pop(): Int if (H.next == null): throw new EmptyException() H = H.next return H.data //H - новый фиктивный элемент
Добавление элемента
Создадим новый узел списка, и добавим его в конец очереди.
def push(x: Int): newTail = new Node(x, new AtomicReference<Node>(null)) T.next = newTail //Добавление новой вершины в очередь T = T.next //Изменение хвоста списка
Многопоточная реализация
Будем при всех изменениях указателей на вершины списка использовать
(то есть при изменении , , и )Удаление элемента
Для удаления элемента необходимо переместить указатель
на следующую в списке вершину.def pop(): Int while (true): //Поток пытается в CAS - цикле поменять указатель на H, пока не получится head = H.get() if (head.next == null): throw new EmptyException() newHead = head.next.get() if (CAS(H, head, nextHead)): return newHead.data
Добавление элемента
Создадим новый узел списка, и добавим его в конец очереди.
def push(x: Int): newTail = new Node(x, new AtomicReference<Node>(null)) while (true): //Поток пытается в CAS - цикле поменять T.next, пока не получится tail = T.get() curTail = tail.next if (CAS(curTail, null, newTail)): //Поток пытается добавить элемент в конец очереди break while (true): //Поток пытается в CAS - цикле поменять указатель на T, пока не получится tail = T.get() nextTail = tail.next.get() if (CAS(T, tail, nextTail)): break
При данной реализации мы сталкиваемся со следующей проблемой
Описание проблемы
Рассмотрим ситуацию, при которой два потока
и добавляют в очередь элементы и . Рассмотрим следующую последовательность действий:- Поток добавляет в очередь новую вершину, изменяя , но не успевает изменить так, чтобы он указывал на только что добавленную вершину.
- Планировщик операционной системы усыпляет поток .
- Поток собирается добавить новую вершину в очередь, но не может этого сделать, так как постоянно проваливает операцию (T.next не указывает на , так как поток на шаге добавил в очередь новую вершину, но не передвинул )
- Поток не сможет добавить в очередь новую вершину (а следовательно, завершить операцию ), до тех пор, пока планировщик операционной системы не разбудит поток , и поток не завершит добавление (то есть не передвинет на вершину, добавленную на шаге .)
Следовательно, у такой очереди нет гарантии прогресса, и этот алгоритм не lock-free.
Корректная lock-free реализация
Основная идея
Нельзя выполнить добавление элемента в очередь и перемещение
атомарно. В таком случае, пусть остальные потоки помогают перенести указатель на хвост очереди. Если поток видит непустой (то есть если он провалил ), то он должен помочь перенести , то есть выполнить однократно. Если выполнен успешно, то хвост перемещён успешно (а значит, наш поток должен вернуться к добавлению нового элемента). Если же он выполнен неудачно, то это значит, что уже не указывает на , а значит, другой поток уже успешно переместил хвост (а значит, наш поток должен вернуться к добавлению нового элемента).Реализация
def push(x: Int): newTail = new Node(x, new AtomicReference<Node>(null)) while (true): //CAS-цикл tail = T.get() if (CAS(tail.next, null, newTail)): /* Если T указывает на последний добавленный элемент и получилось добавить ещё один элемент в хвост, пробуем передвинуть T. Если не получилось передвинуть T, значит, другой поток сделал это за нас, завершаем работу. Если получилось - то мы сами передвинули T, завершаем работу */ CAS(T, tail, newTail) return else: /* Если T - не последний добавленный элемент элемент, то передвигаем T на последний элемент Если этого сделать не получилось, значит, это сделал другой поток. Если получилось - значит, наш поток передвинул T на текущий последний элемент. В любом случае, возвращаемся в начало CAS-цикла, чтобы завершить добавление в очередь новой вершины. */ CAS(T, tail, tail.next.get())
Примечания
Источники информации
- Maurice Herliny & Nir Shavit - The Art of Multiprocessor programming, стр 230