Change some Ordering::Relaxed to Ordering::SeqCst where we must not allow the operation to be reordered with, or observed inconsistently by, other threads.

Some ID increments are kept Relaxed; SeqCst shouldn't be necessary there.
Not sure about the bool checks in api.rs — those may need review.
This commit is contained in:
Marcel Märtens 2021-04-07 23:17:09 +02:00
parent fd77966293
commit ea5a02d7cd
3 changed files with 16 additions and 16 deletions

View File

@ -54,7 +54,7 @@ fn main() -> Result<(), u32> {
threads.push(thread::spawn(move || {
let mut stream = match TcpStream::connect(addr.as_ref()) {
Err(err) => {
total_finished_threads.fetch_add(1, Ordering::Relaxed);
total_finished_threads.fetch_add(1, Ordering::SeqCst);
panic!("could not open connection: {}", err);
},
Ok(s) => s,
@ -70,15 +70,15 @@ fn main() -> Result<(), u32> {
if cur.duration_since(thread_last_sync) >= Duration::from_secs(1) {
thread_last_sync = cur;
println!("[{}]send: {}MiB/s", i, thread_bytes_send / (1024 * 1024));
total_bytes_send.fetch_add(thread_bytes_send, Ordering::Relaxed);
total_bytes_send.fetch_add(thread_bytes_send, Ordering::SeqCst);
thread_bytes_send = 0;
}
total_send_count.fetch_add(1, Ordering::Relaxed);
total_send_count.fetch_add(1, Ordering::SeqCst);
let ret = stream.write_all(data[0..(tosend as usize)].as_bytes());
if ret.is_err() {
println!("[{}] error: {}", i, ret.err().unwrap());
total_finished_threads.fetch_add(1, Ordering::Relaxed);
total_finished_threads.fetch_add(1, Ordering::SeqCst);
return;
}
//stream.flush();
@ -86,7 +86,7 @@ fn main() -> Result<(), u32> {
}));
}
while total_finished_threads.load(Ordering::Relaxed) < thread_count {
while total_finished_threads.load(Ordering::SeqCst) < thread_count {
thread::sleep(Duration::from_millis(10));
}
@ -96,16 +96,16 @@ fn main() -> Result<(), u32> {
println!("test ended");
println!(
"total send: {}MiB",
total_bytes_send.load(Ordering::Relaxed) / (1024 * 1024)
total_bytes_send.load(Ordering::SeqCst) / (1024 * 1024)
);
println!("total time: {}s", dur.as_secs());
println!(
"average: {}KiB/s",
total_bytes_send.load(Ordering::Relaxed) * 1000 / dur.as_millis() as u64 / 1024
total_bytes_send.load(Ordering::SeqCst) * 1000 / dur.as_millis() as u64 / 1024
);
println!(
"send count: {}/s",
total_send_count.load(Ordering::Relaxed) * 1000 / dur.as_millis() as u64
total_send_count.load(Ordering::SeqCst) * 1000 / dur.as_millis() as u64
);
Ok(())

View File

@ -399,7 +399,7 @@ impl BParticipant {
self.open_stream_channels.lock().await.take();
trace!("Stop send_mgr");
self.shutdown_barrier
.fetch_sub(Self::BARR_SEND, Ordering::Relaxed);
.fetch_sub(Self::BARR_SEND, Ordering::SeqCst);
}
#[allow(clippy::too_many_arguments)]
@ -536,12 +536,12 @@ impl BParticipant {
}
trace!("receiving no longer possible, closing all streams");
for (_, si) in self.streams.write().await.drain() {
si.send_closed.store(true, Ordering::Relaxed);
si.send_closed.store(true, Ordering::SeqCst);
self.metrics.streams_closed(&self.remote_pid_string);
}
trace!("Stop recv_mgr");
self.shutdown_barrier
.fetch_sub(Self::BARR_RECV, Ordering::Relaxed);
.fetch_sub(Self::BARR_RECV, Ordering::SeqCst);
}
async fn create_channel_mgr(
@ -584,7 +584,7 @@ impl BParticipant {
.await;
trace!("Stop create_channel_mgr");
self.shutdown_barrier
.fetch_sub(Self::BARR_CHANNEL, Ordering::Relaxed);
.fetch_sub(Self::BARR_CHANNEL, Ordering::SeqCst);
}
/// sink shutdown:
@ -618,7 +618,7 @@ impl BParticipant {
let wait_for_manager = || async {
let mut sleep = 0.01f64;
loop {
let bytes = self.shutdown_barrier.load(Ordering::Relaxed);
let bytes = self.shutdown_barrier.load(Ordering::SeqCst);
if bytes == 0 {
break;
}
@ -635,7 +635,7 @@ impl BParticipant {
{
let lock = self.streams.read().await;
for si in lock.values() {
si.send_closed.store(true, Ordering::Relaxed);
si.send_closed.store(true, Ordering::SeqCst);
}
}
@ -693,7 +693,7 @@ impl BParticipant {
let stream = { self.streams.write().await.remove(&sid) };
match stream {
Some(si) => {
si.send_closed.store(true, Ordering::Relaxed);
si.send_closed.store(true, Ordering::SeqCst);
si.b2a_msg_recv_s.lock().await.close();
},
None => {

View File

@ -334,7 +334,7 @@ impl Scheduler {
trace!("Start scheduler_shutdown_mgr");
a2s_scheduler_shutdown_r.await.unwrap();
info!("Shutdown of scheduler requested");
self.closed.store(true, Ordering::Relaxed);
self.closed.store(true, Ordering::SeqCst);
debug!("Shutting down all BParticipants gracefully");
let mut participants = self.participants.lock().await;
let waitings = participants