Python threading BoundedSemaphore Class

The threading.BoundedSemaphore class in Python's threading module is a subclass of threading.Semaphore that ensures the semaphore's value does not exceed its initial value. This is useful for preventing programming errors where a semaphore might be released more times than it was acquired.

Table of Contents

  1. Introduction
  2. threading.BoundedSemaphore Class Syntax
  3. Examples
    • Basic Usage
    • Controlling Access to a Resource
    • Using BoundedSemaphore in a Real-World Scenario
  4. Real-World Use Case
  5. Conclusion

Introduction

The threading.BoundedSemaphore class works similarly to threading.Semaphore, with the added guarantee that the semaphore's value cannot exceed its initial value. This helps to prevent common errors in multithreading where a semaphore might be over-released.

threading.BoundedSemaphore Class Syntax

Here is how you create and use a bounded semaphore with the threading.BoundedSemaphore class:

import threading

bounded_semaphore = threading.BoundedSemaphore(value=1)

Parameters:

  • value: The initial value of the semaphore counter. The default is 1.

Methods:

  • acquire(blocking=True, timeout=None): Decrement the counter and block if the counter is zero. If blocking is True (the default), the method will block until the counter is incremented or the optional timeout occurs. If blocking is False, the method will return immediately with True if the semaphore is acquired and False otherwise.
  • release(): Increment the counter and wake up any threads blocked on acquire(). If the counter would exceed its initial value, a ValueError is raised.

Examples

Basic Usage

Create and use a bounded semaphore to synchronize threads.

Example

import threading
import time

bounded_semaphore = threading.BoundedSemaphore(2)

def worker(worker_id):
    print(f"Worker {worker_id} waiting to acquire semaphore")
    with bounded_semaphore:
        print(f"Worker {worker_id} acquired semaphore")
        time.sleep(2)
        print(f"Worker {worker_id} releasing semaphore")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

Controlling Access to a Resource

Use a bounded semaphore to limit access to a resource, such as a database connection pool.

Example

import threading
import time

bounded_semaphore = threading.BoundedSemaphore(3)

def access_resource(thread_id):
    with bounded_semaphore:
        print(f"Thread {thread_id} accessing resource")
        time.sleep(1)
        print(f"Thread {thread_id} releasing resource")

threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(5)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

Using BoundedSemaphore in a Real-World Scenario

Ensure a semaphore does not exceed its initial value in a practical example.

Example

import threading
import time

bounded_semaphore = threading.BoundedSemaphore(2)

def worker(worker_id):
    print(f"Worker {worker_id} waiting to acquire semaphore")
    try:
        with bounded_semaphore:
            print(f"Worker {worker_id} acquired semaphore")
            time.sleep(2)
            print(f"Worker {worker_id} releasing semaphore")
    except ValueError as e:
        print(f"Worker {worker_id} error: {e}")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

# Intentionally release the semaphore more than it was acquired
try:
    bounded_semaphore.release()
    bounded_semaphore.release()
    bounded_semaphore.release()
except ValueError as e:
    print(f"Error: {e}")  # Output: Error: Semaphore released too many times

Real-World Use Case

Controlling Access to a Limited Resource

Use a bounded semaphore to control access to a limited resource, such as a pool of connections or a set of limited hardware devices.

Example

import threading
import time

class ConnectionPool:
    def __init__(self, size):
        self.semaphore = threading.BoundedSemaphore(size)
        self.size = size

    def acquire_connection(self):
        self.semaphore.acquire()
        print(f"Acquired a connection, available connections: {self.size - self.semaphore._value}")

    def release_connection(self):
        self.semaphore.release()
        print(f"Released a connection, available connections: {self.size - self.semaphore._value}")

def worker(pool, worker_id):
    pool.acquire_connection()
    time.sleep(2)
    pool.release_connection()

pool = ConnectionPool(3)
threads = [threading.Thread(target=worker, args=(pool, i)) for i in range(6)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

Conclusion

The threading.BoundedSemaphore class is used for managing access to shared resources in multithreaded programs. It ensures that the semaphore's value does not exceed its initial value, helping to prevent common synchronization errors. Proper usage can significantly enhance the reliability and efficiency of your concurrent applications.

Comments

Spring Boot 3 Paid Course Published for Free
on my Java Guides YouTube Channel

Subscribe to my YouTube Channel (165K+ subscribers):
Java Guides Channel

Top 10 My Udemy Courses with Huge Discount:
Udemy Courses - Ramesh Fadatare