Appearance
Проблемы многопоточного программирования
Состояние гонки (Race Condition)
Состояние гонки возникает, когда несколько потоков одновременно обращаются к общему ресурсу без должной синхронизации. Это может привести к непредсказуемым результатам, так как итоговое состояние ресурса зависит от порядка выполнения операций потоков.
Рассмотрим простой пример, где два потока увеличивают общую переменную:
python
from threading import Thread
import time
import random
counter = 0
def increment(amount_repeat):
global counter
for _ in range(amount_repeat):
time.sleep(random.random()/5)
data = counter
time.sleep(random.random()/5)
data += 1
counter = data
print(f'{counter} ', end='')
print(f"Start counter value: {counter}")
thread_1 = Thread(target=increment, args=(10, ))
thread_1.start()
thread_2 = Thread(target=increment, args=(10, ))
thread_2.start()
thread_1.join()
thread_2.join()
print(f"\nFinally counter value: {counter}")
Возможный вывод:
Start counter value: 0
1 2 3 1 4 5 5 6 7 8 9 6 10 7 8 9 10 11 12 13
Finally counter value: 13
В этом примере ожидается, что значение counter
будет равно 20, но из-за состояния гонки оно может быть меньше этого значения.
Причина в том, что операция counter += 1
не атомарна и состоит из нескольких шагов: чтение значения, увеличение и запись обратно. Если один поток читает значение, а другой записывает новое значение до того, как первый поток успел записать увеличенное значение, происходит потеря данных.
Lock - Блокировка
В многопоточном программировании часто возникает необходимость в синхронизации доступа к общим ресурсам. Если несколько потоков одновременно пытаются изменить один и тот же объект, это может привести к непредсказуемым результатам и ошибкам. Для решения этой проблемы в Python используется механизм блокировок, в частности объект Lock
из модуля threading
.
Что такое блокировка и для чего она используется
Блокировка (Lock) — это примитив синхронизации, который позволяет только одному потоку в определенный момент времени владеть блокировкой и, соответственно, иметь доступ к общему ресурсу. Когда поток захватывает блокировку, никакой другой поток не может её захватить, пока она не будет освобождена. Это гарантирует, что только один поток может выполнять критическую секцию кода в любой момент времени, предотвращая состояние гонки (race condition) и обеспечивая согласованность данных.
Lock используется для защиты общих ресурсов от одновременного доступа нескольких потоков. Это может быть полезно в следующих ситуациях:
- Изменение общих переменных: Если несколько потоков пытаются изменить одну и ту же переменную, это может привести к непредсказуемым результатам. Блокировка гарантирует, что только один поток может изменять переменную в любой момент времени.
- Работа с файлами: Если несколько потоков пытаются одновременно писать в один и тот же файл, это может привести к повреждению файла. Блокировка гарантирует, что только один поток может писать в файл в любой момент времени.
- Доступ к базам данных: Если несколько потоков пытаются одновременно получить доступ к базе данных, это может привести к конфликтам. Блокировка гарантирует, что только один поток может получить доступ к базе данных в любой момент времени.
Методы объекта Lock
Объект Lock
из модуля threading
предоставляет два основных метода:
acquire(blocking=True, timeout=-1)
: Захватывает блокировку.blocking=True
(по умолчанию): Если блокировка занята другим потоком, текущий поток будет заблокирован до тех пор, пока блокировка не будет освобождена.blocking=False
: Если блокировка занята другим потоком, методacquire()
немедленно вернетFalse
без блокировки.timeout
: Максимальное время ожидания блокировки в секундах. Если блокировка не может быть захвачена в течение этого времени, методacquire()
вернетFalse
.
release()
: Освобождает блокировку. Этот метод должен вызываться только тем потоком, который захватил блокировку..locked()
: ВозвращаетTrue
, еслиLock
заблокирован, иFalse
в противном случае.
Использованием методов Lock
Рассмотрим пример, где мы используем методы acquire
и release
для предотвращения состояния гонки:
python
from threading import Thread, Lock
import time
import random
counter = 0
lock = Lock()
def increment(amount_repeat):
global counter
for _ in range(amount_repeat):
time.sleep(random.random()/5)
lock.acquire()
data = counter
time.sleep(random.random()/5)
data += 1
counter = data
lock.release()
print(f'{counter} ', end='')
print(f"Start counter value: {counter}")
thread_1 = Thread(target=increment, args=(10, ))
thread_1.start()
thread_2 = Thread(target=increment, args=(10, ))
thread_2.start()
thread_1.join()
thread_2.join()
print(f"\nFinally counter value: {counter}")
Вывод:
Start counter value: 0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
Finally counter value: 20
В этом примере мы используем Lock
для защиты общей переменной counter
от одновременного доступа нескольких потоков. Каждый поток увеличивает значение counter
. Перед изменением counter
поток захватывает блокировку с помощью lock.acquire()
, а после изменения освобождает блокировку с помощью lock.release()
. Это гарантирует, что только один поток может изменять counter
в любой момент времени, что предотвращает состояние гонки.
Использованием контекстного менеджера with
Для упрощения работы с блокировками можно использовать контекстный менеджер with
. В этом случае блокировка автоматически захватывается при входе в блок with
и освобождается при выходе из него, даже если возникает исключение. Это делает код более чистым и безопасным:
python
from threading import Thread, Lock
import time
import random
counter = 0
lock = Lock()
def increment(amount_repeat):
global counter
for _ in range(amount_repeat):
with lock:
data = counter
time.sleep(random.random()/5)
data += 1
counter = data
print(f'{counter} ', end='')
print(f"Start counter value: {counter}")
thread_1 = Thread(target=increment, args=(10, ))
thread_1.start()
thread_2 = Thread(target=increment, args=(10, ))
thread_2.start()
thread_1.join()
thread_2.join()
print(f"\nFinally counter value: {counter}")
Вывод:
Start counter value: 0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
Finally counter value: 20
В этом примере блокировка автоматически захватывается при входе в блок with
и освобождается при выходе из него, что упрощает управление блокировками и снижает риск ошибок.
Использование метода locked()
Используя исключительно метод locked()
, необходимо изменить подход к управлению блокировкой. Метод locked()
возвращает True
, если блокировка уже захвачена, и False
в противном случае. Однако он сам не захватывает и не освобождает блокировку.
Метод locked()
можно использовать для проверки, захвачена ли блокировка в данный момент:
python
from threading import Thread, Lock
def check_lock():
if lock.locked():
print("The thread is held")
else:
print("The thread is free")
lock = Lock()
Thread(target=lambda: lock.acquire()).start() # thread hold
Thread(target=check_lock).start() # thread check
Вывод:
The thread is held
В этом примере первый поток захватывает блокировку, а второй поток через 0.5 секунды проверяет, удерживается ли блокировка с помощью метода locked()
. Если блокировка захвачена, выводится соответствующее сообщение. Этот метод полезен, когда нужно узнать текущее состояние блокировки перед выполнением какой-либо операции.
Чтобы использовать locked()
, для примеров синхронизации данных, нужно будет внести несколько изменений в логику программы. В данном случае мы будем использовать метод locked()
для проверки состояния блокировки, но саму блокировку будем обрабатывать вручную. Следующий код демонстрирует, как это можно сделать:
python
from threading import Thread, Lock
import time
import random
counter = 0
lock = Lock()
def increment(amount_repeat):
global counter
for _ in range(amount_repeat):
while True:
if not lock.locked(): # Проверяем, не захвачена ли блокировка
lock.acquire() # Ручное захватывание блокировки
try:
data = counter
time.sleep(random.random() / 5)
data += 1
counter = data
print(f'{counter} ', end='')
finally:
lock.release() # Освобождаем блокировку в любом случае
break # Выход из цикла, если операция выполнена успешно
else:
time.sleep(0.01) # Если блокировка захвачена, подождать 10 мс
print(f"Start counter value: {counter}")
thread_1 = Thread(target=increment, args=(10,))
thread_1.start()
thread_2 = Thread(target=increment, args=(10,))
thread_2.start()
thread_1.join()
thread_2.join()
print(f"\nFinally counter value: {counter}")
Вывод:
Start counter value: 0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
Finally counter value: 20
Проверка состояния блокировки: Используется lock.locked()
для проверки, захвачена ли блокировка другим потоком. Если блокировка не захвачена (not lock.locked()
возвращает True
), то текущий поток может захватить её.
Захват блокировки: Если блокировка свободна, поток захватывает её с помощью метода lock.acquire()
. Это гарантирует, что только один поток может модифицировать переменную counter
в данный момент времени.
Обновление счетчика: После захвата блокировки поток обновляет значение counter
и выводит его на экран.
Освобождение блокировки: Используется finally
для гарантированного освобождения блокировки lock.release()
независимо от того, возникло ли какое-либо исключение в блоке try
.
Цикл ожидания: Если блокировка захвачена, поток ждет 10 миллисекунд, прежде чем снова попробовать захватить блокировку. Это помогает избежать активного ожидания, снижая нагрузку на процессор.
Этот подход позволяет управлять блокировкой вручную, используя метод locked()
для проверки её состояния, хотя использование with lock:
более предпочтительно для упрощения работы с блокировками.
Использования Lock с timeout
python
from threading import Thread, Lock
import time
def worker(name_thread: str):
acquired = lock.acquire(timeout=2) # Пытаемся захватить блокировку с таймаутом 2 секунды
if acquired:
try:
print(f"\n{name_thread} захватил блокировку", end='')
time.sleep(3)
finally:
lock.release()
print(f"\n{name_thread} освободил блокировку", end='')
else:
print(f"\n{name_thread} не смог захватить блокировку", end='')
lock = Lock()
thread_1 = Thread(target=worker, args=('Thread 1',)).start()
time.sleep(1) # Задержка, чтобы thread_1 успел захватить блокировку
thread_2 = Thread(target=worker, args=('Thread 2',)).start()
Вывод:
Поток Thread 1 захватил блокировку
Поток Thread 1 освободил блокировку
Поток Thread 2 не смог захватить блокировку
В этом примере мы используем lock.acquire(timeout=2)
, чтобы попытаться захватить блокировку с таймаутом 2 секунды. thread1
успешно захватывает блокировку, и thread2
не может её захватить в течение 2 секунд, поэтому thread2
выводит сообщение о неудаче.
Заключение
Блокировки являются важным инструментом для синхронизации доступа к общим ресурсам в многопоточных приложениях. Использование Lock
из модуля threading
в Python позволяет избежать состояния гонки и обеспечить согласованность данных. Контекстный менеджер with
упрощает работу с блокировками, делая код более кратким и читаемым.
Упражнения
Переключение контекста: Напишите программу с тремя потоками, каждый из которых выводит свой идентификатор и задерживается на случайный промежуток времени от 0.1 до 0.5 секунд. Пронаблюдайте порядок вывода сообщений.
Состояние гонки: Создайте два потока, каждый из которых уменьшает значение общего счетчика на 1 в цикле 100000 раз без использования замков. Запустите программу несколько раз и убедитесь, что итоговое значение отличается от ожидаемого.
Блокировка: Модифицируйте предыдущий пример, добавив мьютекс для безопасного изменения счетчика. Убедитесь, что итоговое значение стабильно и соответствует ожидаемому.
Использование контекстных менеджеров: Перепишите предыдущие примеры, используя контекстные менеджеры
with
, чтобы управлять замками более элегантно и безопасно.Избежание состояний гонки при работе с файлами: Разработайте программу, в которой несколько потоков записывают данные в один файл. Используйте замки для предотвращения повреждения данных из-за одновременного доступа.
Анализ производительности: Создайте многопоточное приложение с использованием замков и без них для выполнения одной и той же задачи. Измерьте время выполнения и проанализируйте влияние синхронизации на производительность.
Напишите программу, которая создает два потока. Каждый поток должен читать данные из файла и записывать их в другой файл. Используйте блокировку, чтобы предотвратить конфликты при записи в файл.
Напишите программу, в которой один поток удерживает блокировку в течение 3 секунд, а другой поток пытается захватить эту блокировку с таймаутом в 1 секунду. Обработайте ситуацию, когда второй поток не сможет захватить блокировку.
Создайте программу, в которой один поток захватывает блокировку, а другой поток периодически проверяет с помощью метода
locked()
, удерживается ли блокировка, и выводит соответствующие сообщения в консоль.