Learn Crossbeam: A Deep Dive into Rust’s Concurrency Powerhouse
Rust is renowned for its memory safety and fearless concurrency. While the standard library provides fundamental building blocks for concurrent programming (like `std::thread`, `std::sync::Mutex`, `std::sync::Arc`, and channels), managing complex concurrent operations, especially those involving data sharing and synchronization, can quickly become intricate and error-prone. This is where Crossbeam steps in.
Crossbeam is not a single library, but rather a collection of Rust crates that provide a suite of powerful, efficient, and well-tested tools for concurrent and parallel programming. It builds upon the foundations laid by the standard library, offering higher-level abstractions and specialized data structures that simplify common concurrency patterns while maintaining Rust’s commitment to performance and safety.
This article is a deep dive into Crossbeam, exploring its core components, their functionalities, and practical examples. We’ll cover:
- Why Crossbeam? The Need for Advanced Concurrency Tools
- Core Crossbeam Crates: A Detailed Overview
  - `crossbeam-utils`: Fundamental Utilities
  - `crossbeam-channel`: Advanced Channels
  - `crossbeam-deque`: Work-Stealing Deques
  - `crossbeam-epoch`: Epoch-Based Reclamation
  - `crossbeam-queue`: Lock-free Queues
  - `crossbeam-skiplist`: Concurrent Skip List
- Practical Examples and Use Cases
  - Implementing a Work-Stealing Thread Pool
  - Building a High-Performance Logging System
  - Concurrent Data Processing with Epoch-Based Reclamation
  - Lock-free data structures
- Understanding the Underlying Mechanisms
  - Atomic Operations and Memory Ordering
  - Lock-Free vs. Wait-Free vs. Obstruction-Free
  - Hazard Pointers and Epoch-Based Reclamation
- Best Practices and Considerations
- Comparison with Other Concurrency Libraries
- The Future of Crossbeam
1. Why Crossbeam? The Need for Advanced Concurrency Tools
Rust’s ownership and borrowing system, along with its standard library concurrency primitives, are sufficient for many basic concurrent tasks. However, as concurrency requirements become more sophisticated, several challenges arise:
- Deadlocks and Race Conditions: Improper use of mutexes and other synchronization mechanisms can easily lead to deadlocks (where threads are indefinitely blocked waiting for each other) or race conditions (where the outcome of an operation depends on the unpredictable order of thread execution).
- Performance Bottlenecks: Excessive locking can significantly degrade performance, especially in highly concurrent scenarios. Contention for mutexes can become a major bottleneck.
- Complexity: Managing shared mutable state across multiple threads, even with the standard library tools, can become very complex, increasing the likelihood of bugs.
- Memory Management in Lock-Free Data Structures: Implementing lock-free data structures (which avoid the overhead of mutexes) requires careful attention to memory management to prevent use-after-free errors and other memory safety issues.
Crossbeam addresses these challenges by providing:
- Higher-Level Abstractions: Crossbeam’s crates offer abstractions that encapsulate common concurrency patterns, making it easier to write correct and efficient concurrent code without getting bogged down in low-level details.
- Lock-Free Data Structures: Crossbeam includes several lock-free data structures (queues, deques, a skip list), which can significantly improve performance in high-contention scenarios by keeping mutexes off the hot path.
- Memory Reclamation Mechanisms: Crossbeam provides mechanisms like epoch-based reclamation to safely manage memory in lock-free data structures, preventing memory leaks and use-after-free errors.
- Well-Tested and Optimized Code: Crossbeam is extensively tested and optimized for performance, giving you confidence in its reliability and efficiency.
2. Core Crossbeam Crates: A Detailed Overview
Crossbeam is a collection of crates, each addressing a specific aspect of concurrent programming. Let’s examine the key crates in detail:
2.1 `crossbeam-utils`: Fundamental Utilities
`crossbeam-utils` provides a set of fundamental utilities that underpin other Crossbeam crates and are also useful in their own right. Key components include:
- `thread::scope`: This is arguably the most important feature of `crossbeam-utils`. It allows you to spawn threads that borrow data from the parent thread’s stack without requiring `'static` lifetimes. `std::thread::spawn`, by contrast, requires the closure and everything it captures to be `'static`, because the spawned thread may outlive its caller. (Since Rust 1.63 the standard library also offers `std::thread::scope` with similar semantics.)

```rust
use crossbeam_utils::thread;

fn main() {
    let numbers = vec![1, 2, 3];

    thread::scope(|s| {
        s.spawn(|_| {
            println!("Here are some numbers: {:?}", numbers);
        });
        s.spawn(|_| {
            for number in &numbers {
                println!("Another thread: {}", number);
            }
        });
    })
    .unwrap(); // `scope` returns `Err` if any spawned thread panicked.
}
```

  The `scope` function guarantees that all spawned threads complete before the scope exits, so any borrowed data remains valid. This eliminates a common source of lifetime errors in concurrent Rust code. `scope` returns an `Err` if any of the spawned threads panicked, so the `unwrap()` call re-raises such a panic on the main thread.
- `atomic::AtomicCell`: A thread-safe mutable cell, similar to `std::cell::Cell` but usable across threads. It provides atomic `load`, `store`, and `swap` for types beyond those covered by the standard library’s atomics (such as `AtomicUsize` and `AtomicBool`). It has some limitations: `load` requires `T: Copy`, and for types that do not map to a native atomic the operations fall back to internal locking (`AtomicCell::<T>::is_lock_free()` reports which case applies).
- `Shared` and `Owned`: These pointer types are fundamental to Crossbeam’s lock-free data structures, but they live in `crossbeam-epoch`, not in `crossbeam-utils`. `Owned` is an owned heap pointer (similar to `Box`), while `Shared` is a pointer that is only valid while the current thread holds an epoch `Guard`; unlike `Arc`, it is not reference-counted. Section 2.4 covers both.
- `CachePadded`: A utility for preventing false sharing. False sharing occurs when unrelated data items happen to reside on the same cache line, causing unnecessary cache coherence traffic between CPU cores. `CachePadded` pads and aligns a value to the length of a cache line, so it does not share a line with neighboring data.

```rust
use crossbeam_utils::CachePadded;
use std::sync::atomic::AtomicUsize;

#[derive(Default)]
struct Counter {
    // Padded so that `count` does not share a cache line with neighboring data.
    count: CachePadded<AtomicUsize>,
}
```
2.2 `crossbeam-channel`: Advanced Channels
`crossbeam-channel` provides a more feature-rich alternative to the standard library’s `std::sync::mpsc` channels. It offers several channel types and powerful selection capabilities.
- Channel Types:
  - `unbounded()`: Creates an unbounded channel, where the sender can send an unlimited number of messages without blocking. The receiver blocks only if the channel is empty.
  - `bounded(capacity)`: Creates a bounded channel with a specified capacity. The sender blocks if the channel is full, and the receiver blocks if the channel is empty.
- `select!`: A powerful macro for selecting among operations on multiple channels. This allows you to wait for data on several channels simultaneously, or to perform non-blocking send and receive operations.

```rust
use crossbeam_channel::{bounded, select, unbounded};
use std::thread;
use std::time::Duration;

fn main() {
    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(1);

    thread::spawn(move || {
        s1.send("Hello from unbounded channel!").unwrap();
        thread::sleep(Duration::from_millis(100));
        s2.send("Hello from bounded channel!").unwrap();
    });

    // Stop once both messages have arrived; an unconditional `loop` would never end.
    let mut received = 0;
    while received < 2 {
        select! {
            // `Err` means the sender was dropped; it is simply ignored here.
            recv(r1) -> msg => {
                if let Ok(msg) = msg {
                    println!("Received from r1: {}", msg);
                    received += 1;
                }
            }
            recv(r2) -> msg => {
                if let Ok(msg) = msg {
                    println!("Received from r2: {}", msg);
                    received += 1;
                }
            }
            default(Duration::from_millis(50)) => println!("No message received yet..."),
        }
    }
}
```

  The `select!` macro lets you wait on several channels at once and handle whichever operation becomes ready first. The `default` case with a `Duration` acts as a timeout, preventing a single `select!` from blocking indefinitely.
- Performance: `crossbeam-channel` was historically faster than `std::sync::mpsc`, especially in high-contention scenarios, thanks to its lock-free internals. Since Rust 1.67 the standard library’s `mpsc` is itself based on the `crossbeam-channel` implementation, so the raw speed gap has narrowed. The remaining advantages are multi-producer multi-consumer channels (receivers can be cloned), `select!`, and extras such as timeout and tick channels.
- Disconnect Handling: Crossbeam channels provide robust handling of disconnected senders and receivers. When all senders or all receivers are dropped, operations on the other end return an error, allowing for graceful handling of channel closures.
2.3 `crossbeam-deque`: Work-Stealing Deques
`crossbeam-deque` provides work-stealing deques (double-ended queues), a crucial component for implementing efficient work-stealing thread pools.
- Work-Stealing: In a work-stealing thread pool, each thread has its own local deque of tasks. When a thread runs out of work, it can “steal” tasks from the deques of other threads. This helps to balance the workload and improve performance.
- `Injector`, `Worker`, and `Stealer`:
  - `Injector`: A global queue that any thread can push tasks onto.
  - `Worker`: A local deque owned by a specific thread. The thread primarily pushes and pops tasks from its own `Worker`.
  - `Stealer`: A handle that lets other threads steal tasks from a `Worker`.

```rust
use crossbeam_deque::{Injector, Steal, Worker};
use crossbeam_utils::thread::scope;
use std::thread;

fn main() {
    let injector = Injector::new();
    let num_threads = 4;

    // Push some tasks to the global injector.
    for i in 100..110 {
        injector.push(i);
    }

    // Create the local deques up front so every thread can reach every stealer.
    let workers: Vec<Worker<i32>> = (0..num_threads).map(|_| Worker::new_fifo()).collect();
    let stealers: Vec<_> = workers.iter().map(|w| w.stealer()).collect();

    scope(|s| {
        for worker in workers {
            // The injector and stealers are shared by reference; the `Worker` moves into its thread.
            let (injector, stealers) = (&injector, &stealers);
            s.spawn(move |_| {
                // Push some initial tasks to the local worker.
                for i in 0..10 {
                    worker.push(i);
                }
                loop {
                    // Try to pop a task from the local worker first.
                    if let Some(task) = worker.pop() {
                        println!("Thread {:?} processing task: {}", thread::current().id(), task);
                        continue;
                    }
                    // Local worker is empty: try the injector, then the other workers.
                    let stolen = injector
                        .steal_batch_and_pop(&worker)
                        .or_else(|| stealers.iter().map(|s| s.steal()).collect());
                    match stolen {
                        Steal::Success(task) => {
                            println!("Thread {:?} stole task: {}", thread::current().id(), task)
                        }
                        Steal::Retry => thread::yield_now(), // Lost a race; try again.
                        Steal::Empty => break, // Nothing visible to steal; this thread is done.
                    }
                }
            });
        }
    })
    .unwrap();
}
```

This example demonstrates a simplified work-stealing scheme. Each thread has a `Worker` for local tasks and can steal tasks from the global `Injector` or from other threads’ `Worker`s through their `Stealer`s. Steal operations return a `Steal` value (`Success`, `Empty`, or `Retry`) rather than an `Option` or `Result`. The `steal_batch_and_pop` method moves a batch of tasks from the injector into the local worker and returns one of them.
2.4 `crossbeam-epoch`: Epoch-Based Reclamation
`crossbeam-epoch` provides a powerful mechanism for managing memory in lock-free data structures: epoch-based reclamation. This is crucial for preventing use-after-free errors and memory leaks.
- The Problem: In lock-free data structures, multiple threads can access and modify shared data concurrently without locks. When a thread removes a node from the data structure, it cannot immediately free the memory associated with that node, because another thread might still be accessing it.
- Epoch-Based Reclamation: Epoch-based reclamation solves this problem by introducing the concept of epochs. A global epoch counter is maintained, and each thread records the epoch it observed when it became active (pinned).
  - When a thread removes a node, it marks the node as “retired” and tags it with the current global epoch.
  - Retired nodes may only be freed once every active thread has moved past the epoch in which they were retired. This ensures that no thread can still hold a reference to the node.
  - The global epoch is advanced periodically, once all active threads have observed the current one.
- Key Components:
  - `Guard`: Represents a thread’s participation in the epoch system. A `Guard` must be obtained before accessing shared data and dropped when the thread is finished.
  - `Atomic`: An atomic pointer that can be shared between threads and updated atomically. It’s used to point to data within the lock-free data structure.
  - `Owned`: An owned heap pointer, analogous to a `Box`.
  - `Shared`: A pointer loaded from an `Atomic`. It is valid only for the lifetime of the `Guard` it was loaded under; unlike an `Arc`, it is not reference-counted.
  - `defer`: A `Guard` method that schedules a function to run once it’s safe to reclaim the memory associated with a retired node. `defer_destroy` is the common shortcut for “drop this node later”.
  - `pin`: Creates a `Guard` for the current thread, marking it as active in the epoch system.
```rust
use crossbeam_epoch::{self as epoch, Atomic, Owned};
use crossbeam_utils::thread::scope;
use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

struct Node<T> {
    // `ManuallyDrop` because `pop` moves the value out before the node itself is destroyed.
    data: ManuallyDrop<T>,
    next: Atomic<Node<T>>,
}

pub struct TreiberStack<T> {
    head: Atomic<Node<T>>,
}

impl<T> TreiberStack<T> {
    pub fn new() -> TreiberStack<T> {
        TreiberStack {
            head: Atomic::null(),
        }
    }

    pub fn push(&self, t: T) {
        let mut n = Owned::new(Node {
            data: ManuallyDrop::new(t),
            next: Atomic::null(),
        });
        let guard = epoch::pin();
        loop {
            let head = self.head.load(Relaxed, &guard);
            n.next.store(head, Relaxed);
            // `Release` publishes the node's contents together with the new head.
            match self.head.compare_exchange(head, n, Release, Relaxed, &guard) {
                Ok(_) => break,
                Err(e) => n = e.new, // Retry with the same node.
            }
        }
    }

    pub fn pop(&self) -> Option<T> {
        let guard = epoch::pin();
        loop {
            // `Acquire` pairs with the `Release` in `push`.
            let head = self.head.load(Acquire, &guard);
            match unsafe { head.as_ref() } {
                Some(h) => {
                    let next = h.next.load(Relaxed, &guard);
                    if self
                        .head
                        .compare_exchange(head, next, Relaxed, Relaxed, &guard)
                        .is_ok()
                    {
                        unsafe {
                            // This thread won the race, so it alone may take the data.
                            guard.defer_destroy(head); // Free the node once no thread can see it.
                            return Some(ManuallyDrop::into_inner(ptr::read(&h.data)));
                        }
                    }
                    // Otherwise another thread changed `head`; retry.
                }
                None => return None, // The stack is empty.
            }
        }
    }
}

impl<T> Drop for TreiberStack<T> {
    fn drop(&mut self) {
        while self.pop().is_some() {}
    }
}

fn main() {
    let stack = TreiberStack::new();
    scope(|scope| {
        for _ in 0..10 {
            scope.spawn(|_| {
                for i in 0..100 {
                    stack.push(i);
                    stack.pop();
                }
            });
        }
    })
    .unwrap();
}
```

This example implements a lock-free Treiber stack using `crossbeam-epoch`. `push` adds a new node to the stack, and `pop` removes the top node. The crucial part is the use of `guard.defer_destroy(head)` in `pop`. This schedules the removed node for destruction, but the actual deallocation only happens when it's safe, as determined by the epoch-based reclamation system. The payload is wrapped in `ManuallyDrop` so that moving it out in `pop` and later destroying the node does not drop it twice. The `compare_exchange` method is used to atomically update the `head` pointer, ensuring thread safety.
2.5 `crossbeam-queue`: Lock-free Queues
`crossbeam-queue` provides implementations of lock-free queues, offering high performance for concurrent enqueue and dequeue operations.
- `ArrayQueue`: A bounded, array-based lock-free queue. It’s very efficient when the capacity is known in advance.
- `SegQueue`: An unbounded, segmented lock-free queue. It grows dynamically as needed, making it suitable for situations where the number of elements is not known beforehand.

```rust
use crossbeam_queue::{ArrayQueue, SegQueue};
use crossbeam_utils::thread::scope;

fn main() {
    // Bounded ArrayQueue
    let queue = ArrayQueue::new(10);
    scope(|s| {
        s.spawn(|_| {
            for i in 0..5 {
                queue.push(i).unwrap();
            }
        });
        s.spawn(|_| {
            for _ in 0..5 {
                // `pop` returns `None` if the producer has not pushed anything yet.
                if let Some(value) = queue.pop() {
                    println!("Popped from ArrayQueue: {}", value);
                }
            }
        });
    })
    .unwrap();

    // Unbounded SegQueue
    let queue = SegQueue::new();
    scope(|s| {
        s.spawn(|_| {
            for i in 0..5 {
                queue.push(i);
            }
        });
        s.spawn(|_| {
            for _ in 0..5 {
                if let Some(value) = queue.pop() {
                    println!("Popped from SegQueue: {}", value);
                }
            }
        });
    })
    .unwrap();
}
```

These examples demonstrate the basic usage of `ArrayQueue` (bounded) and `SegQueue` (unbounded). In current releases `pop` returns an `Option`, and neither queue blocks: the consumer above simply skips an iteration when the queue is momentarily empty. The `push` and `pop` operations are lock-free, allowing for concurrent access from multiple threads without the overhead of mutexes.
2.6 `crossbeam-skiplist`: Concurrent Skip List
`crossbeam-skiplist` provides a concurrent skip list implementation. A skip list is a probabilistic data structure that offers logarithmic expected time for search, insertion, and deletion, similar to balanced trees, but often with a simpler implementation and better concurrency characteristics.
- `SkipMap`: A concurrent, ordered map (key-value store) implemented using a skip list.
- `SkipSet`: A concurrent, ordered set implemented using a skip list.

```rust
use crossbeam_skiplist::SkipMap;
use std::sync::Arc;
use std::thread;

fn main() {
    // `SkipMap` is shared through an `Arc`; its methods take `&self`.
    let map: Arc<SkipMap<i32, i32>> = Arc::new(SkipMap::new());

    // Insert some key-value pairs from multiple threads.
    let thread1 = thread::spawn({
        let map = Arc::clone(&map);
        move || {
            for i in 0..10 {
                map.insert(i, i * 2);
            }
        }
    });
    let thread2 = thread::spawn({
        let map = Arc::clone(&map);
        move || {
            for i in 10..20 {
                map.insert(i, i * 3);
            }
        }
    });
    thread1.join().unwrap();
    thread2.join().unwrap();

    // Iterate over the map and print its contents.
    for entry in map.iter() {
        println!("Key: {}, Value: {}", entry.key(), entry.value());
    }

    // Check whether an entry exists.
    assert!(map.contains_key(&10));

    // Remove an entry.
    map.remove(&5);
    assert!(!map.contains_key(&5));
}
```

This example showcases the `SkipMap`. Multiple threads can concurrently insert key-value pairs into the map, which is shared through an `Arc` because `SkipMap` itself is not cloned per thread. The `iter()` method allows you to iterate over the entries in the map in sorted order. The `contains_key` and `remove` methods allow for efficient checking and deletion of map entries.
3. Practical Examples and Use Cases
Let’s explore some more detailed practical examples and use cases for Crossbeam:
3.1 Implementing a Work-Stealing Thread Pool
We’ve already touched on a basic work-stealing thread pool using `crossbeam-deque`. Here’s a more complete example:
```rust
use crossbeam_deque::{Injector, Stealer, Worker};
use std::iter;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// A task that can be executed by the thread pool.
type Task = Box<dyn FnOnce() + Send + 'static>;

pub struct ThreadPool {
    injector: Arc<Injector<Task>>,
    shutdown: Arc<AtomicBool>,
    handles: Vec<JoinHandle<()>>,
}

// Look for work: local deque first, then the injector, then the other workers.
fn find_task(local: &Worker<Task>, global: &Injector<Task>, stealers: &[Stealer<Task>]) -> Option<Task> {
    local.pop().or_else(|| {
        iter::repeat_with(|| {
            global
                .steal_batch_and_pop(local)
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        .find(|s| !s.is_retry()) // Repeat while a steal lost a race.
        .and_then(|s| s.success())
    })
}

impl ThreadPool {
    pub fn new(num_threads: usize) -> Self {
        assert!(num_threads > 0);
        let injector = Arc::new(Injector::new());
        let shutdown = Arc::new(AtomicBool::new(false));
        let workers: Vec<Worker<Task>> = (0..num_threads).map(|_| Worker::new_fifo()).collect();
        let stealers: Arc<Vec<Stealer<Task>>> =
            Arc::new(workers.iter().map(|w| w.stealer()).collect());

        // Each `Worker` moves into its own thread; the injector and stealers are shared.
        let handles = workers
            .into_iter()
            .map(|worker| {
                let injector = Arc::clone(&injector);
                let stealers = Arc::clone(&stealers);
                let shutdown = Arc::clone(&shutdown);
                thread::spawn(move || loop {
                    match find_task(&worker, &injector, &stealers) {
                        Some(task) => task(),
                        // Exit only when asked to stop and no work is left.
                        None if shutdown.load(Ordering::Acquire) => break,
                        // Idle: yield to other threads (a production pool would park instead).
                        None => thread::yield_now(),
                    }
                })
            })
            .collect();

        ThreadPool { injector, shutdown, handles }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.injector.push(Box::new(f));
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // 1) Signal all threads to stop once the queues are drained.
        self.shutdown.store(true, Ordering::Release);
        // 2) Join all threads.
        for handle in self.handles.drain(..) {
            let _ = handle.join();
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    let counter = Arc::new(AtomicUsize::new(0));
    for _ in 0..1000 {
        let counter = Arc::clone(&counter);
        pool.execute(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        });
    }
    // Dropping the pool waits for all queued tasks to complete.
    drop(pool);
    println!("Counter: {}", counter.load(Ordering::SeqCst));
}
```

This more robust thread pool example demonstrates:
- Creating a `ThreadPool` with a specified number of worker threads, which are spawned in `new` and keep running until shutdown.
- Using `Injector`, `Worker`, and `Stealer` for work distribution and stealing.
- Using `Arc` and `AtomicUsize` to safely share a counter between threads.
- A basic `execute` method to submit tasks to the pool.
- A `Drop` implementation that signals shutdown and joins the workers after they have drained the remaining tasks. Idle workers still spin with `yield_now`; a production pool would park them instead.
3.2 Building a High-Performance Logging System
Crossbeam channels are excellent for building high-performance logging systems. You can use a dedicated thread to handle log messages, avoiding blocking the main application threads.
```rust
use crossbeam_channel::{unbounded, Sender};
use lazy_static::lazy_static;
use std::fs::OpenOptions;
use std::io::Write;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Define a log message type.
#[derive(Debug)]
pub enum LogMessage {
    Info(String),
    Warning(String),
    Error(String),
}

// A simple logger that uses a Crossbeam channel.
pub struct Logger {
    sender: Sender<LogMessage>,
}

// Use lazy_static to create a global, lazily initialized logger.
lazy_static! {
    pub static ref LOGGER: Logger = {
        let (sender, receiver) = unbounded();
        // Spawn a dedicated logger thread.
        thread::spawn(move || {
            // Only this thread touches the file, so no Mutex is needed.
            let mut file = OpenOptions::new()
                .create(true)
                .append(true)
                .open("app.log")
                .unwrap();
            // Runs until every sender is dropped; for a global logger, that is the process lifetime.
            for msg in receiver {
                let timestamp = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs();
                let log_entry = match msg {
                    LogMessage::Info(text) => format!("[INFO] {}: {}\n", timestamp, text),
                    LogMessage::Warning(text) => format!("[WARNING] {}: {}\n", timestamp, text),
                    LogMessage::Error(text) => format!("[ERROR] {}: {}\n", timestamp, text),
                };
                // Write and flush; report failures instead of panicking the logger thread.
                if let Err(e) = file.write_all(log_entry.as_bytes()).and_then(|_| file.flush()) {
                    eprintln!("Failed to write to log file: {}", e);
                }
            }
        });
        Logger { sender }
    };
}

// Helper methods to log messages.
impl Logger {
    pub fn info(&self, message: String) {
        self.sender.send(LogMessage::Info(message)).unwrap();
    }
    pub fn warning(&self, message: String) {
        self.sender.send(LogMessage::Warning(message)).unwrap();
    }
    pub fn error(&self, message: String) {
        self.sender.send(LogMessage::Error(message)).unwrap();
    }
}

fn main() {
    LOGGER.info("Starting the application...".to_string());
    LOGGER.warning("This is a warning message.".to_string());
    LOGGER.error("An error occurred!".to_string());

    // Simulate some work that generates log messages.
    for i in 0..10 {
        LOGGER.info(format!("Processing item: {}", i));
        thread::sleep(Duration::from_millis(10));
    }
    LOGGER.info("Application finished.".to_string());

    // Crude: give the logger thread time to drain the channel before the process exits.
    thread::sleep(Duration::from_millis(1000));
}
```

Key features of this logging system:
- Uses a dedicated thread to handle log writing, preventing blocking of application threads.
- Uses an unbounded `crossbeam-channel` to send log messages to the logger thread.
- Formats log messages with timestamps.
- Writes log messages to a file (“app.log”).
- Uses `lazy_static` for a global, easily accessible logger instance.
- Handles potential errors during file writing.
- Needs no `Mutex` around the file, because only the logger thread ever writes to it; the channel is the synchronization point.
3.3 Concurrent Data Processing with Epoch-Based Reclamation
This example demonstrates how to use `crossbeam-epoch` for concurrent data processing where nodes might be added and removed frequently. We’ll create a simple concurrent linked list.
```rust
use crossbeam_epoch::{self as epoch, Atomic, Owned};
use crossbeam_utils::thread::scope;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::Mutex;

struct Node<T> {
    data: T,
    next: Atomic<Node<T>>,
}

pub struct ConcurrentList<T> {
    head: Atomic<Node<T>>,
    // Simplification: removers are serialized; pushes and reads stay lock-free.
    // Fully concurrent removal needs marked pointers (Harris-style lists).
    remove_lock: Mutex<()>,
}

impl<T> ConcurrentList<T> {
    pub fn new() -> Self {
        ConcurrentList {
            head: Atomic::null(),
            remove_lock: Mutex::new(()),
        }
    }

    // Add a new node to the head of the list.
    pub fn push_front(&self, data: T) {
        let mut new_node = Owned::new(Node {
            data,
            next: Atomic::null(),
        });
        let guard = &epoch::pin();
        loop {
            let head = self.head.load(Relaxed, guard);
            new_node.next.store(head, Relaxed);
            match self.head.compare_exchange(head, new_node, Release, Relaxed, guard) {
                Ok(_) => break,
                Err(e) => new_node = e.new, // Retry with the same node.
            }
        }
    }

    // Remove nodes that satisfy a predicate.
    pub fn remove_if<F>(&self, predicate: F)
    where
        F: Fn(&T) -> bool,
    {
        let _removing = self.remove_lock.lock().unwrap();
        let guard = &epoch::pin();
        // `link` is the pointer that leads to `current`: the head or a predecessor's `next`.
        let mut link = &self.head;
        let mut current = link.load(Acquire, guard);
        while let Some(node) = unsafe { current.as_ref() } {
            let next = node.next.load(Acquire, guard);
            if predicate(&node.data) {
                // Node matches the predicate: unlink it from its predecessor.
                match link.compare_exchange(current, next, AcqRel, Acquire, guard) {
                    Ok(_) => {
                        // Unlinked; readers may still see it, so defer the destruction.
                        unsafe { guard.defer_destroy(current) };
                        current = next;
                    }
                    // Only `head` can change under us (a concurrent push); continue from the new head.
                    Err(e) => current = e.current,
                }
            } else {
                // Node doesn't match, move to the next node.
                link = &node.next;
                current = next;
            }
        }
    }

    // Collect a snapshot of the list contents (read-only).
    pub fn iter(&self) -> Vec<T>
    where
        T: Clone,
    {
        let guard = &epoch::pin();
        let mut current = self.head.load(Acquire, guard);
        let mut result = Vec::new();
        while let Some(node) = unsafe { current.as_ref() } {
            result.push(node.data.clone());
            current = node.next.load(Acquire, guard);
        }
        result
    }
}

impl<T> Drop for ConcurrentList<T> {
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so the nodes can be freed immediately.
        unsafe {
            let guard = epoch::unprotected();
            let mut current = self.head.load(Relaxed, guard);
            while !current.is_null() {
                let next = current.deref().next.load(Relaxed, guard);
                drop(current.into_owned());
                current = next;
            }
        }
    }
}

fn main() {
    let list = ConcurrentList::new();
    scope(|s| {
        s.spawn(|_| {
            for i in 0..10 {
                list.push_front(i);
            }
        });
        s.spawn(|_| {
            for i in 10..20 {
                list.push_front(i);
            }
        });
    })
    .unwrap();

    // Remove all even numbers.
    list.remove_if(|x| x % 2 == 0);
    // Print the remaining elements.
    println!("Remaining elements: {:?}", list.iter());

    scope(|s| {
        s.spawn(|_| {
            for i in 0..10 {
                list.push_front(i);
            }
        });
        s.spawn(|_| {
            for i in 10..20 {
                list.push_front(i);
            }
        });
    })
    .unwrap();

    // Remove all multiples of 3.
    list.remove_if(|x| x % 3 == 0);
    println!("Remaining elements: {:?}", list.iter());
}
```
This example demonstrates:
- Creating a concurrent linked list using `Atomic` pointers.
- Using `push_front` to add elements to the list.
- Using `remove_if` to remove elements based on a predicate, safely using `defer_destroy` to schedule nodes for deletion.
- Using `iter` to traverse the list.
- A `Drop` implementation to clean up the list.
- Using `crossbeam_utils::thread::scope` to spawn threads that safely borrow the list.
3.4 Lock-free Data Structures Examples
We have already seen how to implement a lock-free stack. Here’s another example demonstrating a lock-free counter:
“`rust