Learn Crossbeam: A Rust Concurrency Library

Learn Crossbeam: A Deep Dive into Rust’s Concurrency Powerhouse

Rust is renowned for its memory safety and fearless concurrency. While the standard library provides fundamental building blocks for concurrent programming (like std::thread, std::sync::Mutex, std::sync::Arc, and channels), managing complex concurrent operations, especially those involving data sharing and synchronization, can quickly become intricate and error-prone. This is where Crossbeam steps in.

Crossbeam is not a single library, but rather a collection of Rust crates that provide a suite of powerful, efficient, and well-tested tools for concurrent and parallel programming. It builds upon the foundations laid by the standard library, offering higher-level abstractions and specialized data structures that simplify common concurrency patterns while maintaining Rust’s commitment to performance and safety.

This article is a deep dive into Crossbeam, exploring its core components, their functionalities, and practical examples. We’ll cover:

  1. Why Crossbeam? The Need for Advanced Concurrency Tools
  2. Core Crossbeam Crates: A Detailed Overview
    • crossbeam-utils: Fundamental Utilities
    • crossbeam-channel: Advanced Channels
    • crossbeam-deque: Work-Stealing Deques
    • crossbeam-epoch: Epoch-Based Reclamation
    • crossbeam-queue: Lock-free Queues
    • crossbeam-skiplist: Concurrent Skip List
  3. Practical Examples and Use Cases
    • Implementing a Work-Stealing Thread Pool
    • Building a High-Performance Logging System
    • Concurrent Data Processing with Epoch-Based Reclamation
    • Lock-free data structures
  4. Understanding the Underlying Mechanisms
    • Atomic Operations and Memory Ordering
    • Lock-Free vs. Wait-Free vs. Obstruction-Free
    • Hazard Pointers and Epoch-Based Reclamation
  5. Best Practices and Considerations
  6. Comparison with Other Concurrency Libraries
  7. The Future of Crossbeam

1. Why Crossbeam? The Need for Advanced Concurrency Tools

Rust’s ownership and borrowing system, along with its standard library concurrency primitives, are sufficient for many basic concurrent tasks. However, as concurrency requirements become more sophisticated, several challenges arise:

  • Deadlocks and Race Conditions: Improper use of mutexes and other synchronization mechanisms can easily lead to deadlocks (where threads are indefinitely blocked waiting for each other) or race conditions (where the outcome of an operation depends on the unpredictable order of thread execution).
  • Performance Bottlenecks: Excessive locking can significantly degrade performance, especially in highly concurrent scenarios. Contention for mutexes can become a major bottleneck.
  • Complexity: Managing shared mutable state across multiple threads, even with the standard library tools, can become very complex, increasing the likelihood of bugs.
  • Memory Management in Lock-Free Data Structures: Implementing lock-free data structures (which avoid the overhead of mutexes) requires careful attention to memory management to prevent use-after-free errors and other memory safety issues.

Crossbeam addresses these challenges by providing:

  • Higher-Level Abstractions: Crossbeam’s crates offer abstractions that encapsulate common concurrency patterns, making it easier to write correct and efficient concurrent code without getting bogged down in low-level details.
  • Lock-Free and Wait-Free Data Structures: Crossbeam includes several lock-free and wait-free data structures, which can significantly improve performance in high-contention scenarios by eliminating the need for locking.
  • Memory Reclamation Mechanisms: Crossbeam provides mechanisms like epoch-based reclamation to safely manage memory in lock-free data structures, preventing memory leaks and use-after-free errors.
  • Well-Tested and Optimized Code: Crossbeam is extensively tested and optimized for performance, giving you confidence in its reliability and efficiency.

2. Core Crossbeam Crates: A Detailed Overview

Crossbeam is a collection of crates, each addressing a specific aspect of concurrent programming. Let’s examine the key crates in detail:

2.1 crossbeam-utils: Fundamental Utilities

crossbeam-utils provides a set of fundamental utilities that underpin other Crossbeam crates and are also useful in their own right. Key components include:

  • thread::scope: This is arguably the most important feature of crossbeam-utils. It allows you to spawn threads that can borrow data from the parent thread’s stack without requiring 'static lifetimes. This is a significant improvement over std::thread::spawn, which requires all borrowed data to have a 'static lifetime (meaning it must live for the entire duration of the program).

```rust
use crossbeam_utils::thread;

fn main() {
    let numbers = vec![1, 2, 3];

    thread::scope(|s| {
        s.spawn(|_| {
            println!("Here are some numbers: {:?}", numbers);
        });
        s.spawn(|_| {
            for number in &numbers {
                println!("Another thread: {}", number);
            }
        });
    })
    .unwrap(); // Handle potential panics in spawned threads.
}
```

    The scope function guarantees that all spawned threads will complete before the scope exits, ensuring that any borrowed data remains valid. This eliminates a common source of lifetime errors in concurrent Rust code. The unwrap() call is used to propagate any panics that occur within the spawned threads to the main thread.

  • atomic::AtomicCell: Provides atomic operations on types that are not directly covered by the standard library’s atomic types (such as AtomicUsize and AtomicBool). It allows you to treat a regular value as atomic, with some limitations: load requires the type to be Copy, and operations are only lock-free for types small enough to map onto native atomic instructions (larger types fall back to an internal lock).

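    As a quick illustration, here is a minimal sketch of AtomicCell wrapping a plain u64 value (the concrete type and the store/swap/load calls are just one possible usage):

```rust
use crossbeam_utils::atomic::AtomicCell;

fn main() {
    // A thread-safe cell holding a plain u64; no Mutex required.
    let cell = AtomicCell::new(42u64);

    cell.store(100);              // Atomically overwrite the value.
    let previous = cell.swap(7);  // Atomically replace it and get the old value back.

    println!("previous = {}, current = {}", previous, cell.load());
}
```
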
  • Shared and Owned: These pointer types are fundamental to Crossbeam’s lock-free data structures. Shared represents a shared reference into a concurrent data structure, while Owned represents uniquely owned data (similar to Box). They actually live in the crossbeam-epoch crate (see section 2.4), where they are used together with atomic operations to manage memory safely in concurrent contexts.

  • CachePadded: A utility for preventing false sharing. False sharing occurs when unrelated data items happen to reside on the same cache line, causing unnecessary cache coherence traffic between CPU cores. CachePadded adds padding to a struct to ensure that it occupies a full cache line, reducing false sharing.

```rust
use crossbeam_utils::CachePadded;
use std::sync::atomic::AtomicUsize;

#[derive(Default)]
struct Counter {
    // The padded counter occupies its own cache line, avoiding false sharing
    // with neighbouring data.
    count: CachePadded<AtomicUsize>,
}
```

2.2 crossbeam-channel: Advanced Channels

crossbeam-channel provides a more feature-rich and performant alternative to the standard library’s std::sync::mpsc channels. It offers several channel types and powerful selection capabilities.

  • Channel Types:

    • unbounded(): Creates an unbounded channel, where the sender can send an unlimited number of messages without blocking. The receiver will block only if the channel is empty.
    • bounded(capacity): Creates a bounded channel with a specified capacity. The sender will block if the channel is full, and the receiver will block if the channel is empty.
    • select!: A powerful macro for selecting operations on multiple channels. This allows you to wait for data on multiple channels simultaneously, or to perform non-blocking send and receive operations.

```rust
use crossbeam_channel::{bounded, select, unbounded};
use std::thread;
use std::time::Duration;

fn main() {
    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(1);

    thread::spawn(move || {
        s1.send("Hello from unbounded channel!").unwrap();
        thread::sleep(Duration::from_millis(100));
        s2.send("Hello from bounded channel!").unwrap();
    });

    // We expect exactly two messages, one from each channel.
    let mut remaining = 2;
    while remaining > 0 {
        select! {
            recv(r1) -> msg => {
                if let Ok(msg) = msg {
                    println!("Received from r1: {:?}", msg);
                    remaining -= 1;
                }
            }
            recv(r2) -> msg => {
                if let Ok(msg) = msg {
                    println!("Received from r2: {:?}", msg);
                    remaining -= 1;
                }
            }
            default(Duration::from_millis(50)) => println!("No message received yet..."),
        }
    }
}
```

    The select! macro allows you to handle messages from multiple channels in a non-blocking and efficient way. The default case allows you to specify a timeout, preventing the loop from blocking indefinitely.

  • Performance: crossbeam-channel is generally faster than std::sync::mpsc, especially in high-contention scenarios. This is due to its optimized implementation, which uses lock-free data structures and minimizes synchronization overhead.

  • Disconnect Handling: Crossbeam channels provide robust handling of disconnected senders and receivers. When a sender or receiver is dropped, the other end of the channel will receive an error, allowing for graceful handling of channel closures.
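
    To make this concrete, here is a minimal sketch (assuming a simple integer payload) of what a receiver observes once its sender has been dropped:

```rust
use crossbeam_channel::unbounded;

fn main() {
    let (sender, receiver) = unbounded();
    sender.send(1).unwrap();

    // Dropping the sender disconnects the channel.
    drop(sender);

    // Messages already in the channel can still be received...
    assert_eq!(receiver.recv(), Ok(1));
    // ...but once the channel is empty, recv() reports the disconnection.
    assert!(receiver.recv().is_err());
}
```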

2.3 crossbeam-deque: Work-Stealing Deques

crossbeam-deque provides work-stealing deques (double-ended queues), a crucial component for implementing efficient work-stealing thread pools.

  • Work-Stealing: In a work-stealing thread pool, each thread has its own local deque of tasks. When a thread runs out of work, it can “steal” tasks from the deques of other threads. This helps to balance the workload and improve performance.

  • Injector, Worker, and Stealer:

    • Injector: A global queue that threads can push tasks onto.
    • Worker: A local deque associated with a specific thread. The thread primarily pushes and pops tasks from its own Worker.
    • Stealer: Allows a thread to steal tasks from another thread’s Worker.

```rust
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use crossbeam_utils::thread::scope;
use std::thread;

fn main() {
    let injector = Injector::new();
    let num_threads = 4;

    // Create the workers up front so that every thread can hold stealers
    // for all of the other workers.
    let workers: Vec<Worker<i32>> = (0..num_threads).map(|_| Worker::new_fifo()).collect();
    let stealers: Vec<Stealer<i32>> = workers.iter().map(|w| w.stealer()).collect();

    // Push some tasks onto the global injector before the workers start.
    for i in 100..110 {
        injector.push(i);
    }

    scope(|s| {
        for worker in workers {
            let stealers = stealers.clone();
            let injector = &injector;

            s.spawn(move |_| {
                // Push some initial tasks onto the local worker.
                for i in 0..10 {
                    worker.push(i);
                }

                loop {
                    // Try to pop a task from the local worker first.
                    if let Some(task) = worker.pop() {
                        println!("Thread {:?} processing task: {}", thread::current().id(), task);
                    // If the local worker is empty, try to steal a batch from the injector.
                    } else if let Steal::Success(task) = injector.steal_batch_and_pop(&worker) {
                        println!("Thread {:?} stole task from injector: {}", thread::current().id(), task);
                    // Otherwise, try to steal from another thread's worker.
                    } else if let Some(task) =
                        stealers.iter().map(|st| st.steal()).find_map(|res| res.success())
                    {
                        println!("Thread {:?} stole task: {}", thread::current().id(), task);
                    } else {
                        // No tasks available anywhere; stop (a real pool would park instead).
                        break;
                    }
                }
            });
        }
    })
    .unwrap();
}
```

    This example demonstrates a simplified work-stealing thread pool. Each thread has a Worker for local tasks and can steal tasks from the global Injector or other threads’ Workers using a Stealer. The steal_batch_and_pop method is used to efficiently steal a batch of tasks from the injector.

2.4 crossbeam-epoch: Epoch-Based Reclamation

crossbeam-epoch provides a powerful mechanism for managing memory in lock-free data structures: epoch-based reclamation. This is crucial for preventing use-after-free errors and memory leaks.

  • The Problem: In lock-free data structures, multiple threads can access and modify shared data concurrently without locks. When a thread removes a node from the data structure, it cannot immediately free the memory associated with that node, because another thread might still be accessing it.
  • Epoch-Based Reclamation: Epoch-based reclamation solves this problem by introducing the concept of epochs. Each thread has a local epoch, and a global epoch is maintained.

    • When a thread wants to remove a node, it marks the node as “retired” and associates it with the current global epoch.
    • A thread can only free retired nodes that belong to an epoch older than the minimum epoch of all active threads. This ensures that no thread is still accessing the node.
    • Periodically, the global epoch is advanced.
  • Key Components:

    • Guard: Represents a thread’s participation in the epoch system. A Guard must be created before accessing shared data and dropped when the thread is finished.
    • Atomic: A shared pointer that can be atomically updated. It’s used to point to data within the lock-free data structure.
    • Owned: Represents owned data, analogous to a Box.
    • Shared: Represents shared data, analogous to an Arc.
    • defer: Schedules a function to be executed when it’s safe to reclaim the memory associated with a retired node.
    • pin: Creates a Guard for the current thread, marking it as active in the epoch system.

```rust
use crossbeam_epoch::{self as epoch, Atomic, Owned};
use crossbeam_utils::thread::scope;
use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

#[derive(Debug)]
struct Node<T> {
    // ManuallyDrop prevents the value from being dropped a second time when the
    // node itself is destroyed by the epoch system after `pop` has already moved
    // the value out with `ptr::read`.
    data: ManuallyDrop<T>,
    next: Atomic<Node<T>>,
}

pub struct TreiberStack<T> {
    head: Atomic<Node<T>>,
}

impl<T> TreiberStack<T> {
    pub fn new() -> TreiberStack<T> {
        TreiberStack {
            head: Atomic::null(),
        }
    }

    pub fn push(&self, t: T) {
        let mut n = Owned::new(Node {
            data: ManuallyDrop::new(t),
            next: Atomic::null(),
        });

        let guard = epoch::pin();

        loop {
            let head = self.head.load(Relaxed, &guard);
            n.next.store(head, Relaxed);

            match self.head.compare_exchange(head, n, Release, Relaxed, &guard) {
                Ok(_) => break,
                Err(e) => n = e.new, // Retry with the returned 'n'.
            }
        }
    }

    pub fn pop(&self) -> Option<T> {
        let guard = epoch::pin();
        loop {
            let head = self.head.load(Acquire, &guard);

            match unsafe { head.as_ref() } {
                Some(h) => {
                    let next = h.next.load(Relaxed, &guard);

                    if self
                        .head
                        .compare_exchange(head, next, Release, Relaxed, &guard)
                        .is_ok()
                    {
                        unsafe {
                            // The node is unlinked, so we can move its data out and
                            // schedule the node itself for destruction.
                            guard.defer_destroy(head);
                            return Some(ManuallyDrop::into_inner(ptr::read(&h.data)));
                        }
                    }
                    // CAS failed: another thread changed the head, so retry.
                }
                None => return None, // The stack is empty.
            }
        }
    }
}

impl<T> Drop for TreiberStack<T> {
    fn drop(&mut self) {
        while self.pop().is_some() {}
    }
}

fn main() {
    let stack = TreiberStack::new();

    scope(|s| {
        for _ in 0..10 {
            s.spawn(|_| {
                for i in 0..100 {
                    stack.push(i);
                    stack.pop();
                }
            });
        }
    })
    .unwrap();
}
```
    This example implements a lock-free Treiber stack using crossbeam-epoch. push adds a new node to the stack, and pop removes the top node. The crucial part is the call to guard.defer_destroy(head) in pop: it schedules the removed node for destruction, but the actual deallocation only happens once no thread can still be accessing it, as determined by the epoch-based reclamation system. The compare_exchange method atomically updates the head pointer, ensuring thread safety, and the data field is wrapped in ManuallyDrop so that moving the value out in pop does not lead to a double drop when the node is later destroyed.

2.5 crossbeam-queue: Lock-free Queues

crossbeam-queue provides implementations of lock-free queues, offering high performance for concurrent enqueue and dequeue operations.

  • ArrayQueue: A bounded, array-based lock-free queue. It’s very efficient when the capacity is known in advance.
  • SegQueue: An unbounded, segmented lock-free queue. It grows dynamically as needed, making it suitable for situations where the number of elements is not known beforehand.

```rust
use crossbeam_queue::{ArrayQueue, SegQueue};
use crossbeam_utils::thread::scope;

fn main() {
    // Bounded ArrayQueue.
    let queue = ArrayQueue::new(10);

    scope(|s| {
        s.spawn(|_| {
            for i in 0..5 {
                queue.push(i).unwrap();
            }
        });

        s.spawn(|_| {
            for _ in 0..5 {
                // pop returns None if the queue happens to be empty right now.
                if let Some(value) = queue.pop() {
                    println!("Popped from ArrayQueue: {}", value);
                }
            }
        });
    })
    .unwrap();

    // Unbounded SegQueue.
    let queue = SegQueue::new();

    scope(|s| {
        s.spawn(|_| {
            for i in 0..5 {
                queue.push(i);
            }
        });

        s.spawn(|_| {
            for _ in 0..5 {
                if let Some(value) = queue.pop() {
                    println!("Popped from SegQueue: {}", value);
                }
            }
        });
    })
    .unwrap();
}
```
These examples demonstrate the basic usage of ArrayQueue (bounded) and SegQueue (unbounded). The push and pop operations are lock-free, allowing concurrent access from multiple threads without the overhead of mutexes.

2.6 crossbeam-skiplist: Concurrent Skip List

crossbeam-skiplist provides a concurrent skip list implementation. A skip list is a probabilistic data structure that offers logarithmic time complexity for search, insertion, and deletion operations, similar to balanced trees, but often with simpler implementation and better concurrency characteristics.

  • SkipMap: A concurrent, ordered map (key-value store) implemented using a skip list.
  • SkipSet: A concurrent, ordered set implemented using a skip list.

```rust
use crossbeam_skiplist::SkipMap;
use std::sync::Arc;
use std::thread;

fn main() {
    // SkipMap itself is not Clone, so share it between threads with an Arc.
    let map = Arc::new(SkipMap::new());

    // Insert some key-value pairs from multiple threads.
    let thread1 = thread::spawn({
        let map = Arc::clone(&map);
        move || {
            for i in 0..10 {
                map.insert(i, i * 2);
            }
        }
    });

    let thread2 = thread::spawn({
        let map = Arc::clone(&map);
        move || {
            for i in 10..20 {
                map.insert(i, i * 3);
            }
        }
    });

    thread1.join().unwrap();
    thread2.join().unwrap();

    // Iterate over the map and print its contents in key order.
    for entry in map.iter() {
        println!("Key: {}, Value: {}", entry.key(), entry.value());
    }

    // Check whether an entry exists.
    assert!(map.contains_key(&10));

    // Remove an entry.
    map.remove(&5);
    assert!(!map.contains_key(&5));
}
```
This example showcases SkipMap. Multiple threads can concurrently insert key-value pairs into the map; because SkipMap itself is not Clone, it is shared between the threads through an Arc. The iter() method iterates over the entries in sorted key order, while contains_key and remove allow for efficient lookup and deletion of entries.

3. Practical Examples and Use Cases

Let’s explore some more detailed practical examples and use cases for Crossbeam:

3.1 Implementing a Work-Stealing Thread Pool

We’ve already touched on a basic work-stealing thread pool using crossbeam-deque. Here’s a more complete example:

```rust
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// A task that can be executed by the thread pool.
type Task = Box<dyn FnOnce() + Send + 'static>;

pub struct ThreadPool {
    injector: Arc<Injector<Task>>,
    shutdown: Arc<AtomicBool>,
    handles: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    pub fn new(num_threads: usize) -> Self {
        assert!(num_threads > 0);

        let injector = Arc::new(Injector::new());
        let shutdown = Arc::new(AtomicBool::new(false));

        // Create the workers up front so that every thread can be handed
        // stealers for all of the other workers.
        let workers: Vec<Worker<Task>> = (0..num_threads).map(|_| Worker::new_fifo()).collect();
        let stealers: Vec<Stealer<Task>> = workers.iter().map(|w| w.stealer()).collect();

        let mut handles = Vec::with_capacity(num_threads);
        for worker in workers {
            let injector = Arc::clone(&injector);
            let stealers = stealers.clone();
            let shutdown = Arc::clone(&shutdown);

            handles.push(thread::spawn(move || loop {
                // Try the local worker first.
                if let Some(task) = worker.pop() {
                    task();
                // Then try to grab a batch of tasks from the global injector.
                } else if let Steal::Success(task) = injector.steal_batch_and_pop(&worker) {
                    task();
                // Then try to steal from the other threads.
                } else if let Some(task) =
                    stealers.iter().map(|st| st.steal()).find_map(|res| res.success())
                {
                    task();
                } else if shutdown.load(Ordering::Acquire) {
                    break; // No work left and the pool is shutting down.
                } else {
                    // Nothing to do right now; yield to other threads.
                    thread::yield_now();
                }
            }));
        }

        ThreadPool {
            injector,
            shutdown,
            handles,
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.injector.push(Box::new(f));
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Signal all threads to stop once the remaining work has drained,
        // then join them.
        self.shutdown.store(true, Ordering::Release);
        for handle in self.handles.drain(..) {
            handle.join().unwrap();
        }
        println!("Dropping thread pool");
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    let counter = Arc::new(AtomicUsize::new(0));

    for _ in 0..1000 {
        let counter = Arc::clone(&counter);
        pool.execute(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        });
    }

    // Wait for all tasks to complete (a real thread pool would expose a
    // proper wait/shutdown API instead of spinning like this).
    while counter.load(Ordering::SeqCst) < 1000 {
        thread::yield_now();
    }

    println!("Counter: {}", counter.load(Ordering::SeqCst));
}
```

This more robust thread pool example demonstrates:

  • Creating a ThreadPool with a specified number of worker threads, each owning its own Worker deque.
  • Using Injector, Worker, and Stealer for work distribution and stealing.
  • Using Arc and AtomicUsize to safely share a counter between threads.
  • Spawning the worker threads in new and keeping them running until shutdown is requested.
  • A basic execute method to submit tasks to the pool.
  • A Drop implementation that signals the workers to shut down and joins them.

3.2 Building a High-Performance Logging System

Crossbeam channels are excellent for building high-performance logging systems. You can use a dedicated thread to handle log messages, avoiding blocking the main application threads.

```rust
use crossbeam_channel::{unbounded, Sender};
use lazy_static::lazy_static;
use std::fs::OpenOptions;
use std::io::Write;
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};

// Define a log message enum.
#[derive(Debug)]
pub enum LogMessage {
    Info(String),
    Warning(String),
    Error(String),
}

// A simple logger that uses a Crossbeam channel.
pub struct Logger {
    sender: Sender<LogMessage>,
}

// Use lazy_static to create a global, lazily initialized logger.
lazy_static! {
    pub static ref LOGGER: Logger = {
        let (sender, receiver) = unbounded();

        // Spawn a dedicated logger thread.
        thread::spawn(move || {
            let mut file = OpenOptions::new()
                .create(true)
                .append(true)
                .open("app.log")
                .unwrap();

            // This loop runs until all senders are dropped, processing log messages.
            for msg in receiver {
                let timestamp = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs();
                let log_entry = match msg {
                    LogMessage::Info(text) => format!("[INFO] {}: {}\n", timestamp, text),
                    LogMessage::Warning(text) => format!("[WARNING] {}: {}\n", timestamp, text),
                    LogMessage::Error(text) => format!("[ERROR] {}: {}\n", timestamp, text),
                };
                // Only this thread touches the file, so no extra locking is needed.
                if let Err(e) = file.write_all(log_entry.as_bytes()) {
                    eprintln!("Failed to write to log file: {}", e); // Error handling is crucial.
                }
                file.flush().unwrap(); // Ensure the log entry is written to disk.
            }
        });
        Logger { sender }
    };
}

// Helper methods to log messages.
impl Logger {
    pub fn info(&self, message: String) {
        self.sender.send(LogMessage::Info(message)).unwrap();
    }

    pub fn warning(&self, message: String) {
        self.sender.send(LogMessage::Warning(message)).unwrap();
    }

    pub fn error(&self, message: String) {
        self.sender.send(LogMessage::Error(message)).unwrap();
    }
}

fn main() {
    LOGGER.info("Starting the application...".to_string());
    LOGGER.warning("This is a warning message.".to_string());
    LOGGER.error("An error occurred!".to_string());

    // Simulate some work that generates log messages.
    for i in 0..10 {
        LOGGER.info(format!("Processing item: {}", i));
        thread::sleep(std::time::Duration::from_millis(10));
    }

    LOGGER.info("Application finished.".to_string());
    // Give the detached logger thread time to drain the channel before exiting.
    thread::sleep(std::time::Duration::from_millis(1000));
}
```

Key features of this logging system:

  • Uses a dedicated thread to handle log writing, preventing blocking of application threads.
  • Uses an unbounded crossbeam-channel to send log messages to the logger thread.
  • Formats log messages with timestamps.
  • Writes log messages to a file (“app.log”).
  • Uses lazy_static for a global, easily accessible logger instance.
  • Handles potential errors during file writing.
  • Keeps all file writes on the dedicated logger thread, so no additional locking is required.

3.3 Concurrent Data Processing with Epoch-Based Reclamation

This example demonstrates how to use crossbeam-epoch for concurrent data processing where nodes might be added and removed frequently. We’ll create a simple concurrent linked list.

```rust
use crossbeam_epoch::{self as epoch, Atomic, Owned};
use crossbeam_utils::thread::scope;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

#[derive(Debug)]
struct Node<T> {
    data: T,
    next: Atomic<Node<T>>,
}

pub struct ConcurrentList<T> {
    head: Atomic<Node<T>>,
}

impl<T> ConcurrentList<T> {
    pub fn new() -> Self {
        ConcurrentList {
            head: Atomic::null(),
        }
    }

    // Add a new node to the head of the list.
    pub fn push_front(&self, data: T) {
        let mut new_node = Owned::new(Node {
            data,
            next: Atomic::null(),
        });

        let guard = &epoch::pin();

        loop {
            let head = self.head.load(Relaxed, guard);
            new_node.next.store(head, Relaxed);

            match self.head.compare_exchange(head, new_node, Release, Relaxed, guard) {
                Ok(_) => break,
                Err(e) => new_node = e.new, // Retry with the returned 'new_node'.
            }
        }
    }

    // Remove nodes that satisfy a predicate.
    //
    // Nodes are unlinked by CAS-ing the predecessor's `next` pointer. This is
    // sufficient for this example, where removals do not race with each other;
    // removals racing on adjacent nodes would need pointer marking (Harris's
    // algorithm) to be fully safe.
    pub fn remove_if<F>(&self, predicate: F)
    where
        F: Fn(&T) -> bool,
    {
        let guard = &epoch::pin();

        let mut prev = &self.head;
        let mut current = prev.load(Acquire, guard);

        while let Some(curr_node) = unsafe { current.as_ref() } {
            let next = curr_node.next.load(Acquire, guard);

            if predicate(&curr_node.data) {
                // Node matches the predicate; try to unlink it.
                match prev.compare_exchange(current, next, Release, Relaxed, guard) {
                    Ok(_) => {
                        // Successfully unlinked the node; schedule it for deletion.
                        unsafe { guard.defer_destroy(current) };
                        current = next;
                    }
                    Err(_) => {
                        // Another thread modified the list; start over from the head.
                        prev = &self.head;
                        current = prev.load(Acquire, guard);
                    }
                }
            } else {
                // Node doesn't match; move on to the next node.
                prev = &curr_node.next;
                current = next;
            }
        }
    }

    // Iterate through the list (read-only), collecting clones of the data.
    pub fn iter(&self) -> Vec<T>
    where
        T: Clone,
    {
        let guard = &epoch::pin();
        let mut current = self.head.load(Acquire, guard);
        let mut result = Vec::new();

        while let Some(node) = unsafe { current.as_ref() } {
            result.push(node.data.clone());
            current = node.next.load(Acquire, guard);
        }

        result
    }
}

impl<T> Drop for ConcurrentList<T> {
    fn drop(&mut self) {
        // Clean up all nodes remaining in the list.
        let guard = &epoch::pin();
        let mut current = self.head.load(Relaxed, guard);
        while let Some(node) = unsafe { current.as_ref() } {
            let next = node.next.load(Relaxed, guard);
            unsafe { guard.defer_destroy(current) };
            current = next;
        }
    }
}

fn main() {
    let list = ConcurrentList::new();

    scope(|s| {
        s.spawn(|_| {
            for i in 0..10 {
                list.push_front(i);
            }
        });
        s.spawn(|_| {
            for i in 10..20 {
                list.push_front(i);
            }
        });
    })
    .unwrap();

    // Remove all even numbers.
    list.remove_if(|x| x % 2 == 0);
    // Print the remaining elements.
    println!("Remaining elements: {:?}", list.iter());

    scope(|s| {
        s.spawn(|_| {
            for i in 0..10 {
                list.push_front(i);
            }
        });
        s.spawn(|_| {
            for i in 10..20 {
                list.push_front(i);
            }
        });
    })
    .unwrap();

    // Remove all multiples of 3.
    list.remove_if(|x| x % 3 == 0);
    println!("Remaining elements: {:?}", list.iter());
}
```

This example demonstrates:

  • Creating a concurrent linked list using Atomic pointers.
  • Using push_front to add elements to the list.
  • Using remove_if to remove elements based on a predicate, safely using defer_destroy to schedule nodes for deletion.
  • Using iter to traverse the list.
  • A Drop implementation to clean up the list.
  • Using crossbeam_utils::thread::scope to spawn threads that safely borrow the list.

3.4 Lock-free Data Structures Examples

We have already seen how to implement a lock-free stack. Here’s another example demonstrating a lock-free counter:

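A minimal sketch of such a counter, using AtomicCell from crossbeam-utils together with scoped threads (std::sync::atomic::AtomicUsize would work just as well):

```rust
use crossbeam_utils::atomic::AtomicCell;
use crossbeam_utils::thread::scope;

fn main() {
    // A lock-free counter shared by reference across scoped threads.
    let counter = AtomicCell::new(0usize);

    scope(|s| {
        for _ in 0..4 {
            s.spawn(|_| {
                for _ in 0..1_000 {
                    // fetch_add is an atomic read-modify-write; no mutex required.
                    counter.fetch_add(1);
                }
            });
        }
    })
    .unwrap();

    assert_eq!(counter.load(), 4_000);
    println!("Final count: {}", counter.load());
}
```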