diff --git a/network/examples/tcp_loadtest.rs b/network/examples/tcp_loadtest.rs index 9d3d1c6bf1..3d0d346fe7 100644 --- a/network/examples/tcp_loadtest.rs +++ b/network/examples/tcp_loadtest.rs @@ -54,7 +54,7 @@ fn main() -> Result<(), u32> { threads.push(thread::spawn(move || { let mut stream = match TcpStream::connect(addr.as_ref()) { Err(err) => { - total_finished_threads.fetch_add(1, Ordering::Relaxed); + total_finished_threads.fetch_add(1, Ordering::SeqCst); panic!("could not open connection: {}", err); }, Ok(s) => s, @@ -70,15 +70,15 @@ fn main() -> Result<(), u32> { if cur.duration_since(thread_last_sync) >= Duration::from_secs(1) { thread_last_sync = cur; println!("[{}]send: {}MiB/s", i, thread_bytes_send / (1024 * 1024)); - total_bytes_send.fetch_add(thread_bytes_send, Ordering::Relaxed); + total_bytes_send.fetch_add(thread_bytes_send, Ordering::SeqCst); thread_bytes_send = 0; } - total_send_count.fetch_add(1, Ordering::Relaxed); + total_send_count.fetch_add(1, Ordering::SeqCst); let ret = stream.write_all(data[0..(tosend as usize)].as_bytes()); if ret.is_err() { println!("[{}] error: {}", i, ret.err().unwrap()); - total_finished_threads.fetch_add(1, Ordering::Relaxed); + total_finished_threads.fetch_add(1, Ordering::SeqCst); return; } //stream.flush(); @@ -86,7 +86,7 @@ fn main() -> Result<(), u32> { })); } - while total_finished_threads.load(Ordering::Relaxed) < thread_count { + while total_finished_threads.load(Ordering::SeqCst) < thread_count { thread::sleep(Duration::from_millis(10)); } @@ -96,16 +96,16 @@ fn main() -> Result<(), u32> { println!("test ended"); println!( "total send: {}MiB", - total_bytes_send.load(Ordering::Relaxed) / (1024 * 1024) + total_bytes_send.load(Ordering::SeqCst) / (1024 * 1024) ); println!("total time: {}s", dur.as_secs()); println!( "average: {}KiB/s", - total_bytes_send.load(Ordering::Relaxed) * 1000 / dur.as_millis() as u64 / 1024 + total_bytes_send.load(Ordering::SeqCst) * 1000 / dur.as_millis() as u64 / 1024 ); println!( "send count: {}/s", - total_send_count.load(Ordering::Relaxed) * 1000 / dur.as_millis() as u64 + total_send_count.load(Ordering::SeqCst) * 1000 / dur.as_millis() as u64 ); Ok(()) diff --git a/network/src/participant.rs b/network/src/participant.rs index 66668ca898..afa30266e8 100644 --- a/network/src/participant.rs +++ b/network/src/participant.rs @@ -399,7 +399,7 @@ impl BParticipant { self.open_stream_channels.lock().await.take(); trace!("Stop send_mgr"); self.shutdown_barrier - .fetch_sub(Self::BARR_SEND, Ordering::Relaxed); + .fetch_sub(Self::BARR_SEND, Ordering::SeqCst); } #[allow(clippy::too_many_arguments)] @@ -536,12 +536,12 @@ impl BParticipant { } trace!("receiving no longer possible, closing all streams"); for (_, si) in self.streams.write().await.drain() { - si.send_closed.store(true, Ordering::Relaxed); + si.send_closed.store(true, Ordering::SeqCst); self.metrics.streams_closed(&self.remote_pid_string); } trace!("Stop recv_mgr"); self.shutdown_barrier - .fetch_sub(Self::BARR_RECV, Ordering::Relaxed); + .fetch_sub(Self::BARR_RECV, Ordering::SeqCst); } async fn create_channel_mgr( @@ -584,7 +584,7 @@ impl BParticipant { .await; trace!("Stop create_channel_mgr"); self.shutdown_barrier - .fetch_sub(Self::BARR_CHANNEL, Ordering::Relaxed); + .fetch_sub(Self::BARR_CHANNEL, Ordering::SeqCst); } /// sink shutdown: @@ -618,7 +618,7 @@ impl BParticipant { let wait_for_manager = || async { let mut sleep = 0.01f64; loop { - let bytes = self.shutdown_barrier.load(Ordering::Relaxed); + let bytes = self.shutdown_barrier.load(Ordering::SeqCst); if bytes == 0 { break; } @@ -635,7 +635,7 @@ impl BParticipant { { let lock = self.streams.read().await; for si in lock.values() { - si.send_closed.store(true, Ordering::Relaxed); + si.send_closed.store(true, Ordering::SeqCst); } } @@ -693,7 +693,7 @@ impl BParticipant { let stream = { self.streams.write().await.remove(&sid) }; match stream { Some(si) => { - si.send_closed.store(true, Ordering::Relaxed); + si.send_closed.store(true, Ordering::SeqCst); si.b2a_msg_recv_s.lock().await.close(); }, None => { diff --git a/network/src/scheduler.rs b/network/src/scheduler.rs index 5d02c24876..fa4ca0fa14 100644 --- a/network/src/scheduler.rs +++ b/network/src/scheduler.rs @@ -334,7 +334,7 @@ impl Scheduler { trace!("Start scheduler_shutdown_mgr"); a2s_scheduler_shutdown_r.await.unwrap(); info!("Shutdown of scheduler requested"); - self.closed.store(true, Ordering::Relaxed); + self.closed.store(true, Ordering::SeqCst); debug!("Shutting down all BParticipants gracefully"); let mut participants = self.participants.lock().await; let waitings = participants